LoginSignup
6
1

More than 3 years have passed since last update.

Airflowを利用してBigQueryを平日日中のみ定額化する

Last updated at Posted at 2020-12-09

BigQueryを利用し始めて時間が経ち、データも少しずつ増えてきました。
それと同時に利用者や、BigQueryにアクセスするツールが増えデータスキャン量も当然増えてきます。
そこで費用を抑えるために定額料金にすることにしました。
ただ、24時間ずっとBigQueryでクエリが実行しているわけではないので、月次・年間契約だとコストが見合わなく、Flex SlotsとAirflowを組み合わせて営業時間のみ定額化することにしました。
設定する場合はBigQuery Reservationsの詳細について一度確認することをお勧めします。
今回はAirflowのデプロイ方法については割愛します。

定額化の設定に必要な権限

処理を行うアカウントに以下の権限が必要なので、こちらの権限を設定したサービスアカウントを利用します。
フォルダに対して定額化を有効にする場合、フォルダもしくは組織に対する権限を設定してください。

  • bigquery.capacityCommitments.create
  • bigquery.capacityCommitments.delete
  • bigquery.reservationAssignments.create
  • bigquery.reservationAssignments.delete
  • bigquery.reservations.create
  • bigquery.reservations.delete

定額化設定DAG

import jpholiday
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, timezone, datetime
from google.cloud.bigquery_reservation_v1 import ReservationServiceClient, CapacityCommitment, Reservation, Assignment

jst = timezone(timedelta(hours=9), 'JST')

default_args = {
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'BigQueryReservationSlots',
    default_args=default_args,
    description='Create BigQuery Commitment',
    schedule_interval='0 9 * * 1-5',
    dagrun_timeout=timedelta(minutes=5),
    start_date=datetime(),
    catchup=False
)

def reservation_slots(**kwargs):

    if jpholiday.is_holiday(datetime.now(jst).date()):
        kwargs['ti'].xcom_push(key='is_holiday', value=1)
        return

    kwargs['ti'].xcom_push(key='is_holiday', value=0)
    reserve_client = ReservationServiceClient()
    parent = 'projects/{PROJECT_NAME}/locations/{ZONE_NAME}'
    commit_conf = CapacityCommitment(plan='FLEX', slot_count=500)
    commit = reserve_client.create_capacity_commitment(parent=parent, capacity_commitment=commit_conf)
    kwargs['ti'].xcom_push(key='commitment_name', value=commit.name)

    reserve_config = Reservation(slot_capacity=500, ignore_idle_slots=False)
    reserve = reserve_client.create_reservation(parent=parent, reservation_id='daily-commitment',
                                                reservation=reserve_config)
    kwargs['ti'].xcom_push(key='reservation_name', value=reserve.name)

    assign_config = Assignment(job_type='QUERY', assignee='folders/{FOLDER_ID}')
    assign = reserve_client.create_assignment(parent=reserve.name, assignment=assign_config)
    kwargs['ti'].xcom_push(key='assignment_name', value=assign.name)


reservation_slots = PythonOperator(
    task_id='reservation_slots',
    python_callable=reservation_slots,
    provide_context=True,
    dag=dag
)

こちらでスロットの購入・予約の作成・割り当てを行います。

スロットの購入

以下の部分で、スロットの購入処理を行います。
今回はjpholidayを利用して祝日は定額化を行わないようにしています。
休日だった場合returnしていますが、BranchPythonOperatorを利用してスキップする方が丁寧かもです。


if jpholiday.is_holiday(datetime.now(jst).date()):
        kwargs['ti'].xcom_push(key='is_holiday', value=1)
        return

    kwargs['ti'].xcom_push(key='is_holiday', value=0)

    reserve_client = ReservationServiceClient()
    parent = 'projects/{PROJECT_NAME}/locations/{ZONE_NAME}'
    commit_conf = CapacityCommitment(plan='FLEX', slot_count=500)
    commit = reserve_client.create_capacity_commitment(parent=parent, capacity_commitment=commit_conf)
    kwargs['ti'].xcom_push(key='commitment_name', value=commit.name)

定額化を解除するときにcommit名が必要になるので、XCOMを利用してこの後のDAGに渡します。

予約の作成

     reserve_config = Reservation(slot_capacity=500, ignore_idle_slots=False)
    reserve = reserve_client.create_reservation(parent=parent, reservation_id='daily-commitment',
                                                reservation=reserve_config)
    kwargs['ti'].xcom_push(key='reservation_name', value=reserve.name)

こちらでも定額化解除の際に必要となるので、reservation名をXCOMで渡します。

予約の割り当て

    assign_config = Assignment(job_type='QUERY', assignee='folders/{FOLDER_ID}')
    assign = reserve_client.create_assignment(parent=reserve.name, assignment=assign_config)
    kwargs['ti'].xcom_push(key='assignment_name', value=assign.name)

ここでは、特定のフォルダー配下に配置されているプロジェクトに対して購入したスロットを共有するようにしています。
特定のプロジェクトのみに設定する場合はprojects/{PROJECT_ID}組織全体に設定する場合はorganizations/{ORG_ID}となります。

定額化の解除

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, timezone, datetime

from google.cloud.bigquery_reservation_v1 import ReservationServiceClient

XCOM_PUSH_DAG_ID = 'BigQueryReservationSlots'
XCOM_PUSH_TASK_ID = 'reservation_slots'

default_args = {
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'BigQueryDeleteReservation',
    default_args=default_args,
    description='Delete BigQuery Commitment',
    schedule_interval='0 19 * * 1-5',
    dagrun_timeout=timedelta(minutes=5),
    start_date=datetime(),
    catchup=False
)


def delete_reservation(**kwargs):
    is_holiday = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                        task_ids=XCOM_PUSH_TASK_ID,
                                        key='is_holiday',
                                        include_prior_dates=True)

    if is_holiday == 1:
        return

    reserve_client = ReservationServiceClient()
    assignment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                             task_ids=XCOM_PUSH_TASK_ID,
                                             key='assignment_name',
                                             include_prior_dates=True)
    commitment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                             task_ids=XCOM_PUSH_TASK_ID,
                                             key='commitment_name',
                                             include_prior_dates=True)
    reservation_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                              task_ids=XCOM_PUSH_TASK_ID,
                                              key='reservation_name',
                                              include_prior_dates=True)

    reserve_client.delete_assignment(name=assignment_name)
    reserve_client.delete_reservation(name=reservation_name)
    reserve_client.delete_capacity_commitment(name=commitment_name)


delete_reservation_task = PythonOperator(
    task_id='delete_reservation',
    python_callable=delete_reservation,
    provide_context=True,
    dag=dag
)

こちらでスロット・予約・予約の割り当てを削除します。

予約名 commit名 割り当て名の取得

    assignment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                             task_ids=XCOM_PUSH_TASK_ID,
                                             key='assignment_name',
                                             include_prior_dates=True)
    commitment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                             task_ids=XCOM_PUSH_TASK_ID,
                                             key='commitment_name',
                                             include_prior_dates=True)
    reservation_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
                                              task_ids=XCOM_PUSH_TASK_ID,
                                              key='reservation_name',
                                              include_prior_dates=True)

定額化設定のDAGでXCOMにpushしたデータをpullし、後続処理で使用します。

スロット・予約・割り当ての削除

    reserve_client.delete_assignment(name=assignment_name)
    reserve_client.delete_reservation(name=reservation_name)
    reserve_client.delete_capacity_commitment(name=commitment_name)

もう何もいうことはありません。
以上となります。

スクリーンショット 2020-12-09 22.20.38.png
このようになれば定額化は正常に有効となっています。

最後に

DAGの記述はかなり単純なものになりました。
定額化することで、コストを一定にすることすることが可能ですが、スロットが一定になるので、オンデマンドよりもパフォーマンスが落ちる可能性があります。
また、Flex Slotsは1時間当たりの値段は月次よりも高額になっているので運用にあったプランの設定を検討してみてください。
今回は、平日日中としましたが、BigQueryのデータを大量にスキャンするワークフローに組み込むことでコストを抑えることができるかもしれません。

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1