用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错

首先看一下我们的示例代码

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------Description : TODO:SourceFile : etl_stream_kafkaAuthor  : zxxDate  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 连接kafkareadDF = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("subscribe", "topicA") \.load()# 使用DSL语句etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))etlDF.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("topic", "etlTopic") \.option("checkpointLocation", "../../datas/kafka_stream") \.start().awaitTermination()# 关闭spark.stop()

运行发现报错

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>readDF = spark.readStream.format("kafka") \File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in loadreturn self._df(self._jreader.load())File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__return_value = get_return_value(File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in decoraise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

 进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

 点进去,下载jar包

 再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包

 

 

 

 再次运行即可

 jar包下载链接

【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886878.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MyBatis 源码阅读与笔记】Mapper 接口的动态代理实现

MyBatis 源码阅读与笔记 题目&#xff1a;Mapper 接口的动态代理实现 研究 MyBatis 如何通过动态代理为 Mapper 接口创建实现类。重点分析 MapperProxy 类&#xff0c;理解其如何拦截方法调用并执行 SQL。 笔记 1 动态代理原理 MyBatis 使用 JDK 动态代理为 Mapper 接口创建…

STM32芯片EXIT外部中断的配置与原理以及模板代码(标准库)

配置EXIT外部中断其实就是把GPIO刀NVIC的各个外设配置好 第一步&#xff1a;配置RCC&#xff0c;把我们涉及到的外设的时钟都打开 &#xff08;此处EXTI是默认打开的&#xff0c;而NVIC是内核外设无需配置&#xff09; 第二步&#xff1a;配置GPIO,选择端口为输入模式 第三…

深入解析PostgreSQL中的PL/pgSQL语法

在数据库管理系统中&#xff0c;PostgreSQL因其强大的功能和稳定性而受到广泛欢迎。其中&#xff0c;PL/pgSQL作为PostgreSQL的过程化语言&#xff0c;为用户提供了更为灵活和强大的编程能力。本文将深入解析PL/pgSQL的语法&#xff0c;帮助读者更好地掌握这门语言&#xff0c;…

misc设备驱动

MISC 理解 简化创建设备号&#xff0c;cdev&#xff0c;class&#xff0c;device 的步骤。 设备树 代码 模块初始化和退出 平台下驱动匹配和移除 MiSC 结构体 文件描述符的处理函数 其他

layui合并table相同内的行

<table border"1" id"table1" class"layui-table"><thead><tr><th><b>姓名</b></th><th><b>项目</b></th><th><b>任务</b></th><th><b>…

java ssm 羽绒服商城网站 在线商城 在线服饰销售网站 源码 jsp

一、项目简介 本项目是一套基于SSM的羽绒服商城网站&#xff0c;主要针对计算机相关专业的和需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本、软件工具等。 项目都经过严格调试&#xff0c;确保可以运行&#xff01; 二、技术实现 ​后端技术&#xff…

程序里sendStringParametersAsUnicode=true的配置导致sql server cpu使用率高问题处理

一 问题描述 近期生产环境几台sql server从库cpu使用率总是打满&#xff0c;发现抓的带变量值的慢sql&#xff0c;手动代入变量值执行并不慢&#xff0c;秒级返回&#xff0c;不知道问题出在哪里。 二 问题排查 用扩展事件或者sql profiler抓慢sql&#xff0c;抓到了变量值&…

《强激光与粒子束》

投 稿 须 知 1. 投稿著作所有列名作者皆同意在投稿文章经《强激光与粒子束》&#xff08;简称《强》刊&#xff09;刊登后&#xff0c;其著作财产权即转让给《强激光与粒子束》编辑部&#xff0c;但作者仍保有著作人身权&#xff0c;并保有本著作未来自行集结、教学等个人使用…

基于 MUSA 的大语言模型推理和服务框架vLLM

1. 引言​ vLLM是一个高性能且内存高效的大语言模型推理和服务框架&#xff0c;也是当前业界使用范围最广的大模型推理框架&#xff0c;截至目前github star数28.4k。该框架性能优秀&#xff0c;而且部署容易&#xff0c;使用CUDA/ROCm提供GPU加速能力。但vLLM目前不支持使用摩…

OpenAI震撼发布:桌面版ChatGPT,Windows macOS双平台AI编程体验!

【雪球导读】 「OpenAI推出ChatGPT桌面端」 OpenAI重磅推出ChatGPT桌面端&#xff0c;全面支持Windows和macOS系统&#xff01;这款新工具为用户在日常生活和工作中提供了前所未有的无缝交互体验。对于那些依赖桌面端进行开发工作的专业人士来说&#xff0c;这一更新带来了令人…

Python小白学习教程从入门到入坑------习题课3(基础巩固)

目录 一、选择题 二、实战题 2.1 实战一:从键盘获取一个4位整数&#xff0c;分别输出个位、十位、百位、千位上的数字 2.2 实战二&#xff1a;根据父母身高预测儿子的身高 一、选择题 1、以下哪项不是Python语言的保留字符&#xff08; C &#xff09; A. False B. and C. …

湘潭大学软件工程算法设计与分析考试复习笔记(四)

回顾 湘潭大学软件工程算法设计与分析考试复习笔记&#xff08;一&#xff09;湘潭大学软件工程算法设计与分析考试复习笔记&#xff08;二&#xff09;湘潭大学软件工程算法设计与分析考试复习笔记&#xff08;三&#xff09; 前言 现在是晚上十一点&#xff0c;我平时是十…

Python学习29天

二分查找 # 定义函数冒泡排序法从大到小排列 def bbble_sort(list):# i控制排序次数for i in range(len(list) - 1):# j控制每次排序比较次数for j in range(len(list) - 1 - i):if list[j] < list[j 1]:list[j], list[j 1] list[j 1], list[j] # 定义二分查找函数 def…

SparkSQL的执行过程:从源码角度解析逻辑计划、优化计划和物理计划

SparkSQL的执行过程可以分为以下几个阶段&#xff1a;从用户的SQL语句到最终生成的RDD执行&#xff0c;涵盖逻辑计划、优化计划和物理计划。以下是详细的源码角度解析&#xff1a; 1. 解析阶段&#xff08;Parsing&#xff09; SQL语句解析&#xff1a;Spark 使用 Catalyst 引…

(Linux)搭建静态网站——基于http/https协议的静态网站

简单了解nginx配置文件 1.下载并开启nginx服务 下载 [rootlocalhost ~]# dnf install nginx -y开启 [rootlocalhost ~]# systemctl restart nginx 1.(1)搭建静态网站——基于http协议的静态网站 实验1&#xff1a;搭建一个web服务器&#xff0c;访问该服务器时显示“hello w…

【数据结构-表达式解析】力扣227. 基本计算器 II

给你一个字符串表达式 s &#xff0c;请你实现一个基本计算器来计算并返回它的值。 整数除法仅保留整数部分。 你可以假设给定的表达式总是有效的。所有中间结果将在 [-231, 231 - 1] 的范围内。 注意&#xff1a;不允许使用任何将字符串作为数学表达式计算的内置函数&#…

「六」体验HarmonyOS端云一体化开发模板——本地真机运行应用

关于作者 白晓明 宁夏图尔科技有限公司董事长兼CEO、坚果派联合创始人 华为HDE、润和软件HiHope社区专家、鸿蒙KOL、仓颉KOL 华为开发者学堂/51CTO学堂/CSDN学堂认证讲师 开放原子开源基金会2023开源贡献之星 「目录」 「一」HarmonyOS端云一体化概要 「二」体验HarmonyOS端云一…

【Bug合集】——Java大小写引起传参失败,获取值为null的解决方案

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;本文面向的人群 二&#xff1a;错误场景引入 三&#xff1a;正确场景引入 四&#xf…

使用Kotlin写一个将字符串加密成short数组,然后可以解密还原成原始的字符串的功能

文章目录 一、运行效果1.1 单个字符串加解密1.2 多个字符串数组加解密二、源代码2.1 控制流图2.2 实现的源代码一、运行效果 1.1 单个字符串加解密 待加密的单个字符串: 测试字符串转化成short数组-----字节卷动 单个字符串加密后的数据: [19914, -21676, 31702, 23463, 2833…

云原生学习

1、云原生学习 文章目录 1、云原生学习1. 介绍2. Docker容器化 1. 介绍 什么是云原生&#xff1f;原生指使用JAVA等语言编写的项目&#xff0c;云是指将项目部署到云服务器上云平台&#xff1a;公有云、私有云 本地平台是指直接部署在自己计算机&#xff0c;而开发的应用一定要…