RabbitMQ初识

官方介绍 - 中文 本文环境:ubuntu:20.04

RabbitMQ安装、配置与基本使用

安装RabbitMQ

# 简易脚本安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
sudo apt-get install rabbitmq-server -y --fix-missing
# 启动、关闭、重启、查看 rabbitmq 服务
sudo service rabbitmq-server start、stop、restart、status

配置RabbitMQ

# 注意,启动rabbitmq之后要启动管理服务插件,否则15672管理页面无法登录
sudo rabbitmq-plugins enable rabbitmq_management
# 因为guest用户默认只能在localhost登录,所以我们需要创建一个新的用户:
[fxm@fxm:~$ ] sudo rabbitmqctl add_user fxm(username) fxm(password) 
Adding user "fxm" ...
[fxm@fxm:~$ ] sudo rabbitmqctl set_permissions -p / fxm(username) ".*" ".*" ".*"
Setting permissions for user "fxm" in vhost "/" ...
[fxm@fxm:~$ ] sudo rabbitmqctl set_user_tags fxm(username) administrator
Setting tags for user "fxm" to [administrator] ...

使用RabbitMQ

  • 通过web访问: ip:15672
    在这里插入图片描述

六种工作模式

“Hello World!”(普通模式)

只有一个生产者,一个消费者,一个队列

工作队列

只有一个生产者,多个消费者,一个队列

发布/订阅

通过交换机将消息发送到多个队列,多个消费者订阅相应队列

路由

待续。。。

主题交换机

待续。。。

远程过程调用

待续。。。

Go实现(Go 1.15)

本文实现了两种模式-普通模式、发布/订阅
普通模式:1. 连接失败重新连接2. 发送成功确认3. 消费成功确认
发布/订阅:1. 连接失败重新连接2. 消费成功确认

普通模式(需要先创建Queue)

创建Queue
在这里插入图片描述

rabbit_p.go

package rabbit_pimport ("errors""fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义生产者接口
type Producer interface {MsgContent() []byte
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ生产者
func (r *RabbitMQ) StartP(producer Producer) error {// 开启监听生产者发送任务if err := r.listenProducer(producer); err != nil {return err}return nil
}// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常,否则重新链接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}err := r.Channel.Confirm(false)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchanger.QueueName,    // routing keyfalse,          // mandatoryfalse,          // immediateamqp.Publishing{ContentType:  "text/plain",Body:         producer.MsgContent(),})if err != nil {fmt.Printf("MQ任务发送失败:%s \n", err)return errors.New("MQ任务发送失败")}err = confirmOne(confirms)return err
}//检测是否发送成功
func confirmOne(confirms <-chan amqp.Confirmation) error {if confirmed := <-confirms; confirmed.Ack {return nil} else {return errors.New("任务发送失败")}
}

rabbit_c.go

package rabbit_cimport ("fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义接收者接口
type Receiver interface {Consumer([]byte) error
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ消费者
func (r *RabbitMQ) StartR(receiver Receiver) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}// 获取消费通道,确保rabbitMQ一个一个发送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"",          // consumerfalse,       // auto-ackfalse,       // exclusivefalse,       // no-localfalse,       // no-waitnil,         // args)if err != nil {fmt.Printf("获取消费通道异常:%s \n", err)return err}for msg := range msgList {// 处理数据err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)return err} else {// 确认消息,必须为falseerr = msg.Ack(false)if err != nil {fmt.Printf("确认消息完成异常:%s \n", err)}return nil}}return nil
}

test_p.go

package mainimport ("fmt""golong/rabbit_p"
)// 实现发送者
type TestP struct {msgContent []byte
}func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent
}func main() {//生产者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{QueueName: "fxm",}err := mqp.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}

test_c.go

package mainimport ("fmt""golong/rabbit_c"
)// 实现接收者
type TestC struct {msgContent string
}func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}func main() {//消费者r := &TestC{}mqr := &rabbit_c.RabbitMQ{QueueName: "fxm",}_ = mqr.StartR(r)
}

发布/订阅

rabbit_p.go

package rabbit_pimport ("errors""fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义生产者接口
type Producer interface {MsgContent() []byte
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ生产者
func (r *RabbitMQ) StartP(producer Producer) error {// 开启监听生产者发送任务if err := r.listenProducer(producer); err != nil {return err}return nil
}// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常,否则重新链接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//发送消息到一个具名交换机err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue,           // durablefalse,          // auto-deletedfalse,          // internalfalse,          // no-waitnil,            // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchange"",             // routing keyfalse,          // mandatoryfalse,          // immediateamqp.Publishing{ContentType: "text/plain",Body:        producer.MsgContent(),})if err != nil {fmt.Printf("MQ任务发送失败:%s \n", err)return errors.New("MQ任务发送失败")}return err
}

rabbit_c.go

package rabbit_cimport ("fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义接收者接口
type Receiver interface {Consumer([]byte) error
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ消费者
func (r *RabbitMQ) StartR(receiver Receiver) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//发送消息到一个具名交换机err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue,           // durablefalse,          // auto-deletedfalse,          // internalfalse,          // no-waitnil,            // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//临时队列 让服务器选择一个随机的队列名,当与消费者断开连接时,这个队列被立即删除q, err := r.Channel.QueueDeclare("",    // namefalse, // durable:持久false, // delete when usused:至少有一个使用方的队列在最后一个使用方退订时被删除true,  // exclusive:仅由一个连接使用,并且该连接关闭时队列将被删除false, // no-waitnil,   // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}r.QueueName = q.Nameerr = r.Channel.QueueBind(r.QueueName,    // queue name"",             // routing keyr.ExchangeName, // exchangefalse,nil,)// 获取消费通道,确保rabbitMQ一个一个发送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"",          // consumerfalse,       // auto-ackfalse,       // exclusivefalse,       // no-localfalse,       // no-waitnil,         // args)if err != nil {fmt.Printf("获取消费通道异常:%s \n", err)return err}for msg := range msgList {// 处理数据err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)return err} else {// 确认消息,必须为falseerr = msg.Ack(false)if err != nil {fmt.Printf("确认消息完成异常:%s \n", err)}return nil}}return nil
}

test_p.go

package mainimport ("fmt""golong/rabbit_p"
)// 实现发送者
type TestP struct {msgContent []byte
}func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent
}func main() {//生产者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}err := mqp.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}

test_c.go

package mainimport ("fmt""golong/rabbit_c"
)// 实现接收者
type TestC struct {msgContent string
}func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}func main() {//消费者r := &TestC{}mqr := &rabbit_c.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}_ = mqr.StartR(r)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/439947.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apollo进阶课程 ⑦ | 高精地图的采集与生产

目录 1.高精地图采集过程中需要用到的传感器 1.1&#xff09;GPS 1.2&#xff09;IMU 1.3&#xff09;轮速计 2.高精地图采集过程中的制图方案 2.1&#xff09;方案一 激光雷达 2.2&#xff09;Camera融合激光雷达 原文链接&#xff1a;Apollo进阶课程 ⑦ | 高精地图的采…

你看不懂的spring原理是因为不知道这几个概念

背景 问题从一杯咖啡开始。 今天我去楼下咖啡机买了一杯「粉黛拿铁」。制作过程中显示&#xff1a; 我取了做好的粉黛拿铁&#xff0c;喝了一口&#xff0c;果然就是一杯热巧克力。咦咦咦&#xff0c;说好的拿铁呢&#xff1f;虽然我对「零点吧」的咖啡评价很高&#xff0c;觉…

EasyOcr 安装(linux、docker)、使用(gin、python)

EasyOcr git地址 EasyOCR是一款用python语言编写的OCR第三方库&#xff0c;同时支持GPU和CPU&#xff0c;目前已经支持超过70种语言. 安装(CPU) 注意&#xff1a; 本文是在仅在cpu下使用。如要使用CUDA版本&#xff0c;请在pytorch网站上选择正确的&#xff0c;并关闭此文章。…

Python之Numpy入门实战教程(2):进阶篇之线性代数

Numpy、Pandas、Matplotlib是Python的三个重要科学计算库&#xff0c;今天整理了Numpy的入门实战教程。NumPy是使用Python进行科学计算的基础库。 NumPy以强大的N维数组对象为中心&#xff0c;它还包含有用的线性代数&#xff0c;傅里叶变换和随机数函数。 本文主要介绍Numpy库…

【牛客 - 369F】小D的剑阵(最小割建图,二元关系建图,网络流最小割)

题干&#xff1a; 链接&#xff1a;https://ac.nowcoder.com/acm/contest/369/F 来源&#xff1a;牛客网 题目描述 现在你有 n 把灵剑&#xff0c;其中选择第i把灵剑会得到的 wiw_iwi​ 攻击力。 于此同时&#xff0c;还有q个约束&#xff0c;每个约束形如&#xff1a; …

【HDU - 3870】Catch the Theves(平面图转对偶图最短路,网络流最小割)

题干&#xff1a; A group of thieves is approaching a museum in the country of zjsxzy,now they are in city A,and the museum is in city B,where keeps many broken legs of zjsxzy.Luckily,GW learned the conspiracy when he is watching stars and told it to zjsxz…

Apollo进阶课程 ⑧ | 高精地图的格式规范

目录 高精地图规范格式分类 NDS格式规范 Open DRIVE格式规范 原文链接&#xff1a;Apollo进阶课程 ⑧ | 高精地图的格式规范 上周阿波君为大家详细介绍了「Apollo进阶课程⑦高精地图的采集与生产」。 高精地图采集过程中需要用到的传感器有GPS、IMU和轮速计。 无论是哪种传感…

Apollo进阶课程 ⑨ | 业界的高精地图产品

目录 高精地图的格式规范-OpenDRIVE HERE HD LIve Map HERE HD LIVE MAP-MAP COLLECTION HERE HD Live Map-Crowdsourced Update HERE HD Live Map-Learning HERE HD Live Map-Product MobileEye MobileEye-Pillars of Autonomous Driving MobileEye-Map as back-up s…

Apollo进阶课程⑩ | Apollo地图采集方案

目录 TomTom的高精地图和RoadDNA APOLLO地图采集流程 基站搭建 Apollo地图采集硬件方案 地图数据服务平台 原文链接&#xff1a;进阶课程⑩ | Apollo地图采集方案 上周阿波君为大家详细介绍了「Apollo进阶课程⑨业界的高精地图产品」。 出现在课程中的业界制作高精地图的厂…

用Python写Shell

环境 ubuntu: 18.04python: 3.6.9xnosh: 0.11.0 下载 pip3 install xonsh 简单使用 # 开启xonsh xonsh # 下载小工具&#xff08;也可不下&#xff09;:高亮提示、智能补全 xpip install -U xonsh[full]# 随便下载一个包 pip3 install moneyimport money m1 money.Money(…

Apollo进阶课程⑪ | Apollo地图生产技术

目录 高精地图生产流程 数据采集 数据处理 元素识别 人工验证 全自动数据融合加工 基于深度学习的地图要素识别 人工验证生产 地图成果 原文链接&#xff1a;进阶课程⑪ | Apollo地图生产技术 高精地图是自动驾驶汽车的「千里眼」和「透视镜」。 摄像头、激光雷达、传…

Jenkins初识

Jenkins是啥 官方文档 Jenkins是一款开源 CI&CD 软件&#xff0c;用于自动化各种任务&#xff0c;包括构建、测试和部署软件。 Jenkins 支持各种运行方式&#xff0c;可通过系统包、Docker 或者通过一个独立的 Java 程序。CI(Continuous integration&#xff0c;持续集成…

Apollo进阶课程 ⑫ | Apollo高精地图

目录 Apollo高精地图表征元素 Apollo车道模型 UTM坐标系 84坐标系 Track坐标系 Apollo opDRIVE规范 HDMAP引擎 高精地图在政策方面的挑战 原文链接&#xff1a;进阶课程 ⑫ | Apollo高精地图 高精地图与普通地图不同&#xff0c;高精地图主要服务于自动驾驶车辆&#…

一步步编写操作系统 6 启动bochs

运行bochs 终于安装完成了&#xff0c;虽然这过程中有可能会出现各种各样的问题&#xff0c;但还是值得庆祝的&#xff0c;对Linux不熟的朋友第一次就搞定了这么个硬货&#xff0c;我理解您此时的喜大普奔之情&#xff0c;哈哈&#xff0c;给大家点赞。顺便说一句&#xff0c;…

Apollo技能图谱2.0焕新发布 更新7大能力91个知识点

阿波君 Apollo开发者社区 2月26日 过去的一年里&#xff0c;Apollo发展迅速&#xff0c;向智能交通不断渗透。从2.5到3.5版本&#xff0c;无论控制系统的升级、高清地图的泛用和车路协同技术服务的推进&#xff0c;无不在推动自动驾驶技术从开源向开辟商业化新格局位移。 在开…

一步步编写操作系统 07 开机启动bios

bios是如何苏醒的 bios其实一直睡在某个地方&#xff0c;直到被唤醒……前面热火朝天的说了bios的功能和内存布局&#xff0c;似乎还没说到正题上&#xff0c;bios是如何启动的呢。因为bios是计算机上第一个运行的软件&#xff0c;所以它不可能自己加载自己&#xff0c;由此可…

0.《沉浸式线性代数》:前言

今天介绍一本新书《immersive linear algebra》&#xff1a;世界上第一本具有完全交互式图形的线性代数书。本书目前已经更新完毕。 作者是&#xff1a;JacobStrm&#xff0c;Kallestrm和Tomas Akenine-Mller&#xff0c;全文共包含11个部分&#xff1a;前言和10个正文章节。内…

Apollo进阶课程 ⑬ | Apollo无人车自定位技术入门

目录 1.什么是无人车自定位系统 2.为什么无人车需要精确的定位系统 2.1 激光定位 2.2 视觉定位 2.3 惯性导航 2.4 多传感器融合定位 原文链接&#xff1a;进阶课程 ⑬ | Apollo无人车自定位技术入门 上周阿波君为大家详细介绍了「Apollo进阶课程⑫丨Apollo地图生产技术」…

一步步编写操作系统 08 bios跳转到神奇的内存地址0x7c00

为什么是0x7c00 计算机执行到这份上&#xff0c;bios也即将完成自己的历史使命了&#xff0c;完成之后&#xff0c;它又将睡去。想到这里&#xff0c;心中不免一丝忧伤&#xff0c;甚至有些许挽留它的想法。可是&#xff0c;这就是它的命&#xff0c;它生来被设计成这样&…

Apollo进阶课程⑭ | Apollo自动定位技术——三维几何变换和坐标系介绍

目录 1.三维几何变换---旋转 2.三维几何变换----平移 2.1刚体的位置和朝向 3. 坐标系 3.1 ECI地心惯性坐标系 3.2 ECFF地心地固坐标系 3.3当地水平坐标系 3.4 UTM坐标系 3.5 车体坐标系 3.6IMU坐标系 3.7 相机坐标系 3.8 激光雷达坐标系 3.9 无人车定位信息中涉及…