/*
 * Decompiled with CFR 0.152.
 */
package org.lsst.ccs.messaging.util;

import java.util.EnumMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.lsst.ccs.bus.definition.Bus;
import org.lsst.ccs.bus.messages.BusMessage;
import org.lsst.ccs.messaging.TransportStateException;
import org.lsst.ccs.messaging.util.AbstractDispatcher;
import org.lsst.ccs.messaging.util.AlertGate;
import org.lsst.ccs.messaging.util.Dispatcher;
import org.lsst.ccs.messaging.util.KeyQueueExecutor;
import org.lsst.ccs.utilities.scheduler.BasicThreadFactory;
import org.lsst.ccs.utilities.taitime.CCSTimeStamp;

public class MultiQueueDispatcher
extends AbstractDispatcher {
    private volatile boolean off = true;
    private final AtomicLong taskID = new AtomicLong();
    private final EnumMap<Bus, Object> inExecutors;
    private final EnumMap<Bus, ExecutorService> outNormExecutors;
    private final ThreadPoolExecutor outOobCcExec;
    private final ThreadPoolExecutor oobExec;
    private final AlertGate[] gatesDuration;
    private final AlertGate[] gatesQueueSize;
    private final AtomicInteger[] queueSize;

    public MultiQueueDispatcher(String args) {
        super(args);
        String prefixIncomingBus;
        String prefixIncoming;
        int incoming;
        int[] inThreads = this.getIntArrayArg("inThreads", new int[]{1, 1, 8});
        this.gatesDuration = new AlertGate[2 * Bus.values().length * Dispatcher.Stage.values().length];
        int index = 0;
        String prefix = "duration/";
        for (incoming = 0; incoming < 2; ++incoming) {
            prefixIncoming = prefix + (incoming == 0 ? "in" : "out") + "/";
            for (Bus bus : Bus.values()) {
                prefixIncomingBus = prefixIncoming + bus + "/";
                for (Dispatcher.Stage stage : Dispatcher.Stage.values()) {
                    String prefixIncomingBusStage = prefixIncomingBus + (Object)((Object)stage);
                    int[] duration = this.getIntArrayArg(prefixIncomingBusStage, new int[0]);
                    this.gatesDuration[index++] = new AlertGate(this, "dispatcher/" + prefixIncomingBusStage, "High message processing time.", duration);
                }
            }
        }
        this.gatesQueueSize = new AlertGate[2 * Bus.values().length];
        index = 0;
        prefix = "queue/";
        for (incoming = 0; incoming < 2; ++incoming) {
            prefixIncoming = prefix + (incoming == 0 ? "in" : "out") + "/";
            for (Bus bus : Bus.values()) {
                prefixIncomingBus = prefixIncoming + bus;
                int[] size = this.getIntArrayArg(prefixIncomingBus, new int[0]);
                this.gatesQueueSize[index++] = new AlertGate(this, "dispatcher/" + prefixIncomingBus, "Long message queue.", size);
            }
        }
        this.inExecutors = new EnumMap(Bus.class);
        for (Bus bus : Bus.values()) {
            int n = inThreads[bus.ordinal()];
            if (n == 1) {
                this.inExecutors.put(bus, (Object)Executors.newSingleThreadExecutor((ThreadFactory)((Object)new TFactory("MESSAGING_IN_" + bus))));
                continue;
            }
            this.inExecutors.put(bus, (Object)new KeyQueueExecutor("MESSAGING_IN_" + bus, n));
        }
        this.outNormExecutors = new EnumMap(Bus.class);
        for (Bus bus : Bus.values()) {
            this.outNormExecutors.put(bus, Executors.newSingleThreadExecutor((ThreadFactory)((Object)new TFactory("MESSAGING_OUT_" + bus))));
        }
        this.outOobCcExec = new ThreadPoolExecutor(2, 2, 70L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), (ThreadFactory)((Object)new TFactory("MESSAGING_OUT_OOB_CC")));
        this.outOobCcExec.allowCoreThreadTimeOut(true);
        this.oobExec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 70L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), (ThreadFactory)((Object)new TFactory("MESSAGING_OOB")));
        int n = 2 * Bus.values().length;
        this.queueSize = new AtomicInteger[n];
        for (int i = 0; i < n; ++i) {
            this.queueSize[i] = new AtomicInteger(0);
        }
        this.config = null;
    }

    @Override
    public void initialize() {
        this.off = false;
    }

    @Override
    public void shutdown() {
        this.off = true;
        this.oobExec.shutdown();
        this.outOobCcExec.shutdown();
        this.outNormExecutors.values().forEach(exec -> exec.shutdown());
        this.inExecutors.values().forEach(exec -> {
            if (exec instanceof KeyQueueExecutor) {
                ((KeyQueueExecutor)exec).shutdown();
            } else {
                ((ExecutorService)exec).shutdown();
            }
        });
        try {
            this.oobExec.awaitTermination(1L, TimeUnit.MINUTES);
            this.outOobCcExec.awaitTermination(1L, TimeUnit.MINUTES);
            for (ExecutorService executorService : this.outNormExecutors.values()) {
                executorService.awaitTermination(1L, TimeUnit.MINUTES);
            }
            for (Object object : this.inExecutors.values()) {
                if (object instanceof KeyQueueExecutor) {
                    ((KeyQueueExecutor)object).awaitTermination(1L, TimeUnit.MINUTES);
                    continue;
                }
                ((ExecutorService)object).awaitTermination(1L, TimeUnit.MINUTES);
            }
        }
        catch (InterruptedException x) {
            Thread.currentThread().interrupt();
        }
        super.shutdown();
    }

    @Override
    public void in(Runnable run, Bus bus, String ... agents) {
        if (this.off) {
            return;
        }
        Dispatcher.Order order = agents == null ? Dispatcher.Order.OOB : (agents.length == 0 ? Dispatcher.Order.OOB_CC : Dispatcher.Order.NORM);
        TaskRunner runner = new TaskRunner(run, true, bus, order);
        this.stageEnded(runner, Dispatcher.Stage.START);
        try {
            switch (order) {
                case NORM: 
                case OOB_CC: {
                    Object exec = this.inExecutors.get(bus);
                    if (exec instanceof KeyQueueExecutor) {
                        ((KeyQueueExecutor)exec).execute(runner, agents);
                        break;
                    }
                    ((ExecutorService)exec).execute(runner);
                    break;
                }
                case OOB: {
                    this.oobExec.execute(runner);
                }
            }
            int queueSizeIndex = 0 + bus.ordinal();
            int size = this.queueSize[queueSizeIndex].incrementAndGet();
            this.gatesQueueSize[queueSizeIndex].check(size);
        }
        catch (RejectedExecutionException x) {
            throw new TransportStateException(x);
        }
        finally {
            this.stageEnded(runner, Dispatcher.Stage.SUBMIT);
        }
    }

    @Override
    public void out(Runnable run, Bus bus, Dispatcher.Order order) {
        if (this.off) {
            throw new TransportStateException();
        }
        TaskRunner runner = new TaskRunner(run, false, bus, order);
        this.stageEnded(runner, Dispatcher.Stage.START);
        try {
            switch (order) {
                case NORM: {
                    this.outNormExecutors.get(bus).execute(runner);
                    break;
                }
                case OOB_CC: {
                    this.outOobCcExec.execute(runner);
                    break;
                }
                case OOB: {
                    this.oobExec.execute(runner);
                }
            }
            int queueSizeIndex = 1 + bus.ordinal();
            int size = this.queueSize[queueSizeIndex].incrementAndGet();
            this.gatesQueueSize[queueSizeIndex].check(size);
        }
        catch (RejectedExecutionException x) {
            throw new TransportStateException();
        }
        finally {
            this.stageEnded(runner, Dispatcher.Stage.SUBMIT);
        }
    }

    private void stageEnded(TaskRunner runner, Dispatcher.Stage stage) {
        int time = (int)(System.currentTimeMillis() - runner.refTime);
        runner.setTime(stage, time);
        int duration = 0;
        switch (stage) {
            case START: {
                if (runner.busMessage != null) {
                    if (runner.incoming) {
                        runner.busMessage.setIncomingQueueInTimeStamp(CCSTimeStamp.currentTime());
                    } else {
                        runner.busMessage.setOutgoingQueueInTimeStamp(CCSTimeStamp.currentTime());
                    }
                }
                duration = time;
                break;
            }
            case WAIT: {
                if (runner.busMessage != null) {
                    if (runner.incoming) {
                        runner.busMessage.setIncomingQueueOutTimeStamp(CCSTimeStamp.currentTime());
                    } else {
                        runner.busMessage.setOutgoingQueueOutTimeStamp(CCSTimeStamp.currentTime());
                    }
                }
                duration = time - runner.getTime(Dispatcher.Stage.START);
                break;
            }
            case RUN: {
                duration = time - runner.getTime(Dispatcher.Stage.WAIT);
                break;
            }
            case SUBMIT: {
                duration = time - runner.getTime(Dispatcher.Stage.START);
            }
        }
        int i = (runner.incoming ? 0 : 1) * (Bus.values().length * Dispatcher.Stage.values().length) + runner.bus.ordinal() * Dispatcher.Stage.values().length + stage.ordinal();
        this.gatesDuration[i].check(duration);
        if (runner.runnable instanceof Dispatcher.Task) {
            ((Dispatcher.Task)runner.runnable).stageEnded(stage);
        }
    }

    private class TaskRunner
    implements Runnable {
        protected final long id;
        private final Runnable runnable;
        private final boolean incoming;
        private final Bus bus;
        private final Dispatcher.Order order;
        private final BusMessage busMessage;
        private final long refTime;
        private final int[] stageTime;

        TaskRunner(Runnable runnable, boolean incoming, Bus bus, Dispatcher.Order order) {
            this.id = MultiQueueDispatcher.this.taskID.getAndIncrement();
            this.stageTime = new int[Dispatcher.Stage.values().length];
            this.runnable = runnable;
            this.incoming = incoming;
            this.bus = bus;
            this.order = order;
            if (runnable instanceof AbstractDispatcher.Task) {
                AbstractDispatcher.Task task = (AbstractDispatcher.Task)runnable;
                this.busMessage = task.getBusMessage();
                this.refTime = task.getRefTime();
            } else {
                this.busMessage = null;
                this.refTime = System.currentTimeMillis();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            int queueSizeIndex = (this.incoming ? 0 : 1) + this.bus.ordinal();
            int size = MultiQueueDispatcher.this.queueSize[queueSizeIndex].decrementAndGet();
            MultiQueueDispatcher.this.gatesQueueSize[queueSizeIndex].check(size);
            MultiQueueDispatcher.this.stageEnded(this, Dispatcher.Stage.WAIT);
            if (this.busMessage != null) {
                if (this.incoming) {
                    this.busMessage.setIncomingQueueOutTimeStamp(CCSTimeStamp.currentTime());
                } else {
                    this.busMessage.setOutgoingQueueOutTimeStamp(CCSTimeStamp.currentTime());
                }
            }
            try {
                this.runnable.run();
            }
            catch (RuntimeException x) {
                String warningMessage = "Exception " + (this.incoming ? "receiving" : "sending") + " message ";
                if (this.busMessage != null) {
                    warningMessage = warningMessage + this.busMessage.getClass().getSimpleName() + " (" + this.busMessage.getClassName() + ") ";
                    if (this.incoming) {
                        warningMessage = warningMessage + "from " + this.busMessage.getOriginAgentInfo().getName() + " ";
                    }
                } else {
                    warningMessage = warningMessage + "null ";
                }
                warningMessage = warningMessage + "on " + this.bus + " bus, (" + (Object)((Object)this.order) + ")";
                MultiQueueDispatcher.this.getLogger().log(Level.WARNING, warningMessage, x);
            }
            finally {
                MultiQueueDispatcher.this.stageEnded(this, Dispatcher.Stage.RUN);
            }
        }

        long getID() {
            return this.id;
        }

        int getTime(Dispatcher.Stage stage) {
            return this.stageTime[stage.ordinal()];
        }

        void setTime(Dispatcher.Stage stage, int time) {
            this.stageTime[stage.ordinal()] = time;
        }
    }

    private class TFactory
    extends BasicThreadFactory {
        TFactory(String name) {
            super(name, null, true);
        }

        public Thread newThread(Runnable r) {
            Thread thread = super.newThread(r);
            thread.setUncaughtExceptionHandler((t, x) -> MultiQueueDispatcher.this.getLogger().log(Level.WARNING, "Exception thrown from messaging executor: " + thread.getName(), x));
            return thread;
        }
    }
}

