Flink入门之DataStream API及kafka消费者

DataStream API

  1. 主要流程:
    • 获取执行环境
    • 读取数据源
    • 转换操作
    • 输出数据
    • Execute触发执行
  2. 获取执行环境
    • 根据实际情况获取StreamExceptionEnvironment.getExecutionEnvironment(conf)
    • 创建本地环境StreamExecutionEnvironment.createLocalEnvironment()
    • 创建远程环境createRemoteEnvironment(“hadoop102”, 37784, “jar/1.jar”)
      • 参数1:主机号
      • 参数2:端口号
      • 参数3:作业jar包的路径
  3. 获取数据源
    • 简单数据源
      • 从集合中读取数据env.fromCollection(集合)
      • 从元素列表中获取数据env.fromElements()
      • 从文件中读取数据,env.readTextFIle(路径), 已废弃
      • 从端口读取数据,env.socketTextStream()
    • 文件数据源
    • kafka数据源
    • DataGen数据源
    • 自定义数据源

文件数据源

使用文件数据源前,需要先添加相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope>
</dependency>
public class Flink02_FileSource {public static void main(String[] args) throw Exception {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//file sourceFileSource.FileSourceBuilder<String> fileSourceBuilder = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat("utf-8"), new Path("input/word.txt"));FileSource<String> fileSource = fileSourceBuilder.build();//source 算子DataStreamSource<String> ds = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");ds.print();env.execute();}
}

DataGen数据源

主要用于生成模拟数据,也需要导入相关依赖

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version><scope>compile</scope></dependency>
public class Flink04_DataGenSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return UUID.randomUUID() + "->" + value;}},100,RateLimiterStrategy.perSecond(1),Types.STRING);DataStreamSource<String> dataGenDs = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGenDs");dataGenDs.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

Kafka消费者

  1. 消费方式:拉取

  2. 消费者对象:KafkaConsumenr

  3. 消费原则:
    一个主题的一个分区只能被一个消费者组中的一个消费者消费
    一个消费者组中的一个消费者可以消费一个主题中的多个分区

  4. 消费者相关的参数:

    • key.deserializer 反序列化
    • value.deserializer
    • bootstrap.servers 集群的位置
    • group.id 消费者组id (为何分组,方便同一组的消费者进行断点续传)
    • auto.commit.interval.ms 自动提交间隔 默认5s
    • enable.auto.commit: 开启自动提交offset偏移量
    • auto.offset.reset: 当offset不存在时,offset重置,默认是最末尾的位置
      • ①新的消费者组,之前没有消费过,没有记录的offset
      • ②当前要消费的offset在kafka中已经不存在,可能是因为时间久了,对应的数据清理掉了
    • 重置策略:
      • earliest: 头,能消费到分区中现有的数据
      • latest: 尾,只能消费到分区中新来的数据
    • isolation.level:事务隔离级别
      • 读未提交
      • 读已提交
  5. 消费数据存在的问题

    • 漏消费,导致数据丢失
    • 重复消费,导致数据重复
  6. shell 创建生产者对象:kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

public class Flink03_KafkaSource {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);KafkaSource<String> stringKafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092").setGroupId("flink").setTopics("first")//优先使用消费者组记录的Offset进行消费,如果offset不存在,根据策略进行重置.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)).setValueOnlyDeserializer(new SimpleStringSchema())//如果还有别的配置需要指定,统一使用通用方法
//                .setProperty("isolation.level", "read_committed").build();DataStreamSource<String> kafkaDS = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "kafkaDS");kafkaDS.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200973.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue-template-compiler 的原理

什么是 Vue 模板&#xff1f; Vue 模板是一种用 HTML 语法来描述 Vue 组件的视图的方式&#xff0c; 例如&#xff1a; <template><div class"example">{{ msg }}</div> </template> <script>export default {data() {return {msg: &…

软件测试面试题解析--什么题是必问的?

设计测试用例的主要方法有哪些&#xff1f;简述一下缺陷的生命周期&#xff1f;测试流程&#xff1f;项目流程&#xff1f;验收测试中和β测试区别&#xff1f;如何维护测试用例&#xff1f;每天测多少用例怎么分配的测试的一天能找多少bug你在上一家公司&#xff0c;写没写过测…

Selenium+Unittest+HTMLTestRunner框架更改为Selenium+Pytest+Allure(二)

1 代码框架 整体项目结构如图&#xff1a; Common&#xff1a;公共库 Logs&#xff1a; 日志目录 Page&#xff1a; 页面元素 Report&#xff1a;测试报告 TestCase&#xff1a;测试用例 TestData&#xff1a; 测试数据 2 单模块运行 直接上代码&#xff1a; # -*- coding…

详细介绍如何使用 SSD 进行实时物体检测:单次 MultiBox 探测器-含源码

介绍 在实时对象检测中,主流范例传统上采用多步骤方法,包括边界框、像素或特征重采样以及高质量分类器应用的提议。虽然这种方法已经实现了高精度,但其计算需求往往阻碍了其对实时应用的适用性。然而,单次多框检测器 (SSD) 代表了基于深度学习的对象检测的突破性飞跃。SSD…

SpringCloud | Dubbo 微服务实战——注册中心详解

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 |Eureka,Nacos,Consul,Zookeeper在Spring Cloud和Dubbo中实战 引言 在项目开发过程中&#xff0c;随着项目不断扩大&#xff0c;也就是业务的不断增多&#xff0c;我们将采用集群&#xf…

day7 四数之和为x

vector<vector<int>> fourSum(vector<int>& nums, int target) { vector<vector<int>> result; sort(nums.begin(), nums.end()); for (int k 0; k < nums.size(); k) { // 剪枝处理 if (nums[k] > target && nums[k] > …

647. Palindromic Substrings 516. Longest Palindromic Subsequence

647. Palindromic Substrings Given a string s, return the number of palindromic substrings 回文子串 in it. A string is a palindrome when it reads the same backward as forward. A substring is a contiguous sequence of characters within the string. nomal: …

LeetCode //C - 221. Maximal Square

221. Maximal Square Given an m x n binary matrix filled with 0’s and 1’s, find the largest square containing only 1’s and return its area. Example 1: Input: matrix [[“1”,“0”,“1”,“0”,“0”],[“1”,“0”,“1”,“1”,“1”],[“1”,“1”,“1”,…

图解Spark Graphx实现顶点关联邻接顶点的collectNeighbors函数原理

一、场景案例 在一张社区网络里&#xff0c;可能需要查询出各个顶点邻接关联的顶点集合&#xff0c;类似查询某个人关系比较近的都有哪些人的场景。 在用Spark graphx中&#xff0c;通过函数collectNeighbors便可以获取到源顶点邻接顶点的数据。 下面以一个例子来说明&#…

C语言--每日选择题--Day37

第一题 1. 有以下说明语句&#xff1a;则下面引用形式错误的是&#xff08;&#xff09; struct Student {int num;double score; };struct Student stu[3] {{1001,80}, {1002,75}, {1003,91}} struct Student *p stu; A&#xff1a;p->num B&#xff1a;(p).num C&#…

[论文精读]序列建模使大视觉模型的规模化学习成为可能

本博客是一篇最新论文的精读&#xff0c;论文为UC伯克利大学及约翰霍普金斯大学相关研究者新近(2023.12.1)在arxiv上上传的《Sequential Modeling Enables Scalable Learning for Large Vision Models》 。知名科技新媒体“新智元”给与本文极高评价&#xff0c;并以《计算机视…

STM32(PWM、ADC)

1、PWM 定义 PWM&#xff0c;全称为脉冲宽度调制&#xff08;Pulse Width Modulation&#xff09;&#xff0c;它通过改变信号的高电平和低电平的持续时间比例来控制输出信号的平均功率或电压。 PWM&#xff0c;全称为脉冲宽度调制&#xff08;Pulse Width Modulation&#xff…

FPGA实现电机位置环、速度环双闭环PID控制

一、设计思路 主要设计思路就是根据之前写的一篇FPGA实现电机转速PID控制&#xff0c;前面已经实现了位置环的控制&#xff0c;思想就是通过电机编码器的当前位置值不断地修正PID去控制速度。 那为了更好的实现控制&#xff0c;可以在位置环后加上速度环&#xff0c;实现电机位…

目标检测YOLO系列从入门到精通技术详解100篇-【目标检测】3D目标检测

目录 前言 几个高频面试题目 怎样制作目标检测的训练样本图像? 像元值应该如何进行归一化?

scipy

scipy 是什么常用方法 是什么 scipy是Python语言的一个开源数值计算库&#xff0c;主要目的是为科学、工程、计算等领域提供有用的数学算法和函数&#xff0c;包括线性代数、优化、信号处理、傅里叶变换、统计函数等。它是Python科学计算环境的重要组成部分&#xff0c;通常与N…

2021年GopherChina大会-核心PPT资料下载

一、峰会简介 自 Go 语言诞生以来&#xff0c;中国便是其应用最早和最广的国家之一&#xff0c;根据 Jetbrains 在 2021 年初做的调查报告&#xff0c;总体来说目前大概有 110 万专业的开发者 选择 Go 作为其主要开发语言。就其全球分布而言, 居住在亚洲的开发者最多&#xff…

go学习之goroutine和channel

文章目录 一、goroutine(协程)1.goroutine入门2.goroutine基本介绍-1.进程和线程说明-2.程序、进程和线程的关系示意图-3.Go协程和Go主线程 3.案例说明4.小结5.MPG模式基本介绍6.设置Golang运行的CPU数7.协程并发&#xff08;并行&#xff09;资源竞争的问题8.全局互斥锁解决资…

MySQL 8 update语句更新数据表里边的数据

数据重新补充 这里使用alter table Bookbought.bookuser add userage INT after userphone;为用户表bookuser在userphone列后边添加一个类型为INT的新列userage。 使用alter table Bookbought.bookuser add sex varchar(6) after userage ;为用户表bookuser在userage 列后边添…

Oracle-数据库连接数异常上涨问题分析

问题&#xff1a; 用户的数据库在某个时间段出现连接数异常上涨问题&#xff0c;时间持续5分钟左右&#xff0c;并且问题期间应用无法正常连接请求数据库 从连接数的监控上可以看到数据库平常峰值不到100个连接&#xff0c;在问题时间段突然上涨到400以上 问题分析&#xff1a;…

unity | 动画模块之循环滚动选项框

一、作者的话 评论区有人问&#xff0c;有没有竖排循环轮播选项框&#xff0c;我就写了一个 二、效果动画 如果不是你们想要的&#xff0c;就省的你们继续往下看了 三、制作思路 把移动分成里面的方块&#xff0c;还有背景&#xff08;父物体&#xff09;&#xff0c;方块自…