在分布式环境中,故障是很常见的情况,可以随时发生。 在Kafka环境中,代理可能会崩溃,网络故障,处理故障,发布消息时失败或无法使用消息等。这些不同的场景引入了不同类型的数据丢失和重复。
失败场景
A(确认失败):生产者成功发布了消息,重试> 1,但由于失败而未收到确认。 在这种情况下,生产者将重试相同的消息,可能会引入重复消息。
B(生产者进程在批处理消息中失败):生产者发送了一批失败的消息,但发布的成功很少。 在这种情况下,一旦生产者重新启动,它将再次批量重新发布所有消息,这将在Kafka中引入重复消息。
C(触发并忘记失败)生产者发布的消息,重试= 0(触发并忘记)。 如果失败,发布的消息将不知道并发送下一条消息,这将导致消息丢失。
D(批处理消息中的消费者失败)消费者从Kafka接收到一批消息,并手动提交其偏移量(enable.auto.commit = false)。 如果消费者在提交给Kafka之前失败,则下次消费者将再次使用相同的记录,这些记录将在消费者端复制副本。
精确一次语义
在这种情况下,即使生产者尝试重新发送消息,它也导致消息将被消费者发布和消费一次。
为了在Kafka中实现Exactly-Once语义,它使用以下3个属性
- enable.idempotence = true(地址a,b和c)
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5(生产者每次连接总是有一个飞行中请求)
- isolated.level = read_committed(地址d)
启用幂等(enable.idempotence = true)
幂等传递使生产者可以在单个生产者的生命周期内,将消息仅一次写入Kafka到主题的特定分区,而不会造成数据丢失和每个分区的订单。
“请注意,启用幂等性要求MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION小于或等于5,RETRIES_CONFIG大于0且ACKS_CONFIG为'all'。 如果用户未明确设置这些值,则将选择合适的值。 如果设置了不兼容的值,将抛出ConfigException”
为了实现幂等性,Kafka在生成消息时使用唯一的ID(称为产品ID或PID和序列号)。 生产者在发布的每个消息上保持递增的序列号,这些消息具有唯一的PID。 代理始终将当前序列号与前一个序列号进行比较,如果新序列号不比上一个序列号大+1,则它会拒绝,这会避免重复;如果消息中丢失了更大的序列号,则会拒绝同时显示
在失败的情况下,代理将序列号与先前的序列号进行比较,如果序列不增加,+ 1将拒绝该消息。
交易(隔离级别)
事务使我们能够自动更新多个主题分区中的数据。 事务中包含的所有记录都将被成功保存,或者没有保存成功,它允许您将同一个事务中的消费者补偿与已处理的数据一起提交,从而允许端到端的一次精确语义。
生产者不等待将消息写到kafka上,生产者使用beginTransaction,commitTransaction和abortTransaction(如果发生故障)消费者使用isolate.level级别,无论是read_committed还是read_uncommitted
- read_committed:使用者将始终仅读取已提交的数据。
- read_uncommitted:以偏移顺序读取所有消息,而无需等待事务提交
如果具有Isolation.level = read_committed的使用者到达尚未完成的事务的控制消息,它将不会再从该分区传递任何消息,直到生产者提交或中止该事务或发生事务超时。 事务超时由生产者使用配置transaction.timeout.ms(默认为1分钟)确定。
生产者和消费者中的确切时间
在正常情况下,生产者和消费者是分开的。 生产者必须具有幂等性并同时管理事务,以便消费者可以使用isolation.level读取read_committed以使整个过程成为原子操作。 这样可以确保生产者将始终与源系统同步。 即使生产者崩溃或事务中止,它也始终是一致的,并且一次将消息或一批消息发布为一个单元。
同一用户一次将收到消息或一批消息。
在Exactly-Once中,语义生产者与消费者一起将作为原子操作出现,它将作为一个单元进行操作。 要么发布一次就被消耗掉,要么中止。
在Kafka Stream中恰好一次
Kafka Stream消耗来自主题A的消息,处理消息并将其发布到主题B,并在发布后使用commit(commit主要在后台运行)将所有状态存储数据刷新到磁盘。
Kafka Stream中的“一次”是“读取-处理-写入”模式,可确保将这些操作视为原子操作。 由于Kafka Stream可以满足生产者,消费者和交易的需求,因此Kafka Stream带有特殊的参数processing.guarantee,它可以完全地_once或at_least_once使得不单独处理所有参数变得容易。
Kafka Streams原子地更新使用者偏移量,本地状态存储,状态存储changelog主题和生产以一起输出所有主题。 如果这些步骤中的任何一个失败,则所有更改都将回滚。
processing.guarantee:确切地提供一次以下参数,您无需明确设置
- isolated.level = read_committed
- enable.idempotence = true
- MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 5
翻译自: https://www.javacodegeeks.com/2020/05/kafka-exactly-once-semantics.html