python之异步任务

在 Python 中,异步任务通常通过使用库如 Celery 来实现。Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时提供操作控制。

Celery 中,delayapply_async 是两种常用的方法来调度异步任务。

delay 方法

delayCelery 提供的一个快捷方法,用于简化任务的调用。它会自动将任务标记为异步执行。

from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 delay 方法调用任务
result = add.delay(4, 6)

apply_async 方法

apply_async 提供了更多的控制选项,例如可以指定任务的执行时间、重试策略等。

from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 apply_async 方法调用任务
result = add.apply_async((4, 6), countdown=10)  # 任务将在10秒后执行
参数说明
  • args:任务的参数,通常以元组形式传递。
  • kwargs:任务的关键字参数,以字典形式传递。
  • countdown:任务延迟执行的时间(以秒为单位)。
  • eta:任务的预计执行时间(datetime 对象)。
  • expires:任务的过期时间(datetime 对象或秒数)。
  • retry:是否在任务失败时自动重试。
  • retry_policy:重试策略,例如最大重试次数、重试间隔等。

示例1

使用 apply_async 方法来设置任务的各种参数:

from celery import Celery
from datetime import datetime, timedeltaapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task(bind=True, max_retries=3)
def add(self, x, y):try:return x + yexcept Exception as exc:raise self.retry(exc=exc, countdown=5)# 使用 apply_async 方法调用任务
eta = datetime.utcnow() + timedelta(seconds=10)
result = add.apply_async((4, 6), eta=eta, expires=60, retry=True, retry_policy={'max_retries': 5,'interval_start': 0,'interval_step': 0.2,'interval_max': 0.2,
})

任务 add 被设置为在10秒后执行,并且在60秒后过期。如果任务失败,它会自动重试最多5次,每次重试间隔0.2秒。

  • delay 方法是 apply_async 的简化版本,适用于简单的异步任务调用。
  • apply_async 方法提供了更多的控制选项,适用于需要更复杂调度和重试策略的任务。

示例2

假设你有一个自定义的任务基类 CallbackTask,你可以这样定义一个任务:

from celery import Celery, Taskapp = Celery('tasks', broker='pyamqp://guest@localhost//')class CallbackTask(Task):def on_success(self, retval, task_id, args, kwargs):print(f'Task {task_id} succeeded with result: {retval}')def on_failure(self, exc, task_id, args, kwargs, einfo):print(f'Task {task_id} failed with exception: {exc}')@app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
def add(x, y):return x + y# 调用任务
result = add.delay(4, 6)
  1. 自定义任务基类 CallbackTask

    • on_success 方法:当任务成功完成时调用。
    • on_failure 方法:当任务失败时调用。
  2. 任务定义

    • @app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
      • name='my_custom_task':任务的自定义名称。
      • base=CallbackTask:任务的基类是 CallbackTask
      • ignore_result=True:任务的结果将不会被存储。
  3. 调用任务

    • result = add.delay(4, 6):异步调用任务 add,传递参数 46

Celery 中,任务的参数通常以元组或字典的形式传递,并且 Celery 会自动处理参数的序列化和反序列化。因此,你通常不需要手动将参数 JSON 化。

参数传递

简单参数
from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 delay 方法调用任务,传递参数
result = add.delay(4, 6)

在这个示例中,参数 46 被传递给任务 addCelery 会自动处理这些参数的序列化和反序列化。

复杂参数

如果你需要传递更复杂的参数,例如嵌套的字典或列表,Celery 也能处理这些情况:

from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def process_data(data):# 假设 data 是一个字典return data['key1'] + data['key2']# 使用 apply_async 方法调用任务,传递复杂参数
data = {'key1': 10, 'key2': 20}
result = process_data.apply_async((data,))

在这个示例中,data 是一个字典,Celery 会自动将其序列化并传递给任务 process_data

常见错误

Celery 中,如果尝试传递一个 Django 模型对象作为任务参数,而没有设置适当的序列化和反序列化方法,通常会遇到序列化错误。默认情况下,Celery 使用 JSON 作为序列化格式,而 JSON 不支持直接序列化 Django 模型对象。

如果直接传递一个 Django 模型对象作为任务参数,可能会遇到类似以下的错误:

kombu.exceptions.EncodeError: Object of type <YourModel> is not JSON serializable

这个错误表明 Celery 尝试将 Django 模型对象序列化为 JSON,但失败了,因为 JSON 序列化器不知道如何处理 Django 模型对象。

解决方法

  1. 传递模型对象的主键

    • 传递模型对象的主键(或其他简单类型)作为任务参数,然后在任务内部重新获取模型对象。
    from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
    def process_model_object(model_id):obj = MyModel.objects.get(id=model_id)# 处理对象print(obj)# 调用任务,传递模型对象的主键
    obj = MyModel.objects.first()
    process_model_object.delay(obj.id)
    
  2. 自定义序列化和反序列化

    • 自定义任务参数的序列化和反序列化方法,将模型对象转换为可序列化的格式(如字典)。
    from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
    def process_model_object(model_data):# 反序列化模型对象obj = MyModel(**model_data)# 处理对象print(obj)# 调用任务,传递模型对象的字典表示
    obj = MyModel.objects.first()
    model_data = {'id': obj.id,'field1': obj.field1,'field2': obj.field2,# 其他字段
    }
    process_model_object.delay(model_data)
    
  3. 使用 Pickle 序列化器

    • Celery 支持多种序列化器,包括 Pickle。Pickle 可以序列化几乎所有 Python 对象,但它有安全风险,不建议在不受信任的环境中使用。
    from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')
    app.conf.update(task_serializer='pickle',accept_content=['pickle'],  # Ignore other contentresult_serializer='pickle',
    )@app.task
    def process_model_object(obj):# 处理对象print(obj)# 调用任务,传递模型对象
    obj = MyModel.objects.first()
    process_model_object.delay(obj)
    

总结

  • 传递模型对象的主键:这是最常见和推荐的方法,因为它简单且安全。
  • 自定义序列化和反序列化:适用于需要传递复杂对象的情况。
  • 使用 Pickle 序列化器:虽然方便,但有安全风险,不建议在不受信任的环境中使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/53584.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OCR技术视角:智能文档管理中的票据自动化识别与处理

在数字化转型的浪潮中&#xff0c;企业对于高效、自动化的文档管理需求日益增长。票据作为企业运营中不可或缺的部分&#xff0c;其识别与管理的智能化成为了提升工作效率的关键。本文将深入探讨智能文档系统中票据识别功能的原理、技术优势以及在不同行业中的应用实践&#xf…

观察者模式observer

允许一个对象将其状态的改变通知其他对象 当Editor调用openFile()和saveFile()时&#xff0c;它会通过EventManager的notify方法&#xff0c;通知所有订阅了这些事件的监听器。

828华为云征文|华为云Flexus X实例部署k3s与kuboard图形化管理工具

828华为云征文&#xff5c;华为云Flexus X实例部署k3s与kuboard图形化管理工具 华为云最近正在举办828 B2B企业节&#xff0c;Flexus X实例的促销力度非常大&#xff0c;特别适合那些对算力性能有高要求的小伙伴。如果你有自建MySQL、Redis、Nginx等服务的需求&#xff0c;一定…

算法分享——《双指针》

文章目录 ✅[《移动零》](https://leetcode.cn/problems/move-zeroes/)&#x1f339;题目描述&#xff1a;&#x1f697;代码实现&#xff1a;&#x1f634;代码解析&#xff1a; ✅[《复写零》](https://leetcode.cn/problems/duplicate-zeros/)&#x1f339;题目描述&#xf…

【React】Vite 构建 React

项目搭建 vite 官网&#xff1a;Vite 跟着文档走即可&#xff0c;选择 react &#xff0c;然后 ts swc。 着重说一下 package-lock.json 这个文件有两个作用&#xff1a; 锁版本号&#xff08;保证项目在不同人手里安装的依赖都是相同的&#xff0c;解决版本冲突的问题&am…

[数据集][目标检测]石油泄漏检测数据集VOC+YOLO格式6633张1类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;6633 标注数量(xml文件个数)&#xff1a;6633 标注数量(txt文件个数)&#xff1a;6633 标注…

吐血整理 ChatGPT 3.5/4.0 新手使用手册~ 【2024.09.04 更新】

以前我也是通过官网使用&#xff0c;但是经常被封号&#xff0c;就非常不方便&#xff0c;后来有朋友推荐国内工具&#xff0c;用了一阵之后&#xff0c;发现&#xff1a;稳定方便&#xff0c;用着也挺好的。 最新的 GPT-4o、4o mini&#xff0c;可搭配使用~ 1、 最新模型科普&…

【Qt】qt发布Release版本,打包.exe可执行文件

前言&#xff1a;Qt编译的可执行程序&#xff0c;如果直接运行&#xff0c;会出现0xc000007b报错&#xff0c;或者“由于占不到Qt5Network.dll,无法继续执行代码。重新安装程序可能会解决此问题”的报错&#xff0c;因为缺少相关的依赖包和动态库。 1、第一步&#xff1a;找到…

从电商行业的变化引出未来技术趋势

时间&#xff1a;2024年09月08日 作者&#xff1a;小蒋聊技术 邮箱&#xff1a;wei_wei10163.com 微信&#xff1a;wei_wei10 音频地址&#xff1a;喜马拉雅 希望大家帮个忙&#xff01;如果大家有工作机会&#xff0c;希望帮小蒋推荐一下&#xff0c;小蒋希望遇到一个认真…

Android 系统源码项目加载预编好的so库

Android 系统源码项目加载预编好的so库 文章目录 Android 系统源码项目加载预编好的so库一、前言二、源码中加载so1、Android.mk加载so加载so的主要相关代码&#xff1a; 2、Android.bp加载so&#xff08;1&#xff09;Android.mk使用源码命令编译成Android.bp&#xff08;2&am…

【C++】STL容器详解【下】

目录 一、list容器 1.1 list基本概念 1.2 lsit构造函数 1.3 list数据元素插入和删除操作 1.4 list大小操作 1.5 list赋值操作 1.6 list数据的存取 1.7 list反转排序 二、set/multiset容器 2.1 set/multiset基本概念 2.2 set构造函数 2.3 set赋值操作 2.4 set大小操…

Hash Table、HashMap、HashSet学习

文章目录 前言Hash Table&#xff08;散列表&#xff09;基本概念散列函数散列冲突&#xff08;哈希碰撞&#xff09;拉链法红黑树时间复杂度分析 HashMap基础方法使用基本的增删改查其他的方法 实现原理 HashSet基础操作去重原理 前言 本文用于介绍关于Hash Table、HashMap、…

Unity解析XML开发随机名字生成模块

概述 在游戏开发中,自动生成具有真实感的角色名字是一项常见的需求,特别是在MMORPG或者RPG游戏中。本篇博客将介绍如何利用Unity结合XML配置文件来实现一个随机名字生成模块。我们将从头开始逐步构建这一功能,并详细解释每一步的技术细节。 技术模块介绍 1. XML解析 XML…

【qt】多线程实现倒计时

1.界面设计 设置右边的intvalue从10开始倒计时 2.新建Thread类 新建Thread类&#xff0c;使其继承QThread类&#xff0c;多态重写run函数&#xff0c;相当于线程执行函数 3.重写run函数 重写run函数&#xff0c;让另一个进程每隔1s发出一个信号&#xff0c;主线程使用conne…

uniapp 全屏日历,动态无限加载

不好用请移至评论区揍我 原创代码,请勿转载,谢谢! 注:本人仅在微信小程序测试过,未在其他app/h5尝试过,按理说应该是可以的,代码没有引用任何第三方组件 日历中每个日期下方的空白部分均可自定义,写在代码中的<view class="item">我是内容</view>…

Prometheus 服务监控

目录 Prometheus 是什么 Prometheus 的特点 Prometheus 的组件 Prometheus 的指标 Prometheus 的使用场景 Prometheus 的安装 方式一&#xff1a;压缩包方式安装 方式二&#xff1a;Docker 方式安装 Prometheus 的 Web UI 面板介绍 Prometheus 目录结构介绍 Prometh…

开源可视化大屏superset Docker环境部署

superset 开源可视化大屏Docker环境部署 前言 superset是俄罗斯开源的一款可视化大屏&#xff0c;用于数据可视化探索&#xff0c;含有丰富的图表组件&#xff0c;可以支持接入各种数据源。 接触superset就是想体验下可视化大屏功能&#xff0c;想最快速度安装成功&#xff…

Linux seq命令

参考资料 知っておくとちょっと便利&#xff01;seq コマンドの使い方をご紹介 目录 一. 基本语法二. 选项2.1 -f 格式化输出2.2 -s 指定分隔符2.3 -w 输出数字补齐宽度 三. 小案例3.1 递减序列3.2 批量创建测试文件3.3 批量下载文件 一. 基本语法 seq [OPTION] 结束值 seq […

简单的Linux Ftp服务搭建

简单的Linux FTP服务搭建 1.需求 公司有一个esb文件传输代理&#xff0c;其中我们程序有文件传输功能&#xff0c;需要将本地文件传输到esb文件代理服务器上&#xff0c;传输成功之后发送http请求&#xff0c;告知esb将固定文件进行传输到对应外围其他服务的文件目录中&#…

unity GridLayoutGroup真正的居中

GridLayoutGroup默认的居中效果: 不是真正的居中 加上代码: namespace UnityEngine.UI {/// <summary>/// GridLayoutGroup拓展&#xff0c;使支持自定义内容/// </summary>internal class GridLayoutGroupEx : GridLayoutGroup{/// <summary>/// 启用居中/…