监听DB配置变更之go-broadcast简单实现

文章目录

  • 1. 前言
  • 2. 分析
  • 3. 实现
  • 4. 问题
  • 5. 小结
  • 6. 参考

1. 前言

之前遇到一个需求,因为配置的查找是基于db的,而db的更改却无法实时通知到具体利用到这条数据的使用方,为了实现db数据变动时,能够尽快让使用方知道这条数据发生了变更,从而进行后续数据变更等相关逻辑的运行,就需要实现db数据变动时的通知。

在观察者模式中,因为观察者模式是一种一对多的关系模式,即多个观察者观察同一个主题对象,当主题对象发生变化时,会通知所有的观察者对象。

2. 分析

使用观察者模式来实现的话,则需要实现如下四个部分的结构:

  1. 抽象主题
  2. 具体主题
  3. 抽象观察者
  4. 具体观察者

举个例子,在我们日常使用微信公众号中,当你关注了一个公众号,这个公众号如果有更新的话,则会推送给每一个关注过这个公众号的用户。此时我们可以将具体的部分的接收映射到微信公众号中,即:

  1. 抽象主题:公众号,具备订阅、取消订阅和发送消息的功能
  2. 具体主题:具体某一个公众号
  3. 抽象观察者:用户(泛指使用微信公众号的用户受众)
  4. 具体观察者:某一个具体的用户

分析了以上四个结构之后,我们需要实现的功能部分就清楚了。即我们需要实现一个抽象主题,这个主题需要有提供注册、取消注册以及提交信息的能力,当提交信息到抽象主题的时候,抽象主题需要将这个消息通知到所有已经注册过的具体观察者。

3. 实现

在明确了需求之后, 就开始进行功能的实现,因为使用的是go语言,则第一时间肯定是希望通过chan这样的功能来实现,因为chan天生具备监听的能力,我们可以通过监听注册到抽象主题的chan,从而实现抽象主题消息的实时监听。

但秉持着“你需要的功能,基本都有人实现过”的方针,第一时间还是上到了github,看看是否有现成的开源方案,经过一番查找,还真发现了一个开源库可以使用,这个库的名称是go-broadcast。

下面就来说下broadcaster是如何实现上面的功能逻辑的,broadcaster这个库的代码很简单,主体实现逻辑只有110行代码左右,但符合我们的功能逻辑实现需要。

type broadcaster struct {input chan interface{}reg   chan chan<- interface{}unreg chan chan<- interface{}outputs map[chan<- interface{}]bool
}// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {// Register a new channel to receive broadcastsRegister(chan<- interface{})// Unregister a channel so that it no longer receives broadcasts.Unregister(chan<- interface{})// Shut this broadcaster down.Close() error// Submit a new object to all subscribersSubmit(interface{})// Try Submit a new object to all subscribers return false if input chan is fillTrySubmit(interface{}) bool
}

首先定义了一个接口叫做Broadcaster,然后定义了一个broadcaster实现了Broadcaster的所有方法逻辑。

func (b *broadcaster) Register(newch chan<- interface{}) {b.reg <- newch
}func (b *broadcaster) Unregister(newch chan<- interface{}) {b.unreg <- newch
}func (b *broadcaster) Close() error {close(b.reg)close(b.unreg)return nil
}// Submit an item to be broadcast to all listeners.
func (b *broadcaster) Submit(m interface{}) {if b != nil {b.input <- m}
}
  • Register方法主要实现了将注册的chan直接放入到reg这个chan中,用于后续注册
  • Register方法主要实现了将注册的chan直接让如到ureg这个chan中,用于后续注销
  • Close方法主要是关闭reg和ureg两个chan
  • Submit方法主要实现对抽象主题broadcaster发送消息,将消息放入input这个chan中

上面的方法都是基于chan作为通信的,而chan中有了数据,后续需要消费数据。

// NewBroadcaster creates a new broadcaster with the given input
// channel buffer length.
func NewBroadcaster(buflen int) Broadcaster {b := &broadcaster{input:   make(chan interface{}, buflen),reg:     make(chan chan<- interface{}),unreg:   make(chan chan<- interface{}),outputs: make(map[chan<- interface{}]bool),}go b.run()return b
}

这里的run()方法则是消费所有chan数据的地方。

func (b *broadcaster) broadcast(m interface{}) {for ch := range b.outputs { // 遍历所有注册的chan,将消息发送到注册的chan中ch <- m}
}func (b *broadcaster) run() {for {select {case m := <-b.input: // 如果有消息输入,则广播出去b.broadcast(m)case ch, ok := <-b.reg: // 如果有新注册的,则进行output的添加if ok {b.outputs[ch] = true} else {return}case ch := <-b.unreg: // 如果有注销的,则进行output的删除delete(b.outputs, ch)}}
}

整体的运行图如下:

在这里插入图片描述

  • 对应chan通过reg进行注册,注册后的chan记录在outputs中
  • 对应chan通过ureg进行注销,注销后的chan从output中移除
  • 对应的信息通过input输入,输入后的msg通过遍历outputs注册列表,从而通知到每一个注册者

4. 问题

在使用go-broadcast的过程中,看到之前有个pr加了一个TrySubmit的逻辑,这个逻辑主要是解决当input被装满了以后,broadcast会被阻塞,这个时候如果有新的消息进来,如何办呢?

// TrySubmit attempts to submit an item to be broadcast, returning
// true iff it the item was broadcast, else false.
func (b *broadcaster) TrySubmit(m interface{}) bool {if b == nil {return false}select {case b.input <- m:return truedefault:return false}
}

解决办法是采用select的方法尝试去塞入,塞入不成功则意味着消息提交失败,返回false,让使用者根据消息提交的结果进行后续的逻辑处理。

但这里还存在另外一个问题,库中给了一个样本case,这个样本case基于的条件都是消息传递给chan的时候没有阻塞。如下代码所示:

func (b *broadcaster) broadcast(m interface{}) {for ch := range b.outputs {ch <- m}
}

但一旦有注册的chan消费的时候阻塞了,这时候就会产生问题,会导致其它正常消费的chan因为一个异常chan而全部被阻塞住,导致其他chan都无法正常消费。

这个时候就会导致在input没有满的时候,即消息可以放入,但是消息无法被正常的消费,进而又反向导致input逐渐被塞满,最终导致input无法被塞入,消息也无法被发送到对应的chan中,导致run方法逻辑卡在broadcast中,导致整个运行出现问题。

解决办法:

func (b *broadcaster) broadcast(m interface{}) {for ch := range b.outputs {// if exist one output consume the chan message is too slow,// will block other output receive the msg.select {case ch <- m:default:}}
}

但这种虽然解决了一个chan满消费block其他chan的问题,随之也引入了丢消息的问题了,即有些消费慢的chan,由于chan消费慢导致无法接收新的消息,进而导致新消息丢失的问题。

5. 小结

因为需要实时监听db配置的变更,所以去探寻了一下方案,最终采用了go-broadcast的方案,但在使用go-broadcast的过程中,发现在broadcast消息的时候存在阻塞的行为,为了保证整个服务不被某个chan阻塞而停止运行,在broadcast消息的时候添加了select default条件来规避这个问题。

6. 参考

  • go-broadcast

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/25848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

3、线性代数

1、矩阵转置 A[i,j]A[j,i] 2、对称矩阵 &#xff1a;A转置A [0,2,3] [2 1 5] [3,5,1] 3、三维矩阵 求和 axis0 两个矩阵相加 axis1 两个向量相加 &#xff0c;axis2 向量内部相加 keepdimsTrue 求和后维度保持不变 4、cumsum累加求和 5、torch.mm() 或 torch.bmm() 【矩…

支持YUV和RGB格式两路视频同时播放

1.头文件&#xff1a; sdlqtrgb.h #pragma once #include <QtWidgets/QWidget> #include "ui_sdlqtrgb.h" #include <thread> class SdlQtRGB : public QWidget {Q_OBJECTpublic:SdlQtRGB(QWidget* parent Q_NULLPTR);~SdlQtRGB(){is_exit_ true;//等…

十大排序

本文将以「 通俗易懂」的方式来描述排序的基本实现。 &#x1f9d1;‍&#x1f4bb;阅读本文前&#xff0c;需要一点点编程基础和一点点数据结构知识 本文的所有代码以cpp实现 文章目录 排序的定义 插入排序 ⭐ &#x1f9d0;算法描述 &#x1f496;具体实现 &#x1f…

LabVIEW硬件与仿真双模式设计液压系统

在实际项目中&#xff0c;结合LabVIEW与液压运动控制器&#xff0c;通过设计两种运行方式&#xff1a;硬件运行和仿真运行&#xff0c;使得系统既能进行实际操作又能进行仿真测试。此设计不仅方便了开发调试&#xff0c;也为教学培训和展示提供了极大的便利。本文将从项目背景、…

【机器学习】基于卷积LSTM的视频预测

1. 引言 1.1 LSTM是什么 LSTM&#xff08;Long Short-Term Memory&#xff09;是一种特殊的循环神经网络&#xff08;RNN&#xff09;变体&#xff0c;旨在解决传统RNN在处理长序列数据时遇到的梯度消失和梯度爆炸问题。LSTM通过引入门控机制和细胞状态的概念&#xff0c;使得…

QPS,平均时延和并发数

我们当前有两个服务A和B&#xff0c;想要知道哪个服务的性能更好&#xff0c;该用什么指标来衡量呢&#xff1f; 1. 单次请求时延 一种最简单的方法就是使用同一请求体同时请求两个服务&#xff0c;性能越好的服务时延越短&#xff0c;即 R T 返回结果的时刻 − 发送请求的…

【Python教程】4-字符串、列表、字典、元组与集合操作

在整理自己的笔记的时候发现了当年学习python时候整理的笔记&#xff0c;稍微整理一下&#xff0c;分享出来&#xff0c;方便记录和查看吧。个人觉得如果想简单了解一名语言或者技术&#xff0c;最简单的方式就是通过菜鸟教程去学习一下。今后会从python开始重新更新&#xff0…

AI大模型探索之路-实战篇16:优化决策流程:Agent智能数据分析平台中Planning功能实践

系列篇章&#x1f4a5; AI大模型探索之路-实战篇4&#xff1a;深入DB-GPT数据应用开发框架调研 AI大模型探索之路-实战篇5&#xff1a;探索Open Interpreter开放代码解释器调研 AI大模型探索之路-实战篇6&#xff1a;掌握Function Calling的详细流程 AI大模型探索之路-实战篇7…

现代x86汇编-环境安装

今天端午节&#xff0c;独自在家&#xff0c;翻阅了张银奎老师编写的《现代x86汇编语言程序设计》一书&#xff0c;前言部分说明书中示例代码都是用微软visual C工具编写并使用微软宏汇编&#xff08;著名的MASM&#xff09;编译的&#xff0c;好久没有用微软vc了&#xff0c;假…

详解FedProx:FedAvg的改进版 Federated optimization in heterogeneous networks

FedProx&#xff1a;2020 FedAvg的改进 论文&#xff1a;《Federated Optimization in Heterogeneous Networks》 引用量&#xff1a;4445 源码地址&#xff1a; 官方实现&#xff08;tensorflow&#xff09;https://github.com/litian96/FedProx 几个pytorch实现&#xff1a;…

基于STM32的595级联的Proteus仿真

文章目录 一、595级联1.题目要求2.思路3.仿真图3.1 未仿真时3.2 模式A3.2 模式B3.3 故障模式 二、总结 一、595级联 1.题目要求 STM32单片机&#xff0c;以及三个LED灯对应红黄绿灯&#xff0c;IIC的OLED显示屏&#xff0c;温湿度传感器DHT11&#xff0c;两个独立按键和两个5…

MySQL时间和日期类型详解(零基础入门篇)

目录 1. DATE 2. DATETIME 3. TIMESTAMP 4. TIME 5. YEAR 6. 日期和时间的使用示例 以下SQL语句的测试可以使用命令行&#xff0c;或是使用SQL工具比如MySQL Workbench或SQLynx等。 在 MySQL 中&#xff0c;时间和日期数据类型用于存储与时间相关的数据&#xff0c;如何…

重温共射放大电路

1、放大概念 小功率信号变成一个大功率信号&#xff0c;需要一个核心器件做这件事&#xff0c;核心器件的能量由电源提供&#xff0c;通过核心器件用小功率的信号去控制大电源&#xff0c;来实现能量的转换和控制&#xff0c;前提是不能失真&#xff0c;可以用一系列正弦波进行…

Running Gradle task ‘assembleDebug‘ Flutter项目

基于Android方面运行Flutter项目一直卡在 Launching lib\main.dart on Android SDK built for x86 in debug mode… Running Gradle task ‘assembleDebug’… 基础原因&#xff1a; 默认存放Gradle插件包的Maven仓库是国外(需VPN) 我的原因&#xff1a; 缺少JDK和缺少Androi…

【Oracle】Oracle导入导出dmp文件

文章目录 前言一、什么是dmp&#xff1f;二、imp/impdp、exp/expdp对比及示例1.区别2.imp/impdp对比及示例a. impb. impbp 3.exp/expdp对比及示例a. expb.expdp 3.其他事项 三、执行导入导出前置条件1.创建角色并授权2.创建目录映射 前言 在工作中&#xff0c;经常会遇到需要备…

React中的 Scheduler

为什么需要调度 在 React 中&#xff0c;组件最终体现为 Fiber&#xff0c;并形成 FiberTree&#xff0c;Fiber 的目的是提高渲染性能&#xff0c;将原先的 React 渲染任务拆分为多个小的微任务&#xff0c;这样做的目的是可以灵活的让出主线程&#xff0c;可以随时打断渲染&a…

定个小目标之刷LeetCode热题(10)

这道题属于一道中等题&#xff0c;看来又得背题了&#xff0c;直接看题解吧&#xff0c;有两种解法 第一种动态规划法 状态&#xff1a;dp[i][j] 表示字符串s在[i,j]区间的子串是否是一个回文串 状态转移方程&#xff1a;当s[i] s[j] && (j - i < 2 || dp[i 1]…

讨论C++类与对象

讨论C类与对象 C语言结构体和C类的对比类的实例化类对象的大小猜想一猜想二针对上述猜想的实践 this指针不同对象调用成员函数 类的6个默认成员函数构造函数析构函数拷贝构造函数浅拷贝和深拷贝 赋值运算符重载 初始化列表初始化顺序 C语言结构体和C类的对比 在C语言中&#x…

对猫毛过敏?怎么有效的缓解过敏症状,宠物空气净化器有用吗?

猫过敏是一种常见的过敏反应&#xff0c;由猫的皮屑、唾液或尿液中的蛋白质引起。这些蛋白质被称为过敏原&#xff0c;它们可以通过空气传播&#xff0c;被人体吸入后&#xff0c;会触发免疫系统的过度反应。猫过敏是宠物过敏中最常见的类型之一&#xff0c;对许多人来说&#…

xilinx的Aurora8B10B的IP仿真及上板测试(高速收发器十七)

前文讲解了Aurora8B10B协议原理及xilinx相关IP&#xff0c;本文讲解如何设置该IP&#xff0c;并且通过示例工程完成该IP的仿真和上板。 1、生成Aurora8B10B IP 如下图所示&#xff0c;首先在vivado的IP catalog中输入Aurora 8B10B&#xff0c;双击该IP。 图1 查找Aurora 8B10…