Pythonを用いたマップマッチングの実装を紹介します.
今回はその中でも,最寄り交差点(ノード)への割り当てというシンプルな幾何的アルゴリズムを使います.
KD-tree (k-dimensional tree)とは
各GPS測位点を最近傍点に割り当てる際,その探索を高速化する手法として,KD-treeがあります.通常の全探索では,ノード数$n$に対して,$O(N)$の計算量ですが,kd-treeでは$O(\log n)$の計算量で探索を行います.
kd-treeについては,以下の記事が非常に丁寧で,参考になるかと思います.
簡単に言うと,「このエリアが最寄りになることはないから探索する必要ないよね」と判断し,不要な探索を行わないことで高速化しています.
Kd-treeを用いたマップマッチングの実装
以下は,KD-treeで最寄り交差点を探索し,移動経路にマッチングさせるコードです.df_1user
にある1人のユーザの位置情報が記録されているとします.
import osmnx as ox
import geopandas as gpd
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from shapely.geometry import Point
# -----------------------------
# 1) OSMグラフの準備(中区などポリゴンで取得)
# -----------------------------
place = "Naka Ward, Yokohama, Kanagawa, Japan"
gdf_place = ox.geocode_to_gdf(place)
polygon = gdf_place.loc[0, "geometry"]
G = ox.graph_from_polygon(polygon, network_type="all", simplify=True)
G_proj = ox.project_graph(G)
nodes_gdf, edges_gdf = ox.graph_to_gdfs(G_proj)
# -----------------------------
# 2) ユーザーの GPS データを GeoDataFrame に変換
# -----------------------------
gdf_user = gpd.GeoDataFrame(
df_1user,
geometry=gpd.points_from_xy(df_1user.lon, df_1user.lat),
crs="EPSG:4326"
)
# CRSをグラフに合わせる(距離計算用)
gdf_user = gdf_user.to_crs(G_proj.graph['crs'])
# -----------------------------
# 3) KD-tree で最寄り交差点(次数>=3)を取得
# -----------------------------
deg_series = pd.Series(dict(G_proj.degree()))
nodes_gdf["degree"] = nodes_gdf.index.map(deg_series)
cand_nodes = nodes_gdf[nodes_gdf["degree"] >= 3].copy()
coords = np.vstack((cand_nodes["x"].values, cand_nodes["y"].values)).T
from scipy.spatial import cKDTree
tree = cKDTree(coords)
pt_coords = np.vstack((gdf_user.geometry.x, gdf_user.geometry.y)).T
dists, idxs = tree.query(pt_coords, k=1)
gdf_user["nearest_node"] = cand_nodes.index.values[idxs]
gdf_user["snap_dist_m"] = dists
# -----------------------------
# 4) 経路復元(shortest path)/連続重複除去
# -----------------------------
node_seq = gdf_user["nearest_node"].tolist()
node_seq = [node_seq[i] for i in range(len(node_seq)) if i == 0 or node_seq[i] != node_seq[i-1]]
full_path_nodes = []
for a, b in zip(node_seq[:-1], node_seq[1:]):
try:
sp = nx.shortest_path(G_proj, a, b, weight="length")
except nx.NetworkXNoPath:
sp = [a, b]
if full_path_nodes:
full_path_nodes.extend(sp[1:])
else:
full_path_nodes.extend(sp)
# -----------------------------
# 5) 可視化(GPS vs マップマッチング)
# -----------------------------
gps_x = gdf_user.geometry.x.values
gps_y = gdf_user.geometry.y.values
path_x = [G_proj.nodes[n]['x'] for n in full_path_nodes]
path_y = [G_proj.nodes[n]['y'] for n in full_path_nodes]
fig, ax = plt.subplots(figsize=(10, 10))
ox.plot_graph(G_proj, ax=ax, show=False, close=False, node_size=0, edge_color="lightgray")
ax.plot(gps_x, gps_y, color='blue', linewidth=2, alpha=0.7, label='Original GPS')
ax.plot(path_x, path_y, color='red', linewidth=2, alpha=0.7, label='Matched Path')
ax.legend(fontsize=12)
plt.title("Original GPS vs Map-Matched Path")
plt.show()
以下で詳しく見ていきましょう.ちなみに,(3)の部分はKD-treeを自作していますが,通常はox.distance.nearest_nodes
を使うのが無難です!
(1) OSMグラフの準備
# -----------------------------
# 1) OSMグラフの準備(中区などポリゴンで取得)
# -----------------------------
place = "Naka Ward, Yokohama, Kanagawa, Japan"
gdf_place = ox.geocode_to_gdf(place)
polygon = gdf_place.loc[0, "geometry"]
G = ox.graph_from_polygon(polygon, network_type="all", simplify=True)
G_proj = ox.project_graph(G)
nodes_gdf, edges_gdf = ox.graph_to_gdfs(G_proj)
対象地域の道路(歩行者)ネットワークをosmnx
ライブラリを用いて取得します.G_proj
に道路ネットワークが格納されます.必要に応じて,network_type="walk"
など選択できます.
また,simplify=True
は,連続する次数2のノードを統合する等の単純化を行い,ノード数を抑える操作です.
(2) df_1user
をGeoDataFrame に変換
地理空間情報を扱う際,GeoDataFrame形式に変換するのは鉄則です.
すでに変換している場合は不要です.
# -----------------------------
# 2) ユーザーの GPS データを GeoDataFrame に変換
# -----------------------------
gdf_user = gpd.GeoDataFrame(
df_1user,
geometry=gpd.points_from_xy(df_1user.lon, df_1user.lat),
crs="EPSG:4326"
)
# CRSをグラフに合わせる(距離計算用)
gdf_user = gdf_user.to_crs(G_proj.graph['crs'])
また先ほど取得した道路ネットワークG_proj
と座標参照系(CRS)を一致させます.これも地理情報を扱う際の鉄則です.
(3) KD-treeで最寄り交差点(次数 > 3)を取得
KD-treeを用いて最寄り交差点探索を高速化します.ここで,交差点を,「道路ネットワークにおける次数3以上のノード」と定義します.
# -----------------------------
# 3) KD-tree で最寄り交差点(次数>=3)を取得
# -----------------------------
deg_series = pd.Series(dict(G_proj.degree()))
nodes_gdf["degree"] = nodes_gdf.index.map(deg_series)
cand_nodes = nodes_gdf[nodes_gdf["degree"] >= 3].copy()
coords = np.vstack((cand_nodes["x"].values, cand_nodes["y"].values)).T
from scipy.spatial import cKDTree
tree = cKDTree(coords)
pt_coords = np.vstack((gdf_user.geometry.x, gdf_user.geometry.y)).T
dists, idxs = tree.query(pt_coords, k=1)
gdf_user["nearest_node"] = cand_nodes.index.values[idxs]
gdf_user["snap_dist_m"] = dists
KD-treeのアルゴリズムの手順を詳しく追いましょう.
(3)-1. ノードに次数を付与
deg_series = pd.Series(dict(G_proj.degree()))
nodes_gdf["degree"] = nodes_gdf.index.map(deg_series)
-
G_proj.degree()
は,グラフG_proj
中の各ノードの(node_id, degree)
のペアを返します. -
dict(...)
で{node_id: degree}
の辞書に変換します. -
pd.Series(...)
でindex=node_id
,values=degree
のSeries
に
(3)-2. 交差点候補(次数 ≥ 3)を抽出し、座標配列を用意
cand_nodes = nodes_gdf[nodes_gdf["degree"] >= 3].copy()
coords = np.vstack((cand_nodes["x"].values, cand_nodes["y"].values)).T
(3)-3. KD-treeを構築
from scipy.spatial import cKDTree
tree = cKDTree(coords)
SciPy
の cKDTree
で KD-tree(C実装)を構築できます.
(3)-4. GPS点を同じ形の配列にして最近傍検索
pt_coords = np.vstack((gdf_user.geometry.x, gdf_user.geometry.y)).T
dists, idxs = tree.query(pt_coords, k=1)
-
np.vstack
でgdf_user
の緯度経度(x座標,y座標)
を縦方向に積み上げ,.T
によって行列を転置します.すなわち,pt_coords
は$(N\times2)$のNumPy配列で,cKDTree
の入力形式です. -
tree.query(pt_coords, k=1)
は,各ユーザーGPS点について,KD-tree内の点(交差点候補ノード)のうち最も近い点 (k=1) を探索します.-
dists
: 各GPS 点から最近傍ノードまでのユークリッド距離 -
idxs
: 最近傍がcoords
の 何番目($0〜N-1$)かの位置インデックス
-
(3)-5. 最近傍ノードID と距離を gdf_user に付与
gdf_user["nearest_node"] = cand_nodes.index.values[idxs]
gdf_user["snap_dist_m"] = dists
-
cand_nodes.index.values
: 候補ノードcand_nodes
のインデックス(ノードID)を NumPy 配列として取り出す -
dists
: 各GPS点と最寄り交差点のユークリッド距離
(3)部分は,KD-treeを自作せず,シンプルにox.distance.nearest_nodes
を用いる場合は,以下です( *´艸`)(遠くの交差点に割り当てられる可能性が低いため,絶対こっちの方が良いです.)
# x座標(経度系)とy座標(緯度系)を渡す
nearest_nodes = ox.distance.nearest_nodes(
G_proj,
X=gdf_user.geometry.x.values,
Y=gdf_user.geometry.y.values,
return_dist=True
)
# nearest_nodes は (ノードID配列, 距離配列) のタプルになる
gdf_user["nearest_node"], gdf_user["snap_dist_m"] = nearest_nodes
先ほどのように次数制限を設ける場合は,同様に行います.上記のコードでは,次数制限は設けていません.
(4)経路復元 (shortest path)
ステップ(4)では,割り当てられたノード列の最短経路を求めます.
(4)-1. nearest_node
の列をリスト化
node_seq = gdf_user["nearest_node"].tolist()
(4)-2. 連続した重複ノードを削除
A,A,A,B,B,C → A,B,C にする操作です.
node_seq = [node_seq[i] for i in range(len(node_seq)) if i == 0 or node_seq[i] != node_seq[i-1]]
(4)-3. 隣接するノードペアごとに最短経路を計算
full_path_nodes = []
for a, b in zip(node_seq[:-1], node_seq[1:]):
try:
sp = nx.shortest_path(G_proj, a, b, weight="length")
except nx.NetworkXNoPath:
sp = [a, b]
if full_path_nodes:
full_path_nodes.extend(sp[1:])
else:
full_path_nodes.extend(sp)
グラフ上で a
から b
への最短ノード列を返します.例外処理として,経路が無い場合は仮で [a,b]
を入れます.
そして最終的に全体経路full_path_nodes
に連結させます.
(5)可視化
# -----------------------------
# 5) 可視化(GPS vs マップマッチング)
# -----------------------------
gps_x = gdf_user.geometry.x.values
gps_y = gdf_user.geometry.y.values
path_x = [G_proj.nodes[n]['x'] for n in full_path_nodes]
path_y = [G_proj.nodes[n]['y'] for n in full_path_nodes]
fig, ax = plt.subplots(figsize=(10, 10))
ox.plot_graph(G_proj, ax=ax, show=False, close=False, node_size=0, edge_color="lightgray")
ax.plot(gps_x, gps_y, color='blue', linewidth=2, alpha=0.7, label='Original GPS')
ax.plot(path_x, path_y, color='red', linewidth=2, alpha=0.7, label='Matched Path')
ax.legend(fontsize=12)
plt.title("Original GPS vs Map-Matched Path")
plt.show()
HMM + Viterbiアルゴリズム
先述のアルゴリズムは,シンプルで高速ではあるものの,局所的なマッチングであり,時系列遷移を考慮できてません.そのため,GPS誤差に弱いのです.
そこで,マップマッチングの精度を上げる手法として,**HMM(隠れマルコフモデル)**を用いたものがあります.
理論については上記の記事が参考になるかと思いますが,実装コードはまた次回の記事にします.
最後に
この記事が少しでも参考になれば,いいねやフォローをしていただけるととても力になります.