概要
これは Python, LuigiでPipeline管理の基本を学ぶの続編です。
今回はpandas gbqを使って(参考)、bigqueryを操作するpipelineを作成しようと思います。
ただ、単純に操作するだけでは物足りないので、最近勉強している空間統計学に絡めてVariogramを計算する事を考えます。
環境
macOS Mojave (バージョン 10.14.6)
Python バージョン 3.7.4
pandas-gbq 0.13.2
やる事
Variogramとは、空間統計学における「距離と観測値の相関」です。
(詳しくはGDS:バリオグラムとはや空間統計学への招待:3.1 バリオグラム(Variogram)の推定等を見てください)
今回は前回で学んだ内容を踏まえ、仕事で頻繁に使うBigQueryと連携するためにpandas-gbqを使っていこうと思います。
(BigQueryとPython(pandas)を連携させてみたを参考にしています)
PipeLine構造
Task名 | 概要 |
---|---|
TaskA | BQのテーブルからSQLで一次加工したデータをローカルにcsv保存 |
TaskB | TaskAで保存したcsvにテーブル名をつけてBQへアップロード |
TaskC | TaskBの結果を元に2地点間のvariogramを計算してcsv保存 |
となります。
データ
COVID19の感染者を日別に、検査都道府県別に記録した公開資料(7/16まで分)からVariogramを求めるための緯度経度情報等をを取り出して事前にBQに格納しています。
csvでの見た目はこんな感じです。
コード
それではコードを見ていきましょう
フォルダ構造は作業フォルダ下に出力したcsvを入れるoutputフォルダとTarget格納用のmanagementフォルダがあるだけのシンプル構造です。
各タスクをモジュール化して流用しやすくしました。
.
└output
└management
PipeLine
TaskA->TaskB->TaskCの順に実行するだけのシンプルコードです。
import os
import luigi
import taskA
import taskB
import taskC
path_manage = './management/'
path_output = './output/'
class TaskA(luigi.Task):
"""
TaskA class
"""
def requires(self):
return [] # In case of no pre-running task, an empty list.
def output(self):
return luigi.LocalTarget(path_manage + 'TaskA.txt') # Where the managiment file is.
# return luigi.LocalTarget(path_manage + 'TaskA') # Where the managiment file is.
def run(self):
taskA.task_a()
# If the line does not exist no managed file created
with self.output().open('w') as out_file:
out_file.write("")
class TaskB(luigi.Task):
"""
TaskB class
"""
def requires(self):
return [TaskA(), ] # TaskB depends on TaskA.
def output(self):
return luigi.LocalTarget(path_manage + 'TaskB.txt') # Where the managiment file is.
def run(self):
taskB.task_b()
# If the line does not exist no managed file created
with self.output().open('w') as out_file:
out_file.write("")
class TaskC(luigi.Task):
"""
TaskB class
"""
def requires(self):
return [TaskB(), ] # TaskB depends on TaskA.
def output(self):
return luigi.LocalTarget(path_manage + 'TaskC.txt') # Where the managiment file is.
def run(self):
taskC.task_c()
# If the line does not exist no managed file created
with self.output().open('w') as out_file:
out_file.write("")
if __name__ == '__main__':
luigi.run()
TaskA
簡単な前処理をして結果をcsvに出力します。
import pandas as pd
import itertools
project_id = '<your_pjt_id>'
dataset_id = '<your_dataset_id>'
table_id = '<your_table_id>'
start_dt = '2020-02-01'
end_dt = '2020-08-01'
def task_a():
query = '''
#standardSQL
WITH tmp AS -- 簡単な条件抽出
(
SELECT Hospital_Pref
, lon
, lat
, yyyymmdd
FROM {dataset}.{table}
WHERE Hospital_Pref != 'Unknown'
AND ( lon > 0 OR lat > 0 )
)
SELECT Hospital_Pref -- 抽出用の集計
, avg(lon) as lon
, avg(lat) as lat
, count(*) as CNT
FROM tmp
WHERE '{start_date}' <= yyyymmdd and yyyymmdd < '{end_date}'
GROUP BY Hospital_Pref
'''
# strベースのクエリと、project_idが必要
df = pd.read_gbq(query.format(dataset=dataset_id
, table=table_id
, start_date = start_dt
, end_date = end_dt
)
, project_id)
df.to_csv('./output/{p0}to{p1}.csv'.format(p0=start_dt, p1=end_dt), index=False)
# main関数
if __name__ == "__main__":
task_a()
TaskB
TaskAで出力したcsvをBQにアップロードします。
pandas-gbqを事前にインストールしないといけませんが、
インストールしてあればimportは不要です。
import pandas as pd
import itertools
project_id = '<your_pjt_id>'
dataset_id = '<your_dataset_id>'
start_dt = '2020-02-01'
end_dt = '2020-08-01'
# strベースのクエリと、project_idが必要
def task_b(): # 書き込む
# 読み込む
df = pd.read_csv('./output/{p0}to{p1}.csv'.format(p0=start_dt, p1=end_dt))
# BQに書き出す
df.to_gbq('{dataset_id}.variogram{p0}to{p1}'.format(
dataset_id=dataset_id
, p0=start_dt.replace(u'-',u'')
, p1=end_dt.replace(u'-', u'')
)
, project_id)
# main関数
if __name__ == "__main__":
task_b()
TaskC
TaskBで作成した中間テーブルをCROSS JOINして、BQ組み込みの地理計算関数 ST_DISTANCE で距離計算、また、二点間のVariogram(差の二乗平均)を求めています。
import pandas as pd
import itertools
project_id = '<your_pjt_id>'
dataset_id = '<your_dataset_id>'
table_id = 'variogram'
start_dt = '2020-02-01'.replace(u'-',u'')
end_dt = '2020-08-01'.replace(u'-',u'')
def task_c():
query = '''
#standardSQL
WITH tmp AS -- CROSS JOIN用1
(SELECT * FROM {dataset}.{table}{start_date}to{end_date}),
tmp2 AS -- CROSS JOIN用2
(SELECT * FROM {dataset}.{table}{start_date}to{end_date}),
tmp3 AS( -- CROSS JOIN で全組み合わせを採用
SELECT t1.Hospital_Pref AS t1_Pref
, t1.lon AS t1_lon
, t1.lat AS t1_lat
,t1.CNT AS t1_CNT
,t2.Hospital_Pref AS t2_Pref
, t2.lon AS t2_lon
, t2.lat AS t2_lat
,t2.CNT AS t2_CNT
, ST_DISTANCE(ST_GEOGPOINT(t1.lon, t1.lat)
, ST_GEOGPOINT(t2.lon, t2.lat)
) / 1000 AS dist_km -- 距離をkm変換
, POWER(t1.CNT - t2.CNT, 2)/2 AS variogram
FROM tmp AS t1
CROSS JOIN tmp2 AS t2
)
SELECT * FROM tmp3
'''
# strベースのクエリと、project_idが必要
df = pd.read_gbq(query.format(dataset=dataset_id
, table=table_id
, start_date=start_dt
, end_date=end_dt
)
, project_id)
df.to_csv('./output/variogram_result{p0}to{p1}.csv'.format(
p0=start_dt, p1=end_dt
), index = False)
# main関数
if __name__ == "__main__":
task_c()
出力結果
都道府県の緯度経度から求めた相互の距離をkm換算し、
そこからVariogramを求めました。
csvの出力はこんな感じです。
一番右がVariogramの値。右から2番目がkm距離になります。
次回予告
空間統計学の話としては、上記を応用して距離を離散化し、経験VariogramやKrigingを求めていければなと思っています。
PipeLineの方の話では2点。
まず、TaskBが冗長なので SQL に create_table を入れることを考えています。
ただし、この場合 pd.read_gbq() の出力結果がEmptyになり Alartが表示されるので、そこの処理をどうしようか考え中です。
もう一つは、典型的なLuigiのPipelineでは、Targetファイルを作成するために以下のコードが必須の様なのですが、
# If the line does not exist no managed file created
with self.output().open('w') as out_file:
out_file.write("")
今回みたいにモジュール化した自作関数を使い、PipeLine構造をシンプルにしたい時(まぁ、実務上はだいたいこれ)はこの項目が削れないかと悩ましくなりました。
そうしたらエムスリーさんのBlogで機械学習で便利なluigiフレームワークの紹介の「他のタスクを実行するだけのタスク」によると、このあたりが省略できそうな感じになってます。
次回は上記の内容を踏まえてブラッシュアップした上で、
空間統計学の内容である経験バリオグラムとクリギング参考まで手を出していけたらと思います。
以上