使用 SPL 高效实现 Flink SLS Connector 下推

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。

在阿里云 Flink 配置 SLS 作为源表时,默认会消费 SLS 的 Logstore 数据进行动态表的构建,在消费的过程中,可以指定起始时间点,消费的数据也是指定时间点以后的全量数据;在特定场景中,往往只需要对某类特征的日志或者日志的某些字段进行分析处理,此类需求可以通过 Flink SQL 的 WHERE 和 SELECT 完成,这样做有两个问题:

1)Connector 从源头拉取了过多不必要的数据行或者数据列造成了网络的开销;

2)这些不必要的数据需要在 Flink 中进行过滤投影计算,这些清洗工作并不是数据分析的关注的重点,造成了计算的浪费。

对于这种场景,有没有更好的办法呢?

答案是肯定的,SLS 推出了 SPL 语言, 可以高效的对日志数据的清洗,加工。 这种能力也集成在了日志消费场景,包括阿里云 Flink 中 SLS Connector,通过配置 SLS SPL 即可实现对数据的清洗规则,在减少网络传输的数据量的同时,也可以减少 Flink 端计算消耗。

接下来对 SPL 及 SPL 在阿里云 Flink SLS Connector 中应用进行介绍及举例。

SLS SPL 介绍

图片

SLS SPL 是日志服务推出的一款针对弱结构化的高性能日志处理语言,可以同时在 Logtail 端、查询扫描、流式消费场景使用,具有交互式、探索式、使用简洁等特点。

SPL 基本语法如下:

<data-source> 
| <spl-cmd> -option=<option> -option ... <expression>, ... as <output>, ...
| <spl-cmd> ...
| <spl-cmd> ...

< spl-cmd > 是 SPL 指令,支持行过滤、列扩展、列裁剪、正则取值、字段投影、数值计算、JSON、CSV 等半结构化数据处理,具体参考 SPL 指令 [ 1] 介绍,具体来说包括:

结构化数据 SQL 计算指令:

支持行过滤、列扩展、数值计算、SQL 函数调用

  • extend 通过 SQL 表达式计算结果产生新字段
  • where 根据 SQL 表达式计算结果过滤数据条目
*
| extend latency=cast(latency as BIGINT)
| where status='200' AND latency>100

字段操作指令:

支持字段投影、字段重名、列裁剪

  • project 保留与给定模式相匹配的字段、重命名指定字段
  • project-away 保留与给定模式相匹配的字段、重命名指定字段
  • project-rename 重命名指定字段,并原样保留其他所有字段
*
| project-away -wildcard "__tag__:*"
| project-rename __source__=remote_addr

非结构化数据提取指令:

支持 JSON、正则、CSV 等非结构化字段值处理

  • parse-regexp 提取指定字段中的正则表达式分组匹配信息
  • parse-json 提取指定字段中的第一层 JSON 信息
  • parse-csv 提取指定字段中的 CSV 格式信息
*
| parse-csv -delim='^_^' content as time, body
| parse-regexp body, '(\S+)\s+(\w+)' as msg, user

SPL 在 Flink SLS Connector 中的原理介绍

阿里云 Flink 支持 SLS Connector,通过 SLS Connector 实时拉取 SLS 中 Logstore 的数据,分析后的数据也可以实时写入 SLS,作为一个高性能计算引擎,Flink SQL 也在越来越广泛的应用在 Flink 计算中,借助 SQL 语法可以对结构化的数据进行分析。

在 SLS Connector 中,可以配置日志字段为 Flink SQL 中的 Table 字段,然后基于 SQL 进行数据分析;在未支持 SPL 配置之前,SLS Connector 会实时消费全量的日志数据到 Flink 计算平台,当前消费方式有如下特点:

  • 在 Flink 中计算的往往不需要所有的日志行,比如在安全场景中,可能仅需要符合某种特征的数据,需要进行日志进行过滤,事实上不需要的日志行也会被拉取,造成网络带宽的浪费。
  • 在 Flink 中计算的一般是特定的字段列,比如在 Logstore 中有 30 个字段,真正需要在 Flink 计算的可能仅有 10 个字段,全字段的拉取造成了网络带宽的浪费。

在以上场景中,可能会增加并不需要的网络流量和计算开销,基于这些特点,SLS 将 SPL 的能力集成到 SLS Connector 的新版本中,可以实现数据在到达 Flink 之前已经进行了行过滤和列裁剪,这些预处理能力内置在 SLS 服务端,可以达到同时节省网络流量与 Flink 计算(过滤、列裁剪)开销的目的。

原理对比

  • 未配置 SPL 语句时:Flink 会拉取 SLS 的全量日志数据(包含所有列、所有行)进行计算,如图 1。
  • 配置 SPL 语句时:SPL 可以对拉取到的数据如果 SPL 语句包含过滤及列裁剪等,Flink 拉取到的是进行过滤和列裁剪后部分数据进行计算,如图 2。

图片

在 Flink 中使用 SLS SPL

接下来以一个 Nginx 日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,可以直接修改 SLS 源表,保留其余分析和写出逻辑。

接下来介绍下阿里云 Flink 中使用 SPL 实现行过滤与列裁剪功能。

在 SLS 准备数据

  • 开通 SLS,在 SLS 创建 Project,Logstore,并创建具有消费 Logstore 的权限的账号 AK/SK。
  • 当前 Logstore 数据使用 SLS 的的 SLB 七层日志模拟接入方式产生模拟数据,其中包含 10 多个字段。

图片

模拟接入会持续产生随机的日志数据,日志内容示例如下:

{"__source__": "127.0.0.1","__tag__:__receive_time__": "1706531737","__time__": "1706531727","__topic__": "slb_layer7","body_bytes_sent": "3577","client_ip": "114.137.195.189","host": "www.pi.mock.com","http_host": "www.cwj.mock.com","http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0","request_length": "1662","request_method": "GET","request_time": "31","request_uri": "/request/path-0/file-3","scheme": "https","slbid": "slb-02","status": "200","upstream_addr": "42.63.187.102","upstream_response_time": "32","upstream_status": "200","vip_addr": "223.18.47.239"
}

Logstore 中 slbid 字段有两种值:slb-01 和 slb-02,对 15 分钟的日志数据进行 slbid 统计,可以发现 slb-01 与 slb-02 数量相当。

图片

行过滤场景

在数据处理中过滤数据是一种常见需求,在 Flink 中可以使用 filter 算子或者 SQL 中的 where 条件进行过滤,使用非常方便;但是在 Flink 使用 filter 算子,往往意味着数据已经通过网络进入 Flink 计算引擎中,全量的数据会消耗着网络带宽和 Flink 的计算性能,这种场景下,SLS SPL 为 Flink SLS Connector 提供了一种支持过滤“下推”的能力,通过配置 SLS Connector 的 query 语句中,过滤条件,即可实现过滤条件下推。避免全量数据传输和全量数据过滤计算。

图片

创建 SQL 作业

在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

图片

在作业草稿中输入如下创建临时表的语句:

CREATE TEMPORARY TABLE sls_input(request_uri STRING,scheme STRING,slbid STRING,status STRING,`__topic__` STRING METADATA VIRTUAL,`__source__` STRING METADATA VIRTUAL,`__timestamp__` STRING METADATA VIRTUAL,__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,proctime as PROCTIME()
) WITH ('connector' = 'sls','endpoint' ='cn-beijing-intranet.log.aliyuncs.com','accessId' = '${ak}','accessKey' = '${sk}','starttime' = '2024-01-21 00:00:00','project' ='${project}','logstore' ='test-nginx-log','query' = '* | where slbid = ''slb-01'''
);
  • 这里为了演示方便,仅设置 request_uri、scheme、slbid、status 和一些元数据字段作为表字段。
  • a k 、 {ak}、 ak{sk}、${project} 替换为具有 Logstore 消费权限的账号。
  • endpoint:填写同地域的 SLS 的私网地址。
  • query:填写 SLS 的 SPL 语句,这里填写了 SPL 的过滤语句:* | where slbid = ‘‘slb-01’’,注意在阿里云 Flink 的 SQL 作业开发中,字符串需要使用英文单引号进行转义。

连续查询及效果

在作业中输入分析语句,按照 slbid 进行聚合查询,动态查询会根据日志的变化,实时刷新数字。

SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid

点击右上角调试按钮,进行调试,可以看到结果中 slbid 的字段值,始终是 slb-01。

图片

可以看出设置了 SPL 语句后,sls_input 仅包含 slbid=‘slb-01’ 的数据,其他不符合条件的数据被过滤掉了。

流量对比

使用 SPL 后,可以看出在 SLS 的写流量不变的情况下,Flink 对 SLS 的读流量有大幅度下降;同时在过滤占主要很多 Flink CU 的场景下,经过过滤后,Flink CU 也会有相应的降低。

图片

列裁剪场景

在数据处理中列裁剪也是一种常见需求,在原始数据中,往往会有全量的字段,但是实际的计算只需要特定的字段;类似需要在 Flink 中可以使用 project 算子或者 SQL 中的 select 进行列裁剪与变换,使用 Flink 使用 project 算子,往往意味着数据已经通过网络进入 Flink 计算引擎中,全量的数据会消耗着网络带宽和 Flink 的计算性能,这种场景下,SLS SPL 为 Flink SLS Connector 提供了一种支持投影下推的能力,通过配置 SLS Connector 的 query 参数,即可实现投影字段下推。避免全量数据传输和全量数据过滤计算。

创建 SQL 作业

创建步骤同行过滤场景,在作业草稿中输入如下创建临时表的语句,这里 query 参数配置进行了修改,在过滤的基础上增加了投影语句,可以实现从 SLS 服务端仅拉取特定字段的内容。

CREATE TEMPORARY TABLE sls_input(request_uri STRING,scheme STRING,slbid STRING,status STRING,`__topic__` STRING METADATA VIRTUAL,`__source__` STRING METADATA VIRTUAL,`__timestamp__` STRING METADATA VIRTUAL,__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,proctime as PROCTIME()
) WITH ('connector' = 'sls','endpoint' ='cn-beijing-intranet.log.aliyuncs.com','accessId' = '${ak}','accessKey' = '${sk}','starttime' = '2024-01-21 00:00:00','project' ='${project}','logstore' ='test-nginx-log','query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
);

为了效果,下面分行展示语句中配置,在 Flink 语句中任然需要单行配置。

* 
| where slbid = ''slb-01'' 
| project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"

上面使用了 SLS SPL 的管道式语法来实现数据过滤后投影的操作,类似 Unix 管道,使用|符号将不同指令进行分割,上一条指令的输出作为下一条指令的输入,最后的指令的输出表示整个管道的输出。

连续查询及效果

图片

在作业中输入分析语句,可以看到,结果与行过滤场景结果类似。

SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid

🔔 注意: 这里与行过滤不同的是,上面的行过滤场景会返回全量的字段,而当前的语句令 SLS Connector 只返回特定的字段,再次减少了数据的网络传输。

SPL 还可以做什么

  • 上述实例中演示了使用 SLS SPL 的过滤和投影功能来实现 SLS Connector 的“下推”功能,可以有效地减少网络流量和 Flink CU 的使用。可以避免在 Flink 进行计算之前,进行额外的过滤和投影计算消耗。
  • SLS SPL 的功能不止于过滤与投影,SLS SPL 完整支持的语法可以参考文档:SPL 指令 [ 1] 。同时,SPL管道式语法已全面支持在 Flink Connector 中进行配置。
  • SLS SPL 支持对于数据进行预处理,比如正则字段、JSON 字段,CSV 字段展开;数据格式转换,列的增加和减少;过滤等。除了用于消费场景,在 SLS 的 Scan 模式与采集端都会应用场景,以便用户在采集端、消费端都可以使用 SPL 的能力。

相关链接:

[1] SPL 指令

https://help.aliyun.com/zh/sls/user-guide/spl-instruction?spm=a2c4g.11186623.0.0.33f35a3dl8g8KD

[2] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-service

[3] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[4] 阿里云 Flink Connector SLS

https://help.aliyun.com/zh/flink/developer-reference/log-service-connector

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

打字通小游戏制作教程:用HTML5和JavaScript提升打字速度

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

Redis主从架构和管道Lua(一)

Redis主从架构 架构 Redis主从工作原理 如果为master配置了一个slave,不管这个slave是否是第一次连接上Master,它都会发送一个PSYNC命令给master请求复制数据。master受到PSYNC命令&#xff0c;会在后台进行数据持久化通过bgsave生成最新的 RDB快照文件&#xff0c;持久化期间…

Java 集合类的高级特性介绍

在 Java 编程中&#xff0c;了解集合类的高级特性对于编写高效和可维护的代码至关重要。以下是一些你应该知道的 Java 集合类的高级特性&#xff0c;以及简单的例子来说明它们的用法。 1. 迭代器&#xff08;Iterators&#xff09;和列表迭代器&#xff08;ListIterators&#…

Babel:现代JavaScript的桥梁

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

基于YOLOv8深度学习的路面坑洞检测与分割系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标分割

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

如何入驻1688跨境市场,拿新赛道下的百万流量!|1688API接口一键链接国内外

✦ ✦ ✦ 前言 1688是国内领先源头厂货直销平台&#xff0c;拥有2亿用户、6500万专业卖家&#xff0c;覆盖57大行业、年交易额︎8000亿&#xff0c;100万中小企业已入驻。刚刚升为阿里第一批战略级创新业务的1688&#xff0c;又被曝出新消息。近期&#xff0c;关于“阿里16…

Socket通信Demo(Unity客户端和C#)

新建一个Unity项目新建脚本编写客户端 using System.Net.Sockets; using System.Net; using System; using System.Text;public class Client : MonoBehaviour {private Socket socket;//定义用来存消息的容器private byte[] buffer new byte[1024];// Start is called befor…

使用GraaVIM打包Linux平台本地镜像

1.创建实例&#xff0c;在WindTerm上面连接云服务器 2.安装Lrzsz文件上传工具 yum install lrzsz 3.上传打好的jar包 lrz 使用ls命令查看是否上传成功 3.安装gcc等环境 sudo yum install gcc glibc-devel zlib-devel 4.下载安装配置Linux下的GraaVIM、native-image 下载链…

RocketMQ入门指南:从零开始学习分布式消息队列技术

RocketMQ 1. MQ介绍1.1 为什么要用MQ1.2 MQ的优点和缺点1.3 各种MQ产品的比较 2. RocketMQ快速入门2.1 准备工作2.1.1 下载RocketMQ2.2.2 环境要求 2.2 安装RocketMQ2.2.1 安装步骤2.2.2 目录介绍 2.3 启动RocketMQ2.4 测试RocketMQ2.4.1 发送消息2.4.2 接收消息 2.5 关闭Rocke…

模板不存在:./Application/Home/View/OnContact/Index.html 错误位置

模板不存在:./Application/Home/View/OnContact/Index.html 错误位置FILE: /home/huimingdedhpucixmaihndged5e/wwwroot/ThinkPHP123/Library/Think/View.class.php  LINE: 110 TRACE#0 /home/huimingdedhpucixmaihndged5e/wwwroot/ThinkPHP123/Library/Think/View.class.php(…

接收服务端请求,WebSocket 并非唯一选择!(含:ChatGPT 流推送原理解析)

前端训练营&#xff1a;1v1私教&#xff0c;终身辅导计划&#xff0c;帮你拿到满意的 offer。 已帮助数百位同学拿到了中大厂 offer。欢迎来撩~~~~~~~~ Hello&#xff0c;大家好&#xff0c;我是 Sunday。 说到推送数据&#xff0c;大家可能首先想到的是 WebSocket。 事实上&…

STM32利用标准库的方式输出PWM(proteus仿真)

首先打开proteus仿真软件&#xff0c;绘制电路图&#xff1a; 其中示波器的添加很简单的&#xff0c;看图&#xff1a; 再来看看咱们最后程序的效果&#xff1a; 下面就是程序代码了&#xff0c;新建两个文件PWM.c和PWM.h文件&#xff0c;所属关系如图&#xff1a; 整个的编程思…

VBA_NZ系列工具NZ02:VBA读取PDF使用说明

我的教程一共九套及VBA汉英手册一部&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解&#xff0c;从简单的入门&#xff0c;到数据库&#xff0c;到字典&#xff0c;到高级的网抓及类的应用。大家在学习的过程中可能会存在困惑&#xff0c;这么多知识点该如何组织…

亚马逊店铺解决和预防订单下滑的技巧

1. 保持账号的良好表现。不要销售侵权产品&#xff0c;发货要及时&#xff0c;能有追踪号的就带可查询追踪号&#xff0c;能发FBA的就通过FBA发货。 2. 持续做好产品优化工作&#xff0c;及时留意大环境的变化和平台政策变动。遇到编辑权限受限&#xff0c;可开case咨询或申请…

LeetCode.2129. 将标题首字母大写

&#x1f354;题目 2129. 将标题首字母大写 &#x1f35f;分析 这道题目描述的很清晰&#xff0c;我们只需要将给定的字符串按照空格划分成字符串数组 str&#xff0c;然后判断 str[i] 的长度如果 <2 &#xff0c;则将 str[i] 转为小写&#xff0c;如果 str[i] 的长度 &g…

JVM 面试题

1、什么情况下会发生栈内存溢出。 栈内存溢出通常发生在以下几种情况中&#xff1a; 函数递归调用过深&#xff1a; 当函数递归调用自身且没有合适的退出条件时&#xff0c;每次递归调用都会在栈上分配一个新的栈帧来存储局部变量、返回地址等信息。如果递归层次过多&#xff…

[数据集][图像分类]棉花叶子病害分类数据集2293张4类别

数据集类型&#xff1a;图像分类用&#xff0c;不可用于目标检测无标注文件 数据集格式&#xff1a;仅仅包含jpg图片&#xff0c;每个类别文件夹下面存放着对应图片 图片数量(jpg文件个数)&#xff1a;2293 分类类别数&#xff1a;4 类别名称:["diseased_cotton_leaf"…

基于多源信息融合的巡飞弹对地目标识别与毁伤评估

源自&#xff1a;系统仿真学报 作者&#xff1a;徐艺博 于清华 王炎娟 郭策 冯世如 卢惠民 摘 要 面向利用多枚巡飞弹对地面高防御移动目标进行打击的任务场景&#xff0c;提出一种基于多源信息融合的巡飞弹对地移动目标识别与毁伤评估方法。基于IoU判定实现红外图像与可…

E2697A 安捷伦高阻抗适配器

181/2461/8938描述: E2697A高阻抗适配器允许将需要高阻抗输入的探头&#xff08;例如无源探头、电流探头&#xff09;连接到Infiniium 54850、80000和90000系列高性能示波器。E2697A高阻抗适配器扩展了Agilent Infiniium高性能示波器的功能&#xff0c;使其成为电源、逆变器、…

闭包的理解和使用场景

文章目录 一、是什么二、使用场景柯里化函数使用闭包模拟私有方法其他 三、注意事项 一、是什么 一个函数和对其周围状态&#xff08;lexical environment&#xff0c;词法环境&#xff09;的引用捆绑在一起&#xff08;或者说函数被引用包围&#xff09;&#xff0c;这样的组…