Firestoreへのデータ操作ログをまんまBigQueryに書き込むのはExtensionを利用すればできますが、全部がjson形式で保存されてしまうので必要な情報だけをBigQueryに書き込むサンプルを書いてみます。
そもそもnodeでBigQueryを制御する方法はこちら。
準備
- firebase側にプロジェクトが存在しFirestoreが利用できる状態になっている。
- BigQuery側でデータセット(firestore)とテーブル(members)が作成されている。
なお、BigQueryを利用するには有料プランになっている必要があります。
実装
方針
- firestoreに書き込み(create)が行われたら、その内容をBigQueryにも書き込む
- 厳密にはonCreate()を利用すべきだが、下記ではonWriteを利用している
- document名をワイルドカード{docId}として指定してcontextから取得する
- 書き込まれた内容(name, ageとか)をchange.after.data()から取得する
トリガーの詳細はこちらを確認。
作業場所の確保
とりあえず下記のような感じ。
mkdir firestore-bq
cd firestore-bq
firebase init
.
.
cd functions
npm install --save @google-cloud/bigquery
後はfunctionsを選択してよしなに実装。
実装
BigQueryにデータをInsertする方法はいくつかあるが、それぞれ注意が必要。
INSERT INTOを使う(結果よくない)
普通にINSERT INTOを利用して挿入した場合、バッチ等で連続処理を行うと、
Error: Exceeded rate limits: too many table update operations for this table.
というエラーがでることがある。
const functions = require('firebase-functions');
const admin = require('firebase-admin');
const { BigQuery } = require('@google-cloud/bigquery');
exports.firestoreChange = functions.firestore.document('members/{docId}').onWrite((change, context) => {
//値を取得
console.log("docId=" + context.params.docId);
console.log("name=" + change.after.data().name);
const docId = context.params.docId;
const name = change.after.data().name;
const age = change.after.data().age;
//BigQueryにInsert
const bigquery = new BigQuery();
const query = "insert into firestore.members(docId,name,age) values('" + docId + "','" + name + "'," + age + ")";
console.log(query);
bigquery.query(query).then(data => {
console.log("success");
}).catch((error) => {
console.log(error);
});
//ログにエラーが出るのを防止
return 0;
});
Streaming Insertを使う(推奨:だが有償+注意点あり)
Streaming Insertした場合、InsertでExceeded rate limitのエラーが発生することは少ないが、InsertしたデータをDELETEで削除しようとすると、
UPDATE or DELETE statement over table firestore.members would affect rows in the streaming buffer, which is not supported
というエラーが出て削除できない。
Streaming Insertの場合、挿入データが90分Bufferに保存されるようでその間はデータのUPDATE, DELETE等は行えない仕様らしい。継続調査。
const functions = require('firebase-functions');
const admin = require('firebase-admin');
const { BigQuery } = require('@google-cloud/bigquery');
exports.onFirestoreChange = functions.region('asia-northeast1').firestore.document('members/{docId}').onWrite((change, context) => {
//値を取得
const docId = context.params.docId;
const name = change.after.data().name;
const age = change.after.data().age;
//Streaming insert
insertRowsAsStream(docId, name, age);
//warning対策
return 0;
});
insertRowsAsStream = async (docId, name, age) => {
const bigqueryClient = new BigQuery();
await bigqueryClient
.dataset('firestore')
.table('members')
.insert([{ docId: docId, name: name, age: age }])
console.log("Inserted " + docId);
}
動作確認
- firebase deployでデプロイ
- 完了したらどうにかしてfirestoreのmembersコレクションにadd()する。
- 私はSQL Tabsを簡易クライアントツールとして利用してます。[
- firebase functionsのログを確認する。
- BigQueryの内容を確認する。
参考
ついでにCloud SQL(MySQL)のコードも加えて関数化。
Cloud SQLも特に何も(proxyとか)設定しなくてもFunctionsからアクセスできるもよう。
結局SQLが無難か・・・。
const functions = require('firebase-functions');
const admin = require('firebase-admin');
const { BigQuery } = require('@google-cloud/bigquery');
const mysql = require('mysql');
exports.onFirestoreChange = functions.region('asia-northeast1').firestore.document('members/{docId}').onWrite((change, context) => {
//値を取得
const docId = context.params.docId;
const name = change.after.data().name;
const age = change.after.data().age;
//Streaming insert
// insertRowsAsStream(docId, name, age);
//Load Insert
// insertRowsAsLoad(docId, name, age);
//MySQL
insertRowsMySQL(docId, name, age);
//warning対策
return 0;
});
insertRowsAsStream = async (docId, name, age) => {
const bigqueryClient = new BigQuery();
await bigqueryClient
.dataset('firestore')
.table('members')
.insert([{ docId: docId, name: name, age: age }])
console.log("Inserted " + docId);
}
insertRowsAsLoad = async (docId, name, age) => {
const bigqueryClient = new BigQuery();
const query = "insert into firestore.members(docId,name,age) values('" + docId + "','" + name + "'," + age + ")";
await bigqueryClient.query(query);
sleep(100);
console.log(query);
}
insertRowsMySQL = async (docId, name, age) => {
const con = mysql.createConnection({
socketPath: "/cloudsql/" + "project-id:region:dbname",
user: "dbuser",
password: "xxxx",
database: "dbname"
});
con.connect();
const query = "insert into members(docId,name,age) values('" + docId + "','" + name + "'," + age + ")";
con.query(query);
console.log(query);
}
sleep = async (time) => {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve();
}, time);
});
}