11
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

個人開発Advent Calendar 2017

Day 13

MapReduceフレームワークを自前で実装して気づいたこと

Last updated at Posted at 2017-12-13

これはなにか

  • 最近、少しばかり時間をかけて(作業時間50時間ぐらい?)MapReduceフレームワークをrubyで実装してみました(仕事ではなくただの趣味で)
  • (2018.02.17追記)repositoryはこちらです
  • 久しぶりのそこそこの規模の個人開発だったので色々と気付きがあったので、雑多にまとめてみます

まえおき

なぜMapReduceを実装したのか

  • MapReduceのアルゴリズムに興味を持ち、googleが公開している論文Hadoopのofficial referenceを読んでみた
  • しかし、全体像は分かって具体的な実装のイメージがいまいち掴めなくてモヤモヤした
    • そもそも自分は分散処理システムに関わったことがなかった
  • かといって、HadoopやSparkのコードを知識ゼロから読んでいくのは時間がかかる
  • なので、自分なりに実装した方が理解が早そうだと思った

どういうのを作ったのか

  • 今のところMapReducepっぽい動きをする何か…という感じ。今のところいろいろ未完成…
  • さすがに分散ファイルシステムまで自分で実装できないのでS3で代用(なのでデータのローカリティは完全に無視)
  • 全体的にHadoop1のアーキテクチャをパクっているが、大体こんな感じで動作する。
  1. JobTrackerはユーザーからのmap scriptとreduce scriptをpostで受け取る
  2. それをJobWorkerにpost
  3. map taskを受け取ったJobWorkerがmap taskを実行する
  4. map taskが終わるとmap taskを担当したJobWorkerが別のJobWorkerにreduce taskをpostする
  5. reduce taskを受け取ったJobWorkerはreduce taskを実行する

map_reduce.png

2017/12/29 更新。動いてる様子の動画を追加

実際に動いてる様子。JobTracker + JobWorker * 3 が動作しており、map taskが1worker, reduce taskが2workerで動作している。

例えばWordCountを行うJobなら、以下のようなjobを定義してJobTrackerにPOSTすることで処理が開始される。Worker間でデータをやり取りする時はMessagePackでserialize/deserializeしている。

map_class_name = 'WordCount'
map_script = <<-'EOS'
  class WordCount
    def map(input_data, output_io)
      input_data.split(' ').each do |raw_word|
        word = raw_word.strip
        next if word.empty?

        output_io.puts({ key: word, value: 1 }.to_json)
      end
    end
  end
EOS

reduce_class_name = 'WordCount'
reduce_script = <<-'EOS'
  require 'json'
  class WordCount
    def reduce(input_io, output_io)
      output = Hash.new(0)
      input_io.each_line(chomp: true, rs: "\n") do |line|
        input = JSON.parse(line, symbolize_names: true)
        output[input[:key]] += input[:value]
      end

      output.each do |key, value|
        output_io.puts(JSON.generate(Hash[key, value]))
      end
    end
  end
EOS

job_input_file_path = 'input.txt'
job_input_bucket_name = 'input'

job_output_directory_path = 'word_count'
job_output_bucket_name = 'output'

job = SimpleMapReduce::Server::Job.new(
  map_script: map_script,
  map_class_name: map_class_name,
  reduce_script: reduce_script,
  reduce_class_name: reduce_class_name,
  job_input_file_path: job_input_file_path,
  job_input_bucket_name: job_input_bucket_name,
  job_output_directory_path: job_output_directory_path,
  job_output_bucket_name: job_output_bucket_name
)

で、書いたコードはどこよ

  • (2018.02.17追記)ここです
  • とにかく動くことを最優先でガーっと書いたので、全体的に汚いのとテストが書けてないのでまだ未公開です。。 🙇
  • 来年1月中にはgithubに公開したい
  • JobTracker, JobWorkerで非同期処理を行う部分は、on memoryなjob queue workerを自作して使っており、これは公開済み

気づいたこと

当初想定していなかった多くの要素を発見できた

MapReduceに関する解説や論文ではmap, shuffle, reduceといった主要な要素だけが注目されがちで、自分もその部分について理解することが目的だった。

しかし、実際に分散システムとして動作するMapReduceのシステムをゼロから作ってみたところ、実際にはそれ以上に様々な機能が必要だった。特に、動的にworkerを追加したり、workerの状態を管理する仕組みが完全に当初の想定から抜けていた。この辺はノウハウもないので必要になったものを片っ端から実装していった。ちょっと考えれば気づく話ではあるのだが、分散処理システムをゼロから作ったことのない自分には色々と未知の領域だった。

実際に自分が実装したAPI endpointの一覧は下記の通り。肝心のjobとtaskの実行よりも、workerの登録や状態管理のための実装で作業時間の大半を要した。逆に、map taskとreduce taskを実行させる非同期処理は、最初から全体像のイメージが明確だったのですぐ動くものが実装できた。

  • JobTracker
    • POST /jobs
    • GET /jobs
    • GET /jobs/:id
    • GET /workers
    • POST /workers
    • GET /workers/:id
    • POST /workers/reserve
  • JobWorker
    • POST /map_tasks
    • post /reduce_tasks

新しく使うフレームワークの学習コストは大きい

普段使わないSinatraをAPIサーバーの実装に使ったら、Sinatraの学習にものすごく時間を取られてしまい、肝心のロジックの実装がなかなかできないという状況に陥ってしまった。

これは事前の調査不足ということもあるが、例えばrubyのscriptからSinatra appを起動する方法とか(rackupするのではなく)、hot reloadする方法とか、細かいことを調べるのに時間がかかってしまった。
ググると初心者向けtutorial的なものは大量にあったのだが、凝ったことをするための情報がなかなかネットには転がっていなかった。で、結局自分が知りたいことはSinatraのソースを読むのが一番手っ取り早かった。

普段使いのRailsを使った方がいいかなと途中で何度か思ったが、今回実装したJobTrackerとJobWorkerは、常駐プロセス + http interfaceという構成で作りたかったので、それを実現するためにはシンプルなrack applicationである方が都合が良かった。それに、たまには新しいものを試してみなくてはならないということもありm今回はSinatraを採用した。しかし、自分が使ったことがない物を使う時には、どんなものであれそれなりに調査に時間がかかる、ということは肝に銘じておくべきだと改めて思った。

手軽に動作確認できる雑な仕組みを最初から用意しておく

今回は時間の都合でtestをまだ書いてないのだが、それでも動作確認するコストを下げるための仕組みは最初から作っておいた方が良いと感じた。

特に、今回作ったような複数のSinatra Appが動いていないと動作確認できないようなめんどくさいブツの場合は特に重要だと思う。今回は用途別にtransaction scriptを書いて、JobTracketと複数のJobWorkerをまとめて起動したり、map taskやreduce taskだけを個別に実行できるようにするようにした。もちろん最終的にはtestを書くが、今回のように試行錯誤しながら実装していく場合は実際に動かせることの方が重要である。

nu boardは偉大

IMG_20171213_222910.jpg

nu boardというノートっぽく使えるホワイトボードがあるのだが、これがテンポラリなメモを書いたり、固まったシーケンス図を残していく、みたいな使い方をするのにすごく便利だった。(特に今回みたいに試行錯誤が多い開発の場合は)

基本的には思考のためのワークスペースとしてあれこれと書いては消し書いては消しという風に使っていた。で、ある程度まとまった段階でメモとして残しておきたくなったら、そのページはそのままにして、他のページで試行錯誤に使う、という感じで、残しておきたいものとそうでないものを明確に区別せずに使うことが出来る。

A4サイズのホワイトボード自体は自分の思考整理用に何年も愛用してきたが、いかんせんスペースに限りがあるので、残しておきたい情報は別の媒体に書き残していくしか手がなかった。しかし、nu boardはページ数がそこそこあるので、残しておきたい情報をある程度そのまま残しておくことができる。これは使ってみないとわからないが、作業を中断せずに思考を続けられつつも、いらないものはサッと消してまたスペースを確保して思考を続けられる、というのはストレスがなくとても良い体験である。普段からあれこれと考えながら仕事をしているエンジニアには是非おすすめしたい一品である。

11
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?