日志服务(SLS)集成 Spark 流计算实战

前言

日志服务作为一站式的日志的采集与分析平台,提供了各种用户场景的日志采集能力,通过日志服务提供的各种与·与SDK,采集客户端(Logtail),Producer,用户可以非常容易的把各种数据源中的数据采集到日志服务的Logstore中。同时为了便于用户对日志进行处理,提供了各种支持流式消费的SDK,如各种语言的消费组,与 Spark,Flink,Storm 等各种流计算技术无缝对接的Connector,以便于用户根据自己的业务场景非常便捷的处理海量日志。

从最早的Spark Streaming到最新的Stuctured Streaming,Spark 一直是最流行的流计算框架之一。使用日志服务的Spark SDK,可以非常方便的在Spark 中消费日志服务中的数据,同时也支持将 Spark 的计算结果写入日志服务。

日志服务基础概念

日志服务的存储层是一个类似Kafka的Append only的FIFO消息队列,包含如下基本概念:

  • 日志(Log):由时间、及一组不定个数的Key-Value对组成。
  • 日志组(LogGroup):一组日志的集合,包含相同Meta信息如Topic,Source,Tags等。是读写的基本单位。

图-1 Log与LogGroup的关系

  • Shard:分区,LogGroup读写基本单元,对应于Kafka的partition。
  • Logstore:日志库,用以存放同一类日志数据。Logstore会包含1个或多个Shard。
  • Project:Logstore存放容器,包含一个或者多个Logstore。

准备工作

1)添加Maven依赖:

<dependency><groupId>com.aliyun.emr</groupId><artifactId>emr-logservice_2.11</artifactId><version>1.9.0</version>
</dependency>

Github源码下载。
2)计划消费的日志服务project,logstore以及对应的endpoint。
3)用于访问日志服务Open API的Access Key。

对 Spark Streaming 的支持

Spark Streaming是Spark最早推出的流计算技术,现在已经进入维护状态,不再会增加新的功能。但是考虑到Spark Streaming 的使用仍然非常广泛,我们先从Spark Streaming开始介绍。Spark Streaming 提供了一个DStream 的数据模型抽象,本质是把无界数据集拆分成一个一个的RDD,转化为有界数据集的流式计算。每个批次处理的数据就是这段时间内从日志服务消费到的数据。

 

图-2 DStream

Spark Streaming 从日志服务消费支持 Receiver 和 Direct 两种消费方式。

Receiver模式

Receivers的实现内部实现基于日志服务的消费组(Consumer Library)。数据拉取与处理完全分离。消费组自动均匀分配Logstore内的所有shard到所有的Receiver,并且自动提交checkpoint到SLS。这就意味着Logstore内的shard个数与Spark 实际的并发没有对应关系。
对于所有的Receiver,接收到的数据默认会保存在Spark Executors中,所以Failover的时候有可能造成数据丢失,这个时候就需要开启WAL日志,Failover的时候可以从WAL中恢复,防止丢失数据。

SDK将SLS中的每行日志解析为JSON字符串形式,Receiver使用示例如下所示:

object SLSReceiverSample {def main(args: Array[String]): Unit = {val project = "your project"val logstore = "your logstore"val consumerGroup = "consumer group"val endpoint = "your endpoint"val accessKeyId = "access key id"val accessKeySecret = "access key secret"val batchInterval = Milliseconds(5 * 1000)val conf = new SparkConf().setAppName("Test SLS Loghub")val ssc = new StreamingContext(conf, batchInterval)val stream = LoghubUtils.createStream(ssc,project,logstore,consumerGroup,endpoint,accessKeyId,accessKeySecret,StorageLevel.MEMORY_AND_DISK,LogHubCursorPosition.END_CURSOR)stream.checkpoint(batchInterval * 2).foreachRDD(rdd =>rdd.map(bytes => new String(bytes)).top(10).foreach(println))ssc.checkpoint("hdfs:///tmp/spark/streaming")ssc.start()ssc.awaitTermination()}
}

除Project,Logstore,Access Key 这些基础配置外,还可以指定StorageLevel,消费开始位置等。

Direct模式

Direct模式不再需要Receiver,也不依赖于消费组,而是使用日志服务的低级API,在每个批次内直接从服务端拉取数据处理。对于Logstore中的每个Shard来说,每个批次都会读取指定位置范围内的数据。为了保证一致性,只有在每个批次确认正常结束之后才能把每个Shard的消费结束位置(checkpoint)保存到服务端。

为了实现Direct模式,SDK依赖一个本地的ZooKeeper,每个shard的checkpoint会临时保存到本地的ZooKeeper,等用户手动提交checkpoint时,再从ZooKeeper中同步到服务端。Failover时也是先从本地ZooKeeper中尝试读上一次的checkpoint,如果没有读到再从服务端获取。

object SLSDirectSample {def main(args: Array[String]): Unit = {val project = "your project"val logstore = "your logstore"val consumerGroup = "consumerGroup"val endpoint = "endpoint"val accessKeyId = "access key id"val accessKeySecret = "access key secret"val batchInterval = Milliseconds(5 * 1000)val zkAddress = "localhost:2181"val conf = new SparkConf().setAppName("Test Direct SLS Loghub")val ssc = new StreamingContext(conf, batchInterval)val zkParas = Map("zookeeper.connect" -> zkAddress)val loghubStream = LoghubUtils.createDirectStream(ssc,project,logstore,consumerGroup,accessKeyId,accessKeySecret,endpoint,zkParas,LogHubCursorPosition.END_CURSOR)loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {println(s"count by key: ${rdd.map(s => {s.sorted(s.length, s)}).countByKey().size}")// 手动更新checkpointloghubStream.asInstanceOf[CanCommitOffsets].commitAsync()})ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directoryssc.start()ssc.awaitTermination()}
}

Direct模式示例

如何限速

在Receiver中,如果需要限制消费速度,我们只需要调整 Consumer Library 本身的参数即可。而Direct方式是在每个批次开始时从SLS拉取数据,这就涉及到一个问题:一个批次内拉取多少数据才合适。如果太多,一个批次内处理不完,造成处理延时。如果太少会导worker空闲,工作不饱和,消费延时。这个时候我们就需要合理配置拉取的速度和行数,实现一个批次尽可能多处理又能及时完成的目标。理想状态下Spark 消费的整体速率应该与SLS采集速率一致,才能实现真正的实时处理。

由于SLS的数据模型是以LogGroup作为读写的基本单位,而一个LogGroup中可能包含上万行日志,这就意味着Spark中直接限制每个批次的行数难以实现。因此,Direct限流涉及到两个配置参数:

参数说明默认值
spark.streaming.loghub.maxRatePerShard每个批次每个Shard读取行数,决定了限流的下限10000
spark.loghub.batchGet.step每次请求读取LogGroup个数,决定了限流的粒度100

可以通过适当缩小spark.loghub.batchGet.step来控制限流的精度,但是即便如此,在某些情况下还是会存在较大误差,如一个LogGroup中存在10000行日志,spark.streaming.loghub.maxRatePerShard设置为100,spark.loghub.batchGet.step设置为1,那一个批次内该shard还是会拉取10000行日志。

两种模式的对比

和Receiver相比,Direct有如下的优势:

  1. 降低资源消耗,不需要占用Executor资源来作为Receiver的角色。
  2. 鲁棒性更好,在计算的时候才会从服务端真正消费数据,降低内存使用,不再需要WAL,Failover 直接在读一次就行了,更容易实现exactly once语义。
  3. 简化并行。Spark partition 与 Logstore 的 shard 个数对应,增加shard个数就能提高Spark任务处理并发上限。

但是也存在一些缺点:

  1. 在SLS场景下,需要依赖本地的 ZooKeeper 来保存临时 checkpoint,当调用 commitAsync 时从 ZooKeeper同步到日志服务服务端。所以当需要重置 checkpoint 时,也需要先删除本地 ZooKeeper 中的 checkpoint 才能生效。
  2. 上一个批次保存 checkpoint 之前,下一个批次无法真正开始,否则 ZooKeeper 中的 checkpoint 可能会被更新成一个中间状态。目前SDK在每个批次会检查是否上一个批次的 checkpoint 还没有提交,如果没有提交则生成一个空批次,而不是继续从服务端消费。
  3. 在 SLS 场景下,限流方式不够精确。

Spark Streaming结果写入SLS

与消费SLS相反,Spark Streaming的处理结果也可以直接写入SLS。使用示例:

...val lines = loghubStream.map(x => x)// 转换函数把结果中每条记录转为一行日志def transformFunc(x: String): LogItem = {val r = new LogItem()r.PushBack("key", x)r}val callback = new Callback with Serializable {override def onCompletion(result: Result): Unit = {println(s"Send result ${result.isSuccessful}")}}// SLS producer configval producerConfig = Map("sls.project" -> loghubProject,"sls.logstore" -> targetLogstore,"access.key.id" -> accessKeyId,"access.key.secret" -> accessKeySecret,"sls.endpoint" -> endpoint,"sls.ioThreadCount" -> "2")lines.writeToLoghub(producerConfig,"topic","streaming",transformFunc, Option.apply(callback))ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directoryssc.start()ssc.awaitTermination()

对Structured Streaming的支持

Structured  Streaming 并不是最近才出现的技术,而是早在16年就已经出现,但是直到 Spark 2.2.0 才正式推出。其数据模型是基于无界表的概念,流数据相当于往一个表上不断追加行。

图-3 无界表模型

与Spark Streaming相比,Structured Streaming主要有如下特点:

  1. 底层实现基于Spark SQL引擎,可以使用大多数Spark SQL的函数。和Spark SQL共用大部分API,如果对Spark SQL熟悉的用户,非常容易上手。复用Spark SQL的执行引用,性能更佳。
  2. 支持 Process time 和 Event time,而Spark Streaming只支持 Process Time。
  3. 批流同一的API。Structured Streaming 复用Spark SQL的 DataSet/DataFrame模型,和 RDD/DStream相比更High level,易用性更好。
  4. 实时性更好,默认基于micro-batch模式。在 Spark 2.3 中,还增加了连续处理模型,号称可以做到毫秒级延迟。
  5. API 对用户更友好,只保留了SparkSession一个入口,不需要创建各种Context对象,使用起来更简单。

SDK使用示例

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}object StructuredStreamingDemo {def main(args: Array[String]) {val spark = SparkSession.builder.appName("StructuredLoghubWordCount").master("local").getOrCreate()import spark.implicits._val schema = new StructType(Array(StructField("content", StringType)))val lines = spark.readStream.format("loghub").schema(schema).option("sls.project", "your project").option("sls.store", "your logstore").option("access.key.id", "your access key id").option("access.key.secret", "your access key secret").option("endpoint", "your endpoint").option("startingoffsets", "latest").load().select("content").as[String]val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("loghub").option("sls.project", "sink project").option("sls.store", "sink logstore").option("access.key.id", "your access key id").option("access.key.secret", "your access key secret").option("endpoint", "your endpoint").option("checkpointLocation", "your checkpoint dir").start()query.awaitTermination()}
}

代码解释:
1)schema 声明了我们需要的字段,除了日志中的字段外,还有如下的内部字段:

__logProject__
__logStore__
__shard__
__time__
__topic__
__source__
__sequence_number__ // 每行日志唯一id

如果没有指定schema,SDK默认提供一个__value__字段,其内容为由所有字段组成的一个JSON字符串。

2)lines 定义了一个流。
startingoffsets:开始位置,支持:

  • latest :日志服务最新写入位置。强烈建议从latest开始,从其他位置开始意味着需要先处理历史数据,可能需要等待较长时间才能结束。
  • earliest:日志服务中最早的日志对应的位置。
  • 或者为每个shard指定一个开始时间,以JSON形式指定。

maxOffsetsPerTrigger:批次读取行数,SDK中默认是64*1024 。

3)结果写入到日志服务
format 指定为Loghub即可。

不足之处

  1. 不支持手动提交checkpoint,SDK内部自动保存checkpoint到checkpointLocation中。
  2. 不再需要提供consumerGroup名称,也就是说checkpoint没有保存到SLS服务端,无法在日志服务里面监控消费延迟,只能通过Spark 任务日志观察消费进度。


原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/517048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

再见了,Python!!

结合我最近这些年的Python学习、开发经验&#xff0c;发现90%的人在学Python时都会遇到下面这些问题&#xff1a;1.想学Python&#xff0c;但没什么经验根本不知道从何学起&#xff0c;而且应用方向太多了根本不知道该选择什么方向...2.基础入门看似简单&#xff0c;但是进阶实…

上去很美的 Serverless 在中国落地的怎么样了?

说起当前最火的技术&#xff0c;不得不提的一个概念就是 Serverless。2019 年几乎所有人都在说 Serverless&#xff0c;实际落地 Serverless 的有多少&#xff1f;Serverless 作为一种新型的互联网架构&#xff0c;直接或间接推动了云计算的发展&#xff0c;从 AWS Lambda 到阿…

Knative 驾驭篇:带你 '纵横驰骋' Knative 自动扩缩容实现

Knative 中提供了自动扩缩容灵活的实现机制&#xff0c;本文从 三横两纵 的维度带你深入了解 KPA 自动扩缩容的实现机制。让你轻松驾驭 Knative 自动扩缩容。 注&#xff1a;本文基于最新 Knative v0.11.0 版本代码解读 KPA 实现流程图 在 Knative 中&#xff0c;创建一个 Rev…

MongoDB 计划从“Data Sprawl”中逃脱

原文作者 | Adrian Bridgwater译者 |天道酬勤&#xff0c;责编 |晋兆雨头图 | CSDN 付费下载自视觉中国提供特定技术子集的软件供应商&#xff0c;喜欢用尽可能广泛的标签来提升自己&#xff0c;这是一种传达平台宽度和能力的方式。我们知道MongoDB以开源根数据库而闻名&#x…

FastMock

文章目录官网文档官网 官网&#xff1a;https://www.fastmock.site/#/ 文档 https://marvengong.gitee.io/fastmock/#/

关于在nw里使用require('printer')和nw.require('printer')报错的问题

公司项目为了兼容xp所以使用nw.js&#xff08;0.14.7-sdk&#xff09;&#xff0c;用到了printer模块&#xff08;第三方的c打印模块&#xff09;&#xff0c;在引入该模块的时候&#xff0c;使用了require导致一直报cannot find modul “.”&#xff0c;后来改用nw.require&am…

小蜜团队万字长文 | 讲透对话管理模型最新研究进展

对话管理模型背景 从人工智能研究的初期开始&#xff0c;人们就致力于开发高度智能化的人机对话系统。艾伦图灵&#xff08;Alan Turing&#xff09;在1950年提出图灵测试[1]&#xff0c;认为如果人类无法区分和他对话交谈的是机器还是人类&#xff0c;那么就可以说机器通过了…

mockjs

文章目录官网文档地址示例官网 &#xff1a;http://mockjs.com/ 文档地址 https://github.com/nuysoft/Mock/wiki/Getting-Started 示例 http://mockjs.com/examples.html

炸裂!谷歌这波操作,预警了什么?

我们都知道谷歌爸爸收购了Cask Data一家公司。长期以来&#xff0c;谷歌致力于推动围绕 GoogleCloud 的企业业务&#xff0c;但在这方面一直被亚马逊和微软吊打&#xff0c;这次的收购正是为了弥补自身的短板。被收购的 Cask Data 是一家专门提供基于Hadoop的大型数据分析服务解…

美团点评基于 Flink 的实时数仓平台实践

一、美团点评实时计算演进 美团点评实时计算演进历程 在 2016 年&#xff0c;美团点评就已经基于 Storm 实时计算引擎实现了初步的平台化。2017 年初&#xff0c;我们引入了 Spark Streaming 用于特定场景的支持&#xff0c;主要是在数据同步场景方面的尝试。在 2017 年底&am…

koa-generator 快速生成 koa2 服务的脚手架工具

文章目录1. 全局安装脚手架工具2. 执行生成3. 安装依赖4. 启动服务5. 默认的访问地址通常我们可以借助于脚手架&#xff0c;快速创建一个Koa2项目&#xff0c;当然也可以自己从头搭建&#xff1b;脚手架会帮我们提前搭好基本的架子 1. 全局安装脚手架工具 cnpm install -g koa…

轻松搭建基于 SpringBoot + Vue 的 Web 商城应用

首先介绍下在本文出现的几个比较重要的概念&#xff1a; 函数计算&#xff08;Function Compute&#xff09;: 函数计算是一个事件驱动的服务&#xff0c;通过函数计算&#xff0c;用户无需管理服务器等运行情况&#xff0c;只需编写代码并上传。函数计算准备计算资源&#xff…

股市中的Santa Claus Rally (圣诞节行情)

圣诞节行情 Santa Claus Rally Santa Claus Rally 是指 12 月 25 日圣诞节前后股市的持续上涨这样一个现象。《股票交易员年鉴》的创始人 Yale Hirsch 于 1972 年创造了这个定义&#xff0c;他将当年最后五个交易日和次年前两个交易日的时间范围定义为反弹日期。 根据 CFRA Re…

没想到!!Unicode 字符还能这样玩?

来源 | 程序通事责编 |晋兆雨头图 | CSDN 付费下载自视觉中国上周的时候&#xff0c;朋友圈的直升飞机不知道为什么就火了&#xff0c;很多朋友开着各种花式飞机带着起飞。图片来自网络还没来得及了解咋回事来着&#xff0c;这个直升飞机就????到的微博热搜。图片来自网络后…

为什么 APP 纷纷开发“暗黑模式”?优酷最佳实践总结

一、缘起 随着iOS 13和Android 10的正式发布&#xff0c;一个名词"暗黑模式(Dark Mode)"逐渐走入了大家的视野。各大APP都将暗黑模式的适配列入了开发日程&#xff0c;舆情上用户们对暗黑模式支持的呼声也非常的高。优酷主客也顺应时势&#xff0c;启动了相应的技术…

Mongo 安装、配置、启动 Windows

文章目录一、Mongo 安装1. Mongo 下载2. 安装3. 配置环境变量4. 验证5. 连接二、Mongo 配置2.1. 编辑mongod.cfg2.2. 修改dbPath2.3. systemLog路径2.4. 启动Mongod服务2.5. 安装MongoDB服务2.6. 验证一、Mongo 安装 1. Mongo 下载 下载Mongo数据库并安装 https://www.mongod…

混合云存储阵列与云存储网关的协同解决方案

前言 混合云存储阵列&#xff08;CSA&#xff09;于2017年云栖大会发布&#xff0c;上市2年多&#xff0c;已经被基因测序&#xff0c;医疗PACS&#xff0c;影视制作&#xff0c;非编&#xff0c;广电&#xff0c;视频监控等行业和场景的客户广泛采用。混合云存储阵列承载了用户…

炸裂!Google这波操作,预警了什么?

我们都知道谷歌爸爸收购了Cask Data一家公司。长期以来&#xff0c;谷歌致力于推动围绕 GoogleCloud 的企业业务&#xff0c;但在这方面一直被亚马逊和微软吊打&#xff0c;这次的收购正是为了弥补自身的短板。被收购的 Cask Data 是一家专门提供基于Hadoop的大型数据分析服务解…

Robo 3T 安装连接 MongoDB

文章目录1. 官网2.安装3. 连接 MongoDB1. 官网 https://robomongo.org 2.安装 3. 连接 MongoDB

何为真正的 FaaS ?阿里舜天平台做了四大创新

导读&#xff1a;数据中心和云计算的超高增速&#xff0c;AI、视频、基因测序等应用对于算力的无尽渴求和摩尔定律发展事实上已经停滞的现实&#xff0c;均给异构加速带来了巨大的应用潜力和商机。但 Faas 解决方案仍有较高的门槛&#xff0c;今天&#xff0c;我们一起了解 Faa…