flask整合rabbitMQ插件的方式

文章目录

  • 二、Python-flask-rabbitMQ-插件方式整合
    • 引言
    • 具体步骤
      • 1 安装依赖:
      • 2 编写实体类:
      • 3 编写消费者和生产者:
      • 4 初始化消费者和生产者:
      • 5 其他地方使用生产者

二、Python-flask-rabbitMQ-插件方式整合

引言

当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

具体步骤

1 安装依赖:

使用pip安装pika库:

pip install pika

2 编写实体类:

from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化# channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10)  # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数channel.start_consuming()  # 开始消费消息finally:self.release_channel(connection, channel)

3 编写消费者和生产者:

def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)

4 初始化消费者和生产者:

def create_app():app = Flask(__name__)connect_mq_v1(app)

5 其他地方使用生产者


class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/113619.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

BERT变体(1):ALBERT、RoBERTa、ELECTRA、SpanBERT

Author:龙箬 Computer Application Technology Change the World with Data and Artificial Intelligence ! CSDNweixin_43975035 天下之大,虽离家万里,何处不可往!何事不可为! 1. ALBERT \qquad ALBERT的英文全称为A Lite versio…

【推荐算法】ctr cvr联合建模问题合集

ctr和cvr分开建模相比ctcvr的优势? 在电商搜索推荐排序中,将ctr和cvr分开建模,相比直接建模ctcvr的优势是什么? - 萧瑟的回答 - 知乎 总结: 1、ctr的数据可以试试获取,能实时训练。但是cvr存在延迟现象&…

最近学习内容(2023-10-21)

最近学习内容 Linux编译链接命令一条有用的删除可执行文件的bash命令gcc 在macos 的编译选项,其中-g会生成一个.dSYM文件夹to long don’t read 工具的使用gnu bintuils 的使用,但是很可惜macos上的是Mach-O,不是ELFaxel多线程下载器和其余的…

使用nginx方向代理部署Vue项目刷新页面404的问题解决

文章目录 问题假设原理探究问题解决 问题假设 部署出现的问题为:由于项目中使用的vue router 项目直接使用node环境部署项目,在同一个路由如: 192.168.1.30:/home刷新浏览器正常 nginx部署刷新不出现404 /nginx not found 如何解决?以下是我…

C# 文件 校验:MD5、SHA1、SHA256、SHA384、SHA512、CRC32、CRC64

文件 校验 算法:MD5、SHA1、SHA256、SHA384、SHA512、CRC32、CRC64 (免费) 编程语言:C# 功能:文件 哈希 属性 校验算法:MD5、SHA1、SHA256、SHA384、SHA512、CRC32、CRC64。 下载(免费):https://download.csdn.net/download/polloo2012/88450148 本程序 File Pro…

工程监测仪器振弦传感器信号转换器在桥梁安全监测中的重要性

工程监测仪器振弦传感器信号转换器在桥梁安全监测中的重要性 桥梁是人类社会建设过程中最重要的交通基础设施之一,对于保障人民出行、促进经济发展具有极其重要的作用。由于桥梁结构在长期使用过程中受到环境因素和负荷的影响,会逐渐发生变形和损伤&…

python读写.pptx文件

1、读取PPT import pptx pptpptx.Presentation(rC:\Users\user\Documents\\2.pptx) # ppt.save(rC:\Users\user\Documents\\1.pptx) # slideppt.slides.add_slide(ppt.slide_layouts[1])# 读取所有幻灯片上的文字 for slide in ppt.slides:for shape in slide.shapes:if shape…

10月21日,每日信息差

今天是2023年10月21日,以下是为您准备的13条信息差 第一、东方物探公司与阿里云达成战略合作,逐步助力勘探行业实现智能化、自动化、绿色化和可持续化的目标 第二、九洲集团签约300MW集中式风电项目计划总投资21亿,项目达产后,预…

【前端vue面试】vuex

目录 什么是 Vuex?vuex流程图Vuex 和单纯的全局对象不同点StatemapStateGetterMutationmapMutationsAction基础用法dispatch提交载荷(Payload)mapActions组合 ActionModules什么是 Vuex? Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式。它采用集中式存储管理应用的所…

Android Studio初学者实例:RecyclerView学习--模仿今日头条--续

新学期开始了,这篇文章收到了很多人的评论有很多地方不懂,所以写下了以下的文章--续篇 首先使用RecyclerView也好还是使用ListView,更或是GridView你都要先构思需要什么 这些东西无一例外通常都是用在列表显示下,那么需要一些&a…

vue ref和$refs获取dom元素

vue ref和$refs获取dom元素 **创建 工程: H:\java_work\java_springboot\vue_study ctrl按住不放 右键 悬着 powershell H:\java_work\java_springboot\js_study\Vue2_3入门到实战-配套资料\01-随堂代码素材\day04\准备代码\14-ref和$refs获取dom对象 vue --ve…

【OpenVINO】行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human-上篇

行人摔倒检测 — 基于 OpenVINO C# API 部署PP-Human 1. 英特尔开发套件1.1 OpenVINO1.2 AIxBoard 介绍产品定位产品参数AI推理单元 2. PaddleDetection实时行人分析工具PP-Human3. 预测模型获取与转换3.1 PP-YOLOE行人跟踪模型介绍模型下载与转换(1)Pad…

【ARM Coresight 系列文章19.1 -- Cortex-A720 PMU 详细介绍】

文章目录 概述Cortex-A720 PMU Features1.1 PMU 使用介绍1.2 Performance monitors events1.3 Performance Monitors Extension registers1.3.1 Performance monitors program1.4 Performance monitors interrupts1.5 Interaction with the Performance Monitoring Unit and De…

2023年传媒行业中期策略 AIGC从三个不同层次为内容产业赋能

基本面和新题材共振,推动传媒互联网行情上涨 AIGC 概念带动,传媒板块领涨 A 股 2023 年第一个交易日(1 月 3 日)至 6 月 2 日,申万传媒指数区间涨幅高达 48.38%,同时期沪深 300 跌幅为 0.25%,…

Python大数据之PySpark

PySpark入门 1、 Spark与PySpark 1、 Spark与PySpark

centos7 部署oracle完整教程(命令行)

centos7 部署oracle完整教程(命令行) 一. centos7安装oracle1.查看Swap分区空间(不能小于2G)2.修改CentOS系统标识 (由于Oracle默认不支持CentOS)2.1.删除CentOS Linux release 7.9.2009 (Core)(快捷键dd)&…

【公众号开发】图像文字识别 · 模板消息推送 · 素材管理 · 带参数二维码的生成与事件的处理

【公众号开发】(4) 文章目录 【公众号开发】(4)1. 图像文字识别功能1.1 百度AI图像文字识别接口申请1.2 查看文档学习如何调用百度AI1.3 程序开发1.3.1 导入依赖:1.3.2 公众号发来post请求格式1.3.3 对image类型的消息…

QStringListModel

创建模型&#xff1a; QStringListModel* model new QStringListModel(this); 初始化列表&#xff1a; QStringList strList;strList << QStringLiteral("北京") << QStringLiteral("上海") << QStringLiteral("天津") &l…

《基于 Vue 组件库 的 Webpack5 配置》8.在生成打包文件之前清空 output(dist) 目录(两种方式)

方式一 ​ 如果 webpack 是 v5.20.0&#xff0c;直接使用属性 output.clean&#xff0c;配置如下&#xff1a; module.exports {//...output: {clean: true}, };方式二 如果使用较低版本&#xff0c;可以使用插件 clean-webpack-plugin&#xff1a; 先安装&#xff1a;npm…

Python---死循环概念---while True

在编程中一个靠自身控制无法终止的程序称为“死循环”。 在Python中&#xff0c;我们也可以使用while True来模拟死循环&#xff1a; 代码&#xff1a; while True: print(每天进步一点点) 图示 应用&#xff1a; 比如&#xff0c;在测试里面&#xff0c;自动化测试用例…