sofa-bolt source code reading (2): client startup

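A sofa-bolt client accesses a server in two steps: first it initializes itself, then it establishes a connection. Typical code looks like this: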

// 1. create a rpc client
RpcClient client = new RpcClient();
// 2. add processor for connect and close event if you need
client.addConnectionEventProcessor(ConnectionEventType.CONNECT, clientConnectProcessor);
client.addConnectionEventProcessor(ConnectionEventType.CLOSE, clientDisConnectProcessor);
// 3. do init
client.startup();
// 4. invoke
RequestBody req = new RequestBody(2, "hello world sync");
try {
    String res = (String) client.invokeSync(addr, req, 3000);
    System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
    String errMsg = "RemotingException caught in sync!";
    logger.error(errMsg, e);
    Assert.fail(errMsg);
} catch (InterruptedException e) {
    logger.error("interrupted!");
    Thread.currentThread().interrupt(); // restore the interrupt flag
}
// 5. close
client.shutdown();
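Step 2 registers one processor per event type. The idea behind that registration can be sketched as follows, under simplified assumptions: EventListenerSketch, EventType and EventProcessor are hypothetical names, and bolt's real ConnectionEventType, ConnectionEventProcessor and ConnectionEventListener have different signatures. The listener keeps a list of processors per event type and forwards each event to all of them, which is roughly the role ConnectionEventListener plays once ConnectionEventHandler (described in 2.1) hands it an event.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of event dispatch; not bolt's real classes.
class EventListenerSketch {
    enum EventType { CONNECT, CLOSE }

    interface EventProcessor {
        void onEvent(String remoteAddress);
    }

    // One list of processors per event type.
    private final Map<EventType, List<EventProcessor>> processors = new EnumMap<>(EventType.class);

    // Same idea as client.addConnectionEventProcessor(type, processor).
    void addProcessor(EventType type, EventProcessor processor) {
        processors.computeIfAbsent(type, t -> new ArrayList<>()).add(processor);
    }

    // Invoked by the channel-side handler when an event fires; forwards to every processor.
    void onEvent(EventType type, String remoteAddress) {
        List<EventProcessor> list = processors.get(type);
        if (list == null) {
            return; // nobody registered for this event type
        }
        for (EventProcessor p : list) {
            p.onEvent(remoteAddress);
        }
    }

    public static void main(String[] args) {
        EventListenerSketch listener = new EventListenerSketch();
        listener.addProcessor(EventType.CONNECT, addr -> System.out.println("connected: " + addr));
        listener.addProcessor(EventType.CLOSE, addr -> System.out.println("closed: " + addr));
        listener.onEvent(EventType.CONNECT, "127.0.0.1:12200"); // prints "connected: 127.0.0.1:12200"
    }
}
```

Keeping the handler (netty side) and the listener (business side) apart means user code never touches the pipeline; it only registers processors.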

RpcClient.startup performs the initialization work, which covers connection management (ConnectionManager), connection monitoring (DefaultConnectionMonitor), and reconnection (ReconnectManager).

2.1 Connection management

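ConnectionManager is responsible for managing connections. Each url maps to a poolKey, and each poolKey gets its own ConnectionPool; one ConnectionPool maintains multiple connections. As shown in the figure above, when a connection is needed, ConnectionManager obtains one from the ConnectionPool according to the connection selection strategy, ConnectionSelectStrategy.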
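The url-to-poolKey-to-ConnectionPool relationship can be modelled with a concurrent map. This is a minimal sketch under simplifying assumptions: PoolRegistrySketch and Pool are hypothetical, connections are plain strings, and ConcurrentHashMap#computeIfAbsent is swapped in for the FutureTask-based creation that DefaultConnectionManager actually uses (see getConnectionPoolAndCreateIfAbsent in 2.4).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: one pool per poolKey, each pool holding several connections (Strings here).
class PoolRegistrySketch {
    static class Pool {
        final List<String> connections = new CopyOnWriteArrayList<>();
    }

    private final Map<String, Pool> pools = new ConcurrentHashMap<>();

    // Returns the pool for poolKey, creating it atomically the first time the key is seen.
    Pool getOrCreate(String poolKey) {
        return pools.computeIfAbsent(poolKey, key -> new Pool());
    }

    int poolCount() {
        return pools.size();
    }

    public static void main(String[] args) {
        PoolRegistrySketch registry = new PoolRegistrySketch();
        registry.getOrCreate("127.0.0.1:12200").connections.add("conn-1");
        registry.getOrCreate("127.0.0.1:12200").connections.add("conn-2");
        registry.getOrCreate("127.0.0.1:12201").connections.add("conn-3");
        System.out.println(registry.poolCount()); // 2
    }
}
```

computeIfAbsent gives at-most-once creation per key, but other updates to the map may block while the creator runs; one plausible reason for bolt's FutureTask approach is that slow connection setup then happens outside the map.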

  • Initializing the connection manager

    com.alipay.remoting.rpc.RpcClient#startup
    
    ConnectionSelectStrategy connectionSelectStrategy = option(BoltGenericOption.CONNECTION_SELECT_STRATEGY);
    if (connectionSelectStrategy == null) {
        connectionSelectStrategy = new RandomSelectStrategy(switches());
    }
    this.connectionManager = new DefaultClientConnectionManager(connectionSelectStrategy,
        new RpcConnectionFactory(userProcessors, this), connectionEventHandler,
        connectionEventListener, switches());
    this.connectionManager.setAddressParser(this.addressParser);
    this.connectionManager.startup();
    • Connection selection strategy: ConnectionSelectStrategy

      When a Connection is requested, ConnectionPool calls ConnectionSelectStrategy to choose one.

      com.alipay.remoting.ConnectionPool#get
      public Connection get() {
          // update the last-access time
          markAccess();
          if (null != connections) {
              List<Connection> snapshot = new ArrayList<Connection>(connections);
              if (snapshot.size() > 0) {
                  // pick a Connection according to the selection strategy
                  return strategy.select(snapshot);
              } else {
                  return null;
              }
          } else {
              return null;
          }
      }

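      Take RandomSelectStrategy as an example: it picks random entries from the given connection list, trying at most MAX_TIMES times, until it finds a healthy one (isFine()); if it finds none, it returns null.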

      com.alipay.remoting.RandomSelectStrategy#select
      public Connection select(List<Connection> connections) {
          // randomly pick a healthy connection from the list
          Connection result = randomGet(connections);
          return result;
      }
      
      com.alipay.remoting.RandomSelectStrategy#randomGet
      private Connection randomGet(List<Connection> connections) {
          if (null == connections || connections.isEmpty()) {
              return null;
          }
      
          int size = connections.size();
          int tries = 0;
          Connection result = null;
          while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
              result = connections.get(this.random.nextInt(size));
          }
      
          if (result != null && !result.isFine()) {
              result = null;
          }
          return result;
      }
    • Address parser: RpcAddressParser

      RpcAddressParser parses a url string into a Url object.

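    • Connection factory: RpcConnectionFactory

      Creates the Connection objects (see AbstractConnectionFactory#createConnection in 2.4) and holds the client-side netty Bootstrap.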

    • Event handler: ConnectionEventHandler

      Implements ChannelHandler and is registered in the netty pipeline. When a connection is opened or closed, it forwards the event to ConnectionEventListener.

    • Event listener: ConnectionEventListener

      Works together with ConnectionEventHandler and performs the actual business handling when an event occurs.

  • Starting the connection manager

    com.alipay.remoting.DefaultClientConnectionManager#startup
    @Override
    public void startup() throws LifeCycleException {
        // mark the lifecycle state as started
        super.startup();
    
        this.connectionEventHandler.setConnectionManager(this);
        this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
        this.connectionFactory.init(connectionEventHandler);
    }

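    The startup method does the following, in order:

    • Marks the lifecycle state as started

    • Initializes the event handler, giving it the connection manager and the event listener

    • Initializes the connection factory

      connectionFactory.init configures the client-side netty Bootstrap: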

      com.alipay.remoting.connection.AbstractConnectionFactory#init
      public void init(final ConnectionEventHandler connectionEventHandler) {
          bootstrap = new Bootstrap();
          bootstrap.group(workerGroup).channel(NettyEventLoopUtil.getClientSocketChannelClass())
              .option(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
              .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
              .option(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
      
          // init netty write buffer water mark
          initWriteBufferWaterMark();
      
          // init byte buf allocator
          if (ConfigManager.netty_buffer_pooled()) {
              this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
          } else {
              this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
          }
      
          bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      
              @Override
              protected void initChannel(SocketChannel channel) {
                  ChannelPipeline pipeline = channel.pipeline();
                  pipeline.addLast("decoder", codec.newDecoder());
                  pipeline.addLast("encoder", codec.newEncoder());
      
                  boolean idleSwitch = ConfigManager.tcp_idle_switch();
                  if (idleSwitch) {
                      pipeline.addLast("idleStateHandler",
                          new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0,
                              TimeUnit.MILLISECONDS));
                      pipeline.addLast("heartbeatHandler", heartbeatHandler);
                  }
      
                  pipeline.addLast("connectionEventHandler", connectionEventHandler);
                  pipeline.addLast("handler", handler);
              }
          });
      }

      As you can see, the client's netty pipeline is almost identical to the server's. The small difference is that the client sends heartbeats, while the server detects them:

      //client
      pipeline.addLast("heartbeatHandler", heartbeatHandler);
      
      //server
      pipeline.addLast("serverIdleHandler", serverIdleHandler);
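RandomSelectStrategy#randomGet (shown earlier in 2.1) bounds its random probing with MAX_TIMES. The following self-contained sketch reproduces that loop with a hypothetical Conn class; the value 5 for MAX_TIMES is an assumption, not taken from this article. Injecting the Random makes the behaviour reproducible in tests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Simplified model of RandomSelectStrategy#randomGet. Conn and the MAX_TIMES value are hypothetical.
class RandomSelectSketch {
    static final int MAX_TIMES = 5; // assumed retry budget

    static class Conn {
        final String name;
        final boolean fine;

        Conn(String name, boolean fine) {
            this.name = name;
            this.fine = fine;
        }

        boolean isFine() { return fine; }
    }

    private final Random random;

    RandomSelectSketch(Random random) { this.random = random; }

    // Probes up to MAX_TIMES random entries and returns the first healthy one, or null.
    Conn select(List<Conn> connections) {
        if (connections == null || connections.isEmpty()) {
            return null;
        }
        int size = connections.size();
        int tries = 0;
        Conn result = null;
        while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
            result = connections.get(random.nextInt(size));
        }
        // Budget exhausted on an unhealthy pick: never hand it out.
        if (result != null && !result.isFine()) {
            result = null;
        }
        return result;
    }

    public static void main(String[] args) {
        RandomSelectSketch strategy = new RandomSelectSketch(new Random(42));
        List<Conn> pool = Arrays.asList(new Conn("a", false), new Conn("b", true));
        Conn picked = strategy.select(pool);
        System.out.println(picked == null ? "no healthy connection found" : picked.name);
    }
}
```

Because each probe is independent, a pool in which only a fraction p of the connections is healthy still yields null with probability (1-p)^MAX_TIMES, so callers have to handle a null result; DefaultConnectionManager#check in 2.4 rejects a null connection for exactly that case.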

2.2 Connection monitoring

if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
    if (monitorStrategy == null) {
        connectionMonitor = new DefaultConnectionMonitor(new ScheduledDisconnectStrategy(),
            this.connectionManager);
    } else {
        connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
            this.connectionManager);
    }
    connectionMonitor.startup();
    logger.warn("Switch on connection monitor");
}

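DefaultConnectionMonitor monitors the connections. Its startup method creates a single-threaded ScheduledThreadPoolExecutor that, after conn_monitor_initial_delay and then every conn_monitor_period milliseconds, calls ConnectionMonitorStrategy.monitor on the connection pools: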

@Override
public void startup() throws LifeCycleException {
    super.startup();

    /* initial delay to execute schedule task, unit: ms */
    long initialDelay = ConfigManager.conn_monitor_initial_delay();

    /* period of schedule task, unit: ms*/
    long period = ConfigManager.conn_monitor_period();

    this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
        "ConnectionMonitorThread", true), new ThreadPoolExecutor.AbortPolicy());
    this.executor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools = connectionManager
                    .getConnPools();
                strategy.monitor(connPools);
            } catch (Exception e) {
                logger.warn("MonitorTask error", e);
            }
        }
    }, initialDelay, period, TimeUnit.MILLISECONDS);
}
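The scheduling pattern in DefaultConnectionMonitor#startup relies only on the JDK, so it can be reproduced directly. In this sketch MonitorSchedulerSketch is a hypothetical name, the Runnable passed to start stands in for strategy.monitor(connectionManager.getConnPools()), and a lambda ThreadFactory replaces bolt's NamedThreadFactory.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Minimal JDK-only sketch of the DefaultConnectionMonitor scheduling pattern.
class MonitorSchedulerSketch {
    private final ScheduledThreadPoolExecutor executor;

    MonitorSchedulerSketch() {
        // One daemon thread, similar in spirit to NamedThreadFactory("ConnectionMonitorThread", true).
        this.executor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "ConnectionMonitorThread");
            t.setDaemon(true);
            return t;
        }, new ThreadPoolExecutor.AbortPolicy());
    }

    void start(Runnable monitorTask, long initialDelayMs, long periodMs) {
        executor.scheduleAtFixedRate(() -> {
            try {
                monitorTask.run();
            } catch (Exception e) {
                // Log and continue: an exception escaping here would cancel all later runs.
                System.err.println("MonitorTask error: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ticks = new CountDownLatch(3);
        MonitorSchedulerSketch monitor = new MonitorSchedulerSketch();
        monitor.start(ticks::countDown, 10, 20);
        boolean ok = ticks.await(2, TimeUnit.SECONDS);
        monitor.shutdown();
        System.out.println("three runs observed: " + ok);
    }
}
```

The try/catch inside the scheduled Runnable matters: according to the ScheduledExecutorService contract, if any execution of a scheduleAtFixedRate task throws, subsequent executions are suppressed, so a single failing monitoring round would otherwise silently stop all monitoring.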

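For example, ScheduledDisconnectStrategy watches the number of in-service connections in each pool. When that number exceeds the configured threshold (connectionThreshold), it randomly marks one of them as out of service. Every out-of-service connection whose pending invocations have all finished (isInvokeFutureMapFinish) is then closed; the others are left for a later round.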

public void monitor(Map<String, RunStateRecordedFutureTask<ConnectionPool>> connPools) {
    try {
        if (connPools == null || connPools.size() == 0) {
            return;
        }

        for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : connPools
            .entrySet()) {
            String poolKey = entry.getKey();
            ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);

            List<Connection> serviceOnConnections = new ArrayList<Connection>();
            List<Connection> serviceOffConnections = new ArrayList<Connection>();
            for (Connection connection : pool.getAll()) {
                if (isConnectionOn(connection)) {
                    serviceOnConnections.add(connection);
                } else {
                    serviceOffConnections.add(connection);
                }
            }

            if (serviceOnConnections.size() > connectionThreshold) {
                Connection freshSelectConnect = serviceOnConnections.get(random
                    .nextInt(serviceOnConnections.size()));
                freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
                    Configs.CONN_SERVICE_STATUS_OFF);
                serviceOffConnections.add(freshSelectConnect);
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("serviceOnConnections({}) size[{}], CONNECTION_THRESHOLD[{}].",
                        poolKey, serviceOnConnections.size(), connectionThreshold);
                }
            }

            for (Connection offConn : serviceOffConnections) {
                if (offConn.isInvokeFutureMapFinish()) {
                    if (offConn.isFine()) {
                        offConn.close();
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("Address={} won't close at this schedule turn",
                            RemotingUtil.parseRemoteAddress(offConn.getChannel()));
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error("ScheduledDisconnectStrategy monitor error", e);
    }
}
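The two phases of ScheduledDisconnectStrategy#monitor (mark one surplus in-service connection as off, then close only drained off connections) can be modelled on a single pool. This is a sketch under assumptions: DisconnectSketch and Conn are hypothetical, and the serviceOn, pendingInvokes and closed fields stand in for the CONN_SERVICE_STATUS attribute, isInvokeFutureMapFinish() and isFine().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Simplified model of ScheduledDisconnectStrategy#monitor for one pool; all types are hypothetical.
class DisconnectSketch {
    static class Conn {
        boolean serviceOn = true;   // stands in for the CONN_SERVICE_STATUS attribute
        boolean pendingInvokes;     // true while invoke futures are still outstanding
        boolean closed;
    }

    // One monitor round. Returns how many connections were closed in this round.
    static int monitorOnce(List<Conn> pool, int threshold, Random random) {
        List<Conn> on = new ArrayList<>();
        List<Conn> off = new ArrayList<>();
        for (Conn c : pool) {
            if (c.serviceOn) {
                on.add(c);
            } else {
                off.add(c);
            }
        }
        if (on.size() > threshold) {
            Conn picked = on.get(random.nextInt(on.size()));
            picked.serviceOn = false; // stop using it; close it once drained
            off.add(picked);
        }
        int closed = 0;
        for (Conn c : off) {
            if (!c.pendingInvokes && !c.closed) {
                c.closed = true;
                closed++;
            }
            // otherwise keep it until a later round so in-flight requests can finish
        }
        return closed;
    }

    public static void main(String[] args) {
        List<Conn> pool = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            pool.add(new Conn());
        }
        System.out.println(monitorOnce(pool, 3, new Random(1))); // 1
    }
}
```

Marking at most one surplus connection per round, and closing it only after its in-flight requests are done, shrinks an oversized pool gradually without failing pending calls.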

2.3 Reconnection

if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
    reconnectManager = new ReconnectManager(connectionManager);
    reconnectManager.startup();

    connectionEventHandler.setReconnector(reconnectManager);
    logger.warn("Switch on reconnect manager");
}

Unlike monitoring, ReconnectManager starts a dedicated Thread in its startup method. This thread acts as a consumer that processes the queued ReconnectTasks:

com.alipay.remoting.ReconnectManager#startup
@Override
public void startup() throws LifeCycleException {
    super.startup();

    this.healConnectionThreads = new Thread(new HealConnectionRunner());
    this.healConnectionThreads.start();
}
com.alipay.remoting.ReconnectManager.HealConnectionRunner#run
@Override
public void run() {
    while (isStarted()) {
        long start = -1;
        ReconnectTask task = null;
        try {
            if (this.lastConnectTime < HEAL_CONNECTION_INTERVAL) {
                Thread.sleep(HEAL_CONNECTION_INTERVAL);
            }

            try {
                task = ReconnectManager.this.tasks.take();
            } catch (InterruptedException e) {
                // ignore
            }

            if (task == null) {
                continue;
            }

            start = System.currentTimeMillis();
            if (!canceled.contains(task.url)) {
                task.run();
            } else {
                logger.warn("Invalid reconnect request task {}, cancel list size {}",
                    task.url, canceled.size());
            }
            this.lastConnectTime = System.currentTimeMillis() - start;
        } catch (Exception e) {
            if (start != -1) {
                this.lastConnectTime = System.currentTimeMillis() - start;
            }

            if (task != null) {
                logger.warn("reconnect target: {} failed.", task.url, e);
                tasks.add(task);
            }
        }
    }
}
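ReconnectManager is a producer/consumer pair: channelInactive produces tasks, HealConnectionRunner consumes them, failed tasks are re-queued, and canceled Urls are skipped. The sketch below models that flow with hypothetical names (ReconnectSketch, processOne, a Predicate as the connector). To stay deterministic it processes one task per call with poll() instead of running a dedicated thread that blocks on take() and sleeps between attempts, and it signals failure through the Predicate instead of an exception.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical model of ReconnectManager: queued reconnect requests, a cancel set,
// and a consumer step that re-queues failed attempts. Urls are plain strings.
class ReconnectSketch {
    private final LinkedBlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Set<String> canceled = ConcurrentHashMap.newKeySet();
    private final Predicate<String> connector; // returns true if reconnecting succeeded

    ReconnectSketch(Predicate<String> connector) { this.connector = connector; }

    // Producer side, like reconnectManager.reconnect(url) in channelInactive.
    void reconnect(String url) { tasks.add(url); }

    // Like disableReconnect(url): later tasks for this url are dropped.
    void disableReconnect(String url) { canceled.add(url); }

    // Consumer side: handles one queued task; returns false when the queue is empty.
    boolean processOne() {
        String url = tasks.poll();
        if (url == null) {
            return false;
        }
        if (canceled.contains(url)) {
            return true; // skipped, reconnect was disabled for this url
        }
        if (!connector.test(url)) {
            tasks.add(url); // failed attempt goes back into the queue
        }
        return true;
    }

    int pending() { return tasks.size(); }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails on the first attempt and succeeds on the second.
        ReconnectSketch manager = new ReconnectSketch(url -> ++attempts[0] >= 2);
        manager.reconnect("127.0.0.1:12200");
        while (manager.processOne()) {
            // drain the queue, including the re-queued task
        }
        System.out.println("attempts = " + attempts[0]); // attempts = 2
    }
}
```

A Url that never recovers would be retried forever in this sketch; in the real runner the sleep based on HEAL_CONNECTION_INTERVAL keeps such retries from spinning.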
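  • Creating ReconnectTasks

    A task is created in two situations: when a connection is closed abnormally, and when a reconnect attempt fails.

    1. Abnormal disconnection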
    com.alipay.remoting.ConnectionEventHandler#channelInactive
    // add reconnect task
    if (this.globalSwitch != null
        && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
        Connection conn = (Connection) attr.get();
        if (reconnectManager != null) {
            reconnectManager.reconnect(conn.getUrl());
        }
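    }
    2. Failed reconnect attempt: as HealConnectionRunner#run above shows, when task.run() throws, the task is added back to the tasks queue and retried later.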
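  • Cancelling ReconnectTasks

    When the client explicitly closes a connection, reconnection is cancelled: the Url is added to ReconnectManager#canceled, and HealConnectionRunner then skips tasks for that Url.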

    com.alipay.remoting.rpc.RpcClient#closeConnection
    @Override
    public void closeConnection(Url url) {
        if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
            reconnectManager.disableReconnect(url);
        }
        this.connectionManager.remove(url.getUniqueKey());
    }
    com.alipay.remoting.ReconnectManager#disableReconnect
    @Override
    public void disableReconnect(Url url) {
        canceled.add(url);
    }

2.4 Establishing a connection

Back to the client code:

RequestBody req = new RequestBody(2, "hello world sync");
try {
    String res = (String) client.invokeSync(addr, req, 3000);
    System.out.println("invoke sync result = [" + res + "]");
} catch (RemotingException e) {
    String errMsg = "RemotingException caught in sync!";
    logger.error(errMsg, e);
    Assert.fail(errMsg);
} catch (InterruptedException e) {
    logger.error("interrupted!");
    Thread.currentThread().interrupt(); // restore the interrupt flag
}

The client calls invokeSync to send a request to the server:

com.alipay.remoting.rpc.RpcClient#invokeSync
@Override
public Object invokeSync(final String address, final Object request,
                         final InvokeContext invokeContext, final int timeoutMillis)
                         throws RemotingException, InterruptedException {
    return this.rpcRemoting.invokeSync(address, request, invokeContext, timeoutMillis);
}
com.alipay.remoting.rpc.RpcRemoting#invokeSync
public Object invokeSync(final String addr, final Object request,
                         final InvokeContext invokeContext, final int timeoutMillis)
                         throws RemotingException, InterruptedException {
    // parse the address string into a Url object with addressParser
    Url url = this.addressParser.parse(addr);
    return this.invokeSync(url, request, invokeContext, timeoutMillis);
}

com.alipay.remoting.rpc.RpcClientRemoting#invokeSync
@Override
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis)
        throws RemotingException, InterruptedException {
    // 1. get the connection, creating it if absent
    final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
    // 2. check the connection
    this.connectionManager.check(conn);
    // 3. perform the invocation
    return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
  • Obtaining (and if necessary creating) the connection object

    com.alipay.remoting.rpc.RpcClientRemoting#getConnectionAndInitInvokeContext
    protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext invokeContext)
            throws RemotingException, InterruptedException {
        long start = System.currentTimeMillis();
        Connection conn;
        try {
            // let the ConnectionManager get or create the Connection
            conn = this.connectionManager.getAndCreateIfAbsent(url);
        } finally {
            if (null != invokeContext) {
                invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME,
                    (System.currentTimeMillis() - start));
            }
        }
        return conn;
    }
    com.alipay.remoting.DefaultConnectionManager#getAndCreateIfAbsent
    @Override
    public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {
        // 1. get or create the connection pool; the poolKey is the Url's uniqueKey
        ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),
            new ConnectionPoolCall(url));
        if (null != pool) {
            // 2. get a connection from the pool
            return pool.get();
        } else {
            logger.error("[NOTIFYME] bug detected! pool here must not be null!");
            return null;
        }
    }
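    • For how a connection is taken from the pool, see ConnectionPool#get and ConnectionSelectStrategy in 2.1

    • Creating the connection pool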


      com.alipay.remoting.DefaultConnectionManager#getConnectionPoolAndCreateIfAbsent
      private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
                                                                Callable<ConnectionPool> callable)
              throws RemotingException, InterruptedException {
          RunStateRecordedFutureTask<ConnectionPool> initialTask;
          ConnectionPool pool = null;
      
          int retry = Constants.DEFAULT_RETRY_TIMES;
      
          int timesOfResultNull = 0;
          int timesOfInterrupt = 0;
      
          for (int i = 0; (i < retry) && (pool == null); ++i) {
              initialTask = this.connTasks.get(poolKey);
              if (null == initialTask) {
                  RunStateRecordedFutureTask<ConnectionPool> newTask = new RunStateRecordedFutureTask<ConnectionPool>(
                      callable);
                  initialTask = this.connTasks.putIfAbsent(poolKey, newTask);
                  if (null == initialTask) {
                      initialTask = newTask;
                      // run the task; this actually calls ConnectionPoolCall.call()
                      initialTask.run();
                  }
              }
      
              try {
                  pool = initialTask.get();
                  ...
              } catch (ExecutionException e) {
                  ...
      
              }
          }
          return pool;
      }
      com.alipay.remoting.DefaultConnectionManager.ConnectionPoolCall#call
      @Override
      public ConnectionPool call() throws Exception {
          final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
          if (whetherInitConnection) {
              try {
                  // populate the pool with connections
                  doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
              } catch (Exception e) {
                  pool.removeAllAndTryClose();
                  throw e;
              }
          }
          return pool;
      }

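      doCreate creates the Connection objects in the pool. With connection warm-up enabled, all expected connections are created synchronously; otherwise syncCreateNumWhenNotWarmup connections are created synchronously and the rest asynchronously.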

      private void doCreate(final Url url, final ConnectionPool pool, final String taskName,
                            final int syncCreateNumWhenNotWarmup) throws RemotingException {
          final int actualNum = pool.size();
          final int expectNum = url.getConnNum();
          if (actualNum >= expectNum) {
              return;
          }
          if (logger.isDebugEnabled()) {
              logger.debug("actual num {}, expect num {}, task name {}", actualNum, expectNum,
                  taskName);
          }
          if (url.isConnWarmup()) {
              for (int i = actualNum; i < expectNum; ++i) {
                  Connection connection = create(url);
                  pool.add(connection);
              }
          } else {
              if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
                  throw new IllegalArgumentException(
                      "sync create number when not warmup should be [0," + url.getConnNum() + "]");
              }
              // create connections synchronously
              if (syncCreateNumWhenNotWarmup > 0) {
                  for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
                      Connection connection = create(url);
                      pool.add(connection);
                  }
                  if (syncCreateNumWhenNotWarmup >= url.getConnNum()) {
                      return;
                  }
              }
      
              pool.markAsyncCreationStart();// mark the start of async
              try {
                  // create the remaining connections asynchronously
                  this.asyncCreateConnectionExecutor.execute(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              for (int i = pool.size(); i < url.getConnNum(); ++i) {
                                  Connection conn = null;
                                  try {
                                      conn = create(url);
                                  } catch (RemotingException e) {
                                      // creation failed: conn stays null; error handling is not shown in this excerpt
                                  }
                                  pool.add(conn);
                              }
                          } finally {
                              pool.markAsyncCreationDone();// mark the end of async
                          }
                      }
                  });
              } catch (RejectedExecutionException e) {
                  pool.markAsyncCreationDone();// mark the end of async when reject
                  throw e;
              }
          } // end of NOT warm up
      }
    • Creating the connection object

      The connection is created through the connection factory:

      com.alipay.remoting.DefaultConnectionManager#create
      public Connection create(Url url) throws RemotingException {
          Connection conn;
          try {
              conn = this.connectionFactory.createConnection(url);
          } catch (Exception e) {
              throw new RemotingException("Create connection failed. The address is "
                                          + url.getOriginUrl(), e);
          }
          return conn;
      }
      com.alipay.remoting.connection.AbstractConnectionFactory#createConnection
      @Override
      public Connection createConnection(Url url) throws Exception {
          Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
          Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
              url.getVersion(), url);
          channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
          return conn;
      }
      com.alipay.remoting.connection.AbstractConnectionFactory#doCreateConnection
      protected Channel doCreateConnection(String targetIP, int targetPort, int connectTimeout)
              throws Exception {
          // prevent unreasonable value, at least 1000
          connectTimeout = Math.max(connectTimeout, 1000);
          String address = targetIP + ":" + targetPort;
          if (logger.isDebugEnabled()) {
              logger.debug("connectTimeout of address [{}] is [{}].", address, connectTimeout);
          }
          bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
          ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));
      
          future.awaitUninterruptibly();
          ...
          return future.channel();
      }
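  • Checking the connection object

    1. Check that the connection exists
    2. Check that the channel is active (an inactive connection is also removed)
    3. Check that the channel is writable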
    com.alipay.remoting.DefaultConnectionManager#check
    public void check(Connection connection) throws RemotingException {
        if (connection == null) {
            throw new RemotingException("Connection is null when do check!");
        }
        if (connection.getChannel() == null || !connection.getChannel().isActive()) {
            this.remove(connection);
            throw new RemotingException("Check connection failed for address: "
                                        + connection.getUrl());
        }
        if (!connection.getChannel().isWritable()) {
            // No remove. Most of the time it is unwritable temporarily.
            throw new RemotingException("Check connection failed for address: "
                                        + connection.getUrl() + ", maybe write overflow!");
        }
    }
  • Performing the invocation

    public Object invokeSync(final Connection conn, final Object request,
                             final InvokeContext invokeContext, final int timeoutMillis)
            throws RemotingException, InterruptedException {
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext,
            timeoutMillis);
        preProcessInvokeContext(invokeContext, requestCommand, conn);
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand,
            timeoutMillis);
        responseCommand.setInvokeContext(invokeContext);
    
        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand,
            RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }
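    1. Convert the request into a requestCommand
    2. Pre-process the invocation context
    3. Send the request
    4. Resolve the returned responseCommand into the response object and return it

    Sending the request means writing the requestCommand into the channel. The caller then blocks on an InvokeFuture; if no response arrives within timeoutMillis, the future is removed and a timeout response is created instead: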

    com.alipay.remoting.BaseRemoting#invokeSync
    protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request,
                                         final int timeoutMillis)
            throws RemotingException, InterruptedException {
        final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
        conn.addInvokeFuture(future);
        final int requestId = request.getId();
        try {
            conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                   ...
                }
            });
        } catch (Exception e) {
           ...
        }
        RemotingCommand response = future.waitResponse(timeoutMillis);
    
        if (response == null) {
            conn.removeInvokeFuture(requestId);
            response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
            logger.warn("Wait response, request id={} timeout!", requestId);
        }
    
        return response;
    }
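DefaultConnectionManager#getConnectionPoolAndCreateIfAbsent (in 2.4 above) ensures that concurrent callers for the same poolKey build only one ConnectionPool: whoever wins putIfAbsent runs the FutureTask, and everyone else blocks in get(). Below is a minimal sketch of that idiom with a plain java.util.concurrent.FutureTask instead of RunStateRecordedFutureTask; SingleFlightSketch is a hypothetical name, and the retry loop and the error handling that the excerpt elides are left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// putIfAbsent + FutureTask idiom, simplified: no retry loop, no cleanup of failed tasks.
class SingleFlightSketch<V> {
    private final ConcurrentMap<String, FutureTask<V>> tasks = new ConcurrentHashMap<>();

    V getOrCreate(String key, Callable<V> creator) throws InterruptedException, ExecutionException {
        FutureTask<V> task = tasks.get(key);
        if (task == null) {
            FutureTask<V> newTask = new FutureTask<>(creator);
            task = tasks.putIfAbsent(key, newTask);
            if (task == null) {
                // This thread won the race, so it runs the creator; the others wait in get().
                task = newTask;
                task.run();
            }
        }
        return task.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        SingleFlightSketch<String> pools = new SingleFlightSketch<>();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    pools.getOrCreate("127.0.0.1:12200", () -> "pool#" + created.incrementAndGet());
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("pools created: " + created.get()); // pools created: 1
    }
}
```

The expensive work runs outside the map, so slow connection setup for one key does not hold up map updates for other keys; the trade-off is that a failed task stays in the map in this sketch unless it is removed explicitly.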
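DefaultConnectionManager#doCreate (in 2.4 above) chooses between fully synchronous creation (warm-up) and a small synchronous batch followed by background filling. The sketch keeps that decision logic but substitutes hypothetical pieces: connections are Strings from a Supplier, the pool is a CopyOnWriteArrayList, and an ExecutorService stands in for asyncCreateConnectionExecutor. The markAsyncCreationStart/markAsyncCreationDone bookkeeping is left out.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Simplified model of DefaultConnectionManager#doCreate; all types here are stand-ins.
class DoCreateSketch {
    static void doCreate(List<String> pool, int expectNum, boolean warmup, int syncNum,
                         Supplier<String> factory, ExecutorService asyncExecutor) {
        if (pool.size() >= expectNum) {
            return; // pool already full
        }
        if (warmup) {
            // Warm-up: create every missing connection on the caller's thread.
            for (int i = pool.size(); i < expectNum; ++i) {
                pool.add(factory.get());
            }
            return;
        }
        if (syncNum < 0 || syncNum > expectNum) {
            throw new IllegalArgumentException("syncNum should be in [0," + expectNum + "]");
        }
        // A few connections synchronously, so the caller has something to use right away.
        for (int i = 0; i < syncNum; ++i) {
            pool.add(factory.get());
        }
        if (syncNum >= expectNum) {
            return;
        }
        // Fill the rest in the background.
        asyncExecutor.execute(() -> {
            for (int i = pool.size(); i < expectNum; ++i) {
                pool.add(factory.get());
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counter = {0};
        Supplier<String> factory = () -> "conn-" + (++counter[0]);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> pool = new CopyOnWriteArrayList<>();
        doCreate(pool, 4, false, 1, factory, executor);
        System.out.println("right after doCreate: " + pool.size()); // at least 1
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("after async fill: " + pool.size()); // 4
    }
}
```

The synchronous batch bounds the latency of the first call (it only waits for syncNum connections), while the remaining connections are established without blocking the caller.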
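BaseRemoting#invokeSync (above) registers an InvokeFuture under the request id, writes the request, and blocks in waitResponse until the future is completed or the timeout expires. The sketch models that handshake with a CountDownLatch. InvokeFutureSketch, PendingCall and the "TIMEOUT" string (standing in for createTimeoutResponse) are hypothetical, and completing the future by looking up the request id on the response side is an assumption, since the response path is not shown in this article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the InvokeFuture handshake; not bolt's real classes.
class InvokeFutureSketch {
    static class PendingCall {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void complete(String r) {
            response = r;
            latch.countDown();
        }

        // Like waitResponse(timeoutMillis): the response, or null on timeout.
        String waitResponse(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        }
    }

    private final Map<Integer, PendingCall> calls = new ConcurrentHashMap<>();

    // Response side: match by request id; late responses for removed ids are ignored.
    void onResponse(int requestId, String response) {
        PendingCall call = calls.remove(requestId);
        if (call != null) {
            call.complete(response);
        }
    }

    String invokeSync(int requestId, Runnable send, long timeoutMillis) throws InterruptedException {
        PendingCall call = new PendingCall();
        calls.put(requestId, call);   // like conn.addInvokeFuture(future)
        send.run();                   // stands in for channel.writeAndFlush(request)
        String response = call.waitResponse(timeoutMillis);
        if (response == null) {
            calls.remove(requestId);  // like conn.removeInvokeFuture(requestId)
            return "TIMEOUT";         // stands in for commandFactory.createTimeoutResponse(...)
        }
        return response;
    }

    public static void main(String[] args) throws InterruptedException {
        InvokeFutureSketch remoting = new InvokeFutureSketch();
        // A "server" thread answers request 1 shortly after it is sent.
        String ok = remoting.invokeSync(1,
                () -> new Thread(() -> remoting.onResponse(1, "pong")).start(), 1000);
        // Nobody answers request 2, so it times out.
        String late = remoting.invokeSync(2, () -> { }, 50);
        System.out.println(ok + " " + late); // pong TIMEOUT
    }
}
```

Removing the future on timeout keeps the per-connection map from growing with abandoned requests, and a response that arrives afterwards simply finds nothing to complete.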
