- 生产者通过send()方法发送消息
- 消息会经过拦截器->序列化器->分区器 进行加工
- 然后将消息存在缓冲区
- 当缓冲区中消息达到条件会按批次发送到broker对应分区上
- broker将接收到的消息进行刷盘持久化
- 消息处理broker会返回给producer响应
- 落盘成功
- 返回元数据信息 - > 生产者继续发送后面消息
- 落盘失败 - 生产者设置了重试次数
- 生产者去缓冲区重试发送
Tip:
-
Producer 创建时,会创建一个Sender线程设置为守护线程
-
Producer 创建时,会创建缓冲区
-
Producer 生产消息,内部是一个异步流程
-
RecordAccumulator(缓冲区) 对每一个分区都有一个缓冲区
- 每个分区的缓冲区中消息也是有序的
- 可以指定缓冲区中的消息按
批次
发送- 缓冲区大小达到
batch.size
lingger.ms
达到上限- 以上两个条件满足一个即发送一批
- 缓冲区大小达到
- 可以指定整个缓冲区的大小
批次的概念很好理解,缓冲区就像一辆公交车,有两种发车方式,一是人满了就发车,一是等5分钟就发车,不管是人满了还是到5分钟了,发车,go~
- 一个批次消息发送后,通过网络,发往Kafka指定分区,然后刷盘到broker
- 如果Producer设置了
retries
参数值>0,那么允许消息发送失败进行重试,重试机制由客户端Producer内部实现 - Broker端消息落盘成功,会返回元数据给生产者
- 通过阻塞直接返回 (同步发送)
- 通过回调函数返回(异步发送)