LoginSignup
6
8

More than 5 years have passed since last update.

R の foreach を自分用にカスタマイズする #rstatsj

Last updated at Posted at 2014-11-07

※この記事をもとにパッケージを作成しました
R で超簡単に並列処理を書けるパッケージ pforeach を作った

さて、これまでの集大成として、R の foreach を自分用にカスタマイズしてみましょう。

まず、理解すべきことは、foreach は R の文法ではなくて、関数を使って実現されているということです。これを理解していれば、自分用の foreach() 関数として、pforeach() を定義することができます。

pforeach.R
pforeach <- function(...) {
  return(foreach(...))
}
使い方
library(foreach)
pforeach(i = 1:3, .combine=c) %do% ({
  rnorm(1)
})

使い方は foreach() 関数と同じです。内部ではまだ単に foreach() を読んでいるだけですね。

では、最初にデフォルトの結合方法を変えてみましょう。
foreach() をデフォルトで実行するとリストが返ってきますが、私は .combine=c として実行することが多いので、これをデフォルトにしてみます。

pforeach.R
pforeach <- function(..., .combine=c) {
  return(foreach(..., .combine=.combine))
}
使い方
library(foreach)
pforeach(i = 1:3) %do% ({
  rnorm(1)
})

すっきりしました。

ここからが本題です。
pforeach をデフォルトで並列実行するように変更してみましょう(pforeach の p は parallel を意識しています)。
registerDoSEQ()クラスタを implicit にする方法がここで使えます。

pforeach.R
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel) {
  if(!require(doParallel)) stop("install.packages('doParallel')")
  if(!.parallel || .debug) {
    foreach::registerDoSEQ()
  } else {
    doParallel::registerDoParallel(parallel::detectCores())
  }
  return(foreach(..., .combine=.combine))
}
使い方
library(foreach)
pforeach(i = 1:3) %dopar% ({
  rnorm(1)
})
stopImplicitCluster2()

stopImplicitCluster2() が出てきましたが、これは後で消します。

コア数はデフォルトで最大コアにしていますが、これを指定できるようにしてみましょう。

pforeach.R
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores) {
  if(!require(doParallel)) stop("install.packages('doParallel')")
  if(!.parallel || .debug) {
    foreach::registerDoSEQ()
  } else {
    if(missing(.cores)) .cores=parallel::detectCores()
    else if(.cores <= 0) .cores=parallel::detectCores() + .cores
    doParallel::registerDoParallel(.cores)
  }
  return(foreach(..., .combine=.combine))
}
使い方
library(foreach)
pforeach(i = 1:3, .cores=2) %dopar% ({
  rnorm(1)
})
stopImplicitCluster2()

.cores=-1 のようにマイナス値を指定することで、最大コア数-1 のような場合を簡潔に書けるようにもしました。

さらに、この記事を参考にして、乱数を固定できるようにしましょう。

pforeach.R
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores, .seed) {
  if(!require(doParallel)) stop("install.packages('doParallel')")
  if(!.parallel || .debug) {
    foreach::registerDoSEQ()
  } else {
    if(missing(.cores)) .cores=parallel::detectCores()
    else if(.cores <= 0) .cores=parallel::detectCores() + .cores
    doParallel::registerDoParallel(.cores)
  }
  if(!missing(.seed)) {
    if(!require(doRNG)) stop("install.packages('doRNG')")
    registerDoRNG(.seed)
  }
  return(foreach(..., .combine=.combine))
}
使い方
library(foreach)
pforeach(i = 1:3, .seed=12345) %dopar% ({
  rnorm(1)
})
stopImplicitCluster2()

さて、 %dopar% をいちいち書くのが面倒くさいのでカリー化しましょう。

pforeach.R
pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores, .seed=NULL) {
  if(!require(doParallel)) stop("install.packages('doParallel')")
  if(!.parallel || .debug) {
    foreach::registerDoSEQ()
    if(!is.null(.seed)) set.seed(.seed)
  } else {
    if(missing(.cores)) .cores=parallel::detectCores()
    else if(.cores <= 0) .cores=parallel::detectCores() + .cores
    doParallel::registerDoParallel(.cores)
  }
  return(function(expr) {
    expr <- substitute(expr)
    on.exit(stopImplicitCluster2())
    if(!is.null(.seed)) {
      if(!require(doRNG)) stop("install.packages('doRNG')")
      foreach(..., .combine=.combine, .options.RNG=.seed) %dorng% eval(expr)
    } else {
      foreach(..., .combine=.combine) %dopar% eval(expr)
    }
  })
}
使い方
pforeach(i = 1:3)({
  rnorm(1)
})

pforeach(i = 1:3, .cores=2)({
  rnorm(1)
})

pforeach(i = 1:3, .seed=12345)({
  rnorm(1)
})

stopImplicitCluster2() も無くなってすっきりしました。

最終コード

最終的に、次のようなコードになりました。

pforeach.R
stopImplicitCluster2 <- function() {
  options <- doParallel:::.options
  if (exists(".revoDoParCluster", where = options) &&
        !is.null(get(".revoDoParCluster", envir = options))) {
    stopCluster(get(".revoDoParCluster", envir = options))
    remove(".revoDoParCluster", envir = options)
  }
}

pforeach <- function(..., .combine=c, .parallel=TRUE, .debug=!.parallel, .cores, .seed=NULL, .export, .packages) {
  if(!require(doParallel)) stop("install.packages('doParallel')")
  if(!.parallel || .debug) {
    foreach::registerDoSEQ()
    if(!is.null(.seed)) set.seed(.seed)
  } else {
    if(missing(.cores)) .cores=parallel::detectCores()
    else if(.cores <= 0) .cores=parallel::detectCores() + .cores
    doParallel::registerDoParallel(.cores)
  }
  if(missing(.export)) .export=ls(parent.frame(1000))
  if(missing(.packages)) .packages=loadedNamespaces()
  return(function(expr) {
    expr <- substitute(expr)
    on.exit(stopImplicitCluster2())
    `%doop%` <- `%dopar%`
    if(!is.null(.seed)) {
      if(!require(doRNG)) stop("install.packages('doRNG')")
      set.seed(.seed)
      `%doop%` <- `%dorng%`
    }
    foreach(..., .combine=.combine, .export=.export, .packages=.packages) %doop% eval(expr)
  })
}
使い方
data <- iris
pforeach(i=seq_len(nrow(data)))({
  sum(data[i, 1:4])
})

library(dplyr)
pforeach(i=seq_len(nrow(data)))({
  data[i, ] %>% select(-Species) %>% sum
})

ここでは、@teramonagi さんの下記の記事を参考にして、.export.pakages を意識しなくてもいいように改良を加えています。

次のようなコードも特に意識せずに実行できます。

使い方
g <- function(i){rnorm(1)}
f3 <- function()
{
  pforeach(i=1:3)({g(i)})
}
f3()

以上、foreach を自分用にカスタマイズする方法を説明しました。
ご参考になれば幸いです。

6
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
8