go-高效处理应用程序数据

一、背景

大型的应用程序为了后期的排障、运营等,会将一些请求、日志、性能指标等数据保存到存储系统中。为了满足这些需求,我们需要进行数据采集,将数据高效的传输到存储系统

二、问题

  • 采集服务仅仅针对某个需求开发,需要修改业务代码逻辑,会给业务带来比较大的负担,并且耦合度太高
  • 数据采集导致已有的服务请求延时变高
  • 采集性能差,需要较长的时间才能采集完一批数据
  • 服务关闭时会丢失数据

三、解决方案

  • 针对问题1,我们可以将数据采集从业务服务中解耦出来,专门创建一个数据采集服务。业务程序只需要将数据传输到指定的中间件,至于数据的处理、采样、过滤、传出等逻辑都在采集服务中完成。
  • 针对问题2,将数据的导出由同步改为异步,异步开启多个协程消费通道中的数据,这样对程序的性能几乎微乎其微
  • 针对问题3,设置好采集最长间隔、批量采集大小以及使用高性能的数据中间件作为中转,比如redis、kafka
  • 针对问题4,为导出器和采集服务增加关闭监听,程序关闭时将数据清空完再退出

四、采集服务实现

4.1 架构设计

在这里插入图片描述

4.2 storage

负责从数据中间件中拿到数据,接口定义如下:

type AnalyticsStorage interface {Init(config interface{}) errorGetName() stringConnect() boolGetAndDeleteSet(string) []interface{}
}

redis的storage实现

import ("crypto/tls""strconv""time"redis "github.com/go-redis/redis/v7""github.com/marmotedu/errors""github.com/mitchellh/mapstructure"genericoptions "github.com/marmotedu/iam/internal/pkg/options""github.com/marmotedu/iam/pkg/log"
)// ------------------- REDIS CLUSTER STORAGE MANAGER -------------------------------// RedisKeyPrefix defines prefix for iam analytics key.
const (RedisKeyPrefix      = "analytics-"defaultRedisAddress = "127.0.0.1:6379"
)var redisClusterSingleton redis.UniversalClient// RedisClusterStorageManager is a storage manager that uses the redis database.
type RedisClusterStorageManager struct {db        redis.UniversalClientKeyPrefix stringHashKeys  boolConfig    genericoptions.RedisOptions
}// NewRedisClusterPool returns a redis cluster client.
func NewRedisClusterPool(forceReconnect bool, config genericoptions.RedisOptions) redis.UniversalClient {if !forceReconnect {if redisClusterSingleton != nil {log.Debug("Redis pool already INITIALIZED")return redisClusterSingleton}} else {if redisClusterSingleton != nil {redisClusterSingleton.Close()}}log.Debug("Creating new Redis connection pool")maxActive := 500if config.MaxActive > 0 {maxActive = config.MaxActive}timeout := 5 * time.Secondif config.Timeout > 0 {timeout = time.Duration(config.Timeout) * time.Second}var tlsConfig *tls.Configif config.UseSSL {tlsConfig = &tls.Config{InsecureSkipVerify: config.SSLInsecureSkipVerify,}}var client redis.UniversalClientopts := &RedisOpts{MasterName:   config.MasterName,Addrs:        getRedisAddrs(config),DB:           config.Database,Password:     config.Password,PoolSize:     maxActive,IdleTimeout:  240 * time.Second,ReadTimeout:  timeout,WriteTimeout: timeout,DialTimeout:  timeout,TLSConfig:    tlsConfig,}if opts.MasterName != "" {log.Info("--> [REDIS] Creating sentinel-backed failover client")client = redis.NewFailoverClient(opts.failover())} else if config.EnableCluster {log.Info("--> [REDIS] Creating cluster client")client = redis.NewClusterClient(opts.cluster())} else {log.Info("--> [REDIS] Creating single-node client")client = redis.NewClient(opts.simple())}redisClusterSingleton = clientreturn client
}func getRedisAddrs(config genericoptions.RedisOptions) (addrs []string) {if len(config.Addrs) != 0 {addrs = config.Addrs}if len(addrs) == 0 && config.Port != 0 {addr := config.Host + ":" + strconv.Itoa(config.Port)addrs = append(addrs, addr)}return addrs
}// RedisOpts is the overridden type of redis.UniversalOptions. simple() and cluster() functions are not public
// in redis library. Therefore, they are redefined in here to use in creation of new redis cluster logic.
// We don't want to use redis.NewUniversalClient() logic.
type RedisOpts redis.UniversalOptionsfunc (o *RedisOpts) cluster() *redis.ClusterOptions {if len(o.Addrs) == 0 {o.Addrs = []string{defaultRedisAddress}}return &redis.ClusterOptions{Addrs:     o.Addrs,OnConnect: o.OnConnect,Password: o.Password,MaxRedirects:   o.MaxRedirects,ReadOnly:       o.ReadOnly,RouteByLatency: o.RouteByLatency,RouteRandomly:  o.RouteRandomly,MaxRetries:      o.MaxRetries,MinRetryBackoff: o.MinRetryBackoff,MaxRetryBackoff: o.MaxRetryBackoff,DialTimeout:        o.DialTimeout,ReadTimeout:        o.ReadTimeout,WriteTimeout:       o.WriteTimeout,PoolSize:           o.PoolSize,MinIdleConns:       o.MinIdleConns,MaxConnAge:         o.MaxConnAge,PoolTimeout:        o.PoolTimeout,IdleTimeout:        o.IdleTimeout,IdleCheckFrequency: o.IdleCheckFrequency,TLSConfig: o.TLSConfig,}
}func (o *RedisOpts) simple() *redis.Options {addr := defaultRedisAddressif len(o.Addrs) > 0 {addr = o.Addrs[0]}return &redis.Options{Addr:      addr,OnConnect: o.OnConnect,DB:       o.DB,Password: o.Password,MaxRetries:      o.MaxRetries,MinRetryBackoff: o.MinRetryBackoff,MaxRetryBackoff: o.MaxRetryBackoff,DialTimeout:  o.DialTimeout,ReadTimeout:  o.ReadTimeout,WriteTimeout: o.WriteTimeout,PoolSize:           o.PoolSize,MinIdleConns:       o.MinIdleConns,MaxConnAge:         o.MaxConnAge,PoolTimeout:        o.PoolTimeout,IdleTimeout:        o.IdleTimeout,IdleCheckFrequency: o.IdleCheckFrequency,TLSConfig: o.TLSConfig,}
}func (o *RedisOpts) failover() *redis.FailoverOptions {if len(o.Addrs) == 0 {o.Addrs = []string{"127.0.0.1:26379"}}return &redis.FailoverOptions{SentinelAddrs: o.Addrs,MasterName:    o.MasterName,OnConnect:     o.OnConnect,DB:       o.DB,Password: o.Password,MaxRetries:      o.MaxRetries,MinRetryBackoff: o.MinRetryBackoff,MaxRetryBackoff: o.MaxRetryBackoff,DialTimeout:  o.DialTimeout,ReadTimeout:  o.ReadTimeout,WriteTimeout: o.WriteTimeout,PoolSize:           o.PoolSize,MinIdleConns:       o.MinIdleConns,MaxConnAge:         o.MaxConnAge,PoolTimeout:        o.PoolTimeout,IdleTimeout:        o.IdleTimeout,IdleCheckFrequency: o.IdleCheckFrequency,TLSConfig: o.TLSConfig,}
}// GetName returns the redis cluster storage manager name.
func (r *RedisClusterStorageManager) GetName() string {return "redis"
}// Init initialize the redis cluster storage manager.
func (r *RedisClusterStorageManager) Init(config interface{}) error {r.Config = genericoptions.RedisOptions{}err := mapstructure.Decode(config, &r.Config)if err != nil {log.Fatalf("Failed to decode configuration: %s", err.Error())}r.KeyPrefix = RedisKeyPrefixreturn nil
}// Connect will establish a connection to the r.db.
func (r *RedisClusterStorageManager) Connect() bool {if r.db == nil {log.Debug("Connecting to redis cluster")r.db = NewRedisClusterPool(false, r.Config)return true}log.Debug("Storage Engine already initialized...")// Reset it just in caser.db = redisClusterSingletonreturn true
}func (r *RedisClusterStorageManager) hashKey(in string) string {return in
}func (r *RedisClusterStorageManager) fixKey(keyName string) string {setKeyName := r.KeyPrefix + r.hashKey(keyName)log.Debugf("Input key was: %s", setKeyName)return setKeyName
}// GetAndDeleteSet get and delete key from redis.
func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string) []interface{} {log.Debugf("Getting raw key set: %s", keyName)if r.db == nil {log.Warn("Connection dropped, connecting..")r.Connect()return r.GetAndDeleteSet(keyName)}log.Debugf("keyName is: %s", keyName)fixedKey := r.fixKey(keyName)log.Debugf("Fixed keyname is: %s", fixedKey)var lrange *redis.StringSliceCmd_, err := r.db.TxPipelined(func(pipe redis.Pipeliner) error {lrange = pipe.LRange(fixedKey, 0, -1)pipe.Del(fixedKey)return nil})if err != nil {log.Errorf("Multi command failed: %s", err)r.Connect()}vals := lrange.Val()result := make([]interface{}, len(vals))for i, v := range vals {result[i] = v}log.Debugf("Unpacked vals: %d", len(result))return result
}// SetKey will create (or update) a key value in the store.
func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int64) error {log.Debugf("[STORE] SET Raw key is: %s", keyName)log.Debugf("[STORE] Setting key: %s", r.fixKey(keyName))r.ensureConnection()err := r.db.Set(r.fixKey(keyName), session, 0).Err()if timeout > 0 {if expErr := r.SetExp(keyName, timeout); expErr != nil {return expErr}}if err != nil {log.Errorf("Error trying to set value: %s", err.Error())return errors.Wrap(err, "failed to set key")}return nil
}// SetExp is used to set the expiry of a key.
func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error {err := r.db.Expire(r.fixKey(keyName), time.Duration(timeout)*time.Second).Err()if err != nil {log.Errorf("Could not EXPIRE key: %s", err.Error())}return errors.Wrap(err, "failed to set expire time for key")
}func (r *RedisClusterStorageManager) ensureConnection() {if r.db != nil {// already connectedreturn}log.Info("Connection dropped, reconnecting...")for {r.Connect()if r.db != nil {// reconnection workedreturn}log.Info("Reconnecting again...")}
}

4.3 pump

我们首先会针对某种业务创建对应的数据结构,比如

type AnalyticsRecord struct {TimeStamp  int64     `json:"timestamp"`Username   string    `json:"username"`Effect     string    `json:"effect"`Conclusion string    `json:"conclusion"`Request    string    `json:"request"`Policies   string    `json:"policies"`Deciders   string    `json:"deciders"`ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

pump负责将数据导出到指定的数据存储系统,比如promethus、mongo、ES等,它的接口定义如下:

type Pump interface {GetName() stringNew() PumpInit(interface{}) errorWriteData(context.Context, []interface{}) errorSetTimeout(timeout int)GetTimeout() int
}

4.4 exporter

exporter依赖storage和pump,每个exporter负责一种业务,一般对应一种数据结构

type AnalyticsRecord struct {TimeStamp  int64     `json:"timestamp"`Username   string    `json:"username"`Effect     string    `json:"effect"`Conclusion string    `json:"conclusion"`Request    string    `json:"request"`Policies   string    `json:"policies"`Deciders   string    `json:"deciders"`ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

exporter从storage中取出数据转成对应的数据结构,并进行过滤和去掉冗余字段内容。最后将数据通过pump导出到数据系统中,exporter大概如下:

type AnalyticsExporter struct {storage storage.Storagepump    pump.Pumpfilter      []filter.Filters  //对数据进行过滤timeout               intOmitDetailedRecording bool  //将冗余字段置为空
}func (e *AnalyticsExporter) Export() {//1.从storage中拉取数据//2. 转换为对应的数据结构//3. 过滤数据//4. 置空冗余字段//5. 导出数据
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/45800.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

防火墙小试——部分(书接上回)

toop接上回 1.实验拓扑及要求 前情回顾 DMZ区内的服务器,办公区仅能在办公时间内(9:00 - 18:00)可以访问,生产区的设备全天可以访问. 生产区不允许访问互联网,办公区和游客区允许访问互联网 …

C#统一委托Func与Action

C#在System命名空间下提供两个委托Action和Func,这两个委托最多提供16个参数,基本上可以满足所有自定义事件所需的委托类型。几乎所有的 事件 都可以使用这两个内置的委托Action和Func进行处理。 Action委托: Action定义提供0~16个参数&…

使用亮数据代理IP+Python爬虫批量爬取招聘信息训练面试类AI智能体

本文目录 一、引言二、开发准备三、代码开发四、使用亮数据进行高效爬取4.1 为什么需要亮数据4.2 如何使用亮数据 五、使用数据训练AI智能体六、 总结 一、引言 在当今AI迅速发展的时代,招聘市场正经历着前所未有的变革。传统的招聘方式已难以满足双方的需求。AI智…

canvas快速入门(一)canvas的基础使用

注释很详细&#xff0c;直接上代码 新增内容&#xff1a; 1. canvas的两种创建方式及优劣 2. canvas宽高设置及注意事项 3. 简单测例 项目结构&#xff1a; 源码&#xff1a; index.html <!DOCTYPE html> <html lang"en"> <head><meta charset…

先天睡功-守一老师

描述 守一老师&#xff0c;一个富有才华的老师&#xff01; 对于大家的学习有不可多得的帮助。 内容 目前主要的内容以睡觉为主&#xff0c;对于学习睡睡觉有比较大的帮助&#xff01; 但是网络上面错综复杂&#xff0c;很多老旧的版本影响学习&#xff01; 而这里我整理了…

安全防御实验2

一、实验拓扑 二、实验要求 办公区设备可以通过电信链路和移动链路上网(多对多的NAT&#xff0c;并且需要保留一个公网IP不能用来转换)分公司设备可以通过总公司的移动链路和电信链路访问到Dmz区的http服务器多出口环境基于带宽比例进行选路&#xff0c;但是&#xff0c;办公区…

OZON夏季热卖产品有哪些,OZON夏季热卖新品

OZON平台在夏季的热卖产品种类繁多&#xff0c;涵盖了多个领域&#xff0c;主要包括但不限于以下几个方面&#xff0c;接下来看看OZON夏季热卖产品有哪些&#xff0c;OZON夏季热卖新品&#xff01;Top1 运动套装 Костюм спортивный Victorias Secret 商品id…

【C++】C++入门实战教程(打造属于自己的C++知识库)

目录 目录 写在前面 1.C学习路线 2.本教程框架介绍 一.C基础部分 1.程序编码规范 2.程序运行与编译 3.关键字 4.常用数据类型 5.运算符相关 二.C进阶部分 1.面向对象编程 2.函数编程 3.模板编程 4.多线程与并发 5.STL介绍及使用 6.内存模型与优化 三.C实战部…

美国视觉AI解决方案公司Hayden AI完成9000万美元C轮融资

来源&#xff1a;猛兽财经 作者&#xff1a;猛兽财经 猛兽财经获悉&#xff0c;总部位于美国加利福尼亚州旧金山弗朗西斯科专门为智慧城市提供视觉AI解决方案的Hayden AI&#xff0c;近期宣布已完成9000万美元C轮融资。 本轮融资由The Rise Fund领投&#xff0c;Drawdown Fun…

股指期货存在的风险有哪些?

股指期货因其标的物的特殊性&#xff0c;其面临的风险类型十分复杂&#xff0c;主要面临的一般风险和特有风险如下&#xff1a; 一般风险 从风险是否可控的角度&#xff0c;可以划分为不可控风险和可控风险&#xff1b;从交易环节可分为代理风险、流动性风险、强制平仓风险&…

BUCK外围器件选型,输入电容,输出电容,电感,续流二极管

概述&#xff1a; 一般情况下&#xff0c;电源接口处会有大小不同的电容进行并联&#xff0c;大容量电容是为了防止自身产生干扰影响其他器件&#xff0c;所以叫去耦电容&#xff1b;小容量电容是为了其他高频干扰影响自身&#xff0c;所以叫旁路电容。当然这只是通常情况下。 …

深入剖析多个表left join on的执行步骤原理:实战案例解析与原理探讨

文章目录 文章导图前言初始化数据-建表两个表left jion多表-left jion on c.bidb.bid分析|执行步骤和结果理解 变形-修改c表数据变形1变形2 总结 多表-left jion on c.aida.aid分析执行步骤和结果理解 变形-修改c表数据变形1变形2 解答开头总结 Left join on系列文章测试一下你…

​Chrome 插件: GoFullPage 一键搞定全网页截图

在互联网时代&#xff0c;网页截图已成为我们日常工作和生活中不可或缺的部分。无论是保存重要信息、制作教程&#xff0c;还是分享有趣的内容&#xff0c;截图功能都显得尤为重要。然而&#xff0c;常规的截图工具往往只能截取当前屏幕的内容&#xff0c;对于长网页则显得力不…

做个简单的知识付费网站需要什么方式

网站是线上承载信息宣传的主要工具之一&#xff0c;也是企业公司发展的重要工具之一&#xff0c;除了固定信息呈现外&#xff0c;还有不少商家具备各种方式的干货输出能力&#xff0c;或者想以内容售卖获得一定营收。 如教培机构、自媒体、网校、知识生产者、领域达人等都具备…

【unity笔记】九、Unity添加串口通信

unity仿真使用虚拟串口调试。下面为简单流程。 常用串口调试软件在这里下载。 1.虚拟串口 添加虚拟串口&#xff0c;这里使用com1 com2 2. 串口调试 在这里为虚拟串口发送消息。 3. unity配置 3.1 设置 在文件->生成设置->玩家设置->玩家->其他设置 中找到…

【机器学习】逻辑回归的原理、应用与扩展

文章目录 一、逻辑回归概述二、Sigmoid函数与损失函数2.1 Sigmoid函数2.2 损失函数 三、多分类逻辑回归与优化方法3.1 多分类逻辑回归3.2 优化方法 四、特征离散化 一、逻辑回归概述 逻辑回归是一种常用于分类问题的算法。大家熟悉的线性回归一般形式为 Y a X b \mathbf{Y}…

初学SpringMVC之 JSON 篇

JSON&#xff08;JavaScript Object Notation&#xff0c;JS 对象标记&#xff09;是一种轻量级的数据交换格式 采用完全独立于编程语言的文本格式来存储和表示数据 JSON 键值对是用来保存 JavaScript 对象的一种方式 比如&#xff1a;{"name": "张三"}…

「Pytorch」roLabelImg 图像异常旋转 bug

在进行Yolo-obb 模型训练的时候需要标注旋转框&#xff0c;roLabelImg 是比较推荐的一款旋转框标注工具&#xff0c;既可以标注正常的矩形框&#xff0c;还可以标注旋转框 roLabelImg Github 地址&#xff1a;https://github.com/HumanSignal/labelImg 但是在使用过程中遇到了…

SpringCloud学习

认识微服务 1.单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 优点&#xff1a;架构简单 部署成本低 缺点&#xff1a;耦合度高 2.分布式架构&#xff1a;根据业务功能对系统进行拆分&#xff0c;每个业务模块作为独立项目开发&…

k8s record 20240710 监控

不是adaptor 是opetator 案例 监控有了&#xff0c;日志搜集呢&#xff1f; 一、kubelet 的小弟 kubelet — 负责维护容器的生命周期&#xff0c;节点和集群其他部分通信 cAdvisor 集成在 Kubernetes 的 kubelet 中&#xff0c;能够自动发现和监控集群中所有的容器。dockers…