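sofa-bolt source code reading (2): client startup
A sofa-bolt client accesses the server in two steps: first it initializes itself, then it establishes a connection. Typical code looks like this: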
    // 1. create a rpc client
    RpcClient client = new RpcClient();
    // 2. add processor for connect and close event if you need
    client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
    client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
    // 3. do init
    client.startup();
    // 4. invoke
    RequestBody req = new RequestBody(2, "hello world sync");
    try {
        String res = (String) client.invokeSync(addr, req, 3000);
        System.out.println("invoke sync result = [" + res + "]");
    } catch (RemotingException e) {
        String errMsg = "RemotingException caught in oneway!";
        logger.error(errMsg, e);
        Assert.fail(errMsg);
    } catch (InterruptedException e) {
        logger.error("interrupted!");
    }
    // 5. close
    client.shutdown();
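RpcClient.startup performs the initialization: it sets up connection management (ConnectionManager), connection monitoring (DefaultConnectionMonitor), and reconnection (ReconnectManager).
2.1 Connection management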
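ConnectionManager manages connections. Each URL maps to one pool key, each pool key gets its own ConnectionPool, and each ConnectionPool maintains multiple connections. As shown in the figure above, when a connection is needed, ConnectionManager picks one from the ConnectionPool according to the connection selection strategy, ConnectionSelectStrategy.
Initializing the connection manager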
com.alipay.remoting.rpc.RpcClient#startup
    ConnectionSelectStrategy connectionSelectStrategy = option(BoltGenericOption.CONNECTION_SELECT_STRATEGY);
    if (connectionSelectStrategy == null) {
        connectionSelectStrategy = new RandomSelectStrategy(switches());
    }
    this.connectionManager = new DefaultClientConnectionManager(connectionSelectStrategy,
        new RpcConnectionFactory(userProcessors, this), connectionEventHandler,
        connectionEventListener, switches());
    this.connectionManager.setAddressParser(this.addressParser);
    this.connectionManager.startup();
Connection selection strategy: ConnectionSelectStrategy
When a Connection is requested, ConnectionPool calls ConnectionSelectStrategy to select one.
com.alipay.remoting.ConnectionPool#get
    public Connection get() {
        // update the last access time
        markAccess();
        if (null != connections) {
            List<Connection> snapshot = new ArrayList<Connection>(connections);
            if (snapshot.size() > 0) {
                // pick a Connection according to the selection strategy
                return strategy.select(snapshot);
            } else {
                return null;
            }
        } else {
            return null;
        }
    }
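Take RandomSelectStrategy as an example: it uses a random number generator to pick a healthy connection (one for which isFine() returns true) from the list passed in. It retries up to MAX_TIMES times and returns null if no healthy connection was found.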
com.alipay.remoting.RandomSelectStrategy#select
    public Connection select(List<Connection> connections) {
        // randomly pick a healthy connection from the list
        Connection result = randomGet(connections);
        return result;
    }

com.alipay.remoting.RandomSelectStrategy#randomGet
    private Connection randomGet(List<Connection> connections) {
        if (null == connections || connections.isEmpty()) {
            return null;
        }
        int size = connections.size();
        int tries = 0;
        Connection result = null;
        while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
            result = connections.get(this.random.nextInt(size));
        }
        if (result != null && !result.isFine()) {
            result = null;
        }
        return result;
    }
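The retry loop above can be tried out without sofa-bolt on the classpath. The following is a minimal sketch, not the library's code: RandomPickSketch, its generic select method, the isFine predicate, and the value of MAX_TIMES are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

/** Hypothetical stand-in for RandomSelectStrategy; not part of sofa-bolt. */
final class RandomPickSketch {
    /** Assumed retry bound, playing the role of MAX_TIMES in the original. */
    static final int MAX_TIMES = 5;

    private final Random random = new Random();

    /** Returns a random healthy element, or null if none was hit within MAX_TIMES tries. */
    <T> T select(List<T> candidates, Predicate<T> isFine) {
        if (candidates == null || candidates.isEmpty()) {
            return null;
        }
        T result = null;
        int tries = 0;
        // Keep drawing while the current pick is missing or unhealthy.
        while ((result == null || !isFine.test(result)) && tries++ < MAX_TIMES) {
            result = candidates.get(random.nextInt(candidates.size()));
        }
        // The last draw may still be unhealthy, so check once more before returning.
        return (result != null && isFine.test(result)) ? result : null;
    }
}
```

Because the draws are random and bounded, a list that contains only a few healthy entries can still yield null; the caller has to handle that case.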
Address parser: RpcAddressParser
RpcAddressParser parses a URL string into a Url object.
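To make the idea of turning an address string into a structured object concrete, here is a minimal sketch. It assumes an address of the form ip:port?key=value&key=value; the class AddressSketch, its fields, and the parameter names used when calling it are hypothetical and are not the API of RpcAddressParser or Url.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified address holder; the real parser covers more cases. */
final class AddressSketch {
    final String ip;
    final int port;
    final Map<String, String> params = new LinkedHashMap<>();

    private AddressSketch(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    /** Parses "ip:port", optionally followed by "?k1=v1&k2=v2". */
    static AddressSketch parse(String address) {
        String hostPort = address;
        String query = "";
        int q = address.indexOf('?');
        if (q >= 0) {
            hostPort = address.substring(0, q);
            query = address.substring(q + 1);
        }
        int colon = hostPort.lastIndexOf(':');
        if (colon <= 0 || colon == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected ip:port but got: " + address);
        }
        AddressSketch parsed = new AddressSketch(hostPort.substring(0, colon),
            Integer.parseInt(hostPort.substring(colon + 1)));
        // Pairs without '=' (including the empty query) are skipped.
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                parsed.params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return parsed;
    }

    /** Key that identifies a pool; "ip:port" is an assumption of this sketch. */
    String uniqueKey() {
        return ip + ":" + port;
    }
}
```

For instance, AddressSketch.parse("127.0.0.1:12200?timeout=3000") yields ip 127.0.0.1, port 12200, and a single parameter named timeout.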
Connection factory: RpcConnectionFactory
Used to create the Connection objects that are placed into a ConnectionPool.
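Event handler: ConnectionEventHandler
It implements ChannelHandler and is registered in the netty pipeline. When a connection-open or connection-close event occurs, it forwards the event to ConnectionEventListener.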
Event listener: ConnectionEventListener
It works together with ConnectionEventHandler and carries out the actual business handling when an event occurs.
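The handler/listener split can be pictured as a small registry that maps an event type to the processors registered for it. The sketch below is only an illustration under that assumption: EventDispatchSketch, its EventType enum, and the use of Consumer<String> as a processor are hypothetical and do not reproduce the ConnectionEventListener API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical listener: dispatches an event to every processor registered for its type. */
final class EventDispatchSketch {
    enum EventType { CONNECT, CLOSE }

    private final Map<EventType, List<Consumer<String>>> processors = new ConcurrentHashMap<>();

    /** Registers a processor; it receives the remote address of the affected connection. */
    void addProcessor(EventType type, Consumer<String> processor) {
        processors.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(processor);
    }

    /** Called by the (netty-side) handler when an event is observed. */
    void onEvent(EventType type, String remoteAddress) {
        List<Consumer<String>> registered = processors.get(type);
        if (registered == null) {
            return; // nobody is interested in this event type
        }
        for (Consumer<String> processor : registered) {
            processor.accept(remoteAddress);
        }
    }
}
```

Keeping the registry separate from the pipeline handler means user code can register processors before any channel exists, which matches the order of addConnectionEventProcessor and startup in the client example at the top.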
Starting the connection manager
com.alipay.remoting.DefaultClientConnectionManager#startup
    @Override
    public void startup() throws LifeCycleException {
        // update the state to started
        super.startup();
        this.connectionEventHandler.setConnectionManager(this);
        this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
        this.connectionFactory.init(connectionEventHandler);
    }
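The startup method performs the following steps in order:
- Update the state to started
- Initialize the event handler
- Initialize the connection factory
The connectionFactory.init method configures netty on the client side.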
com.alipay.remoting.connection.AbstractConnectionFactory#init
    public void init(final ConnectionEventHandler connectionEventHandler) {
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
            .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
            .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
            .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());

        // init netty write buffer water mark
        initWriteBufferWaterMark();

        // init byte buf allocator
        if (ConfigManager.netty_buffer_pooled()) {
            this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        } else {
            this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
        }

        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("decoder", codec.newDecoder());
                pipeline.addLast("encoder", codec.newEncoder());

                boolean idleSwitch = ConfigManager.tcp_idle_switch();
                if (idleSwitch) {
                    pipeline.addLast("idleStateHandler",
                        new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0,
                            TimeUnit.MILLISECONDS));
                    pipeline.addLast("heartbeatHandler", heartbeatHandler);
                }

                pipeline.addLast("connectionEventHandler", connectionEventHandler);
                pipeline.addLast("handler", handler);
            }
        });
    }
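As you can see, the client's netty pipeline is almost identical to the server's. The one small difference is that the client has to send heartbeats, while the server has to detect them.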
    // client
    pipeline.addLast("heartbeatHandler", heartbeatHandler);
    // server
    pipeline.addLast("serverIdleHandler", serverIdleHandler);
2.2 Connection monitoring
    if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
        if (monitorStrategy == null) {
            connectionMonitor = new DefaultConnectionMonitor(new ScheduledDisconnectStrategy(),
                this.connectionManager);
        } else {
            connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
                this.connectionManager);
        }
        connectionMonitor.startup();
        logger.warn("Switch on connection monitor");
    }
DefaultConnectionMonitor monitors connections. In its startup method it starts a ScheduledThreadPoolExecutor that periodically calls ConnectionMonitorStrategy.monitor on the connection pools.
    @Override
    public void startup() throws LifeCycleException {
        super.startup();

        /* initial delay to execute schedule task, unit: ms */
        long initialDelay = ConfigManager.conn_monitor_initial_delay();

        /* period of schedule task, unit: ms*/
        long period = ConfigManager.conn_monitor_period();

        this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
            "ConnectionMonitorThread", true), new ThreadPoolExecutor.AbortPolicy());
        this.executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools = connectionManager
                        .getConnPools();
                    strategy.monitor(connPools);
                } catch (Exception e) {
                    logger.warn("MonitorTask error", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
    }
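The scheduling pattern used here (a single daemon thread running a check at a fixed rate, with exceptions caught inside the task) can be reproduced with the JDK alone. The following is a rough sketch; MonitorSketch, its constructor parameters, and the demo helper are hypothetical names, and the delay and period values are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical periodic monitor built on ScheduledThreadPoolExecutor. */
final class MonitorSketch {
    private final ScheduledThreadPoolExecutor executor;

    MonitorSketch(Runnable check, long initialDelayMs, long periodMs) {
        // Daemon thread, so the monitor never keeps the JVM alive on its own.
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "MonitorSketchThread");
            t.setDaemon(true);
            return t;
        };
        executor = new ScheduledThreadPoolExecutor(1, daemonFactory);
        executor.scheduleAtFixedRate(() -> {
            try {
                check.run();
            } catch (Exception e) {
                // An exception escaping the task would suppress all later executions.
                System.err.println("monitor error: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Runs a monitor every 10 ms until `runs` checks have happened; false on timeout. */
    static boolean demo(int runs, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(runs);
        MonitorSketch monitor = new MonitorSketch(latch::countDown, 0, 10);
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            monitor.shutdown();
        }
    }
}
```

The try/catch inside the task matters: with scheduleAtFixedRate, a task that throws is not rescheduled, so a monitor that must keep running has to swallow or log its own failures.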
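For example, ScheduledDisconnectStrategy watches the number of in-service connections in each pool. When that number exceeds the configured threshold, it picks one connection at random and marks it as off-service. Off-service connections are closed once their pending invocations have finished.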
    public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
        try {
            if (connPools == null || connPools.size() == 0) {
                return;
            }
            for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : connPools
                .entrySet()) {
                String poolKey = entry.getKey();
                ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);

                List<Connection> serviceOnConnections = new ArrayList<Connection>();
                List<Connection> serviceOffConnections = new ArrayList<Connection>();
                for (Connection connection : pool.getAll()) {
                    if (isConnectionOn(connection)) {
                        serviceOnConnections.add(connection);
                    } else {
                        serviceOffConnections.add(connection);
                    }
                }

                if (serviceOnConnections.size() > connectionThreshold) {
                    Connection freshSelectConnect = serviceOnConnections.get(random
                        .nextInt(serviceOnConnections.size()));
                    freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
                        Configs.CONN_SERVICE_STATUS_OFF);
                    serviceOffConnections.add(freshSelectConnect);
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("serviceOnConnections({}) size[{}], CONNECTION_THRESHOLD[{}].",
                            poolKey, serviceOnConnections.size(), connectionThreshold);
                    }
                }

                for (Connection offConn : serviceOffConnections) {
                    if (offConn.isInvokeFutureMapFinish()) {
                        if (offConn.isFine()) {
                            offConn.close();
                        }
                    } else {
                        if (logger.isInfoEnabled()) {
                            logger.info("Address={} won't close at this schedule turn",
                                RemotingUtil.parseRemoteAddress(offConn.getChannel()));
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error("ScheduledDisconnectStrategy monitor error", e);
        }
    }
2.3 Reconnection
    if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
        reconnectManager = new ReconnectManager(connectionManager);
        reconnectManager.startup();
        connectionEventHandler.setReconnector(reconnectManager);
        logger.warn("Switch on reconnect manager");
    }
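Unlike monitoring, reconnection starts a Thread in its startup method. This thread acts as a consumer that processes the ReconnectTask instances added to the task queue.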
com.alipay.remoting.ReconnectManager#startup
    @Override
    public void startup() throws LifeCycleException {
        super.startup();
        this.healConnectionThreads = new Thread(new HealConnectionRunner());
        this.healConnectionThreads.start();
    }

com.alipay.remoting.ReconnectManager.HealConnectionRunner#run
    @Override
    public void run() {
        while (isStarted()) {
            long start = -1;
            ReconnectTask task = null;
            try {
                if (this.lastConnectTime < HEAL_CONNECTION_INTERVAL) {
                    Thread.sleep(HEAL_CONNECTION_INTERVAL);
                }
                try {
                    task = ReconnectManager.this.tasks.take();
                } catch (InterruptedException e) {
                    // ignore
                }
                if (task == null) {
                    continue;
                }
                start = System.currentTimeMillis();
                if (!canceled.contains(task.url)) {
                    task.run();
                } else {
                    logger.warn("Invalid reconnect request task {}, cancel list size {}",
                        task.url, canceled.size());
                }
                this.lastConnectTime = System.currentTimeMillis() - start;
            } catch (Exception e) {
                if (start != -1) {
                    this.lastConnectTime = System.currentTimeMillis() - start;
                }
                if (task != null) {
                    logger.warn("reconnect target: {} failed.", task.url, e);
                    tasks.add(task);
                }
            }
        }
    }
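The producer/consumer structure of the reconnect thread can be sketched with a BlockingQueue and a cancel set. This is an illustration only: HealSketch and its members are hypothetical, URLs are plain strings, the "reconnect" is simulated by recording the URL, and a timed poll replaces take() so that the loop can notice shutdown.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical reconnect worker: one consumer thread draining a task queue. */
final class HealSketch {
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Set<String> canceled = ConcurrentHashMap.newKeySet();
    final List<String> healed = new CopyOnWriteArrayList<>();
    private volatile boolean started = true;
    private final Thread worker = new Thread(this::loop, "HealSketchThread");

    void start() { worker.setDaemon(true); worker.start(); }
    void reconnect(String url) { tasks.add(url); }           // producer side
    void disableReconnect(String url) { canceled.add(url); } // cancel future reconnects

    private void loop() {
        while (started) {
            try {
                String url = tasks.poll(50, TimeUnit.MILLISECONDS);
                if (url == null) {
                    continue; // nothing queued, check the started flag again
                }
                if (!canceled.contains(url)) {
                    healed.add(url); // stands in for task.run()
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Polls until the given URL has been processed or the timeout expires. */
    boolean awaitHealed(String url, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (System.nanoTime() < deadline && !healed.contains(url)) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return healed.contains(url);
    }

    void shutdown() {
        started = false;
        worker.interrupt();
    }
}
```

Because a single thread consumes the queue in FIFO order, tasks for the same address are never reconnected concurrently, and a cancelled URL is simply skipped when its task is dequeued.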
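Creating a ReconnectTask
A ReconnectTask is created in two situations: after a connection is dropped unexpectedly, and after a reconnect attempt fails. The second case is the catch block of HealConnectionRunner#run shown above, which puts the failed task back on the queue with tasks.add(task).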
1. The connection is dropped unexpectedly
com.alipay.remoting.ConnectionEventHandler#channelInactive
    // add reconnect task
    if (this.globalSwitch != null
        && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
        Connection conn = (Connection) attr.get();
        if (reconnectManager != null) {
            reconnectManager.reconnect(conn.getUrl());
        }
    }
Cancelling a ReconnectTask
When the client explicitly closes a connection, reconnection for that URL is cancelled by adding the URL to ReconnectManager#canceled.
com.alipay.remoting.rpc.RpcClient#closeConnection
    @Override
    public void closeConnection(Url url) {
        if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
            reconnectManager.disableReconnect(url);
        }
        this.connectionManager.remove(url.getUniqueKey());
    }

com.alipay.remoting.ReconnectManager#disableReconnect
    @Override
    public void disableReconnect(Url url) {
        canceled.add(url);
    }
2.4 Establishing a connection
Back to the client startup code:
    RequestBody req = new RequestBody(2, "hello world sync");
    try {
        String res = (String) client.invokeSync(addr, req, 3000);
        System.out.println("invoke sync result = [" + res + "]");
    } catch (RemotingException e) {
        String errMsg = "RemotingException caught in oneway!";
        logger.error(errMsg, e);
        Assert.fail(errMsg);
    } catch (InterruptedException e) {
        logger.error("interrupted!");
    }
The client calls the invokeSync method to send a request to the server.
com.alipay.remoting.rpc.RpcClient#invokeSync
    @Override
    public Object invokeSync(final String address, final Object request,
                             final InvokeContext invokeContext, final int timeoutMillis)
        throws RemotingException, InterruptedException {
        return this.rpcRemoting.invokeSync(address, request, invokeContext, timeoutMillis);
    }

com.alipay.remoting.rpc.RpcRemoting#invokeSync
    public Object invokeSync(final String addr, final Object request,
                             final InvokeContext invokeContext, final int timeoutMillis)
        throws RemotingException, InterruptedException {
        // use addressParser to parse the address into a Url object
        Url url = this.addressParser.parse(addr);
        return this.invokeSync(url, request, invokeContext, timeoutMillis);
    }

com.alipay.remoting.rpc.RpcClientRemoting#invokeSync
    @Override
    public Object invokeSync(Url url, Object request, InvokeContext invokeContext,
                             int timeoutMillis) throws RemotingException, InterruptedException {
        // 1. create the connection object
        final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
        // 2. check the connection object
        this.connectionManager.check(conn);
        // 3. trigger the invocation
        return this.invokeSync(conn, request, invokeContext, timeoutMillis);
    }
Creating the connection object
com.alipay.remoting.rpc.RpcClientRemoting#getConnectionAndInitInvokeContext
    protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext)
        throws RemotingException, InterruptedException {
        long start = System.currentTimeMillis();
        Connection conn;
        try {
            // ask the ConnectionManager for a Connection, creating it if absent
            conn = this.connectionManager.getAndCreateIfAbsent(url);
        } finally {
            if (null != invokeContext) {
                invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME,
                    (System.currentTimeMillis() - start));
            }
        }
        return conn;
    }

com.alipay.remoting.DefaultConnectionManager#getAndCreateIfAbsent
    @Override
    public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
        // 1. create the connection pool; as shown later, the pool key is the Url's uniqueKey
        ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),
            new ConnectionPoolCall(url));
        if (null != pool) {
            // 2. get a connection
            return pool.get();
        } else {
            logger.error("[NOTIFYME] bug detected! pool here must not be null!");
            return null;
        }
    }
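For how a connection is obtained from the pool, see the ConnectionSelectStrategy section above (ConnectionPool#get).
Creating the connection pool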
com.alipay.remoting.DefaultConnectionManager#getConnectionPoolAndCreateIfAbsent
    private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
                                                              Callable<ConnectionPool> callable)
        throws RemotingException, InterruptedException {
        RunStateRecordedFutureTask<ConnectionPool> initialTask;
        ConnectionPool pool = null;

        int retry = Constants.DEFAULT_RETRY_TIMES;
        int timesOfResultNull = 0;
        int timesOfInterrupt = 0;

        for (int i = 0; (i < retry) && (pool == null); ++i) {
            initialTask = this.connTasks.get(poolKey);
            if (null == initialTask) {
                RunStateRecordedFutureTask<ConnectionPool> newTask = new RunStateRecordedFutureTask<ConnectionPool>(
                    callable);
                initialTask = this.connTasks.putIfAbsent(poolKey, newTask);
                if (null == initialTask) {
                    initialTask = newTask;
                    // run the task; this actually calls ConnectionPoolCall.call()
                    initialTask.run();
                }
            }
            try {
                pool = initialTask.get();
                ...
            } catch (ExecutionException e) {
                ...
            }
        }
        return pool;
    }
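The combination of putIfAbsent and a FutureTask in the method above ensures that, when several threads ask for the same pool key at once, only one of them runs the creation logic while the others wait for its result. A minimal JDK-only sketch of that pattern follows. PoolCacheSketch and getOrCreate are hypothetical names, and the error handling (dropping the failed task and throwing IllegalStateException) is an assumption of this sketch rather than what sofa-bolt does in the elided part.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/** Hypothetical per-key cache that creates each value at most once. */
final class PoolCacheSketch<V> {
    private final ConcurrentHashMap<String, FutureTask<V>> tasks = new ConcurrentHashMap<>();

    V getOrCreate(String key, Callable<V> factory) {
        FutureTask<V> task = tasks.get(key);
        if (task == null) {
            FutureTask<V> created = new FutureTask<>(factory);
            task = tasks.putIfAbsent(key, created);
            if (task == null) {
                // This thread won the race, so it is the one that runs the factory.
                task = created;
                task.run();
            }
        }
        try {
            // Losers of the race block here until the winner's run() completes.
            return task.get();
        } catch (ExecutionException e) {
            // Assumption: forget the failed attempt so that a later call can retry.
            tasks.remove(key, task);
            throw new IllegalStateException("creation failed for " + key, e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + key, e);
        }
    }
}
```

Storing the task rather than the finished value is the key design choice: the map entry exists before creation finishes, so concurrent callers find it and wait instead of opening duplicate pools.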
com.alipay.remoting.DefaultConnectionManager.ConnectionPoolCall#call
    @Override
    public ConnectionPool call() throws Exception {
        final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
        if (whetherInitConnection) {
            try {
                // initialize the connection pool
                doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
            } catch (Exception e) {
                pool.removeAllAndTryClose();
                throw e;
            }
        }
        return pool;
    }
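The doCreate method creates the connections in the pool. When warmup is enabled on the Url, all connections are created synchronously. Otherwise the first syncCreateNumWhenNotWarmup connections are created synchronously and the remaining ones asynchronously.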
    private void doCreate(final Url url, final ConnectionPool pool, final String taskName,
                          final int syncCreateNumWhenNotWarmup) throws RemotingException {
        final int actualNum = pool.size();
        final int expectNum = url.getConnNum();
        if (actualNum >= expectNum) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("actual num {}, expect num {}, task name {}", actualNum, expectNum,
                taskName);
        }
        if (url.isConnWarmup()) {
            for (int i = actualNum; i < expectNum; ++i) {
                Connection connection = create(url);
                pool.add(connection);
            }
        } else {
            if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
                throw new IllegalArgumentException(
                    "sync create number when not warmup should be [0," + url.getConnNum() + "]");
            }
            // create connections synchronously
            if (syncCreateNumWhenNotWarmup > 0) {
                for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                    Connection connection = create(url);
                    pool.add(connection);
                }
                if (syncCreateNumWhenNotWarmup >= url.getConnNum()) {
                    return;
                }
            }

            pool.markAsyncCreationStart();// mark the start of async
            try {
                // create the remaining connections asynchronously
                this.asyncCreateConnectionExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            for (int i = pool.size(); i < url.getConnNum(); ++i) {
                                Connection conn = null;
                                try {
                                    conn = create(url);
                                } catch (RemotingException e) {
                                }
                                pool.add(conn);
                            }
                        } finally {
                            pool.markAsyncCreationDone();// mark the end of async
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                pool.markAsyncCreationDone();// mark the end of async when reject
                throw e;
            }
        } // end of NOT warm up
    }
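The non-warmup branch of doCreate boils down to "create a few items on the calling thread, hand the rest to an executor". The sketch below isolates that split. FillPoolSketch, fill, and its parameters are hypothetical; the pool is a plain List, which must be thread-safe if the executor runs on another thread, and a Supplier stands in for create(url).

```java
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical helper that fills a pool partly synchronously, partly asynchronously. */
final class FillPoolSketch {
    /**
     * Adds items until the pool holds expectNum entries. The first syncNum items are
     * created on the calling thread, the rest on asyncExecutor.
     * Returns how many items were created synchronously.
     */
    static <T> int fill(List<T> pool, int expectNum, int syncNum,
                        Supplier<T> create, Executor asyncExecutor) {
        if (syncNum < 0 || syncNum > expectNum) {
            throw new IllegalArgumentException("syncNum should be in [0," + expectNum + "]");
        }
        int createdSync = 0;
        // Synchronous part: the caller can use these connections right away.
        while (createdSync < syncNum && pool.size() < expectNum) {
            pool.add(create.get());
            createdSync++;
        }
        // Asynchronous part: top the pool up in the background.
        if (pool.size() < expectNum) {
            asyncExecutor.execute(() -> {
                while (pool.size() < expectNum) {
                    pool.add(create.get());
                }
            });
        }
        return createdSync;
    }
}
```

Passing Runnable::run as the executor runs the background part inline, which is convenient for experiments; with a real thread pool the call returns as soon as the synchronous items exist, so the first request does not wait for the whole pool to be built.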
Creating the connection object
Connection objects are created through the connection factory.
com.alipay.remoting.DefaultConnectionManager#create
    public Connection create(Url url) throws RemotingException {
        Connection conn;
        try {
            conn = this.connectionFactory.createConnection(url);
        } catch (Exception e) {
            throw new RemotingException("Create connection failed. The address is "
                + url.getOriginUrl(), e);
        }
        return conn;
    }

com.alipay.remoting.connection.AbstractConnectionFactory#createConnection
    @Override
    public Connection createConnection(Url url) throws Exception {
        Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
        Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
            url.getVersion(), url);
        channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        return conn;
    }

com.alipay.remoting.connection.AbstractConnectionFactory#doCreateConnection
    protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout)
        throws Exception {
        // prevent unreasonable value, at least 1000
        connectTimeout = Math.max(connectTimeout, 1000);
        String address = targetIP + ":" + targetPort;
        if (logger.isDebugEnabled()) {
            logger.debug("connectTimeout of address [{}] is [{}].", address, connectTimeout);
        }
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));

        future.awaitUninterruptibly();
        ...
        return future.channel();
    }
Checking the connection object
- Check that the connection exists
- Check that the channel is active
- Check that the channel is writable
com.alipay.remoting.DefaultConnectionManager#check
    public void check(Connection connection) throws RemotingException {
        if (connection == null) {
            throw new RemotingException("Connection is null when do check!");
        }
        if (connection.getChannel() == null || !connection.getChannel().isActive()) {
            this.remove(connection);
            throw new RemotingException("Check connection failed for address: "
                + connection.getUrl());
        }
        if (!connection.getChannel().isWritable()) {
            // No remove. Most of the time it is unwritable temporarily.
            throw new RemotingException("Check connection failed for address: "
                + connection.getUrl() + ", maybe write overflow!");
        }
    }
Triggering the invocation
    public Object invokeSync(final Connection conn, final Object request,
                             final InvokeContext invokeContext, final int timeoutMillis)
        throws RemotingException, InterruptedException {
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,
            timeoutMillis);
        preProcessInvokeContext(invokeContext, requestCommand, conn);
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
            timeoutMillis);
        responseCommand.setInvokeContext(invokeContext);

        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
            RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }
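- Convert the request into a requestCommand
- Pre-process the invoke context
- Send the request
- Obtain the responseCommand and return its resolved result
Sending the request means writing the requestCommand to the channel.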
com.alipay.remoting.BaseRemoting#invokeSync
    protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request,
                                         final int timeoutMillis) throws RemotingException,
                                                                 InterruptedException {
        final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
        conn.addInvokeFuture(future);
        final int requestId = request.getId();
        try {
            conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    ...
                }
            });
        } catch (Exception e) {
            ...
        }
        RemotingCommand response = future.waitResponse(timeoutMillis);

        if (response == null) {
            conn.removeInvokeFuture(requestId);
            response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
            logger.warn("Wait response, request id={} timeout!", requestId);
        }
        return response;
    }
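The synchronous call above is built on an asynchronous write: the caller registers a future under the request id, writes the request, and then blocks until the I/O side completes that future or the timeout elapses. A minimal sketch of this mechanism follows. InvokeFutureSketch, PendingCall, and the string-based request and response are hypothetical, and the "TIMEOUT" marker stands in for the timeout response created by the command factory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical request/response matcher keyed by request id. */
final class InvokeFutureSketch {
    /** One in-flight call; completed by the I/O side. */
    static final class PendingCall {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile String response;

        void putResponse(String r) {
            response = r;
            done.countDown();
        }

        /** Returns the response, or null if none arrived in time. */
        String waitResponse(long timeoutMs) throws InterruptedException {
            done.await(timeoutMs, TimeUnit.MILLISECONDS);
            return response;
        }
    }

    private final Map<Integer, PendingCall> pending = new ConcurrentHashMap<>();

    /** Registers the call, sends it through `send`, then blocks for the response. */
    String invokeSync(int requestId, Runnable send, long timeoutMs) {
        PendingCall call = new PendingCall();
        pending.put(requestId, call); // register before sending to avoid a lost response
        send.run();
        try {
            String response = call.waitResponse(timeoutMs);
            if (response == null) {
                pending.remove(requestId); // clean up so a late response is ignored
                return "TIMEOUT";
            }
            return response;
        } catch (InterruptedException e) {
            pending.remove(requestId);
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for response", e);
        }
    }

    /** Called by the I/O side when a response with this id arrives. */
    void onResponse(int requestId, String response) {
        PendingCall call = pending.remove(requestId);
        if (call != null) {
            call.putResponse(response);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Registering the future before the write is what makes the scheme safe: a response that arrives immediately still finds its entry in the map.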