flink mysql数据表同步API CDC

概述:

CDC简介 Change Data Capture

API CDC同步数据代码

package com.yclxiao.flinkcdcdemo.api;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.yclxiao.flinkcdcdemo.util.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;/*** league.oc_settle_profit -> cloud.dws_profit_record_hdj* API方式*/
public class Wfg2userApi {private static final Logger LOG = LoggerFactory.getLogger(Wfg2userApi.class);private static String MYSQL_HOST = "192.168.1.12";private static int MYSQL_PORT = 3306;private static String MYSQL_USER = "root";private static String MYSQL_PASSWD = "123456";private static String SYNC_DB = "zentao";private static List<String> SYNC_TABLES = Arrays.asList("zentao.zt_group");public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(MYSQL_HOST).port(MYSQL_PORT).databaseList(SYNC_DB) // set captured database.tableList(String.join(",", SYNC_TABLES)) // set captured table.username(MYSQL_USER).password(MYSQL_PASSWD).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);env.enableCheckpointing(5000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + Wfg2userApi.class.getName());List<String> tableList = getTableList();for (String tbl : tableList) {SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);SingleOutputStreamOperator<String> cleanStream = clean(filterStream);SingleOutputStreamOperator<String> logicStream = logic(cleanStream);logicStream.addSink(new CustomDealDataSink());}env.execute(Wfg2userApi.class.getName());}private static class CustomDealDataSink extends RichSinkFunction<String> {private transient Connection cloudConnection;private transient PreparedStatement cloudPreparedStatement;private String insertSql = "INSERT INTO `zentao_zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) \n" +"      VALUES (?, ?, ?, ?, ?, ?, ?, ?)";private String deleteSql = "delete from zentao_zt_group where id = '%s'";@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 在这里初始化 JDBC 连接cloudConnection = DriverManager.getConnection("jdbc:mysql://" + MYSQL_HOST + ":3306/wfg", "root", "123456");cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);}@Overridepublic void invoke(String value, Context context) throws Exception {JSONObject dataJson = JSON.parseObject(value);Long id = dataJson.getLong("id");Integer project = dataJson.getInteger("project");String vision = dataJson.getString("vision");String name = dataJson.getString("name");String role = dataJson.getString("role");String desc = dataJson.getString("desc");String acl = dataJson.getString("acl");Integer developer = dataJson.getInteger("developer");cloudPreparedStatement.setLong(1, id);cloudPreparedStatement.setInt(2, project);cloudPreparedStatement.setString(3, vision);cloudPreparedStatement.setString(4, name);cloudPreparedStatement.setString(5, role);cloudPreparedStatement.setString(6, desc);cloudPreparedStatement.setString(7, acl);cloudPreparedStatement.setInt(8, developer);cloudPreparedStatement.execute(String.format(deleteSql, id));cloudPreparedStatement.execute();}@Overridepublic void close() throws Exception {super.close();// 在这里关闭 JDBC 连接cloudPreparedStatement.close();cloudConnection.close();}}/*** 处理逻辑:过滤掉部分数据** @param cleanStream* @return*/private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {return cleanStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String data) throws Exception {try {
//                    JSONObject dataJson = JSON.parseObject(data);
//                    String id = dataJson.getString("id");
//                    Integer bizType = dataJson.getInteger("biz_type");
//                    if (StringUtils.isBlank(id) || bizType == null) {
//                        return false;
//                    }// 只处理上岗卡数据
//                    return bizType == 9;return true;} catch (Exception ex) {LOG.warn("filter other format binlog:{}", data);return false;}}});}/*** 清晰数据** @param source* @return*/private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {return source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String row, Collector<String> out) throws Exception {try {LOG.info("============================row:{}", row);JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");//history,insert,updateif (Arrays.asList("r", "c", "u").contains(op)) {out.collect(rowJson.getJSONObject("after").toJSONString());} else {LOG.info("filter other op:{}", op);}} catch (Exception ex) {LOG.warn("filter other format binlog:{}", row);}}});}/*** 过滤数据** @param source* @param table* @return*/private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {return source.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String row) throws Exception {try {JSONObject rowJson = JSON.parseObject(row);JSONObject source = rowJson.getJSONObject("source");String tbl = source.getString("table");return table.equals(tbl);} catch (Exception ex) {ex.printStackTrace();return false;}}});}private static List<String> getTableList() {List<String> tables = new ArrayList<>();String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);for (JSONObject jsob : tableList) {String schemaName = jsob.getString("TABLE_SCHEMA");String tblName = jsob.getString("TABLE_NAME");String schemaTbl = schemaName + "." + tblName;if (SYNC_TABLES.contains(schemaTbl)) {tables.add(tblName);}}return tables;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/831898.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python学习笔记------json

json简介 JSON是一种轻量级的数据交互格式。可以按照JSON指定的格式去组织和封装数据 JSON本质上是一个带有特定格式的字符串 主要功能&#xff1a;json就是一种在各个编程语言中流通的数据格式&#xff0c;负责不同编程语言中的数据传递和交互 为了让不同的语言能够相互通…

【12575嵌入式系统】嵌入式系统考前复习题终结版期末复习自考复习

一、 填空题 1. 嵌入式系统是以应用为中心&#xff0c;以计算机技术为基础&#xff0c;软件硬件可裁剪&#xff0c;适应应用系 统 对 功 能 、 可 靠 性 、 成 本 、 体 积 、 功 耗 严 格 要 求 的 专 用 计 算 机 系统。 2. 请 列 举 2 个 嵌 入 式 实 时 操 作 系 统 &…

Python实战开发及案例分析(2)——单目标优化

在Python中&#xff0c;进行单目标优化主要涉及定义一个优化问题&#xff0c;包括一个目标函数和可能的约束条件&#xff0c;然后选择合适的算法来求解。Python提供了多种库&#xff0c;如SciPy、Pyomo、GEKKO等&#xff0c;用于处理各种优化问题。 案例分析&#xff1a;使用 …

《LTC与铁三角∶从线索到回款-人民邮电》关于铁三角不错的论述

《LTC与铁三角∶从线索到回款-人民邮电》一书中&#xff0c;关于铁三角不错的论述&#xff0c;收藏之&#xff1a;客户责任人的角色定义及核心价值 AR 的核心价值定位主要体现在三个方面&#xff1a;客户关系、 客户满意度、竞争对手 “ 压制 ” 。 维护客户关系&#x…

zabbix之system.run

Zabbix的system.run是一个主动模式的监控项,它允许用户在Zabbix服务器上执行自定义的命令或脚本,并获取执行结果作为监控数据。 system.run监控项的配置包括以下几个关键参数: 命令:要执行的命令或脚本。可以是任何可执行的命令,包括系统命令、脚本文件等。 参数:命令的…

百川2模型解读

简介 Baichuan 2是多语言大模型&#xff0c;目前开源了70亿和130亿参数规模的模型。在公开基准如MMLU、CMMLU、GSM8K和HumanEval上的评测&#xff0c;Baichuan 2达到或超过了其他同类开源模型&#xff0c;并在医学和法律等垂直领域表现优异。此外&#xff0c;官方还发布所有预…

[数据结构]————排序总结——插入排序(直接排序和希尔排序)—选择排序(选择排序和堆排序)-交换排序(冒泡排序和快速排序)—归并排序(归并排序)

文章涉及具体代码gitee&#xff1a; 登录 - Gitee.com 目录 1.插入排序 1.直接插入排序 总结 2.希尔排序 总结 2.选择排序 1.选择排序 ​编辑 总结 2.堆排序 总结 3.交换排序 1.冒泡排序 总结 2.快速排序 总结 4.归并排序 总结 5.总的分析总结 1.插入排…

Visual Studio 2022 工具 选项 没有网络设置问题解决

Visual Studio 2022 工具 选项 没有网络选项了&#xff0c;找了一大圈也没找到。 最后发现Visual Studio 2022的直接使用系统的代理设置了&#xff0c;在浏览器的代理中设置即可。 要使用扩展管理器安装插件&#xff0c;还不能设置pac !!! 顺便记录个pac地址&#xff1a; 1…

Unity---版本控制软件

13.3 版本控制——Git-1_哔哩哔哩_bilibili Git用的比较多 Git 常用Linux命令 pwd&#xff1a;显示当前所在路径 ls&#xff1a;显示当前路径下的所有文件 tab键自动补全 cd&#xff1a;切换路径 mkdir&#xff1a;在当前路径下创建一个文件夹 clear&#xff1a;清屏 vim…

QT, 加载qss文件应用样式

qt 应用中&#xff0c;加载qss文件主要是为了集中管理样式&#xff0c;使用QApplication::setStyleSheet(qssStr) 即可将样式应用到程序中&#xff0c;qss文件中&#xff0c;既可以定义全局样式&#xff0c;也可以针对某些类&#xff0c;甚至某个对象设置样式。关键代码如下&am…

考研就业选择规划【攻略】

考研就业选择规划【攻略】 前言版权推荐考研就业选择首先多问考研就业优势对比我的选择补充 最后 前言 2024-5-5 10:00:02 对于考研就业选择&#xff0c;我的一些看法。 这只是我现在的看法&#xff0c;我不认为现在的我能够看得多远&#xff0c;所以可能局限于现在。 一个…

Linux的socket详解

一、本机直接的进程通信方式 管道&#xff08;Pipes&#xff09;&#xff1a; 匿名管道&#xff08;Anonymous pipes&#xff09;&#xff1a;通常用于父子进程间的通信&#xff0c;它是单向的。命名管道&#xff08;Named pipes&#xff0c;也称FIFO&#xff09;&#xff1a;允…

微星主板安装双系统不能进入Ubuntu的解决办法

在微星主板的台式机上面依次安装了Windows11和Ubuntu22.04。在Ubuntu安装完成后重启&#xff0c;没有出现系统选择界面&#xff0c;直接进入了Windows11。怎么解决&#xff1f;方法如下&#xff1a; &#xff08;1&#xff09;正常安装Windows11 &#xff08;2&#xff09;安…

《自动机理论、语言和计算导论》阅读笔记:p352-P401

《自动机理论、语言和计算导论》学习第 12 天&#xff0c;p352-P401总结&#xff0c;总计 50 页。 一、技术总结 1.Turing Machine ™ 2.undecidability ​ a.Ld(the diagonalization language) 3.reduction p392, In general, if we have an algorithm to convert insta…

GpuMall的GPU算力资源池化技术有何优势?

GpuMall的GPU算力资源池化技术具有显著的优势&#xff0c;这些优势使得其在智算云领域脱颖而出&#xff0c;为用户提供了高效、灵活且可靠的GPU算力服务。以下是GpuMall GPU算力资源池化技术的主要优势&#xff1a; GpuMall智算云 | 省钱、好用、弹性。租GPU就上GpuMall,面向A…

面向未来:等保测评与持续改进

随着信息技术的不断进步和网络环境的日益复杂&#xff0c;网络安全正面临着前所未有的挑战。等保测评作为提升网络安全管理水平的重要手段&#xff0c;不仅需要应对当前的安全威胁&#xff0c;更应着眼于未来的安全需求。本文将探讨等保测评在持续改进中的角色和实施策略。 ##…

Git系列:config 配置

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

Java中的枚举类型介绍

一、背景及定义 情景&#xff1a; 枚举是在JDK1.5以后引入的。 主要用途是&#xff1a; 将一组常量组织起来&#xff0c;在这之前表示一组常量通常使用定义常量的方式&#xff1a; 这种定义方式实际上并不好。 例如&#xff1a;如果碰巧有另一个变量也是1&#xff0c;那么…

FIFO Generate IP核使用——AXI接口信号详解

在AXI协议中&#xff0c;提到的s_axis_tready、s_axi_awready、s_axi_wready、m_axi_bready、s_axi_arready和m_axi_rready是AXI接口中用于指示通道就绪状态的信号。这些信号的值通常表示主设备&#xff08;Master&#xff09;或从设备&#xff08;Slave&#xff09;是否准备好…

笔记85:如何计算递归算法的“时间复杂度”和空间复杂度?

先上公式&#xff1a; 递归算法的时间复杂度 递归次数 x 每次递归消耗的时间颗粒数递归算法的空间复杂度 递归深度 x 每次递归消耗的内存空间大小 注意&#xff1a; 时间复杂度指的是在执行这一段程序的时候&#xff0c;所花费的全部的时间&#xff0c;即时间的总和而空间复…