运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)

记得我之前写过  redis主动向页面push数据  的文章,但文中所描述的方法要应用到J2EE的项目中还是比较困难的(还需用到nodejs什么的)。于是本文来探究下比较适合web项目的主动推技术。

Comet是一种用于web的推送技术,能使服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求,目前有两种实现方式:长轮询(long-polling)和iframe流(streaming)。下面就用iframe流的实现方式来实现服务端主动向客户端(这里客户端指的是jsp页面)推送的效果,并且结合了redis的发布订阅,算是比较典型的例子了。

客户端(页面):

<script type="text/javascript">
	$(function() {
		setCometUrl();
		bindLinstener();
	});
	
	function bindLinstener() {
		if (window.addEventListener) {  
		    window.addEventListener("load", comet.initialize, false);  
		    window.addEventListener("unload", comet.onUnload, false);  
		} else if (window.attachEvent) {  
		    window.attachEvent("onload", comet.initialize);  
		    window.attachEvent("onunload", comet.onUnload);  
		} 
	}
	
	function setCometUrl(){
		comet.cometUrl = "pubsub/push.json";
	}
	
	//服务器推送代码
	var comet = {
		connection : false,
		iframediv : false,

		initialize : function() {
			if (navigator.appVersion.indexOf("MSIE") != -1) {
				// For IE browsers
				comet.connection = new ActiveXObject("htmlfile");
				comet.connection.open();
				comet.connection.write("<html>");
				comet.connection.write("<script>document.domain = '" + document.domain + "'");
				comet.connection.write("</html>");
				comet.connection.close();
				comet.iframediv = comet.connection.createElement("div");
				comet.connection.appendChild(comet.iframediv);
				comet.connection.parentWindow.comet = comet;
				comet.iframediv.innerHTML = "<iframe id='comet_iframe' src='"+comet.cometUrl+"'></iframe>";
			
			} else if (navigator.appVersion.indexOf("KHTML") != -1) {
				// for KHTML browsers
				comet.connection = document.createElement('iframe');
				comet.connection.setAttribute('id', 'comet_iframe');
				comet.connection.setAttribute('src', comet.cometUrl);
				with (comet.connection.style) {
					position = "absolute";
					left = top = "-100px";
					height = width = "1px";
					visibility = "hidden";
				}
				document.body.appendChild(comet.connection);

			} else {
				// For other browser (Firefox...)
				comet.connection = document.createElement('iframe');
				comet.connection.setAttribute('id', 'comet_iframe');
				comet.iframediv = document.createElement('iframe');
				comet.iframediv.setAttribute('src', comet.cometUrl);
				
				comet.connection.appendChild(comet.iframediv);
				document.body.appendChild(comet.connection);
			}
		},

		onUnload : function() {
			if (comet.connection) {
				comet.connection = false; // release the iframe to prevent problems with IE when reloading the page
				closePage();
			}
		},
		
		receiveMsg : function(msg) {
			$("#content").append(msg + "<br/>");
		}
		
	}
	
	function closePage() {
		$.ajax({
			async : true,
			cache : false,
			type : "POST",
			//data:{objId:objId},
			dataType:"json",
			url :"pubsub/close.json",
			success : function(data) {
			},
			error: function(){
			}
		});
	}
</script>

</head>
<body >
	<div id="content" class="show"></div>
</body>

 这个客户端页面是利用浏览器支持的Comet,仅发起一次ajax请求,打通后台后,后台就会源源不断主动往这个页面发送数据。

后台较为复杂,并且还结合了redis的发布订阅。数据来源则是订阅redis的一个channel而得到。

Action:

@Controller
public class PubSubAction {
	
	LinkedList<String> queue = new LinkedList<String>();
	PrintWriter out;
	
	//线程
	MsgSubHandler subT = null;
	CheckQueueHandler checkT = null;
	
	@RequestMapping("/pubsub/push.json")
	@ResponseBody
	public void pushMsg(HttpServletResponse response) {
		System.out.println("这儿进几次.........");
		//订阅
		subT = new MsgSubHandler("pubsub_channel", queue);
		subT.start();
		//检查
		checkT = new CheckQueueHandler(queue);
		checkT.start();
		//创建Comet Iframe
		sendHtmlScript(response, "<script type=\"text/javascript\"> var comet = window.parent.comet;</script>");
		
		while (true) {
			try {
				Thread.sleep(1000);//每隔1s从队列取数
				if(queue.size() > 0) {
					String msg = queue.pop();
					System.out.println("从队列里取到的信息:" + msg);
					sendHtmlScript(response, "<script type=\"text/javascript\"> comet.receiveMsg('"+msg+"');</script>");
				}
			}catch(InterruptedException e) {
				e.printStackTrace();
			}	
		}
	}
	
	@RequestMapping("/pubsub/close.json")
	@ResponseBody
	public void shutdownServer() throws InterruptedException {
		System.out.println("开始关闭操作..");
		//关闭流
		out.flush();
		out.close();
		//队列情空
		queue.clear();
		//消息的关闭处理
		subT.shut();
		checkT.shut();
		//线程停止
		if(checkT.isAlive()) {
			checkT.interrupt();
			checkT.join();
		}
		if(subT.isAlive()) {
			subT.interrupt();
			subT.join();
		}
	}
	
	private void sendHtmlScript(HttpServletResponse response,String script){
		response.setCharacterEncoding("UTF-8");
		response.setContentType("text/html");
		response.setDateHeader("Expires", 0);
		response.setHeader("Pragma", "No-cache");
		response.setHeader("Cache-Control", "no-cache,no-store,max-age=0");
		try {
			out = response.getWriter();
			out.write(script);
			out.flush();
		} catch (IOException e) {
			e.printStackTrace();
			log.error(e.getMessage(), e);
		}
   }
}

其中,订阅消息的线程类和检查消息队列大小的线程类分别如下:

1:定时检查队列大小的线程类,目的是避免消息队列大小过大

public class CheckQueueHandler extends Thread {
	
	private LinkedList<String> queue;
	private boolean runFlag = true;
	
	public CheckQueueHandler(LinkedList<String> queue) {
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			while (runFlag && queue.size()>0) {
				Thread.sleep(60 * 1000);//每隔1分钟检查指定队列的大小
				if (queue.size() >= 500) {
					queue.clear();
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();  
		}
	}
	
	public void shut() {
		runFlag = false;
	}
}

 2:订阅相应的channel的线程类:

public class MsgSubHandler extends Thread{
	
	private LinkedList<String> queue;
	private String channel;
	
	JedisPool pool;
	Jedis jedis;
	PubSubListener listener;
	
	public MsgSubHandler(String channel, LinkedList<String> queue) {
		this.channel = channel;
		this.queue = queue;
		
		//redis资源初始化
		pool = SysBeans.getBean("jedisPool");
		jedis = pool.getResource();
		
		//发布/订阅监听初始化
		listener = new PubSubListener(queue);
	}
	
	@Override
	public void run() {
		//订阅指定的渠道信息
		jedis.subscribe(listener, channel);
	}
	
	public void shut() {
		//归还redis资源
		if(pool !=null && jedis != null) {
			pool.returnResource(jedis);
		}
		//取消渠道订阅
		listener.unsubscribe();
	}
}

 3:redis的发布/订阅监听类

public class PubSubListener extends JedisPubSub {
	
	private LinkedList<String> queue =null;
	
	public PubSubListener(LinkedList<String> queue) {
		this.queue  =  queue;
	}
	
	//取得订阅后消息的处理  
    @Override  
    public void onMessage(String channel, String message) {  
        //System.out.print("onMessage:取得订阅后消息的处理  ");  
        queue.add(message);   
    }  
      
    //取得按表达式的方式订阅的消息后的处理  
    @Override  
    public void onPMessage(String pattern, String channel, String message) {  
        System.out.print("onPMessage:取得按表达式的方式订阅的消息后的处理    ");  
        System.out.println(pattern + "=" + channel + "=" + message);  
    }  
      
    //初始化按表达式的方式订阅时候的处理  
    @Override  
    public void onPSubscribe(String pattern, int subscribedChannels) {  
        System.out.print("onPSubscribe:初始化按表达式的方式订阅时候的处理   ");  
        System.out.println(pattern + "=" + subscribedChannels);    
    }  
      
    //取消化按表达式的方式订阅时候的处理  
    @Override  
    public void onPUnsubscribe(String pattern, int subscribedChannels) {  
        System.out.print("onPUnsubscribe:取消化按表达式的方式订阅时候的处理   ");  
        System.out.println(pattern + "=" + subscribedChannels);   
    }  
      
    //初始化订阅时候的处理  
    @Override  
    public void onSubscribe(String channel, int subscribedChannels) {  
        System.out.print("onSubscribe:初始化订阅时候的处理   ");  
        System.out.println(channel + "=" + subscribedChannels);   
    }  
      
    //取消订阅时候的处理  
    @Override  
    public void onUnsubscribe(String channel, int subscribedChannels) {  
        System.out.print("onUnsubscribe:取消订阅时候的处理   ");  
        System.out.println(channel + "=" + subscribedChannels);  
    }  

}

启动工程,打开客户端页面,最初始的div:


运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)
 同时控制台打印:

这儿进几次.........

onSubscribe:初始化订阅时候的处理   pubsub_channel=1

这说明:一打开客户端,就实现了订阅对应channel的目的。

接下来,为了让这个div中有数据,我们开始来对这个channel进行publish一些数据,模拟:

public static void main(String[] args) {
		Jedis jedis = new Jedis("localhost");
		while(true) {
			try {
				Thread.sleep(2000);
				jedis.publish("pubsub_channel", "I like " + Math.random()*100 );
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
	}

 然后你再观察这个div,会发现如下现象(某一时刻):


运用Comet技术实现服务端往客户端主动推送数据(结合redis发布/订阅)

由此说明:我们达到了如题所想要的目的!——结合了redis的发布/订阅  并且客户端只请求服务端一次,服务端主动向客户端推送了数据。

最后,我们再试着关闭客户端页面,会发现控制台打印:

onUnsubscribe:取消订阅时候的处理   pubsub_channel=0

说明,客户端一关闭,就取消了对channel的订阅了。并且queue队列也会被清空。


 其实Comet并不是新兴的技术,关于【反ajax】技术,最新的有WebSocket,以后有机会再研究。

相关推荐