samber/lo 库的使用方法: 处理 channel

samber/lo 库的使用方法: 处理 channel

samber/lo 是一个 Go 语言库,提供了一些常用的集合操作函数,如 Filter、Map 和 FilterMap。
这个库函数太多,因此我决定按照功能分别介绍,本文介绍的是 samber/lo 库中channel相关的函数。

ChannelDispatcher

将信息从输入通道消息分发到 N 个子通道中。当输入通道关闭时,这个关闭事件会被传播到所有的子通道,也就是说,所有的子通道也会被关闭。这些通道可以有一个固定的缓冲容量,或者当 cap(容量)为 0 时,它们是无缓冲的。

// 创建了一个带有 42 个缓冲区的整数通道 ch,并向其中发送 0 到 10 的整数。
ch := make(chan int, 42)
for i := 0; i <= 10; i++ {ch <- i
}// 使用 lo.ChannelDispatcher 函数创建了 5 个子通道,每个通道的缓冲区大小为 10。
// 这个函数会将 ch 中的数据按照轮询策略分发到这些子通道中。
children := lo.ChannelDispatcher(ch, 5, 10, DispatchingStrategyRoundRobin[int])
// []<-chan int{...}// 定义了一个 consumer 函数,这个函数会从给定的通道中读取数据,直到通道关闭。
// 如果通道已经关闭,ok 会为 false,我们就打印 "closed" 并退出循环。
consumer := func(c <-chan int) {for {msg, ok := <-cif !ok {println("closed")break}println(msg)}
}// 为每个子通道启动一个 consumer goroutine。
// 这样,我们就创建了 5 个并发的消费者,它们会并行地从 ch 中读取数据。
for i := range children {go consumer(children[i])
}

有很多分发策略可用:

  • lo.DispatchingStrategyRoundRobin: 使用轮询策略将消息分发到子通道中。
  • lo.DispatchingStrategyRandom: 使用随机策略将消息分发到子通道中。
  • lo.DispatchingStrategyWeightedRandom: 使用加权随机策略将消息分发到子通道中。
  • lo.DispatchingStrategyFirst: 分发消息到第一个非满的子通道中。
  • lo.DispatchingStrategyLeast: 分发消息到最空的子通道中。
  • lo.DispatchingStrategyMost: 分发消息到最满的子通道中。

其中一些策略会带有回退机制,以便优先考虑非阻塞行为。请参阅实现。

对于自定义策略,只需实现 lo.DispatchingStrategy 原型即可:

type DispatchingStrategy[T any] func(message T, messageIndex uint64, channels []<-chan T) int
  • DispatchingStrategy 是一个函数,它接受三个参数:

  • message T:这是要分发的消息,其类型为泛型 T。

  • messageIndex uint64:这是消息的索引,通常用于确定将消息分发到哪个通道。

  • channels []<-chan T:这是一个通道切片,消息将被分发到这些通道中的一个。

  • 这个函数返回一个 int,表示消息应该被分发到 channels 切片中的哪个通道。

Eg:

type Message struct {TenantID uuid.UUID
}func hash(id uuid.UUID) int {h := fnv.New32a()h.Write([]byte(id.String()))return int(h.Sum32())
}// Routes messages per TenantID.
customStrategy := func(message string, messageIndex uint64, channels []<-chan string) int {destination := hash(message) % len(channels)// check if channel is fullif len(channels[destination]) < cap(channels[destination]) {return destination}// fallback when child channel is fullreturn utils.DispatchingStrategyRoundRobin(message, uint64(destination), channels)
}children := lo.ChannelDispatcher(ch, 5, 10, customStrategy)
...

SliceToChannel

返回一个只读的通道,其中包含了集合中的元素。当最后一个元素被读取后,通道会被关闭。第一个参数是通道的容量,第二个参数是集合。

list := []int{1, 2, 3, 4, 5}for v := range lo.SliceToChannel(2, list) {println(v)
}
// prints 1, then 2, then 3, then 4, then 5

ChannelToSlice

返回一个由通道中的元素构建的切片。阻塞直到通道关闭。

list := []int{1, 2, 3, 4, 5}
ch := lo.SliceToChannel(2, list)items := ChannelToSlice(ch)
// []int{1, 2, 3, 4, 5}

Generator

实现了生成器设计模式。通道在最后一个元素被读取后会被关闭。通道的容量可以被定制。 Generator的第一个参数是通道的容量,第二个参数是生成器函数, 返回一个通道。 其中,生成器函数的参数是一个函数,这个函数用于向通道中发送元素。

generator := func(yield func(int)) {yield(1)yield(2)yield(3)
}for v := range lo.Generator(2, generator) {println(v)
}
// prints 1, then 2, then 3

Buffer

创建一个包含 n 个元素的切片,这些元素来自通道。返回切片、切片长度、读取时间和通道状态(打开/关闭)。第一个参数是通道,第二个参数是切片的长度。

ch := lo.SliceToChannel(2, []int{1, 2, 3, 4, 5})items1, length1, duration1, ok1 := lo.Buffer(ch, 3)
// []int{1, 2, 3}, 3, 0s, true
items2, length2, duration2, ok2 := lo.Buffer(ch, 3)
// []int{4, 5}, 2, 0s, false

示例:RabbitMQ 消费者

ch := readFromQueue()for {// read 1k itemsitems, length, _, ok := lo.Buffer(ch, 1000)// do batching stuffif !ok {break}
}

BufferWithTimeout

和Buffer函数类似, 但是增加了一个超时参数, 如果超时,返回已经读取的元素。

generator := func(yield func(int)) {for i := 0; i < 5; i++ {yield(i)time.Sleep(35*time.Millisecond)}
}ch := lo.Generator(0, generator)items1, length1, duration1, ok1 := lo.BufferWithTimeout(ch, 3, 100*time.Millisecond)
// []int{1, 2}, 2, 100ms, true
items2, length2, duration2, ok2 := lo.BufferWithTimeout(ch, 3, 100*time.Millisecond)
// []int{3, 4, 5}, 3, 75ms, true
items3, length3, duration2, ok3 := lo.BufferWithTimeout(ch, 3, 100*time.Millisecond)
// []int{}, 0, 10ms, false

示例:RabbitMQ 消费者

ch := readFromQueue()for {// read 1k items// wait up to 1 seconditems, length, _, ok := lo.BufferWithTimeout(ch, 1000, 1*time.Second)// do batching stuffif !ok {break}
}

示例:多线程的 RabbitMQ 消费者

ch := readFromQueue()// 5 workers
// prefetch 1k messages per worker
children := lo.ChannelDispatcher(ch, 5, 1000, lo.DispatchingStrategyFirst[int])consumer := func(c <-chan int) {for {// read 1k items// wait up to 1 seconditems, length, _, ok := lo.BufferWithTimeout(ch, 1000, 1*time.Second)// do batching stuffif !ok {break}}
}for i := range children {go consumer(children[i])
}

FanIn

合并多个输入通道的消息到一个缓冲通道中。输出消息没有优先级。当所有的上游通道到达 EOF 时,下游通道关闭。

stream1 := make(chan int, 42)
stream2 := make(chan int, 42)
stream3 := make(chan int, 42)all := lo.FanIn(100, stream1, stream2, stream3)
// <-chan int

FanOut

广播所有上游消息到多个下游通道。当上游通道到达 EOF 时,下游通道关闭。如果任何下游通道已满,广播将暂停。

stream := make(chan int, 42)all := lo.FanOut(5, 100, stream)
// [5]<-chan int

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/692739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

adb-环境安装

1. 下载解压包&#xff1a;百度网盘 请输入提取码百度网盘为您提供文件的网络备份、同步和分享服务。空间大、速度快、安全稳固&#xff0c;支持教育网加速&#xff0c;支持手机端。注册使用百度网盘即可享受免费存储空间https://pan.baidu.com/s/1TDu2fzGbqCyug3wCSmV9oQ?pwd…

Android无法获取已安装应用包名的问题

前言 在某些情况下&#xff0c;我们需要获取android上已安装的第三方应用的一些信息 检索 private boolean isAppInstalled(Context context, String pkgName) {if (pkgName null || pkgName.isEmpty()) {return false;}PackageInfo packageInfo;try {packageInfo context…

UI美化stylesheet

一、网上找到自己喜欢的图标 大家可以每个图标类型找出三种不同的颜色&#xff0c;方便后续美化效果&#xff0c;这里我每种只找了一个。&#xff08;随便找的&#xff0c;最后效果不好看&#xff09; 将这个文件夹复制到项目的文件夹中。 然后右键Add New…选择QT&#xff0c…

list链表

1. list基本概念 功能&#xff1a;将数据进行链式存储 链表&#xff08;list&#xff09;是一种物理存储单元上非连续的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接实现的 链表的组成&#xff1a;链表由一系列结点组成 结点的组成&#xff1a;一个是存储数据…

阿里云OSS和SEC服务器,免费ssl证书申请和安装

一&#xff1a;阿里云OSS证书申请和安装 1创建免费证书等待签发 2验证&#xff0c;复制DNS解析配置 3在主体域名中解析DNS&#xff08;记录复制上面的证书申请配置&#xff09; 4验证域名DNS配置 5下载证书 6安装OSS证书 7上传证书&#xff08;下载的证书解压&#xff09…

Paddlepaddle使用自己的VOC数据集训练目标检测(0废话简易教程)

一 安装paddlepaddle和paddledection&#xff08;略&#xff09; 笔者使用的是自己的数据集 二 在dataset目录下新建自己的数据集文件&#xff0c;如下&#xff1a; 其中 xml文件内容如下&#xff1a; 另外新建一个createList.py文件&#xff1a; # -- coding: UTF-8 -- imp…

探索水下低光照图像检测性能,基于DETR(DEtection TRansformer)模型开发构建海底生物检测识别分析系统

海底这类特殊数据场景下的检测模型开发相对来说比较少&#xff0c;在前面的博文中也有一些涉及&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《尝试探索水下目标检测&#xff0c;基于yolov5轻量级系列模型n/s/m开发构建海底生物检测系统》 《基于YOLOv5C3CBAMCBA…

app逆向-平头哥框架ratel使用

文章目录 一、前言二、实现逻辑1、安装ratel手机端app2、使⽤电脑端进⾏感染目标app3、开发⼀个平头哥插件 一、前言 平头哥&#xff08;ratel&#xff09;是⼀个Android逆向分析⼯具套件&#xff0c;他提供⼀系列渐进式app逆向分析⼯具。同时平头哥也是⼀个app⼆次开发的沙箱…

Java语言概述(三)

Java 是一种通用的、面向对象的编程语言&#xff0c;支持并发编程、网络编程和多线程等特性。它由 Sun Microsystems 公司在 1995 年推出&#xff0c;并且随着其开源和跨平台的特性&#xff0c;Java 已经成为了当今最流行的编程语言之一。 Java 的主要特点包括&#xff1a; 跨…

【运维】站点可靠性工程介绍:研发,运维,SRE,Devops的关系

文章目录 1、什么是SRE2、SRE与研发、运维的区别 1、什么是SRE 站点可靠性工程&#xff08;SRE&#xff09; 是 IT 运维的软件工程方案。 SRE 团队使用软件作为工具&#xff0c;来管理系统、解决问题并实现运维任务自动化。 SRE 执行的任务以前通常由运维团队手动执行&#x…

ChatGPT遭受匿名苏丹组织DDoS攻击:网络安全在AI时代的新挑战

ChatGPT遭受匿名苏丹组织DDoS攻击&#xff1a;网络安全在AI时代的新挑战 最近&#xff0c;全球备受瞩目的AI对话机器人ChatGPT及其开发公司OpenAI遇到了一连串的服务中断和异常错误&#xff0c;这一系列问题背后似乎暗藏了分布式拒绝服务&#xff08;DDoS&#xff09;攻击的阴…

Huggingface镜像网站下载语言模型方法

通常通过镜像网站下载https://hf-mirror.com/。 在链接页面有介绍方法&#xff0c;对于不大的模型可以直接下载。这里介绍比较常用且方便的下载方法。 使用huggingface 官方提供的 huggingface-cli 命令行工具 安装&#xff08;huggingface_hub、hf_transfer安装可以使用-i命…

Elasticsearch:创建自定义 ES Rally tracks 的分步指南

作者&#xff1a;Alejandro Snchez 按照这个综合教程学习如何制作个性化的 Rally tracks ES Rally 是什么&#xff1f;它的用途是什么&#xff1f; ES Rally 是一个用于在 Elasticsearch 上测试性能的工具&#xff0c;允许你运行和记录比较测试。 做出决策可能很困难&#x…

OpenCV统计函数之minMaxLoc和meanStdDev

在OpenCV中&#xff0c;minMaxLoc和meanStdDev是两个用于统计图像或数组中元素的基本特性的函数。这些统计函数对于图像处理、特征提取和数据分析非常有用。 minMaxLoc minMaxLoc函数用于查找数组或图像中的最小值和最大值&#xff0c;并可选地返回这些值的位置。这在处理图像…

Vue模版语法之属性绑定v-bind

双大括号不能在 HTML 属性中使用。想要响应式地绑定一个属性&#xff0c;应该使用 v-bind 指令 1. 使用v-bind绑定属性 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>新建页面</title><sc…

【Java中23种设计模式-单例模式--饿汉式】

加油&#xff0c;新时代打工人&#xff01; 简单粗暴&#xff0c;直接上代码。 23种设计模式定义介绍 Java中23种设计模式-单例模式 Java中23种设计模式-单例模式2–懒汉式线程不安全 Java中23种设计模式-单例模式2–懒汉式2线程安全 package mode;/*** author wenhao* dat…

一个PDF处理利器的.Net开源项目

在项目开发中&#xff0c;处理PDF文件是一个非常常见的需求&#xff0c;之前也推荐几个&#xff0c;今天继续给大家推荐一个强大且易于使用的开源库&#xff0c;专门用于处理PDF文件&#xff0c;它提供了一系列功能强大的工具&#xff0c;帮助开发人员轻松地解析、修改和创建PD…

【蓝桥杯基础】1.7星系炸弹

问题 在X星系的广袤空间中漂浮着许多X星人造“炸弹”&#xff0c;用来作为宇宙中的路标。 每个炸弹都可以设定多少天之后爆炸。 比如&#xff1a;阿尔法炸弹2015年1月1日放置&#xff0c;定时为15天&#xff0c;则它在2015年1月16日爆炸。 有一个贝塔炸弹&#xff0c;2014年…

基于物联网智慧公厕的多功能城市智慧驿站

在现代城市发展中&#xff0c;智慧化已经成为了一个不可或缺的趋势。而多功能城市智慧驿站&#xff0c;作为智慧城市建设的一部分&#xff0c;以物联网智慧公厕为基础&#xff0c;集合了诸多功能于一身&#xff0c;成为了城市中不容忽视的存在。多功能城市智慧驿站也称为轻松的…

Spring Cloud部署篇1——Jar包部署至CentOS云服务器

一、项目介绍 系统模块 com.mingink |--mingink-api // 接口模块 | └──mingink-api-system // 系统接口 |--mingink-common // 通用模块 | └──mingink-common-core // 系统接口 |--mingink-gateway…