go-redis wrapper in practice: a client wrapper model and an exporter design for batch data processing

Part One: the go-redis wrapper in practice - the client model


// Copyright 2020 Lingfei Kong <colin404@foxmail.com>. All rights reserved.
// Use of this source code is governed by a MIT style
// license that can be found in the LICENSE file.

package storage

import (
    "context"
    "crypto/tls"
    "fmt"
    "strconv"
    "strings"
    "sync/atomic"
    "time"

    redis "github.com/go-redis/redis/v7"
    "github.com/marmotedu/errors"
    uuid "github.com/satori/go.uuid"
    "github.com/spf13/viper"

    "github.com/marmotedu/iam/pkg/log"
)

// Config defines options for redis cluster.
type Config struct {
    Host                  string
    Port                  int
    Addrs                 []string
    MasterName            string
    Username              string
    Password              string
    Database              int
    MaxIdle               int
    MaxActive             int
    Timeout               int
    EnableCluster         bool
    UseSSL                bool
    SSLInsecureSkipVerify bool
}

// ErrRedisIsDown is returned when we can't communicate with redis.
var ErrRedisIsDown = errors.New("storage: Redis is either down or was not configured")

var (
    singlePool      atomic.Value
    singleCachePool atomic.Value
    redisUp         atomic.Value
)

var disableRedis atomic.Value

// DisableRedis is very handy when testing: it allows to dynamically enable/disable talking with redis.
func DisableRedis(ok bool) {
    if ok {
        redisUp.Store(false)
        disableRedis.Store(true)

        return
    }
    redisUp.Store(true)
    disableRedis.Store(false)
}

func shouldConnect() bool {
    if v := disableRedis.Load(); v != nil {
        return !v.(bool)
    }

    return true
}

// Connected returns true if we are connected to redis.
func Connected() bool {
    if v := redisUp.Load(); v != nil {
        return v.(bool)
    }

    return false
}

// singleton returns the redis client held in the pool (the cache pool or the normal pool).
func singleton(cache bool) redis.UniversalClient {
    if cache {
        v := singleCachePool.Load()
        if v != nil {
            return v.(redis.UniversalClient)
        }

        return nil
    }
    if v := singlePool.Load(); v != nil {
        return v.(redis.UniversalClient)
    }

    return nil
}

// connectSingleton creates the client if it has not been created yet.
func connectSingleton(cache bool, config *Config) bool {
    if singleton(cache) == nil {
        log.Debug("Connecting to redis cluster")
        if cache {
            singleCachePool.Store(NewRedisClusterPool(cache, config))

            return true
        }
        singlePool.Store(NewRedisClusterPool(cache, config))

        return true
    }

    return true
}

// RedisCluster is a storage manager that uses the redis database.
type RedisCluster struct {
    KeyPrefix string
    HashKeys  bool
    IsCache   bool
}

// clusterConnectionIsOpen checks whether redis is reachable by writing and reading a test key.
func clusterConnectionIsOpen(cluster RedisCluster) bool {
    c := singleton(cluster.IsCache)
    testKey := "redis-test-" + uuid.Must(uuid.NewV4()).String()
    if err := c.Set(testKey, "test", time.Second).Err(); err != nil {
        log.Warnf("Error trying to set test key: %s", err.Error())

        return false
    }
    if _, err := c.Get(testKey).Result(); err != nil {
        log.Warnf("Error trying to get test key: %s", err.Error())

        return false
    }

    return true
}

// ConnectToRedis starts a go routine that periodically tries to connect to redis.
func ConnectToRedis(ctx context.Context, config *Config) {
    tick := time.NewTicker(time.Second)
    defer tick.Stop()
    c := []RedisCluster{
        {}, {IsCache: true},
    }

    var ok bool
    for _, v := range c {
        if !connectSingleton(v.IsCache, config) {
            break
        }

        if !clusterConnectionIsOpen(v) {
            redisUp.Store(false)

            break
        }
        ok = true
    }
    redisUp.Store(ok)

again:
    // Keep checking the connection; if redis becomes unreachable, set redisUp to false.
    for {
        select {
        case <-ctx.Done():
            return
        case <-tick.C:
            if !shouldConnect() {
                continue
            }

            // Re-create the clients if needed and verify the connections.
            for _, v := range c {
                if !connectSingleton(v.IsCache, config) {
                    redisUp.Store(false)

                    goto again
                }

                if !clusterConnectionIsOpen(v) {
                    redisUp.Store(false)

                    goto again
                }
            }
            redisUp.Store(true)
        }
    }
}

// NewRedisClusterPool create a redis cluster pool.
func NewRedisClusterPool(isCache bool, config *Config) redis.UniversalClient {
    // redisSingletonMu is locked and we know the singleton is nil
    log.Debug("Creating new Redis connection pool")

    // poolSize applies per cluster node and not for the whole cluster.
    poolSize := 500
    if config.MaxActive > 0 {
        poolSize = config.MaxActive
    }

    timeout := 5 * time.Second
    if config.Timeout > 0 {
        timeout = time.Duration(config.Timeout) * time.Second
    }

    var tlsConfig *tls.Config
    if config.UseSSL {
        tlsConfig = &tls.Config{
            InsecureSkipVerify: config.SSLInsecureSkipVerify,
        }
    }

    var client redis.UniversalClient

    opts := &RedisOpts{
        Addrs:        getRedisAddrs(config),
        MasterName:   config.MasterName,
        Password:     config.Password,
        DB:           config.Database,
        DialTimeout:  timeout,
        ReadTimeout:  timeout,
        WriteTimeout: timeout,
        IdleTimeout:  240 * timeout,
        PoolSize:     poolSize,
        TLSConfig:    tlsConfig,
    }

    if opts.MasterName != "" {
        log.Info("--> [REDIS] Creating sentinel-backed failover client")
        // The failover client supports master/slave switchover under Redis Sentinel.
        client = redis.NewFailoverClient(opts.failover())
    } else if config.EnableCluster {
        log.Info("--> [REDIS] Creating cluster client")
        client = redis.NewClusterClient(opts.cluster())
    } else {
        log.Info("--> [REDIS] Creating single-node client")
        client = redis.NewClient(opts.simple())
    }

    return client
}

func getRedisAddrs(config *Config) (addrs []string) {
    if len(config.Addrs) != 0 {
        addrs = config.Addrs
    }

    if len(addrs) == 0 && config.Port != 0 {
        addr := config.Host + ":" + strconv.Itoa(config.Port)
        addrs = append(addrs, addr)
    }

    return addrs
}

// RedisOpts is the overridden type of redis.UniversalOptions. simple() and cluster() functions are not public in redis
// library.
// Therefore, they are redefined in here to use in creation of new redis cluster logic.
// We don't want to use redis.NewUniversalClient() logic.
type RedisOpts redis.UniversalOptions

func (o *RedisOpts) cluster() *redis.ClusterOptions {
    if len(o.Addrs) == 0 {
        o.Addrs = []string{"127.0.0.1:6379"}
    }

    return &redis.ClusterOptions{
        Addrs:     o.Addrs,
        OnConnect: o.OnConnect,

        Password: o.Password,

        MaxRedirects:   o.MaxRedirects,
        ReadOnly:       o.ReadOnly,
        RouteByLatency: o.RouteByLatency,
        RouteRandomly:  o.RouteRandomly,

        MaxRetries:      o.MaxRetries,
        MinRetryBackoff: o.MinRetryBackoff,
        MaxRetryBackoff: o.MaxRetryBackoff,

        DialTimeout:        o.DialTimeout,
        ReadTimeout:        o.ReadTimeout,
        WriteTimeout:       o.WriteTimeout,
        PoolSize:           o.PoolSize,
        MinIdleConns:       o.MinIdleConns,
        MaxConnAge:         o.MaxConnAge,
        PoolTimeout:        o.PoolTimeout,
        IdleTimeout:        o.IdleTimeout,
        IdleCheckFrequency: o.IdleCheckFrequency,

        TLSConfig: o.TLSConfig,
    }
}

// simple builds the options for a single-node redis client.
func (o *RedisOpts) simple() *redis.Options {
    addr := "127.0.0.1:6379"
    if len(o.Addrs) > 0 {
        addr = o.Addrs[0]
    }

    return &redis.Options{
        Addr:      addr,
        OnConnect: o.OnConnect,

        DB:       o.DB,
        Password: o.Password,

        MaxRetries:      o.MaxRetries,
        MinRetryBackoff: o.MinRetryBackoff,
        MaxRetryBackoff: o.MaxRetryBackoff,

        DialTimeout:  o.DialTimeout,
        ReadTimeout:  o.ReadTimeout,
        WriteTimeout: o.WriteTimeout,

        PoolSize:           o.PoolSize,
        MinIdleConns:       o.MinIdleConns,
        MaxConnAge:         o.MaxConnAge,
        PoolTimeout:        o.PoolTimeout,
        IdleTimeout:        o.IdleTimeout,
        IdleCheckFrequency: o.IdleCheckFrequency,

        TLSConfig: o.TLSConfig,
    }
}

// failover builds the options for a sentinel-backed (master/slave failover) client.
func (o *RedisOpts) failover() *redis.FailoverOptions {
    if len(o.Addrs) == 0 {
        o.Addrs = []string{"127.0.0.1:26379"}
    }

    return &redis.FailoverOptions{
        SentinelAddrs: o.Addrs,
        MasterName:    o.MasterName,
        OnConnect:     o.OnConnect,

        DB:       o.DB,
        Password: o.Password,

        MaxRetries:      o.MaxRetries,
        MinRetryBackoff: o.MinRetryBackoff,
        MaxRetryBackoff: o.MaxRetryBackoff,

        DialTimeout:  o.DialTimeout,
        ReadTimeout:  o.ReadTimeout,
        WriteTimeout: o.WriteTimeout,

        PoolSize:           o.PoolSize,
        MinIdleConns:       o.MinIdleConns,
        MaxConnAge:         o.MaxConnAge,
        PoolTimeout:        o.PoolTimeout,
        IdleTimeout:        o.IdleTimeout,
        IdleCheckFrequency: o.IdleCheckFrequency,

        TLSConfig: o.TLSConfig,
    }
}

// Connect will establish a connection; this is always true because we are dynamically using redis.
func (r *RedisCluster) Connect() bool {
    return true
}

func (r *RedisCluster) singleton() redis.UniversalClient {
    return singleton(r.IsCache)
}

func (r *RedisCluster) hashKey(in string) string {
    if !r.HashKeys {
        // Not hashing? Return the raw key
        return in
    }

    return HashStr(in)
}

func (r *RedisCluster) fixKey(keyName string) string {
    return r.KeyPrefix + r.hashKey(keyName)
}

func (r *RedisCluster) cleanKey(keyName string) string {
    return strings.Replace(keyName, r.KeyPrefix, "", 1)
}

func (r *RedisCluster) up() error {
    if !Connected() {
        return ErrRedisIsDown
    }

    return nil
}

// GetKey will retrieve a key from the database.
func (r *RedisCluster) GetKey(keyName string) (string, error) {
    if err := r.up(); err != nil {
        return "", err
    }
    cluster := r.singleton()

    value, err := cluster.Get(r.fixKey(keyName)).Result()
    if err != nil {
        log.Debugf("Error trying to get value: %s", err.Error())

        return "", ErrKeyNotFound
    }

    return value, nil
}

// GetMultiKey gets multiple keys from the database.
func (r *RedisCluster) GetMultiKey(keys []string) ([]string, error) {
    if err := r.up(); err != nil {
        return nil, err
    }
    cluster := r.singleton()

    keyNames := make([]string, len(keys))
    copy(keyNames, keys)
    for index, val := range keyNames {
        keyNames[index] = r.fixKey(val)
    }

    result := make([]string, 0)

    switch v := cluster.(type) {
    case *redis.ClusterClient:
        {
            getCmds := make([]*redis.StringCmd, 0)
            pipe := v.Pipeline()
            for _, key := range keyNames {
                getCmds = append(getCmds, pipe.Get(key))
            }
            _, err := pipe.Exec()
            if err != nil && !errors.Is(err, redis.Nil) {
                log.Debugf("Error trying to get value: %s", err.Error())

                return nil, ErrKeyNotFound
            }
            for _, cmd := range getCmds {
                result = append(result, cmd.Val())
            }
        }
    case *redis.Client:
        {
            values, err := cluster.MGet(keyNames...).Result()
            if err != nil {
                log.Debugf("Error trying to get value: %s", err.Error())

                return nil, ErrKeyNotFound
            }
            for _, val := range values {
                strVal := fmt.Sprint(val)
                if strVal == "<nil>" {
                    strVal = ""
                }
                result = append(result, strVal)
            }
        }
    }

    for _, val := range result {
        if val != "" {
            return result, nil
        }
    }

    return nil, ErrKeyNotFound
}

// GetKeyTTL return ttl of the given key.
func (r *RedisCluster) GetKeyTTL(keyName string) (ttl int64, err error) {
    if err = r.up(); err != nil {
        return 0, err
    }
    duration, err := r.singleton().TTL(r.fixKey(keyName)).Result()

    return int64(duration.Seconds()), err
}

// GetRawKey return the value of the given key.
func (r *RedisCluster) GetRawKey(keyName string) (string, error) {
    if err := r.up(); err != nil {
        return "", err
    }
    value, err := r.singleton().Get(keyName).Result()
    if err != nil {
        log.Debugf("Error trying to get value: %s", err.Error())

        return "", ErrKeyNotFound
    }

    return value, nil
}

// GetExp return the expiry of the given key.
func (r *RedisCluster) GetExp(keyName string) (int64, error) {
    log.Debugf("Getting exp for key: %s", r.fixKey(keyName))
    if err := r.up(); err != nil {
        return 0, err
    }

    value, err := r.singleton().TTL(r.fixKey(keyName)).Result()
    if err != nil {
        log.Errorf("Error trying to get TTL: %s", err.Error())

        return 0, ErrKeyNotFound
    }

    return int64(value.Seconds()), nil
}

// SetExp set expiry of the given key.
func (r *RedisCluster) SetExp(keyName string, timeout time.Duration) error {
    if err := r.up(); err != nil {
        return err
    }
    err := r.singleton().Expire(r.fixKey(keyName), timeout).Err()
    if err != nil {
        log.Errorf("Could not EXPIRE key: %s", err.Error())
    }

    return err
}

// SetKey will create (or update) a key value in the store.
func (r *RedisCluster) SetKey(keyName, session string, timeout time.Duration) error {
    log.Debugf("[STORE] SET Raw key is: %s", keyName)
    log.Debugf("[STORE] Setting key: %s", r.fixKey(keyName))

    if err := r.up(); err != nil {
        return err
    }
    err := r.singleton().Set(r.fixKey(keyName), session, timeout).Err()
    if err != nil {
        log.Errorf("Error trying to set value: %s", err.Error())

        return err
    }

    return nil
}

// SetRawKey set the value of the given key.
func (r *RedisCluster) SetRawKey(keyName, session string, timeout time.Duration) error {
    if err := r.up(); err != nil {
        return err
    }
    err := r.singleton().Set(keyName, session, timeout).Err()
    if err != nil {
        log.Errorf("Error trying to set value: %s", err.Error())

        return err
    }

    return nil
}

// Decrement will decrement a key in redis.
func (r *RedisCluster) Decrement(keyName string) {
    keyName = r.fixKey(keyName)
    log.Debugf("Decrementing key: %s", keyName)
    if err := r.up(); err != nil {
        return
    }
    err := r.singleton().Decr(keyName).Err()
    if err != nil {
        log.Errorf("Error trying to decrement value: %s", err.Error())
    }
}

// IncrememntWithExpire will increment a key in redis.
func (r *RedisCluster) IncrememntWithExpire(keyName string, expire int64) int64 {
    log.Debugf("Incrementing raw key: %s", keyName)
    if err := r.up(); err != nil {
        return 0
    }
    // This function uses a raw key, so we shouldn't call fixKey
    fixedKey := keyName
    val, err := r.singleton().Incr(fixedKey).Result()
    if err != nil {
        log.Errorf("Error trying to increment value: %s", err.Error())
    } else {
        log.Debugf("Incremented key: %s, val is: %d", fixedKey, val)
    }

    if val == 1 && expire > 0 {
        log.Debug("--> Setting Expire")
        r.singleton().Expire(fixedKey, time.Duration(expire)*time.Second)
    }

    return val
}

// GetKeys will return all keys according to the filter (filter is a prefix - e.g. tyk.keys.*).
func (r *RedisCluster) GetKeys(filter string) []string {
    if err := r.up(); err != nil {
        return nil
    }
    client := r.singleton()

    filterHash := ""
    if filter != "" {
        filterHash = r.hashKey(filter)
    }
    searchStr := r.KeyPrefix + filterHash + "*"
    log.Debugf("[STORE] Getting list by: %s", searchStr)

    fnFetchKeys := func(client *redis.Client) ([]string, error) {
        values := make([]string, 0)

        iter := client.Scan(0, searchStr, 0).Iterator()
        for iter.Next() {
            values = append(values, iter.Val())
        }

        if err := iter.Err(); err != nil {
            return nil, err
        }

        return values, nil
    }

    var err error
    var values []string
    sessions := make([]string, 0)

    switch v := client.(type) {
    case *redis.ClusterClient:
        ch := make(chan []string)

        go func() {
            err = v.ForEachMaster(func(client *redis.Client) error {
                values, err = fnFetchKeys(client)
                if err != nil {
                    return err
                }

                ch <- values

                return nil
            })
            close(ch)
        }()

        for res := range ch {
            sessions = append(sessions, res...)
        }
    case *redis.Client:
        sessions, err = fnFetchKeys(v)
    }

    if err != nil {
        log.Errorf("Error while fetching keys: %s", err)

        return nil
    }

    for i, v := range sessions {
        sessions[i] = r.cleanKey(v)
    }

    return sessions
}

// GetKeysAndValuesWithFilter will return all keys and their values with a filter.
func (r *RedisCluster) GetKeysAndValuesWithFilter(filter string) map[string]string {
    if err := r.up(); err != nil {
        return nil
    }
    keys := r.GetKeys(filter)
    if keys == nil {
        log.Error("Error trying to get filtered client keys")

        return nil
    }

    if len(keys) == 0 {
        return nil
    }

    for i, v := range keys {
        keys[i] = r.KeyPrefix + v
    }

    client := r.singleton()
    values := make([]string, 0)

    switch v := client.(type) {
    case *redis.ClusterClient:
        {
            getCmds := make([]*redis.StringCmd, 0)
            pipe := v.Pipeline()
            for _, key := range keys {
                getCmds = append(getCmds, pipe.Get(key))
            }
            _, err := pipe.Exec()
            if err != nil && !errors.Is(err, redis.Nil) {
                log.Errorf("Error trying to get client keys: %s", err.Error())

                return nil
            }

            for _, cmd := range getCmds {
                values = append(values, cmd.Val())
            }
        }
    case *redis.Client:
        {
            result, err := v.MGet(keys...).Result()
            if err != nil {
                log.Errorf("Error trying to get client keys: %s", err.Error())

                return nil
            }

            for _, val := range result {
                strVal := fmt.Sprint(val)
                if strVal == "<nil>" {
                    strVal = ""
                }
                values = append(values, strVal)
            }
        }
    }

    m := make(map[string]string)
    for i, v := range keys {
        m[r.cleanKey(v)] = values[i]
    }

    return m
}

// GetKeysAndValues will return all keys and their values - not to be used lightly.
func (r *RedisCluster) GetKeysAndValues() map[string]string {
    return r.GetKeysAndValuesWithFilter("")
}

// DeleteKey will remove a key from the database.
func (r *RedisCluster) DeleteKey(keyName string) bool {
    if err := r.up(); err != nil {
        // log.Debug(err)
        return false
    }
    log.Debugf("DEL Key was: %s", keyName)
    log.Debugf("DEL Key became: %s", r.fixKey(keyName))
    n, err := r.singleton().Del(r.fixKey(keyName)).Result()
    if err != nil {
        log.Errorf("Error trying to delete key: %s", err.Error())
    }

    return n > 0
}

// DeleteAllKeys will remove all keys from the database.
func (r *RedisCluster) DeleteAllKeys() bool {
    if err := r.up(); err != nil {
        return false
    }
    n, err := r.singleton().FlushAll().Result()
    if err != nil {
        log.Errorf("Error trying to delete keys: %s", err.Error())
    }

    if n == "OK" {
        return true
    }

    return false
}

// DeleteRawKey will remove a key from the database without prefixing, assumes user knows what they are doing.
func (r *RedisCluster) DeleteRawKey(keyName string) bool {
    if err := r.up(); err != nil {
        return false
    }
    n, err := r.singleton().Del(keyName).Result()
    if err != nil {
        log.Errorf("Error trying to delete key: %s", err.Error())
    }

    return n > 0
}

// DeleteScanMatch will remove a group of keys in bulk.
func (r *RedisCluster) DeleteScanMatch(pattern string) bool {
    if err := r.up(); err != nil {
        return false
    }
    client := r.singleton()
    log.Debugf("Deleting: %s", pattern)

    fnScan := func(client *redis.Client) ([]string, error) {
        values := make([]string, 0)

        iter := client.Scan(0, pattern, 0).Iterator()
        for iter.Next() {
            values = append(values, iter.Val())
        }

        if err := iter.Err(); err != nil {
            return nil, err
        }

        return values, nil
    }

    var err error
    var keys []string
    var values []string

    switch v := client.(type) {
    case *redis.ClusterClient:
        ch := make(chan []string)
        go func() {
            err = v.ForEachMaster(func(client *redis.Client) error {
                values, err = fnScan(client)
                if err != nil {
                    return err
                }

                ch <- values

                return nil
            })
            close(ch)
        }()

        for vals := range ch {
            keys = append(keys, vals...)
        }
    case *redis.Client:
        keys, err = fnScan(v)
    }

    if err != nil {
        log.Errorf("SCAN command failed with err: %s", err.Error())

        return false
    }

    if len(keys) > 0 {
        for _, name := range keys {
            log.Infof("Deleting: %s", name)
            err := client.Del(name).Err()
            if err != nil {
                log.Errorf("Error trying to delete key: %s - %s", name, err.Error())
            }
        }
        log.Infof("Deleted: %d records", len(keys))
    } else {
        log.Debug("RedisCluster called DEL - Nothing to delete")
    }

    return true
}

// DeleteKeys will remove a group of keys in bulk.
func (r *RedisCluster) DeleteKeys(keys []string) bool {
    if err := r.up(); err != nil {
        return false
    }
    if len(keys) > 0 {
        for i, v := range keys {
            keys[i] = r.fixKey(v)
        }

        log.Debugf("Deleting: %v", keys)
        client := r.singleton()
        switch v := client.(type) {
        case *redis.ClusterClient:
            {
                pipe := v.Pipeline()
                for _, k := range keys {
                    pipe.Del(k)
                }

                if _, err := pipe.Exec(); err != nil {
                    log.Errorf("Error trying to delete keys: %s", err.Error())
                }
            }
        case *redis.Client:
            {
                _, err := v.Del(keys...).Result()
                if err != nil {
                    log.Errorf("Error trying to delete keys: %s", err.Error())
                }
            }
        }
    } else {
        log.Debug("RedisCluster called DEL - Nothing to delete")
    }

    return true
}

// StartPubSubHandler will listen for a signal and run the callback for
// every subscription and message event.
func (r *RedisCluster) StartPubSubHandler(channel string, callback func(interface{})) error {
    if err := r.up(); err != nil {
        return err
    }
    client := r.singleton()
    if client == nil {
        return errors.New("redis connection failed")
    }

    pubsub := client.Subscribe(channel)
    defer pubsub.Close()

    if _, err := pubsub.Receive(); err != nil {
        log.Errorf("Error while receiving pubsub message: %s", err.Error())

        return err
    }

    for msg := range pubsub.Channel() {
        callback(msg)
    }

    return nil
}

// Publish publishes a message to the specified channel.
func (r *RedisCluster) Publish(channel, message string) error {
    if err := r.up(); err != nil {
        return err
    }
    err := r.singleton().Publish(channel, message).Err()
    if err != nil {
        log.Errorf("Error trying to set value: %s", err.Error())

        return err
    }

    return nil
}

// GetAndDeleteSet get and delete a key.
func (r *RedisCluster) GetAndDeleteSet(keyName string) []interface{} {
    log.Debugf("Getting raw key set: %s", keyName)
    if err := r.up(); err != nil {
        return nil
    }
    log.Debugf("keyName is: %s", keyName)
    fixedKey := r.fixKey(keyName)
    log.Debugf("Fixed keyname is: %s", fixedKey)

    client := r.singleton()

    var lrange *redis.StringSliceCmd
    _, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
        lrange = pipe.LRange(fixedKey, 0, -1)
        pipe.Del(fixedKey)

        return nil
    })
    if err != nil {
        log.Errorf("Multi command failed: %s", err.Error())

        return nil
    }

    vals := lrange.Val()
    log.Debugf("Analytics returned: %d", len(vals))
    if len(vals) == 0 {
        return nil
    }

    log.Debugf("Unpacked vals: %d", len(vals))
    result := make([]interface{}, len(vals))
    for i, v := range vals {
        result[i] = v
    }

    return result
}

// AppendToSet append a value to the key set.
func (r *RedisCluster) AppendToSet(keyName, value string) {
    fixedKey := r.fixKey(keyName)
    log.Debug("Pushing to raw key list", log.String("keyName", keyName))
    log.Debug("Appending to fixed key list", log.String("fixedKey", fixedKey))
    if err := r.up(); err != nil {
        return
    }
    if err := r.singleton().RPush(fixedKey, value).Err(); err != nil {
        log.Errorf("Error trying to append to set keys: %s", err.Error())
    }
}

// Exists check if keyName exists.
func (r *RedisCluster) Exists(keyName string) (bool, error) {
    fixedKey := r.fixKey(keyName)
    log.Debug("Checking if exists", log.String("keyName", fixedKey))

    exists, err := r.singleton().Exists(fixedKey).Result()
    if err != nil {
        log.Errorf("Error trying to check if key exists: %s", err.Error())

        return false, err
    }
    if exists == 1 {
        return true, nil
    }

    return false, nil
}

// RemoveFromList delete a value from the list identified by keyName.
func (r *RedisCluster) RemoveFromList(keyName, value string) error {
    fixedKey := r.fixKey(keyName)
    log.Debug("Removing value from list",
        log.String("keyName", keyName),
        log.String("fixedKey", fixedKey),
        log.String("value", value),
    )

    if err := r.singleton().LRem(fixedKey, 0, value).Err(); err != nil {
        log.Error("LREM command failed",
            log.String("keyName", keyName),
            log.String("fixedKey", fixedKey),
            log.String("value", value),
            log.String("error", err.Error()),
        )

        return err
    }

    return nil
}

// GetListRange gets range of elements of list identified by keyName.
func (r *RedisCluster) GetListRange(keyName string, from, to int64) ([]string, error) {
    fixedKey := r.fixKey(keyName)
    elements, err := r.singleton().LRange(fixedKey, from, to).Result()
    if err != nil {
        log.Error("LRANGE command failed",
            log.String("keyName", keyName),
            log.String("fixedKey", fixedKey),
            log.Int64("from", from),
            log.Int64("to", to),
            log.String("error", err.Error()),
        )

        return nil, err
    }

    return elements, nil
}

// AppendToSetPipelined append values to redis pipeline.
func (r *RedisCluster) AppendToSetPipelined(key string, values [][]byte) {
    if len(values) == 0 {
        return
    }

    fixedKey := r.fixKey(key)
    if err := r.up(); err != nil {
        log.Debug(err.Error())

        return
    }
    client := r.singleton()

    pipe := client.Pipeline()
    for _, val := range values {
        pipe.RPush(fixedKey, val)
    }

    if _, err := pipe.Exec(); err != nil {
        log.Errorf("Error trying to append to set keys: %s", err.Error())
    }

    // if we need to set an expiration time
    if storageExpTime := int64(viper.GetDuration("analytics.storage-expiration-time")); storageExpTime != int64(-1) {
        // If there is no expiry on the analytics set, we should set it.
        exp, _ := r.GetExp(key)
        if exp == -1 {
            _ = r.SetExp(key, time.Duration(storageExpTime)*time.Second)
        }
    }
}

// GetSet return key set value.
func (r *RedisCluster) GetSet(keyName string) (map[string]string, error) {
    log.Debugf("Getting from key set: %s", keyName)
    log.Debugf("Getting from fixed key set: %s", r.fixKey(keyName))
    if err := r.up(); err != nil {
        return nil, err
    }
    val, err := r.singleton().SMembers(r.fixKey(keyName)).Result()
    if err != nil {
        log.Errorf("Error trying to get key set: %s", err.Error())

        return nil, err
    }

    result := make(map[string]string)
    for i, value := range val {
        result[strconv.Itoa(i)] = value
    }

    return result, nil
}

// AddToSet add value to key set.
func (r *RedisCluster) AddToSet(keyName, value string) {
    log.Debugf("Pushing to raw key set: %s", keyName)
    log.Debugf("Pushing to fixed key set: %s", r.fixKey(keyName))
    if err := r.up(); err != nil {
        return
    }
    err := r.singleton().SAdd(r.fixKey(keyName), value).Err()
    if err != nil {
        log.Errorf("Error trying to append keys: %s", err.Error())
    }
}

// RemoveFromSet remove a value from key set.
func (r *RedisCluster) RemoveFromSet(keyName, value string) {
    log.Debugf("Removing from raw key set: %s", keyName)
    log.Debugf("Removing from fixed key set: %s", r.fixKey(keyName))
    if err := r.up(); err != nil {
        log.Debug(err.Error())

        return
    }
    err := r.singleton().SRem(r.fixKey(keyName), value).Err()
    if err != nil {
        log.Errorf("Error trying to remove keys: %s", err.Error())
    }
}

// IsMemberOfSet return whether the given value belongs to the key set.
func (r *RedisCluster) IsMemberOfSet(keyName, value string) bool {
    if err := r.up(); err != nil {
        log.Debug(err.Error())

        return false
    }
    val, err := r.singleton().SIsMember(r.fixKey(keyName), value).Result()
    if err != nil {
        log.Errorf("Error trying to check set member: %s", err.Error())

        return false
    }

    log.Debugf("SISMEMBER %s %s %v %v", keyName, value, val, err)

    return val
}

// SetRollingWindow will append to a sorted set in redis and extract a timed window of values.
func (r *RedisCluster) SetRollingWindow(
    keyName string,
    per int64,
    valueOverride string,
    pipeline bool,
) (int, []interface{}) {
    log.Debugf("Incrementing raw key: %s", keyName)
    if err := r.up(); err != nil {
        log.Debug(err.Error())

        return 0, nil
    }
    log.Debugf("keyName is: %s", keyName)
    now := time.Now()
    log.Debugf("Now is: %v", now)
    onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
    log.Debugf("Then is: %v", onePeriodAgo)

    client := r.singleton()
    var zrange *redis.StringSliceCmd

    pipeFn := func(pipe redis.Pipeliner) error {
        pipe.ZRemRangeByScore(keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
        zrange = pipe.ZRange(keyName, 0, -1)

        element := redis.Z{
            Score: float64(now.UnixNano()),
        }

        if valueOverride != "-1" {
            element.Member = valueOverride
        } else {
            element.Member = strconv.Itoa(int(now.UnixNano()))
        }

        pipe.ZAdd(keyName, &element)
        pipe.Expire(keyName, time.Duration(per)*time.Second)

        return nil
    }

    var err error
    if pipeline {
        _, err = client.Pipelined(pipeFn)
    } else {
        _, err = client.TxPipelined(pipeFn)
    }

    if err != nil {
        log.Errorf("Multi command failed: %s", err.Error())

        return 0, nil
    }

    values := zrange.Val()

    // Check actual value
    if values == nil {
        return 0, nil
    }

    intVal := len(values)
    result := make([]interface{}, len(values))

    for i, v := range values {
        result[i] = v
    }

    log.Debugf("Returned: %d", intVal)

    return intVal, result
}

// GetRollingWindow return rolling window.
func (r RedisCluster) GetRollingWindow(keyName string, per int64, pipeline bool) (int, []interface{}) {
    if err := r.up(); err != nil {
        log.Debug(err.Error())

        return 0, nil
    }
    now := time.Now()
    onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)

    client := r.singleton()
    var zrange *redis.StringSliceCmd

    pipeFn := func(pipe redis.Pipeliner) error {
        pipe.ZRemRangeByScore(keyName, "-inf", strconv.Itoa(int(onePeriodAgo.UnixNano())))
        zrange = pipe.ZRange(keyName, 0, -1)

        return nil
    }

    var err error
    if pipeline {
        _, err = client.Pipelined(pipeFn)
    } else {
        _, err = client.TxPipelined(pipeFn)
    }
    if err != nil {
        log.Errorf("Multi command failed: %s", err.Error())

        return 0, nil
    }

    values := zrange.Val()

    // Check actual value
    if values == nil {
        return 0, nil
    }

    intVal := len(values)
    result := make([]interface{}, intVal)
    for i, v := range values {
        result[i] = v
    }

    log.Debugf("Returned: %d", intVal)

    return intVal, result
}

// GetKeyPrefix returns storage key prefix.
func (r *RedisCluster) GetKeyPrefix() string {
    return r.KeyPrefix
}

// AddToSortedSet adds value with given score to sorted set identified by keyName.
func (r *RedisCluster) AddToSortedSet(keyName, value string, score float64) {
    fixedKey := r.fixKey(keyName)
    log.Debug("Pushing raw key to sorted set", log.String("keyName", keyName), log.String("fixedKey", fixedKey))

    if err := r.up(); err != nil {
        log.Debug(err.Error())

        return
    }
    member := redis.Z{Score: score, Member: value}
    if err := r.singleton().ZAdd(fixedKey, &member).Err(); err != nil {
        log.Error("ZADD command failed",
            log.String("keyName", keyName),
            log.String("fixedKey", fixedKey),
            log.String("error", err.Error()),
        )
    }
}

// GetSortedSetRange gets range of elements of sorted set identified by keyName.
func (r *RedisCluster) GetSortedSetRange(keyName, scoreFrom, scoreTo string) ([]string, []float64, error) {
    fixedKey := r.fixKey(keyName)
    log.Debug("Getting sorted set range",
        log.String("keyName", keyName),
        log.String("fixedKey", fixedKey),
        log.String("scoreFrom", scoreFrom),
        log.String("scoreTo", scoreTo),
    )

    args := redis.ZRangeBy{Min: scoreFrom, Max: scoreTo}
    values, err := r.singleton().ZRangeByScoreWithScores(fixedKey, &args).Result()
    if err != nil {
        log.Error("ZRANGEBYSCORE command failed",
            log.String("keyName", keyName),
            log.String("fixedKey", fixedKey),
            log.String("scoreFrom", scoreFrom),
            log.String("scoreTo", scoreTo),
            log.String("error", err.Error()),
        )

        return nil, nil, err
    }

    if len(values) == 0 {
        return nil, nil, nil
    }

    elements := make([]string, len(values))
    scores := make([]float64, len(values))

    for i, v := range values {
        elements[i] = fmt.Sprint(v.Member)
        scores[i] = v.Score
    }

    return elements, scores, nil
}

// RemoveSortedSetRange removes range of elements from sorted set identified by keyName.
func (r *RedisCluster) RemoveSortedSetRange(keyName, scoreFrom, scoreTo string) error {
    fixedKey := r.fixKey(keyName)
    log.Debug("Removing sorted set range",
        log.String("keyName", keyName),
        log.String("fixedKey", fixedKey),
        log.String("scoreFrom", scoreFrom),
        log.String("scoreTo", scoreTo),
    )

    if err := r.singleton().ZRemRangeByScore(fixedKey, scoreFrom, scoreTo).Err(); err != nil {
        log.Debug("ZREMRANGEBYSCORE command failed",
            log.String("keyName", keyName),
            log.String("fixedKey", fixedKey),
            log.String("scoreFrom", scoreFrom),
            log.String("scoreTo", scoreTo),
            log.String("error", err.Error()),
        )

        return err
    }

    return nil
}
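To see how the pieces above fit together, here is a minimal usage sketch (my own illustration, not code from the original post): it runs the ConnectToRedis health-check loop in a goroutine, waits until Connected reports true, and then reads and writes keys through a RedisCluster value. The local Redis address is an assumption.

// Minimal usage sketch of the wrapper above (assumed wiring).
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/marmotedu/iam/pkg/storage"
)

func main() {
    cfg := &storage.Config{Host: "127.0.0.1", Port: 6379} // assumed local redis instance

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // ConnectToRedis blocks while it keeps checking the connection, so run it in a goroutine.
    go storage.ConnectToRedis(ctx, cfg)

    // Wait until the health-check goroutine marks redis as up.
    for !storage.Connected() {
        time.Sleep(100 * time.Millisecond)
    }

    rc := &storage.RedisCluster{KeyPrefix: "demo-"}
    if err := rc.SetKey("hello", "world", 10*time.Second); err != nil {
        fmt.Println("set failed:", err)

        return
    }
    v, err := rc.GetKey("hello")
    fmt.Println(v, err)
}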

Part Two: batch log export in practice - the exporter model

How it works

The user configures the exporter's buffer size and a flush interval. When the buffer reaches the size threshold, or the interval elapses, the buffered data is exported to the specified storage middleware.
To keep the exporter fast enough that its buffered channel does not fill up quickly, it starts several workers internally that consume messages from the channel in parallel; a simplified sketch of this pattern follows.
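Before reading the full implementation, the stripped-down sketch below shows the pattern in isolation (a simplification of mine, not the project's code): each worker drains a shared channel into a local batch and flushes it when the batch reaches the size threshold or when the interval expires; the flush function stands in for the Redis pipeline write.

package main

import "time"

// worker drains a shared channel into a local batch and flushes it either when
// the batch is full or when the flush interval elapses.
func worker(records <-chan []byte, batchSize int, interval time.Duration, flush func([][]byte)) {
    buf := make([][]byte, 0, batchSize)
    for {
        select {
        case rec, ok := <-records:
            if !ok {
                // Channel closed: flush whatever is left and exit.
                if len(buf) > 0 {
                    flush(buf)
                }

                return
            }
            buf = append(buf, rec)
            if len(buf) < batchSize {
                continue // keep batching until the size threshold is hit
            }
        case <-time.After(interval):
            if len(buf) == 0 {
                continue // nothing to flush yet
            }
        }
        flush(buf) // size threshold reached or interval elapsed
        buf = buf[:0]
    }
}

func main() {
    records := make(chan []byte, 1000)
    flush := func(batch [][]byte) { /* write the batch to the storage middleware */ }

    for i := 0; i < 4; i++ { // several workers consume the channel in parallel
        go worker(records, 100, 500*time.Millisecond, flush)
    }

    records <- []byte("record")
    time.Sleep(time.Second)
}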

Code logic

(figure: exporter code-flow diagrams)

The code

package analytics

import (
    "sync"
    "sync/atomic"
    "time"

    "github.com/vmihailenco/msgpack/v5"

    "github.com/marmotedu/iam/pkg/log"
    "github.com/marmotedu/iam/pkg/storage"
)

const analyticsKeyName = "iam-system-analytics"

const (
    recordsBufferForcedFlushInterval = 1 * time.Second
)

// AnalyticsRecord encodes the details of an authorization request.
type AnalyticsRecord struct {
    TimeStamp  int64     `json:"timestamp"`
    Username   string    `json:"username"`
    Effect     string    `json:"effect"`
    Conclusion string    `json:"conclusion"`
    Request    string    `json:"request"`
    Policies   string    `json:"policies"`
    Deciders   string    `json:"deciders"`
    ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

var analytics *Analytics

// SetExpiry set expiration time to a key.
func (a *AnalyticsRecord) SetExpiry(expiresInSeconds int64) {
    expiry := time.Duration(expiresInSeconds) * time.Second
    if expiresInSeconds == 0 {
        // Expiry is set to 100 years
        expiry = 24 * 365 * 100 * time.Hour
    }

    t := time.Now()
    t2 := t.Add(expiry)
    a.ExpireAt = t2
}

// Analytics will record analytics data to a redis back end as defined in the Config object.
type Analytics struct {
    store                      storage.AnalyticsHandler
    poolSize                   int
    recordsChan                chan *AnalyticsRecord
    workerBufferSize           uint64
    recordsBufferFlushInterval uint64
    shouldStop                 uint32
    poolWg                     sync.WaitGroup
}

// NewAnalytics returns a new analytics instance.
func NewAnalytics(options *AnalyticsOptions, store storage.AnalyticsHandler) *Analytics {
    ps := options.PoolSize
    recordsBufferSize := options.RecordsBufferSize
    workerBufferSize := recordsBufferSize / uint64(ps)
    log.Debug("Analytics pool worker buffer size", log.Uint64("workerBufferSize", workerBufferSize))

    recordsChan := make(chan *AnalyticsRecord, recordsBufferSize)

    analytics = &Analytics{
        store:                      store,
        poolSize:                   ps,
        recordsChan:                recordsChan,
        workerBufferSize:           workerBufferSize,
        recordsBufferFlushInterval: options.FlushInterval,
    }

    return analytics
}

// GetAnalytics returns the existing analytics instance.
// Need to initialize `analytics` instance before calling GetAnalytics.
func GetAnalytics() *Analytics {
    return analytics
}

// Start starts the analytics service.
func (r *Analytics) Start() {
    r.store.Connect()

    // start worker pool
    atomic.SwapUint32(&r.shouldStop, 0)
    for i := 0; i < r.poolSize; i++ {
        r.poolWg.Add(1)
        go r.recordWorker()
    }
}

// Stop performs a graceful shutdown. Three things make it graceful:
// 1. the shouldStop flag prevents any new records from being written into the channel;
// 2. closing the channel tells every worker goroutine to flush the data it still holds;
// 3. the WaitGroup makes Stop return only after all in-flight data has been processed.
func (r *Analytics) Stop() {
    // flag to stop sending records into channel
    atomic.SwapUint32(&r.shouldStop, 1)

    // close channel to stop workers
    close(r.recordsChan)

    // wait for all workers to be done
    r.poolWg.Wait()
}

// RecordHit will store an AnalyticsRecord in Redis.
func (r *Analytics) RecordHit(record *AnalyticsRecord) error {
    // check if we should stop sending records 1st
    if atomic.LoadUint32(&r.shouldStop) > 0 {
        return nil
    }

    // just send record to channel consumed by pool of workers
    // leave all data crunching and Redis I/O work for pool workers
    r.recordsChan <- record

    return nil
}

func (r *Analytics) recordWorker() {
    defer r.poolWg.Done()

    // this is buffer to send one pipelined command to redis
    // use r.recordsBufferSize as cap to reduce slice re-allocations
    recordsBuffer := make([][]byte, 0, r.workerBufferSize)

    // read records from channel and process
    lastSentTS := time.Now()
    for {
        var readyToSend bool
        select {
        case record, ok := <-r.recordsChan:
            // check if channel was closed and it is time to exit from worker
            if !ok {
                // send what is left in buffer
                r.store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)

                return
            }

            // we have new record - prepare it and add to buffer
            if encoded, err := msgpack.Marshal(record); err != nil {
                log.Errorf("Error encoding analytics data: %s", err.Error())
            } else {
                recordsBuffer = append(recordsBuffer, encoded)
            }

            // identify that buffer is ready to be sent
            readyToSend = uint64(len(recordsBuffer)) == r.workerBufferSize

        case <-time.After(time.Duration(r.recordsBufferFlushInterval) * time.Millisecond):
            // nothing was received for that period of time
            // anyways send whatever we have, don't hold data too long in buffer
            readyToSend = true
        }

        // send data to Redis and reset buffer
        if len(recordsBuffer) > 0 && (readyToSend || time.Since(lastSentTS) >= recordsBufferForcedFlushInterval) {
            r.store.AppendToSetPipelined(analyticsKeyName, recordsBuffer)
            recordsBuffer = recordsBuffer[:0]
            lastSentTS = time.Now()
        }
    }
}

// DurationToMillisecond convert time duration type to float64.
func DurationToMillisecond(d time.Duration) float64 {
    return float64(d) / 1e6
}
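As a closing illustration, here is a hypothetical wiring of the exporter (not from the original post). The field names of AnalyticsOptions (PoolSize, RecordsBufferSize, FlushInterval) are inferred from how NewAnalytics reads them above, the import path is assumed, and RedisCluster from Part One is assumed to satisfy storage.AnalyticsHandler since it provides Connect, AppendToSetPipelined and GetAndDeleteSet.

// Hypothetical exporter wiring (assumed import path and option values).
package main

import (
    "time"

    "github.com/marmotedu/iam/internal/authzserver/analytics" // assumed path
    "github.com/marmotedu/iam/pkg/storage"
)

func main() {
    // Field names are inferred from how NewAnalytics reads the options above.
    opts := &analytics.AnalyticsOptions{
        PoolSize:          4,    // number of worker goroutines
        RecordsBufferSize: 2000, // capacity of the shared channel
        FlushInterval:     200,  // milliseconds before a partial buffer is flushed
    }

    // RedisCluster from Part One acts as the storage backend here (assumed to satisfy AnalyticsHandler).
    store := &storage.RedisCluster{KeyPrefix: "analytics-"}

    a := analytics.NewAnalytics(opts, store)
    a.Start()
    defer a.Stop() // Stop closes the channel and waits for the workers to drain their buffers.

    _ = a.RecordHit(&analytics.AnalyticsRecord{
        TimeStamp: time.Now().Unix(),
        Username:  "colin",
        Effect:    "allow",
    })

    time.Sleep(time.Second) // give the workers a chance to flush (sketch only)
}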
