Edited at

Apache Spark for Bluemixで天気情報を高速分析

More than 3 years have passed since last update.


はじめに

2015年7月にIBM Bluemix上でApache Sparkを利用することが

できるようになったので早速試してみた。

 ※2015年8月現在、Apache Spark for Bluemixはβ版として公開されている。


ボイラープレートからApach Spark Starterを起動

Apache Spark Starterには書きの3つのツールが含まれているようだ。

 ・Jupyter Notebooks :データ解析と可視化用

 ・SWIFT Object Storage: 分析用データセットの保管用

 ・Apache Spark :分散型データ処理用


  1. Bluemix Consoleにログインし、[カタログ]から[Apache Spark Starter]を選択する
    150825-0001.png
      

  2. 任意の名前を設定し、[作成]する150825-0002.png


  3. これで必要な環境の準備は完了したのでConsoleからアプリケーションを起動してみる。Jupyterによる管理ページとObject Storageの管理用ページが用意されていることが確認できる。

    イメージ.jpg


  4. Jupyter及びObject Storage接続用のCredentialsを確認

    4.jpg



Jupyterページ:チュートリアルの内容確認

[LAUNCH]をクリックすると『Jupyter』が起動する。デフォルトでObject Storageにチュートリアルのファイルが3つ保存されており、Jupyterで可視化されるようになっている。チュートリアルが用意されているので、[Tutorial 1]を試してみる。

150825-0005.png

むむ…結構長くなりそうだ…


チュートリアル:天気情報をロードし、Sparkで分析



  1. Bluemixのオブジェクトストレージにraw dataをアップロード


    • NOAAの[GHCN-Daily FTP Access] からFTPページを開き[by_year]の中から<2015.csv.gz>をダウンロードする

       http://www.ncdc.noaa.gov/data-access/quick-links#ghcn

    • CSVファイルを開きファイルの最初の行に次の列ヘッダーを追加し保存

        → STATION,DATE,METRIC,VALUE,C5,C6,C7,C8

    • Object Strageページから新しいコンテナを作成する。

    • [+]ボタンをクリックし、「ClimateDataForTutorial」という名前を設定する
      150825-0006.png


    • 作成したコンテナページから用意したCSVファイルをアップロード
      150825-0007.png





  2. 最初のRDDを定義する


    • アップロードしたCSVファイル(raw data)からRDDを作成する

    • JupyterからNotebook(Python 2)を作成する
      150825-0008.png

    • Notebooksを作成するとSparkContextを利用できる

    • 下記コマンドを入力し、RDDを生成する


      weather = sc.textFile("swift://ClimateDataForTutorial.spark/2015.csv")

      150825-0009.png





  3. (オプション)別のアカウントからロードする 


    • 別のSpark StarterアカウントやSoftLayerアカウントからもRDD定義が可能

    • ここでは詳細は割愛する。




  4. カラムに含まれるデータを解析


    • RDDの各行から天候データを受け取り、コンマ区切りによって分割する

      weatherParse = weather.map(lambda line : line.split(","))

    • 結果、新しいRDD weatherParseは文字列のリストとして定義される

    • 各リスト内の文字列は、行の個々の要素となる
      150825-0010.png




  5. 降水量データ値によって列を小さなデータセットにReduce


    • 降水量データ行のみ選択し、データセットを小さなデータセットにする

    • METRIC列がPRCPに等しい行でフィルタをかけてやる
      weatherPrecp = weatherParse.filter(lambda x: x[2] == "PRCP")




  6. 気象局による平均降水量を計算するためにデータセットを変換


    • 計算しやすいようにデータセットの構造を変換

      weatherPrecpCountByKey = weatherPrecp.map(lambda x : (x[0], (int(x[3]), 1)))
      150825-0011.png




  7. 気象局による総降水量を計算


    • 全ての気象局の総降水量を計算するために、reduceByKeyを利用

      weatherPrecpAddByKey = weatherPrecpCountByKey.reduceByKey(lambda v1,v2 : (v1[0]+v2[0], v1[1]+v2[1]))
      150825-0012.png




  8. 平均値を計算する


    • 気象局ごとの平均値を計算する。

      weatherAverages = weatherPrecpAddByKey.map(lambda k: (k[0], k[1][0] / float(k[1][1] ) ) )




  9. 10個の気象局の平均値を出力


    • 10の気象局ごとの平均値を計算することも可能である。

    • station IDと平均値のペアだと、並び順の問題でstation IDの方にソート条件が適用されるため降水量の多い順で出力されない。

      for pair in weatherAverages.top(10):
      print "Station %s had average precipitations of %f" % (pair[0],pair[1])





  10. 降水量の多い順に並び替えて出力


    • Station IDと平均値のペアの順序をMap機能で入れ替えた上でTop 10を表示することで、降水量の多い順に並び替えて出力させる。

      for pair in weatherAverages.map(lambda (x,y) : (y,x)).top(10):
      print "Station %s had average precipitations of %f" % (pair[1],pair[0])

      150825-0013.png




所感

Bluemix上でSparkインスタンスの数量及びメモリ容量を変更するだけでApache Sparkの分散処理性能を変更することができるため、BluemixとApache Sparkの組み合わせは非常に強力であると感じた。

また、OpenStack Swiftをデータセットの保管先に利用しているため、安価に大容量のデータを保管することも可能であり、OpenStack APIを利用することで、データ自体の入出力を自動化することも可能であることも非常に良いポイントだと思う。

Apache Sparkはメモリを利用して動作するため、Apache Hadoopに比べて非常に高速に分析が可能である

Apache Spark環境を簡単に用意し、簡単にスケールができ、データ連携もしやすいので、Apache Spark for Bluemixの今後に大きく期待したい。

また気が向いた時にApache Spark for Bluemixを試してみたいと思う。