Flink学习(七)-单词统计

前言

Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

一、代码基础格式

//1st 设置执行环境
xxxEnvironment env = xxxEnvironment.getEnvironment;//2nd 设置流
DataSource xxxDS=env.xxxx();//3rd 设置转换
Xxx transformation =xxxDS.xxxx();//4th 设置sink
transformation.print();//5th 可能需要
env.execute();

二、Demo1 批处理

  • 源码

 public static void main(String[] args) throws Exception {//1,创建一个执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//2,获取输入流DataSource<String> lineDS = env.readTextFile("input/word.txt");//3,处理数据FlatMapOperator<String, Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {//3.1 分隔字符串String[] values = value.split(" ");//3.2 汇总统计for (String word : values) {Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);collector.collect(wordTuple);}}});//4,按单词聚合UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);//5,分组内聚合AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);//6,输出结果sum.print();}
  • 效果展示

三、Demo2 流处理

  • 源码

   public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] words = value.split(" ");for (String word : words) {Tuple2<String, Integer> temp = Tuple2.of(word, 1);collector.collect(temp);}}});KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);sum.print();env.execute();}
  • 效果展示

四、Demo3 无边界流处理

  • 源码

    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);sum.print();env.execute();}
  • 效果展示 

往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/1344.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot中接收各种各样的参数

一、接收json参数&#xff0c;封装为Map 1.1、核心代码 /*** 接收json参数&#xff0c;封装为Map* param servletRequest* return* throws Exception*/ PostMapping("/getParam") public R getParam(HttpServletRequest servletRequest) throws Exception {Map<…

Bootstrap 5 保姆级教程(十二):弹出框 消息弹窗

一、弹出框 1.1 创建弹出框 通过向元素添加 data-bs-toggle"popover" 来来创建弹出框。 title 属性的内容为弹出框的标题&#xff0c;data-bs-content 属性显示了弹出框的文本内容&#xff1a; 注意: 弹出框要写在 JavaScript 的初始化代码里。 以下实例可以在文…

NLP 文本表征方式

在自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;将文本转换成计算机能够理解和处理的格式是一个基本的步骤。这个过程通常被称为文本表征或文本向量化。下面&#xff0c;我将详细介绍几种常见的文本表征方法&#xff0c;并提供一些例子来说明这些技术是如何应用的。…

投入产出表的分析要点有哪些

投入产出分析是利用投入产出表、投入产出系数和投入产出模型&#xff0c;对国民经济各部门之间的技术经济联系和影响进行分析的一种经济数据分析方法。 一、什么是投入产出表 我国的投入产出表是描述国民经济中各种产品的来源与使用去向的棋盘式平衡表 , 是产品部门 产品部门…

【已解决】电脑设置notepad++默认打开txt

1、以管理员的方式打开notepad 步骤&#xff1a;打开设置 -> 首选项 -> 文件关联 2、 设置Notepad默认打开 按照以下步骤将Notepad设置为默认打开.txt文件&#xff1a; 右键单击任何一个.txt文件。选择“属性”。在“常规”选项卡中&#xff0c;找到“打开方式”&#…

【Interconnection Networks 互连网络】Dragonfly Topology 蜻蜓网络拓扑

蜻蜓拓扑 Dragonfly Topology 1. 拓扑参数2. Topology Description 拓扑描述3. Topology Variations 拓扑变体 蜻蜓拓扑 Dragonfly Topology 1. 拓扑参数 Dragonfly拓扑参数&#xff1a; N N N: 网络中终端(terminal)的总数量 p p p: 连接到每个路由器的终端数量 a a a: 每…

VR全景:为户外游玩体验插上科技翅膀

随着VR全景技术的愈发成熟&#xff0c;无数人感到惊艳&#xff0c;也让各行各业看到了一片光明的发展前景。尤其是越来越多的文旅景区开始引入VR全景技术&#xff0c;相较于以往的静态风景图&#xff0c;显然现在的VR全景结合了动态图像和声音更加吸引人。 VR全景技术正在逐步改…

密码学 | 承诺:Pedersen 承诺 + ZKP

​ &#x1f951;原文&#xff1a;Toward Achieving Anonymous NFT Trading &#x1f951;写在前面&#xff1a;看了篇 22 年 SCI 3 区论文&#xff0c;里面提到在 Pedersen 承诺的揭示阶段可以使用零知识证明&#xff0c;而不必揭示消息明文和随机数。姑且记录一下这个方法。…

Dijkstra算法求最短路

Dijkstra算法可以在图中寻找一个节点&#xff08;称为“源节点”&#xff09;到所有其它节点的最短路径。 文章目录 前言 一、Dijkstra算法是什么&#xff1f; 二、问题介绍 三、朴素版Dijkstra算法 1.图的存储 2.算法实现 四、使用步骤 1.代码如下&#xff08;示例&#xff09…

使用 hiredis 客户端库封装一个简单的 Redis 类

目录 思考一下redis编程的整个过程。 我们作为redis客户端。需要跟redis服务器交互。 封装 Redis 的 C 类的过程可以分为以下几个步骤&#xff1a; 一个完成发布订阅功能的 Redis 类 思考一下redis编程的整个过程。 我们作为redis客户端。需要跟redis服务器交互。 那说白了…

Linux的UDEV机制

udev 机制引入&#xff1a; 手机接入Linux热拔插相关 a. 把手机接入开发板 b. 安装adb工具&#xff0c;在终端输入adb安装指令&#xff1a; sudo apt-get install adb c. dmeg能查看到手机接入的信息&#xff0c;但是输入adb devices会出现提醒 dinsufficient permissions for …

【Java】HashMap、HashTable和ConcurrentHashMap的区别

文章目录 区别一、HashMap1.1基本定义与特性1.2工作原理与实现1.3常用方法1.4性能与优化 二、HashTable三、ConcurrentHashMap3.1基本特点3.2实现原理3.3常用方法3.4适用场景3.5性能优化 HashTable、HashMap和ConcurrentHashMap之间的区别主要体现在线程安全、继承关系与实现接…

Mysql 和 PostgreSQL 到底选啥?

当我深入探讨MySQL和PostgreSQL这两个著名的开源数据库时&#xff0c;我们不仅发现它们在功能、性能和用例方面存在明显的差异&#xff0c;同时也能看出它们各自在特定场景下的独特优势。选择哪一个往往取决于项目的具体需求、团队的熟悉度以及未来的扩展计划。 在这篇文章中&…

kaggle 泰坦尼克号2 得分0.7799

流程 导入所要使用的包引入kaggle的数据集csv文件查看数据集有无空值填充这些空值提取特征分离训练集和测试集调用模型 导入需要的包 import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import warnings warnings.filterwarni…

Vue3: 获取元素DOM的方法

Vue3中获取dom的方法有两种 : ref模板引用和传统方法 1.ref模板引用 模板引用是官方提出的方法&#xff0c;请看下面的例子&#xff1a; <template><canvas ref"solarCanvas" id"solar" width"1300" height"900"></…

K8S 污点和容忍度(Taint,Toleration)

介绍 在 Kubernetes 中&#xff0c;污点&#xff08;Taints&#xff09;和容忍度&#xff08;Tolerations&#xff09;是用于节点调度的一种机制&#xff0c;它们允许你控制哪些 Pod 能够调度到哪些节点上。 污点&#xff08;Taints&#xff09; 污点是节点上的一种属性&…

从C到JAVA之学习JAVA的第一周笔记

文章目录 java语言概述JDK与JRE编写执行过程第一份java代码解读编写编译运行其他 注释三种注释方法 java API文档关键字标识符数据类型基本数据类型自动类型提升规则引用数据类型 string概述String与基本数据类型的变量间的运算 运算符键盘录入运行控制语句数组定义与静态初始化…

springboot no mapping for.....解决办法

这个问题是由于没有加入对应的GET,POST注解&#xff0c;导致映射失败&#xff0c;加入对应注解就ok了

JDK 11下载、安装、配置

下载 到Oracle管网下载JDK 11&#xff0c;下载前需要登录&#xff0c;否则直接点下载会出现502 bad gateway。 下载页面链接 https://www.oracle.com/hk/java/technologies/downloads/#java11-windows 登录 有些人可能没有Oracle账号&#xff0c;注册也比较慢&#xff0c;有需…

随笔05 我的创作纪念日(512天)

机缘 机缘这事儿&#xff0c;我在随笔系列博文里已经翻来覆去说了不少&#xff0c;这次就不再唠叨了&#xff0c;省得被小伙伴嫌弃成祥林嫂~&#x1f61c; &#x1f338;随笔01 我的创作纪念日&#xff08;128天&#xff09;_newmitbbs-CSDN博客 收获 我这一小片自留地&…