TROCCOのHTTPS転送ではカーソルベースのページングに対応しており、カーソルが以下のいずれかになるまでリクエストを繰り返します。
- カーソルが含まれていない
- カーソルの値が
null
参考: 転送元 - HTTP・HTTPS
しかし、Slack APIのページングでは最終ページのカーソルに空文字が入ることになっており、HTTPS転送でページングを行うと下記のような感じで無限ループしてしまいます。
Request 1: GET https://slack.com/api/conversations.list
Request 2: GET https://slack.com/api/conversations.list?cursor=XXXXXXXX
Request 3: GET https://slack.com/api/conversations.list?cursor=YYYYYYYY
Request 4: GET https://slack.com/api/conversations.list?cursor=ZZZZZZZZ
Request 5: GET https://slack.com/api/conversations.list?cursor=
Request 6: GET https://slack.com/api/conversations.list?cursor=XXXXXXXX
...
この問題を無理やり解決してみたので、この記事ではその方法について記します。
解決方法
解決方法を図にまとめたものを以下に示します(図はconversations.list APIを叩く場合を例として描かれています)。
この方法のポイントは次の点です。
- Slack APIのレスポンスをJSON Lines(=1行のレコード)としてBigQuery上にあるテーブルへ追記
- TROCCOの転送ジョブ設定で転送日時カラムを設定
- TROCCOのカスタム変数ループで最後に転送されたレコードの
next_cursor
を取得し、それを使って次のSlack APIリクエストを行う - 1.~3.を行うジョブをページ数分だけ複製する
(補足)ジョブ複製なしで実現する方法
4.のジョブ複製をなしで実現する方法もあります。詳細は後述します。
具体例
次の要件を満たすワークフローを作成する場合を例に具体的に説明します。
要件:
- conversations.list(チャンネル一覧を取得するAPIメソッド) を叩いてBigQuery上に「channels」というチャンネル一覧テーブルを作成する
- channelsテーブルは日次で洗い替える
全体像
ワークフローの全体像は次のとおりです(図は4ページまで取得する場合で描かれています)。
テーブルリネージは次のとおりです。conversations.list APIのレスポンスを一旦conversations_listテーブルへ格納し、その後加工してchannelsテーブルを作成します。
① conversations_listテーブル初期化用データマートシンクジョブ
データマートシンクジョブで次のクエリを実行し、conversations_listテーブルを新規作成(すでに存在している場合は初期化)します。
conversations_listテーブルはconversations.list APIのレスポンスを1レスポンス1行で格納していくためのテーブルです。
(補足)1レスポンス1行で格納する理由
Slack APIではレスポンスのJSONオブジェクトの構造が要素(conversations.listの場合、チャンネル)によって微妙に異なることがあるためです。
詳細は次の記事をご覧ください。
「内定者インターン時代に半年かけて作った全社横断のSlackデータ基盤、TROCCOなら爆速構築できる説」の検証 - Yappli Tech Blog
このジョブのポイントは次のとおりです。
- ②、③で転送日時を格納するためのカラム(今回は
transferred_at
)を用意する transferred_at
に確実に最古なタイムスタンプ(今回は2000-01-01 00:00:00
)を設定する
CREATE OR REPLACE TABLE `YOUR_PROJECT_ID.YOUR_DATASET.conversations_list`
AS
SELECT
false AS ok,
"" AS channels,
"" AS response_metadata,
TIMESTAMP("2000-01-01 00:00:00") AS transferred_at, -- 確実に最古なタイムスタンプを設定
"invalid_cursor" AS `error`
② conversations.list API初回リクエスト用データ転送ジョブ
conversations.list APIを叩くためのデータ転送ジョブの初回リクエスト用です(初回と2回目以降でジョブを分けている理由については後述します)。
TROCCOでの転送設定は次のとおりです。conversations.list APIのレスポンスを1行のJSON Linesとして①で作成したconversations_listテーブルへAPPENDしています。
③ conversations.list API2回目以降リクエスト用データ転送ジョブ
conversations.list APIを叩くためのデータ転送ジョブの2回目以降リクエスト用です。
TROCCOでの転送設定は下図のとおりです。②のジョブとの違いを以下に記します。
-
カスタム変数
$cursor$
を設定している
→$cursor$
はカスタム変数ループで次のページへのカーソル値を入れるための変数です -
conversations.list APIのパラメータに
cursor=$cursor$
を設定している
→ これにより、$cursor$
にnext_cursor
の値をセットすればページ送りができます
このジョブに対して、ワークフロー上で次のカスタム変数ループを設定します。設定しているクエリでは、最後に転送されたレコードの next_cursor
を $cursor$
へセットしています(つまり、このカスタム変数ループは1周しかしません)。
ワークフロー上でこのジョブを複製すれば、複製した数だけページを送ることができます。
(補足)初回と2回目以降でジョブを分けている理由
下記のようにすれば、2回目以降のジョブでも初回のジョブと同じことができるように思えます。
- ①のジョブでconversations_listテーブルを初期化する際に、
ok
カラムの値にfalse
を入れておく - これにより、次のようにして③の2回目以降リクエスト用ジョブで②の初回リクエスト用ジョブを実現できるはず
- ③のジョブが初めて動くとき、先ほどのカスタム変数ループのクエリは
ok=false
として動く - この際に返す値を空文字にしておけば、リクエスト時に叩かれるURLは次のようになり、初回リクエストが実現できる
conversations.list?cursor=
- ③のジョブが初めて動くとき、先ほどのカスタム変数ループのクエリは
クエリで説明したものを下記に記します。
IF(ok, -- Slack APIの仕様より、レスポンスが正常に取得できていたらok=trueが入っている
JSON_EXTRACT_SCALAR(response_metadata, "$.next_cursor"), -- next_cursorを取得
+ "" -- 初回実行時に$cursor$へセットされる値
+ -- 空文字にしておけばconversations.list?cursor=となり、初回用ジョブを2回目以降用ジョブで実現できる
) AS next_cursor
しかし、実際にやってみると実行時に次のエラーが出てしまいます。どうやらカスタム変数に空文字はセットできないようです。
IllegalStateException: params: value or values is required. Suppressed: NullPointerException
この問題を回避するため、初回と2回目以降でジョブを分けている次第です。
(以下、個人的なコメントです)
転送設定でカスタム変数を設定する際に、値を空で設定できないのはこのためだったんですね…。
ただ、HTTPS転送のカーソルベースのページングでは空文字でいけていたわけなので、下記2つで処理(バリデーション等)が異なるのだと思われます
- HTTPS転送でのカーソル値のセット方法
- カスタム変数によるカーソル値のセット方法
④ channelsテーブル作成用データマートシンクジョブ
次のクエリを実行し、conversations_listテーブルからchannlesテーブルを作成します。
CREATE OR REPLACE TABLE `YOUR_PROJECT_ID.YOUR_DATASET.channels`
AS
WITH
unnest_channels AS (
SELECT
channel
FROM
`YOUR_PROJECT_ID.YOUR_DATASET.conversations_list`,
UNNEST(JSON_EXTRACT_ARRAY(channels, "$.")) AS channel
),
final AS (
SELECT
JSON_EXTRACT_SCALAR(channel, "$.id") AS channel_id,
JSON_EXTRACT_SCALAR(channel, "$.name") AS channel_name,
channel
FROM
unnest_channels
)
SELECT * FROM final
ジョブ複製なしで実現する方法
カスタム変数ループを利用することで、③の2回目以降リクエスト用ジョブの複製を不要にすることができます。
方法
ワークフローの全体像は次のとおりです。
ポイントは次のとおりです。
💡 ③子のジョブのカスタム変数に $loop$
を追加する
$cursor$
に加え、 $loop$
をカスタム変数ループに追加します。 $loop$
は後述する親ワークフローでのカスタム変数ループで使用します。
💡 ③子のジョブをまとめて1つのワークフロー(子ワークフロー)に切り出し、親ワークフローの③親のジョブで $loop$
を使ってカスタム変数ループさせる
ワークフロー全体像の下図の部分です。カスタム変数ループのクエリにて、 $loop$
をループ変数として GENERATE_ARRAY()
で指定した回数だけループさせる=③子のジョブを実行させるようにしています。指定する回数はページ数より確実に多くしてください(でないと、最終ページまで到達できません)。
💡 最終ページに到達したら無効なカーソル値を返し、わざとジョブを失敗させてループを終了させる
③子のジョブのカスタム変数ループに設定するクエリを次のようにします。ポイントは、最終ページだったら無効なカーソル値を返すことです。これにより、以降のループで無効なリクエストが発生してジョブが失敗し、ループが終了します。
SELECT
IF(ok,
+ IF(JSON_EXTRACT_SCALAR(response_metadata, "$.next_cursor") = "", -- 最終ページかどうかを判別
+ "xxx", -- 最終ページだった場合、ループを止めるために無効なカーソル値を返す
+ JSON_EXTRACT_SCALAR(response_metadata, "$.next_cursor") -- それ以外の場合、next_cursorを取得して返す
+ ),
""
) AS next_cursor
FROM
`YOUR_GC_PROJECT_ID.YOUR_DATASET.conversations_list`
ORDER BY
transferred_at DESC
LIMIT
1
ループが終了するまでの流れをトレースした図を以下に示します(図は3ページ目が最終ページだった場合で描かれています)。
このままだと③親のジョブがエラーとなり、そこで親ワークフローが終了してしまいます。これを回避するため、親ワークフローはエラーが発生しても処理を継続するよう設定します。
留意点
この方法だとワークフローの実行が成功していようが失敗していようが完了ステータスがエラーとなってしまいます。そのため、ワークフローのエラーハンドリングが難しくなります。
(補足)カスタム変数ループに設定したクエリの実行タイミング
カスタム変数ループに設定したクエリが実行されるのはループ開始時のみで、ループのたびに実行されるわけではないようです。これは、ループの終了条件に動的な要素を加えることはできないことを意味します。
例えば今回の場合、チャート1のフローチャートのような処理はできません。そのため、チャート2のフローチャートのような処理を行っています。
チャート1 (これはできない) |
チャート2 (だからこうしている) |
結び
この記事では、TROCCOのHTTPS転送を使ってSlack APIのページングを無理やり行う方法について記しました。
書いておいてなんですが、ここまでするんだったら素直にコード書いてスクリプトを用意した方が運用しやすいんじゃないかなと思います…。ただ、今回採った「レスポンスを一旦JSON LinesとしてAPPENDして後で整形する」という方法はスクリプトを用意する場合でも有用かなと思います(つまり、Slack APIを叩くのをTROCCOでやるか自作スクリプトでやるかの違いです)。
今回の取り組みで、普段なら直面しないであろうTROCCOの細かい仕様に触れることができてとても面白かったです!
ここまでお読みいただきありがとうございました