状态的一致性和FlinkSQL

状态一致性

一致性其实就是结果的正确性。精确一次是指数据有可能被处理多次,但是结果只有一个。
三个级别:

  1. 最多一次:1次或0次,有可能丢数据
  2. 至少一次:1次或n次,出错可能会重试
    • 输入端只要可以做到数据重放,即在出错后,可以重新发送一样的数据
  3. 精确一次:数据只会发送1次
    • 幂等写入:多次重复操作不影响结果,有可能出现某个值由于数据重放,导致结果回到原先的值,然后逐渐恢复。
    • 预写日志:
      1. 先把结果数据作为日志状态保存起来
      2. 进行检查点保存时,也会将这些结果数据一并做持久化存储
      3. 在收到检查点完成的通知时,将所有结果数据一次性写入外部系统
    • 预写日志缺点:这种再次确认的方式,如果写入成功返回的ack出现故障,还是会出现数据重复。
    • 两阶段提交(2PC):数据写入过程和数据提交分为两个过程,如果写入过程没有发生异常,就将事务进行提交。
      • 算子节点在收到第一个数据时,就开启一个事务,然后提交数据,在下一个检查点到达前都是预写入,如果下一个检查点正常,再进行最终提交。
      • 对外部系统有一定的要求,要能够识别事务ID,事务的重复提交应该是无效的。
      • 即barrier到来时,如果结果一致,就提交事务,否则进行事务回滚

Flink和Kafka连接时的精确一次保证

  • 开启检查点
  • 开启事务隔离级别,读已提交
  • 注意设置kafka超时时间为10分钟
public class Flink02_KafkaToFlink {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//开启检查点env.enableCheckpointing(1000L);//kafka sourceKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setGroupId("flinkb").setTopics("topicA")//优先使用消费者组 记录的Offset进行消费,如果offset不存在,根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定,统一使用通用方法.setProperty("isolation.level", "read_committed").build();DataStreamSource<String> ds = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkasource");//处理过程//kafka SinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("first").setValueSerializationSchema(new SimpleStringSchema()).build())//语义//AT_LEAST_ONCE:至少一次,表示数据可能重复,需要考虑去重操作//EXACTLY_ONCE:精确一次//kafka transaction timeout is larger than broker//kafka超时时间:1H//broker超时时间:15分钟//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//数据传输的保障.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)//数据传输的保障.setTransactionalIdPrefix("flink"+ RandomUtils.nextInt(0,100000))
//                .setProperty(ProducerConfig.RETRIES_CONFIG,"10").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,"60*1000*10")//10分钟.build();ds.map(JSON::toJSONString).sinkTo(kafkaSink);//写入到kafka 生产者ds.sinkTo(kafkaSink);try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

FlinkSQL1.17

FlinkSQL不同版本的接口仍在变化,有变动查看官网。
在官网这个位置可以查看Flink对于以来的一些官方介绍。
在这里插入图片描述
Table依赖剖析
三个依赖:
1. flink-table-api-java-uber-1.17.2.jar (所有的Java API)
2. flink-table-runtime-1.17.2.jar (包含Table运行时)
3. flink-table-planner-loader-1.17.2.jar (查询计划器,即SQL解析器)

静态导包:在import后添加static,并在类后面加上*导入全部。主要是为了方便使用下面的 $ 方法,否则 $ 方法前面都要添加Expressions的类名前缀

table.where($("vc").isGreaterOrEqual(100)).select($("id"),$("vc"),$("ts")).execute().print();

程序架构

  1. 准备环境
    • 流表环境:基于流创建表环境
    • 表环境:从操作层面与流独立,底层处理还是流
  2. 创建表
    • 基于流:将流转换为表
    • 连接器表
  3. 转换处理
    • 基于Table对象,使用API进行处理
    • 基于SQL的方式,直接写SQL处理
  4. 输出
    • 基于Table对象或连接器表,输出结果
    • 表转换为流,基于流的方式输出

流处理中的表

  • 处理的数据对象
    • 关系:字段元组的有界集合
    • 流处理:字段元组的无限序列
  • 对数据的访问
    • 关系:可以得到完整的
    • 流处理:数据是动态的

因此处理过程中的表是动态表,必须要持续查询。

流表转换

持续查询

  • 追加查询:窗口查询的结果通过追加的方式添加到表的末尾,使用toDataStream
  • 更新查询:窗口查询的结果会对原有的结果进行修改, 使用toChangeLogStream
  • 如果不清楚是什么类型,直接使用toChangeLogSteam()将表转换为流
public class Flink04_TableToStreamQQ {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888).map(line -> {String[] fields = line.split(",");return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));});Table table = tableEnv.fromDataStream(ds);tableEnv.createTemporaryView("t1", table);//SQLString appendSQL = "select user, url, ts from t1 where user <> 'zhangsan'";//需要在查询过程中更新上一次的值String updateSQL = "select user, count(*) cnt from t1 group by user";Table resultTable = tableEnv.sqlQuery(updateSQL);//表转换为流//doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user], select=[user, COUNT(*) AS cnt])
//        DataStream<Row> rowDs = tableEnv.toDataStream(resultTable);//有更新操作时,使用toChangelogStream(),它即支持追加,也支持更新查询DataStream<Row> rowDs = tableEnv.toChangelogStream(resultTable);rowDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

将动态表转换为流

  • 仅追加流:如果表的结果都是追加查询
  • Retract撤回流:
    • 包含两类消息,添加消息和撤回消息
    • 下游需要根据这两类消息进行处理
  • 更新插入流:
    • 两种消息:更新插入消息(带key)和删除消息

连接器

  • DataGen和Print连接器
public class Flink01_DataGenPrint {public static void main(String[] args) {//TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());//1. 准备表环境, 基于流环境,创建表环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//DataGenString createTable =" create table t1 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT " +" ) WITH (" +"  'connector' = 'datagen' ,"  +"  'rows-per-second' = '1' ," +"  'fields.id.kind' = 'random' , " +"  'fields.id.length' = '6' ," +"  'fields.vc.kind' = 'random' , " +"  'fields.vc.min' = '100' , " +"  'fields.vc.max' = '1000' ," +"  'fields.ts.kind' = 'sequence' , " +"  'fields.ts.start' = '1000000' , " +"  'fields.ts.end' = '100000000' " +" )" ;tableEnv.executeSql(createTable);//Table resultTable = tableEnv.sqlQuery("select * from t1 where vc >= 200");//.execute().print();//printString sinkTable ="create table t2(" +"id string," +"vc int," +"ts bigint" +") with (" +"   'connector' = 'print', " +"   'print-identifier' = 'print>' " +")";tableEnv.executeSql(sinkTable);tableEnv.executeSql("insert into t2 select id, vc, ts from t1 where vc >= 200");}
}
  • 文件连接器
public class Flink02_FileConnector {public static void main(String[] args) {TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());//FileSourceString sourceTable =" create table t1 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +//"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号"  `file.size` bigint not null METADATA" +" ) WITH (" +"  'connector' = 'filesystem' ,"  +"  'path' = 'input/ws.txt' ,"  +"  'format' = 'csv' "  +" )" ;tableEnvironment.executeSql(sourceTable);//tableEnvironment.sqlQuery(" select * from t1 ").execute().print();//转换处理...//File sinkString sinkTable =" create table t2 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +//"  `file.name` string not null METADATA," + 文件名字由于系统原因无法识别盘符后面的冒号"  file_size bigint" +" ) WITH (" +"  'connector' = 'filesystem' ,"  +"  'path' = 'output' ,"  +"  'format' = 'json' "  +" )" ;tableEnvironment.executeSql(sinkTable);tableEnvironment.executeSql("insert into t2 " +"select id, vc, ts, `file.size` from t1");}
}
  • kafka连接器
public class Flink03_KafkaConnector {public static void main(String[] args) {TableEnvironment tableEnvironment = TableEnvironment.create(EnvironmentSettings.newInstance().build());//kafka sourceString sourceTable =" create table t1 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +"  `topic` string not null METADATA," +"  `partition` int not null METADATA," +"  `offset` bigint not null METADATA" +" ) WITH (" +"  'connector' = 'kafka' ,"  +"  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +"  'topic' = 'topicA', "  +"  'properties.group.id' = 'flinksql', "  +"  'value.format' = 'csv', "  +"  'scan.startup.mode' = 'group-offsets',"  +"  'properties.auto.offset.reset' = 'latest' "  +" )" ;//创建表tableEnvironment.executeSql(sourceTable);//打印查询结果//tableEnvironment.sqlQuery(" select * from t1 ").execute().print();//转换处理...//kafka SinkString sinkTable =" create table t2 ( " +"  id STRING , " +"  vc INT ," +"  ts BIGINT," +"  `topic` string " +" ) WITH (" +"  'connector' = 'kafka' ,"  +"  'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092' ,"  +"  'topic' = 'topicB', "  +"  'sink.delivery-guarantee' = 'at-least-once', "  +//"  'properties.transaction.timeout.ms' = '', "  +//"  'sink.transactional-id-prefix' = 'xf', "  +//"  'properties.group.id' = 'flinksql', "  +"  'value.format' = 'json' "  +//"  'scan.startup.mode' = 'group-offsets',"  +//"  'properties.auto.offset.reset' = 'latest' "  +" )" ;tableEnvironment.executeSql(sinkTable);tableEnvironment.executeSql("insert into t2 " +"select id, vc, ts, `topic` from t1");}
}
  • Jdbc连接器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/225091.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[每周一更]-(第27期):HTTP压测工具之wrk

[补充完善往期内容] wrk是一款简单的HTTP压测工具,托管在Github上,https://github.com/wg/wrkwrk 的一个很好的特性就是能用很少的线程压出很大的并发量. 原因是它使用了一些操作系统特定的高性能 io 机制, 比如 select, epoll, kqueue 等. 其实它是复用了 redis 的 ae 异步事…

Android APP 常见概念与 adb 命令

adb 的概念 adb 即 Android Debug Bridge 。在窗口输入 adb 即可显示帮助文档。adb 实际上就是在后台开启一个 server&#xff0c;会接收 adb 的命令然后帮助管理&#xff0c;控制&#xff0c;查看设备的状态、信息等&#xff0c;是开发、测试 Android 相关程序的最常用手段。…

Centos系统pnpm升级报错 ERR_PNPM_NO_GLOBAL_BIN_DIR

在 CentOS 系统中使用 pnpm i -g pnpm 报错&#xff1a;ERR_PNPM_NO_GLOBAL_BIN_DIR Unable to find the global bin directory&#xff0c;折腾半天终于解决了。 完整报错信息 [rootVM-8 test]# pnpm i -g pnpm Nothing to stop. No server is running for the store at /roo…

linux20day 排序sort 字符处理cut cpu使用占比排序 awk文本数据处理

目录 1、排序sort参数用法排序&#xff08;-n&#xff09;从大到小 倒叙&#xff08;-r&#xff09; cpu使用占比排序&#xff08;ps aux --sort -%cpu&#xff09; 2、截取到某个字符串 cut3、awk处理文本文件用法&#xff1a;打印等于 和不等于 1、排序sort 经常用于排序 参…

数据分析的基本步骤

了解过数据分析的概念之后&#xff0c;我们再来说下数据分析的常规步骤。 明确目标 首先我们要确定一个目标&#xff0c;即我们要从数据中得到什么。比如我们要看某个指标A随时间的变化趋势&#xff0c;以期进行简单的预测。 数据收集 当确定了目标之后&#xff0c;就有了取…

js逆向-JS加密破解进阶

目录 一、JS逆向进阶一&#xff1a;破解AES加密 &#xff08;一&#xff09;AES对称加密算法原理 &#xff08;二&#xff09;破解AES加密 &#xff08;三&#xff09;实战&#xff1a;发现报告网 二、JS逆向进阶二&#xff1a;破解RSA加密 &#xff08;一&#xff09;RS…

gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架

背景介绍 gRPC 是一种现代开源高性能远程过程调用 &#xff08;RPC&#xff09; 可以在任何环境中运行的框架。它可以有效地连接服务 在数据中心内和数据中心之间&#xff0c;具有对负载平衡、跟踪、 运行状况检查和身份验证。它也适用于最后一英里 分布式计算&#xff0c;用于…

P20类神经网络训练不起来怎么办?- 批次和动量

什么是batchsmall batch 和 large batch 的比较 &#xff1a; large batch 更快&#xff0c;small batch 在训练集和测试集上效果效果更好动量的意义和作用&#xff1a; 类似于物理上多了一点惯性&#xff0c;防止困在鞍点。 动量是之前所有梯度的加权和。 1. batch 是什么 …

高压电气是什么

高压电气 电工电气百科 文章目录 高压电气前言一、高压电气是什么二、高压电气的类别三、高压电气的作用原理总结前言 高压电气在电力系统中起着重要的作用,它能够将电能有效地输送和分配到各个用户,为社会和工业生产提供稳定可靠的电力供应。然而,高压电气系统也需要注意安…

Mr_HJ / form-generator项目文档学习与记录(续)

以后主打超融开源社区 (jiangzhicheng88) - Gitee.com render.js就是对vue的render函数的自己简单定制封装。 render.js实现的功能是将json表单中的__config__.tag解析为具体的vue组件&#xff1b; 正常开发流程我们组件输入的时候会触发组件内的 this.$emit(getValue, val)…

PyQt6 安装Qt Designer

前言&#xff1a;在Python自带的环境下&#xff0c;安装Qt Designer&#xff0c;并在PyCharm中配置designer工具。 在项目开发中&#xff0c;使用Python虚拟环境安装PyQt6-tools时&#xff0c;designer.exe会安装在虚拟环境的目录中&#xff1a;.venv\Lib\site-packages\qt6_a…

NPM开发工具的简介和使用方法及代码示例

NPM&#xff08;Node Package Manager&#xff09;是Node.js的包管理工具&#xff0c;用于管理和共享被发布到模块仓库的JavaScript代码。本文将介绍NPM的定义、使用方法、代码示例以及总结。 一、NPM的定义 NPM是Node.js的默认包管理工具&#xff0c;它的功能包括安装、管理、…

机器学习算法---回归

1. 线性回归&#xff08;Linear Regression&#xff09; 原理&#xff1a; 通过拟合一个线性方程来预测连续响应变量。线性回归假设特征和响应变量之间存在线性关系&#xff0c;并通过最小化误差的平方和来优化模型。优点&#xff1a; 简单、直观&#xff0c;易于理解和实现。…

【日常笔记】notepad++ 正则表达式基本用法

一、场景 二、正则表达式--语法 2.1、学习基本的匹配字符&#xff1a; 2.2、学习特殊字符和量词&#xff1a; 2.3、学习转义字符 2.4、学习分组和捕获 2.5、区分大小写 和 匹配整个单词 2.6、引用分组 三、实战 ▶ 希望把课程目录中 -- 前面的都去掉 一、场景 希望把…

Jrebel 在 Idea 2023.3中无法以 debug 的模式启动问题

Jrebel 在 Idea 2023.3中无法以 debug 的模式启动问题 Idea 在升级了2023.3以后&#xff0c;Jrebel 无法以 debug 的模式启动&#xff0c;找了半天&#xff0c;最后在插件主页的评论区找到了解决方案 特此记录一下

Dockerfile:创建镜像,创建自定义的镜像。

Docker的创建镜像的方式&#xff1a; 基于已有镜像进行创建。 根据官方提供的镜像源&#xff0c;创建镜像&#xff0c;然后拉起容器。是一个白板&#xff0c;只能提供基础的功能&#xff0c;扩展性的功能还是需要自己定义&#xff08;进入容器进行操作&#xff09; 基于模板进…

SpringBoot 基础概念:SpringApplication#getSpringFactoriesInstances

SpringBoot 基础概念&#xff1a;SpringApplication#getSpringFactoriesInstances SpringApplication#getSpringFactoriesInstances SpringApplication#getSpringFactoriesInstances private <T> Collection<T> getSpringFactoriesInstances(Class<T> type,…

在 Spring Boot 中发送邮件简单实现

Spring Boot 对于发送邮件这种常用功能也提供了开箱即用的 Starter&#xff1a;spring-boot-starter-mail。 通过这个 starter&#xff0c;只需要简单的几行配置就可以在 Spring Boot 中实现邮件发送&#xff0c;可用于发送验证码、账户激活等等业务场景。 本文将通过实际的案…

【AI美图】第03期效果图,AI人工智能全自动绘画,二次元美图欣赏

带来一组二次元人工智能自动绘图 对比分析&#xff1a; 标题手画二次元需要技巧&#xff1a; 二次元高清图片的绘制技巧主要包括以下几点&#xff1a; 线条的运用&#xff1a;在二次元风格的绘画中&#xff0c;线条的运用非常重要。要绘制出流畅、细腻的线条&#xff0c;需…

用于自动驾驶的基于深度学习的图像 3D 物体检测:综述

论文地址&#xff1a;https://ieeexplore.ieee.org/abstract/document/10017184/ 背景 准确、鲁棒的感知系统是理解自动驾驶和机器人驾驶环境的关键。自动驾驶需要目标的 3D 信息&#xff0c;包括目标的位置和姿态&#xff0c;以清楚地了解驾驶环境。 摄像头传感器因其颜色和…