こちらのブログで紹介されているノートブックの1つ目を地理空間情報の処理の勉強を兼ねてウォークスルーします。
上の記事も後で翻訳します。
ノートブックはこちら。
翻訳したものはこちら。
Notebook-01: H3-Geometry アプローチ比較のデータ準備
Photon化H3製品やMosaicの機能の様々な組み合わせを用いた
Geometry-Centric
、H3-Centric
、H3-Geometry Hybrid
の比較のためのデータエンジニアリング。
クラスター
- DBR 13.3 LTS + Photonで実行します。
- このデータエンジニアリングの一部は非常に計算量が大きいので、オートスケーリング2から10などに設定したm5d.4xlarge(メモリーよりも計算処理を優先) AWSワーカーを使用します。
セットアップ
%pip install databricks-mosaic==0.4.0 --quiet
from pyspark.databricks.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
from pyspark.sql import Window
import os
import pathlib
import requests
import mosaic as mos
mos.enable_mosaic(spark, dbutils)
from keplergl import KeplerGl
def display_kepler(kmap:KeplerGl, height=800, width=1200) -> None:
"""
kepler.glで地図をレンダリングする便利関数
- 直接レンダリングできない場合、あるいは
%%mosaic_kepler magic以上のことを行いたい場合に使います
"""
displayHTML(
kmap
._repr_html_()
.decode("utf-8")
.replace(".height||400", f".height||{height}")
.replace(".width||400", f".width||{width}")
)
データベースとファイルパスの準備、Hiveメタストアを使っているのでUnity Catalogに移行したいところ。
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
db_name = username.split('.')[0].replace("@","_") + "_h3_geometry"
sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
sql(f"USE {db_name}")
os.environ['username'] = username
os.environ['db_name'] = db_name
print(f"...username: '{username}', db_name: '{db_name}' (create & use)")
raw_path = f"dbfs:/tmp/mosaic/{username}"
print(f"...raw data will be stored in {raw_path}")
...username: 'takaaki.yayoi@databricks.com', db_name: 'takaaki_h3_geometry' (create & use)
...raw data will be stored in dbfs:/tmp/mosaic/takaaki.yayoi@databricks.com
trip
テーブル
DatabricksプラットフォームではDelta Lakeとして保存されたビルトインのデータセットを利用できます。 -- このデータには1.6Bのトリップが含まれています。
%sql
drop table if exists trip;
create table if not exists trip using DELTA location "/databricks-datasets/nyctaxi/tables/nyctaxi_yellow";
select * from trip limit 10;
%sql select format_number(count(1),0) from trip
サンプリングします。
df_trip_kepler = (
spark.table("trip")
.sample(.00001)
.withColumn("pickup_wkt", mos.st_point("pickup_longitude", "pickup_latitude"))
)
print(f"count? {df_trip_kepler.count():,}")
count? 16,149
Mosaicの機能を使ってプロットします。
%%mosaic_kepler
df_trip_kepler "pickup_wkt" "geometry" 20_000
taxi_zone
テーブル
GeoJSON -> Delta Lake -- 263のタクシーゾーンがあります。
注意:
空港に関して、 "Newark Airport" = 1, "LaGuardia Airport" = 138, "JFK Airport" = 132
raw_taxi_zones_path = f"{raw_path}/taxi_zones"
ファイルを取得してテーブルを作成します。
fuse_taxi_zones_path = pathlib.Path(raw_taxi_zones_path.replace('dbfs:/', '/dbfs/'))
fuse_taxi_zones_path.mkdir(parents=True, exist_ok=True)
req = requests.get('https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON')
with open(fuse_taxi_zones_path / f'nyc_taxi_zones.geojson', 'wb') as f:
f.write(req.content)
spark.sql(f"drop table if exists taxi_zone_raw;")
spark.sql(
f"""
create table if not exists taxi_zone_raw
using json
options (multiline = true)
location "{raw_taxi_zones_path}";
"""
)
display(dbutils.fs.ls(raw_taxi_zones_path))
%sql
drop table if exists taxi_zone;
create table if not exists taxi_zone as (
select
type,
feature.properties as properties,
st_astext(st_geomfromgeojson(to_json(feature.geometry))) as geom_wkt
from
(
select
type,
explode(features) as feature
from
taxi_zone_raw
)
);
-- 生テーブルは不要です
drop table if exists taxi_zone_raw;
select * from taxi_zone limit 10;
%sql select format_number(count(1),0) from taxi_zone
こちらも可視化します。
%%mosaic_kepler
"taxi_zone" "geom_wkt" "geometry"
H3のセットアップ
trip_h3
テーブル
これは、h3_lonlatash3を用いた、すべてのH3アプローチのためのポイントテーブルです。
注意:
- より精度の高い分析のために解像度
12
を使っています(それぞれのセルはNYCにおいて約60ftの直径)が、後で行う追加分析では、h3_toparent製品APIを用いて解像度10
や8
をキャプチャします。 - この取り組みのフォーカスである
(pickup_cell)
に対するZ-Orderは、分析内容に応じて、zorder by (pickup_cell, dropoff_cell)
のように組み合わせのZ-orderingにしたり、別のテーブルのzorder by (dropoff_cell)
のようにすることができます。
%sql
-- この処理は8ワーカーノードなどでは準備に10+分程度必要とします
-- 実行する際には drop & optimize 文のコメントを解除してください
drop table if exists trip_h3;
create table if not exists trip_h3 as (
select
h3_longlatash3(pickup_longitude, pickup_latitude, 12) as pickup_cell,
h3_toparent(h3_longlatash3(pickup_longitude, pickup_latitude, 12), 10) as pickup_cell_10,
h3_toparent(h3_longlatash3(pickup_longitude, pickup_latitude, 12), 8) as pickup_cell_8,
h3_longlatash3(dropoff_longitude, dropoff_latitude, 12) as dropoff_cell,
h3_toparent(h3_longlatash3(dropoff_longitude, dropoff_latitude, 12), 10) as dropoff_cell_10,
h3_toparent(h3_longlatash3(dropoff_longitude, dropoff_latitude, 12), 8) as dropoff_cell_8,
*
from (select /*+ REPARTITION(4096) */ * from trip)
);
optimize trip_h3 zorder by (pickup_cell);
select * from trip_h3 where isnotnull(pickup_cell) limit 5;
df_trip_h3_kepler = (
spark.table("trip_h3")
.drop("pickup_latitude", "pickup_longitude", "dropoff_cell", "dropoff_cell_10", "dropoff_cell_8", "dropoff_latitude", "dropoff_longitude")
.dropna(how="any", subset=["pickup_cell"])
.sample(.00005)
)
print(f"count? {df_trip_h3_kepler.count():,}")
count? 62,384
こちらも可視化します。
%%mosaic_kepler
df_trip_h3_kepler "pickup_cell" "h3" 100_000
taxi_zone_h3
テーブル
H3-centricアプローチのために h3_polyfillash3 を活用します。
注意: (cell)
に対するZ-Orderを実行します。
%sql
drop table if exists taxi_zone_h3;
create table if not exists taxi_zone_h3 as (
select
explode(h3_polyfillash3(geom_wkt, 12)) as cell,
properties.borough,
properties.location_id,
properties.zone
from
taxi_zone
);
optimize taxi_zone_h3 zorder by (cell);
select * from taxi_zone_h3 where location_id > 1 limit 5;
df_taxi_zone_h3_kepler = spark.table("taxi_zone_h3").filter("location_id = 1")
print(f"count? {df_taxi_zone_h3_kepler.count():,}")
count? 23,780
こちらも可視化。
%%mosaic_kepler
df_taxi_zone_h3_kepler "cell" "h3" 25_000
taxi_zone_h3_chip
テーブル
H3-Geometry hybridアプローチとしてMosaicのgrid-tessellateexplodeを活用します。
オリジナルのジオメトリを、指定された解像度でいくつかのグリッドインデックスの境界で分割します。指定された解像度で入力ジオメトリをカバーするMosaicチップのセットを返却します。Mosaicチップは以下から構成される構造体タイプです:
-
is_core
: 当該チップがジオメトリに完全に内包されるかどうかを示します: Boolean -
index_id
: 設定された空間インデックス(デフォルトはH3)のインデックスID: Integer -
wkb
: インデックスの形状とオリジナルのジオメトリの共通部分に等しいWKBフォーマットのジオメトリ: Binary
注意:(cell)
に対するZ-Orderを実行します。
%sql
-- この処理は複数のワーカーノードなどでは準備に数分を必要とします
-- 実行する際には drop & optimize 文のコメントを解除してください
drop table if exists taxi_zone_h3_chip;
create table if not exists taxi_zone_h3_chip as (
select
index.index_id as cell,
* except(index),
index
from (
select
grid_tessellateexplode(geom_wkt, 12),
properties.borough,
properties.location_id,
properties.zone
from (select /*+ REPARTITION(128) */ * from taxi_zone)
)
);
optimize taxi_zone_h3_chip zorder by (cell);
select * from taxi_zone_h3_chip limit 5;
# zip code 10004周辺のlocation_idを取得するために以下を実行します
# これは Financial District の一部です
display(
spark.table("taxi_zone_h3_chip")
.withColumn("center_geojson", h3_centerasgeojson("cell"))
.withColumn("center_x", F.split(F.split(F.split('center_geojson','\[')[1],'\]')[0],',')[0].cast("float")) # <- lon
.withColumn("center_y", F.split(F.split(F.split('center_geojson','\[')[1],'\]')[0],',')[1].cast("float")) # <- lat
.filter("center_x >= -74.020") # <- lon
.filter("center_x <= -73.998") # <-lon
.filter("center_y <= 40.712") # <- lat
.filter("center_y >= 40.698") # <- lat
.select("location_id","zone").distinct().orderBy("location_id").display()
)
df_taxi_zone_h3_kepler_chip = (
spark.table("taxi_zone_h3_chip")
.filter(col("location_id").isin([12,13,209,231,261,33,45,87,88]))
.select(
"location_id",
"zone",
h3_h3tostring("cell").alias("h3_idx"),
"index.is_core",
mos.st_astext("index.wkb").alias("geom_wkt")
)
)
print(f"count? {df_taxi_zone_h3_kepler_chip.count():,}")
display(df_taxi_zone_h3_kepler_chip.limit(1))
チップを可視化します。
display_kepler(
KeplerGl(
config={
'version': 'v1',
'mapState': {
'latitude': 40.7,
'longitude': -74.0,
'zoom': 14
},
'mapStyle': {'styleType': 'dark'},
'options': {'readOnly': False, 'centerMap': True}
},
data={
'hexring_5_10': df_taxi_zone_h3_kepler_chip.toPandas()
},
show_docs=False,
)
)
テーブルの表示
オプション: この比較で何を使用したのかを見てみましょう
%sql show tables
テーブルを準備したので、次はアプローチを比較します。