go对rabbitmq基本操作

一、安装rabbitmq

  • 1、直接使用docker拉取镜像

    docker pull rabbitmq:3.8
    
  • 2、启动容器

    docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbit01 \--hostname rabbit01 --restart=always \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8
    
  • 3、关于端口的介绍

    • 15672的给浏览器控制台使用的
    • 5672是给程序调用的
  • 4、进入到rabbit01容器中

    docker exec -it rabbit01 /bin/bash
    
  • 5、开启可视化界面操作

    rabbitmq-plugins enable rabbitmq_management
    
  • 6、客户端直接访问xx:15672

  • 7、或者直接用别人搞好的镜像

    docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbit02 \--hostname rabbit02 --restart=always \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
    

二、go语言对rabbitmq基本操作

  • 1、安装依赖包

    go get github.com/streadway/amqp
    
  • 2、基本的连接操作

    package mainimport ("fmt""github.com/streadway/amqp"
    )func main() {// 连接rabbitmq// conn,_ := amqp.Dial("amqp://用户名:密码@IP:端口号/虚拟机空间名称")   // 端口号:5672conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672defer conn.Close()// 打开通道ch, err := conn.Channel()fmt.Println(err)defer ch.Close()
    }
    
  • 3、由于部分每个地方都要使用,封装成一个方法

    package utilsimport ("fmt""github.com/streadway/amqp"
    )func RabbitmqUtils() *amqp.Channel {// 连接rabbitmqconn, _ := amqp.Dial("amqp://admin:123456@localhost:5672//") // 端口号:5672//defer conn.Close()// 打开通道ch, err := conn.Channel()fmt.Println(err)//defer ch.Close()return ch
    }
    
  • 4、创建一个队列,然后到可视化界面查看是否自动创建

    func main() {// 创建一个队列// durable, autoDelete, exclusive, noWait boolqueue, err := utils.RabbitmqUtils().QueueDeclare("simple_queue", false, false, false, false, nil)fmt.Println(queue.Name, err)
    }
    

    在这里插入图片描述

  • 5、关于创建队列几个参数的介绍

    • 第一个参数是队列名称
    • 第二个参数是队列是否持久化
    • 第三个参数是是否自动删除
    • 第四个参数是队列是否可以被其他队列访问
    • 第五个参数是设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度

三、简单模式

  • 1、根据官网图来看,简单模式是不需要交换机的

    在这里插入图片描述

  • 2、定义生产者,向队列中发送消息(注意要先创建队列)

    func main() {/**第一个参数是交换机名称第二个参数是队列名称第三个参数是 如果生产者生产的任务没有正常进入队列中,设置为true会返还给生产者,设置为false会直接丢弃第四个参数是 路由的时候第五个参数是消息体*/err := utils.RabbitmqUtils().Publish("", "simple_queue", false, false, amqp.Publishing{Body: []byte("hello word"),})fmt.Println(err)
    }
    
  • 3、查看可是界面是否存在一条消息

  • 4、创建消费者,来获取消息内容

    /**
    第一个参数是队列名称
    第二个参数自己给当前消费者命名
    第三个参数是否自动应答
    第三个参数队列是否可以被其他队列访问
    第四个参数
    第五个参数设置为true则表示不等待服务器回执信息.函数将返回NULL,可以提高访问速度
    */
    msgChan, err := utils.RabbitmqUtils().Consume("simple_queue", "", false, false, false, false, nil)
    fmt.Println(err)
    for msg := range msgChan {fmt.Println(string(msg.Body))
    }
    

四、工作模式

  • 1、工作模式是指一个生产者多个消费者,在简单模式上扩展成多个消费者,每个消费者只能交替来消费消息

  • 2、定义2个消费者来消费消息

    func main() {msgChan, err := utils.RabbitmqUtils().Consume("work_queue", "", true, false, false, true, nil)fmt.Println(err)for msg := range msgChan {fmt.Println("消费者1:", string(msg.Body))}
    }
    
  • 3、生产多条消息

    func main() {for i := 0; i < 10; i++ {_ = utils.RabbitmqUtils().Publish("", "work_queue", false, false, amqp.Publishing{Body: []byte(fmt.Sprintf("hello word %d", i)),})}
    }
    
  • 4、消费结果

    在这里插入图片描述

五、发布订阅模式

  • 1、发布订阅模式同样是一个生产者生产消息,多个消费者来消费,与上面的工作模式的区别是:工作模式是一个消费者消费后,另外一个消费者就消费不到了,发布订阅模式是不管有几个消费者都可以消费到消息

  • 2、使用goapi来创建交换机和队列

    func main() {// 1.创建2个队列queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue", true, false, false, true, nil)queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue", true, false, false, true, nil)// 2.创建一个交换机_ = utils.RabbitmqUtils().ExchangeDeclare("first_exchange", amqp.ExchangeDirect, true, false, false, false, nil)// 3.队列和交换机绑定在一起_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "", "first_exchange", true, nil)_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "", "first_exchange", true, nil)
    }
    
  • 3、消费者只需要绑定队列来消费消息就可以

    func main() {msgChan, err := utils.RabbitmqUtils().Consume("first_queue", "", true, false, false, true, nil)fmt.Println(err)for msg := range msgChan {fmt.Println("消费者1:", string(msg.Body))}
    }
    
  • 4、生产者只需要把消息发送到交换机里面就可以,交换机会根据绑定的队列来推送消息

    func main() {_ = utils.RabbitmqUtils().Publish("first_exchange", "", false, false, amqp.Publishing{Body: []byte("hello word"),})
    }
    
  • 5、可以查看控制台两个消费者都接收到消息

六、路由模式

  • 1、路由模式和上面的发布订阅模式有点类似,只是在上面的基础上添加的路由key

  • 2、使用go-api创建交换机和队列,并且对其绑定

    func main() {// 1.创建2个队列queue1, _ := utils.RabbitmqUtils().QueueDeclare("first_queue_key", true, false, false, true, nil)queue2, _ := utils.RabbitmqUtils().QueueDeclare("second_queue_key", true, false, false, true, nil)// 2.创建一个交换机err := utils.RabbitmqUtils().ExchangeDeclare("second_exchange", amqp.ExchangeDirect, true, false, false, false, nil)if err != nil {fmt.Println(err)}// 3.队列和交换机绑定在一起_ = utils.RabbitmqUtils().QueueBind(queue1.Name, "info", "second_exchange", true, nil)_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "info", "second_exchange", true, nil)_ = utils.RabbitmqUtils().QueueBind(queue2.Name, "error", "second_exchange", true, nil)
    }
    
  • 3、定义消费者

    func main() {msgChan, err := utils.RabbitmqUtils().Consume("first_queue_key", "", true, false, false, true, nil)fmt.Println(err)for msg := range msgChan {fmt.Println("消费者1:", string(msg.Body))}
    }
    
  • 4、定义生产者

    func main() {// 消费者会根据绑定的路由key来获取消息_ = utils.RabbitmqUtils().Publish("second_exchange", "error", false, false, amqp.Publishing{Body: []byte("hello word"),})
    }
    

七、主题模式

  • 1、主题模式和上面路由模式差不多,只是多了一个模糊匹配
    • *表示只匹配一个单词
    • #表示匹配多个单词

八、简单对其封装

  • 1、封装代码

    package utilsimport ("errors""fmt""github.com/streadway/amqp""log"
    )// MQURL url的格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
    const MQURL = "amqp://admin:123456@localhost:5672//"type RabbitMQ struct {conn    *amqp.Connectionchannel *amqp.ChannelMQUrl   string
    }// NewRabbitMQ 创建RabbitMQ的结构体实例
    func NewRabbitMQ() *RabbitMQ {rabbitMQ := &RabbitMQ{MQUrl: MQURL,}var err error// 创建rabbitMQ连接rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl)if err != nil {rabbitMQ.failOnErr(err, "创建连接错误")}rabbitMQ.channel, err = rabbitMQ.conn.Channel()if err != nil {rabbitMQ.failOnErr(err, "获取channel失败")}return rabbitMQ
    }// Binding 创建交换机和队列并且绑定在一起
    func (r *RabbitMQ) Binding(queueName, exchange, key, routerKey string) {// 1.创建1个队列queue1, err := r.channel.QueueDeclare(queueName, true, false, false, true, nil)if err != nil {r.failOnErr(err, "创建队列失败")}if exchange != "" && key == "" {r.failOnErr(errors.New("错误"), "请传递交换机链接类型")}if exchange != "" {// 2.创建一个交换机err1 := r.channel.ExchangeDeclare(exchange, key, true, false, false, false, nil)if err1 != nil {r.failOnErr(err, "创建交换机失败")}// 3.队列和交换机绑定在一起if err := r.channel.QueueBind(queue1.Name, routerKey, exchange, true, nil); err != nil {fmt.Println("1111")r.failOnErr(err, "交换机和队列绑定失败")}}fmt.Println("创建成功")
    }// failOnErr 定义内部错误处理
    func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))}
    }func (r *RabbitMQ) Close() {defer func(Conn *amqp.Connection) {err := Conn.Close()if err != nil {r.failOnErr(err, "关闭链接失败")}}(r.conn)defer func(Channel *amqp.Channel) {err := Channel.Close()if err != nil {r.failOnErr(err, "关闭通道失败")}}(r.channel)
    }func (r *RabbitMQ) Qos(prefetchCount, prefetchSize int, global bool) {err := r.channel.Qos(prefetchCount, prefetchSize, global)if err != nil {r.failOnErr(err, "限流失败")}
    }// Publish 发布者
    func (r *RabbitMQ) Publish(exchange, routerKey, message string) {// 2.发送数据到队列中if err := r.channel.Publish(exchange,routerKey,false, // 如果为true的时候会根据exchange的类型和routKey规则,如果无法找到符合条件的队列那么会把发送的消息发挥给发送者false, // 如果为true的时候当exchane发送消息到队列后发现队列上没有绑定消费者则会把消息发还给发送者amqp.Publishing{Body: []byte(message),},); err != nil {r.failOnErr(err, "发送消息失败")}fmt.Println("恭喜你,消息发送成功")
    }// Consumer 消费者
    func (r *RabbitMQ) Consumer(queueName string, callback func(message []byte)) {// 2.接收消息message, err := r.channel.Consume(queueName,"",    // 区分多个消费者true,  // 是否自动应答false, // 是否具有排他性false, // 如果为true的时候,表示不能将同一个connection中发送的消息传递给connection中的消费者false, // 队列消费是否阻塞nil,)if err != nil {r.failOnErr(err, "接收消息失败")}fmt.Println("消费者等待消费...")forever := make(chan bool)// 使用协程处理消息go func() {for d := range message {log.Printf("接收到的消息:%s", d.Body)callback(d.Body)}}()<-forever
    }
    
  • 2、简单模式的使用

    func main() {mq := utils.NewRabbitMQ()mq.Consumer("simple_queue1", func(message []byte) {fmt.Println(string(message))})defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()mq.Binding("simple_queue1", "", "", "")defer mq.Close()mq.Publish("", "simple_queue1", "你好水痕")
    }
    
  • 3、工作模式的使用

    func main() {mq := utils.NewRabbitMQ()mq.Consumer("work_queue1", func(message []byte) {fmt.Println("消费者2", string(message))})defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()defer mq.Close()for i := 0; i < 10; i++ {mq.Publish("", "work_queue1", fmt.Sprintf("你好水痕%d", i))}
    }
    
  • 4、交换机带路由的时候

    func main() {mq := utils.NewRabbitMQ()mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "info")mq.Binding("first_queue1", "first_exchange1", amqp.ExchangeDirect, "error")mq.Binding("first_queue2", "first_exchange1", amqp.ExchangeDirect, "info")defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()mq.Consumer("first_queue2", func(message []byte) {fmt.Println("消费者2", string(message))})defer mq.Close()
    }func main() {mq := utils.NewRabbitMQ()defer mq.Close()mq.Publish("first_exchange1", "error", "你好水痕")
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/170951.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《C++PrimePlus》第9章 内存模型和名称空间

9.1 单独编译 Visual Studio中新建头文件和源代码 通过解决方案资源管理器&#xff0c;如图所示&#xff1a; 分成三部分的程序&#xff08;直角坐标转换为极坐标&#xff09; 头文件coordin.h #ifndef __COORDIN_H__ // 如果没有被定义过 #define __COORDIN_H__struct pola…

【开源】基于Vue.js的城市桥梁道路管理系统的设计和实现

项目编号&#xff1a; S 025 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S025&#xff0c;文末获取源码。} 项目编号&#xff1a;S025&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块三、系统展示四、核心代码4.1 查询城市桥…

学生信息管理系统程序Python

系统主界面 在该界面中可以选择要使用功能对应的菜单进行不同的操作。在选择功能菜单时&#xff0c;有两种方法&#xff0c; 一种是输入1&#xff0c;另一种是按下键盘上的↑或↓方向键进行选择。这两种方法的结果是一样的&#xff0c;所以使用哪种方法都可以。 &#xff08;…

时间序列预测 — Informer实现多变量负荷预测(PyTorch)

目录 1 实验数据集 2 如何运行自己的数据集 3 报错分析 1 实验数据集 实验数据集采用数据集4&#xff1a;2016年电工数学建模竞赛负荷预测数据集&#xff08;下载链接&#xff09;&#xff0c;数据集包含日期、最高温度℃ 、最低温度℃、平均温度℃ 、相对湿度(平均) 、降雨…

什么是零拷贝 、零拷贝优化方案 - 真正的零拷贝,哪些地方会用到零拷贝技术

文章目录 什么是零拷贝3、零拷贝优化方案 - 真正的零拷贝哪些地方会用到零拷贝技术 现在来谈谈零拷贝&#xff0c;以及在开发中哪些地方使用到零拷贝。 开干… 什么是零拷贝 零拷贝指的是&#xff0c;从一个存储区域到另一个存储区域的copy任务无需CPU参与就可完成。零拷贝的底…

徕芬不是满分:自称超越戴森,用户称多次故障,品控仍是老大难?

撰稿|行星 来源|贝多财经 “双十一”购物节落下帷幕后&#xff0c;各大品牌纷纷公布“战报”。其中&#xff0c;高速吹风机品牌徕芬&#xff08;也称“徕芬科技”&#xff09;销售额超4.4亿元&#xff0c;全系产品销量超过80万台&#xff0c;高速吹风机系列单品(LF03、SE)销售…

来自Microsoft Teams的摄像头背景图片

原文件在&#x1f446;&#xff0c;下面是预览图 如果你安装了Microsoft Teams也可以搜索MSTeams&#xff0c;就在MSTeams/Backgrounds

【anaconda】numpy.dot 向量点乘小技巧

假设向量A[1,1], 向量B[2,3]。如果想知道他们的内积就可以输入如下代码: 当然&#xff0c;如果是两个列向量相乘&#xff0c;肯定是不对的 但是如果没有维度也一样可以求得内积&#xff0c;而且结果不会套在列表里

AI和人工智能与机器学习全景报告

今天分享的是AI系列深度研究报告&#xff1a;《AI和人工智能与机器学习全景报告》。 &#xff08;报告出品方&#xff1a;appen&#xff09; 报告共计&#xff1a;30页 获取 数据获取仍是AI应用构建团队的主要瓶颈。 原因各不相同。例如&#xff0c;特定用例的数据可能不足…

Day02嵌入式---按键控灯

一、简单介绍 按键控制灯开关是一种常见的嵌入式系统示例项目&#xff0c;它通常用于演示嵌入式系统的基本控制能力。该项目由一个或多个LED和一个按键组成。通过按下按键&#xff0c;可以控制LED的开关状态&#xff0c;从而实现灯的亮灭控制。 二、查看功能手册 2.1 查看硬件…

基于单片机压力传感器MPX4115检测-报警系统proteus仿真+源程序

一、系统方案 1、本设计采用这51单片机作为主控器。 2、MPX4115采集压力值、DS18B20采集温度值送到液晶1602显示。 3、按键设置报警值。 4、蜂鸣器报警。 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 /*********************************…

鸿蒙开发之android开发人员指南《基础知识》

基于华为鸿蒙未来可能不再兼容android应用&#xff0c;推出鸿蒙开发系列文档&#xff0c;帮助android开发人员快速上手鸿蒙应用开发。 1. 鸿蒙使用什么基础语言开发&#xff1f; ArkTS是鸿蒙生态的应用开发语言。它在保持TypeScript&#xff08;简称TS&#xff09;基本语法风…

【免费使用】基于PaddleSeg开源项目开发的人像抠图Web API接口

基于PaddleSeg开源项目开发的人像抠图API接口&#xff0c;服务器不存储照片大家可放心使用。 1、请求接口 请求地址&#xff1a;http://apiseg.hysys.cn/predict_img 请求方式&#xff1a;POST 请求参数&#xff1a;{"image":"/9j/4AAQ..."} 参数是jso…

与Windows 10更新大同小异!一步一步教你如何更新Windows 11

如果你想让你的Windows 11设备获得最佳性能&#xff0c;那么定期更新是至关重要的。即使是最好的电脑如果不更新也会受到影响&#xff0c;因为更新会应用软件调整&#xff0c;帮助你的设备更快、更平稳地运行。它还提高了安全性&#xff0c;意味着你可以从Microsoft的最新功能中…

Kafka-TopicPartition

Kafka主题与分区 主题与分区 topic & partition&#xff0c;是Kafka两个核心的概念&#xff0c;也是Kafka的基本组织单元。 主题作为消息的归类&#xff0c;可以再细分为一个或多个分区&#xff0c;分区也可以看作对消息的二次归类。 分区的划分为kafka提供了可伸缩性、水…

【H5 Canvas】【平面几何】特殊图形绘制(箭头/正多边/正多尖角形等)

文章目录 直线/弧线 箭头 直线/弧线 箭头 // startX,startY 起始坐标 // endX,endY 结束坐标 // angel 圆弧角度,取值[0&#xff0c;PI]; 0表示画直线箭头&#xff0c;否则画圆弧箭头 CanvasRenderingContext2D.prototype.drawArrow function(startX,startY,endX,endY,angel)…

openEuler Linux 部署 FineBi

openEuler Linux 部署 FineBi 部署环境 环境版本openEuler Linux22.03MySQL8.0.35JDK1.8FineBi6.0 环境准备 升级系统内核和软件 yum -y updatereboot安装常用工具软件 yum -y install vim tar net-tools 安装MySQL8 将 MySQL Yum 存储库添加到系统的存储库列表中 sudo…

JVM——垃圾回收算法(垃圾回收算法评价标准,四种垃圾回收算法)

目录 1.垃圾回收算法发展简介2.垃圾回收算法的评价标准1.吞吐量2.最大暂停时间3.堆使用效率 3.垃圾回收算法01-标记清除算法垃圾回收算法-标记清除算法的优缺点 4.垃圾回收算法02-复制算法垃圾回收算法-复制算法的优缺点 5.垃圾回收算法03-标记整理算法标记整理算法的优缺点 6.…

适用于 Mac 和 Windows 的顶级U 盘数据恢复软件

由于意外删除或设备故障而丢失 USB 驱动器中的数据始终是一件令人压力很大的事情&#xff0c;检索该信息的最佳选择是使用优质数据恢复软件。为了让事情变得更容易&#xff0c;我们已经为您完成了所有研究并测试了工具&#xff0c;并且我们列出了最好的 USB 记忆棒恢复软件&…

队列实现栈VS栈实现队列

目录 【1】用队列实现栈 思路分析 ​ 易错总结 Queue.c&Queue.h手撕队列 声明栈MyStack 创建&初始化栈myStackCreate 压栈myStackPush 出栈&返回栈顶元素myStackPop 返回栈顶元素myStackTop 判断栈空否myStackEmpty 释放空间myStackFree MyStack总代码…