python使用flask+Thread实现异步任务
# 定义一个WorkerController,用于执行业务代码 class WorkerController(object): def __init__(self): pass def do_something(self, params): print("do something")
# 定义一个AsyncWorker,继承Thread,重写run方法,run方法中调用WorkerController的do_something方法 from threading import Thread from worker_controller import WorkerController WORKER_CONTROLLER = WorkerController() class AsyncWorker(Thread): ASYNC_WORKER_INFO = dict() def __init__(self, params, ticket_id): Thread.__init__(self) self.params = params self.ticket_id = ticket_id self.daemon = True self.start() def run(self): AsyncWorker.ASYNC_WORKER_INFO[self.ticket_id] = { # 保存一些业务信息,之后轮询的时候,可以作为输出返回 "some": "information", "status": "running" } WORKER_CONTROLLER.do_something(self.params)
# 开始任务 @app.route('/do-something', methods=['GET']) def do_something(): params = request.args.get('params', '').strip() # 生成一个任务id,用于轮询,例如ticket id ticket_id = genearteMD5(str(int(round(time.time() * 1000)))) # 实例化一个AsyncWorker AsyncWorker(params=params, ticket_id=ticket_id) # 将ticket_id返回 return jsonify({"ticket_id": ticket_id}) # 轮询任务 @app.route("/check-status", methods=['GET']) def check_status(): ticket_id = request.args.get("ticket_id") logs = AsyncWorker.ASYNC_WORKER_INFO[str(ticket_id)] return jsonify(logs)
如何在前端进行异步轮询呢?以angularjs的interval方法为例:https://www.cnblogs.com/CheeseZH/p/12444034.html
相关推荐
huavhuahua 2020-11-20
xiaoseyihe 2020-11-16
weiiron 2020-08-17
joyjoy0 2020-08-13
qiximiao 2020-08-03
huakai 2020-07-26
生物信息学 2020-06-22
荒乱的没日没夜 2020-06-14
adamlovejw 2020-06-11
RocNg 2020-06-01
Keepgoing 2020-05-26
lqxqust 2020-05-19
class Singleton: def __new__: # 关键在于这,每一次实例化的时候,我们都只会返回这同一个instance对象 if not hasattr: cls.instance =
lhxxhl 2020-05-16
HongKongPython 2020-05-12
wangqing 2020-05-10
Lexan 2020-05-09
cenylon 2020-05-08
宿舍 2020-05-06