kafka 案例

kafka 案例

  • 目录
    • 概述
      • 需求:
    • 设计思路
    • 实现思路分析
      • 1.kafka案例_API 带回调函数的生产者
      • 2.kafka案例_API生产者分区策略测试
      • 3.kafka案例_自定义分区的生产者
      • 4.kafka案例_API同步发送生产者
      • 5.kafka案例_API简单消费者
      • 5.kafka案例_API消费者重置offset
  • 参考资料和推荐阅读

Survive by day and develop by night.
talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive.
happy for hardess to solve denpendies.

目录

概述

需求:

设计思路

实现思路分析

1.kafka案例_API 带回调函数的生产者

以下是一个使用 Kafka 的 Java 生产者带回调函数的案例:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class ProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null) {System.out.println("消息发送成功!");} else {System.out.println("消息发送失败:" + exception.getMessage());}}});producer.close();}
}

请注意替换 bootstrap.servers 的值为 Kafka 服务器的地址和端口,并替换 topic-namekeyvalue 分别为实际使用的主题、键和值。

在这个案例中,我们使用 KafkaProducer 类创建一个 Kafka 生产者。然后,通过创建一个 ProducerRecord 对象,我们指定了要发送到的主题、键和值。

然后,我们调用 producer.send() 方法来发送消息。此方法接受一个 Callback 参数,该参数用于在消息发送完成后执行回调函数。在回调函数中,我们可以检查发送结果并采取相应的操作。

最后,我们调用 producer.close() 方法来关闭生产者。

2.kafka案例_API生产者分区策略测试

在Kafka中,可以使用API生产者分区策略来决定将消息发送到哪个分区。以下是一个展示如何使用API生产者分区策略的示例代码。

首先,创建一个新的Java类,例如ProducerPartitionStrategyTest.java,并导入Kafka相关的依赖项。

import org.apache.kafka.clients.producer.*;import java.util.Properties;

接下来,定义一个自定义的Partitioner类,用于实现分区策略。在这个例子中,我们将根据消息的键来决定分区。如果键为偶数,则将消息发送到分区0;如果键为奇数,则将消息发送到分区1。

class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {int numPartitions = cluster.partitionCountForTopic(topic);int partition = 0;try {int keyValue = Integer.parseInt(key.toString());if (keyValue % 2 == 0) {partition = 0;} else {partition = 1;}} catch (NumberFormatException e) {partition = Math.abs(key.hashCode() % numPartitions);}return partition;}@Overridepublic void close() {// 不做任何操作}@Overridepublic void configure(Map<String, ?> configs) {// 不做任何操作}
}

然后,在主方法中创建一个Producer并设置自定义分区策略。

public class ProducerPartitionStrategyTest {public static void main(String[] args) {// 配置Kafka生产者Properties properties = new Properties();properties.put("bootstrap.servers", "localhost:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("partitioner.class", "com.example.CustomPartitioner");// 创建Kafka生产者Producer<String, String> producer = new KafkaProducer<>(properties);// 发送消息for (int i = 0; i < 10; i++) {String key = String.valueOf(i);String value = "Message " + i;ProducerRecord<String, String> record = new ProducerRecord<>("topic", key, value);producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("Error producing message: " + exception.getMessage());} else {System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());}}});}// 关闭生产者producer.close();}
}

在以上示例中,我们首先配置了Kafka生产者的一些属性,例如Kafka服务器的地址、键和值的序列化程序以及分区策略。然后,我们创建了一个Kafka生产者,并使用自定义分区策略将消息发送到Kafka集群中。

在发送消息的循环中,我们创建了一个ProducerRecord对象,它包含要发送的消息的主题、键和值。然后,我们使用send()方法将消息发送到Kafka集群,并使用回调函数处理发送结果。

最后,我们关闭了生产者。

运行以上代码后,你将会看到消息被发送到正确的分区,并打印出消息的分区和偏移量的信息。

这就是一个简单的演示如何使用API生产者分区策略的例子。你可以根据自己的需求来实现自定义的分区策略,并将消息发送到合适的分区中。

3.kafka案例_自定义分区的生产者

自定义分区的生产者示例代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;public class CustomPartitionProducer {public static void main(String[] args) {// Kafka 服务器地址String bootstrapServers = "localhost:9092";// 主题名称String topic = "custom-partition-topic";// 创建生产者配置Properties props = new Properties();props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 创建自定义分区器props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息到自定义分区Random random = new Random();for (int i = 0; i < 10; i++) {String key = "key" + i;String value = "value" + i;producer.send(new ProducerRecord<>(topic, key, value));System.out.println("Sent message: key=" + key + ", value=" + value);}// 关闭生产者producer.close();}public static class CustomPartitioner implements org.apache.kafka.clients.producer.Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();// 这里自定义分区逻辑// 根据key的尾数来决定消息被发送到哪个分区int partition = Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;System.out.println("Custom partitioner: topic=" + topic + ", key=" + key + ", partition=" + partition);return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}
}

上面的代码示例首先创建一个自定义分区器CustomPartitioner,然后在生产者配置中指定该分区器类:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

在自定义分区器中,根据key的尾数来决定消息被发送到哪个分区:

int partition = Integer.parseInt(key.toString().substring(key.toString().length() - 1)) % numPartitions;

然后启动生产者,发送消息到指定的主题。每条消息的key都是以数字结尾的字符串,根据key的尾数来选择分区。输出中会打印出消息的详细信息,包括主题、key和分区信息。

4.kafka案例_API同步发送生产者

下面是使用Kafka客户端库进行API同步发送的一个示例:

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {String bootstrapServers = "localhost:9092";String topic = "test-topic";String key = "key1";String value = "value1";// 配置Kafka生产者Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 创建消息实例ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);try {// 发送消息并等待返回结果RecordMetadata metadata = producer.send(record).get();System.out.println("消息发送成功,Topic: " + metadata.topic() +", Partition: " + metadata.partition() +", Offset: " + metadata.offset());} catch (Exception e) {System.out.println("消息发送失败:" + e.getMessage());} finally {// 关闭Kafka生产者producer.close();}}
}

这个示例使用了Kafka的Java客户端库,并创建了一个Kafka生产者实例。代码中设置了Kafka服务器的地址、要发送的主题、消息的键和值。还配置了键和值的序列化器为StringSerializer

然后,创建了一个ProducerRecord实例来包装要发送的消息。通过调用producer.send(record).get()方法发送消息并等待返回结果。发送成功后,通过返回的RecordMetadata对象获取到消息的元数据,包括发送到的Topic、Partition和Offset。

最后,关闭Kafka生产者。

5.kafka案例_API简单消费者

以下是一个简单的Kafka案例,使用Kafka的Java API实现一个简单的消费者。

首先,需要安装Kafka并启动Kafka服务。然后,创建一个Kafka消费者来消费指定的主题。

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {// Kafka broker的地址String bootstrapServers = "localhost:9092";// 消费者组的IDString groupId = "test-group";// 要消费的主题String topic = "test-topic";// 创建Kafka消费者的配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 创建Kafka消费者Consumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);// 处理每条消息records.forEach(record -> {System.out.println("Received message: " + record.value());});}}
}

在这个例子中,首先定义了Kafka broker的地址、消费者组的ID和要消费的主题。然后,创建了一个Kafka消费者的配置,并设置了必要的属性。然后,创建了一个Kafka消费者,并使用subscribe()方法订阅了指定的主题。最后,使用poll()方法从Kafka集群拉取消息,并使用forEach()方法对每条消息进行处理。

运行这个消费者应用程序,它将开始消费指定主题的消息并打印出来。

注意:这只是一个简单的Kafka消费者示例,没有处理异常或实现自动提交偏移量。在实际应用中,需要根据具体需求添加更多的处理逻辑。

5.kafka案例_API消费者重置offset

要在Java中重置Kafka消费者的偏移量(offset),您可以使用以下代码片段:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.util.Arrays;
import java.util.Properties;public class ConsumerOffsetResetExample {public static void main(String[] args) {String topic = "your-topic";String bootstrapServers = "localhost:9092";String groupId = "your-group-id";// 创建Kafka消费者配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", bootstrapServers);properties.setProperty("group.id", groupId);properties.setProperty("enable.auto.commit", "false");  // 禁用自动提交偏移量// 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 订阅主题consumer.subscribe(Arrays.asList(topic));// 将偏移量重置到最早的可用位置consumer.seekToBeginning(consumer.assignment());// 或者将偏移量重置到最新的可用位置// consumer.seekToEnd(consumer.assignment());// 处理消息try {while (true) {// 拉取消息// ConsumerRecord<String, String> record = consumer.poll(Duration.ofMillis(100)).iterator().next();// 处理消息// ...// 手动提交偏移量// consumer.commitSync();}} finally {// 关闭Kafka消费者consumer.close();}}
}

在上面的代码中,我们使用seekToBeginning方法将偏移量重置为最早的可用位置。您还可以使用seekToEnd方法将偏移量重置为最新的可用位置。请根据您的需求选择适当的方法。

参考资料和推荐阅读

参考资料
官方文档
开源社区
博客文章
书籍推荐

  1. 暂无

欢迎阅读,各位老铁,如果对你有帮助,点个赞加个关注呗!同时,期望各位大佬的批评指正~,如果有兴趣,可以加文末的交流群,大家一起进步哈

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/16234.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux网络编程: 网络基础

Linux网络编程: 网络基础 1.网络划分 一.网络通信1.网络通信和本地通信的联系2.网络通信与本地通信最本质的区别及其衍生出的问题 二.网络协议初识1.为何本地通信不行&#xff1f;2.打电话的例子体会分层模型的好处3.OSI七层模型的提出4.OSI七层模型与TCP/IP五层模型1.TCP/IP与…

二十九篇:构建未来:信息系统的核心框架与应用

构建未来&#xff1a;信息系统的核心框架与应用 1. 引言 在这个充满挑战和机遇的信息时代&#xff0c;信息系统已经成为现代组织不可或缺的神经中枢。它们不仅革新了我们处理信息的方式&#xff0c;更是极大地增强了决策制定的效率和质量。在这篇文章中&#xff0c;我将分享我…

php 一个数组中的元素是否在一个字符串中包含

php 一个数组中的元素是否在一个字符串中包含 要检查一个数组中的元素是否在一个字符串中出现&#xff0c;你可以使用strpos()函数。这个函数返回子字符串首次出现的位置索引&#xff0c;如果没有找到&#xff0c;它会返回false。 $array [apple, banana, cherry]; $string …

Java进阶-SpringCloud使用BeanUtil工具类简化对象之间的属性复制和操作

在Java编程中&#xff0c;BeanUtil工具类是一种强大且便捷的工具&#xff0c;用于简化对象之间的属性复制和操作。本文将介绍BeanUtil的基本功能&#xff0c;通过详细的代码示例展示其应用&#xff0c;并与其他类似工具进行对比。本文还将探讨BeanUtil在实际开发中的优势和使用…

有效的字母异位词-力扣

看到题目首先想到的解法是使用unordered_map容器来做&#xff0c;遍历第一个字符串&#xff0c;统计每个char出现的次数&#xff0c;然后遍历第二个字符串&#xff0c;出现一个char&#xff0c;map对应值就减一&#xff0c;最后判断map容器所有键值是否为0&#xff0c;如果全部…

5.26牛客循环结构

1002. 难点&#xff1a; 两层循环条件设置 思路 可以设置三个变量 代码 1003 思路&#xff1a; 与星号双塔差不多&#xff0c;在此基础上加大一点难度 每日练题5.23 &#xff08;EOF用法&#xff09;-CSDN博客 代码 1004 代码

Android Compose 八:常用组件 Switch

Switch 切换按钮 val isChecked remember { mutableStateOf(true) }Switch(checked isChecked.value,onCheckedChange {Log.i("text_compose","onCheckedChange>>"it)isChecked.value it})效果 默认颜色 应该对应 主题色 1.1 thumbContent 按钮…

mapreduce综合应用案例 — 招聘数据清洗

MapReduce是一种编程模型&#xff0c;用于处理和生成大数据集。它通过Map和Reduce两个步骤来实现数据的分布式处理。在招聘数据清洗的场景中&#xff0c;MapReduce可以用来处理大规模的招聘数据集&#xff0c;以提取、清洗和转换数据&#xff0c;为进一步的分析和决策提供支持。…

Python Beautiful Soup 使用详解

大家好&#xff0c;在网络爬虫和数据抓取的领域中&#xff0c;Beautiful Soup 是一个备受推崇的 Python 库&#xff0c;它提供了强大而灵活的工具&#xff0c;帮助开发者轻松地解析 HTML 和 XML 文档&#xff0c;并从中提取所需的数据。本文将深入探讨 Beautiful Soup 的使用方…

vue项目打包教程

如果是用 vue-cli 创建的项目&#xff0c;则项目目录中没有 config 文件夹&#xff0c;所以我们需要自建一个配置文件&#xff1b;在vue项目目录下创建文件 vue.config.js&#xff0c;需注意文件名称必须是 vue.config.js&#xff0c;然后在文件中插入以下代码&#xff1a; 文件…

我让gpt4o给我推荐了一千多次书 得到了这些数据

事情是这样的&#xff0c;我们公司不是有个读书小组嘛&#xff0c;但是今年大家都忙于工作&#xff0c;忽视了读书这件事&#xff0c;所以我就想着搞个群机器人&#xff0c;让它明天定时向群里推荐一本书&#xff0c;用来唤起大家对读书的兴趣。但在调试的过程中就发现gpt4o老喜…

ACM实训冲刺第十九天

第一套&#xff08;搞定&#xff09; #include<stdio.h> #include<string.h> int main(){int n;scanf("%d",&n);char s[100];getchar();for(int i0;i<n;i){gets(s);int cnta0,cnte0,cnti0,cnto0,cntu0;for(int i0;i<strlen(s);i){if(s[i]a){…

齿轮常见故障学习笔记

大家好&#xff0c;这期咱们聊一聊齿轮常见的失效形式&#xff0c;查阅了相关的资料&#xff0c;做个笔记分享给大家&#xff0c;共同学习。 介绍 齿轮故障可能以多种方式发生。如果在设计阶段本身就尽量防止这些故障的产生&#xff0c;则可以产生改更为优化的齿轮设计。齿轮…

pytest框架用例命名规则详解

pytest 测试用例的命名规则是为了确保 pytest 能够正确地识别和执行测试用例。 以下是关于 pytest 测试用例命名规则的详细解释&#xff1a; 1 单个测试文件以‘test_’开头或者以‘_test’结尾 比如我们创建test_case1.py case2_test.py文件。 2 单个测试文件中&#xff0c…

58. UE5 RPG AI行为树的装饰器

书接56. UE5 RPG 给敌人添加AI实现跟随玩家&#xff0c;我们实现了AI一些基础设置&#xff0c;并实现了获取敌人附近的玩家实现了跟随功能 接下来&#xff0c;我们将实现区分职业&#xff0c;并根据职业不同设置不同的攻击距离&#xff0c;并且根据职业实现不同的技能施放。 …

《Effective Objective-C 2.0》读书笔记——对象、消息、运行期

目录 第二章&#xff1a;对象、消息、运行期第6条&#xff1a;理解“属性”这一概念第7条&#xff1a;在对象内部尽量直接访问实例变量第8条&#xff1a;理解“对象等同性”这一概念第9条&#xff1a;以“类族模式”隐藏实现细节第10条&#xff1a;在既有类中使用关联对象存放自…

App推广新境界:Xinstall助你轻松突破运营痛点,实现用户快速增长!

在移动互联网时代&#xff0c;App已经成为企业营销不可或缺的一部分。然而&#xff0c;如何有效地推广App&#xff0c;吸引并留住用户&#xff0c;成为了众多企业面临的难题。今天&#xff0c;我们将为您揭秘一款神奇的App推广工具——Xinstall&#xff0c;它将助您轻松突破运营…

绘唐3模型怎么放本地sd安装及模型放置位置 及云端sd部署

绘唐3模型怎么放本地sd安装及模型放置位置 及云端sd部署 资料里面授权方式&#xff1a; https://qvfbz6lhqnd.feishu.cn/wiki/CcaewIWnSiAFgokOwLycwi0Encf 云端和模型之间存在某种关联性。云端通常用于存储和管理大量数据&#xff0c;并提供计算和资源的服务。模型是对数据进…

Linux环境下TensorFlow安装教程

TensorFlow是学习深度学习时常用的Python神经网络框 下面以Mask R-CNN 的环境配置为例&#xff1a; 首先进入官网&#xff1a;www.tensorflow.org TensorFlow安装的总界面&#xff1a; 新建anaconda虚拟环境&#xff1a; conda create -n envtf2 python3.8 &#xff08;Pyth…

『大模型笔记』从头开始代码构建GPT!

从头开始代码构建GPT! 文章目录 一. 从头开始代码构建GPT!二. 参考文献一. 从头开始代码构建GPT! 我们构建了一个生成式预训练Transformer (GPT),遵循论文《Attention is All You Need》和OpenAI的GPT-2 / GPT-3的方法。我们讨论了与ChatGPT的联系,ChatGPT已经风靡全球。我…