package edu.cmu.cs.able.eseb.bus;

import edu.cmu.cs.able.eseb.BusData;
import edu.cmu.cs.able.eseb.BusDataQueue;
import edu.cmu.cs.able.eseb.BusDataQueueListener;
import edu.cmu.cs.able.eseb.ControlledDataTypeSocketConnectionImpl;
import edu.cmu.cs.able.eseb.DataTypeSocketConnectionImpl;
import edu.cmu.cs.able.eseb.filter.EventFilterChain;
import edu.cmu.cs.able.typelib.enc.DataValueEncoding;
import edu.cmu.cs.able.typelib.prim.PrimitiveScope;
import edu.cmu.cs.able.typelib.txtenc.typelib.DefaultTextEncoding;
import incubator.ExceptionSuppress;
import incubator.dispatch.DispatcherOp;
import incubator.dispatch.LocalDispatcher;
import incubator.exh.LocalCollector;
import incubator.pval.Ensure;
import incubator.wt.CloseableListener;
import incubator.wt.CloseableWorkerThread;
import incubator.wt.WorkerThreadGroup;
import incubator.wt.WorkerThreadGroupCI;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;

/* loaded from: input_file:edu/cmu/cs/able/eseb/bus/EventBus.class */
public class EventBus implements Closeable {
    private static final Logger LOG = Logger.getLogger(EventBus.class);
    private static final int SOCKET_TMEOUT_MS = 100;
    private WorkerThreadGroup m_group;
    private Map<Integer, EventBusConnectionData> m_connections;
    private ServerSocket m_accept_socket;
    private LocalDispatcher<EventBusListener> m_dispatcher;
    private int m_next_connection_id;
    private PrimitiveScope m_scope;
    private LocalCollector m_collector;
    private short m_port;
    private List<EventBusAcceptPreprocessor> m_preprocessors;
    private DataValueEncoding m_encoding;

    public EventBus(short s, PrimitiveScope primitiveScope) throws IOException {
        this(s, primitiveScope, new DefaultTextEncoding(primitiveScope));
    }

    public EventBus(short s, PrimitiveScope primitiveScope, DataValueEncoding dataValueEncoding) throws IOException {
        Ensure.is_true(s > 0);
        Ensure.not_null(primitiveScope);
        Ensure.not_null(dataValueEncoding);
        this.m_group = new WorkerThreadGroup("Event Bus (" + ((int) s) + ")");
        this.m_encoding = dataValueEncoding;
        this.m_connections = new HashMap();
        this.m_accept_socket = new ServerSocket(s);
        this.m_accept_socket.setSoTimeout(SOCKET_TMEOUT_MS);
        this.m_group.add_thread(new CloseableWorkerThread<ServerSocket>("Event bus (" + ((int) s) + ") acceptor", this.m_accept_socket, true) { // from class: edu.cmu.cs.able.eseb.bus.EventBus.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void do_cycle_operation(ServerSocket serverSocket) throws Exception {
                EventBus.this.accept_cycle(serverSocket);
            }
        });
        this.m_dispatcher = new LocalDispatcher<>();
        this.m_next_connection_id = 1;
        this.m_scope = primitiveScope;
        this.m_collector = new LocalCollector("Event bus (" + ((int) s) + ")");
        this.m_port = s;
        this.m_preprocessors = new ArrayList();
    }

    public synchronized void add_preprocessor(EventBusAcceptPreprocessor eventBusAcceptPreprocessor) {
        Ensure.not_null(eventBusAcceptPreprocessor);
        this.m_preprocessors.add(eventBusAcceptPreprocessor);
    }

    public synchronized void remove_preprocessor(EventBusAcceptPreprocessor eventBusAcceptPreprocessor) {
        Ensure.not_null(eventBusAcceptPreprocessor);
        Ensure.is_true(this.m_preprocessors.remove(eventBusAcceptPreprocessor));
    }

    public synchronized short port() {
        return this.m_port;
    }

    public synchronized void add_listener(EventBusListener eventBusListener) {
        this.m_dispatcher.add(eventBusListener);
    }

    public synchronized void remove_listener(EventBusListener eventBusListener) {
        this.m_dispatcher.remove(eventBusListener);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void accept_cycle(ServerSocket serverSocket) throws IOException {
        try {
            accept_connection(serverSocket.accept());
        } catch (SocketTimeoutException e) {
        }
    }

    private synchronized void accept_connection(Socket socket) throws IOException {
        if (this.m_accept_socket == null) {
            socket.close();
            return;
        }
        InetAddress inetAddress = socket.getInetAddress();
        final int i = this.m_next_connection_id;
        this.m_next_connection_id++;
        DataTypeSocketConnectionImpl dataTypeSocketConnectionImpl = new DataTypeSocketConnectionImpl("Client " + i, socket, this.m_encoding, this.m_scope);
        final BusDataQueue busDataQueue = new BusDataQueue();
        busDataQueue.dispatcher().add(new BusDataQueueListener() { // from class: edu.cmu.cs.able.eseb.bus.EventBus.2
            @Override // edu.cmu.cs.able.eseb.BusDataQueueListener
            public void data_added_to_queue() {
                EventBus.this.received(busDataQueue, i);
            }
        });
        CloseableListener closeableListener = new CloseableListener() { // from class: edu.cmu.cs.able.eseb.bus.EventBus.3
            public void closed(IOException iOException) {
                EventBus.this.closed(iOException, i);
            }
        };
        dataTypeSocketConnectionImpl.closeable_dispatcher().add(closeableListener);
        ControlledDataTypeSocketConnectionImpl controlledDataTypeSocketConnectionImpl = new ControlledDataTypeSocketConnectionImpl(this.m_scope, dataTypeSocketConnectionImpl);
        controlledDataTypeSocketConnectionImpl.queue_group().add(busDataQueue);
        final EventBusConnectionData eventBusConnectionData = new EventBusConnectionData(i, inetAddress, controlledDataTypeSocketConnectionImpl, busDataQueue, closeableListener);
        this.m_connections.put(Integer.valueOf(i), eventBusConnectionData);
        Iterator it = new ArrayList(this.m_preprocessors).iterator();
        while (it.hasNext()) {
            if (!((EventBusAcceptPreprocessor) it.next()).preprocess(controlledDataTypeSocketConnectionImpl)) {
                this.m_connections.remove(Integer.valueOf(i));
                controlledDataTypeSocketConnectionImpl.stop();
                LOG.info("Connection with ID " + i + " from address " + inetAddress + " was rejected by pre-processor.");
                controlledDataTypeSocketConnectionImpl.close();
                dataTypeSocketConnectionImpl.close();
                return;
            }
        }
        this.m_dispatcher.dispatch(new DispatcherOp<EventBusListener>() { // from class: edu.cmu.cs.able.eseb.bus.EventBus.4
            public void dispatch(EventBusListener eventBusListener) {
                eventBusListener.connection_accepted(eventBusConnectionData);
            }
        });
        controlledDataTypeSocketConnectionImpl.start();
        LOG.info("Accepted connection with ID " + i + " from address " + inetAddress + ".");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void received(BusDataQueue busDataQueue, int i) {
        final EventBusConnectionData eventBusConnectionData = this.m_connections.get(Integer.valueOf(i));
        if (eventBusConnectionData == null) {
            return;
        }
        while (true) {
            final BusData poll = busDataQueue.poll();
            if (poll == null) {
                return;
            }
            LOG.debug("Distributing from client " + i + ": " + poll.value());
            eventBusConnectionData.sent();
            for (EventBusConnectionData eventBusConnectionData2 : this.m_connections.values()) {
                eventBusConnectionData2.received();
                try {
                    eventBusConnectionData2.connection().write(poll);
                } catch (IOException e) {
                    this.m_collector.collect(e, "Writing to client '" + eventBusConnectionData2.id() + "'.");
                }
            }
            this.m_dispatcher.dispatch(new DispatcherOp<EventBusListener>() { // from class: edu.cmu.cs.able.eseb.bus.EventBus.5
                public void dispatch(EventBusListener eventBusListener) {
                    eventBusListener.distributed(poll, eventBusConnectionData);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void closed(IOException iOException, int i) {
        final EventBusConnectionData eventBusConnectionData = this.m_connections.get(Integer.valueOf(i));
        if (eventBusConnectionData == null) {
            return;
        }
        this.m_connections.remove(Integer.valueOf(i));
        eventBusConnectionData.connection().stop();
        this.m_dispatcher.dispatch(new DispatcherOp<EventBusListener>() { // from class: edu.cmu.cs.able.eseb.bus.EventBus.6
            public void dispatch(EventBusListener eventBusListener) {
                eventBusListener.connection_disconnected(eventBusConnectionData);
            }
        });
        LOG.info("Client " + i + " disconnected (e = " + iOException + ").");
        LOG.debug("Exception that closed client " + i + ":", iOException);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        ExceptionSuppress exceptionSuppress = new ExceptionSuppress();
        synchronized (this) {
            if (this.m_accept_socket == null) {
                return;
            }
            try {
                this.m_accept_socket.close();
            } catch (IOException e) {
                exceptionSuppress.add(e);
            }
            Iterator<EventBusConnectionData> it = this.m_connections.values().iterator();
            while (it.hasNext()) {
                try {
                    it.next().connection().close();
                } catch (IOException e2) {
                    exceptionSuppress.add(e2);
                }
            }
            LOG.info("Closing event bus.");
            this.m_accept_socket = null;
            this.m_group.stop_all();
            exceptionSuppress.maybe_throw();
        }
    }

    public WorkerThreadGroupCI thread_group() {
        return this.m_group;
    }

    public void start() {
        Ensure.not_null(this.m_accept_socket);
        LOG.info("Starting event bus.");
        this.m_group.start();
    }

    public synchronized boolean closed() {
        return this.m_accept_socket == null;
    }

    public synchronized EventFilterChain incoming_chain(int i) {
        EventBusConnectionData eventBusConnectionData = this.m_connections.get(Integer.valueOf(i));
        if (eventBusConnectionData == null) {
            return null;
        }
        return eventBusConnectionData.connection().incoming_chain();
    }

    public synchronized EventFilterChain outgoing_chain(int i) {
        EventBusConnectionData eventBusConnectionData = this.m_connections.get(Integer.valueOf(i));
        if (eventBusConnectionData == null) {
            return null;
        }
        return eventBusConnectionData.connection().outgoing_chain();
    }
}
