はじめに
この記事では、GitHubのOpenLR Dereferencer Pythonを実装し、任意の地図データベースに対してOpenLRのデコードさせる方法を解説します。できるだけ元のソースコードに手を加えない形で実装した例となります。
今回使用するGitHubコードはこちらです。
https://github.com/tomtom-international/openlr-dereferencer-python
OpenLRとは
OpenLRは、位置情報を表現するためのオープンな標準フォーマットです。主に、地図データやナビゲーションシステムで使用されることを目的としています。OpenLRは、特にGPSデータが不完全または不正確な場合でも、位置情報を効率的に伝達するための方法を提供します。これにより、異なるシステムやアプリケーション間でのデータの相互運用性が向上します。
OpenLRは、地図データに依存せずに位置情報を表現できるため、自動運転車やスマートフォンアプリなどのさまざまな用途に活用されています。
手順
プロジェクトの作成
ここではOpenLrDecoderProject
とします。
環境構築
- 仮想環境の作成
-
python -m venv [仮想環境名]
ここでは
.venv
とします。python -m venv .venv
- 仮想環境をアクティベート
-
Windowsの場合
./venv/Scripts/activate
- pipのアップグレード
-
python -m pip install --upgrade pip
- openlr-dereferencerのインストール
-
pip install openlr-dereferencer
- psycopg2のインストール
-
pip install psycopg2
- pyprojのインストール
-
pip install pyproj
Map Database構成
本記事で想定するデータベース構成は下記のとおりとします。
データベース
Postgre SQL with PostGIS extension
データベース接続情報
- ホスト
- localhost
- ポート
- 5432
- ユーザー名
- user1
- パスワード
- P@ssw0rd
- データベース名
- test_db
- スキーマ名
- local
構成テーブル
roadsテーブルとintersectionsテーブルに地図データが格納されている前提とします。
列名 | 型 | Index | 外部キー | 備考 |
---|---|---|---|---|
Id | bigint | Y | プライマリーキー | |
FOW | smallint | Form of Way | ||
FRC | smallint | Functional Road Class | ||
flowdir | smallint | 1=both direction, 2=end -> start, 3=start -> end | ||
from_int | bigint | Y | intersections(id) | 始点の交差点ID |
to_int | bigint | Y | intersections(id) | 終点の交差点ID |
len | double precision | セグメントの距離(m) | ||
geom | LineString(4326) | Y | WGS84 LineString geometry |
列名 | 型 | Index | 外部キー | 備考 |
---|---|---|---|---|
Id | bigint | Y | プライマリーキー | |
geom | Point(4326) | Y | WGS84 Point geometry |
以降の手順では、上記環境の前提で説明します。
デコーダーの実装
pythonファイルを生成します。ここではmyMapReader.py
とします。
-
MapReader
,Line
,Node
のabstractの実装 -
具体的にはopenlr_dereferencer.mapsの
MapReader
,Line
,Node
を継承します。 -
myMapReader.pyの実装例
from __future__ import annotations from openlr_dereferencer.maps import MapReader from openlr_dereferencer.maps import Line as AbstractLine, Node as AbstractNode import psycopg2 as pg from psycopg2 import sql from openlr import Coordinates, FOW, FRC from shapely import wkb from itertools import chain from shapely.geometry import LineString, Point from typing import Iterable, Hashable, Sequence, cast from pyproj import Geod from shapely.ops import nearest_points GEOD = Geod(ellps="WGS84") @MapReader.register class MyMapReader(): SCHEMA = 'local' TABLE_LINE = 'roads' TABLE_NODE = 'intersections' NODE_QUERY_SELECT = "select id,st_x(geom),st_y(geom)" NODE_QUERY = NODE_QUERY_SELECT + " from {schema}.{table}" LINE_QUERY_SELECT = "select id,fow,flowdir,frc,len,from_int,to_int,geom" REV_LINE_QUERY_SELECT = "select -id,fow,flowdir,frc,len,to_int as from_int,from_int as to_int,st_reverse(geom)" LINE_QUERY = LINE_QUERY_SELECT + " from {schema}.{table}" REV_LINE_QUERY = REV_LINE_QUERY_SELECT + " from {schema}.{table}" def __init__(self, **kwargs): super().__init__(**kwargs) self.connection = pg.connect(host='127.0.0.1', port='5432', user='user1', password='P@ssw0rd', dbname='test_db') self.get_linecount_query = sql.SQL("select count(1) from {schema}.{table}").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_LINE)) self.get_node_query = sql.SQL(self.NODE_QUERY + " where id=%s").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE)) self.get_nodes_query = sql.SQL(self.NODE_QUERY).format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE)) self.get_nodecount_query = sql.SQL("select count(1) from {schema}.{table}").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE)) self.get_lines_query = sql.SQL(self.LINE_QUERY + " where flowdir in (1,3) union " + self.REV_LINE_QUERY + " where flowdir in (1,2)").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_LINE)) self.find_nodes_close_to_query = sql.SQL(self.NODE_QUERY + " where geom && st_buffer(ST_GeographyFromText('SRID=4326;POINT(%s %s)'), %s)::geometry").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE)) self.find_lines_close_to_query = sql.SQL(f""" with sq as ({self.LINE_QUERY} where geom && st_buffer(ST_GeographyFromText('SRID=4326;POINT(%s %s)'), %s)::geometry) ({self.LINE_QUERY_SELECT} from sq where sq.flowdir in (1,3)) union ({self.REV_LINE_QUERY_SELECT} from sq where sq.flowdir in (1,2)) """).format(schema=sql.Identifier(self.SCHEMA), table=sql.Identifier(self.TABLE_LINE)) self.incoming_lines_query = sql.SQL(self.LINE_QUERY + " where to_int = %s and flowdir in (1,3) union " + self.REV_LINE_QUERY + " where from_int = %s and flowdir in (1,2)").format(table=sql.Identifier(self.TABLE_LINE),schema=sql.Identifier(self.SCHEMA)) self.outgoing_lines_query = sql.SQL(self.LINE_QUERY + " where from_int = %s and flowdir in (1,3) union " + self.REV_LINE_QUERY + " where to_int = %s and flowdir in (1,2)").format(table=sql.Identifier(self.TABLE_LINE),schema=sql.Identifier(self.SCHEMA)) def get_line(self, line_id: Hashable) -> MyLine: return line_id def get_lines(self) -> Iterable[MyLine]: with self.connection.cursor() as cursor: cursor.execute(self.get_lines_query) for (line_id, fow, flowdir, frc, length, from_int, to_int, geom) in cursor: print(f"get_lines:{line_id}") ls = LineString(wkb.loads(geom, hex=True)) l = MyLine(self, line_id, FOW(fow), FRC(frc), length, from_int, to_int, ls) yield l def get_linecount(self) -> int: with self.connection.cursor() as cursor: cursor.execute(self.get_linecount_query) res = cursor.fetchone() if res is None: raise Exception("Error retrieving line count from datastore") (count,) = res return count def get_node(self, node_id: Hashable) -> MyNode: with self.connection.cursor() as cursor: cursor.execute(self.get_node_query, (node_id,)) res = cursor.fetchone() if res is None: raise Exception(f"Error retrieving node {node_id} from datastore") (node_id, lon, lat) = res n = MyNode(self, node_id, lon, lat) return n def get_nodes(self) -> Iterable[MyNode]: with self.connection.cursor() as cursor: cursor.execute(self.get_nodes_query) for (node_id, lon, lat) in cursor: print(f"get_node:{node_id}") n = MyNode(self, node_id, lon, lat) yield n def get_nodecount(self) -> int: with self.connection.cursor() as cursor: cursor.execute(self.get_nodecount_query) res = cursor.fetchone() if res is None: raise Exception(f"Error retrieving node count from datastore") (count,) = res return count def find_nodes_close_to(self, coord: Coordinates, dist: float) -> Iterable[MyNode]: lon, lat = coord.lon, coord.lat with self.connection.cursor() as cursor: cursor.execute(self.find_nodes_close_to_query, (lon, lat, dist)) for (node_id, lon, lat) in cursor: print(f"find_nodes_close_to:{node_id}") n = MyNode(self, node_id, lon, lat) yield n def find_lines_close_to(self, coord: Coordinates, dist: float) -> Iterable[MyLine]: lon, lat = coord.lon, coord.lat with self.connection.cursor() as cursor: cursor.execute(self.find_lines_close_to_query, (lon, lat, dist)) for (line_id, fow, _, frc, length, from_int, to_int, geom) in cursor: print(f"find_lines_close_to:{line_id}") ls = LineString(wkb.loads(geom, hex=True)) l = MyLine(self, line_id, FOW(fow), FRC(frc), length, from_int, to_int, ls) yield l class MyNode(AbstractNode): def __init__(self, map_reader: MyMapReader, node_id: int, lon: float, lat: float): self.lon = lon self.lat = lat self.map_reader = map_reader self.id = node_id @property def node_id(self) -> Hashable: return self.id @property def coordinates(self) -> Coordinates: return Coordinates(lon=self.lon, lat=self.lat) def outgoing_lines(self) -> Iterable[MyLine]: with self.map_reader.connection.cursor() as cursor: cursor.execute(self.map_reader.outgoing_lines_query, (self.node_id, self.node_id)) for (line_id, fow, flowdir, frc, length, from_int, to_int, geom) in cursor: print(f"outgoingLines:{line_id}") ls = LineString(wkb.loads(geom, hex=True)) l = MyLine(self.map_reader, line_id, FOW(fow), FRC(frc), length, self, to_int, ls) yield l def incoming_lines(self) -> Iterable[MyLine]: with self.map_reader.connection.cursor() as cursor: cursor.execute(self.map_reader.incoming_lines_query,(self.node_id, self.node_id)) for (line_id, fow, flowdir, frc, length, from_int, to_int, geom) in cursor: print(f"incomingLines:{line_id}") ls = LineString(wkb.loads(geom, hex=True)) l = MyLine(self.map_reader, line_id, FOW(fow), FRC(frc), length, from_int, self, ls) yield l def connected_lines(self) -> Iterable[MyLine]: return chain(self.incoming_lines(), self.outgoing_lines()) class MyLine(AbstractLine): def __init__(self, map_reader: MyMapReader, line_id: int, fow: FOW, frc: FRC, length: float, from_int: int | MyNode, to_int: int | MyNode, geometry: LineString): self.id = line_id self.map_reader = map_reader self._fow: FOW = fow self._frc: FRC = frc self._length: float = length self.from_int: int | MyNode = from_int self.to_int: int | MyNode = to_int self._geometry: LineString = geometry @property def line_id(self) -> Hashable: return self.id @property def start_node(self) -> "MyNode": if type(self.from_int) == MyNode: return(self.from_int) else: self.from_int = self.map_reader.get_node(self.from_int) return(self.from_int) @property def end_node(self) -> "MyNode": if type(self.to_int) == MyNode: return(cast(MyNode, self.to_int)) else: self.to_int = self.map_reader.get_node(cast(int, self.to_int)) return(self.to_int) @property def frc(self) -> FRC: return self._frc @property def length(self) -> float: return self._length @property def fow(self) -> FOW: return self._fow @property def geometry(self) -> LineString: return self._geometry def coordinates(self) -> Sequence[Coordinates]: return [Coordinates(*point) for point in self.geometry.coords] def distance_to(self, coord: Coordinates) -> int: return GEOD.geometry_length(LineString(nearest_points(self._geometry, Point(coord.lon, coord.lat))))
- コード修正箇所
-
MyMapReader
を環境に合わせて修正してください。 -
- MyMapReaderクラス直下の変数群
- __init__関数の初期化変数群
動作確認
実行用のpythonファイルを生成します。ここではmyTest.py
とします。
import myMapReader
from openlr import binary_decode
from typing import cast
from openlr_dereferencer import decode
if __name__ == "__main__":
rdr = myMapReader.MyMapReader()
ref = binary_decode('C1ZqqRHT/DPTPf3A88czAQ==')
res = decode(reference=ref, reader=rdr)
for r in res.lines:
tmp = cast(myMapReader.MyLine, r)
print(tmp.id, tmp.frc, tmp.fow, tmp.length)
実行結果
結果が出力されることを確認します。
まとめ
本記事では、GitHubのPython向けOpenLR Dereferencerを実装してみました。
実装されているSQLは環境に合わせて変更する必要があります。また、エラー時のハンドリングやデコードパラメータについてもいつか記事にします。