zeroMQ初体验-7.优雅的卸载工作进程

关掉一个进程有很多种方式,而在ZeroMQ中则推崇通过使用信号通知,可控的卸载、关闭进程。在这里,要援引之前的"分而治之"例子(具体可以见这里)。

例图:

显然,信号发送是由能够掌握整个进度的"水槽"(下游)来控制,在原有基础上做少许变更即可。

Worker(数据处理):

import sys
import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, "")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
while True:
    socks = dict(poller.poll())

    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()

        workload = int(message)  # Workload in msecs
        time.sleep(workload / 1000.0)
        sender.send(message)

        sys.stdout.write(".")
        sys.stdout.flush()

    if socks.get(controller) == zmq.POLLIN:
        break

水槽(下游):

import sys
import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")

receiver.recv()

tstart = time.time()

for task_nbr in xrange(100):
    receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(":")
    else:
        sys.stdout.write(".")
    sys.stdout.flush()

tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec

controller.send("KILL")
time.sleep(1)

注意:

正常情况下,即使进程被关闭,可能端口并没有被清除(那是有ZeroMQ维护的),原文中调用了这么两句

zmq_close(server)

zmq_term(context)

python中对应为zmq.close(),zmq.term(),不过python的垃圾回收会替俺们解决后顾之忧的~

(未完待续)

相关推荐