大数据组件(三)快速入门实时计算平台Dinky

大数据组件(三)快速入门实时计算平台Dinky

  • Dinky 是一个开箱即用的一站式实时计算平台(同样,还有StreamPark),以 Apache Flink 为基础,连接数据湖仓等众多框架,致力于流批一体和湖仓一体的建设与实践。

  • Dinky 是基于Spring Boot 轻应用,不需要在任何 Flink 集群修改源码或添加额外插件,无感知连接和监控Flink 集群。

    Image placeholder

  • 今天,我们以一个简单案例快速了解下实时计算平台Dinky:

    • 官网:https://www.dinky.org.cn/
    • GitHub:https://github.com/DataLinkDC/dinky

1 Dinky的下载及安装

  • 下载地址:https://www.dinky.org.cn/download/download
  • Dinky最新版本已经到了1.2.0,这里采用0.7.3版本为示例进行安装。
# 1、初始化dinky表
[root@centos01 apps]# tar -zxvf dlink-release-0.7.3.tar.gz
[root@centos01 apps]# mv dlink-release-0.7.3 dlink-0.7.3# Dinky采用mysql作为后端的存储库,部署需要MySQL5.7以上版本
# 创建数据库
mysql>CREATE DATABASE dinky;# 创建用户并允许远程登录
mysql>create user 'dinky'@'%' IDENTIFIED WITH mysql_native_password by 'dinky';# 授权
mysql>grant ALL PRIVILEGES ON dinky.* to 'dinky'@'%';mysql>flush privileges;# 登录创建好的dinky用户,执行初始化sql文件
mysql -udinky -pdinky
mysql> use dinky;
mysql> source /opt/apps/dlink-0.7.3/sql/dinky.sql# 2、修改Dinky连接mysql的配置文件
[root@centos01 dlink-0.7.3]# vim ./config/application.yml
spring:datasource:url: jdbc:mysql://${MYSQL_ADDR:centos01:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=trueusername: ${MYSQL_USERNAME:dinky}password: ${MYSQL_PASSWORD:dinky}driver-class-name: com.mysql.cj.jdbc.Driverapplication:name: dlinkmvc:pathmatch:matching-strategy: ant_path_matcherformat:date: yyyy-MM-dd HH:mm:ss#json格式化全局配置jackson:time-zone: GMT+8date-format: yyyy-MM-dd HH:mm:ssmain:allow-circular-references: true# 3、加载依赖
# 需要在Dinky根目录下plugins/flink${FLINK_VERSION}文件夹并上传相关的Flink依赖
# 我这里使用的是flink1.14版本
cp /opt/apps/flink-1.14.4/lib/* /opt/apps/dlink-0.7.3/plugins/flink1.14# Dinky当前版本的yarn的perjob与application执行模式依赖flink-shade-hadoop,需要额外添加flink-shade-hadoop-uber-3包。
[root@centos01 plugins]# ll
total 58212
drwxr-xr-x. 3 root root     4096 Jan  6 17:27 flink1.14
-rw-r--r--. 1 root root 59604787 Dec 23 18:24 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar# 注意:这里我放置了geohash-1.4.0.jar和fastjson-1.2.75.jar
# 还放了doris的connector
# 这是为了后面使用自定义函数
[root@centos01 plugins]# ll flink1.14/
total 240560
-rw-r--r--. 1 root root     53820 Jan  3 23:34 commons-cli-1.4.jar
-rw-r--r--. 1 root root    655085 Jan  6 17:27 fastjson-1.2.75.jar
-rw-r--r--. 1 tom  1001     85584 Feb 25  2022 flink-csv-1.14.4.jar
-rw-r--r--. 1 tom  1001 136063964 Feb 25  2022 flink-dist_2.12-1.14.4.jar
-rw-r--r--. 1 root root   8077256 Dec 23 13:43 flink-doris-connector-1.14_2.12-1.1.1.jar
-rw-r--r--. 1 tom  1001    153145 Feb 25  2022 flink-json-1.14.4.jar
-rw-r--r--. 1 root root  59604787 Dec 23 18:25 flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
-rw-r--r--. 1 tom  1001   7709731 Sep 10  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r--. 1 tom  1001  39635530 Feb 25  2022 flink-table_2.12-1.14.4.jar
-rw-r--r--. 1 root root     25422 Jan  6 15:48 geohash-1.4.0.jar
-rw-r--r--. 1 root root   1654887 Dec 24 21:16 hadoop-mapreduce-client-core-3.1.1.jar
-rw-r--r--. 1 tom  1001    208006 Jan 13  2022 log4j-1.2-api-2.17.1.jar
-rw-r--r--. 1 tom  1001    301872 Jan  7  2022 log4j-api-2.17.1.jar
-rw-r--r--. 1 tom  1001   1790452 Jan  7  2022 log4j-core-2.17.1.jar
-rw-r--r--. 1 tom  1001     24279 Jan  7  2022 log4j-slf4j-impl-2.17.1.jar# 另外,需要注意的是:这里把除了flink1.14之外的目录全部删除了,否则在使用的过程中会报错如下:
Caused by: java.io.IOException: Cannot find any jar files for plugin in directory [plugins/flink1.11]. Please provide the jar files for the plugin or delete the directory.at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createJarURLsFromDirectory(DirectoryBasedPluginFinder.java:103)at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.createPluginDescriptorForSubDirectory(DirectoryBasedPluginFinder.java:88)at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)# 4、上传jar包到hdfs
# 使用Application模式时,需要将flink和dinky相关的包上传到HDFS
hdfs dfs -mkdir -p /dinky/jar/
hdfs dfs -put /opt/apps/dlink-0.7.3/jar/dlink-app-1.14-0.7.3-jar-with-dependencies.jar /dinky/jarhdfs dfs -mkdir /flink-dist
hdfs dfs -put /opt/apps/flink-1.14.4/lib /flink-dist
hdfs dfs -put /opt/apps/flink-1.14.4/plugins /flink-dist# 5、启动、关闭
cd /opt/apps/dlink-0.7.3
[root@centos01 dlink-0.7.3]# ./auto.sh start 1.14
[root@centos01 dlink-0.7.3]# jps
1688 Dlink# web相关
web页面:http://centps01:8888
默认用户名/密码: admin/admin# 关闭命令
[root@centos01 dlink-0.7.3]# ./auto.sh stop

在这里插入图片描述

2 案例讲解_快速入门实时计算平台Dinky

  • 我们用下面的案例,快速了解下如何基于Dinky进行开发flink-sql程序
  • 如下图,我们读取kafka的日志事件,去look-up join在Hbase库中的维表,打宽后再sink到kafka中,并冗余一份数据到doris中
  • 平台更多的用法还是参考官方文档:https://www.dinky.org.cn/docs/next/get_started/overview

在这里插入图片描述

2.1 集群配置

  • 集群实例管理适用于 Standalone,Yarn Session 和 Kubernetes Session 这三种集群实例的注册
  • 集群配置管理适用于 Yarn Per-job、Yarn Application 和 Kubernetes Application 这三种类型配置。
# 这里为方便使用Standalone集群
[root@centos01 dlink-0.7.3]# /opt/apps/flink-1.14.4/bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host centos01.
Starting taskexecutor daemon on host centos01.# web页面
http://centos01:8081
  • 注册Standalone集群

在这里插入图片描述

  • 这里再展示下Flink on yarn的注册

在这里插入图片描述

在这里插入图片描述

  • 配置中心配置下提交 FlinkSQL 的 Jar 文件路径

在这里插入图片描述

2.2 Flink 连接配置变量(user_info2hbase任务)

  • 我们需要将mysql用户数据同步到hbase中,然后利用flink-sql关联hbase

  • 这里我们直接使用flink-cdc同步数据

  • 利用Dinky,我们可以自动生成flink-sql中建表语句
    在这里插入图片描述

在这里插入图片描述

-- Flink 连接配置: (可以放入公共参数,及其敏感信息参数)'hostname' = '192.168.42.104','port' = '3306','username' = 'root','password' = '123456'-- Flink 连接模板: 注意引用变量的前后逗号, 使用此方式作业右侧必须开启全局变量
-- ${schemaName} 动态获取数据库,${tableName} 动态获取表名称'connector' = 'mysql-cdc',${Centos04-Mysql},'scan.incremental.snapshot.enabled' = 'true','debezium.snapshot.mode'='latest-offset','database-name' = '${schemaName}','table-name' = '${tableName}'
  • 添加完数据源,在元数据中心可以访问

在这里插入图片描述

  • 下面在数据开发中可以创建flink-sql作业:

    • ADD JAR语句用于将用户jar添加到classpath。利用Mysql-cdc需要connector,我们flink/lib目录中并无此Jar包,因此这里我们添加此jar包;

    • 执行模式,我们选择Standalone,然后Flink集群就会出现我们之前配置过的集群信息;

      在这里插入图片描述

    • 这里的DefaultCatalog是dinky自己实现了 mysql-catalog,而非flink原生基于内存的catalog。因此,程序结束后,在结构中(默认my_catalog.default_database),还能查看到表信息;

    在这里插入图片描述

    • 右边需要启用全局变量,然后dinky会进行替换

    在这里插入图片描述

    • 点击执行配置,任务会提交到flink-standalone集群上,启用打印流,就能通过"select * from"获取最新数据,这样方便我们进行调试
      在这里插入图片描述

    在这里插入图片描述

    • 在运维中心可以查看相关信息,也可以停止job

    在这里插入图片描述

    在这里插入图片描述

-- 添加mysql-cdc的jar包
add jar '/opt/apps/bak_jars/flink-sql-connector-mysql-cdc-2.3.0.jar';-- 读取mysql的cdc数据
drop table if exists ums_member_cdc;
CREATE TABLE IF NOT EXISTS ums_member_cdc (`id` BIGINT NOT NULL,`username` STRING,`phone` STRING,`user_status` INT,`create_time` TIMESTAMP,`gender` INT,`birthday` DATE,`province` STRING,`city` STRING,`job` STRING,`source_type` INT,PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc',${Centos04-Mysql},'scan.incremental.snapshot.enabled' = 'true','debezium.snapshot.mode'='latest-offset','database-name' = 'mall','table-name' = 'ums_member'
);select * from ums_member_cdc;
  • 我们现在需要将ums_member_cdc同步到hbase中,下面给出代码
-- 添加mysql-cdc的jar包
add jar '/opt/apps/bak_jars/flink-sql-connector-mysql-cdc-2.3.0.jar';-- 读取mysql的cdc数据
drop table if exists ums_member_cdc;
CREATE TABLE IF NOT EXISTS ums_member_cdc (`id` BIGINT NOT NULL,`username` STRING,`phone` STRING,`user_status` INT,`create_time` TIMESTAMP,`gender` INT,`birthday` DATE,`province` STRING,`city` STRING,`job` STRING,`source_type` INT,PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc',${Centos04-Mysql},'scan.incremental.snapshot.enabled' = 'true','debezium.snapshot.mode'='latest-offset','database-name' = 'mall','table-name' = 'ums_member'
);-- 添加hbase相关的jar包
add jar '/opt/apps/bak_jars/flink-connector-hbase-base_2.12-1.14.4.jar'; 
add jar '/opt/apps/bak_jars/flink-connector-hbase-2.2_2.12-1.14.4.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-common-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-client-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-protocol-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-protocol-shaded-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-shaded-miscellaneous-2.2.1.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-shaded-netty-2.2.1.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-shaded-protobuf-2.2.1.jar';-- hbase(main):029:0> create 'dim_user_info', 'f1'
-- 创建hbase的映射表
create table if not exists dim_user_hbase(                   id STRING,                        f1 ROW<                                 id BIGINT,                           phone STRING,                        user_status INT,                          create_time TIMESTAMP(3),            gender INT,                          birthday DATE,                       province STRING,                     city STRING,                         job STRING,                          source_type INT>                     
) WITH(                                    'connector' = 'hbase-2.2',             'table-name' = 'dim_user_info',        'zookeeper.quorum' = 'centos01:2181'    
);-- 将Mysql-cdc的信息写入到hbase中
insert into dim_user_hbase
select cast(id as string) as id, row(id,phone,user_status,create_time,gender,birthday,province,city,job,source_type) as f1
from   ums_member_cdc;

注(相关表信息):

DROP TABLE IF EXISTS `ums_member`;
CREATE TABLE `ums_member` (`id` bigint(20) NOT NULL,`username` varchar(64) DEFAULT NULL,`phone` varchar(64) DEFAULT NULL,`user_status` int(11) DEFAULT NULL,`create_time` datetime DEFAULT NULL,`gender` int(11) DEFAULT NULL,`birthday` date DEFAULT NULL,`province` varchar(64) DEFAULT NULL,`city` varchar(64) DEFAULT NULL,`job` varchar(64) DEFAULT NULL,`source_type` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- ----------------------------
-- Records of ums_member
-- ----------------------------
INSERT INTO `ums_member` VALUES ('1', 'tom', '18616350000', '0', '2023-04-06 01:00:00', '0', '1995-02-18', '浙江省', '杭州市', '运营', '1');
INSERT INTO `ums_member` VALUES ('2', 'hank', '18616350001', '0', '2023-04-06 01:00:00', '1', '1995-03-18', '浙江省', '宁波市', '运营', '1');

2.3 mock维表数据

[root@centos01 dlink-0.7.3]# hbase shellhbase(main):001:0> create 'dim_page_info', 'f'put 'dim_page_info', '/mall/', 'f:pt', '商品详情页'
put 'dim_page_info', '/mall/', 'f:sv', '商城服务'
put 'dim_page_info', '/mall/search/', 'f:pt', '搜索结果页'
put 'dim_page_info', '/mall/search/', 'f:sv', '搜索服务'
put 'dim_page_info', '/mall/promotion/', 'f:pt', '活动页'
put 'dim_page_info', '/mall/promotion/', 'f:sv', '商城服务'hbase(main):001:0> list
TABLE
dim_geo_area
dim_page_info
dim_user_info
3 row(s)# 这里dim_geo_area是将一些全国一些地方典型经纬度(百度坐标系)的转换为geoHash编码
# 后面,将用户经纬度同样转换为geoHash编码,关联用户所在的省、市、区\县
# 可以自己模拟几条数据
hbase(main):003:0> get 'dim_geo_area', 'w7w3j', {FORMATTER=>'toString'}
COLUMN                               CELL   
f:p                                     timestamp=1736131692022, value=海南省 
f:c                                     timestamp=1736131692022, value=海口市             
f:r                                     timestamp=1736131692022, value=秀英区                      
1 row(s)
Took 0.0084 seconds

2.4 维表join

  • 下面所示geohash所用的jar包,我们用来定义udf函数

    • 创建Java作业,udf模板选择java_udf

      在这里插入图片描述

    • 然后,修改为下面的代码

    package com.yyds.udf;import org.apache.flink.table.functions.ScalarFunction;
    // 注意:这里使用到的jar包,已经手动放在了flink安装目录下的lib下
    // 同时,也同步到dinky的/opt/apps/dlink-0.7.3/plugins/flink1.14目录中
    // 参考:Dinky的下载及安装
    import ch.hsr.geohash.GeoHash;public class GetGeoHash extends ScalarFunction {// 接受一个gps座标,返回它的geohash码public String eval(Double lng, Double lat){String geohash = null;try{geohash = GeoHash.geoHashStringWithCharacterPrecision(lat,lng,5);} catch (Exception e){}return geohash;}
    }
    
    • geohash算法使用了下面的jar包
<dependency><groupId>ch.hsr</groupId><artifactId>geohash</artifactId><version>1.4.0</version>
</dependency>
  • 维表join的代码如下:
-- 设置相关参数
set 'execution.checkpointing.interval'= '10000';
set 'state.checkpoints.dir' = 'hdfs://centos01:8020/flink_ck/dwd_sink2kafka';-- 注册udf函数
create function if not exists GetGeoHash as 'com.yyds.udf.GetGeoHash';-- 添加kafka相关jar包
add jar '/opt/apps/bak_jars/flink-connector-kafka_2.12-1.14.4.jar';
add jar '/opt/apps/bak_jars/kafka-clients-2.6.2.jar';
-- 添加hbase相关的jar包
add jar '/opt/apps/bak_jars/flink-connector-hbase-2.2_2.12-1.14.4.jar';
add jar '/opt/apps/bak_jars/flink-connector-hbase-base_2.12-1.14.4.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-common-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-client-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-protocol-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-protocol-shaded-2.2.5.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-shaded-miscellaneous-2.2.1.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-shaded-netty-2.2.1.jar';
add jar '/opt/apps/hbase-2.2.5/lib/hbase-shaded-protobuf-2.2.1.jar';-- 1、获取kafka主表
-- 创建topic
-- /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 --topic mall_logs
-- 启动生产者
-- /opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --bootstrap-server centos01:9092 --topic mall_logsdrop table if exists mall_logs_kafka_source;
CREATE TABLE if not exists mall_logs_kafka_source (`carrier`           STRING,`deviceId`          STRING,`deviceType`        STRING,`eventId`           STRING,`id`                BIGINT,`isNew`             INT,`latitude`          DOUBLE,`longitude`         DOUBLE,`netType`           STRING,`osName`            STRING,`osVersion`         STRING,`properties`        map<string,string> ,`releaseChannel`    STRING,`resolution`        STRING,`sessionId`         STRING,`timestamp`         bigint,proc_time   AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'mall_logs','properties.bootstrap.servers' = '192.168.42.101:9092','properties.group.id' = 'flink_group', 'format' = 'json','json.fail-on-missing-field' = 'false', 'scan.startup.mode' = 'earliest-offset','value.fields-include' = 'EXCEPT_KEY'
);-- 2、从hbase中获取地域信息维表、用户信息维表
-- 地域信息维表
drop table if exists dim_geo_area_hbase;
create table if not exists dim_geo_area_hbase(                   geohash STRING,                        f ROW<   p STRING, c STRING, r STRING                              >                     
) WITH(                                    'connector' = 'hbase-2.2',             'table-name' = 'dim_geo_area',        'zookeeper.quorum' = 'centos01:2181'
);-- 用户信息维表
drop table if exists dim_user_info_hbase;
create table if not exists dim_user_info_hbase(                   id STRING,                        f1 ROW<                                 id BIGINT,                           phone STRING,                        user_status INT,                          create_time TIMESTAMP(3),            gender INT,                          birthday STRING,                       province STRING,                     city STRING,                         job STRING,                          source_type INT>                     
) WITH(                                    'connector' = 'hbase-2.2',             'table-name' = 'dim_user_info',        'zookeeper.quorum' = 'centos01:2181'    
);-- 页面信息维表
drop table if exists dim_page_info_hbase;
create table if not exists dim_page_info_hbase(  url_prefix STRING,                      f  ROW<                                 sv STRING,                           pt STRING>                          ) WITH(                                    'connector' = 'hbase-2.2',             'table-name' = 'dim_page_info',        'zookeeper.quorum' = 'centos01:2181'    );-- 3、使用lookup join进行维表关联
CREATE TEMPORARY VIEW tmp_wide_view AS 
SELECT e.id             as user_id,e.isNew          as is_new,e.sessionId      as session_id,e.eventId        as event_id,e.`timestamp`    as action_time,e.longitude      as longitude,e.latitude       as latitude,GetGeoHash(e.longitude, e.latitude) as geohash,e.releaseChannel as release_channel,e.deviceType     as device_type,e.properties,e.netType        as net_type,e.osName         as os_name,-- 用户信息u.f1.phone       as register_phone,u.f1.user_status as user_status,u.f1.create_time as register_time,u.f1.gender      as register_gender,u.f1.birthday    as register_birthday,u.f1.province    as register_province,u.f1.city        as register_city,u.f1.job         as register_job,u.f1.source_type as register_source_type,-- 地域信息g.f.p            as gps_province,g.f.c            as gps_city,g.f.r            as gps_region,-- 页面信息p.f.pt           as page_type,p.f.sv           as page_service
FROM mall_logs_kafka_source AS e
LEFT JOIN dim_user_info_hbase     FOR SYSTEM_TIME AS OF e.proc_time AS u
ON cast(e.id as string) = cast(u.id as string)
LEFT JOIN dim_geo_area_hbase      FOR SYSTEM_TIME AS OF e.proc_time AS g 
ON GetGeoHash(e.longitude, e.latitude) = g.geohash
LEFT JOIN dim_page_info_hbase     FOR SYSTEM_TIME AS OF e.proc_time AS p 
ON regexp_extract(e.properties['url'], '^(.*/).*?') = p.url_prefix;-- 4、将宽表sink到kafka
-- 创建topic: /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 --topic dwd_user_details
drop table if exists dwd_user_details_kafka;
CREATE TABLE if not exists dwd_user_details_kafka (user_id           BIGINT,is_new            INT,session_id        STRING,event_id          STRING,action_time       BIGINT,longitude         DOUBLE,latitude          DOUBLE,geohash           STRING,release_channel   STRING,device_type       STRING,properties        MAP<STRING, STRING>,net_type          STRING,       os_name           STRING,register_phone    STRING,user_status       INT,register_time     TIMESTAMP(3),register_gender   INT,register_birthday STRING,register_province STRING,register_city     STRING,register_job      STRING,register_source_type INT,gps_province      STRING,gps_city          STRING,gps_region        STRING,page_type         STRING,page_service      STRING
) WITH ('connector' = 'kafka','topic' = 'dwd_user_details','properties.bootstrap.servers' = 'centos01:9092','properties.group.id' = 'testGroup2','scan.startup.mode' = 'earliest-offset','value.format' = 'json','value.json.fail-on-missing-field' = 'false',   -- 正确命名空间下的配置项'value.json.ignore-parse-errors' = 'true'      -- 正确命名空间下的配置项
);-- 插入到kafka
insert into dwd_user_details_kafka select * from tmp_wide_view;

注(几条kafka样例数据):

{"carrier":"中国电信","deviceId":"fae6233e-f973-42bb-abc6-8d39cbd56074","deviceType":"MI-NOTE","eventId":"app_launch","id":1,"isNew":0,"lastUpdate":2023,"latitude":29.30644391183944,"longitude":120.06729564866686,"netType":"WIFI","osName":"android","osVersion":"8.5","releaseChannel":"小米游戏中心","resolution":"1024*768","sessionId":"HvBGPAxJ3Bmi","timestamp":1618020104340}                                    
{"carrier":"中国电信","deviceId":"fae6233e-f973-42bb-abc6-8d39cbd56074","deviceType":"MI-NOTE","eventId":"mall_click","id":1,"isNew":0,"lastUpdate":2023,"latitude":29.30644391183944,"longitude":120.06729564866686,"netType":"WIFI","osName":"android","osVersion":"8.5","properties":{"url":"/mall/item-2.html","item_id":2},"releaseChannel":"小米游戏中心","resolution":"1024*768","sessionId":"HvBGPAxJ3Bmi","timestamp":1618020106190}
{"carrier":"中国电信","deviceId":"fae6233e-f973-42bb-abc6-8d39cbd56074","deviceType":"MI-NOTE","eventId":"mall_click","id":1,"isNew":0,"lastUpdate":2023,"latitude":29.30644391183944,"longitude":120.06729564866686,"netType":"WIFI","osName":"android","osVersion":"8.5","properties":{"url":"/mall/item-666.html","item_id":666},"releaseChannel":"小米游戏中心","resolution":"1024*768","sessionId":"HvBGPAxJ3Bmi","timestamp":1618020106190}
{"carrier":"中国移动","deviceId":"863b13cc-23d5-4b22-8c41-8af6f2bc2436","deviceType":"LEPHONE-6","eventId":"app_launch","id":2,"isNew":0,"lastUpdate":2021,"latitude":19.88434436079741,"longitude":110.26320040618856,"netType":"5G","osName":"android","osVersion":"8.5","releaseChannel":"优亿市场","resolution":"2048*1024","sessionId":"VrRAxksJfobh","timestamp":1618020104396}

2.5 kafka2doris

  • 冗余一份数据到doris,注意:这里同样注册了Udf函数
  • 这里的数据源已经在mysql-catalog中,因此可以直接使用
-- 设置相关参数
set 'execution.checkpointing.interval'= '10000';
set 'state.checkpoints.dir' = 'hdfs://centos01:8020/flink_ck/dwd_kafka2doris';-- 注册udf函数
create function if not exists Map2JsonStr as 'com.yyds.udf.Map2JsonStr';-- 添加kafka相关jar包
add jar '/opt/apps/bak_jars/flink-connector-kafka_2.12-1.14.4.jar';
add jar '/opt/apps/bak_jars/kafka-clients-2.6.2.jar';
-- 添加doris相关的jar包
add jar '/opt/apps/flink-1.14.4/lib/flink-doris-connector-1.14_2.12-1.1.1.jar';drop table if exists dwd_user_details_doris;
CREATE TABLE if not exists dwd_user_details_doris (gps_province         VARCHAR(16),gps_city             VARCHAR(16),gps_region           VARCHAR(16),dt                   DATE,user_id              BIGINT,is_new               INT,session_id           VARCHAR(20),event_id             VARCHAR(10),action_time          BIGINT,longitude            DOUBLE,latitude             DOUBLE,release_channel      VARCHAR(20),device_type          VARCHAR(20),properties           VARCHAR(80),  -- 注意:Doris中不支持Map类型net_type             VARCHAR(10),os_name              VARCHAR(10),register_phone       VARCHAR(20),user_status          INT,register_time        TIMESTAMP(3),register_gender      INT,register_birthday    VARCHAR(20),register_province    VARCHAR(20),register_city        VARCHAR(20),register_job         VARCHAR(20),register_source_type INT,page_type            VARCHAR(20),page_service         VARCHAR(20)
) WITH ('connector' = 'doris','fenodes' = 'centos01:8030','table.identifier' = 'doris_database.dwd_user_details','username' = 'root','password' = '123456','sink.label-prefix' = 'doris_sink_label-963'
);-- 从kafka插入到doris中,即冗余一份到doris
INSERT INTO dwd_user_details_doris
SELECT gps_province,gps_city,gps_region,TO_DATE(DATE_FORMAT(TO_TIMESTAMP_LTZ(action_time, 3), 'yyyy-MM-dd')) AS dt,user_id,is_new,session_id,event_id,action_time,longitude,latitude,release_channel,device_type,Map2JsonStr(properties) AS properties, -- 注意:确保Map2JsonStr是已注册的UDF函数net_type,os_name,register_phone,user_status,register_time,register_gender,cast(register_birthday as string) as register_birthday,register_province,register_city,register_job,register_source_type,page_type,page_service
FROM dwd_user_details_kafka;

注:

  • doris结果展示

在这里插入图片描述

  • udf函数的定义
package com.yyds.udf;import org.apache.flink.table.functions.ScalarFunction;
// 同样用到外部jar包
import com.alibaba.fastjson.JSON;import java.util.Map;public class Map2JsonStr extends ScalarFunction {public String eval(Map<String,String> properties) {return  JSON.toJSONString(properties);}
}
  • doris结果表的建表语句
CREATE TABLE IF NOT EXISTS dwd_user_details (gps_province         VARCHAR(16),gps_city             VARCHAR(16),gps_region           VARCHAR(16),dt                   DATE,user_id              BIGINT,is_new               INT,session_id           VARCHAR(20),event_id             VARCHAR(10),action_time          BIGINT,longitude            DOUBLE,latitude             DOUBLE,release_channel      VARCHAR(20),device_type          VARCHAR(20),properties           VARCHAR(80),  -- 注意:Doris中不支持Map类型net_type             VARCHAR(10),os_name              VARCHAR(10),register_phone       VARCHAR(20),user_status          INT,register_time        DATETIME,     -- Doris 不直接支持 TIMESTAMP(3),使用DATETIME代替register_gender      INT,register_birthday    VARCHAR(20),register_province    VARCHAR(20),register_city        VARCHAR(20),register_job         VARCHAR(20),register_source_type INT,page_type            VARCHAR(20),page_service         VARCHAR(20)
) ENGINE=OLAP
DUPLICATE KEY(gps_province, gps_city, gps_region, dt, user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES ("replication_num" = "1","in_memory" = "false","storage_format" = "DEFAULT"
);
  • 平台更多用法,如:报警组配置、CDCSOURCE的整库同步、flink jar包的运行、flink on yarn运行等,请参考官方文档,不再赘述。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66717.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TANGO - 数字人全身动作生成

文章目录 一、关于 TANGO演示视频&#xff08;YouTube&#xff09;&#x1f4dd;发布计划 二、⚒️安装克隆存储库构建环境 三、&#x1f680;训练和推理1、推理2、为自定义字符创建图形 一、关于 TANGO TANGO 是 具有分层音频运动嵌入 和 扩散插值的共语音手势视频再现 由东…

1月9日星期四今日早报简报微语报早读

1月9日星期四&#xff0c;农历腊月初十&#xff0c;早报#微语早读。 1、上海排查47家“俄罗斯商品馆”&#xff1a;个别店铺被责令停业&#xff0c;立案调查&#xff1b; 2、西藏定日县已转移受灾群众4.65万人&#xff0c;检测到余震646次&#xff1b; 3、国家发改委&#x…

Zemax 序列模式下的扩束器

扩束器结构原理 扩束器用于增加准直光束&#xff08;例如激光束&#xff09;的直径&#xff0c;同时保持其准直。它通常用于激光光学和其他需要修改光束大小或发散度的应用。 在典型的扩束器中&#xff0c;输入光束是准直激光器&#xff0c;或光束进入第一个光学元件。当光束开…

【TI毫米波雷达】DCA1000不使用mmWave Studio的数据采集方法,以及自动化实时数据采集

【TI毫米波雷达】DCA1000不使用mmWave Studio的数据采集方法&#xff0c;以及自动化实时数据采集 mmWave Studio提供的功能完全够用了 不用去纠结用DCA1000低延迟、无GUI传数据 速度最快又保证算力无非就是就是Linux板自己写驱动做串口和UDP 做雷达产品应用也不会采用DCA1000的…

Kubernetes Gateway API-4-TCPRoute和GRPCRoute

1 TCPRoute 目前 TCP routing 还处于实验阶段。 Gateway API 被设计为与多个协议一起工作&#xff0c;TCPRoute 就是这样一个允许管理TCP流量的路由。 在这个例子中&#xff0c;我们有一个 Gateway 资源和两个 TCPRoute 资源&#xff0c;它们按照以下规则分配流量&#xff1…

提升决策支持:五大报表软件功能全面评测

本文将为大家介绍五款功能强大的报表软件&#xff0c;包括山海鲸报表、JReport、Power BI、Zoho Analytics 和 SAP Crystal Reports。这些工具各具特色&#xff0c;能够帮助企业快速生成数据报表并进行深度分析。无论是数据可视化、报表定制、自动化生成还是与其他系统的集成&a…

Backend - C# EF Core 执行迁移 Migrate

目录 一、创建Postgre数据库 二、安装包 &#xff08;一&#xff09;查看是否存在该安装包 &#xff08;二&#xff09;安装所需包 三、执行迁移命令 1. 作用 2. 操作位置 3. 执行&#xff08;针对visual studio&#xff09; 查看迁移功能的常用命令&#xff1a; 查看…

GESP202312 四级【小杨的字典】题解(AC)

》》》点我查看「视频」详解》》》 [GESP202312 四级] 小杨的字典 题目描述 在遥远的星球&#xff0c;有两个国家 A 国和 B 国&#xff0c;他们使用着不同的语言&#xff1a;A 语言和 B 语言。小杨是 B 国的翻译官&#xff0c;他的工作是将 A 语言的文章翻译成 B 语言的文章…

【软考网工笔记】计算机基础理论与安全——网络规划与设计

HFC 混合光纤同轴电缆网 HFC: Hybrid Fiber - Coaxial 的缩写&#xff0c;即混合光纤同轴电缆网。是一种经济实用的综合数字服务宽带网接入技术。 HFC 通常由光纤干线、同轴电缆支线和用户配线网络三部分组成&#xff0c;从有线电视台出来的节目信号先变成光信号在干线上传输…

KubeVirt 进阶:设置超卖比、CPU/MEM 升降配、在线磁盘扩容

前两篇文章&#xff0c;我们分别介绍 Kubevirt 的安装、基本使用 以及 将 oVirt 虚拟机迁移到 KubeVirt&#xff0c;我们留了两个ToDo&#xff0c;一个是本地磁盘的动态分配&#xff0c;一个是固定 IP 的需求&#xff0c;本期我们先解决第一个&#xff0c;本地磁盘的动态分配。…

自动化脚本本地可执行但是Jenkins上各种报错怎么解决

作者碎碎念&#xff1a; 测试环境 Jenkinsdockerpythonunittest&#xff0c; 测试问题&#xff1a;本人在写关于SAP4Me网站的自动化脚本时遇到一个问题 本地怎么都跑的通 但是一上Jenkins会出现各种各样的问题 因为在Jenkins里面脚本是放在docker环境里面跑的 所以环境的差异…

TaskBuilder前端组件简介

3.3.3.1前端组件的分类 前端页面是由众多组件层层嵌套构成的&#xff0c;这些组件是任讯信息自主研发的一套前端UI组件&#xff0c;称为tfp组件&#xff0c;这些组件根据其功能和特点又分为几大类&#xff0c;它们的继承关系如下图所示&#xff1a; 从图中可知&#xff0c;所…

linux下shell中使用上下键翻出历史命名时出现^[[A^[[A^[[A^[[B^[[B的问题解决

前言 今天在使用linux的时候&#xff0c;使用上下键想翻出历史命令时&#xff0c;却出现[[A[[A[[A[[B^[[B这种东东&#xff0c;而tab键补全命令的功能也无法使用。最终发现是由于当前用户使用的shell是/bin/sh的原因。 解决方法 运行以下命令&#xff0c;将默认 shell 设置为…

FPGA的 基本结构(Xilinx 公司Virtex-II 系列FPGA )

以Xilinx 公司Virtex-II 系列FPGA 为例&#xff0c;其基本结构由下图所示。它是主要由两大部分组成&#xff1a;可编程输入/输出&#xff08;Programmable I/Os&#xff09;部分和内部可配置&#xff08;Configurable Logic&#xff09;部分。 可编程输入/输出&#xff08;I/Os…

《零基础Go语言算法实战》【题目 1-11】格式化字符串

《零基础Go语言算法实战》 【题目 1-11】格式化字符串 在 Go 语言中&#xff0c;找到使用变量格式化字符串而不打印值的简单方法。 【解答】 在 Go 语言中&#xff0c;在不打印值的情况下进行格式化的最简单方法是使用 fmt.Sprintf() 函数&#xff0c; 它返回一个格式化的…

Virtualbox7.1.4安装Proxmox

准备工作 有以下两个大步骤&#xff1a; 第一步、安装Virtualbox 最新版本安装过程遇到问题参考&#xff1a; VirtualBox新版本报错 Invalid installation directory解决方案_virtualbox invalid installation directory-CSDN博客 第二步、下载Proxmox 官网下载地址&…

Apache JMeter 压力测试使用说明

文章目录 一、 安装步骤步骤一 下载相关的包步骤二 安装 Jmeter步骤三 设置 Jmeter 工具语言类型为中文 二、使用工具2.1 创建测试任务步骤一 创建线程组步骤二 创建 HTTP 请求 2.2 配置 HTTP 默认参数添加 HTTP消息头管理器HTTP请求默认值 2.3 添加 查看结果监听器2.4 查看结果…

yum换源

背景描述 源&#xff1a;阿里云&#xff1a; 系统&#xff1a;centos7 https://developer.aliyun.com/mirror/其他参考地址&#xff1a; https://developer.aliyun.com/mirror/centos?spma2c6h.13651102.0.0.3e221b11UchtP5https://developer.aliyun.com/mirror/centos-vau…

《跟我学Spring Boot开发》系列文章索引❤(2025.01.09更新)

章节文章名备注第1节Spring Boot&#xff08;1&#xff09;基于Eclipse搭建Spring Boot开发环境环境搭建第2节Spring Boot&#xff08;2&#xff09;解决Maven下载依赖缓慢的问题给火车头提提速第3节Spring Boot&#xff08;3&#xff09;教你手工搭建Spring Boot项目纯手工玩法…

AIDD-人工智能药物设计-人工智能破解酶稳定性定向进化中的多个突变位点高效重组问题

mLife | 人工智能破解酶稳定性定向进化中的多个突变位点高效重组问题 优化酶的热稳定性对于蛋白质科学和工业应用至关重要。目前&#xff0c;通过&#xff08;半&#xff09;理性设计和随机诱变方法可以较为准确地设计多个增强酶热稳定性的单点突变。但当组合多个突变时&#…