多并发发短信处理(头条项目-07)

1 pipeline操作 Redis数据库

Redis 的 C/S 架构:

  • 基于客户端-服务端模型以及请求/响应协议的 TCP服务
  • 客户端向服务端发送⼀个查询请求,并监听Socket返回。
  • 通常是以 阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。

存在的问题:

  • 如果Redis服务端 同时处理多个请求,加上⽹络延迟,那么服务端利⽤率不⾼,效率降低。

解决的办法:

  • 管道pipeline

1.1 pipeline的介绍

管道pipeline

  • 可以⼀次性发送多条命令并在执⾏完后⼀次性将结果返回
  • pipeline通过 减少客户端与Redis的通信次数 来实现降低往返延时时间。

实现的原理

  • 实现的原理是 队列
  • Client可以将三个命令 放到⼀个tcp报⽂⼀起发送
  • Server则可以 将三条命令的处理结果放到⼀个tcp报⽂ 返回。
  • 队列是先进先出,这样就保证数据的顺序性。

1.2 pipeline操作Redis数据库

1.2.1 实现步骤

1. 创建Redis管道

2. 将Redis请求添加到队列

3. 执⾏请求

1.2.2 代码实现
# 创建Redis管道
pl = redis_conn.pipeline()
# 将Redis请求添加到队列
pl.setex('sms_%s' % phone, 60, smscode)
pl.setex('is_send_%s' % phone, 60, 1)
# 执⾏请求
pl.execute()

2 生产者消费者设计模式

存在的问题:

性能优化:

思考:如何 将发送短信从主业务中解耦出来

⽣产者消费者设计模式介绍

  • 为了将发送短信从主业务中解耦出来,我们 引⼊⽣产者消费者设计模式
  • 它是最常⽤的解耦⽅式之⼀,寻找中间⼈(broker)搭桥,保证两个业务没有直接关联

总结:

  • ⽣产者⽣成消息,缓存到消息队列 中,消费者读取消息队列中的消息并执⾏。
  • 由芒果头条⽣成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执⾏

3 RabbitMQ介绍和使用

3.1 RabbitMQ介绍

  • 开源AMQP实现,Erlang 语⾔编写,⽀持多种客户端
  • 分布式、⾼可⽤、持久化、可靠、安全
  • ⽀持多种协议:AMQP、STOMP、MQTT、HTTP
  • 适⽤于多系统之间 的业务解耦的消息中间件

3.2 消息队列 选择建议

3.2.1 Kafka

Kafka主要特点是基于Pull的模式来处理消息消费,追求⾼吞吐量,⼀开始的⽬的就 是⽤于⽇志收集和传输,适合产⽣⼤量数据的互联⽹服务的数据收集业务

⼤型公司建议可以选⽤,如果有⽇志采集功能,肯定是⾸选kafka 了。

3.2.2 RocketMQ

天⽣为⾦融互联⽹领域⽽⽣,对于可靠性要求很⾼的场景,尤其是电商⾥⾯的订单 扣款,以及业务削峰,在⼤量交易涌⼊时,后端可能⽆法及时处理的情况。

RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿⾥双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。

3.2.3 RabbitMQ

RabbitMQ: 结合erlang语⾔本身的并发优势,性能较好,社区活跃度也⽐较⾼,但是 不利于做⼆次开发和维护。不过,RabbitMQ的社区⼗分活跃,可以解决开发过程 中遇到的bug。

如果你的数据量没有那么⼤,⼩公司优先选择功能⽐较完备的RabbitMQ

3.3 安装RabbitMQ(ubuntu 18.04)

安装⽅式1(推荐):

  • 安装Erlang 参考安装Erlang版本
  • 安装RabbitMQ 参考官⽹安装步骤
  • rabbitmq-server 安装包下载链接

安装⽅式2:

# 1. 安装erlang
#由于rabbitmq需要erlang语⾔的⽀持,在安装rabbitmq之前需要安装erlang
sudo apt-get install erlang-nox
# 2. 安装Rabbitmq
#更新源
sudo apt-get update
#安装
sudo apt-get install rabbitmq-server

服务器操作:

# 重启服务器
$ sudo systemctl restart rabbitmq-server
# 启动服务器
$ sudo systemctl start rabbitmq-server
# 关闭服务器
$ sudo systemctl stop rabbitmq-server
# 查看服务器状态
sudo service rabbitmq-server status
# 查看rabbitmq 基本信息
sudo rabbitmqctl status

3.4 添加admin,并赋予administrator权限

# 添加admin⽤户,密码设置为admin。
sudo rabbitmqctl add_user admin admin 
# 赋予权限
sudo rabbitmqctl set_user_tags admin administrator
# 赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源
sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
# 查看⽤户列表
sudo rabbitmqctl list_users
# 删除⽤户
$ sudo rabbitmqctl delete_user admin

3.5 启动服务器测试

# 安装了Rabbitmq后,默认也安装了该管理⼯具,执⾏命令即可启动
sudo rabbitmq-plugins enable rabbitmq_management(先定位到rabbitmq安装⽬录)
# 浏览器访问
http://localhost:15672/

3.6 Python访问RabbitMQ

  • RabbitMQ 提供默认的administrator账户
  • ⽤户名和密码:guest、guest
  • 协议:amqp
  • 地址:localhost
  • 端⼝:15672
  • 查看队列中的消息:sudo rabbitctl list_queues
# Python3虚拟环境下,安装pika
$ pip install pika
# ⽣产者代码:producer.py
import pika# 链接到RabbitMQ服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
# 创建频道
channel = connection.channel()
# 声明消息队列
channel.queue_declare(queue='zhuozi')
# routing_key是队列名 body是要插⼊的内容
channel.basic_publish(exchange='',routing_key='zhuozi', body=b'Hello RabbitMQ!')
print("开始向 'zhuozi' 队列中发布消息 '汉堡做好啦!'")
# 关闭链接
connection.close()
# 消费者代码:consumer.py
import pika# 链接到rabbitmq服务器
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
# 创建频道,声明消息队列
channel = connection.channel()
# 和⽣产者声明同⼀个队列,如果⼀⽅挂掉,不会丢失数据
channel.queue_declare(queue='zhuozi')# 定义接受消息的回调函数
def callback(channel, method, properties, body):print(body)# 告诉RabbitMQ使⽤callback来接收信息
channel.basic_consume(on_message_callback=callback, queue='zhuozi',auto_ack=True)
# 开始接收信息
channel.start_consuming()

3.7 RabbitMQ配置远程访问

直接使⽤新建的管理员⽤户访问即可远程访问。

Celery 介绍和使用

存在问题:

  • 消费者取到消息之后,需要 异步处理
  • 任务可能出现⾼并发的情况,需要多任务的⽅式执⾏
  • 耗时任务很多种,每种耗时任务编写的⽣产者和消费者代码有重复
  • 取到的消息什么时候执⾏,以什么样的⽅式执⾏

结论:

  • 实际开发中,我们可以 借助成熟的⼯具Celery 来完成。
  • 有了Celery,我们在使⽤⽣产者消费者模式时,只需要关注任务本身,极⼤的 简化了程序员的开发流程

4.1 Celery介绍

Celery介绍:

  • ⼀个 简单、灵活且可靠、处理⼤量消息的分布式系统,可以在⼀台或者多台机器上运⾏。
  • 单个 Celery进程每分钟可处理数以百万计 的任务。
  • 通过消息进⾏通信,使⽤消息队列(broker)在客户端和消费者之间进⾏协调。

安装Celery:

$ pip install -U Celery

Celery官⽅⽂档

4.2 创建Celery实例并加载配置

4.2.1 定义Celery包
# mgproject/mgproject/celery_tasks
在项⽬包⽬录下创建celery_tasks(python package)
4.2.2 创建Celery实例

在celery_tasks包⽬录下 创建main.py⽂件

# celery_tasks/main.py
# celery启动⽂件
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
4.2.3 加载Celery配置

在celery_tasks包⽬录下 创建config.py⽂件

# celery_tasks/config.py
# 指定消息队列的位置
broker_url= 'amqp://guest:guest@localhost:5672'
# 修改celery_tasks/main.py
# celery启动⽂件
from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')

4.3 定义发送短信任务

在celery_tasks包⽬录下创建 sms(python包)/tasks.py

4.3.1 注册任务

celery_tasks.main.py

from celery import Celery
# 创建celery实例
celery_app = Celery('mangguo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# ⾃动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])
4.3.2 定义任务

celery_tasks.sms.tasks.py

import os
import sys# 添加导包路径
B_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(1, B_DIR)
sys.path.insert(0, os.path.join(B_DIR, 'utils'))
import logging
from celery_tasks.main import celery_app# 为celery使⽤django配置⽂件进⾏设置
if not os.getenv('DJANGO_SETTINGS_MODULE'):os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.dev')
from huyi_sms.sms3 import send_sms_codelogger = logging.getLogger('django')@celery_app.task(name='huyi_send_sms_code')
def huyi_send_sms_code(phone, smscode_str):"""发送短信异步任务:param phone: ⼿机号:param smscode: 短信验证码:return: 成功 code=2 或 失败 smsid=0"""try:# 调⽤外部接⼝执⾏发送短信任务ret = send_sms_code(smscode_str, phone)except Exception as e:logger.error(e)if ret.get('code') != 2:logger.error(e)return ret.get('code', None)

4.4 启动Celery服务

$ cd ~/Desktop/projects/mangguo/mgproject
$ celery -A celery_tasks.main worker -l info
  • -A指对应的应⽤程序,  其参数是项⽬中 Celery实例的位置
  • worker指这⾥要启动的worker。
  • -l指⽇志等级,⽐如info等级。

4.5 调⽤发送短信任务

# verifications/views.py
from mgproject.celery_tasks.sms.tasks import huyi_send_sms_code# Celery异步发送短信验证码
ret = huyi_send_sms_code.delay(phone, smscode_str)
# 8. 根据外部接⼝返回值响应前端结果
if ret:  # 执⾏⼀个任务就返回⼀个taskid 689e889c-a607-49f3-9777-248a8dcce310return JsonResponse({'code': '200', 'errormsg': 'OK'})
return JsonResponse({'code': '5001', 'errormsg': '发送短信验证码错误'})

4.6 补充celery worker的⼯作模式

  • 默认是 进程池⽅式,进程数以当前机器的CPU核数为参考,每个CPU开四个进 程。
  • 如何⾃⼰指定进程数:celery -A proj worker --concurrency=4
  • 如何改变进程池⽅式为协程⽅式:
  • celery -A proj worker --concurrency=1000 -P eventlet -c 1000
# 安装eventlet模块
$ pip install eventlet
# 启⽤ Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66954.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络协议】动态路由协议

前言 本文将概述动态路由协议,定义其概念,并了解其与静态路由的区别。同时将讨论动态路由协议相较于静态路由的优势,学习动态路由协议的不同类别以及无类别(classless)和有类别(classful)的特性…

c#集成npoi根据excel模板导出excel

NuGet中安装npoi 创建excel模板,替换其中的内容生成新的excel文件。 例子中主要写了这四种情况: 1、替换单个单元格内容; 2、替换横向多个单元格; 3、替换表格; 4、单元格中插入图片; using System.IO; …

人工智能知识分享第十天-机器学习_聚类算法

聚类算法 1 聚类算法简介 1.1 聚类算法介绍 一种典型的无监督学习算法,主要用于将相似的样本自动归到一个类别中。 目的是将数据集中的对象分成多个簇(Cluster),使得同一簇内的对象相似度较高,而不同簇之间的对象相…

B树及其Java实现详解

文章目录 B树及其Java实现详解一、引言二、B树的结构与性质1、节点结构2、性质 三、B树的操作1、插入操作1.1、插入过程 2、删除操作2.1、删除过程 3、搜索操作 四、B树的Java实现1、节点类实现2、B树类实现 五、使用示例六、总结 B树及其Java实现详解 一、引言 B树是一种多路…

本地缓存:Guava Cache

这里写目录标题 一、范例二、应用场景三、加载1、CacheLoader2、Callable3、显式插入 四、过期策略1、基于容量的过期策略2、基于时间的过期策略3、基于引用的过期策略 五、显示清除六、移除监听器六、清理什么时候发生七、刷新八、支持更新锁定能力 一、范例 LoadingCache<…

【高录用 | 快见刊 | 快检索】第十届社会科学与经济发展国际学术会议 (ICSSED 2025)

第十届社会科学与经济发展国际学术会议(ICSSED 2025)定于2025年2月28日-3月2日在中国上海隆重举行。会议主要围绕社会科学与经济发展等研究领域展开讨论。会议旨在为从事社会科学与经济发展研究的专家学者提供一个共享科研成果和前沿技术&#xff0c;了解学术发展趋势&#xff…

[ComfyUI]接入Google的Whisk,巨物融合玩法介绍

一、介紹​ 前段时间&#xff0c;谷歌推出了一个图像生成工具whisk&#xff0c;有一个很好玩的图片融合玩法&#xff0c;分别提供三张图片,就可以任何组合来生成图片。​ ​ 最近我发现有人开发了对应的ComfyUI插件&#xff0c;对whisk做了支持&#xff0c;就来体验了下&#…

模式识别与机器学习

文章目录 考试题型零、简介1.自学内容(1)机器学习(2)机器学习和统计学中常见的流程(3)导数 vs 梯度(4)KL散度(5)凸优化问题 2.基本概念3.典型的机器学习系统4.前沿研究方向举例 一、逻辑回归1.线性回归2.逻辑回归3.随堂练习 二、贝叶斯学习基础1.贝叶斯公式2.贝叶斯决策3.分类器…

nginx负载均衡-基于端口的负载均衡(一)

注意&#xff1a; (1) 做负载均衡技术至少需要三台服务器&#xff1a;一台独立的负载均衡器&#xff0c;两台web服务器做集群 一、nginx分别代理后端web1 和 web2的三台虚拟主机 1、web1&#xff08;nginx-10.0.0.7&#xff09;配置基于端口的虚拟主机 [rootOldboy extra]# …

【ArcGIS微课1000例】0138:ArcGIS栅格数据每个像元值转为Excel文本进行统计分析、做图表

本文讲述在ArcGIS中,以globeland30数据为例,将栅格数据每个像元值转为Excel文本,便于在Excel中进行统计分析。 文章目录 一、加载globeland30数据二、栅格转点三、像元值提取至点四、Excel打开一、加载globeland30数据 打开配套实验数据包中的0138.rar中的tif格式栅格土地覆…

智能安全帽_4G/5G智能安全帽主板方案定制开发

智能安全帽是一种先进的安全防护设备&#xff0c;主要以视频和语音通话为功能&#xff0c;能够全面记录施工现场的作业情况&#xff0c;并支持管理人员与现场工作人员之间的双向语音通话。这一创新设计使得项目管理人员能够实时、有效地掌握施工过程中的安全和质量情况。 这款智…

uniApp通过xgplayer(西瓜播放器)接入视频实时监控

&#x1f680; 个人简介&#xff1a;某大型国企资深软件开发工程师&#xff0c;信息系统项目管理师、CSDN优质创作者、阿里云专家博主&#xff0c;华为云云享专家&#xff0c;分享前端后端相关技术与工作常见问题~ &#x1f49f; 作 者&#xff1a;码喽的自我修养&#x1f9…

基于RK3568/RK3588大车360度环视影像主动安全行车辅助系统解决方案,支持ADAS/DMS

产品设计初衷 HS-P2-2D是一款针对大车盲区开发的360度全景影像 安全行车辅助系统&#xff0c;通过车身四周安装的超广角像机&#xff0c;经算法合成全景鸟瞰图&#xff0c;通过鸟瞰图&#xff0c;司机非常清楚的看清楚车辆四周情况&#xff0c;大大降低盲区引发的交通事故。 产…

树的模拟实现

一.链式前向星 所谓链式前向星&#xff0c;就是用链表的方式实现树。其中的链表是用数组模拟实现的链表。 首先我们需要创建一个足够大的数组h&#xff0c;作为所有结点的哨兵位。创建两个足够大的数组e和ne&#xff0c;一个作为数据域&#xff0c;一个作为指针域。创建一个变…

【C++入门】详解(中)

目录 &#x1f495;1.函数的重载 &#x1f495;2.引用的定义 &#x1f495;3.引用的一些常见问题 &#x1f495;4.引用——权限的放大/缩小/平移 &#x1f495;5. 不存在的空引用 &#x1f495;6.引用作为函数参数的速度之快&#xff08;代码体现&#xff09; &#x1f4…

《Opencv》图像的旋转

一、使用numpy库实现 np.rot90(img,-1) 后面的参数为-1时事顺时针旋转&#xff0c;为1时是逆时针旋转。 import cv2 import numpy as np img cv2.imread(./images/kele.png) """方法一""" # 顺时针90度 rot_1 np.rot90(img,-1) # 逆时针90度…

CES 2025|全面拥抱端侧AI,美格智能在CES发布系列创新成果

要点&#xff1a; ▶ 在AI机器人领域&#xff0c;以高算力AI模组助力发布“通天晓”人形机器人和2款全新微小型AI机器人 ▶ 在AI硬件领域&#xff0c;发布消费级AI智能体产品——AIMO&#xff0c;引领个人专属的大模型时代 ▶ 在5G通信领域&#xff0c;发布全新5GWiFi-7 CPE…

Spring Boot 支持哪些日志框架

Spring Boot 支持多种日志框架&#xff0c;主要包括以下几种&#xff1a; SLF4J (Simple Logging Facade for Java) Logback&#xff08;默认&#xff09;Log4j 2Java Util Logging (JUL) 其中&#xff0c;Spring Boot 默认使用 SLF4J 和 Logback 作为日志框架。如果你需要使…

SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件 1、导入依赖 <!-- spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><depend…

Jaeger UI使用、采集应用API排除特定路径

Jaeger使用 注&#xff1a; Jaeger服务端版本为&#xff1a;jaegertracing/all-in-one-1.6.0 OpenTracing版本为&#xff1a;0.33.0&#xff0c;最后一个版本&#xff0c;停留在May 06, 2019。最好升级到OpenTelemetry。 Jaeger客户端版本为&#xff1a;jaeger-client-1.3.2。…