rabbitmq实现高吞吐量的rpc调用
rabbitmq实现rpc调用基本思路:
客户端(client):客户端发起rpc调用,这当成一个消息,发送到rabbitmq服务器。这个消息会携带两个特殊(额外)的信息,一个是调用序号,一个是回调队列名称。调用序号需要服务端原样返回,而回调队列名称是用于服务端将结果放入这个队列中,以便客户端取回结果。
服务端(service):服务端接收到了一个rpc调用后,执行调用代码,将结果返回到指定的回调队列中去。
这里我们可以约定一个数据模型,见代码:
publicclassRpcInvokeModelimplementsSerializable{
//真实传输的数据
privateObjecttarget;
//调用序号
privateLonginvokeFlag;
//回调队列名称
privateStringcallBackQueue;
}
客户端将所有的调用封装成这个模型,然后发送出去。服务端解析成这个模型,当然,你可以再这个模型中加入别的参数(如方法名称等等)。
客户端代码:
packagecom.pdy.rabbitmq.rpc;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
/**
*Representsaconnectionwithaqueue
*
*@authorsyntx
*
*/
publicabstractclassClientEndPoint{
protectedChannelproducerChannel;
protectedChannelconsumerChannel;
protectedConnectionconnection;
protectedStringendPointName;
publicClientEndPoint(StringendpointName)throwsIOException,
TimeoutException{
this.endPointName=endpointName;
//Createaconnectionfactory
ConnectionFactoryfactory=newConnectionFactory();
//hostnameofyourrabbitmqserver
//factory.setVirtualHost("pdy");
factory.setHost("localhost");
//factory.setUsername("pdy");
//factory.setPassword("pdy");
//factory.setVirtualHost("pdy11");
//gettingaconnection
connection=factory.newConnection();
//creatingachannel
producerChannel=connection.createChannel();
consumerChannel=connection.createChannel();
producerChannel.queueDeclare(endpointName,false,false,false,null);
}
/**
*关闭channel和connection。并非必须,因为隐含是自动调用的。
*
*@throwsIOException
*@throwsTimeoutException
*/
publicvoidclose()throwsIOException,TimeoutException{
this.producerChannel.close();
this.consumerChannel.close();
this.connection.close();
}
}
packagecom.pdy.rabbitmq.rpc;
importjava.io.IOException;
importjava.util.Arrays;
importjava.util.HashMap;
importjava.util.Map;
importjava.util.Random;
importjava.util.concurrent.TimeoutException;
importorg.apache.commons.lang3.SerializationUtils;
importcom.rabbitmq.client.AMQP.BasicProperties;
importcom.rabbitmq.client.AMQP.Queue.DeclareOk;
importcom.rabbitmq.client.Consumer;
importcom.rabbitmq.client.Envelope;
importcom.rabbitmq.client.ShutdownSignalException;
publicclassClientextendsClientEndPointimplementsConsumer,Runnable{
publicClient(StringendpointName)throwsIOException,TimeoutException{
super(endpointName);
DeclareOkok=super.consumerChannel.queueDeclare();
callBackQueue=ok.getQueue();
super.consumerChannel.basicConsume(callBackQueue,true,this);
}
@Override
publicvoidhandleConsumeOk(StringconsumerTag){
}
@Override
publicvoidhandleCancelOk(StringconsumerTag){
}
@Override
publicvoidhandleCancel(StringconsumerTag)throwsIOException{
}
@Override
publicvoidhandleDelivery(Stringarg0,Envelopearg1,
BasicPropertiesarg2,byte[]arg3)throwsIOException{
RpcInvokeModelinvokeModel=SerializationUtils.deserialize(arg3);
synchronized(callResult){
callResult.put(invokeModel.getInvokeFlag(),invokeModel.getObj());
callResult.notifyAll();
}
}
@Override
publicvoidhandleShutdownSignal(StringconsumerTag,
ShutdownSignalExceptionsig){
}
@Override
publicvoidhandleRecoverOk(StringconsumerTag){
}
publicvoidstart()throwsIOException{
Randomrandom=newRandom();
for(inti=0;i<10;i++){
Double[]arr=newDouble[10];
for(intj=0;j<arr.length;j++){
arr[j]=random.nextDouble();
}
System.out.println("调用rpc服务:"+Arrays.toString(arr));
RpcFuture<Double[]>result=invokeSort(arr);
Double[]obj=result.get();
System.out.println("调用rpc服务响应的结果:"+Arrays.toString(obj));
}
}
privateRpcFuture<Double[]>invokeSort(Double[]arr)throwsIOException{
RpcInvokeModelmo=newRpcInvokeModel();
mo.setInvokeFlag(++invokeFlag);
mo.setObj(arr);
byte[]body=SerializationUtils.serialize(mo);
BasicPropertiesbasicProperties=newBasicProperties().builder()
.replyTo(callBackQueue).build();
super.producerChannel.basicPublish("",super.endPointName,
basicProperties,body);
returnnewRpcFuture<>(this,mo.getInvokeFlag());
}
privatefinalStringcallBackQueue;
privatefinalMap<Long,Object>callResult=newHashMap<>();
privatevolatilelonginvokeFlag=0;
publicObjectgetResultByFlagKey(LongflagKey){
Objectresult=null;
while(true){
synchronized(callResult){
result=callResult.remove(flagKey);
if(result==null){
try{
callResult.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}else{
returnresult;
}
}
}
}
@Override
publicvoidrun(){
try{
start();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
packagecom.pdy.rabbitmq.rpc;
publicclassRpcFuture<T>{
privatefinalClientclient;
privatefinalLongflagKey;
publicRpcFuture(Clientclient,LongflagKey){
this.client=client;
this.flagKey=flagKey;
}
publicTget(){
return(T)client.getResultByFlagKey(flagKey);
}
}
服务端代码:
packagecom.pdy.rabbitmq.rpc;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
/**
*Representsaconnectionwithaqueue
*
*@authorsyntx
*
*/
publicabstractclassServiceEndPoint{
protectedChannelproducerChannel;
protectedChannelconsumerChannel;
protectedConnectionconnection;
protectedStringendPointName;
publicServiceEndPoint(StringendpointName)throwsIOException,
TimeoutException{
this.endPointName=endpointName;
//Createaconnectionfactory
ConnectionFactoryfactory=newConnectionFactory();
//hostnameofyourrabbitmqserver
//factory.setVirtualHost("pdy");
factory.setHost("localhost");
//factory.setUsername("pdy");
//factory.setPassword("pdy");
//factory.setVirtualHost("pdy11");
//gettingaconnection
connection=factory.newConnection();
//creatingachannel
producerChannel=connection.createChannel();
consumerChannel=connection.createChannel();
consumerChannel.queueDeclare(endpointName,false,false,false,null);
}
/**
*关闭channel和connection。并非必须,因为隐含是自动调用的。
*
*@throwsIOException
*@throwsTimeoutException
*/
publicvoidclose()throwsIOException,TimeoutException{
this.producerChannel.close();
this.consumerChannel.close();
this.connection.close();
}
}
packagecom.pdy.rabbitmq.rpc;
importjava.io.IOException;
importjava.util.Arrays;
importjava.util.concurrent.TimeoutException;
importorg.apache.commons.lang3.SerializationUtils;
importcom.rabbitmq.client.AMQP.BasicProperties;
importcom.rabbitmq.client.Consumer;
importcom.rabbitmq.client.Envelope;
importcom.rabbitmq.client.ShutdownSignalException;
publicclassServiceextendsServiceEndPointimplementsConsumer{
publicService(StringendpointName)throwsIOException,TimeoutException{
super(endpointName);
}
publicvoidstart()throwsIOException{
super.consumerChannel.basicConsume(super.endPointName,true,this);
}
@Override
publicvoidhandleConsumeOk(StringconsumerTag){
}
@Override
publicvoidhandleCancelOk(StringconsumerTag){
}
@Override
publicvoidhandleCancel(StringconsumerTag)throwsIOException{
}
@Override
publicvoidhandleDelivery(Stringarg0,Envelopearg1,
BasicPropertiesarg2,byte[]arg3)throwsIOException{
RpcInvokeModelinvokeModel=SerializationUtils.deserialize(arg3);
Double[]obj=(Double[])invokeModel.getObj();
Arrays.sort(obj);
invokeModel.setObj(obj);
byte[]body=SerializationUtils.serialize(invokeModel);
StringroutingKey=arg2.getReplyTo();
super.producerChannel.basicPublish("",routingKey,null,body);
}
@Override
publicvoidhandleShutdownSignal(StringconsumerTag,
ShutdownSignalExceptionsig){
}
@Override
publicvoidhandleRecoverOk(StringconsumerTag){
}
}
测试代码:
packagetest;
importjava.io.IOException;
importjava.util.concurrent.TimeoutException;
importcom.pdy.rabbitmq.rpc.Client;
importcom.pdy.rabbitmq.rpc.Service;
publicclassRpcTest{
publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{
Stringqueue="pdyRpc";
client(queue);
service(queue);
}
privatestaticvoidservice(Stringqueue)throwsIOException,
TimeoutException{
Serviceservice=newService(queue);
service.start();
}
privatestaticvoidclient(Stringqueue)throwsIOException,
TimeoutException{
Clientclient=newClient(queue);
newThread(client).start();
}
}
代码还不满足线程安全,但是改成线程安全是很容易的事情。