2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

TROCCO®Advent Calendar 2024

Day 16

TROCCOのHTTPS転送でSlack APIのページングを無理やり実現してみた

Last updated at Posted at 2024-12-15

TROCCOのHTTPS転送ではカーソルベースのページングに対応しており、カーソルが以下のいずれかになるまでリクエストを繰り返します。

  • カーソルが含まれていない
  • カーソルの値が null

参考: 転送元 - HTTP・HTTPS

しかし、Slack APIのページングでは最終ページのカーソルに空文字が入ることになっており、HTTPS転送でページングを行うと下記のような感じで無限ループしてしまいます。

Request 1: GET https://slack.com/api/conversations.list
Request 2: GET https://slack.com/api/conversations.list?cursor=XXXXXXXX
Request 3: GET https://slack.com/api/conversations.list?cursor=YYYYYYYY
Request 4: GET https://slack.com/api/conversations.list?cursor=ZZZZZZZZ
Request 5: GET https://slack.com/api/conversations.list?cursor=
Request 6: GET https://slack.com/api/conversations.list?cursor=XXXXXXXX
...

この問題を無理やり解決してみたので、この記事ではその方法について記します。

解決方法

解決方法を図にまとめたものを以下に示します(図はconversations.list APIを叩く場合を例として描かれています)。

この方法のポイントは次の点です。

  1. Slack APIのレスポンスをJSON Lines(=1行のレコード)としてBigQuery上にあるテーブルへ追記
  2. TROCCOの転送ジョブ設定で転送日時カラムを設定
  3. TROCCOのカスタム変数ループで最後に転送されたレコードの next_cursor を取得し、それを使って次のSlack APIリクエストを行う
  4. 1.~3.を行うジョブをページ数分だけ複製する

(補足)ジョブ複製なしで実現する方法

4.のジョブ複製をなしで実現する方法もあります。詳細は後述します。

具体例

次の要件を満たすワークフローを作成する場合を例に具体的に説明します。

要件:

全体像

ワークフローの全体像は次のとおりです(図は4ページまで取得する場合で描かれています)。

テーブルリネージは次のとおりです。conversations.list APIのレスポンスを一旦conversations_listテーブルへ格納し、その後加工してchannelsテーブルを作成します。

① conversations_listテーブル初期化用データマートシンクジョブ

データマートシンクジョブで次のクエリを実行し、conversations_listテーブルを新規作成(すでに存在している場合は初期化)します。

conversations_listテーブルはconversations.list APIのレスポンスを1レスポンス1行で格納していくためのテーブルです。

(補足)1レスポンス1行で格納する理由

Slack APIではレスポンスのJSONオブジェクトの構造が要素(conversations.listの場合、チャンネル)によって微妙に異なることがあるためです。

詳細は次の記事をご覧ください。

「内定者インターン時代に半年かけて作った全社横断のSlackデータ基盤、TROCCOなら爆速構築できる説」の検証 - Yappli Tech Blog

このジョブのポイントは次のとおりです。

  • ②、③で転送日時を格納するためのカラム(今回は transferred_at )を用意する
  • transferred_at に確実に最古なタイムスタンプ(今回は 2000-01-01 00:00:00 )を設定する
CREATE OR REPLACE TABLE `YOUR_PROJECT_ID.YOUR_DATASET.conversations_list`
AS

SELECT
  false AS ok,
  "" AS channels,
  "" AS response_metadata,
  TIMESTAMP("2000-01-01 00:00:00") AS transferred_at, -- 確実に最古なタイムスタンプを設定
  "invalid_cursor" AS `error`

② conversations.list API初回リクエスト用データ転送ジョブ

conversations.list APIを叩くためのデータ転送ジョブの初回リクエスト用です(初回と2回目以降でジョブを分けている理由については後述します)。

TROCCOでの転送設定は次のとおりです。conversations.list APIのレスポンスを1行のJSON Linesとして①で作成したconversations_listテーブルへAPPENDしています。

image.png

③ conversations.list API2回目以降リクエスト用データ転送ジョブ

conversations.list APIを叩くためのデータ転送ジョブの2回目以降リクエスト用です。

TROCCOでの転送設定は下図のとおりです。②のジョブとの違いを以下に記します。

  • カスタム変数 $cursor$ を設定している
    $cursor$ はカスタム変数ループで次のページへのカーソル値を入れるための変数です
  • conversations.list APIのパラメータに cursor=$cursor$ を設定している
    → これにより、 $cursor$next_cursor の値をセットすればページ送りができます

image.png

このジョブに対して、ワークフロー上で次のカスタム変数ループを設定します。設定しているクエリでは、最後に転送されたレコードの next_cursor$cursor$ へセットしています(つまり、このカスタム変数ループは1周しかしません)。

image.png

ワークフロー上でこのジョブを複製すれば、複製した数だけページを送ることができます。

(補足)初回と2回目以降でジョブを分けている理由

下記のようにすれば、2回目以降のジョブでも初回のジョブと同じことができるように思えます。

  • ①のジョブでconversations_listテーブルを初期化する際に、 ok カラムの値に false を入れておく
  • これにより、次のようにして③の2回目以降リクエスト用ジョブで②の初回リクエスト用ジョブを実現できるはず
    • ③のジョブが初めて動くとき、先ほどのカスタム変数ループのクエリは ok=false として動く
    • この際に返す値を空文字にしておけば、リクエスト時に叩かれるURLは次のようになり、初回リクエストが実現できる
      conversations.list?cursor=
      

クエリで説明したものを下記に記します。

  IF(ok, -- Slack APIの仕様より、レスポンスが正常に取得できていたらok=trueが入っている
    JSON_EXTRACT_SCALAR(response_metadata, "$.next_cursor"), -- next_cursorを取得
+   "" -- 初回実行時に$cursor$へセットされる値
+      -- 空文字にしておけばconversations.list?cursor=となり、初回用ジョブを2回目以降用ジョブで実現できる
    ) AS next_cursor

しかし、実際にやってみると実行時に次のエラーが出てしまいます。どうやらカスタム変数に空文字はセットできないようです。

IllegalStateException: params: value or values is required. Suppressed: NullPointerException

この問題を回避するため、初回と2回目以降でジョブを分けている次第です。


(以下、個人的なコメントです)

転送設定でカスタム変数を設定する際に、値を空で設定できないのはこのためだったんですね…。

image.png

ただ、HTTPS転送のカーソルベースのページングでは空文字でいけていたわけなので、下記2つで処理(バリデーション等)が異なるのだと思われます :thinking:

  • HTTPS転送でのカーソル値のセット方法
  • カスタム変数によるカーソル値のセット方法

④ channelsテーブル作成用データマートシンクジョブ

次のクエリを実行し、conversations_listテーブルからchannlesテーブルを作成します。

CREATE OR REPLACE TABLE `YOUR_PROJECT_ID.YOUR_DATASET.channels`
AS

WITH

unnest_channels AS (
SELECT
  channel
FROM
  `YOUR_PROJECT_ID.YOUR_DATASET.conversations_list`,
  UNNEST(JSON_EXTRACT_ARRAY(channels, "$.")) AS channel
),

final AS (
SELECT
  JSON_EXTRACT_SCALAR(channel, "$.id") AS channel_id,
  JSON_EXTRACT_SCALAR(channel, "$.name") AS channel_name,
  channel
FROM
  unnest_channels
)

SELECT * FROM final

ジョブ複製なしで実現する方法

カスタム変数ループを利用することで、③の2回目以降リクエスト用ジョブの複製を不要にすることができます。

方法

ワークフローの全体像は次のとおりです。

ポイントは次のとおりです。

💡 ③子のジョブのカスタム変数に $loop$ を追加する

$cursor$ に加え、 $loop$ をカスタム変数ループに追加します。 $loop$ は後述する親ワークフローでのカスタム変数ループで使用します。

image.png

💡 ③子のジョブをまとめて1つのワークフロー(子ワークフロー)に切り出し、親ワークフローの③親のジョブで $loop$ を使ってカスタム変数ループさせる

ワークフロー全体像の下図の部分です。カスタム変数ループのクエリにて、 $loop$ をループ変数として GENERATE_ARRAY() で指定した回数だけループさせる=③子のジョブを実行させるようにしています。指定する回数はページ数より確実に多くしてください(でないと、最終ページまで到達できません)。

image.png

💡 最終ページに到達したら無効なカーソル値を返し、わざとジョブを失敗させてループを終了させる

③子のジョブのカスタム変数ループに設定するクエリを次のようにします。ポイントは、最終ページだったら無効なカーソル値を返すことです。これにより、以降のループで無効なリクエストが発生してジョブが失敗し、ループが終了します。

SELECT
  IF(ok,
+   IF(JSON_EXTRACT_SCALAR(response_metadata, "$.next_cursor") = "", -- 最終ページかどうかを判別
+     "xxx", -- 最終ページだった場合、ループを止めるために無効なカーソル値を返す
+     JSON_EXTRACT_SCALAR(response_metadata, "$.next_cursor") -- それ以外の場合、next_cursorを取得して返す
+     ),
    ""
    ) AS next_cursor
FROM
  `YOUR_GC_PROJECT_ID.YOUR_DATASET.conversations_list`
ORDER BY
  transferred_at DESC
LIMIT
  1

ループが終了するまでの流れをトレースした図を以下に示します(図は3ページ目が最終ページだった場合で描かれています)。

このままだと③親のジョブがエラーとなり、そこで親ワークフローが終了してしまいます。これを回避するため、親ワークフローはエラーが発生しても処理を継続するよう設定します。

image.png

留意点

この方法だとワークフローの実行が成功していようが失敗していようが完了ステータスがエラーとなってしまいます。そのため、ワークフローのエラーハンドリングが難しくなります。

(補足)カスタム変数ループに設定したクエリの実行タイミング

カスタム変数ループに設定したクエリが実行されるのはループ開始時のみで、ループのたびに実行されるわけではないようです。これは、ループの終了条件に動的な要素を加えることはできないことを意味します。

例えば今回の場合、チャート1のフローチャートのような処理はできません。そのため、チャート2のフローチャートのような処理を行っています。

チャート1
(これはできない)
チャート2
(だからこうしている)

結び

この記事では、TROCCOのHTTPS転送を使ってSlack APIのページングを無理やり行う方法について記しました。

書いておいてなんですが、ここまでするんだったら素直にコード書いてスクリプトを用意した方が運用しやすいんじゃないかなと思います…。ただ、今回採った「レスポンスを一旦JSON LinesとしてAPPENDして後で整形する」という方法はスクリプトを用意する場合でも有用かなと思います(つまり、Slack APIを叩くのをTROCCOでやるか自作スクリプトでやるかの違いです)。

今回の取り組みで、普段なら直面しないであろうTROCCOの細かい仕様に触れることができてとても面白かったです!

ここまでお読みいただきありがとうございました :bow:

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?