1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricksランタイム13.0におけるApache Spark™ 3.4のご紹介

Posted at

Introducing Apache Spark™ 3.4 for Databricks Runtime 13.0 - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

本日、Databricks Runtime 13.0の一部としてApache Spark™ 3.4を利用できるようになったことを発表できることを嬉しく思っています。このSpark 3.4リリースに多大なる貢献をしたApache Sparkコミュニティに感謝の意を表します。

さらにSparkを統合し、あらゆるアプリケーションにSparkを提供し、生産性を高め、使用法をシンプルにし、新機能を追加するために、Spark 3.4では以下を含む様々な新機能を導入します:

  • Spark Connectを用いることで、いかなるアプリケーションからどこからでもSparkに接続。
  • 複数テーブルフォーマットにおけるカラムのDEFAULT値、タイムゾーンなしのタイムスタンプ、UNPIVOT、カラムエイリアス参照によるクエリーの簡素化のような新たなSQL機能による生産性能向上。
  • PySparkの新たなエラーメッセージフレームワークやSparkエグゼキューターメモリープロファイリングによるPython開発者体験の改善
  • クエリー数の削減、中間ストレージの脱却、カスタムロジックに対する任意ステートフルオペレーションのサポート、Protobufフォーマットでのレコードの読み書きのネイティブサポートによるパフォーマンスを改善やコストを削減するためのストリーミングの改善
  • SparkクラスターにおけるPyTorchの分散トレーニングを行うPySparkユーザーを支援。

本記事では、Apache Spark 3.4.0におけるトップレベルの機能と強化のいくつかの概要を説明します。これらの機能の詳細に関しては、より詳細に踏み込む今後の記事をお待ちいただくことをお勧めします。さらに、主要な機能の一覧やすべてのSparkコンポーネントで解決されたJIRAチケットに興味があるのであれば、Apache Spark 3.4.0のリリースノートをチェックすることをお勧めします。

Spark Connect

Apache Spark 3.4において、Spark Connectはどこでも動作している任意のアプリケーションからSparkクラスターへのリモート接続を実現する、分離されたクライアントサーバーアーキテクチャを導入します。このクライアントとサーバーの分離によって、モダンなデータアプリケーション、IDE、ノートブック、プログラミング言語はインタラクティブにSparkにアクセスできるようになります。Spark ConnectはSpark DataFrame APIのパワーを活用しています(SPARK-39375)。

Spark Connectを用いることで、クライアントアプリケーションはSparkクラスター外で動作するので、自分自身の環境にのみインパクトを与え、Sparkドライバーにおける依存関係の競合は排除され、企業においてはSparkをアップグレードする際にクライアントアプリケーションに変更を加える必要はなく、開発者は自分のIDEから直接クラアントサイドのステップスルーデバッグを行うことができます。

Spark ConnectはまもなくのDatabricks Connectのリリースを強化します。

Spark Connectによって、任意のクライアントアプリケーションからSparkへのリモート接続を実現します

PyTorch MLモデルの分散トレーニング

Apache Spark 3.4では、SparkクラスターでのPyTorchの分散トレーニングを行えるように、TorchDistributorモジュールが追加されます。内部では、環境とワーカー間のコミュニケーションチャネルを初期化し、ワーカーノード横断で分散トレーニングを実行するために、CLIコマンドtorch.distributed.runを活用します。このモジュールは、シングルノードマルチGPUとマルチノードGPUクラスターの両方における分散トレーニングジョブをサポートしています。使用法に関するコードスニペットのサンプルを以下に示します:

Python
from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, use_gpu):
  import torch
  import torch.distributed as dist
  import torch.nn.parallel.DistributedDataParallel as DDP
  from torch.utils.data import DistributedSampler, DataLoader

  backend = "nccl" if use_gpu else "gloo"
  dist.init_process_group(backend)
  device = int(os.environ["LOCAL_RANK"]) if use_gpu  else "cpu"
  model = DDP(createModel(), **kwargs)
  sampler = DistributedSampler(dataset)
  loader = DataLoader(dataset, sampler=sampler)

  output = train(model, loader, learning_rate)
  dist.cleanup()
  return output

distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, True)

詳細とサンプルノートブックに関しては、https://docs.databricks.com/machine-learning/train-model/distributed-training/spark-pytorch-distributor.html をご覧ください。

生産性の向上

テーブルのカラムに対するDEFAULT値のサポート(SPARK-38334): CSV、JSON、ORC、Parquetフォーマットのテーブルのカラムに対するSQLクエリーでデフォルト値指定がサポートされます。この機能はテーブル作成時と作成後の両方で動作します。以降のINSERT、UPDATE、DELETE、MERGEコマンドは、明示的なDEFAULTキーワードを使用しているすべてのカラムのデフォルト値を参照することができるようになります。あるいは、INSERTの割り当てにおいてターゲットテーブルよりも少ない明示的なカラムのリストがある場合、残りのカラムに対しては対応するカラムのデフォルト値で置き換えられます(デフォルト値が指定されていない場合にはNULLになります)。

例えば、新規テーブルを作成する際にカラムのDEFAULT値を指定するには:

SQL
CREATE TABLE t (first INT, second DATE DEFAULT CURRENT_DATE())
  USING PARQUET;
INSERT INTO t VALUES
  (0, DEFAULT), (1, DEFAULT), (2, DATE'2020-12-31');
SELECT first, second FROM t;

(0, 2023-03-28)
(1, 2023-03-28)
(2, 2020-12-31)

また、以下の例に示すようにUPDATE、DELETE、MERGE文でデフォルト値を使用することもできます。

SQL
UPDATE t SET first = 99 WHERE second = DEFAULT;

DELETE FROM t WHERE second = DEFAULT;

MERGE INTO t FROM VALUES (42, DATE'1999-01-01') AS S(c1, c2)
USING first = c1
WHEN NOT MATCHED THEN INSERT (first, second) = (c1, DEFAULT)
WHEN MATCHED THEN UPDATE SET (second = DEFAULT);

タイムゾーンなしの新たなタイムスタンプデータ型(SPARK-35662): Apache Spark 3.4では、タイムゾーンなしのタイムスタンプを表現する新たなデータ型が追加されました。これまでは、SQLクエリーに埋め込まれた、あるいはJDBCから引き渡されるSparkの既存のTIMESTAMPを用いて表現される値は、セッションのローカルのタイムゾーンを仮定しており、処理の前にはUTCへのキャストが必要でした。これらのセマンティックは、カレンダーを取り扱うような幾つかのケースでは望ましいものですが、ログファイルのような多くの他のケースでは、ユーザーはタイムゾーンに依存しないタイムスタンプを表現したいと考えます。このため、Sparkには新たなTIMESTAMP_NTZデータタイプが追加されました。

例えば:

SQL
CREATE TABLE ts (c1 TIMESTAMP_NTZ) USING PARQUET;
INSERT INTO ts VALUES
  (TIMESTAMP_NTZ'2016-01-01 10:11:12.123456');
INSERT INTO ts VALUES
  (NULL);
SELECT c1 FROM ts;

(2016-01-01 10:11:12.123456)
(NULL)

ラテラルカラムエイリアス参照(SPARK-27561): Apache Spark 3.4では、以前のアイテムを参照するためにSQLのSELECTリストでラテラルカラムリファレンスを使用することができるようになります。この機能によって、クエリーを構成する際、特に複雑なサブクエリーや共通テーブル表現を記述する必要がある際に非常に便利なものとなります。

例えば:

SQL
CREATE TABLE t (salary INT, bonus INT, name STRING)
  USING PARQUET;
INSERT INTO t VALUES (10000, 1000, 'amy');
INSERT INTO t VALUES (20000, 500, 'alice');
SELECT salary * 2 AS new_salary, new_salary + bonus
  FROM t WHERE name = 'amy';

(20000, 21000)

Dataset.to(StructType)(SPARK-39625): Apache Spark 3.4では、ソースのデータフレーム全体を指定されたスキーマに変換するDataset.to(StructType)という新たなAPIがサポートされます。この挙動は、入力クエリーがテーブルのスキーマにマッチするように調整されるテーブルの挿入と似ていますが、内部のフィールドにも動作するように拡張されています。これには以下が含まれています:

  • 指定されたスキーマにマッチするようにカラムと内部のフィールドを並び替え
  • 指定されたスキーマに必要とされないカラムと内部フィールドを排除
  • 期待されるデータタイプにマッチするようにカラムと内部フィールドをキャスト

例えば:

Scala
val innerFields = new StructType()
  .add("J", StringType).add("I", StringType)
val schema = new StructType()
  .add("struct", innerFields, nullable = false)
val df = Seq("a" -> "b").toDF("i", "j")
  .select(struct($"i", $"j").as("struct")).to(schema)
assert(df.schema == schema)
val result = df.collect()

("b", "a")

パラメーター化されたSQLクエリー(SPARK-41271, SPARK-42702): Apache Spark 3.4では、パラメーター化されたSQLクエリーを構築する機能がサポートされます。これによって、クエリーの裁量性が高まり、SQLインジェクション攻撃を防ぐことでセキュリティを改善します。このSparkSession APIは、キーがパラメーター名、値がScala/Javaリテラルであるmapを受け付けるsqlメソッドのオーバーライドによって拡張されます:

Scala
def sql(sqlText: String, args: Map[String, Any]): DataFrame

この拡張によって、リテラル値のような定数が許可される場所であれば、どこでもSQLテキストで名前付きのパラメーターを指定することができます。

SQLクエリーのパラメーター化のサンプルを示します:

Python
spark.sql(
    sqlText =
      "SELECT * FROM tbl WHERE date > :startDate LIMIT :maxRows",
    args = Map(
      "startDate" -> LocalDate.of(2022, 12, 1),
      "maxRows" -> 100))

UNPIVOT / MELTオペレーション(SPARK-39876, SPARK-38864): バージョン3.4までは、Apache SparkのデータセットAPIはPIVOTメソッドを提供していましたが、逆のオペレーションであるMELTは提供していませんでした。今では、後者も提供され、PIVOTで生成された幅広のデータフレームをUNPIVOTすることができ、オプションとして識別子のカラムセットをそのままにすることもできます。これは、逆変換できない集計処理を除く、groupBy(...).pivot(...).agg(...)の逆の処理となります。このオペレーションは、名前が示すように、いくつかが識別子のカラムであるデータフレームを、2つの非識別子カラムのみをそのままにして他のすべてのカラムを行に「unpivot」する際には有用です。

例:

Scala
val df = Seq((1, 11, 12L), (2, 21, 22L))
  .toDF("id", "int", "long")
df.show()
// output:
// +---+---+----+
// | id|int|long|
// +---+---+----+
// |  1| 11|  12|
// |  2| 21|  22|
// +---+---+----+

df.unpivot(
  Array($"id"),
  Array($"int", $"long"),
  "variable",
  "value")
  .show()
// output:
// +---+--------+-----+
// | id|variable|value|*
// +---+--------+-----+
// |  1|     int|   11|
// |  1|    long|   12|
// |  2|     int|   21|
// |  2|    long|   22|
// +---+--------+-----+

OFFSET句(SPARK-28330, SPARK-39159): そのままですが、Apache Spark 3.4のSQLクエリーではOFFSET句を使うことができます。このバージョン以前では、返却される行数を制限するにはLIMIT句を持つクエリーを発行することができました。今でもそれが可能ですが、OFFSET句を用いることで最初のN行を排除することもできます!Apache Spark™はこのオペレーションに要する作業量を最小化するために、効率的なクエリープランを作成して実行します。これはページネーションでよく使われますが、他の用途でも使用できます。

SQL
CREATE TABLE t (first INT, second DATE DEFAULT CURRENT_DATE())
  USING PARQUET;
INSERT INTO t VALUES
  (0, DEFAULT), (1, DEFAULT), (2, DATE'2020-12-31');
SELECT first, second FROM t ORDER BY first LIMIT 1 OFFSET 1;

(1, 2023-03-28)

FROM句における表形式の値ジェネレーター(SPARK-41594): 2021年時点で、SQL標準はISO/IEC 19075-7:2021 - Part 7: Polymorphic table functionsのセクションにおいて表形式の値の関数(table-valued functions)と呼ばれる構文をカバーしています。Apache Spark 3.4では、標準の方法に従ってデータのコレクションのクエリーと変換を容易にするためにこの構文をサポートしています。既存、新規のビルトインのtable-valued functionsはこの構文をサポートしています。

こちらがシンプルな例です。

SQL
SELECT * FROM EXPLODE(ARRAY(1, 2))

(1)
(2)

Numpyインスタンスの公式サポート(SPARK-39405): PySparkでNumPyインスタンスが公式にサポートされたので、NumPyインスタンスを用いてデータフレーム(spark.createDataFrame)を作成し、SQLエクスプレッションやMLに対するインプットとして指定できるようになります。

Python
spark.createDataFrame(np.array([[1, 2], [3, 4]])).show()
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

開発者体験の改善

エラークラスに対するSQLSTATE活用の強化(SPARK-41994): SQLSTATEとして知られる5バイトのコードを用いてSQLクエリーやコマンドからの返却状態を表現することは、データベース管理システム業界における標準となっています。このようにすることで、複数のクライアントとサーバーは、互いとのコミュニケーション方法を標準化し、実装をシンプルなものにしています。JDBC、ODBC接続経由でSQLクエリーやコマンドが送信される際には特に重要なものとなります。Apache Spark 3.4では、コミュニティで期待される方法にマッチするように、SQLSTATEの値を含めるようにエラーケースの大部分をアップデートすることで、この標準に準拠するようにしています。例えば、SQLSTATEの22003は数値が範囲外であること、22012はゼロによる除算を示しています。

エラーメッセージの改善(SPARK-41597, SPARK-37935): より多くのSparkの例外が、エラーメッセージの質を高めた新たなエラーフレームワーク(SPARK-33539)に移行されています。また、PySparkの例外は、この新たなフレームワークを活用しており、エラークラスとコードが分類されているので、ユーザーは例外が生じた際に固有のエラーケースに対して求められる挙動を定義することができます。

例:

Python
from pyspark.errors import PySparkTypeError

df = spark.range(1)

try:
    df.id.substr(df.id, 10)
except PySparkTypeError as e:
    if e.getErrorClass() == "NOT_SAME_TYPE":
        # Error handling
        ...

PySparkユーザー定義関数向けメモリープロファイラ(SPARK-40281): PySparkユーザー定義関数向けメモリープロファイラでは、当初Sparkエグゼキューターのプロファイリングをサポートしていませんでした。プログラムのパフォーマンスにおけるキーファクターの一つであるメモリーは、PySparkプロファイリングで欠けていました。Sparkドライバーで稼働しているPySparkプログラムは、すべてのPythonプロセスと同じように、その他のプロファイラーを用いて容易にプロファイルすることができますが、Sparkエグゼキューターのメモリーをプロファイリングする簡単な方法はありませんでした。PySparkにメモリープロファイラが含まれるので、ユーザーは自分のUDFを行ごとにプロファイルし、メモリー消費をチェックすることができます。

例:

Python
from pyspark.sql.functions import *

@udf("int")
def f(x):
   return x + 1

_ = spark.range(2).select(f('id')).collect()
spark.sparkContext.show_profiles()

============================================================
Profile of UDF<id=11>
============================================================
Filename: <command-1010423834128581>

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     3    116.9 MiB    116.9 MiB           2   @udf("int")
     4                                         def f(x):
     5    116.9 MiB      0.0 MiB           2       return x + 1

ストリーミングの改善

Project Lightspeed: Faster and Simpler Stream Processing with Apache Sparkは、Spark 3.4に更なる改善をもたらしています:

オフセット管理 - お客様のワークロードのプロファイリングとパフォーマンス実験によって、オフセット管理オペレーションが特定のパイプラインの実行時間の30-50%を消費していることが明らかになりました。これらのオペレーションを非同期にし、設定可能な周期で実行できるようにすることで、実行時間を劇的に改善することができます。

複数のステートフルオペレーターのサポート - ユーザーは、同じクエリーにおいて、連鎖した時間ウィンドウ集計を含む複数回のステートフルオペレーション(集計、重複排除、ストリーム同士のjoinなど)を実行することができます。これによって、ユーザーはメンテナンスコストを引き起こし、性能が出ない複数のストリーミングクエリーや、それらの間の中間ストレージを作成する必要がなくなります。これは、appendモードでのみ動作することに注意してください。

Pythonの任意ステートフル処理 - Spark 3.4以前は、PySparkは任意のステートフル処理をサポートしておらず、ユーザーは複雑かつカスタムのステートフルロジックを表現するためにはJava/Scala APIを使用しなくてはなりませんでした。Apache Spark 3.4以降は、ユーザーはPySparkで直接ステートフルな複雑な関数を表現できます。詳細は、構造化ストリーミングにおけるPythonの任意のステートフル処理の記事をご覧ください。

Protobufのサポート - Protobufのネイティブサポートは、特にストリーミングのユースケースでの要求が高いものでした。Apache Spark 3.4では、ユーザーはビルトインのfrom_protobuf()to_protobuf()関数を用いてProtobufフォーマットでレコードを読み書きすることができます。

Apache Spark 3.4におけるその他の改善

新機能に加えて、Sparkの最新リリースでは約2600の問題を解決することで、使いやすさ、安定性、改善性を強調しています。個人および、Databricks、LinkedIn、eBay、Baidu、Apple、Bloomberg、Microsoft、Amazon、Googleのような企業からなる270以上のコントリビューターがこの偉業に貢献しました。この記事では、Spark 3.4の特筆すべきSQL、Python、ストリーミングの改善にフォーカスしていますが、このマイルストーンにはここではカバーされていないその他の様々な改善点が含まれています。リリースノートで、bloomフィルターのjoin、スケーラブルなSpark UIのバックエンド、pandas APIのカバレッジの改善を含むこれらの機能に関して確認することができます。

Databricks Runtime 13.0でApache Spark 3.4を実験したいのであれば、無料のDatabricks Community Edition、あるいはDatabricksトライアルにサインアップすることで、簡単に行うことができます。アクセスしたら、バージョン13.0を選択することで、簡単にSpark 3.4のクラスターを起動することができます。この直感的なプロセスによって、Spark 3.4を数分でスタートすることができます。

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?