This note describes the solution to a small problem related to Airflow's `BigQueryOperator` which I encountered a while ago. The solution was not entirely trivial for me to figure out, so I decided to share it here in case it is beneficial to someone else. This note assumes some basic knowledge of Airflow; it is not meant to be introductory.
What we mean by "double substitution"
I will start by describing the issue. The `BigQueryOperator` has the following templated parameters:

- `destination_dataset_table`
- `sql`
- `labels`
- `query_params`
- `bql` (deprecated, so we will ignore it here)
As is common in Airflow, the user can pass a dictionary `params` to `BigQueryOperator`, which can later be referenced in templates.
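For reference, here is a minimal sketch of this ordinary, single-level substitution (the task id, table name, and DAG wiring are placeholders):

```python
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# The params values are plain strings here, so one rendering pass suffices:
# the rendered sql is SELECT "value" AS x.
query = BigQueryOperator(
    task_id="query",
    sql='SELECT "{{ params.x }}" AS x',
    params={"x": "value"},
    destination_dataset_table="project.dataset.table",
)
```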
However, what if one wants to pass a template string inside `params`? That is, what if one passes something like `{"x": "{{var.json.global_config.x}}"}` as `params`, so as to reference some `global_config` variable defined in Airflow as `{"x": "value"}`, and then expects the SQL template `SELECT "{{params.x}}" AS x` to be substituted to `SELECT "value" AS x`? This will not work; you will in fact get `SELECT "{{var.json.global_config.x}}" AS x`.
This makes sense: substitution in Airflow (in its underlying Jinja template engine) does not happen recursively. This is justified, as otherwise one would have to face the possibility of endless substitution loops. However, it is not always convenient.
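One can observe the same behaviour with plain Jinja2, outside of Airflow:

```python
from jinja2 import Template

# Rendering happens exactly once: the template string produced by the
# substitution is returned as-is, not rendered again.
template = Template('SELECT "{{ params.x }}" AS x')
print(template.render(params={"x": "{{ var.json.global_config.x }}"}))
# -> SELECT "{{ var.json.global_config.x }}" AS x
```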
Why we want double substitution
Scenarios in which one wants to perform double substitution are more common than one might think at first glance.
Imagine that in our Airflow deployment every DAG we deploy has its own corresponding configuration variable, and every DAG may also reference a certain `global_config` variable containing global configuration. Now, to avoid duplication, it would be ideal to be able to reference `global_config` within the configuration variables of individual DAGs, as in the diagram:

[Diagram: per-DAG configuration variables referencing the shared `global_config` variable.]

However, if double substitution is unavailable, we have to perform the substitution manually in every DAG's source code, which clutters the logic.
How we can achieve double substitution
One solution is to create our own `ExtendedBigQueryOperator`, which one would then call as, for example:

```python
ExtendedBigQueryOperator(
    task_id="query",
    sql='SELECT "{{ params.templated_params.x }}" AS x',
    templated_params={
        "x": "{{ var.json.global_config.template_data.environment }}"
    },
    destination_dataset_table="xxx.yyy.zzz",
)
```
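The intent is that the template string in `templated_params` is itself rendered against the `global_config` variable, so that `{{params.templated_params.x}}` in the SQL resolves to the actual configured value.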
We want `ExtendedBigQueryOperator` to satisfy the following properties:

1. mimic the constructor of `BigQueryOperator`, except
2. that it accepts an additional dictionary parameter `templated_params`, which contains the parameters we want to substitute twice;
3. behave identically to `BigQueryOperator` in every other aspect.

Requirements 1 and 3 naturally call for `ExtendedBigQueryOperator` to inherit from `BigQueryOperator`, and we also add one more requirement:

4. the amount of additional code in `ExtendedBigQueryOperator` should be as small as possible, so as to prevent the inheritance from breaking in case `BigQueryOperator` changes in the future.
Finally, this is what I came up with, based on the requirements above:
```python
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

def _deep_copy_dict(dst, src):
    # Update dst key by key, *in place*: the trick below relies on
    # preserving the identity of the dst dict object.
    for k in src:
        dst[k] = src[k]

class ExtendedBigQueryOperator(BigQueryOperator):
    def __init__(self, templated_params, *args, **kwargs):
        # Expose templated_params inside params, so SQL templates can
        # reference {{ params.templated_params.<key> }}.
        kwargs["params"] = {**kwargs.get("params", {}),
                            "templated_params": templated_params}
        self.templated_params = templated_params
        # Prepend "templated_params" so it gets rendered before "sql".
        self.template_fields = ("templated_params",
                                *super(ExtendedBigQueryOperator, self).template_fields)
        super(ExtendedBigQueryOperator, self).__init__(*args, **kwargs)

    def render_template(self, attr, *args, **kwargs):
        # In this version of Airflow, render_template(attr, content, context)
        # is called once per templated field, with attr holding the field name.
        res = super(ExtendedBigQueryOperator, self).render_template(attr, *args, **kwargs)
        if attr == "templated_params":
            # Copy the rendered values back into the original dict in place, so
            # params["templated_params"] (the same object) sees them as well.
            _deep_copy_dict(self.templated_params, res)
        return res
```
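A note on why this works: `templated_params` is deliberately placed first in `template_fields`, so it is rendered before `sql`. When it is rendered, the rendered values are copied back into the original dictionary in place; since `params["templated_params"]` references that same dictionary object, the subsequent rendering of `sql` already sees the substituted values. Assuming, for illustration, that the `global_config` variable contains `{"template_data": {"environment": "prod"}}` (a made-up value), the example task above would render its SQL to `SELECT "prod" AS x`.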