python延时队列_如何通过Python实现RabbitMQ延迟队列

最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好。因为系统本身一直在用rabbitmq做异步处理任务的中间件,所以想到是否可以利用rabbitmq实现延迟队列。功夫不负有心人,rabbitmq虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、time to live(ttl)消息超时机制;2、dead letter exchanges(dlx)死信队列。下面将具体描述实现原理以及实现代

延迟队列的基础原理time to live(ttl)

rabbitmq可以针对queue设置x-expires 或者 针对message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

rabbitmq消息的过期时间有两种方法设置。

通过队列(queue)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)对消息单独设置,每条消息ttl可以不同。

如果同时使用,则消息的过期时间以两者之间ttl较小的那个数值为准。消息在队列的生存时间一旦超过设置的ttl值,就成为死信(dead letter)

dead letter exchanges(dlx)

rabbitmq的queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现死信(dead letter)之后将dead letter重新发送到指定exchange

x-dead-letter-routing-key:出现死信(dead letter)之后将dead letter重新按照指定的routing-key发送

队列中出现死信(dead letter)的情况有:

消息或者队列的ttl过期。(延迟队列利用的特性)

队列达到最大长度

消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上面两个特性,将队列设置ttl规则,队列ttl过期后消息会变成死信,然后利用dlx特性将其转发到另外的交换机和队列就可以被重新消费,达到延迟消费效果。

2cc0ae19e2a7b84fbfa79b907f160722.png

延迟队列设计及实现(python)

从上面描述,延迟队列的实现大致分为两步:

产生死信,有两种方式per-message ttl和 queue ttl,因为我的需求中是所有的消息延迟处理时间相同,所以本实现中采用 queue ttl设置队列的ttl,如果需要将队列中的消息设置不同的延迟处理时间,则设置per-message ttl()

设置死信的转发规则,dead letter exchanges设置方法()

完整代码如下:

"""

created on fri aug 3 17:00:44 2018

@author: bge

"""

import pika,json,logging

class rabbitmqclient:

def __init__(self, conn_str='amqp://user:pwd@host:port/%2f'):

self.exchange_type = "direct"

self.connection_string = conn_str

self.connection = pika.blockingconnection(pika.urlparameters(self.connection_string))

self.channel = self.connection.channel()

self._declare_retry_queue() #retryqueue and retryexchange

logging.debug("connection established")

def close_connection(self):

self.connection.close()

logging.debug("connection closed")

def declare_exchange(self, exchange):

self.channel.exchange_declare(exchange=exchange,

exchange_type=self.exchange_type,

durable=true)

def declare_queue(self, queue):

self.channel.queue_declare(queue=queue,

durable=true,)

def declare_delay_queue(self, queue,dlx='retryexchange',ttl=60000):

"""

创建延迟队列

:param ttl: ttl的单位是us,ttl=60000 表示 60s

:param queue:

:param dlx:死信转发的exchange

:return:

"""

arguments={}

if dlx:

#设置死信转发的exchange

arguments[ 'x-dead-letter-exchange']=dlx

if ttl:

arguments['x-message-ttl']=ttl

print(arguments)

self.channel.queue_declare(queue=queue,

durable=true,

arguments=arguments)

def _declare_retry_queue(self):

"""

创建异常交换器和队列,用于存放没有正常处理的消息。

:return:

"""

self.channel.exchange_declare(exchange='retryexchange',

exchange_type='fanout',

durable=true)

self.channel.queue_declare(queue='retryqueue',

durable=true)

self.channel.queue_bind('retryqueue', 'retryexchange','retryqueue')

def publish_message(self,routing_key, msg,exchange='',delay=0,ttl=none):

"""

发送消息到指定的交换器

:param exchange: rabbitmq交换器

:param msg: 消息实体,是一个序列化的json字符串

:return:

"""

if delay==0:

self.declare_queue(routing_key)

else:

self.declare_delay_queue(routing_key,ttl=ttl)

if exchange!='':

self.declare_exchange(exchange)

self.channel.basic_publish(exchange=exchange,

routing_key=routing_key,

body=msg,

properties=pika.basicproperties(

delivery_mode=2,

type=exchange

))

self.close_connection()

print("message send out to %s" % exchange)

logging.debug("message send out to %s" % exchange)

def start_consume(self,callback,queue='#',delay=1):

"""

启动消费者,开始消费rabbitmq中的消息

:return:

"""

if delay==1:

queue='retryqueue'

else:

self.declare_queue(queue)

self.channel.basic_qos(prefetch_count=1)

try:

self.channel.basic_consume( # 消费消息

callback, # 如果收到消息,就调用callback函数来处理消息

queue=queue, # 你要从那个队列里收消息

)

self.channel.start_consuming()

except keyboardinterrupt:

self.stop_consuming()

def stop_consuming(self):

self.channel.stop_consuming()

self.close_connection()

def message_handle_successfully(channel, method):

"""

如果消息处理正常完成,必须调用此方法,

否则rabbitmq会认为消息处理不成功,重新将消息放回待执行队列中

:param channel: 回调函数的channel参数

:param method: 回调函数的method参数

:return:

"""

channel.basic_ack(delivery_tag=method.delivery_tag)

def message_handle_failed(channel, method):

"""

如果消息处理失败,应该调用此方法,会自动将消息放入异常队列

:param channel: 回调函数的channel参数

:param method: 回调函数的method参数

:return:

"""

channel.basic_reject(delivery_tag=method.delivery_tag, requeue=false)

发布消息代码如下:

from mq.rabbitmq import rabbitmqclient

print("start program")

client = rabbitmqclient()

msg1 = '{"key":"value"}'

client.publish_message('test-delay',msg1,delay=1,ttl=10000)

print("message send out")

消费者代码如下:

from mq.rabbitmq import rabbitmqclient

import json

print("start program")

client = rabbitmqclient()

def callback(ch, method, properties, body):

msg = body.decode()

print(msg)

# 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。

rabbitmqclient.message_handle_successfully(ch, method)

queue_name = "retryqueue"

client.start_consume(callback,queue_name,delay=0)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持萬仟网。

希望与广大网友互动??

点此进行留言吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/506743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安卓 监听 mysql_Android监听数据库的值改变与否

做购物车的时候有这个需要,当点击增加数量的时候,要更更新总价与数量,也就是往数据库里更新数据,当更新完的时候,就要计算出新的价钱,这就需要对数据库进行监听。这就要用到内容观察者了。第一步&#xff1…

while的用法java_java中的while循环和do while循环

那么在讲解循环之前呢我们先来了解一下什么是循环 生活中的例子 车子的轮胎他就是一直在循环 马拉松跑到也是在循环 因为运动员不停的一圈一圈在跑这也是一个循环那么我们为什么要学习循环呢?下面看一个小问题 张浩Java考试成绩未达到自己的目标。为了表明自己勤…

迷宫java代码_java写的迷宫代码

迷宫代码:截图如下:packagecom.zxl.maze;/** 抽象类表示选择不同的算法*/public abstract classAbstractMap{/** 得到数据*/public abstract boolean[][] getData(int m,intn);/** 重置*/public abstract void reset(int m,intn);}packagecom.zxl.maze;/…

java class list_详解Java 集合类 List 的那些坑

现在的一些高级编程语言都会提供各种开箱即用的数据结构的实现,像 Java 编程语言的集合框架中就提供了各种实现,集合类包含 Map 和 Collection 两个大类,其中 Collection 下面的 List 列表是我们经常使用的集合类之一,很多的业务代…

java 类加载 双亲委派_Java类加载器和双亲委派机制

前言之前详细介绍了Java类的整个加载过程(类加载机制详解)。虽然,篇幅较长,但是也不要被内容吓到了,其实每个阶段都可以用一句话来概括。1)加载:查找并加载类的二进制字节流数据。2)验证:保证被加载的类的正确性。3)准…

win10开发java_win10系统搭建Java开发环境的操作方法

很多小伙伴都遇到过对win10系统搭建Java开发环境进行设置的困惑吧,一些朋友看过网上对win10系统搭建Java开发环境设置的零散处理方法,并没有完完全全明白win10系统搭建Java开发环境的操作方法非常简单,只需要1、双击运行jdk-8u60-windows-x64…

java cms bootstrap_thinkcms: Java CMS系统,完善的后台功能,大气的前台页面. 使用springMVC,hibernate,bootstrap,amazeui....

#thinkcmsJava CMS系统,完善的后台功能,大气的前台页面主要技术springmvc-mvc控制层shiro-方便全面的安全控制框架hibernate-orm框架ehcache-缓存框架前端:jquery,bootstrap,amazeui拉完代码将maven相关包拉下来&#…

java方法不可覆盖_详解Java构造方法为什么不能覆盖,我的钻牛角尖病又犯了.......

一看Think in Java,遇到个程序classEgg2 {protected classYolk {publicYolk() {System.out.println("Egg2.Yolk()");}public voidf() {System.out.println("Egg2.Yolk.f()");}}private Yolk y newYolk();publicEgg2() {System.out.println(&qu…

有人去瑞幸咖啡java_瑞幸股价暴跌,门店竟然爆单了

本文首发于“全天候科技”作者姚心璐。欢迎下载“见闻VIP”,即时见证历史。昨晚至今,有人被瑞幸疯狂打了1.8折,有人疯狂用1.8折券买入瑞幸咖啡。是不是看上去很迷?这的确是冰火两重天一样的真实存在。4月2日,瑞幸自曝2…

我的世界java版做船_“不要在云了,船的合成用木铲?”我的世界:Java和基岩版的差异...

“不要在云了,你这个云玩家!”不知道你没有没有因为某些原因被网友说成是MC的云玩家,我就听到过一个很冤枉的故事。一个已经玩了5年之久的基岩版玩家,被一个玩了一年Java版的玩家硬生生说成是一个云玩家,重点不是这&am…

java线程interu_Intel 10nm服务器U首曝:多线程性能提升118%

Intel 10nm Ice Lake已经应用在轻薄本平台上,当时频率先天不足,而且只能做到4核心,不得不同时祭出14nm Comet Lake予以辅助,而在游戏本、桌面上也不得不继续依赖14nm Comet Lake。根据路线图,服务器平台上Intel今年会先…

java面板换一个斜的圆形_java – 如何从底部设计圆形视图?

我想设计一个从底部是圆形的视图,请看图像I have try different way to design, Like XML, Programatically but I am not able to get any success. I use XML code it’s doing circle from bottom but when I use Any image or Banner slider as show in figure then it’s …

php反序列化漏洞 freebuf,最全的PHP反序列化漏洞的理解和应用

原创:f1r3K0php反序列化漏洞,又叫php对象注入漏洞,是一种常见的漏洞,在我们进行代码审计以及CTF中经常能够遇到。01学习前最好提前掌握的知识PHP类与对象(https://www.php.net/manual/zh/language.oop5.php)PHP魔术方法(https://s…

php按数字分页类,PHP简单实现数字分页功能示例

本文实例讲述了PHP简单实现数字分页功能。分享给大家供大家参考&#xff0c;具体如下&#xff1a;header ( Content-Type: text/html; charsetutf-8 );//分页$page$_GET[page];$allcount 100;$page_size 10;$page_show 5;$page_count ceil($allcount/$page_size);if($page <…

php 瀑布流布局,CSS3实现瀑布流布局的方法

这次给大家带来CSS3实现瀑布流布局的方法&#xff0c;CSS3实现瀑布流布局的注意事项有哪些&#xff0c;下面就是实战案例&#xff0c;一起来看一下。以前使用瀑布流都要用js&#xff0c;现在有了css3&#xff0c;可以轻松实现了。掌握点&#xff1a;1、column-count 把p中的文本…

php 文章列表,ThinkPHP初学者:主页,获取一个文章列表

在之前的文章&#xff0c;已经实现了注册登录的功能&#xff0c;主要熟悉TP与HTML、JS交互&#xff0c;数据库的基本操作等。接下来就要登录到主页&#xff0c;熟悉一下列表的处理&#xff0c;以及数据库多表联查操作。为了简化模型&#xff0c;列表的字段仅有文章标题、简介、…

php js登录,php+js实现单点登录

phpjs实现单点登录2020年08月14日 00:45:23阅读数&#xff1a;110登录信息表DROP TABLE IF EXISTS fly_admin_login_info;CREATE TABLE fly_admin_login_info (id int(11) unsigned NOT NULL AUTO_INCREMENT,admin_id int(11) unsigned DEFAULT NULL,email varchar(80) DEFAULT…

android jni java调用c,Android与JNI(一) ---- Java调用C 静态调用

第一、通过eclipse新建一个工程名为HelloJni的android工程&#xff0c;并编译。第二、右键工程-->Android Tools --> Add Native Support,出现如下界面&#xff0c;名字默认就可以了&#xff0c;点击finish。第三、我们在MainActivity类中加入要调用的native代码public n…

c语言程序综合实习学生成绩,C语言程序设计综合实习报告

课题一&#xff1a;用指针优化学生成绩排名一、目的1&#xff0e;熟悉变量的指针和指向变量的的指针变量的概念和使用2&#xff0e;熟悉数组的指针和指向数组的的指针变量的概念和使用3. 掌握冒泡法或选择法排序的算法4. 掌握函数的定义、调用、声明&#xff0c;以及参数的两种…

c语言求最多啤酒数,C语言,算法、动态规划:有一个箱子的容量为v(正整数,0=v=20000),同时有n个物品(0n=30),...

满意答案24k纯真爱l2013.11.07采纳率&#xff1a;42% 等级&#xff1a;12已帮助&#xff1a;9552人#include#define N 30int xiangzi(int n ,int V ,int a[]) //楼主后面的Vo数组必须放进递归函数里面或定义成全局数组 另外h[n]什么情况??{int minv,t,mV;if(n0){if(a[n]&l…