go-redis源码解析:连接池原理

1. 执行命令的入口方法

redis也是通过hook执行命令,initHooks时,会将redis的hook放在第一个

img

通过hook调用到process方法,process方法内部再调用_process

img

2. 线程池初始化

redis在新建单客户端、sentinel客户端、cluster客户端等,都会newConnPool初始化线程池

2.1.1. NewClient方式初始化连接池

// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {opt.init()c := Client{baseClient: &baseClient{opt: opt,},}c.init()// 初始化线程池c.connPool = newConnPool(opt, c.dialHook)return &c
}

2.1.2. NewFailoverClient方式初始化连接池

// NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple
// goroutines.
// zhmark 2024/6/13 NewFailoverClient
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {if failoverOpt.RouteByLatency {panic("to route commands by latency, use NewFailoverClusterClient")}if failoverOpt.RouteRandomly {panic("to route commands randomly, use NewFailoverClusterClient")}sentinelAddrs := make([]string, len(failoverOpt.SentinelAddrs))copy(sentinelAddrs, failoverOpt.SentinelAddrs)// todo:2024/6/26 有问题,每次都是换成1、3、2// 将 sentinelAddrs 切片中的元素顺序随机打乱,实现随机化效果rand.Shuffle(len(sentinelAddrs), func(i, j int) {//交换 sentinelAddrs 中第 i 个和第 j 个元素sentinelAddrs[i], sentinelAddrs[j] = sentinelAddrs[j], sentinelAddrs[i]})failover := &sentinelFailover{opt:           failoverOpt,sentinelAddrs: sentinelAddrs,}opt := failoverOpt.clientOptions()// 初始化赋值连接建立函数opt.Dialer = masterReplicaDialer(failover)opt.init()var connPool *pool.ConnPoolrdb := &Client{baseClient: &baseClient{opt: opt,},}rdb.init()
// 初始化线程池connPool = newConnPool(opt, rdb.dialHook)rdb.connPool = connPoolrdb.onClose = failover.Closefailover.mu.Lock()// 关闭老的有问题的地址连接//如:发现新读取的主节点地址和本地保存的不一样,将之前和老的主节点连接断开// addr是新的master地址failover.onFailover = func(ctx context.Context, addr string) {_ = connPool.Filter(func(cn *pool.Conn) bool {// 如果连接的远程地址与 addr 不同,则返回 true,表示要关闭此连接;否则返回 false,表示保留该连接return cn.RemoteAddr().String() != addr})}failover.mu.Unlock()return rdb
}

2.1. NewClusterClient方式初始化线程池

cluster模式和上面的NewClient、NewFailoverClient不一样。cluster模式new的时候不会初始化连接池,而是等执行命令时,获取所有节点,每个节点新建一个redisClient,每个client单独一个连接池

2.1.1. 初始化NewClusterClient时不会新建连接池

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {// 初始化opt,其中会初始化NewClient方法opt.init()c := &ClusterClient{opt:   opt,nodes: newClusterNodes(opt),}// 获取所有主从节点信息,并保存在本地c.state = newClusterStateHolder(c.loadState)// 保存命令详情c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)c.cmdable = c.Processc.initHooks(hooks{dial:       nil,process:    c._process,pipeline:   c.processPipeline,txPipeline: c.processTxPipeline,})return c
}

2.1.2. 执行命令时,通过cmdNode执行到NewClient,初始化线程池

img

通过clOpt的NewClient方法,初始化client,进而初始化线程池

img

2.1.3. 然而clOpt的NewClient方法什么时候初始化赋值的呢

在NewClusterClient方法的opt.init()中

img

img

3. 如何新建连接

总览图

img

3.1.1. 第一次执行命令时,go-redis会先通过cmdNode方法,获取所有的节点信息

img

3.1.2. 底层调用到ClusterSlots方法,触发redis.go中_process方法,内部调用_withConn方法,通过getConn方法获取可用连接

img

3.1.3. getConn方法内部发现无可用连接,则会调用newConn

3.1.4. newConn内部,调用连接池的dialConn方法触发调用

img

3.1.5. dialConn调用配置项的Dialer方法

img

3.1.6. p.cfg.Dialer在newConnPool时候初始化的,通过Dialer方法,触发dialer

img

3.1.7. 而dialer是newClient时传入的dialhook,至此直接触发了dialhook

img

3.1.8. sentinel模式也是在NewFailoverClient时传入的dialhook

img

3.1.9. redis自己的dialHook内部,执行的是opt的Dialer方法

img

3.1.10. 此Dialer方法是在NewClient中opt.init()初始化方法中赋值的,如果没有自定义,就用默认的建连方法

img

3.1.11. 默认的建连方法很简单,调用go底层的net建立连接

img

3.1.12. sentinel模式不一样,NewFailoverClient方法有自定义建连方法

img

3.1.13. 里面实现了读写分离

img

4. 闲置连接如何关闭

看是否有配置MinIdleConnsMaxIdleConns。如果有配置了MinIdleConns,那么在NewConnPool、popIdle、removeConn时,都会调用checkMinIdleConns补充创建最低闲置连接数

// Minimum number of idle connections which is useful when establishing
// new connection is slow.
// Default is 0. the idle connections are not closed by default.
MinIdleConns int
// Maximum number of idle connections.
// Default is 0. the idle connections are not closed by default.
MaxIdleConns int

img

每次执行完方法,会释放连接

img

img

5. 如何控制闲置连接数大小

6. 如何控制总连接数

poolSize:控制最大并发量

turn可能为0,闲置连接数为最大poolSize

img

img

img

7. 如何保持连接池内的连接健康

每次Get连接时,会检查连接是否健康

func (p *ConnPool) Get(ctx context.Context, isReadCmd bool) (*Conn, error) {if p.closed() {return nil, ErrClosed}// 排队if err := p.waitTurn(ctx); err != nil {return nil, err}for {p.connsMu.Lock()// 获取一个可用的连接cn, err := p.popIdle(isReadCmd)p.connsMu.Unlock()if err != nil {p.freeTurn()return nil, err}if cn == nil {break}// 读请求走replica,只是多一层保护if p.cfg.ReadMode == _const.READ_MODE_REPLICA {if isReadCmd && cn.remoteType != REMOTE_TYPE_REPLICA {continue}// 写请求不走replicaif !isReadCmd && cn.remoteType == REMOTE_TYPE_REPLICA {continue}}if !p.isHealthyConn(cn) {_ = p.CloseConn(cn)continue}atomic.AddUint32(&p.stats.Hits, 1)return cn, nil}atomic.AddUint32(&p.stats.Misses, 1)// zhmark 2024/6/18 如果连接池里没有可用的连接,那么新建连接newcn, err := p.newConn(ctx, true, isReadCmd)if err != nil {p.freeTurn()return nil, err}return newcn, nil
}

img

7.1. isHealthyConn内方法解析

// zhmark 2024/7/8 连接关键检查,维护连接池连接健康
func (p *ConnPool) isHealthyConn(cn *Conn) bool {now := time.Now()// ConnMaxLifetime 默认为0if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime {return false}// ConnMaxIdleTime Default is 30 minutes. -1 disables idle timeout checkif p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {return false}if connCheck(cn.netConn) != nil {return false}cn.SetUsedAt(now)return true
}

7.1.1. 连接使用时长检验

    1. ConnMaxLifetime默认为0,如果配置了ConnMaxLifetime,那么如果当前时间离连接创建时间超过ConnMaxLifetime,则会判定连接为不健康,进而关闭连接

7.1.2. 连接空闲时长检验

    1. ConnMaxIdleTime,默认为30分钟,如果连接超过ConnMaxIdleTime时间未使用,则会判定连接为不健康

7.1.3. 检查底层网络连接状态

func connCheck(conn net.Conn) error {// Reset previous timeout._ = conn.SetDeadline(time.Time{})sysConn, ok := conn.(syscall.Conn)if !ok {return nil}rawConn, err := sysConn.SyscallConn()if err != nil {return err}var sysErr errorif err := rawConn.Read(func(fd uintptr) bool {var buf [1]byten, err := syscall.Read(int(fd), buf[:])switch {case n == 0 && err == nil:sysErr = io.EOFcase n > 0:sysErr = errUnexpectedReadcase err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:sysErr = nildefault:sysErr = err}return true}); err != nil {return err}return sysErr
}

8. 如何实时监控连接池状态

PoolStats

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/43690.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网站更新改版了

✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏:Leo杂谈 ✨特色专栏:MySQL学…

大模型/NLP/算法面试题总结2——transformer流程//多头//clip//对比学习//对比学习损失函数

用语言介绍一下Transformer的整体流程 1. 输入嵌入(Input Embedding) 输入序列(如句子中的单词)首先通过嵌入层转化为高维度的向量表示。嵌入层的输出是一个矩阵,每一行对应一个输入单词的嵌入向量。 2. 位置编码&…

潜在空间可视化(Latent space visualization)

在“深度学习”系列中,我们不会看到如何使用深度学习来解决端到端的复杂问题,就像我们在《A.I. Odyssey》中所做的那样。我们更愿意看看不同的技术,以及一些示例和应用程序。 1、引言 上次(Autoencoders - Deep Learning bits #…

c++习题07-求小数的某一位

目录 一,问题 二,思路 三,代码 一,问题 二,思路 被除数a的类型设置为long long类型,a变量需要变大,需要更大的数据类型来存储除数b和指定的小数位置n为int类型,这两个变量的的…

Scissor算法-从含有表型的bulkRNA数据中提取信息进而鉴别单细胞亚群

在做基础实验的时候,研究者都希望能够改变各种条件来进行对比分析,从而探索自己所感兴趣的方向。 在做数据分析的时候也是一样的,我们希望有一个数据集能够附加了很多临床信息/表型,然后二次分析者们就可以进一步挖掘。 然而现实…

共生与变革:AI在开发者世界的角色深度剖析

在科技日新月异的今天,人工智能(AI)已不再是遥不可及的概念,而是逐步渗透到我们工作与生活的每一个角落。对于开发者这一群体而言,AI的崛起既带来了前所未有的机遇,也引发了关于其角色定位的深刻讨论——AI…

【分布式系统】ceph部署(命令+截图巨详细版)

目录 一.存储概述 1.单机存储设备 2.单机存储的问题 3.商业存储 4.分布式存储​编辑 4.1.什么是分布式存储 4.2.分布式存储的类型 二.ceph概述 1.ceph优点 2.ceph架构 3.ceph核心组件 4.OSD存储后端 5.ceph数据存储过程 6.ceph版本发行生命周期 7.ceph集群部署 …

二叉树超详细解析

二叉树 目录 二叉树一级目录二级目录三级目录 1.树的介绍1.1树的定义1.2树的基本术语1.3相关性质 2.二叉树介绍2.1定义2.2 性质 3.二叉树的种类3.1 满二叉树3.2完全二叉树3.3 二叉查找树特点:二叉查找树的节点包含的基本信息: 3.4 平衡二叉树 4.二叉树的…

研华运动控制卡在LabVIEW中的应用

在现代工业和科研领域中,精密运动控制系统的需求日益增加。这些系统广泛应用于自动化生产线、精密机械加工、机器人控制、光学仪器调试和实验室自动化设备等诸多领域。本文以研华公司的运动控制卡为例,详细介绍其在LabVIEW中的应用,展示如何通…

初识数组(二)

目录 1. 二维数组的初始化 1) 不完全初始化 2) 完全初始化 3) 按照行初始化 4) 初始化时省略行,但是不能省略列 2.二维数组的使用 1) 二维数组的下标 2)二维数组的输入和输出 3. 二维数…

gif压缩大小但不改变画质的最佳方法,7个gif压缩免费工具别错过!

你会不会也碰到过当你需要在自媒体平台上上传gif文件时,你会发现网页端最大限制为15MB,而手机端最大限制为5MB。那么如何在不不改变画质的同时压缩gif大小呢?如今,由于其特殊的动画以及快速传输的特点,gif文件已经成为…

基于Hadoop平台的电信客服数据的处理与分析③项目开发:搭建基于Hadoop的全分布式集群---任务8:测试Hadoop集群的可用性

任务描述 测试Hadoop集群的可用性 任务指导 1. 在Web UI查看HDFS和YARN状态 2. 测试HDFS和YARN的可用性 任务实现 1. 在Web UI查看HDFS和YARN状态 在【master1】打开Web浏览器访问Hadoop其中HDFS NameNode对应的Web UI地址如下: http://master1:50070 如下…

【动态规划Ⅵ】背包问题 /// 组合问题

背包问题 什么是背包问题0-1背包问题分数背包完全背包问题重复背包问题 背包问题例题416. 分割等和子集474. 一和零 完全平方数279. 完全平方数322. 零钱兑换 排列与组合组合,无重复:518. 零钱兑换 II排列,可重复:377. 组合总和 Ⅳ…

虚拟内存【Linux】

虚拟内存 为什么需要虚拟内存Linux虚拟内存的结构32位系统下的虚拟地址空间64位系统下的虚拟地址空间页表多级页表TLB 流程虚拟内存的作用 为什么需要虚拟内存 为了在进行多进程编码进行内存访问的时候保持内存的隔离性,数据安全性,所以出现了虚拟内存。…

Spring Cloud 引入

1.单体架构: 定义:所有的功能实现都打包成一个项目 带来的后果: ①后端服务器的压力越来越大,负载越来越高,甚至出现无法访问的情况 ②业务越来越复杂,为了满足用户的需求,单体应用也会越来越…

入门PHP就来我这(高级)19 ~ 捕获sql错误

有胆量你就来跟着路老师卷起来! -- 纯干货,技术知识分享 路老师给大家分享PHP语言的知识了,旨在想让大家入门PHP,并深入了解PHP语言。 接着上篇我们来看下sql错误的捕获模式。 1 PDO中捕获SQL语句中的错误 在PDO中有3种方法可以捕…

大话光学原理:1.“实体泛光说”、反射与折射

一、实体泛光说 在古希腊,那些喜好沉思的智者们中,曾流传着一个奇妙的设想:他们认为,我们的眼睛仿佛伸出无数触手般的光线,这些光线能向四面八方延伸,紧紧抓住周围的每一个物体。于是,当我们凝视…

生成多个ssh访问不同git

如果,你的git代码仓库,比如说腾讯云coding,通过ssh秘钥访问,一直用的好好的,有一天,你又增加一个aliyun云效的代码仓库,又配置了aliyun云效的秘钥并且,根据aliyun云效的官方文档上传…

Angular进阶之九: JS code coverage是如何运作的

环境准备 需要用到的包 node 18.16.0# Javascript 代码编辑"babel/core": "^7.24.7","babel/preset-env": "^7.24.7","babel-loader": "^9.1.3",# 打包时使用的 module, 给代码中注入新的方法# http…

群晖NAS配置WebDav服务结合内网穿透实现跨平台云同步思源笔记

文章目录 前言1. 开启群晖WebDav 服务2. 本地局域网IP同步测试3. 群晖安装Cpolar4. 配置远程同步地址5. 笔记远程同步测试6. 固定公网地址7. 配置固定远程同步地址 前言 本教程主要分享如何将思源笔记、cpolar内网穿透和群晖WebDav三者相结合,实现思源笔记的云同步…