Flink读取mysql数据库(java)

代码如下:

package com.weilanaoli.ruge.vlink.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.util.logging.Level;
import java.util.logging.Logger;class MysqlExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("xx.xx.xx.xx")    //输入地址.port(3306)                 //输入端口.databaseList("xx")         //输入库名.tableList("xx.test")       //输入表名.username("xx")             //输入用户名.password("xxxx")           //输入密码.startupOptions(StartupOptions.initial())  //读取binlog策略,这个启动选项有五种.deserializer(new JsonDebeziumDeserializationSchema()) //配置不要锁表,但是数据一致性不是精准一次,会变成最少一次.build();//配置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");SingleOutputStreamOperator<Object> singleOutputStreamOperator = dataStreamSource.process(new ProcessFunction<String, Object>() {@Overridepublic void processElement(String value, ProcessFunction<String, Object>.Context ctx, Collector<Object> out) {try {System.out.println("processElement=====" + value);}catch (Exception e) {e.printStackTrace();}}});dataStreamSource.print("原始数据=====");env.execute("Print MySQL Snapshot + Binlog");}
}

运行结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19957.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Excel修改日期格式,改变日期的筛选方式

我们有两列日期数据&#xff1a; 左边这一列筛选会显示&#xff1a; 右边这一列筛选会显示&#xff1a; 修改格式&#xff0c;将【日期1】改为【日期2】 将【日期1】的格式修改为文本格式即可 修改格式&#xff0c;将【日期2】改为【日期1】 选中日期2&#xff0c;点击【数据…

JDK各版本重要变革

各版本更新详情 JDK8(LTS)--2014/3 语法层面 lambda表达式(重要特色之一) 一种特殊的匿名内部类,语法更加简洁允许把函数作为一个方法的参数,将代码象数据一样传递&#xff0c;即将函数作为方法参数传递基本语法: <函数式接口> <变量名> (参数...) -> { 方法…

什么是Java中的JVMTI(JVM Tool Interface)?

Java中的JNI&#xff08;Java Native Interface&#xff09;和JVMTI&#xff08;JVM Tool Interface&#xff09;都是与Java运行时环境&#xff08;JVM&#xff09;交互的工具&#xff0c;但它们有不同的目的和使用场景。下面我从新手的角度来幽默地解释一下它们的区别和用途。…

LeetCode 热题 100 JavaScript --226. 翻转二叉树

给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 示例 3&#xff1a; 输入&#xff1a;root [] 输出&#xff1a;[] 提示&#xff1a; 树中节点数目范围在 [0, 100] 内 -100 < Node.val < 100 var invertTree function(root…

哈工大计算机网络课程网络安全基本原理详解之:消息完整性与数字签名

哈工大计算机网络课程网络安全基本原理详解之&#xff1a;消息完整性与数字签名 这一小节&#xff0c;我们继续介绍网络完全中的另一个重要内容&#xff0c;就是消息完整性&#xff0c;也为后面的数字签名打下基础。 报文完整性 首先来看一下什么是报文完整性。 报文完整性…

基于springboot+jpa+mysql+html网上中药商城系统

基于springbootjpamysqlhtml网上中药商城系统 一、系统介绍二、功能展示1.主页(客户)2.登陆&#xff08;客户&#xff09;3.注册&#xff08;客户&#xff09;4.购物车(客户)5.我的订单&#xff08;客户&#xff09;6.用户管理&#xff08;管理员&#xff09;7.分类管理&#x…

代理模式是什么

目录 代理模式 代理模式的组成 代理模式的作用 静态代理 静态代理实现步骤: 静态代理的缺点 动态代理 动态代理的实现 JDK 动态代理&#xff08;接口代理&#xff09; jdk动态代理核心 JDK 动态代理类实现步骤: CGLIB动态代理 CGLIB动态代理的核心 CGLIB 动态代理…

远程连接身份验证错误,又找不到加密Oracle修正

一、问题描述 远程连接服务器出现了错误&#xff0c;错误信息为&#xff1a;远程连接身份验证错误&#xff0c;又找不到加密Oracle修正。 二、原因分析 出错原因&#xff1a;Windows的CVE-2018-0886 的 CredSSP 更新将CredSSP 身份验证协议默认设置成了“缓解”&#xff0c;…

Informer 论文学习笔记

论文&#xff1a;《Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting》 代码&#xff1a;https://github.com/zhouhaoyi/Informer2020 地址&#xff1a;https://arxiv.org/abs/2012.07436v3 特点&#xff1a; 实现时间与空间复杂度为 O ( …

轻松批量文件改名!一键翻译重命名文件夹/文件,省时高效!」

繁忙的数字时代&#xff0c;我们经常需要处理大量的文件和文件夹。而手动逐个更改文件名不仅费时费力&#xff0c;还容易出错。因此&#xff0c;我们为您带来了一款强大的工具——批量文件改名软件&#xff01;现在&#xff0c;您可以一键翻译重命名文件夹和文件&#xff0c;轻…

在centos7.9安装tomcat8,并配置服务启动脚本,部署jpress应用

目录 一、简述静态网页和动态网页的区别 二、简述 Webl.0 和 Web2.0 的区别 三、 安装Tomcat8&#xff0c;配置服务启动脚本&#xff0c;部署jpress应用 3.1、Tomcat简介 3.2、安装Tomcat 3.2.1、配置环境 3.2.2、安装JDK 3.2.3、安装tomcat8 3.2.4、访问主页&#xff1…

go 如何知道一个对象是分配在栈上还是堆上?

如何判断变量是分配在栈&#xff08;stack&#xff09;上还是堆&#xff08;heap&#xff09;上&#xff1f; Go和C不同&#xff0c;Go局部变量会进行逃逸分析。如果变量离开作用域后没有被引用&#xff0c;则优先分配到栈上&#xff0c;否则分配到堆上。判断语句&#xff1a;…

数据可视化(4)散点图及面积图

1.简单散点图 #散点图 #scatter(x,y) x数据&#xff0c;y数据 x[i for i in range(10)] y[random.randint(1,10) for i in range(10)] plt.scatter(x,y) plt.show()2.散点图分析 #分析广告支出与销售收入相关性 dfcarpd.read_excel(广告支出.xlsx) dfdatapd.read_excel(销售…

1.3 eureka+ribbon,完成服务注册与调用,负载均衡源码追踪

本篇继先前发布的1.2 eureka注册中心&#xff0c;完成服务注册的内容。 目录 环境搭建 采用eurekaribbon的方式&#xff0c;对多个user服务发送请求&#xff0c;并实现负载均衡 负载均衡原理 负载均衡源码追踪 负载均衡策略 如何选择负载均衡策略&#xff1f; 饥饿加载…

怎么在ros中怎么在一个节点总把一个矩阵保存为一个参数到参数管理器中,然后在另一个节点中读取这个参数

在ROS中,可以通过Parameter Server在节点之间共享参数。要在一个节点中保存矩阵作为参数,可以使用set_param()函数: python import rospy import numpy as npmatrix np.array([[1, 2], [3, 4]]) rospy.set_param("/matrix_param", matrix.tolist())这里我们把NumPy矩…

抖音seo短视频账号矩阵系统技术开发简述

说明&#xff1a;本开发文档适用于抖音seo源码开发&#xff0c;抖音矩阵系统开发&#xff0c;短视频seo源码开发&#xff0c;短视频矩阵系统源码开发 一、 抖音seo短视频矩阵系统开发包括 抖音seo短视频账号矩阵系统的技术开发主要包括以下几个方面&#xff1a; 1.前端界面设…

unity连接MySQL数据库并完成增删改查

数据存储量比较大时&#xff0c;我就需要将数据存储在数据库中方便使用&#xff0c;尤其是制作管理系统时&#xff0c;它的用处就更大了。 在编写程序前&#xff0c;需要在Assets文件夹中创建plugins文件&#xff0c;将.dll文件导入&#xff0c;文件从百度网盘自取&#xff1a;…

使用文心一言等智能工具指数级提升嵌入式/物联网(M5Atom/ESP32)和机器人操作系统(ROS1/ROS2)学习研究和开发效率

以M5AtomS3为例&#xff0c;博客撰写效率提升10倍以上&#xff1a; 0. Linux环境Arduino IDE中配置ATOM S3_zhangrelay的博客-CSDN博客 1. M5ATOMS3基础01按键_zhangrelay的博客-CSDN博客 2. M5ATOMS3基础02传感器MPU6886_zhangrelay的博客-CSDN博客 3. M5ATOMS3基础03给RO…

【MySQL】表的增删查改

文章目录 一、创建表create二、查看表desc三、修改表3.1 修改表名alter3.2 在表中插入数据insert3.3 在表中新增字段alter3.4 修改指定列的属性alter3.5 移除表中的一列alter3.6 修改表中某一列的列名alter 四、删除表drop 一、创建表create mysql> create table if not ex…

Neo4j文档阅读笔记-Installation and Launch Guide

安装&#xff08;Windows&#xff09; ①找到下载好的Neo4j Desktop文件&#xff0c;然后双击进行安装&#xff1b; ②安装Neo4j Desktop根据下一步进行安装。 启动 ①激活 打开Neo4j Desktop应用程序后&#xff0c;将激活码输入到“Activation Key”窗口中。 ②创建数据库…