[转载] KAFKA分布式消息系统

转载自http://blog.chinaunix.net/uid-20196318-id-2420884.html

Kafka[1]是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)。

 

当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理。

 

注:本文中发布者(publisher)与生产者(producer)可以互换,订阅者(subscriber)与消费者(consumer)可以互换。

 

Kafka的架构如下图所示:

 

Kafka存储策略

  1. kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。
  2. 每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
  3. 每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
  4. 发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

 

发布与订阅接口

 

发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,可以往消息集合中添加多条消息,一次行发布),send消息时,client需指定消息所属的topic。

 

订阅消息时,kafka client需指定topic以及partition num(每个partition对应一个逻辑日志流,如topic代表某个产品线,partition代表产品线的日志按天切分的结果),client订阅后,就可迭代读取消息,如果没有消息,client会阻塞直到有新的消息发布。consumer可以累积确认接收到的消息,当其确认了某个offset的消息,意味着之前的消息也都已成功接收到,此时broker会更新zookeeper上地offset registry(后面会讲到)。

 

 

高效的数据传输

  1. 发布者每次可发布多条消息(将消息加到一个消息集合中发布), sub每次迭代一条消息。
  2. 不创建单独的cache,使用系统的page cache。发布者顺序发布,订阅者通常比发布者滞后一点点,直接使用linux的page cache效果也比较后,同时减少了cache管理及垃圾收集的开销。
  3. 使用sendfile优化网络传输,减少一次内存拷贝。

 

无状态broker

  1. Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
  2. Broker不保存订阅者的状态,由订阅者自己保存。
  3. 无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
  4. 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset进行重新读取消费消息。

 

Consumer group

  1. 允许consumer group(包含多个consumer,如一个集群同时消费)对一个topic进行消费,不同的consumer group之间独立订阅。
  2. 为了对减小一个consumer group中不同consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不同的partition。

 

Zookeeper 协调控制

1. 管理broker与consumer的动态加入与离开。

2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一

   个consumer group内的多个consumer的订阅负载平衡。

3.  维护消费关系及每个partion的消费信息。

 

Zookeeper上的细节:

  1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。
  2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
  3. 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

 

 

消息交付保证

  1. kafka对消息的重复、丢失、错误以及顺序型没有严格的要求。
  2. kafka提供at-least-once delivery,即当consumer宕机后,有些消息可能会被重复delivery。
  3. 因每个partition只会被consumer group内的一个consumer消费,故kafka保证每个partition内的消息会被顺序的订阅。
  4. Kafka为每条消息为每条消息计算CRC校验,用于错误检测,crc校验不通过的消息会直接被丢弃掉。

 

 

Linkedin的应用环境

如下图,左边的应用于日志数据的在线实时处理,右边的应用于日志数据的离线分析(现将日志pull至hadoop或DWH中)。

 

 

 

Kafka的性能

 

测试环境: 2 Linux machines, each with 8 2GHz cores,  16GB  of  memory,  6  disks  with  RAID  10.  The  two machines are  connected  with  a  1Gb  network  link.  One  of  the machines was used as the broker and the other machine was used as the producer or the consumer.

 

测试评价(by me):(1)环境过于简单,不足以说明问题。(2)对于producer持续的波动没有进行分析。(3)只有两台机器zookeeper都省了??

 

测试结果:如下图,完胜其他的message queue,单条消息发送(每条200bytes),能到50000messages/sec,50条batch方式发送,平均为400000messages/sec.

 

 

 

 

Kafka未来研究方向

1. 数据压缩(节省网络带宽及存储空间)

2. Broker多副本

3. 流式处理应用

 

 

参考资料

【1】  http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf

【2】  https://cwiki.apache.org/KAFKA/kafka-papers-and-presentations.data/Kafka-netdb-06-2011.pdf

转载于:https://www.cnblogs.com/scott19820130/p/4736089.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/574374.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java sleep唤醒_JAVA wait(), notify(),sleep详解(转)

在JAVA中,是没有类似于PV操作、进程互斥等相干的方法的。JAVA的进程同步是通过synchronized()来实现的,需要说明的是,JAVA的synchronized()方法类似于操作系统概念中的互斥内存块,在JAVA中的Object类型中,都是带有1个内…

ACM 错排

Description 大家常常感慨,要做好一件事情真的不容易,确实,失败比成功容易多了! 做好“一件”事情尚且不易,若想永远成功而总从不失败,那更是难上加难了,就像花钱总是比挣钱容易的道理一样。 话…

Linux的NTP配置总结

在Linux系统中,为了避免主机时间因为在长时间运行下所导致的时间偏差,进行时间同步(synchronize)的工作是非常必要的。Linux系统下,一般使用ntp服务来同步不同机器的时间。NTP 是网络时间协议(Network Time Protocol)的…

mysql_real_connect阻塞_mysql_real_connect崩溃、未经处理的异常

mysql_real_connect崩溃、未经处理的异常背景近期客户测试软件,功能里有mysql连接问题,在mysql连接失败时,客户机器上出现“已停止工作”界面,而我机器上软件直接退出没有提示自动关闭。查找分析因为是一直用的代码,和…

java中settimeout作用_关于setTimeout的妙用

定义在指定的延迟时间之后调用一个函数或执行一个代码片段这个是setTimeout最主要的功能,但也是很坑的地方,首先javascript其实是运行在单线程的环境下,意味者定时器会在未来的某个时间支持,但是具体的执行的时间并不能够很准确的…

中兴的一道笔试题

今天做了中兴的秋招题目,有一个题以前没有仔细想过,题目我有点儿记不清楚了,大概意思是这样的:有一个循环的单链表,给定该链表的尾指针比给定头指针好么? 我的思路:如下图,这是一个循…

Linux read 命令

Linux read命令用于从标准输入读取数值。 read 内部命令被用来从标准输入读取单行数据。这个命令可以用来读取键盘输入,当使用重定向的时候,可以读取文件中的一行数据。 语法 read [-ers] [-a aname] [-d delim] [-i text] [-n nchars] [-N nchars] […

放图片 java_java怎么在我想要的图片上在放一个我想要的图片

展开全部import javax.imageio.*;import javax.swing.*;import java.awt.*;import java.awt.event.*;import java.awt.geom.*;import java.io.*;import java.util.*;/*** author Hardneedl*/final class PicDemo extends JFrame {public String getTitle() {return "PicDe…

Ext-ajax请求数据

Ext.Ajax.request({url: webPath/news/newsEastmoneyList,method: POST,success: function (response, options) {var data Ext.decode(response.responseText);if(data.success){list.getStore().load({page:1});Ext.Msg.alert(提示,提取成功,共提取data.zg条.);}…

java .net des_DES加密解密 JAVA与.NET互通程序代码

JAVA版本import javax.crypto.Cipher;import javax.crypto.SecretKey;import javax.crypto.SecretKeyFactory;import javax.crypto.spec.DESKeySpec;import javax.crypto.spec.IvParameterSpec;public class Des {private byte[] desKey;//解密数据public static String decryp…

linux 查找文件或者服务

[rootlocalhost ~]# whereis mysql mysql: /usr/bin/mysql /usr/lib/mysql /usr/share/mysql /usr/share/man/man1/mysql.1.gz [rootlocalhost ~]# which mysql /usr/bin/mysql转载于:https://www.cnblogs.com/adolfmc/p/4749224.html

android java 调用js_android WebApp 集成方式怎么使用java调用js

WebAPP集成,本地打包,有两种方式java js通信1.DCloud插件模式,参考SDK DEMO的H5Plugin,Java:继承StandardFeature写接口。public class DBControlPlugin extends StandardFeature{public void PluginTestFunction(IWebview pWebvi…

Android SurfaceView实现静态于动态画图效果

本文是基于Android的SurfaceView的动态画图效果,实现静态和动态下的正弦波画图,可作为自己做图的简单参考,废话不多说,先上图, 静态效果: 动态效果: 比较简单,代码注释的也比较详细&…

java ip调天气预报接口_JAVA + WeatherWebService 实现天气预报接口调取

两步完成接口调取第一步:引入jar包若在线jar包失效,私信我即可。第二步:创建类实现在JAVA项目中创建java类(我的是:WeatherUtil)修改类中cityid为你需要展示的地区id,我的是上海的【cityid查询】WeatherUtil类代码:imp…

数学概念——J - 数论,质因数分解

J - 数论,质因数分解Time Limit:1000MS Memory Limit:32768KB 64bit IO Format:%I64d & %I64u Submit StatusDescription Tomorrow is contest day, Are you all ready? We have been training for 45 days, and all guys must be tired.But , you are…

java map集合排序的_Java对Map集合进行排序

Java对Map集合进行排序实现 Comparator 接口,重写compare方法,完成自定义排序int compare(Object o1, Object o2) 返回一个基本类型的整型如果要按照升序排序,则o1 小于o2,返回-1(负数),相等返回0,01大于02返回1(正数)…

java黄金分割点游戏_结对编程1——黄金点小游戏项目简介及需求分析

一、项目成员2018141461085龚泽楠2018141461012蔡铧荣二、项目名称黄金点小游戏三、项目简介游戏规则: N个同学( N通常大于 10 ),每人写一个 0~100 之间的有理数 (不包括 0或100) ,交给裁判算出所有数字的平均值然后乘以 0.618 (所谓黄金分割…

STL泛型算法总结

泛型算法只是依赖于迭代器的操作,而不是依赖于容器 泛型算法可以分为3大类: 下面的vec和vec2代表相同类型的容器 1.只读算法:只允许读取其输入范围内的元素,而不改变元素 find(vec.cbegin(),vec.cend(),k) 查找算法:前…

java x锁_基于Java名称的锁?

MySQL具有方便的功能:SELECT GET_LOCK("SomeName")这可用于为应用程序创建简单但非常具体的基于名称的锁。但是,它需要数据库连接。我有很多情况,例如:someMethod() {// do stuff to user A for their data for feature…

HDU 2242 考研路茫茫——空调教室

考研路茫茫——空调教室 Time Limit: 2000msMemory Limit: 32768KBThis problem will be judged on HDU. Original ID: 224264-bit integer IO format: %I64d Java class name: Main众所周知,HDU的考研教室是没有空调的,于是就苦了不少不去图书馆的…