Flink 离线计算

文章目录

      • 一、样例一:读 csv 文件生成 csv 文件
      • 二、样例二:读 starrocks 写 starrocks
      • 三、样例三:DataSet、Table Sql 处理后写入 StarRocks
      • 四、遇到的坑

        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.9.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.9.1</version></dependency><!--使用Java编程语言支持DataStream / DataSet API的Table&SQL API--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><!--表程序规划器和运行时--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.9.1</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency>

一、样例一:读 csv 文件生成 csv 文件

  参考:(3)Flink学习- Table API & SQL编程

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;public class SQLWordCount {public static void main(String[] args) throws Exception {// 1、获取执行环境 ExecutionEnvironment (批处理用这个对象)ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );// 注册数据集
//        tEnv.registerDataSet("WordCount", input, "word, frequency");// 2、加载数据源到 DataSetDataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");// 3、将DataSet装换为TableTable students = bTableEnv.fromDataSet(csv);bTableEnv.registerTable("student", students);// 4、注册student表Table result = bTableEnv.sqlQuery("select name,age from student");result.printSchema();DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);System.out.println("count-->" + dset.count());dset.print();// 5、sink输出CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);String[] fieldNames = {"name", "age"};TypeInformation[] fieldTypes = {Types.STRING, Types.INT};bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);result.insertInto("CsvOutPutTable");env.execute("SQL-Batch");}@Datapublic static class Student {private String name;private int age;}
}

  准备测试文件 data.csv

name,age
zhangsan,23
lisi,43
wangwu,12

  运行程序后会生成 D:\\tmp\\result.csv 文件。

二、样例二:读 starrocks 写 starrocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 方式一DataSource s = env.createInput(jdbcInputFormat);s.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());// 方式二
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );env.execute("SQL-Batch");}
}

  数据准备:

CREATE TABLE student (name STRING,age INT
) ENGINE=OLAP 
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);insert into student values('zhangsan', 23);

参考:
flink 读取mysql源 JDBCInputFormat、自定义数据源
flink1.10中三种数据处理方式的连接器说明
flink读写MySQL的两种方式

注意:如果运行 java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount 时报错:Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  解决:报错:Flink Could not resolve substitution to a value: ${akka.stream.materializer}

    <build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>flink.KafkaDemo1</mainClass></transformer>--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

三、样例三:DataSet、Table Sql 处理后写入 StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;public class SQLWordCount {public static void main(String[] args) throws Exception {TypeInformation[] fieldTypes = {Types.STRING, Types.INT};RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);DataSet<Row> dataSource = env.createInput(jdbcInputFormat);dataSource.print();Table students = bTableEnv.fromDataSet(dataSource);bTableEnv.registerTable("student", students);Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");result.printSchema();DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);dset.output(JDBCOutputFormat.buildJDBCOutputFormat().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8").setUsername("root").setPassword("").setQuery("insert into student values(?, ?)").finish());env.execute("SQL-Batch");}
}

四、遇到的坑

  坑1:Bang equal '!=' is not allowed under the current SQL conformance level
  解决:将 sql 中的 != 修改为 <>

  坑2:java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  解释:在最后一行代码 env.execute() 执行的时候,没有新的数据接收器被定义,对于 Flink 批处理而前一行代码 result.print() 已经触发了代码的执行和输出,所以再执行 env.execute(),就是多余的了,因此报了上面的异常。
  解决方法:去掉最后一行代码 env.execute(); 就可以了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/61142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习视频编解码开源项目介绍【持续更新】

DVC (Deep Video Compression) 介绍&#xff1a;DVC (Deep Video Compression) 是一个基于深度学习的视频压缩框架&#xff0c;它的目标是通过深度神经网络来提高视频编码的效率&#xff0c;并降低比特率&#xff0c;同时尽可能保持视频质量。DVC 是一个端到端的神经网络模型&…

RabbitMQ 应用问题

文章目录 1. 幂等性保障什么是幂等性MQ 的幂等性如何处理消息重复的问题1. 全局唯一ID2. 业务逻辑判断 2. 顺序性保障什么是顺序性保障什么情况会打破RabbitMQ的顺序性顺序性保障方案 3. 消息积压什么是消息积压造成消息积压的原因解决消息积压的方案 结论 1. 幂等性保障 什么…

【数据库系列】MySQL基础知识:深入理解DDL、DML与DQL操作

MySQL是一个开源的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;广泛用于数据存储和管理。理解MySQL的基本操作至关重要&#xff0c;尤其是数据定义语言&#xff08;DDL&#xff09;、数据操作语言&#xff08;DML&#xff09;和数据查询语言&#xff08;DQL…

PAT1085 Perfect Sequence(25)

//判断是否是连续的数 //判断是否只能第一个数是最小值 #include <cstdio> #include <algorithm> typedef long long ll; using namespace std; int n,p; const int maxn 100010; int arr[maxn];int binary(int l, int r, ll tgt){if(arr[n-1] < tgt) return n…

git 本地同步远端分支

一、关联远程仓库 本地仓库关联远端仓库 git remote add origin https://github.com/user/repository.git 二、获取远程分支信息 获取远程仓库的最新分支信息 git fetch origin 三、创建或切换到本地分支以跟踪远程分支 1. 创建分支 创建分支并关联到远端分支 git bra…

Shell 编程基础知识

为什么要学 Shell&#xff1f; 学一个东西&#xff0c;我们大部分情况都是往实用性方向着想。从工作角度来讲&#xff0c;学习 Shell 是为了提高我们自己工作效率&#xff0c;提高产出&#xff0c;让我们在更少的时间完成更多的事情。 很多人会说 Shell 编程属于运维方面的知…

深度学习-48-AI应用实战之基于face_recognition的人脸识别

文章目录 1 人脸识别1.1 识别原理1.2 应用场景2 python实现人脸识别2.1 windows安装face_recognition2.2 安装问题及解决3 使用示例3.1 人脸区域检测3.2 对齐与编码3.3 人脸匹配3.4 信息录入4 附录4.1 函数cv2.rectangle4.2 参考附录1 人脸识别 通过图片或者摄像头的方式,将识…

深入浅出UART驱动开发与调试:从基础调试到虚拟驱动实现

往期内容 本专栏往期内容&#xff1a;Uart子系统 UART串口硬件介绍深入理解TTY体系&#xff1a;设备节点与驱动程序框架详解Linux串口应用编程&#xff1a;从UART到GPS模块及字符设备驱动 解UART 子系统&#xff1a;Linux Kernel 4.9.88 中的核心结构体与设计详解IMX 平台UART驱…

vs 项目属性表

解释 在Vistual Studio中&#xff0c;属性包含项目编译生成所需的信息 此信息包括应用程序名称、扩展名&#xff08;如 DLL、LIB、EXE&#xff09;、编译器选项、链接器选项、调试器设置、自定义生成步骤等 项目节点右键->属性&#xff0c;访问属性页要访问属性页&#xff…

空洞武士3

能帮到你的话&#xff0c;就给个赞吧 &#x1f618; 文章目录 collisionBox.hcollisionLayer.hcollisionManager.h collisionBox.h #pragma once #include "collisionLayer.h" #include "vector2.h" #include <functional>class CollisionManager; …

golang支持线程安全和自动过期map

在 Golang 中&#xff0c;原生的 map 类型并不支持并发安全&#xff0c;也没有内置的键过期机制。不过&#xff0c;有一些社区提供的库和方案可以满足这两个需求&#xff1a;线程安全和键过期。 1. 使用 sync.Map&#xff08;线程安全&#xff0c;但不支持过期&#xff09; Go…

Linux网络——IO模型和多路转接

通常所谓的IO&#xff0c;其本质就是等待通信和进行通信&#xff0c;即IO 等 拷贝。 那么想要做到高效的IO&#xff0c;就要在单位时间内&#xff0c;减少“等”的比重。 一.五种IO模型 阻塞 IO: 在内核将数据准备好之前, 系统调用会一直等待. 所有的套接字, 默认都是阻塞方…

VM Virutal Box的Ubuntu虚拟机与windows宿主机之间设置共享文件夹(自动挂载,永久有效)

本文参考如下链接 How to access a shared folder in VirtualBox? - Ask Ubuntu &#xff08;1&#xff09;安装增强功能&#xff08;Guest Additions&#xff09; 首先&#xff0c;在网上下载VBoxGuestAdditions光盘映像文件 下载地址&#xff1a;Index of http://…

AI的魔力:如何为开源软件注入智慧,开启无限可能

“AI的魔力&#xff1a;如何为开源软件注入智慧&#xff0c;开启无限可能” 引言&#xff1a; 在科技发展的浪潮中&#xff0c;开源软件生态一直扮演着推动创新与共享的重要角色。从Linux到Python&#xff0c;开源项目赋予了开发者全球协作的机会&#xff0c;推动了技术的飞速…

uniapp条件注释/跨端兼容

文章目录 条件编译处理多端差异1.组件中条件编译2.API条件编译 条件编译处理多端差异 #为什么选择条件编译处理跨端兼容 uni-app 已将常用的组件、API封装到框架中&#xff0c;开发者按照 uni-app 规范开发即可保证多平台兼容&#xff0c;大部分业务均可直接满足。 但每个平…

【拥抱AI】一文讲述如何配置Milvus?

配置Milvus是一个重要的步骤&#xff0c;它可以帮助你更好地管理和优化向量数据库的性能。以下是一些常见的配置选项和步骤&#xff0c;帮助你设置和优化Milvus。 1. 安装Milvus 首先&#xff0c;确保你已经安装了Milvus。你可以使用Docker来快速部署Milvus。以下是一个基本的…

【网络安全】CSRF

一、什么是CSRF CSRF&#xff08;Cross-Site Request Forgery&#xff09;是一种web应用程序安全漏洞&#xff0c;它利用了用户在已登录的状态下的信任&#xff0c;通过欺骗用户发送未经授权的请求来执行恶意操作。这种攻击的危害性取决于受害者在目标网站上的权限。 二、CSR…

OpenCV相机标定与3D重建(5)鱼眼镜头畸变校正的函数estimateNewCameraMatrixForUndistortRectify()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 估计用于去畸变或校正的新相机内参矩阵。 cv::fisheye::estimateNewCameraMatrixForUndistortRectify 是 OpenCV 中用于鱼眼镜头畸变校正的一个函…

IThenticate 查重有无免费午餐?深度解析

经历过论文“折磨”的过来人&#xff0c;深知查重工具是写论文不可或缺的助手。而 iThenticate 查重系统&#xff0c;深受出版商、学术机构和研究人员喜爱。不过&#xff0c;每次看到它那昂贵的价格&#xff0c;就让很多小伙伴直呼&#xff0c;IThenticate查重系统就没有免费的…

启动SpringBoot

前言&#xff1a;大家好我是小帅&#xff0c;今天我们来学习SpringBoot 文章目录 1. 环境准备2. Maven2.1 什么是Maven2.2 创建⼀个Maven项⽬2.3 依赖管理2.3.1 依赖配置2.3.2 依赖传递2.3.4 依赖排除2.3.5 Maven Help插件&#xff08;plugin&#xff09; 2.4 Maven 仓库2.6 中…