RocketMQ 源码之 异步和同步请求是怎么做到的
不管是DefaultMQProducer还是DefaultMQPushConsumer,本质都是封装类,发起请求的实际上是RemotingClient,
它的start方法调用之后,启动了一个netty的客户端bootstrap,每次需要与nameService或者broker进行连接的时候,调用
getAndCreateChannel方法,从一个map中创建或者获取channel(创建的时候nameService和broker两者的区别在于addr参数是不是为null),
连接建立之后,发起请求调用的是invokeSync和invokeAsync,点进去看:
同步invokeSync的实现是新建一个responseFuture,放到responseTable中(key是自增的requestId),然后调用channel.writeAndFlush(request),
发起请求,最后调用responseFuture.waitResponse,等待响应。让线程等待用的是countDownLatch,那么latch之后怎样放行呢?
数据的发出是writeAndFlush,进来就应该是在channel的read方法中,去查看bootstrap的构造过程,发现添加的handler中有
NettyClientHandler,点进去一看重写了channelRead0方法(在其父类的channelRead方法被调用),里面有processMessageReceived,
点进去发现根据收到的RemotingCommand的type,是对方的主动请求还是对自己之前请求的应答,在我们这里,应该是后者,所以
进入processResponseCommand,一看,果然是从上面说的responseTable再把responseFuture取出来,如果有回调方法,这里是同步请求
,没有(待会说的异步请求才有),那么进入responseFuture.putResponse(cmd),点进去看,看见了countDownLatch.countDown(),至此
同步方法逻辑通了。
一些思考:其实不管是同步还是异步, 请求过去再回来,