kafka入门03——简单实战

目录

安装Java

安装Zookeeper

安装Kafka

生产与消费



主要是记录下Kafka的安装配置过程,前置条件需要安装jdk和zookeeper。

安装Java

1.Oracle官网下载对应jdk安装包

  • 官网地址:Java Downloads | Oracle

  • 好人分享了下载需要的oracle账号,传送门Oracle账号分享_oracle 账户分享-CSDN博客

2.将本地压缩包上传到虚拟机自定义路径,路径看诸君的习惯,敝人使的/usr/local/java

使用SSH远程连接工具FinalShell上传jdk压缩包(上传文件也看诸君喜好的SSH连接工具),FinalShell安装下载:【安装教程】SSH远程连接工具-FinalShell的安装_finalshell安装_Summer_may的博客-CSDN博客

3.解压缩,在压缩包路径下输入: tar -zxvf 上传的jdk压缩包名

tar -zvxf jdk-8u391-linux-x64.tar.gz

4.编辑环境变量,打开配置文件 vim /etc/profile 或者 vi /etc/profile。在文件最后添加:

export JAVA_HOME=/usr/local/java/jdk1.8.0_391export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport PATH=$PATH:$JAVA_HOME/bin
  • JAVA_HOME配置成自己的路径

  • 按i进行编辑,完成后按esc退出编辑模式,“shift+:”输入“wq!”保存并退出

5.刷新全局配置使生效

cd / #退回到根目录. /etc/profile #环境变量配置刷新

最后检查java版本

java -version

安装Zookeeper

1.java确认安装过了,这里直接开始安装zookeeper。

多一句,自zk3.5.5版本以后,已编译的jar包,尾部有bin,应该使用的是apache-zookeeper-3.8.3-bin.tar.gz。避免报错:“找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain”。(图片来自网络引用)

官网地址:Apache Download Mirrors

2.上传压缩包并解压,参照上文中FinalShell的方式。

cd /usr/local/zookeeper/
tar -zvxf apache-zookeeper-3.8.3-bin.tar.gz 

3.重命名zoo_sample.cfg为zoo.cfg

# cp conf/zoo_sample.cfg conf/zoo.cfg

zookeeper的server启动脚本使用的配置文件名称是zoo.cfg,不改名会报错

4.进入bin目录,使用zkServer.sh启动zookeeper,如果报错“-bash: zkServer.sh: command not found”就使用“./zkServer.sh start”

# zkServer.sh start

5.查看启动状态

# zkServer.sh status

6.使用Cli验证

# zkCli.sh

简单操作:

1.创建节点 create /zkTest myData

2.查看节点 get /zkTest 或者 get -s /zkTest

3.删除节点 delete /zkTest

4.退出cli客户端 quit

安装Kafka

1.官网下载压缩包到本地。官网地址:https://kafka.apache.org/downloads

2.根据上文一样,使用远程连接工具FinalShell上传压缩包进行解压。

tar -zxvf  kafka_2.12-3.6.0.tgz

3.确认Kafka相关配置(kafka依赖Zookeeper先启动Zookeeper服务)

# cd /usr/local/kafka/kafka_2.12-3.5.0/config/
# vi server.properties 

4.进入bin目录使用kafka-server-start.sh通过config配置启动Zookeeper服务

[root@10 bin]# ./kafka-server-start.sh ../config/server.properties 

5.重新打开一个SSH连接,通过jps命令查看kafka启动状态

[root@10 kafka_2.12-3.5.0]# jps -l

生产与消费

1.进入kafka的bin目录,先来创建个topic验证下

[root@10 bin]# ./kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-test --replication-factor 3 --partitions 4

报错:Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option

原因是:Kafka 版本过高,命令不存在

修改命令:

[root@10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092  --create --topic topic-test --replication-factor 3 --partitions 4

还是报错: ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

原因是:zookeeper使用的单机部署,只有一个broker。创建topic的命令中分区是4,副本是3,超出了数量限制。

修改命令:

[root@10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092  --create --topic topic-test --replication-factor 1 --partitions 4

创建topic成功

2.展示topic信息:

[root@10 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092  --describe --topic topic-test

3.进入kafka的bin目录使用自带的kafka-console-consumer.sh脚本订阅主题topic-test。

[root@10 bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-test

这个时候topic-test没有存入任何消息,所以脚本还不能消费任何消息。

4.打开一个新的SSH连接,进入kafka的bin目录使用自带的kafka-console-producer.sh脚本发送消息到主题topic-test。

[root@10 bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic-test

发送“Hello kafka,”"Hello,test"

consumer连接窗口就能消费消息了。

到此,简单的kafka实战案例就结束了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/117252.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

WPS中图的自动编号及引用

WPS中图的自动编号及引用 图的自动编号图编号的引用图编号及引用的更新 图的自动编号 将光标放置在需要插入编号的位置点击“引用”→“题注”: 点击“引用”→“题注”: 点击“编号”,设置图的编号格式,可勾选“包含章节编号”&…

关于rc.local 自启动多个应用问题

参考:关于rc.local 自启动多个应用问题_rc.local启动多个服务-CSDN博客 Linux开机rc.local不自启动执行脚本问题的排查思路及问题解决_rc.local文件启动不生效-CSDN博客 Ask GPT 如果第一个命令 sudo pppd call dial 不返回并且一直在运行,而你需要等待…

Plex Media Server for Mac: 打造您的专属媒体库

在数字媒体时代,我们越来越依赖各种媒体应用来丰富我们的生活。其中,Plex Media Server for Mac以其高效、稳定和多功能性,逐渐成为了Mac用户们的首选。今天,我们就来深入探讨这款个人媒体软件的优势和应用场景。 Plex Media Serv…

【数据库】组合查询 UNION

组合查询 概述组合查询UNIONUNION ALLINTERSECTEXCEPT 概述 组合查询允许将两个或多个查询的结果合并成一个单一的结果集。组合查询分类包括 UNION、UNION ALL、INTERSECT 和 EXCEPT 来合并查询结果。下述不同的组合查询; 下述示例中将使用的表:Illino…

Java面试题-Java核心基础-第十二天(SPI机制)

目录 一、什么是SPI机制 二、SPI机制的作用 三、SPI的一些应用 四、 例子 一、什么是SPI机制 SPI因为service provider interface 意为:服务提供者的接口 就是为服务提供者提供的接口,就是设计一套接口规范,然后不同的服务提供者去进行相…

防止消息丢失与消息重复——Kafka可靠性分析及优化实践

系列文章目录 上手第一关,手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么,以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析,打破面试难关 防止消息丢失与消息重复——Kafka可…

Android 12.0 根据app包名授予app监听系统通知权限

1.概述 在12.0的系统rom产品定制化开发中,在一些产品核心开发中,第三方app需要开启系统通知权限,然后可以在app中,监听系统所有通知,来做个通知中心的功能,所以需要授权 获取系统通知的权限,然后来顺利的监听系统通知。来做系统通知的功能,首选分析下相关授权通知的功…

Netty实战-实现自己的通讯框架

通信框架功能设计 功能描述 通信框架承载了业务内部各模块之间的消息交互和服务调用,它的主要功能如下: 基于 Netty 的 NIO 通信框架,提供高性能的异步通信能力;提供消息的编解码框架,可以实现 POJO 的序列化和反序…

小程序 wxml2canvas开发文档

wxml: <view class"share__canvas share__canvas1"><view class"share__canvas1-text draw_canvas" data-type"text" data-text"这是一段无边距文字">这是一段无边距文字</view> </view> <canvas canvas-…

BurpSuite安装

下载 BurpSuite 下载 Java17 下载后确定版本 java -version获取启动器 密钥生成器 破解 将下载的 BurpSuite、启动器、密钥生成器&#xff0c;放入同一个目录 打开 CMD 进入该目录 启动密钥生成器 java -jar burp-keygen-scz.jar开启新的CMD&#xff0c;进入该目录 启动…

paddlepaddle使用实践过程中的问题记录

环境背景 python&#xff1a;3.10.11 系统&#xff1a;macOS Big Sur 11.7.10 cpu&#xff1a;Intel Core i7 2.6GHz 内存&#xff1a;16G paddle版本问题 初始安装的是的MacOS cpu 2.5.1版本&#xff0c;在python解释器中执行import paddle时报错&#xff1a; ImportError:…

SpringCache配置Redis有效解决缓存击穿和缓存雪崩问题

初始代码 作者参考的一篇CSDN的配置函数代码&#xff0c;实在不好意思&#xff0c;作者忘记是哪位博主了&#xff1a; /*** 设置CacheManager缓存规则* param factory* return*/Beanpublic CacheManager cacheManager(RedisConnectionFactory factory) {RedisSerializer<St…

Spring Boot - 启动主要流程

Spring Boot的启动主要流程可以概括为以下几个步骤&#xff1a; 加载Spring Boot配置&#xff1a; Spring Boot应用的启动从加载配置开始。Spring Boot会读取application.properties或application.yml等配置文件&#xff0c;将配置信息加载到Spring的Environment中&#xff0c;…

IOS屏幕旋转监听

IOS屏幕旋转 1.设计窗口,添加三个按钮 2.添加事件连接 3.按钮点击事件实现 先添加三个IBAction 实现IBAction 使用旋转立刻生效 -(IBAction)btnFixPortrait:(id)sender{//访问应用程序委托成员_app.mask UIInterfaceOrientationMaskPortrait;//设置窗口旋转属性[self setN…

企业安全—SDL概述篇

0x00 前言 众所周知&#xff0c;从源头开始就开发安全的代码&#xff0c;比产品已经成型之后付出的代价要小很多&#xff0c;也就是一直在说的安全左移的概念。最好就是从一开始&#xff0c;大家就用最安全的代码&#xff0c;或者是框架&#xff0c;那么开发出来的产品必然会减…

git rebase -i 详解

git rebase 命令简介 git rebase命令允许我们轻松地更改一系列提交&#xff0c;修改存储库的历史记录。我们可以重新排序、编辑或合并提交。一般常用git rebase来合并当前分支的多个commit记录&#xff08;压缩&#xff09;以及避免出现分支的交叉合并&#xff08;变基&#x…

ChatGPT课件汇总介绍

第二节:有效管理 Token,充分发挥 ChatGPT 的能力 OpenAI 官方计算token的测试地址:https://platform.openai.com/tokenizer 第三节:探索ChatGPT在不同领域的创新应用 1、小说撰写 1、我希望你能作为一个小说家。我会给你一个主题,请写出有创意的、吸引人的故事,能够长时…

MongoDB 学习笔记(基础)

概论 出现背景&#xff1a;MongoDB 是文档型数据库&#xff0c;由于传统的关系型数据库&#xff08;如 MySQL&#xff09;&#xff0c;在数据操作的“三高”需求以及应对 web 的网站需求面前显得有些吃力&#xff0c;在此环境下 MongoDB 出世了 三高需求&#xff1a; (1) 对数…

进程的优先级与LAMP项目部署实战

一、进程的优先级&#xff08;扩展&#xff09; 1、什么是进程的优先级 Linux是一个多用户、多任务的操作系统&#xff0c;系统中通常运行着非常多的进程。哪些进程先运行&#xff0c;哪些进程后运行&#xff0c;就由进程优先级来控制 思考&#xff1a;什么时候需要用到进程…

力扣-python-两数相加

题解 1: # Definition for singly-linked list. # class ListNode(object): # def __init__(self, val0, nextNone): # self.val val # self.next nextclass Solution(object):def addTwoNumbers(self, l1, l2):""":type l1: ListNode:t…