1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

openlr-dereferencer-pythonを実装してみた

Last updated at Posted at 2025-02-10

はじめに

この記事では、GitHubのOpenLR Dereferencer Pythonを実装し、任意の地図データベースに対してOpenLRのデコードさせる方法を解説します。できるだけ元のソースコードに手を加えない形で実装した例となります。

今回使用するGitHubコードはこちらです。
https://github.com/tomtom-international/openlr-dereferencer-python

OpenLRとは

OpenLRは、位置情報を表現するためのオープンな標準フォーマットです。主に、地図データやナビゲーションシステムで使用されることを目的としています。OpenLRは、特にGPSデータが不完全または不正確な場合でも、位置情報を効率的に伝達するための方法を提供します。これにより、異なるシステムやアプリケーション間でのデータの相互運用性が向上します。
OpenLRは、地図データに依存せずに位置情報を表現できるため、自動運転車やスマートフォンアプリなどのさまざまな用途に活用されています。

手順

プロジェクトの作成

ここではOpenLrDecoderProjectとします。

環境構築

仮想環境の作成
python -m venv [仮想環境名]

ここでは.venvとします。

python -m venv .venv
仮想環境をアクティベート
Windowsの場合
./venv/Scripts/activate
pipのアップグレード
python -m pip install --upgrade pip
openlr-dereferencerのインストール
pip install openlr-dereferencer
psycopg2のインストール
pip install psycopg2
pyprojのインストール
pip install pyproj

Map Database構成

本記事で想定するデータベース構成は下記のとおりとします。

データベース

Postgre SQL with PostGIS extension

データベース接続情報

ホスト
localhost
ポート
5432
ユーザー名
user1
パスワード
P@ssw0rd
データベース名
test_db
スキーマ名
local
構成テーブル

roadsテーブルとintersectionsテーブルに地図データが格納されている前提とします。

roadsテーブル
列名 Index 外部キー 備考
Id bigint Y プライマリーキー
FOW smallint Form of Way
FRC smallint Functional Road Class
flowdir smallint 1=both direction, 2=end -> start, 3=start -> end
from_int bigint Y intersections(id) 始点の交差点ID
to_int bigint Y intersections(id) 終点の交差点ID
len double precision セグメントの距離(m)
geom LineString(4326) Y WGS84 LineString geometry
intersectionsテーブル
列名 Index 外部キー 備考
Id bigint Y プライマリーキー
geom Point(4326) Y WGS84 Point geometry

以降の手順では、上記環境の前提で説明します。

デコーダーの実装

pythonファイルを生成します。ここではmyMapReader.pyとします。

MapReader, Line, Nodeのabstractの実装

具体的にはopenlr_dereferencer.mapsのMapReader, Line, Nodeを継承します。

myMapReader.pyの実装例
from __future__ import annotations
from openlr_dereferencer.maps import MapReader
from openlr_dereferencer.maps import Line as AbstractLine, Node as AbstractNode
import psycopg2 as pg
from psycopg2 import sql
from openlr import Coordinates, FOW, FRC
from shapely import wkb
from itertools import chain
from shapely.geometry import LineString, Point
from typing import Iterable, Hashable, Sequence, cast
from pyproj import Geod
from shapely.ops import nearest_points

GEOD = Geod(ellps="WGS84")

@MapReader.register
class MyMapReader():

    SCHEMA = 'local'
    TABLE_LINE = 'roads'
    TABLE_NODE = 'intersections'
    
    NODE_QUERY_SELECT = "select id,st_x(geom),st_y(geom)"
    NODE_QUERY = NODE_QUERY_SELECT + " from {schema}.{table}"
    LINE_QUERY_SELECT = "select id,fow,flowdir,frc,len,from_int,to_int,geom"
    REV_LINE_QUERY_SELECT = "select -id,fow,flowdir,frc,len,to_int as from_int,from_int as to_int,st_reverse(geom)"
    LINE_QUERY = LINE_QUERY_SELECT + " from {schema}.{table}"
    REV_LINE_QUERY = REV_LINE_QUERY_SELECT + " from {schema}.{table}"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.connection = pg.connect(host='127.0.0.1', port='5432', user='user1', password='P@ssw0rd', dbname='test_db')        
        self.get_linecount_query = sql.SQL("select count(1) from {schema}.{table}").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_LINE))
        self.get_node_query = sql.SQL(self.NODE_QUERY + " where id=%s").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE))
        self.get_nodes_query = sql.SQL(self.NODE_QUERY).format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE))
        self.get_nodecount_query = sql.SQL("select count(1) from {schema}.{table}").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE))
        self.get_lines_query = sql.SQL(self.LINE_QUERY + " where flowdir in (1,3) union " + self.REV_LINE_QUERY + " where flowdir in (1,2)").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_LINE))        
        self.find_nodes_close_to_query = sql.SQL(self.NODE_QUERY + " where geom && st_buffer(ST_GeographyFromText('SRID=4326;POINT(%s %s)'), %s)::geometry").format(schema=sql.Identifier(self.SCHEMA),table=sql.Identifier(self.TABLE_NODE))
        self.find_lines_close_to_query = sql.SQL(f"""
                                                    with sq as ({self.LINE_QUERY} where geom && st_buffer(ST_GeographyFromText('SRID=4326;POINT(%s %s)'), %s)::geometry)
                                                        ({self.LINE_QUERY_SELECT} from sq where sq.flowdir in (1,3)) 
                                                        union 
                                                        ({self.REV_LINE_QUERY_SELECT} from sq where sq.flowdir in (1,2))
                                                """).format(schema=sql.Identifier(self.SCHEMA),
                                                            table=sql.Identifier(self.TABLE_LINE))
        self.incoming_lines_query = sql.SQL(self.LINE_QUERY + " where to_int = %s and flowdir in (1,3) union " + self.REV_LINE_QUERY + " where from_int = %s and flowdir in (1,2)").format(table=sql.Identifier(self.TABLE_LINE),schema=sql.Identifier(self.SCHEMA))
        self.outgoing_lines_query = sql.SQL(self.LINE_QUERY + " where from_int = %s and flowdir in (1,3) union " + self.REV_LINE_QUERY + " where to_int = %s and flowdir in (1,2)").format(table=sql.Identifier(self.TABLE_LINE),schema=sql.Identifier(self.SCHEMA))

    def get_line(self, line_id: Hashable) -> MyLine:
        return line_id
    
    def get_lines(self) -> Iterable[MyLine]:
        with self.connection.cursor() as cursor:
            cursor.execute(self.get_lines_query)
            for (line_id, fow, flowdir, frc, length, from_int, to_int, geom) in cursor:
                print(f"get_lines:{line_id}")
                ls = LineString(wkb.loads(geom, hex=True))
                l = MyLine(self, line_id, FOW(fow), FRC(frc), length, from_int, to_int, ls)
                yield l

    def get_linecount(self) -> int:
        with self.connection.cursor() as cursor:
            cursor.execute(self.get_linecount_query)
            res = cursor.fetchone()
            if res is None:
                raise Exception("Error retrieving line count from datastore")
            (count,) = res
            return count
    def get_node(self, node_id: Hashable) -> MyNode:
        with self.connection.cursor() as cursor:
            cursor.execute(self.get_node_query, (node_id,))
            res = cursor.fetchone()
            if res is None:
                raise Exception(f"Error retrieving node {node_id} from datastore")
            (node_id, lon, lat) = res
            n = MyNode(self, node_id, lon, lat)
            return n
    def get_nodes(self) -> Iterable[MyNode]:
        with self.connection.cursor() as cursor:
            cursor.execute(self.get_nodes_query)
            for (node_id, lon, lat) in cursor:
                print(f"get_node:{node_id}")
                n = MyNode(self, node_id, lon, lat)
                yield n
    def get_nodecount(self) -> int:
        with self.connection.cursor() as cursor:
            cursor.execute(self.get_nodecount_query)
            res = cursor.fetchone()
            if res is None:
                raise Exception(f"Error retrieving node count from datastore")
            (count,) = res
            return count
    def find_nodes_close_to(self, coord: Coordinates, dist: float) -> Iterable[MyNode]:
        lon, lat = coord.lon, coord.lat
        with self.connection.cursor() as cursor:
            cursor.execute(self.find_nodes_close_to_query, (lon, lat, dist))
            for (node_id, lon, lat) in cursor:
                print(f"find_nodes_close_to:{node_id}")
                n = MyNode(self, node_id, lon, lat)
                yield n

    def find_lines_close_to(self, coord: Coordinates, dist: float) -> Iterable[MyLine]:
        lon, lat = coord.lon, coord.lat
        with self.connection.cursor() as cursor:
            cursor.execute(self.find_lines_close_to_query, (lon, lat, dist))
            for (line_id, fow, _, frc, length, from_int, to_int, geom) in cursor:
                print(f"find_lines_close_to:{line_id}")
                ls = LineString(wkb.loads(geom, hex=True))
                l = MyLine(self, line_id, FOW(fow), FRC(frc), length, from_int, to_int, ls)
                yield l

class MyNode(AbstractNode):

    def __init__(self, map_reader: MyMapReader, node_id: int, lon: float, lat: float):
        self.lon = lon
        self.lat = lat
        self.map_reader = map_reader
        self.id = node_id

    @property
    def node_id(self) -> Hashable:
        return self.id

    @property
    def coordinates(self) -> Coordinates:
        return Coordinates(lon=self.lon, lat=self.lat)

    def outgoing_lines(self) -> Iterable[MyLine]:
        with self.map_reader.connection.cursor() as cursor:
            cursor.execute(self.map_reader.outgoing_lines_query, (self.node_id, self.node_id))
            for (line_id, fow, flowdir, frc, length, from_int, to_int, geom) in cursor:
                print(f"outgoingLines:{line_id}")
                ls = LineString(wkb.loads(geom, hex=True))
                l = MyLine(self.map_reader, line_id, FOW(fow), FRC(frc), length, self, to_int, ls)
                yield l
        
    def incoming_lines(self) -> Iterable[MyLine]:
        with self.map_reader.connection.cursor() as cursor:
            cursor.execute(self.map_reader.incoming_lines_query,(self.node_id, self.node_id))
            for (line_id, fow, flowdir, frc, length, from_int, to_int, geom) in cursor:
                print(f"incomingLines:{line_id}")
                ls = LineString(wkb.loads(geom, hex=True))
                l = MyLine(self.map_reader, line_id, FOW(fow), FRC(frc), length, from_int, self, ls)
                yield l

    def connected_lines(self) -> Iterable[MyLine]:
        return chain(self.incoming_lines(), self.outgoing_lines())

class MyLine(AbstractLine):
    
    def __init__(self, map_reader: MyMapReader, line_id: int, fow: FOW, frc: FRC, length: float, from_int: int | MyNode, to_int: int | MyNode, geometry: LineString):
        
        self.id = line_id
        self.map_reader = map_reader
        self._fow: FOW = fow
        self._frc: FRC = frc
        self._length: float = length
        self.from_int: int | MyNode = from_int
        self.to_int: int | MyNode = to_int
        self._geometry: LineString = geometry


    @property
    def line_id(self) -> Hashable:
        return self.id

    @property
    def start_node(self) -> "MyNode":
        if type(self.from_int) == MyNode:
            return(self.from_int)
        else:
            self.from_int = self.map_reader.get_node(self.from_int)
            return(self.from_int)

    @property
    def end_node(self) -> "MyNode":
        if type(self.to_int) == MyNode:
            return(cast(MyNode, self.to_int))
        else:
            self.to_int = self.map_reader.get_node(cast(int, self.to_int))
            return(self.to_int)

    @property
    def frc(self) -> FRC:
        return self._frc

    @property
    def length(self) -> float:
        return self._length
    
    @property
    def fow(self) -> FOW:
        return self._fow

    @property
    def geometry(self) -> LineString:
        return self._geometry

    def coordinates(self) -> Sequence[Coordinates]:
        return [Coordinates(*point) for point in self.geometry.coords]


    def distance_to(self, coord: Coordinates) -> int:
        return GEOD.geometry_length(LineString(nearest_points(self._geometry, Point(coord.lon, coord.lat))))


コード修正箇所

MyMapReaderを環境に合わせて修正してください。

  • MyMapReaderクラス直下の変数群
  • __init__関数の初期化変数群

動作確認

実行用のpythonファイルを生成します。ここではmyTest.pyとします。

myTest.pyの実装例
import myMapReader
from openlr import binary_decode
from typing import cast
from openlr_dereferencer import decode

if __name__ == "__main__":
    
    rdr = myMapReader.MyMapReader()
    ref = binary_decode('C1ZqqRHT/DPTPf3A88czAQ==')
    res = decode(reference=ref, reader=rdr)
    
    for r in res.lines:
        tmp = cast(myMapReader.MyLine, r)
        print(tmp.id, tmp.frc, tmp.fow, tmp.length)
実行結果

結果が出力されることを確認します。

まとめ

本記事では、GitHubのPython向けOpenLR Dereferencerを実装してみました。
実装されているSQLは環境に合わせて変更する必要があります。また、エラー時のハンドリングやデコードパラメータについてもいつか記事にします。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?