文章目录
- 1、简介
- 2、安装和下载
- 2.1 flask
- 2.2 celery
- 2.3 redis
- 3、功能开发
- 3.1 创建异步任务的方法
- 3.1.1 使用默认的参数
- 3.1.2 指定相关参数
- 3.1.3 自定义Task基类
- 3.2 调用异步任务的方法
- 3.2.1 app.send_task
- 3.2.2 Task.delay
- 3.2.3 Task.apply_async
- 3.3 获取任务结果和状态
- 4、入门示例(celery)
- 4.1 新建任务task
- 4.2 新建应用app
- 4.3 执行命令
- 5、更多示例(celery)
- 5.1 例子1
- 5.2 例子2
- 5.3 例子3
- 5.4 例子4
- 6、扩展示例(celery+flask)
- 6.1 例子1
- 6.2 例子2
- 6.3 例子3
- 结语
1、简介
Celery 是一个简单、灵活且可靠的分布式系统,用于 处理大量消息,同时为操作提供 维护此类系统所需的工具。专注于实时处理的任务队列,同时也 支持任务调度。
Celery 拥有庞大而多样化的用户和贡献者社区, 你应该来加入我们的IRC或我们的邮件列表。
Celery 是开源的,并在 BSD 许可证下获得许可。
-
celery的架构由三部分组成:
-
消息中间件(broker):
任务调度队列,用于存放task,接收任务生产者发来的消息(即任务),将任务tasks存入队列。Celery 本身不提供队列服务,但是可以方便的和第三方提供的消息中间件集成,官方推荐使用 RabbitMQ 和 Redis。 -
任务执行单元(worker)
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。 -
任务执行结果存储(backend)
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。
-
Celery 采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行。
2、安装和下载
查看安装的库的信息如下:
pip list
2.1 flask
Flask 是一款使用 Python 编写的轻量级 Web 应用框架,它基于 Werkzeug WSGI 工具箱和 Jinja2 模板引擎。Flask 由 Armin Ronacher 开发,其目标是提供一个简单、灵活且易于扩展的框架,可以帮助开发人员快速构建 Web 应用程序。
pip install Flask
2.2 celery
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理。
Celery 在执行任务时需要通过一个消息中间件(Broker)来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis。
pip install celery
#pip install -U Celery
#pip install "celery[librabbitmq]"
#pip install "celery[librabbitmq,redis,auth,msgpack]"
2.3 redis
https://redis.io/download/
REmote DIctionary Server(Redis) 是一个由 Salvatore Sanfilippo 写的 key-value 存储系统,是跨平台的非关系型数据库。
Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库,并提供多种语言的 API。
Redis 通常被称为数据结构服务器,因为值(value)可以是字符串(String)、哈希(Hash)、列表(list)、集合(sets)和有序集合(sorted sets)等类型。
安装redis库:
pip install redis
3、功能开发
3.1 创建异步任务的方法
任何被 task 修饰的方法都会被创建一个 Task 对象,变成一个可序列化并发送到远程服务器的任务;
3.1.1 使用默认的参数
@celery.task
def function_name():pass
示例如下:
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')@app.task
def add(x, y):return x+y
或
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)@app.task
def add(x, y):logger.info('Adding {0} + {1}'.format(x, y))return x + y
3.1.2 指定相关参数
@celery.task(bind=True, name='name')
def function_name():pass
示例如下:
name : 可以显式指定任务的名字;默认是模块的命名空间中本函数的名字。
bind : 一个bool值,设置是否绑定一个task的实例,如果绑定,task实例会作为参数传递到任务方法中,可以访问task实例的所有的属性,即前面反序列化中那些属性
# 当bind=True时,add函数第一个参数是self,指的是task实例
@task(bind=True) # 第一个参数是self,使用self.request访问相关的属性
def add(self, x, y):try:logger.info(self.request.id)except:self.retry() # 当任务失败则进行重试,也可以通过max_retries属性来指定最大重试次数
或
logger = get_task_logger(__name__)@app.task(bind=True)
def add(self, x, y):logger.info(self.request.id)
3.1.3 自定义Task基类
import celeryclass MyTask(celery.Task):# 任务失败时执行def on_failure(self, exc, task_id, args, kwargs, einfo):print('{0!r} failed: {1!r}'.format(task_id, exc))# 任务成功时执行def on_success(self, retval, task_id, args, kwargs):pass# 任务重试时执行def on_retry(self, exc, task_id, args, kwargs, einfo):pass@task(base=MyTask)
def add(x, y):raise KeyError()
或
import celeryclass MyTask(celery.Task):def on_failure(self, exc, task_id, args, kwargs, einfo):print('{0!r} failed: {1!r}'.format(task_id, exc))@app.task(base=MyTask)
def add(x, y):raise KeyError()
3.2 调用异步任务的方法
3.2.1 app.send_task
send_task 在发送的时候是不会检查 tasks.add 函数是否存在的,即使为空也会发送成功,所以 celery 执行是可能找不到该函数报错;
# tasks.py
from celery import Celery
app = Celery()def add(x, y):return x+y# app.py
app.send_task('tasks.add',args=[3,4]) # 参数基本和apply_async函数一样
3.2.2 Task.delay
delay 方法是 apply_async 方法的简化版,不支持执行选项,只能传递任务的参数。
# tasks.py
from celery import Celery
app = Celery()@app.task
def add(x, y, z=0):return x + y# app.py
add.delay(30, 40, z=5) # 包括位置参数和关键字参数
3.2.3 Task.apply_async
apply_async 支持执行选项,它会覆盖全局的默认参数和定义该任务时指定的执行选项,本质上还是调用了 send_task 方法;
# tasks.py
from celery import Celery
app = Celery()@app.task
def add(x, y, z=0):return x + y# app.py
add.apply_async(args=[30,40], kwargs={'z':5})
3.3 获取任务结果和状态
由于 celery 发送的都是去其他进程执行的任务,如果需要在客户端监控任务的状态,有如下方法:
r = task.apply_async()
r.ready() # 查看任务状态,返回布尔值, 任务执行完成, 返回 True, 否则返回 False.
r.wait() # 会阻塞等待任务完成, 返回任务执行结果,很少使用;
r.get(timeout=1) # 获取任务执行结果,可以设置等待时间,如果超时但任务未完成返回None;
r.result # 任务执行结果,未完成返回None;
r.state # PENDING, START, SUCCESS,任务当前的状态
r.status # PENDING, START, SUCCESS,任务当前的状态
r.successful # 任务成功返回true
r.traceback # 如果任务抛出了一个异常,可以获取原始的回溯信息
4、入门示例(celery)
4.1 新建任务task
- tasks.py
from celery import Celery#消息中间件(使用的redis)
broker = 'redis://localhost:6379/1'
#结果存储(使用的redis)
backend = 'redis://localhost:6379/2'
#实例化Celery对象
app = Celery('celeryDemo',broker=broker,backend=backend
)@app.task()
def add(x,y):print('task: add')return x+y
4.2 新建应用app
- app.py
from tasks import addif __name__ == '__main__':print('task start....')result = add.delay(2,3)print('task end....')print(result)
或者
#coding=utf-8
from tasks import addif __name__ == '__main__':# delay与apply_async生成的都是AsyncResult对象res = add.apply_async((123, 100))if res.successful():result = res.get()print(result)elif res.failed():print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')
4.3 执行命令
(1)运行redis服务器:
命令行执行如下程序。
redis-server.exe
(2)运行worker端:
##常规启动Worker
#celery -A tasks worker --loglevel=INFO ##Windows下启动Worker
#pip install eventlet
#celery -A tasks worker --loglevel=INFO -P eventlet
#celery -A tasks worker -l info -P eventlet -c 10celery -A tasks worker -l info -P threads
(3)运行 app.py文件:
python app.py
5、更多示例(celery)
5.1 例子1
- config.py
# config.py
# 设置配置
BROKER_URL = 'amqp://username:password@localhost:5672/yourvhost'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'msgpack'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ["msgpack"]
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = {"default": { # 这是上面指定的默认队列"exchange": "default","exchange_type": "direct","routing_key": "default"}
}
- tasks.py
# tasks.py
from app import celery@celery.task
def add(x, y):return x + y@celery.task(name="sub")
def sub(x, y):return x - y
- app.py
# app.py --- 初始化celery对象
from celery import Celery
import config
from task import add, subcelery = Celery(__name__, include=["task"]) # 设置需要导入的模块
# 引入配置文件
celery.config_from_object(config)if __name__ == '__main__':add.apply_async((2,2), routing_key='default',priority=0,exchange='default')
5.2 例子2
- config.py
# coding: utf-8
from celery import Celerycelery_broker = 'amqp://guest@127.0.0.1//'
celery_backend = 'amqp://guest@127.0.0.1//'# Add tasks here
CELERY_IMPORTS = ('tasks',
)app = Celery('celery', broker=celery_broker,
backend=celery_backend, include=CELERY_IMPORTS)app.conf.update(CELERY_ACKS_LATE=True, # 允许重试CELERY_ACCEPT_CONTENT=['pickle', 'json'],CELERY_TASK_SERIALIZER='json',CELERY_RESULT_SERIALIZER='json',# 设置并发worker数量CELERYD_CONCURRENCY=4, # 每个worker最多执行500个任务被销毁,可以防止内存泄漏CELERYD_MAX_TASKS_PER_CHILD=500, BROKER_HEARTBEAT=0, # 心跳CELERYD_TASK_TIME_LIMIT=12 * 30, # 超时时间
)# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = True
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'sub': {'task': 'tasks.sub','schedule': timedelta(seconds=3),# 每周一早八点# 'schedule': crontab(hour=8, day_of_week=1), 'args': (300, 150),}
}
- tasks.py
#coding=utf-8
from config import app
from celery.signals import worker_process_init, worker_process_shutdown@worker_process_init.connect
def init_worker(*args, **kwargs):# 初始化资源pass@worker_process_shutdown.connect
def release_worker(*args, **kwargs):# 释放资源pass# 普通函数装饰为 celery task
@app.task
def add(x, y):return x + y@app.task
def sub(x, y):return x - y
- main.py
#coding=utf-8import time
from tasks import addif __name__ == '__main__':a = time.time()# delay与apply_async生成的都是AsyncResult对象async = add.apply_async((1, 100))if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')
5.3 例子3
- task1.py
# -*- coding: utf-8 -*-# 使用celery
import time
from celery import Celery
import redis# 创建一个Celery类的实例对象
app = Celery('celery_demo', broker='redis://127.0.0.1:6379/15')@app.task
def add(a, b):count = a + bprint('任务函数正在执行....')time.sleep(1)return count
- app1.py
import time
from task1 import adddef notity(a, b):result = add.delay(a, b)return resultif __name__ == '__main__':for i in range(5):time.sleep(1)result = notity(i, 100)print(result)
先执行celery命令:
celery -A task1 worker -l info -P eventlet
再执行worker命令:
python app1.py
5.4 例子4
- celery_config.py
#-*-coding=utf-8-*-
from __future__ import absolute_importfrom celery.schedules import crontab
# 中间件
BROKER_URL = 'redis://localhost:6379/6'# 结果存储CELERY_RESULT_BACKEND = 'redis://:127.0.0.1:6379/5'
# 默认worker队列
CELERY_DEFAULT_QUEUE = 'default'
# 异步任务
CELERY_IMPORTS = ("tasks"
)from datetime import timedelta
# celery beat
CELERYBEAT_SCHEDULE = {'add':{'task':'tasks.add','schedule':timedelta(seconds=10),'args':(1,12)},# 每10s执行一次'task1': {'task': 'tasks.add','schedule': timedelta(seconds=10),'args': (2, 8)},# 每天15:00执行'task2': {'task': 'tasks.add','schedule': crontab(hour=15, minute=0),'args': (9, 9)}
}
- celery_app.py
from __future__ import absolute_import
from celery import Celeryapp = Celery('celery_app')
app.config_from_object('celery_config')
- tasks.py
from celery_app import app@app.task(queue='default')
def add(x, y):return x + y@app.task(queue='default')
def sub(x, y):return x - y
- app.py
import sys, os# sys.path.append(os.path.abspath('.'))
sys.path.append(os.path.abspath('..'))
from tasks import adddef add_loop():ret = add.apply_async((1, 2), queue='default')print(type(ret))return retif __name__ == '__main__':ret = add_loop()print(ret.get())print(ret.status)
-
运行命令
(1)终端输入:celery -A celery_app worker -Q default --loglevel=info
(2)终端输入:celery -A celery_app beat
(3)终端执行:python app.py -
启动worker
celery -A celery_app worker --loglevel=info
#celery -A celery_app worker --loglevel=info --concurrency=10
- 启动定时任务配置:
celery -A celery_app beat --loglevel=info
- 同时启动worker和beat:
celery -A celery_app worker --beat --loglevel=info
6、扩展示例(celery+flask)
flask是一个阻塞式的框架。这里的“阻塞”是指flask处理请求的时候,一次只能处理一个,当多个requests过来,flask会说,大家不要急,一个一个来。
6.1 例子1
- mycelery.py
from celery import Celerydef make_celery(app):celery = Celery( #实例化Celery'tasks',broker='redis://localhost:6379/1', #使用redis为中间人backend='redis://localhost:6379/2' #结果存储)class ContextTask(celery.Task): #创建ContextTask类并继承Celery.Task子类def __call__(self, *args, **kwargs): with app.app_context(): #和Flask中的app建立关系return self.run(*args, **kwargs) #返回任务celery.Task = ContextTask #异步任务实例化ContextTaskreturn celery #返回celery对象
- tasks.py
import time
from app import celery@celery.task #使用异步任务装饰器task
def add(x, y):time.sleep(5) #休眠5秒return x + y
- app.py
from flask import Flask
import tasks
from mycelery import make_celeryapp = Flask(__name__)
celery = make_celery(app) #调用make_celery方法并传入app使celery和app进行关联@app.route('/')
def hello():tasks.add.delay(1,2) #调用tasks文件中的add()异步任务方法return '请求正在后台处理中,您可以去处理其他事情'if __name__ == '__main__':app.run(debug=True)
在终端执行如下代码运行Celery命令:
celery -A tasks worker -l info -P eventlet -c 10
启动Flask程序,访问http://127.0.0.1:5000/后在终端查Worker服务:
http://127.0.0.1:5000
6.2 例子2
- main.py
from flask import Flask
from celery import Celery
from celery.result import AsyncResult
import timeapp = Flask(__name__)
# 用以储存消息队列
app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/11'
# 用以储存处理结果
app.config['CELERY_RESULT_BACKEND'] = 'redis://127.0.0.1:6379/12'celery_ = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery_.conf.update(app.config)@celery_.task
def task_add(arg1, arg2):# 两数相加time.sleep(10)return arg1+arg2@app.route("/sum/<arg1>/<arg2>")
def route_sum(arg1,arg2):# 发送任务到celery,并返回任务ID,后续可以根据此任务ID获取任务结果result = task_add.delay(int(arg1),int(arg2))return result.id@app.route("/get_result/<result_id>")
def route_result(result_id):# 根据任务ID获取任务结果result = celery_.AsyncResult(id=result_id)return str(result.get())if __name__ == "__main__":app.run(debug=True)
在终端执行如下代码运行Celery命令:
celery -A main.celery_ worker -l info -P eventlet -c 10
启动Flask程序,访问http://127.0.0.1:5000/后在终端查Worker服务:
http://127.0.0.1:5000
通过链接发送任务,并获得任务ID:
http://127.0.0.1:5000/sum/1/2
通过下面这个链接可以获得任务结果:
http://127.0.0.1:5000/get_result/105fb6d4-67ae-46ab-8c84-77381821a43c
另外可以通过flower模块来监控celery的任务:
flower --basic_auth=admin:admin --broker=redis://127.0.0.1:6379/0 --address=0.0.0.0 --port=5556
# celery flower --broker=redis://localhost:6379/6
http://127.0.0.1:5556
6.3 例子3
- main.py
from flask import Flask, jsonify
from celery import create_app, shared_taskapp = Flask(__name__)
app.config.update(CELERY_BROKER_URL='redis://localhost:6379/0',CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)celery = create_app(app)@shared_task(bind=True)
def add(self, x, y):return x + y@app.route('/')
def index():return 'Hello, World!'@app.route('/add/<int:x>/<int:y>')
def add_route(x, y):task = add.delay(x, y)return jsonify({'task_id': task.id})@app.route('/result/<task_id>')
def get_result_route(task_id):task = add.AsyncResult(task_id)if task.state == 'SUCCESS':return jsonify({'result': task.result})else:return jsonify({'status': task.state})if __name__ == '__main__':app.run(debug=True)
结语
如果您觉得该方法或代码有一点点用处,可以给作者点个赞,或打赏杯咖啡;
╮( ̄▽ ̄)╭
如果您感觉方法或代码不咋地
//(ㄒoㄒ)//,就在评论处留言,作者继续改进;
o_O???
如果您需要相关功能的代码定制化开发,可以留言私信作者;
(✿◡‿◡)
感谢各位大佬童鞋们的支持!
( ´ ▽´ )ノ ( ´ ▽´)っ!!!