物流实时数仓:数仓搭建(ODS)

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建


文章目录

  • 系列文章目录
  • 前言
  • 一、IDEA环境准备
    • 1.pom.xml
    • 2.目录创建
  • 二、代码编写
    • 1.log4j.properties
    • 2.CreateEnvUtil.java
    • 3.KafkaUtil.java
    • 4.OdsApp.java
  • 三、代码测试
  • 总结


前言

现在我们开始进行数仓的搭建,我们用Kafka来代替数仓的ods层。
基本流程为使用Flink从MySQL读取数据然后写入Kafka中


一、IDEA环境准备

1.pom.xml

写入项目需要的配置

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>1.8</java.version><flink.version>1.17.0</flink.version><hadoop.version>3.2.3</hadoop.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.68</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-reload4j</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency></dependencies>

基本上项目需要的所有jar包都有了,不够以后在加。

2.目录创建

在这里插入图片描述按照以上目录结构进行目录创建

二、代码编写

1.log4j.properties

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2.CreateEnvUtil.java

这个文件中有两个方法
创建初始化Flink的env
Flink连接mysql的MySqlSource

package com.atguigu.tms.realtime.utils;import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;import java.util.HashMap;public class CreateEnvUtil {public static StreamExecutionEnvironment getStreamEnv(String[] args) {// 1.1 指定流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.检查点相关设置// 2.1 开启检查点env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);// 2.2 设置检查点的超时时间env.getCheckpointConfig().setCheckpointTimeout(120000L);// 2.3 设置job取消之后 检查点是否保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.4 设置两个检查点之间的最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);// 2.5 设置重启策略env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));// 2.6 设置状态后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");// 2.7 设置操作hdfs用户// 获取命令行参数ParameterTool parameterTool = ParameterTool.fromArgs(args);String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");System.setProperty("HADOOP_USER_NAME", hdfsUserName);return env;}public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {ParameterTool parameterTool = ParameterTool.fromArgs(args);String mysqlHostname = parameterTool.get("hadoop-user-name", "hadoop102");int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));String mysqlUsername = parameterTool.get("mysql-username", "root");String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");option = parameterTool.get("start-up-option", option);serverId = parameterTool.get("server-id", serverId);// 创建配置信息 Map 集合,将 Decimal 数据类型的解析格式配置 k-v 置于其中HashMap config = new HashMap<>();config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());// 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema,该 Schema 将用于 MysqlSource 的初始化JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =new JsonDebeziumDeserializationSchema(false, config);MySqlSourceBuilder<String> builder = MySqlSource.<String>builder().hostname(mysqlHostname).port(mysqlPort).username(mysqlUsername).password(mysqlPasswd).deserializer(jsonDebeziumDeserializationSchema);switch (option) {// 读取实时数据case "dwd":String[] dwdTables = new String[]{"tms.order_info","tms.order_cargo","tms.transport_task","tms.order_org_bound"};return builder.databaseList("tms").tableList(dwdTables).startupOptions(StartupOptions.latest()).serverId(serverId).build();// 读取维度数据case "realtime_dim":String[] realtimeDimTables = new String[]{"tms.user_info","tms.user_address","tms.base_complex","tms.base_dic","tms.base_region_info","tms.base_organ","tms.express_courier","tms.express_courier_complex","tms.employee_info","tms.line_base_shift","tms.line_base_info","tms.truck_driver","tms.truck_info","tms.truck_model","tms.truck_team"};return builder.databaseList("tms").tableList(realtimeDimTables).startupOptions(StartupOptions.initial()).serverId(serverId).build();}Log.error("不支持操作类型");return null;}
}

3.KafkaUtil.java

该文件中有一个方法,创建Flink连接Kafka需要的Sink

package com.atguigu.tms.realtime.utils;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;public class KafkaUtil {private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {// 将命令行参数对象封装为 ParameterTool 类对象ParameterTool parameterTool = ParameterTool.fromArgs(args);// 提取命令行传入的 key 为 topic 的配置信息,并将默认值指定为方法参数 topic// 当命令行没有指定 topic 时,会采用默认值topic = parameterTool.get("topic", topic);// 如果命令行没有指定主题名称且默认值为 null 则抛出异常if (topic == null) {throw new IllegalArgumentException("主题名不可为空:命令行传参为空且没有默认值!");}// 获取命令行传入的 key 为 bootstrap-servers 的配置信息,并指定默认值String bootstrapServers = parameterTool.get("bootstrap-severs", KAFKA_SERVER);// 获取命令行传入的 key 为 transaction-timeout 的配置信息,并指定默认值String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");return KafkaSink.<String>builder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setTransactionalIdPrefix(transIdPrefix).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout).build();}public static KafkaSink<String> getKafkaSink(String topic, String[] args) {return getKafkaSink(topic, topic + "_trans", args);}
}

4.OdsApp.java

Ods层的app创建,负责读取和写入数据

package com.atguigu.tms.realtime.app.ods;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;public class OdsApp {public static void main(String[] args) throws Exception {// 1.获取流处理环境并指定检查点StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2 使用FlinkCDC从MySQL中读取数据-事实数据String dwdOption = "dwd";String dwdServerId = "6030";String dwdsourceName = "ods_app_dwd_source";mysqlToKafka(dwdOption, dwdServerId, dwdsourceName, env, args);// 3 使用FlinkCDC从MySQL中读取数据-维度数据String realtimeDimOption = "realtime_dim";String realtimeDimServerId = "6040";String realtimeDimsourceName = "ods_app_realtimeDim_source";mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimsourceName, env, args);env.execute();}public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {MySqlSource<String> MySqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);SingleOutputStreamOperator<String> dwdStrDS = env.fromSource(MySqlSource, WatermarkStrategy.noWatermarks(), sourceName).setParallelism(1).uid(option + sourceName);// 3 简单ETLSingleOutputStreamOperator<String> processDS = dwdStrDS.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) {try {JSONObject jsonObj = JSONObject.parseObject(jsonStr);if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
//                                System.out.println(jsonObj);Long tsMs = jsonObj.getLong("ts_ms");jsonObj.put("ts", tsMs);jsonObj.remove("ts_ms");String jsonString = jsonObj.toJSONString();out.collect(jsonString);}} catch (Exception e) {Log.error("从Flink-CDC得到的数据不是一个标准的json格式",e);}}}).setParallelism(1);// 4 按照主键进行分组,避免出现乱序KeyedStream<String, String> keyedDS = processDS.keyBy((KeySelector<String, String>) jsonStr -> {JSONObject jsonObj = JSON.parseObject(jsonStr);return jsonObj.getJSONObject("after").getString("id");});//将数据写入KafkakeyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args)).uid(option + "_ods_app_sink");}
}

三、代码测试

在虚拟机启动我们需要的组件,目前需要hadoop、zk、kafka和MySQL。
在这里插入图片描述
先开一个消费者进行消费。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods

然后运行OdsApp.java
他会先读取维度数据,因为维度数据需要全量更新之前的数据。
在这里插入图片描述
当他消费结束后,我们运行jar包,获取事实数据。

java -jar tms-mock-2023-01-06.jar 

如果能消费到新数据,代表通道没问题,ODS层创建完成。

在这里插入图片描述


总结

至此ODS搭建完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/166491.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

当内容创作进入 AGI 时代,你也可以成为「神笔马良」

我神笔马良的童话故事我们或多或少都听过&#xff0c;一支神笔在手&#xff0c;想画什么就能画出什么&#xff0c;栩栩如生。创造者的理解力、想象力和创作力都能通过这支神笔释放。 近一年&#xff0c;随着 AIGC 内容生产工具的快速出圈&#xff0c;有人把 Stable Diffusion、…

Sublime Text 4168最新代码编辑

Sublime Text是一款功能强大的文本编辑器&#xff0c;具有以下主要功能&#xff1a; 支持多种编程语言的语法高亮和代码自动完成功能&#xff0c;包括Python、JavaScript、HTML、CSS等。提供代码片段&#xff08;Snippet&#xff09;功能&#xff0c;可以将常用的代码片段保存…

JSP EL 算数运算符逻辑运算符

除了 empty 我们这边还有一些基本的运算符 第一种 等等于 jsp代码如下 <% page contentType"text/html; charsetUTF-8" pageEncoding"UTF-8" %> <%request.setCharacterEncoding("UTF-8");%> <!DOCTYPE html> <html> …

JVM-基础

jdk7及以前&#xff1a; 通过-XX:PermSize 来设置永久代初始分配空间&#xff0c;默认值是20.75m -XX:MaxPermSize来设定永久代最大可分配空间&#xff0c;32位是64m&#xff0c;64位是82m jdk8及之后&#xff1a; 通过-XX:MetaspaceSize 来设置永久代初始分配空间&#xff…

概要设计文档案例分享

1引言 1.1编写目的 1.2项目背景 1.3参考资料 2系统总体设计 2.1整体架构 2.2整体功能架构 2.3整体技术架构 2.4运行环境设计 2.5设计目标 3系统功能模块设计 3.1个人办公 4性能设计 4.1响应时间 4.2并发用户数 5接口设计 5.1接口设计原则 5.2接口实现方式 6运行设计 6.1运行模块…

骨传导耳机的优缺点都有哪些?骨传导耳机值得入手吗?

骨传导耳机的优点还是很多的&#xff0c;相比于传统耳机&#xff0c;骨传导耳机要更值得入手&#xff01; 下面让我们了解下骨传导耳机的优缺点都有哪些&#xff1a; 一、优点 1、使用更安全 传统的耳机&#xff0c;在使用时会听不到外界的声音&#xff0c;而骨传导耳机通过…

“java.lang.IllegalStateException: No ConfigurableListableBeanFactory set“,缺少配置

一、错误分析 做品优购项目的运营商安全登录时&#xff0c;运行项目后&#xff0c;浏览器访问模板页&#xff0c;模板页的表格无法正常显示&#xff0c;报错信息如下&#xff1a; SEVERE: StandardWrapper.Throwable java.lang.IllegalStateException: No ConfigurableLista…

Linux 6.7全面改进x86 CPU微码加载方式

导读最近&#xff0c;社区在清理 Linux 上的 Intel/AMD x86 CPU 微代码加载方面做了大量的工作&#xff0c;这些工作现已合并到 Linux 6.7 中。 由于在启动时加载 CPU 微代码对于减少不断出现的新 CPU 安全漏洞以及有时解决功能问题非常重要&#xff0c;Thomas Gleixner 最近开…

C百题--7.输出乘法表

1.问题描述 输出9*9乘法表 2.解决思路 利用99乘法表行和列之间的关系&#xff0c;进行输出 注意&#xff1a;%-2d 2代表占两个字符&#xff1b;-代表左对齐 3.代码实现 #include<stdio.h> int main(){for(int i1;i<9;i){for(int j1;j<i;j){printf("%d*%d…

微信小程序埋点

使用如下代码封装一下&#xff0c;例如封装在log.js文件里面&#xff1a; var log wx.getRealtimeLogManager ? wx.getRealtimeLogManager() : nullmodule.exports {debug() {if (!log) returnlog.debug.apply(log, arguments)},info() {if (!log) returnlog.info.apply(l…

深入学习pytorch笔记

两个重要的函数 dir()&#xff1a; 一个内置函数&#xff0c;用于列出对象的所有属性和方法 help()&#xff1a;一个内置函数&#xff0c;用于获取关于Python对象、模块、函数、类等的详细信息 Dateset类 Dataset&#xff1a;pytorch中的一个类&#xff0c;开发者在训练和…

抖音电商品牌力不足咋办?如何升级或强开旗舰店、官方旗舰店?我们有妙招!

随着抖音电商的发展&#xff0c;越来越多的商家蜂拥而至&#xff0c;入驻经营抖音小店... 然而我们在开店的时候&#xff0c;选择开通官方旗舰店、旗舰店、专营店或专卖店&#xff0c;却被系统提示为你的商标品牌力不足&#xff0c;无法开通官方旗舰店、旗舰店、专营店、专卖店…

在 vscode 中的json文件写注释,不报错的解决办法

打开 vscode 的「设置」&#xff0c;搜索&#xff1a;files: associations&#xff0c;然后添加 *.json jsonc最后

Nginx 配置错误导致的漏洞

目录 1. CRLF注入漏洞 Bottle HTTP头注入漏洞 2.目录穿越漏洞 3. http add_header被覆盖 本篇要复现的漏洞实验有一个网站直接为我们提供了Docker的环境&#xff0c;我们只需要下载下来就可以使用&#xff1a; Docker环境的安装可以参考&#xff1a;Docker安装 漏洞环境的…

展现天津援疆工作成果 “团结村里看振兴”媒体采风团走进和田

央广网天津11月19日消息(记者周思杨)11月18日&#xff0c;由媒体记者、书法和摄影家、旅行社企业代表等40余人组成的“团结村里看振兴”媒体采风团走进新疆和田。在接下来的一周时间里&#xff0c;采风团将走访天津援疆和田地区策勒县、于田县、民丰县乡村振兴示范村&#xff0…

HTML CSS登录网页设计

一、效果图: 二、HTML代码: <!DOCTYPE html> <!-- 定义HTML5文档 --> <html lang="en"> …

在全球碳市场中崭露头角的中碳CCNG

在全球气候治理的大背景下&#xff0c;中国碳中和发展集团有限公司&#xff08;简称中国碳中和&#xff09;正在成为全球碳交易市场的一个重要参与者。随着国际社会对碳排放的日益关注&#xff0c;中国碳中和凭借其在碳资产开发、咨询与管理等领域的深厚积累&#xff0c;正成为…

视频剪辑新招:批量随机分割,分享精彩瞬间

随着社交媒体的普及&#xff0c;短视频已经成为分享生活、交流信息的重要方式。为制作出吸引的短视频&#xff0c;许多创作者都投入了大量的时间和精力进行剪辑。然而&#xff0c;对于一些没有剪辑经验的新手来说&#xff0c;这个过程可能会非常繁琐。现在一起来看云炫AI智剪批…

杨传辉:从一体化架构,到一体化产品,为关键业务负载打造一体化数据库

在刚刚结束的年度发布会上&#xff0c;OceanBase正式推出一体化数据库的首个长期支持版本 4.2.1 LTS&#xff0c;这是面向 OLTP 核心场景的全功能里程碑版本&#xff0c;相比上一个 3.2.4 LTS 版本&#xff0c;新版本能力全面提升&#xff0c;适应场景更加丰富&#xff0c;有更…

web前端之若依框架图标对照表、node获取文件夹中的文件名,并通过数组返回文件名、在html文件中引入.svg文件、require、icon

MENU 前言效果图htmlJavaScripstylenode获取文件夹中的文件名 前言 需要把若依原有的icon的svg文件拿到哦&#xff01; 注意看生成svg的路径。 效果图 html <div id"idSvg" class"svg_box"></div>JavaScrip let listSvg [404, bug, build, …