用Spring Cloud Stream构建消息驱动的微服务
Spring Cloud Stream 是一个用于构建“基于事件驱动的、与共享消息系统相连接的高度可扩展微服务”的框架,并提供了许多抽象和原语,以简化Spring生态系统消息驱动应用程序的开发。
核心概念:
Spring Cloud Stream的应用程序模型
应用程序通过inputs或者outputs来与Binder交互,其通过配置来绑定,Binder负责与中间件交互。
Binder抽象
提供与外部消息中间件集成的组件。
目前只提供了RabbitMQ和Kafka的Binder实现。
通过使用它所提供的扩展API来实现其他中间件的Binder。
持久的发布-订阅模型支持
消息通信方式遵循发布-订阅模式。
消费者组支持
当一个应用程序不同实例放置在一个具有竞争关系的消费组中,组里面的实例中只有一个能够消费消息。
消费者类型:
Message-driven (消息驱动型,有时简称为异步)
Polled (轮询型,有时简称为同步)
分区支持
分区的作用就是为了确保具有共同特征标识的数据由同一个消费者实例进行处理。
可拔插的Binder API
Spring Cloud Stream 提供了三个绑定消息通道的默认实现:
Sink:通过指定消费消息的目标来标识消息使用者的约定。
Source:与Sink相反,用于标识消息生产者的约定。
Processor:集成了Sink和Source的作用,标识消息生产者和使用者。
也可以自定义消息通道:
public interface OrderOutputChannel { String OUTPUT = "output"; @Output(OrderOutputChannel.OUTPUT) MessageChannel output(); } public interface OrderInputChannel { String INPUT = "input"; @Input(OrderInputChannel.INPUT) SubscribableChannel input(); }
创建消息生产者工程:
pom.xml的关键配置:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.7.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Finchley.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
application.yml文件的配置信息:
server: port: 5502 spring: application: name: service-stream-sender rabbitmq: host: 192.168.134.134 port: 5672 username: guest password: guest cloud: stream: bindings: output: #通道名 destination: order #目的地 content-type: application/json #消息格式 group: default #消费组名
启动类:
@SpringBootApplication @RestController @EnableBinding(OrderOutputChannel.class) //启用与消息通道的绑定 public class Main { /** * 此处使用自定义的消息通道 */ @Autowired private OrderOutputChannel outputChannel; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @GetMapping("/index") public String index(){ //将消息通过channel发送到目的地 User user = new User("cjm", "123"); Message<User> message = MessageBuilder.withPayload(user).build(); outputChannel.output().send(message); //Bean对象会转成json字符串存储到目的地 return "service stream sender"; } }
创建消息消费者工程:
pom.xml关键配置:
参考消息生产者工程。
application.yml文件的配置信息:
server: port: 5501 spring: application: name: service-stream-receiver rabbitmq: host: 192.168.134.134 port: 5672 username: guest password: guest cloud: stream: bindings: input: #通道名 destination: order #目的地 content-type: application/json #消息格式 group: default #消费组名,添加group后队列就是持久化的了
启动类:
@SpringBootApplication @RestController @EnableBinding({OrderInputChannel.class}) //启用与消息通道的绑定 public class Main { private String message = ""; public static void main(String[] args) { SpringApplication.run(Main.class, args); } @GetMapping("/index") public String index(){ return "service stream receiver: " + message; } /** * 监听指定通道,通过该通道接收指定目的地的消息 */ @StreamListener(OrderInputChannel.INPUT) public void receive(String payload) { message = payload; System.out.println("Received1: " + payload); } /** * 将json格式的消息转成User对象 */ @StreamListener(OrderInputChannel.INPUT) public void receive2(User user) { System.out.println(user.getClass().getName()); System.out.println("usernaem=" + user.getUsername() + ", password=" + user.getPassword()); } }