大数据 flink 01 | 从零环境搭建 简单Demo 运行

什么是Flink

Flink是一个开源的流处理批处理框架,它能够处理无界和有界的数据流,具有高吞吐量、低延迟和容错性等特点

Flink 可以应用于多个领域如:实时数据处理、数据分析、机器学习、事件驱动等

什么是流式处理?什么是批处理

流处理是一种针对实时数据流进行连续处理的技术。它的数据通常是无界,数据以持续不断的流的形式到达。

批处理是一种将大量数据集合在一起进行统一处理的技术。在批处理中,首先要收集存储数据,批处理通常用于处理历史数据或离线数据

下载与安装

flink 依赖jdk ,版本推荐 Java 8 or 11

flink 下载与安装

本文使用的是 flink-1.17.2-bin-scala_2.12.tgz

tar -xzf flink-*.tgz

web UI 配置

vim ./conf/flink-conf.yaml

rest.bind-address: 0.0.0.0

启动与停止

 ./bin/start-cluster.sh

输入 ip:8081 进入UI 管理页面

图片

Flink WebUI 页面

一个简单的例子

新建Maven 项目

添加maven 依赖
 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.4</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- This dependency is provided, because it should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies>
官方文档一个简单的Demo
package com.codetonight;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;public class Example {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Person> flintstones = env.fromElements(new Person("Fred", 35),new Person("Wilma", 35),new Person("Pebbles", 2));DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {@Overridepublic boolean filter(Person person) throws Exception {return person.age >= 18;}});adults.print();env.execute();}public static class Person {public String name;public Integer age;public Person() {}public Person(String name, Integer age) {this.name = name;this.age = age;}public String toString() {return this.name.toString() + ": age " + this.age.toString();}}
}
本地 idea 运行

本地启动报java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction时,

idea 需要勾选 add dependencies with provided scope to classpath

操作路径  Edit Configurations

图片

提交任务到集群

通过UI页面提交Flink 任务,操作路径 Submit New Job -> Add New

图片

任务提交

上传jar,填写处理任务类(包含main 方法)的类全路径

图片

Jobs菜单下可以查看 运行中 和 已完成的 任务

图片

查看任务的日志

图片

图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/53635.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

xQTLs 共定位分析(XQTLbiolinks包)

XQTL 共定位分析 XQTLbiolinks 是一个端到端的生物信息学工具&#xff0c;由深圳湾实验室李磊研究团队开发&#xff0c;用于高效地分析公共或用户定制的个xQTLs数据。该软件提供了一个通过与 xQTLs 共定位分析进行疾病靶基因发现的流程&#xff0c;以检测易感基因和致病变异。…

vimax通信协议

关于“Vimax通信协议”&#xff0c;实际上可能存在一定的误解或混淆。在通信技术和网络领域&#xff0c;并没有广泛认知的名为“Vimax”的通信协议。然而&#xff0c;您可能是在询问关于“WiMAX”的信息&#xff0c;因为“WiMAX”与“Vimax”在发音上相近&#xff0c;且WiMAX是…

【STM32】RTT-Studio中HAL库开发教程七:IIC通信--EEPROM存储器FM24C04

文章目录 一、简介二、模拟IIC时序三、读写流程四、完整代码五、测试验证 一、简介 FM24C04D&#xff0c;4K串行EEPROM&#xff1a;内部32页&#xff0c;每个16字节&#xff0c;4K需要一个11位的数据字地址进行随机字寻址。FM24C04D提供4096位串行电可擦除和可编程只读存储器&a…

python学习记录3

目录 1、数据类型转换 2、eval函数 3、运算符 1、数据类型转换 变量类型的转换分为隐类转换和显类转换&#xff0c;隐类转换在python代码行中运行时就自动发生。例如 x ture print(x1) 显类转换使用函数完成&#xff0c;主要有以下几种&#xff1a; x 10 #整数默认是i…

2.1 HuggingFists系统架构(一)

系统架构 HuggingFists的前端主体开发语言为HtmlJavascript&#xff0c;后端的主体开发语言为Java。在算子部分有一定份额的Python代码&#xff0c;用于整合Python在数据处理方面强大能力。 功能架构 HuggingFists的功能架构如上&#xff0c;由下向上各层为&#xff1a; 数据存…

leetcode刷题day29|贪心算法Part03( 134. 加油站、135. 分发糖果、860.柠檬水找零、406.根据身高重建队列)

134. 加油站 思路&#xff1a; 暴力解法&#xff1a;for循环适合模拟从头到尾的遍历&#xff0c;while循环适合模拟环形遍历&#xff01;但是会超出leetcode的时间限制。 class Solution {public int canCompleteCircuit(int[] gas, int[] cost) {for(int i0;i<gas.length…

从文本图片到多模态:3D 数字人打开企业全域商业增长新空间

摘要&#xff1a;数字化与AI浪潮推动各行业变革&#xff0c;内容形式也发生巨变&#xff0c;从文本到多媒体的多模态表达&#xff0c;标志着内容创造走向升维。AIGC 3D生成技术的突飞猛进&#xff0c;彻底打破了传统3D内容生产门槛高、周期长、成本高昂的问题。将3D数字人的打造…

若依 Vue3 前端分离 3.8.8 版实现去除首页,登录后跳转至动态路由的第一个路由的页面

一、前言 某些项目可能并不需要首页&#xff0c;但在若依中想要实现不显示首页&#xff0c;并根据不同角色登录后跳转至该角色的第一个动态路由的页面需要自己实现&#xff0c;若依中没有实现该功能的特定代码。 二、代码 1. src\permission.js 在 src\permission.js 中添加…

记录一下oceanbase数据库导出数据到mysql

导出 SQL 文件 使用 mysqldump 工具从 OceanBase 导出 SQL 文件到 output2222.sql。在这一步中&#xff0c;你需要确保你有正确的权限和数据库访问配置。 mysqldump -h 192.168.191.72 -P 2881 -u rootA_a -p密码 rhzfdb > output2222.sql清理 SQL 文件 使用 sed 命令批量…

VSCode编程配置再次总结

VScode 中C++编程再次总结 0.简介 1.配置总结 1.1 launch jsion文件 launch.json文件主要用于运行和调试的配置,具有程序启动调试功能。launch.json文件会启用tasks.json的任务,并能实现调试功能。 左侧任务栏的第四个选项运行和调试,点击创建launch.json {"conf…

探索 ShellGPT:终端中的 AI 助手

文章目录 探索 ShellGPT&#xff1a;终端中的 AI 助手背景介绍ShellGPT 是什么&#xff1f;如何安装 ShellGPT&#xff1f;简单的库函数使用方法场景应用常见问题及解决方案总结 探索 ShellGPT&#xff1a;终端中的 AI 助手 背景介绍 在当今快速发展的技术领域&#xff0c;命…

【TypeScript入坑】什么是TypeScript?

TypeScript入坑 什么是 TypeScriptTypeScript 的优势 什么是 TypeScript TypeScript&#xff1a;是 JavaScript 的超集&#xff0c;拥有类型机制&#xff0c;不会再浏览器直接执行&#xff0c;而是编译成 JavaScript 后才会运行。 超集&#xff08;superset&#xff09;&…

Redis中的setnx的使用场景

Redis中的SETNX命令是一个非常有用的工具&#xff0c;特别是在处理分布式系统和并发控制时。SETNX是“Set if Not Exists”的缩写&#xff0c;用于设置键的值&#xff0c;但仅当键不存在时。以下是SETNX命令的一些主要使用场景&#xff1a; 1. 分布式锁 在分布式环境中&#…

查询最近正在执行的sql(DM8 : 达梦数据库)

查询最近正在执行的sql DM8 : 达梦数据库 1 查询最近正在执行的sql2 更多达梦数据库学习使用列表 1 查询最近正在执行的sql 迁移数据时 , 业务无响应 , 查看最近活动的sql , 有没有迁移相关的表 , 通过最后的时间字段 , 判断会话是否正在执行 SELECT SESS_ID, SQL_TEXT, STATE…

ZABBIX监控 EMQTT服务思路及实施全过程(含脚本及模板)

系统环境 ZABBIX服务器:centos7,zabbix6.4,jq,zabbix-sender-3.0.5 EMQX服务器:centos7, emqx4.4.3 监控思路 通过 EMQX 的 API 获取实时监控数据(包括统计信息和指标),然后将这些数据发送到 Zabbix 服务器进行监控。具体来说,脚本执行了以下操作: 从 EMQX API 获…

[leetcode]112_路径总和_判断是否存在

给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。 判断该树中是否存在 根节点到叶子节点 的路径&#xff0c;这条路径上所有节点值相加等于目标和 targetSum 。 如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 叶子节点 是指没有子节点…

redis哨兵启动出现 +sdown master mymaster 192.168.x.x

场景&#xff1a; 搭建好哨兵之后&#xff0c;哨兵一启动&#xff0c;过了30秒就会判断master sdown&#xff0c;但是检查配置是没有问题。 日志&#xff1a; Redis-master启动日志&#xff1a;没看到任何异常&#xff0c;所以master无异常 Redis-哨兵启动日志&#xff1a; …

(已解决)vscode如何传入argparse参数来调试/运行python程序

文章目录 前言调试传入参数运行传入参数延申 前言 以前&#xff0c;我都是用Pycharm专业版的&#xff0c;由于其好像在外网的时候&#xff0c;不能够通过VPN来连接内网服务器&#xff0c;我就改用了vscode。改用了之后&#xff0c;遇到一个问题&#xff0c;调试或者运行python…

常见服务端口号和中文大全

ServiceChinesePortExplainFTP文件传输协议20数据FTP文件传输协议21连接SSH安全外壳协议22SMTP简单邮件传输协议25DNS域名解析协议53DHCP动态主机配置协议67服务端DHCP动态主机配置协议68客户端HTTP超文本传输协议80Kerberos网络认证协议88POP3邮局协议110RPC远程过程调用111IM…

QT编译之后的debug包下运行程序双击运行出现无法定位程序输入点__gxx_personlity_seh0于动态链接库

1.出现这个错误的原因是&#xff1a; 缺少如下文件&#xff1a; 2.解决方法&#xff1a; 在运行程序.exe所在的目录执行&#xff1a;windeployqt untitled.exe&#xff08;指打包的运行程序&#xff09; 3.如果执行提示由于找不到qt5core.dll,无法继续执行代码和无法识别win…