*こちらはOpt社内で実施しているオライリーのHadoop本第3版の輪読会用資料になります
自己紹介
株式会社オプト シニアエンジニア @sisisin
Hadoop本第3版第3章Hadoop分散ファイルシステムについてかいつまんで説明していこうと思います
内容的にはHadoop本の内容をピックアップしているだけなので、真面目にやるなら実際のHadoop本を読んだほうが良いです
事前準備
(必須ではないが、動かしてみながらやるなら。)
はじめに
Hadoopは独自の分散ファイルシステムであるHDFS(Hadoop Distributed Filesystem)と呼ばれる仕組みを持っている
この章では、HDFSがどのような設計で、どんなAPIを用いて扱えるのかといったことを説明する
HDFSの設計
HDFSの設計は「HDFSは、ストリーミング型のデータアクセスパターンによって、非常に大きなファイルを保存するために設計されたファイルシステムで、コモディティハードウェアによって構成されるクラスタで動作します」と表現されている
- 非常に大きなファイル
- 数ペタバイトに及ぶ
- ストリーミング型のデータアクセス
- 「書き込みは一度、読み出しは何度も行う」のが最も効率的なデータ処理パターンだという発想に基づいている
- コモディティハードウェア
- 市場で普通に調達可能なハードウェアでクラスタを構成しても問題なく動くよう設計されている
HDFSの設計
HDFSの不向きな領域
- 低レイテンシのデータアクセス
- HDFSは高スループットを出すためにレイテンシを犠牲にしてしまうことがあり得る設計になっている
- 大量の小さなファイル
- ネームノード(後述。クラスタ上のどこに実際のデータが有るかを管理するデーモン)はメモリ上にファイルシステムのメタデータを持つので、ファイルシステム上に持てるファイル数がネームノードのメモリ量によって制限されてしまう。そのためファイル数が多くなる状況には向かない
- ファイル・ディレクトリなどは1つに付き150バイト必要なので、例えばクラスタ内に100万個のファイルがあると最低でも300MBのメモリが必要といった具合
- 複数のライターからの書き込みや任意のファイルの修正
- HDFSは1つのライターのみから、ファイルの末尾に対してのみ書き込みが出来るという制約がある
- 「書き込みは一度」の思想に反するのでそりゃそう、という感想
HDFSに関する概念
ブロック
- ファイルシステムのブロックと同様の概念
- ただし一般のファイルシステムよりは遥かに大きい(デフォルトで64MB)
- HDFS上のファイルはブロックサイズのchunkに分割され、それぞれのchunkは独立した単位として保存される
ブロックを抽象化していることによる恩恵
- ディスクのサイズよりも大きいファイルが作れる
- ストレージのサブシステムが単純化出来る
- ブロックが固定長になる
- メタデータ(例えばパーミッション情報など)を持たないですむ
- レプリケーションと相性が良い
- ブロックだけを物理的に別のマシンに複製しておける
ブロックが大きい理由
ファイルシステムのブロックに比べてHDFSのブロックが大きくなっているのは、ディスクのシークのコストを抑え、大容量ファイルの転送にかかる時間のボトルネックをシークコストではなくディスクの転送レートにすることが出来るため。
例えば、シークタイムを10ms、転送レートを100MB/sと仮定したときに、シークに使われる時間を1%に抑えようとするとブロックサイズを100MBにしなければいけないということがわかる。
なお、 ブロック数 < クラスタ内のノード数
になると、MapReduceするときにノードが余ることになるので行き過ぎは良くない。
ネームノードとデータノード
HDFSクラスタには、マスター/ワーカーパターンで動作する2種類のノード群がある
- ネームノード(マスター)
- データノード(ワーカー)
ネームノード
ファイルシステムの名前空間を管理するノード。
- ファイルシステムのツリーと、ツリー内の全ファイル及びディレクトリのメタデータを管理する。
- この情報はローカルのディスク上に名前空間のイメージと編集ログという2つのファイルとして保存される。
- 全てのファイルの全てのブロックがどのデータノードにあるかも保持しているが、この情報は永続化されない。
- システム起動時に毎回データノード群から再構築される作りになっているため
データノード
ブロックを管理するノード。
- クライアントあるいはネームノードからの要求に応じてブロックの読み書きを行う
- 定期的にネームノードに自分が保管しているブロックのリストを報告する
ここまでの説明でわかるとおり、ネームノードがないとHDFSは使えないので、単一障害点となる。
Hadoopはネームノードの耐障害性を高めるために以下の2つの仕組みを持っている
- メタデータ情報構成ファイルのバックアップ
- セカンダリネームノード
- セカンダリといいつつ、ネームノードとして働かないので注意
- セカンダリの活用方法は10章で解説するらしい
HDFSフェデレーション
大規模なHDFSクラスタではネームノードのメモリがスケーリングする上でボトルネックになりうる。
HDFSフェデレーションを用いることで、ネームノードがファイルシステムの名前空間の一部を管理出来るようになり、この問題を解決する。
例えば、 ネームノード1には /user
以下を、ネームノード2には /share
以下を管理させる、というように出来る
このとき、ネームノード1が障害を起こしていても、ネームノード2が管理する /share
以下は問題なく扱える。
HDFSの高可用性
Hadoop2系からはHDFSのHA対応がリリースされており、これによってネームノードの単一障害点問題を緩和している。
アクティブ・スタンバイ構成でネームノードのペアを起き、アクティブなネームノードに障害が起きた場合はスタンバイしていたネームノードが処理を引き継ぐように出来ている
フェイルオーバーとフェンシング
フェイルオーバーについて
- アクティブネームノードからスタンバイネームノードへの移行はHadoopシステム内のフェイルオーバーコントローラによって管理されている
- 各ネームノードはフェイルオーバーコントローラのプロセスを実行する
- プロセスは、自ネームノードの障害モニタリングとフェイルオーバーのトリガー実行を行う
- 手動でフェイルオーバーすることも可能(その場合はグレースフルにフェイルオーバーされる)
フェンシングについて
- ネットワーク分断などで、アクティブなネームノードが動いてるにもかかわらずフェイルオーバーが実施された際にシステムに影響を出さないようにする処理をフェンシングという
- ネームノードのプロセスをkillしたり、ネームノードのプロセスから共有ストレージへのアクセス権を剥奪するなどを行う
コマンドラインインターフェース
*実際に動かしてみる場合は、ローカルでHadoopを擬似分散モードで立ち上げること
Hadoopでは、HDFSのファイルシステムを利用するコマンドラインインターフェースが用意されている。
コマンドの例を紹介する
$ hadoop fs -help
$ hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://localhost/user/tom/quangle.txt # ローカルの `quangle.txt`をhdfs上へコピー
$ hadoop fs -copyToLocal /user/tom/quangle.txt ./quangle.txt # hdfs上からローカルファイルシステムへコピー
$ hadoop fs -mkdir books # `books`ディレクトリを作成
$ hadoop fs -ls # リスト
hadoop fs -ls
では、通常の ls
コマンド異なり、2つ目の列にファイルの複製数が表示される
Hadoopのファイルシステム群
- Hadoopは抽象化されたファイルシステムの概念を持っており、HDFSはその実装の1つでしかない
- Javaの抽象クラスである
org.apache.hadoop.fs.FileSystem
はHadoopのファイルシステムを表現するもので、実装は複数ある(例えばローカルディスクのファイルシステムを利用したものやAmazon S3を利用したものなどがある) - これらのファイルシステムへの選択にはURIスキーマを用いる
- 例:
hadoop fs -ls file:///
- 例:
- どのファイルシステムへもMapReduceを実行することは出来るが、大規模データを処理するなら分散ファイルシステムを用いるべきである
インターフェース
HDFSにHTTP経由でアクセスができる
- ノードへ直接アクセス
- WebHDFSを利用することで読み書き込みが出来るようになる
- 有効化するには
dfs.webhdfs.enabled
をtrue
にする
- HDFSプロキシへアクセス(
DistributedFileSystemAPI
を使う)
HTTP以外にもC言語向けのライブラリや、Unixファイルシステムと統合するFUSE(Filesystem in Userspace)というものもある
Javaインターフェース
HadoopのファイルシステムとやりとりするためのAPIとして、FileSystemクラスが提供されている
HDFS向けのDistributedFileSystemやその他のクラスは全てこのFileSystemクラスの子クラスとして実装されている。
この章ではJava上でどんな事ができるかをサンプルコードを記載しながら解説しているが、特に解説することもないので本の内容と以下のコードを参照してもらえればと。
https://github.com/tomwhite/hadoop-book/tree/3e/ch03/src
なお、以下のような項目について解説されている
- HadoopURLからのデータ読み出し
- FileSystem APIを使ったデータの読み出し
- FSDataInputStream
- データ書き込み
- FSDataOutputStream
- ディレクトリ
- ファイルシステムへの問い合わせ
- ファイルのメタデータ:FileStatus
- ファイルのリスト
- ファイルパターン
- パスフィルタ
- データの削除
データフロー
ファイルの読み・書きをするときのデータフローを解説する
ファイル読み込み
-
DistributedFileSystem#open
を呼び、ファイルをオープンする - ネームノードからファイルのメタデータを取得し、ネームノード/データノードからデータを取得できる
FSDataInputStream
を返す - クライアントは渡されたStreamに対して
FSDataInputStream#read
を呼び、データを読む -
DFSInputStream
は自身の保持するファイルの1つ目(=最も近い)のブロックに接続し、クライアントにデータを渡す - ブロックの終端にたどり着いたら、データノードへの接続をクローズし、次のブロックを探して接続、引き続きクライアントにデータを渡す(この処理はクライアントに対して透過的に行われるのでクライアントからは連続したStreamを受け取り続けているようにしか見えない)
- 該当のファイルの最終ブロックまで読み終えたらクライアントは
FSDataInputStream#close
を呼び、ファイルをクローズする
読み込み中にデータノードとの通信時にエラーが起こった場合
- そのブロックを保持している最も近いデータノードを試す
- 障害のあったデータノードを保持しておく(その後の処理で不要なリトライが行われないようにするため)
- データノードから転送されたデータのチェックサムを確認し、壊れたブロックがあったらネームノードに報告の上、そのブロックの複製を読みに行く
以上のような設計になっているおかげで、データへのアクセスをクラスタ内の全てのデータノードに分散できるようになっている
ファイル書き込み
ファイルの新規作成とデータの書き込みというケースを考える
-
DistributedFileSystem#create
を呼び、ファイルを作成する - ネームノードの持つメタデータを元にファイル作成が可能化を検証し、問題がなければブロック未割り当てのファイルが作成されて、
FSDataOutputStream
を返す - クライアントが書き込み要求を Streamに対して行い、
FSDataOutputStream
は書き込みの準備を行う(ネームノードへの書き込むべきデータノードのリスト取得要求など) - 書き込むパケットを
FSDataOutputStream
がデータノードに対して転送する。複製数が2以上の場合は、データノードは複製対象の別のデータノードへデータを転送する - 全てのデータノードは、自身が書き込み要求を終えたことを
FSDataOutputStream
へ伝える。これはackQueue
と呼ばれるもので管理されており、要求が完了したらキューから取り除かれる - 全てのパケットについて書き込みが完了したら
FSDataOutputStream#close
を呼び、Streamを閉じる。ここでネームノードは1つのファイル書き込みが完了した通知を得る
補足
dataQueue, ackQueue: https://github.com/apache/hadoop/blob/f27a4ad0324aa0b4080a1c4c6bf4cd560c927e20/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java#L514
一貫性モデル
ファイル読み書きに際してのデータの見え方を示す。
HDFSではPOSIXの要求のいくつかを性能のためのトレードオフで犠牲にしており、通常期待されるものと違う結果になる操作がある。
具体的には、ファイルに書き込まれた内容は、ストリームがフラッシュされたあとでも見えるとは限らないというものがある
Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L)); // 直前で `flush()`しているにもかかわらずファイルの内容は0byteに見える
書き込み中のデータは、1ブロック分書き終わった時点で初めて読み込み側から参照できるようになる。つまり基本的にはブロック単位で書き終わったもののみが参照可能である扱い。
ただし、 FSDataOutoputStream#sync
を実行すると強制的に参照可能にすることが出来る
なお、HDFSでファイルをクローズした時(OutputStream#close
が呼ばれた時)はこの sync
が暗黙的に呼ばれている
この一貫性モデルは、書き込み中になんらかの障害が発生した場合に最大で1ブロック分のデータが失われうることを示している。
そのため、データをロストしないように適宜 sync
が呼ばれているべきである。
だが、 sync
メソッドにはオーバーヘッドもあるので、データの頑健性とスループットのトレードオフを考慮して適切なタイミングを見つける必要がある。
データ取り込み(Flume,Sqoop)
- ファイルシステムからHDFSへデータを取り込む際はFlumeやSqoopといったツールがあるので使おう
- Flumeは大量のストリーミングデータを取り込むことができる
- Sqoopは構造化データストアからバルクインポートを行える
distcpによる並列コピー
ファイル集合への操作を並列実行するコマンド distcp
が提供されている。
本来の用途としてはHDFSクラスタ間のデータ転送だが、オプションによって変更のあったファイルのみ上書きなどが出来る
このコマンドはMapReduceのジョブとして実装されており、コピーの処理はmapとして実行される(reducerはない)
例
$ hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar
namenode1とnamenode2が同一のバージョンのHadoopで動作している場合は以上の例のようにhdfsスキーマを利用する。
異なるバージョンだとRPCシステムに互換がないため動かせないためである。
この問題を回避するためには、以下の方法がある
- 転送元をHTTPベースのHFTPファイルシステムを利用する
- 転送元と転送先にHDFS HTTPプロキシを利用する
データをHDFSにコピーする場合はクラスタのバランスを考慮することが重要。
HDFSはブロックがクラスタ間で平等に分配されているときに最も上手く動くので、 distcp
で偏りが出る状況は避けたい
distcp
する際にクラスタ内のノード数より多くのmapを使えばこの問題は回避できるので、1ノード辺り20のmapをデフォルトとして実行し始めるのが良いとされている
Hadoopアーカイブ
Hadoopアーカイブ(HARファイル)は複数のファイルをHDFSブロックに効率よくまとめ、ネームノードのメモリ使用量を削減しながら、ファイルへのアクセスは透過的に行えるようにしてくれるファイルアーカイブ機能である。
これを活用することでHDFSは小さいファイル群を効率よく保存できないという問題を緩和出来る
が、現実ではあんまり使われていないらしい
おわり