大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • Kafka介绍
  • ZK的基本环境
  • Kafka下载解压配置
  • Kafka启动配置
  • Kafka启动服务

在这里插入图片描述

Kafka启动

上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。
这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

启动之后,我们可以通过 ps 工具看到:

ps aux | grep kafka

返回结果如下图:
在这里插入图片描述

sh脚本使用

topics.sh

kakfa-topics.sh 用于管理主题

查看所有

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

当前执行返回的是空的,因为我们没有任何主题。

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1

在这里插入图片描述

新建主题(用于测试)

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1

producer.sh

kafka-console-producer.sh 用于生产消息

生成数据

kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092

手动生成一批数据来进行测试:
在这里插入图片描述

consumer.sh

kafka-console-consumer.sh 用于消费消息

消费数据

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test

此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。

从头消费

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning

从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了
在这里插入图片描述

Java API

架构图

在这里插入图片描述

POM

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.2</version>
</dependency>

生产者1测试


public class TestProducer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java!");Future<RecordMetadata> future = producer.send(record);future.get(3_000, TimeUnit.SECONDS);producer.close();}}

生产者1运行

  2024-07-12 11:53:11,542 INFO [org.apache.kafka.clients.producer.ProducerConfig] - ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [h121.wzk.icu:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []internal.auto.downgrade.txn.commit = falsekey.serializer = class org.apache.kafka.common.serialization.IntegerSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576

运行结果如下图:
在这里插入图片描述

生产者2测试

public class TestProducer02 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java! CallBack test!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println("主题: " + recordMetadata.topic() + ", " +"分区: " + recordMetadata.partition() + ", " +"时间戳: " + recordMetadata.timestamp());} else {System.out.println("生产消息异常!!!");}}});producer.close();}}

运行之后,控制台输出:

2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka version: 2.7.2
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka commitId: 37a1cc36bf4d76f3
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka startTimeMs: 1720759608792
2024-07-12 12:46:49,200 INFO [org.apache.kafka.clients.Metadata] - [Producer clientId=producer-1] Cluster ID: DGjwPmfLSk2OKosFFLZJpg
2024-07-12 12:46:49,209 INFO [org.apache.kafka.clients.producer.KafkaProducer] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
主题: wzk_topic_test, 分区: 0, 时间戳: 1720759609201
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics scheduler closed
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics reporters closed
2024-07-12 12:46:49,283 INFO [org.apache.kafka.common.utils.AppInfoParser] - App info kafka.producer for producer-1 unregistered

运行的之后的控制台如下:
在这里插入图片描述

消费者01运行


public class TestConsumer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "wzk-test");KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);final List<String> topics = Arrays.asList("wzk_topic_test");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("剥夺的分区: " + item.partition());});}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("接收的分区: " + item.partition());});}});final ConsumerRecords<Integer, String> records = consumer.poll(3_000);final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");topic1Iterable.forEach(record -> {System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的偏移量:" + record.offset());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的序列化key字节数:" + record.serializedKeySize());System.out.println("消息的序列化value字节数:" + record.serializedValueSize());System.out.println("消息的时间戳:" + record.timestamp());System.out.println("消息的时间戳类型:" + record.timestampType());System.out.println("消息的主题:" + record.topic());System.out.println("消息的值:" + record.value());});consumer.close();}}

消费者01测试

2024-07-12 13:00:17,456 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Adding newly assigned partitions: wzk_topic_test-0
接收的分区: 0
2024-07-12 13:00:17,480 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[h121.wzk.icu:9092 (id: 0 rack: null)], epoch=0}}
消息头字段:[]
消息的key:0
消息的偏移量:12
消息的分区号:0
消息的序列化key字节数:4
消息的序列化value字节数:20
消息的时间戳:1720760404260
消息的时间戳类型:CreateTime
消息的主题:wzk_topic_test
消息的值:hello world by java!

控制台运行截图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/50746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何测量嵌入式软件程序(段)的执行时间?

测量嵌入式软件程序&#xff08;段&#xff09;的执行时间对于评估系统性能和优化代码至关重要。根据调研结果&#xff0c;汇总一些常用的方法如下&#xff1a; 1.插桩技术&#xff1a;这是一种纯软件的性能分析方法&#xff0c;通过在关键代码位置插入信息收集代码来实现。这…

禁毒教育展厅应如何创新展示方式,提升教育意义?

为了深刻揭示毒品的危害&#xff0c;促进禁毒知识的广泛传播&#xff0c;并显著提升公众的防范意识&#xff0c;禁毒教育展厅的推广举措正紧锣密鼓地展开。在这一关键进程中&#xff0c;展厅的空间布局与内容设计的合理性与针对性成为了至关重要的环节。接下来&#xff0c;我们…

angular入门基础教程(二)第一个angular组件

ng中的语法跟vue中是一样的插值语法&#xff0c;其实也是早期vue抄的ng的思路&#xff0c;使用{{variable}}形式&#xff0c;vue借鉴了ng和react&#xff0c;这个我们就不多了。 新建一个子组件 在项目根目录下面&#xff0c;执行 ng g component ./components/UserList这样…

【RL】强化学习入门:从基础到应用

本篇文章是博主强化学习RL领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对相关等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章强化学习&#xff1a; 强化学习…

深入解析Kylin的元数据管理:架构与实践

引言 在大数据时代&#xff0c;元数据管理对于数据仓库的效率和可维护性至关重要。Apache Kylin&#xff0c;作为一个开源的分布式分析引擎&#xff0c;专门设计用于支持大数据的快速分析。Kylin 的元数据管理是其核心功能之一&#xff0c;它不仅支撑着数据模型的构建&#xf…

JDBC介绍及使用

目录 JDBC概述 JDBC概念 JDBC本质 JDBC好处 JDBC快速入门 JDBC API详解 DriverManager Connection Statement ResultSet PreparedStatement 数据库连接池 数据库连接池简介 数据库连接池实现 Driud使用 JDBC练习 JDBC概述 JDBC概念 JDBC 就是使用Java语言操作…

关于“毒药水式“色彩搭配的概念

关于"毒药水式"色彩搭配 历时63天&#xff0c;我精心打造了一个全方位型网站模板&#xff0c;其包含&#xff08;录音、留言、可视化图表及源码显示、音乐播放、多种游戏、相册图片展示、日历等多种功能&#xff09;。我将其命名为“常温”&#xff0c;我将于8月13日…

基因克隆技术在医学领域的应用实例有哪些?

基因克隆技术在医学领域的应用实例有哪些&#xff1f; 李升伟 基因克隆技术在医学领域有众多应用实例&#xff0c;以下为您列举一些常见的&#xff1a; 1. 胰岛素的生产&#xff1a;通过基因克隆技术&#xff0c;将人类胰岛素基因插入到细菌或酵母的基因组中&#xff0c;使…

Docker Compose V2 安装 ClickHouse v20.6.8.5 经验分享

前言 ClickHouse 是一款开源的分布式列式数据库管理系统,专门设计用于高性能的大数据分析和查询。 目前项目中用到的一个场景是将mongo的数据同步到clickhouse,使用clickhouse做报表,后续也将分享同步和使用方案 使用 Docker Compose 部署单机版,小项目和自己测试够用了,生…

String Functions(字符串函数)

String Functions&#xff08;字符串函数&#xff09;是一组用于操作字符串数据的函数&#xff0c;它们在多种编程语言、数据库查询语言以及特定工具中都有广泛的应用。这些函数允许用户执行诸如字符串的创建、修改、查询、比较、转换和格式化等操作。以下是对String Functions…

海外短剧CPS系统,平台短剧出海推广方案

随着国内短剧市场的蓬勃发展与国际化趋势的加速&#xff0c;海外观众对于高质量、富有创意的短剧内容需求日益增长。在此背景下&#xff0c;搭建一个高效、便捷的海外短剧CPS&#xff08;Cost Per Sales&#xff0c;按销售分润&#xff09;分销系统平台&#xff0c;能为内容创作…

vue2中手动关闭el-dropdown组件下拉菜单

主要实现代码 this.$refs.dropdown.hide(); 在el-dropdown组件上设置ref属性为"dropdown"&#xff0c;在关闭时获取el-dropdown组件实例&#xff0c;调用实例上的hide()方法即可。

实战内测-某内测项目站点FUZZ到Sql注入

0x1 前言 下面给师傅们分享的案例呢是前段时间实战的一个站点&#xff0c;也是我朋友前段时间让我测的一个站点。整体的测试流程也还算ok&#xff0c;然后里面有些细节要是对师傅们有帮助可以收藏下&#xff0c;后面主要是利用FUZZ打了一个sql注入漏洞上去。 0x2 fuzz和sql结…

C# 代码适配 Python

C# if obj is Type obj_1Python if isinstance(obj, Type):

python怎样去除长字符串中多处存在的‘[]{}’?

要去除长字符串中多处存在的特定字符&#xff08;例如[]和{}&#xff09;&#xff0c;可以使用str.replace()方法。 下面是一个示例代码&#xff1a; def remove_chars(string):# 要去除的字符chars_to_remove ["[", "]", "{", "}"…

Halcon Blob分析

斑点分析的思路&#xff1a;在图像中&#xff0c;相关对象的像素可以通过其灰度值来识别。例如下图的组织颗粒。这些颗粒是凉的&#xff0c;而液体是暗的&#xff0c;通过选择明亮像素(阈值)&#xff0c;可以很容易地检测到颗粒。在需要应用中&#xff0c;这种简单的暗像素和亮…

HarmonyOS持久化存储数据Preference

Preference首选项 首选项&#xff1a;首选项为应用提供Key-Value键值型的数据处理能力&#xff0c;支持应用持久化轻量级数据&#xff0c;并对其修改和查询。数据存储形式为键值对&#xff0c;键的类型为字符串型&#xff0c;值的存储数据类型包括数字型、字符型、布尔型以及这…

【优秀python web设计】基于Python flask的猫眼电影可视化系统,可视化用echart,前端Layui,数据库用MySQL,包括爬虫

1 绪论 1.1 设计背景及目的 猫眼电影作为国内知名的电影信息网站&#xff0c;拥有海量的电影信息、票房数据和用户评价数据。这些数据对于电影市场的研究和分析具有重要意义。然而&#xff0c;由于数据的复杂性和数据来源的多样性&#xff0c;如何有效地采集、存储和展示这些数…

复现波恩大学的“LiDiff:基于扩散模型实现3D LiDAR场景补全!”(点云补全)项目

本文的主要工作就是复现下述论文中的算法。 该论文全称&#xff1a;Scaling Diffusion Models to Real-World 3D LiDAR Scene Completion 一、准备工作 首先通读readme.md文件的内容&#xff0c;了解所需要的相关依赖和数据等内容。 一定要多读几遍&#xff0c;不要扫一眼就…

[Linux安全运维] LAMP 环境搭建保姆级教学(Apache + MySQL + PHP) ~~

LAMP LAMP 是一种网站技术&#xff0c;可以实现动态的网站页面部署。 1. LAMP概述 1 .1构成 Linux: 简介: Linux 是一种开源的操作系统&#xff0c;以其稳定性和安全性而著称。在 LAMP 堆栈中&#xff0c;它作为服务器操作系统运行。作用: 为应用程序提供一个稳定、安全的运…