rabbitmq取消自动重连_rabbitmq客户端自动重连

编程rookie, 如有错误请指出 ☞:

253065903@qq.com

RabbitMQ

Node.js 客户端( AMQP 0-9-1 V0.5.2

)自动重连

重启策略

开始找解决方案:

通过查看AMQP的源码,发现没有reconnect的选项

然后上GitHub上看有没有人提出类似的问题 github repo

,通过输入 reconnect

搜索issue区找到题为 Support Auto-reconnection

的 issue

,发现这个问题是早在 2013

年就提出来的,可是现在还是 open

的状态!

在这个Issue区发现有一个解决方案还是可以实践一下的:

function connectRMQ() {

amqp.connect(config.rabbitmq.URI).then(function(conn) {

conn.on('close', function() {

console.error('Lost connection to RMQ. Reconnecting in 60 seconds...');

return setTimeout(connectRMQ, 60 * 1000);

});

return conn.createChannel().then(function(ch) {

var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});

ok = ok.then(function() {

ch.prefetch(1);

ch.consume(config.rabbitmq.queue, doWork, {noAck: false});

});

return ok.then(function() {

console.log(" [*] Waiting in %s.", config.rabbitmq.queue);

});

function doWork(msg) {

var body = msg.content.toString();

console.log(" [x] Received %s", body);

setTimeout(function() {

ch.ack(msg);

}, config.rabbitmq.timeout);

}

});

}).then(null, function() {

setTimeout(connectRMQ, 10 * 1000);

return console.log('connection failed');

});

}

connectRMQ();

上述的解决方案是在建立连接之后对连接添加 close

的监听事件,当 close

事件触发,

或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连

还有一种简单粗暴的方法,监听 close

、 error

事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启

实践

按照Issue区发现的解决方案进行实践,修改之前的代码

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')

const REDIS_CONFIG = require('./conf/redis')

const Utils = require('./lib/Utils')

const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)

var amqp = require('amqplib')

var ex = MQ_CONFIG.ex

var patten = MQ_CONFIG.routing_key

var exType = MQ_CONFIG.ex_type

var q = MQ_CONFIG.q || 'signals'

var cnt = 0

function connect() {

amqp

.connect(MQ_CONFIG.url)

.then(conn => {

conn.on('close', e => {

reconnect(e)

})

return conn

})

.then(conn => {

cnt = 0

log('connect RMQ OK')

console.log(' [*] Waiting for signals. To exit press CTRL+C')

return conn.createChannel()

})

.then(ch => {

return ch.assertQueue(q, {

durable: true

})

.then(() => {

return ch.assertExchange(ex, exType, {

durable: true

})

})

.then(() => {

return ch.assertQueue(q, {

durable: true

})

})

.then(() => {

return ch.bindQueue(q, ex, patten)

})

.then(() => {

return ch.consume(q, (msg) => {

pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {

if (err) {

log(err)

ch.nack(msg)

} else {

if (reply !== 0) {

ch.ack(msg)

} else {

ch.ack(msg)

saveUnSubscribeMsg(msg.content.toString())

}

}

})

})

})

})

.catch(e => {

reconnect(e)

})

}

function reconnect(e) {

log(e.message)

log('lost RMQ connection')

cnt++

log(`正在第${cnt}次重新连接RMQ...`)

setTimeout(() => {

connect()

}, 10 * 1000)

}

connect()

/**

* if signals didn't be subscribed, they would be saved to ./data dir

* @param {string} msg

*/

function saveUnSubscribeMsg(msg) {

let date = new Date().toLocaleDateString()

const fs = require('fs')

const dir = './data'

if (!fs.existsSync(dir)) {

fs.mkdirSync(dir)

}

let path = `${dir}/${date}.txt`

let isExist = fs.existsSync(path)

if (isExist) {

fs.appendFileSync(path, msg)

} else {

fs.writeFileSync(path, msg)

}

}

function log(...args) {

console.log(...args, new Date().toLocaleString())

}

然后进行测试:

通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:

amqp

.connect(MQ_CONFIG.url)

.then(conn => {

conn.on('error', e => {

reconnect(e)

})

conn.on('close', e => {

reconnect(e)

})

return conn

})

这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET

的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!

这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat

超时就只报 error

的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。

采用声明一个变量,记录是不是正在连接

var isConnecting = false

如果已经在连接了,其他的重连都不做处理

function reconnect(e) {

if (!isConnecting) {

isConnecting = true

log(e.message)

log('lost RMQ connection')

cnt++

log(`正在第${cnt}次重新连接RMQ...`)

setTimeout(() => {

connect()

}, 10 * 1000)

}

}

连接上时将重连的标志设为 false

.then(conn => {

cnt = 0

log('connect RMQ OK')

isConnecting = false

于是乎,完整代码如下:

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')

const REDIS_CONFIG = require('./conf/redis')

const Utils = require('./lib/Utils')

const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)

var amqp = require('amqplib')

var ex = MQ_CONFIG.ex

var patten = MQ_CONFIG.routing_key

var exType = MQ_CONFIG.ex_type

var q = MQ_CONFIG.q || 'signals'

var cnt = 0

var isConnecting = false

function connect() {

amqp

.connect(MQ_CONFIG.url)

.then(conn => {

conn.on('error', (e) => {

reconnect(e)

})

conn.on('close', e => {

reconnect(e)

})

return conn

})

.then(conn => {

cnt = 0

log('connect RMQ OK')

isConnecting = false

console.log(' [*] Waiting for signals. To exit press CTRL+C')

return conn.createChannel()

})

.then(ch => {

return ch.assertQueue(q, {

durable: true

})

.then(() => {

return ch.assertExchange(ex, exType, {

durable: true

})

})

.then(() => {

return ch.assertQueue(q, {

durable: true

})

})

.then(() => {

return ch.bindQueue(q, ex, patten)

})

.then(() => {

return ch.consume(q, (msg) => {

pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {

if (err) {

log(err)

ch.nack(msg)

} else {

if (reply !== 0) {

ch.ack(msg)

} else {

ch.ack(msg)

saveUnSubscribeMsg(msg.content.toString())

}

}

})

})

})

})

.catch(e => {

isConnecting = false

reconnect(e)

})

}

function reconnect(e) {

if (!isConnecting) {

isConnecting = true

log(e.message)

log('lost RMQ connection')

cnt++

log(`正在第${cnt}次重新连接RMQ...`)

setTimeout(() => {

connect()

}, 10 * 1000)

}

}

connect()

/**

* if signals didn't be subscribed, they would be saved to ./data dir

* @param {string} msg

*/

function saveUnSubscribeMsg(msg) {

let date = new Date().toLocaleDateString()

const fs = require('fs')

const dir = './data'

if (!fs.existsSync(dir)) {

fs.mkdirSync(dir)

}

let path = `${dir}/${date}.txt`

let isExist = fs.existsSync(path)

if (isExist) {

fs.appendFileSync(path, msg)

} else {

fs.writeFileSync(path, msg)

}

}

function log(...args) {

console.log(...args, new Date().toLocaleString())

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/264847.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Expression,挑起2006年最后的争论?

微软开始把传说中的Expression拿出来show了,这东西对微软来说具有战略意义,比IE7重要的多。要理解它的意义,首先要了解微软。 大家都知道Google的核心价值观之一就是“一切以用户为中心”(进而发展出现在最流行的“用户体验至上”)&#xff0…

[HTML]去除li前面的小黑点,和ul、LI部分属性

[转] 对于很多人用div来做网站时,总会用到,但在显示效果时前面总是会有一个小黑点,这个令很多人头痛,但又找不到要源,其它我们可以用以下方法来清除。[HTML]去除li前面的小黑点,和ul、LI部分属性[转] 对于很多人用div来…

堆栈认知——栈溢出实例(ret2shellcode)

参考:栈溢出实例–笔记二(ret2shellcode) 地址:https://qingmu.blog.csdn.net/article/details/119303513 目录1、栈溢出含义及栈结构2、ret2shellcode基本思路3、实战一下3.1、二进制程序如下3.2、分析调试查看栈3.3、编写payloa…

Glusterfs初试

Gluster的模式及介绍在此不表,这里只记录安装及配置过程。 1.整体环境 server1 : gfs1.cluster.com server2 : gfs2.cluster.com Client: 2.安装Gluster 下载软件https://access.redhat.com/downloads/content/186/ver3/rhel---7/3.4/x86_64/product-software 下…

如何查看光驱硬盘托架的尺寸_如何确定光驱位的硬盘托架的大小尺寸和接口

如果你想在电脑光驱位安装固态硬盘,前提必须要确定光驱位硬盘托架的类型,如大小尺寸和接口参数。下面将介绍大家如何确定相关参数,其适合于联想,华硕,惠普等电脑品牌。1,最合理的确定方法是到Windows 设备管…

[Android Pro] ant 编译android工程

参考文章: http://blog.csdn.net/xyz_lmn/article/details/7268582?reload http://hubingforever.blog.163.com/blog/static/1710405792013220840347/ http://www.cnblogs.com/tankaixiong/archive/2010/11/24/1887156.html 一,准备ant ant 官网可下载h…

堆栈认知——堆简介

参考:Linux笔记–堆简介 地址:https://qingmu.blog.csdn.net/article/details/119510863 目录1、前言2、堆的由来3、Linux中堆简介4、堆分类4.1、请求堆4.2、释放堆5、内存分配背后的系统调用6、堆相关数据结构7、堆的申请8、调试验证1、前言 当前针对各…

(0.2.6)Mysql安装——编译安装

参考我的另一篇文章:https://www.cnblogs.com/gered/p/9539333.html转载于:https://www.cnblogs.com/gered/p/10359289.html

ubuntu查看gpu使用率_如何监控GPU卡的使用率(Linux)

Linux系统,在程序运行的时候,如何实时监控GPU卡的使用率呢?首先,你需要安装好CUDA。然后,你需要将CUDA的bin目录加入到PATH中。方法是在终端窗口输入如下命令:# vi ~/.bashrc将 /usr/local/cuda/bin 加入到…

sqlserver2000 mdf 文件导入

在Enterprise Manager中菜单\工具\sql analyze 把数据库的数据文件(*.mdf)和日志文件(*.ldf)都拷贝到目的服务器,在SQL sp_attach_db dbname test, filename1 d:\mssql7\data\test_data.mdf, …

如何为 Horizon View 配置 VMware VSAN?

原文:http://myvirtualcloud.net/?p5440注明:本文内容基于 VMwareVSAN beta 版本撰写,请访问http://www.vmware.com/products/virtual-san/获得有关正式版本的更新信息。我已经在前面的文章中讨论了VSAN 给 Horizon View 带来的益处&#xf…

配置vscode远程免密登入Linux服务器

视频教程:https://www.bilibili.com/video/BV1s64y167cM?vd_sourcecc0e43b449de7e8663ca1f89dd5fea7d 参考:配置vscode远程免密登入Linux服务器 地址:https://blog.csdn.net/weixin_54178481/article/details/123977675?spm1001.2014.3001.…

P1552 [APIO2012]派遣

链接 https://www.luogu.org/problemnew/show/P1552 思路 忍者数量肯定越多越好 那就从下到上的合并它的孩子 左偏树的话 顺便维护一个tot,大头堆,如果tot大于了m,把大的删掉 如果左偏树忘干净了或者没学的话 线段树合并也是个不错的选择 直接…

Wss 3.0安装指南(一)

Wss 3.0安装指南(一) "基本" 模式 WSS3.0的安装可分为独立服务器安装和服务器场安装,前者是将所有的服务和应用安装在一台Server 上,后者则是…

java web 程序---javabean实例--登陆界面并显示用户名和密码

重点:注意大小写,不注意细节,这点小事,还需要请教 发现一个问题,也是老师当时写的时候,发现代码没错,但是就是运行问题。 大家看,那个java类,我们要求是所有属性均为私有…

智能五子棋基本思路

前些天闲时写的,在学数据结构的时拿来练手的.没技术含量,最有技术含量的AI部分,我是看别人(园子里叫二十四生的)的算法改的.刚弄了一下午小程序弄不过去,头疼,现无聊的紧,闲着发着玩.当消遣主要发下AI核心算法.有兴趣的同学用VB,VC.VC#都可以一起做着玩.保持对编程的兴趣.其它没…

webpack 4.0 配置文件 webpack.config.js文件的放置位置

一般webpack.config.js是默认放在根目录的,不在根目录的时候需要在package.json中制定位置,我的配置文件目录是config/webpack.config.js,在package.json文件中的配置为: "scripts": { "build": "webpack --mode p…

Python-memcached的基本使用 - Flynewton成长点滴 - 开源中国社区

Python-memcached的基本使用 - Flynewton成长点滴 - 开源中国社区Python-memcached的基本使用 发表于3年前(2010-12-04 00:02) 阅读(9601) | 评论(3) 12人收藏此文章, 我要收藏 赞1 python memcached 想学Python,又想…

快速构建ceph可视化监控系统

https://my.oschina.net/colben/blog/1844602 https://my.oschina.net/u/3626804/blog/1859613转载于:https://www.cnblogs.com/diyunpeng/p/10363183.html

阳奉阴违(转载)

这个词不是好词,大家都这么看,但现在生活里却经常要照它做,表面上应付一下,表示一下,然后再怎么做就随便;如果你不表示这一下,那就怎么也不行。 MSN和Yahoo就已经理解了这个问题,并且…