大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • Kafka介绍
  • ZK的基本环境
  • Kafka下载解压配置
  • Kafka启动配置
  • Kafka启动服务

在这里插入图片描述

Kafka启动

上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。
这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

启动之后,我们可以通过 ps 工具看到:

ps aux | grep kafka

返回结果如下图:
在这里插入图片描述

sh脚本使用

topics.sh

kakfa-topics.sh 用于管理主题

查看所有

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

当前执行返回的是空的,因为我们没有任何主题。

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1

在这里插入图片描述

新建主题(用于测试)

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1

producer.sh

kafka-console-producer.sh 用于生产消息

生成数据

kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092

手动生成一批数据来进行测试:
在这里插入图片描述

consumer.sh

kafka-console-consumer.sh 用于消费消息

消费数据

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test

此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。

从头消费

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning

从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了
在这里插入图片描述

Java API

架构图

在这里插入图片描述

POM

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.2</version>
</dependency>

生产者1测试


public class TestProducer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java!");Future<RecordMetadata> future = producer.send(record);future.get(3_000, TimeUnit.SECONDS);producer.close();}}

生产者1运行

  2024-07-12 11:53:11,542 INFO [org.apache.kafka.clients.producer.ProducerConfig] - ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [h121.wzk.icu:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []internal.auto.downgrade.txn.commit = falsekey.serializer = class org.apache.kafka.common.serialization.IntegerSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576

运行结果如下图:
在这里插入图片描述

生产者2测试

public class TestProducer02 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java! CallBack test!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println("主题: " + recordMetadata.topic() + ", " +"分区: " + recordMetadata.partition() + ", " +"时间戳: " + recordMetadata.timestamp());} else {System.out.println("生产消息异常!!!");}}});producer.close();}}

运行之后,控制台输出:

2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka version: 2.7.2
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka commitId: 37a1cc36bf4d76f3
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka startTimeMs: 1720759608792
2024-07-12 12:46:49,200 INFO [org.apache.kafka.clients.Metadata] - [Producer clientId=producer-1] Cluster ID: DGjwPmfLSk2OKosFFLZJpg
2024-07-12 12:46:49,209 INFO [org.apache.kafka.clients.producer.KafkaProducer] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
主题: wzk_topic_test, 分区: 0, 时间戳: 1720759609201
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics scheduler closed
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics reporters closed
2024-07-12 12:46:49,283 INFO [org.apache.kafka.common.utils.AppInfoParser] - App info kafka.producer for producer-1 unregistered

运行的之后的控制台如下:
在这里插入图片描述

消费者01运行


public class TestConsumer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "wzk-test");KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);final List<String> topics = Arrays.asList("wzk_topic_test");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("剥夺的分区: " + item.partition());});}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("接收的分区: " + item.partition());});}});final ConsumerRecords<Integer, String> records = consumer.poll(3_000);final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");topic1Iterable.forEach(record -> {System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的偏移量:" + record.offset());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的序列化key字节数:" + record.serializedKeySize());System.out.println("消息的序列化value字节数:" + record.serializedValueSize());System.out.println("消息的时间戳:" + record.timestamp());System.out.println("消息的时间戳类型:" + record.timestampType());System.out.println("消息的主题:" + record.topic());System.out.println("消息的值:" + record.value());});consumer.close();}}

消费者01测试

2024-07-12 13:00:17,456 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Adding newly assigned partitions: wzk_topic_test-0
接收的分区: 0
2024-07-12 13:00:17,480 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[h121.wzk.icu:9092 (id: 0 rack: null)], epoch=0}}
消息头字段:[]
消息的key:0
消息的偏移量:12
消息的分区号:0
消息的序列化key字节数:4
消息的序列化value字节数:20
消息的时间戳:1720760404260
消息的时间戳类型:CreateTime
消息的主题:wzk_topic_test
消息的值:hello world by java!

控制台运行截图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/50746.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

禁毒教育展厅应如何创新展示方式,提升教育意义?

为了深刻揭示毒品的危害&#xff0c;促进禁毒知识的广泛传播&#xff0c;并显著提升公众的防范意识&#xff0c;禁毒教育展厅的推广举措正紧锣密鼓地展开。在这一关键进程中&#xff0c;展厅的空间布局与内容设计的合理性与针对性成为了至关重要的环节。接下来&#xff0c;我们…

angular入门基础教程(二)第一个angular组件

ng中的语法跟vue中是一样的插值语法&#xff0c;其实也是早期vue抄的ng的思路&#xff0c;使用{{variable}}形式&#xff0c;vue借鉴了ng和react&#xff0c;这个我们就不多了。 新建一个子组件 在项目根目录下面&#xff0c;执行 ng g component ./components/UserList这样…

【RL】强化学习入门:从基础到应用

本篇文章是博主强化学习RL领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对相关等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章强化学习&#xff1a; 强化学习…

JDBC介绍及使用

目录 JDBC概述 JDBC概念 JDBC本质 JDBC好处 JDBC快速入门 JDBC API详解 DriverManager Connection Statement ResultSet PreparedStatement 数据库连接池 数据库连接池简介 数据库连接池实现 Driud使用 JDBC练习 JDBC概述 JDBC概念 JDBC 就是使用Java语言操作…

Docker Compose V2 安装 ClickHouse v20.6.8.5 经验分享

前言 ClickHouse 是一款开源的分布式列式数据库管理系统,专门设计用于高性能的大数据分析和查询。 目前项目中用到的一个场景是将mongo的数据同步到clickhouse,使用clickhouse做报表,后续也将分享同步和使用方案 使用 Docker Compose 部署单机版,小项目和自己测试够用了,生…

海外短剧CPS系统,平台短剧出海推广方案

随着国内短剧市场的蓬勃发展与国际化趋势的加速&#xff0c;海外观众对于高质量、富有创意的短剧内容需求日益增长。在此背景下&#xff0c;搭建一个高效、便捷的海外短剧CPS&#xff08;Cost Per Sales&#xff0c;按销售分润&#xff09;分销系统平台&#xff0c;能为内容创作…

实战内测-某内测项目站点FUZZ到Sql注入

0x1 前言 下面给师傅们分享的案例呢是前段时间实战的一个站点&#xff0c;也是我朋友前段时间让我测的一个站点。整体的测试流程也还算ok&#xff0c;然后里面有些细节要是对师傅们有帮助可以收藏下&#xff0c;后面主要是利用FUZZ打了一个sql注入漏洞上去。 0x2 fuzz和sql结…

Halcon Blob分析

斑点分析的思路&#xff1a;在图像中&#xff0c;相关对象的像素可以通过其灰度值来识别。例如下图的组织颗粒。这些颗粒是凉的&#xff0c;而液体是暗的&#xff0c;通过选择明亮像素(阈值)&#xff0c;可以很容易地检测到颗粒。在需要应用中&#xff0c;这种简单的暗像素和亮…

HarmonyOS持久化存储数据Preference

Preference首选项 首选项&#xff1a;首选项为应用提供Key-Value键值型的数据处理能力&#xff0c;支持应用持久化轻量级数据&#xff0c;并对其修改和查询。数据存储形式为键值对&#xff0c;键的类型为字符串型&#xff0c;值的存储数据类型包括数字型、字符型、布尔型以及这…

【优秀python web设计】基于Python flask的猫眼电影可视化系统,可视化用echart,前端Layui,数据库用MySQL,包括爬虫

1 绪论 1.1 设计背景及目的 猫眼电影作为国内知名的电影信息网站&#xff0c;拥有海量的电影信息、票房数据和用户评价数据。这些数据对于电影市场的研究和分析具有重要意义。然而&#xff0c;由于数据的复杂性和数据来源的多样性&#xff0c;如何有效地采集、存储和展示这些数…

复现波恩大学的“LiDiff:基于扩散模型实现3D LiDAR场景补全!”(点云补全)项目

本文的主要工作就是复现下述论文中的算法。 该论文全称&#xff1a;Scaling Diffusion Models to Real-World 3D LiDAR Scene Completion 一、准备工作 首先通读readme.md文件的内容&#xff0c;了解所需要的相关依赖和数据等内容。 一定要多读几遍&#xff0c;不要扫一眼就…

[Linux安全运维] LAMP 环境搭建保姆级教学(Apache + MySQL + PHP) ~~

LAMP LAMP 是一种网站技术&#xff0c;可以实现动态的网站页面部署。 1. LAMP概述 1 .1构成 Linux: 简介: Linux 是一种开源的操作系统&#xff0c;以其稳定性和安全性而著称。在 LAMP 堆栈中&#xff0c;它作为服务器操作系统运行。作用: 为应用程序提供一个稳定、安全的运…

【linux】在多核CPU下,好像看到不同进程在不同CPU调度

在2353这行打印的情况来看&#xff0c;操作系统好像给不同的进程分配不同的CPU&#xff0c;从上图来看&#xff0c;同一个进程好像基本使用的相同的CPU&#xff1a; 其实摸索syscall文件系统操作&#xff0c;本意是想找到内核文件系统中文件的创建&#xff0c;写入&#xff0c;…

3DMAX神经网络插件Neuron使用方法详解

3DMAX神经网络插件Neuron使用方法 3DMAX神经网络插件Neuron&#xff0c;从一系列样条曲线创建具有分支结构的几何体。适用于如神经网络、血管、树枝等形状的3D建模。 【适用版本】 3dMax2016及更高&#xff08;不仅限于此范围&#xff09; 【安装方法】 Neuron插件无需安装&a…

windows 暂停更新

使用windows 系统的伙伴都深受其扰&#xff0c;动不动就要强制更新&#xff0c;并且无法长时间关闭更新。这里推荐一个工具来禁止更新。越来越多的工程师可能会逐渐放弃windows ,真的太冗杂了&#xff0c;linux 的桌面和命令行越来越好用。 下载地址 https://github.com/WereD…

Renesa Version Board开发RT-Thread 之I2C驱动应用(SHT20)

目录 概述 1 硬件接口介绍 1.1 Version Board上的I2C硬件接口 1.2 SHT20 1.2.1 SHT20简介 1.2.2 SHT-20模块电路 2 软件实现 2.1 软件版本信息 2.2 RT-Thread Studio创建项目 2.3 FSP配置I2C接口 2.4 使能Sensor驱动 3 RT-Thread驱动架构 3.1 接口函数 3.1.1 …

增量学习中Task incremental、Domain incremental、Class incremental 三种学习模式的概念及代表性数据集?

1 概念 在持续学习领域&#xff0c;Task incremental、Domain incremental、Class incremental 是三种主要的学习模式&#xff0c;它们分别关注不同类型的任务序列和数据分布变化。 1.1 Task Incremental Learning (Task-incremental) 任务增量学习&#xff0c;也称为任务增…

spring 中包自动扫描之 component-scan 解析

在 spring 中&#xff0c;为简化 bean 的配置&#xff0c;在 spring-context 模块下提供了包的自动扫描功能&#xff0c;将配置的包及其子包下的所有符合条件的类都注册到 BeanFactory 中。下面来看下具体是怎么实现的。 配置 <context:component-scan base-package"…

.NET 一款获取主流浏览器存储密码的工具

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…

27.jdk源码阅读之ConcurrentLinkedDeque

1. 写在前面 ConcurrentLinkedDeque 是 Java 中一个高效、线程安全的双端队列&#xff08;Deque&#xff09;&#xff0c;使用无锁算法&#xff08;CAS 操作&#xff09;来保证线程安全性。由于其复杂的实现和广泛的应用场景&#xff0c;它常常成为面试中的重点考察对象。不知道…