Python分布式进程报错:pickle模块不能序列化lambda函数

分布式进程时,遇到了一个错误:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001710FDC2EA0>: attribute lookup <lambda> on __main__ failed(pickle模块不能序列化lambda函数)

代码如下:

Python分布式进程报错:pickle模块不能序列化lambda函数

#!/usr/bin/env python

# _*_ coding:utf-8 _*_

import random, queue

from multiprocessing.managers import BaseManager

# 发送任务的队列

task_queue = queue.Queue()

# 接收结果的队列

result_queue = queue.Queue()

# 从BaseManager继承的QueueManager

class QueueManager(BaseManager):

pass

# 吧两个Queue都注册到网络上,callable参数关联了Queue对象

QueueManager.register('get_task_queue', callable=lambda: task_queue)

QueueManager.register('get_result_queue', callable=lambda: result_queue)

# 绑定端口5000,设置验证码abc

manager = QueueManager(address=('', 5000), authkey=b'abc')

# 启动queue

manager.start()

# 获得通过网络访问的Queue对象

task = manager.get_task_queue()

result = manager.get_result_queue()

# 放几个任务

for i in range(10):

n = random.randint(0, 1000)

print('添加任务 %d' % n)

task.put(n)

# 从result队列读取结果

print('尝试获取结果')

for i in range(10):

r = result.get(timeout=10)

print('结果是:%s' % r)

# 关闭

manager.shutdown()

print('master exit')

在教程中我记得有关pickle的事儿(有印象看来思想还在线上,哈哈),翻了一下,看到:

原来是系统问题造成的,那么,如何解决呢?在教程中我也看到,遇到这样的情况,需要我们自己定义函数,实现序列化。

所以对代码稍加修改,定义两个函数return_task_queue和return_result_queue实现序列化:

#!/usr/bin/env python

# _*_ coding:utf-8 _*_

import random, queue

from multiprocessing.managers import BaseManager

# 发送任务的队列

task_queue = queue.Queue()

# 接收结果的队列

result_queue = queue.Queue()

def return_task_queue():

global task_queue

return task_queue

def return_result_queue():

global result_queue

return result_queue

# 从BaseManager继承的QueueManager

class QueueManager(BaseManager):

pass

if __name__ == '__main__':

# 吧两个Queue都注册到网络上,callable参数关联了Queue对象

QueueManager.register('get_task_queue', callable=return_task_queue)

QueueManager.register('get_result_queue', callable=return_result_queue)

# 绑定端口5000,设置验证码abc

manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')

# 启动queue

manager.start()

# 获得通过网络访问的Queue对象

task = manager.get_task_queue()

result = manager.get_result_queue()

# 放几个任务

for i in range(10):

n = random.randint(0, 1000)

print('添加任务 %d' % n)

task.put(n)

# 从result队列读取结果

print('尝试获取结果')

for i in range(10):

r = result.get(timeout=10)

print('结果是:%s' % r)

# 关闭

manager.shutdown()

print('master exit')

运行结果:

Python分布式进程报错:pickle模块不能序列化lambda函数

Python分布式进程报错:pickle模块不能序列化lambda函数

相关推荐