BigQueryを利用し始めて時間が経ち、データも少しずつ増えてきました。
それと同時に利用者や、BigQueryにアクセスするツールが増えデータスキャン量も当然増えてきます。
そこで費用を抑えるために定額料金にすることにしました。
ただ、24時間ずっとBigQueryでクエリが実行しているわけではないので、月次・年間契約だとコストが見合わなく、Flex SlotsとAirflowを組み合わせて営業時間のみ定額化することにしました。
設定する場合はBigQuery Reservationsの詳細について一度確認することをお勧めします。
今回はAirflowのデプロイ方法については割愛します。
定額化の設定に必要な権限
処理を行うアカウントに以下の権限が必要なので、こちらの権限を設定したサービスアカウントを利用します。
フォルダに対して定額化を有効にする場合、フォルダもしくは組織に対する権限を設定してください。
- bigquery.capacityCommitments.create
- bigquery.capacityCommitments.delete
- bigquery.reservationAssignments.create
- bigquery.reservationAssignments.delete
- bigquery.reservations.create
- bigquery.reservations.delete
定額化設定DAG
import jpholiday
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, timezone, datetime
from google.cloud.bigquery_reservation_v1 import ReservationServiceClient, CapacityCommitment, Reservation, Assignment
jst = timezone(timedelta(hours=9), 'JST')
default_args = {
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'BigQueryReservationSlots',
default_args=default_args,
description='Create BigQuery Commitment',
schedule_interval='0 9 * * 1-5',
dagrun_timeout=timedelta(minutes=5),
start_date=datetime(),
catchup=False
)
def reservation_slots(**kwargs):
if jpholiday.is_holiday(datetime.now(jst).date()):
kwargs['ti'].xcom_push(key='is_holiday', value=1)
return
kwargs['ti'].xcom_push(key='is_holiday', value=0)
reserve_client = ReservationServiceClient()
parent = 'projects/{PROJECT_NAME}/locations/{ZONE_NAME}'
commit_conf = CapacityCommitment(plan='FLEX', slot_count=500)
commit = reserve_client.create_capacity_commitment(parent=parent, capacity_commitment=commit_conf)
kwargs['ti'].xcom_push(key='commitment_name', value=commit.name)
reserve_config = Reservation(slot_capacity=500, ignore_idle_slots=False)
reserve = reserve_client.create_reservation(parent=parent, reservation_id='daily-commitment',
reservation=reserve_config)
kwargs['ti'].xcom_push(key='reservation_name', value=reserve.name)
assign_config = Assignment(job_type='QUERY', assignee='folders/{FOLDER_ID}')
assign = reserve_client.create_assignment(parent=reserve.name, assignment=assign_config)
kwargs['ti'].xcom_push(key='assignment_name', value=assign.name)
reservation_slots = PythonOperator(
task_id='reservation_slots',
python_callable=reservation_slots,
provide_context=True,
dag=dag
)
こちらでスロットの購入・予約の作成・割り当てを行います。
スロットの購入
以下の部分で、スロットの購入処理を行います。
今回はjpholiday
を利用して祝日は定額化を行わないようにしています。
休日だった場合returnしていますが、BranchPythonOperator
を利用してスキップする方が丁寧かもです。
if jpholiday.is_holiday(datetime.now(jst).date()):
kwargs['ti'].xcom_push(key='is_holiday', value=1)
return
kwargs['ti'].xcom_push(key='is_holiday', value=0)
reserve_client = ReservationServiceClient()
parent = 'projects/{PROJECT_NAME}/locations/{ZONE_NAME}'
commit_conf = CapacityCommitment(plan='FLEX', slot_count=500)
commit = reserve_client.create_capacity_commitment(parent=parent, capacity_commitment=commit_conf)
kwargs['ti'].xcom_push(key='commitment_name', value=commit.name)
定額化を解除するときにcommit名が必要になるので、XCOMを利用してこの後のDAGに渡します。
予約の作成
reserve_config = Reservation(slot_capacity=500, ignore_idle_slots=False)
reserve = reserve_client.create_reservation(parent=parent, reservation_id='daily-commitment',
reservation=reserve_config)
kwargs['ti'].xcom_push(key='reservation_name', value=reserve.name)
こちらでも定額化解除の際に必要となるので、reservation名をXCOMで渡します。
予約の割り当て
assign_config = Assignment(job_type='QUERY', assignee='folders/{FOLDER_ID}')
assign = reserve_client.create_assignment(parent=reserve.name, assignment=assign_config)
kwargs['ti'].xcom_push(key='assignment_name', value=assign.name)
ここでは、特定のフォルダー配下に配置されているプロジェクトに対して購入したスロットを共有するようにしています。
特定のプロジェクトのみに設定する場合はprojects/{PROJECT_ID}
組織全体に設定する場合はorganizations/{ORG_ID}
となります。
定額化の解除
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta, timezone, datetime
from google.cloud.bigquery_reservation_v1 import ReservationServiceClient
XCOM_PUSH_DAG_ID = 'BigQueryReservationSlots'
XCOM_PUSH_TASK_ID = 'reservation_slots'
default_args = {
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'BigQueryDeleteReservation',
default_args=default_args,
description='Delete BigQuery Commitment',
schedule_interval='0 19 * * 1-5',
dagrun_timeout=timedelta(minutes=5),
start_date=datetime(),
catchup=False
)
def delete_reservation(**kwargs):
is_holiday = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='is_holiday',
include_prior_dates=True)
if is_holiday == 1:
return
reserve_client = ReservationServiceClient()
assignment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='assignment_name',
include_prior_dates=True)
commitment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='commitment_name',
include_prior_dates=True)
reservation_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='reservation_name',
include_prior_dates=True)
reserve_client.delete_assignment(name=assignment_name)
reserve_client.delete_reservation(name=reservation_name)
reserve_client.delete_capacity_commitment(name=commitment_name)
delete_reservation_task = PythonOperator(
task_id='delete_reservation',
python_callable=delete_reservation,
provide_context=True,
dag=dag
)
こちらでスロット・予約・予約の割り当てを削除します。
予約名 commit名 割り当て名の取得
assignment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='assignment_name',
include_prior_dates=True)
commitment_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='commitment_name',
include_prior_dates=True)
reservation_name = kwargs['ti'].xcom_pull(dag_id=XCOM_PUSH_DAG_ID,
task_ids=XCOM_PUSH_TASK_ID,
key='reservation_name',
include_prior_dates=True)
定額化設定のDAGでXCOMにpushしたデータをpullし、後続処理で使用します。
スロット・予約・割り当ての削除
reserve_client.delete_assignment(name=assignment_name)
reserve_client.delete_reservation(name=reservation_name)
reserve_client.delete_capacity_commitment(name=commitment_name)
もう何もいうことはありません。
以上となります。
最後に
DAGの記述はかなり単純なものになりました。
定額化することで、コストを一定にすることすることが可能ですが、スロットが一定になるので、オンデマンドよりもパフォーマンスが落ちる可能性があります。
また、Flex Slotsは1時間当たりの値段は月次よりも高額になっているので運用にあったプランの設定を検討してみてください。
今回は、平日日中としましたが、BigQueryのデータを大量にスキャンするワークフローに組み込むことでコストを抑えることができるかもしれません。