golang rabbitmq客户端连接及重连

目录

  • 1、连接、发送、发送异常、重连
  • 2、调用示例

1、连接、发送、发送异常、重连

package rabbitmqimport ("encoding/json""fmt""time""github.com/sirupsen/logrus""github.com/streadway/amqp"
)type RabbitMQ struct {conn            *amqp.Connectionchannel         *amqp.Channelconfigs         RabbitMqConfigconnErrorChan   chan *amqp.ErrorreturnErrorChan chan amqp.ReturnactivateChan    chan interface{}
}func NewRabbitMQ() *RabbitMQ {return &RabbitMQ{}
}// Init 初始化队列服务
func (r *RabbitMQ) Init(cfg RabbitMqConfig) error {// 建立连接conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s%s",cfg.User,cfg.PassWord,cfg.Addr,cfg.VHost))if err != nil {return err}r.conn = connr.configs = cfg// 创建管道if err := r.initChannel(); err != nil {return err}// 交换if err := r.exchangeDeclare(); err != nil {return err}// 队列if err := r.queueDeclare(); err != nil {return err}// 队列绑定交换if err := r.queueBind(); err != nil {return err}// 保持连接活动状态r.connErrorChan = make(chan *amqp.Error, 1)r.conn.NotifyClose(r.connErrorChan)go r.reopen()// 消息没有成功路由到队列监控r.returnErrorChan = make(chan amqp.Return, 1)r.channel.NotifyReturn(r.returnErrorChan)go r.messagePushError()r.activateChan = make(chan interface{}, 1)return nil
}// reopen 重试
func (r *RabbitMQ) reopen() {for {select {case err := <-r.connErrorChan:logrus.WithError(err).Error("RabbitMq server exception retry")if r.conn != nil {r.conn = nil        // 清除连接r.activateChan <- 1 // 通知监控协程结束}// r.isActivate = falsetime.Sleep(time.Second * time.Duration(r.configs.Interval))if err := r.Init(r.configs); err != nil {logrus.WithError(err).Error("reopen queue rabbitmq")continue}logrus.Info("reopen rabbitmq success ")return}}
}// messagePushError 消息发送到队列时异常监控
func (r *RabbitMQ) messagePushError() {for {select {case v, ok := <-r.returnErrorChan:if !ok {continue}logrus.WithFields(map[string]interface{}{"code":    v.ReplyCode,"message": v.ReplyText,"content": string(v.Body),}).Error("send to rabbitmq failed")case <-r.activateChan:logrus.Info("The current connection has been interrupted")return}}
}// initChannel 初始化管道
func (r *RabbitMQ) initChannel() error {channel, err := r.conn.Channel()if err != nil {return err}if err := channel.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global); err != nil {return err}r.channel = channelreturn nil
}// exchangeDeclare 创建交换器
func (r *RabbitMQ) exchangeDeclare() error {exchange := r.configs.RabbitmqExchangereturn r.channel.ExchangeDeclare(exchange.Name,exchange.Kind,r.configs.Durable,r.configs.AutoDelete,r.configs.Internal,r.configs.NoWait, nil)
}// queueDeclare 创建队列
func (r *RabbitMQ) queueDeclare() error {_, err := r.channel.QueueDeclare(r.configs.RabbitmqQueue.Name,r.configs.Durable,r.configs.AutoDelete,r.configs.Internal,r.configs.NoWait, nil)return err
}// queueBind 队列与交换器的绑定
func (r *RabbitMQ) queueBind() error {return r.channel.QueueBind(r.configs.RabbitmqQueue.Name,r.configs.RabbitmqQueue.Name,r.configs.RabbitmqExchange.Name,r.configs.NoWait, nil)
}// Send 消息发送
func (r *RabbitMQ) Send(message interface{}) error {messageByte, err := json.Marshal(message)if err != nil {return err}err = r.channel.Publish("",                           // 交换机r.configs.RabbitmqQueue.Name, // 路由队列的Keytrue,                         // 发送到队列失败是否进行通知false,                        // 目标队列没有消费者时是否进行通知,官方不建议开启amqp.Publishing{Headers:         amqp.Table{},ContentType:     "text/plain",ContentEncoding: "",DeliveryMode:    amqp.Persistent, // 消息持久化Body:            messageByte,},)if err != nil {return err}return nil
}// Close 关闭服务
func (r *RabbitMQ) Close() error {if err := r.conn.Close(); err != nil {return err}return nil
}// 队列配置项结构
type RabbitMqConfig struct {Addr             string           `mapstructure:"addr"`        // mq地址VHost            string           `mapstructure:"vhost"`       // mq的vhostUser             string           `mapstructure:"user"`        // mq用户名PassWord         string           `mapstructure:"password"`    // mq密码Durable          bool             `mapstructure:"durable"`     //持久化标识, true: 持久化, false: 否AutoDelete       bool             `mapstructure:"auto_delete"` //是否自动删除, true: 是, false: 否Internal         bool             `mapstructure:"internal"`    //是否是内部的NoWait           bool             `mapstructure:"nowait"`      //是否等待服务器确认, true: 不等待, false: 等待Interval         int              `mapstructure:"interval"`    // 重连时间间隔RabbitmqExchange RabbitmqExchange `mapstructure:"exchange"`RabbitmqQueue    RabbitmqQueue    `mapstructure:"queue"`
}type RabbitmqExchange struct {Name string `mapstructure:"name"` // 交换机名称Kind string `mapstructure:"kind"` // 交换机类型, direct: 默认值, 使用路由, fanout: 不使用路由,topic: 订阅,
}
type RabbitmqQueue struct {Name string `mapstructure:"name"` //队列名称
}

 

2、调用示例

package mainimport ("standard/rabbitmq/rmq"    //自行替换为上面的包的位置"github.com/sirupsen/logrus"
)func main() {cfg := rabbitmq.RabbitMqConfig{Addr:       "127.0.0.1:5672",VHost:      "/",User:       "guest",PassWord:   "guest",Durable:    true,AutoDelete: false,Internal:   false,NoWait:     false,RabbitmqExchange: rabbitmq.RabbitmqExchange{Name: "exchange.test",Kind: "direct",},RabbitmqQueue: rabbitmq.RabbitmqQueue{Name: "queue.test",},Interval: 2,}err := rabbitmq.NewRabbitMQ().Init(cfg)if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("init rabbitmq success")/**err = rabbitmq.NewRabbitMQ().Send(nil)  // 发送数据至队列if err != nil {logrus.WithError(err).Error("init rabbit")return}logrus.Info("send to rabbitmq success")**/select {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/1464.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spark在大数据集群下的部署

Spark部署文档 前提:需要保证配置好了三台装好hadoop的虚拟机hadoop102&#xff0c;hadoop103&#xff0c;hadoop104 下载地址 https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz 条件 PYTHON 推荐3.8JDK 1.8 解压 解压下载的Spark安装包 tar…

春秋云境:CVE-2022-25578[漏洞利用]

通过题目标题查询漏洞信息 所以我们渗透的重点就要放在.htaccess文件上 这是一种分布式配置文件&#xff0c;所以我们先寻找web管理登录页面 打开主页就能看到右下角的“管理”&#xff0c;或者我们使用dirsearch进行扫描也可以 在登录页面尝试弱口令登录 输入该CMS相关的一…

几道练习题八

第 1 题 【 问答题 】 • 多项式相加 我们经常遇到两多项式相加的情况&#xff0c; 在这里&#xff0c; 我们就需要用程序来模拟实现把两个多项式相加到一起。 首先&#xff0c; 我们会有两个多项式&#xff0c;每个多项式是独立的一行&#xff0c; 每个多项式由系数、 幂数这样…

【游戏专区】贪吃蛇

1&#xff0c;游戏背景 贪吃蛇&#xff08;Snake&#xff09;是一款经典的电子游戏&#xff0c;最初在1976年由 Gremlin 公司开发。它的游戏背景相对简单&#xff0c;但具有高度的成瘾性。 1. **游戏场景**&#xff1a;通常在一个有界的矩形区域内进行&#xff0c;可以是一个…

关于Android绘制这一遍就够了

Android绘制基础 Android平台提供了一套完整的UI框架&#xff0c;其中包括了绘制组件和绘制API。在Android中&#xff0c;绘制主要涉及到两个核心概念&#xff1a;Canvas和Paint。 Canvas Canvas是Android中的一个类&#xff0c;它代表了绘图的画布。你可以在这个画布上进行…

Swift函数与闭包

一.Swift函数的定义与调用&#xff1a; Swift中的函数使用func关键字进行定义&#xff0c;语法如下&#xff1a; func 函数名(参数名1: 参数类型1, 参数名2: 参数类型2, ...) -> 返回类型 {// 函数体// 执行逻辑return 返回值 }其中&#xff0c;参数名和参数类型是可选的&…

Android Studio实现页面跳转

建立文件 temp.xml <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"…

layui--table里使用switch

1. 项目需求 在layui.table上面渲染后的列表上面加一个switch开关&#xff0c;监听switch开关的动作&#xff0c;实现本列数据的状态切换&#xff01; 实现效果如下&#xff1a; 2. 实现方式 下面介绍的思路都是利用table的templet模板实现的&#xff0c;不同的在于模板代码…

Linux-内存文件

1. 基础IO操作 1.1 c语言的IO接口 fopen&#xff1a;打开一个文件&#xff0c;按照指定方式 参数&#xff1a;filename 文件名&#xff0c;也可以是路径&#xff0c;mode&#xff1a;打开方式 返回打开的文件指针 fread&#xff1a;从指定流中读数据 参数&#xff1a;从FIL…

Vuex 的原理

Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式。每一个 Vuex 应用的核心就是 store&#xff08;仓库&#xff09;。“store” 基本上就是一个容器&#xff0c;它包含着你的应用中大部分的状态 ( state )。 Vuex 的状态存储是响应式的。当 Vue 组件从 store 中读取状态的…

没有理由不加倍努力

最近su7很火&#xff0c;各隐藏大佬都纷纷从后台来到前台&#xff0c;把整个网红界的网红等级提升了好几个档次。红衣大叔更是借此机会在疯狂地打造自己的网红IP。 千亿大佬都这还般努力&#xff0c;作为平民的自己哪还有不努力的理由。 加倍努力&#xff01;

29 共享内存

共享内存区是最快的IPC形式&#xff0c;一旦这样的内存映射到共享它的进程的地址空间&#xff0c;这些进程间数据传递不再涉及到内核&#xff0c;不再执行进入内核的系统调用来传递彼此的数据 原理 系统在内存中申请一段空间&#xff0c;通过页表映射挂接到进程的共享区&#…

Linux--链表 第二十五天

1. 链表 t1.next -> data t1.next->next->data .(点号)的优先级比->的大 所以 t1.next->data 就可以了 不用(t1.next)->data 2. 链表的静态增加和动态遍历 打印链表算法&#xff0c; void printLink(struct Test *head) { struct Te…

【前端面试常问】Promise与Async/Await

Promise与Async/Await &#x1f680; JavaScript中的两个重要概念——Promise和Async/Await&#xff0c;它们是我们处理异步编程时不可或缺的工具&#xff0c;让我们能够更优雅地驾驭回调地狱 &#x1f480;。 &#x1f31f; Promise 一个Promise对象代表了一个现在、将来或永…

Python机器学习项目开发实战:怎么处理图像内容分析

注意&#xff1a;本文的下载教程&#xff0c;与以下文章的思路有相同点&#xff0c;也有不同点&#xff0c;最终目标只是让读者从多维度去熟练掌握本知识点。 下载教程&#xff1a;Python机器学习项目开发实战_图像内容分析_编程案例解析实例详解课程教程.pdf Python在机器学习…

剑指Offer题目笔记32(拓扑排序)

面试题113&#xff1a; 解决方案&#xff1a; 将课程看成图中的节点&#xff0c;如果两门课程存在先修顺序那么它们在图中对应的节点之间存在一条从先修课程到后修课程的边&#xff0c;因此这是一个有向图。可行的修课序列实际上是图的拓扑排序序列。图中的每条边都是从先修课…

Web前端框架/库/工具

前言 前端从步枪&#xff08;原生js&#xff09;到了半自动武器&#xff08;jQuery&#xff09;并进化为全自动武器&#xff08;三大框架&#xff08;angular&#xff0c;react&#xff0c;vue及其生态链&#xff09;&#xff09;。 常说工欲善其事必先利其器。对于那些想要提…

【c++11】看完立马就懂--右值引用!!!

右值引用 一、什么是右值&#xff1f;什么是左值&#xff1f;二、右值引用三、右值引用的好处四、万能引用五、完美转发 一、什么是右值&#xff1f;什么是左值&#xff1f; 首先&#xff0c;当我们看到右值的时候&#xff0c;我们很自然的就会产生疑问&#xff1f; 什么的右边…

黑马鸿蒙学习5:LIST容器

LIST容器&#xff0c;其实就是如果FOREACH容器展示不全的话&#xff0c;会自动有滚动条了。要注意的是&#xff0c;LIST中必须有固定的listitem这个项&#xff0c;而且列表里面只能包含一个根组件。 必须把ROW容器放到listitem中&#xff0c;如下&#xff1a;

51、图论-岛屿数量

思路&#xff1a; 该问题要求在一个由 1&#xff08;表示陆地&#xff09;和 0&#xff08;表示水&#xff09;组成的二维网格中&#xff0c;计算岛屿的数量。岛屿被水包围&#xff0c;并且通过水平或垂直连接相邻的陆地可以形成。这个问题的核心是识别并计数网格中相连的陆地…