LoginSignup
13
7

More than 3 years have passed since last update.

AWS認定Big Data勉強記 - 5: Amazon EMR

Posted at

みなさん、こんにちは、えいりんぐーです。

今回はEMRについてまとめます。

Q: Amazon EMR とは何ですか?

Amazon EMR は、企業、研究者、データアナリスト、および開発者が、簡単に、そして費用対効果の高い方法で、莫大な量のデータを処理することができるようにするウェブサービスです。Amazon Elastic Compute Cloud (Amazon EC2)、および Amazon Simple Storage Service (Amazon S3) のウェブスケールインストラクチャで実行されるホストされた Hadoop フレームワークを使用します。

資料集

基本的に以下の資料を参考にしています。

仕組み

Hadoopフレームワーク、マスターが多数のコアを管理して分散処理する。

ノード セキュリティグループ 役割
Master Node Master Instance Group - コアノードやタスクノードの監視。ネームノードやリソースマネジャー (YARN) が動く。Failover非対応。sshのポートが開いている。
Core Node Core Instance Group HDFSをアタッチされたボリュームとして持つ。計算を実行する。データノードが動く。追加したり、HDFSやCPUやRAMを増設できる。基本的にマスターとしか通信しない。
Task Node Task Instance Group HDFSを持たないコアノード。増設がより自由。スポット価格やインスタンスタイプを調整できる。基本的にマスターとしか通信しない。
YARN: Yet Another Resource Negotiator
  • Resource Manager
    • マスターサーバーで動く
    • スレーブのリソースを管理
    • ジョブ管理はしない
  • Node Manager
    • スレーブサーバーで動く
    • サーバーのリソースをRMに報告
    • サーバー上のコンテナの管理
  • Container
    • スレーブのリソースが切り出されたもの
    • NMによって起動
  • Application Master
    • ジョブ全体を管理するコンテナ

EMRにおいては、MapReduceやHiveやSparkへ計算資源を割り振ってくれる。共通のリソースを分割して扱ってくれる。

特徴と機能

  • EMRFS
    • S3をHDFSのように扱える。
      • CPUとストレージの分離
      • S3の暗号化も対応
    • Consistent View
      • 新しいオブジェクトのプット → Read-after-write consistency
      • 上書きプット/デリート → 結果整合性
      • DynamoDBにメタデータを格納
  • Bootstrap Action
    • ノード起動時に実行されるスクリプト
      • Bash, Ruby, Python, etc
      • S3におく
      • 引数も自由
  • Step
    • 任意のタイミングで追加・設定できる処理
      • HQLとか
    • S3のURIやローカルファイルを指定して実行
      • jar, Streaming, Hive, Pig, Spark, bash
  • ジョブ実行
    • クラスター外
      • Step API, Lambda, Data Pipeline, Airflowなどから
    • クラスター内
      • マスターノードにssh
      • AWS System Managerから
      • Hive, Spark, Zeppelin, Hue, Livy, Oozie
  • Private Subnetの利用
    • S3/DynamoにはVPCエンドポイントが必要
    • それ以外にはNAT GW
    • Security Groupが必要
  • 暗号化
    • at-rest: サーバーサイドやクライアントサイドで暗号化
    • in-transit: TLS
    • セキュリティ設定やCloudFormationで
  • タグ
    • タグをIAMポリシーのコンディションに指定してアクセス管理できる
  • オートスケール
    • yarn.resourcemanager.decommissioning.timeout
      • defalut: 1 hour
  • HBaseのデータストアとしてのS3
    • 可用性・耐久性が高くなる
  • Cloud Watch Event
    • EMRのイベントをモニタリングできる
  • インスタンスフリート
    • スポットインスタンスをまとめて管理
  • カスタムAMIを利用したクラスター起動
    • 追加ソフトウェアを事前にロードできるので設定が楽
  • アプリケーション
    • サポート
      • クラスタ作成時にオプションをつけてインストール可能なもの
    • カスタム
      • 事前にApache Bigtopに追加してBootstrapでデプロイ
      • カスタムAMIを使う

ポイント

  • Hive Metastoreは重要
    • SparkやPrestoからも参照可能
    • MetastoreのMySQLをクラスタ外にも作成可能
    • Glue Data Catalogを利用可能
  • 列指向ファイルフォーマット
    • ORCとParquet
      • I/O効率と圧縮効率がいい
  • HiveでKinesisのデータ処理
    • Kinssisが保持するデータをHiveのテーブルとして扱える
      • アーカイブ用途など
    • StreamのShard毎にMapperがデータを読み出す
  • Spark
    • 分散インメモリ処理フレームワーク
    • アプリケーション群
      • SQL
        • HQL互換
      • Streaming
        • Discretized Streamと呼ばれる、高レベル抽象表現
        • マイクロバッチ処理
        • KinesisのストリームをEMRに流せる
          • KCLがバックエンド
          • 数秒〜数分、ニアリアルタイム
      • MLlib
      • GraphX
  • IAMロール: 2つ必要
    • EMRロール
      • EC2を起動するのに必要
    • EC2 instance profile
      • EC2がS3などにアクセスするのに必要
  • Kinesis統合
    • Kinesisコネクタというものがある
      • Kinesisストリームから直接データの読み取りとクエリを実行できる
      • いちいち独立したアプリケーションを開発する必要がなくなった
    • Cronなどでスケジュールされたジョブ実行ができる
    • MapReduce自体はバッチ処理なので、ストリームを複数のバッチに分割する
      • 各バッチを反復計算と呼ぶ
      • バッチの失敗に際して再試行するので、冪等性を保証する
    • 1つの反復計算で複数のクエリを実行できる
      • kinesis.checkpoint.iteration.no を設定する
      • 論理名を別にすることで、並列クエリ実行ができる
    • 連続ストリーム処理はできない
      • Twitter StormやSpark Streamingを使えばなんとか
    • 複数のKinesisストリームを結合することもできる
    • ストリームにデータがないとき
      • kinesis.nodata.timeout の時間分だけレコードの取得を試行する(ポーリング)
      • 時間が経ったらポーリングを停止して、すでにあるレコードだけ処理する
      • 新しくレコードが来たら kinesis.iteration.timeout 分だけ待ってポーリングする

ベストプラクティス

データ移行
  • s3distcp を使う
    • distcp はHadoopエコシステムでよく使われるコピーコマンド
    • s3distcp はS3に最適化されている
  • AWS Import/Exportを使う
    • データをストレージに入れて物理的に移行する
    • データが大きい時に有効
  • AWS Direct Connect
    • AWSとオンプレミスを専用回線でつなぐ
    • パブリック環境とプライベート環境とでネットワークを分離できる
データ集約
  • 多数の小さいデータではなく、少数の大きいデータにまとめる
  • S3のオブジェクトは、チャンクサイズに分割されてEMRに読み込まれる
    • だいたいチャンクサイズは64MBとか
    • データの圧縮形式が分割に対応していると良い
      • LZOやBZIP2
  • 集約されたデータのサイズは均等だと良い
  • マッパーとレデューサーの間の中間データも圧縮できる
    • データ量は REDUCE_SHUFFLE で確認できる
    • 圧縮は mapreduce.output.compress を true にする
  • マッパーが書き出すファイルも圧縮できる
    • mapred.compress.map.output を true にする
  • データのパーティション
    • 大きすぎるデータは逆に非効率なので、程よいサイズに分割する
    • 1時間単位で分析するようなデータなら、日時でデータを分割する
インスタンスタイプ

EBSをアタッチできるようになったので、M4やC4ファミリーのインスタンスを使えるようになった
ただし暗号化されたEBSをサポートしていない
要件として、計算が重いのか、メモリが必要なのか、I/Oが重要なのかに応じてインスタンスを選ぶ

EC2 A1, T3, T2, M5, M5a, M4 C5, C5n, C4, z1d R5, R5a, R4, X1e, X1, ハイメモリ P3, P2, G3, F1 H1, I3, D2
A1: ARMアーキテクチャ。Tx: バースト可能、処理のスパイクに対応。Mx: 汎用 コンピューティング最適化。CPUが強い メモリ最適化。メモリが強い。 GPU ストレージ最適化。ローカルストレージが強い。データのスループット良い
EBS gp2 io1 st1 sc1
汎用 プロビジョンドIOPS。低レイテンシー・高スループット スループット最適化HDD。HDFS向き。 Cold HDD。低アクセス。HDFS向き
  • 一時的なクラスター: Transient Cluster
    • 処理が終了したらシャットダウンするもの
      • HDFSがプライマリストレージでない
      • 試行錯誤する作業
  • 永続的なクラスター: Permanent Cluster
    • 処理が終了しても起動状態にしておくもの
      • ジョブが頻繁で定期的
      • 相互のジョブに依存関係がある
  • オンデマンドインスタンス
    • 使用する処理能力に応じて課金
    • 一時的なクラスターに向いている
  • リザーブドインスタンス
    • 予約するインスタンスに応じて予約金を払う
    • オンデマンドより単価は下がる
    • 永続的なクラスターに向いている
  • スポットインスタンス
    • 非稼動のEC2インスタンスに入札して利用する
    • オンデマンドより単価が下がる
    • 一時的なクラスターやタスクノードの追加に向いている
アーキテクチャ
  • S3をプライマリストレージにする
    • メリット
      • データの耐久性
      • コンピューティングとストレージの分離
      • データセキュリティやライフサイクルの管理
      • ストレージのスケーリング
      • Transient Clusterの利用
      • 複数クラスターでのデータ共有
    • デメリット
      • 試行錯誤する作業の場合は、S3とのネットワークがボトルネックになる
  • S3とHDFSの併用
    • S3に保存しつつ、実行時にHDFSにコピーする
    • メリット
      • S3の耐久性と可用性
      • Transient Clusterの利用
      • 反復的な作業の効率化
    • デメリット
      • データコピーによる処理時間増加
  • HDFSがプライマリ、S3をバックアップ
    • メリット
      • S3からデータをコピーする必要がない
      • 頻繁な処理の高速化
    • デメリット
      • データの耐久性
  • 伸縮自在なクラスター
    • マスターノードと必要最低限のコアノードを設定する
    • タスクノードをデータ処理の要求に応じて追加・削除する
    • インスタンスフリート

セキュリティ

  • IAMによるリソースのアクセスコントロール
  • セキュリティグループによるノードに対するネットワーク
  • S3-SSEやKMS、CSE-KMSを利用したat-rest暗号化
  • TLSを利用したin-transit暗号化
  • オープンソースアプリケーションやLUKSを利用したローカルディスクでの暗号化
  • Kerberos認証

細かいこと

  • TCPウィンドウスケーリング
    • TCP ウィンドウスケーリングを使用すると、64 KB を超えるウィンドウサイズをサポートすることにより、オペレーティングシステムおよびアプリケーションレイヤーと Amazon S3 との間でネットワークスループットのパフォーマンスを向上させることができます。TCP セッションの開始時に、クライアントは、そのサポートされているウィンドウ WSCALE 係数をアドバタイズし、Amazon S3 は、アップストリーム方向でサポートされている受信ウィンドウ WSCALE 係数を返します。
  • TCP選択的伝送確認
    • 一部は正しく受信しましたよ、と送信側に信号を送る。伝送効率が上がる。
  • 高度な最適化
    • データ構造の設計
      • 適切にパーティショニングする
    • Hadoopは基本的にバッチ処理フレームワークなので、時間制約がある場合はStormやSparkを使う
    • クラスターの細かいチューニングよりノードの追加が簡単で有効
    • マッパーの改善
      • マップ数の削減
        • 必然的に大きなデータを処理することになるので、マッパーあたりの処理量が向上する
      • マッパー出力の圧縮
      • マッパーのディスク書き出しの回避
    • レデューサーの改善
      • アイドル状態レデューサーの削減
      • レデューサーメモリの増加
13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7