RabbitMQ实践——搭建单人聊天服务

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

Controller如下

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock();   @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}

Controller侧代码

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36634.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

哈喽GPT-4o,对GPT-4o 数据分析Data Analysis的思考与看法

目录 上传一个Excel给Data Analysis。Prompt&#xff1a;请问这个数据集是做什么的Prompt&#xff1a;请问书籍的定价如何&#xff0c;请用合适的图表展示它的售价情况Prompt&#xff1a;请统计书名列中出现最多的名称&#xff0c;然后使用词云将其可视化。Prompt&#xff1a;请…

笔记 - shell脚本

前言 Shell脚本是一种用来执行命令行命令的脚本文件。它是由一系列Shell命令组成的程序&#xff0c;通常用于自动化任务、系统管理、数据处理等。Shell脚本可以运行在各种Unix和Linux系统上&#xff0c;也可以在Windows上的一些兼容环境&#xff08;如Cygwin、WSL&#xff09;中…

js闭包函数

闭包是指在一个函数内部定义的函数&#xff0c;并且该函数可以访问到外部函数的变量。闭包可以将外部函数的变量保持在内存中&#xff0c;并且不会被释放。 闭包具有以下特性&#xff1a; 1. 函数内部定义的函数可以访问外部函数的变量。 2. 外部函数的变量可以保持在内存中&am…

[大师C语言(第三十九篇)]C语言const关键字深度解析与实战技巧

第一部分&#xff1a;C语言const关键字深度解析 一、const关键字的作用 在C语言中&#xff0c;const关键字是一个用于声明常量的修饰符。它用于指定一个变量的值在程序执行过程中不能被改变。使用const关键字可以确保代码的稳定性&#xff0c;防止不必要的变化&#xff0c;并…

58.鸿蒙系统app(HarmonyOS)(ArkUI)更改应用程序图标

替换xx\MyApplication4.30\entry\src\main\resources\base\media目录下icon.png文件 54.HarmonyOS鸿蒙系统 App(ArkTS)tcp socket套接字网络连接收发测试_鸿蒙socket连接测试-CSDN博客

『Django』模型入门教程-操作MySQL

theme: smartblue 点赞 关注 收藏 学会了 本文简介 一个后台如果没有数据库可以说废了一半。日常开发中大多数时候都在与数据库打交道。Django 为我们提供了一种更简单的操作数据库的方式。 在 Django 中&#xff0c;模型(Model)是用来定义数据库结构的类。每个模型类通常对…

C++之STL(十二)

1、容器适配器 #include <iostream> #include <stack> #include <list> #include <queue> #include <functional> #include <iterator>using namespace std;int main() {// 栈&#xff08;先进后出filo&#xff09;stack<int, list<…

基于PHP的长城景区信息管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的长城景区信息管理系统 一 介绍 此长城景区信息管理系统基于原生PHP开发&#xff0c;数据库mysql。系统角色分为用户和管理员。 技术栈&#xff1a;phpmysqlphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 浏览长城景区信息(旅…

Unity解决报错:Execution failed for task ‘:unityLibrary:BuildIl2CppTask‘

目录 编辑器版本2020.3.33f1 及 2021.3.15f1 直接导出apk或aar报错(虽然会自动生成temp的AS工程&#xff0c;经过打开验证 也是无解的)&#xff1b; 唯一解决办法&#xff1a;Unity导出As工程没问题&#xff1b; 编辑器版本2020.3.33f1 及 2021.3.15f1 直接导出apk或aar报…

Pytorch-----(6)

一 、问题 如何计算基于不同变量的操作如矩阵乘法。 二、具体实现 0.4版本以前&#xff0c;张量是包裹在变量之中的&#xff0c;后者有三个属性grad、volatile和 requires_grad属性。&#xff08;grad 就是梯度属性&#xff0c;requires_grad属性就是 是否需要存储梯度&#x…

[电子电路学]电路分析基本概念1

第一章 电路分析的基本概念和基本定律 电路模型 反映实际电路部件的主要电磁性质的理想电路元件及其组合&#xff0c;是实际电路电气特性的抽象和近似。 理想电路元件 实际电路器件品种繁多&#xff0c;其电磁特性多元而复杂&#xff0c;分析和计算时非常困难。而理想电路元件…

一款开源、免费、现代化风格的WPF UI控件库

前言 今天大姚给大家分享一款开源&#xff08;MIT License&#xff09;、免费、现代化风格的WPF UI控件库&#xff1a;ModernWpf。 项目介绍 ModernWpf是一个开源项目&#xff0c;它为 WPF 提供了一组现代化的控件和主题&#xff0c;使开发人员能够创建具有现代外观的桌面应…

【pytorch09】数学运算

1.数学操作 add/minus/multiply/dividematmulpowsqrt/rsqrtround 2.加减乘除 加法 矩阵乘法 torch.mm 只适用于2d torch.matmul 要分清楚是矩阵元素相乘&#xff0c;还是矩阵相乘 例子 x一共有4张照片&#xff0c;每张照片打平成784的向量&#xff0c;希望降维得到[4,51…

戴尔笔记本重装系统?笔记本卡顿失灵?一键重装系统!

随着科技的快速发展&#xff0c;笔记本电脑已成为我们日常生活和工作中不可或缺的工具。然而&#xff0c;随着时间的推移&#xff0c;笔记本可能会遇到各种问题&#xff0c;如系统卡顿、失灵等。这时&#xff0c;重装系统往往是一个有效的解决方案。本文将详细介绍如何在戴尔笔…

ONLYOFFICE 8.1编辑器桌面应用程序来袭——在线全面测评

目录 ✈下载✈ &#x1f440;界面&#x1f440; &#x1f44a;功能&#x1f44a; &#x1f9e0;幻灯片版式的重大改进&#x1f9e0; ✂无缝切换文档编辑、审阅和查看模式✂ &#x1f3b5;在演示文稿中播放视频和音频文件&#x1f3b5; &#x1f917;版本 8.1&#xff1a…

一键生成AI动画视频?Animatediff 和 ComfyUI 更配哦!

大家好我是极客菌&#xff01; 之前我分享过 Animatediff 在 WebUI 中的应用&#xff0c;最近不是在分享 ComfyUI 嘛&#xff0c;那我们也来讲讲 Animatediff 在 ComfyUI 的应用。 如果从工作流和内存利用率的角度来说&#xff0c;Animatediff 和 ComfyUI 可能更配一些&#…

大数据面试题之Flume

目录 介绍下Flume Flume架构 Flume有哪些Source 说下Flume事务机制 介绍下Flume采集数据的原理&#xff1f;底层实现&#xff1f; Flume如何保证数据的可靠性 Flume传输数据时如何保证数据一致性&#xff08;可靠性&#xff09; Flume拦截器 如何监控消费型Flu…

深入理解SSH:网络安全的守护者

在当今数字化时代&#xff0c;网络安全已成为全球关注的焦点。随着网络攻击手段的不断升级&#xff0c;保护数据传输的安全性变得尤为重要。SSH&#xff08;Secure Shell&#xff09;作为一种安全的网络协议&#xff0c;为远程登录和网络服务提供了强大的安全保障&#xff0c;成…

大数据面试题之Kafka(3)

目录 Kafka支持什么语义&#xff0c;怎么实现ExactlyOnce? Kafka的消费者和消费者组有什么区别?为什么需要消费者组? Kafka producer的写入数据过程? Kafka producer的ack设署 Kafka的ack机制&#xff0c;解决了什么问题? Kafka读取消息是推还是拉的模式?有什…

Duix - 硅基数字人SDK

简介 Introduction DUIX(Dialogue User Interface System)是硅基智能打造的AI数字人智能交互平台。通过将数字人交互能力开源,开发者可自行接入多方大模型、语音识别(ASR)、语音合成(TTS)能力,实现数字人实时交互,并在Android和iOS多终端一键部署,让每个开发者可轻松…