The Technology Behind Building an IoT MQTT Gateway

Preface

IoT is one of the hotter areas of software today, and most IoT vendors run their own IoT platform; one of the core modules of any such platform is the MQTT gateway. This article walks you step by step through writing an MQTT gateway that uses Kafka or Pulsar as backend storage and supports MQTT connect, disconnect, publish, and subscribe. Technology choices:

  • Netty — the most popular Java networking framework
  • netty-codec-mqtt — a Netty sub-project that provides the MQTT codec
  • Pulsar/Kafka — popular message middleware, used as backend storage

The core pom dependencies are as follows:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-codec-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-common</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-transport</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-original</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>${mqtt-client.version}</version>
            <scope>test</scope>
        </dependency>

Configuration design

Configuration parameters are everywhere; in complex open-source projects there can be hundreds of them, and configuration files can run to thousands of lines. The configuration we need is:

The port MqttServer listens on

A configurable listen port matters even for a demo, and it is used constantly in unit tests: after a test run finishes, the operating system does not release the port immediately even though the server has been shut down, so binding each test to a random free port is essential. In Java we can obtain a free port with a utility class like the one below. If no port is configured, we fall back to MQTT's default port, 1883.

package io.github.protocol.mqtt.broker.util;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class SocketUtil {

    public static int getFreePort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

Backend storage configuration

The MQTT gateway itself has no reliable storage; it relies on the backend message middleware for persistence. We plan to support two backend types, Pulsar and Kafka, and define an enum for them:

public enum ProcessorType {
    KAFKA,
    PULSAR,
}

The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple, containing only the basic connection addresses; once performance tuning or security comes into play, more options will be added here.

@Setter
@Getter
public class KafkaProcessorConfig {

    private String bootstrapServers = "localhost:9092";

    public KafkaProcessorConfig() {
    }
}

@Setter
@Getter
public class PulsarProcessorConfig {

    private String httpUrl = "http://localhost:8080";

    private String serviceUrl = "pulsar://localhost:6650";

    public PulsarProcessorConfig() {
    }
}
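These processor configs hang off a top-level MqttServerConfig that the rest of the article uses (port, processor type, auth, per-backend config) but never lists. A minimal sketch, with fields inferred from how it is used later and defaults assumed, might look like this:

@Setter
@Getter
public class MqttServerConfig {

    // the MqttServer constructor treats 0 as "pick a random free port";
    // otherwise we default to MQTT's standard port (assumed default)
    private int port = 1883;

    private ProcessorType processorType = ProcessorType.KAFKA;

    // pluggable authentication hook used by the processors (interface not shown here)
    private MqttAuth mqttAuth;

    private KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();

    private PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
}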

Starting the Netty MqttServer

We start an MqttServer with Netty and add the MQTT codec handlers:

package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

    private final MqttServerConfig mqttServerConfig;

    public MqttServer() {
        this(new MqttServerConfig());
    }

    public MqttServer(MqttServerConfig mqttServerConfig) {
        this.mqttServerConfig = mqttServerConfig;
        if (mqttServerConfig.getPort() == 0) {
            mqttServerConfig.setPort(SocketUtil.getFreePort());
        }
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });
            // Start the server.
            ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private MqttProcessor processor(MqttServerConfig config) {
        return switch (config.getProcessorType()) {
            case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
            case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
        };
    }

    public int getPort() {
        return mqttServerConfig.getPort();
    }
}

MqttServerStarter.java

We write a simple main method to start the MqttServer, which makes local debugging easier:

package io.github.protocol.mqtt.broker;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }
}

On the client side we test with the Eclipse Paho MQTT client:

package io.github.protocol.mqtt;

import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Log4j2
public class MqttClientPublishExample {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishExample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            log.info("Connecting to broker: {}", broker);
            sampleClient.connect(connOpts);
            log.info("Connected");
            log.info("Publishing message: {}", content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            log.info("Message published");
            sampleClient.disconnect();
            log.info("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
        }
    }
}

Now we start MqttServer first and then run MqttClient, and find that the client hangs:

Connecting to broker: tcp://127.0.0.1:1883

Why? A packet capture shows that only the client sent an MQTT CONNECT packet; the server never responded.

But according to the MQTT specification, a CONNECT message must be answered with a CONNACK response.

So after receiving a CONNECT we need to return a CONNACK. We create an MqttHandler extending ChannelInboundHandlerAdapter to take over the messages decoded by MqttDecoder. The key methods to override are channelRead, plus channelInactive to release the resources that need to be freed when a connection drops.

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }
}

Then we add this handler to the Netty pipeline, right after the codec; a sketch of the wiring is shown below.

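The original shows this step only as a screenshot. A minimal sketch of the wiring inside the ChannelInitializer (MqttHandler still has a no-argument constructor at this stage) is:

            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    // codec first: bytes <-> MqttMessage
                    p.addLast(new MqttDecoder());
                    p.addLast(MqttEncoder.INSTANCE);
                    // then our business handler, which receives decoded MqttMessage objects
                    p.addLast(new MqttHandler());
                }
            });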

Next we put our own logic into the MQTT handler:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
    }

The printed connectMessage looks like this:

[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]

Normally an MQTT CONNECT message carries the QoS, username, password, and so on. Since we started the client without a username or password, those fields are null here. For now we skip validating them and simply return a CONNACK to the client, signalling a successful connection:

        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck()
                .returnCode(CONNECTION_ACCEPTED)
                .build();
        ctx.channel().writeAndFlush(ackMessage);

Running the server and the client again, we can see the client now gets past the CONNECT phase and moves on to publishing messages. Next we implement the remaining scenarios.

Here is the MqttHandler code at this stage:

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload connectPayload = connectMessage.payload();
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck()
                .returnCode(CONNECTION_ACCEPTED)
                .build();
        ctx.channel().writeAndFlush(ackMessage);
    }
}

At the moment all the logic lives inside MqttHandler, which makes it hard to extend. We extract an MqttProcessor interface to handle the actual requests, while MqttHandler only parses the MqttMessage type and dispatches. The MqttProcessor interface looks like this:

package io.github.protocol.mqtt.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;

public interface MqttProcessor {

    void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
    void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
    void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
    void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
    void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
    void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
    void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
    void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
    void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
    void processDisconnect(ChannelHandlerContext ctx) throws Exception;
    void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
}

These methods are allowed to throw exceptions: when a fault is too hard to handle locally (for example, the backend storage is down), we drop the MQTT connection and wait for the client to reconnect.

MqttHandler then calls into MqttProcessor; the relevant MqttHandler code is as follows:

        Preconditions.checkArgument(message instanceof MqttMessage);MqttMessage msg = (MqttMessage) message;try {if (msg.decoderResult().isFailure()) {Throwable cause = msg.decoderResult().cause();if (cause instanceof MqttUnacceptableProtocolVersionException) {// Unsupported protocol versionMqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,false), null);ctx.writeAndFlush(connAckMessage);log.error("connection refused due to invalid protocol, client address [{}]",ctx.channel().remoteAddress());ctx.close();return;} else if (cause instanceof MqttIdentifierRejectedException) {// ineligible clientIdMqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,false), null);ctx.writeAndFlush(connAckMessage);log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());ctx.close();return;}throw new IllegalStateException(msg.decoderResult().cause().getMessage());}MqttMessageType messageType = msg.fixedHeader().messageType();if (log.isDebugEnabled()) {log.debug("Processing MQTT Inbound handler message, type={}", messageType);}switch (messageType) {case CONNECT:Preconditions.checkArgument(msg instanceof MqttConnectMessage);processor.processConnect(ctx, (MqttConnectMessage) msg);break;case CONNACK:Preconditions.checkArgument(msg instanceof MqttConnAckMessage);processor.processConnAck(ctx, (MqttConnAckMessage) msg);break;case PUBLISH:Preconditions.checkArgument(msg instanceof MqttPublishMessage);processor.processPublish(ctx, (MqttPublishMessage) msg);break;case PUBACK:Preconditions.checkArgument(msg instanceof MqttPubAckMessage);processor.processPubAck(ctx, (MqttPubAckMessage) msg);break;case PUBREC:processor.processPubRec(ctx, msg);break;case PUBREL:processor.processPubRel(ctx, msg);break;case PUBCOMP:processor.processPubComp(ctx, msg);break;case SUBSCRIBE:Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);break;case SUBACK:Preconditions.checkArgument(msg instanceof MqttSubAckMessage);processor.processSubAck(ctx, (MqttSubAckMessage) msg);break;case UNSUBSCRIBE:Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);break;case UNSUBACK:Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);break;case PINGREQ:processor.processPingReq(ctx, msg);break;case PINGRESP:processor.processPingResp(ctx, msg);break;case DISCONNECT:processor.processDisconnect(ctx);break;case AUTH:processor.processAuth(ctx, msg);break;default:throw new UnsupportedOperationException("Unknown MessageType: " + messageType);}} catch (Throwable ex) {ReferenceCountUtil.safeRelease(msg);log.error("Exception was caught while processing MQTT message, ", ex);ctx.close();}

This code mainly looks at the MqttMessage type and calls the corresponding MqttProcessor method. Two details are worth pointing out:

  • Decoding errors are checked up front, so we fail fast
  • Exceptions are caught globally and the connection is closed on error

Maintaining the MqttSession

The MQTT session keeps track of per-client state, such as the resources a client occupies in the system. Since every backend implementation needs to maintain the MQTT session, we build an abstract base class, AbstractProcessor, to do it:

package io.github.protocol.mqtt.broker.processor;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.util.ChannelUtils;
import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.stream.IntStream;

@Slf4j
public abstract class AbstractProcessor implements MqttProcessor {protected final MqttAuth mqttAuth;public AbstractProcessor(MqttAuth mqttAuth) {this.mqttAuth = mqttAuth;}@Overridepublic void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {String clientId = msg.payload().clientIdentifier();String username = msg.payload().userName();byte[] pwd = msg.payload().passwordInBytes();if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,false), null);ctx.writeAndFlush(connAckMessage);log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());ctx.close();return;}if (!mqttAuth.connAuth(clientId, username, pwd)) {MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,false), null);ctx.writeAndFlush(connAckMessage);log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());ctx.close();return;}MqttSessionKey mqttSessionKey = new MqttSessionKey();mqttSessionKey.setUsername(username);mqttSessionKey.setClientId(clientId);ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);log.info("username {} clientId {} remote address {} connected",username, clientId, ctx.channel().remoteAddress());onConnect(mqttSessionKey);MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_MOST_ONCE, false, 0),new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),null);ctx.writeAndFlush(mqttConnectMessage);}protected void onConnect(MqttSessionKey mqttSessionKey) {}@Overridepublic void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("publish, client address {} not authed", ctx.channel().remoteAddress());ctx.close();return;}if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());return;}if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {log.error("does not support QoS2 protocol. 
clientId {}, username {} ",mqttSession.getClientId(), mqttSession.getUsername());return;}onPublish(ctx, mqttSession, msg);}protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,MqttPublishMessage msg) throws Exception {}@Overridepublic void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("sub, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}onSubscribe(ctx, mqttSession, msg.payload());MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,false, MqttQoS.AT_MOST_ONCE, false, 0);IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());ctx.writeAndFlush(MqttMessageFactory.newMessage(fixedHeader,MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),payload));}protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,MqttSubscribePayload subscribePayload) throws Exception {}@Overridepublic void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {ctx.writeAndFlush(MqttMessageUtil.pingResp());}@Overridepublic void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {MqttSessionKey mqttSession = 
ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}@Overridepublic void processDisconnect(ChannelHandlerContext ctx) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());}onDisconnect(mqttSession);}protected void onDisconnect(MqttSessionKey mqttSessionKey) {}@Overridepublic void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());if (mqttSession == null) {log.error("auth, client address {} not authed", ctx.channel().remoteAddress());ctx.close();}}
}

As you can see, this AbstractProcessor mainly creates and validates the MqttSessionKey, and rejects the unsupported QoS 2 and FAILURE levels on publish. It also answers MQTT keep-alive (ping) requests. As before, onPublish and onSubscribe are allowed to throw exceptions.
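The ChannelUtils helper used above to stash the session on the connection is not listed in the article; a minimal sketch based on Netty channel attributes (implementation assumed, names taken from the calls above) could be:

package io.github.protocol.mqtt.broker.util;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

public class ChannelUtils {

    // one attribute per connection, holding the authenticated session key
    private static final AttributeKey<MqttSessionKey> MQTT_SESSION_KEY =
            AttributeKey.valueOf("mqttSessionKey");

    public static void setMqttSession(Channel channel, MqttSessionKey mqttSessionKey) {
        channel.attr(MQTT_SESSION_KEY).set(mqttSessionKey);
    }

    public static MqttSessionKey getMqttSession(Channel channel) {
        // null means the connection has not completed CONNECT/auth yet
        return channel.attr(MQTT_SESSION_KEY).get();
    }
}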

The basic idea of an MQTT gateway built on a message queue is simple: when a PUBLISH arrives, produce the message to the queue; when there is a subscription, pull messages from the queue. It follows that we need to track the mapping between each MQTT topic and its producer/consumer, because middleware such as Kafka and Pulsar scope consumers to a topic. The shared code fragment looks like this:

    protected final ReentrantReadWriteLock.ReadLock rLock;protected final ReentrantReadWriteLock.WriteLock wLock;protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;protected final Map<MqttTopicKey, P> producerMap;protected final Map<MqttTopicKey, C> consumerMap;public AbstractMqProcessor(MqttAuth mqttAuth) {super(mqttAuth);ReentrantReadWriteLock lock = new ReentrantReadWriteLock();rLock = lock.readLock();wLock = lock.writeLock();this.sessionProducerMap = new HashMap<>();this.sessionConsumerMap = new HashMap<>();this.producerMap = new HashMap<>();this.consumerMap = new HashMap<>();}@Overrideprotected void onConnect(MqttSessionKey mqttSessionKey) {wLock.lock();try {sessionProducerMap.put(mqttSessionKey, new ArrayList<>());sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());} finally {wLock.unlock();}}@Overrideprotected void onDisconnect(MqttSessionKey mqttSessionKey) {wLock.lock();try {// find producersList<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);if (produceTopicKeys != null) {for (MqttTopicKey mqttTopicKey : produceTopicKeys) {P producer = producerMap.get(mqttTopicKey);if (producer != null) {ClosableUtils.close(producer);producerMap.remove(mqttTopicKey);}}}sessionProducerMap.remove(mqttSessionKey);List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);if (consumeTopicKeys != null) {for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {C consumer = consumerMap.get(mqttTopicKey);if (consumer != null) {ClosableUtils.close(consumer);consumerMap.remove(mqttTopicKey);}}}sessionConsumerMap.remove(mqttSessionKey);} finally {wLock.unlock();}}
}
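MqttSessionKey, MqttTopicKey, and ClosableUtils are small helpers the snippet above relies on but the article never lists. Presumably they are plain value/utility classes along these lines (a sketch; the Lombok annotations are assumed so the keys behave correctly in the HashMaps):

@Setter
@Getter
@EqualsAndHashCode
public class MqttSessionKey {
    private String username;
    private String clientId;
}

@Setter
@Getter
@EqualsAndHashCode
public class MqttTopicKey {
    private MqttSessionKey mqttSessionKey;
    private String topic;
}

public class ClosableUtils {
    // best-effort close so one broken producer/consumer does not abort session cleanup
    public static void close(AutoCloseable closable) {
        try {
            closable.close();
        } catch (Exception e) {
            // intentionally swallowed; cleanup is best effort
        }
    }
}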

Kafka processor implementation

Because a Kafka producer is not bound to a single topic, we can reuse one producer inside the Kafka processor; if a single KafkaProducer ever becomes the bottleneck, we can extend it to a list of producers and round-robin across them. Consumers are different: MQTT may require per-subscription behavior for each topic, so it is not appropriate to share one consumer instance. We create the KafkaProducer in the constructor:

    private final KafkaProcessorConfig kafkaProcessorConfig;

    private final KafkaProducer<String, ByteBuffer> producer;

    public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
        super(mqttAuth);
        this.kafkaProcessorConfig = kafkaProcessorConfig;
        this.producer = createProducer();
    }

    protected KafkaProducer<String, ByteBuffer> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
        return new KafkaProducer<>(properties);
    }

Handling the MQTT PUBLISH message. A PUBLISH message carries these key parameters:

MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
String topic = publishMessage.variableHeader().topicName();
ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();

Where:

  • qos is the quality-of-service level of the message: 0 gives no guarantee, 1 means at least once, 2 means exactly once. We currently support only QoS 0 and QoS 1
  • topicName is the name of the topic
  • the ByteBuffer is the message payload

We send the message according to the topic and QoS:

        String topic = msg.variableHeader().topicName();
        ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
        switch (msg.fixedHeader().qosLevel()) {
            case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
                    return;
                }
                log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                        mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
            });
            case AT_LEAST_ONCE -> {
                try {
                    RecordMetadata recordMetadata = producer.send(record).get();
                    log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                            mqttSessionKey, recordMetadata.topic(),
                            recordMetadata.partition(), recordMetadata.offset());
                    ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
                } catch (Exception e) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
                }
            }
            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }

To handle subscriptions, for now we simply create the topic if needed and start consuming from it. The consumption pattern recommended by the native Kafka client looks like this:

while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, byte[]> record : records) {
        // do logic
    }
}

So we need to move the consumer polling onto another thread. We write a KafkaConsumerListenerWrapper that converts the poll loop into an asynchronous listener model:

package io.github.protocol.mqtt.broker.processor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Slf4j
public class KafkaConsumerListenerWrapper implements AutoCloseable {private final AdminClient adminClient;private final KafkaConsumer<String, byte[]> consumer;public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {Properties adminProperties = new Properties();adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());this.adminClient = KafkaAdminClient.create(adminProperties);Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);this.consumer = new KafkaConsumer<>(properties);}public void start(String topic, KafkaMessageListener listener) throws Exception {try {TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic)).values().get(topic).get();log.info("topic info is {}", topicDescription);} catch (ExecutionException ee) {if (ee.getCause() instanceof UnknownTopicOrPartitionException) {log.info("topic {} not exist, create it", topic);adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));} else {log.error("find topic info {} error", topic, ee);}} catch (Exception e) {throw new IllegalStateException("find topic info error", e);}consumer.subscribe(Collections.singletonList(topic));log.info("consumer topic {} start", topic);new Thread(() -> {try {while (true) {ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, byte[]> record : records) {listener.messageReceived(record);}}} catch (WakeupException we) {consumer.close();} catch (Exception e) {log.error("consumer topic {} consume error", topic, e);consumer.close();}}).start();Thread.sleep(5_000);}@Overridepublic void close() throws Exception {log.info("wake up {} consumer", consumer);consumer.wakeup();}
}
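The KafkaMessageListener callback consumed by the wrapper is not listed; presumably it is a single-method functional interface such as this sketch:

@FunctionalInterface
public interface KafkaMessageListener {

    // invoked on the wrapper's polling thread for every record fetched from Kafka
    void messageReceived(ConsumerRecord<String, byte[]> record);
}

The onSubscribe override of the Kafka processor then creates one wrapper per subscribed topic and pushes every Kafka record back to the client as an MQTT PUBLISH: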
    @Overrideprotected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,MqttSubscribePayload subscribePayload) throws Exception {for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());subscribe(ctx, consumer, topicSubscription.topicName());}}private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {MqttTopicKey mqttTopicKey = new MqttTopicKey();mqttTopicKey.setTopic(topic);mqttTopicKey.setMqttSessionKey(mqttSessionKey);wLock.lock();try {KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);if (consumer == null) {consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {if (mqttTopicKeys == null) {mqttTopicKeys = new ArrayList<>();}mqttTopicKeys.add(mqttTopicKey);return mqttTopicKeys;});consumerMap.put(mqttTopicKey, consumer);}return consumer;} finally {wLock.unlock();}}protected void subscribe(ChannelHandlerContext ctx,KafkaConsumerListenerWrapper consumer, String topic) throws Exception {BoundInt boundInt = new BoundInt(65535);consumer.start(topic, record -> {log.info("receive message from kafka, topic {}, partition {}, offset {}",record.topic(), record.partition(), record.offset());MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());ctx.writeAndFlush(mqttPublishMessage);});}

One thing to watch throughout this code: when logging, always include the key identifying information such as the topic, the MQTT username, and the MQTT clientId. It hardly seems to matter while writing a demo, but when you have to locate a problem under massive traffic you will appreciate having it.

We use a simple BoundInt utility class to generate packet IDs in the 0 to 65535 range required by the protocol; a sketch of it follows.
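BoundInt itself is not listed in the article; a minimal sketch of a wrapping counter (implementation assumed — a production version should also skip 0, since MQTT packet IDs for QoS > 0 must be non-zero) could be:

public class BoundInt {

    private final int bound;

    private int current = 0;

    public BoundInt(int bound) {
        this.bound = bound;
    }

    // returns 0, 1, ..., bound, 0, 1, ...; synchronized because the message
    // listener callbacks may run on several threads
    public synchronized int nextVal() {
        int val = current;
        current = (current + 1) % (bound + 1);
        return val;
    }
}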

Pulsar processor implementation

Compared with Kafka, Pulsar is a better fit as the backend of an MQTT proxy, for several reasons:

  • Pulsar supports millions of topics, and its topics are more lightweight
  • Pulsar natively supports a listener-style consumption model, so we do not need one thread per consumer
  • Pulsar offers the Shared subscription type, giving a more flexible consumption model
  • A Pulsar consumer's subscribe call guarantees the subscription is created, a semantic Kafka consumers do not provide

The PulsarProcessor holds the session, producer, and consumer maps as well as the Pulsar clients, and is constructed as follows:
    protected final ReentrantReadWriteLock.ReadLock rLock;protected final ReentrantReadWriteLock.WriteLock wLock;protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;private final PulsarProcessorConfig pulsarProcessorConfig;private final PulsarAdmin pulsarAdmin;private final PulsarClient pulsarClient;public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {super(mqttAuth);ReentrantReadWriteLock lock = new ReentrantReadWriteLock();rLock = lock.readLock();wLock = lock.writeLock();this.sessionProducerMap = new HashMap<>();this.sessionConsumerMap = new HashMap<>();this.producerMap = new HashMap<>();this.consumerMap = new HashMap<>();this.pulsarProcessorConfig = pulsarProcessorConfig;try {this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(pulsarProcessorConfig.getHttpUrl()).build();this.pulsarClient = PulsarClient.builder().serviceUrl(pulsarProcessorConfig.getServiceUrl()).build();} catch (Exception e) {throw new IllegalStateException("Failed to create pulsar client", e);}}

Handling the PUBLISH message:

    @Overrideprotected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,MqttPublishMessage msg) throws Exception {String topic = msg.variableHeader().topicName();Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);int len = msg.payload().readableBytes();byte[] messageBytes = new byte[len];msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);switch (msg.fixedHeader().qosLevel()) {case AT_MOST_ONCE -> producer.sendAsync(messageBytes).thenAccept(messageId -> log.info("clientId [{}],"+ " username [{}]. send message to pulsar success messageId: {}",mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId)).exceptionally((e) -> {log.error("clientId [{}], username [{}]. send message to pulsar fail: ",mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);return null;});case AT_LEAST_ONCE -> {try {MessageId messageId = producer.send(messageBytes);MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,false, MqttQoS.AT_MOST_ONCE, false, 0);MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);ctx.writeAndFlush(pubAckMessage);} catch (PulsarClientException e) {log.error("clientId [{}], username [{}]. send pulsar error: {}",mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());}}case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(String.format("mqttSessionKey %s can not reach here", mqttSessionKey));}}private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {MqttTopicKey mqttTopicKey = new MqttTopicKey();mqttTopicKey.setTopic(topic);mqttTopicKey.setMqttSessionKey(mqttSessionKey);rLock.lock();try {Producer<byte[]> producer = producerMap.get(mqttTopicKey);if (producer != null) {return producer;}} finally {rLock.unlock();}wLock.lock();try {Producer<byte[]> producer = producerMap.get(mqttTopicKey);if (producer == null) {producer = createProducer(topic);sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {if (mqttTopicKeys == null) {mqttTopicKeys = new ArrayList<>();}mqttTopicKeys.add(mqttTopicKey);return mqttTopicKeys;});producerMap.put(mqttTopicKey, producer);}return producer;} finally {wLock.unlock();}}protected Producer<byte[]> createProducer(String topic) throws Exception {return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();}

Handling the SUBSCRIBE message:

    @Overrideprotected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,MqttSubscribePayload subscribePayload) throws Exception {for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {subscribe(ctx, mqttSessionKey, topicSubscription.topicName());}}protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,String topic) throws Exception {MqttTopicKey mqttTopicKey = new MqttTopicKey();mqttTopicKey.setTopic(topic);mqttTopicKey.setMqttSessionKey(mqttSessionKey);wLock.lock();try {Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);if (consumer == null) {consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {if (mqttTopicKeys == null) {mqttTopicKeys = new ArrayList<>();}mqttTopicKeys.add(mqttTopicKey);return mqttTopicKeys;});consumerMap.put(mqttTopicKey, consumer);}} finally {wLock.unlock();}}protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,String topic) throws Exception {BoundInt boundInt = new BoundInt(65535);try {PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);log.info("topic {} partitioned stats {}", topic, partitionedStats);} catch (PulsarAdminException.NotFoundException nfe) {log.info("topic {} not found", topic);pulsarAdmin.topics().createPartitionedTopic(topic, 1);}return pulsarClient.newConsumer(Schema.BYTES).topic(topic).messageListener((consumer, msg) -> {log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());ctx.writeAndFlush(mqttPublishMessage);}).subscriptionName(username).subscribe();}

Test cases

Robust software should come with tests. Here we write two basic pub/sub cases; a real production-ready project would have far more, covering all kinds of failure scenarios. There is a saying I very much agree with: "unit tests are instant encouragement for developers."

Kafka

Starting a Kafka test broker

We can use the embedded-kafka-java project to start a Kafka broker for unit tests, pulled in with the following dependency:

        <dependency>
            <groupId>io.github.embedded-middleware</groupId>
            <artifactId>embedded-kafka-core</artifactId>
            <version>0.0.2</version>
            <scope>test</scope>
        </dependency>

We can then start a Kafka-backed MQTT broker with the following code:

@Slf4j
public class MqttKafkaTestUtil {public static MqttServer setupMqttKafka() throws Exception {EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();new Thread(() -> {try {embeddedKafkaServer.start();} catch (Exception e) {log.error("kafka broker started exception ", e);}}).start();Thread.sleep(5_000);MqttServerConfig mqttServerConfig = new MqttServerConfig();mqttServerConfig.setPort(0);mqttServerConfig.setProcessorType(ProcessorType.KAFKA);KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);MqttServer mqttServer = new MqttServer(mqttServerConfig);new Thread(() -> {try {mqttServer.start();} catch (Exception e) {log.error("mqsar broker started exception ", e);}}).start();Thread.sleep(5000L);return mqttServer;}}

The Kafka end-to-end test is simple: publish a message through the MQTT client and consume it back:

@Log4j2
public class MqttKafkaPubSubTest {@Testpublic void pubSubTest() throws Exception {MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();String topic = UUID.randomUUID().toString();String content = "test-msg";String broker = String.format("tcp://localhost:%d", mqttServer.getPort());String clientId = UUID.randomUUID().toString();MemoryPersistence persistence = new MemoryPersistence();MqttClient sampleClient = new MqttClient(broker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName(UUID.randomUUID().toString());connOpts.setPassword(UUID.randomUUID().toString().toCharArray());connOpts.setCleanSession(true);log.info("Mqtt connecting to broker");sampleClient.connect(connOpts);CompletableFuture<String> future = new CompletableFuture<>();log.info("Mqtt subscribing");sampleClient.subscribe(topic, (s, mqttMessage) -> {log.info("messageArrived");future.complete(mqttMessage.toString());});log.info("Mqtt subscribed");MqttMessage message = new MqttMessage(content.getBytes());message.setQos(1);log.info("Mqtt message publishing");sampleClient.publish(topic, message);log.info("Mqtt message published");TimeUnit.SECONDS.sleep(3);sampleClient.disconnect();String msg = future.get(5, TimeUnit.SECONDS);Assertions.assertEquals(content, msg);}}

Pulsar

We can use the embedded-pulsar-java project to start a Pulsar broker for unit tests, pulled in with the following dependency:

        <dependency>
            <groupId>io.github.embedded-middleware</groupId>
            <artifactId>embedded-pulsar-core</artifactId>
            <version>0.0.2</version>
            <scope>test</scope>
        </dependency>

We can then start a Pulsar-backed MQTT broker with the following code:

@Slf4j
public class MqttPulsarTestUtil {public static MqttServer setupMqttPulsar() throws Exception {EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();embeddedPulsarServer.start();MqttServerConfig mqttServerConfig = new MqttServerConfig();mqttServerConfig.setPort(0);mqttServerConfig.setProcessorType(ProcessorType.PULSAR);PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);MqttServer mqttServer = new MqttServer(mqttServerConfig);new Thread(() -> {try {mqttServer.start();} catch (Exception e) {log.error("mqsar broker started exception ", e);}}).start();Thread.sleep(5000L);return mqttServer;}
}

The Pulsar end-to-end test is likewise simple: publish a message through the MQTT client and consume it back:

@Log4j2
public class MqttPulsarPubSubTest {@Testpublic void pubSubTest() throws Exception {MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();String topic = UUID.randomUUID().toString();String content = "test-msg";String broker = String.format("tcp://localhost:%d", mqttServer.getPort());String clientId = UUID.randomUUID().toString();MemoryPersistence persistence = new MemoryPersistence();MqttClient sampleClient = new MqttClient(broker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName(UUID.randomUUID().toString());connOpts.setPassword(UUID.randomUUID().toString().toCharArray());connOpts.setCleanSession(true);log.info("Mqtt connecting to broker");sampleClient.connect(connOpts);CompletableFuture<String> future = new CompletableFuture<>();log.info("Mqtt subscribing");sampleClient.subscribe(topic, (s, mqttMessage) -> {log.info("messageArrived");future.complete(mqttMessage.toString());});log.info("Mqtt subscribed");MqttMessage message = new MqttMessage(content.getBytes());message.setQos(1);log.info("Mqtt message publishing");sampleClient.publish(topic, message);log.info("Mqtt message published");TimeUnit.SECONDS.sleep(3);sampleClient.disconnect();String msg = future.get(5, TimeUnit.SECONDS);Assertions.assertEquals(content, msg);}
}

Performance optimization

Here we briefly cover a few optimization points. Parameter tuning such as thread counts and buffer sizes is not discussed here; those values should be determined by real load testing.

Use the Epoll transport on Linux

public class EventLoopUtil {

    /**
     * @return an EventLoopGroup suitable for the current platform
     */
    public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(nThreads, threadFactory);
        } else {
            return new NioEventLoopGroup(nThreads, threadFactory);
        }
    }

    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
        if (eventLoopGroup instanceof EpollEventLoopGroup) {
            return EpollServerSocketChannel.class;
        } else {
            return NioServerSocketChannel.class;
        }
    }
}

We use Epoll.isAvailable() to select the event loop implementation, and when specifying the channel type we pick the matching channel class based on the type of the group:

        EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-acceptor"));
        EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-worker"));

                b.group(acceptorGroup, workerGroup)
                        // key point
                        .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                // decoder
                                p.addLast(new MqttDecoder());
                                p.addLast(MqttEncoder.INSTANCE);
                                p.addLast(new MqttHandler(processor(mqttServerConfig)));
                            }
                        });

Disable TCP keepalive

Since the MQTT protocol has its own heartbeat mechanism, we can turn off TCP keepalive and rely on the protocol-level heartbeat, which saves resources when there are massive numbers of connections. Just set ChannelOption.SO_KEEPALIVE to false:

                    .option(ChannelOption.SO_KEEPALIVE, false)

Shorten timeouts

By default, the timeouts of the MQTT client used in unit tests and the produce timeouts of the Pulsar and Kafka producers are all relatively long (typically 30 s). When everything is deployed on an internal network, they can be lowered to around 5 s to avoid pointless waiting; a sketch is shown below.
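A sketch of what that tuning could look like, hanging off the code shown earlier (the 5-second values are assumptions to adjust per deployment):

        // Kafka producer (in createProducer): bound how long a send may take
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000);
        properties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 5_000);

        // Pulsar producer (in createProducer): shorten the send timeout, default 30 s
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topic)
                .sendTimeout(5, TimeUnit.SECONDS)
                .create();

        // Paho MQTT client in the tests: shorten the connect timeout, default 30 s
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setConnectionTimeout(5);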

Use multiple KafkaProducers to improve performance

A single KafkaProducer will eventually hit the bandwidth limit of its TCP connection. Under massive request volumes, when Kafka produce latency stands out, we can start several KafkaProducers and, exploiting MQTT's traffic profile (many connections, each with modest QPS), use the hash of the MqttSessionKey to decide which KafkaProducer sends a given message.

Add the producer count to KafkaProcessorConfig, defaulting to 1:

        private int producerNum = 1;

During initialization, create an array of producers instead of a single one:

        this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
        for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
            producerArray[i] = createProducer();
        }

Wrap producer selection in a helper method:

    private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
        return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
    }

Conclusion

All of the code in this article has been uploaded to GitHub. We have only implemented the basic MQTT connect, publish, and subscribe features; we do not even support unsubscribe yet. A mature, commercial-grade MQTT gateway also needs tenant isolation, broader protocol support, reliability, operability, flow control, security, and more. If you have production-grade MQTT requirements and cannot quickly build a mature gateway yourself, consider a managed MQTT service that provides stable and reliable MQTT access, connects massive numbers of devices to the cloud, and supports bidirectional messaging between devices and the cloud.
