微服务之路(十一)spring cloud stream
前言
场景描述
当客户端向服务端请求,服务端返回出现了异常,对于客户端1返回为NULL,而对于客户端2返回的是正常数据。而服务端并不知道返回给客户端们的数据对不对,只能通过用户反馈来证实返回的错误性,显然是不正确的。
Stream简介
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。
主要议题
- Kafka
- Spring Kafka
- Spring Boot Kafka
- Spring Cloud Stream
- Spring Cloud Stream Kafka Binder
- 问题总结
主体内容
一、Kafka
主要用途
- 消息中间件
- 流式计算处理
- 日志
执行脚本目录bin
E:\JavaEE\kafka-2.5.0-src\kafka-2.5.0-src\bin\windows
同类产品比较
- ActiveMQ:IMS(Java Message Service)规范实现
- RabbitMQ:AMQP(Advanved Message Queue Protocol)规范实现
- Kafka:并非某种规范的实现,它灵活和性能相对是优势的
快速上手步骤
1.下载并解压kafka压缩包。
2.下载并解压zookeeper压缩包,这里官方它的quickstart就是以zookeeper保证强一致性。zookeeper官方地址:https://zookeeper.apache.org/
3.以windows为例,我们先到zookeeper的conf目录下,把zoo_sample.cfg文件复制一份重命名为zoo.cfg。现在目录如下所示:
然后打开cmd,进入bin目录,启动服务。
zkServer.cmd
4.启动kafka。进入到kafka的window文件夹,执行启动命令。
kafka-server-start.bat ../../config/server.properties
5.创建kafka主题。再次打开一个cmd窗口,进入到windows文件夹
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic gupao
6.生产者发送消息/生产消息
kafka-console-producer.bat --broker-list localhost:9092 --topic gupao
然后输入要发送的消息:
7.消费者接收消息/消费消息
重新打开一个cmd,输入接收命令。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
当我在生产者端输入消息,消费者端马上就接收到了消息。
如果消费命令后面加上--from beginning参数,那么他会接收到从开始就生产的消息。
那么被消费后的消息能否被其他消费者消费?我们再开一个cmd,利用新的消费者消费。答案是可以的。
使用Kafka标准API
1.从start.spring.io构建项目。
2.新建包raw.api,创建类KafkaProducerDemo,这里就是让生产者通过java api形式进行发送消息。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.Future; /** * @ClassName * @Describe Kafka Producer Demo使用Kafka原始API * @Author 66477 * @Date 2020/6/1417:15 * @Version 1.0 */ public class KafkaProducerDemo { public static void main(String[] args) throws Exception { //初始化配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers","localhost:9092"); properties.setProperty("key.serializer", StringSerializer.class.getName()); properties.setProperty("value.serializer",StringSerializer.class.getName());//注意引包 //创建Kafka Producer KafkaProducer<String,String> kafkaProducer = new KafkaProducer(properties); //创建 Kafka消息 String topic = "gupao"; Integer partition=0; Long timestamp= System.currentTimeMillis(); String key="message-key"; String value = "gupao.com"; ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,partition,timestamp,key,value); //发送Kafka消息 Future<RecordMetadata> metadataFuture = kafkaProducer.send(record); //强制执行 metadataFuture.get(); } }
3.运行以上代码,然后你会发现,cmd窗口的消费者会接收消息。
二、Spring Kafka
那么接下来我们使用Spring整合的kafka。
官方文档
设计模式
Spring社区对data数据操作,有一个基本的模式,Template模式:
- JDBC:jdbcTemplate
- Redis:RedisTemplate
- Kafka:KafkaTemplate
- JMS:JmsTemplate
- Rest:RestTemplate
XXXTemplate一定实现XXXOpeations
Maven依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
三、Spring Boot Kafka
Maven依赖
自动装配器
KafkaAutoConfiguration
其中KafkaTemplate会被自动装配:
@Bean @ConditionalOnMissingBean({KafkaTemplate.class}) public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; }
关闭Spring Security
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> </dependency>
import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.web.builders.WebSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter; @Configuration @EnableWebSecurity public class SecurityConfig extends WebSecurityConfigurerAdapter { @Override public void configure(WebSecurity web) throws Exception { web.ignoring().antMatchers("/**"); } }
创建生产者
1.我们继续在上面的项目动刀子,我们先在application.properties文件转移之前demo类中配置。
#定义应用名称 spring.application.name=spring-cloud-stream-kafka #配置端口 server.port=8080 #Spring Kafka配置信息 spring.kafka.bootstrap-servers=localhost:9092 #配置需要的kafka主题 kafka.topic = gupao #生产者配置 spring.kafka.producer.bootstrap-servers=localhost:9092 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2.然后编写一个controller类。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @ClassName * @Describe Kafka生产者Controller * @Author 66477 * @Date 2020/6/1418:07 * @Version 1.0 */ @RestController public class KafkaProducerController { private final KafkaTemplate<String,String> kafkaTemplate; private final String topic; @Autowired public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${kafka.topic}") String topic) { this.kafkaTemplate = kafkaTemplate; this.topic = topic; } @PostMapping("/message/send") public Boolean sendMessage(@RequestParam String message){ kafkaTemplate.send(topic,message); return true; } }
3.通过postman访问http://localhost:8080/message/send。
4.打开cmd消费端窗口,发现消息成功接收。
创建消费者
5.同样地,我们开始配置消费者,先去application.properties文件增加消费者配置。
#消费者配置 spring.kafka.consumer.group-id=gupao-1 spring.kafka.consumer.key-Derializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-Derializer=org.apache.kafka.common.serialization.StringDeserializer
6.因为消费者它是以监听的形式监听消息的,所以我们创建一个KafkaConsumerListener监听类。通过@KafkaListener来监听改主题的消息。
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @ClassName * @Describe Kafka消费者监听器 * @Author 66477 * @Date 2020/6/1418:25 * @Version 1.0 */ @Component public class KafkaConsumerListener { @KafkaListener(topics ="${kafka.topic}" ) public void onMessage(String message){ System.out.println("Kafka消费者监听器接收到消息:"+message); } }
7.随后postman访问http://localhost:8080/message/send。控制台则会打印出:
2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0 2020-06-14 18:32:30.970 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84 2020-06-14 18:32:30.971 INFO 451856 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592130750970 2020-06-14 18:32:30.976 INFO 451856 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: i1-NXUmvQRyaT-E27LPozQ Kafka消费者监听器接收到消息:hello world
四、Spring Cloud Stream
加上本章中的stream,上一篇中的架构图又丰富了些东西。
其中
- RabbitMQ:AMQP、jms规范。
- kafka:相对松散的消息队列协议
基本概念
Source:来源,近义词:Producer,Publisher
Sink:接收器,近义词:Consumer,Subcriber
Processor:对于上流而言是Sink,对于下流而言是Source
Reactive Streams
- Publisher
- Subscriber
- Processor
代码示例
1.我们拷贝上面的spring cloud stream kafka项目,导入IDEA。
2.启动zookeeper,参考以上。
3.启动kafka,参考以上。
4.我们需要引入spring cloud stream依赖。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency>
5.创建一个stream包,包下再创建producer包,创建一个类MessageProducerBean
消息大致分为两个部分,消息头和消息体。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * @ClassName * @Describe TODO * @Author 66477 * @Date 2020/6/1421:58 * @Version 1.0 */ @Component @EnableBinding(Source.class) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) private MessageChannel messageChannel; @Autowired private Source source; /** * 发送消息 * @param message 消息内容 */ public void send(String message){ //通过消息管道发送消息 source.output().send(MessageBuilder.withPayload(message).build()); } }
改写一下我们之前写的controller,增加另一种方式的接口。
import com.example.stream.producer.MessageProducerBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @ClassName * @Describe Kafka生产者Controller * @Author 66477 * @Date 2020/6/1418:07 * @Version 1.0 */ @RestController public class KafkaProducerController { private final KafkaTemplate<String,String> kafkaTemplate; private final String topic; private final MessageProducerBean messageProducerBean; @Autowired public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${kafka.topic}") String topic, MessageProducerBean messageProducerBean) { this.kafkaTemplate = kafkaTemplate; this.topic = topic; this.messageProducerBean = messageProducerBean; } /** * 通过KafkaTemplate发送{@link KafkaTemplate} * @param message * @return */ @PostMapping("/message/send") public Boolean sendMessage(@RequestParam String message){ kafkaTemplate.send(topic,message); return true; } /** * 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean} * @param message * @return */ @GetMapping("/message/send") public Boolean send(@RequestParam String message){ messageProducerBean.send(message); return true; } }
6.我们需要给引入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
之前没有加上spring cloud 版本,现在要加上:
<properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version> </properties>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
7.这时我们再启动cmd中的consumer消费者。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic gupao
8.注释掉之前配置的生产者序列化。
#生产者配置 #spring.kafka.producer.bootstrap-servers=localhost:9092 #spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer #spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
9.Postman分别以GET,POST方式访问http://localhost:8080/send/message,发现消费者正常收到消息。
拓展:如果想要多主题怎么办,那么我能不能仿造Source接口,搭建一个属于自己的管道呢?我们也在stream包下创建一个message包,message包下创建一个接口(仿Source)MyMessageSource.
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @ClassName * @Describe TODO * @Author 66477 * @Date 2020/6/1423:27 * @Version 1.0 */ public interface MyMessagesSource { /** * 消息来源的管道名称 */ String NAME="gupao"; @Output(NAME) MessageChannel gupao(); }
然后我们仿造之前写的MessageProducerBean,再整一套自己的,也就是自定义消息发送源。
import com.example.stream.message.MyMessagesSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * @ClassName * @Describe TODO * @Author 66477 * @Date 2020/6/1421:58 * @Version 1.0 */ @Component @EnableBinding({Source.class, MyMessagesSource.class}) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) private MessageChannel messageChannel; @Autowired private Source source; @Autowired @Qualifier(MyMessagesSource.NAME)//Bean名称 private MessageChannel gupaoMessageChannel; @Autowired private MyMessagesSource myMessagesSource; /** * 发送消息 * @param message 消息内容 */ public void send(String message){ //通过消息管道发送消息 source.output().send(MessageBuilder.withPayload(message).build()); } /** * 发送消息 * @param message 消息内容 */ public void sendToGupao(String message){ //通过消息管道发送消息 myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build()); } }
在application.properties文件增加一行属于自己的主题配置
spring.cloud.stream.bindings.gupao.destination=mygupao
这时去消费者监听类里面增加监听主题。
@KafkaListener(topics ="mygupao" ) public void onGupaoMessage(String message){ System.out.println("Kafka消费者监听器接收到主题mygupao消息:"+message); }
我们去cmd黑窗口,把刚刚主题为gupao的停掉,改成mygupao主题监听。
E:\JavaEE\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mygupao
最后去controller类增加一个接口,用于发送消息至我们新创建的管道。
/** * 通过消息生产者Bean发送{@link com.example.stream.producer.MessageProducerBean} * @param message * @return */ @GetMapping("/message/sendToGupao") public Boolean sendToGupao(@RequestParam String message){ messageProducerBean.sendToGupao(message); return true; }
由于我之前杀死过8080端口,导致zookeeper进程被杀了(它也是8080端口),所以我将stream项目改成8081,重新启动了zookeeper,postman测试一下我们http://localhost:8081/message/sendToGupao。控制台信息如下:
2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0 2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84 2020-06-15 00:00:24.170 INFO 171780 --- [nio-8081-exec-3] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1592150424170 2020-06-15 00:00:24.174 INFO 171780 --- [ad | producer-3] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Cluster ID: i1-NXUmvQRyaT-E27LPozQ Kafka消费者监听器接收到主题mygupao消息:mygupaoaaa
cmd窗口如下:
同样地,我们也可以创建一个消息消费Bean用于接收消息。
在stream包下继续创建一个consumer包,包下创建名为MessageConsumerBean的Bean。用来实现标准Sink监听,
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @ClassName * @Describe 消息消费Bean * @Author 66477 * @Date 2020/6/1520:25 * @Version 1.0 */ @Component @EnableBinding({Sink.class}) public class MessageConsumerBean { @Autowired @Qualifier(Sink.INPUT)//Bean名称 private SubscribableChannel subscribableChannel; @Autowired private Sink sink; //那么订阅消息有多种方式 //方式一:通过SubscribableChannel订阅消息 //当字段注入完成后的回调 @PostConstruct public void init(){ subscribableChannel.subscribe(new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.getPayload()); } }); } //方式二:通过@ServiceActivator方式订阅消息 @ServiceActivator(inputChannel = Sink.INPUT) public void onMessage(Object message){ System.out.println("onMessage :"+message); } //方式三:通过@StreamListener实现 @StreamListener(Sink.INPUT) public void onMessage(String message){ System.out.println("StreamListener:"+message); } }
application.properties下也要增加对应的input主题项了。
spring.cloud.stream.bindings.input.destination=${kafka.topic}
五、Spring Cloud Stream Kafka Binder(RabbitMQ)
我们复制一下上面的项目,准备为stream rabbitmq做准备。重命名为spring-cloud-stream-rabbitmq重新导入IDEA,里面pom文件的artifactId也要修改。清除掉有关kafka的代码,application.properties清除关于kafka的生产者,消费者配置。
现在项目结构如下:
修改依赖为
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
其中MessageConsumerBean
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @ClassName * @Describe 消息消费Bean * @Author 66477 * @Date 2020/6/1520:25 * @Version 1.0 */ @Component @EnableBinding({Sink.class}) public class MessageConsumerBean { @Autowired @Qualifier(Sink.INPUT)//Bean名称 private SubscribableChannel subscribableChannel; @Autowired private Sink sink; //方式一:通过Subcribe订阅消息 //当字段注入完成后的回调 @PostConstruct public void init(){ //实现异步回调 subscribableChannel.subscribe(new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println("subscribe:"+message.getPayload()); } }); } //方式二:通过@ServiceActivator @ServiceActivator(inputChannel = Sink.INPUT) public void onMessage(Object message){ System.out.println("onMessage :"+message); } //方式三:通过@StreamListener实现 @StreamListener(Sink.INPUT) public void onMessage(String message){ System.out.println("StreamListener:"+message); } }
MyMessagesSource
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * @ClassName * @Describe TODO * @Author 66477 * @Date 2020/6/1423:27 * @Version 1.0 */ public interface MyMessagesSource { /** * 消息来源的管道名称 */ String NAME="gupao"; @Output(NAME) MessageChannel gupao(); }
MessageProducerBean
import com.example.rabbitmq.stream.message.MyMessagesSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * @ClassName * @Describe TODO * @Author 66477 * @Date 2020/6/1421:58 * @Version 1.0 */ @Component @EnableBinding({Source.class, MyMessagesSource.class}) public class MessageProducerBean { @Autowired @Qualifier(Source.OUTPUT) private MessageChannel messageChannel; @Autowired private Source source; @Autowired @Qualifier(MyMessagesSource.NAME)//Bean名称 private MessageChannel gupaoMessageChannel; @Autowired private MyMessagesSource myMessagesSource; /** * 发送消息 * @param message 消息内容 */ public void send(String message){ //通过消息管道发送消息 source.output().send(MessageBuilder.withPayload(message).build()); } /** * 发送消息 * @param message 消息内容 */ public void sendToGupao(String message){ //通过消息管道发送消息 myMessagesSource.gupao().send(MessageBuilder.withPayload(message).build()); } }
MessageProducerController
import com.example.rabbitmq.stream.producer.MessageProducerBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @ClassName * @Describe Rabbitmq生产者Controller * @Author 66477 * @Date 2020/6/1418:07 * @Version 1.0 */ @RestController class MessageProducerController { private final MessageProducerBean messageProducerBean; private final String topic; MessageProducerController(MessageProducerBean messageProducerBean, @Value("${kafka.topic}") String topic) { this.messageProducerBean = messageProducerBean; this.topic = topic; } /** * 通过消息生产者Bean发送{@link MessageProducerBean} * @param message * @return */ @GetMapping("/messageProducer/send") public Boolean send(@RequestParam String message){ messageProducerBean.send(message); return true; } /** * 通过消息生产者Bean发送{@link MessageProducerBean} * @param message * @return */ @GetMapping("/message/sendToGupao") public Boolean sendToGupao(@RequestParam String message){ messageProducerBean.sendToGupao(message); return true; } }
application.properties
#定义应用名称 spring.application.name=spring-cloud-stream-rabbitmq #配置端口 server.port=8081 #Spring Kafka配置信息 spring.kafka.bootstrap-servers=localhost:9092 #配置需要的kafka主题 kafka.topic = gupao #定义Spring Cloud Stream Source消息去向 #针对kafka而言,基本模式如下 #spring.cloud.stream.bindings.${channel-name}.destination=${kafka.topic} spring.cloud.stream.bindings.output.destination=${kafka.topic} spring.cloud.stream.bindings.gupao.destination=mygupao spring.cloud.stream.bindings.input.destination=${kafka.topic}
六、问题总结
1.当时用Future时,异步调用都可以使用get()方式强制执行吗?
解答:是的,get等待当前线程执行完毕,并且获取返回接口。
和kafka consumer有啥区别?
解答:没有实质区别,主要是编程模式。
@KafkaListener采用注解驱动
kafka consumer API 采用接口编程。
3.消费者接收消息的地方在哪?
解答:订阅并且处理后就消失了。
4.生产环境配置多个生产者和消费者只需要定义不同的group就可以了吗?
解答:group是一种,要看是不是相同topic。
5.为了不丢失数据,消息队列的容错,和排错后的处理,如何实现的?
解答这个依赖于zookeeper。
6.异步接收除了打印还有什么办法处理消息吗?
解答:可以处理其他逻辑,比如存储数据库。
7.kafka适合什么场景下使用?
解答:高性能的Stream处理。
8.Kafka消息一直都在,内存占用会很多吧,消息量不停产生消息咋办?
解答:kafka还是会删除的,并不是一直存在。
9.怎么没看到broker配置?
解答:broker不需要设置,它是单独启动。
10.consumer为什么要分组?
解答:consumer需要定义不同逻辑分组,相同主题里面不同分组,便于管理。
有什么用?
解答:@EnableBinding将Source、Sink以及Processor提升成相应的代理.
Source source 这种写法是默认用官方的实现?
解答:是官方的实现。
13.这么多消息框架,各自有点是什么,怎么选取?
解答:RabbitMQ:AMQP,JMS规范
kafka:相对松散的消息队列协议
ActiveMQ:AMQP,JMS规范
14.如果中间件有问题怎么办,我们只管用,不用维护吗?现在遇到的很多问题不是使用,而是俄日胡,中间件一有问题,消息堵塞或者丢失,只有重启?
解答:消息中间件无法保证不丢消息,多数高一致性的消息背会还是有持久化的。
,@EnableZuulProxy,@EnableDiscoverClient这些注解都是通过特定BeanPostProcessor实现的吗?
解答:不完全对,主要处理接口在@Import:
- ImportSelector实现类
- ImportBeanDefinitionsRegistrar实现类
- @Configuration标注类
- BeanPostProcessor实现类
16.我对流式处理还是懵懵的,到底啥事流式处理,怎样才能称为流式处理,一般应用在什么场景?
解答:Stream处理简单的说,异步处理,消息是一种处理方式。
提交申请,机器生产,对于高密度提交任务,多数场景采用异步处理,Stream,Evnet-Driven。举例说明:审核流程,鉴别黄图。
17.如果是大量消息,怎么快速消费,用多线程吗?
解答:确实是使用多线程,不过不一定奏效,依赖于处理的具体内容,比如:一个线程使用了25%的CPU,四个线程就将cpu耗尽,因此,并发100个处理,实际上还是4个线程在处理。I/O密集型,CPU密集型。大多数是多线程,其实也单线程,流式非阻塞。
18.购物车的价格计算可以使用流式计算来处理吗?能说下思路吗?有没有什么高性能的方式推荐?
解答:当商品添加到购物车的时候,就可以开始计算了。