Flink入门之DataStream实现word count

以下是一个从无界数据源读取数据计算word count的示例

pom配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>simple-demo</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency></dependencies></project>
java代码
package com.xx.flink.test;import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;public class WCTest {@Testpublic void dataStreamTest() throws Exception {//  1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//  2. 读取数据: socketDataStreamSource<String> socketDS = env.socketTextStream("127.0.0.1", 7777);//  3. 处理数据: 切换、转换、分组、聚合SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}).setParallelism(2).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value -> value.f0).sum(1);//  4. 输出sum.print();//  5. 执行env.execute();}
}

over~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/731793.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自研cloud框架专题–web模块(三)

项目特点一:框架集成 1.引入核心依赖2.配置相关功能 二:功能介绍 1.swagger支持并提供swagger快速配置2.knife增强swagger支持3.全局请求参数校验(Validation)支持4.字段脱敏支持5.默认jackson序列化6.xss,cors支持7.访问日志支持8.全局异常处理,统一返回结果9.系统关键及常用信…

数据结构之deque双端队列

一、概念&#xff1a; 众所周知&#xff0c;数据结构是用来存储数据&#xff0c;deque也不例外&#xff0c;他是集结了队列和栈的性质而成的结构&#xff0c;他几乎拥有所有数据结构能有的操作&#xff0c;看似已经大杀四方&#xff0c;可实际情况如何呢&#xff0c;那就带者这…

判断矩阵是否是魔方矩阵

给定程序中&#xff0c;函数fun的功能是&#xff1a;判定形参a所指的NxN&#xff08;规定N为奇数&#xff09;的矩阵是否是"魔方"&#xff0c;若是&#xff0c;函数返回值为1&#xff1b;不是&#xff0c;函数返回值为0。“幻方"的判定条件是&#xff1a;矩阵每…

解决npm install 时出现的.git can‘t be found (see https://git.io/Jc3F9)问题

解决npm install 时出现的.git can‘t be found (see https://git.io/Jc3F9)问题 问题: 今天使用webstream npm instatll之后报错 .git can‘t be found (see https://git.io/Jc3F9) 原因 一般来说就是你使用ws打开的文件夹位置中不是.git文件夹的所在的文件夹相同的位置 举个…

Day 8.TCP包头和HTTP

TCP包头 1.序号&#xff1a;发送端发送数据包的编号 2.确认号&#xff1a;已经确认接收到的数据的编号&#xff08;只有当ACK为1时、确认号才有用&#xff09;&#xff1b; TCP为什么安全可靠 1.在通信前建立三次握手 SYP SYPACK ACK 2.在通信过程中通过序列号和确认号和…

前端javascript的DOM对象操作技巧,全场景解析

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属的专栏&#xff1a;前端泛海 景天的主页&#xff1a;景天科技苑 文章目录 1.js的DOM介绍2.节点元素层级关系3.通过js修改&#xff0c;清空节点…

【棘手问题】Spring JPA一级缓存导致获取不到数据库表中的最新数据,对象地址不发生改变

【棘手问题】Spring JPA一级缓存导致获取不到数据库表中的最新数据&#xff0c;对象地址不发生改变 一、问题背景二、解决步骤2.1 debug2.2 原因分析2.2.1 数据步骤2.2.2 大模型解释2.2.3 解释举例2.2.4 关键函数 2.3 解决方案 三、Spring JPA一级缓存 一、问题背景 项目的数据…

STM32 通过Modbus协议更改内部Flash(模仿EEPROM)的运行参数

main.c测试 uint8_t uart1RxBuf[64]{0};uint8_t Adc1ConvEnd0; uint8_t Adc2ConvEnd0;int main(void) {/* USER CODE BEGIN 1 *//* USER CODE END 1 *//* MCU Configuration--------------------------------------------------------*//* Reset of all peripherals, Initial…

编程笔记 html5cssjs 001 第一个网页

编程笔记 html5&css&js 001 第一个网页 一、代码二、解释 这是第一个网页&#xff0c;也是一个模板。 一、代码 <!-- 声明文档类型 --> <!DOCTYPE html> <html lang "zh-cn" ><!-- 页面头部开始 --><head ><!-- 设置页面…

C++ 智能指针深度剖析

文章目录 1. 前言2. 为什么需要智能指针&#xff1f;3. 内存泄漏3.1 内存泄漏的概念及危害3.2 内存泄漏的分类3.3 如何检测内存泄漏3.4 如何避免内存泄漏 4. 智能指针的使用及原理4.1 RAII思想4.2 智能指针的原理4.3 C智能指针发展历史4.4 std::auto_ptr4.5 std::unique_ptr4.6…

掌握MySQL,看完这篇文章就够了!

1. MySQL MySQL是一种广泛使用的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;由瑞典的MySQL AB公司开发&#xff0c;目前属于甲骨文公司&#xff08;Oracle Corporation&#xff09;。 MySQL使用结构化查询语言&#xff08;SQL&#xff09;进行数据库管理…

C++的面向诗篇:类的叙事与对象的旋律

个人主页&#xff1a;日刷百题 系列专栏&#xff1a;〖C/C小游戏〗〖Linux〗〖数据结构〗 〖C语言〗 &#x1f30e;欢迎各位→点赞&#x1f44d;收藏⭐️留言&#x1f4dd; ​ ​ 一、面向对象的定义 学习C语言时&#xff0c;我们就经常听说C语言是面向过程的&#xff0c;…

Linux安装Whisper-Jax

博客 如需私有化部署欢迎咨询&#xff0c;包含whisper,whisper jax,faster whisper。 一、前提条件 ubuntu 20.04 python 3.9 cuda 11.8 nvidia-cublas-cu11 11.11.3.6 nvidia-cuda-cupti-cu11 11.8.87 nvidia-cuda-nvcc-cu11 11.8.89 nvidia-cuda-nvrtc-cu11 1…

掌握C#语言的精髓:基础知识与实用技能详解(数据类型与变量+ 条件与循环+函数与模块+LINQ+异常+OOP)

目录 一、C#的基础知识1. 数据类型与变量2. 条件与循环3. 函数与模块 二、C#的实用技能1. 面向对象编程&#xff08;OOP&#xff09;2. 异常处理3. LINQ查询 三、C#的重要性与应用场景结语 欢迎阅读本篇博客&#xff0c;我们将深入探讨C#语言的基础知识与实用技能&#xff0c;旨…

[nlp入门论文精读] | Transformer

写在前面 最近工作从CV转向了NLP&#xff0c;于是空余时间便跟着哔哩哔哩李沐老师的视频学习。其实研一NLP课程讲论文的时候&#xff0c;我们小组就选择了经典的Attention和Bert&#xff0c;但还有很多细节并不完全理解&#xff0c;实际使用时也很困惑。 因此这个系列就来记…

【 JS 】深入之继承的多种方式和优缺点

“生活是一幅绚丽的画&#xff0c;每一刻都是色彩的斑斓。在画布上&#xff0c;用坚韧的笔触勾勒出自己的世界&#xff0c;让每一幅画作都是奋斗的痕迹&#xff0c;成就属于自己的艺术之旅。” - 梵高 继承的多种方式 原型链继承 function Parent () {this.name minifan }Pa…

红队专题-开源漏扫-巡风xunfeng源码剖析与应用

开源漏扫-巡风xunfeng 介绍主体两部分:网络资产识别引擎,漏洞检测引擎。代码赏析插件编写JSON标示符Python脚本此外系统内嵌了辅助验证功能文件结构功能 模块添加IP三. 进行扫描在这里插入图片描述 ![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/de587a6f6f694…

ai写作一键生成文章速度快

ai写作是一种基于人工智能技术的自动写作工具&#xff0c;它可以根据用户提供的主题或关键词&#xff0c;快速一键生成文章。与传统的手动写作相比&#xff0c;ai写作的速度更快&#xff0c;效率更高。下面小编就带大家一起来见识下ai写作一键生成文章的速度是如何之快&#xf…

【CSP试题回顾】202012-1-期末预测之安全指数

CSP-202012-1-期末预测之安全指数 解题代码 #include <iostream> #include <algorithm> using namespace std;int n, sum;int main() {cin >> n;for (int i 0; i < n; i){int w, s;cin >> w >> s;sum w * s;}sum max(sum, 0);cout <&…

Electron-builder打包安装包——编译篇

突然有一天想打包个桌面程序&#xff0c;没有打包过&#xff0c;经过九牛二虎之力终于打包出来&#xff0c;在此感谢那些热于分享的前辈&#xff01; 本篇只讲打包运行和出现的问题 一、准备工作&#xff1a;提前下载相关资源包&#xff0c;否则在国内环境下可能因为网络问题…