wsl内置Ubuntu使用 Dinky 与 Flink 集成

Dinky 与 Flink 集成

说明

本文档介绍 Dinky 与 Flink 集成的使用方法,

如果您是 Dinky 的新用户, 请先阅读 本文档, 以便更好的搭建 Dinky 环境

如果您已经熟悉 Dinky 并已经部署了 Dinky, 请跳过本文档的前置要求部分, 直接阅读 Dinky 与 Flink 集成部分

注意: 本文档基于 Dinky 1.0.0+ 版本编写, 请确保 Dinky 版本 >= 1.0.0

前置要求

  • JDK 1.8/11
  • Dinky 1.0.0+
  • MySQL 5.7+
  • Flink 1.14+(Dinky v1.0.0 支持 Flink 1.14+ 及以上版本)

Flink 环境准备

本案例以 Flink 1.18.0 模式采用 Standalone 模式为例, 请根据实际情况自行选择部署模式,各个模式的部署方案自行参考 Flink 官方文档/百度/谷歌/必应…

下载 Flink

wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz

解压 Flink

tar -zxvf flink-1.18.0-bin-scala_2.12.tgzmv flink-1.18.0 flink

配置环境变量

vim ~/.bashrc# 末尾加入以下内容
export FLINK_HOME=/opt/flink
export PATH=$PATH:$FLINK_HOME/bin# 使环境变量生效
source ~/.bashrc

启动 Flink

cd flink./bin/start-cluster.sh

验证页面访问: http://ip:8081

我使用的是wsl的ubuntu系统。

wsl端口号

http://172.26.217.146:8081

172.26.217.146:8081

需要修改flink的conf bind.adress的配置,为0.0.0.0,才可以从外部服务访问

http://172.26.217.146:8081/#/overview

Dinky 环境准备

下载 Dinky

wget https://github.com/DataLinkDC/dinky/releases/download/v1.0.0-rc2/dinky-release-1.0.0-rc2.tar.gz

解压 Dinky

tar -zxvf dinky-release-1.0.0-rc2.tar.gzmv dinky-release-1.0.0-rc2 dinky

配置数据库

本案例以 MySQL 为例, 支持 MySQL 5.7+, PostgreSQL , 或者直接使用内置 H2 数据库,请根据实际情况自行选择数据库,各个数据库的部署方案自行参考官方文档/百度/谷歌/必应…

如果选择使用 H2 数据库, 请跳过本节 ,MySQL 安装步骤在这里不再赘述, 请自行百度/谷歌/必应…

安装完成之后 创建 dinky 数据库, 并设置账户密码,也可使用默认账户密码, 创建完成数据库之后需要执行初始化 sql 脚本, 脚本路径为 dinky/sql/dinky-mysql.sql

修改 conf/application-mysql.yml 文件, 修改数据库连接信息

spring:datasource:url: jdbc:mysql://${MYSQL_ADDR:127.0.0.1:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueusername: ${MYSQL_USERNAME:dinky}password: ${MYSQL_PASSWORD:dinky}driver-class-name: com.mysql.cj.jdbc.Driver

修改 conf/application.yml 文件, 修改数据库连接使用方式

spring:# Dinky application nameapplication:name: Dinkyprofiles:# The h2 database is used by default. If you need to use other databases, please set the configuration active to: mysql, currently supports [mysql, pgsql, h2]# If you use mysql database, please configure mysql database connection information in application-mysql.yml# If you use pgsql database, please configure pgsql database connection information in application-pgsql.yml# If you use the h2 database, please configure the h2 database connection information in application-h2.yml,# note: the h2 database is only for experience use, and the related data that has been created cannot be migrated, please use it with cautionactive: mysql #[h2,mysql,pgsql]  修改此处,默认为 h2, 修改为 mysqlinclude: jmx

Dinky 与 Flink 集成

注意

修改完数据库连接配置不要着急启动,接下来 Dinky 与 Flink 集成

  1. 将 Flink 的 lib 目录下的 所有 jar 包复制到 Dinky 的 extends 目录下
cp -r /opt/flink/lib/* /opt/dinky/extends/
  1. 添加/修改一些额外的依赖
cd /opt/dinky/extends/
# 添加 common-cli 依赖, 否则会出现异常
wget https://repo1.maven.org/maven2/commons-cli/commons-cli/1.6.0/commons-cli-1.6.0.jar# 为什么要下载这个 jar 包, 因为内部有些冲突的已经被删除掉了, 
# 注意: 如果无需 hadoop 环境, 可以不下载这个 jar 包, 但是如果需要 hadoop 环境, 必须下载这个 jar 包
# 下载 Dinky 群公告 内的 flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar# 将 flink-table-planner-loader 包换成 flink-table-planner 包
# 先删除 flink-table-planner-loader 包
rm -rf /opt/dinky/extends/flink-table-planner-loader-1.18.0.jar
rm -rf /opt/flink/lib/flink-table-planner-loader-1.18.0.jar
# 将 flink-table-planner 包复制到 extends 目录下和 flink lib 目录下
cp /opt/flink/opt/flink-table-planner_2.12-1.18.0.jar /opt/flink/lib/
cp /opt/flink/opt/flink-table-planner_2.12-1.18.0.jar /opt/dinky/extends/

注意

以上依赖修改完成之后, 需要重启 Flink

启动 Dinky

注意

当你阅读到这里的时候, 请确保你已经完成了上述的所有步骤, Flink 已经启动, 并可以正常访问. 请先不要添加其他连接器依赖

下述的命令中 1.18 代表 Flink 版本, 请根据实际情况修改 支持 1.14 , 1.15, 1.16 , 1.17 , 1.18,

通过指定版本使 Dinky 加载对应版本的 Flink 依赖, 以便 Dinky 能够正常与 Flink 集成。

cd dinky
# 1.18 代表 Flink 版本, 请根据实际情况修改 支持 1.14 , 1.15, 1.16 , 1.17 , 1.18, 
./auto.sh start 1.18

验证

页面访问: http://ip:8888 正常访问至登录页面, 证明 Dinky 已经启动成功, 请使用默认账户密码登录, 默认账户密码为 admin/admin

http://172.26.217.146:8888/

重点:启动失败,日志可以看到。没有mysql的jar包

无法访问 需要去mysql官网下载jar包 mysql

mysql-connector-j-8.3.0.jar

https://dev.mysql.com/downloads/connector/j/

将这个也放到 extends里面

/opt/dinky/extends/

然后重新启动

./auto.sh stop
./auto.sh start 1.18

Datagen 任务 Demo

创建 Datagen 任务

  1. 进入 数据开发 -> 项目 -> 新建根目录 名称自行定义
  2. 右键 新建作业 -> 类型选择FlinkSQL -> 输入名称 -> 输入描述(可选) 点击完成
  3. 在编辑器中输入如下代码
# checkpoint 配置 自行根据实际情况修改, 以下为示例
set execution.checkpointing.checkpoints-after-tasks-finish.enabled=true;
SET pipeline.operator-chaining=false;
set state.backend.type=rocksdb;
set execution.checkpointing.interval=8000;
set state.checkpoints.num-retained=10;
set cluster.evenly-spread-out-slots=true;DROP TABLE IF EXISTS source_table3;
CREATE TABLE IF NOT EXISTSsource_table3 (`order_id` BIGINT,`product` BIGINT,`amount` BIGINT,`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND)
WITH('connector' = 'datagen','rows-per-second' = '1','fields.order_id.min' = '1','fields.order_id.max' = '2','fields.amount.min' = '1','fields.amount.max' = '10','fields.product.min' = '1','fields.product.max' = '2');DROP TABLE IF EXISTS sink_table5;CREATE TABLE IF NOT EXISTSsink_table5 (`product` BIGINT,`amount` BIGINT,`order_time` TIMESTAMP(3),`one_minute_sum` BIGINT)
WITH('connector' = 'print');INSERT INTOsink_table5
SELECTproduct,amount,order_time,SUM(amount) OVER (PARTITION BYproductORDER BYorder_timeRANGE BETWEEN INTERVAL '1' MINUTE PRECEDINGAND CURRENT ROW) as one_minute_sum
FROMsource_table3;
  1. 配置右侧 任务配置 ,请根据实际情况填写,如对参数不了解, 请鼠标悬浮至表单的每项 label 右侧的 ? 查看帮助信息
  2. 点击保存按钮/ctrl+s 保存任务
  3. 可自行点击 DAG/检查 等按钮查看任务的 DAG 图和检查该作业的语法是否正确
  4. 以上完成之后,点击运行按钮, 等待任务运行完成, 可以在 运维中心 中查看任务的运行状态/直接点击运行按钮左侧的运维按钮,即可跳转至运维中心该任务的详情页面查看运行状态,如下图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/688925.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MinGW编译OpenCV4.5(64位/32位通用,附编译完成包下载)

MinGW编译OpenCV4.5(64位/32位通用,附编译完成包下载) MinGW编译OpenCV4.5

MySQL高级特性篇(3)-全文检索的实现与优化

MySQL数据库全文检索是指对数据库中的文本字段进行高效地搜索和匹配。在MySQL数据库中,可以使用全文检索来实现快速的文本搜索功能,并且可以通过一些优化手段提高全文检索的性能。 一、MySQL全文检索的基本概念 全文检索是一种将关键字搜索与自然语言处…

JAVA面试框架篇

1. Spring refresh 流程 要求 掌握 refresh 的 12 个步骤 Spring refresh 概述 refresh 是 AbstractApplicationContext 中的一个方法,负责初始化 ApplicationContext 容器,容器必须调用 refresh 才能正常工作。它的内部主要会调用 12 个方法&#x…

单主模式和多主模式切换

1 组复制模式切换注意点 组复制有两种运行模式,一种是单主模式,一种是多主模式。这个模式是在整个组中设置的,由 group_replication_single_primary_mode 这个系统变量指定,而且在所有成员上必须保持一致。ON 表示单主模式&#…

禁止电子邮箱地址登录WordPress后台的插件No Login by Email Address

WordPress 4.5及之后的版本增加了使用注册用户的电子邮件地址代替用户名登录的功能,但是大多数个人站长的管理员邮箱地址都是固定,而且到其他站点进行评论留言也是同一个邮箱地址,很容易给一些别有用心的可乘之机,所以禁止WordPre…

(AtCoder Beginner Contest 341)(A - D)

比赛地址 : Tasks - Toyota Programming Contest 2024#2&#xff08;AtCoder Beginner Contest 341&#xff09; A . Print 341 模拟就好了 &#xff0c; 先放一个 1 , 然后放 n 个 01 ; #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout…

【Effective Objective - C 2.0】——读书笔记(五)

文章目录 二十九、理解引用计数三十、以ARC简化引用计数三十一、在dealloc方法中只释放引用并解除监听三十二、编写异常安全代码时留意内存管理问题三十三、以弱引用避免保留环三十四、以”自动释放池块“降低内存峰值三十五、用"僵尸对象"调试内存管理问题三十六、不…

C++知识点总结(15):选择排序、插入排序

文章目录 一、选择排序1. 概念2. 伪代码3. 程序4. 例题第k大的数 二、元素插入1. 伪代码2. 程序 三、插入排序1. 概念2. 伪代码3. 程序4. 例题洛谷 P1152 四、分析 一、选择排序 1. 概念 下标12345最小值原始43521/第一次135241第二次125342第三次123543第四次123454完成1234…

C++ 调用js 脚本

需求&#xff1a; 使用Qt/C 调用js 脚本。Qt 调用lua 脚本性能应该是最快的&#xff0c;但是需要引入第三方库&#xff0c;虽然也不是特别麻烦&#xff0c;但是调用js脚本&#xff0c;确实内置的功能&#xff08;C 调用lua 脚本-CSDN博客&#xff09; 步骤&#xff1a; 1&…

解决elementUI固定列后,下方多了一条横线的问题

最近遇到一个bug,如下图,el-table的操作列使用fixed属性固定后,下方多了一条横线: 我们将样式设置高优先,以覆盖内联样式,如下是less里使用穿透样式解决的办法: <style lang="less" scoped> /deep/ .el-table__fixed-right {height: 100

go内置库函数实现client与server数据的发送接收

功能&#xff1a;客户端持续写入数据&#xff0c;直到输入exit退出&#xff0c;服务端读取数据并打印 注意&#xff1a;server和client目录在同一层级 服务端 server/main package mainimport ("fmt""net" )func main() {listen, err : net.Listen(&quo…

【牛客 NC253455】小红走排列 题解(链表+位集合+贪心算法)

题目描述 对于一个排列&#xff0c;小红定义该排列的总消耗为&#xff1a;1走到2&#xff0c;2走到3&#xff0c;……&#xff0c;最终从 n − 1 n-1 n−1走到 n n n所需的最少的总步数。其中&#xff0c;每一步可以向左走一步&#xff0c;也可以向右走一步。 现在&#xff0…

Kubernetes基础(二十一)-k8s的服务发现机制

1 概述 Kubernetes&#xff08;K8s&#xff09;是一个强大的容器编排平台&#xff0c;提供了丰富的功能来简化容器化应用的管理。其中之一重要的特性就是服务发现机制&#xff0c;它使得应用程序能够在K8s集群中动态地发现和访问其他服务。本文将深入研究K8s中的服务发现机制&…

Java基础知识学习:深入理解Java中的类与对象,Java重要知识点概念性解释,结合实例讲解请看下一篇博文

引言&#xff1a; 在Java编程语言中&#xff0c;类&#xff08;Class&#xff09;与对象&#xff08;Object&#xff09;是面向对象编程&#xff08;OOP&#xff09;的核心概念。类可以看作是创建对象的蓝图&#xff0c;而对象则是类的实例。本文将深入解释Java知识体系中的类与…

unity学习(25)——客户端与服务器合力完成注册功能(7)逻辑流程彻底解决

在服务器LoginHandler类中&#xff1a; public void login(Session session, SocketModel model) {LoginDTO loginDto Coding<LoginDTO>.decode(model.Message);//MyLog.form.textAdd("用户申请登录" loginDto.userName " " loginDto.pass…

2024/2/18 图论 最短路入门 floyd 1

目录 Floyd求最短路 854. Floyd求最短路 - AcWing题库 模板】Floyd B3647 【模板】Floyd - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) Floyd求最短路 854. Floyd求最短路 - AcWing题库 思路&#xff1a;在代码里面 完整代码&#xff1a; #include <bits/stdc.h&g…

Quartz---JobDataMap使用的两种方式

任务调度执行原理图&#xff1a; JobDataMap的使用 JobDataMap是Quartz调度器中的一个重要组件&#xff0c;主要用于存储和传递与作业&#xff08;Job&#xff09;相关的数据。它是一个实现了Java Map接口的对象&#xff0c;可以用来保存一系列的序列化的对象。这些对象在作业执…

Langchain的提示词模板

因为做AI项目的过程中&#xff0c;需要处理各种提示词&#xff08;prompt&#xff09;&#xff0c;其中有些是固定的文本&#xff0c;有些是会反复修改的。如果是简单的提示词&#xff0c;直接用python里面的字符串format()或者replace()函数进行处理即可。对于复杂的&#xff…

强大的蓝牙工具Ubertooth系列

Ubertooth One 是一个开源的蓝牙嗅探设备&#xff0c;它提供了多个软件工具来进行蓝牙通信的分析、监视和探测。以下是 Ubertooth One 的一系列软件工具的介绍&#xff1a; Ubertooth&#xff1a;Ubertooth 是 Ubertooth One 的核心驱动程序和硬件抽象层。它允许与 Ubertooth O…

Oracle大型数据库技术

实验环境 sqlplus 记录实验过程的方法&#xff1a; spool 带有绝对路径的文件名 [append] --SQL语句 spool off开启相关服务 oracleserviceORCL 控制面板–管理–服务–找到后开启命令行方法&#xff1a; cmd–net start|stop oracleserviceorcl 常用操作 显示当前用户名 sho…