【mq】从零开始实现 mq-01-生产者、消费者启动

🚀 优质资源分享 🚀

学习路线指引(点击解锁)知识定位人群定位
🧡 Python实战微信订餐小程序 🧡进阶级本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
💛Python量化交易实战💛入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

MQ 是什么?

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。

指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。

消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

MQ 的作用?

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。

异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。

削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

ps: 以上内容摘选自百科。

实现 mq 的准备工作

maven 引入

<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.42.Finalversion>
dependency><dependency><groupId>com.alibabagroupId><artifactId>fastjsonartifactId><version>1.2.76version>
dependency>

模块划分

The message queue in java. 作为 mq 的从零开始的学习项目,目前已开源。

项目的模块如下:

模块说明
mq-common公共代码
mq-broker注册中心
mq-producer消息生产者
mq-consumer消息消费者

消息消费者

接口定义

package com.github.houbb.mq.consumer.api;/*** @author binbin.hou* @since 1.0.0*/
public interface IMqConsumer {/*** 订阅* @param topicName topic 名称* @param tagRegex 标签正则*/void subscribe(String topicName, String tagRegex);/*** 注册监听器* @param listener 监听器*/void registerListener(final IMqConsumerListener listener);}

IMqConsumerListener 作为消息监听类的接口,定义如下:

public interface IMqConsumerListener {/*** 消费* @param mqMessage 消息体* @param context 上下文* @return 结果*/ConsumerStatus consumer(final MqMessage mqMessage,final IMqConsumerListenerContext context);}

ConsumerStatus 代表消息消费的几种状态。

消息体

启动消息体 MqMessage 定义如下:

package com.github.houbb.mq.common.dto;import java.util.Arrays;
import java.util.List;/*** @author binbin.hou* @since 1.0.0*/
public class MqMessage {/*** 标题名称*/private String topic;/*** 标签*/private List tags;/*** 内容*/private byte[] payload;/*** 业务标识*/private String bizKey;/*** 负载分片标识*/private String shardingKey;// getter&setter&toString}

push 消费者策略实现

消费者启动的实现如下:

/*** 推送消费策略** @author binbin.hou* @since 1.0.0*/
public class MqConsumerPush extends Thread implements IMqConsumer  {// 省略...@Overridepublic void run() {// 启动服务端log.info("MQ 消费者开始启动服务端 groupName: {}, port: {}, brokerAddress: {}",groupName, port, brokerAddress);EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(workerGroup, bossGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new MqConsumerHandler());}})// 这个参数影响的是还没有被accept 取出的连接.option(ChannelOption.SO\_BACKLOG, 128)// 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。.childOption(ChannelOption.SO\_KEEPALIVE, true);// 绑定端口,开始接收进来的链接ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();log.info("MQ 消费者启动完成,监听【" + port + "】端口");channelFuture.channel().closeFuture().syncUninterruptibly();log.info("MQ 消费者关闭完成");} catch (Exception e) {log.error("MQ 消费者启动异常", e);throw new MqException(ConsumerRespCode.RPC\_INIT\_FAILED);} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}// 省略...}

ps: 初期我们把 consumer 作为服务端,后续引入 broker 则只有 broker 是服务端。

MqConsumerHandler 处理类

这个类是一个空的实现。

public class MqConsumerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {//nothing}}

测试代码

MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();

启动日志:

[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者开始启动服务端 groupName: C\_DEFAULT\_GROUP\_NAME, port: 9527, brokerAddress: 
[INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消费者启动完成,监听【9527】端口

消息生产者

接口定义

最基本的消息发送接口。

package com.github.houbb.mq.producer.api;import com.github.houbb.mq.common.dto.MqMessage;
import com.github.houbb.mq.producer.dto.SendResult;/*** @author binbin.hou* @since 1.0.0*/
public interface IMqProducer {/*** 同步发送消息* @param mqMessage 消息类型* @return 结果*/SendResult send(final MqMessage mqMessage);/*** 单向发送消息* @param mqMessage 消息类型* @return 结果*/SendResult sendOneWay(final MqMessage mqMessage);}

生产者实现

MqProducer 启动的实现如下,基于 netty。

package com.github.houbb.mq.producer.core;/*** 默认 mq 生产者* @author binbin.hou* @since 1.0.0*/
public class MqProducer extends Thread implements IMqProducer {//省略...@Overridepublic void run() {// 启动服务端log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",groupName, port, brokerAddress);EventLoopGroup workerGroup = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();ChannelFuture channelFuture = bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer(){@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)).addLast(new MqProducerHandler());}}).connect("localhost", port).syncUninterruptibly();log.info("MQ 生产者启动客户端完成,监听端口:" + port);channelFuture.channel().closeFuture().syncUninterruptibly();log.info("MQ 生产者开始客户端已关闭");} catch (Exception e) {log.error("MQ 生产者启动遇到异常", e);throw new MqException(ProducerRespCode.RPC\_INIT\_FAILED);} finally {workerGroup.shutdownGracefully();}}//省略...
}

MqProducerHandler 处理类

默认的空实现,什么都不做。

package com.github.houbb.mq.producer.handler;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** @author binbin.hou* @since 1.0.0*/
public class MqProducerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {//do nothing now}}

启动代码

MqProducer mqProducer = new MqProducer();
mqProducer.start();

启动日志:

[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者开始启动客户端 GROUP: P\_DEFAULT\_GROUP\_NAME, PORT: 9527, brokerAddress: 
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x5cb48145] REGISTERED
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE
[INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生产者启动客户端完成,监听端口:9527

小结

基于 netty 最基本的服务端启动、客户端启动到这里就结束了。

千里之行,始于足下。

我们下一节将和大家一起学习,如何实现客户端与服务端之间的交互。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

开源地址

The message queue in java.(java 简易版本 mq 实现) : https://github.com/houbb/mq

拓展阅读

rpc-从零开始实现 rpc: https://github.com/houbb/rpc

【mq】从零开始实现 mq-01-生产者、消费者启动

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/401056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

远程连接Ubuntu桌面配置

1、打开终端&#xff1a;依次安装 sudo apt-get install xrdpsudo apt-get install vnc4server tightvncserversudo apt-get install xubuntu-desktop 2、安装完&#xff1a;xubuntu-desktop之后&#xff0c;做如下配置以及启动 roothd-slave2:jvm# echo "xfce4-session&q…

【大话云原生】微服务篇-五星级酒店的服务方式

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

从C#到TypeScript - Generator

从C#到TypeScript - Generator 上篇讲了Promise&#xff0c;Promise的执行需要不停的调用then&#xff0c;虽然比callback要好些&#xff0c;但也显得累赘。所以ES6里添加了Generator来做流程控制&#xff0c;可以更直观的执行Promise&#xff0c;但终级方案还是ES7议案中的asy…

C#中检查null的语法糖

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

iOS UItextView监听输入特定字符跳转页面选择选项返回

今天有朋友问我一个需求的实现,于是自己写了一个Demo简单的实现了一下: 需求是: 1>比如: 检测用户输入"A"字符串,跳转页面选择选项,将选择的选项放置textView里,作为当前的输入; 2>不是"A"字符,则正常的textView输入; 3.用户跳转选择了,则将选择的输…

PDCA循环——快速提升软件质量的必备工具

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

oracle 表空间 用户

-- create user mapecun identified by "accp"; --alter user 用户名 quota unlimited on 表空间; --alter user mapecun quota unlimited on USERS; --grant create sequence to mapecun; /** grant create session to mapecun; grant create table to mapecun; g…

如何在Web前端实现CAD图文字全文搜索功能之技术分享

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

基于Java (spring-boot)的仓库管理系统

一、项目介绍 本系统的使用者一共有系统管理员、仓库管理员和普通用户这3种角色: 1.系统管理员&#xff1a;通过登录系统后&#xff0c;可以进行管理员和用户信息的管理、仓库和物品分类的管理&#xff0c;以及操作日志的查询&#xff0c;具有全面的系统管理权限。 2.仓库管理…

基于语义感知SBST的API场景测试智能生成

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

【KMP模板】简单写个KMP~

本来easy的KMP 却一直过不了洛谷的模板题。。。 仔细一看原来在输出next数组时打的回车而不是空格。。。 身败名裂。。。 话说有个sunday貌似一般状况下比KMP快呢。。。去看看2333 #include<cstdio> #include<iostream> #include<cstring> #include<algor…

2015 CALLED THE INTERFACE OF 2014

Writer&#xff1a;BYSocket&#xff08;泥沙砖瓦浆木匠&#xff09; 微博&#xff1a;BYSocket豆瓣&#xff1a;BYSocketReprint it anywhere u want. ”Hi , Happy New Year.Written in Stupid Enlish,Dont push me *.* ” 2014 System 2015 is coming.But 2014 is not over.…

论文解读(MERIT)《Multi-Scale Contrastive Siamese Networks for Self-Supervised Graph Representation Learni

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

Fiddler抓包9-保存会话(save)

前言 为什么要保存会话呢&#xff1f;举个很简单的场景&#xff0c;你在上海测试某个功能接口的时候&#xff0c;发现了一个BUG&#xff0c;而开发这个接口的开发人员是北京的一家合作公司。你这时候给对方开发提bug&#xff0c; 如何显得专业一点&#xff0c;能让对方心服口服…

『现学现忘』Git基础 — 17、Commit对象

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

Spring/MVC映射WEB-INF下的文件(img、css、js等)

学过Mvc的都知道并且会访问该目录下面的jsp 页面&#xff08;这是最基础的&#xff09; 但我们想访问里面的图片什么的&#xff0c;又该怎么去访问呢&#xff0c; 一句代码&#xff1a; <mvc:resources mapping"/img/**" location"/WEB-INF/img/"/> …

NSDate 类的总结,全面基础

NSDate 类的总结,全面基础 <span style"font-size:24px;"><span style"font-size:18px;">//1.创建日期对象//创建的NSDate对象,获得的永远是0时区时间,假设要是求东八区时间,就加8个小时NSDate *date [NSDate date];NSLog("%",date…

《HelloGitHub》第 73 期

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

FTP命令大全

项目需要&#xff0c;接触到ftp的一些操作&#xff0c;这里搬运一些ftp命令供以后参考 命令描述ABOR(ABORT)此命令使服务器终止前一个FTP服务命令以及任何相关数据传输。ACCT(ACCOUNT)此命令的参数部分使用一个Telnet字符串来指明用户的账户。ADAT(AUTHENTICATION/SECURITY DAT…

typora + EasyBlogImageForTypora直接上传图片到博客园

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…