Flink代码单词统计 ---批处理

  • flatMap:一对多转换操作,输入句子,输出分词后的每个词
  • groupBy:按Key分组,0代表选择第1列作为Key
  • sum:求和,1代表按照第2列进行累加
  • print:打印最终结果

1.WordCount代码编写

需求:统计一段文字中,每个单词出现的频次。

环境准备:在src/main/java目录下,新建一个包,命名为com.atguigu.wc。

1.1 批处理

批处理基本思路:

①.先逐行读入文件数据

②.然后将每一行文字拆分成单词

③.接着按照单词分组

④.统计每组数据的个数

⑤.就是对应单词的频次。

1.2 创建项目

1)创建工程

(1)打开IntelliJ IDEA,创建一个Maven工程。

(2)将这个Maven工程命名为Flinkdemo。

2)添加项目依赖

在项目的pom文件中,添加Flink的依赖,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。

<properties><flink.version>1.17.0</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
</dependencies>

3)数据准备

(1)在工程根目录下新建一个Data文件夹,并在下面创建文本文件words.txt

(2)在words.txt中输入一些文字,例如:

hello flink
hello world
hello java

4)代码编写

(1)在com.atguigu.wc包下新建Java类Demo01_BatchProcess,在静态main方法中编写代码。具体代码实现如下:

package com.atguigu.wordcount;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/**as* Created by Smexy on 2023/9/3**  计算的套路:*      ①计算的环境*          spark:SparkContext*          mr:   Driver*          flink:ExecutionEnvironment*      ②把要计算的数据封装为计算模型*         spark:  RDD(spark core)*                 DataFrame|DataSet(sparksql)*                 DStream(sparkstreaming)*         mr:     K-V*         flink:  DataSource**      ③调用计算api*          RDD.转换算子()*          mr: 自己去编写Mapper,Reducer*          flink: DataSource.算子()**  使用的是DataSetAPI(批处理)**  -------------------------*      了解。后续不用了!***/
public class Demo01_BatchProcess
{public static void main(String[] args) throws Exception {//创建支持flink计算的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//使用环境去读数据,封装为计算模型DataSource<String> source = env.readTextFile("data/words.txt");//调用计算apisource/*hello hi hi hi变为 (hello,1)(h1,1)(h1,1)(h1,1)输出到下游*/.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>(){/*String value: 输入的一行内容Collector<String> out: 输出结果的收集器。帮你把结果自动收集,输出到下游。单词,1: 输出的数据是多列,此时就应该使用集合或Bean来封装。flink提供了Tuple的集合。用于封装多个列。Tuple2: 用来封装2列Tuple3: 用来封装3列....Tuple25: 用来封装25列*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {//Tuple2<String, Integer> data = new Tuple2<>(word, 1);Tuple2<String, Integer> data = Tuple2.of(word, 1);//收集要输出的数据out.collect(data);}}})/*收到的是 (单词,1) 格式计算: 得到 (单词,N)groupBy(int fileds): 适用于 对Tuple类型的数据进行聚合。传入int N,N代表Tuple中的列的索引。groupBy(String fileds): 适用于对Bean类型的数据进行聚合,传入的String就是Bean中的属性名。*/.groupBy(0)// 对tuple2分组后的第二列进行sum运算.sum(1)//在控制台打印输出.print();}
}

5).输出

(flink,1)
(world,1)
(hello,3)
(java,1)

1.3  常见问题

问题1.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决方式:maven项目的 pom.xml安装依赖:

        <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.32</version></dependency>

问题2.

log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.utils.PlanGenerator).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

解决办法:log4j没有配置日志记录的位置,需要配置log4j.properties,在src目录main目录resources文件夹下下新建log4j.properties

log4j.properties配置文件:

log4j.rootLogger=warn,CONSOLE,File#Console
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n#File  DailyRollingFileAppender
log4j.logger.File=info
log4j.appender.File=org.apache.log4j.DailyRollingFileAppender
log4j.appender.File.layout=org.apache.log4j.PatternLayout
log4j.appender.File.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
log4j.appender.File.datePattern='.'yyyy-MM-dd
log4j.appender.File.Threshold = info
log4j.appender.File.append=true
log4j.appender.File.File=d://code/logs/flink/disk.log

 此时再次执行成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/704106.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何从零实现一个词云效果

词云是一种文本数据的可视化形式&#xff0c;它富有表现力&#xff0c;通过大小不一&#xff0c;五颜六色&#xff0c;随机紧挨在一起的文本形式&#xff0c;可以在众多文本中直观地突出出现频率较高的关键词&#xff0c;给予视觉上的突出&#xff0c;从而过滤掉大量的文本信息…

Segment Routing IPv6介绍

定义 SRv6&#xff08;Segment Routing IPv6&#xff0c;基于IPv6转发平面的段路由&#xff09;是基于源路由理念而设计的在网络上转发IPv6数据包的一种协议。SRv6通过在IPv6报文中插入一个路由扩展头SRH&#xff08;Segment Routing Header&#xff09;&#xff0c;在SRH中压…

matlab 线性四分之一车体模型

1、内容简介 略 57-可以交流、咨询、答疑 路面采用公式积分来获得&#xff0c;计算了车体位移、非悬架位移、动载荷等参数 2、内容说明 略 3、仿真分析 略 线性四分之一车体模型_哔哩哔哩_bilibili 4、参考论文 略

十一、Qt自定义Widget组件、静态库与动态库

一、自定义Widget组件 1、自定义Widget组件 使用步骤采用提升法&#xff08;promotion&#xff09;重新定义paintEvent事件 2、实现程序 &#xff08;1&#xff09;创建项目&#xff0c;基于QWidget &#xff08;2&#xff09;添加类&#xff0c;为Widget组件提升类 #inclu…

探索C语言位段的秘密

位段 1. 什么是位段2. 位段的内存分配3. 位段的跨平台问题4. 位段的应用4. 使用位段的注意事项 1. 什么是位段 我们使用结构体实现位段&#xff0c;位段的声明和结构体是类似的&#xff0c;有两个不同&#xff1a; 位段的成员必须是int&#xff0c;unsigned int&#xff0c;或…

python学习笔记 - 标准库函数

概述 为了方便程序员快速编写Python脚本程序&#xff0c;Python提供了很多好用的功能模块&#xff0c;它们内置于Python系统&#xff0c;也称为内置函数(Built-in Functions&#xff0c;BlF)&#xff0c;Python 内置函数是 Python 解释器提供的一组函数&#xff0c;无需额外导…

前端常见面试题之vue3

文章目录 1. vue3比vue2有哪些优势2. 描述vue3的生命周期3. 如何看待vue3中的Composition API 和 Options API4. 如何理解ref、 toRef、和toRefs?5. vue3升级了哪些功能6. Composition API如何实现代码逻辑的复用&#xff08;hook)7. Vue3如何实现响应式的8.Vue3使用Proxy对象…

计算机网络实验八 利用 Java /C++开发网络聊天应用程序

一、实验目的和要求 1)基本掌握利用 Java 开发环境调试应用程序的方法。 2)理解基于套接字开发网络应用程序的过程,深入理解客户/服务器方式工作原理。 3)掌握基于Java和C++开发网络通信程序的方法。 二、实验环境 1)运行 Windows 2008 Server/XP/7 操作系统的 PC 2 台…

vue基础操作(vue基础)

想到多少写多少把&#xff0c;其他的想起来了在写。也写了一些css的 input框的双向数据绑定 html <input value"123456" type"text" v-model"account" input"accou" class"bottom-line bottom" placeholder"请输入…

golang学习1,dea的golang-1.22.0

参考&#xff1a;使用IDEA配置GO的开发环境备忘录-CSDN博客 1.下载All releases - The Go Programming Language (google.cn) 2.直接next 3.window环境变量配置 4.idea的go插件安装 5.新建go项目找不到jdk解决 https://blog.csdn.net/ouyang111222/article/details/1361657…

虚拟列表【vue】等高虚拟列表/非等高虚拟列表

文章目录 1、等高虚拟列表2、非等高虚拟列表 1、等高虚拟列表 参考文章1 参考文章2 <!-- eslint-disable vue/multi-word-component-names --> <template><divclass"waterfall-wrapper"ref"waterfallWrapperRef"scroll"handleScro…

数据湖delta lake

Table of Content1. 课程2. 前置技能3. 一、数据湖概念[了解] 3.1. 1.1 企业的数据困扰 3.1.1. 困扰一&#xff1a;互联网的兴起和数据孤岛3.1.2. 困扰二&#xff1a;非结构化数据3.1.3. 困扰三&#xff1a;保留原始数据3.1.4. 补充&#xff1a;什么是结构化&#xff1f; 3.1.4…

09 Redis之分布式系统(数据分区算法 + 系统搭建与集群操作)

6 分布式系统 Redis 分布式系统&#xff0c;官方称为 Redis Cluster&#xff0c;Redis 集群&#xff0c;其是 Redis 3.0 开始推出的分布式解决方案。其可以很好地解决不同 Redis 节点存放不同数据&#xff0c;并将用户请求方便地路由到不同 Redis 的问题。 什么是分布式系统?…

音乐格式转换软件有哪些?5款必备神器

音乐格式转换软件有哪些&#xff1f;音乐&#xff0c;作为人类情感的载体&#xff0c;伴随着我们生活的每一个角落。在享受音乐的同时&#xff0c;我们有时也面临着音乐格式不兼容的问题。不用担心&#xff0c;今天我将为大家揭秘五款音乐格式转换软件&#xff0c;让你的音乐之…

华为配置Mesh普通业务示例

配置Mesh普通业务示例 组网图形 图1 配置Mesh组网示意图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业务需求 在企业内部各区域通过建立Mesh无线回传链路&#xff0c;实现无线覆盖区域拓展&#xff0c;降低有线部署成本。 组网需求 AC组网方式&#xff1a…

sqllabs的order by注入

当我们在打开sqli-labs的46关发现其实是个表格&#xff0c;当测试sort等于123时&#xff0c;会根据列数的不同来进行排序 我们需要利用这个点来判断是否存在注入漏洞&#xff0c;通过加入asc 和desc判断页面有注入点 1、基于使用if语句盲注 如果我们配合if函数&#xff0c;表达…

面试经典150题——存在重复元素 II

​"The harder you work for something, the greater youll feel when you achieve it." - Unknown 1. 题目描述 2. 题目分析与解析 2.1 思路一——暴力求解 该思路很简单&#xff0c;就是暴力的查找每一个元素&#xff0c;查看是否满足题目要求&#xff0c;满足就…

Darkhole 2

kali:192.168.223.128 靶机:192.168.223.152 主机发现 nmap -sP 192.168.223.0/24 端口扫描 nmap -sV -p- -A 192.168.223.152 开启了22 80 端口 web 进入登录界面发现没有注册按钮了 扫一下目录 gobuster dir -u http://192.168.223.152 -x html,txt,php,bak,zip,git --wor…

单片机精进之路-5矩阵键盘扫描

如下图&#xff0c;先在p3口输出0xfe&#xff0c;再读取p3口的电平&#xff0c;如果没有按键按下&#xff0c;temp & 0xf0还是0xf0&#xff0c;如果又第一个键按下&#xff0c;temp & 0xf0还是0xee&#xff0c;其他按键由此类推可得。 //4*4键盘检测程序,按下键后相应…

Linux的文件操作,重拳出击( ̄︶ ̄)

Linux的文件操作 学习Linux的文件操作&#xff0c;一般需要知道一个文件如果你想要操作他&#xff0c;必须知道你对这个文件有什么操作的权限或者修改你自己对文件操作的权限。必须要知道文件有三种权限 r&#xff1a;可读 w&#xff1a;可写 x&#xff1a;可执行 在打开Linux…