/*
 * Decompiled with CFR 0.152.
 */
package org.lsst.ccs.subsystem.cluster.monitor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Duration;
import org.lsst.ccs.Agent;
import org.lsst.ccs.bus.data.AgentInfo;
import org.lsst.ccs.bus.data.KeyValueData;
import org.lsst.ccs.bus.messages.CommandMessage;
import org.lsst.ccs.bus.messages.LogMessage;
import org.lsst.ccs.bus.messages.StatusMessage;
import org.lsst.ccs.commons.annotations.LookupField;
import org.lsst.ccs.framework.AgentPeriodicTask;
import org.lsst.ccs.framework.HasLifecycle;
import org.lsst.ccs.messaging.CommandMessageListener;
import org.lsst.ccs.messaging.LogMessageListener;
import org.lsst.ccs.messaging.StatusMessageListener;
import org.lsst.ccs.subsystem.cluster.monitor.Accumulate;
import org.lsst.ccs.subsystem.cluster.monitor.Counter;

/**
 * Collects per-key statistics about the messages seen on the command, status
 * and log buses and publishes them on the status bus at a fixed period.
 *
 * <p>For every message received, three collectors are updated: a
 * {@link Counter} (number of messages), and two {@link Accumulate} instances
 * fed with the serialized size of the message and with the difference between
 * the local clock and the message timestamp. Each time the periodic task runs,
 * the collectors are replaced by fresh ones and the content of the old ones is
 * published under the keys {@code "traffic"}, {@code "size"} and
 * {@code "time"}.
 *
 * <p>The collectors are static, so they are shared by every instance of this
 * class; all accesses to them are guarded by {@code counterLock}.
 */
public class BusTrafficMonitor
implements HasLifecycle {
    /** Name given to the periodic task that publishes the statistics. */
    private static final String PUBLISH_TASK_NAME = "busTrafficStatsUpdate";
    /** Interval between two successive publications of the statistics. */
    private static final Duration PUBLISH_PERIOD = Duration.ofSeconds(60L);
    /** Guards every read, update and replacement of the three collectors below. */
    private static final Object counterLock = new Object();
    /** Number of messages seen per key since the last publication. */
    private static Counter counter = new Counter();
    /** Serialized message sizes per key since the last publication. */
    private static Accumulate sizeAccumulator = new Accumulate();
    /** Receive-time minus message-timestamp values per key since the last publication. */
    private static Accumulate timeAccumulator = new Accumulate();
    @LookupField(strategy=LookupField.Strategy.TOP)
    private Agent agent;

    /**
     * Schedules the periodic task that publishes and resets the statistics.
     */
    public void build() {
        AgentPeriodicTask publishTask = new AgentPeriodicTask(PUBLISH_TASK_NAME, () -> resetCounterAndPublish()).withPeriod(PUBLISH_PERIOD);
        this.agent.getAgentPeriodicTaskService().scheduleAgentPeriodicTask(publishTask);
    }

    /**
     * Registers one counting listener on each of the command, status and log buses.
     */
    public void start() {
        this.agent.getMessagingAccess().addCommandMessageListener(new CommandBusCounterListener());
        this.agent.getMessagingAccess().addStatusMessageListener(new StatusBusCounterListener());
        this.agent.getMessagingAccess().addLogMessageListener(new LogBusCounterListener());
    }

    /**
     * Swaps the three collectors for fresh ones and publishes the content of
     * the previous ones on the status bus.
     *
     * <p>The snapshot of the current collectors and their replacement happen
     * in the same critical section, so the references that get published are
     * exactly the ones the listeners were updating up to the swap. Publication
     * itself is done after the lock is released, so listeners are not blocked
     * while the data is sent.
     */
    private void resetCounterAndPublish() {
        Counter oldCounter;
        Accumulate oldSizeAccumulator;
        Accumulate oldTimeAccumulator;
        synchronized (counterLock) {
            oldCounter = counter;
            oldSizeAccumulator = sizeAccumulator;
            oldTimeAccumulator = timeAccumulator;
            counter = new Counter();
            sizeAccumulator = new Accumulate();
            timeAccumulator = new Accumulate();
        }
        KeyValueData counterData = new KeyValueData("traffic", (Serializable)((Object)oldCounter.getCountersMap()));
        this.agent.publishSubsystemDataOnStatusBus(counterData);
        KeyValueData sizeData = new KeyValueData("size", (Serializable)((Object)oldSizeAccumulator.getAverageMap()));
        this.agent.publishSubsystemDataOnStatusBus(sizeData);
        KeyValueData timeData = new KeyValueData("time", (Serializable)((Object)oldTimeAccumulator.getAverageMap()));
        this.agent.publishSubsystemDataOnStatusBus(timeData);
    }

    /**
     * Returns the number of bytes produced when the given object is written
     * with Java serialization.
     *
     * @param object the object to measure
     * @return the length in bytes of the serialized form, or {@code 0} if
     *         serialization fails with an {@link IOException}
     */
    public static long sizeOf(Serializable object) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
            out.flush();
        }
        catch (IOException ignored) {
            // Best effort: a message that cannot be serialized is reported as size 0
            // rather than disturbing the bus listener that asked for the measurement.
            return 0L;
        }
        // size() reports the byte count without copying the buffer.
        return buffer.size();
    }

    /**
     * Records one message under each of the given keys: increments the
     * counter and feeds the size and time accumulators, all under
     * {@code counterLock}.
     *
     * @param size   serialized size of the message, as returned by {@link #sizeOf}
     * @param deltaT local receive time minus the message timestamp
     * @param keys   statistic keys the message contributes to
     */
    private static void recordSample(double size, long deltaT, String ... keys) {
        synchronized (counterLock) {
            for (String key : keys) {
                counter.increment(key);
            }
            for (String key : keys) {
                sizeAccumulator.accumulate(size, key);
            }
            for (String key : keys) {
                timeAccumulator.accumulate(deltaT, key);
            }
        }
    }

    /**
     * Records every status message under four keys: the bus as a whole, the
     * bus per message class, the origin as a whole and the origin per message
     * class.
     */
    private static class StatusBusCounterListener
    implements StatusMessageListener {
        private static final String BUS_NAME = "Status";

        /**
         * Measures the message and adds it to the shared statistics.
         *
         * @param msg the status message received from the bus
         */
        public void onStatusMessage(StatusMessage msg) {
            long deltaT = System.currentTimeMillis() - msg.getTimeStamp();
            // Agents whose type ordinal is at or above WORKER are tracked under their own
            // name; every other origin is grouped under "client".
            // NOTE(review): this relies on the declaration order of AgentInfo.AgentType.
            String origin = msg.getOriginAgentInfo().getType().ordinal() >= AgentInfo.AgentType.WORKER.ordinal() ? msg.getOriginAgentInfo().getName() : "client";
            String busKey = origin + "/" + BUS_NAME;
            String messageType = msg.getClass().getSimpleName();
            double size = BusTrafficMonitor.sizeOf((Serializable)msg);
            BusTrafficMonitor.recordSample(size, deltaT, BUS_NAME + "/all", BUS_NAME + "/" + messageType, busKey + "/all", busKey + "/" + messageType);
        }
    }

    /**
     * Records every log message under two keys: the bus as a whole and the
     * origin. Unlike the other listeners, no per-message-class key is used.
     */
    private static class LogBusCounterListener
    implements LogMessageListener {
        private static final String BUS_NAME = "Log";

        /**
         * Measures the message and adds it to the shared statistics.
         *
         * @param msg the log message received from the bus
         */
        public void onLogMessage(LogMessage msg) {
            long deltaT = System.currentTimeMillis() - msg.getTimeStamp();
            // Same origin grouping as the other listeners: named agents at or above
            // WORKER, "client" for everything else.
            String origin = msg.getOriginAgentInfo().getType().ordinal() >= AgentInfo.AgentType.WORKER.ordinal() ? msg.getOriginAgentInfo().getName() : "client";
            String busKey = origin + "/" + BUS_NAME;
            double size = BusTrafficMonitor.sizeOf((Serializable)msg);
            BusTrafficMonitor.recordSample(size, deltaT, BUS_NAME + "/all", busKey);
        }
    }

    /**
     * Records every command message under four keys: the bus as a whole, the
     * bus per message class, the origin as a whole and the origin per message
     * class.
     */
    private static class CommandBusCounterListener
    implements CommandMessageListener {
        private static final String BUS_NAME = "Command";

        /**
         * Measures the message and adds it to the shared statistics.
         *
         * @param msg the command message received from the bus
         */
        public void onCommandMessage(CommandMessage msg) {
            long deltaT = System.currentTimeMillis() - msg.getTimeStamp();
            // Same origin grouping as the other listeners: named agents at or above
            // WORKER, "client" for everything else.
            String origin = msg.getOriginAgentInfo().getType().ordinal() >= AgentInfo.AgentType.WORKER.ordinal() ? msg.getOriginAgentInfo().getName() : "client";
            String busKey = origin + "/" + BUS_NAME;
            String messageType = msg.getClass().getSimpleName();
            double size = BusTrafficMonitor.sizeOf((Serializable)msg);
            BusTrafficMonitor.recordSample(size, deltaT, BUS_NAME + "/all", BUS_NAME + "/" + messageType, busKey + "/all", busKey + "/" + messageType);
        }
    }
}

