流式ETL配置指南:从MySQL到Elasticsearch的实时数据同步

流式ETL配置指南:从MySQL到Elasticsearch的实时数据同步

场景介绍

假设您运营一个电商平台,需要将MySQL数据库中的订单、用户和产品信息实时同步到Elasticsearch,以支持实时搜索、分析和仪表盘展示。传统的批处理ETL无法满足实时性要求,因此我们将使用Flink CDC构建流式ETL管道。

前提条件

  1. MySQL数据库 (作为数据源)
  2. Elasticsearch (作为目标系统)
  3. Flink环境 (处理引擎)
  4. Java开发环境

步骤一:环境准备

1.1 准备MySQL环境

-- 创建数据库
CREATE DATABASE IF NOT EXISTS shop;
USE shop;-- 创建用户表
CREATE TABLE users (id INT PRIMARY KEY,name VARCHAR(100),email VARCHAR(100),create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- 创建产品表
CREATE TABLE products (id INT PRIMARY KEY,name VARCHAR(200),price DECIMAL(10,2),stock INT,category VARCHAR(100)
);-- 创建订单表
CREATE TABLE orders (id INT PRIMARY KEY,user_id INT,order_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,status VARCHAR(20),total_amount DECIMAL(10,2),FOREIGN KEY (user_id) REFERENCES users(id)
);-- 创建订单详情表
CREATE TABLE order_items (id INT PRIMARY KEY,order_id INT,product_id INT,quantity INT,price DECIMAL(10,2),FOREIGN KEY (order_id) REFERENCES orders(id),FOREIGN KEY (product_id) REFERENCES products(id)
);-- 插入一些测试数据
INSERT INTO users VALUES (1, '张三', 'zhangsan@example.com', '2023-01-01 10:00:00');
INSERT INTO products VALUES (101, 'iPhone 14', 5999.00, 100, '电子产品');
INSERT INTO orders VALUES (1001, 1, '2023-01-05 14:30:00', '已完成', 5999.00);
INSERT INTO order_items VALUES (10001, 1001, 101, 1, 5999.00);

确保MySQL已开启binlog,编辑MySQL配置文件:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

1.2 准备Elasticsearch环境

创建索引映射:

PUT /shop_orders
{"mappings": {"properties": {"order_id": { "type": "keyword" },"user_id": { "type": "keyword" },"user_name": { "type": "keyword" },"user_email": { "type": "keyword" },"order_time": { "type": "date" },"status": { "type": "keyword" },"total_amount": { "type": "double" },"items": {"type": "nested","properties": {"product_id": { "type": "keyword" },"product_name": { "type": "text" },"quantity": { "type": "integer" },"price": { "type": "double" },"category": { "type": "keyword" }}}}}
}

步骤二:创建Flink流式ETL项目

2.1 创建Maven项目

pom.xml文件配置:

<dependencies><!-- Flink核心依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><!-- Flink CDC连接器 --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Elasticsearch连接器 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>1.17.0</version></dependency><!-- JSON处理 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.17.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.9.0</version></dependency>
</dependencies>

2.2 实现ETL主程序

创建MySQLToElasticsearchETL.java文件:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class MySQLToElasticsearchETL {public static void main(String[] args) throws Exception {// 1. 设置Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);  // 开发环境设置为1,生产环境根据需要调整env.enableCheckpointing(60000);  // 每60秒做一次检查点// 2. 配置MySQL CDC源MySqlSource<String> userSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.users").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();MySqlSource<String> productSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.products").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();MySqlSource<String> orderSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.orders").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();MySqlSource<String> orderItemSource = MySqlSource.<String>builder().hostname("localhost").port(3306).databaseList("shop").tableList("shop.order_items").username("root").password("yourpassword").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();// 3. 创建数据流DataStream<String> userStream = env.fromSource(userSource,WatermarkStrategy.noWatermarks(),"User CDC Source");DataStream<String> productStream = env.fromSource(productSource,WatermarkStrategy.noWatermarks(),"Product CDC Source");DataStream<String> orderStream = env.fromSource(orderSource,WatermarkStrategy.noWatermarks(),"Order CDC Source");DataStream<String> orderItemStream = env.fromSource(orderItemSource,WatermarkStrategy.noWatermarks(),"OrderItem CDC Source");// 4. 数据转换与关联// 用户缓存Map<Integer, Map<String, Object>> userCache = new HashMap<>();userStream.map(json -> {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");if (after != null) {int userId = after.get("id").getAsInt();Map<String, Object> userInfo = new HashMap<>();userInfo.put("name", after.get("name").getAsString());userInfo.put("email", after.get("email").getAsString());userCache.put(userId, userInfo);}return json;});// 产品缓存Map<Integer, Map<String, Object>> productCache = new HashMap<>();productStream.map(json -> {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");if (after != null) {int productId = after.get("id").getAsInt();Map<String, Object> productInfo = new HashMap<>();productInfo.put("name", after.get("name").getAsString());productInfo.put("price", after.get("price").getAsDouble());productInfo.put("category", after.get("category").getAsString());productCache.put(productId, productInfo);}return json;});// 订单与订单项关联Map<Integer, List<Map<String, Object>>> orderItemsCache = new HashMap<>();orderItemStream.map(json -> {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");if (after != null) {int orderId = after.get("order_id").getAsInt();int productId = after.get("product_id").getAsInt();Map<String, Object> itemInfo = new HashMap<>();itemInfo.put("product_id", productId);itemInfo.put("quantity", after.get("quantity").getAsInt());itemInfo.put("price", after.get("price").getAsDouble());// 添加产品信息if (productCache.containsKey(productId)) {itemInfo.put("product_name", productCache.get(productId).get("name"));itemInfo.put("category", productCache.get(productId).get("category"));}if (!orderItemsCache.containsKey(orderId)) {orderItemsCache.put(orderId, new ArrayList<>());}orderItemsCache.get(orderId).add(itemInfo);}return json;});// 处理订单并关联用户和订单项SingleOutputStreamOperator<Map<String, Object>> enrichedOrderStream = orderStream.map(new MapFunction<String, Map<String, Object>>() {@Overridepublic Map<String, Object> map(String json) throws Exception {JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject();JsonObject after = jsonObject.getAsJsonObject("after");String op = jsonObject.get("op").getAsString();Map<String, Object> orderInfo = new HashMap<>();// 只处理插入和更新事件if ("c".equals(op) || "u".equals(op)) {int orderId = after.get("id").getAsInt();int userId = after.get("user_id").getAsInt();orderInfo.put("order_id", orderId);orderInfo.put("user_id", userId);orderInfo.put("order_time", after.get("order_time").getAsString());orderInfo.put("status", after.get("status").getAsString());orderInfo.put("total_amount", after.get("total_amount").getAsDouble());// 关联用户信息if (userCache.containsKey(userId)) {orderInfo.put("user_name", userCache.get(userId).get("name"));orderInfo.put("user_email", userCache.get(userId).get("email"));}// 关联订单项if (orderItemsCache.containsKey(orderId)) {orderInfo.put("items", orderItemsCache.get(orderId));}}return orderInfo;}});// 5. 配置Elasticsearch接收器List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("localhost", 9200, "http"));ElasticsearchSink.Builder<Map<String, Object>> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,(request, context, element) -> {if (element.containsKey("order_id")) {request.index("shop_orders").id(element.get("order_id").toString()).source(element);}});// 配置批量写入esSinkBuilder.setBulkFlushMaxActions(1);  // 每条记录立即写入,生产环境可以调大esSinkBuilder.setBulkFlushInterval(1000);  // 每秒刷新一次// 6. 写入ElasticsearchenrichedOrderStream.addSink(esSinkBuilder.build());// 7. 执行作业env.execute("MySQL to Elasticsearch ETL Job");}
}

步骤三:部署和运行

3.1 编译打包

使用Maven打包:

mvn clean package

3.2 提交到Flink集群

flink run -c MySQLToElasticsearchETL target/your-jar-file.jar

3.3 验证数据同步

在Elasticsearch中查询数据:

curl -X GET "localhost:9200/shop_orders/_search?pretty"

关键点和注意事项

  1. 数据一致性

    • 确保开启Flink的检查点机制,实现exactly-once语义
    • 合理设置检查点间隔,平衡一致性和性能
  2. 状态管理

    • 在上述例子中,我们在内存中维护了用户和产品的缓存,生产环境应使用Flink的状态API
    • 考虑状态大小和清理策略,避免状态无限增长
  3. 表关联策略

    • 上述示例使用了简化的表关联方式
    • 生产环境可以考虑使用Flink SQL或异步I/O进行优化
  4. 性能优化

    • 调整并行度以匹配业务需求
    • 设置合适的批处理大小和间隔
    • 监控反压(backpressure)情况
  5. 错误处理

    • 添加错误处理逻辑,处理数据格式异常
    • 实现重试机制,应对临时网络故障
    • 考虑死信队列(DLQ)来处理无法处理的消息
  6. 监控和告警

    • 接入Prometheus和Grafana监控Flink作业
    • 设置关键指标告警,如延迟、失败次数等
  7. 扩展性考虑

    • 设计时考虑表结构变更的处理方式
    • 为未来增加新数据源或新目标系统预留扩展点

扩展功能

基于这个基础架构,您可以进一步实现:

  1. 增量更新优化:只同步变更字段,减少网络传输
  2. 历史数据回溯:支持从特定时间点重新同步数据
  3. 数据转换:增加复杂的业务计算逻辑
  4. 数据过滤:根据业务规则过滤不需要的数据
  5. 多目标写入:同时将数据写入Elasticsearch和其他系统如Kafka

这个完整的方案展示了如何使用Flink CDC构建一个端到端的流式ETL系统,实现从MySQL到Elasticsearch的实时数据同步,同时处理表之间的关联关系。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/73339.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker-Volume数据卷详讲

Docker数据卷-Volume 一&#xff1a;Volume是什么&#xff0c;用来做什么的 当删除docker容器时&#xff0c;容器内部的文件就会跟随容器所销毁&#xff0c;在生产环境中我们需要将数据持久化保存&#xff0c;就催生了将容器内部的数据保存在宿主机的需求&#xff0c;volume …

单片机和微控制器知识汇总——《器件手册--单片机、数字信号处理器和可编程逻辑器件》

目录 四、单片机和微控制器 4.1 单片机(MCU/MPU/SOC) 一、定义 二、主要特点 三、工作原理 四、主要类型 五、应用领域 六、选型与设计注意事项 七、发展趋势 4.2 数字信号处理器(DSP/DSC) ​编辑​编辑 一、定义 二、工作原理 三、结构特点 四、应用领域 五、选型与设计注…

macOS 安装 Miniconda

macOS 安装 Miniconda 1. Quickstart install instructions2. 执行3. shell 上初始化 conda4. 关闭 终端登录用户名前的 base参考 1. Quickstart install instructions mkdir -p ~/miniconda3 curl https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-arm64.sh -o…

高数下---8.1平面与直线

目录 平面的确定 直线的确定 若要求某一直线或平面就根据要素来求。 例题 平面中的特殊情况 平面中的解题思路 直线的解题思路 平面的确定 两要素 一 一点 二 倾斜角 即法向量 点法式 可化为一般式 Ax By Cz D 0; (A,B,C) 即法向量&#xff1b; 改变D 即…

CMS迁移中SEO优化整合步骤详解

内容概要 在CMS迁移过程中&#xff0c;系统化的规划与执行是保障SEO排名稳定性的核心。首先需明确迁移流程的关键阶段&#xff0c;包括数据备份、URL适配、元数据同步及安全配置等环节。其中&#xff0c;数据备份不仅需覆盖原始数据库与静态资源&#xff0c;还需验证备份文件的…

存储过程、存储函数与触发器详解(MySQL 案例)

存储过程、存储函数与触发器详解&#xff08;MySQL 案例&#xff09; 一、存储过程&#xff08;Stored Procedure&#xff09; 定义 存储过程是预先编译好并存储在数据库中的一段 SQL 代码集合&#xff0c;可以接收参数、执行逻辑操作&#xff08;如条件判断、循环&#xff09;…

Python:进程间的通信,进程的操作队列

进程间的队列&#xff1a; 队列的基本操作&#xff1a; 入队&#xff1a;将数据放到队列尾部 出队&#xff1a;从队列的头部取出一个元素 maxsize&#xff1a;队列中能存放数据个数的上限(整数)&#xff0c;一旦达到上限插入会导致阻塞&#xff0c;直到队列中的数据被消费掉 …

【C++初阶】--- 类与对象(中)

1.类的默认成员函数 默认成员函数就是⽤⼾没有显式实现&#xff0c;编译器会⾃动⽣成的成员函数称为默认成员函数。⼀个类&#xff0c;我们不写的情况下编译器会默认⽣成以下6个默认成员函数&#xff0c;我们主要需要掌握前4个&#xff0c;后两个了解以下即可&#xff0c;默认…

python处理音频相关的库

1 音频信号采集与播放 pyaudio import sys import pyaudio import wave import timeCHUNK 1024 FORMAT pyaudio.paInt16 CHANNELS 1#仅支持单声道 RATE 16000 RECORD_SECONDS 3#更改录音时长#录音函数&#xff0c;生成wav文件 def record(file_name):try:os.close(file_…

[M模拟] lc2711. 对角线上不同值的数量差(对角线遍历+前后缀分解)

文章目录 1. 题目来源2. 题目解析 1. 题目来源 链接&#xff1a;2711. 对角线上不同值的数量差 前置题&#xff1a; [M模拟] lc3446. 按对角线进行矩阵排序(对角线遍历公式推导模板题) 矩形的对角线遍历的基础题。 题单&#xff1a; 待补充 2. 题目解析 2025年03月25日…

设计一个基于机器学习的光伏发电功率预测模型,以Python和Scikit - learn库为例

下面为你设计一个基于机器学习的光伏发电功率预测模型&#xff0c;以Python和Scikit - learn库为例。此模型借助历史气象数据和光伏发电功率数据来预测未来的光伏发电功率。 模型设计思路 数据收集&#xff1a;收集历史气象数据&#xff08;像温度、光照强度、湿度等&#xf…

洛谷 P1351 [NOIP 2014 提高组] 联合权值(树)

题目描述 无向连通图 G 有 n 个点&#xff0c;n−1 条边。点从 1 到 n 依次编号,编号为 i 的点的权值为 Wi​&#xff0c;每条边的长度均为 1。图上两点 (u,v) 的距离定义为 u 点到 v 点的最短距离。对于图 G 上的点对 (u,v)&#xff0c;若它们的距离为 2&#xff0c;则它们之间…

YoloV8训练和平精英人物检测模型

概述 和平精英人物检测&#xff0c;可以识别游戏中所有人物角色&#xff0c;并通过绘制框将人物选中&#xff0c;训练的模型仅仅具有识别功能&#xff0c;可以识别游戏中的视频、图片等文件&#xff0c;搭配Autox.js可以推理&#xff0c;实现实时绘制&#xff0c;但是对手机性…

智能汽车图像及视频处理方案,支持视频实时拍摄特效能力

在智能汽车日新月异的今天&#xff0c;美摄科技作为智能汽车图像及视频处理领域的先行者&#xff0c;凭借其卓越的技术实力和前瞻性的设计理念&#xff0c;为全球智能汽车制造商带来了一场视觉盛宴的革新。美摄科技推出智能汽车图像及视频处理方案&#xff0c;一个集高效性、智…

架构设计之自定义延迟双删缓存注解(下)

架构设计之自定义延迟双删缓存注解(下) 小薛博客官方架构设计之自定义延迟双删缓存注解(下)地址 为了保证Cache和ClearAndReloadCache的灵活性&#xff0c;特意加入EL表达式解析 1、Cache package com.xx.cache;import java.lang.annotation.*; import java.util.concurren…

rosbag|ROS中.bag数据包转换为matlab中.mat数据类型

代码见代码 msg_dict中设置自定义消息类型 test_config中设置需要记录的具体的值 test_config中topic_name以及message_type照搬plotjuggler打开时的参数 最后生成.mat文件在matlab中进行使用

基于动态 FOF(基金中的基金)策略的基金交易推荐系统的设计与实现思路

下面为你呈现一个基于动态 FOF&#xff08;基金中的基金&#xff09;策略的基金交易推荐系统的设计与实现思路&#xff0c;同时给出一个简单的 Python 示例代码。 系统设计 1. 需求分析 收集各类基金的历史数据&#xff0c;涵盖净值、收益率、风险指标等。依据动态 FOF 策略…

搭建主从DNS、nfs、nginx

任务需求&#xff1a; 客户端通过访问 www.nihao.com 后&#xff0c;能够通过 dns 域名解析&#xff0c;访问到 nginx 服务中由 nfs 共享的首页文件&#xff0c;内容为&#xff1a;Very good, you have successfully set up the system. 各个主机能够实现时间同步&#xff0c;…

JS 对象转数组,数组转对象

数据格式 objMap : {apiP: 8000, sder: true, host: "1.111", wPort: "1335" }要求&#xff1a;将 objMap 转化为 数组 const equipArray Object.keys(objMap ).map(key > {return {name: key,value: objMap [key]}打印结果 数组转为对象 let equipAr…

vue - [Vue warn]: Duplicate keys detected: ‘0‘. This may cause an update error.

问题描述&#xff1a; vue项目中&#xff0c;对表单数组赋值时&#xff0c;控制台抛出警告&#xff1a; 问题代码&#xff1a; 问题分析&#xff1a; 1、Vue 要求每个虚拟 DOM 节点必须有唯一的 key。该警告信息通常出现在使用v-for循环的场景中&#xff0c;多个同级节点使用…