概念
消费者组(Consumer Group):由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者;
整体流程
流程说明:
- 消费者组包括多个消费者,每个消费者只能消费分区中的一部分数据;
- 当一个消费者组中的消费者读取一个分区中的数据时,其他消费者就不能再读取该分区中的数据;
- 一个消费者组可以有多个消费者,每个消费者只能消费分配给该消费者组的某些主题的某些分区;
- 同一个分区只会被一个消费者组中的一个消费者消费,不同消费者组之间可以重复消费;
- 当消费者组中的某个消费者宕机后,Kafka会将该消费者所消费的分区重新分配给其他消费者,从而实现消费者的高可用性;
- 消费者组中的消费者可以动态加入和退出,Kafka会自动重新分配分区;
- 在同一个消费者组内,消费者之间可以进行负载均衡,以此来提高消息的吞吐量和消费的效率;
- 消费者组可以通过消费者组ID(groupid)来标识,一个消费者组ID可以同时消费多个主题;
配置参数说明
参数名称 | 描述 |
---|---|
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
key.deserializer和value.deserializer | 指定接收消息的key和value的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认1个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条。 |
分区策略
- Range
# 特点
确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
# 算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个。
- RoundRobin
# 特点
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。
- Sticky
# 特点
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
- CooperativeSticky
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range+ CooperativeSticky。
回调函数说明
事件回调
- 设置回调
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{printf("Conf set(event_cb) failed, errorStr:%s\n", errorStr.c_str());return;
}
- 回调处理
// 设置事件回调
class ConsumerEventCb : public RdKafka::EventCb
{
public:void event_cb(RdKafka::Event &event) {switch (event.type()) {case RdKafka::Event::EVENT_ERROR:break;case RdKafka::Event::EVENT_STATS:break;case RdKafka::Event::EVENT_LOG:break;case RdKafka::Event::EVENT_THROTTLE:break;default:break;}}
};
消费者组再平衡回调
- 设置回调
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{ELogError(("%s|Conf set(rebalance_cb) failed, errorStr:%s", GET_CODE_INFO(), errorStr.c_str()));break;
}
- 回调处理
// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:// 打印当前获取的分区static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>&partitions) {for (unsigned int i = 0; i < partitions.size(); i++) {printf("count:%d, topic:%s,partition:%d\n",i, partitions[i]->topic().c_str(),partitions[i]->partition());}}public:// 消费者组再平衡回调void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition *> &partitions) {printf("RebalanceCb: %s", RdKafka::err2str(err).c_str());printTopicPartition(partitions);// 分区分配成功if (RdKafka::ERR__ASSIGN_PARTITIONS == err) {// 消费者订阅这些分区consumer->assign(partitions);// 获取消费者组本次订阅的分区数量,可以属于不同的topicm_partitionCount = (int)partitions.size();} else // 分区分配失败{// 消费者取消订阅所有的分区consumer->unassign();// 消费者订阅分区的数量为0m_partitionCount = 0;}}private:int m_partitionCount; // 消费者组本次订阅的分区数量
};
流程(c++)
- 配置消费者客户端;
- 订阅主题和分区;
- 拉取消息;
- 处理消息;
- 提交消费位移;
配置消费者客户端
int CKafkaConsumer::Create()
{std::string errorStr;RdKafka::Conf::ConfResult errorCode;do {// 1、创建配置对象// 1.1、构造 consumer conf 对象m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);if(nullptr == m_config){printf("Create RdKafka Conf failed.\n");break;}// 必要参数1:指定 broker 地址列表errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",errorStr.c_str());break;}// 必要参数2:设置消费者组 iderrorCode = m_config->set("group.id", m_groupID, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(group.id) failed, errorStr:%s.\n",errorStr.c_str());break;}// 设置事件回调m_event_cb = new ConsumerEventCb;errorCode = m_config->set("event_cb", m_event_cb, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(event_cb) failed, errorStr:%s.\n",errorStr.c_str());break;}// 设置消费者组再平衡回调m_rebalance_cb = new ConsumerRebalanceCb;errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",errorStr.c_str());break;}// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件errorCode = m_config->set("enable.partition.eof", "false", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",errorStr.c_str());break;}// 每次最大拉取的数据大小errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",errorStr.c_str());break;}// 设置分区分配策略:range、roundrobin、cooperative-stickyerrorCode = m_config->set("partition.assignment.strategy", "range", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",errorStr.c_str());break;}// 心跳探活超时时间---1serrorCode = m_config->set("session.timeout.ms", "6000", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",errorStr.c_str());break;}// 心跳保活间隔errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",errorStr.c_str());break;}// 1.2、创建 topic conf 对象m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (nullptr == m_topicConfig) {printf("Create RdKafka Topic Conf failed.\n");break;}// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",errorStr.c_str());break;}// 默认 topic 配置,用于自动订阅 topicserrorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);if (RdKafka::Conf::CONF_OK != errorCode) {printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",errorStr.c_str());break;}// 2、创建 Consumer 对象m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);if (nullptr == m_consumer) {printf("Create KafkaConsumer failed, errorStr:%s.\n",errorStr.c_str());break;}printf("Created consumer success, consumerName:%s.\n",m_consumer->name().c_str());return 0;} while (0);Destroy();return -1;
}
订阅主题和分区
std::vector<std::string> topicsVec;
topicsVec.push_back("zd_test_topic_one");
topicsVec.push_back("zd_test_topic_two");RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (RdKafka::ERR_NO_ERROR != errorCode)
{printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());return;
}
拉取消息
// 可放到线程中处理while (m_running)
{RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时if(NULL != msg){// 消费消息ConsumeMsg_(msg, NULL);m_consumer->commitAsync(); delete msg;}
}
处理消息
void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{switch (msg->err()) {case RdKafka::ERR__TIMED_OUT: // 超时break;case RdKafka::ERR_NO_ERROR: // 有消息进来printf("Message in, topic:%s, partition:[%d], key:%s, payload:%s\n",msg->topic_name().c_str(), msg->partition(), msg->key()->c_str(), (char *)msg->payload());// 消息处理break;default:break;}
}
提交消费位移
m_consumer->commitAsync();