この記事は MicroAd Advent Calendar 2022 の10日目の記事です。
qiita.com/advent-calendar/2022/microad
概要
業務で機械学習の学習バッチ処理を高速化する機会があり、データ加工処理をpythonからBigQueryに置き換えました。
結果的に、15時間かかっていた処理が10分で終わるようになり、約100倍の高速化が実現できました。
BigQueryの凄さを感じた一方で、扱ったデータが時系列データを含んでいてBigQueryで処理を実現するのが難しかったので、その知見について共有できればと思います。
下記は、今回扱いたい入力テーブルと最終的に欲しいテーブルです。
入力テーブル
入力は1レコードごとにユーザ名、特徴量のタイプ名、時系列の特徴量であるxとyが入っているテーブルです。
複数のレコードに渡って、同じユーザの時系列特徴量が特徴量タイプごとに存在していることや、時系列の特徴量なので1つの特徴量が配列になっていることが特徴です。
SELECT 'A' AS user, '0_0' AS feature_type, generate_array(0, 2) AS x, generate_array(3, 4) AS y
UNION ALL SELECT 'A' AS user, '1_1' AS feature_type, generate_array(5, 7) AS x, generate_array(8, 9) AS y
UNION ALL SELECT 'B' AS user, '1_1' AS feature_type, generate_array(10, 12) AS x, generate_array(13, 14) AS y
一時テーブル
1レコード中にそのユーザの全特徴量が集約されているテーブルで、入力テーブルを最終テーブルに変形しやすいように作成しています。
特徴量のカラム名は入力テーブルと同じですが、1レコードに複数の配列(時系列特徴量)を持つ、配列の配列形式になっているのが特徴です(1レコード中に特徴量タイプの数だけ配列が存在)。
このままでは機械学習のモデルに入力することはできないので、最終的に横持ち変換(pivot)を行う必要があります。
WITH
t1 AS (SELECT [0, 1, 2] as value UNION ALL SELECT [5, 6, 7]),
t2 AS (SELECT [3, 4] as value UNION ALL SELECT [8, 9]),
t3 AS (SELECT [10, 11, 12] as value),
t4 AS (SELECT [13, 14] as value),
tmp_table AS (
SELECT 'A' AS user, ['0_0', '1_1'] AS feature_type, ARRAY(SELECT STRUCT(value) FROM t1) AS x, ARRAY(SELECT STRUCT(value) FROM t2) AS y
UNION ALL SELECT 'B' AS user, ['1_1'] AS feature_type, ARRAY(SELECT STRUCT(value) FROM t3) AS x, ARRAY(SELECT STRUCT(value) FROM t4) AS y
)
SELECT * FROM tmp_table
最終テーブル
一時テーブルを更に加工して、モデルに入力できるように変形したテーブルです。
各レコードに対して、特徴量タイプと時系列データの要素の2つについて横持ち変換(pivot変換)してモデルに入力できるようにする必要があります。
SELECT 'A' AS user, 0 AS x_0_0_0, 1 AS x_0_0_1, 2 AS x_0_0_2, 3 AS y_0_0_0, 4 AS y_0_0_1, 5 AS x_1_1_0, 6 AS x_1_1_1, 7 AS x_1_1_2, 8 AS y_1_1_0, 9 AS y_1_1_1
UNION ALL SELECT 'B' AS user, NULL AS x_0_0_0, NULL AS x_0_0_1, NULL AS x_0_0_2, NULL AS y_0_0_0, NULL AS y_0_0_1, 10 AS x_1_1_0, 11 AS x_1_1_1, 12 AS x_1_1_2, 13 AS y_1_1_0, 14 AS y_1_1_1
入力テーブル → 一時テーブル
userカラムでデータを集約して一時テーブルを作成します。
配列の配列(array<array<int64>>
)はBigQueryでは直接作れないので、間にstructを挟んでいます(array<struct<array<int64>>>
)
CREATE OR REPLACE TEMP TABLE tmp_table2 AS (
WITH
input_table AS (
SELECT 'A' AS user, '0_0' AS feature_type, generate_array(0, 2) AS x, generate_array(3, 4) AS y
UNION ALL SELECT 'A' AS user, '1_1' AS feature_type, generate_array(5, 7) AS x, generate_array(8, 9) AS y
UNION ALL SELECT 'B' AS user, '1_1' AS feature_type, generate_array(10, 12) AS x, generate_array(13, 14) AS y),
tmp_table AS (
SELECT
user
,ARRAY_AGG(feature_type) AS feature_type
,ARRAY_AGG(STRUCT(x AS value)) AS x
,ARRAY_AGG(STRUCT(y AS value)) AS y
FROM input_table
GROUP BY user)
SELECT * FROM tmp_table
);
SELECT * FROM tmp_table2;
【脱線】もしこの段階で時系列特徴量に前処理を行いたい場合は、UDF(ユーザ定義関数)を用いることで対応できます
今回xとyは3次元と2次元の可変長の時系列特徴量になっているので、例として2次元に圧縮する関数を定義してみました(※ xとyは2次元以上を想定)
-- N次元の配列(時系列特徴量)を2次元に圧縮する関数
-- 1次元目は元の配列の1要素目を、2次元目は元の配列の全要素の総和を入力する
CREATE TEMP FUNCTION preprocess(x ARRAY<STRUCT<value ARRAY<INT64>>>)
RETURNS ARRAY<STRUCT<value ARRAY<INT64>>>
LANGUAGE js AS r"""
var result = [];
const stacked_interval = [1, 1000000];
for (var i = 0; i < x.length; i++){{
const initialValue = 0;
var y = x[i].value;
var arr = [];
for (var j = 0; j < stacked_interval.length; j++) {{
let s = y.slice(0, stacked_interval[j]).reduce((a, b) => {{return Number(a) + Number(b)}}, initialValue);
arr.push(s);
}}
var valueDict = new Object();
valueDict.value = arr
result.push(valueDict)
}}
return result;
""";
WITH
input_table AS (
SELECT 'A' AS user, '0_0' AS feature_type, generate_array(0, 2) AS x, generate_array(3, 4) AS y
UNION ALL SELECT 'A' AS user, '1_1' AS feature_type, generate_array(5, 7) AS x, generate_array(8, 9) AS y
UNION ALL SELECT 'B' AS user, '1_1' AS feature_type, generate_array(10, 12) AS x, generate_array(13, 14) AS y),
tmp_table AS (
SELECT
user
,ARRAY_AGG(feature_type) AS feature_type
,ARRAY_AGG(STRUCT(x AS value)) AS x
,ARRAY_AGG(STRUCT(y AS value)) AS y
FROM input_table
GROUP BY user)
SELECT
user
,feature_type
,preprocess(x) AS x -- 前処理を適用
,preprocess(y) AS y -- 前処理を適用
FROM
tmp_table
一時テーブル → 最終テーブル
pivot関数を適用するためにデータを縦持ちに変換します。
対象となるデータはfeature_typeと時系列特徴量の2種類について同時にpivotしたいので、基準となるカラム(feature_type_offset)を新しく作ります(カラム名は思いつかなかったので適当です)
配列の配列となっているレコードをOFFSETを用いて配列だけを持つレコードに変換した後、配列のレコードをスカラだけ持つように変換しています(ネストでクエリが見づらくてすみません…)
-- feature_typeと時系列特徴量の次元数(配列の長さ)の2種類でpivotできるようにテーブルを加工する
-- また、後段でテーブルのカラムを参照したいので一時テーブル(tmp_table3)に結果を保存
CREATE OR REPLACE TEMP TABLE tmp_table3 AS (
SELECT
user
,feature_type_offset
,X
,Y
FROM(
SELECT
offset
,user
,feature_type
,CONCAT(feature_type, '_', offset) AS feature_type_offset -- 名前が思いつかなかったので適当です
,X2 AS X
,Y2 AS Y
FROM (
SELECT
user
,feature_type
,(SELECT ARRAY_AGG(x) FROM UNNEST(x.value) AS x) AS x -- ARRAY<STRUCT<INT64>>をARRAY<INT64>に変換
,(SELECT ARRAY_AGG(y) FROM UNNEST(y.value) AS y) AS y -- ARRAY<STRUCT<INT64>>をARRAY<INT64>に変換
FROM (
SELECT
user
,ft AS feature_type
,X1 AS x
,Y2 AS y
FROM
tmp_table2, UNNEST(feature_type) AS ft WITH OFFSET
LEFT JOIN UNNEST(x) AS X1 WITH OFFSET USING(offset)
LEFT JOIN UNNEST(y) AS Y2 WITH OFFSET USING(offset))
), UNNEST(x) AS X2 WITH OFFSET
LEFT JOIN UNNEST(y) AS Y2 WITH OFFSET USING(offset)
)
);
SELECT * FROM tmp_table3;
pivotで増やしたいカラムを変数として取得しておきます。
DECLARE unique_feature_types_offset STRING;
-- ここで動的SQLでpivotしたい変数(unique_feaute_types_offset)を取得
SET unique_feature_types_offset = (
WITH
t AS (
SELECT
DISTINCT(feature_type_offset) as feature_type_offset
FROM tmp_table3
ORDER BY feature_type_offset)
SELECT '"' || ARRAY_TO_STRING(ARRAY_AGG(feature_type_offset), '","') || '"' AS feature_type_offset FROM t -- 得られた複数のレコードを文字列化
);
SELECT unique_feature_types_offset;
最後にpivotで横持ち変換を行って、動的にカラムを増やします。
xとyが異なる次元数なので、全レコードがNULLだけのカラムが出来ちゃってますが、上で触れた前処理関数(UDFで実装)で固定長の配列にするか、全レコードがNULLのカラムは削除する等の対策を行えば良さそうです。得られたテーブルのカラムの順序は違っていますが、よく見ると概要で触れた最終テーブルと一致しています。
-- pivotで横持ち変換して動的にカラムを生成
-- 通常pivotは変数を使用できないので、動的SQLで実行
EXECUTE IMMEDIATE FORMAT("""
SELECT
*
FROM tmp_table3
PIVOT (
ANY_VALUE(x) AS x
,ANY_VALUE(y) AS y
FOR feature_type_offset IN (%s)
)
ORDER BY user;
""", unique_feature_types_offset);
最終的なクエリの全体像はこちらです
DECLARE unique_feature_types_offset STRING;
-- 冗長になるのでここまでの結果を一時テーブル(tmp_table2)に結果を保存
CREATE OR REPLACE TEMP TABLE tmp_table2 AS (
WITH
input_table AS (
SELECT 'A' AS user, '0_0' AS feature_type, generate_array(0, 2) AS x, generate_array(3, 4) AS y
UNION ALL SELECT 'A' AS user, '1_1' AS feature_type, generate_array(5, 7) AS x, generate_array(8, 9) AS y
UNION ALL SELECT 'B' AS user, '1_1' AS feature_type, generate_array(10, 12) AS x, generate_array(13, 14) AS y),
tmp_table AS (
SELECT
user
,ARRAY_AGG(feature_type) AS feature_type
,ARRAY_AGG(STRUCT(x AS value)) AS x
,ARRAY_AGG(STRUCT(y AS value)) AS y
FROM input_table
GROUP BY user)
SELECT * FROM tmp_table
);
-- feature_typeと時系列特徴量の次元数(配列の長さ)の2種類でpivotできるようにテーブルを加工する
-- また、後段でテーブルのカラムを参照したいので一時テーブル(tmp_table3)に結果を保存
CREATE OR REPLACE TEMP TABLE tmp_table3 AS (
SELECT
user
,feature_type_offset
,X
,Y
FROM(
SELECT
offset
,user
,feature_type
,CONCAT(feature_type, '_', offset) AS feature_type_offset -- 名前が思いつかなかったので適当です
,X2 AS X
,Y2 AS Y
FROM (
SELECT
user
,feature_type
,(SELECT ARRAY_AGG(x) FROM UNNEST(x.value) AS x) AS x -- ARRAY<STRUCT<INT64>>をARRAY<INT64>に変換
,(SELECT ARRAY_AGG(y) FROM UNNEST(y.value) AS y) AS y -- ARRAY<STRUCT<INT64>>をARRAY<INT64>に変換
FROM (
SELECT
user
,ft AS feature_type
,X1 AS x
,Y2 AS y
FROM
tmp_table2, UNNEST(feature_type) AS ft WITH OFFSET
LEFT JOIN UNNEST(x) AS X1 WITH OFFSET USING(offset)
LEFT JOIN UNNEST(y) AS Y2 WITH OFFSET USING(offset))
), UNNEST(x) AS X2 WITH OFFSET
LEFT JOIN UNNEST(y) AS Y2 WITH OFFSET USING(offset)
)
);
-- ここで動的SQLでpivotしたい変数(unique_feaute_types_offset)を取得
SET unique_feature_types_offset = (
WITH
t AS (
SELECT
DISTINCT(feature_type_offset) as feature_type_offset
FROM tmp_table3
ORDER BY feature_type_offset)
SELECT '"' || ARRAY_TO_STRING(ARRAY_AGG(feature_type_offset), '","') || '"' AS feature_type_offset FROM t -- 得られた複数のレコードを文字列化
);
-- pivotで横持ち変換して動的にカラムを生成
-- 通常pivotは変数を使用できないので、動的SQLで実行
EXECUTE IMMEDIATE FORMAT("""
SELECT
*
FROM tmp_table3
PIVOT (
ANY_VALUE(x) AS x
,ANY_VALUE(y) AS y
FOR feature_type_offset IN (%s)
)
ORDER BY user;
""", unique_feature_types_offset);
所感
この記事では一時テーブルから最終テーブルを作る過程もBigQueryで実装する例を紹介しましたが、実際の業務では使用していないです(この記事で供養です)。今回のデータだと、pivotで特徴量数 x 特徴量タイプ数 x 時系列特徴量の次元数(配列の長さ)
で動的にカラム数が増えるので、BigQueryのテーブルで持てるカラム数の上限(1万)を超えてしまったり、上記クエリのままだとリソースエラーに触れる可能性があるからです(このクエリよりも効率の良い書き方は沢山あると思いますのでご存知でしたら是非教えて下さい)。ただ、データサイズや横持ち変換後のカラム数が現実的に収まるのであれば採用してみても良いかもしれません。ここまで読んで頂きありがとうございました。