编程rookie, 如有错误请指出 ☞:
253065903@qq.com
RabbitMQ Node.js 客户端（AMQP 0-9-1，v0.5.2）自动重连、重启策略
开始找解决方案：
通过查看 AMQP 客户端的源码，发现没有 reconnect 选项。
然后去 GitHub 仓库看看有没有人提出过类似的问题：在 issue 区搜索 reconnect，找到了题为 Support Auto-reconnection 的 issue，发现这个问题早在 2013 年就被提出了，可是现在还是 open 状态！
在这个 issue 的讨论中，有一个解决方案值得实践一下：
/**
 * Connects to RabbitMQ and consumes config.rabbitmq.queue with prefetch 1.
 * A lost connection ('close') is retried after 60 s; a failure while connecting
 * or while setting up the channel/consumer is retried after 10 s.
 *
 * NOTE(review): if channel setup fails after the connection has opened, that
 * connection is left open and its later 'close' schedules one more retry.
 */
function connectRMQ() {
  amqp.connect(config.rabbitmq.URI).then(function(conn) {
    conn.on('close', function() {
      console.error('Lost connection to RMQ. Reconnecting in 60 seconds...');
      return setTimeout(connectRMQ, 60 * 1000);
    });
    return conn.createChannel().then(function(ch) {
      var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
      ok = ok.then(function() {
        ch.prefetch(1);
        ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
      });
      return ok.then(function() {
        console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
      });

      // Logs each message and acks it after config.rabbitmq.timeout ms.
      function doWork(msg) {
        // amqplib invokes the consumer with null when RabbitMQ cancels it
        // (e.g. the queue was deleted); there is no message to read or ack.
        if (msg === null) {
          console.error(' [!] Consumer cancelled by RabbitMQ');
          return;
        }
        var body = msg.content.toString();
        console.log(" [x] Received %s", body);
        setTimeout(function() {
          ch.ack(msg);
        }, config.rabbitmq.timeout);
      }
    });
  }).then(null, function(err) {
    setTimeout(connectRMQ, 10 * 1000);
    // Report the cause instead of silently discarding it.
    return console.log('connection failed', err && err.message);
  });
}
// Start the first connection attempt; later attempts are scheduled by connectRMQ itself.
connectRMQ();
上述方案在建立连接之后为连接添加 close 事件监听：close 事件触发时，60 秒后重新调用方法本身实现重连；连接出错或之后的步骤出错时，则在 10 秒后重连。
还有一种简单粗暴的方法：监听 close、error 事件，有错误就抛出来，然后依靠外部的守护程序将此客户端重启。
实践
按照 issue 中的解决方案进行实践，修改之前的代码：
#!/usr/bin/env node
const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
// Exchange name, routing key, exchange type and queue name from ./conf/rabbitmq
const ex = MQ_CONFIG.ex
const patten = MQ_CONFIG.routing_key
const exType = MQ_CONFIG.ex_type
const q = MQ_CONFIG.q || 'signals'
// Number of consecutive reconnect attempts
let cnt = 0
/**
 * Connects to RabbitMQ, declares the exchange/queue/binding and forwards every
 * consumed message to Redis pub/sub (messages with no Redis subscriber are
 * also saved to disk). A 'close' on the connection, or any failure while
 * connecting or setting up the channel, is handed to reconnect().
 */
function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('close', e => {
        // amqplib only passes an error when the close was caused by one
        reconnect(e || new Error('RMQ connection closed'))
      })
      cnt = 0
      log('connect RMQ OK')
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertExchange(ex, exType, { durable: true })
        .then(() => ch.assertQueue(q, { durable: true }))
        .then(() => ch.bindQueue(q, ex, patten))
        .then(() => ch.consume(q, msg => {
          // null means RabbitMQ cancelled this consumer (e.g. the queue was deleted)
          if (msg === null) {
            log('RMQ consumer cancelled')
            return
          }
          const body = msg.content.toString()
          pubRedisCli.publish(msg.fields.routingKey, body, (err, reply) => {
            try {
              if (err) {
                log(err)
                ch.nack(msg)
                return
              }
              ch.ack(msg)
              // reply 0: no Redis subscriber received it, keep a copy on disk
              if (reply === 0) {
                saveUnSubscribeMsg(body)
              }
            } catch (e) {
              // ack/nack throw once the channel is closed (connection lost while
              // the publish was in flight); the message was never acked.
              log(e)
            }
          })
        }))
    })
    .catch(e => {
      reconnect(e)
    })
}
/**
 * Logs why the RMQ connection was lost and schedules connect() again in 10 s.
 *
 * @param {Error} [e] - cause of the disconnect; may be undefined because
 *   amqplib's 'close' event carries no error when the connection closed cleanly.
 */
function reconnect(e) {
  log(e ? e.message : 'connection closed')
  log('lost RMQ connection')
  cnt++
  log(`正在第${cnt}次重新连接RMQ...`)
  setTimeout(() => {
    connect()
  }, 10 * 1000)
}
// Initial connection attempt; reconnect() schedules the following ones.
connect()
/**
 * Persists a signal that no subscriber received, so it is not lost.
 *
 * Each message is appended as one line to ./data/YYYY-MM-DD.txt (local date);
 * the ./data directory is created on first use.
 *
 * @param {string} msg - message body to store
 * @returns {string} path of the file the message was appended to
 */
function saveUnSubscribeMsg(msg) {
  const fs = require('fs')
  const dir = './data'
  const now = new Date()
  const two = n => (n < 10 ? '0' : '') + n
  // toLocaleDateString() is locale dependent (e.g. "1/2/2024" for en-US); its
  // '/' separators would turn the file name into missing sub-directories.
  const date = `${now.getFullYear()}-${two(now.getMonth() + 1)}-${two(now.getDate())}`
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  const path = `${dir}/${date}.txt`
  // appendFileSync creates the file if needed; the newline keeps messages separable.
  fs.appendFileSync(path, `${msg}\n`)
  return path
}
/**
 * console.log wrapper that appends the current local date/time as a last argument.
 * @param {...*} args - values forwarded to console.log
 */
function log(...args) {
  const stamp = new Date().toLocaleString()
  console.log.apply(console, args.concat(stamp))
}
然后进行测试：
重启 MQ server 时重连均正常；但断开 MQ server 所在机器的网络后，发现 close 事件并没有被触发，而是报了 heartbeat 超时的错误，程序直接退出了。于是又在代码中加入了对 error 事件的监听：
// Listen for 'error' as well as 'close': a heartbeat timeout only emits 'error',
// and an 'error' event with no listener terminates the process.
amqp
.connect(MQ_CONFIG.url)
.then(conn => {
// NOTE: both listeners can fire for the same failure (e.g. when the server
// restarts), so reconnect() may be called twice for one lost connection.
conn.on('error', e => {
reconnect(e)
})
conn.on('close', e => {
reconnect(e)
})
return conn
})
这下应该不会导致程序退出了吧？然而又引入了新的问题：重启 MQ server 时报了 ECONNRESET 错误，两个事件监听器都被触发，所以程序重连了两次，导致同一个程序在 MQ server 上建立了两个连接；再重启一次 MQ server，就建立了四个连接！
这是很严重的错误。然而并不是每次两个事件都会被触发，比如 heartbeat 超时就只报 error 错误，所以需要想出一个策略，让程序与 MQ server 之间始终只有一个连接。
声明一个变量，记录当前是否正在重连：
// true while a reconnect is pending; further reconnect requests are ignored
let isConnecting = false
如果已经在重连中，其他的重连请求都不做处理：
/**
 * Schedules one connect() attempt 10 seconds from now, unless a reconnect is
 * already pending (isConnecting is true). Several listeners may report the same
 * lost connection; only the first call while the flag is clear does anything.
 *
 * @param {Error} [e] - cause of the disconnect; may be undefined because
 *   amqplib's 'close' event carries no error when the connection closed cleanly.
 */
function reconnect(e) {
  if (isConnecting) {
    return
  }
  isConnecting = true
  log(e ? e.message : 'connection closed')
  log('lost RMQ connection')
  cnt++
  log(`正在第${cnt}次重新连接RMQ...`)
  setTimeout(() => {
    connect()
  }, 10 * 1000)
}
连接成功时将重连标志设为 false：
.then(conn => {
cnt = 0
log('connect RMQ OK')
isConnecting = false
于是，完整代码如下：
#!/usr/bin/env node
const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
// Exchange name, routing key, exchange type and queue name from ./conf/rabbitmq
const ex = MQ_CONFIG.ex
const patten = MQ_CONFIG.routing_key
const exType = MQ_CONFIG.ex_type
const q = MQ_CONFIG.q || 'signals'
// Reconnect bookkeeping shared by connect() and reconnect():
// cnt counts consecutive attempts, isConnecting marks a pending reconnect.
let cnt = 0
let isConnecting = false
/**
 * Connects to RabbitMQ, declares the exchange/queue/binding and forwards every
 * consumed message to Redis pub/sub (messages with no Redis subscriber are
 * also saved to disk).
 *
 * Every failure path of one attempt goes through a single guarded handler:
 * - the connect call or channel setup rejecting,
 * - 'error' or 'close' on the connection,
 * - 'error' on the channel,
 * - the consumer being cancelled.
 * A dropped connection often reports several of these at once (e.g. 'error'
 * then 'close', plus the rejected setup chain), and every extra reconnect()
 * would open one more connection on the server.
 */
function connect() {
  let conn = null
  let failed = false

  // Handles the first failure of this attempt and ignores any later ones.
  const fail = e => {
    if (failed) {
      return
    }
    failed = true
    if (conn) {
      // The connection may still be open (e.g. only channel setup failed);
      // close it so it does not linger next to the replacement.
      try {
        conn.close().catch(() => {})
      } catch (closeErr) {
        // already closed
      }
    }
    isConnecting = false
    // amqplib emits 'close' without an argument when the close was clean
    reconnect(e || new Error('RMQ connection closed'))
  }

  amqp
    .connect(MQ_CONFIG.url)
    .then(c => {
      conn = c
      conn.on('error', fail)
      conn.on('close', fail)
      cnt = 0
      log('connect RMQ OK')
      isConnecting = false
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      // Without a listener, the 'error' of a server-closed channel would be thrown.
      ch.on('error', fail)
      return ch.assertExchange(ex, exType, { durable: true })
        .then(() => ch.assertQueue(q, { durable: true }))
        .then(() => ch.bindQueue(q, ex, patten))
        .then(() => ch.consume(q, msg => {
          if (msg === null) {
            // RabbitMQ cancelled the consumer (e.g. queue deleted); start over
            // so the queue and binding are declared again.
            fail(new Error('RMQ consumer cancelled'))
            return
          }
          const body = msg.content.toString()
          pubRedisCli.publish(msg.fields.routingKey, body, (err, reply) => {
            try {
              if (err) {
                log(err)
                ch.nack(msg)
                return
              }
              ch.ack(msg)
              // reply 0: no Redis subscriber received it, keep a copy on disk
              if (reply === 0) {
                saveUnSubscribeMsg(body)
              }
            } catch (e) {
              // ack/nack throw once the channel is closed (connection lost while
              // the publish was in flight); the message was never acked.
              log(e)
            }
          })
        }))
    })
    .catch(fail)
}
/**
 * Schedules one connect() attempt 10 seconds from now, unless a reconnect is
 * already pending (isConnecting is true). Several listeners may report the same
 * lost connection; only the first call while the flag is clear does anything.
 *
 * @param {Error} [e] - cause of the disconnect; may be undefined because
 *   amqplib's 'close' event carries no error when the connection closed cleanly.
 */
function reconnect(e) {
  if (isConnecting) {
    return
  }
  isConnecting = true
  log(e ? e.message : 'connection closed')
  log('lost RMQ connection')
  cnt++
  log(`正在第${cnt}次重新连接RMQ...`)
  setTimeout(() => {
    connect()
  }, 10 * 1000)
}
// Initial connection attempt; reconnect() schedules the following ones.
connect()
/**
 * Persists a signal that no subscriber received, so it is not lost.
 *
 * Each message is appended as one line to ./data/YYYY-MM-DD.txt (local date);
 * the ./data directory is created on first use.
 *
 * @param {string} msg - message body to store
 * @returns {string} path of the file the message was appended to
 */
function saveUnSubscribeMsg(msg) {
  const fs = require('fs')
  const dir = './data'
  const now = new Date()
  const two = n => (n < 10 ? '0' : '') + n
  // toLocaleDateString() is locale dependent (e.g. "1/2/2024" for en-US); its
  // '/' separators would turn the file name into missing sub-directories.
  const date = `${now.getFullYear()}-${two(now.getMonth() + 1)}-${two(now.getDate())}`
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  const path = `${dir}/${date}.txt`
  // appendFileSync creates the file if needed; the newline keeps messages separable.
  fs.appendFileSync(path, `${msg}\n`)
  return path
}
/**
 * console.log wrapper that appends the current local date/time as a last argument.
 * @param {...*} args - values forwarded to console.log
 */
function log(...args) {
  const stamp = new Date().toLocaleString()
  console.log.apply(console, args.concat(stamp))
}