目录
项目介绍
项目分析
项目分层
初始化
首页功能
获取首页
注册
进入聊天界面
用户模块
创建用户
删除用户
修改用户信息
查找用户
通过名字和密码查找用户
获取用户列表
好友模块
获取所有好友
添加好友
发送消息
ws升级和订阅redis管道接收消息
用户发送消息
上传文件
群聊功能
创建群聊
加入群聊
加载用户群关系
消息缓存
总结
项目介绍
基于Gin框架的IM即时通讯小demo,实现了用户注册,添加好友,创建和加入群聊;好友聊天,群聊天;可以自定义个人信息,群信息;在聊天中可以发送文字、图片、表情、语音。
项目地址:knoci/GinChat (github.com)
项目分析
项目分层
- config:配置文件
- asset:前端静态资源,上传的图片存储在其路径下的upload文件夹
- docs:swagger
- utils:一些辅助功能实现,加密、回复、初始化等
- views:视图层,html模板
- router:路由层
- service:服务层,具体功能的实现
- models:模型层,表结构及其相关函数
初始化
mian函数如下
func main() {utils.InitConfig()utils.InitMySQL()utils.InitRedis()InitTimer()utils.DB.AutoMigrate(models.UserBasic{})utils.DB.AutoMigrate(models.Message{})utils.DB.AutoMigrate(models.Contact{})utils.DB.AutoMigrate(models.Community{})r := router.Router()r.Run(viper.GetString("port.server"))
}// 初始化定时器
func InitTimer() {utils.Timer(time.Duration(viper.GetInt("timeout.DelayHeartbeat"))*time.Second, time.Duration(viper.GetInt("timeout.HeartbeatHz"))*time.Second, models.CleanConnection, "")
}
utils的Init类函数,负责读取配置,初始化数据库。其中使用了viper库来读取configpath下名为app的配置文件。Mysql使用GOEM初始化,使用log库的logger输出日志。Redis则使用go-redis库初始化。同时定义了全局数据库DB,在main中使用AutoMigrate自动建表。
var (DB *gorm.DBRed *redis.Client
)func InitConfig() {viper.SetConfigName("app")viper.AddConfigPath("config")err := viper.ReadInConfig()if err != nil {fmt.Println(err)}
}func InitMySQL() {//自定义日志模板 打印SQL语句newLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags),logger.Config{SlowThreshold: time.Second, //慢SQL阈值LogLevel: logger.Info, //级别Colorful: true, //彩色},)DB, _ = gorm.Open(mysql.Open(viper.GetString("mysql.dns")),&gorm.Config{Logger: newLogger})fmt.Println(" MySQL inited 。。。。")
}func InitRedis() {Red = redis.NewClient(&redis.Options{Addr: viper.GetString("redis.addr"),Password: viper.GetString("redis.password"),DB: viper.GetInt("redis.DB"),PoolSize: viper.GetInt("redis.poolSize"), //连接池中最大连接数MinIdleConns: viper.GetInt("redis.minIdleConn"),})
}
随后在调用router层的Router()函数,这里我们下面再分析。在InitTimer()中使用到了utils.Timer,它是一个定时器,用于在给定的延迟(delay
)后开始,然后每隔 tick
时间间隔执行一次传入的函数 fun
,直到 fun
返回 false
为止。
type TimerFunc func(interface{}) bool
/*
delay 首次延迟
tick 间隔
fun 定时执行的方法
param 方法的参数
*/
func Timer(delay, tick time.Duration, fun TimerFunc, param interface{}) {go func() {if fun == nil {return}t := time.NewTimer(delay)for {select {case <-t.C:if fun(param) == false {return}t.Reset(tick)}}}()
}
传入的函数 fun
,是models/message.go里的CleanConnection函数,通过从客户端Map里拿出之前记录的心跳时间,通过比较 上次心跳时间+最大心跳间隔 和 现在时间 来判断连接是否超时,然后返回bool值给utils.Timer。
// 清理超时连接
func CleanConnection(param interface{}) (result bool) {result = truedefer func() {if r := recover(); r != nil {fmt.Println("cleanConnection err", r)}}()currentTime := uint64(time.Now().Unix())for i := range clientMap {node := clientMap[i]if node.IsHeartbeatTimeOut(currentTime) {fmt.Println("心跳超时..... 关闭连接:", node)node.Conn.Close()}}return result
}// 用户心跳是否超时
func (node *Node) IsHeartbeatTimeOut(currentTime uint64) (timeout bool) {if node.HeartbeatTime+viper.GetUint64("timeout.HeartbeatMaxTime") <= currentTime {fmt.Println("心跳超时。。。自动下线", node)timeout = true}return
}
配置文件app.yml如下:
mysql:dns: root:123@tcp(127.0.0.1:3306)/ginchat?charset=utf8mb4&parseTime=True&loc=Local
redis:addr: "127.0.0.1:6379"password: ""DB: 0poolSize: 10minIdleConn: 30
timeout:DelayHeartbeat: 3 #延迟心跳时间 单位秒HeartbeatHz: 30 #每隔多少秒心跳时间HeartbeatMaxTime: 80000 #最大心跳时间 ,超过此就下线RedisOnlineTime: 4 #缓存的在线用户时长 单位H
port:server: ":8848"udp: 3001
首页功能
Router()中首先注册了默认路由,设置swagger,引入静态资源。
r := gin.Default()// 注册swagger api相关路由docs.SwaggerInfo.BasePath = ""r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))// 静态资源r.Static("/asset", "asset/")r.LoadHTMLGlob("views/**/*")r.StaticFile("/favicon.ico", "asset/images/favicon.ico")
首页功能的router是这样的,全部都是GET请求,具体实现在service目录下的index.go。
// 首页r.GET("/", service.GetIndex)r.GET("/index", service.GetIndex)r.GET("/toRegister", service.ToRegister)r.GET("/toChat", service.ToChat)r.GET("/chat", service.Chat)
获取首页
首先我们看 "/" 和 "/index" 都是GetIndex()函数,函数解析两个 HTML 模板文件:index.html
和 views/chat/head.html
。这是通过 template.ParseFiles
函数完成的,该函数返回一个 Template
对象(这里命名为 ind
),接下来使用 ind.Execute
方法执行模板渲染,ind.Execute
方法会将模板 ind
与数据 "index"
结合,生成最终的 HTML 内容,并写入到响应流中,客户端浏览器将会收到并显示这个 HTML 页面。
在首页的前端代码中,虽然我们这里不对前端进行分析,但是实际上的登录是POST 了 "user/login" ,具体调用了 FindUserByNameAndPwd() 函数,这个函数的具体功能,我们在用户模块中再分析。
func GetIndex(c *gin.Context) {ind, err := template.ParseFiles("index.html", "views/chat/head.html")if err != nil {panic(err)}err = ind.Execute(c.Writer, "index")if err != nil {panic(err)}
}
注册
"/toRegister" 是用户注册的功能,在ToRegister()函数中,渲染前端的 view/user/register.html 。前端中注册账户 POST 了 "user/createUser" 路由。
func ToRegister(c *gin.Context) {ind, err := template.ParseFiles("views/user/register.html")if err != nil {panic(err)}err = ind.Execute(c.Writer, "register")if err != nil {panic(err)}
}
进入聊天界面
"/toChat"实际上我们的的聊天主界面,ToChat()函数实现的是一个跳转,在函数中渲染了聊天界面需要的一系列模板,请求了userId和token来验证身份,把信息存入UserBasic结构体,传给前端。
UserBasic结构体在models的user_basic中定义,存储用户的信息。
/*
UserBasic在models的user_basic中定义
*/
type UserBasic struct {gorm.ModelName stringPassword stringPhone string `valid:"matches(^1[3-9]{1}\\d{9}$)"`Email string `valid:"email"`Avatar string //头像Identity stringClientIp stringClientPort stringSalt stringLoginTime time.TimeHeartbeatTime time.TimeLoginOutTime time.Time `gorm:"column:login_out_time" json:"login_out_time"`IsLogout boolDeviceInfo string
}func ToChat(c *gin.Context) {ind, err := template.ParseFiles("views/chat/index.html","views/chat/head.html","views/chat/foot.html","views/chat/tabmenu.html","views/chat/concat.html","views/chat/group.html","views/chat/profile.html","views/chat/createcom.html","views/chat/userinfo.html","views/chat/main.html")if err != nil {panic(err)}userId, _ := strconv.Atoi(c.Query("userId"))token := c.Query("token")user := models.UserBasic{}user.ID = uint(userId)user.Identity = tokenerr = ind.Execute(c.Writer, user)if err != nil {panic(err)}
}
最后"/chat" 的 service.Chat() 函数,会运行models/message.go里的 Chat() 函数,这个函数是实现发送消息功能的关键,我们放到发送消息功能再分析吧。
func Chat(c *gin.Context) {models.Chat(c.Writer, c.Request)
}
用户模块
用户模块负责的是用户的增删改查,主要是数据库相关的操作。
// 用户模块r.POST("/user/getUserList", service.GetUserList)r.POST("/user/findUserByNameAndPwd", service.FindUserByNameAndPwd)r.POST("/user/createUser", service.CreateUser)r.POST("/user/deleteUser", service.DeleteUser)r.POST("/user/updateUser", service.UpdateUser)r.POST("/user/find", service.FindByID)
创建用户
我们先来看 "user/createUser" 创建用户,这是个POST请求调用了 userservice.go 的CreateUser() 函数。函数初始化一个UserBasic结构体,从页面请求name,password,Identity字段。
func CreateUser(c *gin.Context) {user := models.UserBasic{}user.Name = c.Request.FormValue("name")password := c.Request.FormValue("password")repassword := c.Request.FormValue("Identity")fmt.Println(user.Name, password, repassword)// 用户名判断data := models.FindUserByName(user.Name)if user.Name == "" || password == "" || repassword == "" {c.JSON(http.StatusBadRequest, gin.H{"code": -1,"message": "用户名或密码不能为空!","data": user,})return}if data.Name != "" {c.JSON(http.StatusBadRequest, gin.H{"code": -1,"message": "该用户名已存在","data": user,})return}// 密码判断if password != repassword {c.JSON(http.StatusBadRequest, gin.H{"code": -1,"message": "两次输入的密码不一致","data": user,})return}user.LoginTime = time.Now()user.HeartbeatTime = time.Now()user.LoginOutTime = time.Now()// 密码加密salt := fmt.Sprintf("%06d", rand.Int31())user.Salt = saltuser.Password = utils.MakePassword(password, salt)models.CreateUser(user)c.JSON(http.StatusOK, gin.H{"code": 0,"message": "创建用户成功!","data": user,})
}
之后利用名字,通过 models/user_basic.go 的FindUserByName 通过名字在数据库查找获得一个UserBasic对象data。
合法性验证,判断输入的用户名,密码不能为空,通过data判断用户是否已经存在,最后判断输入的两次密码是否一致。
func FindUserByName(name string) UserBasic {user := UserBasic{}utils.DB.Where("name = ?", name).First(&user)return user
}
把user时间相关数据赋值,之后就是密码的加密,目的是在数据库中不存入明文密码,保证用户账户安全。用rand生成一个随机的salt,然后传入 utils.MakePassword() 进行MD5加密,把加密后结果赋值给user的password。
// 小写,计算字符串的 MD5 哈希值
func Md5Encode(data string) string {// 创建了一个新的 MD5 哈希计算器h := md5.New()// 将输入的字符串 data 转换为字节切片([]byte(data)),然后写入到 MD5 哈希计算器 h 中h.Write([]byte(data))// h.Sum 方法调用会完成哈希计算,并将结果返回为一个字节切片。nil 参数表示没有额外的字节切片需要追加到哈希计算中tempStr := h.Sum(nil)// 将上一步得到的字节切片 tempStr 转换为十六进制字符串return hex.EncodeToString(tempStr)
}// 大写
func MD5Encode(data string) string {return strings.ToUpper(Md5Encode(data))
}// 随机数加密
func MakePassword(plainpwd, salt string) string {return Md5Encode(plainpwd + salt)
}
最后把初始化完成的user传给 models/user_basic.go 的 CreateUser() 在数据库中存入用户,返回创建成功的JSON。
func CreateUser(user UserBasic) *gorm.DB {return utils.DB.Create(&user)
}
删除用户
"user/deleteUser" POST请求调用了 userservice.go 的DeleteUser() 函数。函数Query获取一个 "id" 把id赋给创建的UserBasic结构体传给 models层的方法 DeleteUser() 从数据库中删除,注意GORM的delete并不是马上清除,而是标记清除。
func DeleteUser(c *gin.Context) {user := models.UserBasic{}id, _ := strconv.Atoi(c.Query("id"))user.ID = uint(id)models.DeleteUser(user)c.JSON(http.StatusOK, gin.H{"code": 0,"message": "删除用户成功!","data": user,})
}
func DeleteUser(user UserBasic) *gorm.DB {return utils.DB.Delete(&user)
}
修改用户信息
"user/updateUser" POST请求调用了 userservice.go 的 UpdateUser() 函数。PostForm拿到字段 "id","name","password","phone","icon","email" 赋值给结构体。
这里使用了 govalidator 库来进行格式验证,确保输入正确的邮箱和手机号。
func UpdateUser(c *gin.Context) {user := models.UserBasic{}// 从POST请求中拿到idid, _ := strconv.Atoi(c.PostForm("id"))user.ID = uint(id)user.Name = c.PostForm("name")user.Password = c.PostForm("password")user.Phone = c.PostForm("phone")user.Avatar = c.PostForm("icon")user.Email = c.PostForm("email")// govalidator校验_, err := govalidator.ValidateStruct(user)if err != nil {fmt.Println(err)c.JSON(http.StatusBadRequest, gin.H{"code": -1,"message": "修改参数不匹配!","data": user,})} else {models.UpdateUser(user)c.JSON(http.StatusOK, gin.H{"code": 0,"message": "修改用户成功!","data": user,})}
}
最后在 model 层的 UpdateUser 实现数据库的数据更新。
func UpdateUser(user UserBasic) *gorm.DB {// DB.Model(&user) 的作用是指定后续的数据库操作(如 Update、Delete 等)应该作用于哪个模型(Model)上return utils.DB.Model(&user).Updates(UserBasic{Name: user.Name, Password: user.Password, Phone: user.Phone, Email: user.Email, Avatar: user.Avatar})
}
查找用户
"user/find" POST请求调用了 userservice.go 的 FindByID() 函数。顾名思义,就是通过Id在数据库中查找用户,没什么好说的,具体数据库操作实现封装在model层。
func FindByID(c *gin.Context) {userId, _ := strconv.Atoi(c.Request.FormValue("userId"))// name := c.Request.FormValue("name")data := models.FindByID(uint(userId))utils.RespOK(c.Writer, data, "ok")
}
// 查找某个用户
func FindByID(id uint) UserBasic {user := UserBasic{}utils.DB.Where("id = ?", id).First(&user)return user
}
通过名字和密码查找用户
"user/findUserByNameAndPwd" POST请求调用了 userservice.go 的 FindUserByNameAndPwd() 函数。这个函数我们之前在首页功能的获取首页说过,实际上负责了登录的功能。
func FindUserByNameAndPwd(c *gin.Context) {data := models.UserBasic{}name := c.Request.FormValue("name")password := c.Request.FormValue("password")// 用户是否存在user := models.FindUserByName(name)if user.Name == "" {c.JSON(http.StatusBadRequest, gin.H{"code": -1,"message": "该用户不存在","data": data,})return}// 判断密码flag := utils.ValidPassword(password, user.Salt, user.Password)if !flag {c.JSON(http.StatusBadRequest, gin.H{"code": -1,"message": "密码错误","data": data,})return}// 查询pwd := utils.MakePassword(password, user.Salt)data = models.FindUserByNameAndPwd(name, pwd)c.JSON(http.StatusOK, gin.H{"code": 0,"message": "登录成功","data": data,})
}
从POST中获取 "name","password" 字段,先用 FindUserByName() 查找用户是否存在,然后把输入的密码和user的salt传入 utils.ValidPassword() 函数解码,检查是否与数据库中的密码一致,通过JSON返回code,message,data。
获取用户列表
"user/getUserList" POST请求调用了 userservice.go 的 GetUserList() 函数。获得UserBasic{} 结构体切片,就是所有用户的数据。
func GetUserList(c *gin.Context) {data := make([]*models.UserBasic, 10)data = models.GetUserList()c.JSON(http.StatusOK, gin.H{"code": 0,"message": "用户名已注册","data": data,})
}
func GetUserList() []*UserBasic {data := make([]*UserBasic, 10)_ = utils.DB.Find(&data)return data
}
好友模块
// 好友模块r.POST("/searchFriends", service.SearchFriends)r.POST("/contact/addfriend", service.AddFriend)
好用模块对应的 models 是 contact.go,封装了一个 Contact 表来存储用户关系。
// 人员关系
type Contact struct {gorm.ModelOwnerId uint // 谁的关系TargetId uint // 对应的谁Type int // 对应的类型 1好友 2群Desc string
}
获取所有好友
"/searchFriends" POST请求调用了 userservice.go 的 SearchFriends() 函数。
func SearchFriends(c *gin.Context) {id, _ := strconv.Atoi(c.Request.FormValue("userId"))users := models.SearchFriends(uint(id))// 查询OK列表utils.RespOKList(c.Writer, users, len(users))
}
在 model 层中的 SearchFriends() 函数接收到 id,通过在数据库中查询,把 id 对应 type为1,即好友关系的 contact 存入 contact切片 contacts,然后遍历切片contacts获得好友的id,又通过好友id获得UserBasic结构体存入UserBasic切片,把UserBasic切片返回。
看起来很绕,其实就是要从contact表中查关系,然后转换成userbasic表数据传回,因为我们要的是所以好友的用户信息,而不是仅仅的contact关系。
func SearchFriends(userId uint) []UserBasic {contacts := make([]Contact, 0)objIds := make([]uint64, 0)utils.DB.Where("owner_id =? and type=1", userId).Find(&contacts)for _, v := range contacts {objIds = append(objIds, uint64(v.TargetId))}users := make([]UserBasic, 0)utils.DB.Where("id in ?", objIds).Find(&users)return users
}
此外,我们可以看到调用了 Resp 函数,这个函数是在 utils 里的resp.go,其中的函数实现按照格式规范返回的功能。
type H struct {Code intMsg stringData interface{}Rows interface{}Total interface{}
}func Resp(w http.ResponseWriter, code int, data interface{}, msg string) {// 设置响应的 Content-Type 头为 application/json,告诉客户端响应体是 JSON 格式的数据。w.Header().Set("Content-Type", "application/json")// 发送 HTTP 状态码 200 OK 到客户端,表示请求已成功处理。w.WriteHeader(http.StatusOK)h := H{Code: code,Data: data,Msg: msg,}ret, err := json.Marshal(h)if err != nil {fmt.Println(err)}// 将序列化后的 JSON 数据 ret 写入响应流,发送给客户端。w.Write(ret)
}func RespList(w http.ResponseWriter, code int, data interface{}, total interface{}) {// 设置响应的 Content-Type 头为 application/json,告诉客户端响应体是 JSON 格式的数据。w.Header().Set("Content-Type", "application/json")// 发送 HTTP 状态码 200 OK 到客户端,表示请求已成功处理。w.WriteHeader(http.StatusOK)h := H{Code: code,Rows: data,Total: total,}ret, err := json.Marshal(h)if err != nil {fmt.Println(err)}// 将序列化后的 JSON 数据 ret 写入响应流,发送给客户端。w.Write(ret)
}func RespFail(w http.ResponseWriter, msg string) {Resp(w, -1, nil, msg)
}func RespOK(w http.ResponseWriter, data interface{}, msg string) {Resp(w, 0, data, msg)
}func RespOKList(w http.ResponseWriter, data interface{}, total interface{}) {RespList(w, 0, data, total)
}
添加好友
"/contact/addFriends" POST请求调用了 userservice.go 的 AddFriends() 函数。
func AddFriend(c *gin.Context) {id, _ := strconv.Atoi(c.Request.FormValue("userId"))targetid, _ := strconv.Atoi(c.Request.FormValue("targetId"))code, msg := models.AddFriend(uint(id), uint(targetid))// 查询OK列表if code == 0 {utils.RespOK(c.Writer, code, msg)} else {utils.RespFail(c.Writer, msg)}
}
获取了我们自己的 useId 和要添加对象的 targetId 传给 models下contact.go 的 AddFriend() 。在函数中我们进行了一系列的验证,像是输入不许为空、添加的用户不能是自己、不能重复添加好友。
在往数据库添加contact表时,我们运用了事物功能,因为建立好友关系要同时建两张表,一张是自己把对方加入好友的表,一张是对方把我加入好友的表。
为了避免建表过程中可能出现错误,导致只有一张表的垃圾数据,我们要用事物保证建表的一致性。用Begin开启事务,Commit提交事务,recover捕获异常后Rollback回滚。
func AddFriend(userId uint, targetId uint) (int, string) {user := UserBasic{}if targetId != 0 {user = FindUserByID(targetId)if user.Password != "" {if userId == user.ID {return -5, "无法添加自己"}temp := Contact{}utils.DB.Where("owner_id =? and target_id = ? and type=1", userId, targetId).Find(&temp)if temp.ID != 0 {return -3, "对方已经是你的好友"}tx := utils.DB.Begin() // 打开事物defer func() {// 事物一旦开始,不论最后什么异常都会回滚if r := recover(); r != nil {tx.Rollback()}}()contact := Contact{OwnerId: userId,TargetId: targetId,Type: 1,Desc: "好友",}if err := utils.DB.Create(&contact); err != nil {tx.Rollback()return -4, "创建好友关系发生错误"}contact = Contact{OwnerId: targetId,TargetId: userId,Type: 1,Desc: "好友",}if err := utils.DB.Create(&contact); err != nil {tx.Rollback()return -4, "创建好友关系发生错误"}tx.Commit()return 0, "添加好友成功"}return -1, "未找到该用户"}return -2, "输入不许为空"
}
发送消息
ws升级和订阅redis管道接收消息
"/user/SendMsg" GET调用了 userservice.go 的 SendMsg() 函数。这个函数就是用Upgarde()函数把ws的Upgrader从http协议升级为 WebSocket,这里设置了CheckOrigin为true是检查跨域请求的常规操作。
// websocket的upgrate设定检查,防止跨域站点的伪造请求
var upGrade = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {return true},
}func SendMsg(c *gin.Context) {ws, err := upGrade.Upgrade(c.Writer, c.Request, nil)if err != nil {fmt.Println("upgrade:", err)return}defer func(ws *websocket.Conn) {err = ws.Close()if err != nil {fmt.Println("close:", err)}}(ws)MsgHandler(ws, c)
}
func MsgHandler(ws *websocket.Conn, c *gin.Context) {for {msg, err := utils.Subscribe(c, utils.PublishKey)if err != nil {fmt.Println("subscribe err:", err)}tm := time.Now().Format("2006-01-02T15:04:05")// 消息格式化m := fmt.Sprintf("[ws][%s]:%s", tm, msg)err = ws.WriteMessage(1, []byte(m))if err != nil {fmt.Println("WriteMessage err:", err)}}
}
升级为WebSocket后,调用 utils 下 system_init.go 的 Subscribe() 函数订阅了Redis中websocket频道的消息,sub订阅对象通过ReceiveMessage堵塞获取管道消息,消息体调用Payload方法返回具体消息内容。
最后往ws中写入测试消息,查看通信是否正常。
const (PublishKey = "websocket"
)// 封装redis的消息函数
// Publish 发布消息到redis
func Publish(ctx context.Context, channel, msg string) error {var err errorerr = Red.Publish(ctx, channel, msg).Err()if err != nil {fmt.Println("Publish error: %v", err)}return err
}// Subscribe 订阅redis消息
func Subscribe(ctx context.Context, channel string) (string, error) {sub := Red.Subscribe(ctx, channel)msg, err := sub.ReceiveMessage(ctx)if err != nil {fmt.Println("Subscribe error: %v", err)}return msg.Payload, err
}
用户发送消息
"/user/SendUserMsg" GET调用了 userservice.go 的 SendUserMsg() 函数。会运行models/message.go里的 Chat() 函数,这个函数是实现发送消息功能的关键。
func SendUserMsg(c *gin.Context) {models.Chat(c.Writer, c.Request)
}
这里比较复杂,我们一步步来看。
第一步获取参数,拿到userId;第二步协议升级为ws;第三步,初始化我们封装的node结构体,一个node就是一个消息通信的节点,ws是全双工通信,可以实现收发消息;第四步把node映射到clientMap生,加上读写锁防止冲突。
type Node struct {Conn *websocket.ConnAddr string //客户端地址FirstTime uint64 //首次连接时间HeartbeatTime uint64 //心跳时间LoginTime uint64 //登录时间DataQueue chan []byteGroupSets set.Interface
}// 映射关系
var clientMap map[int64]*Node = make(map[int64]*Node, 0)// 读写锁
var rwLock sync.RWMutexfunc Chat(w http.ResponseWriter, r *http.Request) {// 1.获取参数query := r.URL.Query()Id := query.Get("userId")userId, _ := strconv.ParseInt(Id, 10, 64)// 2.校验合法性 暂时没有实现 直接trueisvalid := trueconn, err := (&websocket.Upgrader{//token 校验CheckOrigin: func(r *http.Request) bool {return isvalid},}).Upgrade(w, r, nil)if err != nil {fmt.Println(err)return}// 3.获取连接,封装结构体currentTime := uint64(time.Now().Unix())node := &Node{Conn: conn,Addr: conn.RemoteAddr().String(), //客户端地址HeartbeatTime: currentTime, //心跳时间LoginTime: currentTime, //登录时间DataQueue: make(chan []byte, 50),GroupSets: set.New(set.ThreadSafe), // 设置线程安全}// 4.用户关系,userid与node绑定并且加锁rwLock.Lock()clientMap[userId] = noderwLock.Unlock()// 5.完成发送逻辑go sendProc(node)// 6.完成接受逻辑,接收后广播到udp,根据类型(私聊,群聊)分别处理go recvProc(node)//7.加入在线用户到缓存SetUserOnlineInfo("online_"+Id, []byte(node.Addr), time.Duration(viper.GetInt("timeout.RedisOnlineTime"))*time.Hour)sendMsg(uint(userId), []byte("欢迎进入聊天系统"))
}
第五步开启一个go协程,负责发送消息。把data从node节点的消息管道中取出,然后WriteMessage 写入ws连接conn。
func sendProc(node *Node) {for {select {case data := <-node.DataQueue:err := node.Conn.WriteMessage(websocket.TextMessage, data)if err != nil {fmt.Println(err)return}}}
}
第六步开启一个go协程,负责接收消息。把接收的消息JSON反序列化后存入Message结构体。如果Type是3为心跳检测,调用 HeartBeat 方法更新心跳时间。
// 消息
type Message struct {gorm.ModelUserId uint // 发送方TargetId uint // 接受方Type int //发送类型 1私聊 2群聊 3广播Media int //消息类型 1文字 2表情包 3图片 4音频Content string // 消息内容CreateTime uint64 //创建时间ReadTime uint64 //读取时间Pic stringUrl stringDesc stringAmount int // 其他数字统计
}// 需要重写此方法才能完整的msg转byte[]
func (msg Message) MarshalBinary() ([]byte, error) {return json.Marshal(msg)
}func recvProc(node *Node) {for {_, data, err := node.Conn.ReadMessage()if err != nil {fmt.Println(err)return}msg := Message{}err = json.Unmarshal(data, &msg)if err != nil {fmt.Println(err)}//心跳检测 msg.Media == -1 || msg.Type == 3if msg.Type == 3 {currentTime := uint64(time.Now().Unix())node.Heartbeat(currentTime)} else {dispatch(data)broadMsg(data) // 将消息广播到局域网}}
}// 更新用户心跳
func (node *Node) Heartbeat(currentTime uint64) {node.HeartbeatTime = currentTimereturn
}
然后通过 dispatch() 函数,按照消息类型分发消息。更新了msg的CreateTime,然后又反序列化得到消息内容data。
这里我们注意到获得msg反序列化了一次,获取data又反序列化了一次,是因为ws收到JSON格式的数据,反序列化得到msg是一个大的结构,其中的消息data是JSON格式存储,想要拿到消息内容还要反序列化一次。
func dispatch(data []byte) {msg := Message{}msg.CreateTime = uint64(time.Now().Unix())err := json.Unmarshal(data, &msg)if err != nil {fmt.Println(err)}switch msg.Type {case 1: // 私信sendMsg(msg.TargetId, data)case 2:sendGroupMsg(int64(msg.TargetId), data) //发送的群ID ,消息内容//case 3:// sendAllMsg()//case 4:}
}
如果是私聊,在 sendMsg 函数中,依次实现下列功能
- 读写锁从映射表取出目标节点
- 创建上下文,取出msg的json数据,设置时间戳,创建 targtIdstr(接收方) 和 userIdstr(发送方) 对象
- 从Redis中onlie键检查用户是否在线,在线就把消息写入目标节点消息管道
- 按格式构建消息键,把消息缓存在redis有序集合,socre+1使消息的时效大于上一条
func sendMsg(userId uint, msg []byte) {rwLock.RLock()node, ok := clientMap[int64(userId)]rwLock.RUnlock()jsonMsg := Message{}json.Unmarshal(msg, &jsonMsg)ctx := context.Background()targetIdStr := strconv.Itoa(int((userId)))userIdStr := strconv.Itoa(int(jsonMsg.UserId))jsonMsg.CreateTime = uint64(time.Now().Unix())r, err := utils.Red.Get(ctx, "online_"+userIdStr).Result()if err != nil {fmt.Println(err)}if r != "" {if ok {node.DataQueue <- msg}}var key stringif userId > jsonMsg.UserId {key = "msg_" + userIdStr + "_" + targetIdStr} else {key = "msg_" + targetIdStr + "_" + userIdStr}res, err := utils.Red.ZRevRange(ctx, key, 0, -1).Result()if err != nil {fmt.Println(err)}score := float64(cap(res)) + 1ress, e := utils.Red.ZAdd(ctx, key, &redis.Z{score, msg}).Result() //jsonMsg//res, e := utils.Red.Do(ctx, "zadd", key, 1, jsonMsg).Result() //备用 后续拓展 记录完整msgif e != nil {fmt.Println(e)}fmt.Println(ress)
}
最后通过 boradMsg() 函数把消息广播到局域网,具体实现了一个 UDP 通信的框架,包括发送和接收数据的功能。发送协程负责从通道中获取数据并发送到网络,而接收协程负责监听并接收网络数据,然后进行处理,实现IM的实时通信
broadMsg 函数:将消息发送到 udpsendChan
通道,这个通道被 udpSendProc
协程监听。
udpSendProc 函数:这是一个协程,用于不断地从 udpsendChan
中读取数据,并使用 UDP 协议发送到局域网内的广播地址 192.168.1.255。
udpRecvProc 函数:这是一个协程,用于在局域网内监听 UDP 广播消息。接收到的 UDP 数据包会被传递给 dispatch
函数进行处理。
var udpsendChan chan []byte = make(chan []byte, 1024)func broadMsg(data []byte) {udpsendChan <- data
}func init() {go udpSendProc()go udpRecvProc()fmt.Println("init goroutines...")
}// 完成udp数据发送协程
func udpSendProc() {con, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.IPv4(192, 168, 1, 255),Port: viper.GetInt("port.udp"),})defer con.Close()if err != nil {fmt.Println(err)}for {select {case data := <-udpsendChan:_, err = con.Write(data)if err != nil {fmt.Println(err)return}}}
}// 完成udp接收数据协程
func udpRecvProc() {con, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4zero,Port: viper.GetInt("port.udp"),})if err != nil {fmt.Println(err)}defer con.Close()for {var buf [512]byten, err := con.Read(buf[0:])if err != nil {fmt.Println(err)return}dispatch(buf[0:n])}
}
看了udp部分的代码,大家可能会有疑问,为什么我明明已经有node节点的消息管道和发送协程、接收协程,还需要再封装一个udp管道和udp发送接收协程呢?
实际上,node节点和接收发送协程,是网络层ws的,学过网络通信的我们都知道,要实现网络消息传输,还是依赖传输层的udp/tcp实现,特别是我们及时IM,依赖了udp的广播。
-
为什么要有 UDP 广播:
- UDP 广播允许发送数据包到局域网内的所有设备,而不需要建立单独的连接。
- 这在聊天系统中很有用,因为它可以用于通知所有在线设备有新消息到达,或者用于实现局域网内的群聊功能。
-
UDP 广播是怎么被调用的:
- 当
recvProc
接收到非心跳消息时,会调用broadMsg
函数,将消息发送到 UDP 通道。 udpSendProc
协程监听这个通道,并将收到的消息发送到局域网内的广播地址。
- 当
自此为止,实现发送的核心已经分析完毕了,还是让我们回到 Chat() 函数吧。
第七步,加入在线用户到redis缓存,然后给调用函数user的ws发送一条"欢迎进入聊天系统"。到此Chat聊天的大致雏形已成。
func SetUserOnlineInfo(key string, val []byte, timeTTL time.Duration) {ctx := context.Background()utils.Red.Set(ctx, key, val, timeTTL)
}
func SetUserOnlineInfo(key string, val []byte, timeTTL time.Duration) {ctx := context.Background()utils.Red.Set(ctx, key, val, timeTTL)
}
上传文件
// 上传文件r.POST("/attach/upload", service.Upload)
"/attach/upload" POST请求调用了 userservice.go 的 Upload() 函数。FormFile从请求中获取"file"文件,这里前端会处理接口。
然后设置了格式名字,地址(asset/upload),用os.Create创建这么一个空文件,再用io.Copy复制"file"文件到本地空文件,这种做法使得之后展示的图片,实际上就是访问复制的本地文件。返回值包括文件地址url,以便更新数据库中的图片字段。
说实话这么做并不明智,应该选择把文件上传到云,而不是保存在用户的本地,但是这是个小demo,就这样吧。
func Upload(c *gin.Context) {w := c.Writerr := c.Request// 获取文件srcFile, header, err := r.FormFile("file")if err != nil {utils.RespFail(w, err.Error())}defer srcFile.Close()// 上传suffix := ".png"ofilName := header.Filenametem := strings.Split(ofilName, ".")if len(tem) > 1 {suffix = "." + tem[len(tem)-1]}fileName := fmt.Sprintf("%d%04d%s", time.Now().Unix(), rand.Int31(), suffix)url := "./asset/upload/" + fileNamedetFile, err := os.Create(url)if err != nil {utils.RespFail(w, err.Error())}_, err = io.Copy(detFile, srcFile)if err != nil {utils.RespFail(w, err.Error())}utils.RespOK(w, url, "发送图片成功")
}
群聊功能
// 群聊功能r.POST("/contact/createCommunity", service.CreateCommunity)r.POST("/contact/loadcommunity", service.LoadCommunity)r.POST("/contact/joinGroup", service.JoinGroups)
创建群聊
"/contact/createCommunity" POST请求调用了 userservice.go 的 CreateCommunity() 函数。接收POST请求的"ownerId","name","icon","desc",初始化化封装的community结构体,传递给 models 下 CreateCommunity() 函数。
/*
Community在models的community.go下定义
*/
type Community struct {gorm.ModelName stringOwnerId uintImg stringDesc string
}func CreateCommunity(c *gin.Context) {ownerId, _ := strconv.Atoi(c.Request.FormValue("ownerId"))name := c.Request.FormValue("name")icon := c.Request.FormValue("icon")desc := c.Request.FormValue("desc")community := models.Community{}community.OwnerId = uint(ownerId)community.Name = namecommunity.Img = iconcommunity.Desc = desccode, msg := models.CreateCommunity(community)if code == 0 {utils.RespOK(c.Writer, code, msg)} else {utils.RespFail(c.Writer, msg)}
}
在接收到community结构体后,因为涉及多表操作首先开启事务,判断群的合法性。在数据库的community表创建行后,还要在contact表更新关系,Type类型是2表示群聊。
func CreateCommunity(community Community) (int, string) {tx := utils.DB.Begin()//事务一旦开始,不论什么异常最终都会 Rollbackdefer func() {if r := recover(); r != nil {tx.Rollback()}}()if len(community.Name) == 0 {return -1, "群名称不能为空"}if community.OwnerId == 0 {return -1, "请先登录"}if err := utils.DB.Create(&community).Error; err != nil {fmt.Println(err)tx.Rollback()return -1, "建群失败"}contact := Contact{}contact.OwnerId = community.OwnerIdcontact.TargetId = community.IDcontact.Type = 2 //群关系if err := utils.DB.Create(&contact).Error; err != nil {tx.Rollback()return -1, "添加群关系失败"}tx.Commit()return 0, "建群成功"
}
加入群聊
"/contact/joinGroups" POST请求调用了 userservice.go 的 JoinGroups() 函数。获取用户id和群id,传入 community.go 的 JoinGroup() 函数。
func JoinGroups(c *gin.Context) {userId, _ := strconv.Atoi(c.Request.FormValue("userId"))comId := c.Request.FormValue("comId")// name := c.Request.FormValue("name")data, msg := models.JoinGroup(uint(userId), comId)if data == 0 {utils.RespOK(c.Writer, data, msg)} else {utils.RespFail(c.Writer, msg)}
}
JoinGroup()中,我们初始化 contact关系结构体,设置Id以及Type。我们通过comId查找community表,这里设计成群名字或者群号都可以找到,经过判断后,contact的TargetId赋值community的ID,在数据库contact关系表创建新的rows。
func JoinGroup(userId uint, comId string) (int, string) {contact := Contact{}contact.OwnerId = userId//contact.TargetId = comIdcontact.Type = 2community := Community{}utils.DB.Where("id=? or name=?", comId, comId).Find(&community)if community.Name == "" {return -1, "没有找到群"}utils.DB.Where("owner_id=? and target_id=? and type =2 ", userId, comId).Find(&contact)if !contact.CreatedAt.IsZero() {return -1, "已加过此群"} else {contact.TargetId = community.IDutils.DB.Create(&contact)return 0, "加群成功"}
}
加载用户群关系
"/contact/loadcommunity" POST请求调用了 userservice.go 的 LoadCommunity() 函数。获取"ownerId",传入 community.go 的 LoadCommunity()。
func LoadCommunity(c *gin.Context) {ownerId, _ := strconv.Atoi(c.Request.FormValue("ownerId"))// name := c.Request.FormValue("name")data, msg := models.LoadCommunity(uint(ownerId))if len(data) != 0 {utils.RespList(c.Writer, 0, data, msg)} else {utils.RespFail(c.Writer, msg)}
}
LoadCommunity() 中,我们用拿到的ownerId查关系表,查所有type是2的关系(即群关系),把群id计入一个objIds的切片,然后遍历切片,用群id获取对应 community 结构,存入community切片最后返回。
func LoadCommunity(ownerId uint) ([]*Community, string) {contacts := make([]Contact, 0)objIds := make([]uint64, 0)utils.DB.Where("owner_id = ? and type=2", ownerId).Find(&contacts)for _, v := range contacts {objIds = append(objIds, uint64(v.TargetId))}data := make([]*Community, 10)utils.DB.Where("id in ?", objIds).Find(&data)for _, v := range data {fmt.Println(v)}return data, "查询成功"
}
消息缓存
r.POST("/user/redisMsg", service.RedisMsg)
这个POST请求在进入主界面时便发送,目的是为了读取redis中缓存的历史消息。在 RedisMsg() 函数中,获取了聊天双方Id,历史消息的开始和结束条数,还有isRev表示是否逆序排序。传递给 models 下 message.go 的 RedisMsg()函数。
func RedisMsg(c *gin.Context) {userIdA, _ := strconv.Atoi(c.PostForm("userIdA"))userIdB, _ := strconv.Atoi(c.PostForm("userIdB"))start, _ := strconv.Atoi(c.PostForm("start"))end, _ := strconv.Atoi(c.PostForm("end"))isRev, _ := strconv.ParseBool(c.PostForm("isRev"))res := models.RedisMsg(int64(userIdA), int64(userIdB), int64(start), int64(end), isRev)utils.RespOKList(c.Writer, "ok", res)
}
RedisMsg()函数中,按一定格式key才能从数据库有序集合中,按score有序的读取历史消息缓存,把历史消息的字符串切片返回。
// 获取缓存里面的消息
func RedisMsg(userIdA int64, userIdB int64, start int64, end int64, isRev bool) []string {ctx := context.Background()userIdStr := strconv.Itoa(int(userIdA))targetIdStr := strconv.Itoa(int(userIdB))var key stringif userIdA > userIdB {key = "msg_" + targetIdStr + "_" + userIdStr} else {key = "msg_" + userIdStr + "_" + targetIdStr}var rels []stringvar err errorif isRev {rels, err = utils.Red.ZRange(ctx, key, start, end).Result()} else {rels, err = utils.Red.ZRevRange(ctx, key, start, end).Result()}if err != nil {fmt.Println(err) //没有找到}return rels
}
总结
这是Gin框架的IM小demo,基本实现了聊天功能,一些拓展的功能没有实现,但是其核心对于入门Gin和Websocket通信来说,是不错的练习。