Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
0
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

LuigiとBQで計算するVariogram

概要

これは Python, LuigiでPipeline管理の基本を学ぶの続編です。

今回はpandas gbqを使って(参考)、bigqueryを操作するpipelineを作成しようと思います。

ただ、単純に操作するだけでは物足りないので、最近勉強している空間統計学に絡めてVariogramを計算する事を考えます。

環境

macOS Mojave (バージョン 10.14.6)
Python バージョン 3.7.4
pandas-gbq 0.13.2

やる事

Variogramとは、空間統計学における「距離と観測値の相関」です。
(詳しくはGDS:バリオグラムとは空間統計学への招待:3.1 バリオグラム(Variogram)の推定等を見てください)

今回は前回で学んだ内容を踏まえ、仕事で頻繁に使うBigQueryと連携するためにpandas-gbqを使っていこうと思います。
(BigQueryとPython(pandas)を連携させてみたを参考にしています)

PipeLine構造

Task名 概要
TaskA BQのテーブルからSQLで一次加工したデータをローカルにcsv保存
TaskB TaskAで保存したcsvにテーブル名をつけてBQへアップロード
TaskC TaskBの結果を元に2地点間のvariogramを計算してcsv保存

となります。

データ

COVID19の感染者を日別に、検査都道府県別に記録した公開資料(7/16まで分)からVariogramを求めるための緯度経度情報等をを取り出して事前にBQに格納しています。

csvでの見た目はこんな感じです。

キャプチャ.PNG

コード

それではコードを見ていきましょう

フォルダ構造は作業フォルダ下に出力したcsvを入れるoutputフォルダとTarget格納用のmanagementフォルダがあるだけのシンプル構造です。
各タスクをモジュール化して流用しやすくしました。

.
└output
└management

PipeLine

TaskA->TaskB->TaskCの順に実行するだけのシンプルコードです。

import os
import luigi

import taskA
import taskB
import taskC

path_manage = './management/'
path_output = './output/'

class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []         # In case of no pre-running task, an empty list.

    def output(self):
        return luigi.LocalTarget(path_manage + 'TaskA.txt')  # Where the managiment file is.
        # return luigi.LocalTarget(path_manage + 'TaskA')  # Where the managiment file is.

    def run(self):
        taskA.task_a()

      # If the line does not exist no managed file created
        with self.output().open('w') as out_file:
            out_file.write("")

class TaskB(luigi.Task):
    """
    TaskB class
    """
    def requires(self):
        return [TaskA(), ]  # TaskB depends on TaskA.

    def output(self):
        return luigi.LocalTarget(path_manage + 'TaskB.txt')  # Where the managiment file is.

    def run(self):
        taskB.task_b()

      # If the line does not exist no managed file created
        with self.output().open('w') as out_file:
            out_file.write("")

class TaskC(luigi.Task):
    """
    TaskB class
    """
    def requires(self):
        return [TaskB(), ]  # TaskB depends on TaskA.

    def output(self):
        return luigi.LocalTarget(path_manage + 'TaskC.txt')  # Where the managiment file is.

    def run(self):
        taskC.task_c()

      # If the line does not exist no managed file created
        with self.output().open('w') as out_file:
            out_file.write("")

if __name__ == '__main__':
    luigi.run()

TaskA

簡単な前処理をして結果をcsvに出力します。

import pandas as pd
import itertools

project_id = '<your_pjt_id>'
dataset_id = '<your_dataset_id>'
table_id = '<your_table_id>'

start_dt = '2020-02-01'
end_dt = '2020-08-01'

def task_a():

  query = '''

  #standardSQL

  WITH tmp AS  -- 簡単な条件抽出
   (
     SELECT Hospital_Pref
               , lon
               , lat
               , yyyymmdd
        FROM {dataset}.{table}
     WHERE Hospital_Pref != 'Unknown'
          AND ( lon > 0 OR lat > 0 )
  )

  SELECT Hospital_Pref  -- 抽出用の集計
             , avg(lon) as lon
             , avg(lat) as lat
             , count(*) as CNT
    FROM tmp
   WHERE   '{start_date}' <= yyyymmdd and yyyymmdd < '{end_date}'
  GROUP BY Hospital_Pref
  '''


  # strベースのクエリと、project_idが必要
  df = pd.read_gbq(query.format(dataset=dataset_id
    , table=table_id
    , start_date = start_dt
    , end_date = end_dt
    )
  , project_id)

  df.to_csv('./output/{p0}to{p1}.csv'.format(p0=start_dt, p1=end_dt), index=False)

# main関数
if __name__ == "__main__":
  task_a()

TaskB

TaskAで出力したcsvをBQにアップロードします。
pandas-gbqを事前にインストールしないといけませんが、
インストールしてあればimportは不要です。

import pandas as pd
import itertools

project_id = '<your_pjt_id>'
dataset_id = '<your_dataset_id>'

start_dt = '2020-02-01'
end_dt = '2020-08-01'

# strベースのクエリと、project_idが必要
def task_b():  # 書き込む
  # 読み込む
  df = pd.read_csv('./output/{p0}to{p1}.csv'.format(p0=start_dt, p1=end_dt))

  # BQに書き出す
  df.to_gbq('{dataset_id}.variogram{p0}to{p1}'.format(
              dataset_id=dataset_id
            , p0=start_dt.replace(u'-',u'')
            , p1=end_dt.replace(u'-', u'')
              )
            , project_id)

# main関数
if __name__ == "__main__":
  task_b()

TaskC

TaskBで作成した中間テーブルをCROSS JOINして、BQ組み込みの地理計算関数 ST_DISTANCE で距離計算、また、二点間のVariogram(差の二乗平均)を求めています。

import pandas as pd
import itertools

project_id = '<your_pjt_id>'
dataset_id = '<your_dataset_id>'
table_id = 'variogram'
start_dt = '2020-02-01'.replace(u'-',u'')
end_dt = '2020-08-01'.replace(u'-',u'')

def task_c():
  query = '''

  #standardSQL
  WITH tmp AS -- CROSS JOIN用1
   (SELECT * FROM {dataset}.{table}{start_date}to{end_date}),
          tmp2 AS   -- CROSS JOIN用2
  (SELECT * FROM {dataset}.{table}{start_date}to{end_date}),

  tmp3 AS(  -- CROSS JOIN で全組み合わせを採用
   SELECT t1.Hospital_Pref AS t1_Pref
              , t1.lon AS t1_lon
              , t1.lat AS t1_lat
               ,t1.CNT AS t1_CNT
               ,t2.Hospital_Pref AS t2_Pref
              , t2.lon AS t2_lon
              , t2.lat AS t2_lat
              ,t2.CNT AS t2_CNT
              , ST_DISTANCE(ST_GEOGPOINT(t1.lon, t1.lat)
                                    , ST_GEOGPOINT(t2.lon, t2.lat)
                                    ) / 1000 AS dist_km  -- 距離をkm変換
             , POWER(t1.CNT - t2.CNT, 2)/2 AS variogram
     FROM tmp AS t1
     CROSS JOIN tmp2 AS t2
   )

   SELECT * FROM tmp3
   '''


  # strベースのクエリと、project_idが必要

  df = pd.read_gbq(query.format(dataset=dataset_id
    , table=table_id
    , start_date=start_dt
    , end_date=end_dt
    )
  , project_id)

  df.to_csv('./output/variogram_result{p0}to{p1}.csv'.format(
    p0=start_dt, p1=end_dt
    ), index = False)

# main関数
if __name__ == "__main__":
  task_c()

出力結果

都道府県の緯度経度から求めた相互の距離をkm換算し、
そこからVariogramを求めました。

csvの出力はこんな感じです。

キャプチャ2.PNG

一番右がVariogramの値。右から2番目がkm距離になります。

次回予告

空間統計学の話としては、上記を応用して距離を離散化し、経験VariogramやKrigingを求めていければなと思っています。

PipeLineの方の話では2点。

まず、TaskBが冗長なので SQL に create_table を入れることを考えています。
ただし、この場合 pd.read_gbq() の出力結果がEmptyになり Alartが表示されるので、そこの処理をどうしようか考え中です。

もう一つは、典型的なLuigiのPipelineでは、Targetファイルを作成するために以下のコードが必須の様なのですが、

# If the line does not exist no managed file created
    with self.output().open('w') as out_file:
        out_file.write("")

今回みたいにモジュール化した自作関数を使い、PipeLine構造をシンプルにしたい時(まぁ、実務上はだいたいこれ)はこの項目が削れないかと悩ましくなりました。

そうしたらエムスリーさんのBlogで機械学習で便利なluigiフレームワークの紹介の「他のタスクを実行するだけのタスク」によると、このあたりが省略できそうな感じになってます。

次回は上記の内容を踏まえてブラッシュアップした上で、
空間統計学の内容である経験バリオグラムとクリギング参考まで手を出していけたらと思います。

以上

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
0
Help us understand the problem. What are the problem?