定时任务分布式锁的一种实现(数据库锁)
在集群环境下,若每一台机器都运行一个定时任务,会导致生产数据一致性问题,所以必须要实现一个锁。保证当时任务在同一时间段只能在一台机器上面运行。定时任务实现方式很多种,比较常用的有quartz、TBSchedule等等。低版本的quartz无法满足多个不同的任务同时进行并且不并发执行,尝试了很久quartz还是放弃了。
换了一种思路,自己实现了一种数据库锁的机制,需要创建两张表,一张表来控制任务,另外一张表来记录执行信息。
SQL脚本:
CREATE TABLE qrtz_basejob_config(
key_name varchar(255) DEFAULT '' NOT NULL,
actual_pre_time TIMESTAMP DEFAULT NULL ,
job_state varchar(4) DEFAULT '0' NOT NULL ,
create_time TIMESTAMP DEFAULT NULL,
PRIMARY KEY (key_name)
) ;
COMMENT ON table qrtz_basejob_config
IS
'baseJob配置表';
COMMENT ON column qrtz_basejob_config.key_name
IS
'参数code';
COMMENT ON column qrtz_basejob_config.actual_pre_time
IS
'上一次实际执行时间';
COMMENT ON column qrtz_basejob_config.job_state
IS
'状态--1代表正在执行0代表等待执行';
COMMENT ON column qrtz_basejob_config.create_time
IS
'创建时间';
CREATE TABLE qrtz_basejob_config_record (
id DECIMAL(20) NOT NULL ,
key_name varchar(255) DEFAULT NULL,
start_time timestamp DEFAULT NULL,
end_time timestamp DEFAULT NULL ,
cost_time DECIMAL(20) DEFAULT 0 ,
ip varchar(50) DEFAULT NULL,
create_time timestamp DEFAULT NULL,
PRIMARY KEY (id)
);
COMMENT ON table qrtz_basejob_config_record
IS
'定时任务运行记录表';
COMMENT ON column qrtz_basejob_config_record.id
IS
'主键';
COMMENT ON column qrtz_basejob_config_record.key_name
IS
'定时任务名称';
COMMENT ON column qrtz_basejob_config_record.start_time
IS
'定时任务开始时间';
COMMENT ON column qrtz_basejob_config_record.end_time
IS
'定时任务结束时间';
COMMENT ON column qrtz_basejob_config_record.cost_time
IS
'耗时';
COMMENT ON column qrtz_basejob_config_record.ip
IS
'运行服务器IP';
COMMENT ON column qrtz_basejob_config_record.create_time
IS
'创建时间';
公共任务类:
package com.service.schedulerLockJob.lockJob;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Component;
import com.log.ElectronicsLogger;
import com.service.schedulerLockJob.IBasicJobConfigService;
import com.util.SpringContextHolder;
/**
* 创建日期:2017-11-21上午10:28:36
* 修改日期:
* 作者:ttan
* 描述:
*/
@Component
public abstract class BasicJob{
/*@Autowired(required = false)
@Qualifier("basicJobConfigServiceImpl")
IBasicJobConfigService basicJobConfigService;*/
private IBasicJobConfigService basicJobConfigService = (IBasicJobConfigService) SpringContextHolder.getBean("basicJobConfigServiceImpl");
public static String IP_STRING = null;
static{
try {
InetAddress ip = InetAddress.getLocalHost();
IP_STRING = ip.getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
}
/**
*
* 创建日期:2017-11-21下午4:41:22
* 修改日期:
* 作者:ttan
* 描述:
* keyName:任务ID
* resetTime:设置的超时时间
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void lockJob(String keyName,String resetTime) throws ParseException {
if(keyName != null && keyName != ""){
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//获取执行时间
Date now = new Date();
String nowTime = format.format(now);
Map baseJobConfig = new HashMap();
baseJobConfig.put("keyName", keyName);
baseJobConfig.put("createTime", nowTime);
List list = basicJobConfigService.getCount(baseJobConfig);
Calendar cal=Calendar.getInstance();
//任务列表无此任务,即初始化
if(list.size()==0||list==null){
baseJobConfig.put("jobState", "0");
cal.setTime(now);
cal.add(Calendar.DAY_OF_MONTH, -1);
String actualPreTime=format.format(cal.getTime());
//为避免初始化任务是出错,将执行时间设置为前一天,即可立即执行
baseJobConfig.put("actualPreTime", actualPreTime);
basicJobConfigService.insertJobConfig(baseJobConfig);
ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 初始化任务到任务列表**********************");
}
int num = 0;
cal.setTime(now);
cal.add(Calendar.MINUTE, -Integer.parseInt(resetTime));
String checkTime = format.format(cal.getTime());
//超时处理: 将状态修改为0
baseJobConfig.put("checkTime", checkTime);
num = basicJobConfigService.updateStateByTimeOut(baseJobConfig);
if(num > 0){
ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 执行超时,已重置任务状态**********************");
}
num = basicJobConfigService.updateStateAtStartTime(baseJobConfig);
if(num > 0){
Date startDate = new Date();
String startTime = format.format(startDate);
ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 执行开始,时间:"+startTime+"**********************");
try {
execute();
} catch (Exception e) {
ElectronicsLogger.logger.error("**********************[basicJob.doJob]任务:"+keyName+" 任务执行中出现异常**********************");
e.printStackTrace();
}finally{
//任务结束后将状态修改为 0
num = basicJobConfigService.updateStateAtEndTime(baseJobConfig);
if(num <= 0){
ElectronicsLogger.logger.error("**********************[basicJob.doJob]任务:"+keyName+" 修改任务状态失败**********************");
}
Date endDate = new Date();
String endTime = format.format(endDate);
long costTime = (endDate.getTime()-startDate.getTime())/1000;//耗时 单位 s
baseJobConfig.put("startTime", startTime);
baseJobConfig.put("endTime", endTime);
baseJobConfig.put("costTime", costTime);
baseJobConfig.put("ipAdress", IP_STRING);
//保存任务执行记录
try {
basicJobConfigService.insertJobConfigRecord(baseJobConfig);
} catch (Exception e) {
e.printStackTrace();
}
ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 执行结束,时间:"+endTime+"耗时:"+costTime+"s**********************");
}
}
}else{
ElectronicsLogger.logger.error("**********************[basicJob.doJob]任务名为空,任务无法执行!**********************");
}
}
//要执行的任务类
public abstract void execute();
}
任务类继承BasicJob,实现execute()方法,例如:
/**
* 创建日期:2017-9-9下午8:54:35
* 修改日期:
* 作者:ttan
*/
package com.service;
import com.service.schedulerLockJob.lockJob.BasicJob;
/**
* 创建日期:2017-9-9下午8:54:35
* 修改日期:
* 作者:ttan
*/
public class TestService extends BasicJob{
@SuppressWarnings("unused")
public void doJob() throws Exception{
super.lockJob("{任务名的ID(唯一)}", "{允许任务超时的分钟数}");
}
@Override
public void execute() {
System.out.printf("测试!!!!~~~~~~~~~~~~~~~~~");
}
}
注意:实际需要触发执行的是doJob方法,实现execute方法,将定时任务要执行的方法放到里面。
以下将一些具体的service和dao贴一下,稍微看一下就明白了。
Service接口:
package com.service.schedulerLockJob;
import java.util.List;
import java.util.Map;
/**
* 创建日期:2017-11-21下午5:19:41
* 修改日期:
* 作者:ttan
* 描述:
*/
public interface IBasicJobConfigService {
/**
*
* 创建日期:2017-11-21上午11:39:58
* 修改日期:
* 作者:ttan
* 描述:判断初始化任务是否存在
*/
@SuppressWarnings("rawtypes")
public List getCount(Map params);
/**
*
* 创建日期:2017-11-21下午2:58:22
* 修改日期:
* 作者:ttan
* 描述:插入任务列表
*/
@SuppressWarnings("rawtypes")
public int insertJobConfig(Map params);
/**
*
* 创建日期:2017-11-21下午2:59:06
* 修改日期:
* 作者:ttan
* 描述:插入运行记录表
* @throws Exception
*/
@SuppressWarnings("rawtypes")
public int insertJobConfigRecord(Map params) throws Exception;
/**
*
* 创建日期:2017-11-21上午11:40:02
* 修改日期:
* 作者:ttan
* 描述:修改状态在超时
*/
@SuppressWarnings("rawtypes")
public int updateStateByTimeOut(Map params);
/**
*
* 创建日期:2017-11-21上午11:40:05
* 修改日期:
* 作者:ttan
* 描述:修改状态在开始时间
*/
@SuppressWarnings("rawtypes")
public int updateStateAtStartTime(Map params);
/**
*
* 创建日期:2017-11-21上午11:40:08
* 修改日期:
* 作者:ttan
* 描述:修改状态在结束时间
*/
@SuppressWarnings("rawtypes")
public int updateStateAtEndTime(Map params);
}
Service实现类:
package com.service.schedulerLockJob.impl;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import com.dao.SequenceDao;
import com.dao.schedulerLockJob.IBasicJobConfigDao;
import com.service.schedulerLockJob.IBasicJobConfigService;
/**
* 创建日期:2017-11-21下午5:20:05
* 修改日期:
* 作者:ttan
* 描述:
*/
@Service("basicJobConfigServiceImpl")
public class BasicJobConfigServiceImpl implements IBasicJobConfigService{
@Autowired(required = false)
@Qualifier("basicJobConfigDaoImpl")
IBasicJobConfigDao basicJobConfigDao;
@SuppressWarnings("rawtypes")
@Override
public List getCount(Map params) {
return basicJobConfigDao.getCount(params);
}
@SuppressWarnings("rawtypes")
@Override
public int insertJobConfig(Map params) {
return basicJobConfigDao.insertJobConfig(params);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public int insertJobConfigRecord(Map params) throws Exception {
SequenceDao seqDao = new SequenceDao();
String id = seqDao.getGenratorId("QRTZ_BASEJOB_CONFIG_RECORD");
if(id==null||"".equals(id)){
throw new Exception("获取数据库序列失败!");
}
params.put("id", id);
return basicJobConfigDao.insertJobConfigRecord(params);
}
@SuppressWarnings("rawtypes")
@Override
public int updateStateByTimeOut(Map params) {
return basicJobConfigDao.updateStateByTimeOut(params);
}
@SuppressWarnings("rawtypes")
@Override
public int updateStateAtStartTime(Map params) {
return basicJobConfigDao.updateStateAtStartTime(params);
}
@SuppressWarnings("rawtypes")
@Override
public int updateStateAtEndTime(Map params) {
return basicJobConfigDao.updateStateAtEndTime(params);
}
}
Dao接口:
package com.dao.schedulerLockJob;
import java.util.List;
import java.util.Map;
/**
* 创建日期:2017-11-21上午11:36:57
* 修改日期:
* 作者:ttan
* 描述:
*/
public interface IBasicJobConfigDao {
/**
*
* 创建日期:2017-11-21上午11:39:58
* 修改日期:
* 作者:ttan
* 描述:判断初始化任务是否存在
*/
@SuppressWarnings("rawtypes")
public List getCount(Map params);
/**
*
* 创建日期:2017-11-21下午2:58:22
* 修改日期:
* 作者:ttan
* 描述:插入任务列表
*/
@SuppressWarnings("rawtypes")
public int insertJobConfig(Map params);
/**
*
* 创建日期:2017-11-21下午2:59:06
* 修改日期:
* 作者:ttan
* 描述:插入运行记录表
*/
@SuppressWarnings("rawtypes")
public int insertJobConfigRecord(Map params);
/**
*
* 创建日期:2017-11-21上午11:40:02
* 修改日期:
* 作者:ttan
* 描述:修改状态在超时
*/
@SuppressWarnings("rawtypes")
public int updateStateByTimeOut(Map params);
/**
*
* 创建日期:2017-11-21上午11:40:05
* 修改日期:
* 作者:ttan
* 描述:修改状态在开始时间
*/
@SuppressWarnings("rawtypes")
public int updateStateAtStartTime(Map params);
/**
*
* 创建日期:2017-11-21上午11:40:08
* 修改日期:
* 作者:ttan
* 描述:修改状态在结束时间
*/
@SuppressWarnings("rawtypes")
public int updateStateAtEndTime(Map params);
}
Dao实现类:
package com.dao.schedulerLockJob.impl;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Repository;
import com.dao.JdbcTemplateSupport;
import com.dao.schedulerLockJob.IBasicJobConfigDao;
import com.util.DateUtil;
import com.util.string.StringUtil;
/**
* 创建日期:2017-11-21下午2:18:10
* 修改日期:
* 作者:ttan
* 描述:
*/
@Repository("basicJobConfigDaoImpl")
public class BasicJobConfigDaoImpl extends JdbcTemplateSupport implements IBasicJobConfigDao {
@SuppressWarnings("rawtypes")
@Override
public List getCount(Map params) {
String sql = "SELECT * FROM QRTZ_BASEJOB_CONFIG WHERE KEY_NAME = ?";
String KEY_NAME = StringUtil.toString(params.get("keyName"));
return this.query(sql,new Object[]{KEY_NAME});
}
@SuppressWarnings("rawtypes")
@Override
public int updateStateByTimeOut(Map params) {
String sql = "UPDATE QRTZ_BASEJOB_CONFIG SET JOB_STATE = '0' " +
"WHERE KEY_NAME = ? AND JOB_STATE = '1' " +
"AND ACTUAL_PRE_TIME <= ?";
String KEY_NAME = StringUtil.toString(params.get("keyName"));
String checkTime = StringUtil.toString(params.get("checkTime"));
Timestamp timeStamp1 = DateUtil.convertStringToTimestamp(checkTime, "1");
return this.update(sql, new Object[]{KEY_NAME,timeStamp1});
}
@SuppressWarnings("rawtypes")
@Override
public int updateStateAtStartTime(Map params) {
String sql ="UPDATE QRTZ_BASEJOB_CONFIG SET JOB_STATE = '1', " +
"ACTUAL_PRE_TIME = current timestamp WHERE KEY_NAME = ? " +
"AND JOB_STATE = '0'";
String KEY_NAME = StringUtil.toString(params.get("keyName"));
return this.update(sql, new Object[]{KEY_NAME});
}
@SuppressWarnings("rawtypes")
@Override
public int updateStateAtEndTime(Map params) {
String sql ="UPDATE QRTZ_BASEJOB_CONFIG SET JOB_STATE = '0' " +
"WHERE KEY_NAME = ? " +
"AND JOB_STATE = '1'";
String KEY_NAME = StringUtil.toString(params.get("keyName"));
return this.update(sql, new Object[]{KEY_NAME});
}
@SuppressWarnings("rawtypes")
@Override
public int insertJobConfig(Map params) {
String sql = "INSERT INTO QRTZ_BASEJOB_CONFIG(KEY_NAME,JOB_STATE,ACTUAL_PRE_TIME,CREATE_TIME) VALUES(?,?,?,?)";
String KEY_NAME = StringUtil.toString(params.get("keyName"));
String JOB_STATE = StringUtil.toString(params.get("jobState"));
String ACTUAL_PRE_TIME = StringUtil.toString(params.get("actualPreTime"));
String CREATE_TIME = StringUtil.toString(params.get("createTime"));
Timestamp timeStamp1 = DateUtil.convertStringToTimestamp(ACTUAL_PRE_TIME, "1");
Timestamp timeStamp2 = DateUtil.convertStringToTimestamp(CREATE_TIME, "1");
return this.update(sql, new Object[]{KEY_NAME,JOB_STATE,timeStamp1,timeStamp2});
}
@SuppressWarnings("rawtypes")
@Override
public int insertJobConfigRecord(Map params) {
String sql = "INSERT INTO QRTZ_BASEJOB_CONFIG_RECORD(ID,KEY_NAME,START_TIME,END_TIME,COST_TIME,IP,CREATE_TIME) " +
"VALUES(?,?,?,?,?,?,?)";
long ID = Long.parseLong(StringUtil.toString(params.get("id")));;
String KEY_NAME = StringUtil.toString(params.get("keyName"));
String START_TIME_STR = StringUtil.toString(params.get("startTime"));
String END_TIME_STR = StringUtil.toString(params.get("endTime"));
long COST_TIME = (Long) params.get("costTime");
String IP = StringUtil.toString(params.get("ipAdress"));
String CREATE_TIME_STR = StringUtil.toString(params.get("createTime"));
Timestamp START_TIME = DateUtil.convertStringToTimestamp(START_TIME_STR, "1");
Timestamp END_TIME = DateUtil.convertStringToTimestamp(END_TIME_STR, "1");
Timestamp CREATE_TIME = DateUtil.convertStringToTimestamp(CREATE_TIME_STR, "1");
return this.update(sql,new Object[]{ID,KEY_NAME,START_TIME,END_TIME,COST_TIME,IP,CREATE_TIME});
}
}
总结:经过一部分测试,暂时没有发现什么问题,缺乏大剂量数据操作的测试。 不过合理设置超时时间基本可以避免数据的出错。
医学医生手工作与现代计算机接口