🚀 优质资源分享 🚀
学习路线指引(点击解锁) | 知识定位 | 人群定位 |
---|---|---|
🧡 Python实战微信订餐小程序 🧡 | 进阶级 | 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。 |
💛Python量化交易实战💛 | 入门级 | 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统 |
MQ 是什么?
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。
指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。
消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
MQ 的作用?
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
ps: 以上内容摘选自百科。
实现 mq 的准备工作
maven 引入
<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.42.Finalversion>
dependency><dependency><groupId>com.alibabagroupId><artifactId>fastjsonartifactId><version>1.2.76version>
dependency>
模块划分
The message queue in java. 作为 mq 的从零开始的学习项目,目前已开源。
项目的模块如下:
模块 | 说明 |
---|---|
mq-common | 公共代码 |
mq-broker | 注册中心 |
mq-producer | 消息生产者 |
mq-consumer | 消息消费者 |
消息消费者
接口定义
package com.github.houbb.mq.consumer.api;/*** @author binbin.hou* @since 1.0.0*/
public interface IMqConsumer {/*** 订阅* @param topicName topic 名称* @param tagRegex 标签正则*/void subscribe(String topicName, String tagRegex);/*** 注册监听器* @param listener 监听器*/void registerListener(final IMqConsumerListener listener);}
IMqConsumerListener
作为消息监听类的接口,定义如下:
public interface IMqConsumerListener {/*** 消费* @param mqMessage 消息体* @param context 上下文* @return 结果*/ConsumerStatus consumer(final MqMessage mqMessage,final IMqConsumerListenerContext context);}
ConsumerStatus 代表消息消费的几种状态。
消息体
启动消息体 MqMessage 定义如下:
package com.github.houbb.mq.common.dto;import java.util.Arrays;
import java.util.List;/*** @author binbin.hou* @since 1.0.0*/
public class MqMessage {/*** 标题名称*/private String topic;/*** 标签*/private List tags;/*** 内容*/private byte[] payload;/*** 业务标识*/private String bizKey;/*** 负载分片标识*/private String shardingKey;// getter&setter&toString}
push 消费者策略实现
消费者启动的实现如下:
/*** 推送消费策略** @author binbin.hou* @since 1.0.0*/
public class MqConsumerPush extends Thread implements IMqConsumer {// 省略...@Overridepublic void run() {// 启动服务端log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",groupName, port, brokerAddress);EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new MqConsumerHandler());}})// 这个参数影响的是还没有被accept 取出的连接.option(ChannelOption.SO\_BACKLOG, 128)// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。.childOption(ChannelOption.SO\_KEEPALIVE, true);// 绑定端口,开始接收进来的链接ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();log.info("MQ 消费者启动完成,监听【" + port + "】端口");channelFuture.channel().closeFuture().syncUninterruptibly();log.info("MQ 消费者关闭完成");} catch (Exception e) {log.error("MQ 消费者启动异常", e);throw new MqException(ConsumerRespCode.RPC\_INIT\_FAILED);} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}// 省略...}
ps: 初期我们把 consumer 作为服务端,后续引入 broker 则只有 broker 是服务端。
MqConsumerHandler 处理类
这个类是一个空的实现。
public class MqConsumerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {//nothing}}
测试代码
MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();
启动日志:
[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C\_DEFAULT\_GROUP\_NAME, port: 9527, brokerAddress:
[INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口
消息生产者
接口定义
最基本的消息发送接口。
package com.github.houbb.mq.producer.api;import com.github.houbb.mq.common.dto.MqMessage;
import com.github.houbb.mq.producer.dto.SendResult;/*** @author binbin.hou* @since 1.0.0*/
public interface IMqProducer {/*** 同步发送消息* @param mqMessage 消息类型* @return 结果*/SendResult send(final MqMessage mqMessage);/*** 单向发送消息* @param mqMessage 消息类型* @return 结果*/SendResult sendOneWay(final MqMessage mqMessage);}
生产者实现
MqProducer 启动的实现如下,基于 netty。
package com.github.houbb.mq.producer.core;/*** 默认 mq 生产者* @author binbin.hou* @since 1.0.0*/
public class MqProducer extends Thread implements IMqProducer {//省略...@Overridepublic void run() {// 启动服务端log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",groupName, port, brokerAddress);EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();ChannelFuture channelFuture = bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer(){@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)).addLast(new MqProducerHandler());}}).connect("localhost", port).syncUninterruptibly();log.info("MQ 生产者启动客户端完成,监听端口:" + port);channelFuture.channel().closeFuture().syncUninterruptibly();log.info("MQ 生产者开始客户端已关闭");} catch (Exception e) {log.error("MQ 生产者启动遇到异常", e);throw new MqException(ProducerRespCode.RPC\_INIT\_FAILED);} finally {workerGroup.shutdownGracefully();}}//省略...
}
MqProducerHandler 处理类
默认的空实现,什么都不做。
package com.github.houbb.mq.producer.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @author binbin.hou* @since 1.0.0*/
public class MqProducerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {//do nothing now}}
启动代码
MqProducer mqProducer = new MqProducer();
mqProducer.start();
启动日志:
[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P\_DEFAULT\_GROUP\_NAME, PORT: 9527, brokerAddress:
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x5cb48145] REGISTERED
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE
[INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527
小结
基于 netty 最基本的服务端启动、客户端启动到这里就结束了。
千里之行,始于足下。
我们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。
希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。
我是老马,期待与你的下次重逢。
开源地址
The message queue in java.(java 简易版本 mq 实现) : https://github.com/houbb/mq
拓展阅读
rpc-从零开始实现 rpc: https://github.com/houbb/rpc
【mq】从零开始实现 mq-01-生产者、消费者启动