Kafka(五)消费者回调 +定时重试 + 理解Rebalance

文章目录

  • 消费者回调
    • 如何抽象callBack消息?
      • 为什么要设置serverId?
      • 如何消费callBack消息?
  • 定时重试
    • 消息失败表的设计
    • 重试逻辑设计
  • 理解Rabalance
    • 通过日志来理解rebalance
  • 参考资料
  • 结语
  • 示例源码仓库

消费者回调

有些邮件发送成功之后,需要执行后续逻辑,例如更新数据库等。那么我们这时需要将Message Server变成生产者, 向Kafak中投递callBack消息;Business Server 此时是消费者, 消费callBack消息。

如何抽象callBack消息?

callBack的逻辑根据业务场景相关,如何在保证满足不同callBack业务逻辑的同时还满足callBack消息格式的统一呢? 我们使用反射来实现这一目的

@JsonDeserialize(builder = CallbackMetaData.CallbackMetaDataBuilder.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Getter
@Builder
@ToString
public class CallbackMetaData implements Serializable {@JsonProperty("id")private String messageId;/*** 应该是hostName, InetAddress.getLocalHost().getHostName()*/@JsonProperty("serverId")private String serverId;@JsonProperty("className")private String className;/*** this string is the json string of the instance of the class, generated by Jackson.* for example:* className instance = new className();* objectMapper.writeValueAsString(instance);*/@JsonProperty("instanceJsonStr")private String instanceJsonStr;@JsonProperty("methodName")private String methodName;@JsonProperty("arguments")private Object[] arguments;}

上述内容,会作为邮件消息的一部分,发送给Message Server. 当Message Server发送完邮件之后, 会检查是否包含callback消息,如果包含,则将CallbackMetaData发送到相关topic

为什么要设置serverId?

有两点原因:

  1. 代码是滚动部署的,为了兼容性必须是:回调消息是由哪台Business Server携带的,就该回调到哪台Business Server。
  2. 一开始我们的思路是使用同一个topic,所有Business Server都订阅该topic,并且使用不同的消费者组,以达到广播消息的目的。这样在消费消息时通过判断当前消息的serverId是否当前server,如果是则消费,如果不是则直接提交offset。但是后来我们发现这样设计,会使得所有服务器每时每刻都在消费消息,即使该消息不是当前服务器的。改进后的设计是:每个服务器都有自己的callback topic, 只消费自己的callback topic下的消息即可。 callback topic的名字是:callback-serverId

serverId使用机器的hostName而不是IP, 因为IP有可能会变。

如何消费callBack消息?

其他消费者相关的代码我不再赘述,请参考上一篇博文的详细内容, 执行消费逻辑的代码就3行

ConsumerRecords<String, CallbackMetaData> records = consumer.poll(Duration.ofSeconds(10));
records.forEach(each -> {Class<?> destClass;try {//        核心消费代码, 通过反射调用目标方法destClass = Class.forName(each.value().getClassName());Object instance = objectMapper.readValue(each.value().getInstanceJsonStr(), destClass);MethodUtils.invokeMethod(instance, true, each.value().getMethodName(), each.value().getArguments());} catch (Exception e) {e.printStackTrace();}
});

定时重试

前两篇博文提到,无论生产者还是消费者,最终重试N次之后依旧失败的我们会把消息存储到数据库,以便后期通过定时任务进行重试。为了减轻业务服务器的负担,所有失败消息的重试都由Message Server负责。

消息失败表的设计

@Getter
@Setter
@ToString
@EqualsAndHashCode(of = {"messageId", "failedPhrases"})
public class MessageFailedEntity {/*** 主键*/private Long id;/*** 消息id*/private String messageId;/*** JSON格式的消息内容*/private String messageContentJsonFormat;/*** 消息类型* EMAIL 表示此消息为邮件* EMAIL_CALLBACK 表示此消息为邮件回调**/private MessageType messageType;/*** 消息失败的阶段:* PRODUCER 表示在生产者发送消息的时候失败* CONSUMER 表示在消费者消费消息的时候失败*/private MessageFailedPhrase messageFailedPhrase;/*** 失败时的异常堆栈信息*/private String failedReason;/*** 消息重试次数*/private Integer retryCount;/*** 消息重试状态* 0 表示重试失败* 1 表示重试成功*/private Integer retryStatus;/*** 时间戳*/private LocalDateTime lastUpdateTime;}

重试逻辑设计

重试的思路很简单:

  1. 从数据库查询消息失败表获得一批记录,每次可能100条或者10000条,根据实际场景自己确定
  2. 根据消息类型和消息的JSON格式,序列化为对应类的对象,调用不同的生产者发送消息到Kafka
  3. 如果该消息失败阶段是PRODUCER,那么重试成功之后,则更新该记录未重试成功
  4. 如果该消息失败阶段是CONSUMER,那么重试成功之后,则只更新重试次数,由对应的消费者去更新是否重试成功。因为CONSUMER只有消费成功才算重试成功。
  5. 设置最大重试次数,如果超过最大重试次数,则不再进行重试
  6. 如果是部署了多个Message Server,那么执行定时重试任务时,可以使用分布式锁以确保只有同一时刻只有一个Message Server在执行任务,这样做的目的主要是防止为了多个任务同时进行时,从数据库中查询的记录是同一批,当然也可以在表中增加一个标志位来区分该记录是否在重试中来达到相同的目的,根据实际情况选择即可

到此为止,结合前两篇博文,我们处理了在整个消息系统中可能出现所有的异常情况。

理解Rabalance

Kafka权威指南 > 第四章 第一节

Moving partition ownership from one consumer to another is called a rebalance.

一开始接触rebalance时候,我在思考一个问题,如果我的消费者还在消费消息中, 此时Kafka要进行rebalance,这对我的消费者业务逻辑有什么影响?会不会我还在消费中,然后被打断,如果是这样的话,那对我的消费业务逻辑的幂等性来说增加了不少挑战。

带着这些疑问,我搜索了一些资料,在confluent官网上发现了一篇博客,详细讲了rebalance过程, 文章链接

以下内容来自于上述文章链接

  1. Suppose we have an existing consumer group with a set assignment of topic-partitions to consumers. This consumer group consists of a number of consumers, each with a member id as well as a group leader (usually the consumer that was first to join the group). A new consumer comes along and requests to join the consumer group by sending a request of JoinGroup to the Group Coordinator along with the topics it would like to subscribe to.
  2. The Group Coordinator kicks off the rebalance by telling all current members to issue their own JoinGroup requests. This is done as part of the response to the heartbeat that consumers send to the Group Coordinator to tell it they’re still alive and well.
  3. Each consumer in the group has max.poll.interval.ms to wrap up their current processing and send their JoinGroup request, at which point the world is stopped. With all of the JoinGroup requests, the Group Coordinator knows all of the consumers in the group and which topics should be part of the consumer group. It sends JoinResponses to the members, chooses a leader from amongst the members, and leaves the leader to compute the partition assignments.
  4. All group members respond with a SyncGroup request. The group leader sends its partition assignments along with its request.
  5. At this point, the Group Coordinator can send its SyncResponse to each consumer confirming their assigned topic-partitions.
  6. Finally, consumers acknowledge their assignments and processing can resume. The world is no longer stopped

第二处高亮的地方,解决了我的疑问,在rebalance之前,会等待每个消费者把自己的消费逻辑处理完。

通过日志来理解rebalance

下面的日志是我本地的一次rebalance期间的日志,可以对照上述步骤加深理解

2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator WARN: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] Resetting generation and member id due to: consumer pro-actively leaving the group
2023-11-08T02:23:04.180-0500 kafka-coordinator-heartbeat-thread | low-priority-email-group org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=2, clientId=consumer-low-priority-email-group-2, groupId=low-priority-email-group] Request joining group due to: consumer pro-actively leaving the group2023-11-08T02:32:21.607-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.620-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-4
2023-11-08T02:32:21.621-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-9
2023-11-08T02:32:21.712-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-8
2023-11-08T02:32:21.723-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-1
2023-11-08T02:32:21.739-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-5
2023-11-08T02:32:21.741-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-6
2023-11-08T02:32:21.745-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.752-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.753-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-3
2023-11-08T02:32:21.753-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-0
2023-11-08T02:32:21.903-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Request joining group due to: group is already rebalancing
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Revoke previously assigned partitions low-priority-email-7
2023-11-08T02:32:21.919-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] (Re-)joining group
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='1-f1f4f621-3bde-4e02-9621-a706320300ae', protocol='range'}
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247', protocol='range'}
2023-11-08T02:32:21.920-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='7-e77860ca-7059-4a38-bfc9-7db3cc862a38', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='3-ec05c279-8059-4f03-b804-3da299f93b88', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='5-d08406a6-2bd1-4bed-aed9-5f65d7f75260', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='9-bab69ccd-a0b1-4f94-99d5-869fce905f3e', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='6-67b38de0-90e0-4f53-aaae-49c10a91a463', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='4-b24fc609-920a-4cc3-b507-2a4a1f1a568b', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Successfully joined group with generation Generation{generationId=12, memberId='8-70cd2b5d-e17f-4346-bfcb-b170e766db39', protocol='range'}
2023-11-08T02:32:21.921-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Finished assignment for group at generation 12: {10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247=Assignment(partitions=[low-priority-email-2]), 7-e77860ca-7059-4a38-bfc9-7db3cc862a38=Assignment(partitions=[low-priority-email-7]), 3-ec05c279-8059-4f03-b804-3da299f93b88=Assignment(partitions=[low-priority-email-3]), 6-67b38de0-90e0-4f53-aaae-49c10a91a463=Assignment(partitions=[low-priority-email-6]), 8-70cd2b5d-e17f-4346-bfcb-b170e766db39=Assignment(partitions=[low-priority-email-8]), 1-f1f4f621-3bde-4e02-9621-a706320300ae=Assignment(partitions=[low-priority-email-0, low-priority-email-1]), 4-b24fc609-920a-4cc3-b507-2a4a1f1a568b=Assignment(partitions=[low-priority-email-4]), 5-d08406a6-2bd1-4bed-aed9-5f65d7f75260=Assignment(partitions=[low-priority-email-5]), 9-bab69ccd-a0b1-4f94-99d5-869fce905f3e=Assignment(partitions=[low-priority-email-9])}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='10-cd494fa6-1bf4-4dd8-ae28-a494b21ad247', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='1-f1f4f621-3bde-4e02-9621-a706320300ae', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-0, low-priority-email-1])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-2])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-2
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-0, low-priority-email-1
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='3-ec05c279-8059-4f03-b804-3da299f93b88', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='5-d08406a6-2bd1-4bed-aed9-5f65d7f75260', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-3])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-3
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='7-e77860ca-7059-4a38-bfc9-7db3cc862a38', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-5])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='8-70cd2b5d-e17f-4346-bfcb-b170e766db39', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-7])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='4-b24fc609-920a-4cc3-b507-2a4a1f1a568b', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-7
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-5
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-8])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='9-bab69ccd-a0b1-4f94-99d5-869fce905f3e', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Successfully synced group in generation Generation{generationId=12, memberId='6-67b38de0-90e0-4f53-aaae-49c10a91a463', protocol='range'}
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-4])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-8
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-9])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-4
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Notifying assignor about the new Assignment(partitions=[low-priority-email-6])
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-9
2023-11-08T02:32:21.922-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Adding newly assigned partitions: low-priority-email-6
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-9 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=10, clientId=consumer-low-priority-email-group-10, groupId=low-priority-email-group] Setting offset for partition low-priority-email-2 to the committed offset FetchPosition{offset=13436, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Setting offset for partition low-priority-email-1 to the committed offset FetchPosition{offset=13301, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-0 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=1, clientId=consumer-low-priority-email-group-1, groupId=low-priority-email-group] Setting offset for partition low-priority-email-0 to the committed offset FetchPosition{offset=24087, offsetEpoch=Optional[8], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=8}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-2 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=3, clientId=consumer-low-priority-email-group-3, groupId=low-priority-email-group] Setting offset for partition low-priority-email-3 to the committed offset FetchPosition{offset=13299, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-3 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=4, clientId=consumer-low-priority-email-group-4, groupId=low-priority-email-group] Setting offset for partition low-priority-email-4 to the committed offset FetchPosition{offset=13352, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-7 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=8, clientId=consumer-low-priority-email-group-8, groupId=low-priority-email-group] Setting offset for partition low-priority-email-8 to the committed offset FetchPosition{offset=13190, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-8 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=9, clientId=consumer-low-priority-email-group-9, groupId=low-priority-email-group] Setting offset for partition low-priority-email-9 to the committed offset FetchPosition{offset=13151, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-5 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=6, clientId=consumer-low-priority-email-group-6, groupId=low-priority-email-group] Setting offset for partition low-priority-email-6 to the committed offset FetchPosition{offset=13211, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-6 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=7, clientId=consumer-low-priority-email-group-7, groupId=low-priority-email-group] Setting offset for partition low-priority-email-7 to the committed offset FetchPosition{offset=13303, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}
2023-11-08T02:32:21.923-0500 consumer-low-priority-email-pool-4 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO: [Consumer instanceId=5, clientId=consumer-low-priority-email-group-5, groupId=low-priority-email-group] Setting offset for partition low-priority-email-5 to the committed offset FetchPosition{offset=13338, offsetEpoch=Optional[2], currentLeader=LeaderAndEpoch{leader=Optional[192.168.100.60:9093 (id: 1 rack: null)], epoch=2}}

参考资料

  • confluent解释rebalance的文章
  • infoq上关于rebalance的文章

结语

至此为止,利用Kafka实现一个消息系统就基本完成了,所有关键的代码都在不同的博文中并进行了详细说明,说过想要体会完整的设计、实现思路,请移步源码仓库获取完整代码。

下一篇关于Kafak的博文打算分享一下如何利用Kafka Connect将Oracle数据库的数据同步到Postgre SQL中。

示例源码仓库

  • Github地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/168429.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】fork()

文章目录 一、fork()是什么&#xff1f;二、fork()干了什么&#xff1f;三、fork()怎么用&#xff1f; 一、fork()是什么&#xff1f; fork()函数其实是在Linux系统中用于创建一个新的进程。让我们看看Linux中是怎么描述的&#xff1f;运行man fork。 RETURN VALUE On success…

php站点伪静态配置(Apache+Linux)

404报错&#xff1a; 404 Not Found nginx/1.15.11 问题解决&#xff1a; 1、Linux location / { if (!-e $request_filename) { rewrite ^(.*)$ /index.php?s/$1 last; } } 2、Apache <IfModule mod_rewrite.c> RewriteEngine on RewriteBase / RewriteCond %{REQU…

英特尔和 ARM 将合作开发移动芯片技术,如何看待双方合作?

英特尔和 ARM 将合作开发移动芯片技术&#xff0c;如何看待双方合作&#xff1f; 最近市场传出Arm要自产芯片&#xff0c;供智能手机与笔电等使用后&#xff0c;外媒指Arm自产芯片将由英特尔晶圆代工部门打造&#xff0c;变成英特尔晶圆代工客户。将采用英特尔18A工艺&#xff…

利用Nginx与php处理方式不同绕过Nginx_host实现SQL注入

目录 首先需要搭建环境 nginxphpmysql环境&#xff1a; 搭建网站 FILTER_VALIDATE_EMAIL 绕过 方法1&#xff1a;冒号号分割host字段 方法2&#xff1a;冒号号分割host字段 方法3&#xff1a;SNI扩展绕过 首先需要搭建环境 nginxphpmysql环境&#xff1a; php安装包&a…

深入了解Spring Cloud中的分布式事务解决方案

引言 介绍分布式系统中事务管理的重要性&#xff0c;以及在云计算环境下分布式事务所面临的挑战。 传统事务和分布式事务 解释本地事务与分布式事务的区别&#xff0c;以及为什么在分布式环境中需要特殊的事务管理机制。 分布式事务的挑战 探讨在分布式系统中实现事务一致性所…

vite和webpack的区别和练习

Vite和Webpack都是现代化的前端构建工具&#xff0c;但它们之间存在一些区别&#xff1a; 构建性能&#xff1a;Vite使用ES Modules提高了构建性能&#xff0c;可以在构建时只构建需要的部分&#xff0c;而Webpack则需要在构建时处理整个应用程序。 开发体验&#xff1a;Vite具…

vue一个页面左边是el-table表格 当点击每条数据时可以在右边界面编辑表格参数,右边保存更新左边表格数据

实现思路&#xff1a; 1.点击当前行通过row拿到当前行数据。 2.将当前行数据传给子组件。 3.子组件监听父组件传过来的数据并映射在界面。 4.点击保存将修改的值传给父组件更新表格。 5.父组件收到修改过后的值&#xff0c;可以通过字段判断比如id&#xff0c;通过 findIn…

VR Interaction Framework2.0使用

1 按键 &#xff0c;比如按压下手柄的B键 if (InputBridge.Instance.BButtonDown){print("kkkkkkbbbbb456");} 2抓取某个物体&#xff0c;那么就在要抓取的那个物体上加一些组件&#xff0c;特别是Grabble Unity Events

cocos2dx DrawNode

cocos2dx 两种绘图方式 DrawPrimitivesDrawNode DrawPrimitives 3.x 已经弃用 绘制的图形可以是实心的&#xff0c;也可以是空心的。 DrawNode 在一个单独的批处理中绘制了所以元素&#xff0c;因此它绘制点、线段、多边形都要比“drawing primitives”快。 绘制的图形都…

【基础知识】AB软件RSLinx如何实现OPC通讯组态

哈喽&#xff0c;大家好&#xff0c;我是雷工。 在上一节了解了什么是RSLinx&#xff1f;以及RSLinx Lite、RSLinx Classice、RSLinx Professional、RSLinx Gateway几个版本的特点。 本节了解AB的RSLinx如何实现OPC组态。 一、创建RSLinx通讯&#xff1a; 1.1、【Communicati…

excel自己记录

1、清除换行符号 2、添加特殊符号&并清除换行符号 7日&15日&30日&60日 3、判断单元格最后一个字符是不是数字&#xff0c;不是就删掉 IF(ISNUMBER(--RIGHT(B2,1)),B2,SUBSTITUTE(B2,RIGHT(B2,1),"")) ISNUMBER(--RIGHT(B2,1))判断最右边的一个数是否…

Wireshark的捕获过滤器

Wireshark的过滤器&#xff0c;顾名思义&#xff0c;作用是对数据包进行过滤处理。具体过滤器包括捕获过滤器和显示过滤器。本文对捕获过滤器进行分析。 捕获过滤器&#xff1a;当进行数据包捕获时&#xff0c;只有那些满足给定的包含/排除表达式的数据包会被捕获。 捕获过滤器…

一起学docker系列之九docker运行mysql 碰到的各种坑及解决方法

目录 前言1 Docker 运行mysql命令2 坑一&#xff1a;无法读取/etc/mysql/conf.d目录的问题3 坑二&#xff1a;/tmp/ibnr0mis 文件无法创建/写入的问题4 坑三&#xff1a;Navicat 连接错误&#xff08;1045-access denied&#xff09;5 坑四&#xff1a;MySQL 登录失败问题结语 …

ros2文件package.xml与cmakelists.txt比较

每次在ros2里面添加文件以后&#xff0c;都要修改packages.xml,与cmakelists.txt文件。

Vue服务端渲染——同构渲染

Vue.js 可以用于构建客户端应用程序&#xff0c;组件的代码在浏览器中运行&#xff0c;并输出 DOM 元素。同时&#xff0c;Vue.js 还可以在 Node.js 环境中运行&#xff0c;它可以将同样的组件渲染为字符串并发送给浏览器。这实际上描述了 Vue.js 的两种渲染方式&#xff0c;即…

爬虫项目实战:利用基于selenium框架的爬虫模板爬取豆瓣电影Top250

&#x1f44b; Hi, I’m 货又星&#x1f440; I’m interested in …&#x1f331; I’m currently learning …&#x1f49e; I’m looking to collaborate on …&#x1f4eb; How to reach me … README 目录&#xff08;持续更新中&#xff09; 各种错误处理、爬虫实战及模…

Lua实现面向对象三大特性

面向对象是基于table实现的 封装 :(冒号) 自动将调用该函数的对象作为第一个参数传入 --Object就是第一参数 function Object:new() self&#xff1a;代表默认传入的第一个参数 _index&#xff1a;当自己的变量中找不到时&#xff0c;会默认找原表中_index指向的内容 Obj…

AD9361快速开发指南

AD9361是ADI&#xff08;Analog Devices&#xff09;公司推出的一款全集成的RF收发器芯片&#xff0c;广泛应用于无线通信系统&#xff0c;包括基于FPGA和ARM处理器的数码电视&#xff0c;卫星通信&#xff0c;雷达通信&#xff0c;军事通信和工业控制等领域。AD9361提供了广泛…

Qt问题 QString 和 void* 相互转化

QString转为void*格式 //将路径QString转为void*格式QByteArray byteArray qstrFilePath.toUtf8();char* charArray byteArray.data();void* voidPath static_cast<void*>(charArray);void*转为QString格式 char* charPath static_cast<char*>(voidPath); QS…

从 RBAC 到 NGAC ,企业如何实现自动化权限管理?

随着各领域加快向数字化、移动化、互联网化的发展&#xff0c;企业信息环境变得庞大复杂&#xff0c;身份和权限管理面临巨大的挑战。为了满足身份管理法规要求并管理风险&#xff0c;企业必须清点、分析和管理用户的访问权限。如今&#xff0c;越来越多的员工采用移动设备进行…