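1. Introduction
Our Android front end uses a RabbitMQ message queue to handle business traffic. Mobile network conditions are complicated and the signal is often unstable, which can make the MQ connection drop and reconnect frequently. The logs contain many entries of the form java.net.SocketException: Connection reset, so this article steps through the client source code to work out what may cause that error. MissedHeartbeatException, by contrast, is thrown when the client has failed to receive anything from the server several times in a row and concludes that the server has disconnected.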
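2. Analysis
A search for java.net.SocketException: Connection reset mostly turns up the same explanation: the client still believes it is connected to MQ, but the server has already dropped the connection. When the client then tries to receive data on that connection, this error is raised.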
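The situation described above can be reproduced without RabbitMQ at all. The following is a minimal sketch, assuming a loopback socket pair stands in for the client and the broker; the class and method names are made up for illustration. The "server" side aborts the connection, and the "client" side then tries to read.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

final class ConnectionResetDemo {

    /**
     * Returns the exception seen by the client when its peer aborts the
     * connection, or null if the read returned without a SocketException.
     */
    static IOException provokeReset() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {

            Socket peer = server.accept();
            // SO_LINGER with a zero timeout makes close() abort the
            // connection (TCP RST) instead of closing it gracefully.
            peer.setSoLinger(true, 0);
            peer.close();

            try {
                // The client still thinks it is connected and tries to read.
                client.getInputStream().read();
                return null;
            } catch (SocketException e) {
                return e;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // On common platforms this is expected to report a connection reset.
        System.out.println(provokeReset());
    }
}
```

The exact exception message depends on the platform and JDK, so the sketch returns the exception rather than matching on its text.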
3. The MQ heartbeat mechanism
When the client creates a connection, it initializes and starts the heartbeat service via initializeHeartbeatSender();
private void initializeHeartbeatSender() {
    this._heartbeatSender = new HeartbeatSender(_frameHandler, heartbeatExecutor, threadFactory);
}
In RabbitMQ, the client sends heartbeats on a timer that fires at half the heartbeat period:
/**
 * Sets the heartbeat in seconds.
 */
public void setHeartbeat(int heartbeatSeconds) {
    synchronized (this.monitor) {
        if (this.shutdown) {
            return;
        }

        // cancel any existing heartbeat task
        if (this.future != null) {
            this.future.cancel(true);
            this.future = null;
        }

        if (heartbeatSeconds > 0) {
            // wake every heartbeatSeconds / 2 to avoid the worst case
            // where the last activity comes just after the last heartbeat
            long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
            ScheduledExecutorService executor = createExecutorIfNecessary();
            Runnable task = new HeartbeatRunnable(interval);
            this.future = executor.scheduleAtFixedRate(task, interval, interval, TimeUnit.NANOSECONDS);
        }
    }
}
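To make the interval arithmetic in setHeartbeat concrete, here is a small standalone sketch that repeats only the "heartbeat / 2" calculation. The class and method names are hypothetical and are not part of the RabbitMQ client.

```java
import java.util.concurrent.TimeUnit;

final class HeartbeatInterval {

    /** Timer period used for sending heartbeats: half the heartbeat, in nanoseconds. */
    static long sendIntervalNanos(int heartbeatSeconds) {
        return TimeUnit.SECONDS.toNanos(heartbeatSeconds) / 2;
    }

    public static void main(String[] args) {
        for (int heartbeat : new int[] {10, 30, 60}) {
            long millis = TimeUnit.NANOSECONDS.toMillis(sendIntervalNanos(heartbeat));
            System.out.println("heartbeat=" + heartbeat + "s -> timer fires every " + millis + " ms");
        }
    }
}
```

With a 30-second heartbeat the timer fires every 15 seconds, which matches the 15-second spacing of the timer entries in the test log of section 3.1.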
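The heartbeat itself is sent by the task below. Note that an IOException raised while sending is caught and ignored here (the LogUtils.log call is a debug statement added for this analysis):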
private final class HeartbeatRunnable implements Runnable {

    private final long heartbeatNanos;

    private HeartbeatRunnable(long heartbeatNanos) {
        this.heartbeatNanos = heartbeatNanos;
    }

    @Override
    public void run() {
        try {
            LogUtils.log("心跳定时器发送"); // "heartbeat timer fired"
            long now = System.nanoTime();

            // only send a heartbeat if nothing else was written recently
            if (now > (lastActivityTime + this.heartbeatNanos)) {
                frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                frameHandler.flush();
            }
        } catch (IOException e) {
            // ignore
        }
    }
}
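According to the official documentation and the client source code, the default heartbeat timeout is 60 seconds and a heartbeat check runs every 30 seconds. If more than two heartbeat checks pass without confirming that the peer is alive, the connection is closed.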
3.1 Test
In the test case, the heartbeat period is set to 30 seconds:
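import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// The class name is chosen for illustration; LogUtils is the author's own logging helper.
public class HeartbeatTest {

    public static void main(String[] args) {
        String queueName = "123456";
        ExecutorService executor = Executors.newFixedThreadPool(10);

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.0.11.211");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setVirtualHost("/");
        factory.setPassword("admin");
        factory.setConnectionTimeout(5000);
        // disable recovery so that a disconnect stays visible
        factory.setAutomaticRecoveryEnabled(false);
        factory.setTopologyRecoveryEnabled(false);
        factory.setRequestedHeartbeat(30); // heartbeat in seconds

        executor.submit(() -> {
            try {
                Connection connection = factory.newConnection();
                LogUtils.log("连接创建成功"); // "connection created"
                connection.addShutdownListener(cause -> {
                    // "disconnected"
                    LogUtils.log("断开连接:" + cause.getMessage() + " msg=>:" + cause.getCause());
                });

                Channel channel = connection.createChannel();
                LogUtils.log("创建通道成功:" + channel.getChannelNumber()); // "channel created"
                channel.basicQos(30);
                channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) {
                        try {
                            String message = new String(body, StandardCharsets.UTF_8);
                            LogUtils.log("消息:" + message); // "message"
                            channel.basicReject(envelope.getDeliveryTag(), false);
                        } catch (Exception e) {
                            // "consumer exception"
                            LogUtils.log("消费者异常,e:" + e.getMessage() + " consumerTag:" + consumerTag);
                        }
                    }
                });
                channel.addShutdownListener(cause -> {
                    // "consumer disconnected"; concatenate getCause() directly,
                    // because calling toString() on a null cause would throw
                    LogUtils.log("消费者断开连接:" + cause.getMessage() + " msg=>:" + cause.getCause());
                });
            } catch (Exception e) {
                LogUtils.log("发生异常:" + e); // "exception occurred"
                e.printStackTrace();
            }
        });
    }
}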
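Next, the code that actually sends the heartbeat is commented out in the client, so the timer still fires but nothing is written: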
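private final class HeartbeatRunnable implements Runnable {

    private final long heartbeatNanos;

    private HeartbeatRunnable(long heartbeatNanos) {
        this.heartbeatNanos = heartbeatNanos;
    }

    @Override
    public void run() {
        try {
            LogUtils.log("心跳定时器发送"); // "heartbeat timer fired"
            long now = System.nanoTime();

            if (now > (lastActivityTime + this.heartbeatNanos)) {
                // heartbeat sending disabled for this test
                // frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                // frameHandler.flush();
            }
        } catch (Exception e) {
            // ignore (widened from IOException: with the writes commented out,
            // the try block may no longer throw IOException, and javac rejects
            // a catch for a checked exception that cannot be thrown)
        }
    }
}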
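Running the test produces the log below (连接创建成功 = "connection created", 创建通道成功 = "channel created", 心跳定时器发送 = "heartbeat timer fired", 消费者断开连接 = "consumer disconnected", 断开连接 = "disconnected"):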
2023-09-25 09:35:02.976=>连接创建成功
2023-09-25 09:35:02.987=>创建通道成功:1
2023-09-25 09:35:17.948=>心跳定时器发送
2023-09-25 09:35:32.948=>心跳定时器发送
2023-09-25 09:35:47.949=>心跳定时器发送
2023-09-25 09:36:02.949=>心跳定时器发送
2023-09-25 09:36:17.948=>心跳定时器发送
2023-09-25 09:36:32.948=>心跳定时器发送
2023-09-25 09:36:32.960=>消费者断开连接:connection error msg=>:java.net.SocketException: Connection reset
2023-09-25 09:36:32.962=>断开连接:connection error msg=>:java.net.SocketException: Connection reset
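Analysis of the result: the connection was created at 09:35:02 and reset at 09:36:32, about 90 seconds later. In other words, after 3 heartbeat periods (3 × 30 seconds) without detecting a heartbeat from the client, the server assumes the client has gone offline and drops the connection.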
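The figure of three heartbeat periods can be checked against the two timestamps in the log. The sketch below only does that arithmetic; the class name is made up, and the two timestamps are copied from the log entries for connection creation and for the reset.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class DisconnectTiming {

    // Timestamp layout used by the log lines in this article.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    /** Time elapsed between two log timestamps. */
    static Duration elapsed(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, FORMAT),
                                LocalDateTime.parse(to, FORMAT));
    }

    public static void main(String[] args) {
        Duration d = elapsed("2023-09-25 09:35:02.976", "2023-09-25 09:36:32.960");
        double periods = d.toMillis() / 30_000.0; // heartbeat was set to 30 s
        System.out.println(d.toMillis() + " ms, about " + Math.round(periods) + " heartbeat periods");
    }
}
```

The elapsed time is 89,984 ms, just under 90 seconds.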
4. MissedHeartbeatException analysis
After the client has connected to MQ successfully, it starts reading data via this._frameHandler.initialize(this);
private void startIoLoops() {
    if (executorService == null) {
        Thread nioThread = Environment.newThread(
            threadFactory,
            new NioLoop(socketChannelFrameHandlerFactory.nioParams, this),
            "rabbitmq-nio"
        );
        nioThread.start();
    } else {
        this.executorService.submit(new NioLoop(socketChannelFrameHandlerFactory.nioParams, this));
    }
}
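This is the method run by the reader thread. If the frame is not null, the missed-heartbeat counter is reset to 0; otherwise the read timed out and the missed heartbeats start being counted (the LogUtils.log calls are debug statements added for this analysis):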
private void readFrame(Frame frame) throws IOException {
    LogUtils.log("开始读取数据"); // "start reading data"
    if (frame != null) {
        // any frame from the server counts as a sign of life
        _missedHeartbeats = 0;
        if (frame.getType() == AMQP.FRAME_HEARTBEAT) {
            LogUtils.log("读取数据:心跳"); // "read data: heartbeat"
        } else {
            if (frame.getChannel() == 0) { // the special channel
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        ChannelN channel;
                        try {
                            channel = cm.getChannel(frame.getChannel());
                        } catch (UnknownChannelException e) {
                            LOGGER.info("Received a frame on an unknown channel, ignoring it");
                            return;
                        }
                        channel.handleFrame(frame);
                    }
                }
            }
        }
    } else {
        LogUtils.log("开始读取数据frame为空"); // "start reading data: frame is null"
        handleSocketTimeout();
    }
}
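This is the timeout handling. Each time execution reaches this method, _missedHeartbeats is incremented by 1; once it exceeds a certain count, MissedHeartbeatException is thrown: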
private void handleSocketTimeout() throws SocketTimeoutException {
    if (_inConnectionNegotiation) {
        throw new SocketTimeoutException("Timeout during Connection negotiation");
    }

    if (_heartbeat == 0) { // No heart-beating
        return;
    }

    // "missed heartbeats"
    LogUtils.log("handleSocketTimeout-------_missedHeartbeats心跳:" + _missedHeartbeats);

    // threshold lowered to 1 for this test
    if (++_missedHeartbeats > (1)) {
        throw new MissedHeartbeatException("Heartbeat missing with heartbeat = " +
                                           _heartbeat + " seconds, for " + this.getHostAddress());
    }
}
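To make testing easier, the heartbeat is set to 10 seconds and the check is changed so that MissedHeartbeatException is thrown as soon as _missedHeartbeats is greater than 1.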
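The counting logic of readFrame and handleSocketTimeout can be isolated into a few lines. This is a simplified sketch, not the client's code: the class and method names are hypothetical, and a boolean return value stands in for throwing MissedHeartbeatException.

```java
final class MissedHeartbeatCounter {

    private final int threshold;
    private int missed;

    MissedHeartbeatCounter(int threshold) {
        this.threshold = threshold;
    }

    /** Any frame received from the peer resets the counter. */
    void onFrame() {
        missed = 0;
    }

    /**
     * Called when a read times out without a frame.
     * Returns true once the counter exceeds the threshold, which is the
     * point where the real client would throw MissedHeartbeatException.
     */
    boolean onReadTimeout() {
        return ++missed > threshold;
    }

    public static void main(String[] args) {
        MissedHeartbeatCounter counter = new MissedHeartbeatCounter(1);
        // 'T' = read timeout, 'F' = frame received
        String events = "TFTFTT";
        for (char event : events.toCharArray()) {
            if (event == 'F') {
                counter.onFrame();
                System.out.println("frame   -> counter reset");
            } else {
                System.out.println("timeout -> dead=" + counter.onReadTimeout());
            }
        }
    }
}
```

With a threshold of 1, a single timeout between frames is tolerated, and only two timeouts in a row mark the connection as dead.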
4.1 Test
2023-09-25 10:21:16.565=>开始读取数据
2023-09-25 10:21:16.651=>开始读取数据
2023-09-25 10:21:16.658=>开始读取数据
2023-09-25 10:21:16.658=>连接创建成功
2023-09-25 10:21:16.669=>开始读取数据
2023-09-25 10:21:16.670=>创建通道成功:1
2023-09-25 10:21:16.671=>开始读取数据
2023-09-25 10:21:16.675=>开始读取数据
2023-09-25 10:21:19.177=>开始读取数据
2023-09-25 10:21:19.177=>开始读取数据frame为空
2023-09-25 10:21:19.177=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:21.659=>开始读取数据
2023-09-25 10:21:21.659=>读取数据:心跳
2023-09-25 10:21:24.160=>开始读取数据
2023-09-25 10:21:24.160=>开始读取数据frame为空
2023-09-25 10:21:24.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:26.659=>开始读取数据
2023-09-25 10:21:26.660=>读取数据:心跳
2023-09-25 10:21:29.161=>开始读取数据
2023-09-25 10:21:29.161=>开始读取数据frame为空
2023-09-25 10:21:29.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:31.661=>开始读取数据
2023-09-25 10:21:31.661=>读取数据:心跳
2023-09-25 10:21:34.161=>开始读取数据
2023-09-25 10:21:34.161=>开始读取数据frame为空
2023-09-25 10:21:34.161=>handleSocketTimeout-------_missedHeartbeats心跳:0
2023-09-25 10:21:36.662=>开始读取数据
2023-09-25 10:21:36.662=>开始读取数据frame为空
2023-09-25 10:21:36.662=>handleSocketTimeout-------_missedHeartbeats心跳:1
2023-09-25 10:21:36.668=>消费者断开连接:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
2023-09-25 10:21:36.671=>断开连接:connection error msg=>:com.rabbitmq.client.MissedHeartbeatException: Heartbeat missing with heartbeat = 10 seconds, for 192.0.11.211
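The log shows that the server sends a heartbeat to the client at a regular interval (every 5 seconds here, half of the 10-second heartbeat). While the client receives nothing, each read timeout (every 2.5 seconds in the log) increments _missedHeartbeats, and once the count exceeds the threshold, MissedHeartbeatException is thrown. In this test the last heartbeat arrived at 10:21:31 and the exception followed two consecutive timeouts later, at 10:21:36. The official default threshold is 2*4=8.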
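The relationship between the threshold and the time until the exception can be worked out with a little arithmetic. The sketch below assumes the read timeout is one quarter of the heartbeat, which is inferred from the 2.5-second gaps in the log with a 10-second heartbeat and is not confirmed from the client source shown here. The class and method names are hypothetical.

```java
final class DetectionWindow {

    /** Assumed read timeout: a quarter of the heartbeat (inferred from the log). */
    static long readTimeoutMillis(int heartbeatSeconds) {
        return heartbeatSeconds * 1000L / 4;
    }

    /**
     * Silence needed before the exception fires. The check is
     * "++missed > threshold", so it trips on timeout number threshold + 1.
     */
    static long windowMillis(int heartbeatSeconds, int threshold) {
        return (threshold + 1L) * readTimeoutMillis(heartbeatSeconds);
    }

    public static void main(String[] args) {
        // Test setup from this article: heartbeat 10 s, threshold 1.
        System.out.println("threshold 1: " + windowMillis(10, 1) + " ms");
        // Same heartbeat with the default threshold of 2 * 4 = 8.
        System.out.println("threshold 8: " + windowMillis(10, 8) + " ms");
    }
}
```

For the test setup this gives 5,000 ms, in line with the roughly 5 seconds between the last heartbeat and the exception in the log. Under the same assumption, the default threshold of 8 would require a little over two full heartbeat periods of silence.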