Python分布式进程报错:pickle模块不能序列化lambda函数
分布式进程时,遇到了一个错误:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001710FDC2EA0>: attribute lookup <lambda> on __main__ failed(pickle模块不能序列化lambda函数)
代码如下:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import random, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager
class QueueManager(BaseManager):
pass
# 吧两个Queue都注册到网络上,callable参数关联了Queue对象
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000,设置验证码abc
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动queue
manager.start()
# 获得通过网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务
for i in range(10):
n = random.randint(0, 1000)
print('添加任务 %d' % n)
task.put(n)
# 从result队列读取结果
print('尝试获取结果')
for i in range(10):
r = result.get(timeout=10)
print('结果是:%s' % r)
# 关闭
manager.shutdown()
print('master exit')
在教程中我记得有关pickle的事儿(有印象看来思想还在线上,哈哈),翻了一下,看到:
原来是系统问题造成的,那么,如何解决呢?在教程中我也看到,遇到这样的情况,需要我们自己定义函数,实现序列化。
所以对代码稍加修改,定义两个函数return_task_queue和return_result_queue实现序列化:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import random, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()
def return_task_queue():
global task_queue
return task_queue
def return_result_queue():
global result_queue
return result_queue
# 从BaseManager继承的QueueManager
class QueueManager(BaseManager):
pass
if __name__ == '__main__':
# 吧两个Queue都注册到网络上,callable参数关联了Queue对象
QueueManager.register('get_task_queue', callable=return_task_queue)
QueueManager.register('get_result_queue', callable=return_result_queue)
# 绑定端口5000,设置验证码abc
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
# 启动queue
manager.start()
# 获得通过网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务
for i in range(10):
n = random.randint(0, 1000)
print('添加任务 %d' % n)
task.put(n)
# 从result队列读取结果
print('尝试获取结果')
for i in range(10):
r = result.get(timeout=10)
print('结果是:%s' % r)
# 关闭
manager.shutdown()
print('master exit')
运行结果:
相关推荐
f = lambda x, y, z: x + y + z # returns a function that can optionally be assigned a name. def func: