package edu.cmu.cs.able.eseb.conn;

import edu.cmu.cs.able.eseb.BusData;
import edu.cmu.cs.able.eseb.BusDataQueue;
import edu.cmu.cs.able.eseb.BusDataQueueGroup;
import edu.cmu.cs.able.eseb.BusDataQueueGroupImpl;
import edu.cmu.cs.able.eseb.BusDataQueueListener;
import edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnection;
import edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl;
import edu.cmu.cs.able.eseb.DataTypeSocketConnectionImpl;
import edu.cmu.cs.able.eseb.filter.BusDataQueueGroupSink;
import edu.cmu.cs.able.eseb.filter.EventFilterChain;
import edu.cmu.cs.able.eseb.filter.EventSink;
import edu.cmu.cs.able.eseb.participant.ParticipantModelFilter;
import edu.cmu.cs.able.typelib.enc.DataValueEncoding;
import edu.cmu.cs.able.typelib.prim.PrimitiveScope;
import edu.cmu.cs.able.typelib.txtenc.typelib.DefaultTextEncoding;
import edu.cmu.cs.able.typelib.type.DataValue;
import incubator.dispatch.DispatcherOp;
import incubator.dispatch.LocalDispatcher;
import incubator.exh.LocalCollector;
import incubator.pval.Ensure;
import incubator.wt.CloseableListener;
import incubator.wt.WorkerThread;
import incubator.wt.WtState;
import java.io.Closeable;
import java.io.IOException;
import java.net.Socket;
import java.util.Date;
import java.util.LinkedList;
import org.apache.log4j.Logger;

/* loaded from: input_file:edu/cmu/cs/able/eseb/conn/BusConnection.class */
public class BusConnection implements Closeable {
    private static final long ESTABLISHER_COOLING_PERIOD_MS = 5000;
    private static final Logger LOG;
    private String m_host;
    private short m_port;
    private BusConnectionState m_state;
    private LocalDispatcher<BusConnectionListener> m_dispatcher;
    private LinkedList<DataValue> m_out_buffer;
    private ControlledDataTypeSocketConnection m_connection;
    private WorkerThread m_connection_establisher;
    private Date m_connection_time;
    private int m_connect_count;
    private int m_receive_count;
    private int m_send_count;
    private LocalCollector m_collector;
    private PrimitiveScope m_primitive_scope;
    private BusDataQueue m_queue;
    private BusDataQueueGroupImpl m_queue_group;
    private EventFilterChain m_incoming_chain;
    private EventFilterChain m_outgoing_chain;
    private DataValueEncoding m_encoding;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: edu.cmu.cs.able.eseb.conn.BusConnection$8, reason: invalid class name */
    /* loaded from: input_file:edu/cmu/cs/able/eseb/conn/BusConnection$8.class */
    public static /* synthetic */ class AnonymousClass8 {
        static final /* synthetic */ int[] $SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState = new int[BusConnectionState.values().length];

        static {
            try {
                $SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState[BusConnectionState.CONNECTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState[BusConnectionState.CONNECTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState[BusConnectionState.DISCONNECTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public BusConnection(String str, short s, PrimitiveScope primitiveScope) {
        this(str, s, primitiveScope, new DefaultTextEncoding(primitiveScope));
    }

    public BusConnection(String str, short s, PrimitiveScope primitiveScope, DataValueEncoding dataValueEncoding) {
        Ensure.not_null(str);
        Ensure.greater(s, 0L);
        Ensure.not_null(primitiveScope);
        Ensure.not_null(dataValueEncoding);
        this.m_host = str;
        this.m_port = s;
        this.m_state = BusConnectionState.DISCONNECTED;
        this.m_dispatcher = new LocalDispatcher<>();
        this.m_out_buffer = new LinkedList<>();
        this.m_connection = null;
        this.m_connection_establisher = null;
        this.m_connection_time = new Date();
        this.m_connect_count = 0;
        this.m_receive_count = 0;
        this.m_send_count = 0;
        this.m_encoding = dataValueEncoding;
        this.m_collector = new LocalCollector("BusClient (" + str + ":" + ((int) s) + ")");
        this.m_primitive_scope = primitiveScope;
        this.m_queue = new BusDataQueue();
        this.m_queue.dispatcher().add(new BusDataQueueListener() { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.1
            @Override // edu.cmu.cs.able.eseb.BusDataQueueListener
            public void data_added_to_queue() {
                BusConnection.this.received();
            }
        });
        this.m_queue_group = new BusDataQueueGroupImpl();
        this.m_incoming_chain = new EventFilterChain(new BusDataQueueGroupSink(this.m_queue_group));
        this.m_outgoing_chain = new EventFilterChain(new EventSink() { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.2
            @Override // edu.cmu.cs.able.eseb.filter.EventSink
            public void sink(BusData busData) throws IOException {
                Ensure.not_null(busData);
                BusConnection.this.internal_send(busData.value());
            }
        });
    }

    public synchronized void start() {
        switch (AnonymousClass8.$SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState[this.m_state.ordinal()]) {
            case 1:
            case ParticipantModelFilter.RENEWS_BEFORE_DROP /* 2 */:
                return;
            case 3:
                switch_disconnected_connecting();
                return;
            default:
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                return;
        }
    }

    public void stop() {
        WorkerThread workerThread;
        synchronized (this) {
            workerThread = this.m_connection_establisher;
            switch (AnonymousClass8.$SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState[this.m_state.ordinal()]) {
                case 1:
                    switch_connected_disconnected();
                    break;
                case ParticipantModelFilter.RENEWS_BEFORE_DROP /* 2 */:
                    switch_connecting_disconnected();
                    break;
                case 3:
                    break;
                default:
                    if (!$assertionsDisabled) {
                        throw new AssertionError();
                    }
                    break;
            }
        }
        if (workerThread != null) {
            while (workerThread.state() != WtState.STOPPED) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    public void add_listener(BusConnectionListener busConnectionListener) {
        this.m_dispatcher.add(busConnectionListener);
    }

    public void remove_listener(BusConnectionListener busConnectionListener) {
        this.m_dispatcher.remove(busConnectionListener);
    }

    public synchronized BusConnectionState state() {
        return this.m_state;
    }

    public synchronized Date connection_time() {
        return this.m_connection_time;
    }

    public synchronized int receive_count() {
        return this.m_receive_count;
    }

    public synchronized int sent_count() {
        return this.m_send_count;
    }

    public synchronized int connect_count() {
        return this.m_connect_count;
    }

    public synchronized void send(DataValue dataValue) {
        try {
            notifyAll();
            this.m_outgoing_chain.sink(new BusData(dataValue));
        } catch (IOException e) {
            Ensure.never_thrown(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void internal_send(DataValue dataValue) {
        Ensure.not_null(dataValue, "value == null");
        this.m_send_count++;
        switch (AnonymousClass8.$SwitchMap$edu$cmu$cs$able$eseb$conn$BusConnectionState[this.m_state.ordinal()]) {
            case 1:
                if (this.m_out_buffer.size() > 0) {
                    LOG.debug("Queueing: " + dataValue);
                    this.m_out_buffer.addLast(dataValue);
                    return;
                }
                try {
                    LOG.debug("Writing (online): " + dataValue);
                    this.m_connection.write(dataValue);
                    return;
                } catch (IOException e) {
                    this.m_collector.collect(e, "send");
                    LOG.error("Error while writing {" + dataValue + "}.", e);
                    return;
                }
            case ParticipantModelFilter.RENEWS_BEFORE_DROP /* 2 */:
            case 3:
                this.m_out_buffer.add(dataValue);
                return;
            default:
                if (!$assertionsDisabled) {
                    throw new AssertionError();
                }
                return;
        }
    }

    private synchronized void switch_disconnected_connecting() {
        if (!$assertionsDisabled && this.m_state != BusConnectionState.DISCONNECTED) {
            throw new AssertionError();
        }
        this.m_state = BusConnectionState.CONNECTING;
        this.m_connection_establisher = new WorkerThread("Connection (" + this.m_host + ":" + ((int) this.m_port) + ")") { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.3
            protected void do_cycle_operation() throws Exception {
                BusConnection.this.establisher_cycle();
            }
        };
        this.m_connection_establisher.start();
        notify_state_changed();
    }

    private synchronized void switch_connected_disconnected() {
        if (!$assertionsDisabled && this.m_state != BusConnectionState.CONNECTED) {
            throw new AssertionError();
        }
        try {
            this.m_connection.close();
        } catch (IOException e) {
            this.m_collector.collect(e, "connect->disconnect");
            LOG.error("Error while closing connection.", e);
        }
        this.m_connection.stop();
        this.m_state = BusConnectionState.DISCONNECTED;
        this.m_connection = null;
        this.m_connection_time = new Date();
        notify_state_changed();
    }

    private synchronized void switch_connecting_disconnected() {
        if (!$assertionsDisabled && this.m_state != BusConnectionState.CONNECTING) {
            throw new AssertionError();
        }
        this.m_state = BusConnectionState.DISCONNECTED;
        final WorkerThread workerThread = this.m_connection_establisher;
        this.m_connection_establisher = null;
        notify_state_changed();
        this.m_dispatcher.dispatch(new Runnable() { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.4
            @Override // java.lang.Runnable
            public void run() {
                workerThread.stop();
            }
        });
    }

    private synchronized void switch_connecting_connected(Socket socket) throws IOException {
        if (!$assertionsDisabled && this.m_state != BusConnectionState.CONNECTING) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && socket == null) {
            throw new AssertionError();
        }
        WorkerThread workerThread = this.m_connection_establisher;
        this.m_state = BusConnectionState.CONNECTED;
        this.m_connection = new ControlledDataTypeSocketConnectionImpl(this.m_primitive_scope, new DataTypeSocketConnectionImpl("Connection " + this.m_host + ":" + ((int) this.m_port), socket, this.m_encoding, this.m_primitive_scope));
        this.m_connection_establisher = null;
        this.m_connection_time = new Date();
        this.m_connect_count++;
        send_clear_out_buffer();
        this.m_connection.queue_group().add(this.m_queue);
        this.m_connection.closeable_dispatcher().add(new CloseableListener() { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.5
            public void closed(IOException iOException) {
                synchronized (BusConnection.this) {
                    if (BusConnection.this.m_state == BusConnectionState.CONNECTED) {
                        BusConnection.this.switch_connected_connecting();
                    }
                }
            }
        });
        this.m_connection.start();
        notify_state_changed();
        workerThread.stop();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void switch_connected_connecting() {
        switch_connected_disconnected();
        switch_disconnected_connecting();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void establisher_cycle() throws InterruptedException {
        synchronized (this) {
            if (this.m_state != BusConnectionState.CONNECTING) {
                wait(ESTABLISHER_COOLING_PERIOD_MS);
                return;
            }
            Socket socket = null;
            try {
                socket = new Socket(this.m_host, this.m_port);
            } catch (IOException e) {
                this.m_collector.collect(e, "connect");
                LOG.info("Error while connecting to host " + this.m_host + ", port " + ((int) this.m_port) + ".", e);
            }
            synchronized (this) {
                if (socket != null) {
                    if (this.m_state == BusConnectionState.CONNECTING) {
                        try {
                            switch_connecting_connected(socket);
                        } catch (IOException e2) {
                            this.m_collector.collect(e2, "establish after connect");
                            LOG.error("Error establishing connection after socket connect.", e2);
                        }
                    } else {
                        try {
                            socket.close();
                        } catch (IOException e3) {
                            this.m_collector.collect(e3, "close unnecessary socket");
                            LOG.error("Error while closing unnecessary socket.", e3);
                        }
                        socket = null;
                    }
                }
                if (socket == null && this.m_state == BusConnectionState.CONNECTING) {
                    wait(ESTABLISHER_COOLING_PERIOD_MS);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void send_clear_out_buffer() {
        this.m_dispatcher.dispatch(new Runnable() { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.6
            @Override // java.lang.Runnable
            public void run() {
                synchronized (BusConnection.this) {
                    if (BusConnection.this.m_out_buffer.size() > 0 && BusConnection.this.m_state == BusConnectionState.CONNECTED) {
                        DataValue dataValue = (DataValue) BusConnection.this.m_out_buffer.removeFirst();
                        try {
                            BusConnection.LOG.debug("Writing (deferred): " + dataValue);
                            BusConnection.this.m_connection.write(dataValue);
                        } catch (IOException e) {
                            BusConnection.this.m_collector.collect(e, "send from buffer");
                            BusConnection.LOG.error("Error while writing {" + dataValue + "}.", e);
                        }
                        BusConnection.this.send_clear_out_buffer();
                    }
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void received() {
        while (true) {
            BusData poll = this.m_queue.poll();
            if (poll == null) {
                return;
            }
            this.m_receive_count++;
            LOG.debug("Received: {" + poll.value() + "}.");
            try {
                this.m_incoming_chain.sink(poll);
            } catch (IOException e) {
                Ensure.never_thrown(e);
            }
        }
    }

    private synchronized void notify_state_changed() {
        this.m_dispatcher.dispatch(new DispatcherOp<BusConnectionListener>() { // from class: edu.cmu.cs.able.eseb.conn.BusConnection.7
            public void dispatch(BusConnectionListener busConnectionListener) {
                busConnectionListener.connection_state_changed();
            }
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        stop();
    }

    public synchronized LocalCollector collector() {
        return this.m_collector;
    }

    public BusDataQueueGroup queue_group() {
        return this.m_queue_group;
    }

    public synchronized String host() {
        return this.m_host;
    }

    public synchronized short port() {
        return this.m_port;
    }

    public synchronized EventFilterChain incoming_chain() {
        return this.m_incoming_chain;
    }

    public synchronized EventFilterChain outgoing_chain() {
        return this.m_outgoing_chain;
    }

    public PrimitiveScope primitive_scope() {
        return this.m_primitive_scope;
    }

    public DataValueEncoding encoding() {
        return this.m_encoding;
    }

    static {
        $assertionsDisabled = !BusConnection.class.desiredAssertionStatus();
        LOG = Logger.getLogger(BusConnection.class);
    }
}
