flink和kafka区别_Apache Flink和Kafka入门

flink和kafka区别

介绍

Apache Flink是用于分布式流和批处理数据处理的开源平台。 Flink是具有多个API的流数据流引擎,用于创建面向数据流的应用程序。

Flink应用程序通常使用Apache Kafka进行数据输入和输出。 本文将指导您逐步使用Apache Flink和Kafka。

弗林克·卡夫卡

先决条件

  • Apache Kafka 0.9.x
  • 吉特
  • Maven 3.x或更高版本

创建您的Flink流项目

第一步是创建Java应用程序,最简单的方法是使用flink-quickstart-java原型,其中包含核心依赖关系和打包任务。 本文与Apache Flink快速入门示例相似,重点明确介绍了MapR Streams的数据输入和输出。

在此应用程序中,我们将创建两个作业:

  • WriteToKafka :生成随机字符串,然后使用Kafka Flink连接器及其Producer API将其发布到MapR Streams主题。
  • ReadFromKafka :读取相同的主题,并使用Kafka Flink连接器及其使用者在标准输出中打印消息。 API。

完整项目可在GitHub上找到:

  • Flink和Kakfa应用

让我们使用Apache Maven创建项目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink\-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.1.2 \-DgroupId=com.grallandco.demos \-DartifactId=kafka-flink-101 \-Dversion=1.0-SNAPSHOT \-DinteractiveMode=false

Maven将创建以下结构:

tree kafka-flink-101/
kafka-flink-101/
├── pom.xml
└── src└── main├── java│   └── com│       └── grallandco│           └── demos│               ├── BatchJob.java│               ├── SocketTextStreamWordCount.java│               ├── StreamingJob.java│               └── WordCount.java└── resources└── log4j.properties7 directories, 6 files

此项目配置为创建一个Jar文件,该文件包含您的flink项目代码,还包括运行它所需的所有依赖关系。

该项目包含其他一些示例工作,本文不需要它们,您可以将其用于教育目的,也可以将其从项目中删除。

添加Kafka连接器

打开pom.xml并将以下依赖项添加到您的项目中:

第一步,我们必须添加Flink Kafka连接器作为依赖项,以便我们可以使用Kafka接收器。 将此添加到“依赖项”部分的pom.xml文件中:

您现在必须添加Flink Kafka Connector依赖项才能使用Kafka接收器。 在<dependencies>元素中添加以下条目:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.9_2.10</artifactId><version>${flink.version}</version></dependency>

Flink项目现在准备通过Kafka连接器使用DataStream,因此您可以从Apache Kafka发送和接收消息。

安装并启动Kafka

下载Kafka,在终端中输入以下命令:

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0

Kafka使用ZooKeeper,如果您没有运行Zookeeper,则可以使用以下命令启动它:

./bin/zookeeper-server-start.sh config/zookeeper.properties

通过在新终端中运行以下命令来启动Kafka代理:

./bin/kafka-server-start.sh config/server.properties

在另一个终端中,运行以下命令来创建一个名为flink-demo的Kafka主题:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-demo

使用Kafka工具将消息发布和使用到flink-demo主题。

制片人

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-demo

消费者

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flink-demo --from-beginning

在生产者窗口中,您可以发布一些消息,并在消费者窗口中查看它们。 我们将使用这些工具来跟踪Kafka和Flink之间的交互。

编写您的Flink应用程序

现在让我们使用Flink Kafka Connector将消息发送到Kafka并使用它们。

制片人

生产者使用SimpleStringGenerator()类生成消息,然后将字符串发送到flink-demo主题。

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092"); DataStream<String> stream = env.addSource(new SimpleStringGenerator());stream.addSink(new FlinkKafkaProducer09<>("flink-demo", new SimpleStringSchema(), properties));env.execute();}

SimpleStringGenerator()方法代码在此处提供 。

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 在应用程序环境中创建一个新的DataStream时, SimpleStringGenerator类将Flink中所有流数据源的Source接口实现SourceFunction 。
  • FlinkKafkaProducer09器添加到主题。

消费者

使用者只需从flink-demo主题中读取消息,然后将它们打印到控制台中即可。

public static void main(String[] args) throws Exception {// create execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", “localhost:9092");properties.setProperty("group.id", "flink_consumer");DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties) );stream.map(new MapFunction<String, String>() {private static final long serialVersionUID = -6867736771747690202L;@Overridepublic String map(String value) throws Exception {return "Stream Value: " + value;}}).print();env.execute();}

主要步骤是:

  • 在任何Flink应用程序的基础上创建一个新的StreamExecutionEnvironment
  • 使用消费者信息创建一组属性,在此应用程序中,我们只能设置消费者group.id
  • 使用FlinkKafkaConsumer09从主题flink-demo获取消息

生成并运行应用程序

让我们直接从Maven(或从您最喜欢的IDE)运行应用程序。

1-建立专案:

$ mvn clean package

2-运行Flink生产者作业

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.WriteToKafka

3-运行Flink消费者工作

$ mvn exec:java -Dexec.mainClass=com.mapr.demos.ReadFromKafka

在终端中,您应该看到生产者生成的消息

现在,您可以在Flink群集上部署并执行此作业。

结论

在本文中,您学习了如何将Flink与kafka结合使用来写入和读取数据流。

翻译自: https://www.javacodegeeks.com/2016/10/getting-started-apache-flink-kafka.html

flink和kafka区别

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335625.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

solr cloud 更新 solrconfig 配置_Solr各版本新特性「4.x,5.x,6.x,7.x」

一.Solr4.x新特性1.近实时搜索Solr的近实时搜索【Near Real-Time&#xff0c;NRT】功能实现了文档添加到搜索的快速进行&#xff0c;以应对搜索快速变化的数据。2.原子更新与乐观并发原子更新功能允许客户端应用对已有文档上进行添加、更新、删除和对字段增值等操作&#xff0c…

junit数据驱动测试_使用Junit和Easytest进行数据驱动的测试

junit数据驱动测试在本文中&#xff0c;我们将看到如何使用Junit进行数据驱动的测试。 为此&#xff0c;我将使用一个名为EasyTest的库。 我们知道&#xff0c;对于TestNG&#xff0c;它已内置了数据提供程序。 通过简单的测试&#xff0c;我们可以使用Junit进行数据驱动的测试…

HH SaaS电商系统的出库功能模块设计

文章目录出库单业务流程基本流程扩展流程找不到符合条件的仓库&#xff0c;要求部分退款&#xff08;未生成出库单时&#xff09;找不到符合条件的仓库&#xff0c;全部退款&#xff08;未生成出库单时&#xff09;找不到符合条件的仓库&#xff0c;等待库存补足&#xff08;未…

java 拼图_拼图项目的诅咒:为什么Java 9一遍又一遍地延迟?

java 拼图JDK 9发行日期推迟到2017年7月 距JDK 9发行不到200天&#xff0c;它又被推迟了 。 新的发布日期已更新为2017年7月&#xff0c;比之前推迟的日期晚了四个月。 推迟日期 9月13日&#xff0c;Oracle Java平台小组的首席架构师Mark Reinhold发表了他的建议&#xff0c;…

mysql数据库增删改查关键字_mysql数据库的增删改查

数据库基本操作&#xff1a;增删改查#DML语言/*数据操作语言&#xff1a;插入&#xff1a;insert修改&#xff1a;update删除&#xff1a;delete*/1.增插入语句的方式一表已经存在啦&#xff0c;我们需要往里面插入数据/*语法&#xff1a;insert into 表名(列名,…) values(值1…

HH SaaS电商系统的采购功能模块设计

文章目录如何生成采购单系统生成采购单的流程基本流程扩展流程找不到符合条件的供应商&#xff0c;要求部分退款&#xff08;初次生成采购单时&#xff09;找不到符合条件的供应商&#xff0c;要求全部退款&#xff08;初次生成采购单时&#xff09;指定供应商的库存不足&#…

HH SaaS电商系统的入库功能模块设计

文章目录创建入库单的场景创建入库单的业务流程商品直接入库内部仓退货入库&#xff08;内部仓&#xff09;换货入库&#xff08;内部仓&#xff09;退货入库&#xff08;外部仓&#xff09;换货入库&#xff08;外部仓&#xff09;备货入库&#xff08;内部仓&#xff09;备货…

接口方法javadoc注释_继承Javadoc方法注释

接口方法javadoc注释尽管用于javadoc工具的JDK工具和实用程序页面通过实现和继承方法来描述Javadoc方法注释重用的规则&#xff0c;但是当实际上不需要使用{inheritDoc}时&#xff0c;很容易不必要地显式描述注释继承&#xff0c;因为会使用相同的注释隐式继承。 Java 8 javado…

redis java 监听_从零手写实现redis(四)添加监听器

前言java从零手写实现redis&#xff08;一&#xff09;如何实现固定大小的缓存&#xff1f;java从零手写实现redis&#xff08;三&#xff09;redis expire 过期原理java从零手写实现redis&#xff08;三&#xff09;内存数据如何重启不丢失&#xff1f;本节&#xff0c;让我们…

drill apache_如何指南:Apache Drill入门

drill apacheApache Drill是一个引擎&#xff0c;可以连接到许多不同的数据源&#xff0c;并为它们提供SQL接口。 它不仅是遍历任何复杂事物SQL界面&#xff0c;而且是功能强大的界面&#xff0c; 其中包括对许多内置函数和窗口函数的支持。 尽管它可以连接到可以使用SQL进行查…

mac mysql 重设密码_Mac下忘记mysql密码重新设置密码的图文教程

MySQL 文件在路径/usr/local/mysql下1&#xff0c; 在系统偏好设置中关闭 mysql &#xff1a; Stop MySQL Server2 &#xff0c;打开终端进入路径  /usr/local/mysql/bin输入命令 sudo su&#xff0c; 然后输入开机密码。然后输入命令&#xff1a;./mysqld_safe –skip-grant-…

activiti 变量_如何在Activiti中使用瞬态变量

activiti 变量我们昨天发布的Activiti v6 Beta3中已经加入了很多需要的功能-临时变量。 在这篇文章中&#xff0c;我将向您展示一个示例&#xff0c;说明如何使用瞬态变量来覆盖一些以前不可能&#xff08;或最佳&#xff09;的高级用例。 到目前为止&#xff0c;Activiti中的…

erosa mysql_MySQL协议和canal实现

前言前面的文章里&#xff0c;我们了解到 canal 可以从 MySQL 中感知数据的变化。这是因为它模拟 MySQL slave 的交互协议&#xff0c;伪装自己为 MySQL slave &#xff0c;从而实现了主从复制。正是了解到这一点&#xff0c;笔者有两个问题便一直萦绕于心&#xff1a;它是如何…

HH SaaS电商系统的供应商系统设计

供应商信息结构图 供应商类型 商城的供应商划分为专享型、共享型两种&#xff0c;但是租户和店铺供应商则都是“专享型”的。 共享型供应商发布的商品归属供应商自己的&#xff0c;商品档案供应商才有资格管理&#xff0c;所以spu_base需要保存供应商id&#xff0c;有供应商id…

aws faas_带有AWS Lambda和Java的无服务器FaaS

aws faas什么是无服务器架构&#xff1f; 无服务器架构在由第三方完全管理的临时容器中运行自定义代码。 自定义代码通常只是完整应用程序的一小部分。 也称为函数 。 这为无服务器架构提供了另一个名称&#xff0c;即功能即服务 &#xff08;FaaS&#xff09;。 该容器是短暂的…

跨境商品的进口税额显示

跨境商品的采购类型有三种&#xff1a;直邮、保税、一般贸易&#xff0c;而一般贸易的商品已经清关入境了&#xff0c;虽然是跨境商品&#xff0c;但是无需再清关&#xff0c;所以商品详情页无需显示进口税相关信息。 直邮跨境商品显示的进口税信息如下图所示&#xff1a; 保税…

coreldraw x8段落_CDR X8设置自定义文字为默认字体(二)

通过上一篇文章的介绍&#xff0c;我们已经了解到了在CorelDRAW中如何自定义设置默认字体&#xff0c;相关阅读可参阅&#xff1a;CDR X8设置文字为默认字体。其实在CorelDRAW软件中给用户提供方式不止是一种&#xff0c;本文将介绍更多关于设置默认字体的方法。1. 打开CorelDR…

jhipster_JHipster入门,第3部分

jhipster欢迎回到本JHipster教程系列&#xff01; 在第一部分中&#xff0c;我们介绍了如何创建整体应用程序。 在第二部分中 &#xff0c;我们逐步创建了一个微服务应用程序&#xff08;这有点复杂&#xff09;。 对于那些正在努力使JHipster正常运转的人&#xff0c;我想着重…

jhipster_JHipster入门,第2部分

jhipster所以你回来了&#xff01; 在本系列的最后一部分中 &#xff0c;我们采用了单片路线创建了一个JHipster应用程序。 这是红色药丸路线&#xff1b; 生活几乎与您习惯的一样。 但是也许您喜欢挑战。 也许您想超越红色药丸并尝试蓝色药丸。 在这种情况下&#xff0c;Blue…

HH SaaS电商系统的虚拟资金账户(钱包余额)设计

文章目录方案一&#xff0c;将资金账户抽象出来虚拟资金账户余额流水记录实体方案二&#xff0c;用户表直接保存资金余额余额流水记录实体方案一&#xff0c;将资金账户抽象出来 虚拟资金账户 P.S. 如果机构代码和消费代码有区分类型&#xff0c;那么资金账户表中就不必保存“…