RabbitMQ-默认读、写方式介绍

1、RabbitMQ简介

rabbitmq是一个开源的消息中间件,主要有以下用途,分别是:

  1. 应用解耦:通过使用RabbitMQ,不同的应用程序之间可以通过消息进行通信,从而降低应用程序之间的直接依赖性,提高系统的可维护性、扩展性和容错性。
  2. 异步提速:通过将耗时的操作转化为异步执行,可以提高系统的响应速度和吞吐量,提升用户体验。
  3. 削峰填谷:在高峰时段,RabbitMQ可以缓存大量的消息,从而避免系统崩溃,并在低峰时段处理这些消息,提高系统的稳定性。
  4. 消息分发:RabbitMQ可以将消息分发到多个消费者进行处理,从而提高系统的灵活性和处理能力。

了解rabbitmq的设计架构,对理解mq如何使用有很大的帮助。

一个非常重要的点,mq中的生产者从来不是直接将消息发送到队列中的,而是将消息发送到了mq的交换机中(上图中的exchange为交换机), 甚至生产者都不知道这条消息将被发送到哪个队列中。

交换机是个怎样的设计呢,他的一侧连接生产者,从生产者接收消息,另外一侧连接队列,将消息push进队列中,将消息push进一个队列,还是多个队列,还是抛弃,这些策略是由交换机的类型决定的,对于交换机的使用,后面详细介绍。

2、RabbitMQ安装

rabiitmq的安装,最简单的一种方式为运行mq的docker镜像,一行命令搞定:

# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management

执行命令后,可以看到如下打印,则代表RabbitMQ启动成功:

镜像启动成功后,可以通过ip:15672打开mq控制台:

http://xxx.xx.xxx.xx:15672/#/

 

mq安装完成后,下面就可以进行实践啦。

3、默认模式读、写mq

 rabbitmq官方的库:github.com/rabbitmq/amqp091-go

生产者侧代码:

package mainimport ("context""fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func Send(msg string) error {// 连接rabbitmqconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("connect error:", err)return err}defer conn.Close()// 创建通道ch, err := conn.Channel()if err != nil {fmt.Println("channel error:", err)return err}defer ch.Close()// 创建队列,使用默认的交换机q, err := ch.QueueDeclare("lp_default", // nametrue,         // durablefalse,        // delete when unusedfalse,        // exclusivefalse,        // noWaitnil,          // arguments)if err != nil {fmt.Println("queue declare error:", err)return err}ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()fmt.Println(q.Name)// body := "Hello World!"err = ch.PublishWithContext(ctx,"",     // exchange,默认交换机q.Name, // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(msg),})if err != nil {fmt.Println("publish error:", err)return err}return nil
}func main() {Send("Hello world")
}

 运行上面代码后,可以在rabbitmq的客户端 看到这个队列:

 点击队列,进入队列详情:

第一个框中,显式了队列详情,可以看出,这个队列绑定的是默认的交换机。

第二个框,点击后,可以看到队列中的消息详情。

消费者:

package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("connect error:", err)return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Channel error:", err)return}defer ch.Close()q, err := ch.QueueDeclare("lp_default", // nametrue,         // durablefalse,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments)if err != nil {fmt.Println("Queue Declare error:", err)return}msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)if err != nil {fmt.Println("Consume error:", err)return}var forever chan struct{}go func() {for d := range msgs {fmt.Printf("Received a message: %s\n", d.Body)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}

代码运行记录:

liupeng@192 default % go run recive.go[*] Waiting for messages. To exit press CTRL+CReceived a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world

在以上消费端代码中,如果代码在处理消息的过程中出现异常导致了程序退出,这样正在处理的这条消息就会丢失,为了避免这种情况的发生,rabbitmq设计了消息应答的机制,我们修改上面程序,将auto-ack参数设置为false,当处理完消息后,使用d.Ack(false)发送消息应答。

	msgs, err := ch.Consume(q.Name, // queue"",     // consumerfalse,   // auto-ack,设置为false,取消自动应答false,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)if err != nil {fmt.Println("Consume error:", err)return}var forever chan struct{}go func() {for d := range msgs {fmt.Printf("Received a message: %s\n", d.Body)d.Ack(false)  // 手动应答}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever

如果忘记了进行消息应答,消息会被重新发入调度队列,这样就会吃掉越来越多的内存。

但是,当rabbitmq的服务down掉后,队列中的消息仍然会丢失,为了保证在这种情况下,消息仍然能够不丢失,我们需要做两件事:队列不丢失+消息不丢失,代码如下:

队列持久化:

q, err := ch.QueueDeclare("hello",      // nametrue,         // durable,设置队列持久化false,        // delete when unusedfalse,        // exclusivefalse,        // no-waitnil,          // arguments
)
failOnError(err, "Failed to declare a queue")

消息持久化:

将DeliveryMode设置为amqp.Persistent

err = ch.PublishWithContext(ctx,"",     // exchange,默认交换机q.Name, // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType:  "text/plain",Body:         []byte(msg),DeliveryMode: amqp.Persistent,})

以上就是默认读写rabbitmq的方法,后面再介绍其他几种使用方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/15977.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

功率电感的设计步骤

文章目录 1&#xff1a;高导磁气隙&#xff08;铁氧体&#xff09;1.1设计原理1.2 设计步骤 2 铁粉芯2.1&#xff1a;设计原理2.2&#xff1a;设计步骤 TI电感设计 学习视频原链接 截图 1 截图1 截图1 截图 2 截图2 截图2 1&#xff1a;高导磁气隙&#xff08;铁氧体&#…

基于机器学习判断面部微表情发现哪些人更容易诊有帕金森病

1. 概述 帕金森病&#xff08;Parkinson’s disease&#xff0c;PD&#xff09;是一种慢性、进展性的神经退行性疾病&#xff0c;主要影响运动系统。该病症以大脑中黑质致密部多巴胺能神经元的逐渐丧失为特征&#xff0c;导致多巴胺&#xff08;一种重要的神经递质&#xff09…

【Qt】深入探索Qt窗口与对话框:从创建到管理:QDockWidget(浮动窗口)、QDialog(对话框)

文章目录 前言&#xff1a;1. 浮动窗口2. 对话框介绍2.1. 示例&#xff1a;主窗口中&#xff0c;通过点击按钮&#xff0c;弹出一个新的对话框。2.2. 创建自定义对话框2.2.1. 纯代码的方式2.2.2. 图形化界面的方式 3. 模态对话框 和 非模态对话框4. Qt 内置对话框4.1. 消息对话…

Nginx R31 doc-12-NGINX SSL Termination 安全加密

前言 大家好&#xff0c;我是老马。很高兴遇到你。 我们为 java 开发者实现了 java 版本的 nginx https://github.com/houbb/nginx4j 如果你想知道 servlet 如何处理的&#xff0c;可以参考我的另一个项目&#xff1a; 手写从零实现简易版 tomcat minicat nginx 系列 从零手…

Git Submodules:深入理解与应用

在大型项目或跨多个独立项目的开发中&#xff0c;代码管理往往变得复杂。Git Submodules 是 Git 提供的一个强大功能&#xff0c;允许你在一个 Git 仓库&#xff08;称为父仓库&#xff09;中嵌套另一个 Git 仓库&#xff08;称为子模块仓库&#xff09;。本文将详细介绍 Git S…

Linux/Windows下如何同时运行服务端和客户端

假设服务端和客户端程序分别为server.c和client.c注意顺序&#xff01; 先运行服务端&#xff0c;后运行客户端先结束客户端&#xff0c;后结束客户端 编译 gcc -o server server.cgcc -o server client.c运行 # 先运行服务器 ./server# 再运行客户端 ./client./表示当前目录…

Hybrid Block Storage for Efficient Cloud Volume Service——论文泛读

TOS 2023 Paper 论文阅读笔记整理 问题 传统桌面和服务器应用程序向云的迁移给底层云存储带来了高性能、高可靠性和低成本的挑战。由于这些传统应用程序的I/O模式和一致性要求&#xff0c;与采用特定编程模型和范式&#xff08;如MapReduce[22]和RDD[52]&#xff09;的云原生…

香橙派AIpro(OrangePi AIPro)开发板初测评

开发板简介 最近&#xff0c;我拿到手一款Orange Pi AI Pro 开发板&#xff0c;它是香橙派联合华为精心打造的高性能AI 开发板&#xff0c;最早发布于2023年12月&#xff0c;其搭载了昇腾AI 处理器&#xff0c;可提供8TOPS INT8 的计算能力&#xff0c;内存提供了8GB 和16GB两…

基于jeecgboot-vue3的Flowable新建流程定义(一)

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 1、vue3版本因为流程分类是动态的&#xff0c;不再固定了&#xff0c;所以新建的时候需要选择建立哪种流程类型的流程 代码如下&#xff1a; <!-- 选择模型的流程类型对话框 -->&…

算法提高之一个简单的整数问题2

算法提高之一个简单的整数问题2 核心思想&#xff1a;线段树 懒标记&#xff1a;add存每个子节点需要加的数pushdown&#xff1a;将懒标记向下存 同时清除本行懒标记 #include <iostream>#include <cstring>#include <algorithm>using namespace std;type…

数据结构(六)图

2024年5月26日一稿(王道P220) 6.1 图的基本概念 6.1.1 图的定义 6.2 图的存储及基本操作 6.2.1邻接矩阵法 6.2.2 邻接表

python web自动化(分布式测试Grid)

Grid介绍 Selenium Grid 是 Selenium 提供的⼀个⼯具&#xff0c;⽤于⽀持在多台计算机上并⾏运⾏测试。 它允许将测试分发到不同的机器和浏览器组合上&#xff0c;同时收集结果。 1.并⾏执⾏测试⽤例&#xff1a;在不同的机器上并⾏执⾏测试⽤例&#xff0c;从⽽加速整个测试过…

Vulhub——adminer

文章目录 一、CVE-2021-21311&#xff08;SSRF&#xff09;二、CVE-2021-43008&#xff08;远程文件读取&#xff09; 一、CVE-2021-21311&#xff08;SSRF&#xff09; Adminer是一个PHP编写的开源数据库管理工具&#xff0c;支持MySQL、MariaDB、PostgreSQL、SQLite、MS SQL…

如何在WRF模型中更好地设置这些海洋物理参数以提高模拟精度?

在WRF&#xff08;Weather Research and Forecasting&#xff09;模型中正确设置海洠物理参数是提高模拟精度的关键&#xff0c;特别是当模拟涉及到海洋和大气的相互作用时。以下是一些提高模拟精度的策略和建议&#xff1a; 1. 理解模拟的地区和目标 在进行参数设置之前&…

基于SpringBoot+Vue的人事管理系统

引言 目前,人事管理的系统大都是CS架构的大型系统,很少有面向机关,事业单位内部的基于BS架构的微型人事系统,因此.开发一个基于BS架构的人事信息管理系统是非常必要的.但是基于BS架构的人事系统对于安全是一个大的考验点.在人事信息系统中,功能需简单清晰,可操作性强,其次安全…

使用paddlepaddle框架构建ViT用于CIFAR10图像分类

使用paddlepaddle框架构建ViT用于CIFAR10图像分类 硬件环境&#xff1a;GPU (1 * NVIDIA T4) 运行时间&#xff1a;一个epoch大概一分钟 import paddle import time import paddle.nn as nn import paddle.nn.functional as F import paddle.vision.transforms as transforms…

CCF-GESP 等级考试 2023年3月认证C++一级真题解析

2024年03月真题 一、单选题&#xff08;每题2分&#xff0c;共30分&#xff09; 第 1 题 以下不属于计算机输入设备的有&#xff08; &#xff09;。 A. 键盘B. 音箱C. 鼠标D. 传感器 正确答案&#xff1a;B. 音箱 解析&#xff1a; A. 键盘&#xff1a;键盘是输入设备。B. …

第六节:带你全面理解vue3 浅层响应式API: shallowRef, shallowReactive, shallowReadonly

前言 前面两章,给大家讲解了vue3中ref, reactive,readonly创建响应式数据的API, 以及常用的计算属性computed, 侦听器watch,watchEffect的使用 其中reactive, ref, readonly创建的响应式数据都是深层响应. 而本章主要给大家讲解以上三个API 对应的创建浅层响应式数据的 API,…

Java面试题:Executor框架在Java并发编程中扮演什么角色?如何使用它?

在Java并发编程中&#xff0c;Executor框架扮演着核心角色&#xff0c;它提供了一种高级的、线程安全的机制来异步执行任务。Executor框架的主要目的是将任务的提交与任务的执行分离&#xff0c;从而简化了多线程编程的复杂性。 Executor框架的角色&#xff1a; 任务与线程分离…

持续总结中!2024年面试必问 20 道 Redis面试题(八)

上一篇地址&#xff1a;持续总结中&#xff01;2024年面试必问 20 道 Redis面试题&#xff08;七&#xff09;-CSDN博客 十五、使用过Redis做异步队列么&#xff0c;你是怎么用的&#xff1f; Redis作为一个高性能的键值存储系统&#xff0c;非常适合用来实现异步队列。异步队…