当前位置: 移动技术网 > IT编程>脚本编程>Python > Python—Celery 框架使用

Python—Celery 框架使用

2019年12月14日  | 移动技术网IT编程  | 我要评论

白石美咲,省钱屋,2014074

一、celery 核心模块

1. brokers

brokers 中文意思为中间人,在这里就是指任务队列本身,接收生产者发来的消息即task,将任务存入队列。任务的消费者是worker,brokers 就是生产者和消费者存放/拿取产品的地方(队列)。celery 扮演生产者和消费者的角色。

常见的 brokers 有 rabbitmq、redis、zookeeper 等。推荐用redis或rabbitmq实现队列服务。

2. workers

就是 celery 中的工作者,执行任务的单元,类似与生产/消费模型中的消费者。它实时监控消息队列,如果有任务就从队列中取出任务并执行它。

3. backend / result stores

用于存储任务的执行结果。队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 result stores 了。

常见的 backend 有 redis、memcached 甚至常用的数据库都可以。

4. tasks

就是想在队列中进行的任务,有异步任务和定时任务。一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。

5. beat

定时任务调度器,根据配置定时将任务发送给brokers。

二、celery 基本使用 

1.创建一个celery application 用来定义你的任务列表,创建一个任务文件就叫tasks.py吧。

from celery import celery

# 配置好celery的backend和broker 
app = celery('task1',  backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0')
 
#普通函数装饰为 celery task
@app.task  
def add(x, y):
    return x + y

如此而来,我们只是定义好了任务函数func函数和worker(celery对象)。worker相当于工作者。

2.启动celery worker来开始监听并执行任务。broker 我们有了,backend 我们有了,task 我们也有了,现在就该运行 worker 进行工作了,在 tasks.py 所在目录下运行:

[root@localhost ~]# celery -a tasks worker --loglevel=info    # 启动方法1 
[root@localhost ~]# celery -a tasks worker --l debug          # 启动方法2

现在 tasks 这个任务集合的 worker 在进行工作(当然此时broker中还没有任务,worker此时相当于待命状态),如果队列中已经有任务了,就会立即执行。

3.调用任务:要给worker发送任务,需要调用 delay() 方法。

import time
from tasks import add

# 不要直接add(6, 6),这里需要用 celery 提供的接口 delay 进行调用
result = add.delay(6, 6) 
while not result.ready():
    time.sleep(1)
print('task done: {0}'.format(result.get()))

三、celery 进阶使用

1.celery_config.py:配置文件

from __future__ import absolute_import, unicode_literals
#从python的绝对路径导入而不是当前的脚本     #在python2和python3做兼容支持的

broker_url = 'redis://127.0.0.1:6379/0'
# 指定结果的接受地址
celery_result_backend = 'redis://127.0.0.1:6379/0'

2.tasks.py

from __future__ import absolute_import, unicode_literals
#从python的绝对路径导入而不是当前的脚本     #在python2和python3做兼容支持的
from celery import celery

# 配置好celery的backend和broker, task1:app的名字。broker
app = celery('task1',                              # 
             broker='redis://127.0.0.1:6379/0',   # 消息队列:连rabbitmq或redis
             backend='redis://127.0.0.1:6379/0')  # 存储结果:redis或mongo或其他数据库
 
app.config_from_object("celery_config")
app.conf.update(         # 给app设置参数
    result_expires=3600, # 保存时间为1小时
)

#普通函数装饰为 celery task
@app.task  
def add(x, y):
    return x + y
    
if __name__ == '__main__':
    app.start()

3.启动worker

[root@localhost ~]# celery -a tasks worker --loglevel=info

4.test.py

# -*- coding:utf-8 -*-
import time
from tasks import add

# 不要直接add(4, 4),这里需要用 celery 提供的接口 delay 进行调用
result = add.delay(6, 6) 
print(result.id)
while not result.ready():
    time.sleep(1)
print('task done: {0}'.format(result.get()))

四、celery 定时任务

参考:

参考:

参考:

参考:

参考:

参考:

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网