みなさん、こんにちは、えいりんぐーです。
今回はEMRについてまとめます。
Q: Amazon EMR とは何ですか?
Amazon EMR は、企業、研究者、データアナリスト、および開発者が、簡単に、そして費用対効果の高い方法で、莫大な量のデータを処理することができるようにするウェブサービスです。Amazon Elastic Compute Cloud (Amazon EC2)、および Amazon Simple Storage Service (Amazon S3) のウェブスケールインストラクチャで実行されるホストされた Hadoop フレームワークを使用します。
資料集
基本的に以下の資料を参考にしています。
- Black Beltオンラインセミナー資料 (2017年)
- EMRのベストプラクティス (2013年)
- EMRを保護するベストプラクティス (2018年)
- よくある質問
- re:Invent 2017 (2017年, YouTube, 英語)
仕組み
Hadoopフレームワーク、マスターが多数のコアを管理して分散処理する。
ノード | セキュリティグループ | 役割 |
---|---|---|
Master Node | Master Instance Group | - コアノードやタスクノードの監視。ネームノードやリソースマネジャー (YARN) が動く。Failover非対応。sshのポートが開いている。 |
Core Node | Core Instance Group | HDFSをアタッチされたボリュームとして持つ。計算を実行する。データノードが動く。追加したり、HDFSやCPUやRAMを増設できる。基本的にマスターとしか通信しない。 |
Task Node | Task Instance Group | HDFSを持たないコアノード。増設がより自由。スポット価格やインスタンスタイプを調整できる。基本的にマスターとしか通信しない。 |
YARN: Yet Another Resource Negotiator
- Resource Manager
- マスターサーバーで動く
- スレーブのリソースを管理
- ジョブ管理はしない
- Node Manager
- スレーブサーバーで動く
- サーバーのリソースをRMに報告
- サーバー上のコンテナの管理
- Container
- スレーブのリソースが切り出されたもの
- NMによって起動
- Application Master
- ジョブ全体を管理するコンテナ
EMRにおいては、MapReduceやHiveやSparkへ計算資源を割り振ってくれる。共通のリソースを分割して扱ってくれる。
特徴と機能
- EMRFS
- S3をHDFSのように扱える。
- CPUとストレージの分離
- S3の暗号化も対応
- Consistent View
- 新しいオブジェクトのプット → Read-after-write consistency
- 上書きプット/デリート → 結果整合性
- DynamoDBにメタデータを格納
- S3をHDFSのように扱える。
- Bootstrap Action
- ノード起動時に実行されるスクリプト
- Bash, Ruby, Python, etc
- S3におく
- 引数も自由
- ノード起動時に実行されるスクリプト
- Step
- 任意のタイミングで追加・設定できる処理
- HQLとか
- S3のURIやローカルファイルを指定して実行
- jar, Streaming, Hive, Pig, Spark, bash
- 任意のタイミングで追加・設定できる処理
- ジョブ実行
- クラスター外
- Step API, Lambda, Data Pipeline, Airflowなどから
- クラスター内
- マスターノードにssh
- AWS System Managerから
- Hive, Spark, Zeppelin, Hue, Livy, Oozie
- クラスター外
- Private Subnetの利用
- S3/DynamoにはVPCエンドポイントが必要
- それ以外にはNAT GW
- Security Groupが必要
- 暗号化
- at-rest: サーバーサイドやクライアントサイドで暗号化
- in-transit: TLS
- セキュリティ設定やCloudFormationで
- タグ
- タグをIAMポリシーのコンディションに指定してアクセス管理できる
- オートスケール
-
yarn.resourcemanager.decommissioning.timeout
- defalut: 1 hour
-
- HBaseのデータストアとしてのS3
- 可用性・耐久性が高くなる
- Cloud Watch Event
- EMRのイベントをモニタリングできる
- インスタンスフリート
- スポットインスタンスをまとめて管理
- カスタムAMIを利用したクラスター起動
- 追加ソフトウェアを事前にロードできるので設定が楽
- アプリケーション
- サポート
- クラスタ作成時にオプションをつけてインストール可能なもの
- カスタム
- 事前にApache Bigtopに追加してBootstrapでデプロイ
- カスタムAMIを使う
- サポート
ポイント
- Hive Metastoreは重要
- SparkやPrestoからも参照可能
- MetastoreのMySQLをクラスタ外にも作成可能
- Glue Data Catalogを利用可能
- 列指向ファイルフォーマット
- ORCとParquet
- I/O効率と圧縮効率がいい
- ORCとParquet
- HiveでKinesisのデータ処理
- Kinssisが保持するデータをHiveのテーブルとして扱える
- アーカイブ用途など
- StreamのShard毎にMapperがデータを読み出す
- Kinssisが保持するデータをHiveのテーブルとして扱える
- Spark
- 分散インメモリ処理フレームワーク
- アプリケーション群
- SQL
- HQL互換
- Streaming
- Discretized Streamと呼ばれる、高レベル抽象表現
- マイクロバッチ処理
- KinesisのストリームをEMRに流せる
- KCLがバックエンド
- 数秒〜数分、ニアリアルタイム
- MLlib
- GraphX
- SQL
- IAMロール: 2つ必要
- EMRロール
- EC2を起動するのに必要
- EC2 instance profile
- EC2がS3などにアクセスするのに必要
- EMRロール
- Kinesis統合
- Kinesisコネクタというものがある
- Kinesisストリームから直接データの読み取りとクエリを実行できる
- いちいち独立したアプリケーションを開発する必要がなくなった
- Cronなどでスケジュールされたジョブ実行ができる
- MapReduce自体はバッチ処理なので、ストリームを複数のバッチに分割する
- 各バッチを反復計算と呼ぶ
- バッチの失敗に際して再試行するので、冪等性を保証する
- 1つの反復計算で複数のクエリを実行できる
-
kinesis.checkpoint.iteration.no
を設定する - 論理名を別にすることで、並列クエリ実行ができる
-
- 連続ストリーム処理はできない
- Twitter StormやSpark Streamingを使えばなんとか
- 複数のKinesisストリームを結合することもできる
- ストリームにデータがないとき
-
kinesis.nodata.timeout
の時間分だけレコードの取得を試行する(ポーリング) - 時間が経ったらポーリングを停止して、すでにあるレコードだけ処理する
- 新しくレコードが来たら
kinesis.iteration.timeout
分だけ待ってポーリングする
-
- Kinesisコネクタというものがある
ベストプラクティス
データ移行
-
s3distcp
を使う-
distcp
はHadoopエコシステムでよく使われるコピーコマンド -
s3distcp
はS3に最適化されている
-
- AWS Import/Exportを使う
- データをストレージに入れて物理的に移行する
- データが大きい時に有効
- AWS Direct Connect
- AWSとオンプレミスを専用回線でつなぐ
- パブリック環境とプライベート環境とでネットワークを分離できる
データ集約
- 多数の小さいデータではなく、少数の大きいデータにまとめる
- S3のオブジェクトは、チャンクサイズに分割されてEMRに読み込まれる
- だいたいチャンクサイズは64MBとか
- データの圧縮形式が分割に対応していると良い
- LZOやBZIP2
- 集約されたデータのサイズは均等だと良い
- マッパーとレデューサーの間の中間データも圧縮できる
- データ量は
REDUCE_SHUFFLE
で確認できる - 圧縮は
mapreduce.output.compress
を true にする
- データ量は
- マッパーが書き出すファイルも圧縮できる
-
mapred.compress.map.output
を true にする
-
- データのパーティション
- 大きすぎるデータは逆に非効率なので、程よいサイズに分割する
- 1時間単位で分析するようなデータなら、日時でデータを分割する
インスタンスタイプ
EBSをアタッチできるようになったので、M4やC4ファミリーのインスタンスを使えるようになった
ただし暗号化されたEBSをサポートしていない
要件として、計算が重いのか、メモリが必要なのか、I/Oが重要なのかに応じてインスタンスを選ぶ
EC2 | A1, T3, T2, M5, M5a, M4 | C5, C5n, C4, z1d | R5, R5a, R4, X1e, X1, ハイメモリ | P3, P2, G3, F1 | H1, I3, D2 |
---|---|---|---|---|---|
A1: ARMアーキテクチャ。Tx: バースト可能、処理のスパイクに対応。Mx: 汎用 | コンピューティング最適化。CPUが強い | メモリ最適化。メモリが強い。 | GPU | ストレージ最適化。ローカルストレージが強い。データのスループット良い | |
EBS | gp2 | io1 | st1 | sc1 | |
汎用 | プロビジョンドIOPS。低レイテンシー・高スループット | スループット最適化HDD。HDFS向き。 | Cold HDD。低アクセス。HDFS向き |
- 一時的なクラスター: Transient Cluster
- 処理が終了したらシャットダウンするもの
- HDFSがプライマリストレージでない
- 試行錯誤する作業
- 処理が終了したらシャットダウンするもの
- 永続的なクラスター: Permanent Cluster
- 処理が終了しても起動状態にしておくもの
- ジョブが頻繁で定期的
- 相互のジョブに依存関係がある
- 処理が終了しても起動状態にしておくもの
- オンデマンドインスタンス
- 使用する処理能力に応じて課金
- 一時的なクラスターに向いている
- リザーブドインスタンス
- 予約するインスタンスに応じて予約金を払う
- オンデマンドより単価は下がる
- 永続的なクラスターに向いている
- スポットインスタンス
- 非稼動のEC2インスタンスに入札して利用する
- オンデマンドより単価が下がる
- 一時的なクラスターやタスクノードの追加に向いている
アーキテクチャ
- S3をプライマリストレージにする
- メリット
- データの耐久性
- コンピューティングとストレージの分離
- データセキュリティやライフサイクルの管理
- ストレージのスケーリング
- Transient Clusterの利用
- 複数クラスターでのデータ共有
- デメリット
- 試行錯誤する作業の場合は、S3とのネットワークがボトルネックになる
- メリット
- S3とHDFSの併用
- S3に保存しつつ、実行時にHDFSにコピーする
- メリット
- S3の耐久性と可用性
- Transient Clusterの利用
- 反復的な作業の効率化
- デメリット
- データコピーによる処理時間増加
- HDFSがプライマリ、S3をバックアップ
- メリット
- S3からデータをコピーする必要がない
- 頻繁な処理の高速化
- デメリット
- データの耐久性
- メリット
- 伸縮自在なクラスター
- マスターノードと必要最低限のコアノードを設定する
- タスクノードをデータ処理の要求に応じて追加・削除する
- インスタンスフリート
セキュリティ
- IAMによるリソースのアクセスコントロール
- セキュリティグループによるノードに対するネットワーク
- S3-SSEやKMS、CSE-KMSを利用したat-rest暗号化
- TLSを利用したin-transit暗号化
- オープンソースアプリケーションやLUKSを利用したローカルディスクでの暗号化
- Kerberos認証
細かいこと
- TCPウィンドウスケーリング
- TCP ウィンドウスケーリングを使用すると、64 KB を超えるウィンドウサイズをサポートすることにより、オペレーティングシステムおよびアプリケーションレイヤーと Amazon S3 との間でネットワークスループットのパフォーマンスを向上させることができます。TCP セッションの開始時に、クライアントは、そのサポートされているウィンドウ WSCALE 係数をアドバタイズし、Amazon S3 は、アップストリーム方向でサポートされている受信ウィンドウ WSCALE 係数を返します。
- TCP選択的伝送確認
- 一部は正しく受信しましたよ、と送信側に信号を送る。伝送効率が上がる。
- 高度な最適化
- データ構造の設計
- 適切にパーティショニングする
- Hadoopは基本的にバッチ処理フレームワークなので、時間制約がある場合はStormやSparkを使う
- クラスターの細かいチューニングよりノードの追加が簡単で有効
- マッパーの改善
- マップ数の削減
- 必然的に大きなデータを処理することになるので、マッパーあたりの処理量が向上する
- マッパー出力の圧縮
- マッパーのディスク書き出しの回避
- マップ数の削減
- レデューサーの改善
- アイドル状態レデューサーの削減
- レデューサーメモリの増加
- データ構造の設計