FlinkCDC基础篇章2-数据源 SqlServerCDC写入到ES中

接着 上期FlinkCDC基础篇章1-安装使用  

  1. 下载 Flink 1.17.0 并将其解压至目录 flink-1.17.0

  2. 下载下面列出的依赖包,并将它们放到目录 flink-1.17.0/lib/ 下:

    下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译

    • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
    • flink-sql-connector-mysql-cdc-2.4.0.jar
    • flink-sql-connector-postgres-cdc-2.4.0.jar

首先,开启 checkpoint,每隔3秒做一次 checkpoint-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s
-- 创建源表t_source_sqlserver,使用SQL Server Change Data Capture (CDC)连接器从SQL Server数据库读取数据
CREATE TABLE t_source_sqlserver (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED -- 主键定义(可选)
) WITH ('connector' = 'sqlserver-cdc',  -- 使用SQL Server CDC连接器'hostname' = '10.194.183.120',  -- SQL Server主机名'port' = '30027',               -- SQL Server端口'username' = 'sa',              -- SQL Server用户名'password' = 'abc@123456',      -- SQL Server密码'database-name' = 'cdc_test',   -- 数据库名称'schema-name' = 'dbo',          -- 模式名称'table-name' = 'orders'         -- 要捕获更改的表名
);-- 创建目标表table_sink_mysql,使用JDBC连接器将数据写入MySQL数据库
CREATE TABLE table_sink_mysql (id INT,order_date DATE,purchaser INT,quantity INT,product_id INT,PRIMARY KEY (id) NOT ENFORCED  -- 主键定义(可选)
)
WITH ('connector' = 'jdbc',                        -- 使用JDBC连接器'url' = 'jdbc:mysql://10.194.183.120:30025/test',  -- MySQL的JDBC URL'username' = 'root',                        -- MySQL用户名'password' = 'root',                        -- MySQL密码'table-name' = 'orders'                     -- 要写入的MySQL表名
);-- 从t_source_sqlserver表中选择数据,并将其插入到table_sink_mysql表中
INSERT INTO table_sink_mysql SELECT * FROM t_source_sqlserver;
CREATE TABLE income_distribution (serviceCode STRING,accountPeriod STRING,subjectCode STRING,subjectName STRING,amt DECIMAL(13,2),PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://xxxx:9200','index' = 'income_distribution','sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');

可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:

参考文献:

使用 Flink CDC 构建 Streaming ETL | Apache Flink CDC

flink sqlserver cdc实时同步(含sqlserver安装配置等)_flink cdc sqlserver-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/833.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【分治】Leetcode 数组中的第K个最大元素

题目讲解 数组中的第K个最大元素 算法讲解 堆排序:1. 寻找最后一个节点的父亲,依次向上遍历,完成小堆的建立;2. 从最后一个元素开始,和堆顶的数据做交换,此时最小的数据在对后面,然后对剩下的…

部署轻量级Gitea替代GitLab进行版本控制(一)

Gitea 是一款使用 Golang 编写的可自运营的代码管理工具。 Gitea Official Website gitea: Gitea的首要目标是创建一个极易安装,运行非常快速,安装和使用体验良好的自建 Git 服务。我们采用Go作为后端语言,这使我们只要生成一个可执行程序即…

【React】Sigma.js框架网络图-入门篇

一、介绍 Sigma.js是一个专门用于图形绘制的JavaScript库。 它使在Web页面上发布网络变得容易,并允许开发人员将网络探索集成到丰富的Web应用程序中。 Sigma.js提供了许多内置功能,例如Canvas和WebGL渲染器或鼠标和触摸支持,以使用户在网页上…

Echarts-丝带图

Echarts-丝带图 demo地址 打开CodePen 什么是丝带图? 丝带图是Power BI中独有额可视化视觉对象,它的工具提示能展示指标当期与下期的数据以及排名。需求:使用丝带图展示"2022年点播订单表"不同月份不同点播套餐对应订单数据。 …

搭建HBase2.x完全分布式集群(CentOS 9 + Hadoop3.x)

Apache HBase™是一个分布式、可扩展、大数据存储的Hadoop数据库。 当我们需要对大数据进行随机、实时的读/写访问时,可以使用HBase。这个项目的目标是在通用硬件集群上托管非常大的表——数十亿行X数百万列。Apache HBase是一个开源、分布式、版本化的非关系数据库…

Ceph学习 -11.块存储RBD接口

文章目录 RBD接口1.基础知识1.1 基础知识1.2 简单实践1.3 小结 2.镜像管理2.1 基础知识2.2 简单实践2.3 小结 3.镜像实践3.1 基础知识3.2 简单实践3.3 小结 4.容量管理4.1 基础知识4.2 简单实践4.3 小结 5.快照管理5.1 基础知识5.2 简单实践5.3 小结 6.快照分层6.1 基础知识6.2…

微信小程序使用 Vant Weapp 中 Collapse 折叠面板 的问题!

需求:结合Tab 标签页 和 Collapse 折叠面板 组合成显示课本和章节内容,并且用户体验要好点! 如下图展示: 问题:如何使用Collapse 折叠面板 将内容循环展示出来? js中的数据是这样的 代码实现&#xff1…

Python | Leetcode Python题解之第39题组合总和

题目&#xff1a; 题解&#xff1a; from typing import Listclass Solution:def combinationSum(self, candidates: List[int], target: int) -> List[List[int]]:def dfs(candidates, begin, size, path, res, target):if target < 0:returnif target 0:res.append(p…

Stability AI 发布 SD3 API:开启人工智能新篇章

文章目录 1.Stable Diffusion 3 API开放了! 2.Stability AI Document地址3.获取API Key4.API方式调用SD3出图接口地址接口请求规范接口请求响应结果 5.Stable Diffusion 3.0、Stable Image Core、Fooocus 2.3.1、MidJounery效果查看 1.Stable Diffusion 3 API开放了! Stabilit…

react 响应式栅格布局

遇到一个小问题 , 有很多的下拉框放在了一行的盒子里 用到了栅格思路 , 但响应式处理屏幕时候右侧的按钮会覆盖掉样式 之前我的思路是子绝父相 , 将按钮定在最右侧 , 按钮和下拉框都在同一盒子中 , 且做了栅格处理没想到还是会覆盖解决 : 后来我用到了 margin-left: auto 来让…

蓝桥杯2024年第十五届省赛真题-宝石组合

思路&#xff1a;参考博客&#xff0c;对Ha,Hb,Hc分别进行质因数分解会发现&#xff0c;S其实就等于Ha&#xff0c;Hb&#xff0c;Hc的最大公约数&#xff0c;不严谨推导过程如下&#xff08;字丑勿喷&#xff09;&#xff1a; 找到此规律后&#xff0c;也不能枚举Ha&#xff…

OpenCV——Niblack局部阈值二值化方法

目录 一、Niblack算法1、算法概述2、参考文献二、代码实现三、结果展示OpenCV——Niblack局部阈值二值化方法由CSDN点云侠原创,爬虫自重。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫。 一、Niblack算法 1、算法概述 Niblack 算法是一种典型的局部阈值…

JavaWeb开发05-事务管理-AOP概述-AOP通知类型-通知顺序-切入点表达式-连接点-案例

一、事务管理 1.事务回顾 2.Spring事务管理 删除部门和删除部门下的员工应该绑定在一起&#xff0c;但是如果这两个操作之间出现错误&#xff0c;就会执行删除部门不删除员工&#xff0c;所以需要使用事务将两个任务绑定在一起&#xff0c;要么一起成功要么一起失败 Spring中如…

基础知识集合

https://blog.csdn.net/sheng_q/category_10901984.html?spm1001.2014.3001.5482 epoll 事件驱动的I/O模型&#xff0c;同时处理大量的文件描述符 内核与用户空间共享一个事件表&#xff1a;监控的文件描述符以它们的状态&#xff0c;当状态变化&#xff0c;内核将事件通知给…

Mac上Maven的安装和环境变量配置保姆级教程(最新版实时更新)

目录 一、Maven的安装 1.进入官网&#xff08;Maven官网&#xff09;下载安装包并解压 2.这里我使用了Homebrew安装Maven 安装Homebrew&#xff1a; 安装Maven&#xff1a; 二、Maven配置环境变量 1.打开环境变量文档&#xff1a; 2.在弹出文档结尾加入配置&#xff1a…

【Node.js】 fs模块全解析

&#x1f525;【Node.js】 fs模块全解析 &#x1f4e2; 引言 在Node.js开发中&#xff0c;fs模块犹如一把万能钥匙&#xff0c;解锁着整个文件系统的操作。从读取文件、写入文件、检查状态到目录管理&#xff0c;无所不能。接下来&#xff0c;我们将逐一揭开fs模块中最常用的那…

【HC32L110】华大低功耗单片机启动文件详解

本文主要记录华大低功耗单片机 HC32L110 的 汇编启动过程&#xff0c;包括startup_hc32l110启动文件详细注释 目录 1.启动文件的作用2.堆栈定义2.1 栈2.2堆 3.向量表4.复位程序5.中断服务程序6.堆栈初始化启动过程详解7.1从0地址开始7.2在Reset_Handler中干了啥&#xff1f; 8.…

PyTorch|保存及加载模型、nn.Sequential、ModuleList和ModuleDict

系列文章目录 PyTorch|Dataset与DataLoader使用、构建自定义数据集 PyTorch|搭建分类网络实例、nn.Module源码学习 pytorch|autograd使用、训练模型 文章目录 系列文章目录一、保存及加载模型&#xff08;一&#xff09;保存及加载模型的权重&#xff08;二&#xff09;保存及…

探究欧拉恒等式的美学与数学威力

正如老子所述&#xff0c;“道生一&#xff0c;一生二&#xff0c;二生三&#xff0c;三生万物”&#xff0c;数学作为人类认知自然法则的语言&#xff0c;其数系的不断发展象征着对世界理解的深化。从自然数经由分数、无理数至复数&#xff0c;复数虽看似反直觉&#xff0c;却…

MATLAB实现蚁群算法优化柔性车间调度(ACO-fjsp)

蚁群算法优化车间调度的步骤可以分为以下几个主要阶段&#xff1a; 1.初始化阶段&#xff1a; 设置算法参数&#xff0c;如信息素浓度、启发式因子等。这些参数将影响蚂蚁在选择路径时的决策过程。 确定车间调度的具体问题规模&#xff0c;包括工件数量、机器数量以及每个工件…