【flink】之如何消费kafka数据?

为了编写一个使用Apache Flink来读取Apache Kafka消息的示例,我们需要确保我们的环境已经安装了Flink和Kafka,并且它们都能正常运行。此外,我们还需要在项目中引入相应的依赖库。以下是一个详细的步骤指南,包括依赖添加、代码编写和执行说明。

 1.环境准备

确保你已经安装了Apache Kafka和Apache Flink,并且Kafka正在运行。Kafka的默认端口是9092,而Zookeeper(Kafka依赖的服务)的默认端口是2181

2.Maven项目设置

创建一个新的Maven项目,并在pom.xml中添加以下依赖:

<dependencies>  <!-- Flink dependencies -->  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java_2.12</artifactId>  <version>1.13.2</version>  </dependency>  <dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.12</artifactId>  <version>1.13.2</version>  </dependency>  <!-- Kafka client dependency -->  <dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.8.0</version>  </dependency>  <!-- Logging -->  <dependency>  <groupId>org.slf4j</groupId>  <artifactId>slf4j-log4j12</artifactId>  <version>1.7.30</version>  </dependency>  
</dependencies>

注意:请根据你使用的Scala或Java版本以及Flink和Kafka的版本调整上述依赖。

3.编写Flink Kafka Consumer代码

import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.streaming.api.datastream.DataStream;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;  import java.util.Properties;  public class FlinkKafkaConsumerDemo {  public static void main(String[] args) throws Exception {  // 设置执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // Kafka消费者属性  Properties props = new Properties();  props.put("bootstrap.servers", "localhost:9092");  props.put("group.id", "test-group");  props.put("enable.auto.commit", "true");  props.put("auto.commit.interval.ms", "1000");  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 创建Kafka消费者  FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(  "input-topic", // Kafka topic  new SimpleStringSchema(), // 反序列化器  props);  // 添加数据源  DataStream<String> stream = env.addSource(myConsumer);  // 数据处理  stream.map(new MapFunction<String, String>() {  @Override  public String map(String value) throws Exception {  return "Received: " + value;  }  }).print();  // 执行流程序  env.execute("Flink Kafka Consumer Example");  }  // 简单的字符串反序列化器  public static final class SimpleStringSchema implements DeserializationSchema<String> {  @Override  public String deserialize(byte[] message) throws IOException {  return new String(message, "UTF-8");  }  @Override  public boolean isEndOfStream(String nextElement) {  return false;  }  @Override  public TypeInformation<String> getProducedType() {  return BasicTypeInfo.STRING_TYPE_INFO;  }  }  
}

4.执行程序

  1. 确保Kafka正在运行,并且有一个名为input-topic的topic(如果没有,你需要先创建它)。
  2. 编译并运行你的Maven项目

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/49157.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

navicat 导入 sql 遇到的问题

错误1 [Err] 1064 - You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near &#xfeff;SET FOREIGN_KEY_CHECKS 0;DROP TABLE IF EXISTS tmp_tables; CREATE TABLE at line 1 [Err] &…

TypeScript-内置应用程序类型-Recode

文章目录 前言Record<string, any>Record 的语法使用示例在你的代码中的应用示例代码 前言 学习TypeScript 内置应用程序类型 在 TypeScript 中&#xff0c;Record 是一个实用类型&#xff0c;它允许你基于一个键的联合类型和一个值的类型&#xff0c;来创建一个新的对象…

使用约束布局该如何设置哪个视图(UILabel)的内容优先被压缩?

引言 在实际项目开发中&#xff0c;约束布局给我们带来了很大的便利&#xff0c;可以帮助我们创建灵活且响应迅速的用户界面。通常情况下&#xff0c;它都能很好地工作&#xff0c;但在一些包含许多UILabel的场景中&#xff0c;比如会话列表的UI&#xff0c;哪个视图会被优先压…

《0基础》学习Python——第二十讲__网络爬虫/<3>

一、用post请求爬取网页 同样与上一节课的get强求的内容差不多&#xff0c;即将requests.get(url,headershead)代码更换成requests.post(url,headershead),其余的即打印获取的内容&#xff0c;如果content-typejson类型的&#xff0c;打印上述代码的请求&#xff0c;则用一个命…

二叉树---二叉搜索树的最小绝对差

题目&#xff1a; 给你一个二叉搜索树的根节点 root &#xff0c;返回 树中任意两不同节点值之间的最小差值 。 差值是一个正数&#xff0c;其数值等于两值之差的绝对值。 思路&#xff1a;中序遍历二叉搜索树的顺序是递增序列&#xff0c;将该序列中的相邻值两两比较找到最…

约定(模拟赛2 T3)

题目描述 小A在你的帮助下成功打开了山洞中的机关&#xff0c;虽然他并没有找到五维空间&#xff0c;但他在山洞中发现了无尽的宝藏&#xff0c;这个消息很快就传了出去。人们为了争夺洞中的宝藏相互陷害&#xff0c;甚至引发了战争&#xff0c;世界都快要毁灭了。小A非常地难…

2024论文精读:利用大语言模型(GPT)增强上下文学习去做关系抽取任务

文章目录 1. 前置知识2. 文章通过什么来引出他要解决的问题3. 作者通过什么提出RE任务存在上面所提出的那几个问题3.1 问题一&#xff1a;ICL检索到的**示范**中实体个关系的相关性很低。3.2 问题二&#xff1a;示范中缺乏解释输入-标签映射导致ICL效果不佳。 4. 作者为了解决上…

【Git】(基础篇七)—— IntelliJIDEA集成Git

InteliJ IDEA集成Git 现在有很多的集成工具帮助我们写代码&#xff0c;使用这些工具可以帮助我们加速写代码&#xff0c;很多工具也可以集成git&#xff0c;使用图形工具管理git&#xff0c;相信了解了底层运行逻辑的你能够很快地上手使用这些工具&#xff0c;本文以InteliJ I…

7 Vue3

相比 Vue2 1. 优点 vue3支持vue2的大多数特性&#xff0c;实现对vue2的兼容vue3对比vue2具有明显的性能提升 打包大小减少41%初次渲染快55%&#xff0c;更新快133%内存使用减少54% 更好的支持TypeScript使用Proxy代替defineProperty实现响应式数据 2. 性能提升的原因 静态标…

jmeter-beanshell学习11-从文件获取指定数据

参数文件里的参数可能过段时间就不能用了&#xff0c;需要用新的参数。如果有多个交易&#xff0c;读不同的参数文件&#xff0c;但是数据还是一套&#xff0c;就要改多个参数文件。或者只想执行参数文件的某一行数据&#xff0c;又不想调整参数文件顺序。 第一个问题目前想到…

数学基础 -- 泰勒展开式

泰勒展开 泰勒展开是将一个函数在某点附近展开成幂级数的工具。具体来说&#xff0c;对于一个在某点 a a a处具有 n n n阶导数的函数 f ( x ) f(x) f(x)&#xff0c;其泰勒展开式为&#xff1a; f ( x ) f ( a ) f ′ ( a ) ( x − a ) f ′ ′ ( a ) 2 ! ( x − a ) 2 f …

Ant Design Vue中日期选择器快捷选择 presets 用法

ant写文档的纯懒狗 返回的是一个day.js对象 范围选择时可接受一个数组 具体参考 操作 Day.js 话不多说 直接上代码 <a-range-pickerv-model:value"formData.datePick"valueFormat"YYYY-MM-DD HH:mm:ss"showTime:presets"presets"change&quo…

Linux命令更新-文本处理sed

简介 sed&#xff08;Stream Editor&#xff09;是一种功能强大的文本处理工具&#xff0c;用于非交互式编辑文本文件。它可以对输入流进行一系列文本处理操作&#xff0c;例如查找、替换、删除、添加等&#xff0c;是运维人员和开发人员必备的利器。相比于其他文本编辑工具&a…

(前缀和) LeetCode 238. 除自身以外数组的乘积

一. 题目描述 原题链接 给你一个整数数组 nums&#xff0c;返回 数组 answer &#xff0c;其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法&…

敏捷开发适用于哪些项目?你用对了吗?

敏捷项目管理是一种适用于快速变化和不确定性高的项目环境的项目管理方法。因此&#xff0c;敏捷项目管理在软件开发、信息技术、互联网、市场营销、教育培训等领域得到了广泛的应用。 一、敏捷开发适合哪些项目&#xff1f; 1、需求频繁变化的项目 在传统的瀑布模型中&#…

WebRTC通话原理(SDP、STUN、 TURN、 信令服务器)

文章目录 1.媒体协商SDP简介 2.网络协商STUN的工作原理TURN工作原理 3.信令服务器信令服务器的主要功能信令服务器的实现方式 1.媒体协商 比如下面这个例子 A端与B端要想通信 A端视频采用VP8做解码&#xff0c;然后发送给B端&#xff0c;B端怎么解码&#xff1f; B端视频采用…

缓存技术:提升性能与效率的利器

在当今数字化时代&#xff0c;软件应用的性能与响应速度成为了衡量其成功与否的重要标准之一。随着数据量的爆炸性增长和用户需求的日益多样化&#xff0c;如何高效地处理这些数据并快速响应用户请求成为了软件开发中亟待解决的问题。缓存技术&#xff0c;作为提升系统性能、优…

【JavaScript】延迟加载 js 脚本

defer 属性&#xff1a;在 HTML 中通过设置 script 标签的 defer 属性来实现脚本的延迟加载&#xff0c;即脚本的下载与 HTML 的解析不会阻塞彼此&#xff0c;脚本会在 HTML 解析完成后才执⾏。async 属性&#xff1a;在 HTML 中通过设置 script 标签的 async 属性来实现脚本的…

【论文共读】【翻译】ShuffleNet v1:一种用于移动设备的极其高效的卷积神经网络

[原文地址] https://arxiv.org/pdf/1707.01083 [翻译] 0. 摘要 我们介绍了一种计算效率极高的CNN架构&#xff0c;称为ShuffleNet&#xff0c;该架构专为计算能力非常有限的移动设备&#xff08;例如&#xff0c;10-150 MFLOPs&#xff09;而设计。新架构利用了两个新操作&am…

Ubuntu 22.04.4 LTS (linux) Tomcat 下载 安装配置详细教程

1 官网下载 下载链接 2 ubuntu 服务器安装 #下载 wget https://dlcdn.apache.org/tomcat/tomcat-9/v9.0.91/bin/apache-tomcat-9.0.91.tar.gz #解压 tar zxvf apache-tomcat-9.0.91.tar.gz sudo mv apache-tomcat-9.0.91/ /data/tomcat #配置环境变量 sudo vi /etc/profil…