はじめに
これは Kyash Advent Calendar 2021 の7日目の記事です
Kyashで、経理・会計・資金決済法の担当エンジニアをしているPrinceチームの清水(@thimi0412)です。
Princeチームについてはこちら → https://foxtoy.hateblo.jp/entry/2021/07/19/100223
KyashではGoogle Cloud Composerを使用して会計業務に使うViewテーブルの作成などをおこなっています。
その中でBigQueryに対してクエリを実行し、結果をスプレッドシートに書き込み指定された共有ドライブのフォルダ内に入れる等の操作が必要になりCloudComposerでGoogle Workspace系のAPIを使う機会があったのでハマった点やどう使っているかを書きます。
Cloud Composerとは
Apache Airflow(ワークフロー管理ツール) で構築された、フルマネージドのワークフロー オーケストレーション サービスです。
https://cloud.google.com/composer
Google Workspace系のOperatorは共有ドライブに対応していないので注意
スプレッドシートを作成したのちそのシートに対してBigQueryのクエリ結果を書き込む処理をしたいと思ったのでGoogleSheetsCreateSpreadsheetOperatorを使用してスプレッドシートを作成したのですがフォルダIDが見つからないというエラーが出ました。
Operatorの使用の例
SPREADSHEET = {
"name": sheet_name, # シート名
"parents": [parent_folder_id], # シートを作成するフォルダID
"mimeType": "application/vnd.google-apps.spreadsheet",
}
create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
task_id="create_spreadsheet", spreadsheet=SPREADSHEET
)
ググっても情報が出てこなかったのでairflowのソースコードを調べました。
分かったことはcreate()
の引数としてsupportsAllDrives=true
のパラメータがセットされていないことが問題でした。
共有ドライブに作成する際にはこのパラメータをセットする必要があります(他のGoogle Workspace系のOperatorも同様)。なんで対応してないんだ?
ref Implement shared drive support
def create_spreadsheet(self, spreadsheet: Dict[str, Any]) -> Dict[str, Any]:
"""
Creates a spreadsheet, returning the newly created spreadsheet.
:param spreadsheet: an instance of Spreadsheet
https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets#Spreadsheet
:type spreadsheet: Dict[str, Any]
:return: An spreadsheet object.
"""
self.log.info("Creating spreadsheet: %s", spreadsheet['properties']['title'])
response = (
self.get_conn().spreadsheets().create(body=spreadsheet).execute(num_retries=self.num_retries)
)
self.log.info("Spreadsheet: %s created", spreadsheet['properties']['title'])
return response
GoogleSheetsCreateSpreadsheetOperatorは共有ドライブには対応していないのでスプレッドシート作成の関数を自作しました。
この関数をPythonOperator内から呼び出してスプレッドシートを作成しています。フォルダを作成する場合はmimeType
をapplication/vnd.google-apps.folder
にすればフォルダも作成できます。
from airflow.providers.google.suite.hooks.sheets import GSheetsHook
def create_spread_sheet(sheet_name, parent_folder_id):
"""スプレッドシートを作成する
Args:
sheet_name (string): シート名
parent_folder_id (string): シートを作成する親フォルダのID
Returns:
[type]: [description]
"""
hook = GoogleDriveHook()
drive = hook.get_conn()
file_metadata = {
"name": sheet_name,
"parents": [parent_folder_id],
"mimeType": "application/vnd.google-apps.spreadsheet",
# "mimeType": "application/vnd.google-apps.folder", フォルダの場合
}
# supportsAllDrives=Trueがないと共有ドライブに作成できない
res = drive.files().create(body=file_metadata, supportsAllDrives=True).execute()
return res
フォルダIDとシートIDはXComで受け取ると便利
XComは異なるタスク間で値をやりとりする手段です。
フォルダ作成→スプレッドシート作成→データの書きこみの依存関係はこのように作成していて、フォルダ、スプレッドシートの作成はそれぞれPythonOperator内でおこなっているのでPythonOperatorも戻り値としてレスポンスのid
(フォルダやスプレッドシートのID)を返すことで別のタスクからXComを使用してその値を受け取れます。
def create_folder(**kwargs):
"""フォルダを作成する
Returns:
[str]: 作成されたフォルダのID
"""
folder_name = '<yyyy-mm-dd>'
folder_id = '<FOLDER_ID>'
res = create_drive_folder(folder_name, folder_id)
return res["id"]
def create_spreadsheet(**context):
"""スプレッドシートを作成する
Returns:
[str]: 作成したスプレッドシートのID
"""
# 前taskのcreate_folderから作成したフォルダのIDを受け取る
folder_id = context["task_instance"].xcom_pull(task_ids="create_folder")
sheet_name = "sheet_A"
res = create_spread_sheet(sheet_name, folder_id)
return res["id"]
with DAG(
DAG_NAME,
description="spreadsheet_test",
schedule_interval="0 23 * * *", # 毎日8:00(JST)に実行
catchup=False,
default_args=default_args,
) as dag:
task_create_folder = PythonOperator(
task_id="create_folder",
python_callable=create_folder,
provide_context=True,
dag=dag,
)
task_create_spreadsheet_A = PythonOperator(
task_id="create_spreadsheet_A",
python_callable=create_spreadsheet,
provide_context=True,
dag=dag,
)
task_create_spreadsheet_B = PythonOperator(
task_id="create_spreadsheet_B",
python_callable=create_spreadsheet,
provide_context=True,
dag=dag,
)
task_create_spreadsheet_C = PythonOperator(
task_id="create_spreadsheet_C",
python_callable=create_spreadsheet,
provide_context=True,
dag=dag,
)
task_create_folder >> task_create_spreadsheet_A
task_create_folder >> task_create_spreadsheet_B
task_create_folder >> task_create_spreadsheet_C
etc task失敗時のslack通知
各taskが失敗した際にon_failure_callback
に関数を登録しておくとslackの通知するようにしています。
logのurlの取得ができるのでslackの通知から直接logに飛べるのでとても便利。
↓のコードではslackweb
を使用して通知していますがairflowのproviderにもslackのhookが用意されているのでそっちを使ってもよかったと作ってから思いました。
- 参考にしたもの https://www.mikulskibartosz.name/send-cusomized-slack-notification-when-airflow-task-fails/
- slack provider https://github.com/apache/airflow/tree/main/airflow/providers/slack
def failure_notification(context):
task_instance = context["task_instance"]
dag_name = context["dag"]
task_name = task_instance.task_id
log_link = f"<{task_instance.log_url}|{task_name}>"
error_message = str(context.get("exception") or context.get("reason"))
slack = slackweb.Slack(url=SLACK_URL)
attachments = [
{
"username": "Cloud Composer(Airflow)",
"icon_emoji": ":airflow:",
"color": "danger",
"title": "エラーが発生しました",
"fields": [
{"title": "Project", "value": f"{PROJECT_ID}"},
{"title": "<DAG name> Task name", "value": f"{dag_name} {task_name}"},
{"title": "Log URL", "value": log_link},
{"title": "Error Message", "value": error_message},
],
}
]
slack.notify(attachments=attachments)
最後に
CloudComposer(airflow)のprovider周りのOperatorはあんまりドキュメントが整理されていないので直接airflowのソースコードを読んでどういった仕様になっているのか調べたほうがいいなと思いました。
今回はGoogle WorkspaceのAPI周りを調べましたがairflowのソースコードとGoogle WorkspaceのAPIのドキュメントを繰り返し見て作業してました。
providerはgoogleやslack以外にも多くあるので他のサービスと連携する際にproviderを見てみて連携するサービスがあれば一から実装しなくても良くなるかもしれないのでチェックしておくといいかもと思いました。PR送れるくらいGoogleのproviderのコードを読んだので共有ドライブ対応のPRを送ろうと思っています。
Kyash Advent Calendar 2021の他の記事も是非ご覧ください〜!