使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建Flink-MySQL-CDC的SourceTableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +"  id INT primary key," +"  name STRING" +") WITH (" +"  'connector' = 'mysql-cdc'," +"  'hostname' = 'hadoop102'," +"  'port' = '3306'," +"  'username' = 'root'," +"  'password' = 'xxxx'," +"  'database-name' = 'student'," +"  'table-name' = 'table_name'," +"'server-time-zone' = 'Asia/Shanghai'," +"'scan.startup.mode' = 'initial'" +")");// 2. 注册SinkTable: sink_sensor
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "`proc_time` as PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")" +
//                "");//upsert-kafkatableEnv.executeSql("" +"CREATE TABLE kafka_binlog ( " +"  user_id INT, " +"  user_name STRING, " +"`proc_time` as PROCTIME()," +"  PRIMARY KEY (user_id) NOT ENFORCED" +") WITH ( " +"  'connector' = 'upsert-kafka', " +"  'topic' = 'test2', " +"  'properties.bootstrap.servers' = 'hadoop102:9092', " +"  'key.format' = 'json' ," +"  'value.format' = 'json' " +")" +"");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into kafka_binlog select * from table_name");tableEnv.executeSql("select * from kafka_binlog").print();env.execute();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58882.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

adb 查找应用包名,应用 Activity 等信息

列出设备上的包 不使用参数:adb shell pm list packages,打印设备/模拟器上的所有软件包 根据包名查看应用的activity 命令: dumpsys package 包名 adb shell dumpsys package 包名 petrel-cv96d:/data/app # dumpsys package com.instal…

自然语言处理2-NLP

目录 自然语言处理2-NLP 如何把词转换为向量 如何让向量具有语义信息 在CBOW中 在Skip-gram中 skip-gram比CBOW效果更好 CBOW和Skip-gram的算法实现 Skip-gram的理想实现 Skip-gram的实际实现 自然语言处理2-NLP 在自然语言处理任务中,词向量(…

运维Shell脚本小试牛刀(一)

运维Shell脚本小试牛刀(一) 运维Shell脚本小试牛刀(二) 一: Shell中循环剖析 for 循环....... #!/bin/bash - # # # # FILE: countloop.sh # USAGE: ./countloop.sh # DESCRIPTION: # OPTIONS: ------- # …

c++ boost::json

Boost社区12月11日发布了1.75版本,在之前,​​Boost使用Boost.PropertyTree解析​​JSON​​​,​​XML​​​,​​INI​​​和​​INFO​​​格式的文件。但是由于成文较早及需要兼容其他的数据格式,相比较于其他的​…

秒懂算法2

视频链接 : 希望下次秒懂的是算法题_哔哩哔哩_bilibili P1094 [NOIP2007 普及组] 纪念品分组 原题链接 : [NOIP2007 普及组] 纪念品分组 - 洛谷 思路 : 排序 贪心 双指针首先先对输入进来的数组进行排序(由小到大)运用贪心的思想 : 前后结合,令l1,rn,若a[l]a[r]<w…

讲解人工智能在现代科技中的应用和未来发展趋势

人工智能是现代科技中的一个重要分支&#xff0c;其应用涵盖了各个领域&#xff0c;如医疗、金融、制造业、交通运输、安防、教育、游戏等。在医疗领域&#xff0c;人工智能可以通过分析医学图像、实时监控病人状况、制定诊疗方案等方面为医生提供支持与帮助。在金融领域&#…

git 的常用命令

git是一个版本管理器&#xff0c;是程序员必备工具之一&#xff0c;其主分为三个区&#xff1a; 工作区&#xff1a; 暂存区&#xff1a; 仓库&#xff1a; 通过保持软件版本&#xff0c;分支&#xff0c;合并&#xff0c;等多种版本操作&#xff0c;使软件能在自己想要的版本…

arm版Linux下安装es集群

背景&#xff1a;由于生产上网络没通&#xff0c;没办法&#xff0c;只能自己安装一个es集群的测试环境了&#xff0c;我的电脑是Mac M2&#xff0c;安装的Linux是centos7&#xff0c;也是arm版的。 第一步&#xff1a;查看自己Linux系统的版本 命令&#xff1a;uname -a 例如…

分布式 - 服务器Nginx:一小时入门系列之 rewrite 指令

文章目录 1. rewrite 指令语法2. rewrite 指令示例3. 不使用 last 和 break 重写规则4. 使用 break 重写规则5. 使用 last 重写规则 1. rewrite 指令语法 nginx的rewrite指令用于重写URL&#xff0c;可以将一个URL重写为另一个URL。它的语法如下&#xff1a; rewrite regex r…

设置微软Edge浏览器主页和新标签页,摆脱扰人和分散注意力的主页

默认情况下&#xff0c;Microsoft Edge会向您显示世界上最令人分心和讨厌的主页&#xff08;也称为主屏幕&#xff09;。微软不想只向你展示一个搜索框&#xff0c;也许还有一个漂亮的背景或一些你喜欢的网站的快捷方式&#xff0c;而是想在你面前扔一堆新闻标题和广告。 你可…

Linux 终端命令行 产品介绍

Linux命令手册内置570多个Linux 命令&#xff0c;内容包含 Linux 命令手册。 【软件功能】&#xff1a; 文件传输 bye、ftp、ftpcount、ftpshut、ftpwho、ncftp、tftp、uucico、uucp、uupick、uuto、scp备份压缩 ar、bunzip2、bzip2、bzip2recover、compress、cpio、dump、gun…

全新抖音快手小红书去水印系统网站源码 | 支持几十种平台

全新抖音快手小红书去水印系统网站源码 | 支持几十种平台

视频行为分析——视频图像转换与ffmpeg相关操作

工具类说明 1. 图像视频转换 1.1 视频输出gif from moviepy.editor import VideoFileClip # 设置输入视频文件路径和输出GIF文件路径 input_video video.avi output_gif output.gif # 读取视频文件 video VideoFileClip(input_video) # 将视频保存为GIF文件 video.write_…

微信小程序左上角home图标的解决方法之一 层级混乱导致的home图标显示的问题 自定义左上角左侧图标的返回路径

这个项目的编辑页在tabbar上 导致跳到tabbar得使用wx.switchTab 保存后返回原来的页面就出现了左上角的home图标 本来想通过自定义home图标的跳转路径来解决这个问题 没想到居然找不到相关内容 有清楚的朋友麻烦给我留个言不胜感激 那我写一下我的骚操作 app.js globalData: {…

idea2023远程调试

使用idea2023版远程调试springboot 1.添加remote jvm debug 2.填写远程ip, 选择要调试项目 3.将 参数附带到需要远程调试的jar 命令之前, 重启即可, 命令一定要在jar包之前 , -agentlib:jdwptransportdt_socket,servery,suspendn,address5005即 java -agentlib:jdwptranspor…

Hive 服务管理脚本

#!/bin/bash HIVE_HOME/opt/software/hive-3.1.3 HIVE_LOG_HOME/opt/software/hive-3.1.3/logfunction checkLogDir {if [[ ! -e ${HIVE_LOG_HOME} ]]; thenecho "${HIVE_LOG_HOME} 目录不存在&#xff0c;正在创建。"mkdir -p ${HIVE_LOG_HOME}fi }function checkHi…

我们的第一个 Qt 窗口程序

Qt 入门实战教程&#xff08;目录&#xff09; Windows Qt 5.12.10下载与安装 为何使用Qt Creator开发QT 本文介绍用Qt自带的集成开发工具Qt Creator创建Qt默认的窗口程序。 本文不需要你另外安装Visual Studio 2022这样的集成开发环境&#xff0c;也不需要你再在Visual St…

docker发布项目及使用外部文件的情况处理

适用docker环境已搭建好 首先项目打jar包&#xff1a;server-cdzh-2.1.0-SNAPSHOT.jar 创建Dockerfile FROM java:8 ADD server-cdzh-2.1.0-SNAPSHOT.jar cdzh.jar EXPOSE 60156 ENTRYPOINT ["java","-jar","/cdzh.jar"] 在linux服务器新建…

【python】【centos】使用python杀死进程后自身也会退出

问题 使用python杀死进程后自身程序也会退出&#xff0c;无法执行后边的代码 这样不行&#xff1a; # cmd " ps -ef | grep -v grep | grep -E task_pull_and_submit.py$|upgrade_system.py$| awk {print $2}"# pids os.popen(cmd).read().strip(\n).split(\n)# p…

JPA实体类中使用联合主键

参考链接&#xff1a;JPA Primary Key 业务场景&#xff1a; 实体类Aaabc中需要将id1、id2作为联合主键来使用 方式一&#xff1a;使用IdClass 首先定义IdClass类 import lombok.Data; import java.io.Serializable;Data public class AaabcIdClass implements Serializable …