LoginSignup
1
1

Cloudflare から Workers を経由して BigQuery に Logpush する

Last updated at Posted at 2023-08-28

目的

以下のような実装を検討します。

従来の実装

BigQuery にデータを書き込むには

Data Transfer Service を使う

S3 はサポートしていますが、S3 互換の Cloudflare R2 はサポート対象外のようなので、この方法は諦めます。

以下、参考までに CLI で Data Transfer を作成するコマンドです。

bq mk \
--transfer_config \
--data_source=google_cloud_storage \
--display_name=datetransferjob --target_dataset=bqdataset \
--schedule=None \
--params='{
  "data_path_template":"gs://bucket/row.json",
  "destination_table_name_template":"sample_table",
  "file_format":"JSON",
  "ignore_unknown_values": "true"
}'

Storage Write API を使う

毎月最初の 2 TB まで無料ですが、$0.03 per 1 GB(東京)の費用が発生します。

また、AppendRows の処理が bidirectional なストリーミングで、Workers のような Web 向けの gRPC では対応していない方式のため、諦めます。

  rpc AppendRows(stream AppendRowsRequest) returns (stream AppendRowsResponse) {
    option (google.api.http) = {
      post: "/v1/{write_stream=projects/*/datasets/*/tables/*/streams/*}"
      body: "*"
    };
    option (google.api.method_signature) = "write_stream";
  }
...
// Request message for `AppendRows`.
//
// Because AppendRows is a bidirectional streaming RPC, certain parts of the
// AppendRowsRequest need only be specified for the first request before
// switching table destinations. You can also switch table destinations within
// the same connection for the default stream.

ストリーミング挿入(tabledata.insertAll

$0.012 per 200 MB(東京)の費用が発生します。
(挿入に成功した行が課金対象になります。最小サイズ 1 KB で各行が計算されます。)

また、以下のようにデータをただちに利用できない場合もあるようです。

最近ストリーミングされた行の一部は、通常、数分間はテーブルのコピーに使用できない可能性があります。まれに最大 90 分かかることがあります。

今回、ストリーミング挿入は検討の対象外とします。

バッチ読み込み (jobs.insert)

ファイルアップロードしながらインサートできますが、1日1500件の上限があるので、毎分1回ロードする(1440件)より多いと上限に引っかかります。

BigQuery error in load operation: Error processing job 'project_id:bqjob_xxx_xxx_1': 
Quota exceeded: Your table exceeded quota for imports or query appends per table. 
For more information, see https://cloud.google.com/bigquery/docs/troubleshoot-quotas

Logpush で都度発生する通信を、どこかのオブジェクトストレージにログを保管することなく、Workers 単体でこの上限を回避することは難しそうなため、諦めます。

(Cloudflare Queue を使ったとしても max_batch_timeout の最大値が30秒のため、難しそうです)

Google Cloud コンソール、bq コマンドライン ツール、または読み込みタイプの jobs.insert API メソッドを使用して BigQuery にデータを読み込む場合、次の上限が適用されます。
1 日のテーブルあたりの読み込みジョブ数 上限 = 1,500 件のジョブ デフォルト

以下、参考までに REST API に POST するコマンドです。

export PROJECT_ID=YOUR_PROJECT_ID
export GOOGLE_APPLICATION_CREDENTIALS=~/key.json
TOKEN=$(gcloud auth application-default print-access-token)   

curl -vvv -X POST \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: multipart/related; boundary=foo_bar_baz" \
'https://bigquery.googleapis.com/upload/bigquery/v2/projects/'${PROJECT_ID}'/jobs?uploadType=multipart' \
-d '
--foo_bar_baz
Content-Type: application/json; charset=UTF-8

{
  "configuration": {
    "load": {
      "sourceFormat": "NEWLINE_DELIMITED_JSON",
      "destinationTable": {
        "projectId": "'${PROJECT_ID}'",
        "datasetId": "sample_dataset",
        "tableId": "sample_table"
      }
    }
  }
}

--foo_bar_baz
Content-Type: application/json

{"id":"testId1","user_uuid":"testUuid1","action_type":1}
{"id":"testId2","user_uuid":"testUuid2","action_type":2}
{"id":"testId3","user_uuid":"testUuid3","action_type":3}
--foo_bar_baz--
'

バッチ読み込み (jobs.query)

SQL の INSERT 文を使ったクエリジョブによりデータを書き込む方法がありました。

You can use Data Manipulation Language (DML) since this uses jobs.query of bigquery API. The reason why this uses jobs.query to insert data rather than streaming insert api is to allow the inserted data to be modified immediately.

クエリでの課金対象となるため、オンデマンドの場合、毎月 1 TB まで無料ですが、$6.00 per TB(東京)の費用が発生します。

クエリジョブの上限も「Query usage per day = Unlimited」のため、問題なさそうです。

今回はこちらの方法を Workers で実装してみます。

以下、参考までに REST API に POST するコマンドです。

export PROJECT_ID=YOUR_PROJECT_ID
export GOOGLE_APPLICATION_CREDENTIALS=~/key.json
TOKEN=$(gcloud auth application-default print-access-token)

curl -vvv -X POST \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json; charset=UTF-8" \
'https://bigquery.googleapis.com/bigquery/v2/projects/'${PROJECT_ID}'/queries' \
-d '{
  "kind": "bigquery#queryRequest",
  "query": "INSERT INTO '${PROJECT_ID}'.sample_dataset.sample_table \(id, user_uuid, action_type\) VALUES \(\"testId01\", \"testUuid01\", 0\), \(\"testId02\", \"testUuid02\", 0\)",
  "location": "US",
  "useLegacySql": false,
}'

バッチ読み込み (jobs.query) による実装

スキーマファイル生成

以下のコマンドで http_requests データセット用のスキーマファイルを生成できます。

export EMAIL='YOUR_EMAIL'
export APIKEY='YOUR_APIKEY'
export ZONE_ID='YOUR_ZONE_ID'
curl -s \
-H "X-Auth-Email: $EMAIL" \
-H "X-Auth-Key: $APIKEY" \
"https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/datasets/http_requests/fields" \
| jq -r '
    .result |
    to_entries |
    [ map_values({
        name:.key,
        type:(.value|split(";")[0]),
        mode:"NULLABLE"
    }) | .[] |
    if .type == "string" then .type="STRING" else . end |
    if .type == "int" then .type="INT64" else . end |
    if .type == "bool" then .type="BOOLEAN" else . end |
    if .type == "float" then .type="FLOAT64" else . end |
    if .type == "object" then .type="JSON" else . end |
    if .type == "int or string" then .type="TIMESTAMP" else . end |
    if .type == "array[string]" then .mode="REPEATED" else . end |
    if .type == "array[string]" then .type="STRING" else . end |
    if .type == "array[int]" then .mode="REPEATED" else . end |
    if .type == "array[int]" then .type="INT64" else . end ]' \
> schema-http-requests.json

データセット・テーブル・スキーマ作成

BigQuery で、サンプルテーブルを以下のように作成します。

スキーマは、上記の方法で生成するか、GitHub からダウンロードし、事前に定義します。

git clone https://github.com/kyouheicf/logpush-bigquery.git && cd $(basename $_ .git)

bq mk kyouhei_logpush

bq mk --table --nouse_legacy_sql \
kyouhei_logpush.http_requests \
"schema-http-requests.json"

Workers から Google Cloud APIs を呼ぶには

上記の記事が詳しいです。

Node.js の crypto.createPrivateKey が Workers でもサポートされれば、もう少しコードをシンプル化できそうです。

Workers デプロイ

まず、こちらのコードをクローンします。

git clone https://github.com/kyouheicf/logpush-bigquery.git && cd $(basename $_ .git)

必要ばパラメータを Secret にセットします。

API 認証情報の取得は以下の記事を参考にしてください。

(また、src/worker.js スクリプト内の PRESHARED_AUTH_HEADER_VALUE も必要に応じて変更してください。)

cat ~/key.json | jq -c | pbcopy
wrangler secret put GOOGLE_APPLICATION_CREDENTIALS # Paste it
wrangler secret put DATASET_ID # kyouhei_logpush
wrangler secret put TABLE_ID # http_requests

先にローカルでテストしたい場合は、以下のようにすると確認できます。

.dev.vars
DATASET_ID=kyouhei_logpush
TABLE_ID=http_requests
GOOGLE_APPLICATION_CREDENTIALS='{"type":"service_account","project_id":"xxx",...}'
wrangler dev

# Initial pre-flight Logpush Request
curl "http://localhost:8787/" -X POST \
--data-raw '{"content":"test","filename":"test.txt"}' \
-H "Content-Type: application/json" \
-H "X-Logpush-Auth: mypresharedkey"

# Logpush HTTP POST Request
echo '[{"BotDetectionIDs":[33554817],"BotScore":1,"BotScoreSrc":"Heuristics","BotTags":[],"CacheCacheStatus":"dynamic","CacheReserveUsed":false,"CacheResponseBytes":6691,"CacheResponseStatus":200,"CacheTieredFill":false,"ClientASN":2527,"ClientCountry":"jp","ClientDeviceType":"desktop","ClientIP":"2400:00:000:0000:0000:0000:0000:0000","ClientIPClass":"noRecord","ClientMTLSAuthCertFingerprint":"","ClientMTLSAuthStatus":"unknown","ClientRegionCode":"13","ClientRequestBytes":3200,"ClientRequestHost":"example.com","ClientRequestMethod":"GET","ClientRequestPath":"/","ClientRequestProtocol":"HTTP/2","ClientRequestReferer":"","ClientRequestScheme":"https","ClientRequestSource":"eyeball","ClientRequestURI":"/","ClientRequestUserAgent":"curl/8.2.1","ClientSSLCipher":"AEAD-AES256-GCM-SHA384","ClientSSLProtocol":"TLSv1.3","ClientSrcPort":54044,"ClientTCPRTTMs":5,"ClientXRequestedWith":"","ContentScanObjResults":[],"ContentScanObjTypes":[],"Cookies":{"key1":"value1","key2":"value2"},"EdgeCFConnectingO2O":false,"EdgeColoCode":"NRT","EdgeColoID":408,"EdgeEndTimestamp":"2021-10-28T02:24:46Z","EdgePathingOp":"wl","EdgePathingSrc":"macro","EdgePathingStatus":"nr","EdgeRateLimitAction":"","EdgeRateLimitID":0,"EdgeRequestHost":"example.com","EdgeResponseBodyBytes":5953,"EdgeResponseBytes":6931,"EdgeResponseCompressionRatio":1,"EdgeResponseContentType":"text/html","EdgeResponseStatus":200,"EdgeServerIP":"172.68.119.186","EdgeStartTimestamp":"2021-10-28T02:24:46Z","EdgeTimeToFirstByteMs":48,"FirewallMatchesActions":["log","log"],"FirewallMatchesRuleIDs":["5296044f269b478f97056cb195c6b1f0","f53e56714f824ad6997377039d4175f3"],"FirewallMatchesSources":["firewallCustom","dlp"],"JA3Hash":"0149f47eabf9a20d0893e2a44e5a6323","OriginDNSResponseTimeMs":1,"OriginIP":"1.2.3.4","OriginRequestHeaderSendDurationMs":0,"OriginResponseBytes":0,"OriginResponseDurationMs":24,"OriginResponseHTTPExpires":"","OriginResponseHTTPLastModified":"","OriginResponseHeaderReceiveDurationMs":13,"OriginResponseStatus":200,"OriginResponseTime":24000000,"OriginSSLProtocol":"TLSv1.3","OriginTCPHandshakeDurationMs":3,"OriginTLSHandshakeDurationMs":8,"ParentRayID":"00","RayID":"7fc5705d9adb2650","RequestHeaders":{"cf-verified-bot":"false","cf-threat-score":"0","cf-ja3-hash":"1f24dbdea9cbd448a034e5d87c14168f","cf-bot-score":"2","x-threat-score":"0"},"ResponseHeaders":{"set-cookie":"__cf_bm=mRsPDRxqOtPMrCPQrdqr5OW37fQmcMqwCQncWA2M_Hw-1693060912-0-ATYc9Kq8QvuQINUpETbW0eqVGTe2vtKmv0DVsHk6QeVu1I2lTzXoO9AQsCZGIZ4p+hkVFJg+T5s+BVzJlChr+Sg=; path=/; expires=Sat, 26-Aug-23 15:11:52 GMT; domain=.example.com; HttpOnly; Secure; SameSite=None"},"SecurityAction":"log","SecurityActions":["log","log"],"SecurityLevel":"high","SecurityRuleDescription":"Javascript Injection","SecurityRuleID":"f53e56714f824ad6997377039d4175f3","SecurityRuleIDs":["5296044f269b478f97056cb195c6b1f0","f53e56714f824ad6997377039d4175f3"],"SecuritySources":["firewallCustom","dlp"],"SmartRouteColoID":0,"UpperTierColoID":0,"WAFAction":"unknown","WAFAttackScore":82,"WAFFlags":"0","WAFMatchedVar":"","WAFProfile":"unknown","WAFRCEAttackScore":86,"WAFRuleID":"","WAFRuleMessage":"","WAFSQLiAttackScore":96,"WAFXSSAttackScore":97,"WorkerCPUTime":0,"WorkerStatus":"unknown","WorkerSubrequest":false,"WorkerSubrequestCount":0,"WorkerWallTimeUs":0,"ZoneName":"example.com"}]' \
| jq -c '.[]' | gzip > body.gz
curl "http://localhost:8787/" -X POST \
--data-binary @body.gz \
-H "Content-Type: application/json" \
-H "Content-Encoding: gzip" \
-H "X-Logpush-Auth: mypresharedkey"

最後に wrangler deploy src/worker.js で Workers をデプロイします。

% wrangler deploy src/worker.js  
 ⛅️ wrangler 3.6.0
------------------
Total Upload: 10.33 KiB / gzip: 3.55 KiB
Uploaded logpush-bigquery (0.91 sec)
Published logpush-bigquery (0.24 sec)
  https://logpush-bigquery.example.workers.dev
Current Deployment ID: xxxxx

Workers で Logpush を待ち受けて、BigQuery にバッチ読み込み(jobs.query) する準備ができました。

Cloudflare Logpush 構成

以下のコマンドで、Workers をターゲットとした Logpush ジョブを構成します。

export EMAIL='YOUR_EMAIL'
export APIKEY='YOUR_APIKEY'
export ZONE_ID='YOUR_ZONE_ID'

# フィールドを指定
export FIELDS=$(curl -s \
-H "X-Auth-Email: $EMAIL" \
-H "X-Auth-Key: $APIKEY" \
"https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/datasets/http_requests/fields" \
| jq -r '.result | keys | join(",") | . + "&timestamps=rfc3339&CVE-2021-44228=false"')
# echo $FIELDS

# example.workers.dev は適宜変更
curl -s "https://api.cloudflare.com/client/v4/zones/$ZONE_ID/logpush/jobs" -X POST -d '
{
  "name": "bigquery-sql-insert-http-requests",                                 
  "logpull_options": "fields='$FIELDS'",
  "destination_conf": "https://logpush-bigquery.example.workers.dev?header_X-Logpush-Auth=mypresharedkey",
  "max_upload_bytes": 5000000,
  "max_upload_records": 1000,
  "dataset": "http_requests",
  "enabled": true
}' \
-H "X-Auth-Email: $EMAIL" \
-H "X-Auth-Key: $APIKEY"

BigQuery で確認

デプロイした Workers でログを確認できます。

% wrangler tail                
 ⛅️ wrangler 3.6.0
------------------
Successfully created tail, expires at 2023-08-27T20:35:11Z
Connected to logpush-bigquery, waiting for logs...

POST https://logpush-bigquery.example.workers.dev/ - Ok @ 8/27/2023, 11:41:08 PM
  (log) {"content":"test"}
POST https://logpush-bigquery.example.workers.dev/ - Ok @ 8/27/2023, 11:41:33 PM
  (log) Received json[0] === {...}

しばらくすると、以下のようにテーブルに行が挿入されたことがわかります。

image.png

bq query --nouse_legacy_sql '
SELECT 
  COUNT(*) AS RecordCount
FROM kyouhei_logpush.http_requests
'
result
+-------------+
| RecordCount |
+-------------+
|        5694 |
+-------------+

サンプルクエリ

リクエスト数

日次

bq query --nouse_legacy_sql '
SELECT 
  DATE(EdgeStartTimestamp) AS TimeStampDate, 
  COUNT(*) AS RequestCount
FROM kyouhei_logpush.http_requests
GROUP BY TimeStampDate
'
result
+---------------+--------------+
| TimeStampDate | RequestCount |
+---------------+--------------+
|    2023-08-27 |         2150 |
|    2023-08-28 |           70 |
|    2023-08-26 |         3474 |
+---------------+--------------+

月次

bq query --nouse_legacy_sql '
SELECT 
  DATE_TRUNC(DATE(EdgeStartTimestamp), MONTH) AS TimeStampMonth, 
  COUNT(*) AS RequestCount
FROM kyouhei_logpush.http_requests
GROUP BY TimeStampMonth
'
result
+----------------+--------------+
| TimeStampMonth | RequestCount |
+----------------+--------------+
|     2023-08-01 |         5694 |
+----------------+--------------+

トラフィック量(EdgeResponseBytes

日次

bq query --nouse_legacy_sql '
SELECT DATE(EdgeStartTimestamp) AS TimeStampDate, SUM(EdgeResponseBytes) AS EdgeResponseBytesSum
FROM kyouhei_logpush.http_requests
GROUP BY TimeStampDate
'
result
+---------------+----------------------+
| TimeStampDate | EdgeResponseBytesSum |
+---------------+----------------------+
|    2023-08-28 |               375295 |
|    2023-08-27 |             13484186 |
|    2023-08-26 |             14787998 |
+---------------+----------------------+

月次

bq query --nouse_legacy_sql '
SELECT
  DATE_TRUNC(DATE(EdgeStartTimestamp), MONTH) AS TimeStampMonth,
  SUM(EdgeResponseBytes) AS EdgeResponseBytesSum
FROM kyouhei_logpush.http_requests
GROUP BY TimeStampMonth
'
result
+----------------+----------------------+
| TimeStampMonth | EdgeResponseBytesSum |
+----------------+----------------------+
|     2023-08-01 |             28650878 |
+----------------+----------------------+

まとめ

BigQuery にデータを書き込む方法の調査から実装まで1つ1つ確認する中で学ぶことは多かったです。

BigQuery の集計の速さを体感するとともに、データ挿入後すぐにクエリに反映される形で使いやすくできました。

従来では Google Cloud Storage (GCS) にログファイルを保管しつつ、Google Cloud Functions (GCF) で定期ロードすることが主流でした。

Cloudflare で Workers を使うことで、Google Cloud 側のコストを最小限にしつつ、最新のデータ活用につなげられそうです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1