1. Kafka 消费者的逻辑
- 配置消费者客户端参数。
- 创建相应的消费者实例。
- 订阅主题。
- 拉取消息并消费;
- 提交消息位移;
- 关闭消费者实例;
2 Kafka 的C++ API
2.1 RdKafka::Conf
见生成者实现文章。
2.2 RdKafka::Event
见生成者实现文章。
2.3 RdKafka::EventCb
见生成者实现文章。
2.4 RdKafka::TopicPartition
static TopicPartition * create(const std::string &topic, int partition);
//创建一个TopicPartition对象。static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
//创建TopicPartition对象。static void destroy (std::vector<TopicPartition*> &partitions);
//销毁所有TopicPartition对象。const std::string & topic () const;
//返回Topic名称。int partition ();
//返回分区号。int64_t offset();
//返回位移。void set_offset(int64_t offset);
//设置位移。ErrorCode err();
//返回错误码。
2.5 RdKafka::RebalanceCb
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * >&partitions)=0;
用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数
再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。
C++封装API:
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions) // 打印当前获取的分区{for (unsigned int i = 0 ; i < partitions.size() ; i++)std::cerr << partitions[i]->topic() <<"[" << partitions[i]->partition() << "], ";std::cerr << "\n";}public:void rebalance_cb (RdKafka::KafkaConsumer *consumer,RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition*> &partitions){std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";printTopicPartition(partitions);if (err == RdKafka::ERR__ASSIGN_PARTITIONS){consumer->assign(partitions);partition_count = (int)partitions.size();}else{consumer->unassign();partition_count = 0;}}
private:int partition_count;
};
2.6 RdKafka::Message
见生成者实现文章。
2.7 RdKafka::KafkaConsumer(核心)
KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。
static KafkaConsumer * create(Conf *conf, std::string &errstr);
创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions);
返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。ErrorCode subscription(std::vector< std::string > &topics);
返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。ErrorCode subscribe(const std::vector< std::string > &topics);
更新订阅Topic分区。ErrorCode unsubscribe();
将当前订阅Topic取消订阅分区。ErrorCode assign(const std::vector< TopicPartition * > &partitions);
将分配分区更新为partitions。ErrorCode unassign();
停止消费并删除当前分配的分区。Message * consume(int timeout_ms);
消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。ErrorCode commitAsync();
异步提交位移。ErrorCode commitSync(Message *message);
基于消息对单个topic+partition对象同步提交位移。virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
对指定多个TopicPartition同步提交位移。ErrorCode commitAsync(Message *message);
基于消息对单个TopicPartition异步提交位移。virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
对多个TopicPartition异步提交位移。ErrorCode close();
正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组)virtual ConsumerGroupMetadata *groupMetadata () = 0;
返回本Consumer实例的Consumer Group的元数据。ErrorCode position (std::vector<TopicPartition*> &partitions)
获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。ErrorCode seek (const TopicPartition &partition, int timeout_ms)
定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。ErrorCode offsets_store (std::vector<TopicPartition*> &offsets)
为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。
3 Kafka 消费者客户端开发
3.1 必要的参数配置(bootstrap.servers)
在创建消费者的时候以下以下三个选项是必选的:
bootstrap.servers:指定 broker (kafka服务器)的地址清单,清单里不需要包含所有的 broker(kafka) 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来