如何在Spark中使用gbdt模型分布式预测

这目录

  • 1 训练gbdt模型
  • 2 第三方包python环境打包
  • 3 Spark中使用gbdt模型
    • 3.1 spark配置文件
    • 3.2 主函数main.py
  • 4 spark任务提交

1 训练gbdt模型

我们可以基于lightgbm快速的训练一个gbdt模型,训练相对比较简单,只要把训练样本处理好,几行代码可以快速训练好模型,如下是训练一个多分类模型训练核心代码如下:

import lightgbm as lgb
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
#假设处理好的训练样本为train.csv
df = pd.read_csv('./train.csv')
X = pd.drop(['label'],axis=1)
Y = df.label
# split data for val
x_train,x_val,y_train,y_val = train_test_split(X,Y,test_size=0.2,random_state=123)
# train model
cate_features=['sex','brand']
train_data = train_data = lgb.Dataset(x_train,label=y_train,categoryical_featrues=cate_features)
params = {'objective':'multiclass','learning_rate':0.1,'n_estimators':100,'num_class':23}
model = lgb.train(params, train_data,100)
#predict val
y_pred = model.predict(x_val)
y_pred = y_pred.argmax(axis=1)# acc
acc = accuracy_score(y_val, y_pred)
print(acc)# feature importance
feature_name = model.feature_name()
feature_importance = model.feature_importance()
feature_score = dict(zip(feature_name, feature_importance))
feature_score_sort = sorted(feature_score.items(),key=lambda x:x[1], reverse=True)# save model
joblib.dump(model, 'model.pkl')

上述就是基于lightgbm训练gbdt模型的代码,训练完后我们通过joblib保存了我们训练好的模型,这个模型接下来我们可以在spark进行分布式预测。

2 第三方包python环境打包

在使用spark的时候,我们可以自定义python环境,并且把我们需要的第三方包都可以安装该python环境里,这样在spark里我们就可以用python第三方包,比如等会我们需要的joblib, numpy等。具体如何配置python环境和第三方包,可以参考我上一篇博客:如何在spark中使用scikit-learn和tensorflow等第三方python包

3 Spark中使用gbdt模型

通过上述步骤,把需要的python环境和第三方包制作好了,包名为python39.zip,接下来我们介绍一下如何在spark中使用我们刚才训练好的gbdt模型进行分布式快速预测。

3.1 spark配置文件

提交spark任务的时候,配置文件这块也需要稍微修改一下,配置文件信息如下:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-memory 12G
--executor-memory 20G \
--executor-cores 4 \
--queue root.your_queue_name \
--archives ./python39.zip#python39 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python39/python39/bin/python3.9 \
--conf spark.yarn.appMasterEnv,HADOOP_USER_NAME=your_hduser_name \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enbled=true \
--conf spark.dynamicAllocation.maxExecutors=50 \
--conf spark.dynamicAllocation.minExecutors=50 \
--conf spark.braodcast.compress=True \
--conf saprk.network.timeout=1000s \
--conf spark.sql.hive.mergeFiles=true \
--conf spark.speculation=false \
--conf spark.yarn.executor.memoryOverhead=4096 \
--files $HIVE_CONF_DIR/hive-site.xml \
--py-files ./model.pkl \
$@

上述是基本的提交spark任务的配置文件,其中
–archives ./python39.zip#python39 \
–archives参数用于在Spark应用程序运行期间将本地压缩档案文件解压到YARN集群节点上。#python39 是为档案文件定义的别名,这将在Spark应用程序中使用。
这个参数的目的是将名为python39.zip的压缩文件解压到YARN集群节点,并将其路径设置为python39,以供Spark应用程序使用。这通常用于指定特定版本的Python环境,以便在Spark任务中使用。
–conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python39/python39/bin/python3.9
–conf参数用于设置Spark配置属性。
spark.yarn.appMasterEnv.PYSPARK_PYTHON 是一个Spark配置属性,它指定了YARN应用程序的主节点(ApplicationMaster)使用的Python解释器。
./python39/python39/bin/python3.9 是实际Python解释器的路径,它将在YARN应用程序的主节点上执行。
这个参数的目的是告诉Spark应用程序在YARN的主节点上使用特定的Python解释器即./python39/python39/bin/python3.9。这通常用于确保Spark应用程序使用正确的Python版本和环境来运行任务。
–py-files ./model.pkl \
–py-file是 Spark 提交任务时的一个参数,用于将指定的 .py 文件、.zip 文件或 .egg 文件分发到集群的所有 Worker 节点。Spark 会将这些文件自动添加到 Python 的模块路径中(即 sys.path),使得这些文件可以被任务中的代码引用。所以在这里我们将 model.pkl 模型文件分发到 Spark 集群的每个节点,确保每个节点在运行任务时都能访问并使用这个模型

3.2 主函数main.py

接下来,我们来看下如何在在spark调用我们训练好的gbdt模型进行预测,核心代码主要如下:
1)import基础函数功能包等

# -*-coding:utf8 -*-
import sys
from pyspark.sql.types import Row
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext, HiveContext
import datetime
import numpy as np
import joblibsave_table='your_target_table_name'
source_table = 'your_source_predict_table_name'
index_table = 'your_index_table_name'
# define saved table schema
schema = StructType([StructFiled('userid', StringType(), True),StructFiled('names', ArrayType(StringType()), True),StructFiled('scores', ArrayType(FloatType()), True)])
  1. main执行入口基础配置和执行流程
if __name__=='__main__':conf = SparkConf()sc = SparkContext(conf=conf, appName='gbdt_spark_predict')sc.setLogLevel("WARN")hiveCtx = HiveContext(sc)#hive基础配置hiveCtx.setConf('spark.shuffle.consolidateFiles','true')hiveCtx.setConf('spark.shuffle.memoryFraction','0.4')hiveCtx.setConf('spark.sql.shuffle.partitions','1000')if len(sys.argv) == 1:dt = datetime.datetime.now() + datetime.timedelta(-1)else:dt = datetime.datetime.strptime(sys.argv[1],"%Y%m%d').date()dt_str = dt.strftime('%Y-%m-%d')hiveCtx.sql("use your_datebase_name")#注册函数,在sql时候可以使用hiveCtx.registerFunction('null_chage', null_chage, StringType())#创建目标表create_table(hiveCtx)#主函数get_predict(hiveCtx)

上面主函数给出了一个基本的流程步骤:1)spark, hive context等初始化 2)注册函数可以直接在sql中使用,方便数据处理 3)建立目标hive表 4)执行功能函数

  1. 函数功能模块实现

在第2步骤里,我们主要有三个函数需要编写,一个是可以在sql中调用的基础函数,第二个就是创建表函数,第三个就是功能函数,我们接下来实现这三个的基本功能:

#用在sql中的基本数据操作处理
def null_chage(x):return 'unknow' if x is None else x#创建目标表
def create_table(hiveCtx):create_tbl = """CREATE EXTERNAL TABLE IF NOT EXISTS your_database_name.{table_name} (userid       string       COMMENT 'user id';names        array<string>   COMMENT 'predict label names')scores        array<float>   COMMENT 'predict socre')PARTITIONED BY(dt string, dp string)STORED AS ORCLOCATION 'hdfs://your_database_name.db/{table_name}'TBLPROPERTIES('orc.compress'='SNAPPY','comment'='gbdt predict user score')""".format(table_name=save_table)
# 功能函数
def get_predict(hiveCtx):# get label and idex datasql="""select index, valuefrom {index_table}where dt='active;""".format(index_table=index_table)print(sql)vocab = hiveCtx.sql(sql).rdd.collect()vocab_dict = dict()for x in vocab:vocab_dict.setdefault(x[0],x[1])# broadcastbr_vocab_dict = sc.broadcast(vocab_dict)# get predict datasql="""select null_chage(userid) as userid, featuresfrom {source_table}where dt='active'""".format(source_table=source_table)print(sql)hiveCtx.sql(sql).rdd.mapPartitions(lambda rows: main_func(rows, br_vocab_dict)) \.toDF(schema=schema) \.registerTempTable('final_tbl')# insert tableinsert_sql = """insert overwrite table {save_table} partition (dt='{dt}')select * from final_tbl""".format(save_table=save_table,dt='active')print(insert_sql)hiveCtx.sql(insert_sql)

接下来,我们来看下main_func函数的实现:

def main_func(rows, br_vocab_dict):# load modelmodel = joblib.load('./model.pkl')vocab_dict = br_vobab_dict.valuefor row in rows:userid, features = rowfeatures = np.array(features)predict = model.predict(features)predict_sort = np.argsort(-predict[0])names = [vocab_dict[idx] for idx in predict_sort]scores = [float(predict[0][idx]) for idx in predict_sort]yield userid, names, scores

整个代码的实现我们在这里就写完了,整体实现逻辑是比较清晰易懂的,按照这个流程来,我们可以很高效快速的基于spark分布式的跑一些数据处理和模型预测性的任务。

4 spark任务提交

接下来,就是提交我们的spark任务了,在工作环境目录如下文件信息:

  • 提前准备好的python环境包python39.zip
  • spark config文件 run_spark_arg.sh
  • 主函数代码 main.py
  • gbdt模型文件model.pkl

最后环节就是提交spark任务,我么可以在服务器提交命令如下:

nohup sh run_spark_arg.sh main.py >log.txt 2>&1 &

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/62764.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

38 基于单片机的宠物喂食(ESP8266、红外、电机)

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于STC89C52单片机&#xff0c;采用L298N驱动连接P2.3和P2.4口进行电机驱动&#xff0c; 然后串口连接P3.0和P3.1模拟ESP8266&#xff0c; 红外传感器连接ADC0832数模转换器连接单片机的P1.0~P1.…

Python 【图像分类】之 PyTorch 进行猫狗分类功能的实现(Swanlab训练可视化/ Gradio 实现猫狗分类 Demo)

Python 【图像分类】之 PyTorch 进行猫狗分类功能的实现(Swanlab训练可视化/ Gradio 实现猫狗分类 Demo) 目录 Python 【图像分类】之 PyTorch 进行猫狗分类功能的实现(Swanlab训练可视化/ Gradio 实现猫狗分类 Demo) 一、简单介绍 二、PyTorch 三、CNN 1、神经网络 2、卷…

HTML5系列(7)-- Web Storage 实战指南

前端技术探索系列&#xff1a;HTML5 Web Storage 实战指南 &#x1f5c4;️ 致读者&#xff1a;本地存储的新纪元 &#x1f44b; 前端开发者们&#xff0c; 今天我们将深入探讨 HTML5 中的 Web Storage 技术&#xff0c;这是一个强大的本地存储解决方案&#xff0c;让我们能…

week 6 - SQL Select II

Overview 1. Joins 包括交叉连接&#xff08;Cross&#xff09;、内连接&#xff08;Inner&#xff09;、自然连接&#xff08;Natural&#xff09;、外连接&#xff08;Outer&#xff09; 2. ORDER BY to produce ordered output 3. 聚合函数&#xff08;Aggregate Functio…

算法训练营day23(二叉树09:修建二叉搜索树,有序数组转化为平衡二叉搜索树,二叉搜索树转化为累加树,二叉树专题总结)

第六章 二叉树part09今日内容&#xff1a;● 669. 修剪二叉搜索树 ● 108.将有序数组转换为二叉搜索树 ● 538.把二叉搜索树转换为累加树 ● 总结篇 详细布置 669. 修剪二叉搜索树 这道题目比较难&#xff0c;比 添加增加和删除节点难的多&#xff0c;建议先看视频理解。题目…

C语言操作符深度解析

目录 一、操作符的分类 1、算术操作符 1、1、 和- 1、2、* 1、3、/ 1、4、% 2、赋值操作符&#xff1a;和复合赋值 2、1、连续赋值 2、2、复合赋值符 3、单⽬操作符&#xff1a;、--、、- 3、1、和-- 3、1、1、前置 3、1、2、后置 3、2、1、前置-- 3、2、2、后…

打造高质量技术文档的关键要素(结合MATLAB)

在技术的浩瀚海洋中&#xff0c;一份优秀的技术文档宛如精准的航海图。它不仅是知识传承的载体&#xff0c;也是团队协作的桥梁&#xff0c;更是产品成功的幕后英雄。打造出色的技术文档并非易事&#xff0c;以下将从多个方向探讨如何做到这一点。 文章目录 方向一&#xff1a;…

《C++与人工智能:照亮能源可持续发展之路》

在全球对能源需求持续攀升以及对可持续发展日益重视的当下&#xff0c;如何有效解决能源领域的复杂问题成为了亟待攻克的关键挑战。而 C与人工智能技术的融合&#xff0c;正犹如一盏明灯&#xff0c;为能源管理、可再生能源预测等方面开辟出全新的路径&#xff0c;有力地推动着…

Python 深度学习框架之Keras库详解

文章目录 Python 深度学习框架之Keras库详解一、引言二、Keras的特点和优势1、用户友好2、多网络支持3、跨平台运行 三、Keras的安装和环境配置1、软硬件环境2、Python虚拟环境 四、使用示例1、MNIST手写数字识别 五、总结 Python 深度学习框架之Keras库详解 一、引言 Keras是…

【大语言模型】ACL2024论文-23 检索增强的多语言知识编辑

【大语言模型】ACL2024论文-23 检索增强的多语言知识编辑 目录 文章目录 【大语言模型】ACL2024论文-23 检索增强的多语言知识编辑目录摘要研究背景问题与挑战如何解决核心创新点算法模型实验效果&#xff08;包含重要数据与结论&#xff09;相关工作后续优化方向 后记 检索增强…

android user版本默认usb模式为充电模式

android插入usb时会切换至默认设置的模式&#xff0c;debug版本为adb&#xff0c;user版本为mtp protected long getChargingFunctions() {// if ADB is enabled, reset functions to ADB// else enable MTP as usual.if (isAdbEnabled()) {return UsbManager.FUNCTION_ADB;} e…

_C#_串口助手_字符串拼接缺失问题(未知原理)

最近使用WPF开发串口助手时&#xff0c;遇到一个很奇怪的问题&#xff0c;无论是主线程、异步还是多线程&#xff0c;当串口接收速度达到0.016s一次以上&#xff0c;就会发生字符串缺失问题并且很卡。而0.016s就一切如常&#xff0c;仿佛0.015s与0.016s是天堑之隔。 同一份代码…

CF Round988 题解报告

/***实力还是要努力 D 赛时我过了&#xff0c;就不讲了&#xff0c;毕竟我过的也大概是简单题&#xff1b; 代码&#xff1a; #include<iostream> #include<queue> using namespace std; #define int long long int t; int n,m,l; struct hurdle{int l,r,len; …

基于Python的猎聘网招聘数据采集与可视化分析

1.1项目简介 在现代社会&#xff0c;招聘市场的竞争日趋激烈&#xff0c;企业和求职者都希望能够更有效地找到合适的机会与人才。猎聘网作为国内领先的人力资源服务平台&#xff0c;汇聚了大量的招聘信息和求职者数据&#xff0c;为研究招聘市场趋势提供了丰富的素材。基于Pyt…

HTML 常用标签属性汇总一<input> 标签

1.可选的属性 DTD 指示此属性允许在哪种 DTD 中使用.SStrict, TTransitional, FFrameset。 属性 值 描述 DTD accept mime_type 规定通过文件上传来提交的文件的类型。 STF align ​​​​​​​ left center right middle bottom 不赞成使用。规定图像输入的对齐方式…

基于Java Springboot高校社团微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 微信…

springboot(20)(删除文章分类。获取、更新、删除文章详细)(Validation分组校验)

目录 一、删除文章分类功能。 &#xff08;1&#xff09;接口文档。 1、请求路径、请求参数。 2、请求参数。 3、响应数据。 &#xff08;2&#xff09;实现思路与代码书写。 1、controller层。 2、service接口业务层。 3、serviceImpl实现类。 4、mapper层。 5、后端接口测试。…

vim 显示行数和删除内容操作

在 Vim 中&#xff0c;显示行数和删除内容是两个常见的操作&#xff0c;结合使用可以帮助你更加高效地编辑文件。以下是关于如何在 Vim 中显示行数和删除内容的详细说明&#xff1a; 1. 显示行数 显示绝对行号 绝对行号会显示每一行的实际行号&#xff0c;适合你查看文件的大…

【前端】特殊案例分析深入理解 JavaScript 中的词法作用域

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: 前端 文章目录 &#x1f4af;前言&#x1f4af;案例代码&#x1f4af;词法作用域&#xff08;Lexical Scope&#xff09;与静态作用域什么是词法作用域&#xff1f;代码执行的详细分析 &#x1f4af;函数定义与调用的…

Node.js 实战: 爬取百度新闻并序列化 - 完整教程

很多时候我们需要爬取一些公开的网页内容来做一些数据分析和统计。而多数时候&#xff0c;大家会用到python &#xff0c;因为实现起来很方便。但是其实Node.js 用来爬取网络内容&#xff0c;也是非常强大的。 今天我向大家介绍一下我自己写的一个百度新闻的爬虫&#xff0c;可…