一文说清flink从编码到部署上线

引言:目前flink的文章比较多,但一般都关注某一特定方面,很少有一个文章,从一个简单的例子入手,说清楚从编码、构建、部署全流程是怎么样的。所以编写本文,自己做个记录备查同时跟大家分享一下。本文以简单的mysql cdc为例展开说明。
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

1.MySQL

1.1 创建数据库和测试数据

数据库脚本:

CREATE DATABASE `flinktest`;
USE `flinktest`;
CREATE TABLE `products` (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`description` varchar(512) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;insert  into `products`(`id`,`name`,`description`) values 
(1,'aaa','aaaa'),
(2,'ccc','ccc'),
(3,'dd','ddd'),
(4,'eeee','eee'),
(5,'ffff','ffff'),
(6,'hhhh','hhhh'),
(7,'iiii','iiii'),
(8,'jjjj','jjjj');

账号使用root就行。

1.2 开启binlog

参考:https://core815.blog.csdn.net/article/details/144233298
踩坑:测试过程中发现mysql 9.0一直无法获取更新的数据,最终使用的5.7。

2.编码

2.1 主要实现

package com.zl;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;import static com.mysql.cj.conf.PropertyKey.useSSL;public class MysqlExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flinktest.products");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.37.169").port(3306).databaseList("flinktest").tableList(String.join(",", SYNC_TABLES)).username("root").password("pwd").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();/// 配置flink访问页面-开始/* Configuration config = new Configuration();// 启用 Web UI,访问地址【http://ip:port】config.setBoolean("web.ui.enabled", true); config.setString(RestOptions.BIND_PORT,"8081");
//        这个使用jar直接运行可以,如果提交给yarn会报错,需要改为getExecutionEnvironment()StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);*////配置flink访问页面-结束StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/// 设置CK存储-开始(不需要可注释掉)// hadoop部署见:https://core815.blog.csdn.net/article/details/144022938// hdfs访问地址见:/home/hadoop-3.3.3/etc/hadoop/core-site.xmlenv.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000"+"/flinktest/");env.getCheckpointConfig().setCheckpointInterval(3000);/// 设置CK存储-结束// 如果不能正常读取mysql的binlog://①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}}

2.2 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zl.flinkcdc</groupId><artifactId>FlickCDC</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>FlickCDC</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink-version>1.14.0</flink-version><flink-cdc-version>2.4.0</flink-cdc-version><hadoop.version>3.0.0</hadoop.version><slf4j.version>1.7.25</slf4j.version><log4j.version>2.16.0</log4j.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>30.1.1-jre-15.0</version></dependency><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>18.0-13.0</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink-version}</version></dependency><!--        hadoop相关依赖--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><scope>provided</scope><exclusions><exclusion><artifactId>commons-cli</artifactId><groupId>commons-cli</groupId></exclusion><exclusion><artifactId>commons-compress</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>jackson-annotations</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>jackson-core</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>jackson-databind</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version><scope>provided</scope><exclusions><exclusion><artifactId>asm</artifactId><groupId>org.ow2.asm</groupId></exclusion><exclusion><artifactId>avro</artifactId><groupId>org.apache.avro</groupId></exclusion><exclusion><artifactId>commons-cli</artifactId><groupId>commons-cli</groupId></exclusion><exclusion><artifactId>commons-codec</artifactId><groupId>commons-codec</groupId></exclusion><exclusion><artifactId>commons-compress</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>commons-io</artifactId><groupId>commons-io</groupId></exclusion><exclusion><artifactId>commons-lang3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>commons-logging</artifactId><groupId>commons-logging</groupId></exclusion><exclusion><artifactId>commons-math3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>jackson-databind</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>jaxb-api</artifactId><groupId>javax.xml.bind</groupId></exclusion><exclusion><artifactId>log4j</artifactId><groupId>log4j</groupId></exclusion><exclusion><artifactId>nimbus-jose-jwt</artifactId><groupId>com.nimbusds</groupId></exclusion><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion><exclusion><artifactId>jsr305</artifactId><groupId>com.google.code.findbugs</groupId></exclusion><exclusion><artifactId>gson</artifactId><groupId>com.google.code.gson</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version><scope>provided</scope><exclusions><exclusion><artifactId>commons-cli</artifactId><groupId>commons-cli</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>jackson-databind</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion></exclusions></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><!--mvn install:install-file -Dfile=D:/maven/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar-DgroupId=org.apache.flink -DartifactId=flink-shaded-hadoop-3 -Dversion=3.1.1.7.2.9.0-173-9.0 -Dpackaging=jar--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-3</artifactId><version>3.1.1.7.2.9.0-173-9.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><addClasspath>true</addClasspath><mainClass>com.zl.MysqlExample</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

完整代码见:https://gitee.com/core815/flink-cdc-mysql

3.打包

mvn版本:3.5.4。
到pom.xml所在路径,执行“mvn package”
在这里插入图片描述
打包效果:
在这里插入图片描述

4.jar直接运行

java -jar FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5.flink yarn运行

hadoop、flink、yarn环境见:https://core815.blog.csdn.net/article/details/144022938

把FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar放到“/home”路径下。

执行下面命令:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

控制台看到如下打印:
在这里插入图片描述
yarn管理页面:
在这里插入图片描述
运行日志查看步骤:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下面即可看到完整日志:
在这里插入图片描述

6.常见问题

6.1 问题1

日志错误:
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决:
修改my.cnf文件。
[mysqld]
default-time-zone=‘Asia/Shanghai’
重启MySQL服务。

6.2 问题2:hdfs

日志错误:
Permission denied: user=PC2023, access=WRITE, inode=“/”:root:supergroup:drwxr-xr-x

解决:

临时解决
hadoop fs -chmod -R 777 /

6.3 问题3:guava30 guava18冲突

分析:
flink 1.13 cdc2.3的组合容易出这个问题。

解决:
参考:https://developer.aliyun.com/ask/574901
flink 使用1.14.0版本;cdc使用2.4.0版本。

6.4 问题4

日志错误:
/user/root/.flink/application_1733492706887_0002/log4j.properties could only be written to 0 of the 1 minReplication nodes

解决:
https://www.pianshen.com/article/1554707968/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63739.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在VMWare上安装openEuler 22.03-LTS

文章目录 1. openEluer 22.03-LTS概述2. 安装openEluer 22.03-LTS2.1 安装配置虚拟机2.2 安装openEuler2.3 配置虚拟机静态IP地址 3. 使用yum和dnf3.1 使用dnf安装软件包3.2 使用dnf卸载软件包3.3 使用yum安装软件包3.4 使用yum卸载软件包 4. 利用FinalShell连接Server015. Bas…

短信验证码burp姿势

首先声明&#xff0c;本文仅仅作为学习使用&#xff0c;因个人原因导致的后果&#xff0c;皆有个人承担&#xff0c;本人没有任何责任。 在之前的burp学习中&#xff0c;我们学习了图片验证码的突破&#xff0c;但是现实中还有很多短信验证码&#xff0c;在此我介绍几种短信验…

【软件测试面试题】测试理论/基础面试(持续更新)

Hi&#xff0c;大家好。最近很多朋友都在说今年的互联网行情不好&#xff0c;面试很难&#xff0c;不知道怎么复习&#xff0c;我最近总结了一份在软件测试面试中比较常见的测试理论/基础面试面试题合集&#xff0c;希望对大家有帮助。建议点赞收藏再阅读&#xff0c;防止丢失&…

在 Windows WSL 上部署 Ollama 和大语言模型:从镜像冗余问题看 Docker 最佳实践20241208

&#x1f6e0;️ 在 Windows WSL 上部署 Ollama 和大语言模型&#xff1a;从镜像冗余问题看 Docker 最佳实践 ⭐ 引言 随着大语言模型&#xff08;LLM&#xff09;和人工智能技术的迅猛发展&#xff0c;开发者们越来越多地尝试在本地环境中部署模型进行实验。 但部署过程中常…

ContOS7安装完成后的一系列问题

问题1&#xff1a;constos7,安装完后&#xff0c;无法使用yum,导致无法安装任何东西&#xff0c; 解决&#xff1a;原因是国外的镜像地址有问题&#xff0c;改为国内的地址。 查看yum的状态 yum repolist 2.执行如下命令&#xff0c;更换yum源 bash <(curl -sSL https://…

使用JavaScrip和HTML搭建一个简单的博客网站系统

搭建一个简单的博客网站系统&#xff0c;我们需要创建几个基本的页面和功能&#xff1a;登录、注册、文章发布等。这里我们先实现一个基础版本&#xff0c;包括用户登录、注册以及文章发布的功能。由于这是一个简化版的示例&#xff0c;我们将所有逻辑集成在一个HTML文件中&…

uniapp扭蛋机组件

做了一个uniapp的扭蛋机组件&#xff0c;可以前往下载地址下载 仅测试了vue2、3、h5页面微信小程序&#xff0c;理论支持全平台 使用方法简单&#xff0c;具有待机动效、抽奖中动效、掉落奖品动效&#xff0c;可以替换奖品图片&#xff0c;足以满足大部分抽奖页面需求。 示例图…

C#实现一个HttpClient集成通义千问-开发前准备

集成一个在线大模型&#xff08;如通义千问&#xff09;&#xff0c;来开发一个chat对话类型的ai应用&#xff0c;我需要先了解OpenAI的API文档&#xff0c;请求和返回的参数都是以相关接口文档的标准进行的 相关文档 OpenAI API文档 https://platform.openai.com/docs/api-…

用JavaScript实现一个贪吃蛇游戏

原理如下&#xff0c;贪吃蛇的蛇身就是一个数组&#xff0c;数组中的每个元素都是一个坐标&#xff0c;蛇身每次移动时都会在数组前插入一个新坐标&#xff0c;并在数组尾部删掉一条记录&#xff0c;吃到食物后数组的尾部记录就不删。如果移到屏幕边缘会从屏幕的另一边出现。好…

红日靶场vulnstack 4靶机的测试报告[细节](一)

目录 一、测试环境 1、系统环境 2、注意事项 3、使用工具/软件 二、测试目的 三、操作过程 1、信息搜集 2、漏洞利用Getshell ①Struts 2 s2-045漏洞 手工利用s2-45漏洞 Msf综合利用 ②Tomcat框架(CVE-2017-12615) ③phpMyAdmin(CVE-2018-12613) 构造语句写入冰蝎木…

electron 打包 webview 嵌入需要调用电脑摄像头拍摄失败问题

electron 打包 webview 嵌入需要调用电脑摄像头拍摄失败问题 这篇文章是接我cocos专栏的上一篇文章继续写的&#xff0c;我上一篇文章写的是 cocos 开发触摸屏项目&#xff0c;需要嵌入一个网页用来展示&#xff0c;最后通过 electron 打包成 exe 程序&#xff0c;而且网页里面…

Android 开发者选项-模拟辅助显示设备

目录 概述使用开关的代码实现方式系统部分的处理:参考 概述 在Android开发中&#xff0c;模拟辅助显示设备通常指的是通过Android开发者选项来设置的一种虚拟显示设备&#xff0c;它允许开发者在一个设备上模拟另一个设备的显示特性。这种功能对于测试应用程序在不同屏幕尺寸、…

[COLM 2024] V-STaR: Training Verifiers for Self-Taught Reasoners

本文是对 STaR 的改进方法&#xff0c;COLM 是 Conference On Language Models&#xff0c;大模型领域新出的会议&#xff0c;在国际上很知名&#xff0c;不过目前还没有被列入 ccf list&#xff08;新会议一般不会列入&#xff09;&#xff1b;作者来自高校、微软研究院和 Goo…

Spann3R:基于DUSt3R的密集捕获数据增量式重建方法

来自作者Hengyi Wang在b站3D视觉工坊中对于该论文透彻的讲解&#xff0c;这里是相关重要部分的截屏。这篇博客的用途主要是自己做记录&#xff0c;其次分享给感兴趣的同学&#xff0c;最后谢谢作者大佬的认真讲解。 作者是按照这样的次序来介绍的&#xff1a; 首先从传统的三…

SAP-ABAP开发学习-面向对象OOALV(1)

本文目录 一、概述 面向对象开发特点 二、类与对象 程序中类的创建 Class构成要素 对象 方法 一、概述 随着SAP R/3 4.0版本的开发&#xff0c;ABAP语言开始引入了面向对象的开发概念。这在ABAP语言的发展过程中&#xff0c;面向对象&#xff08;Object-oriented&#…

【实用技能】如何在 .NET C# 中的邮件合并过程中操作表格单元格

TX Text Control 中的邮件合并 类是一个强大的库&#xff0c;旨在通过将数据合并到模板中来自动创建文档。它充当结构化数据&#xff08;例如来自数据库、JSON 或 XML&#xff09;和动态文档生成之间的桥梁&#xff0c;对于需要自动化文档工作流程的应用程序来说非常有用。 TX…

有源模拟滤波器的快速设计

本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时&#xff0c;也能帮助其他需要参考的朋友。如有谬误&#xff0c;欢迎大家进行指正。 一、概述 几乎所有电子电路中都能看到有源模拟滤波器的身影。音频系统使用滤波器进行频带限制和平衡。通信系统设计使用滤波…

使用OpenTK展示3D点云图像(C#)

最近在研究3D显示&#xff0c;找到一款在winform上展示3D点云的控件&#xff0c;并且实现了点线面的展示&#xff0c;及光照渲染纹理贴图等功能&#xff0c;如下面几张图所展示。 一些基础知识可以在LearnOpenTK - OpenTK 这个网站上学习到。 我这边使用的是openTK3.3.3版本&a…

【笔记】架构上篇Day6 法则四:为什么要顺应技术的生命周期?

法则四&#xff1a;为什么要顺应技术的生命周期&#xff1f; 简介&#xff1a;包含模块一 架构师的六大生存法则-法则四&#xff1a;为什么要顺应技术的生命周期&#xff1f;&法则四&#xff1a;架构设计中怎么判断和利用技术趋势&#xff1f; 2024-08-29 17:30:07 你好&am…

【Sentinel Go】新手指南、流量控制、熔断降级和并发隔离控制

随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式、多语言异构化服务架构的流量治理组件&#xff0c;主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开…