Redis消息发布订阅

想找到一个消息推送的方案,隐约觉得Pub/Sub是一种解决问题的途径,但没在项目实践中用到。最新在了解学习阿里云,里面有demo。摘录记之。

消息的发布与订阅

场景介绍

ApsaraDBforRedis也提供了与Redis相同的消息发布(pub)与订阅(sub)功能。即一个client发布消息,其他多个client订阅消息。

需要注意的是,ApsaraDBforRedis发布的消息是“非持久”的,即消息发布者只负责发送消息,而不管消息是否有接收方,也不会保存之前发送的消息,即发布的消息“即发即失”;消息订阅者也只能得到订阅之后的消息,频道(channel)中此前的消息将无从获得。

此外消息发布者(即publish客户端),无需独占与服务器端的连接,你可以在发布消息的同时,使用同一个客户端连接进行其他操作(例如List操作等)。但是消息订阅者(即subscribe客户端),需要独占与服务器端的连接,即进行subscribe期间,该客户端无法执行其他操作,而是以阻塞的方式等待频道(channel)中的消息;因此消息订阅者需要使用单独的服务器连接,或者需要在单独的线程中使用(参见如下示例)。

消息发布者(即publishclient)

package message.kvstore.aliyun.com;

import redis.clients.jedis.Jedis;

public class KVStorePubClient {

    private Jedis jedis;//
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);

        //KVStore的实例ID及密码
        String authString = jedis.auth(password);//kvstore_instance_id:password

        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }

    public void pub(String channel,String message){
        System.out.println("  >>> 发布(PUBLISH) > Channel:"+channel+" > 发送出的Message:"+message);

        jedis.publish(channel, message);
    }

    public void close(String channel){
        System.out.println("  >>> 发布(PUBLISH)结束 > Channel:"+channel+" > Message:quit");

        //消息发布者结束发送,即发送一个“quit”消息;
        jedis.publish(channel, "quit");

    }

}

消息订阅者(即subscribeclient)

package message.kvstore.aliyun.com;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class KVStoreSubClient extends Thread{

    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;

    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);

        //ApsaraDB for Redis的实例ID及密码
                String authString = jedis.auth(password);//kvstore_instance_id:password

                if (!authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }

    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }

    private void subscribe(){

        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }

        System.out.println("  >>> 订阅(SUBSCRIBE) > Channel:"+channel);
        System.out.println();

        //接收者在侦听订阅的消息时,将会阻塞进程,直至接收到quit消息(被动方式),或主动取消订阅
        jedis.subscribe(listener, channel);
    }

    public void unsubscribe(String channel){
        System.out.println("  >>> 取消订阅(UNSUBSCRIBE) > Channel:"+channel);
        System.out.println();

        listener.unsubscribe(channel);
    }

    @Override
    public void run() {
        try{

            System.out.println();
            System.out.println("----------订阅消息SUBSCRIBE 开始-------");

            subscribe();

            System.out.println("----------订阅消息SUBSCRIBE 结束-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }

    }

}

消息监听者

package message.kvstore.aliyun.com;

import redis.clients.jedis.JedisPubSub;

public class KVStoreMessageListener extends JedisPubSub{

    @Override
    public void onMessage(String channel, String message) {

        System.out.println("  <<< 订阅(SUBSCRIBE)< Channel:" + channel + " >接收到的Message:" + message );
        System.out.println();

        //当接收到的message为quit时,取消订阅(被动方式)
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub

    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub

    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub

    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub

    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub

    }
}

示例主程序

package message.kvstore.aliyun.com;

import java.util.UUID;

import redis.clients.jedis.JedisPubSub;

public class KVStorePubSubTest {

    //ApsaraDB for Redis的连接信息,从控制台可以获得
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="xxxxxxxxxx:yyyyyyyy";//kvstore_instance_id:password

    public static void main(String[] args) throws Exception{
            KVStorePubClient pubClient = new KVStorePubClient(host, port,password);

            final String channel = "KVStore频道-A";

            //消息发送者开始发消息,此时还无人订阅,所以此消息不会被接收
            pubClient.pub(channel, "Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)");

            //消息接收者
            KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);

            JedisPubSub listener = new KVStoreMessageListener();
            subClient.setChannelAndListener(listener, channel);

            //消息接收者开始订阅
            subClient.start();


            //消息发送者继续发消息
            for (int i = 0; i < 5; i++) {

                String message=UUID.randomUUID().toString();
                pubClient.pub(channel, message);
                Thread.sleep(1000);
            }

            //消息接收者主动取消订阅
            subClient.unsubscribe(channel);

            Thread.sleep(1000);
            pubClient.pub(channel, "Aliyun消息2:(此时订阅取消,所以此消息不会被接收)");

            //消息发布者结束发送,即发送一个“quit”消息;
            //此时如果有其他的消息接收者,那么在listener.onMessage()中接收到“quit”时,将执行“unsubscribe”操作。
            pubClient.close(channel);

        }

    }

运行结果

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:Aliyun消息1:(此时还无人订阅,所以此消息不会被接收)

----------订阅消息SUBSCRIBE开始-------

>>>订阅(SUBSCRIBE)>Channel:KVStore频道-A

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889

<<<订阅(SUBSCRIBE)<Channel:KVStore频道-A>接收到的Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:ed5924a9-016b-469b-8203-7db63d06f812

<<<订阅(SUBSCRIBE)<Channel:KVStore频道-A>接收到的Message:ed5924a9-016b-469b-8203-7db63d06f812

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:f1f84e0f-8f35-4362-9567-25716b1531cd

<<<订阅(SUBSCRIBE)<Channel:KVStore频道-A>接收到的Message:f1f84e0f-8f35-4362-9567-25716b1531cd

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:746bde54-af8f-44d7-8a49-37d1a245d21b

<<<订阅(SUBSCRIBE)<Channel:KVStore频道-A>接收到的Message:746bde54-af8f-44d7-8a49-37d1a245d21b

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef

<<<订阅(SUBSCRIBE)<Channel:KVStore频道-A>接收到的Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef

>>>取消订阅(UNSUBSCRIBE)>Channel:KVStore频道-A

----------订阅消息SUBSCRIBE结束-------

>>>发布(PUBLISH)>Channel:KVStore频道-A>发送出的Message:Aliyun消息2:(此时订阅取消,所以此消息不会被接收)

>>>发布(PUBLISH)结束>Channel:KVStore频道-A>Message:quit

相关推荐