LoginSignup
8
6

More than 1 year has passed since last update.

OSM データを NetworkX へ取り込んで最短経路計算(osm.pbf 形式)

Last updated at Posted at 2019-06-25

こんにちは。
OSM データを NetworkX へ取り込んで最短経路計算」の続編として、今回は、osm.pbf 形式のデータファイルを読込みました(pyosmium 利用)。これは、より大規模のOSMデータの上で、最短経路計算しようとする意図です。

動作例は日本の高速道路利用(=motorway + motorway_link)で Kagoshima, Aomori 間の最短経路です(図例内の黄色線)。osmupdateosmium-tool も利用しました。
motorway.jpg

$ wget https://download.geofabrik.de/asia/japan.poly
$ wget https://download.geofabrik.de/asia/japan-latest.osm.pbf
$ osmupdate --hour --day -B=japan.poly japan-latest.osm.pbf japan-latest-updated.osm.pbf
$ mv -f japan-latest-updated.osm.pbf japan-latest.osm.pbf
$ osmium tags-filter -o japan-motorway.osm.pbf japan-latest.osm.pbf w/highway=motorway,motorway_link
$ ./osmpbf2nx.py japan-motorway.osm.pbf > motorway.html
osmpbf2nx.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import math
import osmium as o
import networkx as nx
import geoleaflet
import geojson

RE = 6378137.0  # GRS80
FE = 1/298.257223563  # IS-GPS
E2 = FE * (2 - FE)
DEGREE = math.pi / 180
ONEWAY = 1
BIDIRECTIONAL = 0

class OsmHandler(o.SimpleHandler):
    def __init__(self):
        super(OsmHandler, self).__init__()
        self.graph = None
        self.nodes = {}
        self.ways = {ONEWAY: {}, BIDIRECTIONAL: {}}

    def node(self, n):
        self.nodes[n.id] = [n.location.lon, n.location.lat]

    def way(self, w):
        highway = w.tags.get('highway')
        if highway in ['motorway', 'motorway_link'] and w.tags.get('moped') != 'yes':
            nodes = [n.ref for n in w.nodes]
            oneway = wayDirection(w.tags.get('oneway'))
            if oneway == -ONEWAY:
                oneway = ONEWAY
                nodes = nodes[::-1]
            self.ways[oneway][w.id] = {'nodes': nodes, 'highway': highway}

    def split_way(self):
        node_hist = {}
        for ways in self.ways.values():
            for way in ways.values():
                for node in way['nodes']:
                    node_hist.setdefault(node, 0)
                    node_hist[node] += 1
        for oneway, ways in self.ways.items():
            for id, way in ways.items():
                self.ways[oneway][id]['nodes'] = split_nodes(way['nodes'], node_hist)

    def geojson_features(self, route):
        feas = []
        for ways in self.ways.values():
            nodes = [concat_nodes(way['nodes']) for way in ways.values()]
            paths = [[self.nodes[n] for n in ns] for ns in nodes]
            feas.extend([(geojson.LineString(p), {"color":"#40a040", "weight": 1.5, "opacity": 0.7}) for p in paths])
        path = [self.nodes[n] for n in route]
        feas.append((geojson.LineString(path), {"color":"#ffe000", "weight": 8, "opacity": 0.5}))
        feas = [geojson.Feature(geometry=g, properties=p) for g, p in feas]
        return geojson.FeatureCollection(feas)

    def distance(self, u, v):
        p, q = self.nodes[u], self.nodes[v]
        coslat = math.cos((p[1]+q[1])/2*DEGREE)
        w2 = 1 / (1 - E2 * (1 - coslat * coslat))
        dx = (p[0]-q[0]) * coslat
        dy = (p[1]-q[1]) * w2 * (1 - E2)
        return math.sqrt((dx*dx+dy*dy)*w2) * RE * DEGREE

    def routing(self, source, target):
        route = nx.astar_path(self.graph, source, target, heuristic=self.distance, weight='length')
        route = [self.graph.edges[route[i], route[i+1]]['nodes'] for i in range(len(route)-1)]
        return concat_nodes(route)

    def createGraph(self):
        self.graph = nx.DiGraph()
        for oneway, ways in self.ways.items():
            for way in ways.values():
                for nodes in way['nodes']:
                    length = path_length(nodes, self.distance)
                    self.graph.add_edge(nodes[0], nodes[-1], nodes=nodes, length=length)
                    if oneway == BIDIRECTIONAL:
                        self.graph.add_edge(nodes[-1], nodes[0], nodes=nodes[::-1], length=length)

def path_length(path, dist):
    return sum([dist(path[i], path[i+1]) for i in range(len(path)-1)])

def wayDirection(oneway):
    if oneway in [None, 'no', 'false', '0', 'reversible']:
        return BIDIRECTIONAL
    if oneway in ['reverse', '-1']:
        return -ONEWAY
    else:
        return ONEWAY

def split_nodes(arr, dividers):
    i = 0
    arrs = []
    for j in range(1, len(arr)-1):
        if dividers[arr[j]] >= 2:
            arrs.append(arr[i:j+1])
            i = j
    arrs.append(arr[i:len(arr)])
    return arrs

def concat_nodes(nodes_list):
    nodes = [nodes_list[0][0]]
    for ns in nodes_list:
        nodes.extend(ns[1:])
    return nodes

def OSM(file):
    o = OsmHandler()
    o.apply_file(file, locations=False)
    o.split_way()
    o.createGraph()
    return o

def htmlLeaflet(features):
    MAXZOOM = 19
    CD_STYLE = ('light_nolabels')
    CD_URL = 'http://{s}.basemaps.cartocdn.com/{style}/{z}/{x}/{y}.png'
    CD_ATTR = '&copy; <a href="http://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors, &copy; <a href="http://cartodb.com/attributions">CartoDB</a>'
    html = geoleaflet.html(features, service=CD_URL, attribution=CD_ATTR, style=CD_STYLE, maxzoom=MAXZOOM)
    return html

if __name__ == '__main__':
    import sys
    osmfile = sys.args[1]
    source, target = 1928135161, 3749182296  # Kagoshima, Aomori
    o = OSM(osmfile)
    route = o.routing(source, target)
    feas = o.geojson_features(route)
    print(htmlLeaflet(feas))
8
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
6