/*
 * Decompiled with CFR 0.152.
 */
package org.lsst.ccs.subsystem.ocsbridge;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.lsst.ccs.Subsystem;
import org.lsst.ccs.bus.data.AgentInfo;
import org.lsst.ccs.bus.data.DataProviderInfo;
import org.lsst.ccs.bus.data.KeyValueData;
import org.lsst.ccs.bus.data.KeyValueDataList;
import org.lsst.ccs.bus.messages.BusMessage;
import org.lsst.ccs.bus.messages.StatusMessage;
import org.lsst.ccs.bus.messages.StatusSubsystemData;
import org.lsst.ccs.camera.Camera;
import org.lsst.ccs.camera.kafka.avro.AvroSchemaMapper;
import org.lsst.ccs.camera.kafka.avro.GenericRecordManager;
import org.lsst.ccs.camera.kafka.avro.KafkaBridge;
import org.lsst.ccs.camera.kafka.avro.RequestResult;
import org.lsst.ccs.camera.kafka.avro.Topic;
import org.lsst.ccs.camera.sal.classes.DataProviderInfoUtils;
import org.lsst.ccs.commons.annotations.ConfigurationParameter;
import org.lsst.ccs.commons.annotations.LookupField;
import org.lsst.ccs.framework.AgentPeriodicTask;
import org.lsst.ccs.framework.HasLifecycle;
import org.lsst.ccs.messaging.BusMessageFilterFactory;
import org.lsst.ccs.messaging.StatusMessageListener;
import org.lsst.ccs.services.AgentPeriodicTaskService;
import org.lsst.ccs.services.DataProviderDictionaryService;
import org.lsst.ccs.services.InfluxDbClientService;
import org.lsst.ccs.utilities.scheduler.Scheduler;

public class KafkaService
extends Subsystem
implements HasLifecycle,
DataProviderDictionaryService.DataProviderDictionaryListener,
StatusMessageListener {
    private static final Logger LOG = Logger.getLogger(KafkaService.class.getName());
    @LookupField(strategy=LookupField.Strategy.TOP)
    private Subsystem agent;
    @LookupField(strategy=LookupField.Strategy.TREE)
    private DataProviderDictionaryService dpdService;
    @LookupField(strategy=LookupField.Strategy.TREE)
    protected AgentPeriodicTaskService periodicTaskService;
    @LookupField(strategy=LookupField.Strategy.TREE)
    private InfluxDbClientService influxDbClientService;
    @LookupField(strategy=LookupField.Strategy.TREE)
    private KafkaBridge kafkaBridge;
    @ConfigurationParameter
    private volatile Camera device = Camera.MAIN_CAMERA;
    @ConfigurationParameter(description="The maximum number of records published for each topic", units="unitless")
    private volatile int maxNumberOfPublishedRecords = 50;
    // Collects incoming key/value data; replaced by a fresh instance on every publication cycle.
    private volatile GenericRecordManager recordManager;
    // Held while filling the record manager and while swapping it.
    private final Object recordManagerLock = new Object();
    private AvroSchemaMapper mapper;
    // Held while topics are registered in dataProviderDictionaryUpdate().
    private final Object topicRegistrationLock = new Object();
    private final Set<AgentInfo> relevantAgentInfo = new CopyOnWriteArraySet<AgentInfo>();
    private Scheduler kafkaScheduler;
    // Paths that must not be forwarded. It is read in onStatusMessage() and written in
    // dataProviderDictionaryUpdate(), which hold different locks, so the collection itself
    // has to be thread-safe. A set also makes the per-value contains() check O(1).
    private final Set<String> ignoredPaths = ConcurrentHashMap.newKeySet();
    // Assigned in postInit() and read from listener / periodic-task callbacks afterwards;
    // volatile so that those callbacks see the assigned reference.
    private volatile List<String> registeredTopics;
    // Accepts only messages whose origin agent is currently in relevantAgentInfo.
    private final Predicate<BusMessage> isRelevantAgent = bm -> {
        AgentInfo agentInfo = bm.getOriginAgentInfo();
        return this.relevantAgentInfo.contains(agentInfo);
    };
    // Per-topic publication statistics, keyed by schema name; all three maps are updated
    // and drained together under dataProcessingLock.
    private final Map<String, DataAccumulator> timeAccumMap = new ConcurrentHashMap<String, DataAccumulator>();
    private final Map<String, DataAccumulator> nRecordsAccumMap = new ConcurrentHashMap<String, DataAccumulator>();
    private final Map<String, DataAccumulator> sizeAccumMap = new ConcurrentHashMap<String, DataAccumulator>();
    private static final Object dataProcessingLock = new Object();

    /**
     * Creates the service, passing the name {@code "kafka-service"} and the
     * {@code AgentInfo.AgentType.SERVICE} agent type to the {@code Subsystem} constructor.
     */
    public KafkaService() {
        super("kafka-service", AgentInfo.AgentType.SERVICE);
    }

    /**
     * Creates the "kafka-publication" scheduler and registers the fixed-rate
     * "kafka-influx-statistics" task, which runs {@code processKafkaProxyData()}
     * every five seconds.
     */
    public void build() {
        final Duration statisticsPeriod = Duration.ofSeconds(5L);
        this.kafkaScheduler = this.periodicTaskService.createScheduler("kafka-publication", 100);
        this.periodicTaskService.scheduleAgentPeriodicTask(
                new AgentPeriodicTask("kafka-influx-statistics", this::processKafkaProxyData)
                        .withIsFixedRate(true)
                        .withPeriod(statisticsPeriod));
    }

    /**
     * Registers the remaining fixed-rate periodic tasks.
     *
     * <p>"publish-to-kafka" (every five seconds) is only scheduled when the bridge reports
     * that it supports aggregate records publication; "kafka-heartbeat" (every second) is
     * always scheduled.
     */
    public void postBuild() {
        final boolean aggregatePublication = this.kafkaBridge.supportsAggregateRecordsPublication();
        if (aggregatePublication) {
            this.periodicTaskService.scheduleAgentPeriodicTask(
                    new AgentPeriodicTask("publish-to-kafka", this::resetRecordManagerAndPublishToKafka)
                            .withIsFixedRate(true)
                            .withPeriod(Duration.ofSeconds(5L)));
        }
        this.periodicTaskService.scheduleAgentPeriodicTask(
                new AgentPeriodicTask("kafka-heartbeat", this::publishHeartBeat)
                        .withIsFixedRate(true)
                        .withPeriod(Duration.ofSeconds(1L)));
    }

    /**
     * Builds the schema mapper and the first record manager for the configured camera,
     * then subscribes to data-provider-dictionary events and to status messages.
     *
     * <p>The record manager is created before any listener is registered because
     * {@code onStatusMessage} dereferences it; registering first would leave a window in
     * which a message could arrive while it is still {@code null}.
     */
    public void init() {
        LOG.log(Level.INFO, "Initializing KafkaService for Camera: {0}", this.device);
        this.mapper = new AvroSchemaMapper(this.device);
        this.recordManager = new GenericRecordManager(this.mapper);
        this.dpdService.addDataProviderDictionaryListener((DataProviderDictionaryService.DataProviderDictionaryListener)this);
        // Only StatusSubsystemData messages coming from a relevant agent are delivered.
        this.agent.getMessagingAccess().addStatusMessageListener((StatusMessageListener)this, BusMessageFilterFactory.messageClass(StatusSubsystemData.class).and(this.isRelevantAgent));
    }

    /**
     * Fetches the topics already known to the Kafka bridge.
     *
     * <p>The returned list is copied into a synchronized list: this class later appends to
     * it while holding {@code topicRegistrationLock} and reads it from
     * {@code resetRecordManagerAndPublishToKafka} without that lock, and the bridge's own
     * list is not known to be mutable or thread-safe.
     *
     * @throws RuntimeException if the topic list cannot be obtained; the cause is preserved
     */
    public void postInit() {
        try {
            this.registeredTopics = Collections.synchronizedList(new ArrayList<String>(this.kafkaBridge.listTopics()));
            LOG.log(Level.INFO, "Established connection to Kafka Bridge.\nExisting topics: {0}", this.registeredTopics);
        }
        catch (Exception e) {
            throw new RuntimeException("Could not fetch list of registered topics.", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Feeds every key/value pair of a status message into the current record manager.
     *
     * <p>Each value is addressed as {@code <agent name>/<key>}. A pair is skipped when its
     * path is already ignored, or when the path ends with {@code /state} and the value is
     * {@code "OFF_LINE"}. A path the record manager refuses is logged once and added to the
     * ignored paths. When the bridge does not support aggregate publication the collected
     * records are published immediately, still under the record-manager lock.
     *
     * @param msg the incoming message; it is cast to {@code StatusSubsystemData}, which is
     *            the only message class admitted by the filter registered in {@code init()}
     */
    public void onStatusMessage(StatusMessage msg) {
        StatusSubsystemData ssd = (StatusSubsystemData)msg;
        String agentName = msg.getOriginAgentInfo().getName();
        KeyValueDataList kvdl = ssd.getEncodedData();
        synchronized (this.recordManagerLock) {
            for (KeyValueData d : kvdl) {
                String fullPath = agentName + "/" + d.getKey();
                if (this.ignoredPaths.contains(fullPath)) {
                    continue;
                }
                Serializable value = d.getValue();
                // Constant-first equals: a null or non-String state value must not throw.
                if (fullPath.endsWith("/state") && "OFF_LINE".equals(value)) {
                    continue;
                }
                long timestamp = d.getCCSTimeStamp().getUTCInstant().toEpochMilli();
                if (this.recordManager.addKeyValueForPath(fullPath, (Object)value, timestamp)) {
                    continue;
                }
                // Re-check: the path may have been ignored by another thread in the meantime.
                if (!this.ignoredPaths.contains(fullPath)) {
                    LOG.log(Level.WARNING, "Ignoring path {0}", fullPath);
                    this.ignoredPaths.add(fullPath);
                }
            }
            if (!this.kafkaBridge.supportsAggregateRecordsPublication()) {
                this.resetRecordManagerAndPublishToKafka();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reacts to a data-provider dictionary being added or removed.
     *
     * <p>On {@code ADDED}, and only if the mapper reports that it added schemas for the
     * agent, every topic known to the mapper that is not yet in {@code registeredTopics} is
     * registered with the bridge, the paths rejected by
     * {@code DataProviderInfoUtils.acceptDataProviderInfo} are added to the ignored paths,
     * and the agent becomes relevant for status-message filtering. On {@code REMOVED} the
     * agent stops being relevant. Other event types are ignored.
     *
     * <p>Lock order is {@code topicRegistrationLock} then {@code recordManagerLock}; no code
     * in this class acquires them in the opposite order.
     *
     * @param evt the dictionary event
     */
    public void dataProviderDictionaryUpdate(DataProviderDictionaryService.DataProviderDictionaryEvent evt) {
        DataProviderDictionaryService.DataProviderDictionaryEvent.EventType eventType = evt.getEventType();
        if (eventType == DataProviderDictionaryService.DataProviderDictionaryEvent.EventType.REMOVED) {
            this.relevantAgentInfo.remove(evt.getAgentInfo());
            return;
        }
        if (eventType != DataProviderDictionaryService.DataProviderDictionaryEvent.EventType.ADDED) {
            return;
        }
        if (!this.mapper.addSchemaForDictionaryAndAgent(evt.getDictionary(), evt.getAgentInfo())) {
            return;
        }
        String agentName = evt.getAgentInfo().getName();
        LOG.log(Level.INFO, "Adding avro topics and schemas for agent {0}", agentName);
        synchronized (this.topicRegistrationLock) {
            for (Map.Entry<?, ?> e : this.mapper.getAllTopicsMap().entrySet()) {
                Topic topic = (Topic)e.getValue();
                String topicName = topic.getFullName();
                if (this.registeredTopics.contains(topicName)) {
                    continue;
                }
                try {
                    if (this.kafkaBridge.registerKafkaTopic(topic)) {
                        this.registeredTopics.add(topicName);
                    } else {
                        LOG.log(Level.WARNING, "Failed to register topic {0}", topicName);
                    }
                }
                catch (Exception ex) {
                    // Best effort: one failing topic must not prevent the others from registering.
                    LOG.log(Level.WARNING, "Failed to register topic " + topicName, ex);
                }
            }
            // onStatusMessage() reads ignoredPaths while holding recordManagerLock, so the
            // writes here take the same lock instead of relying on topicRegistrationLock.
            synchronized (this.recordManagerLock) {
                for (DataProviderInfo dpi : evt.getDictionary().getDataProviderInfos()) {
                    if (!DataProviderInfoUtils.acceptDataProviderInfo(dpi)) {
                        this.ignoredPaths.add(agentName + "/" + dpi.getFullPath());
                    }
                }
            }
            this.relevantAgentInfo.add(evt.getAgentInfo());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the current record manager with an empty one and publishes everything the
     * old one collected.
     *
     * <p>The old manager is captured and replaced inside a single critical section, so no
     * other thread can add to it, or pick it up a second time, after it has been handed to
     * this call. Topics that are not in {@code registeredTopics} are skipped. Records of a
     * topic are published in batches of at most {@code maxNumberOfPublishedRecords}
     * elements; a limit of one or less results in one record per batch.
     */
    private void resetRecordManagerAndPublishToKafka() {
        GenericRecordManager drained;
        synchronized (this.recordManagerLock) {
            drained = this.recordManager;
            this.recordManager = new GenericRecordManager(this.mapper);
        }
        // Read the volatile configuration value once so all batches of this run use the same limit.
        int batchLimit = this.maxNumberOfPublishedRecords;
        for (Topic t : drained.getTopicsWithData()) {
            if (!this.registeredTopics.contains(t.getFullName())) {
                continue;
            }
            Schema schema = this.mapper.getSchemaForTopic(t);
            List<GenericRecord> batch = new ArrayList<>();
            for (GenericRecord gr : drained.getRecordsForTopic(t)) {
                batch.add(gr);
                if (batch.size() >= batchLimit) {
                    this.publishListOfRecordsForSchema(schema, batch);
                    batch = new ArrayList<>();
                }
            }
            if (!batch.isEmpty()) {
                this.publishListOfRecordsForSchema(schema, batch);
            }
        }
    }

    /**
     * Publishes a single heartbeat record: the current wall-clock time in milliseconds
     * under {@code "timestamp"} and {@code true} under the mapper's heartbeat field name.
     */
    private void publishHeartBeat() {
        final Topic heartbeatTopic = this.mapper.getHeartbeatTopic();
        final Schema heartbeatSchema = this.mapper.getSchemaForTopic(heartbeatTopic);
        final GenericData.Record heartbeat = new GenericData.Record(heartbeatSchema);

        final Object now = Long.valueOf(System.currentTimeMillis());
        heartbeat.put("timestamp", now);

        final String flagField = this.mapper.getHeartbeatTopicField().getName();
        heartbeat.put(flagField, (Object)Boolean.TRUE);

        final List<GenericRecord> payload = Collections.singletonList(heartbeat);
        this.publishListOfRecordsForSchema(heartbeatSchema, payload);
    }

    /**
     * Hands a copy of {@code records} to the Kafka scheduler for publication with zero delay.
     *
     * <p>When the InfluxDB client service is present and enabled, the outcome of the
     * request is added to the per-topic statistics.
     *
     * @param schema  schema the records belong to; its name is the statistics key
     * @param records records to publish; copied, so the caller may reuse the list
     */
    private void publishListOfRecordsForSchema(Schema schema, List<GenericRecord> records) {
        final ArrayList<GenericRecord> snapshot = new ArrayList<GenericRecord>(records);
        this.kafkaScheduler.schedule(() -> {
            final RequestResult outcome = this.kafkaBridge.publishRecordsForSchema(schema, snapshot);
            final boolean statisticsWanted = this.influxDbClientService != null && this.influxDbClientService.isEnabled();
            if (statisticsWanted) {
                this.recordPublicationStatistics(schema, outcome);
            }
        }, 0L, TimeUnit.MILLISECONDS);
    }

    /** Adds one request's duration, record count and size to the accumulators of its topic. */
    private void recordPublicationStatistics(Schema schema, RequestResult outcome) {
        synchronized (dataProcessingLock) {
            final String topicName = schema.getName();
            this.timeAccumMap
                    .computeIfAbsent(topicName, key -> new DataAccumulator())
                    .accumulate(outcome.getTimeInMillis());
            this.nRecordsAccumMap
                    .computeIfAbsent(topicName, key -> new DataAccumulator())
                    .accumulate(outcome.getNumberOfRecords());
            this.sizeAccumMap
                    .computeIfAbsent(topicName, key -> new DataAccumulator())
                    .accumulate(outcome.getRequestSize());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drains the per-topic publication statistics and writes them to InfluxDB as
     * {@code kafka_proxy_metrics} points, one point per topic, tagged with the topic name
     * and the client service's global tags.
     *
     * <p>Does nothing when the InfluxDB client service is absent or disabled. The three
     * accumulator maps are snapshotted and cleared under {@code dataProcessingLock}; the
     * points are built and written outside the lock.
     */
    private void processKafkaProxyData() {
        if (this.influxDbClientService == null || !this.influxDbClientService.isEnabled()) {
            return;
        }

        final Map<String, DataAccumulator> timeSnapshot;
        final Map<String, DataAccumulator> recordCountSnapshot;
        final Map<String, DataAccumulator> sizeSnapshot;
        synchronized (dataProcessingLock) {
            timeSnapshot = new HashMap<String, DataAccumulator>(this.timeAccumMap);
            recordCountSnapshot = new HashMap<String, DataAccumulator>(this.nRecordsAccumMap);
            sizeSnapshot = new HashMap<String, DataAccumulator>(this.sizeAccumMap);
            this.timeAccumMap.clear();
            this.nRecordsAccumMap.clear();
            this.sizeAccumMap.clear();
        }

        BatchPoints batch = this.influxDbClientService.createBatchPoints();
        for (Map.Entry<String, DataAccumulator> entry : timeSnapshot.entrySet()) {
            final String topicName = entry.getKey();
            final long now = System.currentTimeMillis();
            Point.Builder builder = Point.measurement("kafka_proxy_metrics").time(now, TimeUnit.MILLISECONDS);

            final DataAccumulator timing = entry.getValue();
            builder = builder.addField("publish_time_avg", timing.getAverageValue());
            builder = builder.addField("publish_time_max", timing.getMaxValue());
            builder = builder.addField("publish_count", (long)timing.getCounts());

            final DataAccumulator recordCount = recordCountSnapshot.get(topicName);
            builder = builder.addField("publish_records_avg", recordCount.getAverageValue());
            builder = builder.addField("publish_records_max", recordCount.getMaxValue());

            final DataAccumulator size = sizeSnapshot.get(topicName);
            builder = builder.addField("publish_size_avg", size.getAverageValue());
            builder = builder.addField("publish_size_max", size.getMaxValue());

            if (builder.hasFields()) {
                final Point point = builder.tag("topic", topicName).tag(this.influxDbClientService.getGlobalTags()).build();
                batch = batch.point(point);
            }
        }

        if (!batch.getPoints().isEmpty()) {
            this.influxDbClientService.write(batch);
        }
    }

    private class DataAccumulator {
        private volatile double accum = 0.0;
        private volatile double max = -1.7976931348623157E308;
        private volatile int counter = 0;

        private DataAccumulator() {
        }

        public synchronized void accumulate(double value) {
            this.accum += value;
            ++this.counter;
            if (value > this.max) {
                this.max = value;
            }
        }

        public double getAverageValue() {
            return this.counter > 0 ? this.accum / (double)this.counter : Double.NaN;
        }

        public double getMaxValue() {
            return this.counter > 0 ? this.max : Double.NaN;
        }

        public double getTotalValue() {
            return this.accum;
        }

        public int getCounts() {
            return this.counter;
        }
    }
}

