Flink 入门案例介绍

一、工程搭建

  • 在 IDEA 中创建一个 Maven 工程:FlinkTutorial

  • 在 pom 文件中引入依赖:

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!-- 2.12 是scala版本 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency>
    </dependencies>
    

二、批处理 WordCount 案例

package com.app.wc// 批处理 WordCount
public class WordCount {public static void main(String[] args) throws Exception {// 1.创建 flink 执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2.读取文件数据// DataSource 是 Operator 的子类,Operator 是 DataSet 的子类// Flink 的批处理是基于 DataSet 类型的 API 来处理DataSource<String> inputData = env.readTextFile("datas/word.txt");// 3.执行数据处理(按空格分词并转换成 (word, 1) 这样的二元组格式),分组聚合DataSet<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap())  //需要传入FlatMapFunction接口的实现类.groupBy(0)  //可以传入KeySelector实现类或位置索引或字段名.sum(1);  // 传入进行聚合计算的位置索引// 4.输出result.print();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

三、有界流处理 WordCount 案例

package com.app.wc// 流处理WordCount
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1.创建flink流处理执行环境对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(8); // 设置并发度// 2.读取文件StreamDataSource<String> inputData = env.readTextFile("datas/word.txt");// 3.处理数据(分词,转换结构),并分组聚合DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);// 4.输出result.print();// 5.执行任务(流处理是事件触发的)env.execute();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

四、无界流处理 WordCount 案例

方便生产环境部署

package com.app.wcpublic class StreamWordCount2 {public static void main(String[] args) throws Exception {// 1.创建flink流处理执行环境对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(8); // 设置并发度// 2.监听 7777 端口服务(nc -lk 7777)// 2.1 使用 ParameterTool 类从启动参数中获取配置项ParameterTool tool = ParameterTool.formArgs(args);String hostname = tool.get("hostname");int port = tool.getInt("port");// 2.2 获取数据流DataStream<String> inputData = env.socketTextFile(hostname, port);// 3.处理数据(分词,转换结构),并分组聚合DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);// 4.输出result.print();// 5.执行任务(流处理是事件触发的)env.execute();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/24927.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SAP ABAP 往数据库表里加数据

目录 方法一&#xff1a;SE16N SE11 方法二&#xff1a;创建维护VIEW&#xff1a;SE11 SM30 Error补充说明&#xff1a; 方法一&#xff1a;SE16N SE11 首先SE16N 进来。 进来之后在テーブル的位置输入表名&#xff0c;然后点击执行&#xff08;F8&#xff09; 如果第一次…

spring 解决循环依赖

在 spring 框架中&#xff0c;我们知道它是通过三级缓存来解决循环依赖的&#xff0c;那么它具体是怎么实现的&#xff0c;以及是否必须需要三级缓存才能解决循环依赖&#xff0c;本文来作相关介绍。 具体实现 先来看看它的三级缓存到底是什么&#xff0c;先看如下代码&#…

Unity动画录制工具在运行时录制和保存模型骨骼运动的方法录制动画给其他角色模型使用支持JSON、FBX等格式

如果您正在寻找一种在运行时录制和保存模型骨骼运动的方法&#xff0c;那么此插件是满足您需求的完美解决方案。 实时录制角色运动 将录制到的角色动作转为动画文件 将录制好的动作给新的角色模型使用&#xff0c;完美复制 支持导出FBX格式 操作简单&#xff0c;有按钮界面…

selenium的使用教程

Selenium简介 Selenium是一个用于Web应用程序自动化测试工具。它支持多种浏览器&#xff0c;可以录制、编辑和运行自动化测试。通过Selenium&#xff0c;我们可以编写脚本来模拟用户在浏览器中的操作&#xff0c;从而进行功能测试。 二、安装与配置 安装Selenium库 使用pip安…

unity中通过实现底层接口实现非按钮(图片)的事件监听

编写监听脚本 PEListenter 继承自MonoBehaviour类&#xff0c;并实现了IPointerDownHandler、IPointerUpHandler和IDragHandler接口&#xff0c;按照需求定义需要接收事件&#xff08;鼠标按下、抬起、拖拽&#xff09;的回调函数 //监听类&#xff08;需要挂载在物体上面&am…

关于AD9777芯片的说明以及FPGA控制实现 I

关于AD9777芯片的说明以及FPGA控制实现 I 语言 :Verilg HDL 、VHDL EDA工具:ISE、Vivado、Quartus II 关于AD9777芯片的说明以及FPGA控制实现 I一、引言二、AD9777主要特色1. 高分辨率和高速数据率:2. 可编程插值滤波器:3. 数字正交调制能力:4. 低功耗:5. SPI接口:6. 内…

【机器学习】我们该如何评价GPT-4o?GPT-4o的技术能力分析以及前言探索

目录 &#x1f926;‍♀️GPT-4o是什么&#xff1f; &#x1f68d;GPT-4o的技术能力 1. 自然语言理解 2. 自然语言生成 3. 对话系统 4. 语言翻译 5. 文本纠错 6. 知识问答 7. 定制和微调 8. 透明性和可解释性 9. 扩展性 &#x1f690;版本对比分析 1. GPT-4标准版 …

像素蛋糕Photoshop颜色导出不一致问题分析与解决

问题点&#xff1a;发现用像素蛋糕修完图明天应该为最右边图片显示 模特应该是白皙的&#xff0c;但是导出图片无论是否勾选SRGB都表现的为种间图片颜色一样 饱和度巨高。 问题分析&#xff1a;那这一定是颜色配置文件出现问题&#xff0c;找到客服表示可以去PS打开看是否与预…

Linux之进程信号详解【上】

&#x1f30e; Linux信号详解 文章目录&#xff1a; Linux信号详解 信号入门 技术应用角度的信号 信号及信号的产生       信号的概念       信号的处理方式 信号的产生方式         键盘产生信号         系统调用产生信号         软件…

P1072 [NOIP2009 提高组] Hankson 的趣味题

Hankson 的趣味题 这题要有思维&#xff01;对。数论&#xff01;最大公约数与最小公倍数。 用LaTex写公式&#xff0c;真的麻烦&#xff01;wcnmd!,,,,,,be---- 于是我用手写了&#xff1a; 大功告成&#xff01;上马&#xff01; #include<cstdio> using namespace …

国产大模型

层出不穷的大模型产品&#xff0c;你怎么选&#xff1f; 国产大模型的发展历史可以大致分为以下几个阶段&#xff1a; 起步阶段&#xff1a; 起始时间&#xff1a;早期阶段&#xff0c;国产大模型的发展较为缓慢&#xff0c;但已有企业开始探索相关领域。重要事件&#xff1a;…

MyBatis插件机制

MyBatis插件机制是该框架提供的一种灵活扩展方式&#xff0c;允许开发者在不修改框架源代码的情况下对MyBatis的功能进行定制和增强。这种机制主要通过拦截器&#xff08;Interceptor&#xff09;实现&#xff0c;使得开发者可以拦截和修改MyBatis在执行SQL语句过程中的行为。 …

类加载的奥秘

一、类的加载过程将类的字节码文件加载到Java虚拟机中进行执行。 1.通过一个类的全限定名来获取定义此类的二进制流字节码文件(如zip 包、网络、运算生成、JSP 生成、数据库读取等)。 2.将这个字节流所代表的静态存储结构&#xff08;如常量池、字段、方法等&#xff09;转化为…

STM32使用HAL库时 UART ErrorCode

在STM32的UART&#xff08;通用异步收发传输器&#xff09;通信中&#xff0c;ErrorCode用于指示UART通信过程中发生的错误。这些错误码通常定义在STM32 HAL&#xff08;硬件抽象层&#xff09;库中&#xff0c;以便用户能够方便地识别和处理各种通信错误。以下是一些常见的STM…

两轮自平衡小车资料(L298N 模块原理图及使用说明+c源码)

本文详细介绍了基于STM32微控制器的两轮自平衡小车的设计与实现过程。内容包括小车的硬件选型、电路设计、软件编程以及PID控制算法的应用。通过陀螺仪和加速度计获取小车的姿态信息&#xff0c;利用PID控制算法调整电机输出&#xff0c;实现小车的自主平衡。此外&#xff0c;还…

[图解]企业应用架构模式2024新译本讲解12-领域模型5

1 00:00:00,560 --> 00:00:04,690 刚才是往那个表里面添加数据了 2 00:00:04,700 --> 00:00:07,960 相当于&#xff0c;或者往这个合同里面添加数据了 3 00:00:08,430 --> 00:00:09,530 现在要查询怎么办 4 00:00:09,900 --> 00:00:10,930 跟前面一样 5 00:00:…

简单的基于threejs和BVH第一人称视角和第三人称视角控制器

渲染框架是基于THREE,碰撞检测是基于BVH。本来用的是three自带的octree结构做碰撞发现性能不太好 核心代码&#xff1a; import * as THREE from three import { RoundedBoxGeometry } from three/examples/jsm/geometries/RoundedBoxGeometry.js; import { MeshBVH, MeshBVHHe…

计算机系统基础笔记(12)——控制

前言 在持续输出ing 一、条件码 1.处理器状态&#xff08;x86-64&#xff0c;部分的&#xff09; 当前程序的执行信息 ◼ 临时数据 ◼ 运行时栈的位置&#xff08;栈顶&#xff09; ◼ 当前代码控制点的位置&#xff08;即将要执行的指令地址&#xff09; ◼ 最近一次指令执…

【C++关键字】auto的使用(C++11)

auto的使用&#xff08;C11&#xff09; auto关键字auto的使用细则auto使用场景 随着程序的复杂化&#xff0c;程序中用到的类型也越来越复杂化&#xff0c;经常体现在&#xff1a; 1.类型难以拼写 2.含义不明确导致容易出错 在C语言阶段处理这类问题的方法&#xff0c;可以使…

拉格朗日乘子将不等式约束转化为等式约束例子

拉格朗日乘子将不等式约束转化为等式约束例子 在优化问题中,常常需要将不等式约束转化为等式约束。使用拉格朗日乘子法,可以通过引入松弛变量将不等式约束转换为等式约束,然后构造拉格朗日函数进行求解。 拉格朗日乘子法简介 拉格朗日乘子法是求解带约束优化问题的一种方…