「Kafka」入门篇

「Kafka」入门篇

基础架构

image-20231208182616047

image-20231208182131415

Kafka 快速入门

集群规划

image-20231227110547650

集群部署

官方下载地址:http://kafka.apache.org/downloads.html

  1. 解压安装包:

    [atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
    
  2. 修改解压后的文件名称:

    [atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka
    
  3. 进入到 /opt/module/kafka 目录,修改配置文件

    [atguigu@hadoop102 kafka]$ cd config/
    [atguigu@hadoop102 config]$ vim server.properties
    

    输入以下内容:

    # broker 的全局唯一编号,不能重复,只能是数字。
    broker.id=0
    # 处理网络请求的线程数量
    num.network.threads=3
    # 用来处理磁盘 IO 的线程数量
    num.io.threads=8
    # 发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    # 接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    # 请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    # kafka 运行日志(数据)存放的路径,路径不需要提前创建,kafka 自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
    log.dirs=/opt/module/kafka/datas
    # topic 在当前 broker 上的分区个数
    num.partitions=1
    # 用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    # 每个 topic 创建时的副本数,默认时 1 个副本
    offsets.topic.replication.factor=1
    # segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    # 每个 segment 文件的大小,默认最大 1G
    log.segment.bytes=1073741824
    # 检查过期数据的时间,默认 5 分钟检查一次是否数据过期
    log.retention.check.interval.ms=300000
    # 配置连接 Zookeeper 集群地址(在 zk 根目录下创建/kafka,方便管理)
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
    
  4. 分发安装包

    [atguigu@hadoop102 module]$ xsync kafka/
    
  5. 分别在 hadoop103 和 hadoop104 上修改配置文件 /opt/module/kafka/config/server.properties 中的 broker.id=1broker.id=2

    注:broker.id 不得重复,整个集群中唯一。

    image-20231227111302720

  6. 配置环境变量

    • /etc/profile.d/my_env.sh 文件中增加 kafka 环境变量配置

      [atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh
      
    • 增加如下内容:

      #KAFKA_HOME
      export KAFKA_HOME=/opt/module/kafka
      export PATH=$PATH:$KAFKA_HOME/bin
      
    • 刷新一下环境变量:

      [atguigu@hadoop102 module]$ source /etc/profile
      
    • 分发环境变量文件到其他节点,并 source:

      [atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
      [atguigu@hadoop103 module]$ source /etc/profile
      [atguigu@hadoop104 module]$ source /etc/profile
      
  7. 启动集群

    • 先启动 Zookeeper 集群,然后启动 Kafka:

      [atguigu@hadoop102 kafka]$ zk.sh start 
      
    • 依次在 hadoop102、hadoop103、hadoop104 节点上启动 Kafka:

      [atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
      [atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
      [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
      

      注意:配置文件的路径要能够到server.properties

  8. 关闭集群

    [atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh 
    [atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh 
    [atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh 
    

集群启停脚本

  1. /home/atguigu/bin 目录下创建文件 kf.sh 脚本文件

    [atguigu@hadoop102 bin]$ vim kf.sh
    

    脚本如下:

    #! /bin/bash
    case $1 in
    "start"){for i in hadoop102 hadoop103 hadoop104doecho " --------启动 $i Kafka-------"ssh  $i  "/opt/module/kafka/bin/kafka-server-start.sh  -daemon /opt/module/kafka/config/server.properties"done
    };;
    "stop"){for i in hadoop102 hadoop103 hadoop104doecho " --------停止 $i Kafka-------"ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "done
    };;
    esac
    
  2. 添加执行权限

    [atguigu@hadoop102 bin]$ chmod +x kf.sh
    
  3. 启动集群命令

    [atguigu@hadoop102 ~]$ kf.sh start
    
  4. 停止集群命令

    [atguigu@hadoop102 ~]$ kf.sh stop
    

image-20231227113030694

Kafka 命令行操作

image-20231227113215848

主题命令行操作
  1. 查看操作主题命令参数

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh
    

    image-20231227113437850

  2. 查看当前服务器中的所有 topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
    
  3. 创建 first topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
    
    选项说明:
    --topic:定义 topic 名
    --replication-factor:定义副本数
    --partitions:定义分区数
    
  4. 查看 first 主题的详情

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --describe --topic first
    
  5. 修改分区数(注意:分区数只能增加,不能减少)

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
    
  6. 再次查看 first 主题的详情

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
    
  7. 删除 topic

    [atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
    
生产者命令行操作
  1. 查看操作生产者命令参数

    [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
    

    image-20231227113828506

  2. 发送消息

    [atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    >hello world
    >atguigu atguigu
    
消费者命令行操作
  1. 查看操作消费者命令参数

    [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
    

    image-20231227113934044

    image-20231227113944307

  2. 消费消息

    • 消费 first 主题中的数据:

      [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
      
    • 把主题中所有的数据都读取出来(包括历史数据):

      [atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
      

笔记整理自b站尚硅谷视频教程:【尚硅谷】Kafka3.x教程(从入门到调优,深入全面)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/587290.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于HTTPS

目录 什么是加密 对称加密 非对称加密 中间人攻击 引入证书 HTTPS是一个应用层的协议,是在HTTP协议的基础上引入了一个加密层. HTTP协议内容都是按照文本的方式明文传输,这就导致在传输的过程中出现一些被篡改的情况. 运营商劫持事件 未被劫持的效果,点击下载按钮,就会…

Mybatis分页插件之PageHelper生效and失效原理解析

文章目录 前言整合PageHelperPageHelper生效原理PageHelper的分页参数和线程绑定核心拦截逻辑生成分页SQLdialect.afterAll() PageHelper失效原理分页失效案例分页失效原理总结 Mybatis拦截器系列文章:从零开始的 MyBatis 拦截器之旅:实战经验分享 构建自…

探索 3D 图形处理的奥秘

最近一年多来,在 3Dfx、Intel 们的狂轰滥炸中,在 Quake、古墓丽影们的推波助澜下,三维图形已经成为计算机迷眼中的又一个热点。3D 世界到底是怎样的神奇,我们又是怎样享受它的乐趣呢?就让我们来一探究竟吧。 图形基础…

K8s资源管理介绍

用这个官网下的,kube-flannel.yml ,就不会nodes not-ready --- kind: Namespace apiVersion: v1 metadata:name: kube-flannellabels:k8s-app: flannelpod-security.kubernetes.io/enforce: privileged --- kind: ClusterRole apiVersion: rbac.author…

递归详解之青蛙跳台阶和汉诺塔问题

𝙉𝙞𝙘𝙚!!👏🏻‧✧̣̥̇‧✦👏🏻‧✧̣̥̇‧✦ 👏🏻‧✧̣̥̇:Solitary-walk ⸝⋆ ━━━┓ - 个性标签 - :来于“云”的“羽球人”。…

idea利用JRebel插件,无需重启,实现Spring Boot项目热重载,节省开发时间和精力!

插件介绍 官方介绍 翻译过来的意思是: JRebel 是一款提高开发效率的工具,允许开发者立即重新加载代码更改。它跳过了在Java开发中常见的重新构建、重启和重新部署循环。JRebel 能够让开发者在相同的时间内完成更多工作,并且在编码时能够保持…

CSS与JavaScript的简单认识

CSS:是一门语言,用于控制网页表现,让页面更好看的。 CSS(Cascading Style Sheet):层叠样式表 CSS与html结合的三种方式: 1、内部样式:用style标签,在标签内部定义CSS样式…

vim学习笔记

vim学习笔记 Linux Vim编辑器的基本使用 显示行号 set nu 自动补全 CTRL-N或CTRL-P $到当前行的末尾 u 撤销上一步的操作 Ctrlr 恢复上一步被撤销的操作 vim下配置tab缩进格数 原始文件&#xff1a; helloworld nice 普通缩进 shift > &#xff08;或者 Shift <…

springcloud微服务篇--6.网关Gateway

一、为什么需要网关&#xff1f; 网关功能&#xff1a; 身份认证和权限校验 服务路由、负载均衡 请求限流 在SpringCloud中网关的实现包括两种&#xff1a; gateway zuul Zuul是基于Servlet的实现&#xff0c;属于阻塞式编程。而SpringCloudGateway则是基于Spring5中提供的Web…

【SpringCloud笔记】(12)分布式请求链路跟踪之Sleuth

Sleuth 背景 在微服务框架中&#xff0c;一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来协同产生最后的请求结果&#xff0c;每一个前段请求都会形成一条复杂的分布式服务调用链路&#xff0c;链路中的任何一环出现高延时或错误都会引起整个请求最后的…

WeNet语音识别分词制作词云图

在线体验 ,点击识别语音需要等待一会&#xff0c;文件太大缓存会报错 介绍 本篇博客将介绍如何使用 Streamlit、jieba、wenet 和其他 Python 库&#xff0c;结合语音识别&#xff08;WeNet&#xff09;和词云生成&#xff0c;构建一个功能丰富的应用程序。我们将深入了解代码…

第2课 使用FFmpeg读取rtmp流并用openCV显示视频

本课对应源文件下载链接&#xff1a; https://download.csdn.net/download/XiBuQiuChong/88680079 这节课我们开始利用ffmpeg和opencv来实现一个rtmp播放器。播放器的最基本功能其实就两个:显示画面和播放声音。在实现这两个功能前&#xff0c;我们需要先用ffmpeg连接到rtmp服…

LVS负载均衡配置虚拟引起微服务注册混乱

线上小程序突然报错&#xff0c;查看网关日志&#xff0c;访问下游微服务A时大量报错&#xff1a; 1&#xff09;检查微服务是否未注册。登录eureka页面&#xff0c;发现三个节点均正常注册 三个微服务节点地址分别为&#xff1a;13.9.1.91:8080&#xff0c;13.9.1.92:8080和1…

ARM CCA机密计算软件架构之软件堆栈概述

Arm CCA平台通过硬件添加和固件组件的混合方式实现,例如在处理元素(PEs)中的RME以及特定的固件组件,特别是监视器和领域管理监视器。本节介绍Arm CCA平台的软件堆栈。 软件堆栈概述 领域VM的执行旨在与Normal world(正常世界)隔离,领域VM由Normal world Host(正常世界…

【软件工程】融通未来的工艺:深度解析统一过程在软件开发中的角色

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; 软件工程 ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言&#xff1a; 正文 统一过程&#xff08;Unified Process&#xff09; 介绍和解释&#xff1a; 应用&#xff1a; 优缺点&#xf…

C/C++ 函数的默认参数

下面介绍一项新内容 - 默认参数。 默认参数指的是当函数调用中省略了实参时自动使用的一个值。 例如&#xff0c;如果将 void wow (int n)设置成n 有默认值为1&#xff0c;则函数调用 wow()相当于 wow(1)这极大地提高了使用函数的灵活性。 假设有一个名为left()的函数&#xff…

SpringIOC之ApplicationObjectSupport

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

<JavaEE> TCP 的通信机制(一) -- 确认应答 和 超时重传

目录 TCP的通信机制的核心特性 一、确认应答 1&#xff09;什么是确认应答&#xff1f; 2&#xff09;如何“确认”&#xff1f; 3&#xff09;如何“应答”&#xff1f; 二、超时重传 1&#xff09;丢包的概念 2&#xff09;什么是超时重传&#xff1f; 3&#xff09…

详解信道容量,信道速率,安全速率的区别

目录 一. 信道容量与信道速率 二. 小结 三. 安全速率与物理层安全 3.1 香农物理层安全模型 3.2 安全信道速率 四. 补充安全中断概率&#xff08;Secrecy Outage Probability, SOP&#xff09; 五. 补充安全分集度&#xff08;Secrecy Diversity Order, SDO&#xff09; …

AAAI 2024 | 用逆向思维图(ReX-GoT)进行多选对话常识推理

©PaperWeekly 原创 作者 | 郑理 单位 | 武汉大学硕士生 研究方向 | 自然语言处理 论文题目&#xff1a; Reverse Multi-Choice Dialogue Commonsense Inference with Graph-of-Thought 论文作者&#xff1a; 郑理&#xff0c;费豪&#xff0c;李霏&#xff0c;李波波&am…