python拓展7(Celery消息队列配置定时任务)

介绍

celery 定时器是一个调度器(scheduler);它会定时地开启(kicks off)任务,然后由集群中可用的工人(worker)来执行。

定时任务记录(entries)默认 从 beat_schedule 设置中获取,但自定义存储也可以使用,如把记录存储到SQL数据库中。

要确保同一时间一份时间表上只有一个调度器在运行,否则会因为重复发送任务而结束。使用集中途径意味着定时任务不用必须同步,并且服务无需用锁操控。

  • user:用户程序,用于告知celery去执行一个任务。
  • broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
  • worker:执行任务

celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

Celery version 4.0 runs onPython ❨2.7, 3.4, 3.5❩PyPy ❨5.4, 5.5❩This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.If you’re running an older version of Python, you need to be running an older version of Celery:Python 2.6: Celery series 3.1 or earlier.Python 2.5: Celery series 3.0 or earlier.Python 2.4 was Celery series 2.2 or earlier.Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
版本和要求

环境准备:

  • 安装rabbitMQ或Redis
    https://www.cnblogs.com/L5251/articles/9146825.html

     https://www.cnblogs.com/L5251/articles/9325586.html

  • 安装celery
         pip3 install celery

快速上手

import time
from celery import Celeryapp = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')@app.task
def xxxxxx(x, y):time.sleep(10)return x + y
s1.py
s2.py
from celery.result import AsyncResult
from s1 import appasync = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)if async.successful():result = async.get()print(result)# result.forget() # 将结果删除
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行')
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')
s3.py
# 执行 s1.py 创建worker(终端执行命令):
celery worker -A s1 -l info
# PS:Windows系统上执行命令时出错解决方法
    pip3 install eventlet
# 后期运行修改为:celery worker -A s1 -l info -P eventlet
# 执行 s2.py ,创建一个任务并获取任务ID:
    python3 s2.py# 执行 s3.py ,检查任务状态并获取结果:python3 s3.py

多任务结构

pro_cel├── celery_tasks# celery相关文件夹│   ├── celery.py   # celery连接和配置相关文件│   └── tasks.py    #  所有任务函数├── check_result.py # 检查结果└── send_task.py    # 触发任务
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celerycelery = Celery('xxxxxx',broker='redis://192.168.0.111:6379',backend='redis://192.168.0.111:6379',include=['celery_tasks.tasks'])# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
pro_cel/celery_tasks/celery
#!/usr/bin/env python
# -*- coding:utf-8 -*-import time
from .celery import celery@celery.task
def xxxxx(*args, **kwargs):time.sleep(5)return "任务结果"@celery.task
def hhhhhh(*args, **kwargs):time.sleep(5)return "任务结果"
pro_cel/celery_tasks/tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-from celery.result import AsyncResult
from celery_tasks.celery import celeryasync = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)if async.successful():result = async.get()print(result)# result.forget() # 将结果删除
elif async.failed():print('执行失败')
elif async.status == 'PENDING':print('任务等待中被执行')
elif async.status == 'RETRY':print('任务异常后正在重试')
elif async.status == 'STARTED':print('任务已经开始被执行')
pro_cel/check_result.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import celery_tasks.tasks# 立即告知celery去执行xxxxxx任务,并传入两个参数
result = celery_tasks.tasks.xxxxx.delay(4, 4)print(result.id)
pro_cel/send_task.py

更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

定时任务

1. 设定时间让celery执行一个任务

import datetime
from celery_tasks.tasks import xxxxx
"""
from datetime import datetimev1 = datetime(2017, 4, 11, 3, 0, 0)
print(v1)v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)"""
ctime = datetime.datetime.now()
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())s10 = datetime.timedelta(seconds=10)
ctime_x = utc_ctime + s10# 使用apply_async并设定时间
result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
print(result.id)

2. 类似于contab的定时任务

"""
celery beat -A proj
celery worker -A proj -l info"""
from celery import Celery
from celery.schedules import crontabapp = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = Falseapp.conf.beat_schedule = {# 'add-every-10-seconds': {#     'task': 'proj.s1.add1',#     'schedule': 10.0,#     'args': (16, 16)# },'add-every-12-seconds': {'task': 'proj.s1.add1','schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),'args': (16, 16)},
}

注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。

Flask中应用Celery

pro_flask_celery/
├── app.py
├── celery_tasks├── celery.py└── tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-from flask import Flask
from celery.result import AsyncResultfrom celery_tasks import tasks
from celery_tasks.celery import celeryapp = Flask(__name__)TASK_ID = None@app.route('/')
def index():global TASK_IDresult = tasks.xxxxx.delay()# result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))TASK_ID = result.idreturn "任务已经提交"@app.route('/result')
def result():global TASK_IDresult = AsyncResult(id=TASK_ID, app=celery)if result.ready():return result.get()return "xxxx"if __name__ == '__main__':app.run()
app.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from celery import Celery
from celery.schedules import crontabcelery = Celery('xxxxxx',broker='redis://192.168.10.48:6379',backend='redis://192.168.10.48:6379',include=['celery_tasks.tasks'])# 时区
celery.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
celery.conf.enable_utc = False
celery_tasks/celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-import time
from .celery import celery@celery.task
def hello(*args, **kwargs):print('执行hello')return "hello"@celery.task
def xxxxx(*args, **kwargs):print('执行xxxxx')return "xxxxx"@celery.task
def hhhhhh(*args, **kwargs):time.sleep(5)return "任务结果"
celery_task/tasks.py

记录

为了定时调用任务,你必须添加记录到打点列表中:

from celery import Celery
from celery.schedules import crontabapp = Celery()@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):# 每10秒调用 test('hello') .sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')# 每30秒调用 test('world') sender.add_periodic_task(30.0, test.s('world'), expires=10)# 每周一上午7:30执行
    sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.task
def test(arg):print(arg)

用on_after_configure处理器进行这些设置意味着当使用test.s()时我们不会在模块层面运行app 。

add_periodic_task() 函数在幕后会添加记录到beat_schedule设定,同样的设定可以用来手动设置定时任务:

例子: 每30秒运行 tasks.add .

app.conf.beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},
}
app.conf.timezone = 'UTC'

一般会使用配置文件进行配置,如下 
celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
beat_schedule = {'add-every-30-seconds': {'task': 'tasks.add','schedule': 30.0,'args': (16, 16)},
}

程序里使用:

app.config_from_object('celeryconfig')注意
如果你的参数元组里只有一个项目,只用一个逗号就可以了,不要圆括号。

时间表使用时间差意味着每30秒间隔会发送任务(第一个任务在celery定时器开启后30秒发送,然后上每次距一次运行后30秒发送一次)

可使用的属性

    • task

      要执行的任务名字

    • schedule

      执行的频率

      可以是整数秒数,时间差,或者一个周期( crontab)。你也可以自 定义你的时间表类型,通过扩展schedule接口。

    • args

      位置参数 (list 或 tuple).

    • kwargs

      键值参数 (dict).

    • options

      执行选项 (dict).

      这可以是任何被apply_async()支持的参数与—-exchange, routing_key, expires,等。

    • relative

      如果 relative 是 true ,时间表“由时钟时间”安排,意味着 频率近似到最近的秒,分钟,小时或天,这取决于时间差中的时间间隔。 
      默认relative是false,频率不会近似,会相对于celery的启动时间。

      Crontab 表达式语法非常灵活。

例子含义
crontab()每分钟执行
crontab(minute=0, hour=0)每天午夜执行
crontab(minute=0, hour=’*/3’)每三个小时执行: 午夜, 3am, 6am, 9am, 正午, 3pm, 6pm, 9pm.
crontab(minute=0,hour=’0,3,6,9,12,15,18,21’)同上
crontab(minute=’*/15’)每15分钟执行
crontab(day_of_week=’sunday’)星期日每分钟
crontab(minute=’‘,hour=’‘, day_of_week=’sun’)同上
crontab(minute=’*/10’,hour=’3,17,22’, day_of_week=’thu,fri’)每10分钟执行,仅限于周六日3-4 am, 5-6 pm, and 10-11 pm
crontab(minute=0, hour=’/2,/3’)偶数小时或者能被3整除的小时数执行
crontab(minute=0, hour=’*/5’)被5整除的小时数,如3pm
crontab(minute=0, hour=’*/3,8-17’)8am-5pm能被3整除的
crontab(0, 0, day_of_month=’2’)每月第2天
crontab(0, 0,day_of_month=’2-30/3’)每偶数天
crontab(0, 0,day_of_month=’1-7,15-21’)每月1和3周
crontab(0, 0, day_of_month=’11’,month_of_year=’5’)每年5月11日
crontab(0, 0,month_of_year=’*/3’)每个季度第1月

开启调度

开启celery定时服务:

$ celery -A proj beat

可以把定时器嵌入到工人(worker)中,通过启用workers -B选项,如果你永远不会运行超过一个工人节点这就会很方便。但这不太常见,不推荐在生产环境这样使用:

$ celery -A proj worker -B

定时器需要在本地数据库文件(默认名为 celerybeat-schedule )存储任务上次运行时间,所以它需要在当前目录中写权限。或者你也可以给这个文件指定一个位置:

$ celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

 

转载于:https://www.cnblogs.com/L5251/articles/9332304.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/279832.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

chrome连接已重置_如何重置(或调整)Chrome的下载设置

chrome连接已重置By default, Chrome saves all downloaded files to the same location—a dedicated “Downloads” folder. The thing is, this isn’t always practical for all types of download files. The good news is you can easily tweak this setting. 默认情况下…

.Net 7 团队把国内的龙芯确实当做一等公民和弃用的项目

楔子:国内龙芯据说是用的自己的指令集,在研究ILC的时候,发现了龙芯在微软那边确实是一等公民的存在。同X64,ARM,X86一同并列交叉编译和二进制提取。龙芯官网龙芯平台.NET,是龙芯公司基于开源社区.NET独立研发适配的龙芯版本&#…

戴尔押宝iSCSI,由低到高组合成型

戴尔(Dell)是较早接受SAS技术的主流存储厂商之一,2006年已推出采用SAS硬盘驱动器的SAS直连存储(DAS)系统PowerVault MD3000。一年之后,主机连接改用iSCSI的PowerVault MD3000i问世。2008年1月,E…

word中插入公式的快捷键_如何使用插入键在Word中插入复制的内容

word中插入公式的快捷键In Word, the “Insert” key on the keyboard can be used to switch between Insert and Overtype modes. However, it can also be used as a shortcut key for inserting copied or cut content at the current cursor position. 在Word中&#xff0…

微软终于为 Visual Studio 添加了内置的 Markdown 编辑器

微软终于为 Visual Studio 添加了内置的 Markdown 编辑器。根据官方博客的介绍,由于收到许多用户的反馈,微软决定为 Visual Studio 添加 Markdown 编辑器。开发者下载最新的 Visual Studio 17.5 第 2 个预览版就能够使用 Markdown 编辑功能,无…

【经验分享】Hydra(爆破神器)使用方法

这个也是backtrack下面很受欢迎的一个工具 参数详解:-R 根据上一次进度继续破解-S 使用SSL协议连接-s 指定端口-l 指定用户名-L 指定用户名字典(文件)-p 指定密码破解-P 指定密码字典(文件)-e 空密码探测和指定用户密码探测(ns)-C 用户名可以用:分割(username:passw…

【东软实训】SQL多表链接

如果一个查询同时涉及两个以上的表,则称之为链接查询,链接查询是关系数据库中最主要的查询,主要包括等值链接查询、非等值链接查询、自身链接查询、外链接查询和复合条件链接查询。 这篇博文我们来对多表链接进行学习。 Outline 链接的基本概…

博鳌“‘AI+时代’来了吗”分论坛,嘉宾们有何重要观点?...

雷锋网(公众号:雷锋网)3月27日消息,正在进行中的博鳌亚洲论坛2019年年会,于2019年3月26日至29日在中国海南博鳌举办。今年博鳌论坛的主题为“共同命运 共同行动 共同发展”。今天,在主题为《“AI时代”来了吗?》分论坛…

一款统计摸鱼时长的开源项目

对于我们程序员,在工作中一天8小时,不可能完全在写代码了,累了刷刷论坛、群里吹吹牛,这都是非常正常的。虽然一天下来,可能我们都可以按时完成工作,但是我们不知道,时间都花在哪里了&#xff0c…

saltstack 主题说明

转载于:https://www.cnblogs.com/40kuai/p/9335869.html

基于spring boot 的ssm项目的简单配置

2019独角兽企业重金招聘Python工程师标准>>> 我前面的帖子有介绍spring boot的简单搭建,现在我再讲讲spring boot的简单配置 首先,项目结构 启动类 RestController 注解相当于ResponseBody + Controller合在一起的作用。 Sprin…

nest 架构_如何与其他人分享您的Nest Cam Feed

nest 架构Your Nest Cam can help you keep an eye on your home from anywhere you are, but more eyes you trust to watch your stuff is more comforting. If you want someone else to check in once in a while, you can share your Nest Cam feed with a simple, passwo…

.Net 和Assembly下滑其它回升,TIOBE编程语言2022年12排行榜

楔子TIOBE编程语言排行榜一般反应的是语言的生态,个人比较喜欢这个排行。来看下2022年最后一个月12月,最后一天,TIOBE的排行榜单。榜单分析这里只看下前10名的编程语言,里面非常显眼的是所有的语言都增加了生态环境,包括不被看好的…

Haproxy安装与配置

Haproxy安装与配置 有关高负载均衡的软件,目前使用比较多的是haproxy、nginx和lvs。下面我们就开始学习haprxoy这款软件。 1、Haproxy概念 1.1、haproxy原理 haproxy提供高可用性、负载均衡以及基于TCP(第四层)和HTTP(第七层)应用的代理&…

删除word中所有的表格_如何在Word中删除表格

删除word中所有的表格If you’ve inserted a table in Word and you now want to delete it, you may have found it’s not all that straightforward to delete the entire table without deleting other content around the table. We’ll show you a couple of ways around…

Jenkins在windows平台下,让Powershell和批处理可以拉起进程并保持

📢欢迎点赞 :👍 收藏 ⭐留言 📝 如有错误敬请指正,赐人玫瑰,手留余香!📢本文作者:由webmote 原创📢作者格言:无尽的折腾后,终于又回到…

MVC身份验证及权限管理(转载)

from https://www.cnblogs.com/asks/p/4372783.html MVC自带的ActionFilter 在Asp.Net WebForm的中要做到身份认证微软为我们提供了三种方式&#xff0c;其中最常用的就是我们的Form认证&#xff0c;需要配置相应的信息。例如下面的配置信息&#xff1a; <authentication mo…

WPF-23 基于Timer任务调度

.NET的FCL中提供了几个计时器&#xff0c;大多数初学者都不清楚他们有什么不同&#xff0c;那我们这节来剖解一下每个计时器的本质&#xff1a;1.System.Threading.Timer如果在一个线程池上执行一个定时的周期性的后台线程任务他是最好的选择&#xff0c;这个类是和线程池相关联…

在.NET中不安装Office使用EPPlus生成带图表(Chart)的Excel报表

在开发.NET应用中可能会遇到需要生成带图表(Chart)的Excel报表的需求&#xff0c;特别是在一些ASP.NET网站中&#xff0c;有时候我们并不能保证Web服务器上一定安装了Office组件&#xff0c;所以使用微软的Office来生成Excel并不保证在所有情况下都使用&#xff0c;有时候即使W…

facebook 邀请好友_如何在Facebook上与某人解除好友

facebook 邀请好友It’s very easy for your Facebook News Feed to get cluttered. After a few years adding ukulele playing magicians you meet wandering the street and the bar staff at every bar you go to regularly, it gets overrun with people you’ll never se…