Introducing Apache Spark™ 3.1 - The Databricks Blogの翻訳です。
Databricks Runtime 8.0で利用できます
Databricks Runtime 8.0の一部としてApache Spark 3.1が利用可能になったことを発表できて、我々は非常に興奮しています。Spark 3.1のリリースにおいては、Apache Spark™コミュニティに多大なる感謝を申し上げます。
Sparkをより早く、より容易に、よりスマートにすると言うゴールを目指して、Spark 3.1では以下の機能を実現しました:
- Python使いやすさ改善
- ANSI SQLへの準拠
- クエリ最適化の強化
- シャッフルハッシュジョインの改善
- 構造化ストリーミングに対するヒストリサーバーのサポート
本記事では、改善項目及び機能に対するハイレベルの要約をご説明します。今後は個々の機能に関して、詳細な記事を公開しますのでご期待ください。すべてのSparkコンポーネント及び解決されたJIRAに関する包括的な一覧に関しては、Apache Spark 3.1.1のリリースノートを参照ください。
Project Zen
以下の観点からPySparkの使いやすさを改善するためにProject Zenはスタートしました:
- よりPythonicに
- PySparkにおけるユーザビリティの改善
- 他のPythonライブラリとの相互運用性(interoperability)
このプロジェクトの一環として、本リリースにおいてはこちらの記事にある様に、Pythonの型ヒントからPySparkドキュメントの刷新まで、PySparkに多くの改善がなされました。
- PySparkにおけるPython型ヒントのサポートは当初サードパーティのライブラリspark-stubsでスタートし、成熟し安定したものとなっています。本リリースにおいてPySparkは正式にPythonの型ヒントを導入しました(SPARK-32681)。開発者がIDEやノートブックにおいて、オートコンプリートを活用しながら開発を行う際に、Python型ヒントは大きな効果を発揮します。加えて、Python型ヒントによる静的型・エラー検知により、開発者は生産性を改善することができます。
Autocompletion in Databricks Notebooks from Databricks on Vimeo.
-
PySparkにおける依存性管理サポートを完全なものにし、PySparkユーザー・開発者向けのドキュメントを準備しました(SPARK-33824)。これまでの依存性管理は不完全なもので、ドキュメントも不十分、かつ、YARNでのみ動作するものでした。本リリースにおいては、-archiveオプション(SPARK-33530, SPARK-33615)を利用することで、Condaやvirtualenv、PEXの様なパッケージ管理システムが動作します。こちらのPython依存性管理に関する記事がPySparkのドキュメンテーションに貢献しています。
-
PyPIユーザー向けの新たなインストールオプションが導入されました(SPARK-32017)。pipはPySparkで用いられる最も一般的なインストール手段です。しかし、これまではPyPIのHadoop 2のみが利用可能であり、他のApache SparkにおけるHadoop 2やHadoop 3は利用できませんでした。本リリースでは、Project Zenの一環として、PyPIユーザーに対してすべてのオプションが利用可能となっています。これにより、あらゆるタイプの既存のSparkクラスターにおいて、PyPIからインストールを行いアプリケーションを実行することが可能になります。
-
PySparkの新たなドキュメンテーションが本リリースで導入されました(SPARK-31851)。これまでのPySparkのドキュメントは検索が困難で、APIリファレンスしか含まれていませんでした。本リリースにおいて、ドキュメントが刷新され、きめ細かい分類、探索しやすい階層構造が導入されました(SPARK-32188)。docstringは可読性の高いnumpydoc形式となりました(SPARK-32085)。また、デバッグ方法(SPARK-32186)、貢献方法(SPARK-32190)、テスト方法(SPARK-31851)、ノートブックによるクイックスタート(SPARK-32182)など有用なページが追加されました。
ANSI SQL準拠
本リリースでは、従来のデータウェアハウスからSparkへの移行を容易にするために、さらなるANSI SQL準拠がなされました。
-
Spark 3.0で導入されたANSIダイアレクトモードが強化されました。ANSIモードにおける振る舞いは、厳密にANSI SQLによるものではない場合でもANSI SQLのスタイルに即したものになっています。本リリースにおいては、多くのオペレーター・関数において、入力が不正な場合、NULLを返却するのではなくランタイムエラーをスローする様になりました(SPARK-33275)。また、本リリースにおいては、明示的な型キャスト[SQLリファレンス]においてより厳密なチェックが行われる様になりました。クエリに不正な型キャスト(例えば、日付/タイムスタンプの数値へのキャスト)を含む場合、コンパイル時にユーザーに対して不正な変換であることをエラーとして通知します。ANSIダイアレクトモードは開発中であるため、デフォルトでは有効化されていません。spark.sql.ansi.enabled を true にすることで有効化できます。今後のリリースでさらに安定化する予定です。
-
本リリースでは、様々なSQL機能が追加されました。広く使われている標準的な CHAR/VARCHAR データ型が、これまでサポートされていたStringタイプの派生として追加されました。wide_bucketの様なビルトイン関数(SPARK-21117)、regexp_extract_all(SPARK-24884)が追加されました。現時点でビルトインのオペレーター、関数の数は350にもなりました。INSERT(SPARK-32976)、MERGE(SPARK-32030)、EXPLAIN(SPARK-32337)などのDDL/DML/ユーティリティコマンドがさらに強化されました。本リリースから、WebUIにおいて、SQLプランが(EXPLAIN FORMATTEDを用いて)シンプルかつ構造化された形で表示される様になりました。
-
本リリースにおいて、CREATE TABLEのSQLシンタックスの統合が完了しました。Sparkは2セットのCREATE TABLEシンタックスを保持しています。SQL文がUSINGもSTORED AS句を含まない場合には、SparkはデフォルtのHiveファイルフォーマットを使用しました。spark.sql.legacy.createHiveTableByDefault が false(Spark 3.1ではデフォルトは true になっていますが、Databricks Runtime 8.0ではデフォルトは false になっています)の場合、デフォルトのテーブルフォーマットは spark.sql.sources.default(Spark 3.1ではデフォルトは parquet になっていますが、Databricks Runtime 8.0ではデフォルトは delta になっています) に依存します。これはすなわち、Databricks Runtime 8.0以降においては、よりすぐれタパフォーマンス、信頼性を提供するDelta Lakeテーブルがデフォルトのフォーマットになることを意味します。ユーザーが明示的にUSINGもしくはSTORED AS句を指定しない場合の、CREATE TABLEのSQLシンタックスの振る舞いの変化を例示します。
CREATE TABLE table1 (col1 int);
CREATE TABLE table2 (col1 int) PARTITIONED BY (partCol int);
以下がテーブルフォーマットのサマリーとなります。
Spark 3.0 (DBR 7) 以前 | Spark 3.1 * | DBR 8.0 | |
---|---|---|---|
デフォルトのフォーマット | Hive Text Serde | Parquet | Delta |
注意 Apache Sparkでは、手動でspark.sql.legacy.createHiveTableByDefault
をfalse
に設定する必要があります。そうしないと、デフォルトはHive Text Serdeとなります。
パフォーマンス
Catalystは多くのSparkアプリケーションにおいて、最適化を行うクエリコンパイラーです。Databricksでは、1日あたり数十億のクエリが最適化され実行されています。本リリースでは、クエリの最適化を強化し、クエリ処理をさらに高速なものにしています。
-
Predicate pushdown(述語のプッシュダウン) は、スキャン・処理するデータ量を削減できるため最も効果的なパフォーマンス改善の機能の一つとなっています。Spark 3.1においては、様々な機能強化がなされています:
- Filter述語、Join条件を再実装することで、より多くの述語がメタストア、データストアにプッシュダウンされる様になりました。
- Hiveメタストアにおけるパーティションのスキャンを削減するために、DATEデータタイプのサポート、contains/start-with/end-with/not-equalsなどのオペレーターをサポートすることで、パーティションのpredicate pushdownが改善されました。
- より広範にpredicate pushdownを有効にするために、数値データタイプにおけるバイナリー比較オペレーターにおいて型変換を解凍する新たなルールを追加しました(SPARK-32858, SPARK-24994)。
- JSONとAvroデータソースにおいて、predicate pushdownをサポートしました(SPARK-32346)。また、ORCデータタイプにおいては、ネストされたフィールドに対するpredicate pushdownをサポートしました。
- EXPANDオペレータを通じてFilterがプッシュされる様になりました(SPARK-33302)。
-
シャッフルの除外(shuffle removal)、副表現の排除(subexpression elimination)、**ネストされたフィールドの削除(nested field pruning)**も重要な最適化機能です。シャッフルの除外後には適合クエリ計画が適用できない場合がありますが、最もコストを要するオペレーションであるシャッフルはいくつかのケースにおいては避けるべきものです(SPARK-31869, SPARK-32282, SPARK-33399)。また、計算量を削減するためには、重複、冗長な表現を削除できます(SPARK-33092, SPARK-33337, SPARK-33427, SPARK-33540)。様々なオペレータにおいては、I/Oを削減し、以降の最適化を有効化する目的で、ネストされたフィールドを削除するために、カラム削除が適用できます(SPARK-29721, SPARK-27217, SPARK-31736, SPARK-32163, SPARK-32059)。
-
本リリースにおいて、シャッフルハッシュジョイン(SHJ)は対応するcodegen実行(SPARK-32421)とともに全てのjoinタイプをサポートします(SPARK-32399)。シャッフルマージソートジョイン(SMJ)と異なり、SHJはソートを必要としないため、ブロードキャストするには大きすぎる小規模なテーブルとより規模の大きいテーブルをジョインする際には、SMJよりもCPU、IOの観点で効率的です。しかしながら、SHJは構築するハッシュマップが大きい際にはメモリーを圧迫した結果、アウトオブメモリー(OOM)を引き起こす可能性があることに注意ください。
ストリーミング
分散ストリーム処理アプリケーションを構築するにはSparkはベストな選択肢となります。Databricksにおいては、構造化ストリーミングとして1日あたり10兆レコードが処理されています。本リリースにおいては、モニタリング、ユーザビリティ、機能が強化されました。
- 構造化ストリーミングアプリケーションをデバッグするために、ヒストリーサーバーのサポートが追加されました(SPARK-31953)。また、状態に関するメトリクス(SPARK-33223)、ウォーターマークのギャップ(SPARK-33224)、その他状態に関するカスタムメトリクス(SPARK-33287)を表示するためのライブUIがサポートされました。
Enhanced Debugging and Monitoring from Databricks on Vimeo.
-
こちらのノートブックにあるDataFrameReaderやDataFrameWriterのテーブルAPIの様に、ストリーミングデータフレームをテーブルに対して読み書きする新たなストリーミングテーブルAPIが追加されました。Databricks Runtimeにおいては、優れたパフォーマンス、exactly-onceの観点からDeltaテーブルフォーマットを使うことを推奨しています。
-
ストリーム同士のジョインにおいては、full outer(SPARK-32862)、left semi(SPARK-32863)の新たに二つのタイプのジョインをサポートしました。こちらの記事にある様に、Apache Spark 3.1までは、inner、left outer、right outerジョインがサポートされていました。
Spark 3.1におけるその他のアップデート
これらの新機能に加えて、本リリースは、1500チケット解決を通じて、ユーザビリティ、安定性、改善にフォーカスを置いています。これは、個人及びDatabricks、Google、Apple、Linkedin、Microsoft、Intel、IBM、Alibaba、Facebook、Nvidia、Netflix、 Adobeなどの企業を含む200ものコントリビューターによる貢献の賜物です。本記事では、キーとなるSQL、Python、ストリーミングをハイライトしましたが、3.1にはここではカバーしきれない機能が含まれています。KubernetesにおけるSparkのGAや、ノード停止、状態スキーマ評価、Sparkドキュメントの検索機能など他の機能に関しては、こちらのリリースノートを参照してください。