目录
- 概述
- 实践
- topic
- 消费者
- 效果
- 消费指定topic的某个分区
- 代码
- 效果
- kafka分区策略-Range
概述
Kafka消费者API实践
实践
topic
# ./kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic test03
[root@hadoop02 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic test03
Created topic test03.
消费者
public class KafkaConsumerApp {public static final String BROKERS = "hadoop02:9092";public static final String TOPIC = "test03";public static final String GROUP = "test-group";private static KafkaConsumer<String,String> kafkaConsumer;/*** 资源初始化*/@Beforepublic void setUp() {Properties props = new Properties();// 连接至kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);// 反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);// 创建一个消费者对象kafkaConsumer = new KafkaConsumer<>(props);}@Testpublic void test() {List<String> topics = new ArrayList<>();topics.add(TOPIC);kafkaConsumer.subscribe(topics);// 消费 kafka 中的 topic 的数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {String key = record.key();String value = record.value();String topic = record.topic();int partition = record.partition();long offset = record.offset();String result= "key:"+key+" ,value:"+value+" ,topic:"+topic+" ,partition:"+partition+" ,offset:"+offset;System.out.println(result);}}}/*** 资源释放*/@Afterpublic void close() {if (null != kafkaConsumer) {kafkaConsumer.close();}}}
效果
消费指定topic的某个分区
代码
public class KafkaConsumerApp {public static final String BROKERS = "hadoop02:9092";public static final String TOPIC = "test03";public static final String GROUP = "test-group";private static KafkaConsumer<String,String> kafkaConsumer;/*** 资源初始化*/@Beforepublic void setUp() {Properties props = new Properties();// 连接至kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);// 反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);// 创建一个消费者对象kafkaConsumer = new KafkaConsumer<>(props);}/*** 消费指定分区的数据 (此案例消费 TOPIC 分区0 中的数据)*/@Testpublic void test2() {List<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition(TOPIC, 0));kafkaConsumer.assign(topicPartitions);// 消费 kafka 中的 topic 的数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {String key = record.key();String value = record.value();String topic = record.topic();int partition = record.partition();long offset = record.offset();String result= "key:"+key+" ,value:"+value+" ,topic:"+topic+" ,partition:"+partition+" ,offset:"+offset;System.out.println(result);}}}/*** 资源释放*/@Afterpublic void close() {if (null != kafkaConsumer) {kafkaConsumer.close();}}}
效果
kafka分区策略-Range
// 设置分区器
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
可以自定义:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor