ActiveMq TransportConnector 解析

TransportConnector 在 ActiveMQ 中主要负责接受客户端(client)的连接,并在该连接上进行消息传递。

1:初始化

TransportConnector 初始化时会生成 TransportServer,TransportServer 主要是根据配置的 URL 创建 ServerSocket,等待客户端的连接请求:

  @Override
    public void run() {
        while (!isStopped()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                if (socket != null) {
                    if (isStopped() || getAcceptListener() == null) {
                        socket.close();
                    } else {
                        if (useQueueForAccept) {
                            socketQueue.put(socket);
                        } else {
                            handleSocket(socket);
                        }
                    }
                }
            } catch (SocketTimeoutException ste) {
                // expect this to happen
            } catch (Exception e) {
                if (!isStopping()) {
                    onAcceptError(e);
                } else if (!isStopped()) {
                    LOG.warn("run()", e);
                    onAcceptError(e);
                }
            }
        }
    }

当 TransportServer 接受客户端的连接后,先根据 Socket 生成 Transport,再根据 Transport 生成 TransportConnection。

Transport:主要负责读取 socket 数据,然后交给 TransportConnection 处理。TransportConnection 实现了 CommandVisitor 接口,该接口为每一种命令(消息)类型定义了对应的处理方法:

// Each declaration below takes one command/info object (some also take extra
// arguments), returns a Response, and is declared to throw Exception.
// NOTE(review): the enclosing type's header is not part of this excerpt.

// Add-* commands: connection, session, producer, consumer.
Response processAddConnection(ConnectionInfo info) throws Exception;

    Response processAddSession(SessionInfo info) throws Exception;

    Response processAddProducer(ProducerInfo info) throws Exception;

    Response processAddConsumer(ConsumerInfo info) throws Exception;

    // Remove-* commands, addressed by id; all but the producer variant also
    // take a lastDeliveredSequenceId.
    Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception;

    Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception;

    Response processRemoveProducer(ProducerId id) throws Exception;

    Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception;

    // Destination and subscription commands.
    Response processAddDestination(DestinationInfo info) throws Exception;

    Response processRemoveDestination(DestinationInfo info) throws Exception;

    Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception;

    // Message, acknowledgement and pull commands.
    Response processMessage(Message send) throws Exception;

    Response processMessageAck(MessageAck ack) throws Exception;

    Response processMessagePull(MessagePull pull) throws Exception;

    // Transaction commands; each receives a TransactionInfo.
    Response processBeginTransaction(TransactionInfo info) throws Exception;

    Response processPrepareTransaction(TransactionInfo info) throws Exception;

    Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception;

    Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception;

    Response processRollbackTransaction(TransactionInfo info) throws Exception;

    // Wire format, keep-alive, shutdown, flush and broker-info commands.
    Response processWireFormat(WireFormatInfo info) throws Exception;

    Response processKeepAlive(KeepAliveInfo info) throws Exception;

    Response processShutdown(ShutdownInfo info) throws Exception;

    Response processFlush(FlushCommand command) throws Exception;

    Response processBrokerInfo(BrokerInfo info) throws Exception;

    // Further transaction commands (recover / forget / end), also taking a TransactionInfo.
    Response processRecoverTransactions(TransactionInfo info) throws Exception;

    Response processForgetTransaction(TransactionInfo info) throws Exception;

    Response processEndTransaction(TransactionInfo info) throws Exception;

    // Dispatch-related commands and producer acknowledgements.
    Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;

    Response processProducerAck(ProducerAck ack) throws Exception;

    Response processMessageDispatch(MessageDispatch dispatch) throws Exception;

    // Control commands and connection errors.
    Response processControlCommand(ControlCommand command) throws Exception;

    Response processConnectionError(ConnectionError error) throws Exception;

    Response processConnectionControl(ConnectionControl control) throws Exception;

    Response processConsumerControl(ConsumerControl control) throws Exception;

}

3:消息发送。队列消息的发送都是由 Queue 类来处理:当生产者产生消息时,消息先存入待发送消息队列,然后由一个线程不断循环这个队列,把消息发送给消费者。

相关推荐