实现消息推送核心搭建(升级版)
前面已经实现了第一版实时消息推送核心框架搭建,下面就在第一版基础上加以升级完善。
使用技术框架:分布式调度 http://git.oschina.net/hugui/light-task-scheduler 请自行玩玩 这里简单介绍下。
light-task-scheduler 调度框架基本原理和第一版实现一样不过这里更加完善的是该框架实现了消息重发,当消息失败消息会被重新保存起来定期重发这里可以具体了解下框架自身原理,该框架本身已是生产者-消费者模式 在升级版中主要使用到了 jobclient:产生消息 jobtracker:处理连接client和task tasktracer:负责处理任务 queue:消息队列支持数据库队列 。在使用该框架的是时候需要打包jar在项目中引入。具体框架整合可以了解下框架本身。下面主要贴上消息生产 处理 以及自身使用redis作为消息存取配合任务调度使用
消息生产:
@Override
public void sendMessage(Message msMessage) {
//写入redis缓存系统队列
if(redisUtils.redisCheckStatus()){
String id=UUID.randomUUID().toString();
ArrayList<Message> messages=new ArrayList<Message>();
msMessage.setRedisstatus(1);//进入redis队列
msMessage.setId(id);
//消息加入redis
redisUtils.addMessageQueue(id, msMessage);
Job job = new Job();
job.setTaskId(id);
job.setParam("messageid",id);
job.setTaskTrackerNodeGroup("message_trade_TaskTracker"); //
job.setNeedFeedback(true);
job.setReplaceOnExist(true); // 当任务队列中存在这个任务的时候,是否替换更新
job.setCronExpression(null);//立即执行
Response response = jobClient.submitJob(job);
logger.info("执行结果:"+response.getMsg());
}else{
msMessage.setRedisstatus(0);//未进入redis队列
}
//写入数据库 防止消息丢失
try {
Session session=getHibernateTemplate().getSessionFactory().openSession();
Transaction transaction=session.beginTransaction();
session.save(msMessage);
transaction.commit();
session.close();
} catch (HibernateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
消息处理:
@Override
public Result run(Job job) throws Throwable {
Result result=null;
try {
// TODO 业务逻辑
logger.error("我要执行:" + job);
String messageid=job.getParam("messageid");
boolean backresult=true;
if(!StringUtils.isEmpty(messageid)){
Message message=redisUtils.getMessageQueue(messageid);
if(message!=null){
backresult=sendMsg(message);//发送消息
if(backresult){
result=new Result(Action.EXECUTE_SUCCESS, "推送成功");
}else{
result=new Result(Action.EXECUTE_LATER, "推送失败,稍后重新推送");
}
}
}
} catch (Exception e) {
logger.info("Run job failed!", e);
result=new Result(Action.EXECUTE_LATER, e.getMessage());
}
return result;
}
注意:
<1.消息发送失败需要返回 new Result(Action.EXECUTE_LATER, "推送失败,稍后重新推送"); 通知任务调度该任务需要重发
<2.redis消息取出使用pop该消息就会重redis中删除 注意。。。
升级主要解决自行定时任务,以及消息发送失败重发机制实现。下面把相关代码贴上 没有jar包
dubbo-demo-provider:消息处理系统 dubbo-demo-consumer:测试端