2、Flink DataStreamAPI 概述(下)

代码示例

Maven 依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency></dependencies>

log4j.properties

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

1、_01_QuickStart

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;import java.time.Duration;public class _01_QuickStart {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 8888).flatMap(new Splitter()).keyBy(value -> value.f0).window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5))).sum(1);dataStream.print();//        JobExecutionResult jobExecutionResult = env.execute("Window WordCount");//程序完成时打印//JobExecutionResult=>Program execution finished//Job with JobID c01e1255752cbb34469a9a10177e637c has finished.//Job Runtime: 25596 ms
//        System.out.println("JobExecutionResult=>"+jobExecutionResult.getJobExecutionResult());JobClient jobClient = env.executeAsync("Window WordCount");// Java程序可以通过JobClient同Flink Job交互// jobID=>32e976f03ac7243c09a5cf07c0739921// jobStatus=>RUNNINGSystem.out.println("jobID=>"+jobClient.getJobID());System.out.println("jobStatus=>"+jobClient.getJobStatus().get());}public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {for (String word : sentence.split(",")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}
}

2、_02_ReadFileSource

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.io.File;public class _02_ReadFileSource {public static void main(String[] args) throws Exception {ExecutionConfig executionConfig = new ExecutionConfig();executionConfig.setAutoWatermarkInterval(1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(executionConfig.toConfiguration());env.setBufferTimeout(1000);System.out.println("自动生成水位线间隔=>"+env.getConfig().getAutoWatermarkInterval());//第一次打印//1> a//5> h//6> i//4> f//2> c//3> e//1> b//2> d//4> g//向本地文件中新增三行insert//第二次打印//5> insert//2> i//2> insert//3> insert//1> g//1> h//8> d//7> a//8> e//7> b//8> f//7> cDataStreamSource<String> source = env.readFile(new TextInputFormat(Path.fromLocalFile(new File("word.txt"))), "/Users/***/Desktop/word.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000);source.print();env.execute();}
}

3、_03_CollectAsync

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;public class _03_CollectAsync {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从元素列表创建一个 DataStreamDataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);CloseableIterator<Integer> iterator = myInts.collectAsync();env.execute();while (iterator.hasNext()){System.out.println("iterator=>"+iterator.next());}}
}

4、_04_JobClientStopWithSavepoint

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.util.CloseableIterator;import java.io.File;// 在 程序完成时 或者 CheckPoint触发时 才会输出结果
public class _04_JobClientStopWithSavepoint {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(2000);DataStreamSource<String> source = env.readFile(new TextInputFormat(Path.fromLocalFile(new File("word.txt"))), "/Users/***/Desktop/word.txt", FileProcessingMode.PROCESS_CONTINUOUSLY, 10000);CloseableIterator<String> iterator = source.collectAsync();JobClient jobClient = env.executeAsync();//        TimeUnit.SECONDS.sleep(5);
//        jobClient.stopWithSavepoint(false,"/Users/hhx/Desktop/", SavepointFormatType.DEFAULT);while (iterator.hasNext()){System.out.println("iterator=>"+iterator.next());}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/4130.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Xcode隐私协议适配

1. Privacy manifest files 1.1 简介 自己App或三方SDK&#xff08;通过XCFrameworks|Swift packages|Xcode projects集成的&#xff09;需要包含一个隐私清单文件&#xff08;privacy manifest&#xff09;叫作 PrivacyInfo.xcprivacy。它是一个属性列表&#xff0c;记录了A…

VIM插件安装与配置

文章目录 前言插件管理工具1. vimrc通用配置5.标签列表插件(taglist)总结 前言 在某些情况下只能用vim编辑器&#xff0c;而没有类似vscode的图形化界面&#xff0c;为vim配置一系列插件即可方便的在vim下编程 插件管理工具 VundleVim vim-plug 将插件的地址添加进去&#xf…

[C++基础学习]----03-程序流程结构之循环结构详解

前言 在C程序中&#xff0c;循环结构在用于重复执行一段代码块&#xff0c;直到满足某个条件为止。循环结构有多种形式&#xff0c;包括while循环、do-while循环和for循环。 正文 01-循环结构简介 1、while循环语句&#xff1a; while循环在每次循环开始前检查条件是否为真&a…

【利兹】XJCO3910/COMP391001 Combinatorial Optimisation组合优化/运筹学 cw考试资料辅导

COMP391001| XJCO3910 (36642) 西交利兹院 Combinatorial Optimisation组合优化/运筹学 资料or辅导 需要请私聊 1.独家近年考试题 包你高分 2. cw and 官方标准答案 3. worksheets and solutions

【docker】安装openjdk

查看可用的 openjdk版本 docker hub 查看地址&#xff1a;https://hub.docker.com/_/openjdk 此图片已被正式弃用&#xff0c;建议所有用户尽快找到并使用合适的替代品。其他官方形象替代品的一些例子&#xff08;按字母顺序列出&#xff0c;没有有意或暗示的偏好&#xff09;…

机器学习:深入解析SVM的核心概念(问题与解答篇)【一、间隔与支持向量】

直接阅读原始论文可能有点难和复杂&#xff0c;所以导师直接推荐我阅读周志华的《西瓜书》&#xff01;&#xff01;然后仔细阅读其中的第六章&#xff1a;支持向量机 间隔与支持向量 问题一&#xff1a;什么叫法向量&#xff1f;为什么是叫法向量 在这个线性方程中&#xff…

UE5 C++ 项目C#编译目标 链接模块

一.Engine里用C#来配置 1.UnrealEditor.Target.cs using UnrealBuildTool; using System.Collections.Generic;public class UnrealEditorTarget : TargetRules {public UnrealEditorTarget( TargetInfo Target ) : base(Target){Type TargetType.Editor;IncludeOrderVersio…

给大一大二师生的忠告,如何在校招中脱颖而出做到降维打击

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是「奇点」&#xff0c;江湖人称 singularity。刚工作几年&#xff0c;想和大家一同进步&#x1f91d;&#x1f91d; 一位上进心十足的【Java ToB端大厂…

[超详细]Java子父类树形结构查询和删除[小白]

目录 前言 1、查询子父类通过树形结构封装起来 一、创建实体类 二、创建mapper类 三、创建service和serviceImpl类 四、创建controller类 2、删除该父类下的所有子类&#xff0c;并且删除自己 controller层 service和serviceImpl层 总结 前言 [超详细]Java子父类树形…

STM32与Proteus的串口仿真详细教程与源程序

资料下载地址&#xff1a;STM32与Proteus的串口仿真详细教程与源程序 资料内容 包含LCD1602显示&#xff0c;串口发送接收&#xff0c;完美实现。 文档内容齐全&#xff0c;包含使用说明&#xff0c;相关驱动等。 解决了STM32的Proteus串口收发问题。 注意&#xff1a;每输…

Datart 扩装下载功能之PDF和图片下载

Datart 扩装下载功能之PDF和图片下载 首先下载依赖 yum install mesa-libOSMesa-devel gnu-free-sans-fonts wqy-zenhei-fonts -y 然后下载安装chrome yum install https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm 查看chrome版本号 google…

SAP 修改SO BAPI报错在配置和销售凭证 1 间通信时内部出错(V1 854)

1.背景 在多次使用修改SO BAPI BAPI_SALESORDER_CHANGE的时候由于缓存未清空&#xff0c;可能会报错“在配置和销售凭证 &1 间通信时内部出错”&#xff08;Internal error in communication between configuration and sales doc.&1&#xff09;&#xff0c;对应的消…

基于Vue3实现的 宫格 图片摆放

一个可以支持无限宫格的 vue3实现 本来要参考微信群头像的规则实现&#xff0c;网上找到一大堆类似的需求&#xff0c;奈何XXX折磨人&#xff0c;九宫格已经不能满足ta了。 当前代码实现了………… 好多东西(可以多宫格).具体的看效果图 code <style scoped langless> .…

AI预测体彩排列3第2套算法实战化测试第5弹2024年4月27日第5次测试

今天继续进行新算法的测试&#xff0c;今天是第5次测试。好了&#xff0c;废话不多说了&#xff0c;直接上图上结果。 2024年4月27日体彩排3预测结果 6码定位方案如下&#xff1a; 百位&#xff1a;6、2、1、7、8、9 十位&#xff1a;8、9、4、3、1、0 个位&#xff1a;3、7、8…

【SpringBoot整合系列】SpringBoot整合Redis[附redis工具类源码]

目录 SpringBoot整合Redis1.下载和安装Redis2.新建工程&#xff0c;导入依赖3.添加配置4.先来几个基本的示例测试代码输出结果用redis客户端查看一下存储内容 5.封装redis工具类RedisKeyUtilRedisStringUtilRedisHashUtilRedisListUtilRedisSetUtilRedisZsetUtil备注 6.测试通用…

node.js 解析post请求 方法二

前提&#xff1a;以前面发的node.js解析post请求方法一为模板&#xff0c;具体见 http://t.csdnimg.cn/ABaIn 此文我们运用第二种方法&#xff1a;使用第三方模块formidable对post请求进行解析。 1》代码难点 *** 在Node.js中使用formidable模块来解析POST请求主要涉及到处理…

IO流基础

IO流介绍 1.什么是IO流&#xff1f; 流是一种抽象概念&#xff0c;它代表了数据的无结构化传递。按照流的方式进行输入输出&#xff0c;数据被当成无结构的字节序列或字符序列。从流中取得数据的操作称为提取操作&#xff0c;而向流中添加数据的操作称为插入操作。用来进行输入…

python的pandas库

什么是pandas Pandas是一个开源的第三方Python库&#xff0c;它从Numpy和Matplotlib的基础上构建而来&#xff0c;享有数据分析“三剑客之一”的盛名。Pandas已经成为Python数据分析的必备高级工具&#xff0c;目标是成为强大、灵活、可以支持任何编程语言的数据分析工具。 数…

JVM (Micrometer)监控SpringBoot(AWS EKS版)

问题 怎样使用JVM (Micrometer)面板&#xff0c;监控Spring&#xff1f;这里不涉及Prometheus和Grafana&#xff0c;重点介绍与Micrometer与Springboot&#xff0c;k8s怎样集成。 pom.xml 引入依赖&#xff0c;如下&#xff1a; <properties><micrometer.version&…

手写一个民用Tomcat (07)

继续我们的Tomcat &#xff0c;我们完成了 参数封装成map&#xff0c;下面我们处理&#xff0c;Cookie 和session 我们先引入两个类Session&#xff0c;和SessionFacade&#xff08;也是门面模式&#xff09; public class JxdSession implements HttpSession {private Strin…