LoginSignup
4
1

More than 3 years have passed since last update.

大量のGTFS Realtimeのファイルに対するETL処理(Python編)

Last updated at Posted at 2020-08-08

偉大な先人

既に伊藤先生が、GTFSファイルをマージしてPostgreSQL+PostGISに投入する方法にて、ツールを活用してPostgreSQLにデータを投入する方法を掲載しております。

今回のモチベーション

日本のGTFSデータが、GCSに保存されているとの情報を入手したので、そのデータで遊ぶための準備です。

今回やること

Google Clound Platform のファイルサービスである、Google Cloud Strage 保存されている protocol buffers 形式の大量のファイルを、一気にダウンロードして一纏めのデータに変換します.
スクリーンショット 2020-08-08 21.28.41.png

STEP1 : gsutilを用いてGCSからデータをダウンロードします
STEP2 : pythonを用いてファイルから構造化データに変換します。 変換対象は、Data Frame形式とCSVの2種類

実装

環境について

今回のコードはJupyter labなどに貼り付けて利用することを想定しています。
単体でツールとしても使えませんし、ライブラリとしても利用できません。
実装した環境はMacです。

STEP1 gsutilを用いたデータコピー

GCPにはpython用のツールもありますが、アカウント設定等が煩雑な為、コマンドラインツールであるgsutilを用いて、フォルダの中にあるファイルをまとめてローカルにダウンロードします。

dowload.sh
gsutil cp -r gs://BACKET_NAME/realtime/unobus.co.jp/vehicle_position/2020/20200801/ ~/dev/unobus/

解説

  • gsutil cp はUNIXのcpコマンドと似ています。
  • -r 再帰的に処理するオプション
  • gs://BACKET_NAME/realtime/un.... コピー元です。今回はダミーの文字列
  • ~/dev/unobus/ コピー先です

一般的に、gsutil -m cp とするとマルチスレッドで動作し高速化できます。しかし、今回のバケットのアクセス権限の問題なのか上手く動作しませんでした。

STEP2 ファイルの読み込みと構造化

データフレームに変換

pq2df.py
from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

start = time.time()
i = 0;
temp_dict = {}
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            temp_dict[i] = [
                  entity.id,                         #車両ID
                  entity.vehicle.vehicle.id,         #車両ナンバー
                  entity.vehicle.trip.trip_id,       #路線番号?
                  entity.vehicle.timestamp,          #車両時刻
                  entity.vehicle.position.longitude, #車両緯度
                  entity.vehicle.position.latitude,  #車両経度
                  entity.vehicle.occupancy_status #混雑度
            ]
            i +=1
df = pd.DataFrame.from_dict(temp_dict, orient='index',columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])
elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

CSVへ出力

pq2csv.py
from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
csvfilename = 'unobus_20200801.csv'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()
with open(csvfilename, 'a') as csv :
    start = time.time()
    for file in files:
        with open(path+'/'+file, 'rb') as f:
            data = f.read()
            feed.ParseFromString(data)
            for entity in feed.entity:
                print(
                    entity.id,                         #車両ID
                    entity.vehicle.vehicle.id,         #車両ナンバー
                    entity.vehicle.trip.trip_id,       #路線番号?
                    entity.vehicle.timestamp,          #車両時刻
                    entity.vehicle.position.longitude, #車両緯度
                    entity.vehicle.position.latitude,  #車両経度
                    entity.vehicle.occupancy_status,   #混雑度
                    sep=',',file=csv)
    elapsed_time = time.time() - start
    print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

失敗作 データフレームに変換(超低速)

pq2df.pyと比較すると、220倍も遅いコードです。
df.appendがその原因です。

pq2df_VeryLowSpeed.py
from google.transit import gtfs_realtime_pb2
import pandas as pd
import numpy as np
import os
import datetime
import time
path = '/Users/USER_NAME/dev/unobus/20200801'
files = os.listdir(path)
feed = gtfs_realtime_pb2.FeedMessage()

df = pd.DataFrame(columns=['id' , 'vehicle_id', 'trip_id','vehicle_timestamp','longitude','latitude','occupancy_status'])

start = time.time()
for file in files:
    with open(path+'/'+file, 'rb') as f:
        data = f.read()
        feed.ParseFromString(data)
        for entity in feed.entity:
            tmp_se = pd.Series( [
                  entity.id,                         #車両ID
                  entity.vehicle.vehicle.id,         #車両ナンバー
                  entity.vehicle.trip.trip_id,       #路線番号?
                  entity.vehicle.timestamp,          #車両時刻
                  entity.vehicle.position.longitude, #車両緯度
                  entity.vehicle.position.latitude,  #車両経度
                  entity.vehicle.occupancy_status #混雑度
            ], index=df.columns )
            df = df.append( tmp_se, ignore_index=True ) #ここがだめ!!

elapsed_time = time.time() - start
print ("elapsed_time:{0}".format(elapsed_time) + "[sec]")

終わりに

クラウド上に保存されているGTFSの過去データを、ダウンロードから構造化データへの変換までを実施しました。
DataFrameでデータを追記する処理は書き方によっては非常に処理が遅くなり、苦労しました。

微力ではありますが、バスデータ活用に貢献できれば幸いです。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1