Spark java.io.NotSerializableException

文章目录

    • 源代码
    • 错误原因
    • 错误信息
    • 方案一
      • 方案一具体代码
    • 方案二
      • 方案二具体代码

源代码

//编写SQL语句
val sql = "insert into province_browser_cnt(`province`,`browser`,`operator_cnt`) values(?,?,?)"
//获取mysql的连接
val conn = JDBCUtil.getConn()
//将如下的结果写入数据库
userRDD.map(line => ((line(6), line(7)), 1)).reduceByKey(_ + _).foreach(line=> {//创建psval ps = conn.prepareStatement(sql)ps.setString(1, line._1._1) //设置省份ps.setString(2, line._1._2) //设置浏览器ps.setInt(3, line._2) //设置统计值//提交ps.execute()})

错误原因

因为ps是一个PrepareStatement对象,这个对象无法序列化,而传入foreach中的对象是需要分布式传送到各个节点上,传送前先序列化,到达相应服务器上后再反序列化,PrepareStatement是个Java类,如果一个Java类想(反)序列化,必须实现Serialize接口,PrepareStatement并没有实现这个接口。

错误信息

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:971)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)at edu.nanzhi.core.useraction.UserActionExternalPro$.main(UserActionExternalPro.scala:68)at edu.nanzhi.core.useraction.UserActionExternalPro.main(UserActionExternalPro.scala)
Caused by: java.io.NotSerializableException: com.mysql.cj.jdbc.DatabaseMetaData
Serialization stack:- object not serializable (class: com.mysql.cj.jdbc.DatabaseMetaData, value: com.mysql.cj.jdbc.DatabaseMetaData@59c70ceb)- field (class: com.mysql.cj.jdbc.ConnectionImpl, name: dbmd, type: interface java.sql.DatabaseMetaData)- object (class com.mysql.cj.jdbc.ConnectionImpl, com.mysql.cj.jdbc.ConnectionImpl@210d2a6c)- field (class: edu.nanzhi.core.useraction.UserActionExternalPro$$anonfun$main$18, name: conn$1, type: interface java.sql.Connection)- object (class edu.nanzhi.core.useraction.UserActionExternalPro$$anonfun$main$18, <function1>)at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)... 11 more

方案一

添加collect()函数将结果数据收集到driver端。对象ps在driver端,collect后的数据也在driver端,就不需ps序列化数据传到各个节点了,因此就不会出现序列化错误。

方案一具体代码

userRDD.map(line => ((line(6), line(7)), 1)).reduceByKey(_ + _).collect().foreach(line=> {//创建psval ps = conn.prepareStatement(sql)ps.setString(1, line._1._1) //设置省份ps.setString(2, line._1._2) //设置浏览器ps.setInt(3, line._2) //设置统计值//提交ps.execute()})

如上代码改正后将不会再报序列化错误。

方案二

因为方案一采用collect(),如果数据量大,将会消耗大量Driver端内存,影响整个作业性能,严重者甚至会导致Driver宕掉,导致整个作业失败。所以建议采用分区循环方式foreachPartition实现。foreachPartition函数和foreach函数类似,foreachPartition是将rdd的每一个分区循环一次,而foreach函数是上游每行数据循环一次。foreachPatition可以将每个分区的数据以迭代器方式迭代操作,而foreach函数则不能,通常foreachPatition比foreach要高效。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用foreach函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用foreachPartition,那么只需要针对每一个分区建立一个connection。同时,foreachPartitioin可以迭代进行处理,但foreach里的对象仍然还是需要反序列化。

方案二具体代码

userRDD.map(line => ((line(6), line(7)), 1)).reduceByKey(_ + _).foreachPartition(it => {val conn = JDBCUtil.getConn()try {//获取ps对象val ps = conn.prepareStatement(sql)//链接自动提交关闭conn.setAutoCommit(false)//循环迭代器it.foreach(line => {ps.setString(1, line._1._1) //设置省份ps.setString(2, line._1._2) //设置浏览器ps.setInt(3, line._2) //设置统计值//提交ps.addBatch()})//循环完后批次执行ps.executeBatch()//链接提交--->手动提交必须要关闭自动提交,默认事自动提交conn.commit()}})

如上代码改正后将不会再报序列化错误,同时比方案一更加高效。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/4105.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于discuz论坛网址优化的一些记录(伪静态)

最近网站刚上线&#xff0c;针对SEO做了些操作&#xff0c;为了方便网站网页被收录&#xff0c;特此记录下 1.开启伪静态 按照操作勾选所有项&#xff0c;然后点击查看伪静态规则 2.打开宝塔&#xff0c;找到左侧列表的网站&#xff0c;然后找到相应站点的设置。把discuz自动…

STM32的端口引脚的复用功能及重映射功能解析

目录 STM32的端口引脚的复用功能及重映射功能解析 复用功能 复用功能的初始化 重映射功能 重映射功能的初始化 复用功能和重映射的区别 部分重映射与完全重映射 补充 STM32的端口引脚的复用功能及重映射功能解析 复用功能 首先、我们可以这样去理解stm32引脚的复用功能…

SD-WAN怎样助力企业网络升级

随着企业规模的持续扩张&#xff0c;其网络建设的重要性日益凸显&#xff0c;成为业务成功的基石。尤其对于中小企业而言&#xff0c;信息化和电脑化已成为推动生产力和竞争力提升的关键所在。办公室自动化、数据库、ERP、CRM、物流供应链等关键业务应用的不断增加&#xff0c;…

K8s: Service对象以及与Pod之间的通信关系

Service 对象 1 &#xff09;概述 每个 Pod 都有自己的 IP 地址&#xff0c;但是在 Deployment 中注意&#xff0c;实际在部署我们服务的时候创建的是 Deployment 而非 pod Deployment 是控制器的一种 在同一时刻运行的 Pod 集合可能与稍后运行该应用程序的 Pod 集合不同这导致…

css 文字左右抖动效果

<template><div class"box"><div class"shake shape">抖动特效交字11</div></div> </template><script setup></script><style scope> .shape {margin: 50px;width: 200px;height: 50px;line-heigh…

计算机存储原理.2

1.主存储器与CPU之间的连接 2.存储器芯片的输入输出信号 3.增加主存的存储字长 3.1位扩展 数据总线的利用成分是不充分的(单块只能读写一位)&#xff0c;为了解决这个问题所以引出了位扩展。 使用多块存储芯片解决这个问题。 3.2字扩展 因为存储器买的是8k*8位的&am…

Linear Secret-Sharing Scheme(LSSS) Monotone Span Program(MSP)

参考文献&#xff1a; [KW93] Karchmer M, Wigderson A. On span programs[C]//[1993] Proceedings of the Eigth Annual Structure in Complexity Theory Conference. IEEE, 1993: 102-111.[CDM00] Cramer R, Damgrd I, Maurer U. General secure multi-party computation fr…

【C++风云录】走向智能农业时代:利用C++库实现农田管理和食品质量监测的突破

农业科学与食品安全&#xff1a;利用C库实现智慧农业的梦想 前言 随着科技的不断进步&#xff0c;农业科学和食品安全已经成为人们关注的焦点。农业生产的效率和质量对于满足不断增长的人口需求和保障食品安全至关重要。为了提高农业生产的效率和可持续性&#xff0c;利用计算…

【探索Java编程:从入门到入狱】Day2

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【Java、PHP】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收…

K8s: 控制器之StatefulSets对象

StatefulSet 1 ) 概述 Stateful&#xff0c;也就是有状态应用&#xff0c;微服务无状态是一个理想的这么一个环境有些应用是有状态的&#xff0c;比如这个web服务器&#xff0c;它只能运行在一台server上因为它要访问一些持久化的存储比如说 mysql 它就是一个典型的有状态的应…

js[黑马笔记]

js基础 基础语法 输入输出 变量 数组 常量 数据类型 类型转换 运算符 语句 数组 函数 调用方式 函数名() 匿名函数 使用: 1.函数表达式 2.立即执行函数 对象 内置对象 web API DOM document object Model元素操作 获取元素 设置元素 定时器 DOM事件基础 事件监听 事件类…

MySQL商城数据表(70-79)

70店铺入驻流程表 DROP TABLE IF EXISTS xuge_shop_flows; CREATE TABLE xuge_shop_flows (flowId int(11) NOT NULL AUTO_INCREMENT,flowName varchar(100) NOT NULL,isShow tinyint(4) DEFAULT 1 COMMENT 0:隐藏 1:显示,sort tinyint(4) DEFAULT 0,isDelete tinyint(4) DEFA…

流量网关与服务网关的区别:(面试题,掌握)

流量网关&#xff1a;&#xff08;如Nignx&#xff0c;OpenResty&#xff0c;Kong&#xff09;是指提供全局性的、与后端业务应用无关的策略&#xff0c;例如 HTTPS证书认证、Web防火墙、全局流量监控&#xff0c;黑白名单等。 服务网关&#xff1a;&#xff08;如Spring Clou…

含匹配扰动的多智能体领航跟随一致性Matlab仿真

文章目录 [TOC](文章目录) 前言一、问题描述二、基于LQR的观测器和控制器设计1.观测器设计2.控制器设计 三、数值仿真四、参考文献总结 前言 ​本文探讨了带有匹配扰动的多智能体领航跟随一致性控制方法&#xff0c;并提供了相应的Matlab仿真代码。 具体的设计步骤如下&#…

filebeat 设置elasticsearch索引的 max_result_window

在 Filebeat 中设置索引的 max_result_window 需要修改 Elasticsearch 的索引模板。max_result_window 参数定义了在 Elasticsearch 中执行搜索时&#xff0c;最大返回文档的数量。默认情况下&#xff0c;该值为 10000。 答案来着gpt demo&#xff1a;http://124.220.104.235/ …

大数据—数据采集DataX

一、DataX介绍 官网&#xff1a; DataX/introduction.md at master alibaba/DataX GitHub DataX 是阿里云 DataWorks数据集成 的开源版本&#xff0c;在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。 DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、…

【算法刷题 | 贪心算法02】4.24(摆动序列)

文章目录 3.摆动序列3.1题目3.2解法&#xff1a;贪心3.2.1贪心思路3.2.2代码实现 3.摆动序列 3.1题目 如果连续数字之间的差严格地在正数和负数之间交替&#xff0c;则数字序列称为 摆动序列 。 第一个差&#xff08;如果存在的话&#xff09;可能是正数或负数。仅有一个元素…

机器人系统能用MQTT5.0代替ROS2吗?

前言 ROS2是目前最主流的机器人系统&#xff0c;但由于ROS2的学习曲线比较徒陗&#xff0c;而且对于资源受限的系统并不友好&#xff1b;而MQTT5.0是最新的MQTT消息传输协议&#xff0c;为现代IoT提供了更友好的支持&#xff0c;下面讨论MQTT5.0和ROS2结合使用&#xff0c;或机…

WPF 资源基础

动态资源/静态资源 UI代码 <Window x:Class"WpfApp1.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.microsoft.com/ex…

太速科技-基于6 U VPX M.2 高带宽加固存储板

基于6 U VPX M.2 高带宽加固存储板 一、板卡概述 基于6 U VPX M.2 高带宽加固存储板&#xff0c;可以实现VPX接口的数据读写到PCI-E总线的NVME存储媒介上。采用PLX8732&#xff0c;上行链路提供带宽x16的PCI-E数据到VPX接口上&#xff1b;下行链路提供3路带宽x4的PCI-E接口…