RabbitMQ 客户端 连接、发送、接收处理消息

RabbitMQ 客户端 连接、发送、接收处理消息

一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样

RabbitMQ 服务,不是像其他服务器一样,负责逻辑处理,然后转发给客户端
而是所有客户端想要向 RabbitMQ服务发送消息,

第一步:创建一个链接 RabbitMQ 服务的连接

需要传入 RabbitMQ服务地址、用户名、密码,然后在连接代码中传入一个 queue 的字符串作为 标志
连接成功后,RabbitMQ服务上就可以看到这个链接了
如下图,可以看到有一个 Name = queueL1 的连接,后边有链接状态、消息数
Ready 和 Total 都是 0
在这里插入图片描述

向 RabbitMQ 发送消息的:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 通过 发送消息接口向 RabbitMQ 服务 发消息
(3) RabbitMQ 服务接收到消息,只是按照连接的 queue 分别把消息放在自己名字的 queue 下, RabbitMQ 服务只是存着客户端发送的消息,服务什么都不处理

向 RabbitMQ 服务发送几条消息
下图可以看到 queueL1 的队列已经接收了 5 条消息,这五条消息如果没有客户端接收处理,就一直在这存着
在这里插入图片描述

接收 RabbitMQ 服务消息:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 注册接收消息接口,在 RabbitMQ 中叫 消费消息,可以标记消费消息后是否将 RabbitMQ 的数据删除
(3) 如果 RabbitMQ 服务收到消息,就转发给 注册接收消息接口的 连接,如果接收的连接标记了 AutoDelete,那么发送给客户端后,RabbitMQ 就会将消息从消息队列中删除

注册接收消息,我的客户端就会收到 RabbitMQ 发送过来的消息,消息中包含发送上来的消息内容,还有发送消息的 queue 名字

此时再看,就会发现 Ready 和 Total 又变成 0 了
在这里插入图片描述

为什么上面讲解中将 接收 RabbitMQ 服务消息、向 RabbitMQ 发送消息的 分开说
是因为 RabbitMQ 发送消息就仅仅是发消息,发送完就不管了
而 RabbitMQ 的消费消息(接收消息) 也仅仅是接收消息,它不管是谁发的消息,只要是发送的 RabbitMQ 服务的消息,它都能接收,

(3.1) 比如我创建了 一个 连接,queue名为 xxxA,
它发送了消息 “Hello World”,
xxxA 连接自己又注册了 消费消息(接收消息),那么xxxA 自己就会接收到 xxxA 队列发送的 Hello World 信息

(3.2) 我又创建了 新的连接,queue 名还是 xxxA
那么新的连接也可以收到 (3.1) 发的 消息 HelloWorld

二. 客户端连接服务器

  1. 实例化一个 连接 RabbitMQ 服务的客户端连接
    实例化需要传入 服务地址、用户名、密码
using RabbitMQ.Client;
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;namespace Network
{/// <summary>/// RabbitMQ 创建一个链接/// 供 RabbitMQReceive、RabbitMQSend 使用/// </summary>internal class RabbitMQConnect{private string queue = "myQueue";   // 连接的队列名private string hostName = "localhost"; // 替换为你的 RabbitMQ 服务器地址private string userName = "guest";     // 替换为用户名private string password = "guest";     // 替换为密码private ConnectionFactory factory;private IConnection connection;private IChannel channel;private NetWorkState state;public RabbitMQConnect(string queue, string hostName, string userName, string password){this.queue = queue;this.hostName = hostName;this.userName = userName;this.password = password;State = NetWorkState.Disconnected;}public string Queue{get { return queue; }}public NetWorkState State{get { return state; }private set { state = value; }}public IChannel Channel{get { return channel; }}/// <summary>/// 网络是否连接中/// </summary>public bool IsConnect{get{return connection.IsOpen;}}public async Task StartConnect(){// 创建连接工厂// 如果初始化失败,不会启动恢复连接factory = new ConnectionFactory(){HostName = hostName, // 替换为你的 RabbitMQ 服务器地址UserName = userName, // 替换为用户名Password = password  // 替换为密码};// 自动恢复连接factory.AutomaticRecoveryEnabled = true;// 如果由于异常导致恢复失败(例如RabbitMQ节点仍然不可达),它将在固定的时间间隔(默认为5秒)后重试。间隔时间可配置如下// Connection.CloseAsync 关闭的连接不会启动自动恢复连接factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);factory.TopologyRecoveryEnabled = true;while (State != NetWorkState.Connected){await Connect();}await Task.Delay(1);}private async Task Connect(){try{State = NetWorkState.Connecting;// 异步创建连接connection = await factory.CreateConnectionAsync();channel = await connection.CreateChannelAsync();// 声明队列QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(queue: queue,durable: false,exclusive: false,autoDelete: false,arguments: null);/*autoDelete = true:没有消费者时队列自动删除,通常用于临时或一次性的队列。autoDelete = false:队列不会自动删除,通常用于需要长期存在的队列。选择是否设置 autoDelete = true 取决于你是否希望队列在没有消费者时自动删除。如果你的队列是临时的、一次性的,那么使用 autoDelete = true 会更适合;如果队列是长期需要使用的,则设置为 autoDelete = false 会更为合适 */State = NetWorkState.Connected;Debug.Log("RabbitMQ Connect Success");}catch (BrokerUnreachableException e){await Task.Delay(5000);State = NetWorkState.ConnectFailed;Debug.LogError("ConnectError:" + e.ToString());// apply retry logic}await Task.Delay(1);}/// <summary>/// 关闭连接/// </summary>public async void Dispose(){if (connection != null){UnityEngine.Debug.Log("ConnectDispose");await connection.CloseAsync();}await Task.Delay(1);}}
}

RibbitMQ 服务通过 queue 来区分每一个连接的客户端,代码部分如下

QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(queue: queue,durable: false,exclusive: false,autoDelete: false,arguments: null);
  1. 从一个链接发送消息给 RabbitMQ
using RabbitMQ.Client;
using System.Text;
using System.Threading.Tasks;
using System;namespace Network
{/// <summary>/// RabbitMQ 只发送消息的实例/// </summary>internal class RabbitMQSend{private RabbitMQConnect rabbitMQConnect;public RabbitMQSend(string queue, string hostName, string userName, string password){rabbitMQConnect = new RabbitMQConnect(queue, hostName, userName, password);Start();}public async void Start(){await rabbitMQConnect.StartConnect();}/// <summary>/// 发送消息/// exchange:   要发布消息的交换机名称。/// routingKey: 路由键,决定消息应该路由到哪个队列。/// mandatory:  如果设置为 true,RabbitMQ 会确保消息至少被投递到一个队列。如果没有队列接收该消息,RabbitMQ 会触发 basic.return。/// immediate:  如果设置为 true,RabbitMQ 会在消息无法立即被消费时丢弃消息。/// basicProperties: 消息的属性,类型为 IBasicProperties。这些属性可以设置消息的优先级、持久性等。/// body: 消息体的字节数组。/// /// BasicPublishAsync 方法 没有返回消息投递的结果。它仅仅表示“请求已经被成功发送到 RabbitMQ 的交换机”。如果发布操作成功,Task 会正常完成,不会抛出异常。你可以通过异常处理来捕获潜在的错误。/// </summary>/// <param name="msg"></param>public async Task SendAsync(string message){if (!rabbitMQConnect.IsConnect){await rabbitMQConnect.StartConnect();}try{IChannel channel = rabbitMQConnect.Channel;var body = Encoding.UTF8.GetBytes(message);var props = new BasicProperties();props.ContentType = "text/plain";props.DeliveryMode = DeliveryModes.Persistent;await channel.BasicPublishAsync(exchange: "",routingKey: rabbitMQConnect.Queue,mandatory: false,basicProperties: props,body: body).ConfigureAwait(false);Debug.Log($"[x] Sent: {message}");}catch (Exception ex){UnityEngine.Debug.LogError($"Error publishing message: {ex.Message}");}}public void Dispose(){rabbitMQConnect?.Dispose();}}
}
  1. 注册一个 消费消息(接受消息)事件
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Threading.Tasks;
using System;namespace Network
{/// <summary>/// RabbitMQ 只接受消息的实例/// </summary>internal class RabbitMQReceive{private RabbitMQConnect rabbitMQConnect;// 指定要接收那个 queue 的消息private string receiveQueue;private Action<string, byte[]> receivedCallBack;public RabbitMQReceive(string queue, string receiveQueue, string hostName, string userName, string password){rabbitMQConnect = new RabbitMQConnect(queue, hostName, userName, password);this.receiveQueue = receiveQueue;Start();}private async void Start(){await rabbitMQConnect.StartConnect();string result = await BasicConsumer();Debug.Log("result:" + result);}/// <summary>/// 设置接收消息回调/// </summary>/// <param name="receivedCallBack"></param>public void SetReceivedCB(Action<string, byte[]> receivedCallBack){this.receivedCallBack = receivedCallBack;}/// <summary>/// 创建异步消费者/// </summary>/// <returns></returns>private async Task<string> BasicConsumer(){var consumer = new AsyncEventingBasicConsumer(rabbitMQConnect.Channel);// 处理消息的异步回调逻辑consumer.ReceivedAsync += ReceivedAsync;// 开始消费string result = await rabbitMQConnect.Channel.BasicConsumeAsync(queue: receiveQueue,  // 指定消费者要监听的队列名称autoAck: true,        // 决定是否自动确认消息。如果 true,消息在交付时会自动确认。如果 false,则需要手动调用 BasicAck 确认消息consumer: consumer);  // 指定消息的处理方式,通过实现 IBasicConsumer 接口来定义如何处理从队列中接收到的消息/*autoAck = true:消息一旦传递给消费者,RabbitMQ 就认为该消息被成功处理,无需再确认。autoAck = false:消费者需要显式地调用 channel.BasicAck 来确认消息的处理,通常用于消息处理失败时能够重试消息。*/return result;}/// <summary>/// 异步接收消息/// </summary>/// <param name="sender"></param>/// <param name="eventArgs"></param>/// <returns></returns>private async Task ReceivedAsync(object sender, BasicDeliverEventArgs eventArgs){AsyncEventingBasicConsumer consumer = sender as AsyncEventingBasicConsumer;string queue = consumer.Channel.CurrentQueue;var body = eventArgs.Body.ToArray();receivedCallBack?.Invoke(queue, body);UnityEngine.Debug.Log($"[x] Received:");// 模拟异步任务处理(比如访问数据库或调用其他服务)await Task.Delay(-1);}public void Dispose(){rabbitMQConnect?.Dispose();}}
}
  1. 客户端实例
using System.Text;
using System.Threading.Tasks;
using LitJson;
using System.Collections;
using System.Collections.Generic;namespace Network
{/// <summary>/// RabbitMQ 客户端,可以实例化多个/// </summary>public class RabbitMQClient{private RabbitMQSend rabbitMQSend;private RabbitMQReceive rabbitMQReceive;/// <summary>/// 实例化/// </summary>public RabbitMQClient(){}/// <summary>/// 创建一个发送消息的队列/// </summary>/// <param name="queue">连接的队列名,如果为空,服务器会生成一个</param>/// <param name="hostName">链接的服务器地址</param>/// <param name="userName">用户名</param>/// <param name="password">密码</param>public void CreateSend(string queue, string hostName, string userName, string password){rabbitMQSend = new RabbitMQSend(queue, hostName, userName, password);}/// <summary>/// 创建一个接收消息的队列/// </summary>/// <param name="queue">连接的队列名,如果为空,服务器会生成一个</param>/// <param name="hostName">链接的服务器地址</param>/// <param name="userName">用户名</param>/// <param name="password">密码</param>public void CreateReceive(string queue, string receiveQueue, string hostName, string userName, string password){Debug.Log("CreateReceive");rabbitMQReceive = new RabbitMQReceive(queue, receiveQueue, hostName, userName, password);rabbitMQReceive.SetReceivedCB(ReceivedAsync);}/// <summary>/// 连接服务器/// </summary>/// <returns></returns>public async void StartConnect(){rabbitMQSend.Start();await Task.Delay(1);}/// <summary>/// 接收消息回调/// </summary>/// <param name="byteData"></param>private void ReceivedAsync(string queue, byte[] byteData){var jsonString = Encoding.UTF8.GetString(byteData);UnityEngine.Debug.Log($"[x] ReceivedAsync: {jsonString}");JsonData jsonData = JsonMapper.ToObject(jsonString);// 要检查的属性名string propertyName = "cmdId";CmdId cmdId = CmdId.None;if (((IDictionary)jsonData).Contains(propertyName)){string cmdIdStr = jsonData[propertyName].ToString();cmdId = (CmdId)int.Parse(cmdIdStr);}//Task task = JsonMapper.ToObject<Task>(jsonData.ToJson());NetWorkNotifycation.GetInstance().Notify<JsonData>(cmdId, jsonData);}/// <summary>/// 发送消息/// </summary>/// <param name="message"></param>/// <returns></returns>public async void SendAsync(string message){await rabbitMQSend.SendAsync(message);}public void Dispose(){rabbitMQSend?.Dispose();rabbitMQReceive?.Dispose();}}
}
  1. 测试用例
using UnityEngine;
using Network;public class RabbitMQDemo : MonoBehaviour
{// 客户端private RabbitMQClient rabbitMQClient;// Start is called before the first frame updatevoid Start(){// 实例化rabbitMQClient = new RabbitMQClient();}private int number = 1000;// Update is called once per framevoid Update(){if (Input.GetKeyDown(KeyCode.A)){UnityEngine.Debug.LogError("创建一个 发送 消息的队列");rabbitMQClient.CreateSend("queueLA", "localhost", "guest", "guest");}if (Input.GetKeyDown(KeyCode.D)){number++;string msg = "My Message:" + number.ToString();UnityEngine.Debug.LogError("发送消息:" + msg);rabbitMQClient.SendAsync(msg);}if (Input.GetKeyDown(KeyCode.W)){UnityEngine.Debug.LogError("创建一个 接收 消息的队列");rabbitMQClient.CreateReceive("queueLB", "queueLA", "localhost", "guest", "guest");//RabbitMQClient client2 = new RabbitMQClient();//client2.CreateReceive("queueLA", "localhost", "guest", "guest");}}private void OnApplicationQuit(){rabbitMQClient.Dispose();}
}
/// <summary>/// 网络连接状态/// </summary>public enum NetWorkState{// init/// <summary>/// 关闭/断开连接/// </summary>Closed,// client/// <summary>/// 已经建立连接/// </summary>Connected,/// <summary>/// 正在请求连接/// </summary>Connecting,/// <summary>/// 连接失败/// </summary>ConnectFailed,// both/// <summary>/// 连接超时/// </summary>Timeout,/// <summary>/// 断开连接/// </summary>Disconnected,}

扩展
可以在 网页上 Overview 页面,找到 Ports and contexts 部分
可以看到每种协议对应的端口是不一样的
每种协议都有一种独立的连接方式
需要根据自己选择的协议拼接路径

比如 我上面代码使用的 http 方式

    string localHost = "localhost"; // ip如 xxx.xxx.xxx.xxxstring userName = "用户名";string password = "密码";// 创建连接工厂// 如果初始化失败,不会启动恢复连接factory = new ConnectionFactory(){HostName = hostName, // 替换为你的 RabbitMQ 服务器地址UserName = userName, // 替换为用户名Password = password  // 替换为密码};

amqp 协议连接方式如下

	string url = $"amqp://{userName}:{password}@{hostName}:{port}"; factory = new ConnectionFactory(){Uri = new Uri(url)};

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62629.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

题海拾贝——生成元(Digit Generator,ACM/ICPC SEOUL 2005,UVa1583)

Hello大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路&#xff01; 我的博客&#xff1a;<但凡. 欢迎点赞关注&#xff01; 1、题目描述 如果x加上x的各个数字之和得到y&#xff0c;就说x是y的生成元。给出(1<n<10…

欧科云链研究院:比特币还能“燃”多久?

出品&#xff5c; OKG Research 作者&#xff5c;Hedy Bi 本周二&#xff0c;隔夜“特朗普交易” 的逆转趋势波及到比特币市场。比特币价格一度冲高至约99,000美元后迅速回落至93,000美元以下&#xff0c;最大跌幅超6%。这是由于有关以色列和黎巴嫩有望达成停火协议的传闻引发…

hint: Updates were rejected because the tip of your current branch is behind!

问题 本地仓库往远段仓库推代码时候提示&#xff1a; error: failed to push some refs to 192.168.2.1:java-base/java-cloud.git hint: Updates were rejected because the tip of your current branch is behind! refs/heads/master:refs/heads/master [rejected] (…

设计模式面试大全:说一下单例模式,及其应用场景?

定义 单例模式&#xff08;Singleton Pattern&#xff09;是 Java 中最简单的设计模式之一&#xff0c;此模式保证某个类在运行期间&#xff0c;只有一个实例对外提供服务&#xff0c;而这个类被称为单例类。 单例模式也比较好理解&#xff0c;比如一个人一生当中只能有一个真…

go-zero使用自定义模板实现统一格式的 body 响应

前提 go环境的配置、goctl的安装、go-zero的基本使用默认都会 需求 go-zero框架中&#xff0c;默认使用goctl命令生成的代码并没有统一响应格式&#xff0c;现在使用自定义模板实现统一响应格式&#xff1a; {"code": 0,"msg": "OK","d…

【Python网络爬虫笔记】5-(Request 带参数的get请求) 爬取豆瓣电影排行信息

目录 1.抓包工具查看网站信息2.代码实现3.运行结果 1.抓包工具查看网站信息 请求路径 url:https://movie.douban.com/typerank请求参数 页面往下拉&#xff0c;出现新的请求结果&#xff0c;参数start更新&#xff0c;每次刷新出20条新的电影数据 2.代码实现 # 使用网络爬…

玻璃效果和窗户室内效果模拟

一、玻璃效果 首先来讲如何模拟玻璃效果。玻璃的渲染包括三部分&#xff0c;普通场景物体的渲染、反射和折射模拟、毛玻璃模拟。作为场景物体&#xff0c;那么类似其它场景物体Shader一样&#xff0c;可以使用PBR、BlingPhong或者Matcap&#xff0c;甚至三阶色卡通渲染都可以。…

某东图标点选验证码

注意&#xff0c;本文只提供学习的思路&#xff0c;严禁违反法律以及破坏信息系统等行为&#xff0c;本文只提供思路 如有侵犯&#xff0c;请联系作者下架 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。…

九、Ubuntu Linux操作系统

一、Ubuntu简介 Ubuntu Linux是由南非人马克沙特尔沃思(Mark Shutteworth)创办的基于Debian Linux的操作系统&#xff0c;于2004年10月公布Ubuntu是一个以桌面应用为主的Linux发行版操作系统Ubuntu拥有庞大的社区力量&#xff0c;用户可以方便地从社区获得帮助其官方网站:http…

fastdds:编译、安装并运行helloworld

fastdds安装可以参考官方文档&#xff1a; 3. Linux installation from sources — Fast DDS 3.1.0 documentation 从INSTALLATION MANUAL这一节可以看出来&#xff0c;fastdds支持的操作系统包括linux、windows、qnx、MAC OS。本文记录通过源码和cmake的方式来安装fastdds的…

7. 现代卷积神经网络

文章目录 7.1. 深度卷积神经网络&#xff08;AlexNet&#xff09;7.2. 使用块的网络&#xff08;VGG&#xff09;7.3. 网络中的网络&#xff08;NiN&#xff09;7.4. 含并行连结的网络&#xff08;GoogLeNet&#xff09;7.5. 批量规范化7.5.1. 训练深层网络7.5.2. 批量规范化层…

监控视频汇聚平台:Liveweb视频监控管理平台方案详细介绍

Liveweb国标视频综合管理平台是一款以视频为核心的智慧物联应用平台。它基于分布式、负载均衡等流媒体技术进行开发&#xff0c;提供广泛兼容、安全可靠、开放共享的视频综合服务。该平台具备多种功能&#xff0c;包括视频直播、录像、回放、检索、云存储、告警上报、语音对讲、…

常用函数的使用错题汇总

目录 new/delete malloc/free1. 语言和类型2. 内存分配3. 内存释放4. 安全性和类型安全5. 其他特性总结 线程停止文件流 new/delete malloc/free malloc/free 和 new/delete 是 C/C 中用于动态内存管理的两种方式&#xff0c;它们有一些重要的区别。以下是这两种方式的比较&…

Istio笔记01--快速体验Istio

Istio笔记01--快速体验Istio 介绍部署与测试部署k8s安装istio测试istio 注意事项说明 介绍 Istio是当前最热门的服务网格产品&#xff0c;已经被广泛应用于各个云厂商和IT互联网公司。企业可以基于Istio轻松构建服务网格&#xff0c;在接入过程中应用代码无需更改&#xff0c;…

爬取boss直聘上海市人工智能招聘信息+LDA主题建模

爬取boss直聘上海市人工智能招聘信息 import time import tqdm import random import requests import json import pandas as pd import os from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriv…

入门数据结构JAVADS——如何构建一棵简单二叉排序树

目录 前言 什么是二叉排序树 二叉排序树的特点 二叉排序树示意图 构建二叉排序树 插入元素 搜索元素 删除元素 完整代码 结尾 前言 在整个十一月,笔者因为一些原因停笔了,但马上迈入12月进而进入2025年,笔者决定不再偷懒了,继续更新以促进学习的积极性.闲话说到这,今天…

40分钟学 Go 语言高并发:GC原理与优化

GC原理与优化 一、GC基础知识概览 方面核心概念重要性优化目标GC算法三色标记法、并发GC⭐⭐⭐⭐⭐理解GC工作原理垃圾回收策略触发条件、回收步骤⭐⭐⭐⭐⭐掌握GC过程GC调优参数设置、性能监控⭐⭐⭐⭐优化GC效果内存管理内存分配、内存逃逸⭐⭐⭐⭐⭐减少内存压力 让我们…

linux 文件权限,修改权限,系统调用

参考chmod 777 到底是啥 ???看完这个你就完全懂了&#xff01;-CSDN博客 ls -l 查看当前目录文件的权限 会有一个十位的东西 分别为 d:这是一个文件夹 后面3*3位分别表示所有者用户&#xff0c;同组用户&#xff0c;其他用户的读(r)&#xff0c;写(w)&#xff0c;执行(x)…

notepad++文件github下载

1、github下载网址&#xff1a;Releases notepad-plus-plus/notepad-plus-plus GitHub 2、找到操作系统支持的软件&#xff1a; 3、CSDN下载链接&#xff1a;https://download.csdn.net/download/u013083576/90046203

【CSS in Depth 2 精译_064】10.3 CSS 中的容器查询相对单位 + 10.4 CSS 容器样式查询 + 10.5 本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 【第十章 CSS 容器查询】 ✔️ 10.1 容器查询的一个简单示例 10.1.1 容器尺寸查询的用法 10.2 深入理解容器 10.2.1 容器的类型10.2.2 容器的名称10.2.3 容器与模块化 CSS 10.3 与容器相关的单位 ✔…