熬了四个大夜才搞明白,最晚一天熬到早上十点/(ㄒoㄒ)/~~,最后发现只要加个参数就解决了。。。抱头痛哭
问题描述:
Hadoop集群部署在docker容器中,宿主机执行pyspark程序读取hive表
问题一:当master('local[*]')时,docker容器返回给driver端datanode节点的内网ip地址,修改hosts只能将域名转发到ip地址,不能将ip地址转发给ip地址。
问题二:当master('spark://localhost:7077'),因为容器做了端口映射,这里使用的时localhost。driver端为宿主机,spark会把driver端的hostname传到spark集群worker节点上,spark work容器无法识别宿主机hostname
解决方法:
在宿主机配置好hosts,格式为:127.0.0.1 容器hostname(eg:datanode)
问题一:SparkSession加参数config("dfs.client.use.datanode.hostname", "true")//客户端(如 Spark Driver)通过主机名访问 DataNode。
问题二:SparkSession加参数config("spark.driver.host", "192.168.1.5") //宿主机ip地址
就是这么简单。。。哭死(;´༎ຶД༎ຶ`)
当时试了好多种办法,nginx反向代理、DNSmasq自定义DNS、NPS内网穿透、Macvlan网络模式,SNAT、最后甚至还装了k8s集群外加k8s监控界面,真的哭死。看看现在时间吧,已经4:03了。。。😭😭😭
最后附上完整代码:
import string
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':# =============================== spark local模式 ===============================# 想要读写数据必须要配置windows下的Hadoop环境(不一定,未验证)# 想要使用Hive元数据必须添加enableHiveSupport()这行代码# 无论spark集群配置文件中有没有配置spark元数据信息,都要在代码工程中配置元数据信息,因为本地读取不到集群中的环境变量,创建hive-site.xml文件或代码中定义config都行# 如果不指定spark.sql.warehouse.dir信息则默认为: file:/C:/Users/yelor/spark-warehouse# 如果不知道hive.metastore.uris的值则找不到hive元数据,但不会报错,只是无法使用hive元数据# spark.sql.warehouse.dir和hive.metastore.uris的值可以在代码工程中配置hive-site.xml文件来指定# 容器外需要注意ip地址互通问题,需要配置hosts# 如果Hadoop集群部署在docker容器中,dfs.client.use.datanode.hostname=true在本地local模式下必须要加,不然spark会使用datanode的内网ip来通信import os# 这里可以选择本地win系统的PySpark环境执行pySpark代码,也可以使用虚拟机中PySpark环境,通过os可以配置。# os.environ['SPARK_HOME'] = r'D:\software2\spark-3.1.2-bin-hadoop3.2' # 暂时不写也行PYSPARK_PYTHON = r"D:\AnacondaCache\envs\mspro\python" #python.exe或者简写为python都行# 当存在多个python版本环境时(如使用conda),不指定python环境会报错os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON# 配置指定提交作业至HDFS的用户 不然上传文件到 HDFS 的时候会报权限错误 参考文章:https://zhuanlan.zhihu.com/p/538265736# os.environ["HADOOP_USER_NAME"] = "yelor" # 暂时不写也行# 在local模式下,如果想使用hive元数据,以下参数是必须要配置的:spark.sql.warehouse.dir、hive.metastore.urisspark = SparkSession.builder.\appName('udf_define').\master('local[*]').\config('spark.sql.shuffle.partitions', 2).\config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://localhost:9083').\config("spark.executor.memory", "1g").\config("spark.driver.memory", "1g").\config("dfs.client.use.datanode.hostname", "true").\config("spark.driver.host", "192.168.1.5").\enableHiveSupport(). \getOrCreate() # =============================== spark master集群模式 ===============================# 如果要连接spark集群,需要保证pyspark包的版本与集群的spark版本一致# 查看spark版本:spark-submit --version# 就算是使用集群执行作业,也必须要配置hive-site.xml文件中的信息,因为还是读取driver端的环境变量# 必须添加enableHiveSupport()这行代码# spark.driver.host保证代码传到spark容器中时以指定的ip地址为driver地址,不然会使用本机的hostname# import os# os.environ['PYSPARK_PYTHON']=r"D:\\AnacondaCache\\envs\\mspro\\python.exe"# spark = SparkSession.builder.\# appName('udf_define').\# master('spark://localhost:7077').\# config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\# config('hive.metastore.uris', 'thrift://localhost:9083').\# config("spark.executor.memory", "512m").\# config("spark.driver.memory", "512m").\# config("spark.driver.host", "192.168.1.5").\# enableHiveSupport(). \# getOrCreate() # 如果spark配置文件中没有配置spark元数据信息,就不能使用enableHiveSupport().\ 直接在代码中配置元数据信息也能脸上hive元数据sc = spark.sparkContext# 设置日志级别为 DEBUG# sc.setLogLevel("DEBUG")# 查看表的存储位置warehouse_dir = spark.conf.get("spark.sql.warehouse.dir")print(f"Spark SQL warehouse directory: {warehouse_dir}")# 指定要使用的数据库database_name = "tb"spark.sql(f"USE {database_name}")# 执行 SQL 查询# query = "show databases"# query = "SELECT * FROM orders"# query = "SELECT * FROM students"# df = spark.sql(query)# # 显示查询结果# df.show()# 生成 10000 条模拟数据data = []for _ in range(1000):# 生成随机的姓名和年龄name = ''.join(random.choices(string.ascii_letters, k=5))age = random.randint(18, 60)data.append((name, age))# 定义 DataFrame 的列名columns = ["name", "age"]# 创建 DataFramedf = spark.createDataFrame(data, columns)# 创建临时视图# df.createOrReplaceTempView("test_table")try:# 创建持久化表(可选)df.write.saveAsTable("testaa_table", mode="overwrite")# 验证数据插入spark.sql("SELECT * FROM testaa_table LIMIT 5").show()# 加入循环,保持 SparkSession 一直运行,方便看 Spark UIwhile True:try:import timetime.sleep(1)except KeyboardInterrupt:breakexcept Exception as e:print(f"An error occurred: {e}")finally:spark.stop()# 停止 SparkSession# spark.stop()