【数据采集】实验07-Kafka的常用命令及使用

文章目录

  • 一、实验目的
  • 二、实验准备
  • 三、实验内容
      • 1. 运行Zookeeper
      • 2. 运行kafka
      • 3. 创建topics
      • 4. Kafka与MySQL的组合使用,把JSON格式数据放入Kafka发送出去,再从Kafka中获取并写入到MySQL数据库,p97
      • 5. Kafka与MySQL的组合使用,把MySQL数据库数据取出,转化为JSON格式,放入Kafka发送出去,再从Kafka中获取,p100
      • 6. 手动提交偏移量,把JSON格式数据放入Kafka发送出去,p100
      • 7. 消费订阅分区,p100
      • 8. 关闭kafka
      • 9. 关闭Zookeeper
  • 四、实验小结

一、实验目的

  1. 掌握Zookeeper的配置和使用。
  2. 掌握Kafka的配置和使用。
  3. 掌握Kafka的基本的操作命令。

二、实验准备

  1. Zookeeper
  2. Kafka

三、实验内容

1. 运行Zookeeper

打开cmd然后执行zkserver 命令

2. 运行kafka

打开命令提示窗口(win+R,输入cmd,进入),进入E:\kafka\kafka37文件内输入并执行以下命令打开kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

3. 创建topics

打开命令提示窗口,进入E:\kafka\kafka37\bin\windows文件内
创建topics,可以把topic理解为文件夹,partition为topic下面的子文件夹,log在partition下,而消息保存在log中

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1

查看topics,这里查看一下是否创建成功

kafka-topics --list --bootstrap-server localhost:9092

查看 topic 的详细信息,partition 数量和副本数量等

kafka-topics --bootstrap-server localhost:9092  --describe --topic test1

修改当前topic分区(只能增加分区,如果减少会报错)

kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic test1 --partitions 4

查看修改后的分区

kafka-topics --bootstrap-server localhost:9092  --describe --topic test1

查看消息组消息队列

kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --all-groups

4. Kafka与MySQL的组合使用,把JSON格式数据放入Kafka发送出去,再从Kafka中获取并写入到MySQL数据库,p97

5. Kafka与MySQL的组合使用,把MySQL数据库数据取出,转化为JSON格式,放入Kafka发送出去,再从Kafka中获取,p100

目标:读取student表的数据内容,将其转为JSON格式,发送给Kafka
向student表中插入两条记录的SQL语句如下:

insert into student values(95002,’Fopn’,’M’,22);
insert into student values(95003,’Tom’,’M’,23);

编写一个生产者程序mysql_producer.py

from kafka import KafkaProducer
import json
import pymysql.cursorsproducer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))connect=pymysql.Connect(host='localhost',port=3306,user='root',passwd='123456',db='test',charset='utf8')
cursor=connect.cursor()
sql="select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data = cursor.fetchall()
connect.commit()for msg in data:res={}res['sno']=msg[0]res['name']=msg[1]res['sex']=msg[2]res['age']=msg[3]producer.send('mysql_topic', res)connect.close()
producer.close()

再从Kafka中获取到JSON格式数据,打印出来;
编写一个消费者程序mysql_consumer.py

from kafka import KafkaConsumer
import json
import pymysql.cursorsconsumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:msg1=str(msg.value,encoding="utf-8")data=json.loads(msg1)print(data)

6. 手动提交偏移量,把JSON格式数据放入Kafka发送出去,p100

data.json文件

[{"sno": "95001", "name": "John1", "sex": "M", "age": 23},
{"sno": "95002", "name": "John2", "sex": "M", "age": 23},
{"sno": "95003", "name": "John3", "sex": "M", "age": 23}]

根据上面给出的data.json文件,执行如下操作:
编写生产者程序,将json文件数据发送给Kafka;
编写一个生产者程序commit_producer.py

from kafka import KafkaProducer
import json # 引入模块
# 打开一个json文件
data = open("./data.json")
# 转换为python对象
strJson = json.load(data)
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))
producer.send('json_topic', strJson)
producer.close()
编写消费者程序,读取前面发送的数据,并手动提交偏移量;
编写一个消费者程序commit_consumer.py
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
import jsonclass Consumer():def __init__(self):self.server = 'localhost:9092'self.topic = 'json_topic'self.consumer = Noneself.tp = Noneself.consumer_timeout_ms = 5000self.group_id = 'test1'def get_connect(self):self.consumer = KafkaConsumer('json_topic',group_id=self.group_id,auto_offset_reset='earliest',bootstrap_servers=self.server,enable_auto_commit=False,consumer_timeout_ms=self.consumer_timeout_ms)def beginConsumer(self):now_offset = 0while True:for message in self.consumer:now_offset = message.offsetdata = message.value.decode('utf-8')data = json.loads(data)print(data)self.consumer.commit()consumer.close()c = Consumer()
c.get_connect()
c.beginConsumer()

7. 消费订阅分区,p100

手动创建主题“assign_topic”,分区数量2

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic assign_topic

(1)编写生产者程序,以通用唯一标识符UUID作为消息,发送给主题assign_topic;
编写一个生产者程序assign_producer.py:

from kafka import KafkaProducer,TopicPartition
import time
import uuidproducer = KafkaProducer(bootstrap_servers='localhost:9092')
display_interval = 5print('Producing messages to topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
start_time = time.time()
while True:identifier = str(uuid.uuid4())  producer.send('assign_topic', identifier.encode('utf-8'))  message_count += 1now = time.time()if now - start_time > display_interval:print('No.%i iter %i messages produced at %.0f messages / second' % (display_iteration,message_count,message_count / (now - start_time)))display_iteration += 1message_count = 0start_time = time.time()

(2)编写消费者程序1,订阅主题的分区0,只消费分区0数据;
编写第一个消费者程序assgin_consumer1.py:

from kafka import KafkaConsumer,TopicPartition
import time
import uuiddisplay_interval = 5consumer1 = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
consumer1.assign([TopicPartition('assign_topic', 0)])
print('Consuming messagse from topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
partitions = set()
start_time = time.time()
while True:message = next(consumer1)identifier = str(message.value,encoding="utf-8")message_count += 1partitions.add(message.partition)now = time.time()if now - start_time > display_interval:print('No.%i  %i messages consumed at %.0f messages / second - from partitions %r' % (display_iteration,message_count,message_count / (now - start_time),sorted(partitions)))display_iteration += 1message_count = 0partitions = set()start_time = time.time()

(3)编写消费者程序2,订阅主题的分区1,只消费分区1数据;
编写第二个消费者程序assgin_consumer2.py:

from kafka import KafkaConsumer,TopicPartition
import time
import uuiddisplay_interval = 5consumer2 = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
consumer2.assign([TopicPartition('assign_topic', 1)])
print('Consuming messagse from topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
partitions = set()
start_time = time.time()
while True:message = next(consumer2)identifier = str(message.value,encoding="utf-8")message_count += 1partitions.add(message.partition)now = time.time()if now - start_time > display_interval:print('No.%i  %i messages consumed at %.0f messages / second - from partitions %r' % (display_iteration,message_count,message_count / (now - start_time),sorted(partitions)))display_iteration += 1message_count = 0partitions = set()start_time = time.time()

8. 关闭kafka

.\bin\windows\kafka-server-stop.bat

9. 关闭Zookeeper

.\bin\windows\zookeeper-server-stop.bat

注意:在运行实验期间,1,2打开的窗口不要关闭

四、实验小结

Kafka在大数据生态系统中的作用?

  • 消息中间件:Kafka作为一个分布式流处理平台,其核心作用是作为数据管道,实现实时数据传输。它可以接收从各种数据源产生的大量事件流数据,并将其高效地分发给下游的数据消费者,如实时处理引擎、搜索引擎索引、数据仓库、报表系统等。
  • 缓冲区:Kafka可以作为临时存储层,允许系统在处理能力与数据生成速度不匹配时起到缓冲作用,减轻系统的压力。
  • 解耦:在大数据处理过程中,Kafka使数据生产者与消费者之间解耦,使得数据生产不受消费速率的影响,同时支持多种消费者类型对同一流数据进行并行处理。
  • 持久化和容错:Kafka提供高持久性保障,即使在故障情况下也能保留数据,这对于需要处理不可丢失的实时事件流的应用至关重要。
  • 流处理和实时分析:结合Spark、Flink等流处理框架,Kafka能够支撑实时数据流处理和复杂事件处理。
    2、Kafka总体架构中各个组件的功能有?
  • Producer(生产者):负责发布消息至Kafka的一个或多个Topic,可以选择将消息发送到特定的分区。
    • Broker(代理):Kafka服务器实例,构成了集群,它们保存和复制消息,处理来自生产者的消息发布请求和消费者的订阅请求。
    • Topic(主题):消息分类的逻辑概念,类似于数据库表,消息按照主题进行归类。
    • Partition(分区):Topic内部进一步细分,每个分区是一个有序且不可变的消息序列,有助于水平扩展和并行处理。
    • Consumer(消费者):从Kafka订阅并消费消息的客户端,属于某个消费者组,共同消费一个主题的所有分区,实现负载均衡。
    • Consumer Group(消费者组):一组具有相同组ID的消费者集合,确保消息在组内公平分配和仅一次消费。
    • ZooKeeper(在较早版本中):提供集群协调服务,管理broker的元数据和消费者组状态,但在较新的Kafka版本中,已逐渐移除对ZooKeeper的依赖,改为使用KRaft协议实现自我管理和协调。
      3、Kafka的主要应用场景?
  • 日志收集:聚合不同系统的日志事件,便于集中分析和监控。
    • 用户行为追踪:记录用户在网站或应用程序上的行为轨迹,用于实时监控、推荐系统或异常检测。
    • 流式处理:作为实时流数据处理管道的一部分,配合Storm、Spark Streaming等工具进行实时数据分析和计算。
    • 消息传递:在微服务架构中作为异步通信机制,实现服务之间的解耦和异步操作。
    • 数据集成:通过Kafka Connect API连接不同的数据源和目标系统,实现实时ETL(抽取、转换、加载)过程。
    • 事件驱动架构:触发基于事件的响应和服务,例如订单状态变更通知、库存更新同步等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/22602.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在视频号上面怎么卖货?只需要开一个店铺即可!具体怎么操作?

大家好,我是电商小V 最近这段时间在视频号上面卖货的热度可以说是非常大的,也是创业者常常提起的话题,大家之所以对视频号卖货非常感兴趣那是因为视频号和抖音处于一个赛道,也是朝着电商的方向发展, 所以说大家对于腾讯…

mongo篇---mongoDB Compass连接数据库

mongo篇—mongoDB Compass连接数据库 mongoDB笔记 – 第一条 一、mongoDB Compass连接远程数据库,配置URL。 URL: mongodb://username:passwordhost:port点击connect即可。 注意:host最好使用名称,防止出错连接超时。

cmake使用(01)

顶层CMakeLists.txt cmake_minimum_required (VERSION 3.5)# 配置 交叉编译 放置在 project() 命令之前# /opt/fslc-wayland/2.5.2/sysroots/aarch64-fslc-linux/usr/bin/make: error # while loading shared libraries: libdl.so.2: cannot open shared object file: # No su…

c++ 模板类的泛化版本,只声明,不定义,可以么?可以

王建伟老师的课本,413 页讲解过这个情况。只要不创建泛化模板的对象即可。最近看源码时,也出现了这种情况,大师们也这么写。故简化逻辑,测试一下: 谢谢

家庭海外仓系统:做好标准化管理,小空间也能做出高收益

家庭海外仓凭借其运营模式灵活,合作成本低的独有特点,还是被很多跨境卖家所接受的。不过家庭海外仓的盈利也面临着一些问题。 首先,家庭海外仓的仓储空间有限,很难通过规模效应放大收益。家庭海外仓通常只能存储少量货物&#xf…

个人参与场外期权交易的最全指南

个人参与场外期权交易的最全指南 一、引言 场外期权作为金融市场中的一大亮点,为个人投资者提供了多样化的风险管理及投资策略选择。本文将详细探讨个人如何安全、有效地参与场外期权交易。 文章来源/:财智财经 二、理解场外期权 场外期权是双方通过协…

ISO 26262认证与ASPICE认证是汽车电子软件的双重保障

为了确保汽车电子软件的高品质与可靠性,ISO 26262与ASPICE两大认证标准应运而生,它们共同为汽车电子软件的开发提供了双重保障。 (要明确的是:在ASPICE行业中专业来说,ASPICE项目是没有认证,而只有评估。不…

fastjson反序列化漏洞复现

靶机IP:192.168.253.134 攻击机IP:192.168.142.44 1、靶机环境搭建 靶机:http://caiyun.feixin.10086.cn/dl/095CteuquNKVq 提取密码:NNPD RCE:http://caiyun.feixin.10086.cn/dl/095CuIsJQOw14 提取密码:J2vd 靶机账号密码&…

OBS+nginx+nginx-http-flv-module实现阿里云的推流和拉流

背景:需要将球机视频推送到阿里云nginx,使用网页和移动端进行播放,以前视频格式为RTMP,但是在网页上面播放RTMP格式需要安装flash插件,chrome浏览器不给安装,调研后发现可以使用nginx的模块nginx-http-flv-…

工业物联网网关助推企业数字化转型,解决设备互联互通问题-天拓四方

随着科技的飞速发展,物联网技术已经渗透到工业领域的每一个角落。作为连接物理世界和数字世界的桥梁,工业物联网网关在推动企业数字化转型中发挥着至关重要的作用。数字化转型已经成为企业提升竞争力的必由之路,然而,在转型过程中…

升级最新版openssh-9.7p1及openssl-1.1.1h详细步骤及常见问题总结

近期因为openssh相继被漏洞扫描工具扫出存在漏洞,所以考虑升级操作系统中的openssh和openssl为最新版本,来避免漏洞风险。期间的升级过程及遇到的疑难问题,特此记录下来,供有需要的人参考。 本次目标是升级 openssh 为 9.7p1 版本…

计算机网络ppt和课后题总结(上)

试在下列条件下比较电路交换和分组交换。要传送的报文共 x(bit)。从源点到终点共经过 k 段链路,每段链路的传播时延为 d(s),数据率为 b(b/s)。在电路交换时电路的建立时间为 s(s)。在分组交换时分组长度为 p(bit),且各结点的排队等待时间可忽…

深入理解 Vue Router 及其 `router` 和 `route` 变量

深入理解 Vue Router 及其 router 和 route 变量 在使用 Vue.js 进行单页面应用开发时,Vue Router 是一个不可或缺的工具。它使得我们可以轻松地管理应用中的路由,提供了流畅的用户体验。然而,在实际开发中,许多开发者可能会混淆…

【Bug】httpClient循环调用除首次外会报Forbidden postman上用同样的参数可以

文章目录 问题问题代码原因解决处理Bug的具体步骤 问题 httpClient循环调用除首次外会报Forbidden postman上用同样的参数可以 问题代码 出问题的是一个外部系统的登录接口的调用 var response _httpClient.PostAsync($"/prod-api/openauth/login", content).Res…

Object.defineProperty 和 Proxy 响应式原理 vue2 vue3

一、VUE2 响应式原理 Object.defineProperty Object.defineProperty 方法允许精确添加一个属性到对象上,或者修改对象的现有属性,并返回这个对象。它可以用来定义或修改属性的特性,如 value, writable, enumerable, 和 configurable。 1) …

YOLOv8+PyQt5苹果叶病害检测(可以重新训练,yolov8模型,从图像、视频和摄像头三种路径识别检测)

效果视频:YOLOv8PyQt5苹果叶病害检测系统完整资源集合 资源包含可视化的苹果叶病害检测系统,基于最新的YOLOv8训练的苹果叶病害检测模型,和基于PyQt5制作的可视苹果叶病害系统,包含登陆页面和检测页面,该系统可自动检…

操作符:->

在一个指针变量指向一个结构体时常常会用->操作符来使用结构体内部的成员&#xff0c; 下面是我们没有使用指针时&#xff0c;如何调用结构体内的成员&#xff0c; #include<stdio.h>struct stu {char name[20];int age;char number[20]; };int main() {struct stu …

python实现——分类类型数据挖掘任务(图形识别分类任务)

分类类型数据挖掘任务 基于卷积神经网络&#xff08;CNN&#xff09;的岩石图像分类。有一岩石图片数据集&#xff0c;共300张岩石图片&#xff0c;图片尺寸224x224。岩石种类有砾岩&#xff08;Conglomerate&#xff09;、安山岩&#xff08;Andesite&#xff09;、花岗岩&am…

学会这14大招,30天涨粉两三千没问题!沈阳新媒体运营培训

很多小白在刚转入公司做新媒体时&#xff0c;基本都是从帮助公司运营账号开始的。但不同于个人号&#xff0c;一个企业本身是没有ip属性的&#xff0c;它的风格、调性等&#xff0c;都需要通过你的运营&#xff0c;让它变成一个活灵活现的、赋予独立个性人设的账号。 目前&…

Isaac Lab支持的强化学习框架介绍

在Isaac Lab中使用rl_games强化学习框架进行机械臂训练实验 python source/standalone/workflows/rl_games/train.py --taskIsaac-Franka-Cabinet-Direct-v0 使用 RL 代理进行培训 — Isaac Lab 文档 --- Training with an RL Agent — Isaac Lab documentation (isaac-sim.g…