官方介绍 - 中文 本文环境:ubuntu:20.04
RabbitMQ安装、配置与基本使用
安装RabbitMQ
# 简易脚本安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
sudo apt-get install rabbitmq-server -y --fix-missing
# 启动、关闭、重启、查看 rabbitmq 服务
sudo service rabbitmq-server start、stop、restart、status
配置RabbitMQ
# 注意,启动rabbitmq之后要启动管理服务插件,否则15672管理页面无法登录
sudo rabbitmq-plugins enable rabbitmq_management
# 因为guest用户默认只能在localhost登录,所以我们需要创建一个新的用户:
[fxm@fxm:~$ ] sudo rabbitmqctl add_user fxm(username) fxm(password)
Adding user "fxm" ...
[fxm@fxm:~$ ] sudo rabbitmqctl set_permissions -p / fxm(username) ".*" ".*" ".*"
Setting permissions for user "fxm" in vhost "/" ...
[fxm@fxm:~$ ] sudo rabbitmqctl set_user_tags fxm(username) administrator
Setting tags for user "fxm" to [administrator] ...
使用RabbitMQ
- 通过web访问: ip:15672
六种工作模式
“Hello World!”(普通模式)
只有一个生产者,一个消费者,一个队列
工作队列
只有一个生产者,多个消费者,一个队列
发布/订阅
通过交换机将消息发送到多个队列,多个消费者订阅相应队列
路由
待续。。。
主题交换机
待续。。。
远程过程调用
待续。。。
Go实现(Go 1.15)
本文实现了两种模式-普通模式、发布/订阅
普通模式:1. 连接失败重新连接2. 发送成功确认3. 消费成功确认
发布/订阅:1. 连接失败重新连接2. 消费成功确认
普通模式(需要先创建Queue)
创建Queue
rabbit_p.go
package rabbit_pimport ("errors""fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义生产者接口
type Producer interface {MsgContent() []byte
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 队列名称RoutingKey string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ生产者
func (r *RabbitMQ) StartP(producer Producer) error {// 开启监听生产者发送任务if err := r.listenProducer(producer); err != nil {return err}return nil
}// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常,否则重新链接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}err := r.Channel.Confirm(false)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchanger.QueueName, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: producer.MsgContent(),})if err != nil {fmt.Printf("MQ任务发送失败:%s \n", err)return errors.New("MQ任务发送失败")}err = confirmOne(confirms)return err
}//检测是否发送成功
func confirmOne(confirms <-chan amqp.Confirmation) error {if confirmed := <-confirms; confirmed.Ack {return nil} else {return errors.New("任务发送失败")}
}
rabbit_c.go
package rabbit_cimport ("fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义接收者接口
type Receiver interface {Consumer([]byte) error
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 队列名称RoutingKey string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ消费者
func (r *RabbitMQ) StartR(receiver Receiver) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}// 获取消费通道,确保rabbitMQ一个一个发送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Printf("获取消费通道异常:%s \n", err)return err}for msg := range msgList {// 处理数据err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)return err} else {// 确认消息,必须为falseerr = msg.Ack(false)if err != nil {fmt.Printf("确认消息完成异常:%s \n", err)}return nil}}return nil
}
test_p.go
package mainimport ("fmt""golong/rabbit_p"
)// 实现发送者
type TestP struct {msgContent []byte
}func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent
}func main() {//生产者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{QueueName: "fxm",}err := mqp.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}
test_c.go
package mainimport ("fmt""golong/rabbit_c"
)// 实现接收者
type TestC struct {msgContent string
}func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}func main() {//消费者r := &TestC{}mqr := &rabbit_c.RabbitMQ{QueueName: "fxm",}_ = mqr.StartR(r)
}
发布/订阅
rabbit_p.go
package rabbit_pimport ("errors""fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义生产者接口
type Producer interface {MsgContent() []byte
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 队列名称RoutingKey string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ生产者
func (r *RabbitMQ) StartP(producer Producer) error {// 开启监听生产者发送任务if err := r.listenProducer(producer); err != nil {return err}return nil
}// 发送任务
func (r *RabbitMQ) listenProducer(producer Producer) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常,否则重新链接if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//发送消息到一个具名交换机err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//confirms := r.Channel.NotifyPublish(make(chan amqp.Confirmation, 1))err = r.Channel.Publish(r.ExchangeName, // exchange"", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: producer.MsgContent(),})if err != nil {fmt.Printf("MQ任务发送失败:%s \n", err)return errors.New("MQ任务发送失败")}return err
}
rabbit_c.go
package rabbit_cimport ("fmt""github.com/streadway/amqp"
)const MqUrl = "amqp://fxm(username):fxm(password)@IP:5672/"// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel// 定义接收者接口
type Receiver interface {Consumer([]byte) error
}// 定义RabbitMQ对象
type RabbitMQ struct {Connection *amqp.ConnectionChannel *amqp.ChannelQueueName string // 队列名称RoutingKey string // key名称ExchangeName string // 交换机名称ExchangeType string // 交换机类型
}// 链接rabbitMQ
func (r *RabbitMQ) mqConnect() error {var err errormqConn, err = amqp.Dial(MqUrl)r.Connection = mqConn // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开链接失败:%s \n", err)return err}mqChan, err = mqConn.Channel()r.Channel = mqChan // 赋值给RabbitMQ对象if err != nil {fmt.Printf("MQ打开管道失败:%s \n", err)return err}return nil
}// 关闭RabbitMQ连接
func (r *RabbitMQ) mqClose() {// 先关闭管道,再关闭链接err := r.Channel.Close()if err != nil {fmt.Printf("MQ管道关闭失败:%s \n", err)}err = r.Connection.Close()if err != nil {fmt.Printf("MQ链接关闭失败:%s \n", err)}
}// 启动RabbitMQ消费者
func (r *RabbitMQ) StartR(receiver Receiver) error {// 处理结束关闭链接defer r.mqClose()// 验证链接是否正常if r.Channel == nil {if err := r.mqConnect(); err != nil {return err}}//发送消息到一个具名交换机err := r.Channel.ExchangeDeclare(r.ExchangeName, // namer.ExchangeType, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}//临时队列 让服务器选择一个随机的队列名,当与消费者断开连接时,这个队列被立即删除q, err := r.Channel.QueueDeclare("", // namefalse, // durable:持久false, // delete when usused:至少有一个使用方的队列在最后一个使用方退订时被删除true, // exclusive:仅由一个连接使用,并且该连接关闭时队列将被删除false, // no-waitnil, // arguments)if err != nil {fmt.Printf("error: %s \n", err.Error())return err}r.QueueName = q.Nameerr = r.Channel.QueueBind(r.QueueName, // queue name"", // routing keyr.ExchangeName, // exchangefalse,nil,)// 获取消费通道,确保rabbitMQ一个一个发送消息_ = r.Channel.Qos(1, 0, true)msgList, err := r.Channel.Consume(r.QueueName, // queue"", // consumerfalse, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Printf("获取消费通道异常:%s \n", err)return err}for msg := range msgList {// 处理数据err = receiver.Consumer(msg.Body)if err != nil {fmt.Printf("确认消息未完成异常:%s \n", err)return err} else {// 确认消息,必须为falseerr = msg.Ack(false)if err != nil {fmt.Printf("确认消息完成异常:%s \n", err)}return nil}}return nil
}
test_p.go
package mainimport ("fmt""golong/rabbit_p"
)// 实现发送者
type TestP struct {msgContent []byte
}func (t *TestP) MsgContent() []byte {fmt.Println(string(t.msgContent))return t.msgContent
}func main() {//生产者a := "fxm"p := &TestP{[]byte(a),}mqp := &rabbit_p.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}err := mqp.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}
test_c.go
package mainimport ("fmt""golong/rabbit_c"
)// 实现接收者
type TestC struct {msgContent string
}func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}func main() {//消费者r := &TestC{}mqr := &rabbit_c.RabbitMQ{ExchangeName: "log",ExchangeType: "fanout",}_ = mqr.StartR(r)
}