Kafka消息队列python开发环境搭建

目录

引言

Kafka 的核心概念和组件

Kafka 的主要特性

使用场景

申请云服务器

安装docker及docker-compose

VSCODE配置

开发环境搭建

搭建Kafka的python编程环境

Kafka的python编程示例

引言

Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。

Kafka 的核心概念和组件
  1. Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。

  2. Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。

  3. Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。

  4. Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。

  5. Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。

  6. Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。

  7. Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。

Kafka 的主要特性
  • 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
  • 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
  • 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
  • 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
  • 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
  • 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
  • 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
  • 日志收集:从分布式系统中收集日志数据,用于监控和分析。
  • 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
  • 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。

申请云服务器

(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)

注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04

管理员账户root

管理员密码:在安装的时候设置,记住密码

下载安装mobaXterm

https://mobaxterm.mobatek.net/download-home-edition.html

安装docker及docker-compose

#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io # intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \-o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试

查看docker和dockers是否安装好?

docker version
docker-compose version

VSCODE配置

开发环境搭建

  • 将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka

  • 将下面代码中的localhost替换为云服务公网IP

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"
dkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafka-devdepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- "9092:9092"
  • 上传metaldigi.zip

解压缩文件

sudo apt updatesudo apt install zipunzip metaladigi.zip

cd metaldigi/kafka
  • 启动kafka消息服务器

    • 在命令行执行消息生产者

docker-compose up -d

搭建Kafka的python编程环境

  • 进入metaldigi文件夹

  • 执行 docker ps

cd metaldigi
docker-compose up -d

Kafka的python编程示例

  • 进入metaldigi文件夹

  • 执行 docker ps

 docker exec -it metal-digi-backend bash

消息生产者

from kafka import KafkaProducer# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式message =f'message{_}'.encode('utf-8')# 发送消息到 Kafka 的 'demo-topic' 主题producer.send('demo-topic', value=message)# 打印已发送的消息print(f'Sent message: message{_}')# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。

消息消费者

from kafka import KafkaConsumer# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer('demo-topic',  # 指定要消费的主题bootstrap_servers='150.158.11.142:9092',  # Kafka 服务器地址和端口auto_offset_reset='earliest',  # 从最早的消息开始消费enable_auto_commit=True,  # 自动提交偏移量group_id='demo-group'  # 消费者组标识
)# 循环消费并打印收到的消息
for message in consumer:# 解码并打印消息内容print(f"Received message: {message.value.decode()}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/47509.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot整合阿里云RocketMQ对接,商业版

1.需要阿里云开通商业版RocketMQ 普通消息新建普通主题,普通组,延迟消息新建延迟消息主题,延迟消息组 2.结构目录 3.引入依赖 <!--阿里云RocketMq整合--><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</…

Qt类 | QLabel类详解

文章目录 一、QLabel类介绍二、Properties&#xff08;属性&#xff09;三、Public Functions&#xff08;公共函数&#xff09;1.构造函数2.alignment与setAlignment函数 -- 标签内容的对齐方式3.buddy与setBuddy函数 -- QLabel关联的伙伴控件4.hasScaledContents与setScaledC…

基于YOLOv8深度学习的水果智能检测系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标检测、卷积神经网络

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

End-to-End Object Detection with Transformers【目标检测-方法详细解读】

摘要 我们提出了一种新的方法,将目标检测视为一个直接的集合预测问题。我们的方法简化了检测流程,有效地消除了许多手工设计的组件,如非极大值抑制程序或锚生成,这些组件显式编码了我们关于任务的先验知识。新框架的主要成分,称为DEtection TRansformer或DETR,是一个基于…

下载工具 -- Internet Download Manager(IDM) v6.42 build 14 绿色特别版

软件简介 Internet Download Manager&#xff08;IDM&#xff09;是一款功能强大的下载工具&#xff0c;它可以帮助用户更高效、更稳定地下载网络上的各种文件。IDM支持多线程下载技术&#xff0c;可以将文件分割成多个部分同时下载&#xff0c;从而显著提高下载速度。此外&am…

C#字符串基本操作

1、代码 //1、创建字符串&#xff08;获取长度&#xff09;string str "Hello, World!";Console.WriteLine($"string:{str},length:{str.Length}");//2、字符串连接string str1 "Hello, ";string str2 "World!";Console.WriteLine…

在 Windows 11/10/8 上恢复误删除文件的最佳方法

如果您刚刚在计算机上重新安装了 Windows 操作系统&#xff0c;结果硬盘上的所有文件都消失了&#xff0c;有没有办法从 Windows 11/10 中恢复误删除的文件&#xff1f; 许多因素都可能导致 PC 上的文件被删除。除了重新安装操作系统外&#xff0c;其他常见原因还包括意外删除…

MAC 数据恢复软件: STELLAR Data Recovery For MAC V. 12.1 更多增强功能

天津鸿萌科贸发展有限公司是 Stellar 系列软件的授权代理商。 STELLAR Data Recovery For MAC 该数据恢复软件可从任何存储驱动器、清空的回收站以及崩溃或无法启动的 Mac 设备中恢复丢失或删除的文件。 轻松恢复已删除的文档、照片、音频文件和视频。自定义扫描以帮助恢复特…

Vue.js 生命周期详解:从创建到销毁的全过程

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 非常期待和您一起在这个小…

【BUG】已解决:java.lang.reflect.InvocationTargetException

已解决&#xff1a;java.lang.reflect.InvocationTargetException 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;211科班出身&#xff0c;就职于医疗科技公司&#xff0c;热衷分享知识&#xff0c;武汉城市开发…

STM32 IAP 需要关注的一些事

1、首先要知道STM32的程序是如何分布在FLASH中的。 2、升级的时候涉及到两个程序&#xff0c;一个是bootloader&#xff0c;一个是user程序&#xff0c;这两个程序的功能分别的什么作用的&#xff1f; 3、编译的固件是怎么分布的&#xff1f;通过那个配置文件去指导编译器去排布…

Spring Boot集成kudu快速入门Demo

1.什么是kudu 在Kudu出现前&#xff0c;由于传统存储系统的局限性&#xff0c;对于数据的快速输入和分析还没有一个完美的解决方案&#xff0c;要么以缓慢的数据输入为代价实现快速分析&#xff0c;要么以缓慢的分析为代价实现数据快速输入。随着快速输入和分析场景越来越多&a…

网络爬虫基础介绍

什么是爬虫 Web 爬虫&#xff0c;也称为网络蜘蛛或网络机器人&#xff0c;是一种用于自动化访问和抓取网页内容的程序。爬虫通过模拟用户访问网页的行为&#xff0c;从互联网上获取数据&#xff0c;并将其存储或进一步处理。 爬虫的应用场景 搜索引擎索引&#xff1a;如 Google…

十五、【机器学习】【监督学习】- 神经网络回归

系列文章目录 第一章 【机器学习】初识机器学习 第二章 【机器学习】【监督学习】- 逻辑回归算法 (Logistic Regression) 第三章 【机器学习】【监督学习】- 支持向量机 (SVM) 第四章【机器学习】【监督学习】- K-近邻算法 (K-NN) 第五章【机器学习】【监督学习】- 决策树…

C语言强化-1.数据结构概述

与408的关联&#xff1a;1. 逻辑结构和存储结构在选择题中会有涉及。2. 时间复杂度几乎是每一年大题必考内容&#xff01; 逻辑结构与存储结构 逻辑结构&#xff08;对人友好&#xff09; 集合结构&#xff08;无关系&#xff09;线性结构&#xff08;一对一&#xff09;树形…

Django Q()函数

Q() 函数的作用 在Django中&#xff0c;Q()函数是一个非常有用的工具&#xff0c;主要用于构建复杂的查询。它允许你创建复杂的查询语句&#xff0c;包括AND、OR和NOT逻辑操作。这对于处理复杂的数据库查询特别有用&#xff0c;特别是在你需要组合多个条件或处理复杂的过滤逻辑…

HLS加密技术:保障流媒体内容安全的利器

随着网络视频内容的爆炸性增长&#xff0c;如何有效保护视频内容的版权和安全成为了一个亟待解决的问题。HLS&#xff08;HTTP Live Streaming&#xff09;加密技术作为一种先进的流媒体加密手段&#xff0c;凭借其高效性和安全性&#xff0c;在直播、点播等场景中得到了广泛应…

【Neo4j 】学习笔记:GraphRAG 宣言:为 GenAI 添加知识

GraphRAG 宣言:为 GenAI 添加知识 原文 菲利普拉瑟尔图片 菲利普拉瑟尔 7 月 11 日 阅读时长:22 分钟 我们正在进入 RAG 的“Blue Links”时代 GraphRAG 宣言。 我们即将意识到,要想用 GenAI 做任何有意义的事情,你不能只依赖自回归 LLM来做决定。我知道你在想什么:“RAG …

【C语言】联合体(union)

文章目录 1.联合体的含义2. 联合体的声明3. 联合体大小的计算4. 联合体的特点 1.联合体的含义 联合体也叫做共用体&#xff0c;是指联合体的所有成员共用同一块内存空间。这也就说明了&#xff0c;联合体的大小至少是其成员所占空间的最大值。 2. 联合体的声明 #include<…

【整体介绍】HTML和JS编写多用户VR应用程序的框架

一、Networked-Aframe是什么&#xff1f; 简称NAF&#xff0c;底层基于Mozilla的AFrame框架&#xff0c;用HTML和JS编写多用户VR应用程序的框架。 二、特性 支持 WebRTC 和/或 WebSocket 连接。 语音聊天。音频流让您的用户在应用程序内交谈&#xff08;仅限 WebRTC&#xff…