前言
redis这个目录下的所有代码就是为了一个事情,就是适配redis。
流程
redis下的基本流程
源码
在redis/client/client.go
主要是客户端处理
package clientconst (created = iotarunningclosed
)type B struct {data chan stringticker *time.Ticker
}// Client is a pipeline mode redis client
type Client struct {conn net.Conn// 等待发送pendingReqs chan *request // wait to send// 等待响应waitingReqs chan *request // waiting responseticker *time.Tickeraddr stringstatus int32working *sync.WaitGroup // its counter presents unfinished requests(pending and waiting)
}// 这个一个发送到redis的请求结构
// request is a message sends to redis server
type request struct {id uint64args [][]bytereply redis.Replyheartbeat boolwaiting *wait.Waiterr error
}const (chanSize = 256maxWait = 3 * time.Second
)// MakeClient creates a new client
func MakeClient(addr string) (*Client, error) {conn, err := net.Dial("tcp", addr)if err != nil {return nil, err}return &Client{addr: addr,conn: conn,pendingReqs: make(chan *request, chanSize),waitingReqs: make(chan *request, chanSize),working: &sync.WaitGroup{},}, nil
}// 开始启动异步程序
// Start starts asynchronous goroutines
func (client *Client) Start() {client.ticker = time.NewTicker(10 * time.Second)// 每个方法都会监听channelgo client.handleWrite()go client.handleRead()go client.heartbeat()atomic.StoreInt32(&client.status, running)
}// 异步关闭客户端
// Close stops asynchronous goroutines and close connection
func (client *Client) Close() {atomic.StoreInt32(&client.status, closed)client.ticker.Stop()// stop new requestclose(client.pendingReqs)// wait stop processclient.working.Wait()// clean_ = client.conn.Close()close(client.waitingReqs)
}// 重新连接
func (client *Client) reconnect() {logger.Info("reconnect with: " + client.addr)_ = client.conn.Close() // ignore possible errors from repeated closesvar conn net.Connfor i := 0; i < 3; i++ {var err errorconn, err = net.Dial("tcp", client.addr)if err != nil {logger.Error("reconnect error: " + err.Error())time.Sleep(time.Second)continue} else {break}}if conn == nil { // reach max retry, abortclient.Close()return}client.conn = connclose(client.waitingReqs)for req := range client.waitingReqs {req.err = errors.New("connection closed")req.waiting.Done()}client.waitingReqs = make(chan *request, chanSize)// restart handle readgo client.handleRead()
}// 监听发送心跳
func (client *Client) heartbeat() {for range client.ticker.C {client.doHeartbeat()}
}// 写入监听
func (client *Client) handleWrite() {for req := range client.pendingReqs {client.doRequest(req)}
}// 发送一个请求到redis服务器
// Send sends a request to redis server
func (client *Client) Send(args [][]byte) redis.Reply {if atomic.LoadInt32(&client.status) != running {return protocol.MakeErrReply("client closed")}req := &request{args: args,heartbeat: false,waiting: &wait.Wait{},}req.waiting.Add(1)client.working.Add(1)defer client.working.Done()// 放入client.pendingReqs <- reqtimeout := req.waiting.WaitWithTimeout(maxWait)if timeout {return protocol.MakeErrReply("server time out")}if req.err != nil {return protocol.MakeErrReply("request failed " + req.err.Error())}return req.reply
}// 心跳
func (client *Client) doHeartbeat() {request := &request{args: [][]byte{[]byte("PING")},heartbeat: true,waiting: &wait.Wait{},}request.waiting.Add(1)client.working.Add(1)defer client.working.Done()client.pendingReqs <- requestrequest.waiting.WaitWithTimeout(maxWait)
}func (client *Client) doRequest(req *request) {if req == nil || len(req.args) == 0 {return}// 数据转换为bytere := protocol.MakeMultiBulkReply(req.args)bytes := re.ToBytes()var err error// 三次重试for i := 0; i < 3; i++ { // only retry, waiting for handleRead_, err = client.conn.Write(bytes)if err == nil ||(!strings.Contains(err.Error(), "timeout") && // only retry timeout!strings.Contains(err.Error(), "deadline exceeded")) {break}}if err == nil {// 成功发送通知client.waitingReqs <- req} else {req.err = errreq.waiting.Done()}
}// 完成请求
func (client *Client) finishRequest(reply redis.Reply) {defer func() {if err := recover(); err != nil {debug.PrintStack()logger.Error(err)}}()request := <-client.waitingReqsif request == nil {return}request.reply = replyif request.waiting != nil {request.waiting.Done()}
}// 处理响应数据
func (client *Client) handleRead() {// 数据转义ch := parser.ParseStream(client.conn)for payload := range ch {// 检查消息体有没有错误if payload.Err != nil {status := atomic.LoadInt32(&client.status)if status == closed {return}client.reconnect()return}client.finishRequest(payload.Data)}
}
在redis/conn/conn.go
TCP连接方法管理
import ("net""sync""time""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/lib/sync/wait"
)const (// flagSlave means this a connection with slaveflagSlave = uint64(1 << iota)// flagSlave means this a connection with masterflagMaster// flagMulti means this connection is within a transactionflagMulti
)// Connection represents a connection with a redis-cli
type Connection struct {conn net.Conn// wait until finish sending data, used for graceful shutdownsendingData wait.Wait// lock while server sending responsemu sync.Mutexflags uint64// subscribing channelssubs map[string]bool// password may be changed by CONFIG command during runtime, so store the passwordpassword string// queued commands for `multi`queue [][][]bytewatching map[string]uint32txErrors []error// selected dbselectedDB int
}// 连接池
var connPool = sync.Pool{New: func() interface{} {return &Connection{}},
}// 返回远程地址
// RemoteAddr returns the remote network address
func (c *Connection) RemoteAddr() string {return c.conn.RemoteAddr().String()
}// Close disconnect with the client
func (c *Connection) Close() error {c.sendingData.WaitWithTimeout(10 * time.Second)_ = c.conn.Close()c.subs = nilc.password = ""c.queue = nilc.watching = nilc.txErrors = nilc.selectedDB = 0connPool.Put(c)return nil
}// 创建一个连接实例
// NewConn creates Connection instance
func NewConn(conn net.Conn) *Connection {// 从线程池去c, ok := connPool.Get().(*Connection)if !ok {logger.Error("connection pool make wrong type")return &Connection{conn: conn,}}c.conn = connreturn c
}// Write sends response to client over tcp connection
func (c *Connection) Write(b []byte) (int, error) {if len(b) == 0 {return 0, nil}c.sendingData.Add(1)defer func() {c.sendingData.Done()}()return c.conn.Write(b)
}// 获取连接名称
func (c *Connection) Name() string {if c.conn != nil {return c.conn.RemoteAddr().String()}return ""
}// 订阅放入map
// Subscribe add current connection into subscribers of the given channel
func (c *Connection) Subscribe(channel string) {c.mu.Lock()defer c.mu.Unlock()if c.subs == nil {c.subs = make(map[string]bool)}c.subs[channel] = true
}// 订阅删除
// UnSubscribe removes current connection into subscribers of the given channel
func (c *Connection) UnSubscribe(channel string) {c.mu.Lock()defer c.mu.Unlock()if len(c.subs) == 0 {return}delete(c.subs, channel)
}// 获取订阅集合长度
// SubsCount returns the number of subscribing channels
func (c *Connection) SubsCount() int {return len(c.subs)
}// GetChannels returns all subscribing channels
func (c *Connection) GetChannels() []string {if c.subs == nil {return make([]string, 0)}channels := make([]string, len(c.subs))i := 0for channel := range c.subs {channels[i] = channeli++}return channels
}// 设置密码
// SetPassword stores password for authentication
func (c *Connection) SetPassword(password string) {c.password = password
}// 获取密码
// GetPassword get password for authentication
func (c *Connection) GetPassword() string {return c.password
}// 获取可变状态
// InMultiState tells is connection in an uncommitted transaction
func (c *Connection) InMultiState() bool {return c.flags&flagMulti > 0
}// 设置可变状态
// SetMultiState sets transaction flag
func (c *Connection) SetMultiState(state bool) {if !state { // reset data when cancel multic.watching = nilc.queue = nilc.flags &= ^flagMulti // clean multi flagreturn}c.flags |= flagMulti
}// 返回当前事务的队列命令
// GetQueuedCmdLine returns queued commands of current transaction
func (c *Connection) GetQueuedCmdLine() [][][]byte {return c.queue
}// 命令加入队列
// EnqueueCmd enqueues command of current transaction
func (c *Connection) EnqueueCmd(cmdLine [][]byte) {c.queue = append(c.queue, cmdLine)
}// AddTxError stores syntax error within transaction
func (c *Connection) AddTxError(err error) {c.txErrors = append(c.txErrors, err)
}// GetTxErrors returns syntax error within transaction
func (c *Connection) GetTxErrors() []error {return c.txErrors
}// ClearQueuedCmds clears queued commands of current transaction
func (c *Connection) ClearQueuedCmds() {c.queue = nil
}// GetWatching returns watching keys and their version code when started watching
func (c *Connection) GetWatching() map[string]uint32 {if c.watching == nil {c.watching = make(map[string]uint32)}return c.watching
}// GetDBIndex returns selected db
func (c *Connection) GetDBIndex() int {return c.selectedDB
}// SelectDB selects a database
func (c *Connection) SelectDB(dbNum int) {c.selectedDB = dbNum
}func (c *Connection) SetSlave() {c.flags |= flagSlave
}func (c *Connection) IsSlave() bool {return c.flags&flagSlave > 0
}func (c *Connection) SetMaster() {c.flags |= flagMaster
}func (c *Connection) IsMaster() bool {return c.flags&flagMaster > 0
}
在redis/conn/fake.go
假连接,用于测试
在redis/parser/parser.go
用于解析客户端发来的数据
package parserimport ("bufio""bytes""errors""io""runtime/debug""strconv""strings""github.com/hdt3213/godis/interface/redis""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/redis/protocol"
)// 消息体结构
// Payload stores redis.Reply or error
type Payload struct {Data redis.ReplyErr error
}// 解析从io流的数据
// ParseStream reads data from io.Reader and send payloads through channel
func ParseStream(reader io.Reader) <-chan *Payload {ch := make(chan *Payload)go parse0(reader, ch)return ch
}// 解析byte
// ParseBytes reads data from []byte and return all replies
func ParseBytes(data []byte) ([]redis.Reply, error) {ch := make(chan *Payload)reader := bytes.NewReader(data)go parse0(reader, ch)var results []redis.Replyfor payload := range ch {if payload == nil {return nil, errors.New("no protocol")}if payload.Err != nil {if payload.Err == io.EOF {break}return nil, payload.Err}results = append(results, payload.Data)}return results, nil
}// 解析第一个消息体
// ParseOne reads data from []byte and return the first payload
func ParseOne(data []byte) (redis.Reply, error) {ch := make(chan *Payload)reader := bytes.NewReader(data)go parse0(reader, ch)payload := <-ch // parse0 will close the channelif payload == nil {return nil, errors.New("no protocol")}return payload.Data, payload.Err
}// 私有方法,
func parse0(rawReader io.Reader, ch chan<- *Payload) {// 最后判断有无错误,有则打印日志defer func() {if err := recover(); err != nil {logger.Error(err, string(debug.Stack()))}}()// 解析流reader := bufio.NewReader(rawReader)for {line, err := reader.ReadBytes('\n')if err != nil {// 异常处理ch <- &Payload{Err: err}close(ch)return}// 解析长度length := len(line)// 过短异常if length <= 2 || line[length-2] != '\r' {// there are some empty lines within replication traffic, ignore this error//protocolError(ch, "empty line")continue}line = bytes.TrimSuffix(line, []byte{'\r', '\n'})// 根据不同的字符,做不同的解析方法,ASCII判断switch line[0] {case '+':content := string(line[1:])ch <- &Payload{Data: protocol.MakeStatusReply(content),}if strings.HasPrefix(content, "FULLRESYNC") {err = parseRDBBulkString(reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}}case '-':ch <- &Payload{Data: protocol.MakeErrReply(string(line[1:])),}case ':':value, err := strconv.ParseInt(string(line[1:]), 10, 64)if err != nil {protocolError(ch, "illegal number "+string(line[1:]))continue}ch <- &Payload{Data: protocol.MakeIntReply(value),}case '$':err = parseBulkString(line, reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}case '*':err = parseArray(line, reader, ch)if err != nil {ch <- &Payload{Err: err}close(ch)return}default:args := bytes.Split(line, []byte{' '})ch <- &Payload{Data: protocol.MakeMultiBulkReply(args),}}}
}// 解析字符串
func parseBulkString(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)if err != nil || strLen < -1 {protocolError(ch, "illegal bulk string header: "+string(header))return nil} else if strLen == -1 {ch <- &Payload{Data: protocol.MakeNullBulkReply(),}return nil}body := make([]byte, strLen+2)_, err = io.ReadFull(reader, body)if err != nil {return err}ch <- &Payload{Data: protocol.MakeBulkReply(body[:len(body)-2]),}return nil
}// RDB和后续AOF之间没有CRLF,因此需要区别对待
// there is no CRLF between RDB and following AOF, therefore it needs to be treated differently
func parseRDBBulkString(reader *bufio.Reader, ch chan<- *Payload) error {header, err := reader.ReadBytes('\n')header = bytes.TrimSuffix(header, []byte{'\r', '\n'})if len(header) == 0 {return errors.New("empty header")}strLen, err := strconv.ParseInt(string(header[1:]), 10, 64)if err != nil || strLen <= 0 {return errors.New("illegal bulk header: " + string(header))}body := make([]byte, strLen)_, err = io.ReadFull(reader, body)if err != nil {return err}ch <- &Payload{Data: protocol.MakeBulkReply(body[:len(body)]),}return nil
}func parseArray(header []byte, reader *bufio.Reader, ch chan<- *Payload) error {nStrs, err := strconv.ParseInt(string(header[1:]), 10, 64)// nStrs > 0为合法if err != nil || nStrs < 0 {protocolError(ch, "illegal array header "+string(header[1:]))return nil} else if nStrs == 0 {ch <- &Payload{Data: protocol.MakeEmptyMultiBulkReply(),}return nil}// 消息合法判断lines := make([][]byte, 0, nStrs)for i := int64(0); i < nStrs; i++ {var line []byteline, err = reader.ReadBytes('\n')if err != nil {return err}length := len(line)if length < 4 || line[length-2] != '\r' || line[0] != '$' {protocolError(ch, "illegal bulk string header "+string(line))break}strLen, err := strconv.ParseInt(string(line[1:length-2]), 10, 64)if err != nil || strLen < -1 {protocolError(ch, "illegal bulk string length "+string(line))break} else if strLen == -1 {lines = append(lines, []byte{})} else {body := make([]byte, strLen+2)_, err := io.ReadFull(reader, body)if err != nil {return err}lines = append(lines, body[:len(body)-2])}}// 合法消息装入通道ch <- &Payload{Data: protocol.MakeMultiBulkReply(lines),}return nil
}func protocolError(ch chan<- *Payload, msg string) {err := errors.New("protocol error: " + msg)ch <- &Payload{Err: err}
}
在redis/protocol/asserts/asserts.go
用于测试检查
在redis/protocol/consts.go
定义的一些常量
在redis/protocol/errors.go
定义的一些错误
在redis/protocol/reply.go
协议消息返回
在redis/server/server.go
TCP服务接收到连接后,异步拉起服务,用于客户端的消息处理