Flink读取mysql数据库(java)

代码如下:

package com.weilanaoli.ruge.vlink.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;class MysqlExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("xx.xx.xx.xx")    //输入地址.port(3306)                 //输入端口.databaseList("xx")         //输入库名.tableList("xx.test")       //输入表名.username("xx")             //输入用户名.password("xxxx")           //输入密码.startupOptions(StartupOptions.initial())  //读取binlog策略,这个启动选项有五种.deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次.build();//配置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {@Overridepublic void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {try {System.out.println("processElement=====" + value);}catch (Exception e) {e.printStackTrace();}}});dataStreamSource.print("原始数据=====");env.execute("Print MySQL Snapshot + Binlog");}
}

运行结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19957.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel修改日期格式,改变日期的筛选方式

我们有两列日期数据&#xff1a; 左边这一列筛选会显示&#xff1a; 右边这一列筛选会显示&#xff1a; 修改格式&#xff0c;将【日期1】改为【日期2】 将【日期1】的格式修改为文本格式即可 修改格式&#xff0c;将【日期2】改为【日期1】 选中日期2&#xff0c;点击【数据…

JDK各版本重要变革

各版本更新详情 JDK8(LTS)--2014/3 语法层面 lambda表达式(重要特色之一) 一种特殊的匿名内部类,语法更加简洁允许把函数作为一个方法的参数,将代码象数据一样传递&#xff0c;即将函数作为方法参数传递基本语法: <函数式接口> <变量名> (参数...) -> { 方法…

LeetCode 热题 100 JavaScript --226. 翻转二叉树

给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 3&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 提示&#xff1a; 树中节点数目范围在 [0, 100] 内 -100 < Node.val < 100 var invertTree function(root…

哈工大计算机网络课程网络安全基本原理详解之:消息完整性与数字签名

哈工大计算机网络课程网络安全基本原理详解之&#xff1a;消息完整性与数字签名 这一小节&#xff0c;我们继续介绍网络完全中的另一个重要内容&#xff0c;就是消息完整性&#xff0c;也为后面的数字签名打下基础。 报文完整性 首先来看一下什么是报文完整性。 报文完整性…

基于springboot+jpa+mysql+html网上中药商城系统

基于springbootjpamysqlhtml网上中药商城系统 一、系统介绍二、功能展示1.主页(客户)2.登陆&#xff08;客户&#xff09;3.注册&#xff08;客户&#xff09;4.购物车(客户)5.我的订单&#xff08;客户&#xff09;6.用户管理&#xff08;管理员&#xff09;7.分类管理&#x…

远程连接身份验证错误,又找不到加密Oracle修正

一、问题描述 远程连接服务器出现了错误&#xff0c;错误信息为&#xff1a;远程连接身份验证错误&#xff0c;又找不到加密Oracle修正。 二、原因分析 出错原因&#xff1a;Windows的CVE-2018-0886 的 CredSSP 更新将CredSSP 身份验证协议默认设置成了“缓解”&#xff0c;…

Informer 论文学习笔记

论文&#xff1a;《Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting》 代码&#xff1a;https://github.com/zhouhaoyi/Informer2020 地址&#xff1a;https://arxiv.org/abs/2012.07436v3 特点&#xff1a; 实现时间与空间复杂度为 O ( …

轻松批量文件改名!一键翻译重命名文件夹/文件,省时高效!」

繁忙的数字时代&#xff0c;我们经常需要处理大量的文件和文件夹。而手动逐个更改文件名不仅费时费力&#xff0c;还容易出错。因此&#xff0c;我们为您带来了一款强大的工具——批量文件改名软件&#xff01;现在&#xff0c;您可以一键翻译重命名文件夹和文件&#xff0c;轻…

在centos7.9安装tomcat8,并配置服务启动脚本,部署jpress应用

目录 一、简述静态网页和动态网页的区别 二、简述 Webl.0 和 Web2.0 的区别 三、 安装Tomcat8&#xff0c;配置服务启动脚本&#xff0c;部署jpress应用 3.1、Tomcat简介 3.2、安装Tomcat 3.2.1、配置环境 3.2.2、安装JDK 3.2.3、安装tomcat8 3.2.4、访问主页&#xff1…

go 如何知道一个对象是分配在栈上还是堆上?

如何判断变量是分配在栈&#xff08;stack&#xff09;上还是堆&#xff08;heap&#xff09;上&#xff1f; Go和C不同&#xff0c;Go局部变量会进行逃逸分析。如果变量离开作用域后没有被引用&#xff0c;则优先分配到栈上&#xff0c;否则分配到堆上。判断语句&#xff1a;…

数据可视化(4)散点图及面积图

1.简单散点图 #散点图 #scatter(x,y) x数据&#xff0c;y数据 x[i for i in range(10)] y[random.randint(1,10) for i in range(10)] plt.scatter(x,y) plt.show()2.散点图分析 #分析广告支出与销售收入相关性 dfcarpd.read_excel(广告支出.xlsx) dfdatapd.read_excel(销售…

1.3 eureka+ribbon,完成服务注册与调用,负载均衡源码追踪

本篇继先前发布的1.2 eureka注册中心&#xff0c;完成服务注册的内容。 目录 环境搭建 采用eurekaribbon的方式&#xff0c;对多个user服务发送请求&#xff0c;并实现负载均衡 负载均衡原理 负载均衡源码追踪 负载均衡策略 如何选择负载均衡策略&#xff1f; 饥饿加载…

抖音seo短视频账号矩阵系统技术开发简述

说明&#xff1a;本开发文档适用于抖音seo源码开发&#xff0c;抖音矩阵系统开发&#xff0c;短视频seo源码开发&#xff0c;短视频矩阵系统源码开发 一、 抖音seo短视频矩阵系统开发包括 抖音seo短视频账号矩阵系统的技术开发主要包括以下几个方面&#xff1a; 1.前端界面设…

使用文心一言等智能工具指数级提升嵌入式/物联网(M5Atom/ESP32)和机器人操作系统(ROS1/ROS2)学习研究和开发效率

以M5AtomS3为例&#xff0c;博客撰写效率提升10倍以上&#xff1a; 0. Linux环境Arduino IDE中配置ATOM S3_zhangrelay的博客-CSDN博客 1. M5ATOMS3基础01按键_zhangrelay的博客-CSDN博客 2. M5ATOMS3基础02传感器MPU6886_zhangrelay的博客-CSDN博客 3. M5ATOMS3基础03给RO…

【MySQL】表的增删查改

文章目录 一、创建表create二、查看表desc三、修改表3.1 修改表名alter3.2 在表中插入数据insert3.3 在表中新增字段alter3.4 修改指定列的属性alter3.5 移除表中的一列alter3.6 修改表中某一列的列名alter 四、删除表drop 一、创建表create mysql> create table if not ex…

Neo4j文档阅读笔记-Installation and Launch Guide

安装&#xff08;Windows&#xff09; ①找到下载好的Neo4j Desktop文件&#xff0c;然后双击进行安装&#xff1b; ②安装Neo4j Desktop根据下一步进行安装。 启动 ①激活 打开Neo4j Desktop应用程序后&#xff0c;将激活码输入到“Activation Key”窗口中。 ②创建数据库…

AMEYA:尼得科科宝滑动型DIP开关CVS产品参数及价格​

日本电产尼得科科宝滑动型DIP开关CVS采用紧凑设计&#xff0c;3bit产品&#xff0c;旋钮把手高度为0.2mm&#xff0c;操作性良好端子为1mm间距&#xff0c;电路数丰富(2,3,4,8)端接样式为鸥翼式&#xff0c;J形引线使用树脂材料符合UL认证94V-0 符合RoHS规范。 日本电产尼得科科…

Vol的学习

首先学习基础用法 1.查看系统基本信息 vol.py -f 路径 imageinfo 2.查看进程命令行 vol.py -f 路径 --profile系统版本 cmdline vol.py -f 路径 --profile版本 cmdscan 3.查看进程信息 vol.py -f 路径 --profile系统 pslist 通过树的方式返回 vol.py -f 路径 --profile系统…

postgis mvt矢量切片 django drf mapboxgl

postgis mvt矢量切片 django drf mapboxgl 0.前提 [1] 静态的矢量切片可以采用 tippecanoe 生成&#xff0c;nginx代理&#xff0c;这种数据是不更新的&#xff1b; [2] 动态的矢量切片&#xff0c;一般采用postgis生成。基本上矢量切片80%的厂商都采用postgis&#xff0c;确实…

【Docker】部署 mysql8.0 无法访问

文章目录 &#x1f5fd;先来说我的是什么情况&#x1fa81;问题描述&#x1fa81;解决方法&#xff1a;✔️1 重启iptables✔️2 重启docker &#x1fa81;其他有可能连不上的原因✔️1 客户端不支持caching_sha2_password的加密方式✔️2 my.conf 配置只有本机可以访问 &#…