こんにちは。
「OSM データを NetworkX へ取り込んで最短経路計算」の続編として、今回は、osm.pbf 形式のデータファイルを読込みました(pyosmium 利用)。これは、より大規模のOSMデータの上で、最短経路計算しようとする意図です。
動作例は日本の高速道路利用(=motorway + motorway_link)で Kagoshima, Aomori 間の最短経路です(図例内の黄色線)。osmupdate、osmium-tool も利用しました。
$ wget https://download.geofabrik.de/asia/japan.poly
$ wget https://download.geofabrik.de/asia/japan-latest.osm.pbf
$ osmupdate --hour --day -B=japan.poly japan-latest.osm.pbf japan-latest-updated.osm.pbf
$ mv -f japan-latest-updated.osm.pbf japan-latest.osm.pbf
$ osmium tags-filter -o japan-motorway.osm.pbf japan-latest.osm.pbf w/highway=motorway,motorway_link
$ ./osmpbf2nx.py japan-motorway.osm.pbf > motorway.html
osmpbf2nx.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import math
import osmium as o
import networkx as nx
import geoleaflet
import geojson
RE = 6378137.0 # GRS80
FE = 1/298.257223563 # IS-GPS
E2 = FE * (2 - FE)
DEGREE = math.pi / 180
ONEWAY = 1
BIDIRECTIONAL = 0
class OsmHandler(o.SimpleHandler):
def __init__(self):
super(OsmHandler, self).__init__()
self.graph = None
self.nodes = {}
self.ways = {ONEWAY: {}, BIDIRECTIONAL: {}}
def node(self, n):
self.nodes[n.id] = [n.location.lon, n.location.lat]
def way(self, w):
highway = w.tags.get('highway')
if highway in ['motorway', 'motorway_link'] and w.tags.get('moped') != 'yes':
nodes = [n.ref for n in w.nodes]
oneway = wayDirection(w.tags.get('oneway'))
if oneway == -ONEWAY:
oneway = ONEWAY
nodes = nodes[::-1]
self.ways[oneway][w.id] = {'nodes': nodes, 'highway': highway}
def split_way(self):
node_hist = {}
for ways in self.ways.values():
for way in ways.values():
for node in way['nodes']:
node_hist.setdefault(node, 0)
node_hist[node] += 1
for oneway, ways in self.ways.items():
for id, way in ways.items():
self.ways[oneway][id]['nodes'] = split_nodes(way['nodes'], node_hist)
def geojson_features(self, route):
feas = []
for ways in self.ways.values():
nodes = [concat_nodes(way['nodes']) for way in ways.values()]
paths = [[self.nodes[n] for n in ns] for ns in nodes]
feas.extend([(geojson.LineString(p), {"color":"#40a040", "weight": 1.5, "opacity": 0.7}) for p in paths])
path = [self.nodes[n] for n in route]
feas.append((geojson.LineString(path), {"color":"#ffe000", "weight": 8, "opacity": 0.5}))
feas = [geojson.Feature(geometry=g, properties=p) for g, p in feas]
return geojson.FeatureCollection(feas)
def distance(self, u, v):
p, q = self.nodes[u], self.nodes[v]
coslat = math.cos((p[1]+q[1])/2*DEGREE)
w2 = 1 / (1 - E2 * (1 - coslat * coslat))
dx = (p[0]-q[0]) * coslat
dy = (p[1]-q[1]) * w2 * (1 - E2)
return math.sqrt((dx*dx+dy*dy)*w2) * RE * DEGREE
def routing(self, source, target):
route = nx.astar_path(self.graph, source, target, heuristic=self.distance, weight='length')
route = [self.graph.edges[route[i], route[i+1]]['nodes'] for i in range(len(route)-1)]
return concat_nodes(route)
def createGraph(self):
self.graph = nx.DiGraph()
for oneway, ways in self.ways.items():
for way in ways.values():
for nodes in way['nodes']:
length = path_length(nodes, self.distance)
self.graph.add_edge(nodes[0], nodes[-1], nodes=nodes, length=length)
if oneway == BIDIRECTIONAL:
self.graph.add_edge(nodes[-1], nodes[0], nodes=nodes[::-1], length=length)
def path_length(path, dist):
return sum([dist(path[i], path[i+1]) for i in range(len(path)-1)])
def wayDirection(oneway):
if oneway in [None, 'no', 'false', '0', 'reversible']:
return BIDIRECTIONAL
if oneway in ['reverse', '-1']:
return -ONEWAY
else:
return ONEWAY
def split_nodes(arr, dividers):
i = 0
arrs = []
for j in range(1, len(arr)-1):
if dividers[arr[j]] >= 2:
arrs.append(arr[i:j+1])
i = j
arrs.append(arr[i:len(arr)])
return arrs
def concat_nodes(nodes_list):
nodes = [nodes_list[0][0]]
for ns in nodes_list:
nodes.extend(ns[1:])
return nodes
def OSM(file):
o = OsmHandler()
o.apply_file(file, locations=False)
o.split_way()
o.createGraph()
return o
def htmlLeaflet(features):
MAXZOOM = 19
CD_STYLE = ('light_nolabels')
CD_URL = 'http://{s}.basemaps.cartocdn.com/{style}/{z}/{x}/{y}.png'
CD_ATTR = '© <a href="http://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors, © <a href="http://cartodb.com/attributions">CartoDB</a>'
html = geoleaflet.html(features, service=CD_URL, attribution=CD_ATTR, style=CD_STYLE, maxzoom=MAXZOOM)
return html
if __name__ == '__main__':
import sys
osmfile = sys.args[1]
source, target = 1928135161, 3749182296 # Kagoshima, Aomori
o = OSM(osmfile)
route = o.routing(source, target)
feas = o.geojson_features(route)
print(htmlLeaflet(feas))