Java实现消息队列服务

使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)
在这里插入图片描述
主要角色

首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色

ProducerBrokerConsumer

整体架构如下所示
在这里插入图片描述
自定义协议

首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下.

消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

消息生产者:需要遵循协议将生产的消息头部增加"SEND:" 表示生产消息

消息消费者:需要遵循协议向消息处理中心发送"CONSUME"字符串表示消费消息

流程顺序

项目构建流程

下面将整个MQ的构建流程过一遍

新建一个 Broker 类,内部维护一个 ArrayBlockingQueue 队列,提供生产消息和消费消息的方法, 仅仅具备存储服务功能

新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口,监听本地9999端口的 Socket 链接,在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;

新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法

测试:新建两个 MyClient 类对象,分别执行其生产方法和消费方法

具体使用流程

生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中.如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.

消息消息:客户端执行消费消息方法, Broker服务 会校验请求的信息的信息是否等于 CONSUME ,如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端

代码演示

消息处理中心 Broker

/*** * 消息处理中心* */
public class Broker {// 队列存储消息的最大数量private final static int MAX_SIZE = 3;// 保存消息数据的容器private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);// 生产消息public static void produce(String msg) {if (messageQueue.offer(msg)) {System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}System.out.println("=======================");}// 消费消息public static String consume() { String msg = messageQueue.poll();if(msg !=null) {// 消费条件满足情况,从消息容器中取出一条消息System.out.println("已经消费消息:"+ msg +",当前暂存的消息数量是:"+ messageQueue.size());   }else{            System.out.println("消息处理中心内没有消息可供消费!");        }   System.out.println("=======================");returnmsg; }
}}

消息处理中心服务 BrokerServer

客户端 MqClient


/*** * 用于启动消息处理中心* */
public class BrokerServer implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket) {this.socket = socket;}@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {while (true) {String str = in.readLine();if (str == null) {continue;}System.out.println("接收到原始数据:" + str);if (str.equals("CONSUME")) {// CONSUME 表示要消费一条消息//从消息队列中消费一条消息String message = Broker.consume();out.println(message);out.flush();} else if (str.contains("SEND:")) {// 接受到的请求包含SEND:字符串 表示生产消息放到消息队列中Broker.produce(str);} else {System.out.println("原始数据:" + str + "没有遵循协议,不提供相关服务");}}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(SERVICE_PORT);while (true) {BrokerServer brokerServer = new BrokerServer(server.accept());new Thread(brokerServer).start();}}
}

测试MQ

public class ProduceClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();client.produce("SEND:Hello World");}
}public class ConsumeClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();String message = client.consume();System.out.println("获取的消息为:" + message);}
}

我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:

1=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:
2=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:
3=======================接收到原始数据:SEND:Hello World消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================接收到原始数据:Hello World原始数据:Hello World没有遵循协议,不提供相关服务接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
2=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
1=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
0=======================接收到原始数据:CONSUME消息处理中心内没有消息可供消费!=======================

小结

本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 “生产者,消费者,消费处理中心,协议” 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/249725.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 发布订阅

1.发布者接口 package com.yy.subpub; /** * Description: 发布者接口 * author: leijing * date: 2016年9月29日 下午5:07:20 */ public interface IPublisher<M> { /** * Description: 向订阅器发布消息 * param subscribePublish 订阅器 * param message 消息 * para…

EasyNVR内网摄像机接入网关+EasyNVS云端管理平台,组件起一套轻量级类似于企业级萤石云的解决方案...

背景分析 对于EasyNVR我们应该都了解&#xff0c;主要应用于互联安防直播&#xff0c;对于EasyNVR&#xff0c;我们可以清楚的发现&#xff0c;EasyNVR的工作机制是EasyNVR拉取摄像机的RTSP/Onvif视频流&#xff0c;然后客户端可以通过访问EasyNVR服务端实现流分发&#xff0c;…

Vim删除文件到行首或者行尾

vim用的不是很熟练&#xff0c;只是有时候需要的时候会学习一下 我们知道&#xff0c;vim有三种模式&#xff0c;一种是一般模式&#xff0c;一种是编辑模式&#xff0c;另外一种是命令行模式 在一般模式下&#xff0c;可以进行删除&#xff0c;复制粘贴等操作&#xff0c;在编…

Golang的值类型和引用类型的范围、存储区域、区别

常见的值类型和引用类型分别有哪些&#xff1f; 值类型&#xff1a;基本数据类型 int 系列, float 系列, bool, string 、数组和结构体struct&#xff0c;使用这些类型的变量直接指向存在内存中的值&#xff0c;值类型的变量的值通常存储在栈中。 引用类型&#xff1a;指针、sl…

RPC框架实现原理

一、什么是RPC框架&#xff1f; RPC&#xff0c;全称为Remote Procedure Call&#xff0c;即远程过程调用&#xff0c;是一种计算机通信协议。 比如现在有两台机器&#xff1a;A机器和B机器&#xff0c;并且分别部署了应用A和应用B。假设此时位于A机器上的A应用想要调用位于B机…

zookeeper入门系列

zookeeper可谓是目前使用最广泛的分布式组件了。其功能和职责单一&#xff0c;但却非常重要。 在现今这个年代&#xff0c;介绍zookeeper的书和文章可谓多如牛毛&#xff0c;本人不才&#xff0c;试图通过自己的理解来介绍zookeeper&#xff0c;希望通过一个初学者的视角来学习…

plsql查询数据中文乱码

在plsql中进行表数据查询的时候&#xff0c;发现查询出来的中文居然显示为乱码&#xff0c;通过查找资料解决该问题。 1、查看数据的编码&#xff08;语句&#xff1a;select * from v$nls_parameters&#xff09; 发现显示的语言不是我们常用的GBK模式 2、配置本机语言环境变量…

Zookeeper的功能以及工作原理

1.ZooKeeper是什么&#xff1f; ZooKeeper是一个分布式的&#xff0c;开放源码的分布式应用程序协调服务&#xff0c;是Google的Chubby一个开源的实现&#xff0c;它是集群的管理者&#xff0c;监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终&#xf…

前端学习总结——CSS布局方式之传统布局

传统布局 传统布局即是早期在平板电脑、智能手机等移动设备并不流行的时候使用的布局方式。 一、表格布局 例如&#xff1a;采用表格方式实现如下简单模型的布局 &#xff08;1&#xff09;固定布局 即用具体的像素值来确定模型的宽和高等值。 HTML代码如下所示 <tabl…

[POI2007]MEG-Megalopolis

传送门&#xff1a;嘟嘟嘟 第一反应是树链剖分&#xff0c;但是太长懒得写&#xff0c;然后就想出了一个很不错的做法。 想一下&#xff0c;如果我们改一条边&#xff0c;那么影响的只有他的子树&#xff0c;只要先搞一个dfs序&#xff0c;为什么搞出这个呢&#xff1f;因为有一…

memcache在ThinkPHP中的使用1---PHP下安装memcache

1.什么是Memcached缓存 Memcached是一套小巧、高效且成熟的内存数据库。与普通的数据库不同&#xff0c;Memcached存储的数据只能是简单的键值对&#xff0c;在查询时需要根据存放的key获取数据。 Memcached最大的特点是数据存放于内存&#xff0c;性能会比传统文件系统高出…

骨骼收集器01背包

来源hdu2602 问题描述 许多年前&#xff0c;在泰迪的家乡&#xff0c;有一个人被称为“骨头收藏家”。这个男人喜欢收集各种各样的骨头&#xff0c;比如狗狗&#xff0c;牛&#xff0c;还有他去了坟墓...... 骨头收藏家有一个大容量的V袋&#xff0c;沿着他的收集之旅有很多骨头…

ZooKeeper的安装与部署

本文讲述如何安装和部署ZooKeeper。 一、系统要求 ZooKeeper可以运行在多种系统平台上面&#xff0c;表1展示了zk支持的系统平台&#xff0c;以及在该平台上是否支持开发环境或者生产环境。 表1&#xff1a;ZooKeeper支持的运行平台 | 系统 | 开发环境 | 生产环境| | Linux…

python爬取豆瓣前25个影片内容的正则表达式练习

通过python正则表达式获取豆瓣top250的第一页的25个影片排名,影片名字,影片连接,导演,主演,上映日期,国家,剧情,评分,评价人数的内容 网页html内容: 1 <ol class"grid_view">2 <li>3 <div class"item">4 …

JavaScript 面向对象的程序设计1

一、理解对象 1.创建一个对象&#xff0c;然后给这个对象新建属性和方法。 ①常见的创建方式 var person new Object(); //创建一个Object 对象person.name XIE; //创建一个name 属性并赋值person.age 20; //创建一个age 属性并赋值person.sayName function () { //创建…

Zookeeper 使用

安装和配置详解 本文介绍的 Zookeeper 是以 3.2.2 这个稳定版本为基础&#xff0c;最新的版本可以通过官网 http://hadoop.apache.org/zookeeper/来获取&#xff0c;Zookeeper 的安装非常简单&#xff0c;下面将从单机模式和集群模式两个方面介绍 Zookeeper 的安装和配置。 单…

Asp.Net Core 工作单元 UnitOfWork UOW

Asp.Net Core 工作单元示例 来自 ABP UOW 去除所有无用特性 代码下载 &#xff1a; 去除所有无用特性版本&#xff0c;原生AspNetCore实现 差不多 2278 行代码&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1NoEIDSAPNr46xNHYEx9KCA 提取码&#xff1a;570i 包含C…

网站性能优化--CRP

网站性能优化–CRP 为了把HTML、CSS和JavaScript转化成活灵活现、绚丽多彩的网页&#xff0c;浏览器需要处理一系列的中间过程&#xff0c;优化性能其实就是了解这个过程中发生了什么-即CRP(Critical Rendering Path&#xff0c;关键渲染路径)。首先&#xff0c;我们从头开始快…

Dubbo+zookeeper基础讲解

一、dubbo是什么&#xff1f; 1&#xff09;本质&#xff1a;一个Jar包,一个分布式框架,&#xff0c;一个远程服务调用的分布式框架。 既然是新手教学&#xff0c;肯定很多同学不明白什么是分布式和远程服务调用&#xff0c;为什么要分布式&#xff0c;为什么要远程调用。我简…

What Are You Talking About HDU1075

一开始我也想用map 但是处理不好其他字符。。 看了题解 多多学习&#xff01; 很巧妙 就是粗暴的一个字符一个字符的来 分为小写字母和非小写字母两个部分 一但单词结束的时候就开始判断。 #include<bits/stdc.h> using namespace std;int main() {string a,b;map&l…