AWS認定Big Data勉強記 - 5: Amazon EMR

みなさん、こんにちは、えいりんぐーです。

今回はEMRについてまとめます。


Q: Amazon EMR とは何ですか?

Amazon EMR は、企業、研究者、データアナリスト、および開発者が、簡単に、そして費用対効果の高い方法で、莫大な量のデータを処理することができるようにするウェブサービスです。Amazon Elastic Compute Cloud (Amazon EC2)、および Amazon Simple Storage Service (Amazon S3) のウェブスケールインストラクチャで実行されるホストされた Hadoop フレームワークを使用します。


資料集

基本的に以下の資料を参考にしています。


仕組み

Hadoopフレームワーク、マスターが多数のコアを管理して分散処理する。

ノード
セキュリティグループ
役割

Master Node
Master Instance Group
- コアノードやタスクノードの監視。ネームノードやリソースマネジャー (YARN) が動く。Failover非対応。sshのポートが開いている。

Core Node
Core Instance Group
HDFSをアタッチされたボリュームとして持つ。計算を実行する。データノードが動く。追加したり、HDFSやCPUやRAMを増設できる。基本的にマスターとしか通信しない。

Task Node
Task Instance Group
HDFSを持たないコアノード。増設がより自由。スポット価格やインスタンスタイプを調整できる。基本的にマスターとしか通信しない。


YARN: Yet Another Resource Negotiator


  • Resource Manager


    • マスターサーバーで動く

    • スレーブのリソースを管理

    • ジョブ管理はしない



  • Node Manager


    • スレーブサーバーで動く

    • サーバーのリソースをRMに報告

    • サーバー上のコンテナの管理



  • Container


    • スレーブのリソースが切り出されたもの

    • NMによって起動



  • Application Master


    • ジョブ全体を管理するコンテナ



EMRにおいては、MapReduceやHiveやSparkへ計算資源を割り振ってくれる。共通のリソースを分割して扱ってくれる。


特徴と機能


  • EMRFS


    • S3をHDFSのように扱える。


      • CPUとストレージの分離

      • S3の暗号化も対応



    • Consistent View


      • 新しいオブジェクトのプット → Read-after-write consistency

      • 上書きプット/デリート → 結果整合性

      • DynamoDBにメタデータを格納





  • Bootstrap Action


    • ノード起動時に実行されるスクリプト


      • Bash, Ruby, Python, etc

      • S3におく

      • 引数も自由





  • Step


    • 任意のタイミングで追加・設定できる処理


      • HQLとか



    • S3のURIやローカルファイルを指定して実行


      • jar, Streaming, Hive, Pig, Spark, bash





  • ジョブ実行


    • クラスター外


      • Step API, Lambda, Data Pipeline, Airflowなどから



    • クラスター内


      • マスターノードにssh

      • AWS System Managerから

      • Hive, Spark, Zeppelin, Hue, Livy, Oozie





  • Private Subnetの利用


    • S3/DynamoにはVPCエンドポイントが必要

    • それ以外にはNAT GW

    • Security Groupが必要



  • 暗号化


    • at-rest: サーバーサイドやクライアントサイドで暗号化

    • in-transit: TLS

    • セキュリティ設定やCloudFormationで



  • タグ


    • タグをIAMポリシーのコンディションに指定してアクセス管理できる



  • オートスケール



    • yarn.resourcemanager.decommissioning.timeout


      • defalut: 1 hour





  • HBaseのデータストアとしてのS3


    • 可用性・耐久性が高くなる



  • Cloud Watch Event


    • EMRのイベントをモニタリングできる



  • インスタンスフリート


    • スポットインスタンスをまとめて管理



  • カスタムAMIを利用したクラスター起動


    • 追加ソフトウェアを事前にロードできるので設定が楽



  • アプリケーション


    • サポート


      • クラスタ作成時にオプションをつけてインストール可能なもの



    • カスタム


      • 事前にApache Bigtopに追加してBootstrapでデプロイ

      • カスタムAMIを使う






ポイント


  • Hive Metastoreは重要


    • SparkやPrestoからも参照可能

    • MetastoreのMySQLをクラスタ外にも作成可能

    • Glue Data Catalogを利用可能



  • 列指向ファイルフォーマット


    • ORCとParquet


      • I/O効率と圧縮効率がいい





  • HiveでKinesisのデータ処理


    • Kinssisが保持するデータをHiveのテーブルとして扱える


      • アーカイブ用途など



    • StreamのShard毎にMapperがデータを読み出す



  • Spark


    • 分散インメモリ処理フレームワーク

    • アプリケーション群


      • SQL


        • HQL互換



      • Streaming


        • Discretized Streamと呼ばれる、高レベル抽象表現

        • マイクロバッチ処理

        • KinesisのストリームをEMRに流せる


          • KCLがバックエンド

          • 数秒〜数分、ニアリアルタイム





      • MLlib

      • GraphX





  • IAMロール: 2つ必要


    • EMRロール


      • EC2を起動するのに必要



    • EC2 instance profile


      • EC2がS3などにアクセスするのに必要





  • Kinesis統合


    • Kinesisコネクタというものがある


      • Kinesisストリームから直接データの読み取りとクエリを実行できる

      • いちいち独立したアプリケーションを開発する必要がなくなった



    • Cronなどでスケジュールされたジョブ実行ができる

    • MapReduce自体はバッチ処理なので、ストリームを複数のバッチに分割する


      • 各バッチを反復計算と呼ぶ

      • バッチの失敗に際して再試行するので、冪等性を保証する



    • 1つの反復計算で複数のクエリを実行できる



      • kinesis.checkpoint.iteration.no を設定する

      • 論理名を別にすることで、並列クエリ実行ができる



    • 連続ストリーム処理はできない


      • Twitter StormやSpark Streamingを使えばなんとか



    • 複数のKinesisストリームを結合することもできる

    • ストリームにデータがないとき



      • kinesis.nodata.timeout の時間分だけレコードの取得を試行する(ポーリング)

      • 時間が経ったらポーリングを停止して、すでにあるレコードだけ処理する

      • 新しくレコードが来たら kinesis.iteration.timeout 分だけ待ってポーリングする






ベストプラクティス


データ移行



  • s3distcp を使う



    • distcp はHadoopエコシステムでよく使われるコピーコマンド


    • s3distcp はS3に最適化されている



  • AWS Import/Exportを使う


    • データをストレージに入れて物理的に移行する

    • データが大きい時に有効



  • AWS Direct Connect


    • AWSとオンプレミスを専用回線でつなぐ

    • パブリック環境とプライベート環境とでネットワークを分離できる




データ集約


  • 多数の小さいデータではなく、少数の大きいデータにまとめる

  • S3のオブジェクトは、チャンクサイズに分割されてEMRに読み込まれる


    • だいたいチャンクサイズは64MBとか

    • データの圧縮形式が分割に対応していると良い


      • LZOやBZIP2





  • 集約されたデータのサイズは均等だと良い

  • マッパーとレデューサーの間の中間データも圧縮できる


    • データ量は REDUCE_SHUFFLE で確認できる

    • 圧縮は mapreduce.output.compress を true にする



  • マッパーが書き出すファイルも圧縮できる



    • mapred.compress.map.output を true にする



  • データのパーティション


    • 大きすぎるデータは逆に非効率なので、程よいサイズに分割する

    • 1時間単位で分析するようなデータなら、日時でデータを分割する




インスタンスタイプ

EBSをアタッチできるようになったので、M4やC4ファミリーのインスタンスを使えるようになった

ただし暗号化されたEBSをサポートしていない

要件として、計算が重いのか、メモリが必要なのか、I/Oが重要なのかに応じてインスタンスを選ぶ

EC2
A1, T3, T2, M5, M5a, M4
C5, C5n, C4, z1d
R5, R5a, R4, X1e, X1, ハイメモリ
P3, P2, G3, F1
H1, I3, D2

A1: ARMアーキテクチャ。Tx: バースト可能、処理のスパイクに対応。Mx: 汎用
コンピューティング最適化。CPUが強い
メモリ最適化。メモリが強い。
GPU
ストレージ最適化。ローカルストレージが強い。データのスループット良い

EBS
gp2
io1
st1
sc1

汎用
プロビジョンドIOPS。低レイテンシー・高スループット
スループット最適化HDD。HDFS向き。
Cold HDD。低アクセス。HDFS向き


  • 一時的なクラスター: Transient Cluster


    • 処理が終了したらシャットダウンするもの


      • HDFSがプライマリストレージでない

      • 試行錯誤する作業





  • 永続的なクラスター: Permanent Cluster


    • 処理が終了しても起動状態にしておくもの


      • ジョブが頻繁で定期的

      • 相互のジョブに依存関係がある





  • オンデマンドインスタンス


    • 使用する処理能力に応じて課金

    • 一時的なクラスターに向いている



  • リザーブドインスタンス


    • 予約するインスタンスに応じて予約金を払う

    • オンデマンドより単価は下がる

    • 永続的なクラスターに向いている



  • スポットインスタンス


    • 非稼動のEC2インスタンスに入札して利用する

    • オンデマンドより単価が下がる

    • 一時的なクラスターやタスクノードの追加に向いている




アーキテクチャ


  • S3をプライマリストレージにする


    • メリット


      • データの耐久性

      • コンピューティングとストレージの分離

      • データセキュリティやライフサイクルの管理

      • ストレージのスケーリング

      • Transient Clusterの利用

      • 複数クラスターでのデータ共有



    • デメリット


      • 試行錯誤する作業の場合は、S3とのネットワークがボトルネックになる





  • S3とHDFSの併用


    • S3に保存しつつ、実行時にHDFSにコピーする

    • メリット


      • S3の耐久性と可用性

      • Transient Clusterの利用

      • 反復的な作業の効率化



    • デメリット


      • データコピーによる処理時間増加





  • HDFSがプライマリ、S3をバックアップ


    • メリット


      • S3からデータをコピーする必要がない

      • 頻繁な処理の高速化



    • デメリット


      • データの耐久性





  • 伸縮自在なクラスター


    • マスターノードと必要最低限のコアノードを設定する

    • タスクノードをデータ処理の要求に応じて追加・削除する

    • インスタンスフリート




セキュリティ


  • IAMによるリソースのアクセスコントロール

  • セキュリティグループによるノードに対するネットワーク

  • S3-SSEやKMS、CSE-KMSを利用したat-rest暗号化

  • TLSを利用したin-transit暗号化

  • オープンソースアプリケーションやLUKSを利用したローカルディスクでの暗号化

  • Kerberos認証


細かいこと


  • TCPウィンドウスケーリング


    • TCP ウィンドウスケーリングを使用すると、64 KB を超えるウィンドウサイズをサポートすることにより、オペレーティングシステムおよびアプリケーションレイヤーと Amazon S3 との間でネットワークスループットのパフォーマンスを向上させることができます。TCP セッションの開始時に、クライアントは、そのサポートされているウィンドウ WSCALE 係数をアドバタイズし、Amazon S3 は、アップストリーム方向でサポートされている受信ウィンドウ WSCALE 係数を返します。



  • TCP選択的伝送確認


    • 一部は正しく受信しましたよ、と送信側に信号を送る。伝送効率が上がる。



  • 高度な最適化


    • データ構造の設計


      • 適切にパーティショニングする



    • Hadoopは基本的にバッチ処理フレームワークなので、時間制約がある場合はStormやSparkを使う

    • クラスターの細かいチューニングよりノードの追加が簡単で有効

    • マッパーの改善


      • マップ数の削減


        • 必然的に大きなデータを処理することになるので、マッパーあたりの処理量が向上する



      • マッパー出力の圧縮

      • マッパーのディスク書き出しの回避



    • レデューサーの改善


      • アイドル状態レデューサーの削減

      • レデューサーメモリの増加