用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错

首先看一下我们的示例代码

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------Description : TODO:SourceFile : etl_stream_kafkaAuthor  : zxxDate  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 连接kafkareadDF = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("subscribe", "topicA") \.load()# 使用DSL语句etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))etlDF.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("topic", "etlTopic") \.option("checkpointLocation", "../../datas/kafka_stream") \.start().awaitTermination()# 关闭spark.stop()

运行发现报错

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>readDF = spark.readStream.format("kafka") \File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in loadreturn self._df(self._jreader.load())File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__return_value = get_return_value(File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in decoraise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

 进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

 点进去,下载jar包

 再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包

 

 

 

 再次运行即可

 jar包下载链接

【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886878.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32芯片EXIT外部中断的配置与原理以及模板代码(标准库)

配置EXIT外部中断其实就是把GPIO刀NVIC的各个外设配置好 第一步&#xff1a;配置RCC&#xff0c;把我们涉及到的外设的时钟都打开 &#xff08;此处EXTI是默认打开的&#xff0c;而NVIC是内核外设无需配置&#xff09; 第二步&#xff1a;配置GPIO,选择端口为输入模式 第三…

misc设备驱动

MISC 理解 简化创建设备号&#xff0c;cdev&#xff0c;class&#xff0c;device 的步骤。 设备树 代码 模块初始化和退出 平台下驱动匹配和移除 MiSC 结构体 文件描述符的处理函数 其他

layui合并table相同内的行

<table border"1" id"table1" class"layui-table"><thead><tr><th><b>姓名</b></th><th><b>项目</b></th><th><b>任务</b></th><th><b>…

java ssm 羽绒服商城网站 在线商城 在线服饰销售网站 源码 jsp

一、项目简介 本项目是一套基于SSM的羽绒服商城网站&#xff0c;主要针对计算机相关专业的和需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本、软件工具等。 项目都经过严格调试&#xff0c;确保可以运行&#xff01; 二、技术实现 ​后端技术&#xff…

程序里sendStringParametersAsUnicode=true的配置导致sql server cpu使用率高问题处理

一 问题描述 近期生产环境几台sql server从库cpu使用率总是打满&#xff0c;发现抓的带变量值的慢sql&#xff0c;手动代入变量值执行并不慢&#xff0c;秒级返回&#xff0c;不知道问题出在哪里。 二 问题排查 用扩展事件或者sql profiler抓慢sql&#xff0c;抓到了变量值&…

OpenAI震撼发布:桌面版ChatGPT,Windows macOS双平台AI编程体验!

【雪球导读】 「OpenAI推出ChatGPT桌面端」 OpenAI重磅推出ChatGPT桌面端&#xff0c;全面支持Windows和macOS系统&#xff01;这款新工具为用户在日常生活和工作中提供了前所未有的无缝交互体验。对于那些依赖桌面端进行开发工作的专业人士来说&#xff0c;这一更新带来了令人…

湘潭大学软件工程算法设计与分析考试复习笔记(四)

回顾 湘潭大学软件工程算法设计与分析考试复习笔记&#xff08;一&#xff09;湘潭大学软件工程算法设计与分析考试复习笔记&#xff08;二&#xff09;湘潭大学软件工程算法设计与分析考试复习笔记&#xff08;三&#xff09; 前言 现在是晚上十一点&#xff0c;我平时是十…

Python学习29天

二分查找 # 定义函数冒泡排序法从大到小排列 def bbble_sort(list):# i控制排序次数for i in range(len(list) - 1):# j控制每次排序比较次数for j in range(len(list) - 1 - i):if list[j] < list[j 1]:list[j], list[j 1] list[j 1], list[j] # 定义二分查找函数 def…

(Linux)搭建静态网站——基于http/https协议的静态网站

简单了解nginx配置文件 1.下载并开启nginx服务 下载 [rootlocalhost ~]# dnf install nginx -y开启 [rootlocalhost ~]# systemctl restart nginx 1.(1)搭建静态网站——基于http协议的静态网站 实验1&#xff1a;搭建一个web服务器&#xff0c;访问该服务器时显示“hello w…

【数据结构-表达式解析】力扣227. 基本计算器 II

给你一个字符串表达式 s &#xff0c;请你实现一个基本计算器来计算并返回它的值。 整数除法仅保留整数部分。 你可以假设给定的表达式总是有效的。所有中间结果将在 [-231, 231 - 1] 的范围内。 注意&#xff1a;不允许使用任何将字符串作为数学表达式计算的内置函数&#…

「六」体验HarmonyOS端云一体化开发模板——本地真机运行应用

关于作者 白晓明 宁夏图尔科技有限公司董事长兼CEO、坚果派联合创始人 华为HDE、润和软件HiHope社区专家、鸿蒙KOL、仓颉KOL 华为开发者学堂/51CTO学堂/CSDN学堂认证讲师 开放原子开源基金会2023开源贡献之星 「目录」 「一」HarmonyOS端云一体化概要 「二」体验HarmonyOS端云一…

【Bug合集】——Java大小写引起传参失败,获取值为null的解决方案

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;本文面向的人群 二&#xff1a;错误场景引入 三&#xff1a;正确场景引入 四&#xf…

【第4章 | 分类与逻辑回归】(python机器学习)

一、逻辑回归 1.1逻辑回归 二项逻辑回归 • Binomial logistic regression model是一种分类模型 • 由条件概率P(Y|X)表示的分类模型 • 形式化为logistic distribution • X取实数&#xff0c;Y取值1,0 特点&#xff1a; • 事件的几率odds&#xff1a;事件发生与事件不发生…

VSCode+ESP-IDF开发ESP32-S3-DevKitC-1(1)开发环境搭建

VSCodeESP-IDF开发ESP32-S3-DevKitC-1&#xff08;1&#xff09;开发环境搭建 1.开发环境搭建&#xff08;安装ESP-IDF&#xff09;2.开发环境搭建&#xff08;安装VS Code&#xff09;3.开发环境搭建&#xff08;VSCode中安装ESP-IDF插件及配置&#xff09; 1.开发环境搭建&am…

RAID存储技术 详解

RAID&#xff08;Redundant Array of Independent Disks&#xff0c;独立磁盘冗余阵列&#xff09;是一种将多个物理硬盘组合为一个逻辑存储单元的技术。它通过分布数据、冗余校验和容错能力&#xff0c;提高存储系统的性能、可靠性和容量利用率。 以下从底层原理和源代码层面…

深入理解TTY体系:设备节点与驱动程序框架详解

往期内容 本专栏往期内容&#xff1a;Uart子系统 UART串口硬件介绍 interrupt子系统专栏&#xff1a; 专栏地址&#xff1a;interrupt子系统Linux 链式与层级中断控制器讲解&#xff1a;原理与驱动开发 – 末片&#xff0c;有专栏内容观看顺序 pinctrl和gpio子系统专栏&#xf…

【大语言模型】ACL2024论文-17 VIDEO-CSR:面向视觉-语言模型的复杂视频摘要创建

【大语言模型】ACL2024论文-17 VIDEO-CSR&#xff1a;面向视觉-语言模型的复杂视频摘要创建 VIDEO-CSR&#xff1a;面向视觉-语言模型的复杂视频摘要创建 目录 文章目录 【大语言模型】ACL2024论文-17 VIDEO-CSR&#xff1a;面向视觉-语言模型的复杂视频摘要创建目录摘要研究…

华为openEuler考试真题演练(附答案)

【单选题】 以下关于互联网的描述&#xff0c;哪个选项是正确的? A:Nginx 在万维网中可以作为 ftp 服务器的反向代理&#xff0c;并与ftp服务器的数量--对应 B:Nginx 在互联网中可以作为 web服务器端&#xff0c;成为万维网的一个节点 C:互联网上的的资源需使用 Nginx进行七层…

03 —— Webpack 自动生成 html 文件

HtmlWebpackPlugin | webpack 中文文档 | webpack中文文档 | webpack中文网 安装 npm install --save-dev html-webpack-plugin 下载html-webpack-plugin本地软件包 npm i html-webpack-plugin --save-dev 配置webpack.config.js让webpack拥有插件功能 const HtmlWebpack…

STM32设计井下瓦斯检测联网WIFI加Zigbee多路节点协调器传输-分享

目录 目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 1.电路图采用Altium Designer进行设计&#xff1a; 2.实物展示图片 三、程序源代码设计 四、获取资料内容 前言 本系统基于STM32微控制器和Zigbee无线通信技术&#xff0c;设计了…