influxdb数据过期_Influxdb Cluster下的数据写入

Cluster下的数据写入

数据写入的实现主要分析cluster/points_writer.go中的WritePoints函数的实现// WritePoints writes across multiple local and remote data nodes according the consistency level.func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {

w.statMap.Add(statWriteReq, 1)

w.statMap.Add(statPointWriteReq, int64(len(p.Points)))    //2.1 先获取RetentionPolicy

if p.RetentionPolicy == "" {

db, err := w.MetaClient.Database(p.Database)        if err != nil {            return err

} else if db == nil {            return influxdb.ErrDatabaseNotFound(p.Database)

}

p.RetentionPolicy = db.DefaultRetentionPolicy

}    // 2.2 生成 shardMap

shardMappings, err := w.MapShards(p)    if err != nil {        return err

}    // Write each shard in it's own goroutine and return as soon

// as one fails.

ch := make(chan error, len(shardMappings.Points))    for shardID, points := range shardMappings.Points {

// 2.3 写入数据到Shard

go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {

ch 

}(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)

}    // Send points to subscriptions if possible.

ok := false

// We need to lock just in case the channel is about to be nil'ed

w.mu.RLock()

select {    case w.subPoints 

ok = true

default:

}

w.mu.RUnlock()    if ok {

w.statMap.Add(statSubWriteOK, 1)

} else {

w.statMap.Add(statSubWriteDrop, 1)

}    // 2.4 等待写入完成

for range shardMappings.Points {

select {        case 

}

}

}    return nil}上面的函数实现主要分如下几个步骤

2.1 获取对应的RetentionPolicy

2.2 生成ShardMap, 将各个point对应到相应ShardGroup中的Shard中, 这步很关键

2.3 按ShardId不同,开启新的goroutine, 将points写入相应的Shard,可能设计对写入数据到其它的DataNode上;

2.4 等待写入完成或退出

ShardMap的生成先讲一下ShardGroup的概念

1.1 写入Influxdb的每一条数据对带有相应的time时间,每一个SharGroup都有自己的start和end时间,这个时间跨度是由用户写入时选取的RetentionPolicy时的ShardGroupDarution决定,这样每条写入的数据就必然仅属于一个确定的ShardGroup中;

主要实现在cluster/points_writer.go中的MapShards中func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {    // holds the start time ranges for required shard groups

timeRanges := map[time.Time]*meta.ShardGroupInfo{}

rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)    if err != nil {        return nil, err

}    if rp == nil {        return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)

}    for _, p := range wp.Points {

timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil

}    // holds all the shard groups and shards that are required for writes

for t := range timeRanges {

sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)        if err != nil {            return nil, err

}

timeRanges[t] = sg

}

mapping := NewShardMapping()    for _, p := range wp.Points {

sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]

sh := sg.ShardFor(p.HashID())

mapping.MapPoint(&sh, p)

}    return mapping, nil}我们来拆解下上面函数的实现

3.1 扫描所有的points, 按时间确定我们需要多个ShardGroupfor _, p := range wp.Points {

timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil

}

3.2 调用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在创建,创建过程涉及到将CreateShardGroup的请求发送给MetadataServer并等待本地更新到新的MetaData数据;sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)

3.3 分析ShardGroup的分配规则, 在services/meta/data.go中的CreateShardGroupfunc (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {

...    // Require at least one replica but no more replicas than nodes.

// 确认复本数,不能大于DataNode节点总数

replicaN := rpi.ReplicaN    if replicaN == 0 {

replicaN = 1

} else if replicaN > len(data.DataNodes) {

replicaN = len(data.DataNodes)

}    // Determine shard count by node count divided by replication factor.

// This will ensure nodes will get distributed across nodes evenly and

// replicated the correct number of times.

// 根据复本数确定Shard数量

shardN := len(data.DataNodes) / replicaN    // Create the shard group.

// 创建ShardGroup

data.MaxShardGroupID++

sgi := ShardGroupInfo{}

sgi.ID = data.MaxShardGroupID

sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()

sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()    // Create shards on the group.

sgi.Shards = make([]ShardInfo, shardN)    for i := range sgi.Shards {

data.MaxShardID++

sgi.Shards[i] = ShardInfo{ID: data.MaxShardID}

}    // Assign data nodes to shards via round robin.

// Start from a repeatably "random" place in the node list.

// ShardInfo中的Owners记录了当前Shard所有复本所在DataNode的信息

// 分Shard的所有复本分配DataNode

// 使用data.Index作为基数确定开始的DataNode,然后使用 round robin策略分配

// data.Index:每次meta信息有更新,Index就会更新, 可以理解为meta信息的版本号

nodeIndex := int(data.Index % uint64(len(data.DataNodes)))    for i := range sgi.Shards {

si := &sgi.Shards[i]        for j := 0; j 

nodeID := data.DataNodes[nodeIndex%len(data.DataNodes)].ID

si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})

nodeIndex++

}

}    // Retention policy has a new shard group, so update the policy. Shard

// Groups must be stored in sorted order, as other parts of the system

// assume this to be the case.

rpi.ShardGroups = append(rpi.ShardGroups, sgi)

sort.Sort(ShardGroupInfos(rpi.ShardGroups))    return nil

}

3.3 按每一个具体的point对应到ShardGroup中的一个Shard: 按point的HashID来对Shard总数取模,HashID是measurment + tag set的Hash值for _, p := range wp.Points {

sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]

sh := sg.ShardFor(p.HashID())

mapping.MapPoint(&sh, p)

}

....

func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {    return sgi.Shards[hash%uint64(len(sgi.Shards))]

}

数据按一致性要求写入过程简述

1.1 根据一致性要求确认需要成功写入几份switch consistency {    // 对于ConsistencyLevelAny, ConsistencyLevelOne只需要写入一份即满足一致性要求,返回客户端

case ConsistencyLevelAny, ConsistencyLevelOne:

required = 1

case ConsistencyLevelQuorum:

required = required/2 + 1

}

1.2 根据Shard.Owners对应的DataNode, 向其中的每个DataNode写入数据,如果是本机,直接调用w.TSDBStore.WriteToShard写入;如果非本机,调用err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points);

1.3 写入远端失败时,数据写入HintedHandoff本地磁盘队列多次重试写到远端,直到数据过期被清理;对于一致性要求是ConsistencyLevelAny, 写入本地HintedHandoff成功,就算是写入成功;w.statMap.Add(statWritePointReqHH, int64(len(points)))

hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)                if hherr != nil {

ch 

}                if hherr == nil && consistency == ConsistencyLevelAny {

ch 

}

1.4 等待写入超时或完成for range shard.Owners {

select {        case 

w.statMap.Add(statWriteTimeout, 1)            // return timeout error to caller

return ErrTimeout        case result := 

if result.Err != nil {                if writeError == nil {

writeError = result.Err

}                continue

}

wrote++            // 写入已达到一致性要求,就立即返回

if wrote >= required {

w.statMap.Add(statWriteOK, 1)                return nil

}

}

}

HintedHandoff服务定义在services/hh/service.go中

写入HintedHandoff中的数据,按NodeID的不同写入不同的目录,每个目录下又分多个文件,每个文件作为一个segment, 命名规则就是依次递增的id, id的大小按序就是写入的时间按从旧到新排序;

hitnedhandoff.png

HintedHandoff服务会针对每一个远端DataNode创建NodeProcessor, 每个负责自己DataNode的写入, 运行在一个单独的goroutine中

在每个goroutine中,作两件事:一个是定时清理过期的数据,如果被清理掉的数据还没有成功写入到远端,则会丢失;二是从文件读取数据写入到远端;func (n *NodeProcessor) run() {

defer n.wg.Done()

...    for {

select {        case 

case 

n.Logger.Printf("failed to purge for node %d: %s", n.nodeID, err.Error())

}        case 

limiter := NewRateLimiter(n.RetryRateLimit)            for {

c, err := n.SendWrite()                if err != nil {                    if err == io.EOF {                        // No more data, return to configured interval

currInterval = time.Duration(n.RetryInterval)

} else {

currInterval = currInterval * 2

if currInterval > time.Duration(n.RetryMaxInterval) {

currInterval = time.Duration(n.RetryMaxInterval)

}

}                    break

}                // Success! Ensure backoff is cancelled.

currInterval = time.Duration(n.RetryInterval)                // Update how many bytes we've sent

limiter.Update(c)                // Block to maintain the throughput rate

time.Sleep(limiter.Delay())

}

}

}

}数据的本地存储和读取

5.1 定义在services/hh/queue.go,所有的segment file在内存中组织成一个队列,读从head指向的segment读取,写入到tail指向的segment, 每个segment文件的最后8字节记录当前segment文件已经读到什么位置

5.2 清理,当这个segment文件内容都发送完当前文件会被删除,周期性清理每次只会check当前head指向的segment是否需要清理掉

作者:扫帚的影子

链接:https://www.jianshu.com/p/6a94486b2daa

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/244536.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html5内容切换特效,html5+jQuery图片和文字内容同时左右切换特效

html5jQuery图片和文字内容同时左右切换特效,点击图片或者点击左右按钮进行切换,图片转动以及文字内容动画效果切换。查看演示下载资源:22次 下载资源下载积分:20积分js代码 (function(){var bannerIndex 0;var $bannerBgs $(.j…

asp向不同的用户发送信息_【asp.net core 系列】 1 带你了解一下asp.net core

0. 前言 这是一个新的系列,名字是《http://ASP.NET Core 入门到实战》。这个系列主讲http://ASP.NET Core MVC,辅助一些前端的基础知识(能用来实现我们需要的即可,并非主讲)。同时这个系列也会在后续介绍http://ASP.NE…

python邮件转发_把Gmail邮件转发到gtalk的Python方法

用Python方法把Gmail邮件转发到gtalk有多难,想知道gmail如何转发邮件?gmail自动转发已收到邮件的设置方法。本文就来把这段详细的python学习参考代码贴出来,大家一起参考下吧。gmail转发邮件到gtalk的Python方法源码:#!/usr/bin/env python#…

html 怎么使用http请求数据类型,HTTP请求方式中8种请求方法(简单介绍)

HTTP工作原理HTTP协议定义Web客户端如何从Web服务器请求Web页面,以及服务器如何把Web页面传送给客户端。HTTP协议采用了请求/响应模型。客户端向服务器发送一个请求报文,请求报文包含请求的方法、URL、协议版本、请求头部和请求数据。服务器以一个状态行…

上行30m下行200m是多少宽带_套餐内有多少流量,就加送多少流量!电信流量攻势太凶猛!...

虽然6月6日国内发放了5G商用牌照,5G时代正式开启。但是对于用户来说,要想用上5G服务尚需等待一段时间,因为现在5G手机终端尚未普及、5G资费套餐也未出炉、运营商的5G网络还需要通过建设进一步扩大覆盖范围。在5G真正走近用户的前夕&#xff0…

怎么在终端启用python_在终端启动Python时报错的解决方案

最近,在终端启动Python时,报了一个错误: Failed calling sys.__interactivehook__ Traceback (most recent call last): File "d:\ProgramData\Anaconda3\lib\site.py", line 439, in register_readline readline.read_history_fil…

vue项目html引入css,vue项目引入自定义.css的样式文件

ES6的引入方式:.vue文件中css文件引入import "../assets/common/common.css";//自定义.css的样式路径js文件的引入在main.js中:import API from ./assets/api/api.config.jsVue.prototype.$API API;P.S.:传统上,引入cs…

485通信原理_上位机开发之单片机通信实践

经常会有一些学员会问到上位机与单片机之间通信的问题,而我们经常会讲上位机与PLC之间通信,那么其实对上位机开发来说,不管是和PLC通信,还是和单片机通信,通信原理都是一样的。PLC的本质就是单片机,在单片机…

hive 结构metastone_深入理解hive之事务处理

事务的四个特性1.automicity:原子性2.consistency:一致性3. isolation:独立性4.durability:持久性5.支持事务有几个条件需要满足:1.所有的事务都支持自动提交;2.只支持ORC格式的数据;3.桶表7.配置hive的参数使其支持事务&#xff…

计算机数媒专业优势,27所院校新开设数字媒体艺术专业,“数媒”专业为什么这么火?...

原标题:27所院校新开设数字媒体艺术专业,“数媒”专业为什么这么火?今年2月,教育部公布了2020年度普通高等学校本科专业备案和审批结果。其中,有包括华中农业大学在内的全国27所院校新增备案了“数字媒体艺术”本科专业…

sap 分割评估_SAP那些事-实战篇-73-受托加工的几种方案探讨

大家都知道,SAP中对于委托加工有标准的解决方案,包括标准委托(带料委托加工)和工序委托,然而,对于受托方(即代加工方)SAP却没有标准的解决方案,后来想想,也怪…

婚宴座位图html5,图解现代婚宴座位安排

中国人衣、食、住、行素来都有“礼”可循。对于婚宴来说更是讲究,桌子的摆放,座位的安排都需要经过深思熟虑。因为小小的桌椅就关系到婚宴的礼仪以及宾客的身份。下面就让专家为我们进行图解婚宴座位安排和礼仪的指导吧!婚礼座位安排&#xf…

c++ 图的连通分量是什么_学习数据结构第五章:图(图的遍历操作)

第五章:图(图的遍历操作)1.图的遍历图的遍历:从图中某一顶点出发,按照某种搜索方法沿着图中的边对图中的所有顶点访问依次且仅访问一次其实树的层次遍历和图的广度优先搜索类似,可以把这个二叉树看成一个图2.广度优先搜索(BFS)广度…

带通滤波器作用和用途_带通滤波器的作用和电路设计图

一个带通滤波器是一个只有在特定频段的频率传递信号衰减这一频段以外的所有信号的同时,其目的是的电路。在一个带通滤波器的重要参数,高,低截止频率( F H 和f 升),带宽(BW),中心频率 f C,中心频率增益&…

轮换html有虚宽出现,乒乓球理论考试复习资料

一、判断题(共10题,每题3分)1)突出、战术变化多、无明显漏洞。()正确2)运动员甲使用的球拍底板由于磨损碰掉一块木片,他用质量相同的木料进行修补,表面平整,裁判员不许其正式比赛时使用。()正确3)发球时第一落点距球网过近&#x…

gson json转map_Java几种常用JSON库性能比较

点击上方“Java研发军团”,选择“置顶公众号”关键时刻,第一时间送达!阅读本文需要5分钟作者:飞污熊xncoding.com/2018/01/09/java/jsons.html本篇通过JMH来测试一下Java中几种常见的JSON解析库的性能。 每次都在网上看到别人说什…

nodejs 游戏框架_nodejs游戏服务器框架

{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],"search_count":[{"count_phone":6,"count":6}]},"card":[{"des":"云服务器 ECS(Elastic Compute Service)是一…

计算机辅助设计工业产品cad竞赛试题,2017计算机辅助设计(工业产品CAD)赛项样题...

2017年全国职业院校技能大赛中职“计算机辅助设计(工业产品CAD)”赛项样题参赛选手须知1.比赛时间4小时,赛题共3大题4道题目,参赛选手需完成全部题目。2.参赛选手应始终保持赛题、图册完整,不得拆散赛题或图册。3.参赛选手须严格按照题目规定…

技校毕业是什么学历_技校毕业了是什么学历

展开全部技校毕业是中专学历。技校的全称为:技工学校。技校与中专、32313133353236313431303231363533e59b9ee7ad9431333365656536职高、一样,都是属于同等的高中层次学历。技校毕业的学生可以获得技工学校印发的初级、中级、高级或者以上的职业资格证书…

golang 捕获堆栈信息_【网络数据安全】为什么时间戳对于数据包捕获很重要

网络上发生的所有事件都是时间敏感的,这就是为什么在讨论数据包捕获和分析时,给数据包加上时间戳非常重要。此功能不仅可以防止和分析网络攻击,而且还能让你检查趋势和网络延迟。网络数据包时间戳可用于调查以某种方式影响网络性能的事件。例…