定时任务分布式锁的一种实现(数据库锁)

在集群环境下,若每一台机器都运行一个定时任务,会导致生产数据一致性问题,所以必须要实现一个锁。保证当时任务在同一时间段只能在一台机器上面运行。定时任务实现方式很多种,比较常用的有quartz、TBSchedule等等。低版本的quartz无法满足多个不同的任务同时进行并且不并发执行,尝试了很久quartz还是放弃了。

换了一种思路,自己实现了一种数据库锁的机制,需要创建两张表,一张表来控制任务,另外一张表来记录执行信息。

SQL脚本:

CREATE TABLE qrtz_basejob_config(

key_name varchar(255) DEFAULT '' NOT NULL,

actual_pre_time TIMESTAMP DEFAULT NULL ,

job_state varchar(4) DEFAULT '0' NOT NULL ,

create_time TIMESTAMP DEFAULT NULL,

PRIMARY KEY (key_name)

) ;

COMMENT ON table qrtz_basejob_config

IS

'baseJob配置表';

COMMENT ON column qrtz_basejob_config.key_name

IS

'参数code';

COMMENT ON column qrtz_basejob_config.actual_pre_time

IS

'上一次实际执行时间';

COMMENT ON column qrtz_basejob_config.job_state

IS

'状态--1代表正在执行0代表等待执行';

COMMENT ON column qrtz_basejob_config.create_time

IS

'创建时间';

CREATE TABLE qrtz_basejob_config_record (

id DECIMAL(20) NOT NULL ,

key_name varchar(255) DEFAULT NULL,

start_time timestamp DEFAULT NULL,

end_time timestamp DEFAULT NULL ,

cost_time DECIMAL(20) DEFAULT 0 ,

ip varchar(50) DEFAULT NULL,

create_time timestamp DEFAULT NULL,

PRIMARY KEY (id)

);

COMMENT ON table qrtz_basejob_config_record

IS

'定时任务运行记录表';

COMMENT ON column qrtz_basejob_config_record.id

IS

'主键';

COMMENT ON column qrtz_basejob_config_record.key_name

IS

'定时任务名称';

COMMENT ON column qrtz_basejob_config_record.start_time

IS

'定时任务开始时间';

COMMENT ON column qrtz_basejob_config_record.end_time

IS

'定时任务结束时间';

COMMENT ON column qrtz_basejob_config_record.cost_time

IS

'耗时';

COMMENT ON column qrtz_basejob_config_record.ip

IS

'运行服务器IP';

COMMENT ON column qrtz_basejob_config_record.create_time

IS

'创建时间';

公共任务类:

package com.service.schedulerLockJob.lockJob;

import java.net.InetAddress;

import java.net.UnknownHostException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Calendar;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.springframework.stereotype.Component;

import com.log.ElectronicsLogger;

import com.service.schedulerLockJob.IBasicJobConfigService;

import com.util.SpringContextHolder;

/**

* 创建日期:2017-11-21上午10:28:36

* 修改日期:

* 作者:ttan

* 描述:

*/

@Component

public abstract class BasicJob{

/*@Autowired(required = false)

@Qualifier("basicJobConfigServiceImpl")

IBasicJobConfigService basicJobConfigService;*/

private IBasicJobConfigService basicJobConfigService = (IBasicJobConfigService) SpringContextHolder.getBean("basicJobConfigServiceImpl");

public static String IP_STRING = null;

static{

try {

InetAddress ip = InetAddress.getLocalHost();

IP_STRING = ip.getHostAddress();

} catch (UnknownHostException e) {

e.printStackTrace();

}

}

/**

*

* 创建日期:2017-11-21下午4:41:22

* 修改日期:

* 作者:ttan

* 描述:

* keyName:任务ID

* resetTime:设置的超时时间

*/

@SuppressWarnings({ "rawtypes", "unchecked" })

public void lockJob(String keyName,String resetTime) throws ParseException {

if(keyName != null && keyName != ""){

SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

//获取执行时间

Date now = new Date();

String nowTime = format.format(now);

Map baseJobConfig = new HashMap();

baseJobConfig.put("keyName", keyName);

baseJobConfig.put("createTime", nowTime);

List list = basicJobConfigService.getCount(baseJobConfig);

Calendar cal=Calendar.getInstance();

//任务列表无此任务,即初始化

if(list.size()==0||list==null){

baseJobConfig.put("jobState", "0");

cal.setTime(now);

cal.add(Calendar.DAY_OF_MONTH, -1);

String actualPreTime=format.format(cal.getTime());

//为避免初始化任务是出错,将执行时间设置为前一天,即可立即执行

baseJobConfig.put("actualPreTime", actualPreTime);

basicJobConfigService.insertJobConfig(baseJobConfig);

ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 初始化任务到任务列表**********************");

}

int num = 0;

cal.setTime(now);

cal.add(Calendar.MINUTE, -Integer.parseInt(resetTime));

String checkTime = format.format(cal.getTime());

//超时处理: 将状态修改为0

baseJobConfig.put("checkTime", checkTime);

num = basicJobConfigService.updateStateByTimeOut(baseJobConfig);

if(num > 0){

ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 执行超时,已重置任务状态**********************");

}

num = basicJobConfigService.updateStateAtStartTime(baseJobConfig);

if(num > 0){

Date startDate = new Date();

String startTime = format.format(startDate);

ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 执行开始,时间:"+startTime+"**********************");

try {

execute();

} catch (Exception e) {

ElectronicsLogger.logger.error("**********************[basicJob.doJob]任务:"+keyName+" 任务执行中出现异常**********************");

e.printStackTrace();

}finally{

//任务结束后将状态修改为 0

num = basicJobConfigService.updateStateAtEndTime(baseJobConfig);

if(num <= 0){

ElectronicsLogger.logger.error("**********************[basicJob.doJob]任务:"+keyName+" 修改任务状态失败**********************");

}

Date endDate = new Date();

String endTime = format.format(endDate);

long costTime = (endDate.getTime()-startDate.getTime())/1000;//耗时 单位 s

baseJobConfig.put("startTime", startTime);

baseJobConfig.put("endTime", endTime);

baseJobConfig.put("costTime", costTime);

baseJobConfig.put("ipAdress", IP_STRING);

//保存任务执行记录

try {

basicJobConfigService.insertJobConfigRecord(baseJobConfig);

} catch (Exception e) {

e.printStackTrace();

}

ElectronicsLogger.logger.info("**********************[basicJob.doJob]任务:"+keyName+" 执行结束,时间:"+endTime+"耗时:"+costTime+"s**********************");

}

}

}else{

ElectronicsLogger.logger.error("**********************[basicJob.doJob]任务名为空,任务无法执行!**********************");

}

}

//要执行的任务类

public abstract void execute();

}

任务类继承BasicJob,实现execute()方法,例如:

/**

* 创建日期:2017-9-9下午8:54:35

* 修改日期:

* 作者:ttan

*/

package com.service;

import com.service.schedulerLockJob.lockJob.BasicJob;

/**

* 创建日期:2017-9-9下午8:54:35

* 修改日期:

* 作者:ttan

*/

public class TestService extends BasicJob{

@SuppressWarnings("unused")

public void doJob() throws Exception{

super.lockJob("{任务名的ID(唯一)}", "{允许任务超时的分钟数}");

}

@Override

public void execute() {

System.out.printf("测试!!!!~~~~~~~~~~~~~~~~~");

}

}

注意:实际需要触发执行的是doJob方法,实现execute方法,将定时任务要执行的方法放到里面。

以下将一些具体的service和dao贴一下,稍微看一下就明白了。

Service接口:

package com.service.schedulerLockJob;

import java.util.List;

import java.util.Map;

/**

* 创建日期:2017-11-21下午5:19:41

* 修改日期:

* 作者:ttan

* 描述:

*/

public interface IBasicJobConfigService {

/**

*

* 创建日期:2017-11-21上午11:39:58

* 修改日期:

* 作者:ttan

* 描述:判断初始化任务是否存在

*/

@SuppressWarnings("rawtypes")

public List getCount(Map params);

/**

*

* 创建日期:2017-11-21下午2:58:22

* 修改日期:

* 作者:ttan

* 描述:插入任务列表

*/

@SuppressWarnings("rawtypes")

public int insertJobConfig(Map params);

/**

*

* 创建日期:2017-11-21下午2:59:06

* 修改日期:

* 作者:ttan

* 描述:插入运行记录表

* @throws Exception

*/

@SuppressWarnings("rawtypes")

public int insertJobConfigRecord(Map params) throws Exception;

/**

*

* 创建日期:2017-11-21上午11:40:02

* 修改日期:

* 作者:ttan

* 描述:修改状态在超时

*/

@SuppressWarnings("rawtypes")

public int updateStateByTimeOut(Map params);

/**

*

* 创建日期:2017-11-21上午11:40:05

* 修改日期:

* 作者:ttan

* 描述:修改状态在开始时间

*/

@SuppressWarnings("rawtypes")

public int updateStateAtStartTime(Map params);

/**

*

* 创建日期:2017-11-21上午11:40:08

* 修改日期:

* 作者:ttan

* 描述:修改状态在结束时间

*/

@SuppressWarnings("rawtypes")

public int updateStateAtEndTime(Map params);

}

Service实现类:

package com.service.schedulerLockJob.impl;

import java.util.List;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.stereotype.Service;

import com.dao.SequenceDao;

import com.dao.schedulerLockJob.IBasicJobConfigDao;

import com.service.schedulerLockJob.IBasicJobConfigService;

/**

* 创建日期:2017-11-21下午5:20:05

* 修改日期:

* 作者:ttan

* 描述:

*/

@Service("basicJobConfigServiceImpl")

public class BasicJobConfigServiceImpl implements IBasicJobConfigService{

@Autowired(required = false)

@Qualifier("basicJobConfigDaoImpl")

IBasicJobConfigDao basicJobConfigDao;

@SuppressWarnings("rawtypes")

@Override

public List getCount(Map params) {

return basicJobConfigDao.getCount(params);

}

@SuppressWarnings("rawtypes")

@Override

public int insertJobConfig(Map params) {

return basicJobConfigDao.insertJobConfig(params);

}

@SuppressWarnings({ "rawtypes", "unchecked" })

@Override

public int insertJobConfigRecord(Map params) throws Exception {

SequenceDao seqDao = new SequenceDao();

String id = seqDao.getGenratorId("QRTZ_BASEJOB_CONFIG_RECORD");

if(id==null||"".equals(id)){

throw new Exception("获取数据库序列失败!");

}

params.put("id", id);

return basicJobConfigDao.insertJobConfigRecord(params);

}

@SuppressWarnings("rawtypes")

@Override

public int updateStateByTimeOut(Map params) {

return basicJobConfigDao.updateStateByTimeOut(params);

}

@SuppressWarnings("rawtypes")

@Override

public int updateStateAtStartTime(Map params) {

return basicJobConfigDao.updateStateAtStartTime(params);

}

@SuppressWarnings("rawtypes")

@Override

public int updateStateAtEndTime(Map params) {

return basicJobConfigDao.updateStateAtEndTime(params);

}

}

Dao接口:

package com.dao.schedulerLockJob;

import java.util.List;

import java.util.Map;

/**

* 创建日期:2017-11-21上午11:36:57

* 修改日期:

* 作者:ttan

* 描述:

*/

public interface IBasicJobConfigDao {

/**

*

* 创建日期:2017-11-21上午11:39:58

* 修改日期:

* 作者:ttan

* 描述:判断初始化任务是否存在

*/

@SuppressWarnings("rawtypes")

public List getCount(Map params);

/**

*

* 创建日期:2017-11-21下午2:58:22

* 修改日期:

* 作者:ttan

* 描述:插入任务列表

*/

@SuppressWarnings("rawtypes")

public int insertJobConfig(Map params);

/**

*

* 创建日期:2017-11-21下午2:59:06

* 修改日期:

* 作者:ttan

* 描述:插入运行记录表

*/

@SuppressWarnings("rawtypes")

public int insertJobConfigRecord(Map params);

/**

*

* 创建日期:2017-11-21上午11:40:02

* 修改日期:

* 作者:ttan

* 描述:修改状态在超时

*/

@SuppressWarnings("rawtypes")

public int updateStateByTimeOut(Map params);

/**

*

* 创建日期:2017-11-21上午11:40:05

* 修改日期:

* 作者:ttan

* 描述:修改状态在开始时间

*/

@SuppressWarnings("rawtypes")

public int updateStateAtStartTime(Map params);

/**

*

* 创建日期:2017-11-21上午11:40:08

* 修改日期:

* 作者:ttan

* 描述:修改状态在结束时间

*/

@SuppressWarnings("rawtypes")

public int updateStateAtEndTime(Map params);

}

Dao实现类:

package com.dao.schedulerLockJob.impl;

import java.sql.Timestamp;

import java.util.List;

import java.util.Map;

import org.springframework.stereotype.Repository;

import com.dao.JdbcTemplateSupport;

import com.dao.schedulerLockJob.IBasicJobConfigDao;

import com.util.DateUtil;

import com.util.string.StringUtil;

/**

* 创建日期:2017-11-21下午2:18:10

* 修改日期:

* 作者:ttan

* 描述:

*/

@Repository("basicJobConfigDaoImpl")

public class BasicJobConfigDaoImpl extends JdbcTemplateSupport implements IBasicJobConfigDao {

@SuppressWarnings("rawtypes")

@Override

public List getCount(Map params) {

String sql = "SELECT * FROM QRTZ_BASEJOB_CONFIG WHERE KEY_NAME = ?";

String KEY_NAME = StringUtil.toString(params.get("keyName"));

return this.query(sql,new Object[]{KEY_NAME});

}

@SuppressWarnings("rawtypes")

@Override

public int updateStateByTimeOut(Map params) {

String sql = "UPDATE QRTZ_BASEJOB_CONFIG SET JOB_STATE = '0' " +

"WHERE KEY_NAME = ? AND JOB_STATE = '1' " +

"AND ACTUAL_PRE_TIME <= ?";

String KEY_NAME = StringUtil.toString(params.get("keyName"));

String checkTime = StringUtil.toString(params.get("checkTime"));

Timestamp timeStamp1 = DateUtil.convertStringToTimestamp(checkTime, "1");

return this.update(sql, new Object[]{KEY_NAME,timeStamp1});

}

@SuppressWarnings("rawtypes")

@Override

public int updateStateAtStartTime(Map params) {

String sql ="UPDATE QRTZ_BASEJOB_CONFIG SET JOB_STATE = '1', " +

"ACTUAL_PRE_TIME = current timestamp WHERE KEY_NAME = ? " +

"AND JOB_STATE = '0'";

String KEY_NAME = StringUtil.toString(params.get("keyName"));

return this.update(sql, new Object[]{KEY_NAME});

}

@SuppressWarnings("rawtypes")

@Override

public int updateStateAtEndTime(Map params) {

String sql ="UPDATE QRTZ_BASEJOB_CONFIG SET JOB_STATE = '0' " +

"WHERE KEY_NAME = ? " +

"AND JOB_STATE = '1'";

String KEY_NAME = StringUtil.toString(params.get("keyName"));

return this.update(sql, new Object[]{KEY_NAME});

}

@SuppressWarnings("rawtypes")

@Override

public int insertJobConfig(Map params) {

String sql = "INSERT INTO QRTZ_BASEJOB_CONFIG(KEY_NAME,JOB_STATE,ACTUAL_PRE_TIME,CREATE_TIME) VALUES(?,?,?,?)";

String KEY_NAME = StringUtil.toString(params.get("keyName"));

String JOB_STATE = StringUtil.toString(params.get("jobState"));

String ACTUAL_PRE_TIME = StringUtil.toString(params.get("actualPreTime"));

String CREATE_TIME = StringUtil.toString(params.get("createTime"));

Timestamp timeStamp1 = DateUtil.convertStringToTimestamp(ACTUAL_PRE_TIME, "1");

Timestamp timeStamp2 = DateUtil.convertStringToTimestamp(CREATE_TIME, "1");

return this.update(sql, new Object[]{KEY_NAME,JOB_STATE,timeStamp1,timeStamp2});

}

@SuppressWarnings("rawtypes")

@Override

public int insertJobConfigRecord(Map params) {

String sql = "INSERT INTO QRTZ_BASEJOB_CONFIG_RECORD(ID,KEY_NAME,START_TIME,END_TIME,COST_TIME,IP,CREATE_TIME) " +

"VALUES(?,?,?,?,?,?,?)";

long ID = Long.parseLong(StringUtil.toString(params.get("id")));;

String KEY_NAME = StringUtil.toString(params.get("keyName"));

String START_TIME_STR = StringUtil.toString(params.get("startTime"));

String END_TIME_STR = StringUtil.toString(params.get("endTime"));

long COST_TIME = (Long) params.get("costTime");

String IP = StringUtil.toString(params.get("ipAdress"));

String CREATE_TIME_STR = StringUtil.toString(params.get("createTime"));

Timestamp START_TIME = DateUtil.convertStringToTimestamp(START_TIME_STR, "1");

Timestamp END_TIME = DateUtil.convertStringToTimestamp(END_TIME_STR, "1");

Timestamp CREATE_TIME = DateUtil.convertStringToTimestamp(CREATE_TIME_STR, "1");

return this.update(sql,new Object[]{ID,KEY_NAME,START_TIME,END_TIME,COST_TIME,IP,CREATE_TIME});

}

}

总结:经过一部分测试,暂时没有发现什么问题,缺乏大剂量数据操作的测试。 不过合理设置超时时间基本可以避免数据的出错。

定时任务分布式锁的一种实现(数据库锁)

医学医生手工作与现代计算机接口

相关推荐