背景
Pythonを使ってSalesforceからデータを取得したい。
今回は simple_salesforce というライブラリを使用し、Salsesorce APIのBulkAPI を使ってデータを取得し、それをBigQueryにロードする。
認証周り
以下の3つを用意する
- username
- password
- security_token
- security_tokenは管理画面の [私のセキュリティトークンのリセット] で発行されるものを用意する
実装例
from google.cloud import bigquery
from simple_salesforce import Salesforce
import json
import os
class SalesforceAPI:
def __init__(self, job_type):
self.sf = Salesforce(
username='USERNAME',
password='PASSWORD',
security_token='SECURITY_TOKEN'
)
def execute(self):
self.dl_file()
self.load_to_bq()
def dl_file(self):
res = self.sf.bulk.TABLE_NAME.query('SELECT column1, column2 FROM TABLE_NAME')
with open('dl_file_name', mode='w') as f:
for d in res:
f.write(json.dumps(d, ensure_ascii=False) + "\n") # 日本語の文字化け対応
def load_to_bq(self):
client = bigquery.Client('project')
filename = 'file_name'
dataset_id = 'dataset'
dataset_ref = client.dataset(dataset_id)
table_id = 'table_name'
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
job_config.autodetect = True # 必要ならばschemaを指定
with open(filename, "rb") as source_file:
job = client.load_table_from_file(
source_file, table_ref, job_config=job_config
)
job.result()
print("Loaded {} rows into {}:{}.".format(
job.output_rows, dataset_id, table_id))
参考
- ここで紹介されているSOQL実行ツールが便利
- DL用のSOQLの作成はこれを使うといい