Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
1
Help us understand the problem. What is going on with this article?
@taiki_yoshida

Azure HDInsight + Microsoft R Serverで演算処理を分散する

More than 3 years have passed since last update.

概要

Microsoft Azure HDInsightとは、マイクロソフト社が提供する、HadoopのPaaSサービスで、インフラ周りの構築ノウハウをあまり知らなくても、立派なHadoopクラスターが構築できる素晴らしいサービスです。
今回はこのMicrosoft Azure HDInsightを構築し、構築したクラスター上でR言語を並列実行させてみます。

事前に準備するもの

  • Azure サブスクリプション

※無償試用版をお使いでない方は必ずお読みください※

このクラスターを構築すると、毎時270円ほどお金がかかります。作成後は必ず削除してください。
削除しないと1日6000円ぐらいかかるので、大変な請求額となります。

構築手順

  1. 「新規」> 「Intelligence + analytics」のカテゴリーから「HDInsight」を選択します。
    image

  2. クラスター名を入力し、クラスター構成から「クラスターの種類」を「R Server」にし、「選択」をクリックします。
    image

  3. 資格情報を設定します。「クラスターログインパスワード」は大文字・小文字と記号を含む12文字以上のパスワードが必要です。
    image

  4. 次にデータソースを関連付けます。既存のAzureストレージアカウントや、Data Lake Storeが利用できます。なければ新規作成も可能です。データソースからのデータはこの記事では利用しませんので、どの設定でもOKです。
    image

  5. クラスターのサイズを指定します。著者はお金がないので一番安い構成にしています(笑) ※それでも1時間 242.76円なので、利用が終わったら必ず削除するように気を付けましょう。あとは「作成」を押して、デプロイされるのを待ちます。大体10分以内にデプロイは完了しているはずです。
    image

  6. 「R Server ダッシュボード」をクリックします。
    image

  7. 「R Studio サーバ」を選択します。
    image

  8. ログインが2回求められます。1回目はクラスターログイン情報、2回目はSSHログイン情報です。
    image

  9. これでR言語での実行準備は完了です。
    image

単一のノードで実行する

以下のコードを実行します。

 # サンプルデータのHDFS (WASB) 場所を指定します
 bigDataDirRoot <- "/example/data"

 # 一時的に保管するローカルフォルダーを作成します
 source <- "/tmp/AirOnTimeCSV2012"
 dir.create(source)

 # データを一時フォルダへダウンロードします
 remoteDir <- "http://packages.revolutionanalytics.com/datasets/AirOnTimeCSV2012"
 download.file(file.path(remoteDir, "airOT201201.csv"), file.path(source, "airOT201201.csv"))
 download.file(file.path(remoteDir, "airOT201202.csv"), file.path(source, "airOT201202.csv"))
 download.file(file.path(remoteDir, "airOT201203.csv"), file.path(source, "airOT201203.csv"))
 download.file(file.path(remoteDir, "airOT201204.csv"), file.path(source, "airOT201204.csv"))
 download.file(file.path(remoteDir, "airOT201205.csv"), file.path(source, "airOT201205.csv"))
 download.file(file.path(remoteDir, "airOT201206.csv"), file.path(source, "airOT201206.csv"))
 download.file(file.path(remoteDir, "airOT201207.csv"), file.path(source, "airOT201207.csv"))
 download.file(file.path(remoteDir, "airOT201208.csv"), file.path(source, "airOT201208.csv"))
 download.file(file.path(remoteDir, "airOT201209.csv"), file.path(source, "airOT201209.csv"))
 download.file(file.path(remoteDir, "airOT201210.csv"), file.path(source, "airOT201210.csv"))
 download.file(file.path(remoteDir, "airOT201211.csv"), file.path(source, "airOT201211.csv"))
 download.file(file.path(remoteDir, "airOT201212.csv"), file.path(source, "airOT201212.csv"))

 # bigDataDirRootのディレクトリを設定し、ロードするデータを指定します
 inputDir <- file.path(bigDataDirRoot,"AirOnTimeCSV2012") 

 # ディレクトリを作成します
 rxHadoopMakeDir(inputDir)

 # データをソースからインプットへコピーします
 rxHadoopCopyFromLocal(source, bigDataDirRoot)

 # HDFS (WASB) ファイルシステムを定義します
 hdfsFS <- RxHdfsFileSystem()

 # 航空会社のリストを作成します
 airlineColInfo <- list(
     DAY_OF_WEEK = list(type = "factor"),
     ORIGIN = list(type = "factor"),
     DEST = list(type = "factor"),
     DEP_TIME = list(type = "integer"),
     ARR_DEL15 = list(type = "logical"))

 # カラム名を抽出します
 varNames <- names(airlineColInfo)

 # hdfs上のデータソースを定義します
 airOnTimeData <- RxTextData(inputDir, colInfo = airlineColInfo, varsToKeep = varNames, fileSystem = hdfsFS)

 # ローカルシステム上のテキストデータを定義します
 airOnTimeDataLocal <- RxTextData(source, colInfo = airlineColInfo, varsToKeep = varNames)

 # 利用する関数です
 formula = "ARR_DEL15 ~ ORIGIN + DAY_OF_WEEK + DEP_TIME + DEST"

# 演算コンテキストをローカルにします
 rxSetComputeContext("local")

 # ロジスティクス回帰を実行します(およそ40分ほどかかります)
 system.time(
     modelLocal <- rxLogit(formula, data = airOnTimeDataLocal)
 )

 # 結果を表示します
 summary(modelLocal)

この実行方法ですと、並列処理ではないため、40分ほど時間がかかります。長いですね…

複数のノードで実行する

複数ノードで実行するには、Sparkを演算コンテキストとして設定することで実行できます。


 # Spark演算コンテキストを定義します 
 mySparkCluster <- RxSpark()

 # 演算コンテキストを定義します
 rxSetComputeContext(mySparkCluster)

 # ロジスティクス回帰を実行します
 system.time(  
     modelSpark <- rxLogit(formula, data = airOnTimeData)
 )

 # 結果を表示します
 summary(modelSpark)

クラスターの削除

利用後は必ずクラスターを削除されることをおすすめします。
image

参考資料

1
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
taiki_yoshida
吉田の備忘録をやっています、吉田です。 Microsoft製品やソリューションをより身近に、より分かりやすく、より早く共有することが日々の目標です。Dynamics 365、Office365、Azure関連の記事を投稿してます。 ※発言は個人の見解であり、所属する組織の公式見解ではありません。

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
1
Help us understand the problem. What is going on with this article?