rabbitmq实现高吞吐量的rpc调用

rabbitmq实现rpc调用基本思路:

客户端(client):客户端发起rpc调用,该调用会被当成一条消息发送到rabbitmq服务器。这条消息会携带两个特殊(额外)的信息:一个是调用序号,一个是回调队列名称。调用序号需要服务端原样返回;回调队列名称则用于告诉服务端把结果放入哪个队列,以便客户端取回结果。

服务端(service):服务端接收到了一个rpc调用后,执行调用代码,将结果返回到指定的回调队列中去。

这里我们可以约定一个数据模型,见代码:

/**
 * Message envelope exchanged between the RPC client and the RPC service.
 *
 * <p>The client wraps every call in one instance and serializes it; the service
 * deserializes it, replaces the payload with the result and sends it back.
 */
public class RpcInvokeModel implements Serializable {

    private static final long serialVersionUID = 1L;

    // The data actually transferred: the call argument on the way out, the result on the way back.
    private Object target;

    // Call sequence number; the service returns it unchanged so the client can match the reply.
    private Long invokeFlag;

    // Name of the callback queue the result should be delivered to.
    private String callBackQueue;

    /**
     * @return the transferred payload, possibly {@code null}
     */
    public Object getObj() {
        return target;
    }

    /**
     * @param obj the payload to transfer
     */
    public void setObj(Object obj) {
        this.target = obj;
    }

    /**
     * @return the call sequence number, or {@code null} if none has been set
     */
    public Long getInvokeFlag() {
        return invokeFlag;
    }

    /**
     * @param invokeFlag the call sequence number identifying this invocation
     */
    public void setInvokeFlag(Long invokeFlag) {
        this.invokeFlag = invokeFlag;
    }

    /**
     * @return the callback queue name, or {@code null} if none has been set
     */
    public String getCallBackQueue() {
        return callBackQueue;
    }

    /**
     * @param callBackQueue the name of the queue the result should be published to
     */
    public void setCallBackQueue(String callBackQueue) {
        this.callBackQueue = callBackQueue;
    }

}

客户端将所有的调用封装成这个模型,然后发送出去;服务端再把收到的消息解析成这个模型。当然,你可以在这个模型中加入别的参数(如方法名称等等)。

客户端代码:

packagecom.pdy.rabbitmq.rpc;

importjava.io.IOException;

importjava.util.concurrent.TimeoutException;

importcom.rabbitmq.client.Channel;

importcom.rabbitmq.client.Connection;

importcom.rabbitmq.client.ConnectionFactory;

/**

*Representsaconnectionwithaqueue

*

*@authorsyntx

*

*/

publicabstractclassClientEndPoint{

protectedChannelproducerChannel;

protectedChannelconsumerChannel;

protectedConnectionconnection;

protectedStringendPointName;

publicClientEndPoint(StringendpointName)throwsIOException,

TimeoutException{

this.endPointName=endpointName;

//Createaconnectionfactory

ConnectionFactoryfactory=newConnectionFactory();

//hostnameofyourrabbitmqserver

//factory.setVirtualHost("pdy");

factory.setHost("localhost");

//factory.setUsername("pdy");

//factory.setPassword("pdy");

//factory.setVirtualHost("pdy11");

//gettingaconnection

connection=factory.newConnection();

//creatingachannel

producerChannel=connection.createChannel();

consumerChannel=connection.createChannel();

producerChannel.queueDeclare(endpointName,false,false,false,null);

}

/**

*关闭channel和connection。并非必须,因为隐含是自动调用的。

*

*@throwsIOException

*@throwsTimeoutException

*/

publicvoidclose()throwsIOException,TimeoutException{

this.producerChannel.close();

this.consumerChannel.close();

this.connection.close();

}

}

packagecom.pdy.rabbitmq.rpc;

importjava.io.IOException;

importjava.util.Arrays;

importjava.util.HashMap;

importjava.util.Map;

importjava.util.Random;

importjava.util.concurrent.TimeoutException;

importorg.apache.commons.lang3.SerializationUtils;

importcom.rabbitmq.client.AMQP.BasicProperties;

importcom.rabbitmq.client.AMQP.Queue.DeclareOk;

importcom.rabbitmq.client.Consumer;

importcom.rabbitmq.client.Envelope;

importcom.rabbitmq.client.ShutdownSignalException;

publicclassClientextendsClientEndPointimplementsConsumer,Runnable{

publicClient(StringendpointName)throwsIOException,TimeoutException{

super(endpointName);

DeclareOkok=super.consumerChannel.queueDeclare();

callBackQueue=ok.getQueue();

super.consumerChannel.basicConsume(callBackQueue,true,this);

}

@Override

publicvoidhandleConsumeOk(StringconsumerTag){

}

@Override

publicvoidhandleCancelOk(StringconsumerTag){

}

@Override

publicvoidhandleCancel(StringconsumerTag)throwsIOException{

}

@Override

publicvoidhandleDelivery(Stringarg0,Envelopearg1,

BasicPropertiesarg2,byte[]arg3)throwsIOException{

RpcInvokeModelinvokeModel=SerializationUtils.deserialize(arg3);

synchronized(callResult){

callResult.put(invokeModel.getInvokeFlag(),invokeModel.getObj());

callResult.notifyAll();

}

}

@Override

publicvoidhandleShutdownSignal(StringconsumerTag,

ShutdownSignalExceptionsig){

}

@Override

publicvoidhandleRecoverOk(StringconsumerTag){

}

publicvoidstart()throwsIOException{

Randomrandom=newRandom();

for(inti=0;i<10;i++){

Double[]arr=newDouble[10];

for(intj=0;j<arr.length;j++){

arr[j]=random.nextDouble();

}

System.out.println("调用rpc服务:"+Arrays.toString(arr));

RpcFuture<Double[]>result=invokeSort(arr);

Double[]obj=result.get();

System.out.println("调用rpc服务响应的结果:"+Arrays.toString(obj));

}

}

privateRpcFuture<Double[]>invokeSort(Double[]arr)throwsIOException{

RpcInvokeModelmo=newRpcInvokeModel();

mo.setInvokeFlag(++invokeFlag);

mo.setObj(arr);

byte[]body=SerializationUtils.serialize(mo);

BasicPropertiesbasicProperties=newBasicProperties().builder()

.replyTo(callBackQueue).build();

super.producerChannel.basicPublish("",super.endPointName,

basicProperties,body);

returnnewRpcFuture<>(this,mo.getInvokeFlag());

}

privatefinalStringcallBackQueue;

privatefinalMap<Long,Object>callResult=newHashMap<>();

privatevolatilelonginvokeFlag=0;

publicObjectgetResultByFlagKey(LongflagKey){

Objectresult=null;

while(true){

synchronized(callResult){

result=callResult.remove(flagKey);

if(result==null){

try{

callResult.wait();

}catch(InterruptedExceptione){

e.printStackTrace();

}

}else{

returnresult;

}

}

}

}

@Override

publicvoidrun(){

try{

start();

}catch(IOExceptione){

e.printStackTrace();

}

}

}

packagecom.pdy.rabbitmq.rpc;

publicclassRpcFuture<T>{

privatefinalClientclient;

privatefinalLongflagKey;

publicRpcFuture(Clientclient,LongflagKey){

this.client=client;

this.flagKey=flagKey;

}

publicTget(){

return(T)client.getResultByFlagKey(flagKey);

}

}

服务端代码:

packagecom.pdy.rabbitmq.rpc;

importjava.io.IOException;

importjava.util.concurrent.TimeoutException;

importcom.rabbitmq.client.Channel;

importcom.rabbitmq.client.Connection;

importcom.rabbitmq.client.ConnectionFactory;

/**

*Representsaconnectionwithaqueue

*

*@authorsyntx

*

*/

publicabstractclassServiceEndPoint{

protectedChannelproducerChannel;

protectedChannelconsumerChannel;

protectedConnectionconnection;

protectedStringendPointName;

publicServiceEndPoint(StringendpointName)throwsIOException,

TimeoutException{

this.endPointName=endpointName;

//Createaconnectionfactory

ConnectionFactoryfactory=newConnectionFactory();

//hostnameofyourrabbitmqserver

//factory.setVirtualHost("pdy");

factory.setHost("localhost");

//factory.setUsername("pdy");

//factory.setPassword("pdy");

//factory.setVirtualHost("pdy11");

//gettingaconnection

connection=factory.newConnection();

//creatingachannel

producerChannel=connection.createChannel();

consumerChannel=connection.createChannel();

consumerChannel.queueDeclare(endpointName,false,false,false,null);

}

/**

*关闭channel和connection。并非必须,因为隐含是自动调用的。

*

*@throwsIOException

*@throwsTimeoutException

*/

publicvoidclose()throwsIOException,TimeoutException{

this.producerChannel.close();

this.consumerChannel.close();

this.connection.close();

}

}

packagecom.pdy.rabbitmq.rpc;

importjava.io.IOException;

importjava.util.Arrays;

importjava.util.concurrent.TimeoutException;

importorg.apache.commons.lang3.SerializationUtils;

importcom.rabbitmq.client.AMQP.BasicProperties;

importcom.rabbitmq.client.Consumer;

importcom.rabbitmq.client.Envelope;

importcom.rabbitmq.client.ShutdownSignalException;

publicclassServiceextendsServiceEndPointimplementsConsumer{

publicService(StringendpointName)throwsIOException,TimeoutException{

super(endpointName);

}

publicvoidstart()throwsIOException{

super.consumerChannel.basicConsume(super.endPointName,true,this);

}

@Override

publicvoidhandleConsumeOk(StringconsumerTag){

}

@Override

publicvoidhandleCancelOk(StringconsumerTag){

}

@Override

publicvoidhandleCancel(StringconsumerTag)throwsIOException{

}

@Override

publicvoidhandleDelivery(Stringarg0,Envelopearg1,

BasicPropertiesarg2,byte[]arg3)throwsIOException{

RpcInvokeModelinvokeModel=SerializationUtils.deserialize(arg3);

Double[]obj=(Double[])invokeModel.getObj();

Arrays.sort(obj);

invokeModel.setObj(obj);

byte[]body=SerializationUtils.serialize(invokeModel);

StringroutingKey=arg2.getReplyTo();

super.producerChannel.basicPublish("",routingKey,null,body);

}

@Override

publicvoidhandleShutdownSignal(StringconsumerTag,

ShutdownSignalExceptionsig){

}

@Override

publicvoidhandleRecoverOk(StringconsumerTag){

}

}

测试代码:

packagetest;

importjava.io.IOException;

importjava.util.concurrent.TimeoutException;

importcom.pdy.rabbitmq.rpc.Client;

importcom.pdy.rabbitmq.rpc.Service;

publicclassRpcTest{

publicstaticvoidmain(String[]args)throwsIOException,TimeoutException{

Stringqueue="pdyRpc";

client(queue);

service(queue);

}

privatestaticvoidservice(Stringqueue)throwsIOException,

TimeoutException{

Serviceservice=newService(queue);

service.start();

}

privatestaticvoidclient(Stringqueue)throwsIOException,

TimeoutException{

Clientclient=newClient(queue);

newThread(client).start();

}

}

代码还不满足线程安全,但是改成线程安全是很容易的事情。

相关推荐