RabbitMQ实践——搭建多人聊天服务

大纲

  • 用户登录
  • 创建聊天室
  • 监听Stream(聊天室)
  • 发送消息
  • 实验
    • 登录
      • Tom侧
      • Jerry侧
    • 创建聊天室
      • Jerry侧
      • Tom侧
    • 进入聊天室
      • Jerry侧
      • Tom侧
    • 发送消息
      • Jerry发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
      • Tom发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
  • 代码工程
  • 参考资料

在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。
在这里插入图片描述
如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。
在这里插入图片描述

用户登录

关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。

创建聊天室

我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。

package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}

聊天室创建完毕后,会通知所有登录的用户。

    @PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}

监听Stream(聊天室)

    public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}

我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理

    @GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}

发送消息

每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。

    public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}

实验

登录

Tom侧

在这里插入图片描述

Jerry侧

在这里插入图片描述

创建聊天室

Jerry侧

Jerry申请创建一个聊天室
在这里插入图片描述
在管理后台,我们看到对应的交换器和Stream都创建出来了。
在这里插入图片描述
在这里插入图片描述
同时在刚才的登录接口界面,Jerry收到了通知
在这里插入图片描述

Tom侧

Tom也会收到通知
在这里插入图片描述

进入聊天室

Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。

Jerry侧

在这里插入图片描述

Tom侧

在这里插入图片描述

发送消息

Jerry发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

Tom发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

代码工程

https://github.com/f304646673/RabbitMQDemo

参考资料

  • https://www.rabbitmq.com/docs/streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/35155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Webpack: 前端资深构建工具

概述 如果你是一名前端工程师&#xff0c;相信之前或多或少听过、用过 Webpack 这一构建工具&#xff0c;它能够融合多种工程化工具&#xff0c;将开发阶段的应用代码编译、打包成适合网络分发、客户端运行的应用产物如今&#xff0c;Webpack 已经深深渗入到前端工程的方方面面…

简单了解IoC

IoC 什么是IoC&#xff1f; IoC&#xff08;Inversion of Control&#xff09;&#xff0c;即控制反转&#xff0c;这是一种设计思想&#xff0c;在Spring指将对象的控制权交给Spring容器&#xff0c;由容器来实现对象的创建、管理&#xff0c;程序员只需要从容器获取想要的对…

java设计模式(四)原型模式(Prototype Pattern)

1、模式介绍&#xff1a; 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;它允许对象在创建新实例时通过复制现有实例而不是通过实例化新对象来完成。这样做可以避免耗费大量的资源和时间来初始化对象。原型模式涉及一个被复制的原型对象…

ES6模板字符串详解

ES6是JavaScript语言的一次重大更新&#xff0c;引入了许多新特性和语法改进&#xff0c;其中模板字符串是一个非常实用和灵活的语法特性。它可以让我们从数组或对象中提取值&#xff0c;并赋给对应的变量&#xff0c;让代码变得更加简洁和易读。 本文将深入探讨ES6解构赋值的语…

域控制器BSP开发工程师面试题

在域控制器BSP(Board Support Package)开发工程师的面试中,可能会遇到以下一些问题。以下是根据参考文章和相关知识整理的面试题及其可能的回答格式: 面试题1:请简要介绍一下您对域控制器BSP的理解。 回答: 域控制器BSP,即板卡支持包,是嵌入式系统开发中的关键组成部…

Nginx开发--动静分离和URLRewrite

05 【动静分离和URLRewrite】 1.动静分离介绍 为了提高网站的响应速度&#xff0c;减轻程序服务器&#xff08;Tomcat&#xff0c;Jboss等&#xff09;的负载&#xff0c;对于静态资源&#xff0c;如图片、js、css等文件&#xff0c;可以在反向代理服务器中进行缓存&#xff…

减少液氮罐内液氮损耗的方法

监测与管理液氮容器的密封性能 液氮容器的密封性能直接影响液氮的损耗情况。一个常见的损耗源是容器本身的密封不良或老化导致的泄漏。为了有效减少液氮损耗&#xff0c;首先应当定期检查液氮容器的密封性能。这可以通过简单的方法如肉眼检查外观&#xff0c;或者更精确的方法…

KALI LINUX 开启ssh免登录服务及固定ip及

SSH以进行远程登录 在Kali Linux中启用SSH以进行远程登录,请按照以下步骤操作: 安装SSH服务:sudo apt update sudo apt install openssh-server 已安装可忽略 sudo systemctl start ssh 启动SSH服务 sudo systemctl enable ssh 确保SSH服务设置为开机启动: (可选)如…

xxl-job 分布式任务调度 基本使用

xxl-job 是一个分布式任务调度平台&#xff0c;使用非常方便。 官网&#xff1a;https://gitee.com/xuxueli0323/xxl-job 工作原理类似于nacos 执行器注册到调度中心 调度中心分配任务 执行器执行任务 docker-compose 配置 version: 3 services:xxl-job:image: xuxueli/xxl-…

科普文:外贸垃圾邮件判定

国外垃圾邮件判定规则 很多时候&#xff0c;外贸的沟通多以邮件为主&#xff0c;他们作为专业的采购商&#xff0c;每天邮箱里都会塞满了邮件。因此&#xff0c;为了提高工作效率&#xff0c;很多国外客户喜欢使用垃圾邮件过滤器来过滤掉一部分垃圾邮件。 以下几种情况会触发垃…

《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】

文章目录 第1章 重构&#xff0c;第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构&#xff0c;第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…

【学习笔记】ElasticSearch

中文社区&#xff1a;https://elasticsearch.cn/ Cluster集群&#xff0c;一个ES集群由一个或多个节点&#xff08;Node&#xff09;组成&#xff0c;每个集群都有一个Cluster Name作为标识 Node节点&#xff0c;一个ES实例就是一个node&#xff0c;一个机器可以有多个实例&a…

工具提示框(Tooltip): 设计、应用与最佳实践

工具提示框(Tooltip): 设计、应用与最佳实践 引言 工具提示框(Tooltip)是用户界面(UI)设计中的一种常见元素,它为用户提供关于界面元素或操作的额外信息。当用户将鼠标悬停在某个元素上时,Tooltip会以文本形式显示相关信息,帮助用户更好地理解和使用界面。本文将探讨…

AcWing算法基础课笔记——高斯消元

高斯消元 用来求解方程组 a 11 x 1 a 12 x 2 ⋯ a 1 n x n b 1 a 21 x 1 a 22 x 2 ⋯ a 2 n x n b 2 … a n 1 x 1 a n 2 x 2 ⋯ a n n x n b n a_{11} x_1 a_{12} x_2 \dots a_{1n} x_n b_1\\ a_{21} x_1 a_{22} x_2 \dots a_{2n} x_n b_2\\ \dots \\ a…

Android 使用cmd wifi命令操作wifi

cmd wifi 命令完整的说明说下。 console:/ # cmd wifi Wi-Fi (wifi) commands:help or -hPrint this help text.get-country-codeGets country code as a two-letter stringset-wifi-enabled enabled|disabledEnables…

论文导读 | Manufacturing Service Operations Management近期文章精选

编者按 在本系列文章中&#xff0c;我们梳理了顶刊Manufacturing & Service Operations Management5月份发布有关OR/OM以及相关应用的文章之基本信息&#xff0c;旨在帮助读者快速洞察行业/学界最新动态。 推荐文章1 ● 题目&#xff1a;Robust Drone Delivery with Weath…

【C++题解】1712. 输出满足条件的整数2

问题&#xff1a;1712. 输出满足条件的整数2 类型&#xff1a;简单循环 题目描述&#xff1a; 有这样的三位数&#xff0c;其百位、十位、个位的数字之和为偶数&#xff0c;且百位大于十位&#xff0c;十位大于个位&#xff0c;请输出满所有满足条件的整数。 输入&#xff1…

#05搜索法

要点&#xff1a; ①搜索法&#xff1a;穷举搜索、深度优先搜索、广度优先搜索、广深结合搜索、回溯法、分支限界法&#xff1b; ②解空间树&#xff1a;子集树、排列树、满m叉树。 ③回溯法及分支限界法求解问题的方法与步骤。 难点&#xff1a; 子集树、排列树和满m叉树…

小程序下拉刷新,加载更多数据,移动端分页

文章目录 页面结构图WXML页面代码js代码wxss代码总结备注 参考&#xff1a;https://juejin.cn/post/7222855604406796346 页面结构图 一般页面就4个结构&#xff1a;最外滚动层、数据展示层、暂无数据层、没有更多数据层。 如图&#xff1a; WXML页面代码 <scroll-view …

前端:Vue中使用JS-Cookie

在我们构建Vue站点时候&#xff0c;可能需要使用 cookie 来记录用户信息或者偏好设置&#xff0c;我们可以引入第三方库 GitHub - js-cookie/js-cookie: A simple, lightweight JavaScript API for handling browser cookies 来方便地操作 cookie。接下来我们就来一步一步地实…