ZooKeeper实现分布式事物锁

闲暇时研究了下基于ZooKeeper实现的分布式事务锁,做下记录,便于回顾复习

public class DistributeLock implements Lock,Watcher {

	//锁的名称
	private String lockName;
	private long TIMEOUT = 3000;
	//当前锁的节点
	private String currentLockNode;
	//前一个锁的节点
	private String prevLockNode;
	private ZooKeeper zk;
	private int SESSION_TIME_OUT = 2000;
	private String ROOT_LOCK = "/locks";
	private String PATH_PREFIX = "_locks";
	//计数器-也为锁的触发器
	private CountDownLatch countDownLatch;
	
	public DistributeLock(String ip,String lockName) {
		this.lockName = lockName;
		try {
			zk = new ZooKeeper(ip, SESSION_TIME_OUT, this);
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void process(WatchedEvent event) {
		if(countDownLatch!=null) {
			countDownLatch.countDown();
		}
	}

	@Override
	public void lock() {
		if(tryLock()) {
			return ;
		}else {
			waitForLock(TIMEOUT);
		}
	}
	
	public boolean waitForLock(long timeOut) {
		Stat stat;
		try {
			stat = zk.exists(prevLockNode, true);
			if(stat!=null) {
				countDownLatch = new CountDownLatch(1);
				//设置超时时间    (不清楚会不会出现异常情况)
				countDownLatch.await(TIMEOUT,TimeUnit.MILLISECONDS);
				countDownLatch = null;
			}
			return true;
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		if(zk!=null) {			
			zk.close();
		}
		
	}

	@Override
	public Condition newCondition() {
		return null;
	}

	@Override
	public boolean tryLock() {
		try {
			Stat stat = zk.exists(ROOT_LOCK, false);
			//没有根节点
			if(stat==null) {
				zk.create(ROOT_LOCK, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			}
			if(lockName.contains(PATH_PREFIX)) {
				System.out.println("锁名称错误");
				return false;
			}
			String nodePath = zk.create(ROOT_LOCK+"/"+lockName+"/"+PATH_PREFIX, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			//获取根目录下的子节点
			List<String> childList = zk.getChildren(ROOT_LOCK, false);
			List<String> lockPathList = new ArrayList<String>();
			for(String childNode:childList) {
				String splitePath = childNode.split("/"+PATH_PREFIX)[0];
				if(splitePath.equals(ROOT_LOCK+"/"+lockName)) {
					lockPathList.add(childNode);
				}
			}
			Collections.sort(lockPathList);
			//当前节点为最小节点
			if(nodePath.equals(lockPathList.get(0))) {
				currentLockNode = nodePath;
				return true;
			}
			//前一个节点
			prevLockNode = lockPathList.get(lockPathList.indexOf(lockPathList)-1);
			return false;
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return false;
	}

	@Override
	public boolean tryLock(long timeOut, TimeUnit timeUnit)
			throws InterruptedException {
		if(tryLock()) {
			return true;
		}
		return waitForLock(timeOut);
		
	}

	@Override
	public void unlock() {
		try {
			if(zk!=null) {
				zk.delete(currentLockNode, -1);
				zk.close();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (KeeperException e) {
			e.printStackTrace();
		}
	}
}

相关推荐