python celery使用队列

在celery的配置方法中有个参数叫task_routes,是用来设置不同的任务 消费不同的队列(也就是路由)。

格式如下:

{ ‘task name’: { ‘queue’: ‘queue name’ }}

直接上代码,简单明了,目录格式如下:

在这里插入图片描述

首先是配置文件 config.init.py

import os
import sys
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))class Config(object):"""配置文件基类"""""" 项目名称 """PROJECT_NAME = "crawler_worker"""" celery backend存放结果 """CELERY_BACKEND_URL = "redis://127.0.0.1:6379/4"""" celery broker中间件 """CELERY_BROKER_URL = "redis://127.0.0.1:6379/5"""" worker 名称 """CRAWL_SEND_EMAIL_TASK = "crawl_service.crawl.send_email_task"  # 抓取发送邮件任务CRAWL_SEND_MSG_TASK = "crawl_service.crawl.send_msg_task"  # 抓取发送短信任务settings = Config()

celery应用程序模块配置相关 celery_base.celery_app.py

import os
import sys
import time
import celery
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))from config import settings# 实例化celery对象
celery_app = celery.Celery(settings.PROJECT_NAME,backend=settings.CELERY_BACKEND_URL,broker=settings.CELERY_BROKER_URL,include=["tasks.crawl_send_email","tasks.crawl_send_msg",],
)# 任务路由
task_routes = {settings.CRAWL_SEND_EMAIL_TASK: {"queue": f"{settings.CRAWL_SEND_EMAIL_TASK}_queue"},settings.CRAWL_SEND_MSG_TASK: {"queue": f"{settings.CRAWL_SEND_MSG_TASK}_queue"},
}
# 任务去重
celery_once = {"backend": "celery_once.backends.Redis","settings": {"url": settings.CELERY_BACKEND_URL, "default_timeout": 60 * 60},
}
# 配置文件
celery_app.conf.update(task_serializer="json",result_serializer="json",accept_content=["json"],task_default_queue="normal",timezone="Asia/Shanghai",enable_utc=False,task_routes=task_routes,task_ignore_result=True,redis_max_connections=100,result_expires=3600,ONCE=celery_once,
)

抓取基类 crawl_worker_base.py

from celery_once import QueueOnceclass CrawlBase(QueueOnce):"""抓取worker基类"""name = Noneonce = {"graceful": True}ignore_result = True

发送邮件任务 crawl_send_email.py

import os
import sys
import time
import celery
from loguru import logger
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))from config import settings
from celery_base.celery_app import celery_app
from tasks.crawl_worker_base import CrawlBase"""执行命令:
celery -A tasks.crawl_send_email worker -l info -Q crawl_service.crawl.send_email_task_queue"""class SendEmailClass(CrawlBase):name = settings.CRAWL_SEND_EMAIL_TASKdef __init__(self, *args, **kwargs):super(SendEmailClass, self).__init__(*args, **kwargs)def run(self, name):logger.info("class的方式, 向%s发送邮件..." % name)time.sleep(5)logger.info("class的方式, 向%s发送邮件完成" % name)return f"成功拿到{name}发送的邮件!"send_email = celery_app.register_task(SendEmailClass())

发送短信 crawl_send_msg.py

import os
import sys
import time
import celery
from loguru import logger
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))
from config import settings
from celery_base.celery_app import celery_app
from tasks.crawl_worker_base import CrawlBase"""执行命令:
celery -A tasks.crawl_send_msg worker -l info -Q crawl_service.crawl.send_msg_task_queue"""class SendMsgClass(CrawlBase):name = settings.CRAWL_SEND_MSG_TASKdef __init__(self, *args, **kwargs):super(SendMsgClass, self).__init__(*args, **kwargs)def run(self, name):logger.info("class的方式, 向%s发送短信..." % name)time.sleep(5)logger.info("class的方式, 向%s发送短信完成" % name)return f"成功拿到{name}发送的短信!"send_msg = celery_app.register_task(SendMsgClass())

发送邮件任务-调度器 send_email_scheduler.py

import sys
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))from config import settings
from celery_base.celery_app import celery_appif __name__ == "__main__":for i in range(100):result = celery_app.send_task(name=settings.CRAWL_SEND_EMAIL_TASK, args=(f"张三嘿嘿{i}",))print(result.id)

发送短信任务-调度器 send_msg_scheduler.py

import os
import sys
import time
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parent
sys.path.append(str(BASE_DIR))from config import settings
from celery_base.celery_app import celery_appif __name__ == "__main__":for i in range(100, 500):result = celery_app.send_task(name=settings.CRAWL_SEND_MSG_TASK, args=(f"李四哈哈哈{i}",))print(result.id)

准备工作已经做好,紧接着分别执行命令:

celery -A tasks.crawl_send_email worker -l info -Q crawl_service.crawl.send_email_task_queue
celery -A tasks.crawl_send_msg worker -l info -Q crawl_service.crawl.send_msg_task_queue

出现👇🏻下面效果就代表celery启动成功:

在这里插入图片描述

最后只要发送任务即可,在redis中就可以看到专门指定的两个队列了。

在这里插入图片描述

看下运行过程中的日志

在这里插入图片描述

一个简单的celery + 队列就实现了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/692488.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云服务器“镜像”全方面解析

阿里云服务器镜像怎么选择?云服务器操作系统镜像分为Linux和Windows两大类,Linux可以选择Alibaba Cloud Linux,Windows可以选择Windows Server 2022数据中心版64位中文版,阿里云服务器网aliyunfuwuqi.com来详细说下阿里云服务器操…

【触想智能】国产工控机五大分类你知道吗?

工控机是专门为工业控制设计,用于对生产过程中使用的机器设备、生产流程、数据参数等进行监测与控制的计算机。由于工控机经常会在恶劣的环境下运行,对数据的安全性要求也比较高,因此需要单独定制才能满足需求。 在现代化工业生产过程中&…

从零学算法240

240.编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 示例 1: 输入:matrix [[1,4,7,11,15],[2,5,8,12,19],[3,6,9,16,22],[10,13,14,…

代码随想录 Leetcode45. 跳跃游戏 II

题目&#xff1a; 代码(首刷看解析 2024年2月15日&#xff09;&#xff1a; class Solution { public:int jump(vector<int>& nums) {if (nums.size() 1) return 0;int res 0;int curDistance 0;int nextDistance 0;for (int i 0; i < nums.size(); i) {nex…

网页布局之浮动

一&#xff0c;传统网页布局的三种方式 普通流&#xff08;标准流&#xff09;、浮动、定位。 二&#xff0c;标准流&#xff08;普通流/文档流&#xff09; 即为标签按照规定好的默认方式排列。 1.块级元素会独占一行&#xff0c;从上向下顺序排列。 常用元素&#xff1a;…

Mybatis | 初识Mybatis

初识Mybatis 目录: 初识Mybatis什么是Mybatis&#xff1f;Hibernate 和 MyBatis的区别&#xff1f;Mybatis的下载和使用Mybatis的工作原理 作者简介 &#xff1a;一只大皮卡丘&#xff0c;计算机专业学生&#xff0c;正在努力学习、努力敲代码中! 让我们一起继续努力学习&#…

【动态规划专栏】专题二:路径问题--------3.礼物的最大价值

本专栏内容为&#xff1a;算法学习专栏&#xff0c;分为优选算法专栏&#xff0c;贪心算法专栏&#xff0c;动态规划专栏以及递归&#xff0c;搜索与回溯算法专栏四部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握算法。 &#x1f493;博主csdn个人主页&#xff1a;小…

算法模板 6.并查集

并查集 用于解决连通块问题。 判断集合个数也就是判断p[x] x 的个数 836. 合并集合 - AcWing题库 #include <bits/stdc.h> using namespace std; const int N 1e5 10; int p[N]; // 记录每个元素的father int n, m;int find(int x){ // 查询元素x的fatherif(p[x] …

[05] computed计算属性

computed计算属性 语法&#xff1a; 声明在 computed 配置项中&#xff0c;一个计算属性对应一个函数使用起来和普通属性一样使用 {{计算属性名}} 注意 computed配置项和data配置项是同级的computed中的计算属性虽然是函数的写法&#xff0c;但它依然是属性computed中的计算…

linux 修改开发板网卡eth0的ip地址

win10如何新增电脑ip地址&#xff1a; https://blog.csdn.net/linxinfa/article/details/105817473 ifconfig # 可设置网络设备的状态&#xff0c;或是显示目前的设置。 命令详解&#xff1a;https://www.runoob.com/linux/linux-comm-ifconfig.html 一、临时修改 ifconfig e…

MySQL篇之事务

一、事务 1.事务的定义 事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求&#xff0c;即这些操作要么同时成功&#xff0c;要么同时失败。 2.事务的特性 ACID的解释如下&#xff1a; 1. …

webpack实际实践优化项目

参考&#xff1a; 如何通过性能优化&#xff0c;将包的体积压缩了62.7% 雅虎35条 20210526-webpack深入学习&#xff0c;搭建和优化react项目 本文只专注于性能优化的这个部分。 总体来说分为两个方面&#xff1a;第一是开发环境中主要优化打包速度&#xff0c;第二是线上环境…

什么是特征值和特征向量?(done)

什么是齐次方程&#xff1f; https://blog.csdn.net/shimly123456/article/details/136198159 行列式和是否有解的关系&#xff1f; https://blog.csdn.net/shimly123456/article/details/136198215 特征值和特征向量 参考视频&#xff1a;https://www.bilibili.com/video/BV1…

怎么把Maven依赖的包都提取到文件夹

由于我用Maven只想借助他下载jar包和依赖&#xff0c;不想自己的程序编译还得依赖Maven&#xff0c;所以试验时候用Maven引用依赖&#xff0c;试验成功再拷贝jar包来本地引用&#xff0c;有时候依赖多&#xff0c;一个个拷贝就太慢了。所以要能批量把所有依赖jar提取出来。 加…

【C语言】长篇详解,字符系列篇2-----strcat,strcmp,strncpy,strncat,strncmp函数的使用和模拟实现【图文详解】

欢迎来CILMY23的博客喔&#xff0c;本期系列为【C语言】长篇详解&#xff0c;字符系列篇2-----“混杂”的字符串函数&#xff0c;字符串函数的使用和模拟实现【图文详解】&#xff0c;图文讲解各种字符串函数&#xff0c;带大家更深刻理解C语言中各种字符串函数的应用&#xff…

LeetCode.589. N 叉树的前序遍历

题目 589. N 叉树的前序遍历 分析 我们之前有做过LeetCode的 144. 二叉树的前序遍历&#xff0c;其实对于 N 插树来说和二叉树的思路是一模一样的。 二叉树的前序遍历是【根 左 右】 N叉树的前序遍历顺序是【根 孩子】&#xff0c;你可以把二叉树的【根 左 右】想象成【根 孩…

【python】网络爬虫与信息提取--scrapy爬虫框架介绍

一、scrapy爬虫框架介绍 scrapy是一个功能强大的网络爬虫框架&#xff0c;是python非常优秀的第三方库&#xff0c;也是基于python实现网络爬虫的重要技术路线。scrapy不是哟个函数功能库&#xff0c;而是一个爬虫框架。 爬虫框架&#xff1a;是实现爬虫功能的一个软件结构和功…

AS-V1000 视频监控平台产品介绍:客户端功能介绍(四)

目 录 一、引言 1.1 AS-V1000视频监控平台介绍 1.2平台服务器配置说明 二、软件概述 2.1 客户端软件用途 2.2 客户端功能 三、客户端功能说明 3.1告警管理 3.1.1告警联动 &#xff08;1&#xff09;告警联动显示 &#xff08;2&#xff09;告警联动处理 3…

基于学习的参数化查询优化方法

一、背景介绍 参数化查询是指具有相同模板&#xff0c;且只有谓词绑定参数值不同的一类查询&#xff0c;它们被广泛应用在现代数据库应用程序中。它们存在反复执行动作&#xff0c;这为其性能优化提供了契机。 然而&#xff0c;当前许多商业数据库处理参数化查询的方法仅仅只…

Uibot (RPA设计软件)智能识别信息+微信群发助手(升级版)———课后练习2

解决痛点&#xff1a; Excel如何计算两个日期之间相差月数 方法&#xff1a; 1、首先打开要进行操作的Excel表格。 2、打开后选中要计算相差月数的单元格。 3、然后输入公式&#xff1a;DATEDIF(A2,B2,"m")&#xff0c;输入完成后点击回车键。 4、在弹出的窗口中&a…