[flink] flink macm1pro 快速使用从零到一

文章目录

  • 快速使用

快速使用

  1. 打开 https://flink.apache.org/downloads/ 下载 flink

因为书籍介绍的是 1.12版本的,为避免不必要的问题,下载相同版本

image.png
image.png

  1. 解压
 tar -xzvf flink-1.11.2-bin-scala_2.11.tgz

image.png

  1. 启动 flink
./bin/start-cluster.sh

image.png

  1. 打开 flink web 页面 localhost:8081

image.png

  1. 编写结合 Kafka 词频统计程序

具体参考 https://weread.qq.com/web/reader/51032ac07236f8e05107816k1f032c402131f0e3dad99f3?

package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;import java.util.Properties;public class WordCountKafkaInStdOut {public static void main(String[] args) throws Exception {// 设置Flink执行环境 StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// Kafka参数 Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");String inputTopic = "Shakespeare";String outputTopic = "WordCount";// Source FlinkKafkaConsumer<String> consumer =new FlinkKafkaConsumer<String>(inputTopic, new SimpleStringSchema(),properties);DataStream<String> stream = env.addSource(consumer);// Transformation // 使用Flink  API对输入流的文本进行操作 // 按空格切词、计数、分区、设置时间窗口、聚合 DataStream<Tuple2<String, Integer>> wordCount = stream.flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {String[] tokens = line.split("\\s");// 输出结果  for (String token : tokens) {if (token.length() > 0) {collector.collect(new Tuple2<>(token, 1));}}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(0).timeWindow(Time.seconds(5)).sum(1);// Sink wordCount.print();// execute env.execute("kafka streaming word count");}
} 
  1. 打包应用(当然在这之前需要本地调试一下,至少得运行通吧😄)
  2. 使用Flink提供的命令行工具flink,将打包好的作业提交到集群上。命令行的参数 --class 用来指定哪个主类作为入口。
./bin/flink run --class org.example.WordCountKafkaInStdOut xxtarget/flink_study-1.0-SNAPSHOT.jar

class 建议直接拷贝引用
image.png

  1. web 页面查看作业提交成功

image.png

  1. kafka 生产者随便发点消息

image.png

  1. 查看作业日志,词频统计结果

image.png
image.png

  1. 关闭 flink
./bin/stop-cluster.sh

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/777484.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 面试经典150题 242.有效的字母异位词

题目&#xff1a; 给定两个字符串 s 和 t &#xff0c;编写一个函数来判断 t 是否是 s 的字母异位词。 注意&#xff1a;若 s 和 t 中每个字符出现的次数都相同&#xff0c;则称 s 和 t 互为字母异位词。 思路&#xff1a;hash表&#xff0c;可以用int数组代替 代码&#x…

【每日一题】盛水容器

问题描述 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不能倾斜容…

JavaScript中作用域与闭包深入解析

函数中的作用域 对这些问题的最常见的回答是&#xff0c;JavaScript 拥有基于函数的作用域。也就是&#xff0c;你声明的每一个函数都为自己创建了一个气泡&#xff0c;而且没有其他的结构可以创建它们自己的作用域气泡。但是就像我们一会儿将会看到的&#xff0c;这不完全正确…

vue创建项目报错Fail to check for updates

网上查了文章说更换淘宝镜像地址啥的 改了地址后依然报错显示Fail to check for updates 并且装包时报错Failed to get response from https://registry.npmmirror.com/binary-mirror-config 既然又是淘宝镜像问题&#xff0c;直接干脆不用淘宝的地址 npm config set regis…

iPhone的iOS系统:定义移动智能体验,引领科技潮流之巅

来自&#xff1a;dlshuhua.com/post/83721.html 在移动智能设备领域&#xff0c;iPhone一直以其出色的性能和独特的用户体验脱颖而出。而这一切的背后&#xff0c;离不开其强大的操作系统——iOS。iOS系统不仅为iPhone提供了强大的性能支持&#xff0c;更通过不断创新和升级&a…

蓝桥杯备考随手记: 数位分解

1. 什么是数位分解 数位分解是将一个数拆分成它的各个数位的过程。每个数位代表了数字在该位上的权重。 例如&#xff0c;对于整数12345&#xff0c;数位分解可以得到以下结果&#xff1a; 万位&#xff1a;1千位&#xff1a;2百位&#xff1a;3十位&#xff1a;4个位&#…

产品经理的自我修养

点击下载《产品经理的自我修养》 1. 前言 在产品领域取得成功的关键在于持续的激情。只有保持热情不减,我们才能克服各种困难,打造出卓越的产品。 如果你真心渴望追求产品之路,我强烈建议你立即行动起来,亲自参与实际的产品创作。无论是建立一个网站、创建一个社群,还是…

Dubbo 负载均衡算法说明

https://cn.dubbo.apache.org/zh-cn/overview/core-features/load-balance/ 在集群负载均衡时&#xff0c;Dubbo 提供了多种均衡策略&#xff0c;缺省为 weighted random 基于权重的随机负载均衡策略。 具体实现上&#xff0c;Dubbo 提供的是客户端负载均衡&#xff0c;即由 …

【前端学习——js篇】4.浅拷贝与深拷贝

具体可见https://github.com/febobo/web-interview 4.浅拷贝与深拷贝 ①栈内存与堆内存 栈内存&#xff08;Stack Memory&#xff09; 栈内存用于存储基本类型的变量和引用类型的变量引用&#xff08;即指向堆内存中实际数据的指针&#xff09;。当一个函数被调用时&#xf…

Mysql的日志管理,备份与回复

目录 一、Mysql日志管理 1、日志的默认位置及配置文件 2、日志分类 2.1错误日志 2.2通用查询日志 2.3二进制日志 2.4慢查询日志 2.5中继日志 3、日志配置 4、日志查询 4.1查询通用日志是否开启 4.2查询二进制日志是否开启 4.3查看慢查询日志是否开启 4.4查询慢查…

Vivado Lab Edition

Vivado Lab Edition 是完整版 Vivado Design Suite 的独立安装版本 &#xff0c; 包含在生成比特流后对赛灵思 FPGA 进行编程和 调试所需的所有功能。通常适用于在如下实验室环境内进行编程和调试&#xff1a; 实验室环境中的机器所含磁盘空间、内存和连 接资源较少。Vivad…

python数据实时传给unity工程并绘制出来

python # 服务器端代码 import socket import random import struct import time# 创建一个服务器Socket server_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 监听的地址和端口 host 127.0.0.1 port 12345# 绑定地址和端口 server_socket.bind((host, port…

纯分享万岳外卖跑腿系统客户端源码uniapp目录结构示意图

系统买的是商业版&#xff0c;使用非常不错有三端uniapp开源代码&#xff0c;自从上次分享uniapp后有些网友让我分享下各个端的uniapp下的各个目录结构说明 我就截图说以下吧&#xff0c;

【Java - 框架 - Lombok】(1) 普通Java项目通过Lombok+Logback完成日志的创建使用 - 快速上手

普通Java项目通过"Lombok""Logback"完成日志的创建使用 - 快速上手&#xff1b; 步骤A 说明 创建"Maven"项目&#xff1b; 图片 步骤B 说明 添加相关依赖项&#xff1b; 图片 代码 <!-- "Lombok"依赖项--> <dependency>&…

c++核心学习--继承2

4.6.7多继承语法 4.6.8菱形继承 利用虚继承解决菱形继承的问题&#xff1a;继承之前加上关键字virtual变为虚继承

经纬恒润RTaW-Pegase:车载网络通信建模与时间特性分析工具

▎RTaW简介 RTaW-Pegase是由法国国家信息与自动化研究所&#xff08;INRIA&#xff09;旗下的RTaW公司开发的产品。它主要用于构建和优化汽车、航空航天以及工业领域的通信网络&#xff0c;包括时间敏感网络&#xff08;TSN&#xff09;、CAN&#xff08;FD&#xff0c;XL&…

math模块篇(六)

文章目录 math.log(x[, base])math.log1p(x)math.log2(x)math.log10(x)math.pow(x, y)math.sqrt(x)math.acos(x)math.asin(x)math.atan(x)math.atan2(y, x)math.cos(x) math.log(x[, base]) math.log(x[, base]) 是 Python 中 math 模块的一个函数&#xff0c;用于计算一个数的…

react-navigation:

我的仓库地址&#xff1a;https://gitee.com/ruanjianbianjing/bj-hybrid react-navigation&#xff1a; 学习文档&#xff1a;https://reactnavigation.org 安装核心包: npm install react-navigation/native 安装react-navigation/native本身依赖的相关包: react-nativ…

开源 | 电动自行车充换电解决方案,从智能硬件到软件系统,全部自主研发

文章目录 一、产品功能部分截图1.手机端&#xff08;小程序、安卓、ios&#xff09;2.PC端 二、小程序体验账号以及PC后台体验账号1.小程序体验账号2.PC后台体验账号关注公众号获取最新资讯 三、产品简介&#xff1f;1. 充电桩云平台&#xff08;含硬件充电桩&#xff09;&…

Codeforces Round 841 (Div. 2) C. Even Subarrays

题目 思路&#xff1a; #include <bits/stdc.h> using namespace std; #define int long long #define pb push_back #define fi first #define se second #define lson p << 1 #define rson p << 1 | 1 const int maxn 1e6 5, inf 1e9, maxm 4e4 5; co…