httpclient源码分析之 PoolingHttpClientConnectionManager 获取连接 (转)

PoolingHttpClientConnectionManager是一个HttpClientConnection的连接池,可以为多线程提供并发请求服务。主要作用就是分配连接,回收连接等。同一个route的请求,会优先使用连接池提供的空闲长连接。

源码版本4.5.2,因为代码太多,很多不是自己关心的,为免看起来费力,这里代码贴的不全。省略代码的地方用省略号标注。

配置说明

<bean id="ky.pollingConnectionManager" class="org.apache.http.impl.conn.PoolingHttpClientConnectionManager">
        <!--整个连接池的最大连接数 -->
        <property name="maxTotal" value="1000" />
        <!--每个route默认的连接数-->
        <property name="defaultMaxPerRoute" value="32" />
    </bean>
  • maxTotal 是整个连接池的最大连接数
  • defaultMaxPerRoute 是每个route默认的最大连接数
  • setMaxPerRoute(final HttpRoute route, final int max) route的最大连接数,优先于defaultMaxPerRoute。
public PoolingHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
        final long timeToLive, final TimeUnit tunit) {
        super();
        this.configData = new ConfigData();
        //defaultMaxPerRoute默认为2,maxTotal默认为20
        this.pool = new CPool(new InternalConnectionFactory(
                this.configData, connFactory), 2, 20, timeToLive, tunit);
        //validateAfterInactivity 空闲永久连接检查间隔,这个牵扯的还比较多
        //官方推荐使用这个来检查永久链接的可用性,而不推荐每次请求的时候才去检查        
        this.pool.setValidateAfterInactivity(2000);
        this.connectionOperator = Args.notNull(httpClientConnectionOperator, "HttpClientConnectionOperator");
        this.isShutDown = new AtomicBoolean(false);
    }

获取连接

获取连接分两步,首先新建一个ConnectionRequest,在通过request.get得到HttpClientConnection。

//org.apache.http.impl.conn.PoolingHttpClientConnectionManager
@Override
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        ......
        
        //从连接池中获取一个CPoolEntry(Connection的包装类)
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {
			......

            // ConnectionRequest的get方法。调用leaseConnection方法,并且传入future(CPoolEntry的封装(connection的封装))
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }
        };
    }


    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {
            //从future中get
            entry = future.get(timeout, tunit);
            if (entry == null || future.isCancelled()) {
                throw new InterruptedException();
            }
            Asserts.check(entry.getConnection() != null, "Pool entry with no connection");

            //封装返回
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }

所以,CPoolEntry(ManagedHttpClientConnection的封装),实际是调用PoolingHttpClientConnectionManager的leaseConnection,通过future的get获得。

这里的future是Future future = this.pool.lease(route, state, null);得到的。

//org.apache.http.pool.AbstractConnPool
    @Override
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        ......

        return new PoolEntryFuture<E>(this.lock, callback) {
            @Override
            public E getPoolEntry(
                    final long timeout,
                    final TimeUnit tunit)
                        throws InterruptedException, TimeoutException, IOException {
                //阻塞获取CPoolEntry        
                final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this);
                onLease(entry);
                return entry;
            }

        };
    }

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture<E> future)
                throws IOException, InterruptedException, TimeoutException {
        //设置超时时间点        
        ......

        //串行操作
        this.lock.lock();
        try {
            //每一个route都有一个连接池,这里获取指定route的连接池
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry = null;
            //循环取,直到超时
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    //从连接池中去一个空闲的连接,优先取state相同的。state默认是null
                    entry = pool.getFree(state);
                    //如果没有符合的连接,则调出,创建一个新连接
                    if (entry == null) {
                        break;
                    }
                    //如果连接超时,则关闭
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    //如果是永久连接,且最近周期内没有检验,则校验连接是否可用。不可用的连接需要关闭    
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    //如果连接已经关闭了,则释放掉,继续从池子中取符合条件的连接
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                //entry不为空,则修改连接池的参数,并返回。
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                //获取池子的最大连接数,如果池子已经超过容量了,需要把超过的资源回收
                //如果池子中连接数没有超,空闲的连接还比较多,就先从别人的池子里借一个来用
                 ......

                //不能借,就自己动手了。新建并返回。
                final C conn = this.connFactory.create(route);
                entry = pool.add(conn);
                this.leased.add(entry);
                return entry;  
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

读到这里,看起来拿到的entry要么是刚刚创建的热乎的,要么是没有过期的连接,要么是复用的池子中有效的永久连接。是这样的吗?再看一下复用的永久连接的情况:

//如果往前validateAfterInactivity ms之内没有校验,则校验entry。校验不通过则关闭并释放,继续从连接池中获取entry。
//如果往前validateAfterInactivity ms之内有过校验,则无需再次校验
 if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
     if (!validate(entry)) {
         entry.close();
     }
 }

判断连接是否可用:

//org.apache.http.impl.conn.CPool
    protected boolean validate(final CPoolEntry entry) {
        return !entry.getConnection().isStale();
    }

	//org.apache.http.impl.BHttpConnectionBase
	//判断连接是否不可用(go down)
    public boolean isStale() {
	    //没有打开,即socket为空,则不可用
        if (!isOpen()) {
            return true;
        }
        try {
        //socket链路有了,测试链路是否可用
        //这里的测试方法是查看很短的时间内(这里是1ms),是否可以从输入流中读到数据
        //如果测试结果返回-1说明不可用
            final int bytesRead = fillInputBuffer(1);
            return bytesRead < 0;
        } catch (final SocketTimeoutException ex) {
	        //注意这里SocketTimeoutException时,认为是可用的
            return false;
        } catch (final IOException ex) {
            //有I/O异常,不可用
            return true;
        }
    }

了解下测试连接是否可用的过程,梳理一下调用链路:

  • org.apache.http.impl.BHttpConnectionBase
    private int fillInputBuffer(final int timeout) throws IOException 不处理异常

  • org.apache.http.impl.io.SessionInputBufferImpl
    public int fillBuffer() throws IOException 不处理异常

  • org.apache.http.impl.conn.LoggingInputStream
    打印日志,不处理异常

@Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        try {
            final int bytesRead = in.read(b, off, len);
            if (bytesRead == -1) {
                wire.input("end of stream");
            } else if (bytesRead > 0) {
                wire.input(b, off, bytesRead);
            }
            return bytesRead;
        } catch (final IOException ex) {
            wire.input("[read] I/O error: " + ex.getMessage());
            throw ex;
        }
    }
  • sun.security.ssl.AppInputStream
    public synchronized int read(byte[] var1, int var2, int var3) throws IOException 不处理异常

  • sun.security.ssl.SSLSocketImpl
    void readDataRecord(InputRecord var1) throws IOException 不处理异常

  • sun.security.ssl.InputRecord
    void read(InputStream var1, OutputStream var2) throws IOException

  • java.net.SocketInputStream
    int read(byte b[], int off, int length, int timeout) throws IOException
    通过socket读取数据,如果发生ConnectionResetException异常,则throw new SocketException("Connection reset");

以上,是对PoolingHttpClientConnectionManager从连接池中获取一个连接给用户的过程。用户拿到的连接有三种:新创建的;未过期的短连接;间隔检查的永久链接。
需要注意,间隔检查的永久链接 如果在间隔时间(这里是2s)内,socket连接出现什么问题,是不知道的,因为没有进行检测。另外,检查链接是否可用的方法 isStale ,并不是100%靠谱的,即检测时出现SocketTimeoutException时,认为是可用的。而这时候,很有可能连接不可用,比如服务端关闭链接的情况。

出处:https://www.cnblogs.com/shoren/p/httpclient-leaseConnection.html

其他: https://blog.csdn.net/u013905744/article/details/94714696

https://stackoverflow.com/questions/13837012/spring-resttemplate-timeout       

https://www.cnblogs.com/coderjinjian/p/9644923.html

https://www.cnblogs.com/shoren/p/httpclient-leaseConnection.html

https://www.cnblogs.com/shoren/p/http-connection.html

https://blog.csdn.net/u014133299/article/details/80676147

  https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-resttemplate

相关推荐