分布式系统锁实现
为什么需要分布式系统锁
比如系统中的注册功能:
需要先判断用户账号是否已被注册,没有被注册则保存用户注册信息。
在单系统(单 JVM)情况下,可以这样做:
String username = user.getUsername();
// Equal usernames intern to the same String instance inside one JVM, so the
// monitor below serializes concurrent registrations of the same account and
// keeps two requests from both passing the existence check before either saves.
final String usernameMonitor = username.intern();
synchronized (usernameMonitor) {
    boolean alreadyRegistered = userService.existUsername(username, "");
    if (alreadyRegistered) {
        mav.addObject("usernameMessage", "用户名已存在");
        return mav;
    }
    userService.save(user);
}
集群情况下,同一账号的注册请求可能由不同的服务器处理,而 synchronized 只在单个 JVM 内有效,无法阻止并发注册。
分布式系统锁:底层借助 memcached 实现
/**
 * Business lock.
 *
 * Acquires a named lock through the memcached "add" operation exposed by
 * {@code CacheUtil.addCache}: the caller that manages to add the key holds the
 * lock, and deleting the key releases it.
 *
 * NOTE(review): {@link #unlock(String)} deletes the key unconditionally, with no
 * ownership check. The key is added with {@code CacheUtil.addCache}'s default
 * expiry (3 minutes when no date is passed), so a holder that runs longer than
 * that may end up deleting a lock that another caller has acquired meanwhile.
 *
 * @author 820381
 *
 */
public class Lock {

    /** Number of retries after the first failed attempt (1001 attempts in total). */
    private static final int MAX_RETRIES = 1000;

    /** Pause between two acquisition attempts, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 200L;

    /**
     * Acquires the lock, retrying every 200 ms until it is obtained, the retry
     * budget is exhausted, or the calling thread is interrupted.
     *
     * @param lockName name of the lock (used as the cache key)
     * @return true if the lock was acquired, false otherwise
     */
    public static boolean lock(String lockName) {
        return getLock(lockName);
    }

    /**
     * Releases the lock by deleting its cache key.
     *
     * @param lockName name of the lock (used as the cache key)
     * @return the result of {@code CacheUtil.deleteKeyCache(lockName)}
     */
    public static boolean unlock(String lockName) {
        return CacheUtil.deleteKeyCache(lockName);
    }

    /**
     * Acquires the lock, runs the callback while holding it, and releases the
     * lock once the callback has finished (normally or exceptionally).
     *
     * @param lockName name of the lock
     * @param call     callback executed while the lock is held
     * @return whatever the callback returns
     * @throws BusinessException with {@code ExceptionCode.CMN_LOCK_FAILED} when the
     *         lock could not be acquired, or any BusinessException raised by the callback
     */
    public static <T> T lock(String lockName, Callback<T> call) throws BusinessException {
        if (!lock(lockName)) {
            throw new BusinessException(ExceptionCode.CMN_LOCK_FAILED,
                    MessageFormat.format("连接超时[{0}]", lockName));
        }
        try {
            return call.handle();
        } finally {
            unlock(lockName);
        }
    }

    /**
     * Tries to add the lock key, sleeping between attempts.
     *
     * @param lockName name of the lock
     * @return true once the key was added; false when all attempts failed or the
     *         thread was interrupted while waiting
     */
    private static boolean getLock(String lockName) {
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            if (CacheUtil.addCache(lockName, true, null)) {
                return true;
            }
            try {
                Thread.sleep(RETRY_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting instead of
                // silently spinning on for the rest of the retry budget.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
/**
 *
 * @ClassName: CacheUtil
 * @Description: Cache utility class wrapping a shared MemCachedClient.
 * @author Fanhy
 * @date 2014-11-27 10:55:39
 *
 */
public class CacheUtil {
    /**
     * Default expiry applied by addCache when the caller passes no date:
     * 3 minutes, expressed as a relative duration.
     */
    private static final long DEFAULT_ADD_EXPIRY_MILLIS = 3 * 60 * 1000L;
    /**
     * Cache client (static; assigned through the instance setter below).
     */
    private static MemCachedClient memCachedClient;
    /**
     * Expiry length in milliseconds, supplied as a string through its setter.
     */
    private String timeLenght;
    /**
     * Default key expiry (values below 1000 become 0 once divided by 1000,
     * i.e. the key never expires).
     */
    private static Date keyTimeOut;

    /**
     * Derives the default key expiry from the configured length.
     * Parsed as a long so that the documented 30-day maximum
     * (2 592 000 000 ms) does not overflow an int.
     */
    @SuppressWarnings("unused")
    private void init() {
        keyTimeOut = new Date(Long.parseLong(timeLenght));
    }

    /**
     *
     * @MethodName: getKeyCache
     * @Description: Reads a cached value.
     * @param keyName
     *            cache key
     * @return the cached value, or null when the key is null or nothing is cached
     */
    public static Object getKeyCache(String keyName) {
        if (null == keyName) {
            return null;
        }
        MemcachedItem item = memCachedClient.gets(keyName);
        return null != item ? item.getValue() : null;
    }

    /**
     *
     * @MethodName: setCache
     * @Description: Stores a value in the cache. Does nothing when the key is null or empty.
     * @param keyName
     *            cache key
     * @param value
     *            value to cache
     * @param date
     *            expiry (new Date(60000) means 60 seconds); when null the
     *            default expiry is used; must not exceed 30 days
     */
    public static void setCache(String keyName, Object value, Date date) {
        if (null == keyName || "".equals(keyName)) {
            return;
        }
        memCachedClient.set(keyName, value, null == date ? keyTimeOut : date);
    }

    /**
     *
     * @MethodName: setCache
     * @Description: Stores a value in the cache using the default expiry.
     * @param keyName
     *            cache key
     * @param value
     *            value to cache
     */
    public static void setCache(String keyName, Object value) {
        setCache(keyName, value, null);
    }

    /**
     *
     * @MethodName: deleteKeyCache
     * @Description: Removes a key from the cache.
     * @param keyName
     *            cache key
     * @return the client's delete result; false when the key is null or empty
     */
    public static boolean deleteKeyCache(String keyName) {
        if (null != keyName && !"".equals(keyName)) {
            return memCachedClient.delete(keyName);
        }
        return false;
    }

    public void setMemCachedClient(MemCachedClient memCachedClient) {
        CacheUtil.memCachedClient = memCachedClient;
    }

    public void setTimeLenght(String timeLenght) {
        this.timeLenght = timeLenght;
    }

    /**
     *
     * @MethodName: addCache
     * @Description: Adds a key only if it is not already present; currently
     *               used only for distributed synchronization.
     * @param keyName
     *            cache key
     * @param value
     *            value to cache
     * @param date
     *            expiry; when null a relative 3-minute expiry is used
     * @return the client's add result; false when the key is blank
     */
    public static boolean addCache(String keyName, Object value, Date date) {
        if (StringUtils.isBlank(keyName)) {
            return false;
        }
        if (null == date) {
            // Relative expiry, in the same form the setCache documentation uses
            // (new Date(60000) == 60 seconds). An absolute "now + 3 minutes"
            // timestamp would only be right if this host's clock and the cache
            // server's clock agree.
            date = new Date(DEFAULT_ADD_EXPIRY_MILLIS);
        }
        return memCachedClient.add(keyName, value, date);
    }
}
如何使用
/**
 * Around-advice that runs methods matched by the pointcut below while holding
 * the business lock named by their Lock annotation.
 */
@Aspect
@Component
public class LockAspect {

    /**
     * Resolves the lock name for the intercepted method and executes the method
     * inside {@code com.sfiec.oms.common.Lock.lock}.
     *
     * @param point the intercepted invocation
     * @return the intercepted method's return value
     * @throws Throwable BusinessException / BusinessRuntimeException from the target
     *         are rethrown as-is; any other Throwable is wrapped in a BusinessException
     */
    @Around("execution(* com.sfiec..*.*(..)) && @annotation(Lock)")
    public Object around(final ProceedingJoinPoint point) throws Throwable {
        // NOTE(review): getLockName returns null when the annotation is not found on
        // the signature's method; the null name is passed to the lock unchanged.
        String lockName = getLockName(point);
        return com.sfiec.oms.common.Lock.lock(lockName, new Callback<Object>() {
            @Override
            public Object handle() throws BusinessException {
                try {
                    return point.proceed();
                } catch (Throwable e) {
                    if (e instanceof BusinessException) {
                        throw (BusinessException) e;
                    }
                    if (e instanceof BusinessRuntimeException) {
                        throw (BusinessRuntimeException) e;
                    }
                    throw new BusinessException("", e);
                }
            }
        });
    }

    /**
     * Builds the lock name from the Lock annotation value, substituting
     * ${paramName} placeholders with the string form of the matching argument.
     *
     * @param point the intercepted invocation
     * @return the resolved lock name, or null when the method carries no Lock annotation
     */
    private String getLockName(ProceedingJoinPoint point) {
        MethodSignature signature = MethodSignature.class.cast(point.getSignature());
        Lock lock = signature.getMethod().getAnnotation(Lock.class);
        // Not marked with the custom lock annotation: nothing to resolve.
        if (lock == null) {
            return null;
        }
        String lockName = lock.value();
        // No placeholder present: use the annotation value as-is.
        if (lockName.indexOf("${") == -1) {
            return lockName;
        }
        // Collect the arguments that placeholders may refer to.
        final Properties properties = new Properties();
        String[] parameterNames = signature.getParameterNames();
        Object[] args = point.getArgs();
        // Parameter names can be unavailable; in that case nothing is substituted.
        int count = (parameterNames == null || args == null)
                ? 0 : Math.min(parameterNames.length, args.length);
        for (int i = 0; i < count; i++) {
            Object val = args[i];
            // A null argument is rendered as "null": Properties rejects null values
            // and val.getClass() would otherwise throw.
            if (val == null || isBaseDataType(val.getClass())) {
                val = String.valueOf(val);
            }
            properties.put(parameterNames[i], val);
        }
        // TODO for ${xx.xx} placeholders, read the nested property via reflection
        // Plain ${xxx} placeholders are substituted directly.
        PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper("${", "}");
        return helper.replacePlaceholders(lockName, properties);
    }

    /**
     * Tells whether a class is treated as a basic data type, i.e. one whose
     * values are converted with String.valueOf before substitution.
     *
     * @param clazz the class to test
     * @return true when the class is one of the listed value types or a primitive
     */
    private static boolean isBaseDataType(Class<?> clazz) {
        return clazz.equals(String.class)
                || clazz.equals(Integer.class)
                || clazz.equals(Byte.class)
                || clazz.equals(Long.class)
                || clazz.equals(Double.class)
                || clazz.equals(Float.class)
                || clazz.equals(Character.class)
                || clazz.equals(Short.class)
                || clazz.equals(BigDecimal.class)
                || clazz.equals(BigInteger.class)
                || clazz.equals(Boolean.class)
                || clazz.equals(Date.class)
                || clazz.isPrimitive();
    }
}
/**
 * Registers a user while holding the registration lock, so that the existence
 * check and the save run as one critical section across servers.
 *
 * NOTE(review): LockAspect.getLockName marks ${xx.xx} placeholders as a TODO, so
 * "${user.username}" may stay unresolved in the lock name (one shared lock for
 * every registration) - confirm.
 *
 * @param user the user to register
 * @return true when the user was saved, false when the username already exists
 */
@Lock(CommonConstant.LOCK_REGISTER + " ${user.username}")
public boolean register(User user) {
    // The username comes from the parameter; no local "username" exists in this method.
    if (userService.existUsername(user.getUsername(), "")) {
        // NOTE(review): mav is not declared in this snippet - presumably a
        // ModelAndView available in the enclosing class; confirm.
        mav.addObject("usernameMessage", "用户名已存在");
        return false;
    }
    userService.save(user);
    return true;
}