
Computing the BigQuery cost of a single Airflow DAG run

Posted at 2020-12-23

TL;DR

To estimate the number of bytes processed by a single run of a given DAG on a given day, execute the following:

$ pip3 install git+https://github.com/dtws/airflow-dag-profiler@v0.0.1
$ airflow-dag-profiler <dag_id> <date in the format %Y-%m-%d>

Problem

At work I often use BigQuery and Airflow. However, BigQuery is not free: it is billed based on the amount of data processed. Therefore, it is often necessary to know how many bytes a run of a given DAG will process, in order to estimate the price of a backfill. For this, I find the following script useful:

#!/usr/bin/env python3
"""===============================================================================

        FILE: airflow-dag-profiler.py

       USAGE: ./airflow-dag-profiler <DAG_ID> <DATE in format %Y-%m-%d>

 DESCRIPTION: 

     OPTIONS: ---
REQUIREMENTS: ---
        BUGS: ---
       NOTES: ---
      AUTHOR: Alex Leontiev (nailbiter@dtws-work.in)
ORGANIZATION: Datawise Inc.
     VERSION: ---
     CREATED: 2020-12-11T10:15:42.030633
    REVISION: ---

==============================================================================="""

import click
import logging
import os
import subprocess
from collections import namedtuple
import re
from tqdm import tqdm
from google.cloud import bigquery
import pandas as pd
from jinja2 import Template


def _add_logger(f):
    logger = logging.getLogger(f.__name__)

    def _f(*args, **kwargs):
        return f(*args, logger=logger, **kwargs)
    _f.__name__ = f.__name__
    return _f


RetCodeAndOutput = namedtuple("RetCodeAndOutput", "retcode output")


@_add_logger
def _system(cmd, logger=None):
    """return (exitcode,output)"""
    logger.info(f"> {cmd}")
    exitcode, output = subprocess.getstatusoutput(cmd)
    return RetCodeAndOutput(retcode=exitcode, output=output)


@click.command()
@click.option("--debug/--no-debug", default=False)
@click.argument("dag_id")
@click.argument("date", type=click.DateTime())
@click.option("--airflow-render-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND", default="airflow render {{dag_id}} {{bq_task}} {{ds}}")
@click.option("--airflow-list-tasks-command", envvar="AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND", default="airflow list_tasks -t {{dag_id}} 2>/dev/null")
def airflow_dag_profiler(dag_id, debug, date, airflow_list_tasks_command, airflow_render_command):
    if debug:
        logging.basicConfig(level=logging.INFO)
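    # ask Airflow for the DAG's task tree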
    tasktree = _system(Template(airflow_list_tasks_command).render(
        {"dag_id": dag_id, "date": date})).output
    client = bigquery.Client()
    # keep only the BigQueryOperator tasks; matching lines look like
    # <Task(BigQueryOperator): do_filter>
    bq_tasks = re.findall(
        r"<Task\(BigQueryOperator\): ([a-zA-Z0-9_]+)>", tasktree)
    bq_tasks = list(set(bq_tasks))
    quota = []
    for bq_task in tqdm(bq_tasks):
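        # render the task for the given date to recover the SQL it would run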
        sql = _system(Template(airflow_render_command).render(
            {"dag_id": dag_id, "bq_task": bq_task, "date": date,
             "ds": date.strftime("%Y-%m-%d")})).output
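        # `airflow render` prints each templated field between "# property: <name>"
        # marker lines; cut out the block that belongs to the sql property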
        lines = sql.split("\n")
        start = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: sql *$", line) is not None)
        end = next(i for i, line in enumerate(lines) if re.match(
            "^ *# property: destination_dataset_table *$", line) is not None)
        sql = "\n".join(lines[start+2:end-1])
        total_bytes_processed = client.query(sql, job_config=bigquery.job.QueryJobConfig(
            dry_run=True, use_query_cache=False)).total_bytes_processed
        quota.append(
            {"bq_task": bq_task, "total_bytes_processed": total_bytes_processed})
    df = pd.DataFrame(quota)
    print(df)
    print(f"total: {sum(df.total_bytes_processed)} bytes")


if __name__ == "__main__":
    airflow_dag_profiler()

The online version can be found at https://github.com/dtws/airflow-dag-profiler. The script can also be installed via pip, as shown in the TL;DR section above.

Under the hood, the script does the following:

  1. queries Airflow to get the list of tasks in the DAG;
  2. filters the tasks to keep only the BigQueryOperator tasks;
  3. renders each remaining task using the given date;
  4. uses the google.cloud.bigquery API to get an estimate of the number of bytes processed by each task's SQL (a standalone sketch of this call follows below).
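
The heavy lifting in the last step is done by BigQuery's dry-run mode, which plans a query and reports how many bytes it would scan without executing or billing it. Below is a minimal standalone sketch of that call, using the same google.cloud.bigquery API as the script; the query against the public shakespeare sample table is just an illustration.

from google.cloud import bigquery

client = bigquery.Client()
# dry_run=True makes BigQuery plan the query and report total_bytes_processed
# without running it; use_query_cache=False prevents a cached (0-byte) answer
job_config = bigquery.job.QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(
    "SELECT corpus, COUNT(*) FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus",
    job_config=job_config,
)
print(f"this query would process {job.total_bytes_processed} bytes")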

By default, the script queries the local Airflow via the airflow command. However, it can also be used when your Airflow is deployed remotely. In this case, you will need to provide the --airflow-render-command and --airflow-list-tasks-command options (or, alternatively, the AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND and AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND environment variables).
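
For example, if Airflow lives on a remote machine reachable over SSH, both commands can be wrapped accordingly. A sketch (the host name my-airflow-host is made up, and the exact CLI syntax depends on your Airflow version):

$ export AIRFLOW_DAG_PROFILER__AIRFLOW_LIST_TASKS_COMMAND='ssh my-airflow-host "airflow list_tasks -t {{dag_id}} 2>/dev/null"'
$ export AIRFLOW_DAG_PROFILER__AIRFLOW_RENDER_COMMAND='ssh my-airflow-host "airflow render {{dag_id}} {{bq_task}} {{ds}}"'
$ airflow-dag-profiler <dag_id> <date in the format %Y-%m-%d>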

Current Limitations

  1. The DAG should already be deployed to Airflow (which means that you will need a working Airflow deployment);
  2. If your DAG uses input tables which it itself creates during the run (e.g. the DAG consists of tasks A and B, and A creates a table a that B then uses), then you can only use the script after you have run the DAG at least once. Although this makes the script fairly useless for a first-time estimate, I often find myself needing to estimate the price of a long backfill (~200 days). In that case, I can backfill the first day alone, run the script on that day, and use the result for the estimate, exploiting the fact that the number of bytes processed each day is about the same (a rough cost calculation is sketched after this list);
  3. If you use BigQuery through operators other than BigQueryOperator (say, a PythonOperator, or your own subclass of BigQueryOperator), those tasks will not be included in the estimate.
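
For the long-backfill scenario from limitation 2, the per-day total printed by the script is enough for a back-of-the-envelope cost estimate. A minimal sketch (the per-day byte count is a made-up example, and the $5-per-TiB figure is an assumed on-demand price, so check the current BigQuery pricing):

# rough backfill cost: bytes per day x number of days x on-demand price per TiB
bytes_per_day = 1_500_000_000_000    # hypothetical one-day output of airflow-dag-profiler
days = 200
price_per_tib_usd = 5.0              # assumption; verify against current BigQuery on-demand pricing
estimated_cost_usd = bytes_per_day * days / 2**40 * price_per_tib_usd
print(f"estimated backfill cost: ~${estimated_cost_usd:,.0f}")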