Celery知识

celery介绍

# celery 的概念:
    * 翻译过来是芹菜
    * 官网:https://docs.celeryq.dev/en/stable/
# 是分布式的异步任务框架:
    分布式:一个任务,拆成多个任务在不同机器上做
    异步任务:后台执行,不阻塞主任务
    框架:集成到项目中
# 作用:
1、异步任务:异步()发邮件,短信,通知
2、延迟任务:延迟几秒 再执行某个任务
                订单提交后,延迟半小时,把订单取消
3、定时任务 :每隔多长事件 执行某个任务,比如定时更新缓存

        eg.买了个会员,会员快到期了会每天给你发短信提醒

celery架构

# django  是一个服务,celery 也是是一个服务,和django没有必然联系
    -命令启动,就能提供服务
# 三个模块:
        1、broker:消息中间件,消息队列,任务中间件
             存储任务(函数):发送短信任务,统计在线人数...
              redis:  使用字符串形式,能把任务表示出来即可

              reabbitmq 存储:  其实就是一个队列,一个个任务任务
        2、worker:任务执行单元,可以启动多个
              从消息队列(broker的redis)取出任务然后去执行程序(进程)
        3、backend:结果存储  Result Stores
              任务执行完成后的结果存储在这里
              redis存储,关系型数据库。。
# 执行流程:
        1、其他程序提交任务(函数),任务序列化后存到celery的broker
                        用redis   0 库
        2、接下来:worker执行,从broker中取任务,然后执行
        3、任务执行完后,把结果存到 bancked中
                        用redis   1库


# 注意:
    celery和其他程序是 独立运行
        1、可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
        2、celery服务为为其他项目服务提供异步解决任务需求的
注:会有两个服务同时运行,一个是项目服务(django),一个是celery服务

项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求

# 例子:

    人(django)是一个独立运行的服务 | 医院(celery)也是一个独立运行的服务
        正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题,
        人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

celery快速使用

# 使用步骤
1、安装:pip install celery
2、写个py文件 demo.py

from celery import Celery
import time
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('app', broker=broker, backend=backend)
# 写任务---》写函数---》必须用 @app.task 装饰---》装饰后,就变成了celery的任务了
@app.task
def hello():time.sleep(2)  # 模拟任务延迟return 'hello world'
@app.task
def add(a, b):time.sleep(3)return a + b

3、其他程序中,提交任务

res=add.delay(4,5)
print(res)

4、 启动worker---worker启动可以再靠前
 win运行:
        pip3 install eventlet
        celery -A demo worker -l info -P eventlet

 非win运行:mac linux
        celery -A demo  worker -l info

5、查询结果
---直接取redis中查
---使用代码查询

from demo import app
from celery.result import AsyncResult
id = '17bf03ad-a1e6-49d1-a182-794bd3e96b74'
if __name__ == '__main__':a = AsyncResult(id=id, app=app)if a.successful():result = a.get() # hello worldprint(result)elif a.failed():print('任务失败')elif a.status == 'PENDING':print('任务等待中被执行')elif a.status == 'RETRY':print('任务异常后正在重试')elif a.status == 'STARTED':print('任务已经开始被执行')

celery包结构

# 包结构 目录如下:
        --celery_task
                    --celery.py
                    --user_task.py
                    --order_task.py
                    --goods_task.py
    
-其他程序中提交任务: add_task_package.py
-其他程序中查询结果:get_result_package.py
###具体步骤

# celery.py  
from celery import Celery
#######1 实例化得到对象 ##########
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('app', broker=broker, backend=backend,include=['celery_task.order_task','celery_task.user_task'])
#######2 写任务 ##########以后各种类型任务,单独写在py文件中
# order_task.py   user_task.py 
# order_task.py
import time
from .celery import app
@app.task
def cancel_order(order_id):time.sleep(2)return '订单:%s取消成功' % order_id
# user_task.py
import time
from .celery import app
@app.task
def send_sms(phone, code):time.sleep(1)return '手机号:%s,发送验证码:%s,成功' % (phone, code)
######## 3 其他程序,提交任务 ##############################
from celery_task.user_task import send_sms
res=send_sms.delay('1893424323',8888)
print(res)

##### 4 启动worker########## 在包的一层,执行包,不需要具体到某个py文件了
win运行:pip3 install eventlet
        A celery_demo 包名----因为他会去包下找 celery.py 中得app执行
        celery -A celery_task worker -l info -P eventlet

非win运行:mac linux
        celery -A celery_task  worker -l info

#####5 查询结果#####
# 使用代码,查询结果
from celery_task.celery import app
from celery.result import AsyncResultid = '46b26c73-62ae-403c-ba62-e469f2f8c69f'
if __name__ == '__main__':a = AsyncResult(id=id, app=app)if a.successful():result = a.get() # hello worldprint(result)elif a.failed():print('任务失败')elif a.status == 'PENDING':print('任务等待中被执行')elif a.status == 'RETRY':print('任务异常后正在重试')elif a.status == 'STARTED':print('任务已经开始被执行')

celery实现异步任务,定时任务,延迟任务

# 异步任务:
        任务名.delay(传参数)

# 延迟任务---延迟多长事件干事

        点右键执行,work运行

# 链接上面任务二的order函数
from celery_task.order_task import cancel_order
from datetime import datetime, timedelta# atetime.utcnow() 当前utc时间
eta = datetime.utcnow() + timedelta(seconds=15)
res = cancel_order.apply_async(args=['10001',], eta=eta)  # 订单+15s 后执行这个任务
print(res)

# 定时任务--一定要启动beat
     1、 在celery.py 中写

# 链接上面任务二的user_task函数
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 不使用UTC时间
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {'send_sms': {'task': 'celery_task.user_task.send_sms',  #  执行的任务函数 'schedule': timedelta(seconds=3),    # 每隔三秒钟干# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点'args': ('1896388888', '6666'),     # 传参数,手机号\验证码}
}

2、启动worker:celery -A celery_task worker -l info -P eventlet
3、启动beat(每个一段时间,就提交任务):celery -A celery_task beat  -l info

4、等待即可

django中使用celery

# 两种方案:
    -通用方案:自己封装
    -django-celery--》app---》创建出一些表
# 自己封装的通用方案:
1、把封装的包:celery_task 复制到项目中
2、在django中使用celery.py 中必须加入
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')
    3 写任务,启动worker
    4 在django的视图类中,异步调用即可

# celery_task/celery.py
from celery import Celery
import os# 任务里使用django的东西:缓存,表模型。。。必须加入
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffy_api.settings.dev')# 1 实例化得到对象
broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'])# 2 写任务,以后各种类型任务,单独写在py文件中# 定时任务
app.conf.timezone = 'Asia/Shanghai'    # 时区
app.conf.enable_utc = False    # 不使用UTC# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {
}
# celery_task/user_task.py
import time
from .celery import app
from libs.send_information import common_send_sms@app.task
def send_sms(phone, code):res = common_send_sms(code, mobile=phone)if res:return '短信发送成功:%s' % phoneelse:return '短信发送失败:%s' % phone
# user/views.py# 发送短信接口@action(methods=['get'], detail=False, url_path='send_information')def send_sms(self, request):try:mobile = request.query_params['mobile']  # 取的手机号放在请求地址栏中code = get_code()  # 生成验证码cache.set('sms_code_%s' % mobile, code, 61)  # 放在缓存中,以手机号做区# 异步发送短信--不管是否成功--如果不成功,用户再发一次即可# t = Thread(target=common_send_sms, args=[code, mobile])# t.start()  # 启动线程发送短信# celery异步发送短信send_sms.delay(mobile, code)return APIResponse(msg='短信已发送')except MultiValueDictKeyError as e:raise APIException(detail='手机号必须携带')except Exception as e:raise APIException(detail=str(e))

双写一致性(缓存问题)

# 轮播图加缓存
    出现问题:banner表中数据变了,缓存不会变
    mysql和redis数据不一致: mysql和redis双写一致性
# 双写一致性的解决方案:
    1、mysql修改---删缓存
    2、mysql修改---改缓存
    3、定时更新---每个5s,更新一次缓存
             先删缓存,在更新mysql
            先改缓存,再更新mysql
# 轮播图的接口---使用定时更新,解决双写一致性问题

# celery_task/home_task.p
import time
from .celery import app
from home.models import Banner
from django.conf import settings
from home.serializer import BannerSerializer
from django.core.cache import cache@app.task
def update_banner():# 1 获取所有轮播图数据queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]# 2 序列化ser = BannerSerializer(instance=queryset, many=True)# ser.data# 2.1 把服务端前缀拼接上for item in ser.data:# media/banner/banner1.png,item['image'] = settings.BACKEND_URL + item['image']# 3 放到缓存cache.set('banner_list', ser.data)return '更新成功'
# celery.py
app = Celery('app', broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task','celery_task.home_task'])app.conf.beat_schedule = {'update_banner': {'task': 'celery_task.home_task.update_banner',# 'schedule': timedelta(seconds=5),'schedule': timedelta(minutes=3),'args': (),},
}

# 启动worker:celery -A celery_task worker -l info -P eventlet

    启动beat:celery -A celery_task beat  -l info
# 以后尽管改 mysql数据,最多3分钟就会更新到最新了

异步秒杀方案

# 秒杀功能:
      并发量要高:承载住很多用户同时操作
                订单表
                扣减库存
            效率要高


# 同步秒杀
    假设秒杀需要10s钟,项目并发量是3,总共5个商品要秒杀
    10s内,只有3个人能进入到系统,并且开始秒杀

# 前端
const routes = [{path: '/',name: 'home',component: HomeView},{path: '/seckill',name: 'seckill',component: SeckillView},
]
<template><div><Header></Header><div style="padding: 50px;margin-left: 100px"><h1>Go语言课程</h1><img src="http://photo.liuqingzheng.top/2023%2002%2022%2021%2057%2011%20/image-20230222215707795.png"height="300px"width="300px"><br><el-button type="danger" @click="handleSeckill" v-loading.fullscreen.lock="fullscreenLoading">秒杀课程</el-button></div><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><Footer></Footer></div>
</template><script>
import Header from '@/components/Header'
import Footer from '@/components/Footer'export default {name: "SckillView",data() {return {fullscreenLoading: false,task_id: '',t: null,}},methods: {// ##############同步秒杀##############// handleSeckill() {//   this.fullscreenLoading = true;//   this.$axios({//     url: '/user/seckill/seckill/',//     method: 'POST',//     data: {//       course_id: '99'//     }//   }).then(res => {//     this.fullscreenLoading = false;//     this.$message.success(res.msg)//   }).catch(res => {//     this.fullscreenLoading = false;//     this.$message.error(res)//   })// }// ##############同步秒杀##############// ##############异步秒杀##############handleSeckill() {this.fullscreenLoading = true;this.$axios({url: '/user/seckill/seckill/',method: 'POST',data: {course_id: '99'}}).then(res => {// 在排队,转圈的,还需要继续显示this.$message.success(res.msg)this.task_id = res.task_id// 继续发送请求---》查询是否秒杀成功:1 成功 2 没成功  3 秒杀任务还没执行// 启动定时任务,没隔1s,向后端发送一次请求this.t = setInterval(() => {this.$axios({url: '/user/seckill/get_result/',method: 'get',params: {task_id: this.task_id}}).then(res => {// 100 成功,success : 1 成功  0 失败  2 还没开始if (res.success == '1') {// 转圈框不显示this.fullscreenLoading = false;// 停止定时任务clearInterval(this.t)this.t = nullthis.$message.success(res.msg)} else if (res.success == '0') {// 转圈框不显示this.fullscreenLoading = false;// 停止定时任务clearInterval(this.t)this.t = nullthis.$message.error(res.msg)} else {// this.$message.error(res.msg)console.log(res.msg)}})}, 1000)}).catch(res => {this.fullscreenLoading = false;this.$message.error(res)})}},components: {Header, Footer}
}
</script><style scoped></style>
# 后端
# 秒杀功能
import randomfrom celery_task.order_task import seckill
from celery_task.celery import app
from celery.result import AsyncResultclass SeckillView(ViewSet):# 同步操作,性能不高# 异步提交任务@action(methods=['POST'], detail=False)def seckill(self, request, *args, **kwargs):course_id = request.data.get('course_id')task_id = seckill.delay(course_id)return APIResponse(msg='您正在排队', task_id=str(task_id))@action(methods=['GET'], detail=False)def get_result(self, request, *args, **kwargs):task_id = request.query_params.get('task_id')a = AsyncResult(id=task_id, app=app)if a.successful():result = a.get()  # True 和 Falseif result:return APIResponse(success='1', msg='秒杀成功')else:return APIResponse(success='0', msg='秒杀失败')elif a.status == 'PENDING':print('任务等待中被执行')return APIResponse(success='2', msg='任务等待中被执行')else:return APIResponse(success='3', msg='秒杀任务正在执行')
# 任务
import random
import time@app.task
def seckill(course_id):print('根据课程id:%s,查询课程是否还有剩余,耗时2s' % course_id)time.sleep(2)res = random.choice([True, False])if res:  # 库存够print('扣减库存,耗时1s')time.sleep(1)print('下单,耗时2s')time.sleep(2)return Trueelse:return False
# 路由
router.register('seckill', SeckillView, 'seckill')

今日思维导图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/742297.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【开源】SpringBoot框架开发软件学院思政案例库系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统管理员2.2 普通教师 三、系统展示四、核心代码4.1 查询思政案例4.2 审核思政案例4.3 查询思政课程4.4 思政案例点赞4.5 新增思政案例评语 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的软件学…

Mysql8.0.30数据data目录文件解释

数据库内存和磁盘架构 data目录展示 [rootDESKTOP-9ADRUGP data]# pwd /usr/local/software/mysql/3312/data [rootDESKTOP-9ADRUGP data]# ls -l total 96616 -rw-r----- 1 systemd-coredump input 56 Jul 24 2023 auto.cnf -rw-r----- 1 systemd-coredump input 30…

数据库基础理论知识

1.基本概念 数据(Data)&#xff1a;数据库存储的基本对象。数字、字符串、图形、图像、音频、视频等数据库(DB)&#xff1a;在计算机内&#xff0c;永久存储、有组织、可共享的数据集合数据库管理系统(DBMS)&#xff1a;管理数据库的系统软件数据库系统(DBS)&#xff1a;DBDBM…

浏览器的工作原理

从输入一个url到页面加载完成&#xff0c;中间都发生了什么&#xff1f; 参考原文地址 首先在浏览器地址栏输入一个地址并回车之后&#xff0c; 1. DNS查找 浏览器会进行DNS查找&#xff0c;把域名https://example.com转化为真实的IP地址10.29.33.xx&#xff0c;根据IP地址找…

linux驱动——中断

1.Cortex-A系列的中断的简介 中断的基本概念&#xff1a;(interrupt) 中断本质上是系统内部的异常机制,当中断产生之后&#xff0c;他会停下当前正在执行的任务&#xff0c;转而去做其他的事情,在停下当前正在执行的任务之前,要先入栈&#xff08;保护现场,其他的事情做完之后…

Mysql/Redis缓存一致性

如何保证MySQL和Redis的缓存一致。从理论到实战。总结6种来感受一下。 理论知识 不好的方案 1.先写MySQL&#xff0c;再写Redis 图解说明: 这是一幅时序图&#xff0c;描述请求的先后调用顺序&#xff1b; 黄色的线是请求A&#xff0c;黑色的线是请求B&#xff1b; 黄色的…

TYPE C模拟耳机POP音产生缘由

关于耳机插拔的POP音问题&#xff0c;小白在之前的文章中讲述过关于3.5mm耳机的POP音产生原因。其实这类插拔问题的POP音不仅仅存在于3.5mm耳机&#xff0c;就连现在主流的Type C模拟耳机的插拔也存在此问题&#xff0c;今天小白就来讲一讲这类耳机产生POP音的缘由。 耳机左右…

两个笔记本如何将一个笔记本作为另一个笔记本的拓展屏

需求是有两个笔记本&#xff0c;一个笔记本闲置&#xff0c;另一个笔记本是主力本。想将另一个闲置的笔记本连接到主力本上作为拓展屏使用。网上搜了好久&#xff0c;有一些人提到了&#xff0c;也有一些视频但是文章比较少。简单总结一下吧 上述需求有两种方式 第一种&#x…

浅谈Redis 的 保护模式(protected-mode)

今天在一台服务器上面部署了redis,发现始终无法用工具远程连接,项目里面是正常的,就是工具不行,防火墙也关闭了.折腾了一会才突然想起来,是不是触发了保护模式. 什么时候触发保护模式protected-mode: 同时满足以下两个: 1.bind未指定ip 2.未配置密码 解决方案: 编辑redis…

基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的交通标志识别系统详解(深度学习模型+UI界面代码+训练数据集)

摘要&#xff1a;本篇博客详细介绍了利用深度学习构建交通标志识别系统的过程&#xff0c;并提供了完整的实现代码。该系统采用了先进的YOLOv8算法&#xff0c;并与YOLOv7、YOLOv6、YOLOv5等早期版本进行了性能评估对比&#xff0c;分析了性能指标如mAP、F1 Score等。文章深入探…

计算机组成原理实验报告1 | 实验1.1 运算器实验(键盘方式)

本文整理自博主大学本科《计算机组成原理》课程自己完成的实验报告。 —— *实验环境为学校机房实验箱。 目录 一、实验目的 二、实验内容 三、实验步骤及实验结果 Ⅰ、单片机键盘操作方式实验 1、实验连线&#xff08;键盘实验&#xff09; 2、实验过程 四、实验结果的…

代码随想录-java-栈与队列总结

栈&#xff08;Stack&#xff09;&#xff1a;是只允许在一端进行插入或删除的线性表。栈是一种线性表&#xff0c;限定这种线性表只能在某一端进行插入和删除操作。进行操作的这一端称为栈顶。 队列&#xff08;Queue&#xff09;是只允许在一端进行插入操作&#xff0c;而在另…

Python使用FastAPI提供图片缩略图生成接口

使用pillow的thumbnail生成缩略图时&#xff0c;会保持原图的宽高比&#xff1b;使用的opencv的resize则不会 具体代码如下&#xff1a; #!/usr/bin/env python import re import sys from enum import Enum from io import BytesIO from pathlib import Path from typing im…

汇编课设——秒表2

1. 设计要求 基于 51 开发板,利用键盘作为按键输入,将数码管作为显示输出,实现电子秒表。 功能要求: (1)计时精度达到百分之一秒; (2)能按键记录下5次时间并通过按键回看 (3)设置时间,实现倒计时,时间到,数码管闪烁 10 次,并激发蜂鸣器,可通过按键解除。 2. 设计思…

思科网络中如何进行动态NAT配置

一、什么是动态NAT&#xff1f;动态NAT与静态NAT的区别是什么&#xff1f; &#xff08;1&#xff09;动态NAT&#xff08;Network Address Translation&#xff09;是一种网络地址转换技术&#xff0c;它会动态地将内部私有网络中的局域网IP地址映射为公共IP地址&#xff0c;…

Hack The Box-Codify

目录 信息收集 rustscan nmap dirsearch WEB 提权 get user get root 信息收集 rustscan ┌──(root㉿ru)-[~/kali/hackthebox] └─# rustscan -b 2250 10.10.11.239 --range0-65535 --ulimit4500 -- -A -sC .----. .-. .-. .----..---. .----. .---. .--. .-. …

JVM 类的加载篇

我们都知道一个类从加载到卸载一共分为七个过程 加载 - 链接(验证 - 准备 - 解析) - 初始化 - 使用 - 卸载 下文我们将详细解析这些过程 谁需要加载? 在Java中数据类型分为基本数据类型和引用数据类型,基本数据类型由虚拟机预定义,引用数据类型则需要类的加载 1.加载/装载(loa…

Docker入门二(应用部署、迁移与备份)

文章目录 一、应用部署1.MySQL部署2.Redis部署3.Nginx部署 二、迁移与备份1.容器做成镜像2.把镜像被分成压缩包 一、应用部署 1.MySQL部署 在dokcer中部署mysql&#xff0c;以后不需要在宿主机上装mysql1.做端口映射docker run -id --namemysql5.7 -p 3306:3306 -e MYSQL_ROOT…

网工内推 | 国企、上市公司网工、运维,CCNA即可,补贴福利多

01 深圳新思 招聘岗位&#xff1a;网络工程师&#xff08;中电集团&#xff09; 职责描述&#xff1a; 1&#xff1a;负责办公室电脑的桌面运维&#xff0c;主要是windows维护与应用维护&#xff1b; 2&#xff1a;负责办公室网络设备配置&#xff0c;如防火墙&#xff0c;交换…

CMake 编译 raylib 程序

CMakeLists.txt 内容如下&#xff1a; cmake_minimum_required(VERSION 3.0) project(t001) # 搜索指定目录下源文件 file(GLOB SRC_LIST ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp) # 包含头文件路径 include_directories(F:/vclib/raylib-5.0_win64_mingw-w64/include) # 包含静态…