【flink】之新版本kafka到kafka

前言: 

通过sinkTo()的优点:更简洁、类型安全,适用于使用 Flink 提供的预定义 sink 或简单的自定义 sink

准备:

 引入Flink 1.12版本即可

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.1</version></dependency>

创建任务:

package com.iterge.flink.job;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author iterge* @version 1.0* @date 2024/10/30 16:20* @description kafka to kafka*/@Slf4j
public class KafkaToKafkaDemo {static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();public static void main(String[] args) throws Exception {KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("it.erge.test.topic").setGroupId("it.erge.test.topic.6").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> msg = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");//添加sinkKafkaSink<String> build = KafkaSink.<String>builder().setBootstrapServers("localhost:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("it.lph.test.topic").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();msg.sinkTo(build);env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/58672.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何配置,npm install 是从本地安装依赖

在 Node.js 中&#xff0c;要使npm install从本地安装依赖&#xff0c;可以按照以下步骤进行配置&#xff1a; 一、准备本地依赖包 确保你有本地的依赖包。这个依赖包可以是一个包含package.json文件的文件夹&#xff0c;或者是一个已经打包好的.tgz文件。 二、使用相对路径…

无套路领取《AI应用开发专栏》

最近有些时间没有更新技术文章了&#xff0c;都在忙着写《AI应用开发入门》专栏&#xff0c;专栏已整理放到了github上&#xff0c;有兴趣的小伙伴可以移步github阅读&#xff0c;地址见文末。 1、为什么写这个文档 之前陆续写了一些零散的AI相关的技术文章&#xff0c;也有不…

uni-app使用movable-area 实现数据的拖拽排序功能

文档地址 template部分 <movable-area :style"getAreaStyle"><movable-view class"table-row" v-for"v,i in move.list":key"v.id":y"v.y"change"handle_moving"direction"vertical"touchst…

Webserver(1.8)操作函数

目录 文件属性操作函数access函数chmod函数chown函数truncate函数 目录操作函数mkdir函数rmdir函数rename函数chdir函数*getcwd函数 目录遍历函数*opendir函数*readdir函数closedir函数 dup、dup2函数dupdup2 fcntl函数 文件属性操作函数 access函数 判断某个文件是否有某个权…

基础IO -- 标准错误输出stderr

目录 1&#xff09;为什么要有 fd 为 2 的 stderr 2&#xff09;使2和1重定向到一个文件中 这里我们谈一下以前只是了解过的stderr 通过两段代码&#xff0c;显然&#xff0c;我们可以知道两个FILE*都是指向显示器的 对于重定向&#xff0c;只有stdout才会将打印的数据重定向…

基于Multisim的四位抢答器设计与仿真

四位选手进行抢答比赛&#xff0c;用基本门电路及集成逻辑器件构成四人抢答器。选手编号分别为1&#xff0c;2&#xff0c;3&#xff0c;4号&#xff0c;用S1&#xff0c;S2&#xff0c;S3&#xff0c;S4四个按钮作为抢答按钮&#xff0c;S0按钮为总清零按钮。当四人中任何一个…

Python毕业设计选题:基于Django+Vue的物资配送管理系统的设计与实现

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 管理员登录界面 管理员功能界面 申领者管理 后勤处管理 物资信息管理 入库信息管理 …

网安秋招面试

《Java代码审计》http://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247484219&idx1&sn73564e316a4c9794019f15dd6b3ba9f6&chksmc0e47a67f793f371e9f6a4fbc06e7929cb1480b7320fae34c32563307df3a28aca49d1a4addd&scene21#wechat_redirect 《Web安全》h…

springboot092安康旅游网站的设计与实现(论文+源码)_kaic

毕业设计&#xff08;论文&#xff09; 基于JSP的安康旅游网站的设计与实现 姓  名 学  号 院  系 专  业 指导老师 2021 年 月 教务处制 目 录 目 录 摘 要 Abstract 第一章 绪论 1.1 研究现状 1.2 设…

selenium无头浏览器截图并以邮件发送

selenium 4.11版本的selenium无需在本地下载浏览器驱动 # Time : 2024/10/18 17:54 # Author : # FileName: print_sc.py # Description: import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.mime.image i…

字符串逆序(c语言)

错误代码 #include<stdio.h>//字符串逆序 void reverse(char arr[], int n) {int j 0;//采用中间值法//访问数组中第一个元素和最后一个元素//交换他们的值&#xff0c;从而完成了字符串逆序//所以这个需要临时变量for (j 0; j < n / 2; j){char temp arr[j];arr[…

Fakelocation 步道乐跑(Root真机篇)

前言:需要 Fakelocation&#xff0c;真机Root,步道乐跑&#xff0c;Dia&#xff0c;MT管理器系统需求 Fakelocation | MT管理器 | Dia | 环境模块 任务一 真机Root&#xff08;德尔塔&#xff0c;过momo&#xff0c;刷环境模块&#xff09; 任务二 前往Dia查看包名&#xff08…

Vue学习笔记(十二)

async与await 1. async async作为一个关键字放到声明函数前面&#xff0c;表示该函数为一个异步任务&#xff0c;不会阻塞后面函数的执行async函数里如果有异步过程会等待&#xff0c;但async函数本身会马上返回&#xff0c;不会阻塞当前线程。可以简单认为async函数工作在主…

【实战篇】requests库 - 有道云翻译爬虫 【附:代理IP的使用】

目录 〇、引言一、目标二、请求参数分析三、响应分析四、编写爬虫脚本【隧道代理的使用】 〇、引言 无论是学习工作、旅游出行、跨境电商、日常交流以及一些专业领域都离不开翻译工具的支持。本文就带大家通过爬虫的方式开发一款属于自己的翻译工具~ 一、目标 如下的翻译接口…

MyBatis 源码分析 - SQL执行过程(三)之 ResultSetHandler

MyBatis的SQL执行过程 在前面一系列的文档中&#xff0c;我已经分析了 MyBatis 的基础支持层以及整个的初始化过程&#xff0c;此时 MyBatis 已经处于就绪状态了&#xff0c;等待使用者发号施令了 那么接下来我们来看看它执行SQL的整个过程&#xff0c;该过程比较复杂&#xff…

达梦数据迁移工具DTS使用实践

1、环境描述 2、DTS概述 1.支持视图、存储过程/函数、包、类、同义词、触发器等对象迁移&#xff1b; 2.支持数据类型的自动映射&#xff0c;编码转换&#xff1b; 3.支持根据条件自定义迁移部分数据&#xff1b; 4.向导式迁移步骤&#xff0c;上手简单&#xff1b; 5.支持 we…

【Web.路由】——路由约束

我们需要明确的一点就是&#xff0c;一个URL地址就是一个路由值。 而路由约束&#xff0c;就是制定出的一套规则&#xff0c;只有路由值与路由约束中的规则匹配&#xff0c;才可以进行下一步操作。 路由约束不仅可用于路由请求&#xff0c;还可以用于链接的生成。 参数值约束 …

医药公司常用的九大翻译场景

医药公司的翻译工作通常涉及多个专业领域&#xff0c;以下是一些常见的翻译场景&#xff1a; 1、药品说明书翻译&#xff1a;包括药物的成分、副作用、使用方法、储存条件等内容的翻译&#xff0c;必须准确无误&#xff0c;以确保患者和医务人员能够正确使用药物。 2、临床研…

VB中的安全性考虑,如防止SQL注入、XSS攻击等

在Visual Basic (VB) 开发中&#xff0c;安全性是一个至关重要的考虑因素。为了防止SQL注入、跨站脚本&#xff08;XSS&#xff09;攻击等常见安全威胁&#xff0c;开发人员需要采取一系列措施来确保应用程序的安全性。以下是对VB中安全性考虑的详细描述&#xff1a; 防止SQL注…

c++编解码封装

多态版编解码 对服务器和客户端的结构体进行序列化然后对数据进行反序列化 案例分析 代码demo Codec.h #pragma once #include <iostream>class Codec { public:Codec();virtual std::string encodeMsg();//string是标准库的string类virtual void* decodeMsg();virtu…