IM系统(一)-状态服务器设计

文章目录

    • 文章概要
    • 需求分析
    • 技术栈
    • 准备工作
      • 封装日志框架
      • 封装Redis
      • 封装ETCD
    • 业务逻辑开发
      • 定义模型
      • 定义服务
      • 实现服务接口
      • 服务端启动程序
      • 客户端测试程序
    • 总结

文章概要

本篇文章分享在做一个IM(即时通讯)系统时,设计一个管理用户在线状态的服务。

该系列文章将从以下几个方面进行介绍:

  1. 状态服务的设计,需求分析、采用哪些技术栈
  2. 各个需求功能的设计和实现
  3. 运行效果

需求分析

顾名思义,状态服务器是用来记录和管理用户在线状态的,该系统中简化了用户的在线状态,分为两种:Online和Offline。
作为一个独立的服务,应该为聊天服务器提供以下功能:

  1. 实现多端设备同时在线
  2. 查询某个用户的在线状态
  3. 登记某个用户的在线状态(包括下线和上线)

技术栈

Redis
这里在设计时,没有将用户的上下线记录进行持久化,只需要将用户的在线数据使用redis进行缓存即可,当然为了方便用户查看自己的登录历史记录,该数据也具有一定的重要性,后序可能会增加。
gRPC
该服务是可以独立运行的,并且可以进行集群部署,实现系统的高扩展性和高可用,所以整体必然采用微服务架构,使用RPC协议作为微服务之间的通讯协议,满足实时性。gRPC框架是RPC的一种通用实现,官网文档也很详细,所以这里采用该框架来实现RPC通信。
ETCD
采用微服务的架构,必然需要考虑服务的注册与发现的实现方案,虽然gRPC框架支持服务的注册与发现,并且可以进行负载均衡,但是使用ETCD可以为系统提供高扩展性和可用性,并且ETCD更加全面,支持服务的健康状态检查和数据一致性,所以为系统后序可以更好的扩展,这里采用ETCD来实现服务的注册与发现。
Zap
Zap是一个轻量、快速的日志框架。

准备工作

封装日志框架

本封装只是对zap进行简单的套壳,目的是为了更方便的调用。像log一样

//pkg/logger/logger.go
var (logger *zap.Logger
)func init() {dev, err := zap.NewDevelopment()if err != nil {panic(err)}logger = dev
}func Info(msg string, fields ...zap.Field) {logger.Info(msg, fields...)
}func Error(msg string, field ...zap.Field) {logger.Error(msg, field...)
}func Fatal(msg string, field ...zap.Field) {logger.Fatal(msg, field...)
}func Warn(msg string, field ...zap.Field) {logger.Warn(msg, field...)
}func Debug(msg string, field ...zap.Field) {logger.Debug(msg, field...)
}func DPanic(msg string, field ...zap.Field) {logger.DPanic(msg, field...)
}func Sync() {logger.Sync()
}

封装Redis

为了实现用户状态的缓存,需要独立封装redis的相关操作,使得实际的业务代码更加的专一和简洁。

连接Redis服务

//pkg/db/redis.go
var (StatusRDB *redis.Client
)func init() {StatusRDB = InitRedis()
}// InitRedis 初始化Redis连接
func InitRedis() *redis.Client {rdb := redis.NewClient(&redis.Options{Addr:     config.RedisAddr,Password: config.RedisPass,DB:       config.RedisDB,})_, err := rdb.Ping(context.Background()).Result()if err != nil {logger.Fatal("Redis 初始化失败", zap.String("失败原因", err.Error()))}logger.Info("Redis 初始化成功")return rdb
}

封装Redis操作
封装之前需要考虑一下需要哪些功能,我们的目的是记录一个用户的在线状态,该在线状态是一个对象,我们需要做的是添加整个字段和修改用户的在线状态字段。所以value可选择的方案有String和Hash。使用字符串的话需要频繁的序列化和反序列化对象,所以这里使用Hash作为value的容器。

因为要支持多端设备登录,所以是多个设备对应同一个用户,所以这里将用户的设备号作为key,保证唯一性。

  1. func RegisterUserStatus(context context.Context, deviceID string, status model.UserStatus) error;
    后面将看到model.UserStatus的定义
  2. func UpdateUserStatus(context context.Context, deviceID string, status string, newStatus bool) error
  3. func RemoveAfter(context context.Context, deviceID string, duration time.Duration) error;
  4. func CheckOnline(context context.Context, deviceID string) (bool, error)
  5. func GetUserStatus(context context.Context, deviceID string) (*model.UserStatus, error)
// RegisterUserStatus 注册用户的在线状态
func RegisterUserStatus(context context.Context, deviceID string, status model.UserStatus) error {_, err := db.StatusRDB.HSet(context, deviceID, map[string]interface{}{"UserID":     status.UserID,"DeviceID":   status.DeviceID,"Status":     status.Status,"ServerAddr": status.ServerAddr,"LoginTime":  status.LoginTime,"LogoutTime": status.LogoutTime,}).Result()return err
}// UpdateUserStatus 更新用户的在线状态
func UpdateUserStatus(context context.Context, deviceID string, status string, newStatus bool) error {_, err := db.StatusRDB.HSet(context, deviceID, map[string]interface{}{status: newStatus,}).Result()return err
}// CheckOnline 检查用户是否在线
func CheckOnline(context context.Context, deviceID string) (bool, error) {status, err := db.StatusRDB.HGet(context, deviceID, "status").Result()if err == redis.Nil {return false, nil}return status == "1", err
}// RemoveAfter 设置键的过期时间
func RemoveAfter(context context.Context, deviceID string, duration time.Duration) error {_, err := db.StatusRDB.ExpireNX(context, deviceID, duration).Result()return err
}// GetUserStatus 获取用户的在线状态记录
func GetUserStatus(context context.Context, deviceID string) (*model.UserStatus, error) {// 查询该键值是否存在count, err := db.StatusRDB.Exists(context, deviceID).Result()if err != nil {return nil, err}if count <= 0 {return nil, nil}result, err := db.StatusRDB.HGetAll(context, deviceID).Result()if err != nil {return nil, err}loginTime, _ := strconv.Atoi(result["LoginTime"])logoutTime, _ := strconv.Atoi(result["LogoutTime"])return &model.UserStatus{UserID:     result["UserID"],DeviceID:   result["DeviceID"],Status:     result["Status"] == "1",ServerAddr: result["ServerAddr"],LoginTime:  int64(loginTime),LogoutTime: int64(logoutTime),}, nil
}

封装ETCD

ETCD的作用是用于服务的注册与发现,所以我们要封装注册服务和注销服务的方法。程序设计可以参考ETCD官网

  1. func RegisterServerToEtcd(addr string) error;
  2. func UnRegisterFromEtcd(addr string) error;
const (serviceName = "im/state-server"ttl         = 10
)var (etcdClient *clientv3.Client
)func init() {c, err := clientv3.NewFromURL(config.EtcdAddr)if err != nil {logger.Error("连接ETCD服务器失败", zap.String("失败原因", err.Error()))panic(err)}etcdClient = c
}// RegisterServerToEtcd 注册服务到ETCD
func RegisterServerToEtcd(addr string) error {manager, err := endpoints.NewManager(etcdClient, serviceName)if err != nil {return err}lease, _ := etcdClient.Grant(context.TODO(), ttl)err = manager.AddEndpoint(context.TODO(), fmt.Sprintf("%s/%s", serviceName, addr), endpoints.Endpoint{Addr: addr}, clientv3.WithLease(lease.ID))if err != nil {return err}alive, err := etcdClient.KeepAlive(context.TODO(), lease.ID)if err != nil {return err}go func() {for {<-alive}}()return nil
}// UnRegisterFromEtcd 从ETCD注销服务
func UnRegisterFromEtcd(addr string) error {em, err := endpoints.NewManager(etcdClient, serviceName)if err != nil {return err}err = em.DeleteEndpoint(context.TODO(), fmt.Sprintf("%s/%s", serviceName, addr))return err
}

业务逻辑开发

定义模型

// UserStatus 用户的在线状态表
type UserStatus struct {UserID     string `json:"user_id"`DeviceID   string `json:"device_id"`Status     bool   `json:"status"`      // (0:表示离线 1:表示在线)ServerAddr string `json:"server_addr"` //用户所在的服务器地址LoginTime  int64  `json:"login_time"`LogoutTime int64  `json:"logout_time"`
}

定义服务

syntax = "proto3";package pb;
option go_package = "./pb";// 当服务器查询某个用户的状态时,需要提供该用户的ID和设备号
message QueryUserStatusRequest{string UserID = 1;string DeviceID = 2;
}// 状态服务器给出响应,返回该用户的在线状态
message QueryUserStatusResponse{bool Status = 1;string ServerAddr = 2;
}// 当用户上线,需要告诉状态服务器
message OnlineRequest{string UserID = 1;string DeviceID = 2;string ServerAddr = 4;
}// 返回用户的在线状态是否设置成功
message OnlineResponse{
}// 当用户下线,需要告诉状态服务器
message OfflineRequest{string UserID = 1;string DeviceID = 2;
}message OfflineResponse{
}service UserStatus{// 查询用户在线状态的服务rpc QueryUserStatus(QueryUserStatusRequest) returns(QueryUserStatusResponse);// 上线服务rpc Online(OnlineRequest) returns(OnlineResponse);// 下线服务rpc Offline(OfflineRequest) returns(OfflineResponse);
}

通过protoc命令将上述文件生成一份客户端和服务端的代码。

实现服务接口

type UserStatusService struct {pb.UnimplementedUserStatusServer
}func NewUserStatusService() *UserStatusService {return &UserStatusService{}
}// QueryUserStatus 查询用户在线状态的服务
func (u *UserStatusService) QueryUserStatus(c context.Context, req *pb.QueryUserStatusRequest) (*pb.QueryUserStatusResponse, error) {logger.Info("调用用户在线状态查询服务")// 查询用户的在线状态status, err := repository.GetUserStatus(c, req.DeviceID)if err != nil {logger.Error("查询用户在线状态失败", zap.String("失败的原因", err.Error()))return nil, errors.New("查询用户在线状态失败")}if status == nil || !status.Status {logger.Info("查询的用户不在缓存中")return &pb.QueryUserStatusResponse{Status: false,}, nil}return &pb.QueryUserStatusResponse{Status:     status.Status,ServerAddr: status.ServerAddr,}, nil
}// Online 上线服务
func (u *UserStatusService) Online(c context.Context, request *pb.OnlineRequest) (*pb.OnlineResponse, error) {logger.Info("调用上线服务")status := model.UserStatus{UserID:     request.UserID,DeviceID:   request.DeviceID,Status:     true,ServerAddr: request.ServerAddr,LoginTime:  time.Now().Unix(),LogoutTime: 0,}err := repository.RegisterUserStatus(c, request.DeviceID, status)if err != nil {logger.Error("将用户在线信息存入Redis失败", zap.String("失败原因", err.Error()))return nil, errors.New("记录用户的在线状态失败")}logger.Info("成功记录用户的在线状态")return &pb.OnlineResponse{}, nil
}// Offline 下线服务
func (u *UserStatusService) Offline(c context.Context, request *pb.OfflineRequest) (*pb.OfflineResponse, error) {logger.Info("调用下线服务")err := repository.UpdateUserStatus(c, request.DeviceID, "Status", false)if err != nil {logger.Error("更新用户在线状态失败", zap.String("失败原因", err.Error()))return nil, errors.New("更新用户在线状态失败")}err = repository.RemoveAfter(c, request.DeviceID, time.Minute*3)if err != nil {logger.Error("设置用户在线状态过期时间失败", zap.String("失败原因", err.Error()))return nil, errors.New("更新用户在线状态失败")}return &pb.OfflineResponse{}, nil
}

服务端启动程序

var (ip   stringport string
)func init() {const (defaultAddr = "127.0.0.1"defaultPort = "8080")flag.StringVar(&ip, "addr", defaultAddr, "IP地址")flag.StringVar(&port, "port", defaultPort, "服务端口")
}func main() {flag.Parse()defer logger.Sync()logger.Info("正在启动状态服务器", zap.String("日志框架", "zap"))// 注册服务ch := make(chan os.Signal, 1)signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGABRT)go func() {s := <-cherr := etcd.UnRegisterFromEtcd(ip + ":" + port)if err != nil {logger.Info("注销服务失败", zap.String("失败原因", err.Error()))} else {logger.Info("注销服务成功")}if i, ok := s.(syscall.Signal); ok {os.Exit(int(i))} else {os.Exit(0)}}()err := etcd.RegisterServerToEtcd(ip + ":" + port)if err != nil {logger.Fatal("注册服务到ETCD失败", zap.String("失败原因", err.Error()), zap.String("程序状态", "即将退出"))}// 添加选项var options []grpc.ServerOptionuserStatusRpcServer := grpc.NewServer(options...)pb.RegisterUserStatusServer(userStatusRpcServer, api.NewUserStatusService())listen, err := net.Listen("tcp", ":8080")logger.Info("服务启动", zap.String("服务地址", fmt.Sprintf("%s:%s", ip, port)))if err != nil {logger.Fatal("启动监听失败", zap.String("错误信息", err.Error()), zap.String("程序状态", "退出"))}if userStatusRpcServer.Serve(listen) != nil {logger.Fatal("启动监听失败", zap.String("错误信息", err.Error()), zap.String("程序状态", "退出"))}
}

客户端测试程序

func main() {cli, cerr := clientv3.NewFromURL(config.EtcdAddr)if cerr != nil {panic(cerr)}etcdResolver, err := resolver.NewBuilder(cli)if err != nil {panic(err)}conn, gerr := grpc.Dial("etcd:///im/state-server", grpc.WithResolvers(etcdResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))if gerr != nil {println(gerr.Error())panic(conn)}defer conn.Close()client := pb.NewUserStatusClient(conn)response, err := client.QueryUserStatus(context.Background(), &pb.QueryUserStatusRequest{UserID: "1", DeviceID: "2"})if err != nil {println(err.Error())}fmt.Printf("%#v", response.Status)
}

总结

本篇文章记录了一个IM系统中状态服务器的设计和实现,完整代码可在github查看:state-server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/29582.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据标注对新零售的意义及人工智能在新零售领域的应用?

数据标签对于新零售至关重要&#xff0c;因为它构成了训练和部署人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;模型的基础。在新零售的背景下&#xff0c;数据标签涉及对数据进行分类、标记或注释以使其能够被机器理解的过程。然后&#xff0c;这些…

Qt应用开发(基础篇)——框架类 QFrame

一、前言 QFrame继承于QWidget&#xff0c;被QLCDNumber、QToolBox、QLabel、QListView等部件继承&#xff0c;是一个拥有矩形框架的基类。 QFrame可以直接创建成一个没有内容的的矩形框架&#xff0c;框架的样式由边框厚度(lineWidth)、框架形状(QFrame::Shape)和阴影样式(QFr…

浏览器多管闲事之跨域

年少时的梦想就是买一台小霸王游戏机 当时的宣传语就是小霸王其乐无穷~。 大些了&#xff0c;攒够了零花钱&#xff0c;在家长的带领下终于买到了 那一刻我感觉就是最幸福的人 风都是甜的&#xff01; 哪成想... 刚到家就被家长扣下了 “”禁止未成年人玩游戏机 (问过卖家了&a…

Transformer理论学习

Transformer出自于论文《attention is all you need》。 一些主流的序列模型主要依赖于复杂的循环结构或者CNN&#xff0c;这里面包含了编解码器等。而Transformer主要的结构是基于注意力机制&#xff0c;而且是用多头注意力机制去替换网络中的循环或者CNN(换言之就是transfor…

CD4029计数器实测仿真及BCD转七段码

前面的博文中&#xff0c;我们介绍过CD40110(这是一个常见的直接接7段数码管的计数器&#xff0c;我们这里介绍一款新的计数器CD4029&#xff0c;这也是很常见的计数器&#xff0c;不同的是后者可以输出BCD编码。 文章目录 一、总体效果二、CD4029的管脚和功能介绍1、芯片功能简…

使用 PowerShell 将 Excel 中的每个工作表单独另存为独立的文件

导语&#xff1a;在日常工作中&#xff0c;我们经常需要处理 Excel 文件。本文介绍了如何使用 PowerShell 脚本将一个 Excel 文件中的每个工作表单独另存为独立的 Excel 文件&#xff0c;以提高工作效率。 1. 准备工作 在开始之前&#xff0c;请确保已经安装了 Microsoft Exc…

【毕业项目】自主设计HTTP

博客介绍&#xff1a;运用之前学过的各种知识 自己独立做出一个HTTP服务器 自主设计WEB服务器 背景目标描述技术特点项目定位开发环境WWW介绍 网络协议栈介绍网络协议栈整体网络协议栈细节与http相关的重要协议 HTTP背景知识补充特点uri & url & urn网址url HTTP请求和…

MySQL主从复制基于二进制日志的高可用架构指南

前言 在现代数据库架构中&#xff0c;MySQL主从复制技术扮演着重要角色。它不仅可以提升数据库性能和可扩展性&#xff0c;还赋予系统卓越的高可用性和灾难恢复能力。本文将深入剖析MySQL主从复制的内部机制&#xff0c;同时通过一个实际案例&#xff0c;展示其在实际场景中的…

【计算机视觉】MoCo v2 讲解

在阅读本篇之前建议先学习: 【计算机视觉】MoCo 讲解 【计算机视觉】SimCLR 讲解 MoCo v2 论文信息 标题:Improved Baselines with Momentum Contrastive Learning 作者:Xinlei Chen 期刊: 发布时间与更新时间:2020.03.09 主题:计算机视觉、对比学习 arXiv:[2003.04297]…

Pr2022安装插件beat edit安装之后无法加载音乐怎么办?

你运行设置不对&#xff0c;安装好后试试管理员权限运行。 安装好插件后点击上方的窗口&#xff0c;然后再在里面找到扩展&#xff0c;继续点开里面有个BeatEdit&#xff0c;就是插件本身&#xff0c;点开。先选择一个加载音乐的选项&#xff0c;载入音乐先。这个时候可以按下…

如何对项目中的图片进行优化

优化步骤方案 不用图片。很多时候会使用到很多修饰类图片&#xff0c;其实这类修饰图片 完全可以用 CSS 去代替。对于移动端来说&#xff0c;屏幕宽度就那么点&#xff0c;完全没有必要去加载原图浪 费带宽。一般图片都用 CDN 加载&#xff0c;可以计算出适配屏幕的宽度&#…

CSS 滚动条

一、滚动条样式属性 ::-webkit-scrollbar {width: 6px; /* 竖向滚动条宽度 */height: 6px; /* 横向滚动条高度 */ }::-webkit-scrollbar-thumb {border-radius: 10px; /* 滚动条样式 */-webkit-box-shadow: inset 0 0 3px red; /* 内阴影 */background-color: blue; /* 滚动条…

Redhat Linux 安装MySQL安装手册

Redhat安装MySQL安装手册 1 下载2 上传服务器、解压并安装3 安装安装过程1&#xff1a;MySQL-shared-5.6.51-1.el7.x86_64.rpm安装过程2&#xff1a;MySQL-shared-compat-5.6.51-1.el7.x86_64.rpm安装过程3&#xff1a;MySQL-server-5.6.51-1.el7.x86_64.rpm安装过程4&#xff…

Flutter运行app时向logcat输出当前打开的界面路径且点击可跳转

当一个项目大了目录文件多了&#xff0c;我们往往会为了找到一个文件花费大量的时间和精力&#xff0c;为了快捷方便的调试我们的项目&#xff0c;我们往往需要在打开app运行的时候需要知道当前打开的界面的文件在哪儿&#xff0c;我们这个代码就能快捷的知道我们app正在打开的…

spring-cache框架使用笔记

spring-cache框架使用笔记 什么是spring-cache框架 spring-cache是spring框架中的一个缓存抽象层&#xff0c; 它提供了一种简便的方式来集成不同的底层缓存实现&#xff0c; 如内存缓存(concurrentMap/ehcache/caffeine)/分布式缓存(redis/couchbase)等 它简化了在app中使用…

现代C++中的从头开始深度学习【1/8】:基础知识

一、说明 提及机器学习框架与研究和工业的相关性。现在很少有项目不使用Google TensorFlow或Meta PyTorch&#xff0c;在于它们的可扩展性和灵活性。也就是说&#xff0c;花时间从头开始编码机器学习算法似乎违反直觉&#xff0c;即没有任何基本框架。然而&#xff0c;事实并非…

Unity制作护盾——2、力场冲击波护盾

Unity制作力场护盾 大家好&#xff0c;我是阿赵。   继续做护盾&#xff0c;这一期做一个力场冲击波护盾。 一、效果展示 主要的效果并不是这个球&#xff0c;而是护盾在被攻击的时候&#xff0c;会出现一个扩散的冲击波。比如上图在右边出现了冲击波 如果在左边被攻击&am…

C/C++面试总结

一、关键字static、const、extern、volatile作用 1、const 1.修饰常量 用const修饰的变量是不可变的&#xff0c;修饰后的变量只能使用&#xff0c;不能修改。 2.修饰指针 如果const位于*的左侧&#xff0c;eg&#xff1a;const int* a&#xff0c;则const就是用来修饰指针…

Java基础八 - HTTP相关/Cookie/Session/网络攻击

一、 反射/序列化/拷贝 1. 反射 //反射主要是指程序可以访问、检测和修改它本身状态或行为的一种能力 //在Yaml数据驱动自动化框架比较适用&#xff0c;能获取到当前的类名及方法名 import java.lang.reflect.*;public class ReflectionExample {public static void main(Str…

KingBaseDB对POINT类型和PATH类型的应用

前言 由于业务需要对坐标信息进行存储&#xff0c;包括中心点和区域坐标数组&#xff0c;然后就开始考虑数据的存储问题&#xff0c;刚开始想着用最简单的两个字段分别存储经度纬度&#xff0c;这样发现关联查询表可能会比较多&#xff0c;有点麻烦。之后想起来kingbase好像有…