RabbitMQ初识

官方介绍 - 中文 本文环境:ubuntu:20.04

RabbitMQ安装、配置与基本使用

安装RabbitMQ

# 简易脚本安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
sudo apt-get install rabbitmq-server -y --fix-missing
# 启动、关闭、重启、查看 rabbitmq 服务
sudo service rabbitmq-server start、stop、restart、status

配置RabbitMQ

# 注意,启动rabbitmq之后要启动管理服务插件,否则15672管理页面无法登录
sudo rabbitmq-plugins enable rabbitmq_management
# 因为guest用户默认只能在localhost登录,所以我们需要创建一个新的用户:
[fxm@fxm:~$ ] sudo rabbitmqctl add_user fxm(username) fxm(password) 
Adding user "fxm" ...
[fxm@fxm:~$ ] sudo rabbitmqctl set_permissions -p / fxm(username) ".*" ".*" ".*"
Setting permissions for user "fxm" in vhost "/" ...
[fxm@fxm:~$ ] sudo rabbitmqctl set_user_tags fxm(username) administrator
Setting tags for user "fxm" to [administrator] ...

使用RabbitMQ

  • 通过web访问: ip:15672
    在这里插入图片描述

六种工作模式

“Hello World!”(普通模式)

只有一个生产者,一个消费者,一个队列

工作队列

只有一个生产者,多个消费者,一个队列

发布/订阅

通过交换机将消息发送到多个队列,多个消费者订阅相应队列

路由

待续。。。

主题交换机

待续。。。

远程过程调用

待续。。。

Go实现(Go 1.15)

本文实现了两种模式-普通模式、发布/订阅
普通模式:1. 连接失败重新连接2. 发送成功确认3. 消费成功确认
发布/订阅:1. 连接失败重新连接2. 消费成功确认

普通模式(需要先创建Queue)

创建Queue
在这里插入图片描述

rabbit_p.go

package rabbit_pimport ("errors""fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义生产者接口
type Producer interface {MsgContent() []byte
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ生产者
func (r *RabbitMQ) StartP(producer Producer) error {// 开启监听生产者发送任务if err := r.listenProducer(producer); err != nil {return err}return nil
}// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常,否则重新链接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}err := r.Channel.Confirm(false)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchanger.QueueName,    // routing keyfalse,          // mandatoryfalse,          // immediateamqp.Publishing{ContentType:  "text/plain",Body:         producer.MsgContent(),})if err != nil {fmt.Printf("MQ任务发送失败:%s \n", err)return errors.New("MQ任务发送失败")}err = confirmOne(confirms)return err
}//检测是否发送成功
func confirmOne(confirms <-chan amqp.Confirmation) error {if confirmed := <-confirms; confirmed.Ack {return nil} else {return errors.New("任务发送失败")}
}

rabbit_c.go

package rabbit_cimport ("fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义接收者接口
type Receiver interface {Consumer([]byte) error
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ消费者
func (r *RabbitMQ) StartR(receiver Receiver) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}// 获取消费通道,确保rabbitMQ一个一个发送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"",          // consumerfalse,       // auto-ackfalse,       // exclusivefalse,       // no-localfalse,       // no-waitnil,         // args)if err != nil {fmt.Printf("获取消费通道异常:%s \n", err)return err}for msg := range msgList {// 处理数据err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)return err} else {// 确认消息,必须为falseerr = msg.Ack(false)if err != nil {fmt.Printf("确认消息完成异常:%s \n", err)}return nil}}return nil
}

test_p.go

package mainimport ("fmt""golong/rabbit_p"
)// 实现发送者
type TestP struct {msgContent []byte
}func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent
}func main() {//生产者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{QueueName: "fxm",}err := mqp.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}

test_c.go

package mainimport ("fmt""golong/rabbit_c"
)// 实现接收者
type TestC struct {msgContent string
}func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}func main() {//消费者r := &TestC{}mqr := &rabbit_c.RabbitMQ{QueueName: "fxm",}_ = mqr.StartR(r)
}

发布/订阅

rabbit_p.go

package rabbit_pimport ("errors""fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义生产者接口
type Producer interface {MsgContent() []byte
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ生产者
func (r *RabbitMQ) StartP(producer Producer) error {// 开启监听生产者发送任务if err := r.listenProducer(producer); err != nil {return err}return nil
}// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常,否则重新链接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//发送消息到一个具名交换机err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue,           // durablefalse,          // auto-deletedfalse,          // internalfalse,          // no-waitnil,            // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchange"",             // routing keyfalse,          // mandatoryfalse,          // immediateamqp.Publishing{ContentType: "text/plain",Body:        producer.MsgContent(),})if err != nil {fmt.Printf("MQ任务发送失败:%s \n", err)return errors.New("MQ任务发送失败")}return err
}

rabbit_c.go

package rabbit_cimport ("fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义接收者接口
type Receiver interface {Consumer([]byte) error
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection   *amqp.ConnectionChannel      *amqp.ChannelQueueName    string // 队列名称RoutingKey   string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ消费者
func (r *RabbitMQ) StartR(receiver Receiver) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//发送消息到一个具名交换机err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue,           // durablefalse,          // auto-deletedfalse,          // internalfalse,          // no-waitnil,            // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//临时队列 让服务器选择一个随机的队列名,当与消费者断开连接时,这个队列被立即删除q, err := r.Channel.QueueDeclare("",    // namefalse, // durable:持久false, // delete when usused:至少有一个使用方的队列在最后一个使用方退订时被删除true,  // exclusive:仅由一个连接使用,并且该连接关闭时队列将被删除false, // no-waitnil,   // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}r.QueueName = q.Nameerr = r.Channel.QueueBind(r.QueueName,    // queue name"",             // routing keyr.ExchangeName, // exchangefalse,nil,)// 获取消费通道,确保rabbitMQ一个一个发送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"",          // consumerfalse,       // auto-ackfalse,       // exclusivefalse,       // no-localfalse,       // no-waitnil,         // args)if err != nil {fmt.Printf("获取消费通道异常:%s \n", err)return err}for msg := range msgList {// 处理数据err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)return err} else {// 确认消息,必须为falseerr = msg.Ack(false)if err != nil {fmt.Printf("确认消息完成异常:%s \n", err)}return nil}}return nil
}

test_p.go

package mainimport ("fmt""golong/rabbit_p"
)// 实现发送者
type TestP struct {msgContent []byte
}func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent
}func main() {//生产者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}err := mqp.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}

test_c.go

package mainimport ("fmt""golong/rabbit_c"
)// 实现接收者
type TestC struct {msgContent string
}func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}func main() {//消费者r := &TestC{}mqr := &rabbit_c.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}_ = mqr.StartR(r)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/439947.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apollo进阶课程 ⑦ | 高精地图的采集与生产

目录 1.高精地图采集过程中需要用到的传感器 1.1&#xff09;GPS 1.2&#xff09;IMU 1.3&#xff09;轮速计 2.高精地图采集过程中的制图方案 2.1&#xff09;方案一 激光雷达 2.2&#xff09;Camera融合激光雷达 原文链接&#xff1a;Apollo进阶课程 ⑦ | 高精地图的采…

【BZOJ 3831】【Poi2014】Little Bird(单调队列优化dp)

题干&#xff1a; Description In the Byteotian Line Forest there are trees in a row. On top of the first one, there is a little bird who would like to fly over to the top of the last tree. Being in fact very little, the bird might lack the strength to f…

你看不懂的spring原理是因为不知道这几个概念

背景 问题从一杯咖啡开始。 今天我去楼下咖啡机买了一杯「粉黛拿铁」。制作过程中显示&#xff1a; 我取了做好的粉黛拿铁&#xff0c;喝了一口&#xff0c;果然就是一杯热巧克力。咦咦咦&#xff0c;说好的拿铁呢&#xff1f;虽然我对「零点吧」的咖啡评价很高&#xff0c;觉…

EasyOcr 安装(linux、docker)、使用(gin、python)

EasyOcr git地址 EasyOCR是一款用python语言编写的OCR第三方库&#xff0c;同时支持GPU和CPU&#xff0c;目前已经支持超过70种语言. 安装(CPU) 注意&#xff1a; 本文是在仅在cpu下使用。如要使用CUDA版本&#xff0c;请在pytorch网站上选择正确的&#xff0c;并关闭此文章。…

Python之Numpy入门实战教程(2):进阶篇之线性代数

Numpy、Pandas、Matplotlib是Python的三个重要科学计算库&#xff0c;今天整理了Numpy的入门实战教程。NumPy是使用Python进行科学计算的基础库。 NumPy以强大的N维数组对象为中心&#xff0c;它还包含有用的线性代数&#xff0c;傅里叶变换和随机数函数。 本文主要介绍Numpy库…

【牛客 - 369F】小D的剑阵(最小割建图,二元关系建图,网络流最小割)

题干&#xff1a; 链接&#xff1a;https://ac.nowcoder.com/acm/contest/369/F 来源&#xff1a;牛客网 题目描述 现在你有 n 把灵剑&#xff0c;其中选择第i把灵剑会得到的 wiw_iwi​ 攻击力。 于此同时&#xff0c;还有q个约束&#xff0c;每个约束形如&#xff1a; …

一步步编写操作系统 1 部署工作环境 1

1.1工欲善其事&#xff0c;必先利其器。 如果您觉得操作系统已属于很底层的东西&#xff0c;我双手赞成。但是如果您像我之前一样&#xff0c;觉得底层的东西无法用上层高级的东西来构建&#xff0c;现在可以睁大眼睛好好看看下面要介绍的东西了。 首先&#xff0c;操作系统是…

多用户操作git“远程仓库“(本地)

设置本地远程仓库 准备远程仓库文件 cd ~/git-repo.git初始化 git init --shared修改git的接收配置 git config receive.denyCurrentBranch ignore初始化git仓库 git config user.email "fxmfxm.com" git config user.name "fxm" git add . git commit -m …

10点43博客文章汇总(2018年度)

今天是春节后上班第一天&#xff0c;将2018年度的文章进行汇总。总共分为三类&#xff1a;翻译、转载、原创。 1.翻译 翻译类目前完结的有Kaggle上的文章和斯坦福CS231n的文章。 Kaggle Learn的Python课程的中文翻译&#xff0c;链接为&#xff1a;Python&#xff1b;Kaggle …

【HDU - 3870】Catch the Theves(平面图转对偶图最短路,网络流最小割)

题干&#xff1a; A group of thieves is approaching a museum in the country of zjsxzy,now they are in city A,and the museum is in city B,where keeps many broken legs of zjsxzy.Luckily,GW learned the conspiracy when he is watching stars and told it to zjsxz…

一步步编写操作系统 2 部署工作环境 2

1.22汇编语言编译器新贵&#xff0c;NASM "新"是相对于旧来说的&#xff0c;老的汇编器MASM和TASM已经过时了&#xff0c;从名称上可以看出字母n是在m之后&#xff0c;其功能必然有所超越才会被大家接受。 请用一句话概括NASM优势在哪里&#xff1f;免费语法简洁使…

Apollo进阶课程 ⑧ | 高精地图的格式规范

目录 高精地图规范格式分类 NDS格式规范 Open DRIVE格式规范 原文链接&#xff1a;Apollo进阶课程 ⑧ | 高精地图的格式规范 上周阿波君为大家详细介绍了「Apollo进阶课程⑦高精地图的采集与生产」。 高精地图采集过程中需要用到的传感器有GPS、IMU和轮速计。 无论是哪种传感…

Casbin初识

Casbin中文文档 环境 go:1.15casbin:v2mysql:5.7 代码 package mycasbinimport ("fmt""github.com/casbin/casbin/v2""github.com/casbin/casbin/v2/model"gormAdapter "github.com/casbin/gorm-adapter/v3""gorm.io/driver/…

Apollo进阶课程 ⑨ | 业界的高精地图产品

目录 高精地图的格式规范-OpenDRIVE HERE HD LIve Map HERE HD LIVE MAP-MAP COLLECTION HERE HD Live Map-Crowdsourced Update HERE HD Live Map-Learning HERE HD Live Map-Product MobileEye MobileEye-Pillars of Autonomous Driving MobileEye-Map as back-up s…

【 HDU - 3062】Party(2-sat)

题干&#xff1a; 有n对夫妻被邀请参加一个聚会&#xff0c;因为场地的问题&#xff0c;每对夫妻中只有1人可以列席。在2n 个人中&#xff0c;某些人之间有着很大的矛盾&#xff08;当然夫妻之间是没有矛盾的&#xff09;&#xff0c;有矛盾的2个人是不会同时出现在聚会上的。…

微博API接入初识【cxn专用】

微博API官方文档 本文介绍 本文环境成为微博开发者通过鉴权获取单条微博内容 环境 WindowsPython 3.8.10sinaweibopy3-1.3 &#xff08;pip3 install sinaweibopy3&#xff09;requests 成为微博开发者 微博官方新手教程 &#xff08;cxn可以跳过&#xff0c;用博主的即可…

一步步编写操作系统3 部署工作环境 3

盗梦空间般的开发环境&#xff0c;虚拟机中再装个虚拟机。 很多同学电脑的系统都是windows&#xff0c;个别的是mac os,还有的同学用的是linux。做为一名Linux粉丝&#xff0c;我的开发环境必然建立在Linux平台下。那对于其它系统的用户&#xff0c;你们可以自己部署相应平台的…

Apollo进阶课程⑩ | Apollo地图采集方案

目录 TomTom的高精地图和RoadDNA APOLLO地图采集流程 基站搭建 Apollo地图采集硬件方案 地图数据服务平台 原文链接&#xff1a;进阶课程⑩ | Apollo地图采集方案 上周阿波君为大家详细介绍了「Apollo进阶课程⑨业界的高精地图产品」。 出现在课程中的业界制作高精地图的厂…

【HDU - 2665】Kth number(区间第K大,主席树,模板)

题干&#xff1a; Give you a sequence and ask you the kth big number of a inteval. Input The first line is the number of the test cases. For each test case, the first line contain two integer n and m (n, m < 100000), indicates the number of integers …

一步步编写操作系统4 安装x86虚拟机 bochs

Bochs下载安装 在完成了linux发行版的安装后&#xff0c;现在到了安装bochs的环节&#xff0c;这是我们的操作系统最终的宿主机。 由于我的工作是运维&#xff0c;所以练就了任何软件包都要从源码安装的“陋习”&#xff0c;从来不信任任何软件包。因为只有从源码安装的版本才…