2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Cloud Composer(airflow)でGoogle WorkspaceのAPIを使う

Last updated at Posted at 2021-12-06

はじめに

これは Kyash Advent Calendar 2021 の7日目の記事です

Kyashで、経理・会計・資金決済法の担当エンジニアをしているPrinceチームの清水(@thimi0412)です。
Princeチームについてはこちら → https://foxtoy.hateblo.jp/entry/2021/07/19/100223

KyashではGoogle Cloud Composerを使用して会計業務に使うViewテーブルの作成などをおこなっています。
その中でBigQueryに対してクエリを実行し、結果をスプレッドシートに書き込み指定された共有ドライブのフォルダ内に入れる等の操作が必要になりCloudComposerでGoogle Workspace系のAPIを使う機会があったのでハマった点やどう使っているかを書きます。

Cloud Composerとは
Apache Airflow(ワークフロー管理ツール) で構築された、フルマネージドのワークフロー オーケストレーション サービスです。
https://cloud.google.com/composer

Google Workspace系のOperatorは共有ドライブに対応していないので注意

スプレッドシートを作成したのちそのシートに対してBigQueryのクエリ結果を書き込む処理をしたいと思ったのでGoogleSheetsCreateSpreadsheetOperatorを使用してスプレッドシートを作成したのですがフォルダIDが見つからないというエラーが出ました。

Operatorの使用の例

SPREADSHEET = {
    "name": sheet_name, # シート名
    "parents": [parent_folder_id], # シートを作成するフォルダID
    "mimeType": "application/vnd.google-apps.spreadsheet",
}

create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
    task_id="create_spreadsheet", spreadsheet=SPREADSHEET
)

ググっても情報が出てこなかったのでairflowのソースコードを調べました。
分かったことはcreate()の引数としてsupportsAllDrives=trueのパラメータがセットされていないことが問題でした。
共有ドライブに作成する際にはこのパラメータをセットする必要があります(他のGoogle Workspace系のOperatorも同様)。なんで対応してないんだ?
ref Implement shared drive support

def create_spreadsheet(self, spreadsheet: Dict[str, Any]) -> Dict[str, Any]:
    """
    Creates a spreadsheet, returning the newly created spreadsheet.
    :param spreadsheet: an instance of Spreadsheet
        https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets#Spreadsheet
    :type spreadsheet: Dict[str, Any]
    :return: An spreadsheet object.
    """
    self.log.info("Creating spreadsheet: %s", spreadsheet['properties']['title'])

    response = (
        self.get_conn().spreadsheets().create(body=spreadsheet).execute(num_retries=self.num_retries)
    )
    self.log.info("Spreadsheet: %s created", spreadsheet['properties']['title'])
    return response

GoogleSheetsCreateSpreadsheetOperatorは共有ドライブには対応していないのでスプレッドシート作成の関数を自作しました。
この関数をPythonOperator内から呼び出してスプレッドシートを作成しています。フォルダを作成する場合はmimeTypeapplication/vnd.google-apps.folderにすればフォルダも作成できます。

from airflow.providers.google.suite.hooks.sheets import GSheetsHook

def create_spread_sheet(sheet_name, parent_folder_id):
    """スプレッドシートを作成する

    Args:
        sheet_name (string): シート名
        parent_folder_id (string): シートを作成する親フォルダのID

    Returns:
        [type]: [description]
    """
    hook = GoogleDriveHook()
    drive = hook.get_conn()
    file_metadata = {
        "name": sheet_name,
        "parents": [parent_folder_id],
        "mimeType": "application/vnd.google-apps.spreadsheet",
        # "mimeType": "application/vnd.google-apps.folder", フォルダの場合
    }
    # supportsAllDrives=Trueがないと共有ドライブに作成できない
    res = drive.files().create(body=file_metadata, supportsAllDrives=True).execute()

    return res

フォルダIDとシートIDはXComで受け取ると便利

スクリーンショット 2021-11-30 14.18.52.png
XComは異なるタスク間で値をやりとりする手段です。
フォルダ作成→スプレッドシート作成→データの書きこみの依存関係はこのように作成していて、フォルダ、スプレッドシートの作成はそれぞれPythonOperator内でおこなっているのでPythonOperatorも戻り値としてレスポンスのid(フォルダやスプレッドシートのID)を返すことで別のタスクからXComを使用してその値を受け取れます。

def create_folder(**kwargs):
    """フォルダを作成する

    Returns:
        [str]: 作成されたフォルダのID
    """
    folder_name = '<yyyy-mm-dd>'
    folder_id = '<FOLDER_ID>'

    res = create_drive_folder(folder_name, folder_id)
    return res["id"]

def create_spreadsheet(**context):
    """スプレッドシートを作成する

    Returns:
        [str]: 作成したスプレッドシートのID
    """
    # 前taskのcreate_folderから作成したフォルダのIDを受け取る
    folder_id = context["task_instance"].xcom_pull(task_ids="create_folder")
    sheet_name = "sheet_A"

    res = create_spread_sheet(sheet_name, folder_id)
    return res["id"]

with DAG(
    DAG_NAME,
    description="spreadsheet_test",
    schedule_interval="0 23 * * *",  # 毎日8:00(JST)に実行
    catchup=False,
    default_args=default_args,
) as dag:

    task_create_folder = PythonOperator(
        task_id="create_folder",
        python_callable=create_folder,
        provide_context=True,
        dag=dag,
    )
   task_create_spreadsheet_A = PythonOperator(
        task_id="create_spreadsheet_A",
        python_callable=create_spreadsheet,
        provide_context=True,
        dag=dag,
    )
   task_create_spreadsheet_B = PythonOperator(
        task_id="create_spreadsheet_B",
        python_callable=create_spreadsheet,
        provide_context=True,
        dag=dag,
    )
   task_create_spreadsheet_C = PythonOperator(
        task_id="create_spreadsheet_C",
        python_callable=create_spreadsheet,
        provide_context=True,
        dag=dag,
    )
    task_create_folder >> task_create_spreadsheet_A
    task_create_folder >> task_create_spreadsheet_B
    task_create_folder >> task_create_spreadsheet_C

etc task失敗時のslack通知

各taskが失敗した際にon_failure_callbackに関数を登録しておくとslackの通知するようにしています。
logのurlの取得ができるのでslackの通知から直接logに飛べるのでとても便利。
↓のコードではslackwebを使用して通知していますがairflowのproviderにもslackのhookが用意されているのでそっちを使ってもよかったと作ってから思いました。

スクリーンショット 2021-11-30 17.23.03.png

def failure_notification(context):
    task_instance = context["task_instance"]
    dag_name = context["dag"]
    task_name = task_instance.task_id
    log_link = f"<{task_instance.log_url}|{task_name}>"
    error_message = str(context.get("exception") or context.get("reason"))

    slack = slackweb.Slack(url=SLACK_URL)
    attachments = [
        {
            "username": "Cloud Composer(Airflow)",
            "icon_emoji": ":airflow:",
            "color": "danger",
            "title": "エラーが発生しました",
            "fields": [
                {"title": "Project", "value": f"{PROJECT_ID}"},
                {"title": "<DAG name> Task name", "value": f"{dag_name} {task_name}"},
                {"title": "Log URL", "value": log_link},
                {"title": "Error Message", "value": error_message},
            ],
        }
    ]
    slack.notify(attachments=attachments)

最後に

CloudComposer(airflow)のprovider周りのOperatorはあんまりドキュメントが整理されていないので直接airflowのソースコードを読んでどういった仕様になっているのか調べたほうがいいなと思いました。
今回はGoogle WorkspaceのAPI周りを調べましたがairflowのソースコードとGoogle WorkspaceのAPIのドキュメントを繰り返し見て作業してました。
providerはgoogleやslack以外にも多くあるので他のサービスと連携する際にproviderを見てみて連携するサービスがあれば一から実装しなくても良くなるかもしれないのでチェックしておくといいかもと思いました。PR送れるくらいGoogleのproviderのコードを読んだので共有ドライブ対応のPRを送ろうと思っています。

Kyash Advent Calendar 2021の他の記事も是非ご覧ください〜!

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?