flink重温笔记(十七): flinkSQL 顶层 API ——SQLClient 及流批一体化

Flink学习笔记

前言:今天是学习 flink 的第 17 天啦!学习了 flinkSQL 的客户端工具 flinkSQL-client,主要是解决大数据领域数据计算避免频繁提交jar包,而是简单编写sql即可测试数据,文章中主要结合 hive,即编写 flinksql 可以操作 hive 中的数据表,以及流批一体化:kafak 将数据写入到 hive中,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Flink学习笔记
    • 三、FlinkSQL Client
      • 1. 启动 Hive 组件
      • 2. Flink-Client 准备操作
      • 3. Flink-Client 三种显示格式
      • 4. 与 hive 结合的初始化
      • 5. 两种方式创建 Hive 相关库表
        • 5.1 SQL 方式
        • 5.2 Table-API 方式
      • 6. 基于 Catalog 的数据库操作
      • 7. kafka 通过 FlinkSQL Client 写入 Hive
        • 7.1 flink 准备 jar 包
        • 7.2 案例演示

三、FlinkSQL Client

1. 启动 Hive 组件

###############################################################	
# 记得先启动 hadoop 不然连接 beeline 时报错:
Error: Could not open client transport with JDBC Uri: jdbc:hive2://node1:10000: java.net.ConnectException: 拒绝连接 (Connection refused) (state=08S01,code=0)##hive:##nohup hive --service metastore & >> nohup.outnohup hive --service hiveserver2 & >> hiveserver2.outbeeline!connect jdbc:hive2://node1:10000关闭:ps -ef | grep hivekill -9 pid
###############################################################

2. Flink-Client 准备操作

可以在建表或者建库的时候,可以直接编写 SQL 语句提交,web 页面也能显示 Running,省去打包操作!

  • flink 的 lib 目录下 jar 包

基于 flink 1.3.1 ,hive 2.1.1,jar 包放在我的资源区了,大家有需要可以下载一下!

flink-connector-hive_2.11-1.13.1.jar
hive-exec-2.1.1.jar
antlr-runtime-3.4.jar
  • 需要提前启动 flink 集群模式
./bin/start-cluster.sh
  • 进入 Flink Client 操作
./bin/sql-client.sh embedded
  • 退出操作
quit;

3. Flink-Client 三种显示格式

  • 1- 表格模式
SET sql-client.execution.result-mode=table;
  • 2- 变更日志模式
SET sql-client.execution.result-mode=changelog;
  • 3- Tableau 模式
SET sql-client.execution.result-mode=tableau;

4. 与 hive 结合的初始化

创建 init.sql

# 显示打印错误日志信息
SET sql-client.verbose = true;
CREATE CATALOG myhive WITH ('type' = 'hive','hive-conf-dir'='/export/server/flink-1.13.1/conf');USE CATALOG myhive;CREATE DATABASE if not exists itcast_flinksql;USE itcast_flinksql;

初始化启动:

./sql-client.sh -i init.sql

5. 两种方式创建 Hive 相关库表

现将 hive 的 hive-site.xml 文件保存到 src/main/resource 目录下

5.1 SQL 方式
package cn.itcast.day01.catalog;/*** @author lql* @time 2024-03-14 09:38:43* @description TODO*/import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;import java.util.stream.Stream;/*** 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表*/
public class SqlDDLDemo {public static void main(String[] args) {//todo 0)设置当前hadoop操作的用户名System.setProperty("HADOOP_USER_NAME", "root");//todo 1)初始化flink流处理的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings bbSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bbSetting);//todo 2)创建HiveCatalogString catalogName = "myHive";String databaseName = "itcast_flinksql";HiveCatalog catalog = new HiveCatalog(catalogName, //指定catalog的名字"default", //默认数据库的名字"src/main/resources", //指定hive-site.xml文件的路径"2.1.1" //指定hive的版本);//todo 3)注册目录System.out.println("===========注册目录==================");tabEnv.registerCatalog(catalogName, catalog);//todo 4)切换目录System.out.println("===========切换目录==================");tabEnv.useCatalog(catalogName);//todo 5)创建数据库System.out.println("===========创建数据库==================");String createDBSql = "CREATE DATABASE IF NOT EXISTS "+catalogName+"."+databaseName;tabEnv.executeSql(createDBSql);//todo 6)切换数据库System.out.println("===========切换数据库==================");tabEnv.useDatabase(databaseName);//todo 7)创建表System.out.println("===========创建表==================");String createTableSql = "CREATE TABLE IF NOT EXISTS mytable(name string, age int)";tabEnv.executeSql(createTableSql);//todo 8)查询所有的表System.out.println("===========创建表==================");tabEnv.listTables();}
}

结果:成功注册了目录,数据库,表!

总结:

  • 1- 需要设置 hadoop 权限
  • 2- new 一个 HiveCatalog 对象

5.2 Table-API 方式
package cn.itcast.day01.catalog;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.util.HashMap;
import java.util.Map;/*** @author lql* @time 2023-03-14 09:10:58* @description TODO: 用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。*/
public class JavaClientDemo {public static void main(String[] args) throws DatabaseAlreadyExistException, TableAlreadyExistException, DatabaseNotExistException {// 设置当前hadoop操作的用户名System.setProperty("HADOOP_USER_NAME","root");// 创建流处理环境,设置并行度为 5StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(5);// 设置 table 环境,使用 blink planner,并开启流式查询模式EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);// 定义 Hive Catalog 名称、数据库名称、表格名称等信息String catalogName = "myhive";String databaseName = "itcast_flinksql";String tableName = "test";// 创建一个 HiveCatalog 实例,用于访问 Hive 中的表格和数据库HiveCatalog catalog = new HiveCatalog(catalogName,                   // Catalog 名称"default",                // 默认使用的数据库"src/main/resources",  // Hive 配置文件的目录路径(hive-site.xml)"2.1.1"                   // Hive 版本号);// 注册 HiveCatalog// 在注册之前需要保证在指定目录下有 metadata 目录,并且 metadata 目录下没有 myhive 目录,否则会失败tableEnv.registerCatalog(catalogName, catalog);tableEnv.useCatalog(catalogName);// 创建数据库System.out.println("---------------创建数据库------------------------");catalog.createDatabase(databaseName,new CatalogDatabaseImpl(new HashMap<>(),"my_comments"),true);// 创建表格System.out.println("--------------创建表table-----------------------------");TableSchema schema = TableSchema.builder().field("name", DataTypes.STRING()).field("age", DataTypes.INT()).build();Map<String,String> properties = new HashMap<>();/*** flink 通用表将具有is_generic=true.*/properties.put("is_generic",String.valueOf(true));catalog.createTable(new ObjectPath(databaseName,tableName),new CatalogTableImpl(schema,properties,"my_comments"),true);System.out.println(tableName);}
}

结果:成功注册了目录,数据库,表!

总结:

  • 1- 建立数据库时,需要 new 一个 CatalogDatabaseImpl
  • 2- 创建表格式,需要先定义数据结构:schema
  • 3- 设置 hadoop 操作的用户名

6. 基于 Catalog 的数据库操作

例子:对 hive 数据库进行创建、删除、判断等操作的代码 java 实现

package cn.itcast.day01.catalog;/*** @author lql* @time 2024-03-14 09:37:16* @description TODO*/
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;import java.util.HashMap;/*** 数据库操作*/
public class DatabaseOperater {public static void main(String[] args) throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException {//设置当前hadoop操作的用户名System.setProperty("HADOOP_USER_NAME", "root");StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(5);EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);String catalogName = "myhive";String databaseName = "itcast_flinksql";String tableName = "test";HiveCatalog catalog = new HiveCatalog(catalogName,                   // catalog name"default",                // default database"src/main/resources",  // Hive config (hive-site.xml) directory"2.1.1"                   // Hive version);//注册目录tableEnv.registerCatalog(catalogName, catalog);tableEnv.useCatalog(catalogName);System.out.println("---------------创建数据库------------------------");catalog.createDatabase(databaseName,new CatalogDatabaseImpl(new HashMap<>(), "my comment"), true);System.out.println("---------------删除数据库------------------------");catalog.dropDatabase(databaseName, true, true);System.out.println("---------------验证数据库是否存在------------------------");boolean result = catalog.databaseExists(databaseName);System.out.println("---------------"+result+"------------------------");System.out.println("---------------在目录中列出数据库------------------------");catalog.listDatabases();}
}

总结:需要 new 一个 HashMap<>()


7. kafka 通过 FlinkSQL Client 写入 Hive

7.1 flink 准备 jar 包
flink-sql-connector-kafka_2.12-1.13.1.jar
flink-connector-kafka_2.12-1.13.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar# 如果是集群模式,需要将 lib 包分发到各台机器
7.2 案例演示
  • 准备工作:
# 启动 zookeeper
# 启动 hadoop
# 启动 hive
# 启动 flink cluster 模式
# 进入 flinksql client 客户端,注意要用前面的 init.sql 脚本初始化!
  • 逻辑 sql:
CREATE TABLE IF NOT EXISTS `order`(id INT,category STRING,areaName STRING,money INT,`timestamp` BIGINT,eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印
) WITH ('connector' = 'kafka','topic' = 'order',                        -- 指定消费的topic'scan.startup.mode' = 'latest-offset',    -- 指定起始offset位置'properties.zookeeper.connect' = 'node1:2181','properties.bootstrap.servers' = 'node1:9092','properties.group.id' = 'order_01','format' = 'json','json.ignore-parse-errors' = 'true'
);
  • 启动kafka生产者的数据:
{"id":1,"timestamp":1588870980000,"category":"电脑","areaName":"石家庄","money":"1450"}
{"id":2,"timestamp":1588870860000,"category":"手机","areaName":"北京","money":"1450"}
{"id":3,"timestamp":1588870980000,"category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":1588885260000,"category":"电脑","areaName":"上海","money":"1513"}
{"id":5,"timestamp":1588870980000,"category":"家电","areaName":"北京","money":"1550"}
{"id":6,"timestamp":1588870860000,"category":"电脑","areaName":"深圳","money":"1550"}

结果:kafka 数据源源不断写入到 hive 表中

总结:

  • 1- 在 hive 中可以看到表,但是不能查询数据(报错 Error),因为这个表是 flink 通用表;
  • 2- 如果想要建 hive 兼容表,需要通用表将具有 is_generic = true,改为 is_generic = False。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/747994.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初探文件包含漏洞

目录 1.什么是文件包含漏洞2.漏洞分类3.php中常见的文件包含函数4.文件包含漏洞的绕过方法4.1本地文件包含&#xff08;LFI&#xff09;绕过方法&#xff1a;4.2远程文件包含&#xff08;RFI&#xff09;绕过方法&#xff1a; 5.对于文件包含漏洞的防御措施 1.什么是文件包含漏…

SpringBoot3整合Elasticsearch8.x之全面保姆级教程

整合ES 环境准备 安装配置ES&#xff1a;https://blog.csdn.net/qq_50864152/article/details/136724528安装配置Kibana&#xff1a;https://blog.csdn.net/qq_50864152/article/details/136727707新建项目&#xff1a;新建名为web的SpringBoot3项目 elasticsearch-java 公…

Hive实现查询左表有右表没有的记录

工作中遇到这样一个场景&#xff0c;业务逻辑是&#xff1a;如果一个主体发生了某一问题&#xff0c;就不再统计该主体的其他问题。 思路&#xff1a;首先想到的方法就是not in方法&#xff0c;但是Hive并不不支持。那么使用left join对两个表进行连接&#xff0c;右表主键为空…

uploads-labs靶场(1-10关)

一、搭建环境: 下载upload-labs源代码 下载链接&#xff1a;https://codeload.github.com/c0ny1/upload-labs/zip/refs/heads/master 将压缩包解压后的文件名改为upload-labs&#xff0c;然后放入phpstudy\www目录下 二、关卡通关: 1、pass-01&#xff08;前端绕过&#xf…

B. Array Fix

思路&#xff1a;我们倒着看&#xff0c;首先判断以下当前元素有没有被操作过&#xff0c;被操作过的话&#xff0c;那么需要改为操作后的数&#xff0c;然后跟当前数的前一个数进行比较&#xff0c;如果a[i] < a[i - 1]的话&#xff0c;那么需要将a[i - 1]拆分&#xff0c;…

【SpringBoot】头条新闻项目实现CRUD登录注册

文章目录 一、头条案例介绍二、技术栈介绍三、前端搭建四、基于SpringBoot搭建项目基础架构4.1 数据库脚本执行4.2 搭建SprintBoot工程4.2.1 导入依赖:4.2.2 编写配置4.2.3 工具类准备 4.3 MybatisX逆向工程 五、后台功能开发5.1 用户模块开发5.1.1 jwt 和 token 介绍5.1.2 jwt…

huawei services HK华为云服务

huaweiserviceshk是一种云计算服务&#xff0c;为华为云服务用户提供了多种服务&#xff0c;包括云服务器、数据库、存储、网络等&#xff0c;用户可以根据自己的需求选择不同的服务并支付相应的费用 如何付费呢&#xff0c;这里可以使用441112&#xff0c;点击获取 卡片信息在…

Makefile+OpenOCD开发STM32

准备工作 平台&#xff1a;Windows11&#xff08;Linux&#xff0c;MAC同理&#xff09; 编译链&#xff08;arm-none-eabi-gcc&#xff09;&#xff1a;Downloads | GNU Arm Embedded Toolchain Downloads – Arm Developer 下载对应平台工具链并添加到环境变量&#xff0c…

springboot+poi-tl根据模板导出word(含动态表格和图片),并将导出的文档压缩zip导出

springbootpoi-tl根据模板导出word&#xff08;含动态表格和图片&#xff09; 官网&#xff1a;http://deepoove.com/poi-tl/ 参考网站&#xff1a;https://blog.csdn.net/M625387195/article/details/124855854 pom导入的maven依赖 <dependency><groupId>com.dee…

hcie数通和云计算选哪个好?

1. 基础知识与技能要求 数通技术是网络技术的核心&#xff0c;它涉及到网络协议、路由交换、网络安全等多个方面。如果你是一名网络工程师或开发者&#xff0c;想要在数通领域有所建树&#xff0c;你需要具备扎实的基础知识和丰富的实战经验。 云计算则更注重于虚拟化、存储、网…

基于openCV实现的单目相机行人和减速带检测

概述 在计算机视觉项目中&#xff0c;相机标定是一项至关重要的任务&#xff0c;因为它可以校正相机内部参数&#xff0c;消除因镜头畸变等因素导致的图像失真&#xff0c;从而提高后续图像处理和分析的精度。在这个项目中&#xff0c;相机标定的核心功能集成在名为calibratio…

redis的安装与string类型

1. redis的安装 1.1 升级gcc版本 因为centos7.x的gcc版本还是4.8.5&#xff0c;而编译指定的版本是需要5.3以上。 环境部署与安装scl源 yum install gcc cmake -y --部署安装环境 yum install centos-release-scl scl-utils-build -y --安装scl源 安装gcc新版本 yum -y ins…

还原wps纯粹的编辑功能

1.关闭稻壳模板&#xff1a; 1.1. 启动wps(注意不要乱击稻壳模板&#xff0c;点了就找不到右键菜单了) 1.2. 在稻壳模板选项卡右击&#xff1a;选不再默认展示 2.关闭托盘中wps云盘图标&#xff1a;右击云盘图标/同步与设置&#xff1a; 2.1.关闭云文档同步 2.2.窗口选桌面应用…

Vue2+ElementUI表单、Form组件的封装

Vue2ElementUI表单、Form组件的封装 &#xff1a;引言 在 Vue2 项目中&#xff0c;ElementUI 的 el-form 组件是常用的表单组件。它提供了丰富的功能和样式&#xff0c;可以满足各种需求。但是&#xff0c;在实际开发中&#xff0c;我们经常会遇到一些重复性的需求&#xff0c…

16.WEB渗透测试--Kali Linux(四)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;15.WEB渗透测试--Kali Linux&#xff08;三&#xff09;-CSDN博客 1.crunch简介与使用 C…

分布式CAP理论

CAP理论&#xff1a;一致性&#xff08;Consistency&#xff09;、可用性&#xff08;Availability&#xff09;和分区容错性&#xff08;Partition tolerance&#xff09;。是Eric Brewer在2000年提出的&#xff0c;用于描述分布式系统基本性质的定理。这三个性质在分布式系统…

SQLZoo:SELECT from WORLD Tutorial/zh

name continent area population gdp Afghanistan Asia 652230 25500100 20343000000 Albania Europe 28748 2831741 12960000000 … name:國家名稱 continent:洲份 area:面積 population:人口 gdp:國內生產總值 Country Profile 在這教程中&#xff0c;我們會使用SELECT語句&…

python pytest 最简单的接口自动化测试框架

最近由于工作的原因&#xff0c;需要开发一个接口自动化测试框架&#xff0c;使用pytest框架、数据驱动&#xff0c;并展示直观的测试报告。 具体的开发过程如下&#xff1a; 安装必要的库&#xff1a; pytest&#xff1a;用于编写和运行测试用例。requests&#xff1a;用于发…

FPGA静态时序分析与约束(一)、理解亚稳态

系列文章目录 FPGA静态时序分析与约束&#xff08;二&#xff09;、时序分析 FPGA静态时序分析与约束&#xff08;三&#xff09;、读懂vivado时序报告 文章目录 系列文章目录前言一、概述一、何为亚稳态&#xff1f;二、图解亚稳态三、什么时候亚稳态会导致系统失效&#xff…

Jsch实践(三):如何使用Jsch的ChannelShell类,在远程服务器上执行脚本命令?

如何使用Jsch的ChannelShell类&#xff0c;在远程服务器上执行脚本命令 要使用JSch的ChannelShell类在远程服务器上执行脚本命令&#xff0c;你需要创建一个shell通道&#xff0c;然后向这个通道的输入流发送命令&#xff0c;并读取输出流来获取命令的响应。下面是一个示例代码…