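RocketMQ Namesrv Source Code Analysis
Preface
Version: RocketMQ 4.4.0
RocketMQ Architecture
Excerpted from the RocketMQ Developer Manual (《RocketMQ开发手册》)
- The Name Server is an almost stateless node. It can be deployed as a cluster, and the nodes do not synchronize any information with each other.
- Broker deployment is more involved. Brokers are divided into Masters and Slaves. One Master can have several Slaves, but a Slave belongs to exactly one Master. The Master/Slave relationship is defined by giving them the same BrokerName and different BrokerIds: BrokerId 0 denotes the Master, any non-zero value denotes a Slave. Several Masters can be deployed as well. Every Broker keeps a long-lived connection to every node in the NameServer cluster and periodically registers its Topic information with all of them.
- A Producer keeps a long-lived connection to one (randomly chosen) node of the Name Server cluster, periodically fetches Topic routing information from it, opens long-lived connections to the Masters that serve the Topic, and periodically sends heartbeats to those Masters. Producers are completely stateless and can be deployed as a cluster.
- A Consumer keeps a long-lived connection to one (randomly chosen) node of the Name Server cluster, periodically fetches Topic routing information from it, opens long-lived connections to the Masters and Slaves that serve the Topic, and periodically sends heartbeats to them. A Consumer can subscribe to messages from either a Master or a Slave; the rule is determined by Broker configuration.
The Namesrv cluster consists of stateless nodes. It maintains the Topic information registered by Brokers and provides Topic routing information to producers and consumers.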
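The Master/Slave rule described above (same BrokerName, different BrokerId, with 0 meaning Master) can be pictured with a small data structure. The sketch below is only an illustration: the class `BrokerGroupSketch`, its methods, and the addresses are hypothetical and are not RocketMQ classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not RocketMQ code: one broker group keyed by brokerName.
class BrokerGroupSketch {
    static final long MASTER_ID = 0L; // per the text: brokerId 0 denotes the Master

    final String brokerName;
    final Map<Long, String> addrsById = new HashMap<>(); // brokerId -> address

    BrokerGroupSketch(String brokerName) {
        this.brokerName = brokerName;
    }

    void add(long brokerId, String addr) {
        addrsById.put(brokerId, addr);
    }

    // Address of the Master, or null if no Master has been added.
    String masterAddr() {
        return addrsById.get(MASTER_ID);
    }

    // Every entry with a non-zero brokerId is a Slave.
    int slaveCount() {
        int n = 0;
        for (long id : addrsById.keySet()) {
            if (id != MASTER_ID) {
                n++;
            }
        }
        return n;
    }

    // Builds one Master and two Slaves that share the name "broker-a" (made-up addresses).
    static BrokerGroupSketch demo() {
        BrokerGroupSketch g = new BrokerGroupSketch("broker-a");
        g.add(0L, "192.168.0.1:10911");
        g.add(1L, "192.168.0.2:10911");
        g.add(2L, "192.168.0.3:10911");
        return g;
    }
}
```

Grouping by name and distinguishing by id is the same shape that brokerAddrTable takes later in this article.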
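Questions
Having looked at the RocketMQ architecture, we have a rough idea of what namesrv does. It also raises two questions:
- What exactly does a Broker register? In other words, which pieces of cluster information does namesrv maintain?
- Brokers report their registration periodically. When a Broker goes down it can no longer register, so how does namesrv deal with a Broker that has already crashed?
The rest of this article reads the source code to answer these two questions.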
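Namesrv
Components of RocketMQ
- NameServer: the registry from which Producers and Consumers obtain Broker addresses (nodes are independent of each other, so it can also run as a cluster)
- Producer: produces and sends messages
- Consumer: receives and consumes messages
- Broker: stores and forwards messages
The namesrv component
Role: lets Producers and Consumers obtain Broker addresses.
Purpose: decouples Brokers from Producers and Consumers.
Principle: it uses Netty for communication and listens on a configured port. When a Broker registers, namesrv keeps the Broker's information in memory (the routing tables shown later are plain in-memory maps; it is the KV configuration that is loaded from a file). It also serves the requests with which Producers and Consumers look up Broker addresses.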
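namesrv package structure
- NamesrvStartup is the startup entry point
- NamesrvController is the startup controller
- kvconfig is the KV configuration manager
- processor is the request processor, which handles the requests received
- routeinfo is the manager/container in namesrv that actually stores the routing information
Components held by NamesrvController
- namesrvConfig: the NameServer configuration
- nettyServerConfig: the NameServer's Netty configuration
- remotingServer: the NameServer's Netty server
- scheduledExecutorService: the scheduled thread pool that runs the periodic tasks of routeInfoManager and kvConfigManager
- remotingExecutor: the thread pool used by Netty
- brokerHousekeepingService: the ChannelEventListener that cleans up a Broker's routing information when its channel is closed, hits an exception, or goes idle
- kvConfigManager: KV configuration management
- routeInfoManager: holds each Broker's address and the corresponding queue information, which tells a Producer which Broker it can send to and a Consumer which Broker it can pull from
- fileWatchService: file watching thread (available only in 4.3.0 and later)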
NameServer startup
NamesrvStartup.main0()
public static void main(String[] args) {
    main0(args);
}

public static NamesrvController main0(String[] args) {
    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    // 1. NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // 2. Parse the command-line arguments and build NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // Default listening port
    nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            // Read the properties file and copy its values into the config objects
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 3. logback configuration
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    // 4. Create the NamesrvController (initialize() is called later, in start())
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    controller.start();

    return controller;
}
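Notes:
- Build NamesrvConfig
- Parse the command-line arguments and build NettyServerConfig
- Configure logback
- Create the NamesrvController and hand it the parsed configuration
- NamesrvStartup.start() then performs the remaining three steps:
- NamesrvController.initialize()
- Register the shutdown hook
- NamesrvController.start()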
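The ordering of the last three steps (initialize, register a shutdown hook, start) can be shown in isolation. The following is a minimal sketch under stated assumptions: `StartupSketch` and its nested `Controller` are hypothetical stand-ins that only record the call order, and the sketch throws an exception where the real start() calls System.exit(-3).

```java
import java.util.ArrayList;
import java.util.List;

class StartupSketch {
    // Hypothetical stand-in for NamesrvController; it only records which methods ran.
    static class Controller {
        final List<String> calls = new ArrayList<>();

        boolean initialize() {
            calls.add("initialize");
            return true;
        }

        void start() {
            calls.add("start");
        }

        void shutdown() {
            calls.add("shutdown");
        }
    }

    // Mirrors the order used by NamesrvStartup.start(): initialize, hook, start.
    static Controller boot(Controller controller) {
        if (controller == null) {
            throw new IllegalArgumentException("controller is null");
        }
        if (!controller.initialize()) {
            controller.shutdown();
            // Assumption for the sketch: fail with an exception instead of System.exit(-3).
            throw new IllegalStateException("initialize failed");
        }
        // The hook runs shutdown() when the JVM exits, so resources are released.
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown));
        controller.start();
        return controller;
    }
}
```

Registering the hook before start() means a JVM exit during startup still triggers shutdown().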
NamesrvController.initialize()
public boolean initialize() {
    // 1. KVConfigManager.load: load the existing key-value file into memory
    this.kvConfigManager.load();

    // 2. Create the NettyRemotingServer
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 3. Create the remoting thread pool (8 threads by default); newFixedThreadPool
    //    backs it with an unbounded LinkedBlockingQueue
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    // 4. Register the request processor (DefaultRequestProcessor by default),
    //    which handles the requests received by Netty
    this.registerProcessor();

    // 5. Scheduled task: after a 5s delay, check every 10s whether each broker is still alive
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 6. Scheduled task: after a 1min delay, print all key-value pairs every 10min
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[]{
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}
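Notes:
- KVConfigManager.load loads the existing key-value file into memory
- Create the NettyRemotingServer
- Create the remoting thread pool, 8 threads by default
- Register the request processor (DefaultRequestProcessor by default), which handles the requests received by Netty
- Start a scheduled task that, after a 5s delay, scans for inactive Brokers every 10s. This looks like the mechanism behind question 2
- Start a scheduled task that, after a 1min delay, prints all key-value pairs every 10min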
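Both periodic tasks rely on ScheduledExecutorService.scheduleAtFixedRate(task, initialDelay, period, unit). The sketch below exercises that call with millisecond values so it finishes quickly; the class `PeriodicScanSketch` and the method `runPeriodically` are hypothetical names, and the 5 second wait limit is an arbitrary choice for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicScanSketch {
    // Runs task after initialDelayMs, then every periodMs, and returns true
    // once it has run `times` times (or false if that takes longer than 5s).
    static boolean runPeriodically(Runnable task, long initialDelayMs, long periodMs, int times) {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(times);
        try {
            pool.scheduleAtFixedRate(() -> {
                task.run();
                latch.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the scheduler so the JVM can exit.
            pool.shutdownNow();
        }
    }
}
```

One property of scheduleAtFixedRate worth keeping in mind: if a run throws, later runs are suppressed, so a long-lived periodic task generally needs to handle its own exceptions.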
NamesrvController.start()
Starts the Netty server
public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}
Starting the Netty server side (NettyRemotingServer.start())
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyServerHandler()
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
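How RocketMQ uses Netty for communication within the cluster will be covered in a separate chapter on that module: RocketMQ-remoting.
How the NameServer handles requests
Every message received by Netty is handled by org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor.processRequest, which performs a different operation depending on the RequestCode.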
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER: // register broker information
            // Determine the Broker's version
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                // Handling for 3.0.11 and later
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                // Handling for versions before 3.0.11
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            // Unregister a broker
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            // Look up routing information by topic; a producer sending messages and a
            // consumer pulling messages both fetch it from the NameServer
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}
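Stripped of the RocketMQ types, processRequest is a plain switch on an integer code that returns the handler's response, or null for an unknown code. A minimal sketch of that pattern follows; the class `DispatchSketch`, the three numeric codes, and the string responses are hypothetical and do not correspond to the real RequestCode values.

```java
class DispatchSketch {
    // Hypothetical codes for illustration only; not the real RequestCode constants.
    static final int REGISTER = 1;
    static final int UNREGISTER = 2;
    static final int GET_ROUTE = 3;

    // Chooses a handler by request code; unknown codes fall through to null,
    // mirroring the "default: break; ... return null;" shape of processRequest.
    static String process(int code, String body) {
        switch (code) {
            case REGISTER:
                return "registered " + body;
            case UNREGISTER:
                return "unregistered " + body;
            case GET_ROUTE:
                return "route for " + body;
            default:
                return null;
        }
    }
}
```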
Registering a Broker
case RequestCode.REGISTER_BROKER: // register broker information
    // Determine the Broker's version
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        // Handling for 3.0.11 and later
        return this.registerBrokerWithFilterServer(ctx, request);
    } else {
        // Handling for versions before 3.0.11
        return this.registerBroker(ctx, request);
    }
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#registerBrokerWithFilterServer
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    // Parse the request and obtain the request header
    final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
    final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
    final RegisterBrokerRequestHeader requestHeader =
        (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

    if (!checksum(ctx, request, requestHeader)) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("crc32 not match");
        return response;
    }

    RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

    // Decode the RegisterBrokerBody
    if (request.getBody() != null) {
        try {
            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
        } catch (Exception e) {
            throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
        }
    } else {
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
        registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
    }

    // Register the Broker
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    // Fill in the response
    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
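We will not dig into the Netty communication layer here. As the arguments passed to registerBroker show, the registration request sent by a Broker carries clusterName, brokerAddr, brokerName, brokerId and haServerAddr in the request header, plus the topic configuration (TopicConfigSerializeWrapper) and the filterServerList in the request body.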
RouteInfoManager
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
Structure
// topic queue information
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker information
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// cluster information
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// live broker information
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// filter servers of each broker
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
RouteInfoManager manages the Topic routing information registered by Brokers together with the cluster information. In practice it simply maintains these five Maps.
Registering a Broker: registerBroker
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Acquire the write lock
            this.lock.writeLock().lockInterruptibly();

            // Get all Brokers of this cluster and add this Broker
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            // Flag: is this the first registration?
            boolean registerFirst = false;

            // Look up the BrokerData for this brokerName
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                // Not found: first registration, build and store the entry
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // Store the address; a null oldAddr means this brokerId is put for the first time
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // topicConfigWrapper wraps the TopicConfig information for transport.
            // Only handled when it is non-null and this broker is the Master (brokerId == 0)
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                // The Broker's TopicConfig changed, or this is the first registration
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // Fill topicQueueTable
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // Fill brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            // First registration
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // Fill filterServerTable
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            // Slave handling: look up the Master address and build the result
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            // Release the write lock
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
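Notes:
- Fill clusterAddrTable
- Fill brokerAddrTable
- Fill topicQueueTable
- Fill brokerLiveTable
- Fill filterServerTable
That completes Broker registration: it is simply the process of filling these five Maps. The answer to question 1 has surfaced as well.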
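To make "filling the Maps under a write lock" concrete, here is a reduced sketch that keeps only three of the five tables and replaces BrokerData and BrokerLiveInfo with plain maps and timestamps. The class `RegisterSketch`, its `register` method, and the simplified value types are assumptions made for the example, not the RocketMQ implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RegisterSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // clusterName -> broker names
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerName -> (brokerId -> address); simplified stand-in for BrokerData
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // address -> last update time in ms; simplified stand-in for BrokerLiveInfo
    final Map<String, Long> brokerLiveTable = new HashMap<>();

    // Returns true when this (brokerName, brokerId) pair was not registered before.
    boolean register(String cluster, String brokerName, long brokerId, String addr, long nowMs) {
        lock.writeLock().lock();
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            String oldAddr = brokerAddrTable
                .computeIfAbsent(brokerName, k -> new HashMap<>())
                .put(brokerId, addr);
            // Every registration refreshes the timestamp, so it doubles as a heartbeat.
            brokerLiveTable.put(addr, nowMs);
            return oldAddr == null;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Because each call overwrites the timestamp in brokerLiveTable, a Broker that keeps registering also keeps proving that it is alive, which is what the timeout scan discussed next depends on.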
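How Brokers that are no longer alive are handled
Detecting that a Broker is no longer alive
- When the Channel between a Broker and Namesrv sees a close, exception, or idle event, the namesrv side calls onChannelDestroy to handle it.
- Namesrv starts a scheduled task that, every 10s, scans all Brokers in brokerLiveTable and checks whether their last report has timed out. The default timeout is 120s; when it is exceeded, onChannelDestroy is called.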
BrokerHousekeepingService
org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService implements ChannelEventListener
Its methods are called back whenever a Channel event occurs:
public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}
The scheduled task scanNotActiveBroker
org.apache.rocketmq.namesrv.NamesrvController#initialize
public boolean initialize() {
    // ... code omitted

    // 5. Scheduled task: after a 5s delay, check every 10s whether each broker is still alive
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // ... code omitted
    return true;
}
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
On timeout, the same onChannelDestroy method is called to remove the routing information.
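The timeout rule itself is a single comparison: an entry is stale when lastUpdate + 120s is earlier than the current time. The sketch below isolates that rule; `ExpiryScanSketch` and `scan` are hypothetical names, the map value is a bare timestamp rather than a BrokerLiveInfo, and the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ExpiryScanSketch {
    // 120s, the default timeout mentioned in the text.
    static final long EXPIRED_MS = 1000L * 60 * 2;

    // Removes every entry whose last update is older than EXPIRED_MS
    // and returns the addresses that were removed.
    static List<String> scan(Map<String, Long> lastUpdateByAddr, long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdateByAddr.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MS < nowMs) {
                expired.add(entry.getKey());
                // Remove through the iterator to avoid ConcurrentModificationException.
                it.remove();
            }
        }
        return expired;
    }
}
```

With a 10s scan period and a 120s timeout, a crashed Broker whose channel event is somehow missed can stay in the tables for roughly 120s to 130s before this scan removes it.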
RouteInfoManager#onChannelDestroy
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#onChannelDestroy
public void onChannelDestroy(String remoteAddr, Channel channel) {
    // Find the broker address. Note that remoteAddr is the address of the Broker's
    // connection to namesrv, not the address the Broker's own Netty server listens on
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                // 1. Remove from brokerLiveTable
                this.brokerLiveTable.remove(brokerAddrFound);
                // 2. Remove from filterServerTable
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                // 3. Remove from brokerAddrTable
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }
                // 4. Remove from clusterAddrTable
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);

                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();
                            }

                            break;
                        }
                    }
                }
                // 5. Remove from topicQueueTable
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }

                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
Log
20:18:32.951 [NettyServerCodecThread_1] WARN RocketmqRemoting - NETTY SERVER PIPELINE: exceptionCaught 127.0.0.1:8078
20:18:32.960 [NettyServerCodecThread_1] WARN RocketmqRemoting - NETTY SERVER PIPELINE: exceptionCaught exception.
java.io.IOException: 远程主机强迫关闭了一个现有的连接。
    at sun.nio.ch.SocketDispatcher.read0(Native Method) ~[na:1.8.0_111]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) ~[na:1.8.0_111]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_111]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_111]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_111]
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451) ~[netty-all-4.0.42.Final.jar:4.0.42.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) [netty-all-4.0.42.Final.jar:4.0.42.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
20:18:32.963 [NettyServerNIOSelector_3_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:8078] result: true
20:18:32.963 [NettyServerCodecThread_1] INFO RocketmqRemoting - NETTY SERVER PIPELINE: channelInactive, the channel[127.0.0.1:8078]
20:18:32.963 [NettyServerCodecThread_1] INFO RocketmqRemoting - NETTY SERVER PIPELINE: channelUnregistered, the channel[127.0.0.1:8078]
20:19:19.394 [NettyEventExecutor] INFO RocketmqNamesrv - the broker's channel destroyed, 192.168.72.1:10911, clean it's data structure at once
20:20:29.565 [NettyEventExecutor] INFO RocketmqNamesrv - remove brokerAddr[0, 192.168.72.1:10911] from brokerAddrTable, because channel destroyed
20:20:50.713 [NettyEventExecutor] INFO RocketmqNamesrv - remove brokerName[broker-a] from brokerAddrTable, because channel destroyed
20:21:34.787 [NettyEventExecutor] INFO RocketmqNamesrv - remove brokerName[broker-a], clusterName[DefaultCluster] from clusterAddrTable, because channel destroyed
20:21:36.033 [NettyEventExecutor] INFO RocketmqNamesrv - remove the clusterName[DefaultCluster] from clusterAddrTable, because channel destroyed and no broker in this cluster
20:21:56.126 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[broker-a QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=7, topicSynFlag=0]], from topicQueueTable, because channel destroyed
20:22:02.100 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[broker-a] all queue, from topicQueueTable, because channel destroyed
20:22:07.553 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[SELF_TEST_TOPIC QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]], from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[SELF_TEST_TOPIC] all queue, from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[TBW102 QueueData [brokerName=broker-a, readQueueNums=8, writeQueueNums=8, perm=7, topicSynFlag=0]], from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[TBW102] all queue, from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[BenchmarkTest QueueData [brokerName=broker-a, readQueueNums=1024, writeQueueNums=1024, perm=6, topicSynFlag=0]], from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[BenchmarkTest] all queue, from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[DefaultCluster QueueData [brokerName=broker-a, readQueueNums=16, writeQueueNums=16, perm=7, topicSynFlag=0]], from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[DefaultCluster] all queue, from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[OFFSET_MOVED_EVENT QueueData [brokerName=broker-a, readQueueNums=1, writeQueueNums=1, perm=6, topicSynFlag=0]], from topicQueueTable, because channel destroyed
20:22:07.554 [NettyEventExecutor] INFO RocketmqNamesrv - remove topic[OFFSET_MOVED_EVENT] all queue, from topicQueueTable, because channel destroyed
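Notes:
In essence, the Broker's entries are removed from the five Maps in RouteInfoManager.
Summary
The Namesrv cluster consists of stateless nodes. It maintains the Topic information registered by Brokers and provides Topic routing information to producers and consumers.
As for the two questions raised earlier:
Question 1: RouteInfoManager keeps five Maps that store the Brokers' routing information.
Question 2: Namesrv handles Broker failures in two ways. First, it uses Netty's channel event mechanism, with callbacks for the different events; this mainly covers the case where the channel itself fails. Second, a scheduled task checks whether the latest report time has exceeded the timeout (120s); if so, the Broker is considered disconnected.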
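The cleanup is a cascade: the address is removed from its broker group first, and the cluster and topic entries are touched only when that group has no addresses left. The sketch below shows this with three simplified tables; `CleanupSketch`, its `register` helper, and the use of broker names in place of QueueData are assumptions made for the example, and locking is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CleanupSketch {
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // name -> (id -> addr)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> names
    final Map<String, List<String>> topicQueueTable = new HashMap<>();      // topic -> names

    // Hypothetical helper that fills the three tables for one broker address.
    void register(String cluster, String brokerName, long brokerId, String addr, String topic) {
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        List<String> brokers = topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>());
        if (!brokers.contains(brokerName)) {
            brokers.add(brokerName);
        }
    }

    void onChannelDestroy(String addr) {
        String nameFound = null;
        boolean removeName = false;
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext() && nameFound == null) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            // Drop the address from its group; the group goes away only when it is empty.
            if (entry.getValue().values().remove(addr)) {
                nameFound = entry.getKey();
                if (entry.getValue().isEmpty()) {
                    removeName = true;
                    it.remove();
                }
            }
        }
        if (!removeName) {
            return; // other addresses (for example a Slave) still serve this broker name
        }
        for (Set<String> names : clusterAddrTable.values()) {
            names.remove(nameFound);
        }
        clusterAddrTable.values().removeIf(Set::isEmpty);
        for (List<String> brokers : topicQueueTable.values()) {
            brokers.remove(nameFound);
        }
        topicQueueTable.values().removeIf(List::isEmpty);
    }
}
```

This is why, in the log above, the topic entries disappear only after brokerName[broker-a] itself has been removed from brokerAddrTable.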