2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksにおける空間分析アプローチの比較(準備編)

Posted at

こちらのブログで紹介されているノートブックの1つ目を地理空間情報の処理の勉強を兼ねてウォークスルーします。

上の記事も後で翻訳します。

ノートブックはこちら。

翻訳したものはこちら。

Notebook-01: H3-Geometry アプローチ比較のデータ準備

Photon化H3製品やMosaicの機能の様々な組み合わせを用いたGeometry-CentricH3-CentricH3-Geometry Hybridの比較のためのデータエンジニアリング。

クラスター

  • DBR 13.3 LTS + Photonで実行します。
  • このデータエンジニアリングの一部は非常に計算量が大きいので、オートスケーリング2から10などに設定したm5d.4xlarge(メモリーよりも計算処理を優先) AWSワーカーを使用します。

上の指示の通り、大きめのクラスターを準備します。
Screenshot 2024-02-10 at 14.19.10.png

セットアップ

%pip install databricks-mosaic==0.4.0 --quiet
from pyspark.databricks.sql.functions import *

from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
from pyspark.sql import Window

import os
import pathlib
import requests

import mosaic as mos
mos.enable_mosaic(spark, dbutils)

from keplergl import KeplerGl

def display_kepler(kmap:KeplerGl, height=800, width=1200) -> None:
  """
  kepler.glで地図をレンダリングする便利関数
  - 直接レンダリングできない場合、あるいは
    %%mosaic_kepler magic以上のことを行いたい場合に使います
  """
  displayHTML(
    kmap
      ._repr_html_()
      .decode("utf-8")
      .replace(".height||400", f".height||{height}")
      .replace(".width||400", f".width||{width}")
  )

データベースとファイルパスの準備、Hiveメタストアを使っているのでUnity Catalogに移行したいところ。

username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
db_name = username.split('.')[0].replace("@","_") + "_h3_geometry"
sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
sql(f"USE {db_name}")
os.environ['username'] = username
os.environ['db_name'] = db_name
print(f"...username: '{username}', db_name: '{db_name}' (create & use)")

raw_path = f"dbfs:/tmp/mosaic/{username}"
print(f"...raw data will be stored in {raw_path}")
...username: 'takaaki.yayoi@databricks.com', db_name: 'takaaki_h3_geometry' (create & use)
...raw data will be stored in dbfs:/tmp/mosaic/takaaki.yayoi@databricks.com

tripテーブル

DatabricksプラットフォームではDelta Lakeとして保存されたビルトインのデータセットを利用できます。 -- このデータには1.6Bのトリップが含まれています。

%sql 
drop table if exists trip;
create table if not exists trip using DELTA location "/databricks-datasets/nyctaxi/tables/nyctaxi_yellow";
select * from trip limit 10;

Screenshot 2024-02-10 at 14.21.58.png

%sql select format_number(count(1),0) from trip

確かに1.6Bですね。
Screenshot 2024-02-10 at 14.22.24.png

サンプリングします。

df_trip_kepler = (
  spark.table("trip")
    .sample(.00001)
    .withColumn("pickup_wkt", mos.st_point("pickup_longitude", "pickup_latitude"))
)
print(f"count? {df_trip_kepler.count():,}")
count? 16,149

Mosaicの機能を使ってプロットします。

%%mosaic_kepler
df_trip_kepler "pickup_wkt" "geometry" 20_000

ふむー。
Screenshot 2024-02-10 at 14.23.41.png

taxi_zoneテーブル

GeoJSON -> Delta Lake -- 263のタクシーゾーンがあります。

注意:
空港に関して、 "Newark Airport" = 1, "LaGuardia Airport" = 138, "JFK Airport" = 132

raw_taxi_zones_path = f"{raw_path}/taxi_zones" 

ファイルを取得してテーブルを作成します。

fuse_taxi_zones_path = pathlib.Path(raw_taxi_zones_path.replace('dbfs:/', '/dbfs/'))
fuse_taxi_zones_path.mkdir(parents=True, exist_ok=True)
 
req = requests.get('https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON')
with open(fuse_taxi_zones_path / f'nyc_taxi_zones.geojson', 'wb') as f:
  f.write(req.content)
  
spark.sql(f"drop table if exists taxi_zone_raw;")
spark.sql(
  f"""
     create table if not exists taxi_zone_raw
     using json
     options (multiline = true)
     location "{raw_taxi_zones_path}";
     """
)

display(dbutils.fs.ls(raw_taxi_zones_path))
%sql 
drop table if exists taxi_zone;
create table if not exists taxi_zone as (
  select
    type,
    feature.properties as properties,
    st_astext(st_geomfromgeojson(to_json(feature.geometry))) as geom_wkt
  from
    (
      select
        type,
        explode(features) as feature
      from
        taxi_zone_raw
    )
);

-- 生テーブルは不要です
drop table if exists taxi_zone_raw;
select * from taxi_zone limit 10;

Screenshot 2024-02-10 at 14.25.31.png

%sql select format_number(count(1),0) from taxi_zone

Screenshot 2024-02-10 at 14.26.05.png

こちらも可視化します。

%%mosaic_kepler
"taxi_zone" "geom_wkt" "geometry"

Screenshot 2024-02-10 at 14.27.21.png

H3のセットアップ

trip_h3テーブル

これは、h3_lonlatash3を用いた、すべてのH3アプローチのためのポイントテーブルです。

注意:

  • より精度の高い分析のために解像度 12 を使っています(それぞれのセルはNYCにおいて約60ftの直径)が、後で行う追加分析では、h3_toparent製品APIを用いて解像度108をキャプチャします。
  • この取り組みのフォーカスである(pickup_cell)に対するZ-Orderは、分析内容に応じて、zorder by (pickup_cell, dropoff_cell)のように組み合わせのZ-orderingにしたり、別のテーブルのzorder by (dropoff_cell)のようにすることができます。
%sql 
-- この処理は8ワーカーノードなどでは準備に10+分程度必要とします
-- 実行する際には drop & optimize 文のコメントを解除してください
drop table if exists trip_h3;
create table if not exists trip_h3 as (
  select
    h3_longlatash3(pickup_longitude, pickup_latitude, 12) as pickup_cell,
    h3_toparent(h3_longlatash3(pickup_longitude, pickup_latitude, 12), 10) as pickup_cell_10,
    h3_toparent(h3_longlatash3(pickup_longitude, pickup_latitude, 12), 8) as pickup_cell_8,
    
    h3_longlatash3(dropoff_longitude, dropoff_latitude, 12) as dropoff_cell,
    h3_toparent(h3_longlatash3(dropoff_longitude, dropoff_latitude, 12), 10) as dropoff_cell_10,
    h3_toparent(h3_longlatash3(dropoff_longitude, dropoff_latitude, 12), 8) as dropoff_cell_8,
    *
  from (select /*+ REPARTITION(4096) */ * from trip)
);
optimize trip_h3 zorder by (pickup_cell);
select * from trip_h3 where isnotnull(pickup_cell) limit 5;

緯度経度をH3セルIDに変換していると言うことですね。
Screenshot 2024-02-10 at 14.28.26.png

df_trip_h3_kepler = (
  spark.table("trip_h3")
  .drop("pickup_latitude", "pickup_longitude", "dropoff_cell", "dropoff_cell_10", "dropoff_cell_8", "dropoff_latitude", "dropoff_longitude")
  .dropna(how="any", subset=["pickup_cell"])
  .sample(.00005)
)
print(f"count? {df_trip_h3_kepler.count():,}")
count? 62,384

こちらも可視化します。

%%mosaic_kepler
df_trip_h3_kepler "pickup_cell" "h3" 100_000

おおー、ポリゴン(ヘキサゴン)だ。
Screenshot 2024-02-10 at 14.29.41.png

taxi_zone_h3テーブル

H3-centricアプローチのために h3_polyfillash3 を活用します。

注意: (cell)に対するZ-Orderを実行します。

%sql 
drop table if exists taxi_zone_h3;
create table if not exists taxi_zone_h3 as (
  select
    explode(h3_polyfillash3(geom_wkt, 12)) as cell,
    properties.borough,
    properties.location_id,
    properties.zone
  from
    taxi_zone
);
optimize taxi_zone_h3 zorder by (cell);
select * from taxi_zone_h3 where location_id > 1 limit 5;

Screenshot 2024-02-10 at 14.30.50.png

df_taxi_zone_h3_kepler = spark.table("taxi_zone_h3").filter("location_id = 1")
print(f"count? {df_taxi_zone_h3_kepler.count():,}")
count? 23,780

こちらも可視化。

%%mosaic_kepler
df_taxi_zone_h3_kepler "cell" "h3" 25_000

Screenshot 2024-02-10 at 14.31.48.png

taxi_zone_h3_chipテーブル

H3-Geometry hybridアプローチとしてMosaicのgrid-tessellateexplodeを活用します。

オリジナルのジオメトリを、指定された解像度でいくつかのグリッドインデックスの境界で分割します。指定された解像度で入力ジオメトリをカバーするMosaicチップのセットを返却します。Mosaicチップは以下から構成される構造体タイプです:

  • is_core: 当該チップがジオメトリに完全に内包されるかどうかを示します: Boolean
  • index_id: 設定された空間インデックス(デフォルトはH3)のインデックスID: Integer
  • wkb: インデックスの形状とオリジナルのジオメトリの共通部分に等しいWKBフォーマットのジオメトリ: Binary

注意:(cell)に対するZ-Orderを実行します。

%sql 
-- この処理は複数のワーカーノードなどでは準備に数分を必要とします
-- 実行する際には drop & optimize 文のコメントを解除してください
drop table if exists taxi_zone_h3_chip;
create table if not exists taxi_zone_h3_chip as (
  select
    index.index_id as cell,
    * except(index),
    index
  from (
    select 
      grid_tessellateexplode(geom_wkt, 12),
      properties.borough,
      properties.location_id,
      properties.zone
    from (select /*+ REPARTITION(128) */ * from taxi_zone)
    )
);
optimize taxi_zone_h3_chip zorder by (cell);
select * from taxi_zone_h3_chip limit 5;

Screenshot 2024-02-10 at 14.34.35.png

# zip code 10004周辺のlocation_idを取得するために以下を実行します
# これは Financial District の一部です
display( 
spark.table("taxi_zone_h3_chip")
  .withColumn("center_geojson", h3_centerasgeojson("cell"))
  .withColumn("center_x", F.split(F.split(F.split('center_geojson','\[')[1],'\]')[0],',')[0].cast("float")) # <- lon
  .withColumn("center_y", F.split(F.split(F.split('center_geojson','\[')[1],'\]')[0],',')[1].cast("float")) # <- lat
  .filter("center_x >= -74.020") # <- lon
  .filter("center_x <= -73.998") # <-lon  
  .filter("center_y <= 40.712")  # <- lat
  .filter("center_y >= 40.698")  # <- lat
  .select("location_id","zone").distinct().orderBy("location_id").display()
)

Screenshot 2024-02-10 at 14.35.11.png

df_taxi_zone_h3_kepler_chip = (
  spark.table("taxi_zone_h3_chip")
    .filter(col("location_id").isin([12,13,209,231,261,33,45,87,88]))
    .select(
      "location_id",
      "zone",
      h3_h3tostring("cell").alias("h3_idx"),
      "index.is_core",
      mos.st_astext("index.wkb").alias("geom_wkt")
    )
)
print(f"count? {df_taxi_zone_h3_kepler_chip.count():,}")
display(df_taxi_zone_h3_kepler_chip.limit(1))

Screenshot 2024-02-10 at 14.35.36.png

チップを可視化します。

display_kepler(
  KeplerGl(
    config={ 
      'version': 'v1', 
      'mapState': {
        'latitude': 40.7, 
        'longitude': -74.0, 
        'zoom': 14
      }, 
      'mapStyle': {'styleType': 'dark'},
      'options': {'readOnly': False, 'centerMap': True}
    },
    data={
      'hexring_5_10': df_taxi_zone_h3_kepler_chip.toPandas()
    },
    show_docs=False,
  )
)

Screenshot 2024-02-10 at 14.36.59.png

テーブルの表示

オプション: この比較で何を使用したのかを見てみましょう

%sql show tables

Screenshot 2024-02-10 at 14.37.57.png

テーブルを準備したので、次はアプローチを比較します。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?