flinksql kafka到mysql累计指标练习

flinksql 累计指标练习

数据流向:kafka ->kafka ->mysql

模拟写数据到kafka topic:wxt中

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) throws Exception {// 设置kafka服务器地址和端口号String kafkaServers = "localhost:9092";// 设置producer属性Properties properties = new Properties();properties.put("bootstrap.servers", kafkaServers);properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka producer对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 发送消息String topic = "wxt";JSONObject jsonObject = new JSONObject();jsonObject.put("id", 9);jsonObject.put("name", "王大大");jsonObject.put("age", 11);// 将JSON对象转换成字符串String jsonString = jsonObject.toString();// 输出JSON字符串System.out.println("JSON String: " + jsonString);ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);producer.send(record);// 关闭producerproducer.close();}
}

kafka topic :wxt1
在这里插入图片描述

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class KafkaToMysqlJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment tEnv = TableEnvironment.create(settings);// 定义Kafka连接属性String kafkaBootstrapServers = "localhost:9092";String kafkaTopic = "wxt";String groupId = "wxt1";// 注册Kafka表tEnv.executeSql("CREATE TABLE kafka_table (\n" +"  id INT,\n" +"  name STRING,\n" +"  age INT,\n" +"  proctime as PROCTIME()\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = '" + kafkaTopic + "',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = '" + groupId + "',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'earliest-offset'\n" +")");// 注册Kafka表// latest-offset//earliest-offsettEnv.executeSql("CREATE TABLE kafka_table2 (\n" +"  window_start STRING,\n" +"  window_end STRING,\n" +"  name STRING,\n" +"  age INT\n" +") WITH (\n" +"  'connector' = 'kafka',\n" +"  'topic' = 'wxt2',\n" +"  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +"  'properties.group.id' = 'kafka_table2',\n" +"  'format' = 'json',\n" +"  'scan.startup.mode' = 'latest-offset',\n" +"  'value.format' = 'csv'\n" +")");tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +"  window_start String,\n" +"   window_end String,\n" +"    name String,\n" +"    age INT\n" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +"   'username' = 'root',\n" +"   'password' = '12345678',\n" +"   'table-name' = 'leiji_age'\n" +")");tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +"from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +"group by  window_start,window_end,name");tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");env.execute("KafkaToMysqlJob");}
}

kafka topic :wxt2
在这里插入图片描述
mysql结果数据:
在这里插入图片描述
pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>flinksql</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><flink.version>1.14.3</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!-- <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.73</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId><version>3.5</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.31</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.11.0</version></dependency><dependency><groupId>org.antlr</groupId><artifactId>antlr-runtime</artifactId><version>3.5.2</version></dependency><dependency><groupId>org.apache.thrift</groupId><artifactId>libfb303</artifactId><version>0.9.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.15</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-dependency-plugin</artifactId><version>2.10</version><executions><execution><id>copy-dependencies</id><phase>package</phase><goals><goal>copy-dependencies</goal></goals><configuration><outputDirectory>${project.build.directory}/lib</outputDirectory></configuration></execution></executions></plugin><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin></plugins></build></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119247.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2M大小的PDF文档上传到LangChain-ChatGLM知识图谱中,大致需要的时间

对于将2M大小的PDF文档上传到LangChain-ChatGLM知识图谱中,大致需要的时间如下: PDF到文本的提取转换:若PDF内容主要为文本,此步骤约需要1-2分钟。 提取的文本经过预处理与分析:此步骤需要对文本进行分词、命名实体识别等处理,约需要2-5分钟。 抽取文本中的结构化知识(实体、…

Web前端面试之Vue—对Vue的理解

目录 一、web发展历程 二、vue是什么 三、Vue核心特性 组件化 数据驱动 指令 四、Vue与Angular以及React的区别 一、web发展历程 Web是World Wide Web的简称&#xff0c;中文译为万维网 我们可以将它规划成如下的几个时代来进行理解 静态网页&#xff1a;最早的网页是没…

一台服务器最大能支持多少条 TCP 连接

文章目录 1. 一台服务器最大能打开的文件数1.1 限制参数1.2 调整服务器能打开的最大文件数示例 2. 一台服务器最大能支持多少连接3. 一台客户端机器最多能发起多少条连接4. 其他5. 相关实际问题5.1 "too many open files" 报错是怎么回事&#xff0c;该如何解决5.2 一…

新手入门?初登开发者舞台的你所适合的三大开发工具?

对新手开发者来说&#xff0c;工具的简洁性和实用性和自己的产出直接挂钩&#xff0c;一个好用的工具往往会让编译代码减少很多麻烦&#xff0c;有哪些比较适合的工具&#xff0c;几乎成了每个新人必定会问的问题之一。 针对这些疑惑&#xff0c;今天就来讲讲三大新手型开发工…

【Python】【logging】限制输出

Logging配置示例 Logging的文件输出和终端需要单独配置&#xff0c;终端的配置项也可以和文件配置的输出不同&#xff0c;推荐配置相同&#xff0c;避免输出上的差异 import logging import os from logging.handlers import RotatingFileHandlerlogger logging.getLogger()…

多张照片怎么打包发给别人?几个步骤轻松搞定!

在工作和生活中&#xff0c;我们常常需要发送多张照片&#xff0c;因为照片数量较多&#xff0c;打包可以减少发送时间&#xff0c;提高发送效率。那么如何操作呢&#xff1f;有什么好用的软件呢&#xff1f;下面向大家介绍三种常用的软件。 方法一&#xff1a;使用7-zip 1、在…

安卓离线点击第一条通知跳转正常,第二条失败或者通过URL Scheme唤起App指定页面第一次成功,第二次失败

1、服务端intent参数中设置为launchFlags0x04000000。 intent:#Intent;launchFlags0x04000000;packagecn.ddd.mShop.android;componentcn.ddd.mShop.android/com.sss.mshop.page.activity.PushAndLinkMessageProcessingActivity;S.gttask“123”;end 2、尝试更改intent中设定…

分布式ID系统设计(1)

分布式ID系统设计(1) 在分布式服务中&#xff0c;需要对data和message进行唯一标识。 比如订单、支付等。然后在数据库分库分表之后也需要一个唯一id来表示。 基于DB的自增就肯定不能满足了。这个时候能够生成一个Global的唯一ID的服务就很有必要我们姑且把它叫做id-server 。…

uni-app打包apk实现自动更新

一、直接复制粘贴就可用(豪横) app.vue文件里写 //app.vue里写 <script>export default {onShow: function() {console.log(App Show)},onHide: function() {console.log(App Hide)},onLaunch: function() {let appVersion uni.getSystemInfo({success: function(e) {ap…

python基础语法(九)

目录 新增元素查找元素删除元素连接列表关于元组 感谢各位大佬对我的支持,如果我的文章对你有用,欢迎点击以下链接 &#x1f412;&#x1f412;&#x1f412;个人主页 &#x1f978;&#x1f978;&#x1f978;C语言 &#x1f43f;️&#x1f43f;️&#x1f43f;️C语言例题 &…

更新电脑显卡驱动的操作方法有哪些?

更新显卡驱动可以有效的提升我们电脑的性能&#xff0c;可以通过设备管理器、显卡驱动软件等方式进行检查驱动是否需要更新&#xff0c;并修复一些电脑上已知的显卡问题。 然而&#xff0c;对于一些不是很懂电脑技术的人员来说&#xff0c;更新电脑显卡驱动是一件比较复杂和混乱…

视频号视频提取小程序,快速下载视频号视频

​视频号提取小程序可以帮助用户方便地从视频号视频平台获取到自己喜欢的视频号内容。通过这个小程序&#xff0c;你可以快速搜索并提取出视频号&#xff0c;并进行相关的操作。 据悉视频下载bot小程序目前已经更名为【提取下载小助手】 使用视频号提取小程序有以下几个步骤&…

1.验证码绕过

1.环境 1.前端验证码 抓包 发到重放器 可重复使用 爆破 总结&#xff0c;前端的验证直接删除验证码即可开始爆破 服务端 3.token 爆破

pycharm远程连接Linux服务器

文章目录 一&#xff1a;说明二&#xff1a;系统三&#xff1a;实现远程连接方式一&#xff1a; 直接连接服务器不使用服务器的虚拟环境步骤一&#xff1a;找到配置服务器的地方步骤二&#xff1a;进行连接配置步骤三&#xff1a;进行项目文件映射操作步骤四&#xff1a;让文件…

ES6语法学习

1、every()和some()的区别 只要数组中有一个元素满足条件&#xff0c;some() 就返回 true&#xff1b; 只有当数组中的所有元素都满足条件时&#xff0c;every() 才返回 true。 var ageArr [3, 10, 18, 20];var isLowEvery ageArr.every(function(age) {return age < 10;…

如何在群晖Synology+Office实现多人编辑一个文件?

使用群晖Synology Office提升生产力&#xff1a;多人同时编辑一个文件 文章目录 使用群晖Synology Office提升生产力&#xff1a;多人同时编辑一个文件本教程解决的问题是&#xff1a;1. 本地环境配置2. 制作本地分享链接3. 制作公网访问链接4. 公网ip地址访问您的分享相册5. 制…

在Go项目中二次封装Kafka客户端功能

1.摘要 在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。 在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafk…

Kubernetes(K8s)从入门到精通系列之十八:使用 Operator Lifecycle Manager(OLM) 安装operator

Kubernetes从入门到精通系列之十八&#xff1a;使用 Operator Lifecycle Manager OLM 安装operator 一、先决条件二、安装operator三、示例&#xff1a;安装最新版本的 Operator四、示例&#xff1a;安装特定版本的 Operator 从运算符目录中安装运算符 通过 CatalogSource 将 …

InstructionGPT

之前是写在[LLM&#xff1a;提示学习Prompt Learning]里的&#xff0c;抽出来单独讲一下。 基本原理 在做下游的任务时&#xff0c;我们发现GPT-3有很强大的能力&#xff0c;但是只要人类说的话不属于GPT-3的范式&#xff0c;他几乎无法理解。例如&#xff0c;我们说把句子A变…

Android Studio Logcat日志VIVO手机显示*号问题

咨询VIVO客服 1、拨盘输入 *#06# 获取串码&#xff0c;发送给客服 2、拨号盘输入*#*#112#*#*-右上角菜单-更多-一键授权 注意不要刷机&#xff0c;恢复出厂设置&#xff0c;手动取消授权哦