LoginSignup
0
0

Microsoft Fabric workout - Apache Spark Runtimes in Fabric

Posted at

勉強したり使ったりするとき、環境を知らずに迷うことを避けたいと思う。基礎だもの。

Microsoft Fabric Runtime

Multiple runtimes support

Workspace ごとで Runtime 切り替えができるけれども、お好きなのをどうぞってことではなさそうだ。

Runtime 1.1 🔗 Runtime 1.2 🔗 Runtime 1.3 🔗
Apache Spark 3.3.1 🔗 3.4.1 🔗 3.5.0 🔗
Spark SQL, PySpark 3.3.1 🔗 3.4.1 🔗 3.5.0 🔗
Spark SQL, Built-in Functions 3.3.1 🔗 3.4.1 🔗 3.5.0 🔗
Delta Lake 2.2.0 🔗 2.4.0 🔗 3.0.0 🔗
Python 3.10 🔗 3.10 🔗 3.10 🔗
Operating System Ubuntu 18.04 Mariner 2.0 Mariner 2.0
Java 8 11 11
Scala 2.12.15 2.12.17 2.12.17
R 4.2.2 4.2.2 N/A
  • Runtime 1.1(Spark 3.3, Delta 2.2): 以前のバージョン
  • Runtime 1.2(Spark 3.4, Delta 2.4): 現在のバージョン / ワークスペース既定値
  • Runtime 1.3(Spark 3.5, Delta 3 OSS): Experimental public preview
  • Runtime 2.x(Spark 4.x, Delta x ): 将来のバージョン

Upgrade Delta Lake protocol

プロトコルをアップグレードできるが、常に影響を考慮すべき。Lakehouse で運用できても Warehouse サポートされないとかありそうな話である。

Runtime 1.2
from delta.tables import DeltaTable

(
    DeltaTable
        .forName(spark, "table1")
        .upgradeTableProtocol(3, 1)
)

Protocol version upgrades can't be undone and might break the existing Delta Lake table readers, writers, or both. Consider upgrading specific tables only when needed, and make sure your production tools support Delta Lake tables with the new protocol version.

プロトコル バージョンのアップグレードは元に戻すことはできず、既存の Delta Lake テーブル リーダー、ライター、またはその両方が壊れる可能性があります。必要な場合にのみ特定のテーブルをアップグレードを検討し、運用ツールが新しいプロトコル バージョンの Delta Lake テーブルをサポートしていることを確認してください。

Delta 2.2 vs Delta 2.4 changes

  • 明示的にフォーマットを定義していた Runtime 1.1
  • フォーマットの既定値が delta になった Runtime 1.2

Runtime 1.2 では多少冗長的ではあるけれど、明示的にしておく。

Runtime 1.1: null | Runtime 1.2: 'delta'
spark.sparkContext.getConf().get("spark.sql.sources.default")
Runtime 1.1: 'parquet' | Runtime 1.2: 'delta'
spark.conf.get("spark.sql.sources.default")
Runtime 1.1: 'parquet' | Runtime 1.2: 'delta'
%%sql
DROP TABLE IF EXISTS table1
;

CREATE TABLE table1 AS
SELECT
    *
FROM
    range (5)
;
Runtime 1.1: 'parquet' | Runtime 1.2: 'delta'
spark.range(5).write.mode("overwrite").saveAsTable("table1")
Runtime 1.1 / Runtime 1.2 いずれも 'delta'
%%sql
DROP TABLE IF EXISTS table1
;

CREATE TABLE table1 USING DELTA AS
SELECT
    *
FROM
    range (5)
;
Runtime 1.1 / Runtime 1.2 いずれも 'delta'
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("table1")

最適化/V-Order

Runtime 1.1 / Runtime 1.2 で違いはなかった。

df_conf = spark.createDataFrame(
    spark.sparkContext.getConf().getAll()
    , ["propertyName", "value"]
)
df_conf_fabric = (
    df_conf
        .filter(
            df_conf.propertyName.contains("microsoft")
            | df_conf.propertyName.contains("vorder")
        )
        .sort(df_conf.propertyName)
)

df_conf_fabric.show(truncate=False)
propertyName value
spark.microsoft.delta.merge.lowShuffle.enabled true
spark.microsoft.delta.optimizeWrite.binSize 1073741824
spark.microsoft.delta.optimizeWrite.enabled true
spark.microsoft.delta.optimizeWrite.partitioned.enabled true
spark.sql.parquet.vorder.autoEncoding false
spark.sql.parquet.vorder.dictionaryPageSize 1073741824
spark.sql.parquet.vorder.enabled true

その他

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0