SpringBoot集成flink

Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并行化计算的流数据处理引擎。
最大亮点是流处理,最适合的应用场景是低时延的数据处理。
场景:高并发pipeline处理数据,时延毫秒级,且兼具可靠性。

环境搭建:

①、安装flink

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/try-flink/local_installation/

②、安装Netcat

Netcat(又称为NC)是一个计算机网络工具,它可以在两台计算机之间建立 TCP/IP 或 UDP 连接。
用于测试网络中的端口,发送文件等操作。
进行网络调试和探测,也可以进行加密连接和远程管理等高级网络操作

yum install -y nc # 安装nc命令nc -lk 8888 # 启动socket端口

无界流之读取socket文本流

一、依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- 添加 Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>1.17.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>1.17.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"><resource>META-INF/spring.factories</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.et.flink.job.SocketJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

二、SoketJob

public class SocketJob{public static void main(String[] args)throws Exception{// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 指定并行度,默认电脑线程数env.setParallelism(3);// 读取数据socket文本流 指定监听 IP 端口 只有在接收到数据才会执行任务DataStreamSource<String> socketDS = env.socketTextStream("172.24.4.193", 8888);// 处理数据: 切换、转换、分组、聚合 得到统计结果SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).setParallelism(2)// // 显式地提供类型信息:对于flatMap传入Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式设置系统当前返回类型,才能正确解析出完整数据.returns(new TypeHint<Tuple2<String, Integer>>() {})
//                .returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(value -> value.f0).sum(1);// 输出sum.print();// 执行env.execute();}
}

测试:

启动socket流:

nc -l 8888

本地执行:直接ideal启动main程序,在socket流中输入

abc bcd cde
bcd cde fgh
cde fgh hij

在这里插入图片描述

集群执行:
执行maven打包,将打包的jar上传到集群中
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/724212.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙NEXT开发实战:【视频文件裁剪】

使用OpenHarmony系统提供的ffmpeg三方库的能力在系统中实现了音视频文件裁剪的功能&#xff0c;并通过NAPI提供给上层应用调用。 基础信息 视频文件裁剪 简介 在OpenHarmony系统整个框架中有很多子系统&#xff0c;其中多媒体子系统是OpenHarmony比较重要的一个子系统&#…

Seata 2.x 系列【1】专栏导读

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 本系列Spring Boot 版本 3.1.0 本系列Seata 版本 2.0.0 源码地址&#xff1a;https://gitee.com/pearl-organization/study-seata-demo 文章目录 1. 背景2. 简介3. 适用人群4. 环境及版本5. 文章导航5…

Spring基础——方法注入(Method Injection)

目录 查找方法注入&#xff08;Lookup Method&#xff09;查找方法注入基于XML的方法注入基于注解的方法注入 Arbitrary Method Replacement&#xff08;任意方法替换&#xff09; 文章所用项目源码参考&#xff1a;java_spring_learn_repo 查找方法注入&#xff08;Lookup Met…

解决微信好友添加频繁问题

今天我们来聊一聊微信好友添加频繁的问题。在日常使用中&#xff0c;有时候我们会遇到一些添加好友受限的情况&#xff0c;那么究竟是什么原因导致了这一问题呢&#xff1f;接下来&#xff0c;让我们逐一来看一看。 1. 添加好友的频率太高 首先&#xff0c;如果我们在短时间内…

Java必须掌握的红黑树(含面试大厂题含源码)

当面试官要求你实现一个红黑树时&#xff0c;可能会给你一些提示或者要求&#xff0c;比如要求实现插入、删除、查找等操作。下面是一个简单的红黑树实现示例&#xff0c;包含了插入操作&#xff1a; class RedBlackTree {private static final boolean RED false;private st…

.NetCore6.0实现ActionFilter过滤器记录接口请求日志

文章目录 目的实现案例&#xff1a;一.首先我们新建一个WebApi项目二.配置 appsettings.json 文件&#xff0c;配置日志存放路径三.创建 Model 文件夹&#xff0c;创建AppConfig类和ErrorLog类1.在AppConfig类中编写一个GetConfigInfo方法获取配置文件中的值2.在ErrorLog类中&a…

供应josef约瑟DL-24C电流继电器 额定电流3A 整定范围0.5-2A 电气控制必备元件

电流继电器是一种特殊的电子控制器件&#xff0c;具有控制系统和被控制系统&#xff0c;它使用较小的电流去控制较大的电流&#xff0c;起到自动开关的作用。以下是电流继电器的特征&#xff1a; 承载大电流&#xff1a;电流继电器可以承载大电流&#xff0c;通常能够承受数十…

SpringBoot集成Docker

Docker是一个开源的应用容器引擎&#xff0c;它允许开发者将应用及其依赖打包到一个可移植的容器中。 一、依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://ww…

SpringBoot 接口防刷

1&#xff1a;pom添加依赖 <dependency><groupId>net.jodah</groupId><artifactId>expiringmap</artifactId><version>0.5.10</version></dependency> 2&#xff1a;封装工具 Slf4j public class RequestUtil {/*** 1&#…

大语言模型(LLM):每个专业人士的完美助手

「大语言模型&#xff08;LLM&#xff09;革命」&#xff1a;ChatGPT如何引领工作效率新篇章 在不断发展的技术领域&#xff0c;像 ChatGPT 这样的大型语言模型 (LLM) 已成为各行业专业人士不可或缺的工具。 这篇博文探讨了大语言模型&#xff08;LLM&#xff09;在专业环境中的…

GDPU Java 天码行空2

实验2 类与封装性 文章目录 实验2 类与封装性&#xff08;一&#xff09;实验目的&#xff08;二&#xff09;实验内容和步骤&#xff08;1&#xff09;建立学生类和测试类。学生类中有成员变量&#xff1a;姓名&#xff0c;年龄&#xff1b;成员方法&#xff1a;学习&#xff…

Java 8日期时间类LocalDate、LocalDateTime详解

Java 8日期时间类LocalDate、LocalDateTime详解 一、常见类型转换1.1 LocalDateTime 转 LocalDate1.2 LocalDateTime 转 Date1.3 LocalDate 转 LocalDateTime1.4 LocalDate 转 Date1.5 Date 转 LocalDateTime1.6 Date 转 LocalDate1.7 LocalDate 转 String1.8 String 转 LocalD…

程序逻辑控制

1.java的三大结构 可以说java的这三大结构包括其中的语句跟c语言上的基本上都是一样的。现在就当重新复习一遍吧&#xff01; 1.顺序结构 2.分支结构 if语句 跟c语言的语法一模一样。就直接看文案了。 switch语句 java中的switch语句跟c语言中的switch几乎相同&#xff0c;…

AtCoder Beginner Contest 343 A~F

A.Wrong Answer&#xff08;模拟&#xff09; 题意&#xff1a; 给你两个整数 A A A和 B B B&#xff0c;它们介于 0 0 0和 9 9 9之间。 输出任何一个介于 0 0 0和 9 9 9之间且不等于 A B AB AB的整数。 分析&#xff1a; 按题意判断并输出一个整数即可。 代码&#xff…

[力扣 Hot100]Day39 对称二叉树

题目描述 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 出处 思路 每一对对称的节点AB&#xff0c;A的左儿子和B的右儿子&#xff0c;A的右儿子和B的左儿子又是一对对称节点。 代码 class Solution { public:bool compare(TreeNode* r1, TreeNode* r2){i…

qnx display

05-SA8155 QNX Display框架及代码分析(1)_openwfd-CSDN博客 backlight p: 0 t: 0 00000 SHD -----ONLINE----- 2024/03/06 13:49:22.046 backlight p:1060958 t: 1 00000 ERR backlight_be[backlight_be.c:284]: pthread_create enter 2024/03/06 13…

python基础练习题目

1. 根据身高体重&#xff0c;判断人的胖瘦 描述&#xff1a; 通过身高和体重&#xff0c;判断一个人的胖瘦。国际上一般采用BMI体重指数&#xff0c;计算公式为BMI 体重 / 身高2(保留小数点后1位)&#xff0c;参考标准如下&#xff1a;‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪…

回流和重绘

浏览器是如何进行页面渲染的&#xff1f; 1、解析&#xff08;parser&#xff09;HTML&#xff0c;生成DOM树&#xff1b; 2、同时解析css&#xff0c;生成样式规则&#xff1b; 3、根据DOM树和样式规则&#xff0c;生成渲染树&#xff1b; 4、进行布局Layout&#xff08;回流&…

SpringBoot集成RocketMQ

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件&#xff0c;前身是MetaQ&#xff0c;是阿里参考Kafka特点研发的一个队列模型的消息中间件&#xff0c;后开源给apache基金会成为了apache的顶级开源项目&#xff0c;具有高性能、高可靠、高实时、分布式特点。 环境搭…

对VisionPro的认识,CogPMAlingTool模板匹配工具练习

什么是VisionPro&#xff1f; 在认识VisionPro之前我们需要先熟悉一下图片的各种格式 这里我们可以参考来自githubcurry博主的文章 图片各种格式的区别以及计算机如何存储图片 VisionPro 是由世界领先的机器视觉公司 Cognex 开发的一款专业机器视觉软件。它提供了强大的图像…