Celery的实践指南

Celery的实践指南
celery原理:
celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信。
典型的场景为:
  1. 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker。
  2. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行。
实践中的典型场景:
  1. 简单的定时任务:
    1. 替换crontab的celery写法:

      1. from celery import Celery
        from celery.schedules import crontab

        app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")

        app.conf.update(CELERYBEAT_SCHEDULE = {
            "add": {
                "task": "celery_demo.add",
                "schedule": crontab(minute="*"),
                "args": (16, 16)
            },
        })

        @app.task
        def add(x, y):
            return x + y

    2. 运行celery的worker,让他作为consumer运行,自动从broker上获得任务并执行。
      1. `celery -A celery_demo worker`
    3. 运行celery的client,让其根据schedule,自动生产出task msg,并发布到broker上。
      1. `celery -A celery_demo beat`
    4. 安装并运行flower,方便监控task的运行状态
      1. `celery flower -A celery_demo`
      2. 或者设置登录密码 `
        celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
  2. 多同步任务-链式任务-
  3. 失败自动重试的task
    1. 失败重试方法: 将task代码函数参数增加self,同时绑定bind。
    2. demo代码:
      1. @app.task(bind=True, default_retry_delay=300, max_retries=5)
        def my_task_A(self):
            try:
                print("doing stuff here...")
            except SomeNetworkException as e:
                print("maybe do some clenup here....")
                self.retry(e)
    3. 自动重试后,是否将任务重新入queue后排队,还是等待指定的时间?可以通过self.retry()参数来指定。
  4. 派发到不同Queue队列的task
    1. 一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。
      1. 比如:把queue的exchange和routing_key配置成通用模式:
      2. 再定义task的routing_key的名称:
    2. 可用的不同exchange策略:
      1. direct:直接根据定义routing_key
      2. topic:exchange会根据通配符来将一个消息推送到多个queue。
      3. fanout:将消息拆分,分别推送到不同queue,通常用于超大任务,耗时任务。
    3. 参考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
  5. 高级配置
    1. result是否保存
    2. 失败邮件通知:
    3. 关闭rate limit:
  6. auto_reload方法(*nix系统):
    1. celery通过监控源代码目录的改动,自动地进行reload
    2. 使用方法:1.依赖inotify(Linux) 2. kqueue(OS X / BSD)
    3. 安装依赖:
      $ pip install pyinotify
    4. (可选) 指定fsNotify的依赖:
      $ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
    5. 启动: celery -A appname worker --autoreload
  7. auto-scale方法:
    1. 启用auto-scale
    2. 临时增加worker进程数量(增加consumer):
      $ celery -A proj control add_consumer foo -d worker1.local
    3. 临时减少worker进程数量(减少consumer):
  8. 将scheduled task的配置从app.conf变成DB的方法:
    1. 需要在启动时指定custom schedule 类名,比如默认的是: celery.beat.PersistentScheduler 。
      1. celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
  9. 启动停止worker的方法:
    1. 启动 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
      1. root用户可以使用celeryd
      2. 非特权用户:celery multi start worker1 -A appName  —autoreload  --pidfile="$HOME/run/celery/%n.pid"  --logfile="$HOME/log/celery/%n.log"
      3. 或者 celery worker —detach
    2. 停止
    3. ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
  10. 与Flask集成的方法
    1. 集成后flask将充当producer来创建并发送task给broker,在celery启动的独立worker进程将从broker中获得task并执行,同时将结果返回。
    2. flask中异步地获得task结果的方法:add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args=(x,y), countdown=30)
    3. flask获得
  11. 与flask集成后的启动问题
    1. 由于celery的默认routing_key是根据生产者在代码中的import级别来设定的,所以worker端在启动时应该注意其启动目录应该在项目顶级目录上,否者会出现KeyError。
  12. 性能提升: eventlet 和 greenlet
官方参考:http://docs.celeryproject.org/en/latest/userguide/index.html

转载于:https://www.cnblogs.com/ToDoToTry/p/5453149.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/255410.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【pyqt5学习】——bug修复,利用qt资源库qrc文件进行背景图像设置,不显示图像

目录 bug描述 bug分析 bug解决 bug描述 利用下面文章的方法进行资源设置后,有的显示了有的没有显示 【pyqt5学习】——pyqt5中.qrc资源文件的创建与编写_有情怀的机械男的博客-CSDN博客_python qrc目录一、说明二、安装pyqt5以及相关工具(pyqt5、pyuic…

Datalore:用于机器学习可视化的新Web方案!

前不久,JetBrains团队发布了Datalore,这是一款基于云的web应用程序,用于构建机器学习模型并在Python中创建丰富的可视化。最新的测试版本旨在简化构建机器学习模型的流程,并帮助开发人员进行数据分析。由于Datalore的智能编码辅助…

Val3语言介绍

Val3语言是一门专门针对特定工业机器人的一门语言。在墙内有关这方面的介绍非常的少。在墙外维基百科上有一篇详细介绍的,有空FQ摘抄在这里。 Val3是使用XML格式。可以使用XML编辑器来进行编写VAL3的程序,可直接在模拟器上或者示教盒上编程,一般推荐使用…

网络层相关问题

1.说一下网络层的总体结构。 2.有没有看过源码? A.继承Request   B.打日志   C.修改参数 3.说一下Volley整体结构。 4.有了解过OkHttp吗? 5.说一下OkHttp和Volley的区别?转载于:https://www.cnblogs.com/jarvisyin/p/6389553.html

CSS魔法堂:重拾Border之——图片作边框

前言 当CSS3推出border-radius属性时我们是那么欣喜若狂啊,一想到终于不用再添加额外元素来模拟圆角了,但发现border-radius还分水平半径和垂直半径,然后又发现border-top-left/right-radius的水平半径之和大于元素宽度时,实际值会…

共享内存简介和mmap 函数

一、共享内存简介 共享内存区是最快的IPC形式,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据。 即每个进程地址空间都有一个共享存储器的映射区,当这块区域都映射到相同的真正的物理地址空…

【pyqt5学习】——QToolTip,QLabel控件,伙伴关系

目录 0、学习资源 1、给控件设置提示信息——QToolTip 2、QLabel控件(显示图像、设置超链接、信号绑定) 3、QLabel伙伴关系 1)代码 2)效果 3)知识点addWidget 0、学习资源 PyQt5教程,来自网易云课堂…

Hello IT

从高中的计算机课开始学习如何按下开机键,如何上网,如何背诵五笔字根,再到大学中如何使用office,利用C编码做算法,到现在IT中的伪一员,时间算起来也不短,然正在入门是在2010年8月杭州参加嵌入式培训&#x…

ARM指令集2

ARM指令集2 ARM微处理器支持加载/存储指令用于在寄存器和存储器之间传送数据,加载指令用于将存储器中的数据传送到寄存器,存储指令则完成相反的操作。 LDR指令(与MOV有区别,MOV只能操作通用寄存器) LDR指令格式为&…

SylixOS中select原理及使用分析

2019独角兽企业重金招聘Python工程师标准>>> 1. select接口简介 1.1 select接口使用用例 select是操作系统多路I/O复用技术实现的方式之一。 多路I/O复用技术大致使用场景为:构造一张感兴趣的文件描述符列表,然后调用多路复用的IO接口&#x…

【pyqt5学习】——QLineEdit学习(回显模式)

目录 1、回显模式 2、成果显示 3、知识点 1)FormLayout布局添加addRow方法 2)在输入框显示灰色提示字体,输入内容时消失setPlaceholderText 3)设置回显模式setEchoMode 4、完整代码 1、回显模式 QLineEdit控件的主要功能是输…

有关机械手臂控制中的两个重要输入参数

1.在机械手臂中有两个重要参数。一个是编码器的值,另外一个是马达的电流值。根据这两个可以获得机械手臂的运动学,动力学的一些数据。第一重要特征参数 是DH参数,另外一个就是每个轴的质心参数。

MySQL的权限分配

MySQL 赋予用户权限命令的简单格式可概括为:grant 权限 on 数据库对象 to 用户,如 GRANT PRIVILEGES ON datebase.* to user% IDENTIFIED by passwd;一、给表数据赋权 grant 普通数据用户,查询、插入、更新、删除 数据库中所有表数据的权利。…

用HttpURLConnection发送http请求

//发送http请求try {//1.使用网址构造一个URL对象 URL url new URL(path);//2.获取连接对象 HttpURLConnection conn (HttpURLConnection) url.openConnection();//3.设置一些属性 //设置请求方式,注意大写conn.setRequestMethod("GET");//设置请求超时…

【pyqt5学习】——QLineEdit控件输入校验器Validator、掩码setInputMask限制输入、textChanged信号

目录 1、输入校验器——限制输入框输入的内容 1)校验器类型——整数、浮点数、数字字母结合(正则) 2)步骤 3)结果 ​编辑 ​编辑 4)完整代码 2、利用掩码进行输入的限制 0)掩码对照表 1…

Call requires API level 3 (current min is 1)

结果出现“Call requires API level 3 (current min is 1): 解决方法: 在工程上点击右键 -> Android Tools -> Clear Lint Markers,即可。转载于:https://www.cnblogs.com/qianyukun/p/5458331.html

Product文本格式说明

使用txt进行产品信息的说明。 Product文本格式说明 //**************************************************** //产品信息 //固定标识符全部大写,全部在等号()前面 //****************************************************** PRODUCTTest //…

PyOpenCL图像处理:Box模糊

为什么80%的码农都做不了架构师?>>> # -*- coding: utf-8 -*-from __future__ import absolute_import, print_function import numpy as np import pyopencl as cl import cv2 from PIL import Imagedef RoundUp(groupSize, globalSize): r globalSi…

【python bug修复】——Script file ‘D:\softwares_install\Anaconda3\envs\PartTimes\Scripts\pip-scrip

目录 1、问题描述 2、问题解决 1)下载pip安装脚本 2) 运行安装pip脚本 3) 下载库 1、问题描述 利用pip命令进行库的安装时,突然出现这个问题,之前使用还好好的 Script file D:\softwares_install\Anaconda3\envs\P…

项目中的那些事---下载pdf文件

最近做了一个下载pdf文档的需求&#xff0c;本以为使用HTML5中<a>标签的属性download就能简单搞定&#xff0c;不料IE竟然不支持这一简单粗暴的H5新特性&#xff0c;而是直接在网页中打开&#xff0c; 于是各种搜索之后得出以下结论&#xff1a;IE中下载文档时&#xff0…