vertx实现redis版session共享
现在越来越流行微服务架构了,提到微服务架构的话,大家能想到的是spring boot和vertx吧!前者大家听的比交多些,但是今天我给大家分享的是后者vertx。想要了解更多请阅读vertx官网http://vertx.io/docs/vertx-we...
废话不多说了,直接进主题。今天分享的是vertx web里的session共享问题。在公司我用vertx开发了一个web平台,但是需要防止宕机无法继续提供服务这种情况,所以部署了两台机器,这里就开始涉及到了session共享了。为了性能考虑,我就想把session放入redis里来达到目的,可是在vertx官网没有这种实现,当时我就用Hazelcast(网友说,性能不怎么好)将就先用着。前几天我抽时间看了底层代码,自己动手封装了下,将session放入redis里。github地址: https://github.com/robin0909/...
原生vertx session 设计
下面给出 LocalSessionStoreImpl 和 ClusteredSessionStoreImpl 的结构关系:
LocalSession:
ClusteredSession:
从上面的结构中我们能找到一个继承实现关系,顶级接口是SessionStore,
而SessionStore是什么接口呢?在vertx里,session有一个专门的设计,这里的SessionStore就是专门为存储session而定义接口,看看这个接口里定义了哪些方法吧!
public interface SessionStore { //主要在分布式session共享时会用到的属性,从store里获取session的重试时间 long retryTimeout(); Session createSession(long timeout); //根据sessionId从store里获取Session void get(String id, Handler<AsyncResult<@Nullable Session>> resultHandler); //删除 void delete(String id, Handler<AsyncResult<Boolean>> resultHandler); //增加session void put(Session session, Handler<AsyncResult<Boolean>> resultHandler); //清空 void clear(Handler<AsyncResult<Boolean>> resultHandler); //store的size void size(Handler<AsyncResult<Integer>> resultHandler); //关闭,释放资源操作 void close(); }
上面很多会用到有一个属性,就是sessionId(id)。在session机制里,还需要依靠浏览器端的cookie。当服务器端session生成后,服务器会在cookie里设置一个vertx-web.session=4d9db69d-7577-4b17-8a66-4d6a2472cd33 返回给浏览器。想必大家也看出来了,就是一个uuid码,也就是sessionId。
接下来,我们可以看下二级子接口。二级子接口的作用,其实很简单,直接上代码,大家就懂了。
public interface LocalSessionStore extends SessionStore { long DEFAULT_REAPER_INTERVAL = 1000; String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions"; static LocalSessionStore create(Vertx vertx) { return new LocalSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_REAPER_INTERVAL); } static LocalSessionStore create(Vertx vertx, String sessionMapName) { return new LocalSessionStoreImpl(vertx, sessionMapName, DEFAULT_REAPER_INTERVAL); } static LocalSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) { return new LocalSessionStoreImpl(vertx, sessionMapName, reaperInterval); } }
这里主要为了方面在使用和构造时很优雅,router.route().handler(SessionHandler.create(LocalSessionStore.create(vertx))); 有点类似工厂,创造对象。在这个接口里,也可以初始化一些专有参数。所以没有什么难度。
对官方代码我们也理解的差不多了,接下来开始动手封装自己的RedisSessionStore吧!
自己的RedisSessionStore封装
首先我们定义一个RedisSessionStore接口, 接口继承SessionStore接口。
/** * Created by robinyang on 2017/3/13. */ public interface RedisSessionStore extends SessionStore { long DEFAULT_RETRY_TIMEOUT = 2 * 1000; String DEFAULT_SESSION_MAP_NAME = "vertx-web.sessions"; static RedisSessionStore create(Vertx vertx) { return new RedisSessionStoreImpl(vertx, DEFAULT_SESSION_MAP_NAME, DEFAULT_RETRY_TIMEOUT); } static RedisSessionStore create(Vertx vertx, String sessionMapName) { return new RedisSessionStoreImpl(vertx, sessionMapName, DEFAULT_RETRY_TIMEOUT); } static RedisSessionStore create(Vertx vertx, String sessionMapName, long reaperInterval) { return new RedisSessionStoreImpl(vertx, sessionMapName, reaperInterval); } RedisSessionStore host(String host); RedisSessionStore port(int port); RedisSessionStore auth(String pwd); }
接着创建一个RedisSessionStoreImpl类, 这里我先给出一个已经写好的RedisSessionStoreImpl, 稍后解释。
public class RedisSessionStoreImpl implements RedisSessionStore { private static final Logger logger = LoggerFactory.getLogger(RedisSessionStoreImpl.class); private final Vertx vertx; private final String sessionMapName; private final long retryTimeout; private final LocalMap<String, Session> localMap; //默认值 private String host = "localhost"; private int port = 6379; private String auth; RedisClient redisClient; // 清除所有时使用 private List<String> localSessionIds; public RedisSessionStoreImpl(Vertx vertx, String defaultSessionMapName, long retryTimeout) { this.vertx = vertx; this.sessionMapName = defaultSessionMapName; this.retryTimeout = retryTimeout; localMap = vertx.sharedData().getLocalMap(sessionMapName); localSessionIds = new Vector<>(); redisManager(); } @Override public long retryTimeout() { return retryTimeout; } @Override public Session createSession(long timeout) { return new SessionImpl(new PRNG(vertx), timeout, DEFAULT_SESSIONID_LENGTH); } @Override public Session createSession(long timeout, int length) { return new SessionImpl(new PRNG(vertx), timeout, length); } @Override public void get(String id, Handler<AsyncResult<Session>> resultHandler) { redisClient.getBinary(id, res->{ if(res.succeeded()) { Buffer buffer = res.result(); if(buffer != null) { SessionImpl session = new SessionImpl(new PRNG(vertx)); session.readFromBuffer(0, buffer); resultHandler.handle(Future.succeededFuture(session)); } else { resultHandler.handle(Future.succeededFuture(localMap.get(id))); } } else { resultHandler.handle(Future.failedFuture(res.cause())); } }); } @Override public void delete(String id, Handler<AsyncResult<Boolean>> resultHandler) { redisClient.del(id, res->{ if (res.succeeded()) { localSessionIds.remove(id); resultHandler.handle(Future.succeededFuture(true)); } else { resultHandler.handle(Future.failedFuture(res.cause())); logger.error("redis里删除sessionId: {} 失败", id, res.cause()); } }); } @Override public void put(Session session, Handler<AsyncResult<Boolean>> resultHandler) { //put 之前判断session是否存在,如果存在的话,校验下 redisClient.getBinary(session.id(), res1->{ if (res1.succeeded()) { //存在数据 if(res1.result()!=null) { Buffer buffer = res1.result(); SessionImpl oldSession = new SessionImpl(new PRNG(vertx)); oldSession.readFromBuffer(0, buffer); SessionImpl newSession = (SessionImpl)session; if(oldSession.version() != newSession.version()) { resultHandler.handle(Future.failedFuture("Version mismatch")); return; } newSession.incrementVersion(); writeSession(session, resultHandler); } else { //不存在数据 SessionImpl newSession = (SessionImpl)session; newSession.incrementVersion(); writeSession(session, resultHandler); } } else { resultHandler.handle(Future.failedFuture(res1.cause())); } }); } private void writeSession(Session session, Handler<AsyncResult<Boolean>> resultHandler) { Buffer buffer = Buffer.buffer(); SessionImpl sessionImpl = (SessionImpl)session; //将session序列化到 buffer里 sessionImpl.writeToBuffer(buffer); SetOptions setOptions = new SetOptions().setPX(session.timeout()); redisClient.setBinaryWithOptions(session.id(), buffer, setOptions, res->{ if (res.succeeded()) { logger.debug("set key: {} ", session.data()); localSessionIds.add(session.id()); resultHandler.handle(Future.succeededFuture(true)); } else { resultHandler.handle(Future.failedFuture(res.cause())); } }); } @Override public void clear(Handler<AsyncResult<Boolean>> resultHandler) { localSessionIds.stream().forEach(id->{ redisClient.del(id, res->{ //如果在localSessionIds里存在,但是在redis里过期不存在了, 只要通知下就行 localSessionIds.remove(id); }); }); resultHandler.handle(Future.succeededFuture(true)); } @Override public void size(Handler<AsyncResult<Integer>> resultHandler) { resultHandler.handle(Future.succeededFuture(localSessionIds.size())); } @Override public void close() { redisClient.close(res->{ logger.debug("关闭 redisClient "); }); } private void redisManager() { RedisOptions redisOptions = new RedisOptions(); redisOptions.setHost(host).setPort(port).setAuth(auth); redisClient = RedisClient.create(vertx, redisOptions); } @Override public RedisSessionStore host(String host) { this.host = host; return this; } @Override public RedisSessionStore port(int port) { this.port = port; return this; } @Override public RedisSessionStore auth(String pwd) { this.auth = pwd; return this; } }
首先,从get()和put()这两个方法开始,这两方法比较核心。
get(), 创建Cookie的时候会生成一个uuid,用这个id取session,第一次我们发现无法取到, 第56行代码就会根据这个id去生成一个session。
每次发送请求的时候,我们都会重置session过期时间,所以每次get完后,返回给浏览器之前都会有一个put操作,也就是更新数据。这里的put就稍微复杂一点点,在put之前,我们需要先根据传过来的session里的id从redis里取到session。如果获取不到,说明之前通过get获取的session不是同一个对象,就出异常,这就相当于设置了一道安全的门槛吧!当获取到了,再比较两个session的版本是不是一致的,如果不一致,说明session被破环了,算是第二个安全门槛设置吧!都没有问题了,就可以put session了,并且重新设置时间。
这里依赖vertx提供的redisClient来操作数据的,所以我们必须引入这个依赖:io.vertx:vertx-redis-client:3.4.1 。
接下来还有一点需要提的是序列化问题。这里我使用的是vertx封装的一种序列化,将数据序列化到Buffer里,而SessiomImpl类里又已经实现好了序列化,从SessionImple序列化成Buffer和Buffer反序列化。
public class SessionImpl implements Session, ClusterSerializable, Shareable { //... @Override public void writeToBuffer(Buffer buff) { byte[] bytes = id.getBytes(UTF8); buff.appendInt(bytes.length).appendBytes(bytes); buff.appendLong(timeout); buff.appendLong(lastAccessed); buff.appendInt(version); Buffer dataBuf = writeDataToBuffer(); buff.appendBuffer(dataBuf); } @Override public int readFromBuffer(int pos, Buffer buffer) { int len = buffer.getInt(pos); pos += 4; byte[] bytes = buffer.getBytes(pos, pos + len); pos += len; id = new String(bytes, UTF8); timeout = buffer.getLong(pos); pos += 8; lastAccessed = buffer.getLong(pos); pos += 8; version = buffer.getInt(pos); pos += 4; pos = readDataFromBuffer(pos, buffer); return pos; } //... }
以上就是序列化和反序列化的实现。
localSessionIds 主要考虑到清除session的时候使用,因为数据主要以保存在session为主,本地localSessionIds 保存sessionId是辅助作用。
用法
用法很简单,一行代码就说明。
router.route().handler(SessionHandler.create(RedisSessionStore.create(vertx).host("127.0.0.1").port(6349)));