はじめに
以前に書いたデータエンジニアリングの入門記事「1年前の自分が読みたかった、データエンジニアリング入門」で、データ抽出ではAPIが重要な役割を果たすとともに、その管理は大変であるということを取り上げました。
具体的な内容として、一部は「trocco®でMarketoのリードデータを安全に全量転送する方法」の記事で整理しましたが、APIによるデータ抽出という観点からもう少し詳細に取り上げてみようと思います。
細かい具体の操作は何となくでいいので、大枠の処理を理解するのに参考にしてみてください。なお、事例としては引き続きMarketoのBulk Extract APIを取り上げますが、もちろんサービスによってAPIの仕様は異なるため、あくまで一事例でしかないということはご認識ください。
特に、直近でTableauのGraphQL(APIの一種)を触ってみましたが、REST(同じくAPIの一種)とあまりに勝手が違うので少し戸惑ってしまいました。触り方が分かってくるとだんだん面白くなってきたのですけどね。
こんな方におすすめ
- APIによるデータ抽出について詳しく理解したい
- ソースシステムからDWHへのデータ転送について知見を深めたい
本記事におけるMarketo APIの仕様は2024/2/29現在の公式ドキュメントに準拠しています。
簡単な前提知識
データエンジニアリング入門の記事では、DBやその他のシステムからのデータ抽出では、APIを利用することが多いと説明しました。そこで記載した説明を改めて転載しておきます。
APIとは、Application Programming Interfaceの略であり、ソフトウェアコンポーネント同士が互いに情報をやり取りするのに使うインターフェースのことです。決められた仕様通りに依頼を出すと、決められた処理をしてくれるもので、例えば窓口に行って仕様通りの書類を提出すると、決まった処理をしてくれるようなものと考えればいいでしょう。
自分が過去に書いた記事でも、「初投稿のPVが爆増したのがtrocco®とQiita APIでよくわかった話」でQiitaの記事データを取っているのはtrocco®でQiitaのAPIを叩いているものですし、「Google Apps Scriptでデータを組織に流通させる:①BigQueryにクエリを叩いてSlackに投稿する」でSlackの投稿をしているのは、Google Apps ScriptでSlackのAPIを叩いているものです。
APIの概念自体はデータ抽出目的に限定されませんが、特にデータ抽出という観点に絞れば、こういう定義のデータが欲しいというリクエストを出すことで、対象のデータというレスポンスが返ってくるというものになります。
また、なぜAPIを設置するかというと、APIがなければ個別開発をすることになりますが、それは利用者に多大なコストを強いてしまうことになるからです。サービス提供者としては、外部からアクセスし利用するための窓口を設計しておくことで、他サービスからの連携を行う際の開発コストを下げ、サービスをより利用しやすくしているのです。
DB系のものではJDBC/ODBCというものがよく利用されます(本記事ではこれ以上触れません)。その他のサービスではREST APIを使うことが多いですが、最近ではGraphQLも増えてきています。私自身はあまり詳しくないですが、RESTは1つ1つがシンプルなものの用途が増えていくと管理が大変になり、GraphQLは複雑なものの広い用途に柔軟に利用できるような特徴があります。
直近だとForkwellのYouTubeでの勉強会「2024年こそ使いこなす!GraphQL最前線」が勉強になったので、よろしければ参考までに。
APIの概要
では、ここからMarketoを事例にAPIの概要について見てみましょう。
種類
何らかのサービスのAPIを利用するには、「サービス名 API」で検索してみると、開発者向けのレファレンスのサイトに行きつくことができます。Marketoでも同様のものとして「Marketo Developers」のサイトがあり、APIとしてはREST APIとJavaScript APIがあります(SOAP APIというのもありますが、これは現在開発されていないものです)。
2種類ある背景としては、Marketo内部のデータ操作/抽出のためのもの(REST API)と、Marketoにトラッキングデータを送るためのクライアントサイドのもの(JavaScript API)となっているからです。MAはリードデータを内部で保持してSFA等と連携しつつ、マーケティング施策に対するユーザー行動を収集するものと考えると、内部と外部の両方の機能が必要というわけです。
そのなかでもREST APIとしては、Lead Database、Bulk Extract、Bulk Import、Assetsなどといったものがあります。
あまり詳しくは踏み込みませんが、Lead DatabaseはそのなかでもDescribe、Query、Create and Update、Deleteといった種類があり、データベースに入っているデータを細かく操作するために利用できるということが想像できます。
一方でBulk Extractについては、その名の通り大量データを一度に抽出する目的で設定がされています。
アクセス方法(RESTの場合)
REST APIにはURLに一定の構造があります。Marketoの公式ドキュメントの事例をそのまま取り上げると、
https://284-RPR-133.mktorest.com/rest/v1/lead/318581.json?fields=email,firstName,lastName
というURLがあったときに、これは
- Base URL:
https://284-RPR-133.mktorest.com/rest
- Path:
/v1/lead/
- Resource:
318582.json
- Query parameter:
fields=email,firstName,lastName
といった要素でできています。
APIドキュメントでは、
POST /bulk/v1/leads/export/create.json
のような前半が省略された形で記載されることがよくありますが、これはhttps://284-RPR-133.mktorest.com/rest/bulk/v1/leads/export/create.json
に対してPOSTでアクセスせよという意味になっています。
ここでPOSTとありますが、これは後で出てくるGETと合わせて、HTTPリクエストの方式を意味しています。
公開と認証
一定の秘匿情報を扱うようなサービスでは、外部からサービスを操作するようなAPIは必ずしも当初から利用可能ではありません。そこで、APIを利用開始するための設定と、アクセスを限定するための認証設定を払い出すのが一般的です。
Bulk Extract APIではOAuth2.0認証に対応しており、クエリパラメータにaccess_token={AccessToken}
とするか、HTTPヘッダーでAuthorization: Bearer {AccessToken}
とすることで認証ができます。
利用制限
APIを自由に使われてしまうとサービス自体に負荷をかけることになってしまうので、利用制限が設定されていることもよくあります。以前もまとめましたが、Bulk APIでは下記のようなものがあります。
- 抽出ジョブの最大同時実行数:2
- 抽出ジョブの最大キュー数(実行中のジョブ含む):10
- ファイル保持期間:7日間
- ※これは一度CSVでファイル出力し、そこから転送する仕組みであるため
- 日次のデフォルト抽出データ量上限:500MB
- CST(アメリカ中部標準時)の午前12時にリセットされる
- CSTはUTC-06:00なので、JST(UTC+09:00)とは15時間ずれており、日本時間では午前3時にリセットされる
- 追加料金を支払うことで上限の緩和は可能
- CST(アメリカ中部標準時)の午前12時にリセットされる
- 日付期間フィルターの上限(createdAt / updatedAt):31日間
- ※trocco®としては入力された日付が32日間以上のとき、自動で分割処理がされるので考慮不要
データを抽出する
ここまでの内容を踏まえて、具体的なデータ抽出の手順を見てみましょう。シンプルに1つAPIを叩くだけでは不十分であるというのがポイントです。
①ジョブを作成する
抽出ジョブを始めるには、どのような条件で何を抽出するかを指定してあげなければいけません。そこで、リードの抽出条件として下記のように指示を出します。
POST /bulk/v1/leads/export/create.json
{
"fields": [
"firstName",
"lastName"
],
"format": "CSV",
"columnHeaderNames": {
"firstName": "First Name",
"lastName": "Last Name"
},
"filter": {
"createdAt": {
"startAt": "2017-01-01T00:00:00Z",
"endAt": "2017-01-31T00:00:00Z"
}
}
}
ここでは、firstName、lastNameというフィールドを、First Name、Last Nameをヘッダー行とするCSVにエクスポートしようとしています。その際に、createdAtがstartAtとendAtの間の期間であるものにレコードをフィルターしています。
上記のリクエストに対して、下記のようなレスポンスが返ってきます。
{
"requestId": "e42b#14272d07d78",
"success": true,
"result": [
{
"exportId": "ce45a7a1-f19d-4ce2-882c-a3c795940a7d",
"status": "Created",
"createdAt": "2017-01-21T11:47:30-08:00",
"queuedAt": "2017-01-21T11:48:30-08:00",
"format": "CSV",
}
]
}
このexportIdが今後の処理で利用できるIDになります。
ちなみに、trocco®では個別のフィールド指定を不要にして簡易に全量取得できるように、rest/v1/{field}/describe.json
でフィールド一覧を取得し、その情報をもとに自動で設定してくれています。
②ジョブをキューに入れる
抽出ジョブは作成しただけでは実行されません。そこでキューに入れるためのAPIを叩きます。
POST /bulk/v1/leads/export/{exportId}/enqueue.json
大量データを抽出するBulk Extract APIでは、即座に処理が完了するわけではないので、次に状態の確認が必要になります。
③ジョブステータスをポーリングする
ポーリングとは、定期的に問合せをして状態の変化を確かめることを意味しています。ジョブを実行して終了を検知しないとデータを取り出せないので、待機中、実行中、完了など、そのための確認作業が必要になるのです。
GET /bulk/v1/leads/export/{exportId}/status.json
{
"requestId": "e42b#14272d07d78",
"success": true,
"result": [
{
"exportId": "ce45a7a1-f19d-4ce2-882c-a3c795940a7d",
"status": "Completed",
"createdAt": "2017-01-21T11:47:30-08:00",
"queuedAt": "2017-01-21T11:48:30-08:00",
"startedAt": "2017-01-21T11:51:30-08:00",
"finishedAt": "2017-01-21T12:59:30-08:00",
"format": "CSV",
"numberOfRecords": 122323,
"fileSize": 123424,
"fileChecksum": "sha256:d9c73f0b6960c71623c8bafe29603b3e8e20fd0e4eeaefd119c0227506ea9be4"
}
]
}
上記のようにステータスがCompleted
になっていると、ファイルの作成が完了しているので、その後の抽出処理ができるようになります。
なお、ポーリングで処理をする際には、負荷を軽減するために指数バックオフという方法がよく利用されます。これは、ポーリングの間隔に関して、2秒、4秒、8秒、16秒のように待機時間を指数関数的に伸ばしていくような処理です。
④データを抽出する
ファイルが作成されていれば、後はそれを取得するだけです。下記のAPIで作成したファイルを取得します。
GET /bulk/v1/leads/export/{exportId}/file.json
これでファイルがレスポンスで送られてきて、データの抽出は完了です。
データを格納する
ここまででデータが抽出できましたが、もちろんそれで終わりではありません。今度はそのデータを格納する必要があります。ファイルをそのまま格納できるストレージサービスではある程度簡単に処理できるかもしれませんが、レコードとしてデータベースに挿入するには少し工夫が必要になってきます。
詳細はデータエンジニアリング入門の記事に記載したのでこちらでは簡単に留めますが、適切にデータを取り扱うために、
- データを一時テーブルに格納する
- それもデータ量が多いときは分割処理する
- 一時テーブルへの格納完了を確認した後に、ターゲットテーブルにデータを挿入する
- 洗い替え処理の場合は事前にターゲットテーブルを作成する
- 処理を終えた後に一時テーブルを削除する
といったような工夫が必要になってくるというわけです。
さいごに
ここまで調べてみて、APIでのデータ抽出の仕組みはとてもよくできているなと感心したものです。と同時に、やっぱりこれを自力でやるのは面倒だよねとも強く感じます。OSSに乗っかるとだいぶ楽になるのですが、とはいえOSSの実行環境の運用もそれはそれで手間が発生します。
APIの辛さについては以前下記のように書いていましたが、
- API仕様を理解しないとリクエストを投げられないので、抽出したいシステムの数だけAPIの仕様を理解しなければならない
- API仕様はサービス提供者によって変更されることがあるので、その場合は仕様変更を検知してプログラムの修正を行わなければならない
- API仕様によっては利用に制限がある(Rate Limitや不正レコードの際の挙動)場合があり、リカバリの処理が必要になる
- そもそもネットワークを介してAPIにアクセスしにいくと、ネットワークの状態によってエラーが発生することがある
このあたりの詳細についてイメージは明確になったでしょうか。データエンジニアリングの入門者に、本記事が少しでもお役に立てれば嬉しいです。