Apache Flink 入门

零、概述

Apache Flink 是一个高性能的开源分布式流处理框架,专注于实时数据流的处理。

它设计用于处理无界和有界数据流,在内存级速度下提供高效的有状态计算。

Flink 凭借其独特的Checkpoint机制和Exactly-Once语义,确保数据处理的准确性和一致性,同时支持高吞吐量和低延迟。

通过灵活的窗口操作和丰富的状态管理功能,Flink 能够应对复杂的实时数据处理需求,是大数据处理领域的重要技术之一。

其强大的DataStream API和Table API为开发者提供了高效、简洁的数据处理手段。

一、添加依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xch</groupId><artifactId>java-flink</artifactId><version>1.0-SNAPSHOT</version><properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><flink.version>1.12.2</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency></dependencies></project>

二、map() filter() flatMap()方法示例

2.1 map()方法示例

简单处理,和java8的stream的map()类似,不过只能进行简单的处理,返回:数组元素自身的和

public static List<Integer> mapDemo(DataSource<Integer> dataSteam) throws Exception {return dataSteam.map(x -> x + x).collect();
}

2.2 filter()方法示例

过滤方法,返回偶数,

public static List<Integer> filterDemo(DataSource<Integer> dataSteam) throws Exception {return dataSteam.filter(x -> x % 2 == 0).collect();
}

2.3 flatMap()方法示例

flatMap方法可以处理复杂、定制化的逻辑,返回元素的类型也可以是复杂的;

  • 第一个简单处理的示例
public static List<Object> flatMapDemo(DataSource<Integer> dataSteam) throws Exception {return dataSteam.flatMap(new FlatMapFunction<Integer, Object>() {@Overridepublic void flatMap(Integer integer, Collector<Object> collector) throws Exception {collector.collect(integer);collector.collect(integer * integer);}}).collect();
}
  • 第二个复杂的示例
public static List<Map<Integer, Object>> flatMapDemo1(DataSource<Integer> dataSteam) throws Exception {return dataSteam.flatMap(new FlatMapFunction<Integer, Map<Integer, Object>>() {@Overridepublic void flatMap(Integer integer, Collector<Map<Integer, Object>> collector) throws Exception{Map<Integer, Object> hashMap = new HashMap<>();hashMap.put(integer, integer * integer);collector.collect(hashMap);}}).collect();
}

2.4 示例演示

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.HashMap;
import java.util.List;
import java.util.Map;public class FlinkDemo {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSteam = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);System.out.println("mapDemo:" + mapDemo(dataSteam));System.out.println("filterDemo:" + filterDemo(dataSteam));System.out.println("flatMapDemo:" + flatMapDemo(dataSteam));System.out.println("flatMapDemo1:" + flatMapDemo1(dataSteam));}
}

输出内容:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/47289.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue中:class、watch、v-show使用

1、:class 指令 在 Vue.js 中&#xff0c;:class 指令&#xff08;或 v-bind:class&#xff09;允许你动态地绑定 CSS 类到一个元素。这个指令有两种主要的使用方式&#xff1a;绑定一个对象或者绑定一个数组。 1.1、:class{} 对象语法 对象语法允许你基于条件来添加或移除类…

如何在网页中对视频进行截图

在网页开发中&#xff0c;我们经常需要对视频进行截图&#xff0c;以便在文章或博客中展示视频的某个瞬间。HTML5 提供了强大的 <video> 标签&#xff0c;使得在网页中嵌入视频变得简单。但是&#xff0c;如何对这些视频进行截图呢&#xff1f;本文将介绍一种简单的方法&…

只用 CSS 能玩出什么花样?

在前端开发领域&#xff0c;CSS 不仅仅是一种样式语言&#xff0c;它更像是一位多才多艺的艺术家&#xff0c;能够创造出令人惊叹的视觉效果。本文将带你探索 CSS 的无限可能&#xff0c;从基本形状到动态动画&#xff0c;从几何艺术到仿生设计&#xff0c;只用 CSS 就能玩出令…

Vscode中Github copilot插件无法使用(出现感叹号)解决方案

1、击扩展或ctrl shift x ​​​​​​​ 2、搜索查询或翻找到Github compilot 3、点击插件并再左侧点击登录github 点击Sign up for a ... 4、跳转至github登录页&#xff0c;输入令牌完成登陆后返回VScode 5、插件可以正常使用

社会科学战线

《社会科学战线》&#xff08;以下简称《战线》&#xff09;是吉林省社会科学院主办的大型综合性人文社会科学类期刊&#xff0c;创刊于1978年5月&#xff0c;月刊&#xff0c;每期约50万字。内容涵盖哲学、历史学、文学、经济学、政治学、法学、社会学、教育学等人文社会科学学…

微服务实战系列之玩转Docker(三)

前言 镜像&#xff08;Image&#xff09;作为Docker的“水源”&#xff0c;取之于它&#xff0c;用之于它。这对于立志成为运维管理的撒手锏——Docker而言&#xff0c;重要性不言而喻。 我们在虚拟机时代&#xff08;当然现在依然ing…&#xff09;&#xff0c;如何快速完成…

成为CMake砖家(5): VSCode CMake Tools 插件基本使用

大家好&#xff0c;我是白鱼。 之前提到过&#xff0c;白鱼的主力 编辑器/IDE 是 VSCode&#xff0c; 也提到过使用 CMake Language Support 搭配 dotnet 执行 CMakeLists.txt 语法高亮。 对于阅读 CMakeLists.txt 脚本&#xff0c; 这足够了。 而在 C/C 开发过程中&#xff…

NXP i.MX8系列平台开发讲解 - 3.19 Linux TTY子系统(二)

专栏文章目录传送门&#xff1a;返回专栏目录 Hi, 我是你们的老朋友&#xff0c;主要专注于嵌入式软件开发&#xff0c;有兴趣不要忘记点击关注【码思途远】 目录 1. Linux 串口驱动 1.1 Uart 驱动注册流程 1.2 uart 操作函数 1.3 line discipline 2. Linux tty应用层使用…

FPGA 实现DDR4的读写

1 硬件设计 FPGA 端&#xff1a; DDR4: 2 验证方案 3 仿真验证 4 DDR4 下板验证

《昇思25天学习打卡营第25天|第10天》

今天是打卡的第十天&#xff0c;今天开始学应用实践中的LLM原理和实践&#xff0c;今天学的是基于MindSpore实现BERT对话情绪识别。最先了解的是BERT模型的简介&#xff08;来自变换器的双向编码器表征量&#xff08;Bidirectional Encoder Representations from Transformers&…

递归锁与普通锁的区别

什么是锁&#xff1f; 在多线程编程中&#xff0c;锁是一种机制&#xff0c;用来确保某些代码块在同一时间只能被一个线程执行。想象一下&#xff0c;你和你的朋友们都想同时进入一个只有一把椅子的房间。为了避免混乱&#xff0c;你们需要一个锁来控制进入的顺序。 普通锁&a…

PHP 包含

PHP 包含 PHP 是一种广泛使用的开源服务器端脚本语言,它在 web 开发中扮演着重要的角色。PHP 的一个核心特性是其能够包含其他文件,这允许开发者将代码分割成可重用的模块,从而提高代码的可维护性和组织性。本文将深入探讨 PHP 中的文件包含机制,包括其工作原理、使用场景…

Java —— 内部类

Java内部类 1.什么是内部类&#xff1f; 将一个类A定义在另一个类B里面&#xff0c;里面的类A就称为内部类&#xff08;InnerClass&#xff09;&#xff0c;类B则称为外部类&#xff08;OuterClass&#xff09;。 2.为什么需要内部类&#xff1f; 具体来说&#xff0c;当一…

大数据量批处理场景处理

大数据量多线程批处理工具&#xff1a;MultiThreadMamager: 基于线程池实现的动态管理工具 基于ExecutorServiceTaskAbstract抽象实现内部方法&#xff1a;material()获取数据、processing()取数结果处理。 由管理工具去调用线程池执行任务和任务自动迭代处理 - Gitee.com 主要…

springboot 重新注册 bean

项目中&#xff0c;有时候会遇到这样的需求&#xff1a;更新配置后&#xff0c;需要重新处理相关的业务&#xff0c;但是不想重启应用。例如 elasticsearch 证书过期后&#xff0c;需要更换 http_ca.crt &#xff0c;但是又不想重启应用。 本人对 spring IOC 的源码不算深入&a…

NodeJS技巧:在循环中管理异步函数的执行次数

背景介绍 在现代Web开发中&#xff0c;NodeJS因其高效的异步处理能力而备受青睐。尤其在数据抓取、网络爬虫等应用场景中&#xff0c;NodeJS的非阻塞I/O特性使其成为不二之选。然而&#xff0c;在实际编程过程中&#xff0c;我们经常会遇到一个棘手的问题——如何在循环中控制…

HTC 10 刷系统 LineageOS 19.1 Android 12

解锁手机 解锁或导致数据全部清除&#xff0c;注意保存 Bootloader解锁&#xff0c;S-ON可以不用解锁&#xff08;好像可以绕过解锁安装twrp&#xff0c;暂时没尝试&#xff09; HTC 官方 Unlock Bootloader HTC Desire 20 pro 可以不通过官方网站解锁 adb reboot bootload…

Unity入门之重要组件和API(4) : Screen

Screen类主要处理屏幕相关的操作。 1.静态属性 1.1常用属性 【设备分辨率】 Resolution resolution Screen.currentResolution; print("设备分辨率宽&#xff1a;" resolution.width " 高&#xff1a;" resolution.height);【屏幕窗口的宽高】 这里…

Spring源码-读取XML文件配置信息

一、XML基础 DTD、XML阐述、XML的两种文档类型约束和DTD的使用-CSDN博客 二、new File()、getResource()、getResourceAsStream() Java中Class类和 ClassLoader 类 的 getResource()和 getResourceAsStream()方法的使用_getclassloader().getresourceasstream-CSDN博客 三、…

svn ldap认证临时切换到本地认证

当前的svn是在CentOS 7 下 SVN、 Apache 对接 LDAP 服务实现用户账号管理和权限认证&#xff0c;本文模拟ldap数据丢失如何恢复svn&#xff0c;方法是临时将认证切换到本地认证 编辑subversion.conf文件 vi /etc/httpd/conf.d/subversion.conf 注释ldap-status #<Locati…