RabbitMQ实践——搭建单人聊天服务

大纲

  • 创建Core交换器
  • 用户登录
  • 发起聊天邀请
  • 接受邀请
  • 聊天
  • 实验过程
  • 总结
  • 代码工程

经过之前的若干节的学习,我们基本掌握了Rabbitmq各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
在这里插入图片描述
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来消息传递的。Jerry和Tom都向其发送消息,然后路由到两个队列。它们两各自订阅一个队列,就可以看到彼此的聊天内容了。

创建Core交换器

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;@Service
public class Core {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;final String exchangeName = "Core";@PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();createExchange(exchangeName);}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}

用户登录

用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。

    private final ReentrantLock lock = new ReentrantLock();final private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();public Flux<String> Login(String username) {createExclusiveQueue(username);createBanding(exchangeName, username, username);return Flux.create(emitter -> {SimpleMessageListenerContainer container = getListener(username, (Message message) -> {String msg = new String(message.getBody());System.out.println("Received message: " + msg);emitter.next(msg);});container.start();});}private void createExchange(String exchangeName) {rabbitTemplate.execute(channel -> {channel.exchangeDeclare(exchangeName, "direct", false, true, null);return null;});}private void createBanding(String exchangeName, String queueName, String routingKey) {rabbitTemplate.execute(channel -> {channel.queueBind(queueName, exchangeName, routingKey);return null;});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener = listeners.get(queueName);if (listener == null && messageListener != null) {listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}

Controller如下

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate Core core;@PostMapping(value = "/login", produces = "text/event-stream")public Flux<String> login(@RequestParam String username) {return core.Login(username);}
}

发起聊天邀请

发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo )。它包含上图中Fanout交换器、以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。

package com.rabbitmq.chat.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;@Service
public class ChatRoom {@Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;@Dataprivate class ChatRoomInfo {private String exchange;private Map<String, String> usernameToQueuename;}private final Map<String, ChatRoomInfo> chatRooms = new java.util.HashMap<>();private final ReentrantLock lock = new ReentrantLock();   @PostConstructpublic void init() {connectionFactory = rabbitTemplate.getConnectionFactory();}public Flux<String> invite(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {createChatRoom(fromUsername, toUsername);}return talk(chatRoomName, fromUsername);}private void createChatRoom(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);String exchangeName = chatRoomName;String fromQueueName = "queue-" + fromUsername + "-" + toUsername;String toQueueName = "queue-" + toUsername + "-" + fromUsername;rabbitTemplate.execute(action -> {action.exchangeDeclare(exchangeName, "fanout", false, true, null);action.queueDeclare(fromQueueName, false, true, false, null);action.queueDeclare(toQueueName, false, true, false, null);action.queueBind(fromQueueName, exchangeName, "");action.queueBind(toQueueName, exchangeName, "");return null;});lock.lock();try {ChatRoomInfo chatRoomInfo = new ChatRoomInfo();chatRoomInfo.setExchange(exchangeName);chatRoomInfo.setUsernameToQueuename(Map.of(fromUsername, fromQueueName, toUsername, toQueueName));chatRooms.put(chatRoomName, chatRoomInfo);} finally {lock.unlock();}}

接受邀请

被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。

    public Flux<String> accept(String fromUsername, String toUsername) {String chatRoomName = getChatRoomName(fromUsername, toUsername);return talk(chatRoomName, toUsername);}private Flux<String> talk(String chatRoomName, String username) {ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}String queueName = chatRoomInfo.getUsernameToQueuename().get(username);return Flux.create(emitter -> {SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener((Message message) -> {String msg = new String(message.getBody());System.out.println(username + " received message: " + msg);emitter.next(msg);});listener.start();});}

聊天

聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。

    public void chat(String fromUsername, String toUsername, String message) {String chatRoomName = getChatRoomName(fromUsername, toUsername);ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);if (chatRoomInfo == null) {chatRoomName = getChatRoomName(toUsername, fromUsername);chatRoomInfo = chatRooms.get(chatRoomName);}if (chatRoomInfo == null) {throw new IllegalArgumentException("Chat room not found");}rabbitTemplate.convertAndSend(chatRoomInfo.getExchange(), "", fromUsername + ": " + message);}private String getChatRoomName(String fromUsername, String toUsername) {return fromUsername + "-" + toUsername + "-chat-room";}

Controller侧代码

package com.rabbitmq.chat.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.chat.service.ChatRoom;
import com.rabbitmq.chat.service.Core;import reactor.core.publisher.Flux;@RestController
@RequestMapping("/chat")
public class ChatController {@Autowiredprivate Core core;@Autowiredprivate ChatRoom chatRoom;@PutMapping(value = "/invite", produces = "text/event-stream")public Flux<String> invite(@RequestParam String fromUsername, @RequestParam String toUsername) {core.invite(fromUsername, toUsername);return chatRoom.invite(fromUsername, toUsername);}@PutMapping(value = "/accept", produces = "text/event-stream")public Flux<String> accept(@RequestParam String fromUsername, @RequestParam String toUsername) {core.accept(fromUsername, toUsername);return chatRoom.accept(fromUsername, toUsername);}@PostMapping("/send")public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) {chatRoom.chat(fromUsername, toUsername, message);}
}

实验过程

在Postman中,我们先让tom登录,然后jerry登录。
在这里插入图片描述
在这里插入图片描述
在后台,我们看到创建两个队列
在这里插入图片描述
以及Core交换器的绑定关系也被更新
在这里插入图片描述
Jerry向Tom发起聊天邀请
在这里插入图片描述
可以看到Tom收到了邀请
在这里插入图片描述
同时新增了两个队列
在这里插入图片描述
以及一个交换器
在这里插入图片描述
在这里插入图片描述
Tom通过下面请求接受邀请
在这里插入图片描述
Jerry收到Tom接受了邀请的通知
在这里插入图片描述
后面它们就可以聊天了
在这里插入图片描述
在这里插入图片描述
它们的聊天窗口都收到了消息
在这里插入图片描述
在这里插入图片描述

总结

本文主要使用的知识点:

  • direct交换器以及其绑定规则
  • fanout交换器
  • 自动删除的交换器
  • 自动删除的队列
  • 只有一个消费者的队列
  • WebFlux响应式编程

代码工程

https://github.com/f304646673/RabbitMQDemo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36634.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

哈喽GPT-4o,对GPT-4o 数据分析Data Analysis的思考与看法

目录 上传一个Excel给Data Analysis。Prompt&#xff1a;请问这个数据集是做什么的Prompt&#xff1a;请问书籍的定价如何&#xff0c;请用合适的图表展示它的售价情况Prompt&#xff1a;请统计书名列中出现最多的名称&#xff0c;然后使用词云将其可视化。Prompt&#xff1a;请…

58.鸿蒙系统app(HarmonyOS)(ArkUI)更改应用程序图标

替换xx\MyApplication4.30\entry\src\main\resources\base\media目录下icon.png文件 54.HarmonyOS鸿蒙系统 App(ArkTS)tcp socket套接字网络连接收发测试_鸿蒙socket连接测试-CSDN博客

『Django』模型入门教程-操作MySQL

theme: smartblue 点赞 关注 收藏 学会了 本文简介 一个后台如果没有数据库可以说废了一半。日常开发中大多数时候都在与数据库打交道。Django 为我们提供了一种更简单的操作数据库的方式。 在 Django 中&#xff0c;模型(Model)是用来定义数据库结构的类。每个模型类通常对…

C++之STL(十二)

1、容器适配器 #include <iostream> #include <stack> #include <list> #include <queue> #include <functional> #include <iterator>using namespace std;int main() {// 栈&#xff08;先进后出filo&#xff09;stack<int, list<…

基于PHP的长城景区信息管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的长城景区信息管理系统 一 介绍 此长城景区信息管理系统基于原生PHP开发&#xff0c;数据库mysql。系统角色分为用户和管理员。 技术栈&#xff1a;phpmysqlphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 浏览长城景区信息(旅…

Unity解决报错:Execution failed for task ‘:unityLibrary:BuildIl2CppTask‘

目录 编辑器版本2020.3.33f1 及 2021.3.15f1 直接导出apk或aar报错(虽然会自动生成temp的AS工程&#xff0c;经过打开验证 也是无解的)&#xff1b; 唯一解决办法&#xff1a;Unity导出As工程没问题&#xff1b; 编辑器版本2020.3.33f1 及 2021.3.15f1 直接导出apk或aar报…

[电子电路学]电路分析基本概念1

第一章 电路分析的基本概念和基本定律 电路模型 反映实际电路部件的主要电磁性质的理想电路元件及其组合&#xff0c;是实际电路电气特性的抽象和近似。 理想电路元件 实际电路器件品种繁多&#xff0c;其电磁特性多元而复杂&#xff0c;分析和计算时非常困难。而理想电路元件…

一款开源、免费、现代化风格的WPF UI控件库

前言 今天大姚给大家分享一款开源&#xff08;MIT License&#xff09;、免费、现代化风格的WPF UI控件库&#xff1a;ModernWpf。 项目介绍 ModernWpf是一个开源项目&#xff0c;它为 WPF 提供了一组现代化的控件和主题&#xff0c;使开发人员能够创建具有现代外观的桌面应…

【pytorch09】数学运算

1.数学操作 add/minus/multiply/dividematmulpowsqrt/rsqrtround 2.加减乘除 加法 矩阵乘法 torch.mm 只适用于2d torch.matmul 要分清楚是矩阵元素相乘&#xff0c;还是矩阵相乘 例子 x一共有4张照片&#xff0c;每张照片打平成784的向量&#xff0c;希望降维得到[4,51…

戴尔笔记本重装系统?笔记本卡顿失灵?一键重装系统!

随着科技的快速发展&#xff0c;笔记本电脑已成为我们日常生活和工作中不可或缺的工具。然而&#xff0c;随着时间的推移&#xff0c;笔记本可能会遇到各种问题&#xff0c;如系统卡顿、失灵等。这时&#xff0c;重装系统往往是一个有效的解决方案。本文将详细介绍如何在戴尔笔…

ONLYOFFICE 8.1编辑器桌面应用程序来袭——在线全面测评

目录 ✈下载✈ &#x1f440;界面&#x1f440; &#x1f44a;功能&#x1f44a; &#x1f9e0;幻灯片版式的重大改进&#x1f9e0; ✂无缝切换文档编辑、审阅和查看模式✂ &#x1f3b5;在演示文稿中播放视频和音频文件&#x1f3b5; &#x1f917;版本 8.1&#xff1a…

一键生成AI动画视频?Animatediff 和 ComfyUI 更配哦!

大家好我是极客菌&#xff01; 之前我分享过 Animatediff 在 WebUI 中的应用&#xff0c;最近不是在分享 ComfyUI 嘛&#xff0c;那我们也来讲讲 Animatediff 在 ComfyUI 的应用。 如果从工作流和内存利用率的角度来说&#xff0c;Animatediff 和 ComfyUI 可能更配一些&#…

深入理解SSH:网络安全的守护者

在当今数字化时代&#xff0c;网络安全已成为全球关注的焦点。随着网络攻击手段的不断升级&#xff0c;保护数据传输的安全性变得尤为重要。SSH&#xff08;Secure Shell&#xff09;作为一种安全的网络协议&#xff0c;为远程登录和网络服务提供了强大的安全保障&#xff0c;成…

Duix - 硅基数字人SDK

简介 Introduction DUIX(Dialogue User Interface System)是硅基智能打造的AI数字人智能交互平台。通过将数字人交互能力开源,开发者可自行接入多方大模型、语音识别(ASR)、语音合成(TTS)能力,实现数字人实时交互,并在Android和iOS多终端一键部署,让每个开发者可轻松…

4A的「A」会变成AI的「A」吗?

戛纳国际创意节上&#xff0c;广告集团WPP的全球CEO Mark Read 和英国CEO Karen Blackett 解释了WPP如何应对AIGC所带来的「威胁」。同时&#xff0c;Mark Read 与Elon Musk对话&#xff0c;讨论「技术创新的变革力量&#xff0c;人工智能如何重塑创造力、商业和社会&#xff0…

合芯科技冯春阳博士受邀出席苏州大学技术分享会

近日&#xff0c;苏州大学电子信息学院与合芯科技苏州公司成功举办“新时代与‘芯’相遇&#xff0c;科技赋能向未来”的技术分享会。合芯科技冯春阳博士进行了主题为“高性能CPU关键技术与发展现状”的专题分享&#xff0c;并参加导师聘请仪式。苏州大学电子信息学院党委副书记…

苹果电脑有什么好玩的游戏 Windows电脑上的游戏怎么在Mac电脑玩

苹果电脑不仅在工作和生产领域备受推崇&#xff0c;其游戏领域也同样令人惊艳。从经典的策略游戏到刺激的竞技游戏&#xff0c;苹果平台上有着丰富多样的游戏选择&#xff0c;满足了不同玩家的喜好和需求。下面我们来看看苹果电脑有什么好玩的游戏&#xff0c;Windows电脑上的游…

开源模型破局OpenAI服务限制,15分钟灵活搭建RAG和Agent应用

简介&#xff1a; 今天&#xff0c;我们做了两个实验&#xff0c;目标在15分钟内&#xff0c;完成下载社区的开源模型&#xff0c;部署成API&#xff0c;替换LlamaIndex中RAG和LangChain中OpenAI接口Agent的最佳实践&#xff0c;并取得符合预期的结果。 实验一 实验目标&…

Spring Cloud Gateway 与 Nacos 的完美结合

在现代微服务架构中&#xff0c;服务网关扮演着至关重要的角色。它不仅负责路由请求到相应的服务&#xff0c;还承担着诸如负载均衡、安全认证、限流熔断等重要功能。Spring Cloud Gateway 作为 Spring Cloud 生态系统中的一员&#xff0c;以其强大的功能和灵活的配置&#xff…

浅谈linux(1)

文章目录 一、linux1.1、使用终端xshell登陆到云服务器上1.2、linux一些常用命令1.2.1、一些快捷键1.2.2、关于目录的操作1.2.3、关于文件的命令1.2.4、关于目录的命令1.2.5、vim 针对文件进行编辑 一、linux linux 操作系统&#xff0c;我使用的是发行版&#xff0c;Centos7。…