偉大な先人
既に伊藤先生が、GTFSファイルをマージしてPostgreSQL+PostGISに投入する方法にて、ツールを活用してPostgreSQLにデータを投入する方法を掲載しております。
今回のモチベーション
日本のGTFSデータが、GCSに保存されているとの情報を入手したので、そのデータで遊ぶための準備です。
今回やること
Google Clound Platform のファイルサービスである、Google Cloud Strage 保存されている protocol buffers 形式の大量のファイルを、一気にダウンロードして一纏めのデータに変換します.
STEP1 : gsutilを用いてGCSからデータをダウンロードします
STEP2 : pythonを用いてファイルから構造化データに変換します。 変換対象は、Data Frame形式とCSVの2種類
実装
環境について
今回のコードはJupyter labなどに貼り付けて利用することを想定しています。
単体でツールとしても使えませんし、ライブラリとしても利用できません。
実装した環境はMacです。
STEP1 gsutilを用いたデータコピー
GCPにはpython用のツールもありますが、アカウント設定等が煩雑な為、コマンドラインツールであるgsutilを用いて、フォルダの中にあるファイルをまとめてローカルにダウンロードします。
gsutil cp -r gs://BACKET_NAME/realtime/unobus.co.jp/vehicle_position/2020/20200801/ ~/dev/unobus/
解説
-
gsutil cp
はUNIXのcpコマンドと似ています。 -
-r
再帰的に処理するオプション -
gs://BACKET_NAME/realtime/un....
コピー元です。今回はダミーの文字列 -
~/dev/unobus/
コピー先です
一般的に、gsutil -m cp
とするとマルチスレッドで動作し高速化できます。しかし、今回のバケットのアクセス権限の問題なのか上手く動作しませんでした。
STEP2 ファイルの読み込みと構造化
データフレームに変換
from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
start = time.time()
i = 0;
temp_dict = {}
for file in files:
with open(path+'/'+file, 'rb') as f:
data = f.read()
feed.ParseFromString(data)
for entity in feed.entity:
temp_dict[i] = [
entity.id, #車両ID
entity.vehicle.vehicle.id, #車両ナンバー
entity.vehicle.trip.trip_id, #路線番号?
entity.vehicle.timestamp, #車両時刻
entity.vehicle.position.longitude, #車両緯度
entity.vehicle.position.latitude, #車両経度
entity.vehicle.occupancy_status #混雑度
]
i +=1
df = pd.DataFrame.from_dict(temp_dict, orient='index',columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
CSVへ出力
from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
csvfilename = 'unobus_20200801.csv'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
with open(csvfilename, 'a') as csv :
start = time.time()
for file in files:
with open(path+'/'+file, 'rb') as f:
data = f.read()
feed.ParseFromString(data)
for entity in feed.entity:
print(
entity.id, #車両ID
entity.vehicle.vehicle.id, #車両ナンバー
entity.vehicle.trip.trip_id, #路線番号?
entity.vehicle.timestamp, #車両時刻
entity.vehicle.position.longitude, #車両緯度
entity.vehicle.position.latitude, #車両経度
entity.vehicle.occupancy_status, #混雑度
sep=',',file=csv)
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
失敗作 データフレームに変換(超低速)
pq2df.pyと比較すると、220倍も遅いコードです。
df.append
がその原因です。
from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
df = pd.DataFrame(columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])
start = time.time()
for file in files:
with open(path+'/'+file, 'rb') as f:
data = f.read()
feed.ParseFromString(data)
for entity in feed.entity:
tmp_se = pd.Series( [
entity.id, #車両ID
entity.vehicle.vehicle.id, #車両ナンバー
entity.vehicle.trip.trip_id, #路線番号?
entity.vehicle.timestamp, #車両時刻
entity.vehicle.position.longitude, #車両緯度
entity.vehicle.position.latitude, #車両経度
entity.vehicle.occupancy_status #混雑度
], index=df.columns )
df = df.append( tmp_se, ignore_index=True ) #ここがだめ!!
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")
終わりに
クラウド上に保存されているGTFSの過去データを、ダウンロードから構造化データへの変換までを実施しました。
DataFrameでデータを追記する処理は書き方によっては非常に処理が遅くなり、苦労しました。
微力ではありますが、バスデータ活用に貢献できれば幸いです。