godis源码分析——Redis协议解析器

前言

redis这个目录下的所有代码就是为了一个事情,就是适配redis。

流程

redis下的基本流程

在这里插入图片描述

源码

在redis/client/client.go

主要是客户端处理

package clientconst (created = iotarunningclosed
)type B struct {data   chan stringticker *time.Ticker
}// Client is a pipeline mode redis client
type Client struct {conn net.Conn// 等待发送pendingReqs chan *request // wait to send// 等待响应waitingReqs chan *request // waiting responseticker      *time.Tickeraddr        stringstatus  int32working *sync.WaitGroup // its counter presents unfinished requests(pending and waiting)
}// 这个一个发送到redis的请求结构
// request is a message sends to redis server
type request struct {id        uint64args      [][]bytereply     redis.Replyheartbeat boolwaiting   *wait.Waiterr       error
}const (chanSize = 256maxWait  = 3 * time.Second
)// MakeClient creates a new client
func MakeClient(addr string) (*Client, error) {conn, err := net.Dial("tcp", addr)if err != nil {return nil, err}return &Client{addr:        addr,conn:        conn,pendingReqs: make(chan *request, chanSize),waitingReqs: make(chan *request, chanSize),working:     &sync.WaitGroup{},}, nil
}// 开始启动异步程序
// Start starts asynchronous goroutines
func (client *Client) Start() {client.ticker = time.NewTicker(10 * time.Second)// 每个方法都会监听channelgo client.handleWrite()go client.handleRead()go client.heartbeat()atomic.StoreInt32(&client.status, running)
}// 异步关闭客户端
// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {atomic.StoreInt32(&client.status, closed)client.ticker.Stop()// stop new requestclose(client.pendingReqs)// wait stop processclient.working.Wait()// clean_ = client.conn.Close()close(client.waitingReqs)
}// 重新连接
func (client *Client) reconnect() {logger.Info("reconnect with: " + client.addr)_ = client.conn.Close() // ignore possible errors from repeated closesvar conn net.Connfor i := 0; i < 3; i++ {var err errorconn, err = net.Dial("tcp", client.addr)if err != nil {logger.Error("reconnect error: " + err.Error())time.Sleep(time.Second)continue} else {break}}if conn == nil { // reach max retry, abortclient.Close()return}client.conn = connclose(client.waitingReqs)for req := range client.waitingReqs {req.err = errors.New("connection closed")req.waiting.Done()}client.waitingReqs = make(chan *request, chanSize)// restart handle readgo client.handleRead()
}// 监听发送心跳
func (client *Client) heartbeat() {for range client.ticker.C {client.doHeartbeat()}
}// 写入监听
func (client *Client) handleWrite() {for req := range client.pendingReqs {client.doRequest(req)}
}// 发送一个请求到redis服务器
// Send sends a request to redis server
func (client *Client) Send(args [][]byte) redis.Reply {if atomic.LoadInt32(&client.status) != running {return protocol.MakeErrReply("client closed")}req := &request{args:      args,heartbeat: false,waiting:   &wait.Wait{},}req.waiting.Add(1)client.working.Add(1)defer client.working.Done()// 放入client.pendingReqs <- reqtimeout := req.waiting.WaitWithTimeout(maxWait)if timeout {return protocol.MakeErrReply("server time out")}if req.err != nil {return protocol.MakeErrReply("request failed " + req.err.Error())}return req.reply
}// 心跳
func (client *Client) doHeartbeat() {request := &request{args:      [][]byte{[]byte("PING")},heartbeat: true,waiting:   &wait.Wait{},}request.waiting.Add(1)client.working.Add(1)defer client.working.Done()client.pendingReqs <- requestrequest.waiting.WaitWithTimeout(maxWait)
}func (client *Client) doRequest(req *request) {if req == nil || len(req.args) == 0 {return}// 数据转换为bytere := protocol.MakeMultiBulkReply(req.args)bytes := re.ToBytes()var err error// 三次重试for i := 0; i < 3; i++ { // only retry, waiting for handleRead_, err = client.conn.Write(bytes)if err == nil ||(!strings.Contains(err.Error(), "timeout") && // only retry timeout!strings.Contains(err.Error(), "deadline exceeded")) {break}}if err == nil {// 成功发送通知client.waitingReqs <- req} else {req.err = errreq.waiting.Done()}
}// 完成请求
func (client *Client) finishRequest(reply redis.Reply) {defer func() {if err := recover(); err != nil {debug.PrintStack()logger.Error(err)}}()request := <-client.waitingReqsif request == nil {return}request.reply = replyif request.waiting != nil {request.waiting.Done()}
}// 处理响应数据
func (client *Client) handleRead() {// 数据转义ch := parser.ParseStream(client.conn)for payload := range ch {// 检查消息体有没有错误if payload.Err != nil {status := atomic.LoadInt32(&client.status)if status == closed {return}client.reconnect()return}client.finishRequest(payload.Data)}
}

在redis/conn/conn.go

TCP连接方法管理

import ("net""sync""time""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/lib/sync/wait"
)const (// flagSlave means this a connection with slaveflagSlave = uint64(1 << iota)// flagSlave means this a connection with masterflagMaster// flagMulti means this connection is within a transactionflagMulti
)// Connection represents a connection with a redis-cli
type Connection struct {conn net.Conn// wait until finish sending data, used for graceful shutdownsendingData wait.Wait// lock while server sending responsemu    sync.Mutexflags uint64// subscribing channelssubs map[string]bool// password may be changed by CONFIG command during runtime, so store the passwordpassword string// queued commands for `multi`queue    [][][]bytewatching map[string]uint32txErrors []error// selected dbselectedDB int
}// 连接池
var connPool = sync.Pool{New: func() interface{} {return &Connection{}},
}// 返回远程地址
// RemoteAddr returns the remote network address
func (c *Connection) RemoteAddr() string {return c.conn.RemoteAddr().String()
}// Close disconnect with the client
func (c *Connection) Close() error {c.sendingData.WaitWithTimeout(10 * time.Second)_ = c.conn.Close()c.subs = nilc.password = ""c.queue = nilc.watching = nilc.txErrors = nilc.selectedDB = 0connPool.Put(c)return nil
}// 创建一个连接实例
// NewConn creates Connection instance
func NewConn(conn net.Conn) *Connection {// 从线程池去c, ok := connPool.Get().(*Connection)if !ok {logger.Error("connection pool make wrong type")return &Connection{conn: conn,}}c.conn = connreturn c
}// Write sends response to client over tcp connection
func (c *Connection) Write(b []byte) (int, error) {if len(b) == 0 {return 0, nil}c.sendingData.Add(1)defer func() {c.sendingData.Done()}()return c.conn.Write(b)
}// 获取连接名称
func (c *Connection) Name() string {if c.conn != nil {return c.conn.RemoteAddr().String()}return ""
}// 订阅放入map
// Subscribe add current connection into subscribers of the given channel
func (c *Connection) Subscribe(channel string) {c.mu.Lock()defer c.mu.Unlock()if c.subs == nil {c.subs = make(map[string]bool)}c.subs[channel] = true
}// 订阅删除
// UnSubscribe removes current connection into subscribers of the given channel
func (c *Connection) UnSubscribe(channel string) {c.mu.Lock()defer c.mu.Unlock()if len(c.subs) == 0 {return}delete(c.subs, channel)
}// 获取订阅集合长度
// SubsCount returns the number of subscribing channels
func (c *Connection) SubsCount() int {return len(c.subs)
}// GetChannels returns all subscribing channels
func (c *Connection) GetChannels() []string {if c.subs == nil {return make([]string, 0)}channels := make([]string, len(c.subs))i := 0for channel := range c.subs {channels[i] = channeli++}return channels
}// 设置密码
// SetPassword stores password for authentication
func (c *Connection) SetPassword(password string) {c.password = password
}// 获取密码
// GetPassword get password for authentication
func (c *Connection) GetPassword() string {return c.password
}// 获取可变状态
// InMultiState tells is connection in an uncommitted transaction
func (c *Connection) InMultiState() bool {return c.flags&flagMulti > 0
}// 设置可变状态
// SetMultiState sets transaction flag
func (c *Connection) SetMultiState(state bool) {if !state { // reset data when cancel multic.watching = nilc.queue = nilc.flags &= ^flagMulti // clean multi flagreturn}c.flags |= flagMulti
}// 返回当前事务的队列命令
// GetQueuedCmdLine returns queued commands of current transaction
func (c *Connection) GetQueuedCmdLine() [][][]byte {return c.queue
}// 命令加入队列
// EnqueueCmd  enqueues command of current transaction
func (c *Connection) EnqueueCmd(cmdLine [][]byte) {c.queue = append(c.queue, cmdLine)
}// AddTxError stores syntax error within transaction
func (c *Connection) AddTxError(err error) {c.txErrors = append(c.txErrors, err)
}// GetTxErrors returns syntax error within transaction
func (c *Connection) GetTxErrors() []error {return c.txErrors
}// ClearQueuedCmds clears queued commands of current transaction
func (c *Connection) ClearQueuedCmds() {c.queue = nil
}// GetWatching returns watching keys and their version code when started watching
func (c *Connection) GetWatching() map[string]uint32 {if c.watching == nil {c.watching = make(map[string]uint32)}return c.watching
}// GetDBIndex returns selected db
func (c *Connection) GetDBIndex() int {return c.selectedDB
}// SelectDB selects a database
func (c *Connection) SelectDB(dbNum int) {c.selectedDB = dbNum
}func (c *Connection) SetSlave() {c.flags |= flagSlave
}func (c *Connection) IsSlave() bool {return c.flags&flagSlave > 0
}func (c *Connection) SetMaster() {c.flags |= flagMaster
}func (c *Connection) IsMaster() bool {return c.flags&flagMaster > 0
}

在redis/conn/fake.go

假连接,用于测试

在redis/parser/parser.go

用于解析客户端发来的数据

package parserimport ("bufio""bytes""errors""io""runtime/debug""strconv""strings""github.com/hdt3213/godis/interface/redis""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/redis/protocol"
)// 消息体结构
// Payload stores redis.Reply or error
type Payload struct {Data redis.ReplyErr  error
}// 解析从io流的数据
// ParseStream reads data from io.Reader and send payloads through channel
func ParseStream(reader io.Reader) <-chan *Payload {ch := make(chan *Payload)go parse0(reader, ch)return ch
}// 解析byte
// ParseBytes reads data from []byte and return all replies
func ParseBytes(data []byte) ([]redis.Reply, error) {ch := make(chan *Payload)reader := bytes.NewReader(data)go parse0(reader, ch)var results []redis.Replyfor payload := range ch {if payload == nil {return nil, errors.New("no protocol")}if payload.Err != nil {if payload.Err == io.EOF {break}return nil, payload.Err}results = append(results, payload.Data)}return results, nil
}// 解析第一个消息体
// ParseOne reads data from []byte and return the first payload
func ParseOne(data []byte) (redis.Reply, error) {ch := make(chan *Payload)reader := bytes.NewReader(data)go parse0(reader, ch)payload := <-ch // parse0 will close the channelif payload == nil {return nil, errors.New("no protocol")}return payload.Data, payload.Err
}// 私有方法,
func parse0(rawReader io.Reader, ch chan<- *Payload) {// 最后判断有无错误,有则打印日志defer func() {if err := recover(); err != nil {logger.Error(err, string(debug.Stack()))}}()// 解析流reader := bufio.NewReader(rawReader)for {line, err := reader.ReadBytes('\n')if err != nil {// 异常处理ch <- &Payload{Err: err}close(ch)return}// 解析长度length := len(line)// 过短异常if length <= 2 || line[length-2] != '\r' {// there are some empty lines within replication traffic, ignore this error//protocolError(ch, "empty line")continue}line = bytes.TrimSuffix(line, []byte{'\r', '\n'})// 根据不同的字符,做不同的解析方法,ASCII判断switch line[0] {case '+':content := string(line[1:])ch <- &Payload{Data: protocol.MakeStatusReply(content),}if strings.HasPrefix(content, "FULLRESYNC") {err = parseRDBBulkString(reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}}case '-':ch <- &Payload{Data: protocol.MakeErrReply(string(line[1:])),}case ':':value, err := strconv.ParseInt(string(line[1:]), 10, 64)if err != nil {protocolError(ch, "illegal number "+string(line[1:]))continue}ch <- &Payload{Data: protocol.MakeIntReply(value),}case '$':err = parseBulkString(line, reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}case '*':err = parseArray(line, reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}default:args := bytes.Split(line, []byte{' '})ch <- &Payload{Data: protocol.MakeMultiBulkReply(args),}}}
}// 解析字符串
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)if err != nil || strLen < -1 {protocolError(ch, "illegal bulk string header: "+string(header))return nil} else if strLen == -1 {ch <- &Payload{Data: protocol.MakeNullBulkReply(),}return nil}body := make([]byte, strLen+2)_, err = io.ReadFull(reader, body)if err != nil {return err}ch <- &Payload{Data: protocol.MakeBulkReply(body[:len(body)-2]),}return nil
}// RDB和后续AOF之间没有CRLF,因此需要区别对待
// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently
func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error {header, err := reader.ReadBytes('\n')header = bytes.TrimSuffix(header, []byte{'\r', '\n'})if len(header) == 0 {return errors.New("empty header")}strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)if err != nil || strLen <= 0 {return errors.New("illegal bulk header: " + string(header))}body := make([]byte, strLen)_, err = io.ReadFull(reader, body)if err != nil {return err}ch <- &Payload{Data: protocol.MakeBulkReply(body[:len(body)]),}return nil
}func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)// nStrs > 0为合法if err != nil || nStrs < 0 {protocolError(ch, "illegal array header "+string(header[1:]))return nil} else if nStrs == 0 {ch <- &Payload{Data: protocol.MakeEmptyMultiBulkReply(),}return nil}// 消息合法判断lines := make([][]byte, 0, nStrs)for i := int64(0); i < nStrs; i++ {var line []byteline, err = reader.ReadBytes('\n')if err != nil {return err}length := len(line)if length < 4 || line[length-2] != '\r' || line[0] != '$' {protocolError(ch, "illegal bulk string header "+string(line))break}strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)if err != nil || strLen < -1 {protocolError(ch, "illegal bulk string length "+string(line))break} else if strLen == -1 {lines = append(lines, []byte{})} else {body := make([]byte, strLen+2)_, err := io.ReadFull(reader, body)if err != nil {return err}lines = append(lines, body[:len(body)-2])}}// 合法消息装入通道ch <- &Payload{Data: protocol.MakeMultiBulkReply(lines),}return nil
}func protocolError(ch chan<- *Payload, msg string) {err := errors.New("protocol error: " + msg)ch <- &Payload{Err: err}
}

在redis/protocol/asserts/asserts.go

用于测试检查

在redis/protocol/consts.go

定义的一些常量

在redis/protocol/errors.go

定义的一些错误

在redis/protocol/reply.go

协议消息返回

在redis/server/server.go

TCP服务接收到连接后,异步拉起服务,用于客户端的消息处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/44893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装RabbitMQ(带web管理端)

1.拉取带web管理的镜像 可以拉取rabbitmq对应版本的web管理端&#xff0c;比如&#xff1a;rabbitmq:3.9.11-management&#xff0c;也可以直接拉取带web管理端的最新版本 rabbitmq:management. docker pull rabbitmq:3.9.11-management 注意&#xff1a;如果docker pull ra…

jenkins系列-06.harbor

https://github.com/goharbor/harbor/releases?page2 https://github.com/goharbor/harbor/releases/download/v2.3.4/harbor-offline-installer-v2.3.4.tgz harbor官网&#xff1a;https://goharbor.io/ 点击 Download now 链接&#xff0c;会自动跳转到上述github页面&am…

C++ | Leetcode C++题解之第233题数字1的个数

题目&#xff1a; 题解&#xff1a; class Solution { public:int countDigitOne(int n) {// mulk 表示 10^k// 在下面的代码中&#xff0c;可以发现 k 并没有被直接使用到&#xff08;都是使用 10^k&#xff09;// 但为了让代码看起来更加直观&#xff0c;这里保留了 klong l…

Redis系列命令更新--Redis哈希命令

一、设置密码验证&#xff1a; 使用文本编辑器&#xff0c;这里使用Notepad&#xff0c;打开Redis服务配置文件。 注意&#xff1a;不要找错了&#xff0c;通常为redis.windows-service.conf&#xff0c;而不是redis.windows.conf。后者是以非系统服务方式启动程序使用的配置…

《BASeg: Boundary aware semantic segmentation for autonomous driving》论文解读

期刊&#xff1a;Neural Networks | Journal | ScienceDirect.com by Elsevier 年份&#xff1a;2023 代码&#xff1a;https://github.com/Lature-Yang/BASeg 摘要 语义分割是自动驾驶领域街道理解任务的重要组成部分。现有的各种方法要么专注于通过聚合全局或多尺度上下文…

旷野之间20 - Google 研究的推测 RAG

为什么选择 RAG 新兴能力 直到最近&#xff0c;人们发现 LLM 具有新兴能力&#xff0c;即在与用户或任务交互过程中出现的意外功能。 这些功能的示例包括&#xff1a; 解决问题&#xff1a; LLM 可以利用其语言理解和推理能力&#xff0c;为未经过明确培训的任务提供富有洞…

python的字符串

字符串 简单操作 创建 利用 ‘ ’ 或 “ ” 将字符或数字包裹起来的都为字符串 a"你好" 格式化字符串 元组的字符格式化 字符串格式化函数 srt.format() f格式化 方法 split()//指定分割符经行分割 strip()//指定移除字符头尾的字符 join()//指定序列中的字符连接成新…

5、 测试

这里写目录标题 1、自动化测试简介&#xff08;1&#xff09;自动化测试是什么&#xff08;2&#xff09;为什么要写测试测试节约你的时间发现错误&#xff0c;预防错误测试使得代码更有吸引力 2、基础测试策略3、开始写第一个测试&#xff08;1&#xff09;首先得有个bug&…

Not Invented Here 不是在这里发明的 / Proudly found elsewhere 自豪地在其他地方找到

注&#xff1a; 机翻&#xff0c;未校对。 两篇关于创新管理的小文章 Not Invented Here 不是在这里发明的 In the history of organizational success, the enterprises that dominate tend to flawlessly execute on ideas that were created elsewhere. Examine just abo…

智慧水利解决方案:从理论到实践的全面跨越,展示其在水资源管理、水灾害预警、水生态保护等方面的创新应用

目录 一、引言&#xff1a;智慧水利的时代背景与意义 二、智慧水利的理论框架与技术体系 1、理论框架 2、技术体系 三、智慧水利在水资源管理中的应用 1、水资源优化配置 2、水量水质协同管理 四、智慧水利在水灾害预警中的应用 1、洪水预警与应急响应 2、干旱监测与评…

Mediapipe-姿态估计实例

Mediapipe简介 Mediapipe 是由 Google Research 开发的一款开源框架&#xff0c;旨在帮助开发者轻松地构建、测试和部署复杂的多模态、多任务的机器学习模型。它特别擅长于实时处理和分析音频、视频等多媒体数据。以下是 Mediapipe 的一些关键特点和组件&#xff1a; 关键特点…

基于微信小程序的音乐播放平台

基于微信小程序的音乐播放平台 音乐播放小程序项目简介技术栈功能模块项目流程系统E-R图项目页面 音乐播放小程序 项目简介 微信音乐小程序旨在提供一个简洁高效的音乐播放平台&#xff0c;用户可以方便地搜索、播放和收藏自己喜欢的音乐。整个项目采用前后端分离的架构&…

WIN10开机突然,过一会就自动重启蓝屏DRIVER_IRQL_NOT_LESS_OR_EQUAL

环境&#xff1a; Win10 专业版 DELL7080 问题描述&#xff1a; WIN10开机突然&#xff0c;过一会就自动重启蓝屏DRIVER_IRQL_NOT_LESS_OR_EQUAL 事件日志 解决方案&#xff1a; 1.找到MEMORY.DMP文件内容&#xff0c;分析一下 Microsoft (R) Windows Debugger Version 10…

主机安全-开源HIDS字节跳动Elkeid安装使用

目录 概述什么是HIDSHIDS与NIDS的区别EDR、XDR是啥&#xff1f; Elkeid架构Elkeid Agent && Agent centerElkeid DriverElkeid RASPElkeid HUBService DiscoveryManager安装数据采集规则&告警 参考 概述 什么是HIDS HIDS&#xff08; host-based intrusion detec…

使用Gitee仓库镜像管理功能实现Gitee与Github 双向同步

进入你所需要同步的仓库&#xff0c;点击「管理」->「镜像仓库管理」&#xff0c;点击「添加镜像」选项&#xff1b; 如果你的Gitee账号还没有绑定过 GitHub 帐号&#xff0c;先根据弹窗的提示绑定 GitHub 帐号&#xff1b; 添加镜像时候&#xff0c;在「镜像方向」中选择…

二次开发源码 借贷系统uniapp/借贷认证系统/小额信贷系统/工薪贷APP/资金贷系统h5

前端&#xff1a;UNIAPP 后端&#xff1a;ThinkPHP 数据库&#xff1a; Mysql 前端使用的uniapp 可以打包APP H5 小程序 系统提供了完善的网络借贷体系&#xff0c;为金融中介平台提供从获客到贷后管理全流程服务&#xff0c;解决了借贷手续繁琐、流程缓慢等问题 此源码为运营…

管理Linux本地用户和组

什么是用户 用户账户在可以运行命令的不同人员和程序之间提供安全界限。 在Linux系统中&#xff0c;系统通过分配唯一的标识号&#xff08;用户ID或UID&#xff09;来区分不同的用户帐户。 在Linux系统中&#xff0c;用户帐户有以下三种主要类型&#xff1a; 超级用户 负责…

分布式一致性算法:Raft学习

分布式一致性算法&#xff1a;Raft学习 1 什么是分布式系统&#xff1f; 分布式系统是由一组通过网络进行通信、为了完成共同的任务而协调工作的计算机节点组成的系统。这些节点可能位于不同的物理位置&#xff0c;但它们协同工作以提供一个统一的计算平台或服务。分布式系统…

Unity中一键生成具有身体感知的虚拟人物动作

在虚拟现实(VR)和增强现实(AR)的浪潮中&#xff0c;如何让虚拟人物的动作更加自然、真实&#xff0c;已经成为一个重要课题。AI4Animation项目&#xff0c;一个由 Sebastian Starke 主导的开源框架&#xff0c;为Unity开发者提供了强大的工具集&#xff0c;以实现这一目标。本文…

OrangePi AIpro在安防领域的深思和实战(旷视科技CNN模型ShuffleNetV1开发案例测试)

一、前言 公司最近有个项目是安防领域的&#xff0c;主要用在边缘结点&#xff0c;虽然已做成形&#xff0c;但是还是存在一些缺陷&#xff0c;例如&#xff1a;算力问题&#xff0c;开发板的成熟问题&#xff0c;已经各种技术的解决方案落地问题。目前我们集成了很多功能&…