时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取

任何一个数据库系统内核关注的重点无非:数据在内存中如何存储、在文件中如何存储、索引结构如何存储、数据写入流程以及数据读取流程。关于InfluxDB存储内核,笔者在之前的文章中已经比较全面的介绍了数据的文件存储格式、倒排索引存储实现以及数据写入流程,本篇文章重点介绍InfluxDB中时序数据的读取流程。

InfluxDB支持类SQL查询,称为InfluxQL。InfluxQL支持基本的DDL操作和DML操作语句,详见InfluxQL_Spec,比如Select语句:

 

select_stmt = "SELECT" fields from_clause [ into_clause ] [ where_clause ]               
[ group_by_clause ] [ order_by_clause ] [ limit_clause ]              
[ offset_clause ] [ slimit_clause ] [ soffset_clause ] .

 

使用InfluxQL可以非常方便、人性化地对InfluxDB中的时序数据进行多维聚合分析。那InfluxDB内部是如何处理Query请求的呢?接下来笔者结合源码对InfluxDB的查询流程做一个剖析。另外,如果看官对源码这部分感兴趣,推荐先阅读官方文档对应部分:https://docs.influxdata.com/influxdb/v1.0/query_language/spec/#query-engine-internals

本文篇幅相对较长。为了方便阅读,本文分为上下两部分,上半部分会从原理层面介绍InfluxDB的数据读取流程,下半部分会举一个例子模拟整个数据读取的过程。

上半部分:InfluxDB数据读取流程原理

LSM(TSM)引擎对于读流程的处理通常来说都比较复杂,建议保持足够的耐心和专注力。理论部分会分两个小模块进行介绍,第一个模块会从宏观框架层面简单梳理整个读取流程,第二个模块会从微观细节层面分析TSM存储引擎(TSDB)内部详细的执行逻辑。

InfluxDB读取流程框架

笔者对照源码对整个流程做了一个简单的梳理(下图读者可能看不清楚,文末附有该图的高清版):

整个读取流程从宏观上分为四个部分:

1. Query:InfluxQL允许用户使用类SQL语句执行查询分析聚合,InfluxQL语法详见:https://docs.influxdata.com/influxdb/v1.0/query_language/spec/

 

2. QueryParser:InfluxQL进入系统之后,系统首先会对InfluxQL执行切词并解析为抽象语法树(AST),抽象树中标示出了数据源、查询条件、查询列以及聚合函数等等,分别对应上图中Source、Condition以及Aggration。InfluxQL没有使用通用的第三方AST解析库,自己实现了一套解析库,对细节感兴趣的可以参考:https://github.com/influxdata/influxql。接着InfluxDB会将抽象树转化为一个Query实体对象,供后续查询中使用。

 

3. BuildIterators:InfluxQL语句转换为Query实体对象之后,就进入读取流程中最重要最核心的一个环节 – 构建Iterator体系。构建Iterator体系是一个非常复杂的逻辑过程,其中细节非常繁复,笔者尽可能化繁为简,将其中的主线抽出来。为了方便理解,笔者将Iterator体系分为三个子体系:顶层Iterator子体系、中间层Iterator子体系以及底层Iterator子体系。

(1)顶层Iterator子体系

InfluxDB会为InfluxQL中所有查询field构造一个FieldIterator,FieldIterator表示每个查询列都会创建一个Iterator(称为ExprIterator),这是因为InfluxDB是列式存储系统,所有的列都是独立存储的,因此基于列分别构建Iterator方便执行查询聚合操作。比如sum(click),sum(impressions)和sum(revenue)三个查询列就分别对应一个ExprIterator。

ExprIterator根据查询列值是否需要聚合可以分为VarRefIterator和CallIterator,前者表示列值可以直接查询返回,不需要聚合;后者表示查询列需要执行某些聚合操作。示例中查询sum(click)就是典型的CallIterator,CallIterator实际实现分为两步,首先通过VarRefIterator把对应的列值查询到,再通过对应的Reduce函数执行相应聚合。比如sum(click)这个CallIterator就需要雇佣一个VarRefIterator把满足条件的click列值拿上来,再执行Reduce函数sum执行聚合操作。

(2)中间层Iterator子体系

InfluxDB中一个查询列的值可能分布在不同的Shard上,需要根据TimeRange决定给定时间段在哪些shard上,并为每个Shard构建一个Iterator,雇佣这个逻辑Iterator负责查询这个shard上对应列的列值。目前单机版所有shard都在同一个InfluxDB实例上,如果实现分布式管理,需要在这一层做处理。

(3)底层Iterator子体系

底层Iterator子体系负责单个shard(engine)上满足条件的某一列值的查找或者单机聚合,是Iterator体系中实际干活的Iterator。比如满足where advertiser = “baidu.com” 这个条件就需要先在倒排索引中根据advertiser = “baidu.com”查到包含该tag的所有series,再为每个series构建一个TagsetIterator去查找对应的列值,TagsetIterator会将查找指针置于最小的列值处。

纵观整个Iterator体系的构建,整体逻辑还是很清晰的。总结起来就是,查询按照查询列构建最顶层FieldIterator,每个FieldIterator会根据TimeRange雇佣多个ShardIterator去处理单个Shard上面对应列值的查找,对查找到的值要么直接返回要么执行Reduce函数进行聚合操作。每个Shard内部首先会根据查询条件利用倒排索引定位到所有满足条件的series,再为每个series构建一个TagsetIterator用来查找具体的列值数据。因此,TagsetIterator是整个体系中唯一干活的Iterator,所有其他上层Iterator都是逻辑Iterator。

另一个非常重要的点是,同一个Shard内的所有TagsetIterator在构建完成会合并成一个ShardIterator,这个合并过程是对这些TagsetIterator进行排序的过程,排序规则是按照series由小到大排序或者由大到小排序(由用户SQL对查询结果是由小到大排序还是由大到小排序决定)。同理,一个列值对应的多个ShardIterator构建完成之后会合并成一个FieldIterator,合并过程亦是一个排序过程,不过排序是针对所有Shard中的TagsetIterator进行的,排序规则是先比较series,再比较时间。可见,一个FieldIterator最终是由一系列排序过的TagsetIterator构成的。

 

4. Emitter.Emit:Iterator体系构建完成之后就完成了查询聚合前的准备工作,接下来就开始干活了。干活逻辑简单来讲是遍历所有FieldIterator,对每个FieldIterator执行一次Next函数,就会返回每个查询列的结果值,组装到一起就是一行数据。FieldIterator执行Next()函数会传递到最底层的TagsetIterator,TagsetIterator执行Next函数实际返回真实的时序数据。

TSDB存储引擎执行逻辑

TSDB存储引擎(实际上就是一个Shard)根据用户的查询请求执行原始数据的查询就是上文中提到的底层Iterator子体系的构建。查询过程分为两个部分:倒排索引查询过滤以及TSM数据层查询,前者通过Query中的where条件结合倒排索引过滤掉不满足条件的SeriesKey;后者根据留下的SeriesKey以及where条件中时间段信息(TimeRange)在TSMFile中以及内存中查出最终满足条件的数值列。TSDB存储引擎会将查询到的所有满足条件的原始数值列返回给上层,上层根据聚合函数对原始数据进行聚合并将聚合结果返回给用户。整个过程如下图所示:

 

上图需要从底部向上浏览,整个流程可以整理为如下:

1. 根据where condition以及所有倒排索引文件查处所有满足条件的SeriesKey

2. 将满足条件的SeriesKey根据GroupBy维度列进行分组,不同分组后续的所有操作都可以独立并发执行,因此可以多线程处理

3. 针对某个分组的SeriesKey集合以及待查询列,根据指定查询时间段(TimeRange)在所有TSMFile中根据B+树索引构建查询iterator

4. 将满足条件的原始数据返回给上层进行聚合运算,并将聚合运算的结果返回给用户

实际执行的过程可能比较抽象,为了更好的理解,笔者在下半部分举了一个示例。没有理解上面的逻辑没关系,可以先看下面的示例,看完之后再看上面的理论逻辑相信会更加容易理解。

下半部分:InfluxDB查询流程示例

文章上半部分从理论层面对InfluxDB查询流程进行了介绍。为了方便理解TSDB存储引擎处理查询流程的逻辑,笔者通过如下一个真实示例将其中的核心步骤进行说明。下表为原始时序数据表,表中有3个维度列:publisher、advertiser以及gender,3个数值列:impression、click以及revenue:

timestamp

publisher

advertiser

gender

impression

click

revenue

2017-11-01T00:00:00

ultrarimfast.com

baidu.com

male

1800

23

11.24

2017-12-01T00:00:00

bieberfever.com

google.com

male

2074

72

31.22

2018-01-04T00:00:00

ultrarimfast.com

baidu.com

false

1079

54

9.72

2018-01-08T00:00:01

ultrarimfast.com

google.com

male

1912

11

3.74

2018-01-21T00:00:01

bieberfever.com

baidu.com

male

897

17

5.48

2018-01-26T00:00:01

ultrarimfast.com

baidu.com

male

1120

73

6.48

现在用户想查询2018年1月份发布在baidu.com平台上的不同广告商的曝光量、点击量以及总收入,SQL如下所示:

select sum(click),sum(impression),sum(revenue) from table group by publisher where advertiser = "baidu.com" and timestamp > "2018-01-01" and timestamp < "2018-02-01"

步骤一:倒排索引过滤+groupby分组

原始查询语句:select ….  from ad_datasource where advertiser = “baidu.com” …… 。倒排索引即根据条件advertiser=”baidu.com”在所有Index File中遍历查询包含该tag的所有SeriesKey,具体原理(详见《时序数据库技术体系 – InfluxDB 多维查询之倒排索引》)如下:

1. 根据Index File中Measurement Block根据”ad_datasource”进行过滤,可以直接定位到给定source对应的所有TagKey所在的文件offset|size。

2. 加载出对应TagKey区域的Hash Index,使用给定TagKey(”advertiser”)进行hash可以直接定位到该TagKey对应的TagValue的文件offset|size。

3. 加载出TagKey对应TagValue区域的Hash Index,使用过滤条件TagValue(”baidu.com”)进行hash可以直接定位到该TagValue对应的所有SeriesID。

4. SeriesID就是对应SeriesKey在索引文件中的offset,直接根据SeriesID可以加载出对应的SeriesKey。

满足条件的所有SeriesKey如下表所示,共有3个:

publisher

advertiser

gender

ultrarimfast.com

baidu.com

male

ultrarimfast.com

baidu.com

false

bieberfever.com

baidu.com

male

根据倒排索引查询得到所有的SeriesKey之后,这里有一个非常重要的步骤:根据groupby条件对SeriesKey进行分组,分组算法为hash。示例查询中聚合条件为group by publisher,因此需要将上面得到的3个SeriesKey按照publisher的不同分成如下两组:

publisher

advertiser

gender

bieberfever.com

baidu.com

male

publisher

advertiser

gender

ultrarimfast.com

baidu.com

male

ultrarimfast.com

baidu.com

female

在倒排索引之后执行分组意义非常重大,分组后不同group的SeriesKey是可以并行独立执行查询并最终执行聚合的,因此后续的所有操作都可以使用多个线程并发执行,极大提升整个查询性能。

步骤二:TSM文件数据检索

到这一步,我们已经按照groupby得到分组后的SeriesKey集合。接下来需要根据SeriesKey以及TimeRange在TSM数据文件中查找满足条件的待查询列。在TSM数据文件中根据SeriesKey以及TimeRange查询field的具体过程(详见:《时序数据库技术体系 – InfluxDB TSM存储引擎之TSMFile》)如下:

上图中中间部分为索引层,TSM在启动之后就会将TSM文件的索引部分加载到内存,数据部分因为太大并不会直接加载到内存。用户查询可以分为三步:

1. 首先根据Key(SeriesKey+fieldKey)找到对应的SeriesIndex Block,因为Key是有序的,所以可以使用二分查找来具体实现

2. 找到SeriesIndex Block之后再根据查找的时间范围,使用[MinTime, MaxTime]索引定位到可能的Series Data Block列表

3. 将满足条件的Series Data Block加载到内存中解压进一步使用二分查找算法查找即可找到

在TSM中查询满足TimeRange条件的SeriesKey对应的待查询列值,因为InfluxDB会根据不同的查询列设置独立的FieldIterator,因此查询列有多少就有多少个FieldIterator,如下所示:

步骤三:原始数据聚合

查询到满足条件的所有原始数据之后,InfluxDB会根据查询聚合函数对原始数据进行聚合,如下图所示:

publisher

sum(impression)

sum(click)

sum(revenue)

bieberfever.com

897

17

5.48

ultrarimfast.com

1079 + 1120

54 + 73

9.72 + 6.48

文章总结

本文主要结合InfluxDB源码对查询聚合请求在服务器端的处理框架进行了系统理论介绍,同时深入介绍了InfluxDB Shard Engine是如何利用倒排索引、时序数据存储文件(TSMFile)处理用户的查询请求。最后,举了一个示例对Shard Engine的执行流程进行了形象化说明。整个读取的示意图附件:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509714.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java多线程之生产者和消费者问题

线程通信:不同的线程执行不同的任务,如果这些任务有某种关系,线程之间必须能够通信,协调完成工作. 经典的生产者和消费者案例(Producer/Consumer):分析案例:1):生产者和消费者应该操作共享的资源(实现方式来做).2):使用一个或多个线程来表示生产者(Producer).3):使用一个或多个…

时序数据库技术体系 – InfluxDB TSM存储引擎之数据写入

之前两篇文章笔者分别从TSM File文件存储格式、倒排索引文件存储格式这两个方面对InfluxDB最基础、最底层也最核心的存储模块进行了介绍&#xff0c;接下来笔者会再用两篇文章在存储文件的基础上分别介绍InfluxDB是如何处理用户的写入&#xff08;删除&#xff09;请求和读取请…

zookeeper结构和命令详解

1.1. zookeeper特性1、Zookeeper&#xff1a;一个leader&#xff0c;多个follower组成的集群 2、全局数据一致&#xff1a;每个server保存一份相同的数据副本&#xff0c;client无论连接到哪个server&#xff0c;数据都是一致的 3、分布式读写&#xff0c;更新请求转发&#xf…

时序数据库技术体系 – InfluxDB 多维查询之倒排索引

在时序数据库概述一文中&#xff0c;笔者提到时序数据库的基础技术栈主要包括高吞吐写入实现、数据分级存储&#xff5c;TTL、数据高压缩率、多维度查询能力以及高效聚合能力等&#xff0c;上文《时序数据库技术体系 – InfluxDB存储引擎TSM》基于InfluxDB存储引擎TSM介绍了时序…

OSG框架分析

本文参考<<osg最长一帧>>, <<OpenSceneGraph三维渲染引擎编程指南>>, <<OpenSceneGraph三维渲染引擎设计与实践>> 整理而来, 感谢大牛们的精彩著作. 相比Ogre来说, Ogre代码很规范, 只是入门资料较少,如果能在学习之前能总体上对架构有个…

在Eclipse中如何操作zookpeer

导入jar包 jar包下载链接 代码解析 package com.itcast.zookpeer.zk;import java.io.IOException; import java.util.List;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apac…

Linux 系统进程守护工具 cesi + superviosr

一、安装 Supervisor pip install supervisor 使用 echo_supervisord_conf 命令生成默认配置文件 echo_supervisord_conf > /etc/supervisord.conf 配置文件说明 位置&#xff1a;etc/supervisord.conf内容&#xff1a;# 指定了socket file的位置 [unix_http_server] f…

Docker 服务器安装(一)

使用官方安装脚本自动安装 安装命令如下&#xff1a; curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 也可以使用国内 daocloud 一键安装命令&#xff1a; curl -sSL https://get.daocloud.io/docker | sh 设置docker 加速器 sudo curl -sSL https…

Docker 入门使用 (二)

配置国内的源 > /etc/docker/daemon.json{"registry-mirrors" : ["https://mirror.ccs.tencentyun.com","http://registry.docker-cn.com","http://docker.mirrors.ustc.edu.cn","http://hub-mirror.c.163.com"],"…

ElasticSearch sql 插件安装

PS&#xff1a;6.3 开始 ElasticSearch 自身已经支持SQL查询。 github地址&#xff1a;https://github.com/NLPchina/elasticsearch-sql 一、在线安装 直接执行 ./bin/elasticsearch-plugin install https://github.com/NLPchina/elasticsearch-sql/releases/download/6.3.…

zookpeer实现对服务器动态上下线的监听

服务器动态上下线程序的工作机制 服务器代码&#xff1a; 补充&#xff1a;volatile关键字&#xff1a;java中一切都是对象&#xff0c;当多个线程操作同一个对象时候&#xff0c;该对象会放在堆内存中&#xff0c;而多个线程相当于在多个栈中&#xff0c;当A线程想要去除对…

Result window is too large, from + size must be less than or equal to: [10000] but was [12390]. See

ES 查询报错 Caused by: java.lang.IllegalArgumentException: Result window is too large, from size must be less than or equal to: [10000] but was [12390]. See the scroll api for a more efficient way to request large data sets. This limit can be set by chan…

java中泛型学习总结

为什么需要使用泛型: 1):存储任意类型的数据在集合中 ,但是取出来都是Object类型的,此时就得强转.List list new ArrayList();list.add(1); //Interger类型Object ele list.get(0); //现在需要调用Interger类中的方法I nterger num (Interger) ele;System.out.println(num);…

别说“我已经很努力了”

转自&#xff1a;http://blog.csdn.net/foruok/article/details/40247543 我们程序员的努力与挣扎有时非常尴尬&#xff0c;如果没有结果&#xff0c;都是徒然&#xff0c;都是说不得说不得…… 我自己做项目经理时&#xff0c;干的项目也经常延期……非常惭愧。而延期其实对研…

Java集合框架-概述

Java集合框架的由来: 其实在Java2(jdk1.2)之前&#xff0c;Java是没有完整的集合框架的。它只有一些简单的可以自扩展的容器类&#xff0c;比如Vector&#xff0c;Stack&#xff0c;Hashtable等。 为什么存在容器类: 容器类(集合类)可以存储多个数据,既然数组可以存储多个数据…

MySQL Binlog增量同步工具go-mysql-transfer实现详解

go-mysql-transfer产品手册:https://www.kancloud.cn/wj596/go-mysql-transfer/2111996 一、 概述 工作需要研究了下阿里开源的MySQL Binlog增量订阅消费组件canal&#xff0c;其功能强大、运行稳定&#xff0c;但是有些方面不是太符合需求&#xff0c;主要有如下三点&#x…

std::thread详解

转自&#xff1a;http://www.cnblogs.com/haippy/p/3236136.html 上一篇博客《C11 并发指南一(C11 多线程初探)》中只是提到了 std::thread 的基本用法&#xff0c;并给出了一个最简单的例子&#xff0c;本文将稍微详细地介绍 std::thread 的用法。 std::thread 在 <thread&…

std::mutex详解

Mutex 又称互斥量&#xff0c;C 11中与 Mutex 相关的类&#xff08;包括锁类型&#xff09;和函数都声明在 <mutex> 头文件中&#xff0c;所以如果你需要使用 std::mutex&#xff0c;就必须包含 <mutex> 头文件。 <mutex> 头文件介绍 Mutex 系列类(四种) st…

java中stack集合框架

栈(Stack):数据结构的一种,存储特点:Last In First Out. Stack 类表示后进先出&#xff08;LIFO&#xff09;的对象栈. 栈结构在生活中的体现: 1):QQ消息. A,B,C三个人先后发送消息,我们查看的时候发现最顶上的是最新的消息. 2):手枪弹夹的装和发射: 要来实现栈的存储,底层…

c++阻塞队列

基于C11的阻塞队列简单实现 转载请说明出处&#xff1a;http://blog.csdn.net/cywosp/article/details/9157379 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于&#xff0c;当队列为空时&#xff0c;从队列获取…