记录kafka-flink-kafka的end-to-end的exactly-once语义

记录kafka-flink-kafka的end-to-end的exactly-once语义

  • 步骤
  • 代码

步骤

  1. 开启checkpoint、stateBackend的设置和checkpoint配置
  2. 设置kafka source的配置
  3. 读取kafka source message
  4. 随意的transformation;并打印结果
  5. kafka sink端的配置
  6. 输出到kafka sink端
  7. 执行

代码

package com.javaye.demo.exactly;import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;/*** @Author: Java页大数据* @Date: 2024-04-11:17:59* @Describe:*  kafka - flink - kafka 验证end-to-end的exactly once*/
public class ExactlyOnce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        1.1. 开启checkpoint,间隔为1000L msenv.enableCheckpointing(1000L);//        1.2. stateBackend:checkpoint持久化目录if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend("file:///D:/ckp"));} else {env.setStateBackend(new FsStateBackend("hdfs://only:9870/flink-checkpoints"));}CheckpointConfig config = env.getCheckpointConfig();
//        1.3. ckp的配置
//        1.3.1. 前后两次checkpoint的最小间隔:防止前后两次的checkpoint重叠config.setMinPauseBetweenCheckpoints(500L);
//        1.3.2. 容忍5次checkpoint失败config.setTolerableCheckpointFailureNumber(5);
//        1.3.3. job被取消时,保留外部的checkpointconfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        1.3.4. 设置checkpoint的语义为 exactly-onceconfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        1.3.5. 设置checkpoint的超时时间,若checkpoint超过该超时时间则说明该次checkpoint失败,丢弃该checkpointconfig.setCheckpointTimeout(60 * 1000);
//        1.3.6. 设置同一时刻允许多少个checkpoint同时执行config.setMaxConcurrentCheckpoints(1);//        1.4. 设置重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//        2. 设置kafka source的配置String kafkaServer = "only:9092";String sourceTopic = "flink_kafka_source";String groupId = "flink_kafka_source_exactly_once";String clientIdPrefix = "flink_exactly_once";Properties kafkaSourceProp = new Properties();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(kafkaServer).setTopics(sourceTopic).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.latest()) // Start from latest offset.setProperty("partition.discovery.interval.ms", "50000") // discover new partitions per 50 seconds.setProperty("auto.offset.reset", "latest").setValueOnlyDeserializer(new SimpleStringSchema())
//                执行checkpoint时提交offset到checkpoint,flink内部使用,并且提交一份到默认主题__consumer_offsets
//                .setCommitOffsetsOnCheckpoints(true) // checkpoint开启默认为true,否则为false;不支持该方法.setProperties(kafkaSourceProp).build();//        3. 读取kafka source messageDataStreamSource<String> kafkaDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "flink_kafka_exactly_once", TypeInformation.of(String.class));//        4. 随意的transformationSingleOutputStreamOperator<String> flatMapDS = kafkaDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(",");for (String word : words) {Random random = new Random();int i = random.nextInt(5);if (i > 3) {System.out.println("模拟出现bug...");throw new RuntimeException("模拟出现bug...");}System.out.println(word + "===" + i);out.collect(word + "===" + i);}}});//        4.1. 打印结果容易观察flatMapDS.print();//        5. kafka sink端的配置Properties kafkaSinkProp = new Properties();kafkaSinkProp.setProperty("transaction.timeout.ms", 1000 * 5 + ""); //设置事务超时时间,也可在kafka配置中设置KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers(kafkaServer).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("flink_kafka_sink").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setKafkaProducerConfig(kafkaSinkProp).build();//        6. 输出到kafka sink端flatMapDS.sinkTo(kafkaSink);//        7. 执行env.execute(ExactlyOnce.class.getName());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/809533.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

为linux和windows系统备份还原点,防止系统出问题无法恢复

一、linux系统操作办法&#xff1a; sudo apt update sudo apt install timeshift timeshift --create 输出结果如下&#xff1a; 等待约5分钟就会创建成功&#xff1a; 这个备份功能只备份系统&#xff0c;不备份文件&#xff0c;但也不会删除文件。 工作站系统的保存位置&a…

Win10安装sqlplus遇到报错的解决办法

1.下载安装sqlplus.exe的错误解决过程 最近有用到sqlplus连接Oracle数据库执行自动化脚本&#xff0c;Orcle服务器版本是11.2.0.1。在Navicat工具上通过如下语句查询到的版本信息截图如图1所示&#xff1a; SELECT * FROM v$version; 图1 Oracle服务器版本信息 其中“Oracle Da…

Docker部署SpringBoot+Vue前后端分离项目

文章目录 1. 安装Docker1. 1 卸载旧版Docker1.2 配置yum仓库1.3 安装Docker1.4 添加自启动配置1.5 配置阿里云镜像加速1.6 测试 2. 安装Nginx2.1 拉取镜像2.2 安装Nginx2.3 测试 3. 安装MySQL3.1 拉取镜像3.2 安装MySQL3.3 连接MySQL 4. 部署SpringBoot项目4.1 Maven打包4.2 编…

深度学习Vue框架生命周期(三)

一.什么是生命周期&#xff1f; 在vue中&#xff0c;生命周期就是vue实例程序从创建到销毁的这个过程&#xff0c;在生命周期中&#xff0c;不同阶段我们可以做不同的事情。vue的生命周期是创建阶段、挂载阶段、更新阶段、销毁阶段 二.什么是钩子函数&#xff1f; 钩子函数就是…

常用网络状态码以及含义

以下是常见的网络状态码及其含义&#xff1a; 1xx&#xff08;信息类状态码&#xff09;&#xff1a; 100 Continue&#xff1a;继续。服务器已经收到客户端的部分请求&#xff0c;客户端可继续发送请求。101 Switching Protocols&#xff1a;切换协议。服务器正在根据客户端…

数据库数据恢复—Sql Server数据库文件丢失如何恢复数据?

服务器数据恢复环境&#xff1a; 一台安装windows server操作系统的服务器。一组由8块硬盘组建的RAID5&#xff0c;划分LUN供这台服务器使用。 在windows服务器内装有SqlServer数据库。存储空间LUN划分了两个逻辑分区。 服务器故障&初检&#xff1a; 由于未知原因&#xf…

Windows联网状态工具TCPView

文章目录 TCPView命令行工具更多Sysinternals Suite工具 TCPView TCPView用于显示系统上所有 TCP 和 UDP 终结点的详细列表&#xff0c;包括本地和远程地址以及 TCP 连接的状态&#xff0c;界面如下。 列表的表头含义如下 表头含义表头含义Process name应用名称Process id进程…

浅谈:从医疗元宇宙向更多实业领域的拓展

近年来&#xff0c;在各大媒体的持续曝光下&#xff0c;元宇宙这一新兴赛道受到全球资本市场的热烈追捧。更多的品牌方开始持续进军元宇宙领域营销&#xff0c;从限量 NFT 盲盒到多元游戏化场景&#xff0c;再到 VR 创意互动装置的出现&#xff0c;元宇宙市场正不断推陈出新&am…

最新Android Studio导入aar包的方法

以前的方式&#xff0c;目前看网上也大多数都是这种方式&#xff0c;导致我本地加的时候一直有问题 但是这样都无法sync以及编译通过&#xff0c;因为方式已经变了 1&#xff1a;将aar文件复制到MyApplication\app\libs下 2&#xff1a;在MyApplication\app\build.gradle下添加…

结构化面试-应急应变题

例题&#xff1a; 你是景区的工作人员&#xff0c;你在巡逻时听到游客在抱怨景区很大&#xff0c;找不到厕所&#xff0c;工作人员 态度也很恶劣&#xff0c;以后再也不来了&#xff0c;这时还有其他游客也在旁边附和&#xff0c;你怎么办&#xff1f; 回答&#xff1a;&…

oninput 和 onchange 事件的区别

oninput 和 onchange 是两个常用于处理表单元素&#xff08;如输入框、选择框等&#xff09;的 JavaScript 事件。它们的主要区别在于触发时机和触发频率。 oninput 事件 oninput 事件在用户输入内容时触发&#xff0c;这意味着每当用户在输入框中键入、删除或粘贴字符时&…

HTTP请求报文介绍

本章简要介绍渗透测试员在攻击Web应⽤程序时可能遇到的关键技术。 将分析HTTP协议、服务器和客⼾端常⽤的技术以及⽤于在各种情形下呈现数据的编码⽅案。 这些技术⼤都简单易懂&#xff0c;掌握其相关特性对于向Web应⽤程序发动有效攻击极其重要。 1.1 HTTP协议概述介绍 HTT…

VMvare进行靶场环境搭建,防火墙连接[物理主机,攻击机,靶机],主机与VM虚拟网卡拓扑形象,web连接防火墙报错

配置目标 两块虚拟网卡分别为vmnet1和vmnet8 vmnet1配置两个网段192.168.20.1/24和192.168.30.1/24 其中192.168.20.0网段将防火墙管理接口0/0/0&#xff0c;接口地址为192.168.20.100和物理机192.168.20.1/24进行连接 其中192.168.30.0网段将防火墙1/0/0接口&#xff0c;接…

智能AI写作,自动写文案效率高

随着科技的不断发展&#xff0c;人工智能领域的应用也日益广泛&#xff0c;其中智能AI写作作为一项新兴技术&#xff0c;正逐渐改变着传统文案写作的方式。智能AI写作是利用人工智能技术来生成文案内容&#xff0c;其高效率和高质量的特点吸引了越来越多的用户。在这个信息爆炸…

第十届蓝桥杯省赛真题(C/C++大学B组)

试题 A: 组队 答案&#xff1a;490 试题 B: 年号字串 #include <bits/stdc.h> using namespace std;int main() {//26进制数 int n;cin>>n;string s "111";for(int i s.length() - 1;i >0;i--){s[i] A - 1 n % 26;n / 26;}cout<<s<<…

如何发现高危的PoC和EXP?漏洞检测方法 示例,实战应急实战举例,包括:SQLi、XSS、SSTI/ELI、文件哈希、SSRF、命令执行/命令注入等等

如何发现高危的PoC和EXP?漏洞检测方法 & 示例,实战应急实战举例,包括:SQLi、XSS、SSTI/ELI、文件哈希、SSRF、命令执行/命令注入等等。 在网络安全领域,发现高危的PoC(Proof of Concept)和EXP(Exploit)对于防范和应对潜在的安全威胁至关重要。以下是关于如何发现高…

leetcode 1766

leetcode 1766 题目 例子 思路 将边的关系&#xff0c;转化为树结构。 记录val 对应的id 列表。 记录是否遍历过节点。 记录id 和对应的深度。 使用dfs&#xff0c; 从根开始遍历。 代码实现 class Solution { private:vector<vector<int>> gcds;//val : the …

AliyunCTF 2024 - BadApple

文章目录 前言环境搭建漏洞分析漏洞利用参考 前言 本文首发于看雪论坛 https://bbs.kanxue.com/thread-281291.htm 依稀记得那晚被阿里CTF支配的恐惧&#xff0c;今年的阿里CTF笔者就做了一道签到PWN题&#xff0c;当时也是下定决心要学习 jsc pwn 然后复现这道 BadApple 题目…

github克隆报错:failed: The TLS connection was non-properly terminated.

github克隆gazebo_ros_control报错 fatal: unable to access https://github.com/ros-controls/gazebo_ros_control.git/: gnutls_handshake() failed: The TLS connection was non-properly terminated. sudo apt-get install ros-noetic-gazebo-ros-control git 克隆gazeb…

力扣练习4.11

452. 用最少数量的箭引爆气球 不考虑y轴&#xff0c;可以将其转换为重叠区间的问题。将同属于一个重叠区间的小区间合并为一个区间&#xff0c;加上不重叠的区间&#xff0c;即是所求数量。 更加简化&#xff1a;如果是非重叠区间才加1&#xff0c;因为两个大的重叠区间间肯定…