kafka 案例
- 目录
- 概述
- 需求:
- 设计思路
- 实现思路分析
- 1.kafka案例_API 带回调函数的生产者
- 2.kafka案例_API生产者分区策略测试
- 3.kafka案例_自定义分区的生产者
- 4.kafka案例_API同步发送生产者
- 5.kafka案例_API简单消费者
- 5.kafka案例_API消费者重置offset
- 参考资料和推荐阅读
Survive by day and develop by night.
talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive.
happy for hardess to solve denpendies.
目录
概述
需求:
设计思路
实现思路分析
1.kafka案例_API 带回调函数的生产者
以下是一个使用 Kafka 的 Java 生产者带回调函数的案例:
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class ProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息发送成功!");} else {System.out.println("消息发送失败:" + exception.getMessage());}}});producer.close();}
}
请注意替换 bootstrap.servers
的值为 Kafka 服务器的地址和端口,并替换 topic-name
、key
和 value
分别为实际使用的主题、键和值。
在这个案例中,我们使用 KafkaProducer
类创建一个 Kafka 生产者。然后,通过创建一个 ProducerRecord
对象,我们指定了要发送到的主题、键和值。
然后,我们调用 producer.send()
方法来发送消息。此方法接受一个 Callback
参数,该参数用于在消息发送完成后执行回调函数。在回调函数中,我们可以检查发送结果并采取相应的操作。
最后,我们调用 producer.close()
方法来关闭生产者。
2.kafka案例_API生产者分区策略测试
在Kafka中,可以使用API生产者分区策略来决定将消息发送到哪个分区。以下是一个展示如何使用API生产者分区策略的示例代码。
首先,创建一个新的Java类,例如ProducerPartitionStrategyTest.java
,并导入Kafka相关的依赖项。
import org.apache.kafka.clients.producer.*;import java.util.Properties;
接下来,定义一个自定义的Partitioner
类,用于实现分区策略。在这个例子中,我们将根据消息的键来决定分区。如果键为偶数,则将消息发送到分区0;如果键为奇数,则将消息发送到分区1。
class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions = cluster.partitionCountForTopic(topic);int partition = 0;try {int keyValue = Integer.parseInt(key.toString());if (keyValue % 2 == 0) {partition = 0;} else {partition = 1;}} catch (NumberFormatException e) {partition = Math.abs(key.hashCode() % numPartitions);}return partition;}@Overridepublic void close() {// 不做任何操作}@Overridepublic void configure(Map<String, ?> configs) {// 不做任何操作}
}
然后,在主方法中创建一个Producer
并设置自定义分区策略。
public class ProducerPartitionStrategyTest {public static void main(String[] args) {// 配置Kafka生产者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("partitioner.class", "com.example.CustomPartitioner");// 创建Kafka生产者Producer<String, String> producer = new KafkaProducer<>(properties);// 发送消息for (int i = 0; i < 10; i++) {String key = String.valueOf(i);String value = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, value);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error producing message: " + exception.getMessage());} else {System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());}}});}// 关闭生产者producer.close();}
}
在以上示例中,我们首先配置了Kafka生产者的一些属性,例如Kafka服务器的地址、键和值的序列化程序以及分区策略。然后,我们创建了一个Kafka生产者,并使用自定义分区策略将消息发送到Kafka集群中。
在发送消息的循环中,我们创建了一个ProducerRecord
对象,它包含要发送的消息的主题、键和值。然后,我们使用send()
方法将消息发送到Kafka集群,并使用回调函数处理发送结果。
最后,我们关闭了生产者。
运行以上代码后,你将会看到消息被发送到正确的分区,并打印出消息的分区和偏移量的信息。
这就是一个简单的演示如何使用API生产者分区策略的例子。你可以根据自己的需求来实现自定义的分区策略,并将消息发送到合适的分区中。
3.kafka案例_自定义分区的生产者
自定义分区的生产者示例代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;public class CustomPartitionProducer {public static void main(String[] args) {// Kafka 服务器地址String bootstrapServers = "localhost:9092";// 主题名称String topic = "custom-partition-topic";// 创建生产者配置Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息到自定义分区Random random = new Random();for (int i = 0; i < 10; i++) {String key = "key" + i;String value = "value" + i;producer.send(new ProducerRecord<>(topic, key, value));System.out.println("Sent message: key=" + key + ", value=" + value);}// 关闭生产者producer.close();}public static class CustomPartitioner implements org.apache.kafka.clients.producer.Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 这里自定义分区逻辑// 根据key的尾数来决定消息被发送到哪个分区int partition = Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;System.out.println("Custom partitioner: topic=" + topic + ", key=" + key + ", partition=" + partition);return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}
}
上面的代码示例首先创建一个自定义分区器CustomPartitioner
,然后在生产者配置中指定该分区器类:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
在自定义分区器中,根据key的尾数来决定消息被发送到哪个分区:
int partition = Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;
然后启动生产者,发送消息到指定的主题。每条消息的key都是以数字结尾的字符串,根据key的尾数来选择分区。输出中会打印出消息的详细信息,包括主题、key和分区信息。
4.kafka案例_API同步发送生产者
下面是使用Kafka客户端库进行API同步发送的一个示例:
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092";String topic = "test-topic";String key = "key1";String value = "value1";// 配置Kafka生产者Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 创建消息实例ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 发送消息并等待返回结果RecordMetadata metadata = producer.send(record).get();System.out.println("消息发送成功,Topic: " + metadata.topic() +", Partition: " + metadata.partition() +", Offset: " + metadata.offset());} catch (Exception e) {System.out.println("消息发送失败:" + e.getMessage());} finally {// 关闭Kafka生产者producer.close();}}
}
这个示例使用了Kafka的Java客户端库,并创建了一个Kafka生产者实例。代码中设置了Kafka服务器的地址、要发送的主题、消息的键和值。还配置了键和值的序列化器为StringSerializer
。
然后,创建了一个ProducerRecord
实例来包装要发送的消息。通过调用producer.send(record).get()
方法发送消息并等待返回结果。发送成功后,通过返回的RecordMetadata
对象获取到消息的元数据,包括发送到的Topic、Partition和Offset。
最后,关闭Kafka生产者。
5.kafka案例_API简单消费者
以下是一个简单的Kafka案例,使用Kafka的Java API实现一个简单的消费者。
首先,需要安装Kafka并启动Kafka服务。然后,创建一个Kafka消费者来消费指定的主题。
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// Kafka broker的地址String bootstrapServers = "localhost:9092";// 消费者组的IDString groupId = "test-group";// 要消费的主题String topic = "test-topic";// 创建Kafka消费者的配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建Kafka消费者Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);// 处理每条消息records.forEach(record -> {System.out.println("Received message: " + record.value());});}}
}
在这个例子中,首先定义了Kafka broker的地址、消费者组的ID和要消费的主题。然后,创建了一个Kafka消费者的配置,并设置了必要的属性。然后,创建了一个Kafka消费者,并使用subscribe()
方法订阅了指定的主题。最后,使用poll()
方法从Kafka集群拉取消息,并使用forEach()
方法对每条消息进行处理。
运行这个消费者应用程序,它将开始消费指定主题的消息并打印出来。
注意:这只是一个简单的Kafka消费者示例,没有处理异常或实现自动提交偏移量。在实际应用中,需要根据具体需求添加更多的处理逻辑。
5.kafka案例_API消费者重置offset
要在Java中重置Kafka消费者的偏移量(offset),您可以使用以下代码片段:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Properties;public class ConsumerOffsetResetExample {public static void main(String[] args) {String topic = "your-topic";String bootstrapServers = "localhost:9092";String groupId = "your-group-id";// 创建Kafka消费者配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", bootstrapServers);properties.setProperty("group.id", groupId);properties.setProperty("enable.auto.commit", "false"); // 禁用自动提交偏移量// 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic));// 将偏移量重置到最早的可用位置consumer.seekToBeginning(consumer.assignment());// 或者将偏移量重置到最新的可用位置// consumer.seekToEnd(consumer.assignment());// 处理消息try {while (true) {// 拉取消息// ConsumerRecord<String, String> record = consumer.poll(Duration.ofMillis(100)).iterator().next();// 处理消息// ...// 手动提交偏移量// consumer.commitSync();}} finally {// 关闭Kafka消费者consumer.close();}}
}
在上面的代码中,我们使用seekToBeginning
方法将偏移量重置为最早的可用位置。您还可以使用seekToEnd
方法将偏移量重置为最新的可用位置。请根据您的需求选择适当的方法。
参考资料和推荐阅读
参考资料
官方文档
开源社区
博客文章
书籍推荐
- 暂无
欢迎阅读,各位老铁,如果对你有帮助,点个赞加个关注呗!同时,期望各位大佬的批评指正~,如果有兴趣,可以加文末的交流群,大家一起进步哈