Apache Kafka简介

什么是Apache Kafka?

Apache Kafka是一个分布式流系统,具有发布和订阅记录流的功能。 在另一方面,它是企业消息传递系统。 它是一个快速,水平可扩展和容错的系统。 Kafka有四个核心API,

生产者API:

该API允许客户端连接到集群中运行的Kafka服务器,并将记录流发布到一个或多个Kafka主题。

使用者API:

该API允许客户端连接到集群中运行的Kafka服务器,并使用一个或多个Kafka主题的记录流。 卡夫卡的消费者是直接从卡夫卡主题的消息。

流API:

此API通过使用来自一个或多个主题的流并将这些流生成到其他输出主题,从而使客户端充当流处理器。 这允许转换输入和输出流。

连接器API:

该API允许编写可重用的生产者和使用者代码。 例如,如果我们要从任何RDBMS读取数据以将数据发布到主题,并使用主题中的数据并将其写入RDBMS。 使用连接器API,我们可以为各种数据源创建可重用的源连接器和接收器连接器组件。

Kafka用于什么用例?

Kafka用于以下用例,

讯息系统:

Kafka用作企业消息传递系统,以分离源系统和目标系统以交换数据。 与JMS相比,Kafka具有分区的高吞吐量和具有复制功能的容错能力。

网络活动跟踪:

跟踪网站上的用户旅程事件以进行分析和脱机数据处理。

日志汇总:

处理来自各种系统的日志。 特别是在具有微服务架构的分布式环境中,该系统将系统部署在各种主机上。 我们需要汇总来自各种系统的日志,并在中央位置提供日志以进行分析。 浏览有关使用Kafka的分布式日志记录体系结构的文章https://smarttechie.org/2017/07/31/distributed-logging-architecture-for-micro-services/

指标收集器:

Kafka用于从各种系统和网络收集指标以进行操作监视。 Kafka指标报告器可用于监控工具,例如Ganglia , Graphite等…

关于此https://github.com/stealthly/metrics-kafka的一些参考

什么是经纪人?

Kafka群集中的一个实例称为代理。 在Kafka群集中,如果您连接到任何一个代理,您将可以访问整个群集。 我们连接到访问群集的代理实例也称为引导服务器。 每个代理由集群中的数字ID标识。 首先从Kafka集群开始,三个经纪人是一个不错的选择。 但是有些集群中有数百个经纪人。

什么是主题?

主题是将记录发布到的逻辑名称。 在内部,该主题分为数据发布到的分区。 这些分区分布在群集中的代理之间。 例如,如果一个主题有三个分区,群集中有3个代理,则每个代理都有一个分区。 要发布到分区的数据仅附加偏移量增量。

以下是在使用分区时需要记住的几点。

  • 主题由其名称标识。 一个集群中可以有很多主题。
  • 消息的顺序保持在分区级别,而不是跨主题。
  • 一旦写入分区的数据不被覆盖。 这称为不变性。
  • 分区中的消息与键,值和时间戳一起存储。 Kafka确保将给定密钥的消息发布到同一分区。
  • 在Kafka群集中,每个分区都有一个领导者,该领导者将对该分区进行读/写操作。

在上面的示例中,我创建了一个包含三个具有复制因子3的分区的主题。在这种情况下,由于集群具有3个代理,所以这三个分区是均匀分布的,并且每个分区的副本都被复制到另外2个代理中。 由于复制因子为3,因此即使有2个代理发生故障,也不会丢失数据。 始终保持复制因子大于1且小于或等于集群中的代理数目。 您创建的主题的复制因子不能超过集群中代理的数量。

在上图中,每个分区都有一个领导者(发光分区),其他同步副本(灰色分区)是跟随者。 对于分区0,broker-1是领导者,broker-2,broker-3是跟随者。 对分区0的所有读/写将进入Broker-1,并且相同的内容将被复制到Broker-2和Broker-3。

现在,让我们按照以下步骤创建具有3个代理的Kafka集群。

步骤1:

下载Apache Kafka最新版本。 在此示例中,我使用的是最新的1.0。 解压缩文件夹并移到bin文件夹中。 启动Zookeeper,这对于从Kafka集群开始至关重要。 Zookeeper是一种协调服务,用于管理代理,对分区的领导者选举以及在主题更改(删除主题,创建主题等)或代理(添加代理,代理人代理等)更改期间向Kafka发出警报。 在此示例中,我仅启动了一个Zookeeper实例。 在生产环境中,我们应该有更多的Zookeeper实例来管理故障转移。 如果没有Zookeeper,Kafka群集将无法正常工作。

./zookeeper-server-start.sh ../config/zookeeper.properties

第2步:

现在开始卡夫卡经纪人。 在此示例中,我们将启动三个经纪人。 转到Kafka根目录下的config文件夹,并将server.properties文件复制3次,并将其命名为server_1.properties,server_2.properties和server_3.properties。 在这些文件中更改以下属性。

#####server_1.properties#####
broker.id=1
listeners=PLAINTEXT://:9091
log.dirs=/tmp/kafka-logs-1#####server_2.properties######
broker.id=2
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs-2######server_3.properties#####
broker.id=3
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-3M

现在,使用以下命令运行3个代理。

###Start Broker 1 #######
./kafka-server-start.sh ../config/server_1.properties###Start Broker 2 #######
./kafka-server-start.sh ../config/server_2.properties###Start Broker 3 #######
./kafka-server-start.sh ../config/server_3.properties

第三步:

使用以下命令创建主题。

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic first_topic

第四步:

使用Kafka控制台生成器,向上一步中创建的主题生成一些消息。 对于控制台生产者,请提及任何中间商地址。 这将是引导服务器,用于访问整个群集。

./kafka-console-producer.sh --broker-list localhost:9091 --topic first_topic
>First message
>Second message
>Third message
>Fourth message
>

步骤5:

使用Kafka控制台使用者使用消息。 对于Kafka消费者,请提及任何一个代理地址作为引导服务器。 请记住,在阅读邮件时,您可能看不到订单。 由于顺序是在分区级别而不是主题级别维护的。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning

如果需要,可以使用以下命令描述该主题以查看分区的分布方式以及每个分区的领导者。

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic first_topic#### The Result for the above command#####
Topic:first_topic	PartitionCount:3	ReplicationFactor:3	Configs:Topic: first_topic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3Topic: first_topic	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1Topic: first_topic	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2

在上面的描述中,broker-1是分区0的领导者,broker-1,broker-2和broker-3具有每个分区的副本。

在下一篇文章中,我们将看到生产者和消费者JAVA API。 直到那时, 快乐消息!!!

翻译自: https://www.javacodegeeks.com/2017/11/introduction-apache-kafka.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/348939.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

oracle查看存储过程最近编译,Oracle恢复被误编译覆盖的存储过程

同事在写Oracle存储过程时候,是在以前已经写好的过程基础上修改的,想换个名字,由于疏忽没有改名字就编译了,编译完才意识到。这时原来的那个已经没有了。找我想办法恢复回原来的那个过程。通过查资料想到个方法,也不知…

oracle安装 redo log file,Oracle Dump Redo Log File 说明

关于Dump redo log 的示例,MOS 上的文档:[ID 1031381.6] 有详细说明。Dump 有两种方式:(1)使用一. dump redo 说明关于Dump redo log 的示例,MOS 上的文档:[ID 1031381.6] 有详细说明。Dump 有两种方式:(1)…

unity 飞机 残骸模型_训练残骸模式– Java 8中的改进实现

unity 飞机 残骸模型Venkat Subramaniam在今天的演讲中提到了有关“级联方法”模式或“火车残骸”模式的内容,如下所示: >someObject.method1().method2().method3().finalResult()很少有人会将此与构建器模式相关联,但事实并非如此。 无…

datastage配置oracle,IBM Datastage8.5配置问题

大家好,最近因学习需要,在虚拟机REHL5.5上安装了IBM Datastage8.5的服务器端,在windows端安装客户端,调试连接时,提示密码不正确,我修改了密码,重启了服务器,还是提示密码不正确&…

JSON模式在构建和部署API中的作用

什么是JSON模式 ? 它提供了描述任何JSON值的结构和属性的详尽方法。 在记录对任何JSON API的请求和响应时,它非常有用。 本文将探讨其在API的软件开发周期中的作用。 记录JSON响应格式 定义数据架构的最明显的用例也许是在记录API响应的结构。 让我们来…

taskctl调oracle存储过程,TASKCTL常见问题和解决方法(FAQ)

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼转载自 TASKCTL部分FAQ--技术交流|业界领先ETL批量调度专家系统安装与维护:A1、安装时环境变量已设置,但安装时依然提示环境变量未设置?答:该问题主要是因为环境变量配置未生效。先检…

oracle jet auto,如何启用sqlplus的AutoTrace功能

SQL> set autotrace traceonlySQL> select table_name from user_tables;已选择98行。已用时间: 00: 00: 00.04Execution Plan----------------------------------------------------------0 SELECT STATEMENT OptimizerCHOOSE1 0 NESTED LOOPS2 1 NESTED LOOPS (OUTER)3…

使用Docker构建Oracle ADF应用程序

最近,我的一个好朋友在使用公共Oracle Maven存储库构建ADF应用程序v.12.2.1.2时经常遇到问题。 他要求我检查它是否对我有用。 好吧……没有。 因此,存储库存在一些问题。 为了使实验整洁并避免对我的工作环境造成任何影响,我决定在docker容器…

linux权限c是什么意思,linux服务器下权限设置

1.在终端输入命令 sudo chmod -R 777 /opt/Tomcat,那么Tomcat文件夹和它下面的所有子文件夹的属性都变成了777(读/写/执行权限)2.bin/sh^M:损坏的解释器: 没有那个文件或目录 错误?解决方法:脚本文件保存时使用了DOS格式,用DOS2UN…

linux内存不足时缩减缓存,Linux内存及页面缓存管理概要总结

物理内存管理页面内存管理Linux把物理内存划分为若干个大小相同(通常是4k)的页面,每个页面使用struct page描述,在内核初始化时会根据物理内存大小和页面大小,初始化一个struct page数组mem_map[]对系统中所有的页面进行统一管理。物理页面描…

jpa 异常捕获_JPA和CMT –为什么捕获持久性异常不够?

jpa 异常捕获使用CMT( 容器管理的事务 )进入EJB和JPA的世界非常舒适。 只需定义一些注释来划分事务边界(或使用默认值)即可,仅此而已–无需摆弄手动开始,提交或回滚操作。 回滚事务的一种方法是从EJB的业务…

linux 线程等待时间,线程超时等待方法 linux中select()函数使用

线程超时等待方法 linux中select()函数使用select系统调用时用来让我们的程序监视多个文件句柄的状态变化的。程序会停在select这里等待,直到被监视的文件句柄有一个或多个发生了状态改变。关于文件句柄,其实就是一个整数,通过socket函数的声…

使用Spring @Transactional进行数据源路由

卡尔帕帕(Carl Papa)在Spring框架中使用方面来确定要使用的DataSource (读写或只读)启发了我。 所以,我正在写这篇文章。 我必须承认,我对Spring的AbstractRoutingDataSource早已熟悉。 但是我不知道在哪里…

linux应用程序是什么,linux下c开发了一个应用程序,它的扩展名是什么?

这个貌2113似涉及到linux文件系统。linux文件系统与windows的不相同5261,windows系统应该是通4102过文件后缀来进行文件类型是别的,而1653linux虽然也会借鉴后缀信息,但好像不完全依赖于文件后缀。比如,在windows下,讲…

linux服务器带宽设置,linux 系统查看服务器带宽使用

使用linux服务器中,可能刚接触的同学不会查看系统的带宽使用,这里收集了一个脚本,可以实时显示服务器的带宽使用喔~vi cs.sh然后粘贴以下内容:#!/bin/bashif [ -z "$1" ]; thenechoecho usage: $0 network-interfaceech…

moxy json介绍_MOXy的对象图– XML和JSON的输入/输出局部模型

moxy json介绍假设您有一个要公开为RESTful服务的域模型。 问题是您只想输入/输出部分数据。 以前,您将创建一个代表子集的单独模型,然后使用代码在模型之间移动数据。 在EclipseLink 2.5.0中,我们有一个称为“对象图”的新功能,使…

linux设置新硬盘权限,Linux 下挂载新硬盘以及更改为普通权限

1、启动终端,以root用户登录2、查看硬盘信息:#fdisk -l3、进入磁盘,对磁盘进行分区:#fdisk /dev/sda(注意看你要挂载哪一个磁盘,我的是sda,有的是sdb)4、格式化分区:#mkfs.ext3 /dev/sda1 //注&…

使用Payara Micro的Easy Java EE Microservices

想知道如何开始使用Java EE Microservices? 使用Java EE API只需很少的步骤即可部署微服务。 许多人认为Java EE无法与微服务一起使用,但事实并非如此……特别是如果您仅使用服务所需的Java EE规范。 在这篇简短的文章中,我将演示如何使用Jav…

linux终端lex程序运行,lex的简单使用

Lex & Flex 简介Lex是lexical compiler的缩写,是Unix环境下非常著名的工具, Lex (最早是埃里克施密特和 Mike Lesk 制作)是许多 UNIX 系统的标准词法分析器(lexical analyzer)产生程式,而且这个工具所作的行为被详列为 POSIX 标准的一部分…

Linux内存page,Linux虚拟内存管理 - Page Table的作用

虚拟内存的作用:1.扩展实际有限的物理内存,当然这种扩展是虚拟的,比如物理内存512M,对于一个需要1G空间的进程来说,照样可以运行。这增加了操作系统是应用范围。2.使得进程中的数据空间增大,增大到多少与硬…