Kafka消息队列python开发环境搭建

目录

引言

Kafka 的核心概念和组件

Kafka 的主要特性

使用场景

申请云服务器

安装docker及docker-compose

VSCODE配置

开发环境搭建

搭建Kafka的python编程环境

Kafka的python编程示例

引言

Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。

Kafka 的核心概念和组件
  1. Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。

  2. Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。

  3. Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。

  4. Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。

  5. Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。

  6. Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。

  7. Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。

Kafka 的主要特性
  • 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
  • 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
  • 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
  • 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
  • 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
  • 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
  • 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
  • 日志收集:从分布式系统中收集日志数据,用于监控和分析。
  • 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
  • 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。

申请云服务器

(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)

注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04

管理员账户root

管理员密码:在安装的时候设置,记住密码

下载安装mobaXterm

https://mobaxterm.mobatek.net/download-home-edition.html

安装docker及docker-compose

#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io # intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \-o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试

查看docker和dockers是否安装好?

docker version
docker-compose version

VSCODE配置

开发环境搭建

  • 将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka

  • 将下面代码中的localhost替换为云服务公网IP

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"
dkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafka-devdepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- "9092:9092"
  • 上传metaldigi.zip

解压缩文件

sudo apt updatesudo apt install zipunzip metaladigi.zip

cd metaldigi/kafka
  • 启动kafka消息服务器

    • 在命令行执行消息生产者

docker-compose up -d

搭建Kafka的python编程环境

  • 进入metaldigi文件夹

  • 执行 docker ps

cd metaldigi
docker-compose up -d

Kafka的python编程示例

  • 进入metaldigi文件夹

  • 执行 docker ps

 docker exec -it metal-digi-backend bash

消息生产者

from kafka import KafkaProducer# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式message =f'message{_}'.encode('utf-8')# 发送消息到 Kafka 的 'demo-topic' 主题producer.send('demo-topic', value=message)# 打印已发送的消息print(f'Sent message: message{_}')# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。

消息消费者

from kafka import KafkaConsumer# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer('demo-topic',  # 指定要消费的主题bootstrap_servers='150.158.11.142:9092',  # Kafka 服务器地址和端口auto_offset_reset='earliest',  # 从最早的消息开始消费enable_auto_commit=True,  # 自动提交偏移量group_id='demo-group'  # 消费者组标识
)# 循环消费并打印收到的消息
for message in consumer:# 解码并打印消息内容print(f"Received message: {message.value.decode()}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot整合阿里云RocketMQ对接,商业版

1.需要阿里云开通商业版RocketMQ 普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组 2.结构目录 3.引入依赖 <!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</…

Qt类 | QLabel类详解

文章目录 一、QLabel类介绍二、Properties&#xff08;属性&#xff09;三、Public Functions&#xff08;公共函数&#xff09;1.构造函数2.alignment与setAlignment函数 -- 标签内容的对齐方式3.buddy与setBuddy函数 -- QLabel关联的伙伴控件4.hasScaledContents与setScaledC…

基于YOLOv8深度学习的水果智能检测系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标检测、卷积神经网络

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

C#字符串基本操作

1、代码 //1、创建字符串&#xff08;获取长度&#xff09;string str "Hello, World!";Console.WriteLine($"string:{str},length:{str.Length}");//2、字符串连接string str1 "Hello, ";string str2 "World!";Console.WriteLine…

在 Windows 11/10/8 上恢复误删除文件的最佳方法

如果您刚刚在计算机上重新安装了 Windows 操作系统&#xff0c;结果硬盘上的所有文件都消失了&#xff0c;有没有办法从 Windows 11/10 中恢复误删除的文件&#xff1f; 许多因素都可能导致 PC 上的文件被删除。除了重新安装操作系统外&#xff0c;其他常见原因还包括意外删除…

Vue.js 生命周期详解:从创建到销毁的全过程

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 非常期待和您一起在这个小…

【BUG】已解决:java.lang.reflect.InvocationTargetException

已解决&#xff1a;java.lang.reflect.InvocationTargetException 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c;武汉城市开发…

STM32 IAP 需要关注的一些事

1、首先要知道STM32的程序是如何分布在FLASH中的。 2、升级的时候涉及到两个程序&#xff0c;一个是bootloader&#xff0c;一个是user程序&#xff0c;这两个程序的功能分别的什么作用的&#xff1f; 3、编译的固件是怎么分布的&#xff1f;通过那个配置文件去指导编译器去排布…

Spring Boot集成kudu快速入门Demo

1.什么是kudu 在Kudu出现前&#xff0c;由于传统存储系统的局限性&#xff0c;对于数据的快速输入和分析还没有一个完美的解决方案&#xff0c;要么以缓慢的数据输入为代价实现快速分析&#xff0c;要么以缓慢的分析为代价实现数据快速输入。随着快速输入和分析场景越来越多&a…

十五、【机器学习】【监督学习】- 神经网络回归

系列文章目录 第一章 【机器学习】初识机器学习 第二章 【机器学习】【监督学习】- 逻辑回归算法 (Logistic Regression) 第三章 【机器学习】【监督学习】- 支持向量机 (SVM) 第四章【机器学习】【监督学习】- K-近邻算法 (K-NN) 第五章【机器学习】【监督学习】- 决策树…

Django Q()函数

Q() 函数的作用 在Django中&#xff0c;Q()函数是一个非常有用的工具&#xff0c;主要用于构建复杂的查询。它允许你创建复杂的查询语句&#xff0c;包括AND、OR和NOT逻辑操作。这对于处理复杂的数据库查询特别有用&#xff0c;特别是在你需要组合多个条件或处理复杂的过滤逻辑…

HLS加密技术:保障流媒体内容安全的利器

随着网络视频内容的爆炸性增长&#xff0c;如何有效保护视频内容的版权和安全成为了一个亟待解决的问题。HLS&#xff08;HTTP Live Streaming&#xff09;加密技术作为一种先进的流媒体加密手段&#xff0c;凭借其高效性和安全性&#xff0c;在直播、点播等场景中得到了广泛应…

【C语言】联合体(union)

文章目录 1.联合体的含义2. 联合体的声明3. 联合体大小的计算4. 联合体的特点 1.联合体的含义 联合体也叫做共用体&#xff0c;是指联合体的所有成员共用同一块内存空间。这也就说明了&#xff0c;联合体的大小至少是其成员所占空间的最大值。 2. 联合体的声明 #include<…

【整体介绍】HTML和JS编写多用户VR应用程序的框架

一、Networked-Aframe是什么&#xff1f; 简称NAF&#xff0c;底层基于Mozilla的AFrame框架&#xff0c;用HTML和JS编写多用户VR应用程序的框架。 二、特性 支持 WebRTC 和/或 WebSocket 连接。 语音聊天。音频流让您的用户在应用程序内交谈&#xff08;仅限 WebRTC&#xff…

2024全球和国内最常用的弱密码,有没有你的?

密码管理器NordPass分析了来自公开来源的超过4.3TB 的密码数据&#xff0c;找出了当前为止&#xff08;2024年&#xff09;最常用&#xff08;最脆弱&#xff09;的密码。 这些密码主要有下面这些特征&#xff1a; 简单且常用&#xff0c;万年弱密码&#xff0c;比如123456、a…

智慧消防建设方案(完整方案参考PPT)

智慧消防系统建设方案旨在通过物联网、大数据与云计算技术&#xff0c;集成火灾自动报警、智能监控、应急指挥等功能于一体。方案部署智能传感器监测火情&#xff0c;实时数据分析预警&#xff0c;实现火灾早发现、早处置。构建可视化指挥平台&#xff0c;优化应急预案&#xf…

Python中发送邮件的艺术:普通邮件、PDF附件与Markdown附件

用的是qq邮箱,具体获取smtp的password可以看这个文章 获取密码 Python中发送邮件的艺术:普通邮件、PDF附件与Markdown附件 在今天的博客中,我们将探讨如何使用Python的smtplib库来发送电子邮件,包括发送普通文本邮件、携带PDF文件的邮件和附带Markdown文件的邮件。这些功能…

视频号助手获取feedId

1.登陆视频号 2.内容管理-视频 3.视频后分享

Git 用法

基本介绍 版本控制工具用处&#xff1a; 备份代码还原协同开发追溯 版本控制工具 1、集中式版本控制工具 版本库是集中存放在中央服务器的&#xff0c;team 里每个人 work 时从中央服务器下载代码&#xff0c;是必须联网才能工作&#xff0c;局域网或互联网。个人修改后然后…

【BUG】已解决:WslRegisterDistribution failed with error: 0x800701bc

已解决&#xff1a;WslRegisterDistribution failed with error: 0x800701bc 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c;武…