celery是封装记录

目录

入口主程序

celery配置文件

config/config.conf(可取消)

redis 配置

nanny 保姆文件


入口主程序

celery启动时调用的主程序

tasks.py

#! /usr/bin/env python                                                                                                                                                                   
# -*- coding: utf-8 -*-from celery import Celeryfrom config.config import CeleryConfig
from share.log_config import setup_log# 创建 Celery 应用
setup_log()aaa_app = Celery('attack_app',include=['nanny',]
)
aaa_app.config_from_object(CeleryConfig)if __name__ == '__main__':aaa_app.worker_main()

celery配置文件

设置celery 参数 与 redis 连接

config/config.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-import os
import redisfrom share.app_settings import AppSettingclass RedisConfig():flag = AppSetting().get_config_value("log", "debug")DEBUG = True if flag != "False" else FalseENV_REDIS_HOST = os.getenv('REDIS_URL')#host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('192.168.0.156' if DEBUG else '192.168.154.247')host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('127.0.0.1' if DEBUG else '127.0.0.1')port = 6379password = '123456' if DEBUG else '123456'cache_db = 2borker = 0backend = 1rc = redis.StrictRedis(host=host, port=port, db=cache_db,password=password)class CeleryConfig(object):BROKER_URL = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.borker}"    # borkerCELERY_RESULT_BACKEND = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.backend}"   # backendCELERY_TASK_SERIALIZER = 'json'  # " json从4.0版本开始默认json,早期默认为pickle(可以传二进制对象)CELERY_RESULT_SERIALIZER = 'json'CELERY_ACCEPT_CONTENT = ['json']CELERY_ENABLE_UTC = True  # 启用UTC时区设置CELERY_TIMEZONE = 'Asia/Shanghai'  # 上海时区CELERYD_MAX_TASKS_PER_CHILD = 1  # 每个进程最多执行1个任务后释放进程(再有任务,新建进程执行,解决内存泄漏)WORKER_HIJACK_ROOT_LOGGER = False

config/config.conf(可取消)

[log]
debug = False
path  = logs/celery.tasks.log

redis 配置

主要设置密码,举例

config/redis.conf

requirepass 123456

nanny 保姆文件

在入口文件中 include 中

用于异步动态调用模块使用

这里注意一处注释的代码

@aaa_app.task(base=CallbackTask, ignore_result=True)

ignore_result参数:如果使用该参数,则当调用 load_and_run_plugin.delay(module_path, data, task_meta) 函数时,返回值将无法获取

res = load_and_run_plugin.delay(module_path, data, task_meta)
result = res.get()

头文件中的 sys.path.append("./") 必不可缺,否则动态调用模块无法获取正确的路径

nanay.py

#! /usr/bin/env python                                                                                                                                                                   
# -*- coding: utf-8 -*-import os
import sys
import json
import logging
import traceback
import importlib
from celery import Taskfrom tasks import aaa_appsys.path.append("./")
logger = logging.getLogger("log")class CallbackTask(Task):"""exc     : 失败时的错误的类型task_id : 任务的id;args    : 任务函数的参数kwargs  : 键值对参数einfo   : 失败或重试时的异常详细信息retval  : 任务成功执行的返回值"""def on_success(self, retval, task_id, args, kwargs):pass"""aaa_app.send_task("task_manager.load_worker_task_result",args=({"retval": retval, "task_id": task_id, "args": args, "kwargs": kwargs}, "success"),queue="queue_task_manager")"""def on_failure(self, exc, task_id, args, kwargs, einfo):pass"""attack_app.send_task("task_manager.load_worker_task_result",args=({"exc": exc, "task_id": task_id, "args": args, "kwargs": kwargs, "einfo": einfo}, "failure"),queue="queue_task_manager")"""#@aaa_app.task(base=CallbackTask, ignore_result=True)
@aaa_app.task(base=CallbackTask)
def load_and_run_plugin(module, data, task_meta, func="run"):model_obj = importlib.import_module(module)print(f'开始获取{module}插件的{func}方法')_func = getattr(model_obj, func, None)if _func:try:print(f'开始运行{module}.{func}方法,入参:{data}')print(f"task_meta:{task_meta}")module_result = dict()module_result["result"] = _func(data, task_meta)except Exception  as err:errortrace = traceback.format_exc()module_result["status"] = Falsemodule_result["errinfo"] = f"{module}.{func}出错,错误信息:{errortrace}"print(f"{module}.{func}出错,错误信息:{errortrace}")logger.error(f"{module}.{func}出错,错误信息:{errortrace}")else:module_result["status"] = Falsemodule_result["errinfo"] = f"{module}不存在方法{func},请检查插件"logger.error(f"{module}不存在方法{func},请检查插件")return module_result

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/237945.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

防火墙安全策略

目录 一、防火墙种类 二、防火墙流量控制手段 1、包过滤技术(传统) 2、状态检测技术 (1)、状态检测机制 三、安全实验 1、拓扑 2、需求 3、配置思路 4、关键配置截图 5、验证 一、防火墙种类 对于防火墙来说就是针对哪…

选型前必看,CRM系统在线演示为什么重要?

在CRM挑选环节中,假如企业需要深入了解CRM管理系统的功能和功能,就需要CRM厂商提供在线演示。简单的说,就是按照企业的需要,检测怎样通过CRM进行。如今我们来谈谈CRM在线演示的作用。 在线演示 1、了解CRM情况 熟悉系统功能&…

阿里云oss存储器图片上传功能

controller层 RestController RequestMapping("/upload") public class FileUploadController {PostMappingpublic Result upload(MultipartFile file) throws IOException {String orgFilename file.getOriginalFilename();//名字组合String filename UUID.rand…

姿态识别、目标检测和跟踪的综合应用

引言: 近年来,随着人工智能技术的不断发展,姿态识别、目标检测和跟踪成为了计算机视觉领域的热门研究方向。这三个技术的综合应用为各个行业带来了巨大的变革和机遇。本文将分别介绍姿态识别、目标检测和跟踪的基本概念和算法,并探…

如何添加jar包到本地Maven项目中

在 Maven 中添加一个外部 JAR 包的依赖&#xff0c;你需要使用 Maven 的 <dependency> 元素来指定该 JAR 包的坐标信息。以下是具体的步骤&#xff1a; 将 JAR 包手动添加到 Maven 本地仓库&#xff1a; 首先&#xff0c;确保将外部 JAR 包手动添加到 Maven 本地仓库。可…

基于Java开发的微信约拍小程序

一、系统架构 前端&#xff1a;vue | element-ui 后端&#xff1a;springboot | mybatis 环境&#xff1a;jdk8 | mysql8 | maven | mysql 二、代码及数据库 三、功能说明 01. 首页 02. 授权登录 03. 我的 04. 我的-编辑个人资料 05. 我的-我的联系方式 06. …

Java——PriorityQueue用法(实现最大堆)

Java——PriorityQueue用法 PriorityQueue 是 Java 中基于优先级堆实现的一个队列&#xff0c;可以用来存储一组元素&#xff0c;并按照一定的优先级顺序访问这些元素。其中&#xff0c;优先级高的元素会被先访问。 PriorityQueue 类位于 java.util 包中&#xff0c;是一个泛…

等待队列头实现阻塞 IO(BIO)

文章目录 等待队列头实现阻塞 IO(BIO)模型等待队列头init_waitqueue_headDECLARE_WAIT_QUEUE_HEAD 等待队列项使用方法驱动程序应用程序模块使用参考 等待队列头实现阻塞 IO(BIO) 等待队列是内核实现阻塞和唤醒的内核机制。 等待队列以循环链表为基础结构&#xff0c;链表头和…

苹果如何从iCloud恢复备份?正确方法看这里!

iCloud为所有苹果用户免费提供5G内存空间&#xff0c;用户可以将照片、短信、联系人、备忘录等重要信息备份到iCloud云端&#xff0c;这样可以方便在不同设备之间同步和共享。 同时&#xff0c;iCloud保证这些数据在所有苹果设备上及时自动更新。当遇到手机数据丢失时&#xf…

构建搜索引擎,而非向量数据库(Vector DB) [译]

原文&#xff1a;Build a search engine, not a vector DB 作者&#xff1a; Panda Smith 在过去 12 个月中&#xff0c;我们见证了向量数据库&#xff08;Vector DB&#xff09;创业公司的迅猛增长。我此刻并不打算深入探讨它们各自的设计取舍。相反&#xff0c;我更想探讨和…

做外贸多想一步,多走一步

最近在网上给小儿买了一个液晶画画板&#xff0c;自从告诉小儿已经购物需要耐心等待之后&#xff0c;几乎每天小儿要询问几遍&#xff0c;快递到哪里了&#xff1f; 好不容易盼到了&#xff0c;结果打开一看却是个坏的&#xff0c;虽然外包装是好的&#xff0c;但是明显这个快…

数据库客户案例:每个物种都需要一个数据库!

1、GERDH——花卉多组学数据库 项目名称&#xff1a;GERDH&#xff1a;花卉多组学数据库 链接地址&#xff1a;https://dphdatabase.com 项目描述&#xff1a;GERDH包含了来自150多种园艺花卉植物种质的 12961个观赏植物。将不同花卉植物转录组学、表观组学等数据进行比较&am…

读《文明之光》第四册总结

今天来给大家分享一下【吴军】老师的《文明之光》&#xff0c;该书全套共四册&#xff0c;今天给大家分享的是第四册。 人总是要有些理想和信仰。初读这本书&#xff0c;就被本书的第一句话说感动过。 当人们问起我的理想时&#xff0c;我就给他们讲…

Linux | 数据结构之内核链表

Linux | 数据结构之内核链表 时间:2023年12月20日15:42:45 文章目录 Linux | 数据结构之内核链表1.参考2.内核链表2-1.源码2-2.节点类型2-3.内核链表相关算法2-3-1.初始化`2-3-1-1`.宏的实现2-3-1-2.内联函数的实现2-3-2.插入`2-3-2-1`.将new指向的结点插入到head指向的结点后…

Patreon怎么订阅付款?Patreon会员订阅付款保姆级教程,用虚拟VISA卡订阅Patreon作者艺术家

Patreon 是目前世界上最受欢迎的会员平台之一。 内容创作者和艺术家通常很难让粉丝在经济上支持他们。 通过使用像 Patreon 这样的平台&#xff0c;创作者和艺术家可以很容易地从他们的作品中获得报酬。粉丝也能更方便的支持他们&#xff0c;今天就教大家如何订阅Patreon 首先我…

MySQL 5.6较上一版本的关键升级

MySQL 5.6是一个主要的版本发布&#xff0c;它在性能、可伸缩性、可靠性和可用性方面引入了多项重要改进和新特性。它在2013年发布&#xff0c;相比于它的前身MySQL 5.5&#xff0c;MySQL 5.6带来了以下关键升级&#xff1a; 优化的InnoDB存储引擎&#xff1a;MySQL 5.6中的Inn…

8.点云获取和数据处理(python)

点云数据获取和处理的代码如下&#xff1a; 一、用DBSCAN聚类的方法处理点云数据 通过设置点云坐标的最大聚类对点云坐标进行归类&#xff0c;再将相同类的坐标求均值&#xff08;中心点坐标&#xff09;&#xff0c;这些均值坐标通过手眼标定的转换矩阵转换为二维的相机坐标&…

k8s中Chart的命名模板

Chart的命名模板 命名模板有时候也被称为部分或子模板。 相对于 deployment.yaml 这种主模板&#xff0c;命名模板只是定义部分通用内容&#xff0c;然后在各个主模板中调用。 templates目录下有个_helpers.tpl文件。公共的命名模板都放在这个文件里。 命名模板使用 define…

Python中最常用的10个内置函数!

Python作为一种多用途编程语言&#xff0c;拥有丰富的内置函数库&#xff0c;这些函数可以极大地提高开发效率。本文将介绍Python中最常用的10个内置函数&#xff0c;它们的功能各有不同&#xff0c;但在实际编程中经常派上用场。我们将深入了解每个函数&#xff0c;并提供示例…

解锁高效工作!5款优秀工时管理软件推荐

工时管理&#xff0c;一直是让许多企业和团队头疼的问题。传统的纸质工时表、复杂的电子表格&#xff0c;不仅操作繁琐&#xff0c;还容易出错。幸好&#xff0c;随着科技的进步&#xff0c;我们迎来了工时管理软件的春天。今天&#xff0c;就让我们一起走进这个新时代&#xf…