package edu.cmu.cs.able.eseb;

import edu.cmu.cs.able.eseb.filter.BusDataQueueGroupSink;
import edu.cmu.cs.able.eseb.filter.DataTypeSocketConnectionSink;
import edu.cmu.cs.able.eseb.filter.EventFilterChain;
import edu.cmu.cs.able.typelib.prim.PrimitiveScope;
import edu.cmu.cs.able.typelib.prim.StringValue;
import edu.cmu.cs.able.typelib.type.DataValue;
import incubator.dispatch.Dispatcher;
import incubator.dispatch.DispatcherOp;
import incubator.dispatch.LocalDispatcher;
import incubator.pval.Ensure;
import incubator.wt.CloseableListener;
import incubator.wt.CloseableWorkerThread;
import incubator.wt.WorkerThread;
import incubator.wt.WtState;
import java.io.IOException;
import org.apache.log4j.Logger;

/* loaded from: input_file:edu/cmu/cs/able/eseb/ControlledDataTypeSocketConnectionImpl.class */
public class ControlledDataTypeSocketConnectionImpl implements ControlledDataTypeSocketConnection {
    static final String CMD_PREFIX = "esebctl:";
    static final String CMD_PING = "ping";
    static final String CMD_NOSEND = "nosend";
    private DataTypeSocketConnection m_connection;
    private LocalDispatcher<CloseableListener> m_closeable_dispatcher;
    private BusDataQueueGroupImpl m_queue_group;
    private boolean m_send_data;
    private boolean m_closed;
    private long m_last_received_ping;
    private long m_last_sent_ping;
    private WorkerThread m_pinger;
    private PrimitiveScope m_primitive_scope;
    private BusDataQueue m_receive_queue;
    private EventFilterChain m_outgoing_chain;
    private EventFilterChain m_incoming_chain;
    long m_ping_check_interval_ms;
    long m_ping_send_interval_ms;
    long m_ping_max_interval_ms;
    private static final Logger LOG = Logger.getLogger(ControlledDataTypeSocketConnectionImpl.class);
    private static long PING_CHECK_INTERVAL_MS = 250;
    private static long PING_SEND_INTERVAL_MS = 1000;
    private static long PING_MAX_INTERVAL_MS = 2500;

    public ControlledDataTypeSocketConnectionImpl(PrimitiveScope primitiveScope, DataTypeSocketConnection dataTypeSocketConnection) {
        Ensure.not_null(primitiveScope, "primitive_scope == null");
        Ensure.not_null(dataTypeSocketConnection, "conn == null");
        this.m_primitive_scope = primitiveScope;
        this.m_connection = dataTypeSocketConnection;
        this.m_outgoing_chain = new EventFilterChain(new DataTypeSocketConnectionSink(this.m_connection));
        this.m_closeable_dispatcher = new LocalDispatcher<>();
        this.m_queue_group = new BusDataQueueGroupImpl();
        this.m_incoming_chain = new EventFilterChain(new BusDataQueueGroupSink(this.m_queue_group));
        this.m_send_data = true;
        this.m_closed = false;
        this.m_receive_queue = new BusDataQueue();
        this.m_last_received_ping = 0L;
        this.m_last_sent_ping = 0L;
        this.m_ping_check_interval_ms = PING_CHECK_INTERVAL_MS;
        this.m_ping_send_interval_ms = PING_SEND_INTERVAL_MS;
        this.m_ping_max_interval_ms = PING_MAX_INTERVAL_MS;
        this.m_pinger = new CloseableWorkerThread<DataTypeSocketConnection>("Ping Control (" + dataTypeSocketConnection.thread_group().name() + ")", dataTypeSocketConnection, true) { // from class: edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void do_cycle_operation(DataTypeSocketConnection dataTypeSocketConnection2) throws Exception {
                ControlledDataTypeSocketConnectionImpl.this.handle_pings();
            }
        };
        this.m_connection.closeable_dispatcher().add(new CloseableListener() { // from class: edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl.2
            public void closed(IOException iOException) {
                ControlledDataTypeSocketConnectionImpl.this.connection_closed(iOException);
            }
        });
        this.m_connection.queue_group().add(this.m_receive_queue);
        this.m_receive_queue.dispatcher().add(new BusDataQueueListener() { // from class: edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl.3
            @Override // edu.cmu.cs.able.eseb.BusDataQueueListener
            public void data_added_to_queue() {
                ControlledDataTypeSocketConnectionImpl.this.bus_data_received();
            }
        });
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public Dispatcher<CloseableListener> closeable_dispatcher() {
        return this.m_closeable_dispatcher;
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public synchronized BusDataQueueGroup queue_group() {
        return this.m_queue_group;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void connection_closed(final IOException iOException) {
        this.m_closeable_dispatcher.dispatch(new DispatcherOp<CloseableListener>() { // from class: edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl.4
            public void dispatch(CloseableListener closeableListener) {
                closeableListener.closed(iOException);
            }
        });
        this.m_closed = true;
        this.m_closeable_dispatcher.dispatch(new Runnable() { // from class: edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl.5
            @Override // java.lang.Runnable
            public void run() {
                ControlledDataTypeSocketConnectionImpl.this.stop_pinger_if_running();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [edu.cmu.cs.able.eseb.BusDataQueue] */
    /* JADX WARN: Type inference failed for: r0v11 */
    /* JADX WARN: Type inference failed for: r0v12, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v13 */
    /* JADX WARN: Type inference failed for: r0v15, types: [edu.cmu.cs.able.eseb.filter.EventFilterChain] */
    /* JADX WARN: Type inference failed for: r0v2 */
    /* JADX WARN: Type inference failed for: r0v23 */
    /* JADX WARN: Type inference failed for: r0v24 */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable] */
    public synchronized void bus_data_received() {
        ?? r0 = this.m_receive_queue;
        synchronized (r0) {
            while (true) {
                BusData poll = this.m_receive_queue.poll();
                if (poll == null) {
                    return;
                }
                boolean z = poll.value() instanceof StringValue;
                r0 = z;
                if (z) {
                    String str = (String) poll.value().value();
                    boolean startsWith = str.startsWith(CMD_PREFIX);
                    r0 = startsWith;
                    if (startsWith) {
                        process_cmd(str.substring(CMD_PREFIX.length()));
                        return;
                    }
                }
                try {
                    r0 = this.m_incoming_chain;
                    r0.sink(poll);
                } catch (IOException e) {
                    r0 = e;
                    Ensure.never_thrown((Throwable) r0);
                }
            }
        }
    }

    private synchronized void process_cmd(String str) {
        Ensure.not_null(str);
        switch (str.hashCode()) {
            case -1039723319:
                if (str.equals(CMD_NOSEND)) {
                    this.m_send_data = false;
                    return;
                }
                return;
            case 3441010:
                if (str.equals(CMD_PING)) {
                    this.m_last_received_ping = System.currentTimeMillis();
                    return;
                }
                return;
            default:
                return;
        }
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public void write(DataValue dataValue) throws IOException {
        Ensure.not_null(dataValue);
        write(new BusData(dataValue));
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public void write(BusData busData) throws IOException {
        Ensure.not_null(busData);
        if (this.m_closed || !this.m_send_data) {
            return;
        }
        this.m_outgoing_chain.sink(busData);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void handle_pings() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.m_last_received_ping < currentTimeMillis - this.m_ping_max_interval_ms) {
            String str = "No heartbeat received in the last " + (currentTimeMillis - this.m_last_received_ping) + "ms.";
            LOG.info(String.valueOf(str) + " Closing connection.");
            throw new IOException(str);
        }
        if (this.m_last_sent_ping < currentTimeMillis - this.m_ping_send_interval_ms && !this.m_closed) {
            this.m_connection.write((DataValue) this.m_primitive_scope.string().make("esebctl:ping"));
            this.m_last_sent_ping = currentTimeMillis;
        }
        wait(this.m_ping_check_interval_ms);
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public void start() {
        long currentTimeMillis = System.currentTimeMillis();
        this.m_last_received_ping = currentTimeMillis;
        this.m_last_sent_ping = currentTimeMillis;
        this.m_connection.start();
        this.m_pinger.start();
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public void stop() {
        this.m_connection.stop();
        stop_pinger_if_running();
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public void publish_only() throws IOException {
        write((DataValue) this.m_primitive_scope.string().make("esebctl:nosend"));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.m_connection.close();
        stop_pinger_if_running();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stop_pinger_if_running() {
        Throwable th = this.m_pinger;
        synchronized (th) {
            if (this.m_pinger.state() == WtState.RUNNING) {
                this.m_pinger.stop();
            }
            th = th;
        }
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public EventFilterChain incoming_chain() {
        return this.m_incoming_chain;
    }

    @Override // edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection
    public EventFilterChain outgoing_chain() {
        return this.m_outgoing_chain;
    }
}
