flinksql kafka到mysql累计指标练习

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}

kafka topic :wxt1
在这里插入图片描述

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +"  id INT,\n" +"  name STRING,\n" +"  age INT,\n" +"  proctime as PROCTIME()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = '" + kafkaTopic + "',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = '" + groupId + "',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +"  window_start STRING,\n" +"  window_end STRING,\n" +"  name STRING,\n" +"  age INT\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'wxt2',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = 'kafka_table2',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +"  window_start String,\n" +"   window_end String,\n" +"    name String,\n" +"    age INT\n" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +"   'username' = 'root',\n" +"   'password' = '12345678',\n" +"   'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by  window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119247.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

新手入门?初登开发者舞台的你所适合的三大开发工具?

对新手开发者来说&#xff0c;工具的简洁性和实用性和自己的产出直接挂钩&#xff0c;一个好用的工具往往会让编译代码减少很多麻烦&#xff0c;有哪些比较适合的工具&#xff0c;几乎成了每个新人必定会问的问题之一。 针对这些疑惑&#xff0c;今天就来讲讲三大新手型开发工…

多张照片怎么打包发给别人?几个步骤轻松搞定!

在工作和生活中&#xff0c;我们常常需要发送多张照片&#xff0c;因为照片数量较多&#xff0c;打包可以减少发送时间&#xff0c;提高发送效率。那么如何操作呢&#xff1f;有什么好用的软件呢&#xff1f;下面向大家介绍三种常用的软件。 方法一&#xff1a;使用7-zip 1、在…

uni-app打包apk实现自动更新

一、直接复制粘贴就可用(豪横) app.vue文件里写 //app.vue里写 <script>export default {onShow: function() {console.log(App Show)},onHide: function() {console.log(App Hide)},onLaunch: function() {let appVersion uni.getSystemInfo({success: function(e) {ap…

更新电脑显卡驱动的操作方法有哪些?

更新显卡驱动可以有效的提升我们电脑的性能&#xff0c;可以通过设备管理器、显卡驱动软件等方式进行检查驱动是否需要更新&#xff0c;并修复一些电脑上已知的显卡问题。 然而&#xff0c;对于一些不是很懂电脑技术的人员来说&#xff0c;更新电脑显卡驱动是一件比较复杂和混乱…

视频号视频提取小程序,快速下载视频号视频

​视频号提取小程序可以帮助用户方便地从视频号视频平台获取到自己喜欢的视频号内容。通过这个小程序&#xff0c;你可以快速搜索并提取出视频号&#xff0c;并进行相关的操作。 据悉视频下载bot小程序目前已经更名为【提取下载小助手】 使用视频号提取小程序有以下几个步骤&…

1.验证码绕过

1.环境 1.前端验证码 抓包 发到重放器 可重复使用 爆破 总结&#xff0c;前端的验证直接删除验证码即可开始爆破 服务端 3.token 爆破

pycharm远程连接Linux服务器

文章目录 一&#xff1a;说明二&#xff1a;系统三&#xff1a;实现远程连接方式一&#xff1a; 直接连接服务器不使用服务器的虚拟环境步骤一&#xff1a;找到配置服务器的地方步骤二&#xff1a;进行连接配置步骤三&#xff1a;进行项目文件映射操作步骤四&#xff1a;让文件…

如何在群晖Synology+Office实现多人编辑一个文件?

使用群晖Synology Office提升生产力&#xff1a;多人同时编辑一个文件 文章目录 使用群晖Synology Office提升生产力&#xff1a;多人同时编辑一个文件本教程解决的问题是&#xff1a;1. 本地环境配置2. 制作本地分享链接3. 制作公网访问链接4. 公网ip地址访问您的分享相册5. 制…

在Go项目中二次封装Kafka客户端功能

1.摘要 在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。 在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafk…

InstructionGPT

之前是写在[LLM&#xff1a;提示学习Prompt Learning]里的&#xff0c;抽出来单独讲一下。 基本原理 在做下游的任务时&#xff0c;我们发现GPT-3有很强大的能力&#xff0c;但是只要人类说的话不属于GPT-3的范式&#xff0c;他几乎无法理解。例如&#xff0c;我们说把句子A变…

Android Studio Logcat日志VIVO手机显示*号问题

咨询VIVO客服 1、拨盘输入 *#06# 获取串码&#xff0c;发送给客服 2、拨号盘输入*#*#112#*#*-右上角菜单-更多-一键授权 注意不要刷机&#xff0c;恢复出厂设置&#xff0c;手动取消授权哦

【Linux】Linux任务管理与守护进程

Linux任务管理与守护进程 一、任务管理1、进程组概念2、作业概念3、会话概念4、相关操作 二、守护进程1、守护进程的创建2、守护进程的库函数 一、任务管理 1、进程组概念 在Linux中&#xff0c;每个进程除了有一个进程ID之外&#xff0c;还有一个属性是进程组(PGID)&#xff…

CAD迷你看图 mac v4.4.5

CAD迷你看图是一款小巧的DWG文件浏览小工具&#xff0c;支持AutoCAD DWG/DXF等常用图纸文件&#xff0c;可脱离AutoCAD快速浏览DWG图纸&#xff0c;并提供了平移、缩放、全屏等功能。该软件采用独特的云技术&#xff0c;根据不同DWG图纸的需要自动装载相应字体&#xff0c;解决…

木马文件检测系统 毕业设计 JAVA+Vue+SpringBoot+MySQL

项目编号&#xff1a;S041&#xff0c;源码已在 Bilibili 中上架&#xff0c;需要的朋友请自行下载。 https://gf.bilibili.com/item/detail/1104375029为了帮助小白入门 Java&#xff0c;博主录制了本项目配套的《项目手把手启动教程》&#xff0c;希望能给同学们带来帮助。 …

Linux网络编程:IP协议

目录 一. IP协议的功能 二. IP协议报头 2.1 IP报头的格式 2.2 IP报头各部分含义 三. IP报文的分片问题 3.1 什么是分片 3.2 分片的原理 3.3 合并报文 四. 网段划分 4.1 网络号和主机号 4.2 网络号和主机号的划分策略 4.3 特殊的IP地址 4.4 IP地址数量不足问题 五.…

「滚雪球学Java」:Java前序篇(章节汇总)

&#x1f3c6;本文收录于「滚雪球学Java」专栏&#xff0c;专业攻坚指数级提升&#xff0c;助你一臂之力&#xff0c;带你早日登顶&#x1f680;&#xff0c;欢迎大家关注&&收藏&#xff01;持续更新中&#xff0c;up&#xff01;up&#xff01;up&#xff01;&#xf…

竞赛选题 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

文心一言 VS 讯飞星火 VS chatgpt (121)-- 算法导论10.4 2题

二、用go语言&#xff0c;给定一个n结点的二叉树&#xff0c;写出一个 O(n)时间的递归过程&#xff0c;将该树每个结点的关键字输出。 文心一言&#xff1a; 在Go语言中&#xff0c;你可以使用递归函数来遍历二叉树的所有节点&#xff0c;并输出每个节点的关键字。以下是一个…

OpenLayers入门,OpenLayers从vue的assets资源路径加载geojson文件并解析数据叠加到地图上,以加载世界各国边界为例

专栏目录: OpenLayers入门教程汇总目录 前言 本章以加载世界各国边界的GeoJSON格式数据为例,讲解如何使用OpenLayers从vue的assets资源路径加载geojson文件并解析数据叠加到地图上。 二、依赖和使用 "ol": "^6.15.1"使用npm安装依赖npm install ol@…

redis爆满导致数据丢失

记一则redis爆满导致数据丢失的一场事故 某功能上线后&#xff0c;发现出现问题&#xff0c;最后定位到了 redis. 由于存储的数据过多&#xff0c;导致阿里云4G大小的 redis 爆满&#xff0c;触发了回收策略。 于是临时扩容,运维同学当时未找到阿里云配置。 后面我用工具连接了…