Celery中文翻译-Application

Celery在使用前必须实例化,称为application或app。app是线程安全的,具有不同配置、组件、task的多个Celery应用可以在同一个进程空间共存。

# 创建Celery应用
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

最后一行文本化显示了Celery应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。

Main Name

在Celery中发送task消息时,该消息仅包含要执行的task的名称。每一个worker维护一个task名称和对应函数的映射,这称为task registry

当定义一个task时,该task将注册到本地:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

当Celery无法检测task函数属于哪个模块时,使用main模块名生成初始task名称。

这种方式仅适用于以下两种场景:

  1. 定义task的模块作为程序运行
  2. app在python shell中创建
# tasks.py
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

如果直接运行tasks.py,task名将以__main__为前缀,但如果tasks.py被其他程序导入,task名将以tasks为前缀。如下:

>>> from tasks import add
>>> add.name
tasks.add

也可以直接指定主模块名:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

Configuration

可以通过直接设置,或使用专用配置模块对Celery进行配置。

通过app.conf属性查看或直接设置配置:

>>> app.conf.timezone
'Europe/London'

>>> app.conf.enable_utc = True

或用app.conf.update方法一次更新多个配置:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

config_from_object

app.config_from_object()方法从配置模块或对象中导入配置。需要注意的是:调用config_from_object()方法将重置在这之前配置的任何设置

使用模块名
app.config_from_object()方法接收python模块的完全限定名(fully qualified name)或具体到其中的某个属性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

只要能够正常执行import celeryconfig,app就能正常配置。

使用模块对象
也可以传入一个已导入的模块对象,但不建议这样做。

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

更推荐使用模块名的方式,因为这样在使用prefork pool时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。

使用配置类或对象

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)

config_from_envvar

app.config_from_envvar()方法从环境变量中接收配置模块名。

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

通过环境变量指定配置模块:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info

Censored configuration

如果要显示Celery配置,可能需要过滤某些敏感信息如密码、密钥等。Celery提供了几种用于帮助显示配置的实用方法。

humanize()
该方法返回列表字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置with_defaults参数为True:

>>> app.conf.humanize(with_defaults=False, censored=True)

table()
该方法返回字典形式的配置:

>>> app.conf.table(with_defaults=False, censored=True)

Celery可能不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用Celery可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。

Laziness

应用实例是惰性的。

创建Celery实例只会执行以下操作:

  1. 创建用于event的logical clock instance
  2. 创建task registry
  3. 设置为当前应用(除非禁用了set_as_current参数)
  4. 调用app.on_init()回调函数(默认不执行任何操作)

app.task()装饰器不会在task定义时立即创建task,而是在task使用时或finalized应用后创建。

下例说明了在使用task或访问其属性前,都不会创建task:

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

应用的Finalization指显式地调用app.finalize()方法或隐式地访问app.tasks属性。

finalized应用将会:

  1. 复制必须在应用间共享的task。task默认是共享的,但如果禁用了task装饰器的shared属性,将属于应用私有。
  2. 评估所有待处理的task装饰器
  3. 确保所有task绑定到当前应用。将task绑定到某个应用,以便可以从配置中读取默认值。

Breaking the chain

虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为app chain

# 依赖于当前应用(bad)

from celery import current_app

class Scheduler(object):

    def run(self):
        app = current_app
# 传递应用实例(good)

class Scheduler(object):

    def __init__(self, app):
        self.app = app

在开发模式设置CELERY_TRACE_APP环境变量,可以在应用链断开时抛出异常:

$ CELERY_TRACE_APP=1 celery worker -l info

Abstract Tasks

使用task()装饰器创建的task都继承自celery.app.task模块的Task基类。继承该类可以自定义task类:

from celery import Task
# 或者 from celery.app.task import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return super(DebugTask, self).__call__(*args, **kwargs)

如果要重写__call__()方法,记得调用super。这样在task直接调用时会执行基类的默认事件。

Task基类是特殊的,因为它并未绑定到任何特定的应用。一旦task绑定到应用,它将读取配置以设置默认值等。

  1. 通过base参数指定基类

    @app.task(base=DebugTask)
    def add(x, y):
        return x + y
  2. 通过app.Task属性指定基类

    >>> from celery import Celery, Task
    
    >>> app = Celery()
    
    >>> class MyBaseTask(Task):
    ...    queue = 'hipri'
    
    >>> app.Task = MyBaseTask
    >>> app.Task
    <unbound MyBaseTask>
    
    >>> @app.task
    ... def add(x, y):
    ...     return x + y
    
    >>> add
    <@task: __main__.add>
    
    >>> add.__class__.mro()
    [<class add of <Celery __main__:0x1012b4410>>,
     <unbound MyBaseTask>,
     <unbound Task>,
     <type 'object'>]

相关推荐