介绍
Celery是由Python开发、简单、灵活、可靠的分布式任务系统,用于处理大量消息,同时为操作提供维护该系统所需的工具。其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。它是一个专注于实时处理的任务队列,同时也支持任务调度。Celery 拥有庞大且多样化的用户和贡献者社区,您应该加入我们的IRC 或邮件列表。Celery 是开源的并且根据BSD 许可证获得授权。
特点
简单:配置和使用比较简单
高可用:当任务失败或执行过程中连接中断,celery会自动尝试重新执行
快速:每分钟可处理上百万个任务
灵活:几乎celery每个组件都可以被扩展和自定制
组件构成
- producer:生产者,发送任务到消息队列。
- Task:任务,是实际需要执行的代码逻辑。
- Beat:定时任务调度器,用于定时触发任务的执行。
- Broker:任务队列的消息代理,消息代理是一个中间件,它负责接收任务请求并将其分发给可用的工作进程(用于存储和传递任务消息)。常用的 Broker 有 RabbitMQ、Redis、Amazon SQS 等。
- Worker:任务执行者,Celery Worker 是一个独立的进程,它负责从 Broker 中接收任务请求,并执行这些任务。
- Result Backend:结果后端,用于存储任务执行结果,常用的有 Redis、MongoDB、数据库等。
安装
1
2
3
4
# 安装Celery
pip install celery
# 安装Redis作为消息代理
pip install redis
应用
异步任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 创建任务文件 tasks.py
#简单的应用代码:
from celery import Celery
app = Celery(
'hello',
broker='redis://localhost:6379/0',
result_backend = 'redis://localhost:6379/0',
# 连接失败后重试,不设置将有一个警告
broker_connection_retry_on_startup=True,
)
@app.task
def hello(name):
return 'hello %s !' % name
1
2
3
4
# 创建生产者文件 producer.py
import tasks
tasks.hello.delay('mmy83')
1
2
# 启动任务执行者 worker
celery -A hello worker --loglevel=info
1
python producer.py
定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 创建任务文件 tasks.py
from celery import Celery
app = Celery(
'hello',
broker='redis://localhost:6379/0',
result_backend = 'redis://localhost:6379/0',
broker_connection_retry_on_startup=True
)
@app.task
def hello(name):
return 'hello %s !' % name
app.conf.beat_schedule = {
'add-every-1-seconds': {
'task': 'tasks.hello',
'schedule': 1.0,
'args': ("beat",)
},
}
1
2
3
4
5
# 启动工作者,他会处于等待状态,等待任务到来
celery -A tasks worker --loglevel=info
# 启动定时任务调度器
celery -A tasks beat --loglevel=info
这时候其实是可以看看redis里的,里面有任务和任务执行的结果
和Django结合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery import Celery
# 用Django的配置文件来初始化Celery,这样可以使用Django的配置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('celery')
# 这个是指定celery的配置文件。这样做可以和Django结合使用同一个配置文件,并且定义了一个命名空间
# 参考:https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration
# 大写命名空间意味着所有 Celery 配置选项 必须以大写而不是小写形式指定,并以 开头 CELERY_
app.config_from_object('django.conf:settings', namespace='CELERY')
# 用于自动发现任务 @shared_task 装饰器
app.autodiscover_tasks()
1
2
3
4
5
6
7
8
9
10
11
# django的配置文件 settings.py
# Celery配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# 设置Celery结果后端
# CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
并发
Celery 中的并发性支持任务的并行执行。默认模型prefork适用于许多场景,通常推荐大多数用户使用。事实上,切换到另一种模式将默默禁用某些功能
prefork:默认选项,非常适合 CPU 密集型任务和大多数用例。除非有其他模型的特定需求,否则它是可靠的且推荐使用的。
eventlet和gevent:这些模型专为 IO 密集型任务而设计,使用 greenlet 实现高并发性。请注意,某些功能(如soft_timeout)在这些模式下不可用。这些模型有详细的文档页面链接。
solo:在主线程中按顺序执行任务。
threads:利用线程进行并发,如果 存在concurrent.futures模块则可用。
custom:允许通过环境变量指定自定义工作池实现。
其他
不光如此,他还有监控界面,可以监控任务执行情况,也可以查看任务执行结果。
1
2
3
4
5
6
7
# 安装
pip install flower
# 启动 默认端口是http://localhost:5555 ,但您可以使用–port参数更改它
celery -A proj flower
# celery 事件:Curses 监视器
celery -A proj events