はじめに
この記事は Apps Script Advent Calendar 2019 の8日目の記事です。
数日前に【GCP】BigQuery ScriptingとSchedule Queryを組み合わせようとした結果という記事にて、その日の日付に応じて条件分岐させてバッチ処理を組もうとしたのですが、現時点ではデータの保存までできないことが判明しました。
そこで、前回BigQuery ScriptingとSchedule Queryの組み合わせでは実現できなかったバッチ処理をGASにて実装してみました。
BigQuery × GASのサンプル
サンプルはこちらのリポジトリにpushしています。
https://github.com/MasashiFukuzawa/bigquery-gas/blob/master/bigquery.js
要件
- BigQueryからCloudSQL(MySQL)に直接アクセスしたい(CloudSQL Federationを利用)
- 毎月1日〜10日の間はMySQLのusersテーブルからデータを取得
- 毎月11日〜20日の間はMySQLのcommentsテーブルからデータを取得
- 毎月21日〜月末日まではMySQLのshopsテーブルからデータを取得
- 取得したデータはスプレッドシートに出力+BigQueryの対象テーブルへ上書き保存
- 毎日AM0~1時の間に上記の処理が定期実行される
下準備
1. スプレッドシートを作成し、シートに紐付くGASプロジェクトを作成
スプレッドシートのタブの「ツール」→「スクリプトエディタ」をクリックします。
2. BigQuery APIを有効化する
「リソース」→「Googleの拡張サービス」をクリックすると以下のようなポップアップが表示されるため、BigQuery APIの部分をオンに切り替えます。
3. ライブラリの追加
今回はunderscoreGS
というライブラリを使用したいので「Add a library」の欄に1yzaSmLB2aNXtKqIrSZ92SA4D14xPNdZOo3LQRH2Zc6DK6gHRpRK_StrT
と入れ、「追加」をクリックします。
すると、対象のライブラリが表示されるので、最新バージョンを指定し、Development modeをONにします。
4. GCPプロジェクトとの連携
「リソース」→「Cloud Platformプロジェクト」をクリックするとポップアップが表示されるので、自分のGCPのプロジェクト番号を記入し、「プロジェクトを設定」をクリックします。
以下のようにOAuth同意画面の設定を求められるので、「こちら」という部分をクリックしてGCPコンソール画面へと遷移します。
5. OAuth同意画面を作成
GCPのコンソールでは、任意のアプリケーション名を入れるだけで良いです。
その後、4.に戻り、再度「プロジェクトを設定」をクリックことで、GASとGCPプロジェクトが関連付けられます。
GASの実装
1. 処理の流れ
簡単に説明するとexecQueryAccordingToDate
関数が実行されると、以下の順で処理が進みます。
- クエリを実行する日の日付に応じて対象テーブルとクエリをセット
- BigQueryのジョブを実行(CloudSQL FederationによってMySQLデータを読みに行く)
- クエリが完了したらジョブの結果を取得
- 3.で取得した結果をスプレッドシートに出力
- 3.で取得したデータを1.でセットしたテーブルに保存(この時、テーブルがなければ新規作成)
それではポイントの箇所を見ていきます。
2. グローバル変数の宣言
ファイル全体を通して共通的に使用するGCPのプロジェクト番号、データセット名、対象スプレッドシートをあらかじめ指定しておくと便利です。
今回の場合、最終的にトリガーをセットして自動化したいので、スプレッドシートIDを指定する形で対象のスプレッドシートを取得しています。
// Your GCP information
var projectId = '<YOUR GCP PROJECT NUMBER>';
var datasetId = 'data_lake';
// Spreadsheet
var ss = SpreadsheetApp.openById('<YOUR SPREADSHEET ID>');
3. 日付による条件分岐
この部分がBigQueryのUI上だけでは完結しなかったので、今回GASで実装することとなりました。
今回のサンプルは日付によって参照テーブル先を変えるというシンプルな要件としていますが、実際に実務で使用する際はこの部分がもう少し複雑なものになるかと思います。
function setMysqlTable() {
const today = new Date();
const day = today.getDate();
var mysqlTable;
if (day < 11) {
mysqlTable = 'users';
} else if (day > 20) {
mysqlTable = 'shops';
} else {
mysqlTable = 'comments';
}
return mysqlTable;
}
4. CloudSQL Federationを用いたクエリ
このような形でBigQueryからMySQLのデータを読みに行くことができるようになります。
CloudSQL Federationの概要・設定方法・基本文法などについては、【GCP】Schedule QueryとCloudSQL Federationを組み合わせて超簡単にMySQLデータをBigQueryへコピーするをご参考下さい。
function setQuery(mysqlTable) {
// Read MySQL data directly from BigQuery by using CloudSQL Federation Query.
const query =
"SELECT * FROM EXTERNAL_QUERY( \
'mycurryapp.asia-northeast1.cloudsql-mycurryapp', \
'SELECT * FROM " + mysqlTable + "' \
);"
return query;
}
5. クエリ完了レスポンスの受信
クエリが完了していない状態で先の処理に進んでもエラーが出てしまう可能性があるため、ここでクエリが終わるまで待つようにしています。
// Check on status of the Query Job.
function checkOnQueryJobStatus(queryResults, jobId) {
var sleepTimeMs = 500;
while (!queryResults.jobComplete) {
Utilities.sleep(sleepTimeMs);
sleepTimeMs *= 2;
queryResults = BigQuery.Jobs.getQueryResults(projectId, jobId);
}
return queryResults;
}
6. クエリにヒットした全データの取得
ここでは、ヒットしたデータをwhileでループしながら取得+concat
でデータを連結しています。
// Get all the rows of results.
function getAllResults(queryResults, jobId) {
var rows = queryResults.rows;
while (queryResults.pageToken) {
queryResults = BigQuery.Jobs.getQueryResults(projectId, jobId, {
pageToken: queryResults.pageToken
});
rows = rows.concat(queryResults.rows);
}
return rows;
}
7. スプレッドシートへの出力
クエリにヒットしたデータがある場合は、ヘッダー付きでデータを出力するようにしています。
今回は、データを上書き保存していくという要件だったため、データ出力前にスプレッドシートをまっさらな状態に戻しています。
もし最終行にデータを追加していきたい場合は、ws.clear()
の部分をなくし、getRange
で最終行+1行目のところからデータが入るように指定してあげればOKです。
// Clear sheet data and output BigQuery results to sheet.
function outputToSpreadsheet(rows, ws, queryResults) {
if (rows) {
ws.clear();
// Append the headers and return header cols.
const headers = appendHeader(ws, queryResults);
// Append the results and return all spreadsheet data.
const data = appendResults(rows, headers);
ws.getRange(2, 1, rows.length, headers.length).setValues(data);
return data;
} else {
Logger.log('No rows returned.');
}
}
function appendHeader(ws, queryResults) {
const headers = queryResults.schema.fields.map(function(field) {
return field.name;
});
ws.appendRow(headers);
return headers;
}
function appendResults(rows, headers) {
const data = new Array(rows.length);
for (var i = 0; i < rows.length; i++) {
var cols = rows[i].f;
data[i] = new Array(cols.length);
for (var j = 0; j < cols.length; j++) {
data[i][j] = cols[j].v;
}
}
return data;
}
8. BigQueryへの保存
保存先のテーブルがあるかを確認した後、対象テーブルへデータをロードしていきます。
データをロードする際に、CSV形式など、BigQueryが対応している形式に変換する+BLOB(バイナリ・ラージ・オブジェクト)にしなければならないことにご注意下さい。
function loadDataToBigQuery(data, tableId) {
if (!targetTableExists(tableId)) {
createTable(tableId);
}
insertData(convertArrayToBlob(data), tableId);
}
保存先のテーブル(今回の場合、mysql_users
/ mysql_comments
/ mysql_shops
)がない場合は、スキーマ定義を基にして新規テーブルを作成します。
スキーマ定義は長くなるので、ここでは記載しませんが、サンプルのリポジトリの中では定数としてschema.gs
というファイルに切り出しています。
function targetTableExists(tableId) {
const tableInfo = BigQuery.Tables.list(projectId, datasetId);
if (tableInfo.totalItems === 0) {
return false;
}
const tables = tableInfo.tables.map(function(table){
return table.tableReference.tableId;
});
if (tables.indexOf(tableId) !== -1) {
return true;
} else {
return false;
}
}
function createTable(tableId) {
const table = {
tableReference: {
projectId: projectId,
datasetId: datasetId,
tableId: tableId
},
schema: {
fields: setBigQuerySchema(tableId)
}
};
table = BigQuery.Tables.insert(table, projectId, datasetId);
Logger.log('Table created: %s', table.id);
}
function setBigQuerySchema(tableId) {
var bq_schema;
switch (tableId) {
case 'mysql_users':
bq_schema = USERS_SCHEMA;
break;
case 'mysql_comments':
bq_schema = COMMENTS_SCHEMA;
break;
case 'mysql_shops':
bq_schema = SHOPS_SCHEMA;
break;
}
return bq_schema;
}
データをcsvに変換した後、BigQueryへのロード用にBLOBにしています。
function convertArrayToBlob(values) {
const contentType = "text/csv";
const lineDelimiter = ",";
var csvStr = '';
// Prepare array to csv format with underscoreGS module.
underscoreGS._map(
values,
function(row){
csvStr += row.join(lineDelimiter) + '\n';
}
)
// Get blob data and return.
return Utilities.newBlob(csvStr, contentType);
}
保存先や保存の際の条件を指定し、実際にデータをロードしている部分です。
今回は上書き保存することが要件だったため、writeDisposition: 'WRITE_TRUNCATE'
としていますが、行を追加する形で保存していきたい場合はWRITE_APPEND
とするようにして下さい。
function insertData(blob, tableId) {
const job = {
configuration: {
load: {
destinationTable: {
projectId: projectId,
datasetId: datasetId,
tableId: tableId
},
skipLeadingRows: 0,
allowJaggedRows: true,
allowQuotedNewlines: true,
writeDisposition: 'WRITE_TRUNCATE' // update the target table
}
}
};
BigQuery.Jobs.insert(job, projectId, blob);
}
動作確認
動作確認のため、GASを実行したいと思います。
初回実行時はOAuth認証が必要なので「許可を確認」→「詳細」→「(安全ではないページ)に移動」から認証を勧めていきます。
すると、このような形でデータが取得できていることが分かります。
BigQueryのテーブルにもデータが入ったかどうかも確認すると、きちんと対象のテーブルに保存されていました。
GASの定期実行
1. スクリプトファイルの画面から「編集」→「現在のプロジェクトのトリガー」をクリック
2. execQueryAccordingToDate
関数を実行する時間をセット
3.(余談)スプレッドシートから手動でexecQueryAccordingToDate
関数を呼び出せるようにする
bigquery.gs
に以下のコードを追加します。
function onOpen() {
const entries = [
{
name : "手動でクエリを実行",
functionName : "execQueryAccordingToDate"
}
];
ss.addMenu("BigQuery", entries);
};
その後、トリガーの設定画面から、スプレッドシート起動時にonOpen
関数が動くようにトリガーを追加します。
これをセットしておくと、スプレッドシートを開いた時に、スプレッドシートのタブ中に「BigQuery」という新規項目が追加されるようになり、そこから手動でexecQueryAccordingToDate
関数を呼び出せるようになるので何かと便利です。
まとめ
- BigQuery上では難しい条件分岐を含んだクエリの定期実行をGASとBigQuery APIで実現しました。
- GAS経由でもCloudSQL Federationは問題なく使用できました。
- コードのサンプルはこちらのリポジトリにpushしています。