RabbitMQ实践——搭建多人聊天服务

大纲

  • 用户登录
  • 创建聊天室
  • 监听Stream(聊天室)
  • 发送消息
  • 实验
    • 登录
      • Tom侧
      • Jerry侧
    • 创建聊天室
      • Jerry侧
      • Tom侧
    • 进入聊天室
      • Jerry侧
      • Tom侧
    • 发送消息
      • Jerry发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
      • Tom发送消息
        • Jerry侧聊天室
        • Tom侧聊天室
  • 代码工程
  • 参考资料

在《RabbitMQ实践——搭建单人聊天服务》一文中,我们搭建了Tom和Jerry两人的聊天服务。在这个服务中,它们都向Fanout交换器发送消息。而Fanout会将消息路由到它们两各自监听的队列。这样它们就可以得到全部消息。
在这里插入图片描述
如果是多人聊天,比如10个人聊天,按上述方案,需要Fanout交换器绑定10个队列。这就会使得结构变得非常复杂。
这是因为Classic类型队列在消费者确认读取消息后,会将消息从队列中删除。这样就需要我们使用fanout向多个队列路由消息,以供不同消费者消费。如果多个消费者消费同一个队列,则会导致每个消费者得到的都是部分信息。这就不符合我们理解的聊天场景。
但是我们可以使用Stream类型队列来解决这个问题。
Stream类型队列和之前的Classic队列的不同点是:Stream队列并不会清除消息。消息会一直存在于Stream队列中,消费者可以从指定位置开始读取消息。这样我们只要有一个Stream队列保存消息,所有消费者都从队列中读取消息即可。
在这里插入图片描述

用户登录

关于用户登录的流程我们在《RabbitMQ实践——搭建单人聊天服务》中已经有详细的介绍。即上图中黑色字体1、2、3、4、5的步骤。

创建聊天室

我们会创建一个以聊天室名称命名的交换器和Stream类型队列。即上图中黑色字体6、7、8、9的步骤。
需要注意的是Stream类型队列创建方案和Classic类型类似,只需要多指定"x-queue-type"=“stream”。但是对于Durable(持久化)只能设置为True,exclusive只能设置为False,autoDelete只能设置为False。

package com.rabbitmq.chat.service;import java.util.Collections;
import java.util.Date;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import reactor.core.publisher.Flux;@Service
public class ChatRoomV2 {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createChatRoom(String admin, String roomName) {createChatRoom(roomName);}private void createChatRoom(String roomName) {rabbitTemplate.execute(action -> {action.exchangeDeclare(roomName, "fanout", false, true, null);action.queueDeclare(roomName, true, false, false,Collections.singletonMap("x-queue-type", "stream"));action.queueBind(roomName, roomName, "");return null;});}

聊天室创建完毕后,会通知所有登录的用户。

    @PostMapping("/create")public void create(@RequestParam String admin, @RequestParam String roomName) {chatRoomV2.createChatRoom(admin, roomName);core.notifyEveryone(roomName + " is created");}

监听Stream(聊天室)

    public Flux<String> receive(String username, String roomName) {return Flux.create(emitter -> {rabbitTemplate.execute(channel -> {channel.basicQos(100);Date timestamp = new Date(System.currentTimeMillis());channel.basicConsume(roomName, false, username,false, true,Collections.singletonMap("x-stream-offset", timestamp),(consumerTag, message) -> {String senderOfMessage = message.getProperties().getHeaders().get("username").toString();String show = "You Said: ";if (!senderOfMessage.equals(username)) {show = senderOfMessage + " Said: ";}show += new String(message.getBody());System.out.println(show);emitter.next(show);channel.basicAck(message.getEnvelope().getDeliveryTag(), false);},consumerTag -> { });return null;});});}

我们将"x-stream-offset"设置为当前毫秒数,是表示我们只读取当前时间之后发布的消息。这也符合聊天室的业务特点:不能读取历史消息。
当我们收到消息后,我们会获取消息Header中的自定义字段username,它标志了消息的发布者。如果发布者和读取者是同一人,我们将展示内容前面新增“You Said:”;如果是别人说的,则标记发布者的名称。
由于我们使用了WebFlux响应式编程,所以Controller层要做特殊处理

    @GetMapping(value = "/receive", produces = "text/event-stream")public Flux<String> receive(@RequestParam String username, @RequestParam String roomName) {return chatRoomV2.receive(username, roomName);}

发送消息

每个聊天室用户只要给之前创建的Fanout交换器发送消息即可。在这一步,我们给他们发送的消息Header中新增了字段username,以标记是谁发送的。

    public void send(String username, String roomName, String message) {Message msg = MessageBuilder.withBody(message.getBytes()).setHeader("username", username).build();rabbitTemplate.send(roomName, "", msg);}

实验

登录

Tom侧

在这里插入图片描述

Jerry侧

在这里插入图片描述

创建聊天室

Jerry侧

Jerry申请创建一个聊天室
在这里插入图片描述
在管理后台,我们看到对应的交换器和Stream都创建出来了。
在这里插入图片描述
在这里插入图片描述
同时在刚才的登录接口界面,Jerry收到了通知
在这里插入图片描述

Tom侧

Tom也会收到通知
在这里插入图片描述

进入聊天室

Tom和Jerry在收到通知后,可以通过receive接口进入聊天室,监听聊天室内容变化。

Jerry侧

在这里插入图片描述

Tom侧

在这里插入图片描述

发送消息

Jerry发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

Tom发送消息

在这里插入图片描述

Jerry侧聊天室

在这里插入图片描述

Tom侧聊天室

在这里插入图片描述

代码工程

https://github.com/f304646673/RabbitMQDemo

参考资料

  • https://www.rabbitmq.com/docs/streams

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/35155.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Webpack: 前端资深构建工具

概述 如果你是一名前端工程师&#xff0c;相信之前或多或少听过、用过 Webpack 这一构建工具&#xff0c;它能够融合多种工程化工具&#xff0c;将开发阶段的应用代码编译、打包成适合网络分发、客户端运行的应用产物如今&#xff0c;Webpack 已经深深渗入到前端工程的方方面面…

简单了解IoC

IoC 什么是IoC&#xff1f; IoC&#xff08;Inversion of Control&#xff09;&#xff0c;即控制反转&#xff0c;这是一种设计思想&#xff0c;在Spring指将对象的控制权交给Spring容器&#xff0c;由容器来实现对象的创建、管理&#xff0c;程序员只需要从容器获取想要的对…

java设计模式(四)原型模式(Prototype Pattern)

1、模式介绍&#xff1a; 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;它允许对象在创建新实例时通过复制现有实例而不是通过实例化新对象来完成。这样做可以避免耗费大量的资源和时间来初始化对象。原型模式涉及一个被复制的原型对象…

ES6模板字符串详解

ES6是JavaScript语言的一次重大更新&#xff0c;引入了许多新特性和语法改进&#xff0c;其中模板字符串是一个非常实用和灵活的语法特性。它可以让我们从数组或对象中提取值&#xff0c;并赋给对应的变量&#xff0c;让代码变得更加简洁和易读。 本文将深入探讨ES6解构赋值的语…

Nginx开发--动静分离和URLRewrite

05 【动静分离和URLRewrite】 1.动静分离介绍 为了提高网站的响应速度&#xff0c;减轻程序服务器&#xff08;Tomcat&#xff0c;Jboss等&#xff09;的负载&#xff0c;对于静态资源&#xff0c;如图片、js、css等文件&#xff0c;可以在反向代理服务器中进行缓存&#xff…

减少液氮罐内液氮损耗的方法

监测与管理液氮容器的密封性能 液氮容器的密封性能直接影响液氮的损耗情况。一个常见的损耗源是容器本身的密封不良或老化导致的泄漏。为了有效减少液氮损耗&#xff0c;首先应当定期检查液氮容器的密封性能。这可以通过简单的方法如肉眼检查外观&#xff0c;或者更精确的方法…

xxl-job 分布式任务调度 基本使用

xxl-job 是一个分布式任务调度平台&#xff0c;使用非常方便。 官网&#xff1a;https://gitee.com/xuxueli0323/xxl-job 工作原理类似于nacos 执行器注册到调度中心 调度中心分配任务 执行器执行任务 docker-compose 配置 version: 3 services:xxl-job:image: xuxueli/xxl-…

科普文:外贸垃圾邮件判定

国外垃圾邮件判定规则 很多时候&#xff0c;外贸的沟通多以邮件为主&#xff0c;他们作为专业的采购商&#xff0c;每天邮箱里都会塞满了邮件。因此&#xff0c;为了提高工作效率&#xff0c;很多国外客户喜欢使用垃圾邮件过滤器来过滤掉一部分垃圾邮件。 以下几种情况会触发垃…

《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】

文章目录 第1章 重构&#xff0c;第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构&#xff0c;第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…

AcWing算法基础课笔记——高斯消元

高斯消元 用来求解方程组 a 11 x 1 a 12 x 2 ⋯ a 1 n x n b 1 a 21 x 1 a 22 x 2 ⋯ a 2 n x n b 2 … a n 1 x 1 a n 2 x 2 ⋯ a n n x n b n a_{11} x_1 a_{12} x_2 \dots a_{1n} x_n b_1\\ a_{21} x_1 a_{22} x_2 \dots a_{2n} x_n b_2\\ \dots \\ a…

论文导读 | Manufacturing Service Operations Management近期文章精选

编者按 在本系列文章中&#xff0c;我们梳理了顶刊Manufacturing & Service Operations Management5月份发布有关OR/OM以及相关应用的文章之基本信息&#xff0c;旨在帮助读者快速洞察行业/学界最新动态。 推荐文章1 ● 题目&#xff1a;Robust Drone Delivery with Weath…

【C++题解】1712. 输出满足条件的整数2

问题&#xff1a;1712. 输出满足条件的整数2 类型&#xff1a;简单循环 题目描述&#xff1a; 有这样的三位数&#xff0c;其百位、十位、个位的数字之和为偶数&#xff0c;且百位大于十位&#xff0c;十位大于个位&#xff0c;请输出满所有满足条件的整数。 输入&#xff1…

#05搜索法

要点&#xff1a; ①搜索法&#xff1a;穷举搜索、深度优先搜索、广度优先搜索、广深结合搜索、回溯法、分支限界法&#xff1b; ②解空间树&#xff1a;子集树、排列树、满m叉树。 ③回溯法及分支限界法求解问题的方法与步骤。 难点&#xff1a; 子集树、排列树和满m叉树…

小程序下拉刷新,加载更多数据,移动端分页

文章目录 页面结构图WXML页面代码js代码wxss代码总结备注 参考&#xff1a;https://juejin.cn/post/7222855604406796346 页面结构图 一般页面就4个结构&#xff1a;最外滚动层、数据展示层、暂无数据层、没有更多数据层。 如图&#xff1a; WXML页面代码 <scroll-view …

Golang | Leetcode Golang题解之第191题位1的个数

题目&#xff1a; 题解&#xff1a; func hammingWeight(num uint32) (ones int) {for ; num > 0; num & num - 1 {ones}return }

# Kafka_深入探秘者(5):kafka 分区

Kafka_深入探秘者&#xff08;5&#xff09;&#xff1a;kafka 分区 一、kafka 副本机制 1、Kafka 可以将主题划分为多个分区(Partition)&#xff0c;会根据分区规则选择把消息存储到哪个分区中&#xff0c;只要如果分区规则设置的合理&#xff0c;那么所有的消息将会被均匀的…

Golang | Leetcode Golang题解之第198题打家劫舍

题目&#xff1a; 题解&#xff1a; func rob(nums []int) int {if len(nums) 0 {return 0}if len(nums) 1 {return nums[0]}first : nums[0]second : max(nums[0], nums[1])for i : 2; i < len(nums); i {first, second second, max(first nums[i], second)}return se…

openEuler离线安装nginx

目录 1.创建储存目录 2.切换到储存目录 3.首先在外网的环境下下载nginx的rpm包 4.目录打包tar包拷贝到离线路径 5.安装nginx 6.启动 nginx 7.停止 nginx 8.重新加载 nginx 配置 9.重新启动 nginx&#xff08;先停止再启动 nginx&#xff09; 10.检查 nginx 服务…

【分布式系统】Zookeeper学习笔记

基本概念 Zookeeper工作机制 从设计模式角度理解: 是一个基于观察者模式设计的分布式服务管理框架; 负责存储和管理大家都关心的数据, 一旦这些数据的状态发生变化, Zookeeper就将负责通知已经在Zookeeper上注册的那些观察值做出相应的反应. Zookeeper特点 Zookeeper有: 一…

EdgeOne 边缘函数 + Hono.js + Fauna 搭建个人博客

一、背景 虽然 “博客” 已经是很多很多年前流行的东西了&#xff0c;但是时至今日&#xff0c;仍然有一部分人在维护自己的博客站点&#xff0c;输出不少高质量的文章。 我使用过几种博客托管平台或静态博客生成框架&#xff0c;前段时间使用Hono.jsFauna &#xff0c;基于 …