springcloud(一)——spring-cloud-alibaba集成rocketmq
前言
在之前的工作中,微服务框架使用的是springcloud,消息中间件使用的rocketmq,这段时间看到阿里出了spring cloud alibaba集成了rocketmq,出于好奇,写了个demo
一些概念
- 官方对 Spring Cloud Stream 的一段介绍:Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。基于 SpringBoot 创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
- Binder :Components responsible to provide integration with the external messaging systems.【与外部消息中间件进行集成】
- Binding:Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).【在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,开发者只需使用应用程序的 生产者或消费者生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。】
- Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).【生产者和消费者用于与目标绑定器通信的规范数据结构。】
快速在本地启动rocketmq
第一步:下载:https://www.apache.org/dyn/cl...
第二步:解压
第三步:修改三个配置文件:runbroker.sh,runserver.sh,tools.sh,将其中JAVA_HOME改成自己电脑的环境配置,修改完如下
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=自己的地址 #[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
第四步:依次执行命令
./mqnamesrv ./mqbroker -n localhost:9876 ./mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
如果启动成功,没有报错,代表启动成功哈,下面就可以开发了
开发demo
第一步:导入相关的pom
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>0.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency>
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 为了Endpoint 信息查看 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.dropwizard.metrics</groupId> <artifactId>metrics-core</artifactId> <version>3.2.6</version> </dependency>
第二步:建一个springboot项目,启动类如下:
@SpringBootApplication @EnableBinding({ Source.class, Sink.class }) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
第三步:创建provider
@Service public class RocketmqProducer { public void send(String message) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("test_producer_group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("test-topic", "test-tag", message.getBytes()); producer.send(msg); } }
第四步:创建consumer
@Service public class ReceiveService { /** * 默认是input,在Sink类中指定,如果想要多个input,需要写一个实现Sink的类 * @param receiveMsg */ @StreamListener("input") public void receiveInput1(String receiveMsg) { System.out.println("input receive: " + receiveMsg); } }
第五步:加入配置文件:
server.port=8087 spring.application.name=spring-cloud-alibaba-rocketmq-demo # 配置rocketmq的nameserver地址 spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876 # 定义name为output的binding spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output.content-type=application/json #定义name为input的binding spring.cloud.stream.bindings.input.destination=test-topic spring.cloud.stream.bindings.input.content-type=application/json spring.cloud.stream.bindings.input.group=test-group management.endpoint.health.show-details=always
第六步:写一个controller,启动项目,访问接口
@RestController @RequestMapping(value = "/api/demo/test") public class TestController { @Autowired RocketmqProducer rocketmqProducer; @RequestMapping(value = "/send", method = RequestMethod.GET) public String send() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { rocketmqProducer.send("test rocketmq message"); return "success"; } }
会看到控制台输出:input receive: test rocketmq message
Endpoint 信息查看
浏览器输入:http://127.0.0.1:8087/actuator/rocketmq-binder
结语
这一篇文章只是将spring cloud stream 和 rocketmq跑通了,其实对于spring cloud stream和rocketmq还是学习的阶段,只能感叹spring cloud博大精深
更多网站可以访问https://www.zplxjj.com或关注公众号:
相关推荐
LCFlxfldy 2020-08-17
IT农场 2020-11-13
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30