106 基于消息队列来做 mysql 大数据表数据的遍历处理

前言

最近有这样的一个需求, 我们存在一张 很大的 mysql 数据表, 数据量大概是在 六百万左右 

然后 需要获取所有的记录, 将数据传输到 es 中 

然后 当时 我就写了一个脚本来读取 这张大表, 然后 分页获取数据, 然后 按页进行数据处理 转换到 es 

但是存在的问题是, 前面 还效率还可以, 但是 约到后面, 大概是到 三百多页, 的时候 从 mysql 读取数据 已经快不行了 

十分耗时, 这里就是 记录这个问题的 另外的处理方式 

我这里的处理是基于 消息中间件, 从 mysql 通过 datax/spoon 传输数据到 kafka 很快 

然后  java 程序从 kafka 中消费队列的数据 也很快, 最终 六百万的数据 读取 + 处理 合计差不多是 一个多小时完成, 其中处理 有一部分地方 业务上面比较耗时 

 

 

待处理的数据表

待处理的数据表如下, 里面合计 600w 的数据 

CREATE TABLE `student_all` (`id` int NOT NULL AUTO_INCREMENT,`field0` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field1` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field2` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field3` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field4` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field5` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field6` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field7` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field8` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field9` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field10` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field11` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field12` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field13` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field14` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field15` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field16` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field17` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field18` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field19` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field20` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field21` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field22` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field23` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field24` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field25` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field26` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field27` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field28` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`field29` varchar(128) COLLATE utf8mb4_general_ci NOT NULL,`CREATED_AT` bigint NOT NULL,`UPDATED_AT` bigint NOT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4379001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

 

 

基于 mysql 的数据分页处理

基于 mysql 的处理程序如下, 就是一个简单的 mysql 分页 

然后将需要提取的数据封装, 然后 批量提交给 es 

总的情况来说是 前面的一部分页是可以 很快的响应数据, 但是 越到后面, mysql 服务器越慢 

/*** Test05PostQy2Es** @author Jerry.X.He* @version 1.0* @date 2022/11/21 16:00*/
public class Test05PostEsFromMysql {private static String mysqlUrl = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&autoReconnectForPools=true";private static String mysqlUsername = "postgres";private static String mysqlPassword = "postgres";private static JdbcTemplate mysqlJdbcTemplate = JdbcTemplateUtils.getJdbcTemplate(mysqlUrl, mysqlUsername, mysqlPassword);private static RestHighLevelClient esClient = getEsClient();private static IndicesClient indicesClient = esClient.indices();// Test05PostQy2Espublic static void main(String[] args) throws Exception {String esIndexName = "student_all_20221211";bulkEsData(esIndexName);}private static void bulkEsData(String esIndexName) throws Exception {String queryDbTableName = "student_all";List<String> fieldList = Arrays.asList("id", "field0", "field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10", "field11", "field12", "field13", "field14", "field15", "field16", "field17", "field18", "field19", "field20", "field21", "field22", "field23", "field24", "field25", "field26", "field27", "field28", "field29", "CREATED_AT", "UPDATED_AT");String idKey = "id";String whereCond = "";
//        String orderBy = "order by id asc";String orderBy = "";AtomicInteger counter = new AtomicInteger(0);int pageSize = 1000;int startPage = 0;pageDo(queryDbTableName, whereCond, orderBy, pageSize, startPage, (pageNo, list) -> {BulkRequest bulkRequest = new BulkRequest();for (Map<String, Object> entity : list) {IndexRequest indexRequest = new IndexRequest(esIndexName);Map<String, Object> sourceMap = new LinkedHashMap<>();List<String> allFieldsListed = new ArrayList<>();for (String fieldName : fieldList) {String fieldValue = String.valueOf(entity.get(fieldName));sourceMap.put(fieldName, fieldValue);allFieldsListed.add(Objects.toString(fieldValue, ""));}String id = String.valueOf(entity.get(idKey));indexRequest.id(id);sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));indexRequest.source(sourceMap);bulkRequest.add(indexRequest);}try {BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);counter.addAndGet(list.size());} catch (Exception e) {e.printStackTrace();}System.out.println(" page : " + pageNo + ", flushed " + counter.get() + " records ");});}private static void pageDo(String tableName, String whereCond, String orderBy, int pageSize, int startPage,BiConsumer<Integer, List<Map<String, Object>>> func) {if (StringUtils.isNotBlank(whereCond) && (!whereCond.trim().toLowerCase().startsWith("where"))) {whereCond = " where " + whereCond;}if (StringUtils.isNotBlank(orderBy) && (!orderBy.trim().toLowerCase().startsWith("order"))) {orderBy = " order by " + orderBy;}String queryCountSql = String.format(" select count(*) from %s %s %s", tableName, whereCond, orderBy);Integer totalCount = mysqlJdbcTemplate.queryForObject(queryCountSql, Integer.class);Integer totalPage = (totalCount == null || totalCount == 0) ? 0 : (totalCount - 1) / pageSize + 1;for (int i = startPage; i < totalPage; i++) {int offset = i * pageSize;String queryPageSql = String.format(" select * from %s %s %s limit %s,%s ", tableName, whereCond, orderBy, offset, pageSize);List<Map<String, Object>> list = mysqlJdbcTemplate.queryForList(queryPageSql);func.accept(i, list);}}}

 

 

基于中间件 kafka 的处理

首先通过 spoon/datax 将数据从 mysql 转换到 kafka 

然后 再由脚本从 kafka 消费数据, 处理 传输到 es 中 

入了一次 消息队列之后, 然后程序 再来消费, 就会快很多了, 消息队列本身功能比较单纯 比较适合于做做顺序遍历 就会有优势一些 

 

这里以 spoon 将数据从 mysql 转换到 kafka 

我这里 本地环境 内存等什么的都不足, 因此是 一分钟 入库三万条, 但是 实际生产环境 会很快 

在生产环境 五百多w 的数据, 基于 datax 传输 mysql 到 kafka, 差不多是 五六分钟 就可以了 

e3cb2b641cfe4d208e11040f1b5fbc2a.png

 

 

基于 kafka 将数据传输到 es 

如下程序 仅仅是将 kafka 中的数据 原样照搬过去了, 但是 实际的场景 中会做一些 额外的业务处理, 这里仅仅是为了 演示 

/*** Test05PostQy2Es** @author Jerry.X.He* @version 1.0* @date 2022/11/21 16:00*/
public class Test05PostEsFromKafka {private static RestHighLevelClient esClient = getEsClient();private static IndicesClient indicesClient = esClient.indices();private static String esIndexName = "student_all_20221211";private static String groupId = "group-01";// Test05PostQy2Espublic static void main(String[] args) throws Exception {bulkKafka2EsData(esIndexName, groupId);}private static void bulkKafka2EsData(String esIndexName, String groupId) throws Exception {List<Pair<String, String>> hjk2StdFieldMap = hjk2StdFieldMap();Properties properties = kafkaProperties(groupId);String idKey = "ID";KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Arrays.asList("STUDENT_ALL_20221211"));AtomicInteger counter = new AtomicInteger(0);long start = System.currentTimeMillis();while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(100);if (records.isEmpty()) {Thread.sleep(10 * 1000);long spent = System.currentTimeMillis() - start;System.out.println(" spent : " + (spent / 1000) + " s ");continue;}BulkRequest bulkRequest = new BulkRequest();boolean isEmpty = true;for (ConsumerRecord<String, String> record : records) {IndexRequest indexRequest = new IndexRequest(esIndexName);String value = record.value();JSONObject entity = JSON.parseObject(value);// 获取 idString id = StringUtils.defaultIfBlank(entity.getString(idKey), "");if (isFilterByQy(id)) {continue;}Map<String, Object> sourceMap = new LinkedHashMap<>();List<String> allFieldsListed = new ArrayList<>();for (Pair<String, String> entry : hjk2StdFieldMap) {String hjkKey = entry.getKey(), stdKey = entry.getValue();String fieldValue = StringUtils.defaultIfBlank(entity.getString(hjkKey), "");sourceMap.put(stdKey, fieldValue);allFieldsListed.add(Objects.toString(fieldValue, ""));}indexRequest.id(id);sourceMap.put("_allFields", StringUtils.join(allFieldsListed, "$$"));isEmpty = false;indexRequest.source(sourceMap);bulkRequest.add(indexRequest);}if (isEmpty) {continue;}try {BulkResponse bulkResponse = esClient.bulk(bulkRequest, RequestOptions.DEFAULT);counter.addAndGet(bulkRequest.requests().size());} catch (Exception e) {e.printStackTrace();}System.out.println(" flushed " + counter.get() + " records ");}}private static List<Pair<String, String>> hjk2StdFieldMap() {List<Pair<String, String>> hjk2StdFieldMap = new ArrayList<>();hjk2StdFieldMap.add(new ImmutablePair<>("id", "id"));hjk2StdFieldMap.add(new ImmutablePair<>("CREATED_AT", "CREATED_AT"));hjk2StdFieldMap.add(new ImmutablePair<>("UPDATED_AT", "UPDATED_AT"));for (int i = 0; i < Test05CreateMysqlBigTable.maxFieldIdx; i++) {String fieldName = String.format("field%s", i);hjk2StdFieldMap.add(new ImmutablePair<>(fieldName, fieldName));}return hjk2StdFieldMap;}private static Properties kafkaProperties(String groupId) {Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.0.190:9092");properties.put("group.id", groupId);properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000");properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return properties;}private static boolean isFilterByQy(String qy) {if (StringUtils.isBlank(qy)) {return true;}return false;}}

 

 

spoon 安装 kakfa 插件

来自 Kettle安装Kafka Consumer和Kafka Producer插件

    1.从github上下载kettle的kafka插件,地址如下Kafka Consumer地址:https://github.com/RuckusWirelessIL/pentaho-kafka-consumer/releases/tag/v1.7Kafka Producer地址:https://github.com/RuckusWirelessIL/pentaho-kafka-producer/releases/tag/v1.92.进入 kettle 安装目录:在plugin目录下创建steps目录3.把下载的插件解压后放到 steps 目录下5.重启 spoon.bat 即可

 

 

 

 

参考

Kettle安装Kafka Consumer和Kafka Producer插件

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/755858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前后端项目笔记

前端项目创建 准备工作 nodejs安装 vue cli安装 vue create frontend 最后一个y的话 它会保存 方便下次创建项目 我这是手快敲错了 随自己 前端项目组件及作用 Element-UI引入 安装 npm i element-ui -S main.js中引入 清空路口App.vue 清空Home页面 随便写个按钮 原因…

HCIE考证心得 | 在云校的学习收获颇多

我是来自深圳信息职业技术学院22级现代移动通信3-3班的冯同学&#xff0c;我在2023年12月12日通过了华为认证Cloud Service HCIE。在此&#xff0c;我将分享考证中的心得体会给大家。 备考的六点建议 一是要细心严谨&#xff0c;做实验时要全神贯注&#xff0c;明确实验要求…

面试算法-45-分发糖果

题目 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。 相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xff0c;计算并返回需要…

react hook: useRef

在组件顶层调用 useRef 以声明一个 ref const ref useRef(initialValue) console.log(ref.current) initialValue&#xff1a;ref 对象的 current 属性的初始值。可以是任意类型的值。这个参数在首次渲染后被忽略。 current 返回一个只有一个属性的对象, 初始值为传递的 initi…

RocketMQ发送和接收方式详解

RocketMQ有几种发送方式 RocketMQ 提供了几种不同的消息发送方式&#xff0c;以满足不同场景下的需求。这些发送方式主要包括&#xff1a;同步发送&#xff08;Synchronous&#xff09;&#xff1a;这是最常见的一种发送方式&#xff0c;客户端发送消息后&#xff0c;会等待服…

Python-GEE绘制DEM精美图片

目录 上传矢量和DEM获取添加颜色条参考文章 先连接上GEE的自己的项目 import ee import geemap geemap.set_proxy(port33210) ee.Authenticate() ee.Initialize(projecta-flyllf0313)上传矢量和DEM获取 使用Google Earth Engine&#xff08;GEE&#xff09;和Google Earth Eng…

基于单片机的模糊PID炉温控制系统设计

摘 要 电热炉是在工业热处理的生产中广泛使用的一种设备&#xff0c;电热炉的温度控制系统存在时变性&#xff0c;非线性&#xff0c;滞后性等特征&#xff0c;难以用常规PID的控制器对系统达到很好的控制效果。当控温精度的要求高时&#xff0c;使用传统的控制理论方法难以达…

亮相AWE 2024,日立中央空调打造定制空气新体验

日立中央空调于3月14日携旗下空气定制全新成果&#xff0c;亮相2024中国家电及消费电子博览会&#xff08;简称AWE 2024&#xff09;现场&#xff0c;围绕“科创先行 智引未来”这一主题&#xff0c;通过技术与产品向行业与消费者&#xff0c;展现自身对于家居空气的理解。 展会…

kanzi颜色工作流程

线性和非线性伽玛色彩空间 RGB 颜色空间的目的是表示在计算机显示器上显示的颜色。目前&#xff0c;sRGB是非线性伽玛色彩空间的标准。之所以需要它&#xff0c;是因为人类对光的感知是非线性的&#xff0c;而且计算机显示器对光强度具有非线性响应。 人眼比浅色更能区分深色…

Android 13 源码编译及报错修复

下载AOSP指定分支 repo init -u git://aosp../platform/manifest -b android-13.0.0_r83 同步代码到本地 repo sync -c 初始化编译环境, 选择构建目标 source build/envsetup.sh lunch 选择需要构建的目标&#xff0c;此处以aosp_arm64-eng为例 进行固件编译 make -j12 期间编译…

力扣热门算法题 49. 字母异位词分组,50. Pow(x, n),51. N 皇后

49. 字母异位词分组&#xff0c;50. Pow(x, n)&#xff0c;51. N 皇后&#xff0c;每题做详细思路梳理&#xff0c;配套Python&Java双语代码&#xff0c; 2024.03.19 可通过leetcode所有测试用例。 目录 49. 字母异位词分组 解题思路 完整代码 python Java 50. Pow(x…

接口测试工具:Postman详解

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 关注公众号【互联网杂货铺】&#xff0c;回复 1 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Postman 是一款功能强大的 API 开发和测试工具&#xff0c;以下…

【JDK原理】类加载约束条件

JVM简介 JVM&#xff08;Java虚拟机&#xff09;是Java程序的运行环境&#xff0c;它负责将Java字节码加载到内存中并执行。在JVM中&#xff0c;类加载是一个重要的过程&#xff0c;它负责将类的字节码加载到内存中&#xff0c;并对类进行验证、准备和解析&#xff0c;最终生成…

STM32F411 Micropython使用日记

1、开发板购买&#xff1a;推荐淘宝“无名科技Nologo” 19.8包邮到手&#xff1b;买开发板还需要买SPI NorFlash&#xff0c;推荐8MB的flash&#xff0c;不懂的可以问卖家&#xff0c;买回来需要焊接好&#xff0c;也可以找店家试试看能不能帮忙焊接&#xff0c;不然micropytho…

Java中加减乘除运算工具类

Java 中的 double 类型不能用于精确的加减乘除运算&#xff0c;这是因为计算机使用二进制来表示浮点数&#xff0c;而二进制无法精确表示所有十进制数。 原因&#xff1a; 浮点数由两部分组成&#xff1a;指数和尾数。指数表示浮点数的大小&#xff0c;尾数表示浮点数的精度。…

八节【DBA从入门到实践】课程,带你快速掌握OceanBase运维管理核心技能

为帮助用户及开发者更好、更快地掌握OceanBase DBA核心技能&#xff0c;OceanBase社区设计了配套教程——“DBA从入门到实践”。8期教程带大家循序渐进掌握OceanBase运维管理核心技能。搭配随堂习题和OceanBase技术专家在线答疑&#xff0c;快速掌握重要知识点&#xff0c;并轻…

【DL经典回顾】激活函数大汇总(二十七)(Bent Identity附代码和详细公式)

激活函数大汇总&#xff08;二十七&#xff09;&#xff08;Bent Identity附代码和详细公式&#xff09; 更多激活函数见激活函数大汇总列表 一、引言 欢迎来到我们深入探索神经网络核心组成部分——激活函数的系列博客。在人工智能的世界里&#xff0c;激活函数扮演着不可或…

直观与交互:山海鲸可视化软件与Excel传统表格的对比

作为一名长期使用Excel进行数据处理和分析的用户&#xff0c;最近我尝试了一款名为山海鲸的可视化软件&#xff0c;发现它与Excel传统表格之间存在诸多明显的差异。接下来&#xff0c;我将从个人体验视角出发&#xff0c;谈谈这两种工具的不同之处。 首先&#xff0c;从数据呈…

汇编语言和IBM的关系

一 缺乏汇编的硬件没有灵魂 1964年&#xff0c;在IBM没有发明System 360大型计算机之前&#xff0c;IBM已经发明了很多计算机。如IBM 1952年发布的第一台商用计算机&#xff1a;701计算机。1959年&#xff0c;IBM首次利用晶体管、磁芯存储器、印刷电路技术&#xff0c;发明了小…