ActiveMq TransportConnector 解析
TransportConnector 在ActiveMq 主要是提供client的连接然后进行消息传递。
1:初始化
在TransportConnector 初始化时会生成TransportServer,TransportServer 主要是根据配置的URL生成ServerSocket,并在下面的run()循环中等待客户端的连接请求:
/**
 * Accept loop: keeps calling {@code serverSocket.accept()} until
 * {@code isStopped()} returns true.
 *
 * <p>Each accepted socket is closed immediately when the server is stopped or
 * no accept listener is registered; otherwise it is either put on
 * {@code socketQueue} (when {@code useQueueForAccept} is set) or passed
 * directly to {@code handleSocket}.
 *
 * <p>A {@link SocketTimeoutException} is ignored. Any other exception is
 * forwarded to {@code onAcceptError} unless the server is already fully
 * stopped; while stopping (but not yet stopped) it is also logged as a warning.
 */
@Override
public void run() {
    while (!isStopped()) {
        Socket accepted = null;
        try {
            accepted = serverSocket.accept();
            if (accepted == null) {
                continue;
            }

            // Nobody can take the connection: drop it right away.
            if (isStopped() || getAcceptListener() == null) {
                accepted.close();
                continue;
            }

            if (useQueueForAccept) {
                socketQueue.put(accepted);
            } else {
                handleSocket(accepted);
            }
        } catch (SocketTimeoutException expected) {
            // expect this to happen
        } catch (Exception e) {
            if (isStopping()) {
                if (isStopped()) {
                    // Already stopped: nothing to report.
                    continue;
                }
                LOG.warn("run()", e);
            }
            onAcceptError(e);
        }
    }
}
当TransportServer 接受客户端的连接后,先根据Socket生成Transport,再根据Transport生成TransportConnection。
Transport:主要负责读取socket数据,然后交给TransportConnection处理。TransportConnection实现了CommandVisitor接口,该接口为每一种消息(命令)类型定义了对应的处理方法:
// Method declarations of CommandVisitor (the interface header is not shown in
// this excerpt). There is one process* method per command type; each takes the
// command object (or an id) and returns a Response, and may throw Exception.

// Add/remove of connections, sessions, producers, consumers, destinations
// and subscriptions.
Response processAddConnection(ConnectionInfo info) throws Exception;
Response processAddSession(SessionInfo info) throws Exception;
Response processAddProducer(ProducerInfo info) throws Exception;
Response processAddConsumer(ConsumerInfo info) throws Exception;
Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception;
Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception;
Response processRemoveProducer(ProducerId id) throws Exception;
Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception;
Response processAddDestination(DestinationInfo info) throws Exception;
Response processRemoveDestination(DestinationInfo info) throws Exception;
Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception;
// Message, acknowledgement and pull commands.
Response processMessage(Message send) throws Exception;
Response processMessageAck(MessageAck ack) throws Exception;
Response processMessagePull(MessagePull pull) throws Exception;
// Transaction commands (all take a TransactionInfo).
Response processBeginTransaction(TransactionInfo info) throws Exception;
Response processPrepareTransaction(TransactionInfo info) throws Exception;
Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception;
Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception;
Response processRollbackTransaction(TransactionInfo info) throws Exception;
// Wire format, keep-alive, shutdown, flush and broker info commands.
Response processWireFormat(WireFormatInfo info) throws Exception;
Response processKeepAlive(KeepAliveInfo info) throws Exception;
Response processShutdown(ShutdownInfo info) throws Exception;
Response processFlush(FlushCommand command) throws Exception;
Response processBrokerInfo(BrokerInfo info) throws Exception;
// Further transaction commands (also take a TransactionInfo).
Response processRecoverTransactions(TransactionInfo info) throws Exception;
Response processForgetTransaction(TransactionInfo info) throws Exception;
Response processEndTransaction(TransactionInfo info) throws Exception;
// Dispatch notifications, producer acks and control/error commands.
Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
Response processProducerAck(ProducerAck ack) throws Exception;
Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
Response processControlCommand(ControlCommand command) throws Exception;
Response processConnectionError(ConnectionError error) throws Exception;
Response processConnectionControl(ConnectionControl control) throws Exception;
Response processConsumerControl(ConsumerControl control) throws Exception;
}
3:消息发送。队列消息的发送都是由Queue类来处理:生产者产生的消息先存入待发消息队列,然后由一个线程不断循环该队列,把消息发送给消费者。