批量生产千万级数据 推送到kafka代码

1、批量规则生成代码

1、随机IP生成代码
2、指定时间范围内随机日期生成代码
3、随机中文名生成代码。

package com.wfg.flink.connector.utils;import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;/*** @author wfg*/
public class RandomGeneratorUtils {/*** 生成随机的IP地址* @return ip*/public static String generateRandomIp() {Random random = new Random();// 生成IP地址的四个部分 0-255int part1 = random.nextInt(256);int part2 = random.nextInt(256);int part3 = random.nextInt(256);int part4 = random.nextInt(256);// 将每个部分转换为字符串,并用点连接return part1 + "." + part2 + "." + part3 + "." + part4;}/*** 在指定的开始和结束日期之间生成一个随机的日期时间。** @param startDate 开始日期* @param endDate 结束日期* @return 随机生成的日期时间*/public static LocalDateTime generateRandomDateTime(LocalDate startDate, LocalDate endDate) {long startEpochDay = startDate.toEpochDay();long endEpochDay = endDate.toEpochDay();Random random = new Random();LocalDate randomDate = startDate;if (startEpochDay != endEpochDay) {long randomDay = startEpochDay + random.nextLong(endEpochDay - startEpochDay);randomDate = LocalDate.ofEpochDay(randomDay);}LocalTime randomTime = LocalTime.of(// 小时random.nextInt(24),// 分钟random.nextInt(60),// 秒random.nextInt(60));return LocalDateTime.of(randomDate, randomTime);}private static final List<String> FIRST_NAMES = new ArrayList<>();private static final List<String> LAST_NAMES = new ArrayList<>();static {// 初始化一些常见的名字和姓氏FIRST_NAMES.addAll(Arrays.asList("王","李","赵","刘","张", "钱", "孙", "周", "吴", "郑", "王", "冯", "陈", "褚", "卫", "蒋", "沈", "韩", "杨", "朱", "秦", "尤", "许", "何", "吕", "施","张", "孔", "曹", "严", "华", "金", "魏", "陶", "姜", "戚", "谢", "邹", "喻", "柏", "水", "窦", "章", "云", "苏", "潘", "葛", "奚", "范", "彭", "郎","鲁","韦", "昌", "马", "苗", "凤", " 刚", "文", "张", "桂", "富", "生", "龙", "功", "夫", "周", "建", "余", "冲", "程", "沙", "阳", "江", "潘"));LAST_NAMES.addAll(Arrays.asList("伟","芳","丽","强","明","风","阳","海","天","藏","霞","刚", "夫", "勇", "毅", "俊", "峰", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义","义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春","明", "文", "辉", "建", "永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山","仁", "波", "宁", "贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建","永", "强", "军", "平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁","贵", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军","平", "保", "东", "文", "辉", "力", "明", "永", "健", "世", "广", "志", "义", "义", "兴", "良", "海", "山", "仁", "波", "宁", "贵", "福", "生","龙", "元", "全", "国", "胜", "学", "祥", "才", "艺", "能", "武", "航", "城", "春", "明", "文", "辉", "建", "永", "强", "军", "平", "保", "东"));}/*** 生成一个随机的全名,由随机的姓和名组成。** @return 随机全名*/public static String generateRandomFullName() {Random random = new Random();String firstName = FIRST_NAMES.get(random.nextInt(FIRST_NAMES.size()));String lastName = LAST_NAMES.get(random.nextInt(LAST_NAMES.size()));return firstName + lastName;}
}

2. 生成对象体

package com.wfg.flink.connector.dto;import lombok.Data;/*** @author wfg*/
@Data
public class KafkaPvDto {// uuidprivate String uuid;// 用户名private String userName;// 访问时间private String visitTime;// 访问地址private String visitIp;// 访问服务IPprivate String visitServiceIp;
}

3. 批量数据生成推送代码

package com.wfg.flink.test.kafka;import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDate;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)){int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i );CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();}}}private static void sendKafkaMsg (Producer < String, String > producer){String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TEST_TOPIC_PV, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg () {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());DateTime begin = DateUtil.beginOfDay(new Date());String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}

注意:
kafka本机运行请参考: kafka本地部署链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/29650.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

个人关于Vue2组成的见解

前言 Vue.js 是一个用于构建用户界面的渐进式框架&#xff0c;它从核心逐渐扩展&#xff0c;提供了一套丰富的功能来构建复杂的单页应用。Vue 2 是该框架的第二个主要版本&#xff0c;引入了许多改进和新特性。 组成部分及解析 模板&#xff08;Template&#xff09; 模板是…

简易计算器需求报告

1. &#xff08;简易计算器&#xff09; 需求说明书 文件编号&#xff1a;2022[1] [木柚2] 06[3] [木柚4] 01[5] [木柚6] 完成日期&#xff1a;2024年 06月18日 编制&#xff1a; 易正阳 日期&#xff1a;2024年6月18日 审核&#xff1a;张正 日期&#xff1a;2024年6月18…

Vue微前端架构与Qiankun实践理论指南

title: Vue微前端架构与Qiankun实践理论指南 date: 2024/6/15 updated: 2024/6/15 author: cmdragon excerpt: 这篇文章介绍了微前端架构概念&#xff0c;聚焦于如何在Vue.js项目中应用Qiankun框架实现模块化和组件化&#xff0c;以达到高效开发和维护的目的。讨论了Qiankun…

有哪些技术可代替docker?

Docker是用于创建和管理容器化应用程序的流行平台&#xff0c;但市场上也存在多种Docker的替代方案。 前言 国内使用Docker的困难现象&#xff0c;无疑是一个值得深入剖析和批判的问题。Docker作为一种容器化技术&#xff0c;以其轻量级、可移植性和高效性在全球范围内得到了广…

软件测试面试题:性能测试关注哪些指标?

问题 在工作中&#xff0c;使用JMeter做压力测试时&#xff0c;需要关注其中的哪些指标&#xff1f; 性能测试关注哪些指标&#xff1f; 考察点 面试官想了解&#xff1a; 是否用过 JMeter 指标进行分析 技术点 涉及的技术点&#xff1a; JMeter 结果分析 回答 性能指…

gitblit git pycharm 新建版本库及push备忘

在终端l中输入ssh,如果有消息弹出说明安装成功。 // 在任意路径打开GIT BASH,执行以下命令,期间所有询问可以直接Enter跳过 ssh-keygen -t rsa -C "注册Gitlab的邮箱" “”之内可以任何文字,备注提示作用。 设置用户名和邮箱 已经设置的可以检查一下。 #设置用…

SpringBoot配置第三方专业缓存框架j2cache

j2cache的使用 这不是一个缓存 这是一个缓存框架 J2Cache, 也称为Java Cache或JSR-107&#xff0c;是一个用于缓存管理的标准API&#xff0c;它允许开发者在Java应用程序中实现分布式、基于内存的缓存。J2Cache主要通过javax.cache.Cache接口提供功能&#xff0c;用于存储和…

RoaringBitMap处理海量数据内存diff

一、背景 假设mysql库中有一张近千万的客户信息表(未分表)&#xff0c;其中有客户性别&#xff0c;等级(10个等级)&#xff0c;参与某某活动等字段 1、如果要通过等级性别其他条件(离散度也低)筛选出客户&#xff0c;如何处理查询&#xff1f; 2、参与活动是记录活动ID&#…

了解Nest.js

一直做前端开发&#xff0c;都会有成为全栈工程师的想法&#xff0c;而 Nest 就是一个很好的途径&#xff0c;它是 Node 最流行的企业级开发框架&#xff0c;提供了 IOC、AOP、微服务等架构特性。接下来就让我们一起来学习Nest.js Nest.js官网地址 一&#xff0c;了解Nest Cli …

充电学习—6、电量计FuelGauge

电量计功能&#xff1a; 检测电池 计量电量 电量计首要工作&#xff1a; 计算电池的剩余容量、充满时容量、电量百分比 电量百分比 剩余容量 / 充满时容量 * 100% SOC RM / FCC * 100% 典型的一个电池包框架&#xff1a; 包含电芯、电量计IC、保护IC、充放电MOSFET、保险丝…

【做一道算一道】零钱兑换

零钱兑换 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;返回 -1 。 你可以认为每种硬币的数量是无…

栈帧浅析,堆栈漏洞概述——【太原理工大学软件安全期末补充】

在上一篇文章中我说实验一不重要&#xff0c;确实没必要完全按照实验内容逐字逐句理解&#xff0c;但是这里我们补充一个知识点 栈帧&#xff08;Stack Frame&#xff09;是计算机程序执行过程中&#xff0c;调用栈&#xff08;Call Stack&#xff09;中的一个单元&#xff0c;…

hugging face:大模型时代的github介绍

1. Hugging Face是什么&#xff1a; Hugging Face大模型时代的“github”&#xff0c;很多人有个这样的认知&#xff0c;但是我觉得不完全准确&#xff0c;他们相似的地方在于资源丰富&#xff0c;github有各种各样的软件代码和示例&#xff0c;但是它不是系统的&#xff0c;没…

Linux-DNS域名解析服务01

BIND 域名服务基础 1、DNS&#xff08;Domain Name System&#xff09;系统的作用及类型 整个 Internet 大家庭中连接了数以亿计的服务器、个人主机&#xff0c;其中大部分的网站、邮件等服务器都使用了域名形式的地址&#xff0c;如 www.google.com、mail.163.com 等。很显然…

探索C嘎嘎的奇妙世界:第十四关---STL(string的模拟实现)

1. string类的模拟实现 1.1 经典的string类问题 上一关已经对string类进行了简单的介绍&#xff0c;大家只要能够正常使用即可。在面试中&#xff0c;面试官总喜欢让学生自己来模拟实现string类&#xff0c;最主要是实现string类的构造、拷贝构造、赋值运算符重载以及析构函数…

MacOS - command not found: brew

问题描述 command not found: brew 原因分析 没有安装 Homebrew&#xff0c;安装后即可使用~ 解决方案 打开终端&#xff0c;输入&#xff1a;/bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)"&#xff0c;点击回车 在弹出…

make V=1 分析

文章目录 make V11. 加载 -include 声明的文件 make V1 1. 加载 -include 声明的文件 # 顶层 Makefile 270 KCONFIG_CONFIG ? .config ...... 580 -include include/config/auto.conf 584 -include include/config/auto.conf.cmd ...... 587 $(KCONFIG_CONFIG) include/confi…

小程序的登录+发布流程

今天我们来将一下小程序的登录和发布流程&#xff01;&#xff01;&#xff01; 小程序的登录流程 流程图 首先登录流程还是看官网说的&#xff1a;https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/login.html 这是官网发布的一个流程图 认识cod…

2 图片的分割处理和亚像素精度处理(c++和python)

本文的图片处理分为图片分割、图像的亚像素坐标处理。亚像素处理的原理可以看论文一种基于多项式插值改进的亚像素细分算法&#xff0c;该论文的详解及c的代码实现可以看博文基于多项式插值的亚像素边缘定位算法_基于多项式插值的亚像素算法-CSDN博客。下面的内容很多来自以上博…

【论文阅读】-- 时间空间化:用于深度分类器训练的可扩展且可靠的时间旅行可视化

Temporality Spatialization: A Scalable and Faithful Time-Travelling Visualization for Deep Classifier Training 摘要1 引言2 动机3 问题定义4 方法论4.1 时空复合体4.2 复数约简 5 实验6 相关工作7 结论参考文献 摘要 时间旅行可视化回答了深度分类器的预测是如何在训练…