LoginSignup
63

More than 5 years have passed since last update.

BigQueryを使う処理のテストをどう書くか

Last updated at Posted at 2014-12-06

この記事はVOYAGE GROUP エンジニアブログ Advent Calendar 2014の6日目の記事です。

こんにちは、VOYAGE GROUPで片手間データサイエンティスト業に従事している@hagino3000です。

昨今のBigQueryムーブメントに乗って、分析用のデータをBigQueryに投入しはじめた方も多いと思います。しかし、BigQueryを使い出すと、集計バッチ等のテストコードがローカル環境で完結しなくなり、BigQueryそのものを参照したくなります。本記事ではいくつかのアプローチを紹介します。

サンプルコードはPython + nose + BigQuery-Pythonを使っています。

何が問題か

何故テストコードで悩むかというと、BigQueryは次の2つの特徴を持つから。

  1. ローカル環境が作れない
  2. 少量のデータでもクエリに5秒程度かかる

特にクエリに時間がかかってしまうので、テストではこれを短縮したくなる。

全てMock化する

例えばBigQuery-Pythonのテストコードは、BigQueryに一切アクセスしていない。

とにかく高速に動作するメリットはあるが、INSERT処理、SELECT文が実際に動作するのかどうか確認できない。さらにテストコードがMockだらけになる。

テストモジュール毎にデータセットを作る

単純に考えるとユニットテスト用のデータセットを用意しておけば良さそうなのだが、複数人で同時にテストを走らせた時に干渉してしまうので、テストを走らせる毎のデータセットが必要になる。Djangoがテストを走らせる度にCreate Databaseするのと同じ事をやってみる。

まずは使い捨てデータセット(+テーブル)を作るための処理。

tests/helper.py
# coding=utf-8
from datetime import datetime
import glob
import json
import os
import random
import re


def setup_dataset(client, test_name):
    """
    テスト用のデータセットを準備する

    Parameters
    ----------
    client : bigquery.client
        See https://github.com/tylertreat/BigQuery-Python

    Returns
    -------
    dataset_id : string
        作成したデータセットのID (ex. ut_hoge_test_359103)

    schemas : dict (key: string, value: list)
        キーはテーブル名、値はテーブルを作成するのに利用したスキーマ定義リスト
    """
    # データセットの作成
    dataset_id = 'ut_%s_%d' % (test_name ,int(random.random() * 1000000))
    client.create_dataset(
        dataset_id,
        friendly_name='For unit test started at %s' % datetime.now())

    # スキーマ定義ファイルからテーブルを作成する
    schemas = {}
    BASE_DIR = os.path.normpath(os.path.join(os.path.dirname(__file__), '../'))
    for schema_file in glob.glob(os.path.join(BASE_DIR, 'schema/*.json')):
        table_name = re.search(r'([^/.]+).json$', schema_file).group(1)
        schema = json.loads(open(schema_file).read())
        schemas[table_name] = schema
        client.create_table(dataset_id, table_name, schema)

    return dataset_id, schemas

テスト本体、setupでデータセットの作成する。さらにテスト対象処理にこのデータセットを利用させるため、データセットIDを取得する処理をMock化する。

test_hoge.py
# coding=utf-8
import time

import mock
from nose.tools import eq_
from nose.plugins.attrib import attr

import myapp.bq
import myapp.calc_daily_state
from . import helper

dataset_id = None
bq_client = None

# 並列実行させる
_multiprocess_can_split_ = True


def setup():
    global dataset_id
    global bq_client
    # BigQuery-Pythonのclientインスタンスの取得
    bq_client = myapp.bq.get_client(readonly=False)
    # データセットの作成
    dataset_id, schemas = helper.setup_dataset(bq_client, 'test_hoge')
    # データセットIDを取得する処理をMock化
    myapp.bq.get_dataset_id = mock.Mock(return_value=dataset_id)
    # テスト用データのINSERT
    bq_client.push_rows(dataset_id, 'events', [....  ....])
    # INSERTした直後だとクエリできない事があるのでsleepする
    time.sleep(10)


@attr('slow')
def test_calc_dau():
    # BigQueryを参照するテスト
    ret = myapp.calc_daily_state.calc_dau('2014/08/01')
    eq_(ret, "....略....")


@attr('slow')
def test_calc_new_user():
    # BigQueryを参照するテスト
    ret = myapp.calc_daily_state.calc_new_user('2014/08/01')
    eq_(ret, "....略....")


def teadown():
    # データセットの削除、Failしたテストがあった時には残すとした方が良さそう
    bq_client.delete_dataset(dataset_id)

この例は、テスト対象の処理はReadOnlyであると想定したため、データセット作成は1度で済ませたある。テストケース毎に5秒はかかるので、1テスト1アサートを遵守したい。

setupのデータセット作成とデータロードにかかる時間は1秒程度。各ケースに時間がかかっているため並列化する事である程度時間を短縮する事ができる。

# 5並列でテストを走らせる
nosetests --processes=5 --process-timeout=30

Multiprocess: parallel testing — nose 1.3.4 documentation
http://nose.readthedocs.org/en/latest/plugins/multiprocess.html

テスト関数毎にデータセットを作る

上の例ではmoduleのsetupでデータセットを作成していたが、INSERTのある処理のテストとなると、テスト間での影響を排除する必要が出てくる。これはさらに時間がかかる。なぜなら、INSERT直後に結果を確認しようとしてクエリを実行しても結果が得られないから。INSERT後、数秒のsleepを置いてからクエリ(およそ5秒かかる)を実行して結果を確認する必要がある。

test_fuga.py
# 並列実行させる
_multiprocess_can_split_ = True

@attr('slow')
class TestFugaMethodsWhichHasInsert(object):
    def setup(self):
        # データセット作る
        ()
        self.dataset_id = dataset_id
        self.bq_client = bq_client

    def test_insert_foo(self):
        # INSERTを共なう処理のテスト

    def test_insert_bar(self):
        # INSERTを共なう処理のテスト

    def teardown(self):
        self.bq_client.delete_dataset(self.dataset_id)

テストが終るまでに寝落ち必至なので、実行はCIツールにまかせよう。この場合、BigQueryのデータについてはテスト間の影響を除去できているので並列実行が可能になる。

バランスを取った折衷案

クエリの発行及びデータをINSERTするメソッドのみ実際のBigQueryを利用し、slowテストとしてディレクトリを分けておく。
それ以外の処理は、クエリの結果を返す部分をMock化する。

過激派

分析タスクのコードにテストなんて書かない

未来派

DynamoDB Localみたいな物が登場するのを待つ。もしくは作る。

まとめ

これがベストといった選択肢は今の所ないので、Mockを減らしてテストコードをシンプルに保ちたいならBigQueryを直に参照、逆ならMockを使うといった所でしょうか。テストは並列実行できるように、テスト間の依存は除去しておく。データセットの作成には時間がかからないので、テスト毎に作っても問題無い。

もっと良いパターンがあれば教えてもらえると助かります。

明日の担当は@brtriverさんです、おたのしみに。

補足:google-bigquery-toolsのテストコードはどうなっているか

Python製であるbqコマンドのテストコードがどうなっているか見てみる。 bq query xxx でクエリを発行できるので、そういったテストがあっても良いはず。

クエリを実行するテストは無い。(´・ω・`)

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
63