R的多进程使用与改进
在R中需要使用多进程时,常见方案是使用foreach
和doParallel
的组合。
foreach
foreach
包中最重要的是foreach
函数,该函数创建一个foreach
对象,随后串行或并行的执行表达式。
library(foreach)?foreach
out:
foreach( ..., .combine, .init, .final = NULL, .inorder = TRUE, .multicombine = FALSE, .maxcombine = if (.multicombine) 100 else 2, .errorhandling = c("stop", "remove", "pass"), .packages = NULL, .export = NULL, .noexport = NULL, .verbose = FALSE)e1 %:% e2when(cond)obj %do% exobj %dopar% extimes(n)
foreach
函数在创建时常用的几个参数为:
...
: 表达式中使用的变量。
.packages
: 表达式依赖的包,字符向量。
.export
: 表达式依赖的变量,字符向量。
.combine
: 运算后结果的组合方式,默认为列表,可选 'c'
'cbind'
'rbind'
'+'
'*'
等。
.errorhandling
: 当运行中出现错误时的处理方式。
使用时,对象后接%do%
为串行,%dopar%
为并行。
foreach(i=1:3) %do% sqrt(i)
out:
[[1]][1] 1[[2]][1] 1.414214[[3]][1] 1.732051
当使用%do%
执行时,程序将会自动忽略.packages
与.export
变量。
如果需要使用多进程,不只需要更换成%dopar%
,你还需要注册集群,执行,结束集群。
library(doParallel)cl = makeCluster(4) #注册4个进程的集群registerDoParallel(cl)foreach(i=1:3) %dopar% sqrt(i)stopCluster(cl) # 记得结束集群
包装
对多进程进行包装,形成runParallel
函数。
library(foreach)library(doParallel)runParallel = function(FUN,PARAMS,packages = NULL,export = NULL){ cl = makeCluster(4) registerDoParallel(cl) N = length(PARAMS) R = foreach( i = 1:N, .packages = packages, .export = export, .errorhandling = 'stop' ) %dopar% { r = do.call(FUN, PARAMS[[i]]) r } stopCluster(cl) R}
程序中的do.call
能够使用提供的参数运行FUN
函数。
runParallel
函数传入FUN
与并行参数的列表集合PARAMS
,就可以使用FUN
对每个值进行处理,然后返回全部值。
问题
在实际使用中遇到这样一个问题,在这里把问题稍微简化一下。
有两个文件,do_some_thing.R
和do_some_other_thing.R
,里面各自编写了一个函数。
do_some_thing.R
do_some_thing = function(x){ do_some_other_thing(x**2)}
do_some_other_thing.R
do_some_other_thing = function(x){ x / 2}
很明显,do_some_thing.R
中引用了do_some_other_thing.R
中的函数。
现在我source
这两个文件并暴露这两个函数,编写一个函数调用do_some_thing
。
some_thing = new.env()source('do_some_thing.R',local = some_thing)some_other_thing = new.env()source('do_some_other_thing.R',local = some_other_thing)attach(some_thing)attach(some_other_thing)fun = function(x){do_some_thing(x+1)}
然后进行多进程调用。
params = lapply(1:10, list)runParallel(fun,params)
得到错误。
Error in { : task 1 failed - "没有"do_some_thing"这个函数"
找不到do_some_thing
函数,然而当我们加上所有可能的.export
变量后。
runParallel(fun,params,export=c('do_some_thing','do_some_other_thing','some_thing','some_other_thing'))
仍然失败。
Error in { : task 1 failed - "没有"do_some_other_thing"这个函数"
有趣的是找不到的函数变成了do_some_other_thing
,然而我明明export
这个变量了啊。
在搜索中,回答普遍认为doParallel
存在设计缺陷,它不能顺利的找到函数内调用的其他自定义函数。
在不停的搜索过后,我终于找到了一个解决方案,使用doFuture
包[1]。
doFuture
包会自动解决这类依赖问题,无需提供.packages
和.export
参数,同时无需手动结束集群。
一个更改后的版本是这样的。
runParallel = function(FUN,PARAMS){ registerDoFuture() plan(multisession, workers = 4) #在这里指定进程数 N = length(PARAMS) R = foreach( i = 1:N, .errorhandling = 'stop' ) %dopar% { r = do.call(FUN, PARAMS[[i]]) r } R}
runParallel(fun,params)
out:
[[1]][1] 2[[2]][1] 4.5······
成功。
十分推荐doFuture
包!
我
我是 SSSimon Yang,关注我,用code解读世界
References
[1]
doFuture
包: https://github.com/HenrikBengtsson/doFuture