Hello, my name is Sourav Saha, a member of NEC Digital Technology Research and Development Laboratories. This is an article for NEC デジタルテクノロジー開発研究所 Advent Calendar 2023 Day 11, where I discuss several possible ways to improve some commonly used join queries in complex data analysis.
Background
Join is one of the most frequently used operations in table data analysis: two or more tables are combined based on common keys for further analysis. The computation cost grows with the sizes of the input tables, and there is a large body of research on optimizing join algorithms. In today's article, I will talk about some commonly used patterns involving join operations that can be sped up by more than 10x with some expert-level code modifications. It is assumed that readers are familiar with the syntax of the pandas merge method, along with the different types of join operations.
Optimization Examples
Let's discuss some problem statements!
Below are the input tables for three different entities, supplier, nation, and region (these are part of the input tables from the TPC-H SF-1 benchmark).
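If you would like to run the snippets in this article without the full TPC-H data, a tiny illustrative setup such as the one below is sufficient; the column names follow the ones used in this article, but the values and the reduced number of columns are my own simplification, not the actual benchmark data.

import pandas as pd

# illustrative stand-ins for the TPC-H tables used below (values are made up)
supplier = pd.DataFrame({
    "s_name":  ["Supplier#1", "Supplier#2", "Supplier#3"],
    "s_phone": ["11-111", "22-222", "33-333"],
    "n_key":   [1, 2, 24],            # foreign key into nation
})
nation = pd.DataFrame({
    "n_key":  [1, 2, 24],
    "n_name": ["ARGENTINA", "BRAZIL", "UNITED STATES"],
    "r_key":  [1, 1, 1],              # foreign key into region
})
region = pd.DataFrame({
    "r_key":  [0, 1, 2, 3, 4],
    "r_name": ["AFRICA", "AMERICA", "ASIA", "EUROPE", "MIDDLE EAST"],
})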
Problem 01:
Let's try to capture the names and phone numbers of the suppliers who operate in the AMERICA region.
From the above figure, it can be noted that the supplier and the nation tables are linked by a common key, n_key, whereas the nation and the region tables are linked by another common key, r_key. Hence, to get the information on suppliers operating in the AMERICA region, we need to merge these three tables using the common keys. A very simple implementation of this query might look like:
def ver_01(supplier: pd.DataFrame,
           nation: pd.DataFrame,
           region: pd.DataFrame):
    # join all three tables, then filter and project
    merged = (supplier.merge(nation, on="n_key")
                      .merge(region, on="r_key"))
    filtered = merged[merged["r_name"] == "AMERICA"]
    return filtered[["s_name", "s_phone"]]
The same is depicted in the following figure:
Can you guess the performance issues with the above implementation?
⇒ Yes, the very first bottleneck is in the order of join queries.
The supplier table has 10000 rows, whereas the nation table has 25 rows (covering 25 unique nations), and the region table has only 5 rows (covering 5 unique regions).
Thus, when joining the supplier table with the nation table, it will trigger an inner-join operation as follows:
- (10000 x 5) inner_join (25 x 3) -> (10000 x 7)
Joining the result with the region table will again trigger another inner-join operation as follows:
- (10000 x 7) inner_join (5 x 2) -> (10000 x 8)
Both of these join operations depend on the size of the supplier table, so the computational cost of the above method increases significantly as the number of suppliers grows.
Therefore, let's look for an alternative solution!
Since the region and nation tables are relatively small, they can be joined first to address the above problem to some extent. So, let's give it a try!
def ver_02(supplier: pd.DataFrame,
           nation: pd.DataFrame,
           region: pd.DataFrame):
    merged = (nation.merge(region, on="r_key")
                    .merge(supplier, on="n_key"))   # changes in join-order
    filtered = merged[merged["r_name"] == "AMERICA"]
    return filtered[["s_name", "s_phone"]]
The same is depicted in the following figure:
It can be seen that joining the nation table with the region table triggers an inner-join operation as follows:
- (25 x 3) inner_join (5 x 2) -> (25 x 4)
Joining the result with the supplier table triggers another inner-join operation as follows:
- (25 x 4) inner_join (10000 x 5) -> (10000 x 8)
The first join operation can be done much faster due to the very small sizes of the input tables. Hence, it can significantly improve the overall performance of the entire query.
So, the very first thing to be noted is that the order of join operations matters when joining more than two tables. It is always preferable to join the smaller tables before joining the larger ones.
The cost of the second join will still be high and will grow along with the supplier table. So let's try to improve that.
The supplier table contains the details of all the suppliers irrespective of where they operate, but since our objective is to capture only those suppliers who operate in the AMERICA region, we can optimize the flow of the above query further.
If we join the region table with the nation table considering only the AMERICA region, and then join the result with the supplier table, we can significantly reduce the cost of the second join operation, as depicted in the following figure:
※ Some facts about the data:
- The joined table (america_region_nat), resulting from joining the nation and the region tables specific to the AMERICA region, has only 5 rows and 4 columns.
- There are 2036 supplier entries for the AMERICA region in the supplier table.
Hence, thanks to the inner-join of the supplier table with the america_region_nat table, the size of the resultant table is reduced as follows:
- (5 x 4) inner_join (10000 x 5) -> (2036 x 8)
Well, now let's consider how to join the nation and the region tables only for the AMERICA region. There are two possible ways of doing this:
def merge_filter(nation: pd.DataFrame,
                 region: pd.DataFrame):
    merged = nation.merge(region, on="r_key")
    return merged[merged["r_name"] == "AMERICA"]        # filter-after-join

def filter_merge(nation: pd.DataFrame,
                 region: pd.DataFrame):
    america_region = region[region["r_name"] == "AMERICA"]
    return nation.merge(america_region, on="r_key")     # join-after-filter
The same is depicted in the following figure:
It is clear that filter_merge is more efficient, since it reduces the sizes of the tables participating in the join operation. So, the second important point to be noted is that the order of join and filter operations matters. It is always preferable to perform the filter operation before the join operation.
So the overall query will look like this:
def ver_03(supplier: pd.DataFrame,
           nation: pd.DataFrame,
           region: pd.DataFrame):
    america_region = region[region["r_name"] == "AMERICA"]    # filter
    merged = (nation.merge(america_region, on="r_key")        # smaller-join
                    .merge(supplier, on="n_key"))             # larger-join
    return merged[["s_name", "s_phone"]]
Wait! There is still room for improvement, as we only need the name and phone information for the suppliers operating in the AMERICA region.
Generally, data frames are columnar data structures, and performing a join on the entire table (including all columns) can degrade performance when only a few fields are needed from the joined table. For example, if we subset only the required columns from the input tables for the second join operation as follows:
def ver_04(supplier: pd.DataFrame,
           nation: pd.DataFrame,
           region: pd.DataFrame):
    america_region = region[region["r_name"] == "AMERICA"]
    america_region_nat = nation.merge(america_region, on="r_key")
    america_nation_keys = america_region_nat[["n_key"]]                   # keep only the column required for joining
    supplier_name_phone = supplier[["s_name", "n_key", "s_phone"]]        # keep only the required columns
    merged = supplier_name_phone.merge(america_nation_keys, on="n_key")   # faster join on reduced data
    return merged[["s_name", "s_phone"]]
we can further reduce the cost of the overall query computation as depicted in the following figure:
So, another important point to be noted is that subsetting the required columns before performing a join matters when the input tables have many columns but only a few of them are required in the joined result.
The following table shows the evaluation of the above four versions for different supplier sizes using pandas 1.5.3 (the execution times are in milliseconds):
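For readers who want to run a similar comparison in their own environment, a minimal timing harness along the following lines can be used; the helper name, the repetition count, and the way of reporting are illustrative and not part of the original evaluation.

import time

def best_of(fn, *args, repeat=5):
    # crude timing helper: best wall-clock time (in milliseconds) over several runs
    best = float("inf")
    for _ in range(repeat):
        start = time.perf_counter()
        fn(*args)
        best = min(best, time.perf_counter() - start)
    return best * 1000

for ver in (ver_01, ver_02, ver_03, ver_04):
    print(f"{ver.__name__}: {best_of(ver, supplier, nation, region):.2f} ms")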
Problem 02:
Let's consider the following table and find the rows that contain the key-wise minimum value of the x column and the key-wise maximum value of the y column.
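As a concrete stand-in for such a table, a small illustrative DataFrame with the columns used below (key, x, y) could look like this; the values are made up for illustration only.

import pandas as pd

# illustrative input: made-up values, columns as in the problem statement
df = pd.DataFrame({
    "key": ["a", "a", "a", "b", "b", "b"],
    "x":   [  3,   1,   1,   7,   5,   5],
    "y":   [  9,   9,   2,   4,   8,   8],
})
# for key "a": x_min = 1 and y_max = 9, so only the row (a, 1, 9) qualifies;
# for key "b": x_min = 5 and y_max = 8, so the two (b, 5, 8) rows qualify.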
The very basic approach to solve the problem is as follows:
- x_min: perform a key-wise min of the x column
- y_max: perform a key-wise max of the y column
- join df with x_min and with y_max
- filter the rows of the resultant table where the value of x matches x_min and the value of y matches y_max
- extract the target columns
def prob_02(df: pd.DataFrame):
    t1 = df.groupby("key").agg(x_min=("x", "min"))
    t2 = df.groupby("key").agg(y_max=("y", "max"))
    merged = df.merge(t1, left_on="key", right_index=True)\
               .merge(t2, left_on="key", right_index=True)
    ret = merged[(merged["x"] == merged["x_min"]) & (merged["y"] == merged["y_max"])]
    return ret[df.columns]
The same is depicted in the following figure:
In the above example, there are multiple join operations, and the join-order issue applies here as well. The tables t1 and t2 contain the group-wise aggregated results of df. If the number of key groups (e.g., 10) in the input table df is small compared to its size (e.g., 100000 rows), then joining df with t1 first will cause significant performance issues. Hence, as discussed previously, the ideal solution should join t1 with t2 first, before joining with df, as depicted in the following figure:
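In code, this intermediate improvement (better join order, but still joining on key and filtering afterwards) could look like the following sketch; the function name is mine, not from the original article.

def prob_02_reordered(df: pd.DataFrame):
    t1 = df.groupby("key").agg(x_min=("x", "min"))
    t2 = df.groupby("key").agg(y_max=("y", "max"))
    t1_t2 = t1.merge(t2, left_index=True, right_index=True)     # join the two small aggregated tables first
    merged = df.merge(t1_t2, left_on="key", right_index=True)   # then join with the large input table
    ret = merged[(merged["x"] == merged["x_min"]) & (merged["y"] == merged["y_max"])]
    return ret[df.columns]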
Now, there is one more performance issue with the second join operation. Joining on the key column alone produces a large intermediate data frame when the input df table has many rows. Hence, joining on the key column and then filtering on the x_min and y_max columns involves significant computational cost. Instead, it is recommended to perform a multi-key equi-join in this case, as depicted in the following figure:
def opt_prob_02(df: pd.DataFrame):
    t1 = df.groupby("key").agg(x_min=("x", "min"))
    t2 = df.groupby("key").agg(y_max=("y", "max"))
    t1_t2 = t1.merge(t2, left_index=True, right_index=True).reset_index()   # join on aggregated results
    ret = df.merge(t1_t2, left_on=["key", "x", "y"],
                   right_on=["key", "x_min", "y_max"])                      # multi-key equi-join replaces join + filter
    return ret[df.columns]
FireDucks - Compiler-accelerated DataFrame Library
We at NEC have recently released a high-performance data analysis library named FireDucks, which is highly compatible with pandas and accelerated with different layers of optimization. It can outperform pandas even on CPU-only systems. Some of the performance bottlenecks explained above can be auto-detected by the FireDucks compiler and optimized automatically according to the above-mentioned strategies, while development work is in progress for the remaining strategies, which will be covered in future versions of FireDucks. Please check out one of my articles explaining the key features of FireDucks, along with its different layers of optimization, to see its potential as a drop-in replacement for pandas.
Nevertheless, the current version itself is powerful enough to auto-tune your program using the critical domain-specific intelligence built into it. When evaluating several performance-critical queries from the TPC-H benchmark, we could achieve up to a 38x performance gain with FireDucks.
Wrapping-up
Thank you for reading so far!
To summarize, we discussed different optimization strategies for commonly used join operations in complex data analysis. Effective code modification using the explained strategies can significantly reduce your data analysis time with pandas. When switching to the compiler-accelerated FireDucks library, however, many of these patterns can be optimized automatically, without any need for expert-level code changes.
We recommend installing it using pip and trying it in your pandas application to enjoy its benefits. We are continuously striving to improve its performance and release updates at frequent intervals. Please stay tuned and keep sharing your valuable feedback on any execution issues you may face or optimization requests you may have. We are eagerly looking forward to hearing from you!
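As a quick pointer, installation and the drop-in import typically look like the lines below; please refer to the official FireDucks documentation for the exact, up-to-date instructions, since these details reflect my understanding at the time of writing.

# install FireDucks:
#   pip install fireducks

# then switch the import in your pandas script:
import fireducks.pandas as pd    # drop-in replacement for "import pandas as pd"

# the rest of the code (e.g., ver_01 .. ver_04 above) can be left unchanged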