背景
Marketoのデータを取得したい。
Leadsのデータをバルクで取得するサンプルをメモしておく。
他のデータリソースにも使える。
始める前に
Getting Started をやっておくとREST APIの雰囲気が掴める
REST API
Bulk取得の説明ページをやっておくと雰囲気掴める
Bulk Extract
Bulk取得のサンプルコード
import requests
import json
import time
import os
import pendulum
from google.cloud import bigquery
# ↓管理画面から取得できる
# https://developers.marketo.com/rest-api/
ENDPOINT = 'YOUR_ENDPOINT'
CLIENT_ID = 'YOUR_CLIENT_ID'
CLIENT_SECRET = 'YOUR_CLIENT_SECRET'
class MarketoAPI:
def __init__(self, target_date):
self.target_date = target_date
self.token = self.get_token()
self.header = {
'Authorization': 'Bearer {}'.format(self.token),
'content-type': 'application/json'
}
self.export_id = None # job作成時にセットされる
# CLIENT_IDとCLIENT_SECRETからtokenを発行する
# 管理画面から発行したtokenはすぐexpireされるので注意
def get_token(self):
URL = '{}/identity/oauth/token?client_id={}&client_secret={}&grant_type=client_credentials'.format(
ENDPOINT, CLIENT_ID, CLIENT_SECRET
)
headers = {'content-type': 'application/json'}
r = requests.get(URL, headers=headers)
if r.status_code != requests.codes.ok:
r.raise_for_status()
return r.json()['access_token']
# 今回はBulkExtractのAPIを使う
# まずはLEADを取得するジョブの作成
# https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/
def create(self):
URL = '{}/bulk/v1/leads/export/create.json'.format(ENDPOINT)
payload = {
# 取得したいfieldを指定する
# 指定可能なfieldsは `GET /rest/v1/leads/describe.json` のAPIで確認できる
"fields": [
"id",
"email",
"createdAt",
"updatedAt"
],
"format": "CSV",
# フィルターを指定する
# 指定可能なfileterは↓docに書いてある
# https://developers.marketo.com/rest-api/bulk-extract/bulk-lead-extract/
"filter": {
"createdAt": {
"startAt": self.target_date.isoformat(),
"endAt": self.target_date.end_of('day').isoformat()
}
}
}
r = requests.post(URL, data=json.dumps(payload), headers=self.header)
if r.status_code != requests.codes.ok:
r.raise_for_status()
export_id = r.json()['result'][0]['exportId']
self.export_id = export_id
return export_id
# create()で作成しただけではジョブは処理されない
# ここでジョブをenqueueすることによって実行待ちにできる
def enqueue(self):
URL = '{}/bulk/v1/leads/export/{}/enqueue.json'.format(
ENDPOINT, self.export_id
)
r = requests.post(URL, headers=self.header)
if r.status_code != requests.codes.ok:
r.raise_for_status()
# enqueueさせたjobの処理終わるまでstatuspを pollingするメソッド
def check_until_done(self):
URL = '{}/bulk/v1/leads/export/{}/status.json'.format(
ENDPOINT, self.export_id
)
# 3分待ってからリクエスト
# Completedになるまで3回実行する
for i in range(3):
time.sleep(60 * 3)
r = requests.get(URL, headers=self.header)
if r.status_code == requests.codes.ok:
if r.json()['result'][0]['status'] == 'Completed':
return True
if r.status_code != requests.codes.ok:
r.raise_for_status()
else:
raise('job status is: ' + r.json()['result'][0]['status'])
# jobが完了したらfileをDownloadするメソッド
def dl_file(self):
URL = '{}/bulk/v1/leads/export/{}/file.json'.format(
ENDPOINT, self.export_id
)
r = requests.get(URL, headers=self.header)
if r.status_code != requests.codes.ok:
r.raise_for_status()
with open('./marketo_lead_file_{}.csv'.format(self.target_date.strftime('%Y%m%d')), mode='w') as f:
f.write(r.text)
今回のcreateの条件だと以下のようなfileがDLされる
id,email,createdAt,updatedAt
111111,xxxxx@gmail.com,2020-01-02T15:35:37Z,2020-01-02T15:35:36Z
222222,xxxxx@gmail.com,2030-01-02T22:58:07Z,2020-01-02T22:58:07Z
※BulkのAPIは1日500MBまでデータをロードできないので注意