21
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Bigquery上で実行するバッチ処理のテストコードを書く (Ruby編)

Posted at

bigquery上でデータを加工して集計する時、このSQLが本当に合ってんのかテストコードで検証したくなる。
しかし、こういう外部サービスを使った処理のテストコードを書くのはとても面倒臭い。
とはいえ、書かんわけにもいかんし、実際に動かしてみないと分からないこともあるので、実際にbigqueryで処理を実行してテストする方法をまとめてみる。

テストデータのロード

bigqueryにデータを突っ込む方法はバルクロードするかStreaming Insertの二つ。
しかし、バルクロードはテストコードを書く時に困るのが、データ量に関わらず処理に一定時間かかること。
どれだけ小さいデータでも最低1分前後は待たされる上に、時々謎の刺さり方をして最悪数分かかる場合がある。
一方でStreaming Insertはまずテーブルを作っておかなければいけないし、Streaming Insertのレスポンスが返ってきた時点でクエリを即発行できるのかイマイチはっきりしない。

しかし、一つ裏技的なやり方があって、それがBigQuery で無からリレーションを出現させる - Qiitaである。
テストデータを静的なSQLに変換してViewとして定義すればAPIリクエストを送るだけでクエリ可能なダミーテーブルが準備できる。
bigqueryにおいてはViewはほぼテーブルと同じなのだが、どうもJOINした時のカラムの扱いに微妙な違いがあるような挙動をする。
端的に言うとテーブル名エイリアスを省略するとカラムが見つからなくなったりする。よく分からん……。
とはいえ、概ね問題なく処理が可能なのでテストデータの投入はこれで行うことにした。

で、このテクニックをテストコードから楽に実行するためにgemを作った。

joker1007/bq_fake_view

使い方はこんな感じ。

bq_fake_view = BqFakeView.new(Google::Auth.get_application_default(["https://www.googleapis.com/auth/bigquery"]))

data = [
  {foo: 1, bar: "bar", time: Time.now},
  {foo: 2, bar: nil, time: Time.now}
]
schema = [
  {name: "foo", type: "INTEGER"},
  {name: "bar", type: "STRING"},
  {name: "time", type: "TIMESTAMP"}
]
bq_fake_view.create_view("your_project_id", "your_dataset_id", "test_data", data, schema)
# =>
# create "test_data" view.
# view query is
# SELECT * FROM (SELECT 1 as foo, "bar" as bar, TIMESTAMP('2016-1-8 10:02:01') as time), (SELECT 2 as foo, CAST(NULL as STRING) as bar, TIMESTAMP('2016-1-8 10:02:01') as time)

bq_fake_view.view_query(data, schema) # => return Query string used by View definition

スキーマの指定が必要なのはNULLに型を指定しないと上手く結合できないからだ。

この方法で大量のレコードを差し込むのはかなり無理があるのだが、ロジックのテストについてはこれでも充分に実施できる。

データのクリア

テストを実施する時には、過去に実行したテスト結果をクリアしてフラットな状態からテストデータをロードしなければならない。
手元のDBだったら、database_cleanerを使うとかすれば簡単にデータをクリアできる。
bigqueryの場合はそんなもの無いのでAPIを叩いてクリアする必要がある。
しかもbigqueryはレコード単位の削除が存在しないのでテーブルごと消さなければならない。
しかし、バッチ処理なんかだと中間テーブルが大量に発生することもあって、それを逐一APIを叩いて削除するのはとても面倒臭い。
というわけで、datasetごと削除してデータをクリアするようにした。

def reset_test_dataset!
  begin
    Bigquery.client.delete_dataset("project_id", "test_dataset", delete_contents: true)
  rescue Google::Apis::ClientError
  end

  Bigquery.client.insert_dataset("project_id", Google::Apis::BigqueryV2::Dataset.new({
    dataset_reference: {
      dataset_id: "test_dataset"
    },
    default_table_expiration_ms: 3600000, # 1hour
  }))
end

こんな感じのヘルパーメソッドを定義しておいて、beforeフックで呼び出すようにする。
これで割とすぐにデータをクリアできる。

テストデータのロードとクリアが可能になれば、後はテストコードを書くだけだ。
テストコードは実際にバッチ処理を実行してみて、出力結果を取得して手元の正解と見做したデータと付き合わせているだけ。
とは言え、あんまり頻繁にbigqueryのAPIコールをするのも問題ありそうなので、RSpecで適当な粒度でdescribeを分けて、before(:context)でテストの事前準備を整える様にした。
ちなみに、テストコード実施後のテーブル状況を確認できるように、afterフックでデータセットを消すようにはしていない。その代わり放置してもデータが消えるようにdatasetにexpiration timeを設定している。

21
23
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?