0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksとOSRMを用いて日本国内の移動ルート生成を行う

Last updated at Posted at 2022-09-26

こちらで紹介したルート生成のソリューションアクセラレータを日本の地図に対応させました。

ノートブックはこちらです。

バックグラウンドなどは上の記事を参照ください。こちらでは手順のみを説明します。

OSRMサーバーのセットアップ

シングルノードのクラスター(メモリーは256GB程度を推奨)を作成し、ノートブック01_ Setup OSRM Serverを実行していきます。

Step 1: サーバーソフトウェアの構築

こちらは従来の手順と変わりません。

依存関係のインストール

shell
%sh -e

sudo apt -qq install -y build-essential git cmake pkg-config \
libbz2-dev libxml2-dev libzip-dev libboost-all-dev \
lua5.2 liblua5.2-dev libtbb-dev

OSRMバックエンドサーバーリポジトリのクローン

shell
%sh -e 

# make directory for repo clone
mkdir -p /srv/git
cd /srv/git

# clone the osrm backend server repo
rm -rf osrm-backend
git clone --depth 1 -b v5.26.0 https://github.com/Project-OSRM/osrm-backend

OSRMバックエンドサーバーの構築

shell
%sh -e

cd /srv/git/osrm-backend

mkdir -p build
cd build

cmake ..
cmake --build .
sudo cmake --build . --target install

Step 2: 地図ファイルの準備

地図ファイルのダウンロード

アジアのファイル: Geofabrik Download Serverから日本のファイルをダウンロードしていきます。

shell
%sh -e 

# create clean folder to house downloaded map file
rm -rf /srv/git/osrm-backend/maps/japan
mkdir -p /srv/git/osrm-backend/maps/japan

# download map file to appropriate folder
cd /srv/git/osrm-backend/maps/japan
wget --quiet https://download.geofabrik.de/asia/japan-latest.osm.pbf

# list folder contents
ls -l .

地図ファイルのコンテンツの抽出

shell
%sh -e

# setup location to house log files
mkdir -p /srv/git/osrm-backend/logs

# move to folder housing map file
cd /srv/git/osrm-backend/maps/japan

# extract map file contents
/srv/git/osrm-backend/build/osrm-extract japan-latest.osm.pbf -p /srv/git/osrm-backend/profiles/car.lua > /srv/git/osrm-backend/logs/extract_log.txt

# review output from extract command
#echo '----------------------------------------'
#tail /srv/git/osrm-backend/logs/extract_log.txt
shell
%sh -e
tail /srv/git/osrm-backend/logs/extract_log.txt

Screen Shot 2022-09-26 at 15.29.23.png

地図ファイル抽出の検証

shell
%sh -e ls -l /srv/git/osrm-backend/maps/japan

Screen Shot 2022-09-26 at 15.30.02.png

抽出地図ファイルのパーティション

shell
%sh -e 

cd /srv/git/osrm-backend/maps/japan

/srv/git/osrm-backend/build/osrm-partition japan-latest.osrm

抽出地図ファイルのカスタマイズ

shell
%sh -e 

cd /srv/git/osrm-backend/maps/japan

/srv/git/osrm-backend/build/osrm-customize japan-latest.osrm

Step 3: OSRMアセットの永続化

shell
%sh -e

rm -rf /dbfs/FileStore/osrm-backend-japan
cp -L -R /srv/git/osrm-backend /dbfs/FileStore/osrm-backend-japan

Step 4: initスクリプトの作成

Python
# make folder to house init script
dbutils.fs.mkdirs('dbfs:/databricks/scripts')

# write init script
dbutils.fs.put(
  '/databricks/scripts/osrm-backend-japan.sh',
  '''
#!/bin/bash

if [[ $DB_IS_DRIVER != "TRUE" ]]; then  

  echo "installing osrm backend server dependencies"
  sudo apt -qq install -y build-essential git cmake pkg-config libbz2-dev libxml2-dev libzip-dev libboost-all-dev lua5.2 liblua5.2-dev libtbb-dev
  
  echo "launching osrm backend server"
  /dbfs/FileStore/osrm-backend-japan/build/osrm-routed --algorithm=MLD /dbfs/FileStore/osrm-backend-japan/maps/japan/japan-latest.osrm &
  
  echo "wait until osrm backend server becomes responsive"
  res=-1
  i=1

  # while no response
  while [ $res -ne 0 ]
  do

    # test connectivity
    curl --silent "http://127.0.0.1:5000/route/v1/driving/139.6503,35.6762;139.6380,35.4437"
    res=$?
    
    # increment the loop counter
    if [ $i -gt 40 ] 
    then 
      break
    fi
    i=$(( $i + 1 ))

    # if no response, sleep
    if [ $res -ne 0 ]
    then
      sleep 30
    fi

  done  
  
fi
''', 
  True
  )

# show script content
print(
  dbutils.fs.head('dbfs:/databricks/scripts/osrm-backend-japan.sh')
  )

ルート生成

ノートブック02_ Generate Routesを実行していきます。

クラスターのinitスクリプトとして、上で作成したdbfs:/databricks/scripts/osrm-backend-japan.shを指定します。
Screen Shot 2022-09-26 at 15.33.04.png

必要なライブラリのインストール

%pip install tabulate databricks-mosaic

必要なライブラリのインポート

Python
import requests

import pandas as pd
import numpy as np
import json

import itertools

import subprocess

import pyspark.sql.functions as fn
from pyspark.sql.types import *

# mosaicのインポートと設定
import mosaic as mos
spark.conf.set('spark.databricks.labs.mosaic.geometry.api', 'ESRI')
spark.conf.set('spark.databricks.labs.mosaic.index.system', 'H3')
mos.enable_mosaic(spark, dbutils)

from tabulate import tabulate

Step 1: それぞれのワーカーで稼働するサーバーの検証

ワーカーノードのIPアドレスの取得

Python
# それぞれのワーカーのエグゼキューターに分散させるRDDの生成
myRDD = sc.parallelize(range(sc.defaultParallelism))

# IPアドレスの一覧を取得
ip_addresses = set( # 出力の重複を排除するためにsetに変換
  sc.runJob(
    myRDD, 
    lambda _: [subprocess.run(['hostname','-I'], capture_output=True).stdout.decode('utf-8').strip()] # それぞれのエグゼキューターで hostname -I を実行
    )
  )

ip_addresses

Screen Shot 2022-09-26 at 15.35.09.png

ルーティングレスポンスに対するそれぞれのワーカーのテスト

各ノードがOSRMサーバになっています。ある意味変態的で私は好きです。

Python
responses = []

# それぞれのワーカーのIPアドレスに対するループ
for ip in ip_addresses:
  print(ip)
  
  # OSRMバックエンドサーバーからレスポンスを取得
  resp = requests.get(f'http://{ip}:5000/route/v1/driving/139.6503,35.6762;139.6380,35.4437').text
  responses += [(ip, resp)]
  
# それぞれのワーカーで生成されたレスポンスを表示
display(
  pd.DataFrame(responses, columns=['ip','response'])
  )

以下の様に各ノードのOSRMサーバからレスポンスが返ってきます。
Screen Shot 2022-09-26 at 15.36.20.png

Step 2: ルート生成データの取得

日本のルート生成をデモンストレーションするために、駅の緯度経度を使用します。

東京都の駅-路線の最新リストデータ 鉄道 | オープンポータル

GUIからCSVをアップロードしてcsv_eki_13_csvというテーブルを作成します。CSVは以下の場所に置いてあります。

データベースを作成します。名前は適宜変更してください。

SQL
%sql
CREATE DATABASE IF NOT EXISTS takaakiyayoi_osrm;

UIを使うなどしてテーブルを作成します。

SQL
%sql
SELECT * FROM hive_metastore.takaakiyayoi_osrm.csv_eki_13_csv;

Screen Shot 2022-09-26 at 15.38.23.png

出発地点と行き先をランダムに組み合わせます。

SQL
%sql
DROP TABLE IF EXISTS hive_metastore.takaakiyayoi_osrm.eki_movement;
SQL
%sql CREATE TEMPORARY VIEW from_table AS (
  SELECT
    station_name AS from_station_name,
    station_lat AS from_station_lat,
    station_lon AS from_station_lon,
    floor(RAND() * 700) AS random_id,
    floor(RAND() * 50) AS random_table_id -- あとでドライビングテーブルを作成する際に使用します
  FROM
    hive_metastore.takaakiyayoi_osrm.csv_eki_13_csv
)
SQL
%sql 
SELECT * FROM from_table;

Screen Shot 2022-09-26 at 15.39.21.png

SQL
%sql
CREATE TABLE hive_metastore.takaakiyayoi_osrm.eki_movement AS 

SELECT
  from_station_name,
  from_station_lat,
  from_station_lon,
  to_station_name,
  to_station_lat,
  to_station_lon,
  random_table_id
FROM
  from_table A
  INNER JOIN (
    SELECT
      station_name AS to_station_name,
      station_lat AS to_station_lat,
      station_lon AS to_station_lon,
      floor(RAND() * 700) AS random_id
    FROM
      hive_metastore.takaakiyayoi_osrm.csv_eki_13_csv
    ORDER BY
      RAND()
  ) B ON A.random_id = B.random_id
Python
eki_movement_df = spark.read.table("hive_metastore.takaakiyayoi_osrm.eki_movement")
display(eki_movement_df)

Screen Shot 2022-09-26 at 15.39.58.png

Step 3: 移動ルートの取得

ルートを取得すための関数の定義

Python
@fn.pandas_udf(StringType())
def get_osrm_route(
  start_longitudes: pd.Series, 
  start_latitudes:pd.Series, 
  end_longitudes: pd.Series, 
  end_latitudes: pd.Series
  ) -> pd.Series:
   
  # データフレームを構成するために入力を組み合わせます
  df = pd.concat([start_longitudes, start_latitudes, end_longitudes, end_latitudes], axis=1)
  df.columns = ['start_lon','start_lat','end_lon','end_lat']

  # 特定の行に対するルートを取得するための内部関数
  def _route(row):
    r = requests.get(
      f'http://127.0.0.1:5000/route/v1/driving/{row.start_lon},{row.start_lat};{row.end_lon},{row.end_lat}?alternatives=true&steps=false&geometries=geojson&overview=simplified&annotations=false'
    )
    return r.text
  
  # 行ごとにルーティング関数を適用
  return df.apply(_route, axis=1)

ルートの取得

Python
display(
  eki_movement_df
    .withColumn(
      'osrm_route',
      get_osrm_route('from_station_lon','from_station_lat','to_station_lon','to_station_lat')
      )
    .selectExpr(
      'from_station_lon',
      'from_station_lat',
      'to_station_lon',
      'to_station_lat',
      'osrm_route',
      'from_station_name',
      'to_station_name'
      )
  )

Screen Shot 2022-09-26 at 15.41.00.png

ルートのJSONを複雑なデータ型表現に変換

Python
# JSONドキュメントのスキーマ
response_schema = '''
  STRUCT<
    code: STRING, 
    routes: 
      ARRAY<
        STRUCT<
          distance: DOUBLE, 
          duration: DOUBLE, 
          geometry: STRUCT<
            coordinates: ARRAY<ARRAY<DOUBLE>>, 
            type: STRING
            >, 
          legs: ARRAY<
            STRUCT<
              distance: DOUBLE, 
              duration: DOUBLE, 
              steps: ARRAY<STRING>, 
              summary: STRING, 
              weight: DOUBLE
              >
            >, 
          weight: DOUBLE, 
          weight_name: STRING
          >
        >,
      waypoints: ARRAY<
        STRUCT<
          distance: DOUBLE, 
          hint: STRING, 
          location: ARRAY<DOUBLE>, 
          name: STRING
          >
        >
      >
  '''

# ルートの取得、JSONをstructに変換
eki_routes = (
  eki_movement_df
  .withColumn(
    'osrm_route',
    get_osrm_route('from_station_lon','from_station_lat','to_station_lon','to_station_lat')
    )
  .withColumn(
    'osrm_route',
    fn.from_json('osrm_route',response_schema)
    )
  .selectExpr(
    'osrm_route',
    'from_station_name',
    'to_station_name'
    )
  )


display(
  eki_routes
  )

Screen Shot 2022-09-26 at 15.41.32.png

ルートから距離と時間を取得

Python
display(
   eki_routes
    .withColumn('route', fn.explode('osrm_route.routes'))
    .withColumn('route_meters', fn.col('route.distance'))
    .withColumn('route_seconds', fn.col('route.duration'))
    .selectExpr(
      'from_station_name',
      'to_station_name',
      'route_meters',
      'route_seconds'
      )
  )

Screen Shot 2022-09-26 at 15.42.05.png

ルートジオメトリの取得

Python
eki_geometry = (
  eki_routes
    .withColumn('route', fn.explode('osrm_route.routes')) # ルートの配列をexplode
    .withColumn('geojson', fn.to_json(fn.col('route.geometry')))
    .withColumn('geom', mos.st_aswkb(mos.st_geomfromgeojson('geojson')))
    .drop('osrm_route')
  )

display(eki_geometry)

Screen Shot 2022-09-26 at 15.42.39.png

ルートの可視化

日本の地図で見ると実感が湧きますね。

%%mosaic_kepler
eki_geometry geom "geometry" 1000

Screen Shot 2022-09-25 at 17.12.40.png
Screen Shot 2022-09-25 at 17.13.13.png

ドライブ時間テーブルの取得

もちろんですが、我々はOSRMバックエンドサーバーからルーティングデータを取得することに限定されません。我々のゴールがポイント間の移動の最適化であるのならば、移動時間のテーブルを作成する必要があるかもしれません。このためには、OSRMバックエンドサーバーのtable methodを呼び出す関数を記述することができます。

Python
@fn.pandas_udf(StringType())
def get_driving_table(
  points_arrays: pd.Series
  ) -> pd.Series:

  # 配列に含まれるポイントのテーブルを取得する内部関数
  def _table(points_array):
    
    points = ';'.join(points_array)
    
    r = requests.get(
      f'http://127.0.0.1:5000/table/v1/driving/{points}'
    )
    
    return r.text
  
  # テーブル関数を行ごとに適用
  return points_arrays.apply(_table)

ドライビング時間テーブルの取得

Python
# ドライビングテーブルのスキーマ
response_schema = StructType([
  StructField('code',StringType()),
  StructField('destinations',ArrayType(
    StructType([
      StructField('hint',StringType()),
      StructField('distance',FloatType()),
      StructField('name',StringType()),
      StructField('location',ArrayType(FloatType()))
      ])
     )
   ),
  StructField('durations',ArrayType(ArrayType(FloatType()))),
  StructField('sources',ArrayType(
    StructType([
      StructField('hint',StringType()),
      StructField('distance',FloatType()),
      StructField('name',StringType()),
      StructField('location',ArrayType(FloatType()))
      ])
    ))
  ])

上の関数を呼び出すためにはポイントのコレクションを提供する必要があります。使っているデータはこれに適した良い方法を提供していませんので、上で準備したrandom_table_idでグルーピングされる地点を移動することにします。

Python
# ドライビングテーブルを取得しマトリクスを抽出
driving_tables = (
  eki_movement_df
  .withColumn('start_point', fn.expr("concat(from_station_lon,',',from_station_lat)"))
  .groupBy('random_table_id')
    .agg(fn.collect_set('start_point').alias('start_points'))
  .filter(fn.expr('size(start_points) > 1')) # テーブルには1つ以上のポイントが必要です
  .withColumn('driving_table', get_driving_table('start_points'))
  .withColumn('driving_table', fn.from_json('driving_table', response_schema))
  .withColumn('driving_table_durations', fn.col('driving_table.durations'))
  )  

display(driving_tables)

Screen Shot 2022-09-26 at 15.45.16.png

単一のドライビングテーブルの表示

Python
# 単一のテーブルの生成
driving_table = driving_tables.limit(1).collect()[0]['driving_table_durations']

# ドライビングテーブルの表示
print(
    tabulate(
      np.array(driving_table),
      tablefmt='grid'
      )
    )

Screen Shot 2022-09-26 at 15.45.49.png

こちらの例では東京都の駅のみを使用しましたが、データを差し替えれば任意の地点間でのルートを自在に生成することができます。しかも、OSRMサーバは容易に並列化できるので、大量のルート生成も高速に行うことができます。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?