Kafka基础
1 消息队列
1.1 什么是消息队列
消息队列(MQ):消息+队列,保存消息的队列。消息的传输过程中的容器;主要提供生产、消费接口供外部调用做数据的存储和获取。
1.2 为什么要有消息队列
当网站面对教大的流量冲击,在网站系统中一般会有一个消息存储/缓存系统(即消息队列,也叫消息中间件),网站就可以按照自己服务负载的能力来消费这些消息。
1.3 消息队列分类
主要分为两类:点对点(p2p)、发布订阅(Pub/Sub)
(1)Peer-to-Peer
一般基于Pull或者Polling接收数据 ,发送到队列中的消息被一个而且仅仅一个接收者所接受,即使有多个接收者在同一个队列中侦听同一消息。即支持异步“即发即收”的消息传递方式,也支持同步请求/应答传送方式 。
(2)发布订阅
发布到同一个主题的消息,可被多个订阅者所接收 发布/订阅即可基于Push消费数据,也可基于Pull或者Polling消费数据。解耦能力比P2P模型更强
点对点与发布订阅的比较:
共同点: 消息生产者生产消息发送到queue中,然后消息消费者从queue中读取并且消费消息。
不同点: 点对点(p2p)模型包括:消息队列(Queue)、发送者(Sender)、接收者(Receiver) 一个生产者生产的消息只有一个消费者(Consumer)(即一旦被消费,消息就不在消息队列中) 发布订阅(pub/Sub)模式包含:消息队列(Queue)、主题(Topic)、发布者(Publisher)、订阅者(Subscriber)
1.4 消息系统使用场景
(1)峰值处理能力:消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
(2)解耦:各系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在
(3)冗余:部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
(4)可恢复性:系统中部分键失效并不会影响整个系统,它恢复会仍然可从消息系统中获取并处理数据
(5)异步通信:在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理
(6)扩展 消息系统是统一的数据接口,各系统可独立扩展
1.5 常见的消息系统
Kafka/Jafka ,Redis ,RabbitMQ ,ZeroMQ ,ActiveMQ 等等
2 Kafka相关概念
2.1 简介
Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写。Kafka是一个高吞吐量的、持久性的、分布式发布订阅消息系统。
三大特点:
①高吞吐量:可以满足每秒百万级别消息的生产和消费——生产消费。
②持久性:有一套完善的消息存储机制,确保数据的高效安全的持久化——中间存储。
③分布式:基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体
2.2 Kafka核心的概念
Topic(主题):Kafka处理的消息的不同分类
Broker(消息服务器代理):Kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。存在硬盘中。每个topic都是有分区的。
Partition(Topic物理上的分组 ):一个topic在broker中被分为1个或者多个
Message(消息):通信的基本单位,每个消息都属于一个partition
Producer(生产者):消息和数据的生产者,向Kafka的一个topic发布消息。
Consumer(消费者):消息和数据的消费者,定于topic并处理其发布的消息。
Zookeeper:协调kafka的正常运行。
3 配置
3.1 安装配置
安装包下载:http://archive.apache.org/dist/kafka/1.1.1/kafka_2.12-1.1.1.tgz
解压安装好后,修改Kafka安装目录下的config/server.properties
## 当前kafka实例的id,必须为整数,一个集群中不可重复
broker.id=1
## 生产到kafka中的数据存储的目录,目录需要手动创建
log.dirs=/home/refuel/opt/module/kafka_2.11-2.1.1/data/logs
## kafka数据在zk中的存储目录
zookeeper.connect=bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
创建目录mkdir -p /home/refuel/opt/module/kafka_2.11-2.1.1/data/logs
启动命令home/refuel/opt/module/kafka_2.11-2.1.1/bin/kafka-server-start.sh -daemon home/refuel/opt/module/kafka_2.11-2.1.1/config/server.properties
3.2 kafka在zookeeper中的目录
/kafka/cluster /id {"version":"1","id":"b9Imb3cBTQmOz6aSq-m_9Q"} ==》代表的是一个kafka集群包含集群的版本,和集群的id/controller {"version":1,"brokerid":2,"timestamp":"1568893911970"} ==》controller是kafka中非常重要的一个角色,控制器,控制partition的leader选举,topic的crud操作。brokerid意为由其id对应的broker承担controller的角色。/controller_epoch 2 代表的是controller的纪元,换句话说是代表controller的更迭,每当controller的brokerid更换一次,controller_epoch就+1./brokers/ids [1, 2, 3] ==》存放当前kafka的broker实例列表/topics [spark, __consumer_offsets] ==》当前kafka中的topic列表/seqid 系统的序列id/consumers ==》老版本用于存储kafka消费者的信息,主要保存对应的offset,新版本中基本不用,此时用户的消费信息,保存在一个系统的topic中:__consumer_offsets/config --->存放配置信息
4 Kafka的基本操作
4.1 topic的操作
topic是kafka非常重要的核心概念,是用来存储各种类型的数据的。关于topic的操作脚本:kafka-topics.sh 。
(1)创建topic
[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --create \
--topic spark\ ## 指定要创建的topic的名称
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka \ ##指定kafka关联的zk地址
--partitions 3 \ ##指定该topic的分区个数
--replication-factor 3 ##指定副本因子
注意:指定副本因子的时候,不能大于broker实例个数
(2)查看topic的列表
[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --list \
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
(3)查看每一个topic的信息
[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --describe --topic spark \
--zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
Partition: 当前topic对应的分区编号
Replicas : 副本因子,当前kafka对应的partition所在的broker实例的broker.id的列表
Leader : 该partition的所有副本中的leader领导者,处理所有kafka该partition读写请求
ISR : 该partition的存活的副本对应的broker实例的broker.id的列表
(4)修改一个topic
[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --alter --topic spark --partitions 4 --zookeeper bigdata01:2181/kafka
注意:partition个数,只能增加,不能减少
(5)删除一个topic
[refuel@bigdata01 kafka]$ bin/kafka-topics.sh --delete --topic spark --partitions 4 --zookeeper bigdata01:2181/kafka
(6)生产数据
[refuel@bigdata01 kafka]$ bin/kafka-console-producer.sh \
--topic spark \ -->指定数据被生产的topic
--broker-list bigdata01:9092,bigdata02:9092,bigdata03:9092 --->指定kafka的broker列表
(7)消费数据
[refuel@bigdata03 kafka]$ bin/kafka-console-consumer.sh \
--topic spark \
--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092
没数据,原因在于消费者后于生产者启动,在消费者启动之前生产者消费的数据变不能直接获取。如果想要获取消费者启动之前生产者生产的数据,可以添加一个参数--from-beginning。
4.2 API编程
(1)kafka生产者的api操作
public static void main(String[] args) throws IOException {Properties properties = new Properties();properties.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream("producer.properties"));Producer<Integer, String> producer = new KafkaProducer<>(properties);//发送数据ProducerRecord<Integer, String> record = new ProducerRecord("spark", "product");producer.send(record);producer.close();}
producer.properties配置如下
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
K: --->代表的是向topic中发送的每一条消息的key的类型,key可以为null
V: --->代表的是向topic中发送的每一条消息的value的类型
创建producer时指定的配置信息如下
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092 ==》kafka的服务器
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer ==》Key的序列化器
value.serializer=org.apache.kafka.common.serialization.StringSerializer ==》value的序列化器
acks=[0|-1|1|all] ==》消息确认机制0: 不做确认,直管发送消息即可-1|all: 不仅leader需要将数据写入本地磁盘,并确认,还需要同步的等待其它followers进行确认1:只需要leader进行消息确认即可,后期follower可以从leader进行同步
batch.size=1024 ==》每个分区内的用户缓存未发送record记录的空间大小。如果缓存区中的数据,没有沾满,也就是任然有未用的空间,那么也会将请求发送出去,为了较少请求次数,我们可以配置linger.ms大于0,
linger.ms=10 ==》不管缓冲区是否被占满,延迟10ms发送request
buffer.memory=10240 ==》控制的是一个producer中的所有的缓存空间
retries=0 ==》发送消息失败之后的重试次数
(2)kafka消费者的api操作
public static void main(String[] args) throws Exception {//消费者Properties properties = new Properties();properties.load(MyKafkaConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));Consumer<Integer, String> consumer = new KafkaConsumer<>(properties);//订阅topicconsumer.subscribe(Arrays.asList("spark"));//从kafka对应的topic中拉取数据while (true) {ConsumerRecords<Integer, String> consumerRecords = consumer.poll(1000);for (ConsumerRecord<Integer, String> record : consumerRecords) {Integer key = record.key();String value = record.value();int partition = record.partition();long offset = record.offset();String topic = record.topic();System.out.println(String.format("topic:%s\tpartition:%d\toffset:%d\tkey:%d\tvalue:%s",topic,partition,offset,key,value));}}}
consumer.properties
bootstrap.servers=bigdata01:9092,bigdata02:9092,bigdata03:9092
group.id=group01
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
5 Kafka的数据消费
kafka消费者在消费数据的时候,都是分组别的。不同组的消费不受影响,相同组内的消费,需要注意,如果partition有3个,消费者有3个,那么便是每一个消费者消费其中一个partition对应的数据;如果有2个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。如果有超过3个的消费者,同一时间只能最多有3个消费者能消费得到数据。
bin/kafka-console-consumer.sh --topic spark \--bootstrap-server bigdata01:9092,bigdata02:9092,bigdata03:9092 \--group haha \ #消费者对应的消费者组--offset earliest \ #从什么位置(消息的偏移量)开始消费--partition 2 #消费哪一个分区中的数据
offset:是kafka的topic中的partition中的每一条消息的标识,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量。offset的数据类型是Long,8个字节长度。offset在分区内是有序的,分区间是不一定有序。如果想要kafka中的数据全局有序,就只能让partition个数为1。
在组内,kafka的topic的partition个数,代表了kafka的topic的并行度,同一时间最多可以有多个线程来消费topic的数据,所以如果要想提高kafka的topic的消费能力,应该增大partition的个数。
6 record进入分区的策略
每一条producerRecord有,topic名称、可选的partition分区编号,以及一对可选的key和value组成。
有三种策略进入分区:①如果指定的partition,那么直接进入该partition;②如果没有指定partition,但是指定了key,使用key的hash选择partition③如果既没有指定partition,也没有指定key,使用轮询的方式进入partition。
6.1 自定义分区
自定义分区的核心类
public interface Configurable {/*** Configure this class with the given key-value pairs 指定当前producer的配置信息*/void configure(Map<String, ?> configs);}public interface Partitioner extends Configurable, Closeable {/*** Compute the partition for the given record.计算给定记录的分区* @param topic The topic name 主题名字* @param key The key to partition on (or null if no key)* @param keyBytes key序列之后的字节数组的形式* @param value The value to partition on or null* @param valueBytes value序列之后的字节数组的形式* @param cluster The current cluster metadata 当前cluster的元数据信息*/public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.分区结束之后被调用*/public void close();
}
(1)随机分区
public class RandomPartitioner implements Partitioner {private Random random = new Random();public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {//返回当前topic的partition个数Integer partitionCount = cluster.partitionCountForTopic(topic);int partition = random.nextInt(partitionCount);System.out.println("partition: " + partition);return partition;}public void configure(Map<String, ?> configs) {}public void close() {}}
注册使用 :partitioner.class=com.refuel.bigdata.kafka.partitioner.RandomPartitioner
(2)hash分区
public class HashPartitioner implements Partitioner {public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {Integer partCount = cluster.partitionCountForTopic(topic);int partition = Math.abs(key.hashCode()) % partCount;System.out.println("key: " + key + "partition: " + partition);return partition;}public void configure(Map<String, ?> configs) {}public void close() {}}
注册使用:partitioner.class=com.refuel.bigdata.kafka.partitioner.HashPartitioner
(3)轮询分区
public class RoundRobinPartitioner implements Partitioner { //定义一个原子计数器private AtomicInteger count = new AtomicInteger(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int parCount = cluster.partitionCountForTopic(topic);int partition = count.getAndIncrement() % parCount;System.out.println("key: " + key + "\tpartition: " + partition);return partition;}public void configure(Map<String, ?> configs) {}public void close() {}
}
注册使用:partitioner.class=com.refuel.bigdata.kafka.partitioner.RoundRobinPartitioner
7 Kafka架构
(1)Topic
Kafka处理的消息的不同分类的逻辑概念,同一个Topic的消息可分布在一个或多个节点(Broker)上 一个Topic包含一个或者多个Partition 每条信息都属于且仅属于一个Topic Producer发布数据是,必须制定该消息发布到哪一个Topic,Consumer订阅消息时,也必须制定订阅哪个Topic的消息。
(2)Partition
物理概念,一个Partition只分布在一个Broker上(不考虑备份) 一个partition物理上对应一个文件夹 一个Partition包含多个Segment(Segment对用户透明) 一个Segment对应一个文件,Segment由一个个不可变记录组成 记录只会被append到Segment中,不会被单独删除或者修改 清除过期日志时,直接删除一个或多个Segment segment文件(log文件)文件名规范: 这个文件里面第一条消息的offset - 1
(3)broker
Kafka中的broker对于调用者而言都是透明的,也就是说各个broker的地位都是一样的,但是在kafka内部有区分,主要就是controller和非controller之分,controller的角色我们可以在zookeeper的对应目录/kafka/controller中获取对应的brokerid。
(4)offset
在kafka1.0以下的版本中使用zk来保存kafka消费者的offset(目录为/kafka/consumers/**),但是在kafka1.0以上,不再使用zookeeper来保存,主要原因在于,避免zookeeper负载过高,造成相关联的框架无法使用,此时在kafka提供了一个系统级别的topic:__consumer_offsets来报错偏移量信息。
7.1 消息flush和Retention策略
(1)flush策略:为了提供kafka的读写数据能力,先将接收数据到kafka内存,不可能无限制的保存在内存,所以必然会将数据flush到磁盘(partition的segement)文件,在flush的时候做了Durability(持久)和Latency(延迟)和Throughput(吞吐量)的权衡与取舍。
配置文件:
# The number of messages to accept before forcing a flush of data to disk
## 每当每一个topic接收到10000条message的时候,就会将数据flush到磁盘
log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#每隔1s flush一次数据
log.flush.interval.ms=1000
(2)retention策略 :partition对应的文件,就保存在一个个的segment文件中,每一个文件默认大小是1G,但是log.retention.check.interval.ms监测频率是5分钟一次,所以segment文件可能会超过1G,此时就会启动retion策略,将文件裁剪到log.retention.bytes配置,如果超过了log.segment.bytes=1G配置,将会创建一个新的segment文件;默认情况,segment文件会保留7天。
配置文件:
# The minimum age of a log file to be eligible for deletion due to age
# 日志最小的保留时间:7天,超过这个时间,数据可能会被清理掉
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned(裁剪) from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
## segement文件如果超过log.retention.bytes的配置,将会可能被裁剪,直到小于log.retention.bytes配置
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 一个segment文件最大的大小,超过log.segment.bytes一个G,将会创建一个新的segment文件
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
## 每隔5分钟,检测一次retention策略是否达到
log.retention.check.interval.ms=300000
7.2 Kafka消息检索原理
(1)message的物理结构
结构 | |
8 bytes offset | 表示该消息在partition的第几条消息 |
4 bytes message size | 消息的大小 |
1 bytes magic size | kafka程序服务协议号 |
4 bytes src | crc32校验 |
其他 | 压缩、key,编码等 |
payload | 实际消息的数据 |
(2)partition分区目录说明:
.index文件和.log文件:
partition分区目录下的文件列表,主要包含两种类型的文件 x.index索引文件和x.log segment文件,其中x.log保存的是message信息,x.index保存的是索引数据。
①x.index中保存的内容:
a. index文件的序号就是message在日志文件中的相对偏移量
b. OffsetIndex是稀疏索引,也就是说不会存储所有的消息的相对offset和position。也就是说index文件的序号对应的是log文件中的消息偏移量;index文件中的地址栏对应的是log文件中文件中的偏移字节。
因为一个partition下面有多个segment文件,segment文件当达到retention策略之后将会被裁剪或删除,同时partition中的offset是单调递增的,从0开始增加,但是segment文件中的消息在该文件中的偏移量指的是文件开头到该文件走过的字节长度,所以消息offset和文件中的偏移量不一样。
直接根据message的offset是无法直接读取到消息的,需要x.index中保存的相对偏移量来帮忙了。
(3)通过命令查看segment文件内容
kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \ --->打印读取到的segment日志文件内容
--files 00000000000000000000.log --->指定读取的segment日志文件
其中的offset是该条message在该partition中的偏移量,position为该条消息在该文件中的字节偏移量。
(4)消息检索
定位到具体的segment日志文件 由于log日志文件的文件名是这个文件中第一条消息的(offset-1). 因此可以根据offset定位到这个消息所在日志文件
(5)消息检索过程
①定位到具体的segment日志文件。由于log日志文件的文件名是这个文件中第一条消息的(offset-1). 因此可以根据offset定位到这个消息所在日志文件。
②计算查找的offset在日志文件的相对偏移量
segment文件中第一条消息的offset是已知的 。计算message相对偏移量:需要定位的offset - segment文件中第一条消息
③查找index索引文件, 可以定位到该消息在日志文件中的偏移字节为a. 综上, 直接读取文件夹.log中偏移a字节的数据即可。
如果查找的offset在日志文件的相对偏移量在index索引文件不存在, 可根据其在index索引文件最接近的上限偏移量,往下顺序查找。
8 flume和kafka的整合
flume主要是做日志数据(离线或实时)地采集。
(1)创建整合的topic
bin/kafka-topics.sh --create --topic flume-kafka --zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka --partitions 3 --replication-factor 3
Created topic "flume-kafka"
(2)编写flume-agent配置文件
flume-kafka-sink.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sinks.k1.kafka.topic = flume-kafka
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1