この記事は?
この記事は、Hortonworks Advent Calendar 2016の17日目の記事です。
この記事で書いている内容は?
もう1つのHadoop Summitの参加レポートです。
各発表の概要(ただ、発表資料もあるので実際のところは補足になりますが)と、感想ですね。
データ活用を推進する「Pivotal HDB(Apache HAWQ(ホーク))」
発表資料
データ活用を推進する「Pivotal HDB(Apache HAWQ(ホーク))」 from Masayuki Matsushita
- SQL on Hadoopの一種 HDFS上の高速データベースエンジン
- HDFSに対する標準SQLによる高速クエリ処理
- Hive、HBase、AvroなどHadoopデータとの連携
- データローカリティ/ショートサーキットリード対応
- 各種統計解析関数に対応
- 元々はPivotal HDBだったものをOSS化し、Apache HAWQという形で公開したもの
- MapReduceやHiveで処理をしていたが、GPDBで並列分散DBがある
- 並列分散DBのファイルをHDFS上に配置可能とするようにして開発したのがHAWQ
- 他プロダクトとの親和性
- Ambariから既に統合管理が可能
- YARN上でプロセスを実行可能
- TPC-DS Performance Review – 比較: Pivotal HDB2.0 vs. Cloudera Impala 2.5
- こちらについては公開なし。
- 実際のところこれは微妙な内容ですので、詳細は感想部分に。
- どのような機能があるか?
- マスター/スレーブ方式の構造
- マスタは冗長化
- 何故早いのか?
- データを各DataNodeに存在するHAWQセグメントサーバで受け取り、libhdfs3で書き込み。
- 良く使用されている用途は?
- HDPと一緒に使っているケースが多い。SQLが高速なのが大きい。
感想:
「Pivotal HDB2.0 vs. Cloudera Impala 2.5」については、HDB2.0の方が数倍高速という結果でしたが、
既にImpala2.5は過去のバージョンですし、条件も不明確ということで、あてにはしません。
いわゆるネイティブなAPIを駆使したSQL on Hadoopの一種ですが、
HAWQを介して書き込みを行ったデータはHadoopの他のエコシステムから読めなくなるという事情(懇親会で聞きました)もあり、
実際のところはImpala + Kuduに近い存在のようにも思えます。
そのため、最終的にはImpala on KuduとApache HAWQで機能や実際のスピードについて
ガチの比較を見てみたい気はしますが・・・どなたかいらっしゃいませんですかね^^;
Spark Streamingを活用したシステムの検証結果と設計時のノウハウ
※現状資料未公開
- Sparkは?SparkStreamingは?
- もうおなじみなのでここでは省きます。
- SparkStreamingで出来ること
- Windowオペレーション
- 状態更新オペレーション(過去のデータの合計値に最新のデータを加算)
- 検証結果と設計時のノウハウ
- モデルシステム
- 運動リズムに合わせた音楽を再生するシステム
- 携帯電話から加速度などを収集
- 動作状況を判定5秒間隔で動作状況を判定
- 運動リズムに合わせた音楽を再生するシステム
- システム構成
- センサデータ>Kafka>SparkStreamimg(MLLib使用)>ElasticSearch&HDFS
- YARN上で動作
- SparkStreamingが5秒間隔で運動状態を判定
- 学習済みの学習モデルを使用して判定
- SparkStreamingはDirectStreaming方式で実行
- KafkaのParittion数と同数のTaskを実行
- 構築後のチューニング
- 使用するノードの数を調整して全体としてスループットが出るようチューニング
- Kafka Producerの送信メッセージサイズ調整
- ElasticSearchのリフレッシュ間隔を調整
- Producer、Brokerを増やすことでKafkaの性能を積み増し
- 発生した問題
- ノード間の通信量より、Kafkaクラスタのネットワーク帯域がボトルネックとなっている模様
- KafkaBrokerの1ノードあたりの通信量が帯域(1Gbps)を超過
- レプリケーションが遅延実行されるため、通信量はぎりぎりおさまっていた
- 使用するノードの数を調整して全体としてスループットが出るようチューニング
- 設計時のノウハウ
- KafkaのPartition数をSparkのExecutorに割り当てたCPUコア数以上に設定する必要がある
- パーティション数少ないとコアを使いきれない
- Kafkaのレプリケーションでネットワーク帯域がボトルネックとなりやすい
- 10GB回線にする
- Kafkaはメッセージをディスクに保存するため、ディスク性能がボトルネックになる場合もある
- レプリカ数を調整
- ディスクを追加
- SSD化
- KafkaのPartition数をSparkのExecutorに割り当てたCPUコア数以上に設定する必要がある
- モデルシステム
感想:
KafkaのPartition数を増やすことでSparkのスループットは増すのですが、
Partition数を増やしすぎると今度はIO競合で性能がガタ落ちするというのもあり、
このあたりのチューニングは実際に試しながらやる必要があるという印象でした。
Is Spark Streaming based on Reactive Streams?
発表資料:
Is spark streaming based on reactive streams? from chibochibo
- Back Pressureの重要性
- そもそもBack Pressureとは
- ストリーム処理にてデータのフロー制御用
- 過負荷であることを前段のコンポーネントにフィードバックする仕組み
- 何故重要なのか?
- 送信側で常に一定のデータ量を保つのは難しいため
- 一時的な増加などの波はあってもシステム全体として動き続けることの方が重要
- Sparkは1.5からBack Pressureに対応
- そもそもBack Pressureとは
- Reactive Streamsとは
- 非同期ストリーム処理の標準化を目指す規格
- ScalaだとAkka Streamsがサポート
- JDK 9でFlow APIとして導入予定
- Spring 5はReactive対応になり、その際にback pressureにも対応
- 動作原理
- Subscriber側でサイズを制限
- 過負荷に直面するとSubscriberはback pressureのシグナルを送信
- Back Pressureのシグナルは非同期
- 負荷状況に応じてPull型かPush型かは切り替わる
- Spark StreamingのBack Pressure実装
- Reactive Streamsには準拠していない。
- Reactive Streamsの設計方針からインスピレーションを受けているものの、Sparkの内部がこの仕様を遵守するつもりはないとのこと。
- ではどうやってBack Pressureを実現しているか?
- StreamingListener#onBatchCompletedメソッド
- ミニバッチが終了した段階で実行
- 下記のような情報が取得可能
- ミニバッチ実行時間
- 処理開始時刻、処理終了時刻
- スケジュール済~実行開始までの待ち時間
- 処理時間(ミニバッチ自体の処理時間)
- スケジュール済~完了までの総所要時間
- 上記の値をRateEstimatorに渡して新しい取得Rateを計算し、次の受信量を決める方式
- StreamingListener#onBatchCompletedメソッド
- Reactive Streamsには準拠していない。
感想:
準拠していないようです。
実際のところ、SparkのSubscriber自体はPull型のモデルになっており、
遅れというのは、「ミニバッチの実行間隔が定義した実行間隔内に収まらない」というものになります。
そのため、準拠すること自体がそもそも困難だろう、というのが見立てではありますが。
あとは、Kafka、Kinesis、Cloud PubSubのようなメッセージキューを介して一時的な流量増大に対処するのが
基本スタンスのストリームデータ処理基盤プロダクトだと
Reactive Streamsにそもそも準拠する必要はないというスタンスなのかもしれません。
AWSでつくる小中規模Apache Kafkaといろんな悩み
資料:
Awsでつくるapache kafkaといろんな悩み from Keigo Suda
- AWS上でKafkaを利⽤するために考えたこと
- どのようなポイントがあったか
- どのようにそのポイントに対応したか
- IoTのためのデータプラットフォームを開発中
- 工場内の各センサーデータを収集加工蓄積分析するための基盤
- IIoT(Industrial Internet of Things)
- 世界中にある工場に対して展開
- 工場内の各センサーデータを収集加工蓄積分析するための基盤
- そもそもなぜKafka on AWSなのか?
- 機密なデータも扱うのでVPCに対してDirectConnectで投入したかった
- Amazon KinesisやAmazon IoTでは暗号化されているとはいえ、インターネットに出てしまう。それは避けたい。
- Amazon Kinesis EndPointはないため、インターネットに出ない経路を保証することができない。
- Amazon KinesisやAmazon IoTでは暗号化されているとはいえ、インターネットに出てしまう。それは避けたい。
- 機密なデータも扱うのでVPCに対してDirectConnectで投入したかった
- KafkaでマルチAZをどうするのか?
- AZのどれかが障害発生しても全体としての停止は避けたい
- 東京リージョンは2AZのため、AZ毎にクラスタを配置することに
- AZのどれかが障害発生しても全体としての停止は避けたい
- どうやってKafkaにデータを投入するのか?
- API/Producer処理はGoで開発、クライアントライブラリはSaramaを利用
- Brokerは内部向けELBにアタッチし、接続先クラスタを切り替える
- トピック設計
- トピックを細かく分割すると爆発的に増えていくため、工場単位に集約し、後段のストリームデータ処理部で分割
- データの欠損が無いことをどうやって保証する?
- 各工場のデータと、最終的な出力データの数を突きあわせ
- 各工場からKafkaに対して投入したデータと、Kafkaに対して投入されたデータの数を突き合わせ
- 工場のデータを集約してKafkaに投入するため、突き合わせる個所は2か所
感想:
「Amazon Kinesis EndPointはないため、インターネットに出ない経路を保証することができない」や、
「東京リージョンは2AZのため、奇数台用意する必要があるクラスタの配置が困難」など、
AWSあるあるな苦労話満載でした・・・
ただ、特に突飛なことはやっておらず、やるべきことを確実に積み重ねて構築という、非常に参考になるセッションでした。
Hortonworks Data Cloudの概要
資料:
Introduction to Hortonworks Data Cloud for AWS from Yifeng Jiang
- 何故Hortonworks Data Cloudがあるのか?
- Ambariによって、クラスタに対するデプロイの手間は大幅に軽減された。
- だが、そもそもマシンを用意するのに時間がかかることが多く、導入に時間がかかる。
- ならクラウドだ!
- クラウド上でHortonworksのDataクラスタを構築
- 使用した分だけの課金
- Privateなフォーラム上でのコミュニティサポートを無償提供
- デモ
- クラウド上にクラスタを即展開し、S3上のデータを外部テーブルとしてHiveにインポートし、即読むことが出来た!
- オンプレミスのクラスタと異なる点
- クラスタ自体が一時的なものとなるため、永続化する必要があるものはクラスタを跨いで保存
- HiveServer用データなどのメタデータ
- S3上に配置したデータ
- コンピュートノードとストレージノードを分離
- S3へのアクセスがリモートとなるため、性能特性が異なってくる。
- レスポンスは劣るが、スループットは出る。
- S3へのホットスポットを避けるためにランダム順で取得しに行くなどの対処が入っている
- クラスタ単位にIAM Roleを割り振り、アクセス可能なデータを制限することも可能
- Apache Ranger、Apache Atlasと組み合わせることでデータ権限管理が柔軟・容易に
- Apache Ranger、Apache Atlasはマルチクラスタに対して対応可能になっている。
- クラスタ自体が一時的なものとなるため、永続化する必要があるものはクラスタを跨いで保存
- 今後追加される機能
- コンピュートノードの自動復旧
感想:
AWSには既にElastic MapReduceが存在するため、
クラウド上に単にディストリビューションを載せるだけでなく、
マルチクラスタ対応やメタデータ系の外部から読み込む等の対応を取っていることで差別化につなげているようでした。
実際、クラウドに乗せた段階でAmazonやGoogleの元々持っているビッグデータ基盤とも競合が発生するので、
どこを強みにするか、という立ち位置は重要だと感じました。
まとめ
全体通して発表の内容や傾向も多彩で、さすがHadoop Summit番外編という形でした。
特に最後の発表はクラウドも選択肢に入った中、同じ機能や性能で争うのではなく、
今後もより差別化が図られるのが楽しみになる発表でした。