Kafka中的Topic

在Kafka中,Topic是消息的逻辑容器,用于组织和分类消息。本文将深入探讨Kafka Topic的各个方面,包括创建、配置、生产者和消费者,以及一些实际应用中的示例代码。

1. 介绍

在Kafka中,Topic是消息的逻辑通道,生产者将消息发布到Topic,而消费者从Topic订阅消息。每个Topic可以有多个分区(Partitions),每个分区可以在不同的服务器上,以实现横向扩展。

2. 创建和配置Topic

2.1 创建Topic

使用Kafka提供的命令行工具(kafka-topics.sh)或Kafka的API来创建Topic。下面是一个使用命令行工具创建Topic的示例:

bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

这将创建一个名为my_topic的Topic,有3个分区,复制因子为2。

2.2 配置Topic

Kafka的Topic有各种配置选项,可以通过修改Topic的属性来满足不同的需求。例如,可以设置消息保留时间、清理策略等。以下是一个配置Topic属性的示例:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config max.message.bytes=1048576

这将修改my_topic的配置,将最大消息字节数设置为1 MB。

3. 生产者和消费者

3.1 生产者

生产者负责将消息发布到Topic。使用Kafka的Producer API,可以轻松地创建一个生产者。以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);producer.send(new ProducerRecord<>("my_topic", "key1", "value1"));
producer.close();

3.2 消费者

消费者从Topic中读取消息。Kafka的Consumer API提供了强大而灵活的方式来实现消费者。

以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}
}

4. 实际应用示例

4.1 实时日志处理

在实时日志处理的场景中,Kafka的Topic可以按照日志类型进行划分,每个Topic代表一种日志类型。这样的设计可以使得系统更具可维护性、可扩展性,并且允许不同类型的日志通过独立的消费者进行处理。以下是一个更详细的示例代码,展示如何在实时日志处理中使用Kafka Topic:

4.1.1 创建日志类型Topic

首先,为不同的日志类型创建各自的Topic。以错误日志和访问日志为例:

# 创建错误日志Topic
bin/kafka-topics.sh --create --topic error_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092# 创建访问日志Topic
bin/kafka-topics.sh --create --topic access_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.1.2 生产者发布日志消息

在应用中,生成错误日志和访问日志的代码可能如下:

// 错误日志生产者
Producer<String, String> errorLogProducer = new KafkaProducer<>(errorLogProperties);
errorLogProducer.send(new ProducerRecord<>("error_logs", "Error message"));// 访问日志生产者
Producer<String, String> accessLogProducer = new KafkaProducer<>(accessLogProperties);
accessLogProducer.send(new ProducerRecord<>("access_logs", "Access log message"));
4.1.3 消费者实时处理日志

创建独立的消费者来处理错误日志和访问日志:

// 错误日志消费者
Consumer<String, String> errorLogConsumer = new KafkaConsumer<>(errorLogProperties);
errorLogConsumer.subscribe(Collections.singletonList("error_logs"));while (true) {ConsumerRecords<String, String> records = errorLogConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理错误日志System.out.printf("Error Log - Offset = %d, Value = %s%n", record.offset(), record.value());}
}// 访问日志消费者
Consumer<String, String> accessLogConsumer = new KafkaConsumer<>(accessLogProperties);
accessLogConsumer.subscribe(Collections.singletonList("access_logs"));while (true) {ConsumerRecords<String, String> records = accessLogConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理访问日志System.out.printf("Access Log - Offset = %d, Value = %s%n", record.offset(), record.value());}
}
4.1.4 实时监控和分析

消费者可以通过实时处理日志来进行监控和分析。例如,可以使用流处理框架(如Kafka Streams)对日志进行聚合、过滤或转换。以下是一个简化的示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> errorLogsStream = builder.stream("error_logs");
KStream<String, String> accessLogsStream = builder.stream("access_logs");// 在这里进行实时处理,如聚合、过滤等// 通过输出Topic将处理结果发送到下游系统
errorLogsStream.to("processed_error_logs");
accessLogsStream.to("processed_access_logs");KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种设计,可以根据实际需要扩展不同类型的日志处理,同时确保系统具有高度的灵活性和可扩展性。在实际应用中,可能需要更详细的配置和处理逻辑,以满足具体的监控和分析需求。

4.2 事件溯源

在事件驱动的架构中,事件溯源是一种强大的方式,通过创建一个专门的Kafka Topic来记录每个业务事件的发生,以便随时追踪和回溯整个系统的状态。以下是一个基于Kafka的事件溯源的详细示例代码:

4.2.1 创建事件Topic

首先,为每个关键的业务事件创建一个专用的Kafka Topic,例如order_createdorder_shipped等:

# 创建订单创建事件Topic
bin/kafka-topics.sh --create --topic order_created --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092# 创建订单发货事件Topic
bin/kafka-topics.sh --create --topic order_shipped --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.2.2 发布业务事件

在应用中,当业务事件发生时,将事件发布到相应的Topic。以下是一个订单创建事件和订单发货事件的示例:

// 订单创建事件生产者
Producer<String, String> orderCreatedProducer = new KafkaProducer<>(orderCreatedProperties);
orderCreatedProducer.send(new ProducerRecord<>("order_created", "order_id", "Order created - Order ID: 123"));// 订单发货事件生产者
Producer<String, String> orderShippedProducer = new KafkaProducer<>(orderShippedProperties);
orderShippedProducer.send(new ProducerRecord<>("order_shipped", "order_id", "Order shipped - Order ID: 123"));
4.2.3 事件溯源消费者

为了实现事件溯源,我们需要一个专用的消费者来订阅所有的事件Topic,并将事件记录到一个持久化存储中(如数据库、日志文件等):

// 事件溯源消费者
Consumer<String, String> eventTraceConsumer = new KafkaConsumer<>(eventTraceProperties);
eventTraceConsumer.subscribe(Arrays.asList("order_created", "order_shipped"));while (true) {ConsumerRecords<String, String> records = eventTraceConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理事件,可以将事件记录到数据库或日志文件中System.out.printf("Event Trace - Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());// 持久化处理逻辑}
}
4.2.4 事件回溯和分析

通过上述设置,可以在任何时候回溯系统中的每个事件,了解事件的发生时间、顺序和内容。通过将事件存储到持久化存储中,可以建立一个事件溯源系统,支持系统状态的分析、回滚和审计。

还可以使用流处理来实时分析事件,例如计算每个订单的处理时间、统计每个事件类型的发生频率等。以下是一个简单的流处理示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> eventStream = builder.stream(Arrays.asList("order_created", "order_shipped"));// 在这里进行实时处理,如计算处理时间、统计频率等// 通过输出Topic将处理结果发送到下游系统
eventStream.to("processed_events");KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种方式,可以在事件溯源系统中实现强大的监控、分析和管理功能,提高系统的可观察性和可维护性。

5. 消息处理语义

Kafka支持不同的消息处理语义,包括最多一次、最少一次和正好一次。这些语义由消费者的配置决定,可以根据应用的要求进行选择。以下是一个使用最多一次语义的消费者示例代码:

properties.put("enable.auto.commit", "false"); // 禁用自动提交偏移量
properties.put("auto.offset.reset", "earliest"); // 设置偏移量重置策略为最早Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync(); // 手动提交偏移量}
} finally {consumer.close();
}

6. 安全性和权限控制

Kafka提供了安全性特性,包括SSL加密、SASL认证等。在生产环境中,确保适当的安全性设置是至关重要的。

以下是一个使用SSL连接的生产者示例:

properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "/path/to/truststore");
properties.put("ssl.truststore.password", "truststore_password");Producer<String, String> producer = new KafkaProducer<>(properties);

7. 故障容忍和可伸缩性

7.1 多节点分布和分区

在Kafka中,分布式的设计允许数据分布在多个节点上,这提供了高度的可伸缩性。每个Topic可以分成多个分区,而这些分区可以分布在不同的服务器上。这种分布式设计使得Kafka可以轻松地处理大规模数据,并实现水平扩展。

7.1.1 增加分区数

要增加Topic的分区数,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092

这将把my_topic的分区数增加到5,从而提高系统的吞吐量和可伸缩性。

7.2 复制因子

Kafka通过数据的复制来实现容错性。每个分区可以有多个副本,这些副本分布在不同的节点上。在节点发生故障时,其他副本可以继续提供服务。

7.2.1 增加复制因子

要增加Topic的复制因子,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

这将把my_topic的复制因子增加到3,确保每个分区有3个副本。增加复制因子提高了系统的容错性,因为每个分区都有多个副本,即使一个节点发生故障,其他节点上的副本仍然可用。

7.3 节点故障处理

Kafka能够处理节点故障,确保系统的可用性。当一个节点发生故障时,Kafka会自动将该节点上的分区重新分配到其他可用节点上,以保持分区的复制因子。

7.3.1 节点故障模拟

为了模拟节点故障,你可以通过停止一个Kafka broker进程来模拟。Kafka会自动感知到该节点的故障,并进行分区的重新分配。

# 停止一个Kafka broker进程
bin/kafka-server-stop.sh config/server-1.properties

7.4 性能调优

在实际应用中,通过监控系统的性能指标,你可以调整Kafka的配置以满足不同的性能需求。例如,调整日志刷写频率、调整内存和磁盘的配置等,都可以对系统的性能产生影响。

总结

Kafka的Topic是构建实时流数据处理系统的核心组件之一。通过深入了解Topic的创建、配置、生产者和消费者,以及实际应用中的示例代码,可以更好地理解和应用Kafka。在实际项目中,根据具体需求和场景进行灵活配置,以确保系统的可靠性、性能和安全性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/206703.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【华为数据之道学习笔记】3-2 基础数据治理

基础数据用于对其他数据进行分类&#xff0c;在业界也称作参考数据。基础数据通常是静态的&#xff08;如国家、币种&#xff09;&#xff0c;一般在业务事件发生之前就已经预先定义。它的可选值数量有限&#xff0c;可以用作业务或IT的开关和判断条件。当基础数据的取值发生变…

基于OpenCV+CNN+IOT+微信小程序智能果实采摘指导系统——深度学习算法应用(含pytho、JS工程源码)+数据集+模型(五)

目录 前言总体设计系统整体结构图系统流程图 运行环境Python环境TensorFlow 环境Jupyter Notebook环境Pycharm 环境微信开发者工具OneNET云平台 模块实现1. 数据预处理2. 创建模型并编译3. 模型训练及保存4. 上传结果5. 小程序开发1&#xff09;查询图片2&#xff09;查询识别结…

paypal贝宝怎么绑卡支付

一、PayPal是什么 PayPal是一个很多国家地区通用的支付渠道&#xff0c;我们可以把它理解为一项在线服务&#xff0c;相当于美国版的支付宝。你可以通过PayPal进行汇款和收款&#xff0c;相比传统的电汇和西联那类的汇款方式&#xff0c;PayPal更加简单和容易&#xff0c;被很…

利用proteus实现串口助手和arduino Mega 2560的串口通信

本例用到的proteus版本为8.13&#xff0c;ardunio IDE版本为2.2.1&#xff0c;虚拟串口vspd版本为7.2&#xff0c;串口助手SSCOM V5.13.1。软件的下载安装有很多教程&#xff0c;大家可以自行搜索&#xff0c;本文只介绍如何利用这4种软件在proteus中实现arduino Mega 2560的串…

刷题记录--算法--简单

第一题 2582. 递枕头 已解答 简单 相关标签 相关企业 提示 n 个人站成一排&#xff0c;按从 1 到 n 编号。 最初&#xff0c;排在队首的第一个人拿着一个枕头。每秒钟&#xff0c;拿着枕头的人会将枕头传递给队伍中的下一个人。一旦枕头到达队首或队尾&#xff0c;传递…

高防IP是什么?有什么优势?

随着互联网的普及和快速发展&#xff0c;网络安全问题日益突出。在众多安全问题中&#xff0c;DDOS攻击是一种常见的攻击手段&#xff0c;它通过发送大量的无效或低效请求&#xff0c;使得目标服务器无法响应正常用户的请求&#xff0c;从而造成服务不可用的情况。为了解决这个…

如何使用eXtplorer+cpolar内网穿透搭建个人云存储实现公网访问

文章目录 1. 前言2. eXtplorer网站搭建2.1 eXtplorer下载和安装2.2 eXtplorer网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1. 前言 通过互联网传输文件&#xff0c;是互联网最重要的应用之一&#xff0c;无论是…

PHP escapeshellarg()+escapeshellcmd()绕过

文章目录 函数利用escapeshellarg()函数escapeshellcmd()函数 exp执行原理攻击面例题 [BUUCTF 2018]Online Tool例题 [网鼎杯 2020 朱雀组]Nmap 函数利用 escapeshellarg()函数 单引号 ()&#xff1a;转义为 \。 双引号 (")&#xff1a;转义为 \"。 反斜杠 (\)&…

专业130+总分400+云南大学通信847专业基础综考研经验(原专业课827)

今年专业130总分400云南大学通信上岸&#xff0c;整体考研感觉还是比较满意&#xff0c;期间也付出了很多心血&#xff0c;走过弯路&#xff0c;下面分享一下这一年考研得失&#xff0c;希望大家可以从中有所借鉴。 先说明我在考研报名前更换成云南大学的理由&#xff1a;&…

谷歌正式发布最强 AI 模型 Gemini

2023年12月6日&#xff0c;谷歌公司宣布推出其被认为是规模最大、功能最强大的人工智能模型 Gemini。 Gemini将分为三个不同的套件&#xff1a;Gemini Ultra、Gemini Pro和Gemini Nano。 Gemini Ultra被认为具备最强大的能力&#xff0c;Gemini Pro则可扩展至多任务&#x…

xilinx原语详解及仿真——ODDR

ODDR位于OLOGIC中&#xff0c;可以把单沿传输的数据转换为双沿传输的数据&#xff0c; 在讲解ODDR功能之前&#xff0c;需要先了解OLOGIC的结构及功能。 1、OLOGIC OLOGIC块位于IOB的内侧&#xff0c;FPGA内部信号想要输出到管脚&#xff0c;都必须经过OLOGIC。OLOGIC资源的类…

CleanMyMac4.16中文最新版本下载

当很多人还在为电脑运行缓慢、工作问题不能快速得到解决而烦恼的时候&#xff0c;我已经使用过了多款系统清理工具&#xff0c;并找到了最适合我的那一款。我的电脑是超耐用的Mac book&#xff0c;接下来给大家介绍三种在众多苹果电脑清理软件的排名较高的软件。 一、Maintena…

NVIDIA与 Sparkfun 的合作伙伴在 Hackster.io 上发起了人工智能创新挑战赛,喊你来参加!

NVIDIA与 Sparkfun 的合作伙伴在 Hackster.io 上发起了人工智能创新挑战赛&#xff0c;喊你来参加&#xff01; 本次竞赛的目标旨在吸引开发者社区在 NVIDIA Jetson Orin 平台上为边缘构建生成式 AI 应用程序和模型&#xff0c;希望通过本次比赛提高人们对新 Jetson 生成式 AI…

四元数,欧拉角,旋转矩阵,旋转向量

四元数&#xff0c;旋转矩阵&#xff0c;旋转向量&#xff0c;欧拉角 一、欧拉角 1、欧拉角是表达旋转的最简单的一种方式&#xff0c;形式上它是一个三维向量&#xff0c;其值分别代表物体绕坐标系三个轴(x,y,z轴&#xff09;的旋转角度&#xff0c;默认旋转正向为逆坐标轴逆…

C#winform上下班打卡系统Demo

C# winform上下班打卡系统Demo 系统效果如图所示 7个label控件(lblUsername、lblLoggedInEmployeeId、lab_IP、lblCheckOutTime、lblCheckInTime、lab_starttime、lab_endtime)、3个按钮、1个dataGridView控件、2个groupBox控件 C#代码实现 using System; using System.Dat…

Java零基础——Elasticsearch篇

1.Elasticsearch简介 Elasticsearch是一个基于Lucene的一个开源的分布式、RESTful 风格的搜索和数据分析引擎。Elasticsearch是用Java语言开发的&#xff0c;并作为Apache许可条款下的开放源码发布&#xff0c;是一种流行的企业级搜索引擎。Elasticsearch用于云计算中&#xf…

【Ambari】Python调用Rest API 获取YARN HA状态信息并发送钉钉告警

&#x1f984; 个人主页——&#x1f390;开着拖拉机回家_Linux,大数据运维-CSDN博客 &#x1f390;✨&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341;&#x1fa81;&#x1f341; &#x1fa81;&#x1f341;&#x1fa81;&am…

二层交换原理

二层交换设备工作在OSI模型的第二层&#xff0c;即数据链路层&#xff0c;它对数据包的转发是建立在MAC&#xff08;Media Access Control &#xff09;地址基础之上的。二层交换设备不同的接口发送和接收数据独立&#xff0c;各接口属于不同的冲突域&#xff0c;因此有效地隔离…

【C/PTA —— 15.结构体2(课内实践)】

C/PTA —— 15.结构体2&#xff08;课内实践&#xff09; 7-1 计算职工工资7-2 计算平均成绩7-3 找出总分最高的学生7-4 通讯录的录入与显示 7-1 计算职工工资 #include<stdio.h> #include<stdlib.h> typedef struct GZ {char name[6];double j;double f;double z;…

记一次由 jedis 引发的离谱选学问题

背景 我的应用中&#xff0c;使用 jedis 作为连接 redis 的客户端&#xff0c;一直在用的好好的&#xff0c;后来有一个新的组件&#xff0c;也需要使用 redis&#xff0c;但是组件是内部封装的&#xff0c;我只能提供一个 StringReidsTempalte&#xff0c;所以我基于应用本身…