RabbitMQ深度探索:简单实现 MQ

基于多线程队列实现 MQ :

  1. 实现类:
    public class ThreadMQ {private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();public static void main(String[] args) {//创建生产者线程Thread producer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {Thread.sleep(1000);JSONObject data = new JSONObject();data.put("phone","11111111");broker.offer(data);}catch (Exception e){}}}},"生产者");producer.start();Thread consumer = new Thread(new Runnable() {@Overridepublic void run() {while (true){try {JSONObject data = broker.poll();if(data != null){System.out.println(Thread.currentThread().getName() + data.toJSONString());}}catch (Exception e){}}}},"消费者");consumer.start();}
    }

基于 netty 实现 MQ:

  1. 执行过程:
    1. 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
    2. 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
  2. 执行流程:
    1. 导入 Maven 依赖:
      <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version>
      </dependency>
      <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.0.23.Final</version>
      </dependency>
      <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version>
      </dependency>
      <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version>
      </dependency>
      <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version>
      </dependency>
    2. 服务端:
      package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import org.apache.commons.lang3.StringUtils;import java.io.UnsupportedEncodingException;
      import java.util.ArrayList;
      import java.util.concurrent.LinkedBlockingDeque;/*** @ClassName BoyatopMQServer2021* @Author* @Version V1.0**/
      public class BoyatopNettyMQServer {public void bind(int port) throws Exception {/*** Netty 抽象出两组线程池BossGroup和WorkerGroup* BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。*/EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossGroup, workerGroup)// 设定NioServerSocketChannel 为服务器端.channel(NioServerSocketChannel.class)//BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,//用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。.option(ChannelOption.SO_BACKLOG, 100)// 服务器端监听数据回调Handler.childHandler(new BoyatopNettyMQServer.ChildChannelHandler());//绑定端口, 同步等待成功;ChannelFuture future = bootstrap.bind(port).sync();System.out.println("当前服务器端启动成功...");//等待服务端监听端口关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅关闭 线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 设置异步回调监听ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());}}public static void main(String[] args) throws Exception {int port = 9008;new BoyatopNettyMQServer().bind(port);}private static final String type_consumer = "consumer";private static final String type_producer = "producer";private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();// 生产者投递消息的:topicNamepublic class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {/*** 服务器接收客户端请求** @param ctx* @param data* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object data)throws Exception {//ByteBuf buf=(ByteBuf)data;//byte[] req = new byte[buf.readableBytes()];//buf.readBytes(req);//String body = new String(req, "UTF-8");//System.out.println("body:"+body);JSONObject clientMsg = getData(data);String type = clientMsg.getString("type");switch (type) {case type_producer:producer(clientMsg);break;case type_consumer:consumer(ctx);break;}}private void consumer(ChannelHandlerContext ctx) {// 保存消费者连接ctxs.add(ctx);// 主动拉取mq服务器端缓存中没有被消费的消息String data = msgs.poll();if (StringUtils.isEmpty(data)) {return;}// 将该消息发送给消费者byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}private void producer(JSONObject clientMsg) {// 缓存生产者投递 消息String msg = clientMsg.getString("msg");msgs.offer(msg); //保证消息不丢失还可以缓存硬盘//需要将该消息推送消费者ctxs.forEach((ctx) -> {// 将该消息发送给消费者String data = msgs.poll();if (data == null) {return;}byte[] req = data.getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);});}private JSONObject getData(Object data) throws UnsupportedEncodingException {ByteBuf buf = (ByteBuf) data;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");return JSONObject.parseObject(body);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.close();}}
      }
    3. 生产端:
      package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/
      public class BoyatopNettyMQProducer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "producer");JSONObject msg = new JSONObject();msg.put("userId", "123456");msg.put("age", "23");data.put("msg", msg);// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
      }
    4. 客户端:
      package com.qcby.springboot.MQ;import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName BoyatopNettyMQProducer* @Author* @Version V1.0**/
      public class NettyMQConsumer {public void connect(int port, String host) throws Exception {//配置客户端NIO 线程组EventLoopGroup group = new NioEventLoopGroup();Bootstrap client = new Bootstrap();try {client.group(group)// 设置为Netty客户端.channel(NioSocketChannel.class)/*** ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。* Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。* 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。*/.option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());}});//绑定端口, 异步连接操作ChannelFuture future = client.connect(host, port).sync();//等待客户端连接端口关闭future.channel().closeFuture().sync();} finally {//优雅关闭 线程组group.shutdownGracefully();}}public static void main(String[] args) {int port = 9008;NettyMQConsumer client = new NettyMQConsumer();try {client.connect(port, "127.0.0.1");} catch (Exception e) {e.printStackTrace();}}public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {JSONObject data = new JSONObject();data.put("type", "consumer");// 生产发送数据byte[] req = data.toJSONString().getBytes();ByteBuf firstMSG = Unpooled.buffer(req.length);firstMSG.writeBytes(req);ctx.writeAndFlush(firstMSG);}/*** 客户端读取到服务器端数据** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("客户端接收到服务器端请求:" + body);}// tcp属于双向传输@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}
      }
  3. 持久化机制:
    1. 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
    2. 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
  4. MQ 服务器将该消息推送给消费者:
    1. 消费者已经和 MQ 服务器保持长连接
    2. 消费者在第一次启动的时候会主动拉取信息
  5. MQ 如何实现高并发思想:
    1. MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
    2. 默认的情况下取出一条消息
  6. 缺点:
    1. 存在延迟问题
  7. 需要考虑 MQ 消费者提高速率的问题:
    1. 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/69151.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国产编辑器EverEdit - 工具栏说明

1 工具栏 1.1 应用场景 当用户想显示/隐藏界面的标签栏、工具栏、状态栏、主菜单等界面元素时&#xff0c;可以通过EverEdit的菜单选项进行设置。 1.2 使用方法 选择菜单查看 -> 工具栏&#xff0c;在工具栏的子菜单中选择勾选或去掉勾选对应的选项。 标签栏&#xff1…

虚幻UE5手机安卓Android Studio开发设置2025

一、下载Android Studio历史版本 步骤1&#xff1a;虚幻4.27、5.0、5.1、5.2官方要求Andrd Studio 4.0版本&#xff1b; 5.3、5.4、5.5官方要求的版本为Android Studio Flamingo | 2022.2.1 Patch 2 May 24, 2023 虚幻官网查看对应Andrd Studiob下载版本&#xff1a; https:/…

VLAN 基础 | 不同 VLAN 间通信实验

注&#xff1a;本文为 “ Vlan 间通信” 相关文章合辑。 英文引文&#xff0c;机翻未校。 图片清晰度限于原文图源状态。 未整理去重。 How to Establish Communications between VLANs? 如何在 VLAN 之间建立通信&#xff1f; Posted on November 20, 2015 by RouterSwi…

bat脚本实现自动化漏洞挖掘

bat脚本 BAT脚本是一种批处理文件&#xff0c;可以在Windows操作系统中自动执行一系列命令。它们可以简化许多日常任务&#xff0c;如文件操作、系统配置等。 bat脚本执行命令 echo off#下面写要执行的命令 httpx 自动存活探测 echo off httpx.exe -l url.txt -o 0.txt nuc…

堆的实现——堆的应用(堆排序)

文章目录 1.堆的实现2.堆的应用--堆排序 大家在学堆的时候&#xff0c;需要有二叉树的基础知识&#xff0c;大家可以看我的二叉树文章&#xff1a;二叉树 1.堆的实现 如果有⼀个关键码的集合 K {k0 , k1 , k2 , …&#xff0c;kn−1 } &#xff0c;把它的所有元素按完全⼆叉树…

edu小程序挖掘严重支付逻辑漏洞

edu小程序挖掘严重支付逻辑漏洞 一、敏感信息泄露 打开购电小程序 这里需要输入姓名和学号&#xff0c;直接搜索引擎搜索即可得到&#xff0c;这就不用多说了&#xff0c;但是这里的手机号可以任意输入&#xff0c;只要用户没有绑定手机号这里我们输入自己的手机号抓包直接进…

FRP通过公网IP实现内网穿透

FRP通过公网IP实现内网穿透 一、简介二、安装服务端1、下载2、安装FRP3、使用 systemd 命令管理 frps 服务4、设置 frps 开机自启动 三、安装客户端1、下载2、安装FRP3、使用 systemd 命令管理 frpc 服务4、设置 frpc 开机自启动 四、访问仪表盘 一、简介 frp 是一款高性能的反…

K8S学习笔记-------1.安装部署K8S集群环境

1.修改为root权限 #sudo su 2.修改主机名 #hostnamectl set-hostname k8s-master01 3.查看网络地址 sudo nano /etc/netplan/01-netcfg.yaml4.使网络配置修改生效 sudo netplan apply5.修改UUID&#xff08;某些虚拟机系统&#xff0c;需要设置才能生成UUID&#xff09;#…

go运算符

内置运算符 算术运算符关系运算符逻辑运算符位运算符赋值运算符 算术运算符 注意&#xff1a; &#xff08;自增&#xff09;和–&#xff08;自减&#xff09;在 Go 语言中是单独的语句&#xff0c;并不是运算符 package mainimport "fmt"func main() {fmt.Printl…

【贪心算法篇】:“贪心”之旅--算法练习题中的智慧与策略(一)

✨感谢您阅读本篇文章&#xff0c;文章内容是个人学习笔记的整理&#xff0c;如果哪里有误的话还请您指正噢✨ ✨ 个人主页&#xff1a;余辉zmh–CSDN博客 ✨ 文章所属专栏&#xff1a;贪心算法篇–CSDN博客 文章目录 一.贪心算法1.什么是贪心算法2.贪心算法的特点 二.例题1.柠…

一款wordpress AI免费插件自动内容生成+前端AI交互+文章批量采集

一款wordpressAI自动内容生成前端AI会话窗口交互文章批量采集免费插件 1. SEO优化文章生成 关键词驱动的内容生成&#xff1a;用户可以输入关键词或长尾关键词&#xff0c;插件会根据这些关键词生成高质量的SEO优化文章。文章结构清晰&#xff0c;语言自然流畅&#xff0c;符合…

Linux03——常见的操作命令

root用户以及权限 Linux系统的超级管理员用户是&#xff1a;root用户 su命令 可以切换用户&#xff0c;语法&#xff1a;su [-] [用户名]- 表示切换后加载环境变量&#xff0c;建议带上用户可以省略&#xff0c;省略默认切换到root su命令是用于账户切换的系统命令&#xff…

使用 Ollama 在 Windows 环境部署 DeepSeek 大模型实战指南

文章目录 前言Ollama核心特性 实战步骤安装 Ollama验证安装结果部署 DeepSeek 模型拉取模型启动模型 交互体验命令行对话调用 REST API 总结个人简介 前言 近年来&#xff0c;大语言模型&#xff08;LLM&#xff09;的应用逐渐成为技术热点&#xff0c;而 DeepSeek 作为国产开…

关于大数据

在大数据背景下存在的问题&#xff1a; 非结构化、半结构化数据&#xff1a;NoSQL数据库只负责存储&#xff1b;程序处理时涉及到数据移动&#xff0c;速度慢 是否存在一套整体解决方案&#xff1f; 可以存储并处理海量结构化、半结构化、非结构化数据 处理海量数据的速…

通过docker安装部署deepseek以及python实现

前提条件 Docker 安装:确保你的系统已经安装并正确配置了 Docker。可以通过运行 docker --version 来验证 Docker 是否安装成功。 网络环境:保证设备有稳定的网络连接,以便拉取 Docker 镜像和模型文件。 步骤一:拉取 Ollama Docker 镜像 Ollama 可以帮助我们更方便地管理…

企业四要素如何用PHP进行调用

一、什么是企业四要素&#xff1f; 企业四要素接口是在企业三要素&#xff08;企业名称、统一社会信用代码、法定代表人姓名&#xff09;的基础上&#xff0c;增加了一个关键要素&#xff0c;通常是企业注册号或企业银行账户信息。这种接口主要用于更全面的企业信息验证&#x…

Android性能优化系列——卡顿优化

卡顿&#xff0c;就是用户体感界面不流畅。我们知道手机的屏幕画面是按照一定频率来刷新的&#xff0c;理论上讲&#xff0c;24 帧的画面更新就能让人眼感觉是连贯的。但是实际上&#xff0c;这个只是针对普通的视频而言。对于一些强交互或者较为敏感的场景来说&#xff0c;比如…

激光工控机在自动化领域中有哪些作用?

首先是对于高精度加工控制方面&#xff1a;激光工控机能够精确控制激光光束的运动轨迹和输出功率&#xff0c;实现对各种材料的精细切割、雕刻和焊接&#xff0c;保证加工质量和效率。 其次还能实时监控与远程控制激光工控机凭借其强大的网络通信功能&#xff0c;可以实时监控…

陷入闭包:理解 React 状态管理中的怪癖

TLDR 闭包就像函数随身携带的背包&#xff0c;包含它们创建时的数据React 组件使用闭包来记住它们的状态和属性过时的闭包可能导致状态更新不如预期时的错误函数式更新提供了一个可靠的方式来处理最新状态 简介 你是否曾经疑惑过&#xff0c;为什么有时你的 React 状态更新不…

基于STM32的智能加湿器设计(新版本)

目录 1、设计要求 2、系统功能 3、演示视频和实物 4、系统设计框图 5、软件设计流程图 6、原理图 7、主程序 8、总结 &#x1f91e;大家好&#xff0c;这里是5132单片机毕设设计项目分享&#xff0c;今天给大家分享的是加湿器。设备的详细功能见网盘中的文章《12、基于…