日志服务Flink Connector《支持Exactly Once》

摘要: Flink log connector是阿里云日志服务推出的,用于对接Flink的工具,包含两块,分别是消费者和生产者,消费者用于从日志服务中读数据,支持exactly once语义,生产者用于将数据写到日志服务中,该Connector隐藏了日志服务的一些概念,比如Shard的分裂合并等,用户在使用时只需要专注在自己的业务逻辑即可。

阿里云日志服务是针对实时数据一站式服务,用户只需要将精力集中在分析上,过程中数据采集、对接各种存储计算、数据索引和查询等琐碎工作等都可以交给日志服务完成。

日志服务中最基础的功能是LogHub,支持数据实时采集与消费,实时消费家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

图片描述

Flink Connector
Flink log connector是阿里云日志服务提供的,用于对接flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持shard负载均衡.
生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.3.2</version>
</dependency>
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>flink-log-connector</artifactId><version>0.1.3</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version>
</dependency><dependency><groupId>com.aliyun.openservices</groupId><artifactId>aliyun-log</artifactId><version>0.6.10</version></dependency>
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>log-loghub-producer</artifactId><version>0.1.8</version>
</dependency>

代码:Github

用法
请参考日志服务文档,正确创建Logstore。
如果使用子账号访问,请确认正确设置了LogStore的RAM策略。参考授权RAM子用户访问日志服务资源。
1. Log Consumer
在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力,实现了exactly once语义,在使用时,用户无需关心LogStore中shard数
量的变化,consumer会自动感知。

flink中每一个子任务负责消费LogStore中部分shard,如果LogStore中shard发生split或者merge,子任务消费的shard也会随之改变。

1.1 配置启动参数

Properties configProps = new Properties();
// 设置访问日志服务的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 设置访问ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 设置日志服务的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 设置日志服务的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 设置消费日志服务起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 设置日志服务的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任务数量和日志服务LogStore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于,

那么部分子任务就会空闲,等到新的shard产生。

1.2 设置消费起始位置
Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,具体取值如下:

Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。
Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。
UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。
三种取值举例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");

1.3 监控:消费进度(可选)
Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考
文档消费组-查看状态,消费组-监控报警

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意上面代码是可选的,如果设置了,consumer会首先创建consumerGroup,如果已经存在,则什么都不做,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

1.4 容灾和exactly once语义支持
当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并
从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,也就是重新消费,使用代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启flink exactly once语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);

更多Flink checkpoint的细节请参考Flink官方文档Checkpoints。

1.5 补充材料:关联 API与权限设置
Flink log consumer 会用到的阿里云日志服务接口如下:

GetCursorOrData

用于从shard中拉数据, 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota, 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制接口调用的时间间隔和每次调用拉取的日志数量,shard的quota参考文章[shard简介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");

ListShards

 用于获取logStore中所有的shard列表,获取shard状态等.如果您的shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现shard的变化。
// 设置每30s调用一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");

CreateConsumerGroup

该接口调用只有当设置消费进度监控时才会发生,功能是创建consumerGroup,用于同步checkpoint。

ConsumerGroupUpdateCheckPoint

该接口用户将flink的snapshot同步到日志服务的consumerGroup中。

子用户使用Flink log consumer需要授权如下几个RAM Policy:

图片描述
2. Log Producer
FlinkLogProducer 用于将数据写到阿里云日志服务中。

注意producer只支持Flink at-least-once语义,这就意味着在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

用法示例如下,我们将模拟产生的字符串写入日志服务:

// 将数据序列化成日志服务的数据格式
class SimpleLogSerializer implements LogSerializationSchema<String> {public RawLogGroup serialize(String element) {RawLogGroup rlg = new RawLogGroup();RawLog rl = new RawLog();rl.setTime((int)(System.currentTimeMillis() / 1000));rl.addContent("message", element);rlg.addLog(rl);return rlg;}
}
public class ProducerSample {public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";public static String sAccessKeyId = "";public static String sAccessKey = "";public static String sProject = "ali-cn-hangzhou-sls-admin";public static String sLogstore = "test-flink-producer";private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);public static void main(String[] args) throws Exception {final ParameterTool params = ParameterTool.fromArgs(args);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);env.setParallelism(3);DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());Properties configProps = new Properties();// 设置访问日志服务的域名configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);// 设置访问日志服务的akconfigProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);// 设置日志写入的日志服务projectconfigProps.put(ConfigConstants.LOG_PROJECT, sProject);// 设置日志写入的日志服务logStoreconfigProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);simpleStringStream.addSink(logProducer);env.execute("flink log producer");}// 模拟产生日志public static class EventsGenerator implements SourceFunction<String> {private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {long seq = 0;while (running) {Thread.sleep(10);ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));}}@Overridepublic void cancel() {running = false;}}
}

2.1 初始化
Producer初始化主要需要做两件事情:

初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可,特殊场景可以考虑定制:

// 用于发送数据的io线程的数量,默认是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 该值定义日志数据被缓存发送的时间,默认是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 缓存发送的包中日志的数量,默认是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 缓存发送的包的大小,默认是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作业可以使用的内存总的大小,默认是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述参数不是必选参数,用户可以不设置,直接使用默认值。

重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

RawLogGroup是log的集合,每个字段的含义可以参考文档[日志数据模型](https://help.aliyun.com/document_detail/29054.html)。

如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey,用法例子如下:

FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner<String>() {// 生成32位hash值public String getHashKey(String element) {try {MessageDigest md = MessageDigest.getInstance("MD5");md.update(element.getBytes());String hash = new BigInteger(1, md.digest()).toString(16);while(hash.length() < 32) hash = "0" + hash;return hash;} catch (NoSuchAlgorithmException e) {}return  "0000000000000000000000000000000000000000000000000000000000000000";}});

注意LogPartitioner是可选的,不设置情况下, 数据会随机写入某一个shard。

2.2 权限设置:RAM Policy
Producer依赖日志服务的API写数据,如下:

log:PostLogStoreLogs
log:ListShards
当RAM子用户使用Producer时,需要对上述两个API进行授权:

图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/523185.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

两个瓶子水怎样一样多_同事每天比我多睡两个小时!省下70万买了地铁站附近房子 杭州姑娘却感叹买房时一定是脑子进了水……...

都市快报讯 你上下班路上要多久&#xff1f;这个问题最近引起网友热议。 昨天&#xff0c;杭州市城乡建设发展研究院发布11月份城区交通运行分析&#xff1a;从上个月起&#xff0c;杭州的交通运行已经开启“冬季模式”&#xff0c;晚高峰出行进一步集中&#xff0c;拥堵程度环…

为什么电路交换不适合计算机网络,电路交换技术不适合计算机数据通信

5.2 分组交换原理上一节介绍了计算机网络的组成和网络的体系结构&#xff0c;其中通信子网的基本任务就是将数据信息从源点传送到S的点&#xff0c;在源点与目的点之间可能要经过许多个链路和中继节点。链路的功能是传输&#xff0c;而中继节点的功能是交换&#xff0c;也就是从…

Kibana:数据分析的可视化利器

摘要&#xff1a; 阿里云Elastisearch集成了可视化工具Kibana&#xff0c;用户可以使用Kibana的开发工具便捷的查询和分析存储在Elastisearch中的数据。除了柱状图、线状图、饼图、环形图等经典可视化功能外&#xff0c;还拥有地理位置分析、数据图谱分析、时序数据分析等高级功…

Kubernetes监控在小米的落地

戳蓝字“CSDN云计算”关注我们哦&#xff01;转自&#xff1a;小米云技术作者&#xff1a;郭如意本文介绍了高可用、持久存储、可动态调整的Kubernetes监控方案的实现过程。小米的弹性调度平台&#xff08;Ocean&#xff09;以及容器平台主要基于开源容器自动化管理平台kuberne…

智能机器人建房子后房价走势_深圳建二手房价引导制度,学习长沙意图明显,距离稳准狠差点意思...

#深圳建二手房价引导制度#大家好&#xff0c;我是勇谈。9月17日&#xff0c;深圳市司法局就《深圳市房地产市场监管办法(修订征求意见稿)》公开征求意见。《征求意见稿》共九章108条。对于修订监管办法的必要性&#xff0c;深圳市司法局也给出了自己的答案“《办法》的相关规定…

idea解决maven pom依赖下载失败

流程1 第一步&#xff1a;打开cmd窗口&#xff0c;进入repository本地仓库 执行命令&#xff1a; 进入本地仓库&#xff1a; cd %userprofile%\.m2\repository第二步&#xff1a;执行以下命令&#xff1a; for /r %i in (*.lastUpdated) do del %i流程2 mvn -U idea:idea

6位技术大咖11月倾心巨献,大数据+安全主题的技术分享合集【阿里云MVP 干货集锦】...

摘要&#xff1a; 大家好&#xff0c;阿里云 MVP 11月大数据安全主题分享新鲜出炉&#xff0c;快来一睹为快吧&#xff01;哪些MVP的分享最吸引你&#xff0c;你最想支持哪个MVP&#xff1f; 我们将开启为期一周的最人气内容评选&#xff0c;我们将在MVP评论内容下抽取评论最佳…

淤泥管道机器人_丛台区设备管道清洗超高效率

丛台区设备管道清洗超高效率后&#xff0c;施工前对施工人员安全措施安排完毕后&#xff0c;对检查井内剩余的砖、石、部分淤泥等残留物进行人工清理&#xff0c;直到清理完毕为止。  CCTV管道检测是指管道闭路电视检测系统可以实现排水管道的内窥检测工作&#xff1a;可以检…

华为发布开发者召集令,等你来战!

戳蓝字“CSDN云计算”关注我们哦&#xff01;曾几何时&#xff0c;“上云”只是大型企业和科技领域的特权。时至今日&#xff0c;“云”不再是漂浮空中那般缥缈。普通企业&#xff0c;甚至是个人开发者之间&#xff0c;对“上云”的谈及也如同一日三餐那么平常。对于新兴行业而…

承担集团数万应用、研发人员日常工作,阿里持续交付平台的设计、迭代之道...

摘要&#xff1a; 阿里持续交付平台已经经历了 8 年的不断迭代进化&#xff0c;成长为集团几万应用所依赖的最重要的研发工具&#xff0c;它的效率直接影响着几万研发日常工作。但平台不能只是工具的堆砌&#xff0c;更需要针对互联网时代的研发模式进行深度思考&#xff0c;不…

使用FTP下载文件connect.retrieveFileStream(filename) 获取不到InputStream流,返回null的问题

使用同事的代码做FTP下载文件&#xff0c;InputStream in connect.retrieveFileStream(fileName);执行这句时InputStream总是获取为空 后来把代码改成ftp.retrieveFileStream(new String(dirPath[1].getBytes(“UTF-8”), “ISO-8859-1”));加上字符集指定就好了&#xff0c;…

css怎么让两个table并排_关于CSS布局

水平居中水平居中可能是CSS布局中最常用到的布局&#xff0c;这里介绍几种水平居中的方式1、使用inline-block 和 text-align实现.parent{text-align: center;} .child{display: inline-block;}优点&#xff1a;兼容性好&#xff1b;不足&#xff1a;需要同时设置子元素和父元素…

边缘计算高考题!全答对就可以去华为上班!

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 边小缘来源 | 边缘计算社区绝密★启用前2019年普通高等学校招生全国统一考试&#xff08;边缘计算社区版&#xff09;不定向选择题。&#xff08;1010100分&#xff09;1以下哪项不是边缘计算的特点&#xff1f;A.低时延C.离…

《2017中国开发者调查报告》即将发布!你看那个人,好像一个程序员哦!

摘要&#xff1a; 2017云栖大会北京峰会期间&#xff0c;云栖社区即将重磅发布首份《2017中国开发者调查报告》&#xff0c;历时3个月的调研&#xff0c;7032人参与调查问卷&#xff0c;最终呈现出一份集开发者画像与能力的完整描绘。想了解最接地气的中国开发者现状吗&#xf…

中蜂几月份自然分蜂_蜜蜂的种类:北黑蜂,中华蜜蜂,皖南中蜂,贵州纳雍中蜂等等...

北黑蜂东北黑蜂是在闭锁优越的自然环境里通过自然选择与人工进行所培育的中国唯一的地方优良蜂种&#xff0c;分布在我国黑龙江省饶河县&#xff0c;其各项生理指标均明显优于世界四大著名蜂种&#xff0c;这是其它蜂种不可比拟的&#xff0c;也是我国乃至世界不可多得的极其宝…

2017,人工智能技术如何让中国开发者“倾心”又“上火”!

摘要&#xff1a;2017云栖大会北京峰会期间&#xff0c;云栖社区即将重磅发布首份《2017中国开发者调查报告》&#xff0c;历时3个月的调研&#xff0c;7032人参与调查问卷&#xff0c;最终呈现出一份集开发者画像与能力的完整描绘。本文就让大家先睹为快&#xff0c;分享其中关…

OpenStack入门科普,看这一篇就够啦!

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 小枣君来源 | 鲜枣课堂大家好&#xff0c;我是小枣君。最近几年&#xff0c;OpenStack这个词开始频繁出现&#xff0c;引起了越来越多人的关注。对于大部分人来说&#xff0c;这是一个很陌生的词&#xff0c;不知道它到底是什…

代码谱写传奇,深度揭秘中国开发者现状!

摘要&#xff1a;云栖社区重磅发布首份《2017中国开发者调查报告》&#xff0c;历时3个月的调研&#xff0c;7032人参与调查问卷&#xff0c;最终呈现出一份集开发者画像与能力的完整描绘。《报告》总结概括了中国八大开发技术领域特征&#xff0c;涵盖了Web开发、前端开发、云…

linux还原系统_怎么成为一名合格的Linux运维工程师

随着云时代的到来&#xff0c;企业对运维工程师提出了新的要求。以某银行Linux运维工程师招聘为例&#xff0c;应聘者不仅要熟悉Linux、Windows等操作系统运维&#xff0c;掌握Oracle数据库、Weblogic中间件技术及网络技术;还要具有ITSM运维体系事件流程、变更流程、服务台管理…

ERP物理机迁移至阿里云实践

摘要&#xff1a; ERP物理机迁移至阿里云实践 机房选型 随着公司的不断发展,业务量逐渐增大,对信息化的要求也越来越高,随之对信息部的要求也越来越多,为此公司决定对现有的信息系统进行升级改造. ERP物理机迁移至阿里云实践 一、机房选型 随着公司的不断发展,业务量逐渐增大,…