5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Kubeflow PipelinesでBigQueryにクエリを投げてその結果を保存する方法と注意点

Posted at

はじめに

Kubeflow PipelinesからBigQueryにクエリを投げ、クエリ結果を以下の3パターンで保存する方法をまとめます。

1. CSVファイル
2. GCS
3. BigQuery

併せて実装上の注意点も思いついたものを書いていきます。

環境

import sys
sys.version
"""
'3.7.7 (default, May  6 2020, 04:59:01) \n[Clang 4.0.1 (tags/RELEASE_401/final)]'
"""

import kfp
kfp.__version__
"""
'1.0.0'
"""

2021年1月現在Kubeflow PipelinesのPython SDKであるkfpの最新バージョンは1.3.0ですが、筆者の実行環境(AI Platform Pipelines)にインストールされているのが1.0.0だったため、このバージョンを利用しています。

ベースイメージについて

BigQueryにクエリを投げるKFPのコンポーネントは2020年7月頃から存在していましたが、ベースイメージにpython2.7を使っていたためクエリ文に日本語が入っているとエンコーディングエラーが出ていました。

それがつい先日のP-Rマージでベースイメージがpython3.7に更新されたことで、クエリに日本語が入っていても正しくクエリを処理できるようになりました。

つまり2021年1月現在、クエリに日本語が入っている場合は以下のようなコンポーネントURLを指定しない場合、python2系のイメージを使ったコンポーネントが指定されてエンコーディングエラーで落ちるので注意が必要です。
'https://raw.githubusercontent.com/kubeflow/pipelines/ここが1.3.0のものを使う/components/gcp/bigquery/query/...'

準備

この記事で示すサンプルは以下の宣言がされているものとします。

import kfp
from kfp import dsl
from kfp import components as comp
from kfp.components import func_to_container_op
from kfp.components import InputPath

HOST = 'Kubeflow PipelinesのURL'
PROJECT_ID = 'GCPを使っている場合は実行先のProject Id'
QUERY = '''
SELECT
    * 
FROM
    `bigquery-public-data.stackoverflow.posts_questions` 
LIMIT
    10
-- これはテストです
'''

実行は全部これです。

result = kfp.Client(host=HOST).create_run_from_pipeline_func(pipeline, arguments={})
result
"""
Experiment link here
Run link here
RunPipelineResult(run_id=ee82166c-707b-4e5f-84d2-5d98d7189023)
"""

CSVファイルに保存

コード

保存するファイル名とコンポーネントを宣言します。

# CSVのファイル名
FILENAME = 'query_result.csv'

# BigQuery to CSVのコンポーネントURL
bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)
help(bigquery_query_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', job_config: dict = '', output_filename: str = 'bq_results.csv')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery and
    store the results to a csv file.
"""

help関数を使うとそのコンポーネントに渡すべき引数がわかるので、ここを見ながら引数を設定してやります。

CSVに出力されたことは以下の2つの手順で確認してみます。
task 1. 出力先パスを確認
task 2. 出力先パスからCSVを読んでshapeを出力

# task 1
@func_to_container_op
def print_op(text: InputPath('CSV')) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# task 2
@func_to_container_op
def handle_csv_op(path: InputPath('CSV')) -> None:
    print(f'path: {path}')
    print(f'type: {type(path)}')
    
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'])
    import pandas as pd

    df = pd.read_csv(path)
    print(f'shape: {df.shape}')

# おまけ
@func_to_container_op
def print_op_non_type(text) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# pipeline
@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    print_op(bq_task.outputs['table']) # task 1
    handle_csv_op(f"{bq_task.outputs['table']}/{FILENAME}") # task 2
    print_op_non_type(bq_task.outputs['table']) # おまけ

実行結果

# print_opのログ
text: /tmp/inputs/text/data
type: <class 'str'>

# handle_csv_opのログ
path: /tmp/inputs/path/data
type: <class 'str'>
shape: (10, 20)

# print_op_non_typeのログ
text: ,id,title,body,accepted_answer_id,answer_count,comment_count,community_owned_date,creation_date,favorite_count,last_activity_date,last_edit_date,last_editor_display_name,last_editor_user_id,owner_display_name,owner_user_id,parent_id,post_type_id,score,tags,view_count
0,65070674,NewRelic APM cpu usage shows incorrect values in comparison to K8S cluster cpu chart,"<p>Here goes charts of CPU usage of same pod. <strong>chart 1</strong> is from k8s cluster, <strong>chart 2</strong> is from APM.</p>
<ol></ol>"
...
type: <class 'str'>

実行結果のログから、以下のことがわかります。

  • InputPath('CSV')で受け取ったパスは/tmp/inputs/変数名/dataのようになる
  • 引数で指定したファイル名はコンポーネントの出力(bq_task.outputs['table'])に表示されない
# print_opのログ
text: /tmp/inputs/text/data

# handle_csv_opのログ
# 引数としてf"{bq_task.outputs['table']}/{FILENAME}"を渡しているがFILENAMEは出力されない
path: /tmp/inputs/path/data
  • 次のコンポーネントにクエリ結果を渡す際、引数の型をInputPath('CSV')で指定しないとクエリ結果が文字列として渡る
# print_op_non_typeのログ
text: ,id,title,body,accepted_answer_id,answer_count,comment_count,community_owned_date,creation_date,favorite_count,last_activity_date,last_edit_date,last_editor_display_name,last_editor_user_id,owner_display_name,owner_user_id,parent_id,post_type_id,score,tags,view_count
0,65070674,NewRelic APM cpu usage shows incorrect values in comparison to K8S cluster cpu chart,"<p>Here goes charts of CPU usage of same pod. <strong>chart 1</strong> is from k8s cluster, <strong>chart 2</strong> is from APM.</p>
<ol></ol>"

...中略
type: <class 'str'>

注意点

その1

クエリ結果を文字列として渡す際に渡し先のコンポーネントの引数の型をstrにすると型の不一致で落ちるため、InputPath('xxx')以外の形でコンポーネントの出力を受け渡すことは非推奨と思われます。

...

# 引数の型をstrに指定
@func_to_container_op
def print_op(text:str) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    # コンポーネントの出力はPipelineParam型で引数はstrを指定しているため、以下のタスクは引数の型の不一致で落ちる
    print_op(bq_task.outputs['table']) # task 1

その2

上で述べたようにコンポーネントの出力(bq_task.outputs['table'])はPipelineParam型というプレースホルダになっているため、文字列との連結や演算などはできません。

そのため、上のプログラムではf-stringでの代入をしていたというわけです。

def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME)
    # PipelineParam型はstringにキャストできないため以下の方法は落ちる
    # print_op(bq_task.outputs['table'] + "/" + FILENAME) # task 1
    # これは通る
    print_op(f"{bq_task.outputs['table']}/{FILENAME}") # task 1

実際に値が割り当てられるのはパイプライン実行時なので、コンポーネントの出力の扱いには注意が必要です。

参考:Kubeflow - Pipeline Parameters

GCSに保存

コード

保存するファイル名とコンポーネントを宣言します。help関数の出力を見てわかるように、CSVファイルを保存する時とは違った引数が必要です。

# GCSに保存するファイルへのパス
BUCKET = 'バケット名'
GCS_PATH = f'gs://{BUCKET}/query_from_kfp/query_result.csv'

bigquery_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_gcs/component.yaml'
bigquery_query_op = comp.load_component_from_url(bigquery_op_url)
help(bigquery_query_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', dataset_id: str = '', table_id: str = '', output_gcs_path: 'GCSPath' = '', dataset_location: str = 'US', job_config: dict = '', output_kfp_path: str = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to a Google Cloud Storage blob.
"""

GCSに出力されたことは先ほどと同様に、以下の2つの手順で確認します。
task 1. 出力先のGCSパスを確認
task 2. 出力先のGCSパスからCSVを読んでshapeを出力

# task 1
@func_to_container_op
def print_op(text: InputPath('GCSPath')) -> None:
    print(f"text: {text}")
    print(f"type: {type(text)}")

# task 2
@func_to_container_op
def handle_csv_op(gcs_file_path: InputPath('GCSPath'), project:str) -> None:
    print(f'path: {gcs_file_path}')
    print(f'type: {type(gcs_file_path)}')
    
    import subprocess
    subprocess.run(['pip', 'install', 'google-cloud-storage', 'pandas'])

    from google.cloud import storage
    from io import BytesIO
    import pandas as pd

    client = storage.Client(project)
    # point 1
    with open(gcs_file_path, 'r') as f:
        path = f.read()
    # point 2
    with BytesIO() as f:
        client.download_blob_to_file(path, f)
        content = f.getvalue()
    df = pd.read_csv(BytesIO(content))
    print(f'shape: {df.shape}')

# pipeline
@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    bq_task = bigquery_query_op(
        query=QUERY,
        project_id=PROJECT_ID,
        output_gcs_path=GCS_PATH})
    print_op(bq_task.outputs['output_gcs_path']) # task 1
    handle_task = handle_csv_op(gcs=bq_task.outputs['output_gcs_path'],
                                project=PROJECT_ID) # task 2

実行結果

# print_opのログ
text: /tmp/inputs/text/data
type: <class 'str'>

# handle_csv_opのログ
path: /tmp/inputs/gcs/data
type: <class 'str'>
shape: (10, 20)

注意点

その1

handle_csv_opコンポーネントでの処理のクセが強い気がしてます。今回のケースではクエリ結果がGCSに保存されているため、bigquery_query_opコンポーネントからの出力はstr型のパスではなく、GCSのパスが記述されたファイルへのパスになっています。

そのため、以下のようにGCSのパスを読み込んでから、

# point 1
with open(gcs_file_path, 'r') as f:
    path = f.read() # gs://{BUCKET}/query_from_kfp/query_result.csv

以下のようにGCSからファイルの中身を取得します。

# point 2
with BytesIO() as f:
    client.download_blob_to_file(path, f)
    content = f.getvalue()
df = pd.read_csv(BytesIO(content))

この挙動はコンポーネントの定義ファイルoutput_gcs_pathOutputPath型で定義していることに拠ります。素直にstringにしてくれよ…と思いますが、その理由は謎に包まれています。

その2

コンポーネントの引数の型にInputPathを指定した場合、引数名から特定の文字列が除外されます。

例えばhandle_csv_opコンポーネントの引数でgcs_file_pathがありますが、参照するときにはgcsとして参照しています。

# gcs_file_path=bq_task.outputs['output_gcs_path']ではない
handle_task = handle_csv_op(gcs=bq_task.outputs['output_gcs_path'],
                            project=PROJECT_ID) # task 2

一応以下のようにドキュメント?はあるのですが如何せん探しにくいので地味にハマりどころです。Kubeflowのドキュメントだったりチュートリアルが色んなところに散っていて探すのが大変です。

参考:Building Python function-based components - passing parameters by value

BigQueryに保存

BigQueryにクエリを投げてその結果をBigQueryの任意のテーブルに書き出します。

コード

保存先のテーブルとコンポーネントを宣言します。

# クエリ結果の保存先
DATASET_ID = 'mail_retention_pipeline'
TABLE_ID = 'query_result'
FILENAME = 'query_result.csv'

# クエリ結果の確認用クエリ
VERIFY_QUERY = f'''
SELECT
    * 
FROM
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
'''

# クエリ結果をBigQueryに保存するコンポーネント
bigquery_table_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_table/component.yaml'
bigquery_query_table_op = comp.load_component_from_url(bigquery_table_op_url)

# クエリ結果をCSVに出力するコンポーネント
bigquery_csv_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_CSV/component.yaml'
bigquery_query_csv_op = comp.load_component_from_url(bigquery_csv_op_url)

help(bigquery_query_table_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to new table.
"""

BigQueryに出力されたことは以下の3つの手順で確認します。
task 1. BigQueryにクエリを投げてBigQueryに結果を保存
task 2. クエリ結果をBigQueryから取得しCSVで保存
task 3. CSVファイルを読んでshapeを確認

# task 3
@func_to_container_op
def handle_csv_op(path: InputPath('CSV')) -> None:
    import subprocess
    subprocess.run(['pip', 'install', 'pandas'])
    import pandas as pd
    df = pd.read_csv(path)
    print(f'shape: {df.shape}')

@dsl.pipeline(
    name='Bigquery query pipeline name',
    description='Bigquery query pipeline'
)
def pipeline():
    # task 1: クエリ結果をBigQueryに保存
    bq_table_task = bigquery_query_table_op(
        query=QUERY,
        project_id=PROJECT_ID,
        dataset_id=DATASET_ID,
        table_id=TABLE_ID,
        table='')
    # task 2: クエリ結果をCSVで保存
    bq_csv_task = bigquery_query_csv_op(
        query=VERIFY_QUERY,
        project_id=PROJECT_ID,
        output_filename=FILENAME).after(bq_table_task)
    handle_task = handle_csv_op(f"{bq_csv_task.outputs['table']}/{FILENAME}") # task 3

実行結果

# handle_csv_opのログ
path: /tmp/inputs/gcs/data
type: <class 'str'>
shape: (10, 20)

DAGがパイプラインっぽい形になりました。
image.png

注意点

その1

bq_table_taskコンポーネントにはtableという謎の引数があり、この引数に何かしらのstringを入れないと動作しません。ソースコードを見る限りこのパラメータは使われていないので修正漏れと思われます。

# クエリ結果をBigQueryに保存するコンポーネント
bigquery_table_op_url = 'https://raw.githubusercontent.com/kubeflow/pipelines/1.3.0/components/gcp/bigquery/query/to_table/component.yaml'
bigquery_query_table_op = comp.load_component_from_url(bigquery_table_op_url)
...
# table というpositional argumentがある
help(bigquery_query_table_op)
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to new table.
"""

Kubeflow Pipelinesのリポジトリを確認したところ修正PRが出ていたので、マージされればこの問題は解消されます。

その2

クエリ結果をBigQueryに保存する処理は、実はGCSにクエリ結果を保存するコンポーネントでも実現できます。helpの出力からわかるように、クエリ結果をGCSに保存するコンポーネントにもdataset_idtable_idという引数があります。

# クエリ結果をBigQueryに保存するコンポーネント
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', table: str, dataset_id: str = '', table_id: str = '', dataset_location: str = 'US', job_config: dict = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to new table.
"""

# クエリ結果をGCSに保存するコンポーネント
"""
Help on function Bigquery - Query:

Bigquery - Query(query: str, project_id: 'GCPProjectID', dataset_id: str = '', table_id: str = '', output_gcs_path: 'GCSPath' = '', dataset_location: str = 'US', job_config: dict = '', output_kfp_path: str = '')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to a Google Cloud Storage blob.
"""

つまり「クエリ結果はパイプライン内だけで使うからどこにも保存する必要はない」という状況以外は、クエリ結果をGCSに保存するコンポーネントを使えばOKということです。

まとめ

  • BigQueryにクエリを投げる公式コンポーネントを使う際はバージョンに注意
    • 古いとクエリのコメントに日本語が使えない
  • コンポーネントの入力と出力の型に注意
  • コンポーネントのキーワード引数には省略される文字列がある
  • クエリ結果をパイプライン内で完結させる場合以外はGCSに保存するコンポーネントを使う

以上。

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?