こちらをウォークスルーします。
翻訳版はこちら。
Mosaicとは
Databricks Labsが開発した大規模地理空間データ分析のためのライブラリです。
概要はこちら。
以下のようなツールを提供しています。
- データ取り込み(WKT、WKB、GeoJSON)
- データ処理
- ジオメトリ、ジオグラフィオペレーション
- インデックス作成
- インデックスグリッドに対するポリゴンやラインのチッピング
- データの可視化(Kepler)
Mosaicクイックスタートノートブックのウォークスルー
NYCタクシートリップとゾーンの間でのポイントインポリゴンの空間joinを実行します。 注意: ここでは大規模joinで利用できるパフォーマンスの調整には踏み込みません。
-
地理空間データのエンジニアリング、分析、可視化機能のためのDatabricks LabsによるMosaicライブラリを使うには:
-
%pip install databricks-mosaic
でインストール - 以下のようにインポートして利用:
import mosaic as mos mos.enable_mosaic(spark, dbutils)
-
-
地図レイヤーのレンダリングのためのKeplerGlOSSライブラリ:
- Mosaicとともにインストールされるので、マジックコマンド
%%mosaic_kepler
を使用 [Mosaic Docs] - 直接使うには
from keplergl import KeplerGl
でインポート
- Mosaicとともにインストールされるので、マジックコマンド
ボリュームにアクセスする際にトラブルが起きるのであれば:
- Mosaic 0.3 シリーズ (< DBR 13) - ワークアラウンドとしてリソースをDBFSにコピーします
- Mosaic 0.4 シリーズ (DBR 13.3 LTS) - DBFSへのリソースのコピーやUnity Catalogのセットアップ + ご自身のワークスペース管理者との共有アクセスの設定。アップデートされた手順はこちら。
注意
こちらを執筆時にはMosaic 0.4ではMosaicFrameでエラーとなってしまったため、0.3シリーズを使用しています。Issueも上がっています。
Mosaicのインストール
Mosaicフレームワークはpip installで利用でき、Python、SQL、Scala、Rのバインディングと共に提供されます。pip installで提供されるwheelファイルは他の言語サポートに必要なすべてのjarを登録します。
%pip install "databricks-mosaic==0.3.14"
# -- より計算資源を必要とするオペレーションではAQEを設定
# - 以下の option-1 か option-2 を選んでください。REPARTITIONで重要となります!
# spark.conf.set("spark.databricks.optimizer.adaptive.enabled", False) # <- option-1: フルコントロールのために完全にオフ
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", False) # <- option-2: パーティション管理を少し調整
spark.conf.set("spark.sql.shuffle.partitions", 1_024) # <-- デフォルトは200
# -- databricks + spark 関数のインポート
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import *
# -- mosaicのセットアップ
import mosaic as mos
mos.enable_mosaic(spark, dbutils)
# mos.enable_gdal(spark) # <- このサンプルでは不要です
# -- その他のインポート
import os
import pathlib
import requests
import warnings
warnings.simplefilter("ignore")
データのセットアップ
user_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
data_dir = f"/tmp/mosaic/{user_name}"
print(f"Initial data stored in '{data_dir}'")
Initial data stored in '/tmp/mosaic/takaaki.yayoi@databricks.com'
NYCタクシーゾーンのダウンロード
我々の環境でニューヨークのタクシーゾーンのシェイプを利用できるようにします。
zone_dir = f"{data_dir}/taxi_zones"
zone_dir_fuse = f"/dbfs{zone_dir}"
dbutils.fs.mkdirs(zone_dir)
os.environ['ZONE_DIR_FUSE'] = zone_dir_fuse
print(f"ZONE_DIR_FUSE? '{zone_dir_fuse}'")
ZONE_DIR_FUSE? '/dbfs/tmp/mosaic/takaaki.yayoi@databricks.com/taxi_zones'
zone_url = 'https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON'
zone_fuse_path = pathlib.Path(zone_dir_fuse) / 'nyc_taxi_zones.geojson'
if not zone_fuse_path.exists():
req = requests.get(zone_url)
with open(zone_fuse_path, 'wb') as f:
f.write(req.content)
else:
print(f"...skipping '{zone_fuse_path}', already exits.")
display(dbutils.fs.ls(zone_dir))
GeoJSONからの初期タクシーゾーン [ポリゴン]
Mosaicの提供する機能によって、容易にGeoJSONファイルをロードできます。
neighbourhoods = (
spark.read
.option("multiline", "true")
.format("json")
.load(zone_dir)
.select("type", F.explode(col("features")).alias("feature"))
.select("type", col("feature.properties").alias("properties"), F.to_json(col("feature.geometry")).alias("json_geometry"))
.withColumn("geometry", mos.st_aswkt(mos.st_geomfromgeojson("json_geometry")))
)
print(f"count? {neighbourhoods.count():,}")
neighbourhoods.limit(3).show() # <- 件数制限 + ipynb のみでの表示
count? 263
+-----------------+--------------------+--------------------+--------------------+
| type| properties| json_geometry| geometry|
+-----------------+--------------------+--------------------+--------------------+
|FeatureCollection|{EWR, 1, 1, 0.000...|{"coordinates":[[...|MULTIPOLYGON (((-...|
|FeatureCollection|{Queens, 2, 2, 0....|{"coordinates":[[...|MULTIPOLYGON (((-...|
|FeatureCollection|{Bronx, 3, 3, 0.0...|{"coordinates":[[...|MULTIPOLYGON (((-...|
+-----------------+--------------------+--------------------+--------------------+
いくつかの基本ジオメトリ属性の計算
Mosaicはジオメトリのプロパティを抽出するための関数を多く提供しています。こちらはポリゴンジオメトリに適したいくつかの関数です:
display(
neighbourhoods
.withColumn("calculatedArea", mos.st_area(col("geometry")))
.withColumn("calculatedLength", mos.st_length(col("geometry")))
# 注意: エリアと長さの計測単位は使用されたCRSに依存します。
# GPSロケーションではラジアンの二乗とラジアンです
.select("geometry", "calculatedArea", "calculatedLength")
.limit(3)
.show() # <- 件数制限 + ipynb のみでの表示
)
+--------------------+--------------------+-------------------+
| geometry| calculatedArea| calculatedLength|
+--------------------+--------------------+-------------------+
|MULTIPOLYGON (((-...|7.823067885002558E-4|0.11635745318867867|
|MULTIPOLYGON (((-...|0.001422779097814599| 0.8431218810128791|
|MULTIPOLYGON (((-...|3.144141568206508E-4| 0.0843411059010578|
+--------------------+--------------------+-------------------+
初期トリップデータ [ポイント]
ポイントデータを表現するタクシートリップデータのいくつかをロードします。このデータはDatabricksが提供している皆様の環境で利用できるパブリックなデータセットです。注意: これはそのままでは16億のトリップデータとなっています。処理自体には問題ありませんが、クイックスタートレベルを保つために、16億の1000分の1、160万を使用します。
trips = (
spark.table("delta.`/databricks-datasets/nyctaxi/tables/nyctaxi_yellow`")
# - .1% のサンプル
.sample(.001)
.drop("vendorId", "rateCodeId", "store_and_fwd_flag", "payment_type")
.withColumn("pickup_geom", mos.st_astext(mos.st_point(col("pickup_longitude"), col("pickup_latitude"))))
.withColumn("dropoff_geom", mos.st_astext(mos.st_point(col("dropoff_longitude"), col("dropoff_latitude"))))
# - row idの追加
.selectExpr(
"xxhash64(pickup_datetime, dropoff_datetime, pickup_geom, dropoff_geom) as row_id", "*"
)
)
print(f"count? {trips.count():,}")
trips.limit(10).display() # <- ipynb でのみ制限
空間join
Mosaicのインデクシング戦略の有り無しの両方でMosaicの空間joinを行うことができます。サイズやシェイプ(頂点の数など)の両方が非常に異なるジオメトリを取り扱う際には、インデクシングが非常に重要となります。
最適な解像度の取得
特定のデータフレームに含まれるデータに基づいて、どのようにデータのインデックスを作成するのがベストなのかを特定するためにMosaicの機能を活用することができます。適切なインデックスの解像度を選択することは、パフォーマンスに対して非常に大きなインパクトをもたらします。
from mosaic import MosaicFrame
neighbourhoods_mosaic_frame = MosaicFrame(neighbourhoods, "geometry")
optimal_resolution = neighbourhoods_mosaic_frame.get_optimal_resolution(sample_fraction=0.75)
print(f"Optimal resolution is {optimal_resolution}")
Optimal resolution is 9
すべての解像度でパフォーマンス改善があるわけではありません。経験則としては、過度のインデクシングよりも不足気味のインデックスの方が常に優れています - わからない場合には低い解像度を選択しましょう。サイズのバランスが非常に悪い、あるいは頂点の数が大きく異なるジオメトリを取り扱う際に、高い解像度が必要となります。そのような場合、より多くのインデックスによるインデックスの作成は、オペレーションの並列性を高めることになります。それぞれの計算量のバランスが取れている過度に複雑な行をパーティショニングする手段としてMosaicを捉えることができます。
display(
neighbourhoods_mosaic_frame.get_resolution_metrics(sample_rows=150)
)
最適な解像度を用いたインデックス作成
我々のポイントデータのインデックスを作成するためにMosaicのSQL関数を使用します。ここでは解像度9を使用し、インデックスの解像度は使用するデータセットに依存します。
tripsWithIndex = (
trips
.withColumn("pickup_h3", mos.grid_pointascellid(col("pickup_geom"), F.lit(optimal_resolution)))
.withColumn("dropoff_h3", mos.grid_pointascellid(col("dropoff_geom"), F.lit(optimal_resolution)))
# - テーブルの最初の32列のインデックスが有効となります
.selectExpr(
"row_id", "pickup_h3", "dropoff_h3", "* except(row_id, pickup_h3, dropoff_h3)"
)
)
display(tripsWithIndex.limit(10))
また、ビルトインのジェネレーター関数を用いて、我々の neighbourhoods のインデックスも作成します。
neighbourhoodsWithIndex = (
neighbourhoods
# オリジナルのジオメトリを、自身がインデックスを持つより小規模かつ複数のMosaicチップに分割します。
.withColumn(
"mosaic_index",
mos.grid_tessellateexplode(col("geometry"), F.lit(optimal_resolution))
)
# より小さいMosaicチップにブレークダウンしたので、オリジナルのジオメトリは不要です。
.drop("json_geometry", "geometry")
)
print(f"count? {neighbourhoodsWithIndex.count():,}") # <- explodeによって行数が増えていることに注意
neighbourhoodsWithIndex.limit(5).show() # <- 件数制限 + ipynb のみでの表示
count? 11,885
+-----------------+--------------------+--------------------+
| type| properties| mosaic_index|
+-----------------+--------------------+--------------------+
|FeatureCollection|{EWR, 1, 1, 0.000...|{true, 6177331507...|
|FeatureCollection|{EWR, 1, 1, 0.000...|{true, 6177331508...|
|FeatureCollection|{EWR, 1, 1, 0.000...|{true, 6177331508...|
|FeatureCollection|{EWR, 1, 1, 0.000...|{true, 6177331507...|
|FeatureCollection|{EWR, 1, 1, 0.000...|{true, 6177331508...|
+-----------------+--------------------+--------------------+
空間joinの実行
我々のデータセットの位置情報に基づいて、乗車と降車のゾーンの両方に空間joinを行うことができます。
pickupNeighbourhoods = neighbourhoodsWithIndex.select(col("properties.zone").alias("pickup_zone"), col("mosaic_index"))
withPickupZone = (
tripsWithIndex.join(
pickupNeighbourhoods,
tripsWithIndex["pickup_h3"] == pickupNeighbourhoods["mosaic_index.index_id"]
).where(
# 区がコアチップである場合(チップがジオメトリに完全に内包されている)、同じインデックスにマッチするすべてのポイントは間違いなく区に含まれているので、
# 交差処理を行う必要がありません。それ以外の場合、チップのジオメトリに対して st_contains オペレーションを実行する必要があります。
col("mosaic_index.is_core") | mos.st_contains(col("mosaic_index.wkb"), col("pickup_geom"))
).select(
"trip_distance", "pickup_geom", "pickup_zone", "dropoff_geom", "pickup_h3", "dropoff_h3"
)
)
display(withPickupZone.limit(10)) # <- ipynb でのみ制限
降車位置に対しても同様のjoinを容易に実行することができます。注意: このケースでは、上のjoinの左サイドで
withPickupZone
を使用しています。
dropoffNeighbourhoods = neighbourhoodsWithIndex.select(col("properties.zone").alias("dropoff_zone"), col("mosaic_index"))
withDropoffZone = (
withPickupZone.join(
dropoffNeighbourhoods,
withPickupZone["dropoff_h3"] == dropoffNeighbourhoods["mosaic_index.index_id"]
).where(
col("mosaic_index.is_core") | mos.st_contains(col("mosaic_index.wkb"), col("dropoff_geom"))
).select(
"trip_distance", "pickup_geom", "pickup_zone", "dropoff_geom", "dropoff_zone", "pickup_h3", "dropoff_h3"
)
.withColumn("trip_line", mos.st_astext(mos.st_makeline(F.array(mos.st_geomfromwkt(col("pickup_geom")), mos.st_geomfromwkt(col("dropoff_geom"))))))
)
display(withDropoffZone.limit(10)) # <- ipynb でのみ制限
Keplerで結果を可視化
Mosaicでは
%%mosaic_kepler
マジックコマンドを用いることで、PythonにおけるKeplerの取り扱いを抽象化します。ノートブック言語がPythonでない場合には、切り替えるために、マジックコマンドの前に%python
を追加することができます。
注意
以下で%%mosaic_kepler
のセルを実行すると、こちらのエラーAttributeError: 'DataFrame' object has no attribute 'iteritems'
が発生します。回避するために以下を実行しています。
import pandas as pd
pd.DataFrame.iteritems = pd.DataFrame.items
%%mosaic_kepler
withDropoffZone "pickup_h3" "h3" 5000