目录
入口主程序
celery配置文件
config/config.conf(可取消)
redis 配置
nanny 保姆文件
入口主程序
celery启动时调用的主程序
tasks.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-from celery import Celeryfrom config.config import CeleryConfig
from share.log_config import setup_log# 创建 Celery 应用
setup_log()aaa_app = Celery('attack_app',include=['nanny',]
)
aaa_app.config_from_object(CeleryConfig)if __name__ == '__main__':aaa_app.worker_main()
celery配置文件
设置celery 参数 与 redis 连接
config/config.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-import os
import redisfrom share.app_settings import AppSettingclass RedisConfig():flag = AppSetting().get_config_value("log", "debug")DEBUG = True if flag != "False" else FalseENV_REDIS_HOST = os.getenv('REDIS_URL')#host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('192.168.0.156' if DEBUG else '192.168.154.247')host = ENV_REDIS_HOST if ENV_REDIS_HOST else ('127.0.0.1' if DEBUG else '127.0.0.1')port = 6379password = '123456' if DEBUG else '123456'cache_db = 2borker = 0backend = 1rc = redis.StrictRedis(host=host, port=port, db=cache_db,password=password)class CeleryConfig(object):BROKER_URL = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.borker}" # borkerCELERY_RESULT_BACKEND = f"redis://:{RedisConfig.password}@{RedisConfig.host}:{RedisConfig.port}/{RedisConfig.backend}" # backendCELERY_TASK_SERIALIZER = 'json' # " json从4.0版本开始默认json,早期默认为pickle(可以传二进制对象)CELERY_RESULT_SERIALIZER = 'json'CELERY_ACCEPT_CONTENT = ['json']CELERY_ENABLE_UTC = True # 启用UTC时区设置CELERY_TIMEZONE = 'Asia/Shanghai' # 上海时区CELERYD_MAX_TASKS_PER_CHILD = 1 # 每个进程最多执行1个任务后释放进程(再有任务,新建进程执行,解决内存泄漏)WORKER_HIJACK_ROOT_LOGGER = False
config/config.conf(可取消)
[log]
debug = False
path = logs/celery.tasks.log
redis 配置
主要设置密码,举例
config/redis.conf
requirepass 123456
nanny 保姆文件
在入口文件中 include 中
用于异步动态调用模块使用
这里注意一处注释的代码
@aaa_app.task(base=CallbackTask, ignore_result=True)
ignore_result参数:如果使用该参数,则当调用 load_and_run_plugin.delay(module_path, data, task_meta) 函数时,返回值将无法获取
res = load_and_run_plugin.delay(module_path, data, task_meta)
result = res.get()
头文件中的 sys.path.append("./") 必不可缺,否则动态调用模块无法获取正确的路径
nanay.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-import os
import sys
import json
import logging
import traceback
import importlib
from celery import Taskfrom tasks import aaa_appsys.path.append("./")
logger = logging.getLogger("log")class CallbackTask(Task):"""exc : 失败时的错误的类型task_id : 任务的id;args : 任务函数的参数kwargs : 键值对参数einfo : 失败或重试时的异常详细信息retval : 任务成功执行的返回值"""def on_success(self, retval, task_id, args, kwargs):pass"""aaa_app.send_task("task_manager.load_worker_task_result",args=({"retval": retval, "task_id": task_id, "args": args, "kwargs": kwargs}, "success"),queue="queue_task_manager")"""def on_failure(self, exc, task_id, args, kwargs, einfo):pass"""attack_app.send_task("task_manager.load_worker_task_result",args=({"exc": exc, "task_id": task_id, "args": args, "kwargs": kwargs, "einfo": einfo}, "failure"),queue="queue_task_manager")"""#@aaa_app.task(base=CallbackTask, ignore_result=True)
@aaa_app.task(base=CallbackTask)
def load_and_run_plugin(module, data, task_meta, func="run"):model_obj = importlib.import_module(module)print(f'开始获取{module}插件的{func}方法')_func = getattr(model_obj, func, None)if _func:try:print(f'开始运行{module}.{func}方法,入参:{data}')print(f"task_meta:{task_meta}")module_result = dict()module_result["result"] = _func(data, task_meta)except Exception as err:errortrace = traceback.format_exc()module_result["status"] = Falsemodule_result["errinfo"] = f"{module}.{func}出错,错误信息:{errortrace}"print(f"{module}.{func}出错,错误信息:{errortrace}")logger.error(f"{module}.{func}出错,错误信息:{errortrace}")else:module_result["status"] = Falsemodule_result["errinfo"] = f"{module}不存在方法{func},请检查插件"logger.error(f"{module}不存在方法{func},请检查插件")return module_result