Apache Spark™ Clusters in Autopilot Mode - The Databricks Blogの翻訳です。
Apache Spark™は、ユーザーが様々なユースケースで分散コンピューティングフレームワークを利用できるようにするための統合分析エンジンです。クラウドコンピューティングの利点を活用することで、Apache Sparkを用いたプラットフォームの構築は比較的容易なものとなっています。クラウドプロバイダーはセットアップを容易にするためのツール・サービスを提供しています。しかし、devopsやプラットフォームエンジニアがこのようなプラットフォームを構築し、維持していくために多大な時間を費やすことになる、以下に示すような困難な課題が存在します。
- 高い可用性: クラウドにおいては、全てのワークロードは安価なコモディティマシンで動作します。障害から容易に復旧できることが重要となります。また、ソフトウェアアップデートの際のサイドカーサービスのように、クラスターの高可用性の確保もまた重要となります。
- コストの最適化: クラスターは効率的にオートスケールするように設定され、使われていないときには停止し、スポットマーケットを活用し、スポットインスタンスを使用することでコストを削減しつつも、スポット価格の変動があった際にワークロードに影響を与えてはなりません。
- パフォーマンスの最適化: それぞれのワークロードには、それぞれのリソース要件があり、全てのリソースを効率的に活用して高速に実行される必要があります。
- ユーザーコードを取り扱う際のエラーの分離: ユーザーコードのエラーによってクラスターがダウンしないように設定される必要があります。
多くの場合、これらの問題に対するソリューションは、実装に膨大な時間を要するだけではなく、設定に複雑性をもたらし、手動で問題に対応しなくてはならないことが度々あります。この記事では、これらの問題に煩わさせることがなくなるようにDatabricksですぐに利用できる、Databricksクラスターの「オートパイロットモード」の主要な機能全てを説明します。
計算資源のオートスケーリング
計算資源のオートスケーリングは、多くのビッグデータプラットフォームが提供する基本的な機能となっています。しかし、これらのツールの多くは、単一ジョブに対して静的なリソースサイズが割り当てられ、クラウドの弾力性を活用できていません。YARNのようなリソーススケジューラーは、異なるジョブごとに「粒度の粗い」オートスケーリングを行なっており、Sparkジョブが完了した後にのみリソースを解放します。このことは、以下の二つの問題に直面することになります。
- ジョブに対する適切なサイズの見積もりには、多くのトライアンドエラーが必要となります。
- 多くの場合、ユーザーは1日の間、1週間、ブラックフライデーなどのイベントなどの、最も大きい負荷を想定して、リソースを過度に配備してしまいます。
Databricksのオートスケーリングは、Sparkスケジューラーにキューイングされた、粒度の細かいSparkタスクに基づいて、より動的に行われます。これによって、クラスターは負荷に対してより積極的にスケールアップ、スケールダウンできるようになり、ユーザーによる複雑な設定なしに自動でクラスターのリソース利用率を改善することができます。Databricksのオートスケーリングを活用することで、お使いのワークロードに応じて最大クラウドのコストを最大30%削減することができます。Databricksのオートスケーリングを用いることで、どのようにクラウドコストを削減できるのかに関しては、オートスケーリングに関する記事をご覧ください。
図1. Databricksの最適化されたリソース管理によって、よりワークロードに応じて適切なエグゼキューターの数を決定します。結果として、このケースでは、オートスケーリングを活用することで、ワークロード全体を通じてデプロイされるリソースは25%削減され、ユーザーによってコストが25%削減したことを意味します。
ローカルストレージのオートスケーリング
ビッグデータのワークロードにおいて、特に中間結果がメモリに収まらない場合には、様々なオペレーションでディスク領域へのアクセスが必要となります。必要なディスク容量がない場合には、ジョブは失敗します。ジョブの失敗を回避するために、データエンジニアとデータサイエンティストは、トライアンドエラーを通じて、必要なディスク容量の見積りに時間を浪費します。固定容量のローカルストレージを割り当て、ジョブを実行し、ジョブがディスク容量を使い切らないかどうか、システムのメトリクスをチェックします。この実験は、特に複数のジョブが単一のクラスターで稼働しているときにさらに複雑なものになり、かれらプロフェッショナルが真のゴールに到達するのを妨げ、コストを引き上げます。
Databricksクラスターにおいては、インスタンスのストレーージは計算資源と独立して透過的にオートスケールするので、データサイエンティストとデータエンジニアは、ETLジョブを開発フェーズから運用フェーズに移行する際に、劇的に生産性を改善することができます。
どのようにインスタンスのストレージをオートスケールするのか、そして、そのメリットに関しては、インスタンスストレージの透過的オートスケーリングに関する記事を参照ください。
図2. ローカルストレージのオートスケーリングが有効化されたインスタンスにおける空きディスク容量の変化を示したグラフ。空き容量が最低容量の閾値を下回った際には、追加のディスクをリクエストし、インスタンスにアタッチします。事前に設定した最大容量に到達するまでは、リクエストに応じてディスクを追加し続けます。
自動停止
Databricksクラスターは、お使いのクラウドコストを節約できるように、設定されたアイドル時間の経過後に自動で停止します。
図3. クラスターUIで設定できる自動停止オプション
自動起動
Databricksは自動で起動することもできます!クラスターがジョブスケジューラーやJDBC/ODBCインタフェースからコマンドを受診した際には、クラスターは自動で起動し、コマンドを実行します。この機能は、以下のようなシナリオで有用です。
- スケジュールされた実運用のジョブ: 一台のクラスターで複数のスケジュールされたDatabricksジョブがあり、コストを節約するためにクラスターをシャットダウンしたいアイドル期間がある場合には、これを簡単に実現することができます。停止されているクラスターにスケジュールされたジョブが送信された際にクラスターは自動で起動し、ジョブを実行します。
- BIユースケース: TableauなどのBIツールでDatabricksクラスターに接続している際、この機能がリリースされる前は、データアナリストはDatabricksにログインし、クラスターを起動する必要がありました。今では、Databricksにログインする必要はありません。Tableauからコマンドを実行するだけで大丈夫です。クラスターが停止状態の場合には、自動でクラスターが起動します。これによって、プラットフォームチームは、内部のデータアナリストがクラスターの概念を気にすることなしにビッグデータを様々な角度から分析できる、サーバーレスのビッグデータを提供することができます。
- クラスターのウォームアップ: 明日の9時にオフィスに出社した時に、すでにクラスターが起動しており、いくつかの分析が完了しており、必要なデータがキャッシュされており、オフィスに到着するとすぐに生産的な作業にとりかかれるといった状況を考えてみます。あなたがオフィスに到着したらすぐに作業を開始できるように、早朝にクラスターを起動し、必要な分析・データ準備を実行するジョブをスケジュールすることができます。このために、Databricks REST APIを用いたカスタムスクリプトを記述する必要はありません。
自動停止と自動起動を共に活用することで、ユーザーは一度設定を行うだけで、Databricksが使用状況に応じて自動でクラスターの起動・停止を行います。
スポット価格変動に対する自動リカバリ
クラウド環境においては、多くの理由で計算資源が障害を起こし得ます。
- スポットインスタンスや優先順位の低いコスト効率性の高いノードは、任意のタイミングでクラウドプロバイダーによって停止される場合があります。
- ソフトウェアバグによってノードが反応しなくなる場合があります。
- ネットワーク障害によって、インスタンスの機能不全が生じる場合があります。
分散ん計算環境においては、様々な障害の検知、復旧は主要な課題となっています。自動リカバリーのビルトインサポートによって、Databricksはこのような障害が起きても、Sparkワークロードが実行し続けられるように保証します。障害の性質に応じて、最小限のコストでクラスターを障害から回復させます。例えば、エグゼキューターがクラッシュあるいは応答なしになった場合には、自動でプロセスを再起動し、ワークロード全体が停止することを回避します。インスタンスが完全に反応しなくなった場合には、Databricksはクラウドプロバイダーから新規インスタンスを取得し、障害を起こしたインスタンスを置き換えます。
自動リカバリーによって、Databricksクラスターのコストを削減しつつも高い可用性を実現することができます。例としてAWSを考えてみると、ユーザーはクラスターにスポットインスタンスを使うように設定することができ、スポットインスタンを取得できない場合にはオンデマンドインスタンスに切り替えるように設定することができます。この設定によって、ユーザーによって指定されたスポットインスタンスのビッディング比率を用いて、Databricksはスポットインスタンスを取得しようとします。これに失敗した場合には、自動でオンデマンドインスタンスに切り替えるので、スポットインスタンス価格の変動があったとしても、スムーズにワークロードを実行することができます。
図4. スポット/オンデマンドの組み合わせ、フォールバックの設定をするためのUI
自動モニタリング
Databricksは、内部のモニタリングシステムに対して数百のメトリクスを毎分ごとストリームを送信します。これらのメトリクスには、クラスターの起動時間、クラスター、ノートの停止、ワーカーのディスク使用量の上昇、Spark設定のエラーが含まれます。モニタリングシステムは、自動でメトリクスをカテゴリ分けし、ダッシュボードに可視化します。これらのメトリクスを用いることで、Databricksがユーザーの代わりにクラスターの健康状態を検知し防御することができます。Sparkのドライバーノードがダウンしたり、クラウドプロバイダーの障害の長期化など、障害からの復旧が不可能な場合には、Databricksは診断情報を収集し、お客様に対してクラスター停止の理由をご報告します。
また、Databricksは、関連するインシデントの急激な増加に対してアラートを発呼します。例えば、不正なクラスター停止の割合が閾値を超えた場合には、根本原因を特定できるように、第一対応者に対して、モニタリングシステムが自動でアラートを送信します。Databricksは、お客様のために大規模なインスタンスを管理しているため、クラウドプロバイダーがレポートする前にクラウドの大規模障害を検知するケースもあります。根本原因が特定されると、Databricksは影響を受けたお客様に対して、障害および根本原因をお知らせするバナーを表示します。
図5. 2018/3/20のus-east-1におけるS3障害において匿名化した顧客のクラスター停止のバースト。Databricksでは、AWSがサービスステータスページにレポートする15分前に障害を検知しました。
自動ソフトウェアアップデート
SaaSプラットフォームとして、Databricksは定期的に新機能やバグフィックスをリリースしています。多数のアップデートは、Sparkのワーカーノードを含むクラスターにおけるサイドカーサービスに影響を与えます。ここでの課題は、数万のインスタンスを阻害したり、お客様の実行中のワークロードに影響を与えずにどのようにアップデートを行うかということです。
Databricksのアップデートマネージャは、既存のSparkワーカーで動作しているサイドカーサービスのバージョンをモニタリングし、それらを徐々に最新リリースにアップデートしていきます。このアップデートは、Sparkワークロードで性能劣化を引き起こさないように注意深く制御されます。新規クラスターは、常に最新ソフトウェアで起動されます。このようにして、お客様の本格運用のワークロードのダウンタイムをゼロにしつつも、リリース時の1時間以内に透過的にすべてのSparkクラスターに対してアップデートが実行されます。
自動プリエンプション
複数ユーザーがクラスターを共有している際、ユーザーの単一のジョブがクラスターの全てのリソースを占有してしまうことは一般的なことであり、クラスターの他の全てのジョブを遅くしてしまいます。クラスター上のユーザー数が増加すると、大規模なSparkジョブが全てのクラスターリソースを占有する確率が高まります。複数のデータペルソナが同一クラスターで異なるワークロードを実行すると、この問題はさらに悪化します。例えば、大規模ETLジョブを実行するデータエンジニアは、多くのケースで短期間のインタラクティブなクエリーを実行するデータアナリストの邪魔になるケースがあります。
このような問題に対応するために、Databricksクラスターは全てのユーザーが平等にクラスターの時間を確保できるように、ユーザーに対してオーバーコミットしないように、先行的にSparkのタスクを無効化します。これによって、全体的なリソースコストを最小化しつつも、高いインタラクティブ体験を提供できます。詳細に関しては、タスクプリエンプションのドキュメントを参照ください。
問題のあるジョブの自動停止
マルチユーザーのシナリオにおいては、ユーザーのジョブは、全てのリソースを消費しつくしてしまう問題のあるジョブになる可能性があります。タスクのプリエンプションを用いたとしても、ジョブはクラスターのリソースを枯渇させ、全てのジョブを停止させる可能性があります。例えば、意図せずして、大規模データセットをクロスジョインしてしまい、膨大な中間データを生成したというケースが考えられます。このような場合、Databricksのクラスターにはシンプルなプロセスが存在します。Query Watchdogは、ジョブが入力行と比較して大量の出力行をタスクレベルで生成したかどうかを定期的にチェックし、膨大な出力を生成する問題のあるジョブをkillします。
自動キャッシュ
インタラクティブな分析の過程で、ユーザーは同じデータに対して繰り返しアドホックのクエリーをジョブを実行するケースは多くあります。多くの場合、ユーザーはどのデータをキャッシュし、どのようにして使用されていないデータを除外し、キャッシュされたデータのロードバランス、キャッシュされたデータに対する更新の管理に頭を悩ませなくてはなりません。
Databricksでは、SSD搭載のインスタンスタイプを選択すると、リモートファイルがアクセスされた際は常に、Databricksが透過的にデータをディスク上のキャッシングフォーマットに変換し、インスタンスのSSDにキャッシュします。また、長い期間使用されていないデータの除外、クラスターの前ノードでキャッシュされたデータのロードバランスの面倒を見ます。また、キャッシュはオリジナルのソースロケーションにおけるファイルの追加・削除を自動で検知し、最新のデータを提示します。キャッシュ機能の詳細はDBIOキャッシュの記事をご覧下さい。
まとめ
この記事では、ユーザーがプラットフォームの心配をせずにデータにフォーカスできるようにするための、一連の「オートマティック」な機能をウォークスルーしました。これらの機能を通じて、我々のレイクハウスプラットフォームがビッグデータプロジェクトがゴールに到達するための時間を短縮できるのかをご説明しました。
これらの「オートマティック」な機能を試したいと考えているのでしたら、是非こちらからフリーのトライアルにサインアップしてください。
また、数千の企業ユーザーの問題を解決することに情熱を感じているのでしたら、こちらからご応募ください。積極的に採用中です!