TL;DR
To estimate the number of bytes processed by a single run of a given DAG on a given day, execute the following:
$ pip3 install git+https://github.com/dtws/airflow-dag-profiler@v0.0.1
$ airflow-dag-profiler <dag_id> <date in the format %Y-%m-%d>
Problem
At work I often use BigQuery and Airflow. However, BigQuery is not free: it is billed based
on the amount of data processed. Therefore, it is often necessary to know how many bytes a run of a given DAG will process, in order to estimate
the price of a backfill. For this, I find the following script useful:
#!/usr/bin/env python3
"""===============================================================================
FILE: airflow-dag-profiler.py
USAGE: ./airflow-dag-profiler <DAG_ID> <DATE in format %Y-%m-%d>
DESCRIPTION:
OPTIONS: ---
REQUIREMENTS: ---
BUGS: ---
NOTES: ---
AUTHOR: Alex Leontiev (nailbiter@dtws-work.in)
ORGANIZATION: Datawise Inc.
VERSION: ---
CREATED: 2020-12-11T10:15:42.030633
REVISION: ---
==============================================================================="""
import click
import logging
import os
import subprocess
from collections import namedtuple
import re
from tqdm import tqdm
from google.cloud import bigquery
import pandas as pd
from jinja2 import Template
def _add_logger(f):
    """Decorator that injects a per-function logger as the `logger` kwarg."""
    logger = logging.getLogger(f.__name__)

    def _f(*args, **kwargs):
        return f(*args, logger=logger, **kwargs)

    _f.__name__ = f.__name__
    return _f


RetCodeAndOutput = namedtuple("RetCodeAndOutput", "retcode output")


@_add_logger
def _system(cmd, logger=None):
    """return (exitcode,output)"""
    logger.info(f"> {cmd}")
    exitcode, output = subprocess.getstatusoutput(cmd)
    return RetCodeAndOutput(retcode=exitcode, output=output)


@click.command()
@click.option("--debug/--no-debug", default=False)
@click.argument("dag_id")
@click.argument("date", type=click.DateTime())
@click.option("--airflow-render-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND", default="airflow render {{dag_id}} {{bq_task}} {{ds}}")
@click.option("--airflow-list-tasks-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND", default="airflow list_tasks -t {{dag_id}} 2>/dev/null")
def airflow_dag_profiler(dag_id, debug, date, airflow_list_tasks_command, airflow_render_command):
    if debug:
        logging.basicConfig(level=logging.INFO)
    tasktree = _system(Template(airflow_list_tasks_command).render(
        {"dag_id": dag_id, "date": date})).output
    client = bigquery.Client()
    # task lines look like `<Task(BigQueryOperator): do_filter>`
    bq_tasks = [t for t in re.findall(
        r"<Task\(BigQueryOperator\): ([a-zA-Z0-9_]+)>", tasktree)]
    bq_tasks = list(set(bq_tasks))
    quota = []
    for bq_task in tqdm(bq_tasks):
        sql = _system(
            Template(airflow_render_command).render(
                {"dag_id": dag_id, "bq_task": bq_task, "date": date, "ds": date.strftime("%Y-%m-%d")})).output
        # extract the rendered SQL between the `sql` and
        # `destination_dataset_table` property markers of `airflow render`
        lines = sql.split("\n")
        start = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: sql *$", line) is not None)
        end = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: destination_dataset_table *$", line) is not None)
        sql = "\n".join(lines[start+2:end-1])
        # dry-run the query: nothing is executed or billed, but BigQuery
        # still reports how many bytes the query would process
        total_bytes_processed = client.query(sql, job_config=bigquery.job.QueryJobConfig(
            dry_run=True, use_query_cache=False)).total_bytes_processed
        quota.append(
            {"bq_task": bq_task, "total_bytes_processed": total_bytes_processed})
    df = pd.DataFrame(quota)
    print(df)
    print(f"total: {sum(df.total_bytes_processed)} bytes")


if __name__ == "__main__":
    airflow_dag_profiler()
The online version can be found at https://github.com/dtws/airflow-dag-profiler. The script can also be installed via pip, as shown in the TL;DR section above.
Under the hood, the script does the following:
- queries Airflow to get the list of tasks in the DAG;
- filters the tasks to keep only the BigQueryOperator tasks;
- renders the remaining tasks one by one, using the given date;
- uses the google.cloud.bigquery API to estimate the number of bytes that each rendered SQL query would process (see the sketch right below).
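The last step relies on BigQuery's dry-run mode: a query submitted with dry_run=True is not executed (and not billed), but the job still reports total_bytes_processed. Stripped of the Airflow plumbing, that step looks roughly like this (the query below is just an illustrative example against a public dataset):
from google.cloud import bigquery

client = bigquery.Client()
# illustrative query; in the script, the SQL comes from `airflow render`
sql = """
SELECT name
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = 'TX'
"""
# dry_run=True: nothing is executed or billed, but the byte estimate is returned
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(sql, job_config=job_config)
print(f"this query would process {job.total_bytes_processed} bytes")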
By default, the script queries the local Airflow installation via the airflow command. However, it can also be used when your Airflow is deployed remotely. In that case, you will need to provide the --airflow-render-command and --airflow-list-tasks-command options (or, equivalently, the AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND and AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND environment variables).
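For example, if Airflow lives on a remote machine reachable over ssh, one might wrap the default commands like this (airflow-host is a made-up host name; adjust to your setup):
$ export AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND="ssh airflow-host 'airflow list_tasks -t {{dag_id}} 2>/dev/null'"
$ export AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND="ssh airflow-host 'airflow render {{dag_id}} {{bq_task}} {{ds}}'"
$ airflow-dag-profiler <dag_id> <date in the format %Y-%m-%d>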
Current Limitations
- The DAG must already be deployed to Airflow (which means you need a working Airflow deployment).
- If your DAG reads from tables which it itself creates during the run (e.g. the DAG consists of tasks A and B, where A creates a table a that B then uses), then you can only use the script after the DAG has already run. Although this makes the script pretty useless in that situation, I often find myself needing to estimate the price of a long backfill (~200 days). In that case, I can first backfill a single day and then use that day's data for the estimate, exploiting the fact that the number of bytes processed each day is about the same (a sketch of that arithmetic follows this list).
- If you use BigQuery through operators other than BigQueryOperator (say, through a PythonOperator or your own subclass of BigQueryOperator), those tasks will not be included in the estimate.
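The extrapolation itself is a one-liner; a rough sketch is below. The per-TiB rate is an assumption (BigQuery's on-demand pricing changes over time and by region), so plug in the current price before trusting the number.
# rough price estimate for a long backfill, extrapolated from one day's dry run
bytes_per_day = 1_234_567_890_123   # example value: the total reported by airflow-dag-profiler for one day
days = 200                          # length of the backfill
usd_per_tib = 5.0                   # assumed on-demand rate; check current BigQuery pricing
estimated_cost = bytes_per_day * days / 2**40 * usd_per_tib
print(f"estimated backfill cost: ~${estimated_cost:.2f}")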