springboot kafka发送消息支持成功失败通知
springboot集成kafka是比较简单的是事情,但是kafka发送消息的失败回调在日常工作中,如果不容忍消息丢失的话,发送失败需要再次发送或者放到数据库中用任务重推。
以下是演示用的发送类代码
@Slf4j
@Component
public class TestRunner implements ApplicationRunner {
@Autowired
KafkaTemplate kafkaTemplate;
@Override
public void run(ApplicationArguments args) throws Exception {
KafkaMsgEntity kafkaMsgEntity = new KafkaMsgEntity();
kafkaMsgEntity.setActionName("login");
String tmpStr = "id:%d,msg:login";
for (int i = 1; i < 500; i++) {
String tmpStr1 = tmpStr.replace("%d", String.valueOf(i));
Thread.sleep(500);
kafkaMsgEntity.setMsgBody(tmpStr1);
kafkaTemplate.send("test", JSON.toJSONString(kafkaMsgEntity)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof KafkaProducerException) {
String value = (String) ((KafkaProducerException) throwable).getProducerRecord().value();
log.info("{} get throwable msg:{}", value, throwable.getMessage());
} else {
log.info("get throwable msg:{}", throwable.getMessage());
}
}
@Override
public void onSuccess(SendResult<String, String> o) {
log.info("{}, success", o.getProducerRecord().value());
}
});
}
}
}在kafka运行过程中kill进程达到异常发送的条件。

相关推荐
sweetgirl0 2020-06-28
Lzs 2020-10-23
聚合室 2020-11-16
零 2020-09-18
Justhavefun 2020-10-22
ChaITSimpleLove 2020-10-06
周游列国之仕子 2020-09-15
afanti 2020-09-16
88234852 2020-09-15
YClimb 2020-09-15
风雨断肠人 2020-09-04
卖口粥湛蓝的天空 2020-09-15
stulen 2020-09-15
pythonxuexi 2020-09-06
abfdada 2020-08-26