Zookeeper源码分析:Watcher机制
1. 设置Watcher
使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等。Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象。
Code 1: Watcher接口
public interface Watcher {
/**
* Event的状态
*/
public interface Event {
/**
* 在事件发生时,ZooKeeper的状态
*/
public enum KeeperState {
@Deprecated
Unknown (-1),
Disconnected (0),
@Deprecated
NoSyncConnected (1),
SyncConnected (3),
AuthFailed (4),
ConnectedReadOnly (5),
SaslAuthenticated(6),
Expired (-112);
private final int intValue;
KeeperState( int intValue) {
this.intValue = intValue;
}
......
}
/**
* ZooKeeper中的事件
*/
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
private final int intValue; // Integer representation of value
// for sending over wire
EventType( int intValue) {
this.intValue = intValue;
}
......
}
}
//Watcher的回调方法
abstract public void process(WatchedEvent event);
}
Code 2: Zookeeper.getChildren(final String, Watcher)方法
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils. validatePath(clientPath);
WatchRegistration wcb = null;
//如果watcher不等于null, 构建WatchRegistration对象,
//该对象描述了watcher和path之间的关系
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
//在传入的path加上root path前缀,构成服务器端的绝对路径
final String serverPath = prependChroot(clientPath);
//构建RequestHeader对象
RequestHeader h = new RequestHeader();
//设置操作类型为OpCode. getChildren
h.setType(ZooDefs.OpCode. getChildren);
//构建GetChildrenRequest对象
GetChildrenRequest request = new GetChildrenRequest();
//设置path
request.setPath(serverPath);
//设置是否使用watcher
request.setWatch(watcher != null);
//构建GetChildrenResponse对象
GetChildrenResponse response = new GetChildrenResponse();
//提交请求,并阻塞等待结果
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code. get(r.getErr()),
clientPath);
}
return response.getChildren();
}
Follower的NIOServerCnxn类接到了Client的请求,会调用ZookeeperServer.processPacket()方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor。
由于我们的请求只是一个读操作,而不是一个Quorum请求或者sync请求,所以FollowerRequestProcessor不需要调用Follower.request()方法将请求转给Leader,只需要将请求传递到下一个处理器CommitProcessor。
处理器CommitProcessor线程发现请求是读请求后,直接将Requet对象加入到toProcess队列中,在接下的循环中会调用FinalRequestProcessor.processRequest方法进行处理。
FinalRequestProcessor.processRequest方法最终会调用ZKDatabase中的读操作方法(如statNode和getData方法), 而ZKDatabase的这些方法会最终调用DataTree类的方法来获取指定path的znode信息并返回给Client端,同时也会设置Watcher。
Code 3: FinalRequestProcessor对OpCode.getData请求的处理
case OpCode. getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream. byteBuffer2Record(request.request,
getDataRequest);
//获得znode对象
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
//n为null, 抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n. acl;
}
//检查是否有读权限
PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms. READ,
request. authInfo);
//构建状态对象stat
Stat stat = new Stat();
//获得指定path的znode数据,
//如果GetDataRequest.getWatcher()返回true, 将ServerCnxn类型对象cnxn传递进去。
//ServerCnxn是实现了Watcher接口
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest. getWatch() ? cnxn : null);
//构建GetDataResponse对象
rsp = new GetDataResponse(b, stat);
break;
}
Code 4: DataTree.getData()方法
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
//从nodes map中获取指定path的DataNode对象
DataNode n = nodes.get(path);
//如果n为null, 则抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
//将n的状态copy到stat中
n.copyStat(stat);
//如果watcher不会null, 则将(path, watcher)键值对放入dataWatchers Map里
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
//返回节点数据
return n.data ;
}
}
2. 修改znode数据触发Watcher
在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。
如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。
其关SetData请求的时序图如下:
triggerWatcher
Code 5: DataTree.setData()方法
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
//根据path, 获得DataNode对象n
DataNode n = nodes.get(path);
//如果n为null, 则抛出NoNodeException异常
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n. data;
n. data = data;
n. stat.setMtime(time);
n. stat.setMzxid(zxid);
n. stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length ));
}
//触发Watcher
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
Code 6: WatchManage.triggerWatcher()方法,触发Watcher。
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState. SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this ) {
//从watchTable删除掉path对于的watcher
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG .isTraceEnabled()) {
ZooTrace. logTraceMessage(LOG,
ZooTrace. EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
//循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
Code 7: NIOServerCnxn.process方法,发送notification给Client端
synchronized public void process (WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG .isTraceEnabled()) {
ZooTrace. logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK ,
"Deliver event " + event + " to 0x"
+ Long. toHexString(this. sessionId)
+ " through " + this );
}
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
//发送notification给Client端
sendResponse(h, e, "notification");
}
3. 总结
Watcher具有one-time trigger的特性,在代码中我们也可以看到一个watcher被处理后会立即从watchTable中删掉。
--------------------------------------分割线 --------------------------------------