0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonのBigQueryの query_parameters を dict から生成する

Posted at

はじめに

Pythonの BigQueryライブラリ でも パラメータ化されたクエリの実行 を使えますが、
なんかちょっと素人お断り的な難しさがあります。

(もしかしたら既にそういうライブラリがあるのかもしれないですが、)
list型 や struct型 や list[struct] とか struct of struct of list とか、
そういう複雑な構造の QueryParameter も Pythonの Dictオブジェクトからよしなに変換してくれるコードのメモです。

query_parameters を dict から生成する

以下のように書くとだいぶ対応できます。

from __future__ import annotations

import datetime

from google.cloud.bigquery import (
    QueryJobConfig,
    ArrayQueryParameter,
    ScalarQueryParameter,
    StructQueryParameter,
)

BigQueryQueryParameter = ScalarQueryParameter | ArrayQueryParameter | StructQueryParameter


def _create_query_parameters(params: dict[str, Any]) -> list[BigQueryQueryParameter]:
    """DictをBigQueryのQueryParameterに変換する

    ex:
    {
        "name": "foo",
        "age": 20,
        "is_active": True,
        "created_at": datetime.datetime(2020, 1, 1),
        "array": ["a", "b", "c"],
        "struct": {
            "name": "bar",
            "age": 30
        },
        "array_of_struct": [
            {"name": "bar", "age": 30},
            {"name": "baz", "age": 40},
        ],
        "struct_of_array": {
            "a": ["a", "b", "c"],
            "b": ["d", "e", "f"],
        },
        "struct_of_struct": {
            "a": {"name": "bar", "age": 30},
            "b": {"name": "baz", "age": 40},
        },
        // "array_of_array": not supported,
        array_of_struct_of_array: [
            {"a": ["a", "b", "c"], "b": ["d", "e", "f"]},
            {"a": ["g", "h", "i"], "b": ["j", "k", "l"]},
        ],
    }
    """
    if not params:
        return []

    query_parameters = []
    for key, value in params.items():
        if key.endswith(".type"):
            continue
        value_type = params.get(f"{key}.type")
        array_item_value_type = params.get(f"{key}.item.type")
        enc_value = _encode_value(key, value, value_type=value_type, array_item_value_type=array_item_value_type)
        if enc_value is not None:
            query_parameters.append(enc_value)
    return query_parameters


def _encode_value(
    key: str,
    value: int | float | str | list | dict,
    value_type: str = None,
    array_item_value_type: str = None,
) -> BigQueryQueryParameter | None:
    _value_types = {
        "STRING",
        "INT64",
        "FLOAT64",
        "BOOL",
        "TIMESTAMP",
        "DATE",
        "ARRAY",
        "STRUCT",
    }
    if value_type is None:
        value_type = _guess_value_type(value)
    else:
        value_type = value_type.upper()
    if value_type not in _value_types:
        raise ValueError(f"unknown type {key}={type(value)}")
    if value_type == "ARRAY":
        if array_item_value_type is None:
            if len(value) > 0:
                array_item_value_type = _guess_value_type(value[0])
            else:
                raise ValueError(f"unknown type {key}.item")
        array_item_value_type = array_item_value_type.upper()
        if array_item_value_type not in _value_types:
            raise ValueError(f"unknown type {key}.item={type(value)}")
        if array_item_value_type == "STRUCT":
            # noinspection PyTypeChecker
            value = [_encode_value(None, _d) for _d in value]
        elif array_item_value_type == "ARRAY":
            # maybe not supported in BigQuery
            raise ValueError(f"nested array is not supported {key}")
        # noinspection PyTypeChecker
        return ArrayQueryParameter(key, array_item_value_type, value)
    elif value_type == "STRUCT":
        fields = _create_query_parameters(value)
        # noinspection PyTypeChecker
        return StructQueryParameter(key, *fields)
    else:
        # noinspection PyTypeChecker
        return ScalarQueryParameter(key, value_type, value)


def _guess_value_type(value: Any) -> str | None:
    """値から型を推測する"""
    if isinstance(value, int):
        return "INT64"
    elif isinstance(value, float):
        return "FLOAT64"
    elif isinstance(value, str):
        return "STRING"
    elif isinstance(value, bool):
        return "BOOL"
    elif isinstance(value, datetime.datetime):
        return "TIMESTAMP"
    elif isinstance(value, datetime.date):
        return "DATE"
    elif isinstance(value, list):
        return "ARRAY"
    elif isinstance(value, dict):
        return "STRUCT"
    else:
        return None

使い方

上記ページに載っているサンプルコードを拝借して、説明するとこういう感じになります。

# Construct a BigQuery client object.
client = bigquery.Client()

query = """
    SELECT name, sum(number) as count
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE gender = @gender
    AND state IN UNNEST(@states)
    GROUP BY name
    ORDER BY count DESC
    LIMIT 10;
"""

params = dict(
            gender="M",
            states=["WA", "WI", "WV", "WY"],
         )
query_parameters = _create_query_parameters(params)  # 引数の型を見てよしなに変換

job_config = bigquery.QueryJobConfig(query_parameters=query_parameters)
query_job = client.query(query, job_config=job_config)  # Make an API request.

for row in query_job:
    print("{}: \t{}".format(row.name, row.count))

ちょっとすごいとこ

  • struct of struct とか list of struct of list of struct とかもちゃんと変換できる

できないこと

  • list of list というパラメータは渡せない(仕様的に無理?)
  • listのサイズが 0 のときエラーになる(要素の型がわからないので)
    • 一応、上記の例だと "states.item.type": "STRING" などとしてあげると通るには通るにしてはありますが...
      • Pythonで型情報だけを渡すいい方法ないかな...

さいごに

listやstructが入れ子になるケースが難しかった...のですが、 ChatGPT が教えてくれました。
便利な世の中になりましたね。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?