※この記事をもとにパッケージを作成しました
R で超簡単に並列処理を書けるパッケージ pforeach を作った
さて、これまでの集大成として、R の foreach を自分用にカスタマイズしてみましょう。
まず、理解すべきことは、foreach は R の文法ではなくて、関数を使って実現されているということです。これを理解していれば、自分用の foreach()
関数として、pforeach()
を定義することができます。
pforeach <- function(...) {
return(foreach(...))
}
library(foreach)
pforeach(i = 1:3, .combine=c) %do% ({
rnorm(1)
})
使い方は foreach()
関数と同じです。内部ではまだ単に foreach()
を読んでいるだけですね。
では、最初にデフォルトの結合方法を変えてみましょう。
foreach()
をデフォルトで実行するとリストが返ってきますが、私は .combine=c
として実行することが多いので、これをデフォルトにしてみます。
pforeach <- function(..., .combine=c) {
return(foreach(..., .combine=.combine))
}
library(foreach)
pforeach(i = 1:3) %do% ({
rnorm(1)
})
すっきりしました。
ここからが本題です。
pforeach
をデフォルトで並列実行するように変更してみましょう(pforeach の p は parallel を意識しています)。
registerDoSEQ() とクラスタを implicit にする方法がここで使えます。
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel) {
if(!require(doParallel)) stop("install.packages('doParallel')")
if(!.parallel || .debug) {
foreach::registerDoSEQ()
} else {
doParallel::registerDoParallel(parallel::detectCores())
}
return(foreach(..., .combine=.combine))
}
library(foreach)
pforeach(i = 1:3) %dopar% ({
rnorm(1)
})
stopImplicitCluster2()
stopImplicitCluster2()
が出てきましたが、これは後で消します。
コア数はデフォルトで最大コアにしていますが、これを指定できるようにしてみましょう。
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores) {
if(!require(doParallel)) stop("install.packages('doParallel')")
if(!.parallel || .debug) {
foreach::registerDoSEQ()
} else {
if(missing(.cores)) .cores=parallel::detectCores()
else if(.cores <= 0) .cores=parallel::detectCores() + .cores
doParallel::registerDoParallel(.cores)
}
return(foreach(..., .combine=.combine))
}
library(foreach)
pforeach(i = 1:3, .cores=2) %dopar% ({
rnorm(1)
})
stopImplicitCluster2()
.cores=-1
のようにマイナス値を指定することで、最大コア数-1 のような場合を簡潔に書けるようにもしました。
さらに、この記事を参考にして、乱数を固定できるようにしましょう。
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores, .seed) {
if(!require(doParallel)) stop("install.packages('doParallel')")
if(!.parallel || .debug) {
foreach::registerDoSEQ()
} else {
if(missing(.cores)) .cores=parallel::detectCores()
else if(.cores <= 0) .cores=parallel::detectCores() + .cores
doParallel::registerDoParallel(.cores)
}
if(!missing(.seed)) {
if(!require(doRNG)) stop("install.packages('doRNG')")
registerDoRNG(.seed)
}
return(foreach(..., .combine=.combine))
}
library(foreach)
pforeach(i = 1:3, .seed=12345) %dopar% ({
rnorm(1)
})
stopImplicitCluster2()
さて、 %dopar%
をいちいち書くのが面倒くさいのでカリー化しましょう。
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores, .seed=NULL) {
if(!require(doParallel)) stop("install.packages('doParallel')")
if(!.parallel || .debug) {
foreach::registerDoSEQ()
if(!is.null(.seed)) set.seed(.seed)
} else {
if(missing(.cores)) .cores=parallel::detectCores()
else if(.cores <= 0) .cores=parallel::detectCores() + .cores
doParallel::registerDoParallel(.cores)
}
return(function(expr) {
expr <- substitute(expr)
on.exit(stopImplicitCluster2())
if(!is.null(.seed)) {
if(!require(doRNG)) stop("install.packages('doRNG')")
foreach(..., .combine=.combine, .options.RNG=.seed) %dorng% eval(expr)
} else {
foreach(..., .combine=.combine) %dopar% eval(expr)
}
})
}
pforeach(i = 1:3)({
rnorm(1)
})
pforeach(i = 1:3, .cores=2)({
rnorm(1)
})
pforeach(i = 1:3, .seed=12345)({
rnorm(1)
})
stopImplicitCluster2()
も無くなってすっきりしました。
最終コード
最終的に、次のようなコードになりました。
stopImplicitCluster2 <- function() {
options <- doParallel:::.options
if (exists(".revoDoParCluster", where = options) &&
!is.null(get(".revoDoParCluster", envir = options))) {
stopCluster(get(".revoDoParCluster", envir = options))
remove(".revoDoParCluster", envir = options)
}
}
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores, .seed=NULL, .export, .packages) {
if(!require(doParallel)) stop("install.packages('doParallel')")
if(!.parallel || .debug) {
foreach::registerDoSEQ()
if(!is.null(.seed)) set.seed(.seed)
} else {
if(missing(.cores)) .cores=parallel::detectCores()
else if(.cores <= 0) .cores=parallel::detectCores() + .cores
doParallel::registerDoParallel(.cores)
}
if(missing(.export)) .export=ls(parent.frame(1000))
if(missing(.packages)) .packages=loadedNamespaces()
return(function(expr) {
expr <- substitute(expr)
on.exit(stopImplicitCluster2())
`%doop%` <- `%dopar%`
if(!is.null(.seed)) {
if(!require(doRNG)) stop("install.packages('doRNG')")
set.seed(.seed)
`%doop%` <- `%dorng%`
}
foreach(..., .combine=.combine, .export=.export, .packages=.packages) %doop% eval(expr)
})
}
data <- iris
pforeach(i=seq_len(nrow(data)))({
sum(data[i, 1:4])
})
library(dplyr)
pforeach(i=seq_len(nrow(data)))({
data[i, ] %>% select(-Species) %>% sum
})
ここでは、@teramonagi さんの下記の記事を参考にして、.export
や .pakages
を意識しなくてもいいように改良を加えています。
次のようなコードも特に意識せずに実行できます。
g <- function(i){rnorm(1)}
f3 <- function()
{
pforeach(i=1:3)({g(i)})
}
f3()
以上、foreach を自分用にカスタマイズする方法を説明しました。
ご参考になれば幸いです。