前言
Godis 是一个用 Go 语言实现的 Redis 服务器。
地址:https://github.com/HDT3213/godis?tab=readme-ov-file
简单架构描述
godis 是一个基于 TCP 的中心化服务。大致流程是:godis 启动服务并监听端口,客户端通过 TCP 与其建立连接;客户端发送命令(例如用 `keys k` 查找与模式 k 匹配的键),服务端接收并解析该命令,执行相应的业务逻辑后把结果返回给客户端。
所以第一步就来看看godis的TCP服务的源码实现
源码目录结构
.
├── LICENSE
├── README.md
├── README_CN.md
├── aof
├── appendonly.aof
├── build-all.sh
├── build-darwin.sh
├── build-linux.sh
├── cluster
├── commands.md
├── config
├── database
├── datastruct
├── go.mod
├── go.sum
├── interface
├── lib
├── logs
├── main.go
├── node1.conf
├── node2.conf
├── node3.conf
├── pubsub
├── redis
├── redis.conf
├── tcp
├── test.rdb
└── tmp
源码
主方法启动
func main() {// 打印标志print(banner)// 日志配置logger.Setup(&logger.Settings{Path: "logs",Name: "godis",Ext: "log",TimeFormat: "2006-01-02",})// 配置文件configFilename := os.Getenv("CONFIG")// 获取redis.conf,如果没有获取到,则用默认参数if configFilename == "" {if fileExists("redis.conf") {config.SetupConfig("redis.conf")} else {config.Properties = defaultProperties}} else {config.SetupConfig(configFilename)}// 主程序启动,开启TCP服务err := tcp.ListenAndServeWithSignal(&tcp.Config{Address: fmt.Sprintf("%s:%d", config.Properties.Bind, config.Properties.Port),}, RedisServer.MakeHandler())if err != nil {logger.Error(err)}
}
TCP服务
// Config stores tcp server properties
type Config struct {
	// Address is the listen address handed to net.Listen ("address" in yaml).
	Address string `yaml:"address"`
	// MaxConnect maps to the "max-connect" yaml key.
	// NOTE(review): presumably a cap on concurrent clients; nothing in the
	// visible code enforces it — confirm.
	MaxConnect uint32 `yaml:"max-connect"`
	// Timeout maps to the "timeout" yaml key.
	// NOTE(review): not referenced by the visible code — confirm its use.
	Timeout time.Duration `yaml:"timeout"`
}

// ClientCounter records the number of clients in the current Godis server
var ClientCounter int32 // raised by ListenAndServe per accepted connection, lowered when that connection's handler returns

// ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
// It listens on cfg.Address over TCP and serves accepted connections with
// handler via ListenAndServe. When SIGHUP, SIGQUIT, SIGTERM or SIGINT is
// received, closeChan is closed so ListenAndServe shuts down. It returns the
// error from net.Listen if binding fails and nil otherwise.
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
	closeChan := make(chan struct{})

	// signal.Notify does not block when delivering a signal, so the channel
	// needs a buffer; otherwise a signal arriving before the goroutine below
	// is ready to receive would be dropped.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	defer signal.Stop(sigCh)

	// done releases the signal goroutine when this function returns without
	// having received a signal (bind failure or a fatal accept error).
	done := make(chan struct{})
	defer close(done)

	go func() {
		select {
		case <-sigCh:
			// Only the four signals registered above can arrive here.
			// Closing instead of sending never blocks, even when nobody is
			// receiving from closeChan any more.
			close(closeChan)
		case <-done:
		}
	}()

	listener, err := net.Listen("tcp", cfg.Address)
	if err != nil {
		return err
	}
	//cfg.Address = listener.Addr().String()
	logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))

	// Blocks until the listener is closed and all handlers have returned.
	ListenAndServe(listener, handler, closeChan)
	return nil
}

// ListenAndServe binds port and handle requests, blocking until close
// It accepts connections from listener and runs handler.Handle for each one
// in its own goroutine. When closeChan fires or Accept fails with a
// non-timeout error, the listener and the handler are closed. The function
// returns after every started Handle call has finished.
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
	// errCh carries a fatal Accept error to the shutdown goroutine. The buffer
	// of one lets the accept loop send even when that goroutine has already
	// taken the closeChan branch and is no longer receiving.
	errCh := make(chan error, 1)
	defer close(errCh)

	// Shutdown goroutine: waits for either an exit request or an accept error.
	go func() {
		select {
		case <-closeChan:
			logger.Info("get exit signal")
		case er := <-errCh:
			logger.Info(fmt.Sprintf("accept error: %s", er.Error()))
		}
		logger.Info("shutting down...")
		_ = listener.Close() // listener.Accept() will return err immediately
		_ = handler.Close()  // close connections
	}()

	ctx := context.Background()
	var waitDone sync.WaitGroup
	for {
		conn, err := listener.Accept()
		if err != nil {
			// learn from net/http/serve.go#Serve()
			if ne, ok := err.(net.Error); ok && ne.Timeout() {
				logger.Infof("accept occurs temporary error: %v, retry in 5ms", err)
				time.Sleep(5 * time.Millisecond)
				continue
			}
			// Hand the error to the shutdown goroutine and stop accepting.
			errCh <- err
			break
		}

		logger.Info("accept link")
		// The counter is decremented from handler goroutines, so the
		// increment must be atomic as well to avoid a data race.
		atomic.AddInt32(&ClientCounter, 1)
		waitDone.Add(1)
		go func() {
			defer func() {
				logger.Info("done...")
				// Decrement before Done so the counter is already settled
				// when waitDone.Wait returns.
				atomic.AddInt32(&ClientCounter, -1)
				waitDone.Done()
			}()
			logger.Info("handle data")
			handler.Handle(ctx, conn)
		}()
	}

	// Wait for all in-flight handlers before returning.
	waitDone.Wait()
}
TCP消息处理
package server/** A tcp.Handler implements redis protocol*/import ("context""io""net""strings""sync""github.com/hdt3213/godis/cluster""github.com/hdt3213/godis/config"database2 "github.com/hdt3213/godis/database""github.com/hdt3213/godis/interface/database""github.com/hdt3213/godis/lib/logger""github.com/hdt3213/godis/lib/sync/atomic""github.com/hdt3213/godis/redis/connection""github.com/hdt3213/godis/redis/parser""github.com/hdt3213/godis/redis/protocol"
)

var (
	// unknownErrReplyBytes is the raw reply Handle writes when db.Exec
	// returns a nil result.
	unknownErrReplyBytes = []byte("-ERR unknown\r\n")
)

// Handler struct definition.
// Handler implements tcp.Handler and serves as a redis server
type Handler struct {// 活跃连接, 并发安全mapactiveConn sync.Map // *client -> placeholder// 存储引擎db database.DBclosing atomic.Boolean // refusing new client and new request
}// 创建一个处理实例
// MakeHandler creates a Handler instance
//
// The storage engine is a cluster when config.Properties.ClusterEnable is set
// and a standalone server otherwise.
func MakeHandler() *Handler {
	h := new(Handler)
	if config.Properties.ClusterEnable {
		// Cluster mode.
		h.db = cluster.MakeCluster()
		return h
	}
	// Standalone mode.
	h.db = database2.NewStandaloneServer()
	return h
}

// Handle a client closing its connection.
// closeClient closes the client, notifies the database through
// AfterClientClose, and removes the client from activeConn, in that order.
func (h *Handler) closeClient(client *connection.Connection) {
	_ = client.Close() // the close error is deliberately ignored
	h.db.AfterClientClose(client)
	h.activeConn.Delete(client)
}

// Process the commands received from a client.
// Handle receives and executes redis commands
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {if h.closing.Get() {// closing handler refuse new connection_ = conn.Close()return}client := connection.NewConn(conn)// 存储一个客户端h.activeConn.Store(client, struct{}{})// 获取字符串ch := parser.ParseStream(conn)// 接收客户端数据for payload := range ch {// 遍历消息体if payload.Err != nil {// 错误信息处理,关闭连接if payload.Err == io.EOF ||payload.Err == io.ErrUnexpectedEOF ||strings.Contains(payload.Err.Error(), "use of closed network connection") {// connection closedh.closeClient(client)logger.Info("connection closed: " + client.RemoteAddr())return}// 协议错误处理,关闭连接// protocol errerrReply := protocol.MakeErrReply(payload.Err.Error())_, err := client.Write(errReply.ToBytes())if err != nil {h.closeClient(client)logger.Info("connection closed: " + client.RemoteAddr())return}continue}// 信息不可为空if payload.Data == nil {logger.Error("empty payload")continue}// 获取到客户端信息r, ok := payload.Data.(*protocol.MultiBulkReply)if !ok {logger.Error("require multi bulk protocol")continue}// 执行结果result := h.db.Exec(client, r.Args)// 结果回复if result != nil {_, _ = client.Write(result.ToBytes())} else {_, _ = client.Write(unknownErrReplyBytes)}}
}// Close stops handler
func (h *Handler) Close() error {logger.Info("handler shutting down...")h.closing.Set(true)// TODO: concurrent waith.activeConn.Range(func(key interface{}, val interface{}) bool {client := key.(*connection.Connection)_ = client.Close()return true})h.db.Close()return nil
}