Flink从入门到实践(一):Flink入门、Flink部署

文章目录

  • 系列文章索引
  • 一、快速上手
    • 1、导包
    • 2、求词频demo
      • (1)要读取的数据
      • (2)demo1:批处理(离线处理)
      • (3)demo2 - lambda优化:批处理(离线处理)
      • (4)demo3:流处理(实时处理)
      • (5)总结:实时vs离线
      • (6)demo4:批流一体
      • (7)对接Socket
  • 二、Flink部署
    • 1、Flink架构
    • 2、Standalone部署
    • 3、自运行flink-web
    • 4、通过参数传递
    • 5、通过webui提交job
    • 6、停止作业
    • 7、常用命令
    • 8、集群
  • 参考资料

系列文章索引

Flink从入门到实践(一):Flink入门、Flink部署
Flink从入门到实践(二):Flink DataStream API
Flink从入门到实践(三):数据实时采集 - Flink MySQL CDC

一、快速上手

1、导包

<!-- fink 相关依赖 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version>
</dependency>

2、求词频demo

注意!自Flink 1.18以来,所有Flink DataSet api都已弃用,并将在未来的Flink主版本中删除。您仍然可以在DataSet中构建应用程序,但是您应该转向DataStream和/或Table API。

(1)要读取的数据

定义data内容:

pk,pk,pk
ruoze,ruoze
hello

(2)demo1:批处理(离线处理)

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** 使用Flink进行批处理,并统计wc*** 结果:* (bye,2)* (hello,3)* (hi,1)*/
public class BatchWordCountApp {public static void main(String[] args) throws Exception {// step0: Spark中有上下文,Flink中也有上下文,MR中也有ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// step1: 读取文件内容  ==> 一行一行的字符串而已DataSource<String> source = env.readTextFile("data/wc.data");// step2: 每一行的内容按照指定的分隔符进行拆分  1:Nsource.flatMap(new FlatMapFunction<String, String>() {/**** @param value 读取到的每一行数据* @param out 输出的集合*/@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {// 使用,进行分割String[] splits = value.split(",");for(String split : splits) {out.collect(split.toLowerCase().trim());}}}).map(new MapFunction<String, Tuple2<String,Integer>>() {/**** @param value 每一个元素 (hello, 1)(hello, 1)(hello, 1)*/@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}).groupBy(0)  // step4: 按照单词进行分组  groupBy是离线的api,传下标.sum(1)  // ==> 求词频 sum,传下标.print(); // 打印}
}

(3)demo2 - lambda优化:批处理(离线处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** lambda表达式优化*/
public class BatchWordCountAppV2 {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> source = env.readTextFile("data/wc.data");/*** lambda语法: (参数1,参数2,参数3...) -> {函数体}*/
//        source.map(String::toUpperCase).print();// 使用了Java泛型,由于泛型擦除的原因,需要显示的声明类型信息source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0).sum(1).print();}
}

(4)demo3:流处理(实时处理)

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 流式处理* 结果:* 8> (hi,1)* 6> (hello,1)* 5> (bye,1)* 6> (hello,2)* 6> (hello,3)* 5> (bye,2)*/
public class StreamWCApp {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.readTextFile("data/wc.data");source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握!流式的并没有groupBy,而是keyBy!根据第一个值进行sum.sum(1).print();// 需要手动开启env.execute("作业名字");}
}

(5)总结:实时vs离线

离线:结果是一次性出来的。
实时:来一个数据处理一次,数据是带状态的。

(6)demo4:批流一体

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 采用批流一体的方式进行处理*/
public class FlinkWordCountApp {public static void main(String[] args) throws Exception {// 统一使用StreamExecutionEnvironment这个执行上下文环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 选择处理方式 批/流/自动DataStreamSource<String> source = env.readTextFile("data/wc.data");source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握.sum(1).print();// 执行env.execute("作业名字");}
}

(7)对接Socket

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** 使用Flink对接Socket的数据并进行词频统计** 大数据处理的三段论: 输入  处理  输出**/
public class FlinkSocket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 数据源:可以通过多种不同的数据源接入数据:socket  kafka  text** 官网上描述的是 env.addSource(...)** socket的方式对应的并行度是1,因为它来自于SourceFunction的实现*/DataStreamSource<String> source = env.socketTextStream("localhost", 9527);System.out.println(source.getParallelism());// 处理source.flatMap((String value, Collector<Tuple2<String,Integer>> out) -> {String[] splits = value.split(",");for(String split : splits) {out.collect(Tuple2.of(split.trim(), 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(x -> x.f0) // 这种写法一定要掌握.sum(1)// 数据输出.print();  // 输出到外部系统中去env.execute("作业名字");}
}

二、Flink部署

1、Flink架构

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/
Flink是一个分布式的带有状态管理的计算框架,可以运行在常用/常见的集群资源管理器上(YARN、K8S)。

一个JobManager(协调/分配),一个或多个TaskManager(工作)。
在这里插入图片描述
在这里插入图片描述

2、Standalone部署

按照官网下载执行即可:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/try-flink/local_installation/

可以根据官网来安装,需要下载、解压、安装。
也可以使用docker安装。

启动之后,localhost:8081就可以访问管控台了。

3、自运行flink-web

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>1.18.0</version>
</dependency>
Configuration configuration = new Configuration();
configuration.setInteger("rest.port", 8082); // 指定web端口,开启webUI,不写的话默认8081
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// 新版本可以直接使用getExecutionEnvironment(conf)

以上亲测并不好使……具体原因未知,设置为flink1.16版本或许就好用了。

4、通过参数传递

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 通过参数传递进来Flink引用程序所需要的参数,flink自带的工具类
ParameterTool tool = ParameterTool.fromArgs(args);
String host = tool.get("host");
int port = tool.getInt("port");DataStreamSource<String> source = env.socketTextStream(host, port);
System.out.println(source.getParallelism());

可以通过命令行参数:–host localhost --port 8765

5、通过webui提交job

在这里插入图片描述
在这里插入图片描述

6、停止作业

在这里插入图片描述

7、常用命令

# 查看作业列表
flink list -a  # 所有
flink list -r  # 正在运行的
# 停止作业
flink cancel <jobid># 提交job
# -c,--class <classname> 指定main方法
# -C,--classpath <url> 指定classpath
# -p,--parallelism <paralle> 指定并行度
flink run -c com.demo.FlinkDemo FlinkTest.jar 

8、集群

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/flink-architecture/#flink-application-execution

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/

单机部署Session Mode和Application Mode:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/

k8s:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/native_kubernetes/

YARN:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/

参考资料

https://flink.apache.org/
https://nightlies.apache.org/flink/flink-docs-stable/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/674246.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

容器基础知识:容器和虚拟化的区别

虚拟化与容器化对比 容器化和虚拟化都是用于优化资源利用率并实现高效应用程序部署的技术。然而&#xff0c;它们在方法和关键特征上存在差异&#xff1a; 虚拟化: 可以理解为创建虚拟机 (VM)。虚拟机模拟一台拥有自己硬件&#xff08;CPU、内存、存储&#xff09;和操作系统…

【Python基础】案例分析:电影分析

电影分析 项目背景&#xff1a; 数据集介绍&#xff1a;movie_lens数据集是一个电影信息&#xff0c;电影评分的数据集&#xff0c;可以用来做推荐系统的数据集需求&#xff1a;对电影发展&#xff0c;类型&#xff0c;评分等做统计分析。目标&#xff1a;巩固pandas相关知识…

nohup基本使用

在Linux终端命令中经常要使用到在关闭终端界面的情况下需要后台挂起执行的进程&#xff0c;也就是关闭终端后台任务的进程还是会常驻&#xff0c;下面就简单介绍下 nohup 命令 1. nohup nohup 英文全称 no hang up&#xff08;不挂起&#xff09;&#xff0c;默认情况下&#x…

Layui 表格组件 头部工具栏 筛选列 加入全选和全不选的功能

Layui 表格组件 头部工具栏 筛选列 加入全选和全不选的功能 问题 前端使用Layui表格组件展示后台数据&#xff0c;因数据中涉及字段较多&#xff0c;因此加入了组件中固有的控制表格列隐藏显示的功能。奈何客户希望再此基础上&#xff0c;加入“全选”和“全不选”的功能&…

【动态规划】【前缀和】【C++算法】LCP 57. 打地鼠

作者推荐 视频算法专题 本文涉及知识点 动态规划汇总 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 LCP 57. 打地鼠 勇者面前有一个大小为3*3 的打地鼠游戏机&#xff0c;地鼠将随机出现在各个位置&#xff0c;moles[i] [t,x,y] 表…

Adb offline疑难杂症解决方案大全记录

无线/有线Adb offline依次尝试下面步骤&#xff1a; adb kill-server && adb start-server adb reconnect offline 多次 adb tcpip 5555 后重试 检查有线端口5037、无线5555占用&#xff0c;排除改名的adb或其他应用占用 换USB线和USB口拔插、确保同一WiFi下&#xff0…

Stable Diffusion 模型下载:Samaritan 3d Cartoon SDXL(撒玛利亚人 3d 卡通 SDXL)

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十 下载地址 模型介绍 由“PromptSharingSamaritan”创作的撒玛利亚人 3d 卡通类型的大模型&#xff0c;该模型的基础模型为 SDXL 1.0。 条目内容类型大模型基础模型SDXL 1.0来源CIVITA…

2024.2.7

#include<stdio.h> #include<string.h> #include<stdlib.h> typedef char datatype;typedef struct node {//数据域datatype data;//指针域&#xff1a;左struct node *lchild;//指针域&#xff1a;右struct node *rchild; }*btree;//创建节点 btree creat_n…

嵌入式中轻松识别STM32单片机是否跑飞方法

单片机项目偶尔经常出现异常&#xff0c;不知道是程序跑飞了&#xff0c;还是进入某个死循环了&#xff1f; 因为发生概率比较低&#xff0c;也没有规律&#xff0c;所以没办法在线调试查找问题。 结合这个问题&#xff0c;给大家分享一下用ST-LINK Utility识别单片机程序是否…

python-可视化篇-pyecharts库-气候堆叠图

准备 代码 # codingutf-8 # 代码文件&#xff1a;code/chapter10/10.3.py # 3D柱状图import randomfrom pyecharts import options as opts from pyecharts.charts import Bar3D# 生成测试数据 data [[x, y, random.randint(10, 40)] for y in range(7) for x in range(24)]…

[算法前沿]--060-天工Skywork-13B 开源模型

1.技术细节 》 数据处理、 数据配比、模型优化、评估方案 2.数据集 wudao-DataSkywork-150B:https://hf.co/Skywork非盈利性机构构建的CommonCrawl数据集是一个海量的、非结构化的、多语言的网页数据集。它包含了超过 8 年的网络爬虫数据集,包含原始网页数据(WARC)、元数…

Git中为常用指令配置别名

目录 1 前言 2 具体操作 2.1 创建.bashrc文件 2.2 添加指令 2.3 使其生效 2.4 测试 1 前言 在Git中有一些常用指令比较长&#xff0c;当我们直接输入&#xff0c;不仅费时费力&#xff0c;还容易出错。这时候&#xff0c;如果能给其取个简短的别名&#xff0c;那么事情就…

电力负荷预测 | 电力系统负荷预测模型(Python线性回归、随机森林、支持向量机、BP神经网络、GRU、LSTM)

文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 电力系统负荷预测模型(Python线性回归、随机森林、支持向量机、BP神经网络、GRU、LSTM) 所谓预测,就是指通过对事物进行分析及研究,并运用合理的方法探索事物的发展变化规律,对其未来发展做出预先估计和判断。…

计算机毕业设计 | SSM 医药信息管理系统(附源码)

1&#xff0c; 概述 1.1 课题背景 本系统由说书客面向广大民营药店、县区级医院、个体诊所等群体的药品和客户等信息的管理需求&#xff0c;采用SpringSpringMVCMybatisEasyui架构实现&#xff0c;为单体药店、批发企业、零售连锁企业&#xff0c;提供有针对性的信息数据管理…

MySQL 时间索引的选择

背景 MySQL 在使用过程中经常会对时间加索引&#xff0c;方便进行时间范围的查询&#xff0c;常见的时间类型有 data、datetime、long、timestamp 等&#xff0c;在此分析下这几种时间类型的索引大小&#xff0c;以找到比较合适的时间类型。 时间类型对比 常用的索引类型是 …

科技的成就(五十六)

527、Chrome 1.0 发布 2008 年 12 月 11 日&#xff0c;Chrome 1.0 发布。Chrome 是由谷歌开发的跨平台免费专有网络浏览器&#xff0c;使用内置了高性能 JavaScript 引擎 V8 的 Blink 作为浏览器渲染引擎。Chrome 基于开源软件项目 Chromium&#xff0c;其他基于该项目的知名浏…

HttpServletResponse接口用于表示状态代码的字段

1. HttpServletResponse接口用于表示状态代码的字段 您已学习了状态代码以及可用于从servlet向客户机发送状态代码的HttpServletResponse接口的字段。下表列出了HttpServletResponse接口表示状态代码的一些其他字段。 字段状态代码描述SC_HTTP_VERSION_NOT_SUPPORTED505服务器…

PyTorch深度学习实战(23)——从零开始实现SSD目标检测

PyTorch深度学习实战&#xff08;23&#xff09;——从零开始实现SSD目标检测 0. 前言1. SSD 目标检测模型1.1 SSD 网络架构1.2 利用不同网络层执行边界框和类别预测1.3 不同网络层中默认框的尺寸和宽高比1.4 数据准备1.5 模型训练 2. 实现 SSD 目标检测2.1 SSD300 架构2.2 Mul…

JAVA json转xml

首先要去官方下载json-lib工具包 https://mvnrepository.com/artifact/net.sf.json-lib/json-lib/2.4 目前最新的是2.4的版本&#xff0c;json-lib还需要以下依赖包&#xff1a; 通过mvn库可以直接去下载。 jakartacommons-lang 2.5 jakartacommons-beanutils 1.8.0 jaka…

Verilog刷题笔记25

题目&#xff1a; You’re already familiar with bitwise operations between two values, e.g., a & b or a ^ b. Sometimes, you want to create a wide gate that operates on all of the bits of one vector, like (a[0] & a[1] & a[2] & a[3] … ), whic…