thrift 连接池
简介
Thrift是Facebook的核心框架之一,使不同的开发语言开发的系统可以通过该框架实现彼此的通信,类似于webservice,但是Thrift提供了近乎变态的效率和开发的方便性,是webservice所不能比拟的。给分布式开发带来了极大的方便。但是这柄利器也有一些不完美。
问题
首先文档相当的少,只有一个wiki网站提供相应的帮助。这对于Thrift的推广极为不利。
其次框架本身实现有一些缺陷,就Thrift的java部分来说,没有提供连接池的支持,对RPC的调用效率有所影响。
对于文档稀少的问题,只能是通过一些Thrift的开发者和使用者多供献一些自己的心得来解决。这得需要一个过程。而连接池的问题的解决则可以快速一些。
提到池一般做过Java开发的肯定会想到ObjectPool,ApacheCommons项目确实给我们的开发得来了很大的便利性,其中的pool项目正是我们实现thrift连接池的基础。
实现
一,定义thrift连接池接口
ConnectionProvider
* * @(#)ConnectionProvider.java 0.1 05/11/17 * */ package com.qidea.thrift.pool; import org.apache.thrift.transport.TSocket; public interface ConnectionProvider { /** * 取链接池中的一个链接 * * @return */ public TSocket getConnection(); /** * 返回链接 * * @param socket */ public void returnCon(TSocket socket); }
二,实现连接池
GenericConnectionProvider
/* * @(#)DefaultConnectionProviderImpl.java 0.1 05/11/17 * * Copyright 2010 QISI, Inc. All rights reserved. * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.qidea.thrift.pool; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.thrift.transport.TSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; public class GenericConnectionProvider implements ConnectionProvider, InitializingBean, DisposableBean { public static final Logger logger = LoggerFactory .getLogger(GenericConnectionProvider.class); /** 服务的IP地址 */ private String serviceIP; /** 服务的端口 */ private int servicePort; /** 连接超时配置 */ private int conTimeOut; /** 可以从缓存池中分配对象的最大数量 */ private int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE; /** 缓存池中最大空闲对象数量 */ private int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE; /** 缓存池中最小空闲对象数量 */ private int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE; /** 阻塞的最大数量 */ private long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT; /** 从缓存池中分配对象,是否执行PoolableObjectFactory.validateObject方法 */ private boolean testOnBorrow = GenericObjectPool.DEFAULT_TEST_ON_BORROW; private boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN; private boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE; /** 对象缓存池 */ private ObjectPool objectPool = null; /** * */ @Override public void afterPropertiesSet() throws Exception { // 对象池 objectPool = new GenericObjectPool(); // ((GenericObjectPool) objectPool).setMaxActive(maxActive); ((GenericObjectPool) objectPool).setMaxIdle(maxIdle); ((GenericObjectPool) objectPool).setMinIdle(minIdle); ((GenericObjectPool) objectPool).setMaxWait(maxWait); ((GenericObjectPool) objectPool).setTestOnBorrow(testOnBorrow); ((GenericObjectPool) objectPool).setTestOnReturn(testOnReturn); ((GenericObjectPool) objectPool).setTestWhileIdle(testWhileIdle); ((GenericObjectPool) objectPool) .setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK); // 设置factory ThriftPoolableObjectFactory thriftPoolableObjectFactory = new ThriftPoolableObjectFactory( serviceIP, servicePort, conTimeOut); objectPool.setFactory(thriftPoolableObjectFactory); } @Override public void destroy() { try { objectPool.close(); } catch (Exception e) { throw new RuntimeException("erorr destroy()", e); } } @Override public TSocket getConnection() { try { TSocket socket = (TSocket) objectPool.borrowObject(); return socket; } catch (Exception e) { throw new RuntimeException("error getConnection()", e); } } @Override public void returnCon(TSocket socket) { try { objectPool.returnObject(socket); } catch (Exception e) { throw new RuntimeException("error returnCon()", e); } } public String getServiceIP() { return serviceIP; } public void setServiceIP(String serviceIP) { this.serviceIP = serviceIP; } public int getServicePort() { return servicePort; } public void setServicePort(int servicePort) { this.servicePort = servicePort; } public int getConTimeOut() { return conTimeOut; } public void setConTimeOut(int conTimeOut) { this.conTimeOut = conTimeOut; } public int getMaxActive() { return maxActive; } public void setMaxActive(int maxActive) { this.maxActive = maxActive; } public int getMaxIdle() { return maxIdle; } public void setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; } public int getMinIdle() { return minIdle; } public void setMinIdle(int minIdle) { this.minIdle = minIdle; } public long getMaxWait() { return maxWait; } public void setMaxWait(long maxWait) { this.maxWait = maxWait; } public boolean isTestOnBorrow() { return testOnBorrow; } public void setTestOnBorrow(boolean testOnBorrow) { this.testOnBorrow = testOnBorrow; } public boolean isTestOnReturn() { return testOnReturn; } public void setTestOnReturn(boolean testOnReturn) { this.testOnReturn = testOnReturn; } public boolean isTestWhileIdle() { return testWhileIdle; } public void setTestWhileIdle(boolean testWhileIdle) { this.testWhileIdle = testWhileIdle; } public ObjectPool getObjectPool() { return objectPool; } public void setObjectPool(ObjectPool objectPool) { this.objectPool = objectPool; } }
ThriftPoolableObjectFactory
ThriftPoolableObjectFactory /* * @(#)ThriftPoolableObjectFactory.java 0.1 05/11/17 */ package com.qidea.thrift.pool; import org.apache.commons.pool.PoolableObjectFactory; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ThriftPoolableObjectFactory implements PoolableObjectFactory { /** 日志记录器 */ public static final Logger logger = LoggerFactory .getLogger(ThriftPoolableObjectFactory.class); /** 服务的IP */ private String serviceIP; /** 服务的端口 */ private int servicePort; /** 超时设置 */ private int timeOut; /** * * @param serviceIP * @param servicePort * @param timeOut */ public ThriftPoolableObjectFactory(String serviceIP, int servicePort, int timeOut) { this.serviceIP = serviceIP; this.servicePort = servicePort; this.timeOut = timeOut; } @Override public void destroyObject(Object arg0) throws Exception { if (arg0 instanceof TSocket) { TSocket socket = (TSocket) arg0; if (socket.isOpen()) { socket.close(); } } } /** * */ @Override public Object makeObject() throws Exception { try { TTransport transport = new TSocket(this.serviceIP, this.servicePort, this.timeOut); transport.open(); return transport; } catch (Exception e) { logger.error("error ThriftPoolableObjectFactory()", e); throw new RuntimeException(e); } } @Override public boolean validateObject(Object arg0) { try { if (arg0 instanceof TSocket) { TSocket thriftSocket = (TSocket) arg0; if (thriftSocket.isOpen()) { return true; } else { return false; } } else { return false; } } catch (Exception e) { return false; } } @Override public void passivateObject(Object arg0) throws Exception { // DO NOTHING } @Override public void activateObject(Object arg0) throws Exception { // DO NOTHING } public String getServiceIP() { return serviceIP; } public void setServiceIP(String serviceIP) { this.serviceIP = serviceIP; } public int getServicePort() { return servicePort; } public void setServicePort(int servicePort) { this.servicePort = servicePort; } public int getTimeOut() { return timeOut; } public void setTimeOut(int timeOut) { this.timeOut = timeOut; } }
三,定义连接的管理类
ConnectionManager
/* * @(#)ConnectionManager.java 0.1 05/11/17 * * Copyright 2010 QISI, Inc. All rights reserved. * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */ package com.qidea.thrift.pool; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.thrift.transport.TSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author sunwei * @version 2010-8-10 * @since JDK1.5 */ public class ConnectionManager implements MethodInterceptor { /** 日志记录器 */ public Logger logger = LoggerFactory.getLogger(ConnectionManager.class); /** 保存local对象 */ ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>(); /** 连接提供池 */ public ConnectionProvider connectionProvider; @Override public Object invoke(MethodInvocation arg0) throws Throwable { TSocket socket = null; try { socket = connectionProvider.getConnection(); socketThreadSafe.set(socket); Object ret = arg0.proceed(); return ret; } catch (Exception e) { logger.error("error ConnectionManager.invoke()", e); throw new Exception(e); } finally { connectionProvider.returnCon(socket); socketThreadSafe.remove(); } } /** * 取socket * * @return */ public TSocket getSocket() { return socketThreadSafe.get(); } public ConnectionProvider getConnectionProvider() { return connectionProvider; } public void setConnectionProvider(ConnectionProvider connectionProvider) { this.connectionProvider = connectionProvider; } }
相关推荐
huacuilaifa 2020-10-29
温攀峰 2020-08-17
幸运小侯子 2020-08-14
dongCSDN 2020-06-28
一恍过去 2020-06-26
qingmuluoyang 2020-06-26
jameszgw 2020-06-25
Rain 2020-06-25
MissFuTT 2020-06-16
标题无所谓 2020-06-14
xclxcl 2020-06-13
onlypersevere 2020-06-13
dongCSDN 2020-06-09
llltaotao 2020-06-03
GavinZhera 2020-06-03
langyue 2020-05-31
牧场SZShepherd 2020-05-27
geek00 2020-05-27
zhaolisha 2020-05-16