Java技术栈 —— Spark入门(三)之实时视频流

Java技术栈 —— Spark入门(三)之实时视频流转灰度图像

  • 一、将摄像头数据发送至kafka
  • 二、Kafka准备topic
  • 三、spark读取kafka图像数据并处理
  • 四、本地显示灰度图像(存在卡顿现象,待优化)

项目整体结构图如下

在这里插入图片描述

参考文章或视频链接
[1] Architecture-for-real-time-video-streaming-analytics

一、将摄像头数据发送至kafka

这个代码将运行在你有摄像头的机器上,缺依赖就装依赖

import cv2
import kafka
import numpy as np# 设置 Kafka Producer
# 注意修改你的kafka地址
producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')# 打开摄像头(0 为默认摄像头)
cap = cv2.VideoCapture(0)while True:# 从摄像头捕获帧ret, frame = cap.read()if not ret:break# 将图像编码为 JPEG 格式_, buffer = cv2.imencode('.jpg', frame)# 将图像作为字节数组发送到 Kafkaproducer.send('camera-images', buffer.tobytes())# 显示当前捕获的帧cv2.imshow('Video', frame)# 按 'q' 键退出if cv2.waitKey(1) & 0xFF == ord('q'):break# 释放资源
cap.release()
cv2.destroyAllWindows()
producer.close()

二、Kafka准备topic

在准备topic之前,要先配置kafka中的config/server.properties文件,否则其它机器无法联通kafka,配置好后重启kafka。

# 找到这两个选项并修改成如下内容
listeners=PLAINTEXT://0.0.0.0:9092
# 改成你的kafka所在服务器ip
advertised.listeners=PLAINTEXT://{your_ip}:9092

如果你之前创建过topic,那就清空这些topic中的数据

# 设置保留时间为0,相当于立即清空数据
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=0
# 恢复原始保留设置,立即清空数据后,将数据的保留时间恢复至原有状态
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=604800000

开始正式创建topic

# 创建输入图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic camera-images --partitions 1 --replication-factor 1
# 创建输出的gray灰度图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic result-gray-images --partitions 1 --replication-factor 1# 准备好后查看下topic list进行验证
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看某topic中的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {your_topic_name} --from-beginning

三、spark读取kafka图像数据并处理

首先给你的spark脚本所运行的python环境(这个环境一般可以为conda等虚拟环境),安装必要的依赖库

pip install opencv-python-headless

准备脚本文件

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
import cv2
import numpy as npbootstrapServers = "localhost:9092"# 创建 SparkSession
spark = SparkSession.builder \.appName("Kafka-Spark-OpenCV") \.getOrCreate()# 初始化 Kafka Producer,用于发送处理后的图像
# 如果不这样做,会出现PicklingError,因为如果UDF中,包含了无法被序列化的对象,例如线程锁(_thread.RLock)或 Kafka 的 KafkaProducer 实例,序列化就会失败。
# 因此,在每个执行器内部,创建 KafkaProducer 实例
producer = None# 从 Kafka 读取数据流
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "camera-images") \.load()# UDF 用于将图像转换为灰度
def convert_to_gray(image_bytes):global producer# 创建 KafkaProducer 实例(在每个执行器上只初始化一次)if producer is None:producer = KafkaProducer(bootstrap_servers = bootstrapServers)# 将字节数组转换为 numpy 数组nparr = np.frombuffer(image_bytes, np.uint8)# 将 numpy 数组解码为图像img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)# 将图像转换为灰度gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 将灰度图像编码为 JPEG_, buffer = cv2.imencode('.jpg', gray)# 将处理后的图像发送到 Kafka 'result-gray-images' 主题producer.send('result-gray-images', buffer.tobytes())return buffer.tobytes()# 注册 UDF
convert_to_gray_udf = udf(convert_to_gray, BinaryType())# 应用 UDF 对数据进行灰度化处理
gray_df = df.withColumn("gray_image", convert_to_gray_udf("value"))# 将处理后的数据写入文件或其他输出
query = gray_df.writeStream \.outputMode("append") \.format("console") \.start()# query = gray_df\
#     .writeStream \
#     .format('kafka') \
#     .outputMode('update') \
#     .option("kafka.bootstrap.servers", bootstrapServers) \
#     .option('checkpointLocation', '/spark/job-checkpoint') \
#     .option("topic", "result-gray-images") \
#     .start()query.awaitTermination()

spark-submit提交脚本文件:

# 1.提高内存
# 2.调整 Kafka 批次大小,减少单个批次的数据量,从而降低内存使用(这个步骤存疑)
/opt/spark-3.5.2-bin-hadoop3/bin/spark-submit \
--executor-memory 4g \
--driver-memory 4g \
--conf "spark.kafka.maxOffsetsPerTrigger=1000" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka_to_spark.py

四、本地显示灰度图像(存在卡顿现象,待优化)

import cv2
import numpy as np
from kafka import KafkaConsumer# 设置 Kafka Consumer
consumer = KafkaConsumer('result-gray-images',bootstrap_servers='{your_kafka_ip}:9092',auto_offset_reset='latest',enable_auto_commit=True,# group_id='image-display-group'
)# 从 Kafka 主题读取灰度图像并显示
for message in consumer:# print("reading gray image.... ")# 将消息转换为 numpy 数组nparr = np.frombuffer(message.value, np.uint8)# 解码为图像gray_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)# 显示灰度图像cv2.imshow('Gray Video', gray_img)if cv2.waitKey(1) & 0xFF == ord('q'):break# 释放资源
cv2.destroyAllWindows()
consumer.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/51687.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python-MNE-源空间和正模型07:修复BEM和头表面

有时在创建BEM模型时,由于可能出现的一系列问题(例如,表面之间的交叉),表面需要手动校正。在这里,我们将看到如何通过将表面导出到3D建模程序blender,编辑它们,并重新导入它们来实现这一点。我们还将给出一…

鸿蒙(API 12 Beta3版)【通过字节数组生成码图】

基本概念 码图生成能力支持将字节数组转换为自定义格式的码图。 场景介绍 码图生成能力支持将字节数组转换为自定义格式的码图。 例如:调用码图生成能力, 将字节数组转换成交通一卡通二维码使用。 约束与限制 只支持QR Code生成,根据纠错水平不同对…

【已解决】win11笔记本电脑突然无法检测到其他显示器 / 无法使用扩展屏(2024.8.29 / 驱动更新问题)

我们点击 winx ,找到设备管理器,查看显示适配器: 主要问题就出现在 NVIDIA GeForce RTX 3060 Laptop GPU 上(虽然我把所有驱动都重新更新了一遍😭)。 常用驱动更新: dell 驱动更新&#xff08…

HTML <template> 标签的基本技巧

前言 HTML中的<template>标记是 Web 开发中一个功能强大但经常未得到充分利用的元素。它允许你定义可重复使用的内容&#xff0c;这些内容可以克隆并插入 DOM 中而无需最初渲染。 此功能对于创建动态、交互式 Web 应用程序特别有用。 在本文中&#xff0c;我们将探讨有…

STM32G474采用“多个单通道ADC转换”读取3个ADC引脚的电压

STM32G474采用“多个单通道ADC转换”读取3个ADC引脚的电压&#xff1a;PC0、PA1和PA2。本测试将ADC1_IN6映射到PC0引脚&#xff0c;ADC12_IN2映射到PA1引脚&#xff0c;ADC1_IN3映射到PA2引脚。 1、ADC输入 ADC输入电压范围&#xff1a;Vref– ≤ VIN ≤ Vref ADC支持“单端输入…

顺序表和链表知识点

1 顺序表 顺序表是指用一段物理地址连续的空间去存储数据的线性结构。 顺序表有两种&#xff1a;静态顺序表&#xff0c;动态顺序表。 1.1 静态顺序表结构体定义 typedef int ElemDataSL;typedef struct SequeList {ElemDataSL arr[100];int size; }SL; 静态顺序表在创建结构体…

【 html+css 绚丽Loading 】000026 五行吞灵盘

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享htmlcss 绚丽Loading&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495…

网络安全领域含金量最高的5大赛事,每个网安人的梦!

做网络安全一定要知道的5大赛事&#xff0c;含金量贼高&#xff0c;如果你能拿奖&#xff0c;国内大厂随你挑&#xff0c;几乎是每个有志网安人的梦&#xff01; 一、 DEF CON CTF&#xff08;DEF CON Capture the Flag&#xff09; DEF CON CTF是DEF CON黑帽大会上的一项著名…

江协科技STM32学习- P7 GPIO输入

&#x1f680;write in front&#x1f680; &#x1f50e;大家好&#xff0c;我是黄桃罐头&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流 &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​…

数据结构(树、平衡树、红黑树)

目录 树 树的遍历方式 平衡二叉树 旋转机制 左旋 右旋 旋转实例 左左 左右 右右 右左 总结 红黑树 树 相关概念 节点的内部结构如下 二叉树与二叉查找树的定义如下 树的遍历方式 前序遍历&#xff1a;当前节点&#xff0c;左子节点&#xff0c;右子结点 中序遍…

string的模拟实现与深浅拷贝

在上一章中可以看见&#xff0c;string类函数的基本实现和用法&#xff0c;在本文。来用基础的语言来模拟实现string类&#xff0c;来了解一下他们的基础底层&#xff1b; 在VS中string&#xff0c;我们可以看见&#xff0c;实现VS的类成员很多&#xff0c;很麻烦&#xff1b; …

【STM32】电容触摸按键

电容按键就是酷&#xff0c;但据我使用过电容按键版的洗澡计费机子后&#xff0c;一生黑&#xff08;湿手优化没做好的电容按键简直稀碎&#xff09;。 大部分图片来源&#xff1a;正点原子HAL库课程 专栏目录&#xff1a;记录自己的嵌入式学习之路-CSDN博客 目录 1 触摸按…

Zookeeper官网Java示例代码解读(一)

2024-08-22 1. 基本信息 官网地址&#xff1a; https://zookeeper.apache.org/doc/r3.8.4/javaExample.html 示例设计思路 Conventionally, ZooKeeper applications are broken into two units, one which maintains the connection, and the other which monitors data. I…

【C++ Primer Plus习题】7.5

问题: 解答: #include <iostream> using namespace std;int function(int n) {if (n 0)return 1;if (n 1)return 1;return n* function(n - 1); }int main() {int value 0;while (true){cout << "请输入数字:";cin >> value;cout << val…

华为Huawei路由器交换机SSH配置

华为设备的SSH登录配置需要5个步骤&#xff0c;示例如下&#xff1a; 一、配置命令 使能SSH功能 stelnet server enable生成公钥 rsa local-key-pair create 1024配置AAA用户密码及相应授权 aaalocal-user xxx password cipher xxxyyy1234local-user xxx privilege level …

ADB 获取屏幕坐标,并模拟滑动和点击屏幕

本文声明:本文是参考https://blog.csdn.net/beyond702/article/details/69258932编制。同时,补充了在windows系统模式下,详细的获取屏幕坐标的步骤。 1.判断设备与windows电脑USB连接是否正常 在CMD窗口输入命令:ADB devices,按ENTER键,输出如下结果,则表示连接正常。 …

Prometheus+Grafana监控数据可视化

上一篇文章讲了prometheus的简单使用&#xff0c;这一篇就先跳过中间略显枯燥的内容&#xff0c;来到监控数据可视化。 一方面&#xff0c;可视化的界面看着更带劲&#xff0c;另一方面&#xff0c;也更方便我们直观的查看监控数据&#xff0c;方便后面的学习。 Grafana安装与…

DIFFUSION 系列笔记| Latent Diffusion Model、Stable Diffusion基础概念、数学原理、代码分析、案例展示

目录 Latent Diffusion Model LDM 主要思想 LDM使用示例 LDM Pipeline LDM 中的 UNET 准备时间步 time steps 预处理阶段 pre-process 下采样过程 down sampling 中间处理 mid processing 上采样 upsampling 后处理 post-process LDM Super Resolution Pipeline…

Redis基本全局命令

文章目录 get和setkeysexistsdelexpirettltype redis全局命令&#xff1a; redis支持很多种数据结构&#xff0c;整体上来说。redis是键值对结构&#xff0c;key固定就是字符串&#xff0c;value实际上就会有很多种&#xff0c;比如说&#xff1a; 字符串哈希表列表有序集合 …

住宅物业满意度计算方式中满意率和满意度指数的区别

满意率和满意度指数是用于计算住宅物业满意度的两种不同方式&#xff0c;它们的区别如下&#xff1a; 1、满意率&#xff1a;满意率是通过计算满意的居民人数与总参与调查的居民人数之间的比例来衡量满意度。它以百分比形式表示&#xff0c;可以直观地了解居民对物业管理的整体…