ビッグデータと Hadoop
ビッグデータという言葉がバズワードになって久しく、昨今では Hadoop を実際のビジネスに活用した事例もちらほらと見かけるようになりました。
なぜ Hadoop なのか
データ分析の観点から Hadoop のような大規模分散処理における最大のメリットを挙げるとすると、従来の伝統的な統計学的観点による標本抽出をせずとも、母集団の全数走査を比較的手軽にできる、すなわち大規模データの全数調査に尽きると筆者は考えています。
伝統的な標本調査においては、標本から中心極限定理と正規分布近似(あるいはノンパラメトリックな推定)を利用して、母集団と母数を推定することで行います。漸近理論や統計量と区間推定、仮説検定と確率分布といった重要な考え方についてはすでに過去の記事で紹介をしました。忘れた方は過去記事を参照してみてください。
抽出過程において何らかのモデルを仮定しそれに基づく標本抽出をすることは、標本サイズを小さくすることで手元の一般的な計算機で分析することを可能にしますが、モデルの前提の正しさを確認しておく必要があります。そのために母集団の統計量などを元に検定をしたりするわけです。
また何らかのモデルに基づいた標本抽出をおこなう際にも、単純に容量の大きなデータから 写像 (map) 、 簡約 (reduce) をおこなうことのできる Hadoop は強力なツールとして使えます。とくに最近は Amazon Web Service のようなクラウドサービスが広く普及していますから、いつでもどこでも必要なときに必要なぶんだけ計算機資源を用意して Hadoop クラスタを利用するといったことはますます手軽になってきています。
なぜ Ruby なのか
Hadoop は MapReduce をJava で実装したソフトウェアですが利用者は必ずしも Java を使う必要はありません。
Hadoop Streaming は非常に強力な API で、標準入出力を利用可能なほぼすべての言語で利用できます。
Hadoop の利用に「プログラミングが必要」と言うと拒否反応が返ってくることがあるでしょう。たとえば専門家でなくてもボタンひとつで手軽にソフトウェアを扱えるようにできないか、日本語で仕様書を書けば利用できるようにならないか、といったものです。
しかしながら、手軽で簡潔に記述することができ、計算機がそのまま解釈することができ、保守や運用に優れているのはエクセルの仕様書や指示書などではなく、プログラムのソースコードそのものです。
データの抽出をするにしても、多くのケースではほんの 10 行余りのコードを記述すれば Hadoop を利用するには充分でしょう。また Ruby の優れた記述性、柔軟性、生産性の高さについては言うまでもないでしょう。特にアドホックな分析を繰り返すときに、このようなスクリプト言語の生産性が強力なパワーを発揮します。
MapReduce プログラミング
MapReduce プログラミングと言うと何やら難しそうに聞こえますが、決して難しい技術ではなく、計算機に具体的かつ簡潔な命令を与える手段として、人間にとっても計算機にとっても親和性の高い共通のインターフェースとなるものです。
やるべきこととしては前述の通りデータストリームに対する写像 (map) ・簡約 (reduce) 変換といった、関数型の操作を考えれば良いのです。
各行が次のように位置情報が JSON 形式で格納されている巨大なデータセットを考えてみます。 (架空のデータです)
{"topic_seq": 8394670, "op": 1, "seq": 95219446, "timestamp": 1394560204,"error_level": 231, "mac_str": "012345ABCDEF", "event_type": "location", "assoc": false,  "location_x": 12399.999046325684, "location_y": 3123.9998569488525}
たとえばこのデータセットから Mac アドレスの登場頻度を調べてみることにしましょう。これはいわゆるワードカウントと呼ばれる典型的な処理のひとつです。
Hadoop でのデータ処理は標準入出力で代替してテストすることができます。 Mapper と Reducer を次のように記述します。
require 'json'
class Mapper
  def self.map(stdin) # 標準入力が渡ってくる
    event_type = mac_str = ""
    # 各行を逐次処理していく
    stdin.each_line {|line|
      json = line.force_encoding("utf-8").strip
      # JSON パーサーで分解する
      JSON.parse(json).each {|k,v|
        # Mac アドレスを取り出す
        mac_str = v if k == "mac_str"
      }
      # 1 件カウントする
      puts "#{mac_str}\t1\n" unless mac_str == ""
      mac_str = ""
    }
  end
end
これにより Mapper のアウトプットは次のようになります。
012345ABCDEF    1
元のデータから必要なデータを分解、抽出するのが Mapper の仕事です。この場合は位置情報の内部にある Mac アドレスを抽出、1 件という数と共に出力したわけです。
これは他の行に依存しませんから、この場合 Mapper は複数のノードに分散することが可能です。分散すればするほど単一ノードごとの負荷は少なくてすみます。
このとき Mapper によって中間データ領域に出力されたデータは自動的にソートされます。したがって Reducer ではデータの集計にあたりソートされたデータが渡ってくることを前提として記述していけばいいわけです。
class Reducer
  def self.reduce(stdin)
    # 変数の初期化
    count = 0
    key = newkey = ""
    # 各行を逐次処理していく
    stdin.each_line {|line|
      # 1 行ごとに列に分解
      newkey, num = line.strip.split("\t")
      # 主キーが変化していないか監視する
      unless key == newkey
        # 変化していればカウントしていた合計値を出力する
        puts "#{key}\t#{count}\n"
        key = newkey
        count = 0
        newkey = ""
      end
      # キーが変化するまでカウンタを加算
      count += num.to_i
    }
    # 最後の 1 行が出力される機会が無いので
    puts "#{key}\t#{count}\n" unless key == ""
  end
end
これにより次のように集計がおこなわれます。
012345ABCDEF    1
012345ABCDFF    2
012346ABCDEF    4
01236AABCDEF    3
無事データを全数調査することができました。
次回以降 Hadoop + Ruby についてより深く説明していきます。