LoginSignup
2
4

More than 1 year has passed since last update.

構造化データ処理におけるSQL、Pyspark、Pyflink、Polarsの基本的な構文比較

Last updated at Posted at 2023-04-01

はじめに

こんにちは。
構造化データの加工•分析のデファクトスタンダードはSQLですよね。SQLはほぼ単一のクエリ言語でダントツで浸透しています。ETL系のサービス•ライブラリは色々ありますが、サービスを普及させるにはSQLに寄せるのが一番手っ取り早い気がしております。

そういう訳で(どういう訳?)、SQLとSQLライクに利用可能なデータ処理系ライブラリの基本的な構文を比較してみたいと思います。
書いていて気がつきました。全てほぼほぼSQLなのでまとめる意味がないことに。

前提

  • ライブラリのVerは以下です
    • pyspark: 3.3.0
    • pyflink: 1.15.4
    • polars: 0.16.16
  • SQLはBigqueryの構文です
  • pyspark,pyflink,polar共に以下は一例となります。共通の処理に複数の書き方が可能な場合があります。
  • pyspark,pyflink共にバッチ処理の前提です
    • pyspark: Dataframe APIの構文
    • pyflink: Tablea APIの構文
  • ライブラリによって制限が存在する場合がありますが(圧倒的にpolars)、pyspark,pyflink,polars共に直接SQLを実行することも可能です

エントリポイント作成

-- SQL
-- お好きなDBとSQLクライアントを準備
-- 以下SQLコードはBigqueryで実行可能
# pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

ss = SparkSession.builder.master('local[*]').appName('test').getOrCreate()


# pyflink
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

tbl_settings = EnvironmentSettings.in_batch_mode()
tbl_env = TableEnvironment.create(tbl_settings)


# polars
import polars as pl

サンプルデータ作成

-- SQL Bigquery
create or replace table temp.sample_tbl as
with sample_tbl as (
  select 
    cols.cd as cd
    ,cols.name as name
    ,cols.amount as amount
  from unnest([
    struct(1 as cd, 'a' as name, 100 as amount),
    struct(2 as cd, 'b' as name, 200 as amount),
    struct(3 as cd, 'c' as name, 300 as amount),
    struct(4 as cd, 'a' as name, 400 as amount),
    struct(5 as cd, 'b' as name, 500 as amount)
  ]) as cols
)

select cd, name, amount from sample_tbl
;
# pyspark
sdf = ss.createDataFrame([
    (1, 'a', 100),
    (2, 'b', 200),
    (3, 'c', 300),
    (4, 'a', 400),
    (5, 'b', 500),
    ], 
    schema='cd int, name string, amount int'
)
sdf.show()


# pyflink
ftab = tbl_env.from_elements([
    (1, 'a', 100),
    (2, 'b', 200),
    (3, 'c', 300),
    (4, 'a', 400),
    (5, 'b', 500),
    ],
    schema=['cd', 'name', 'amount']
)
ftab.execute().print()


# polars
pdf = pl.DataFrame({
    'cd': [1, 2, 3, 4, 5],
    'name': ['a', 'b', 'c', 'a', 'b'],
    'amount': [100, 200, 300, 400, 500],
})
print(pdf)

Select

-- SQL Bigquery
select 
    cd
    ,name
    ,amount
    ,'sql' as literal_word_col
from
    temp.sample_tbl
;

-- 「*」で全列選択する
select * from sample_tbl;
# pyspark
sdf.select(
    'cd',
    'name',
    'amount',
    lit('sql').alias('literal_word_col'),
).show()

# col関数を明示的に利用してもOKです
sdf.select(
    col('cd'),
    col('name'),
    col('amount'),
    lit('sql').alias('literal_word_col'),
).show()

# 「*」で全列選択する
sdf.select('*').show()
sdf.select(col('*')).show()
sdf.show()


# pyflink
ftab.select(
    col('cd'),
    col('name'),
    col('amount'),
    lit('sql').alias('literal_word_col'),
).execute().print()

# 「*」で全列選択する
ftab.select('*').execute().print()
ftab.select(col('*')).execute().print()
ftab.execute().print()


# polars
pdf_result = pdf.select(
    'cd',
    'name',
    'amount',
    pl.lit('sql').alias('literal_word_col'),
)
print(pdf_result)

# col関数を明示的に利用してもOKです
pdf_result = pdf.select(
    pl.col('cd'),
    pl.col('name'),
    pl.col('amount'),
    pl.lit('sql').alias('literal_word_col'),
)
print(pdf_result)

# 「*」で全列選択する
print(pdf.select('*'))
print(pdf.select(pl.col('*')))
print(pdf)

As

-- SQL Bigquery
select
    name as name_alias_col
    ,100 as hundret_col
from
    temp.sample_tbl
; 
# pyspark
sdf.select(
    col('name').alias('name_alias_col'),
    lit(100).alias('hundret_col'),
).show()


# pyflink
ftab.select(
    col('name').alias('name_alias_col'),
    lit(100).alias('hundret_col'),
).execute().print()

ftab.select(
    col('name'),
    lit(100),
).alias('name_alias_col', 'hundret_col').execute().print()


# polars
pdf_result = pdf.select(
    pl.col('name').alias('name_alias_col'),
    pl.lit(100).alias('hundret_col'),
)
print(pdf_result)

Where

-- SQL Bigquery
select
    name
    ,amount
from
    temp.sample_tbl
where
    amount >= 300
;
# pyspark
# whereはfilterでもOKです
sdf.select('name', 'amount').where(col('amount') >= 300).show()
sdf.select('name', 'amount').where('amount >= 300').show()
sdf.filter('amount >= 300').show()

# pyflink
# whereはfilterでもOKです
(ftab
    .select(col('name'), col('amount'))
    .where(col('amount') >= 300)
    .execute().print()
)
(ftab
    .select(col('name'), col('amount'))
    .where('amount >= 300')
    .execute().print()
)
ftab.filter(col('amount') >= 300).execute().print()


# polars
pdf_result = pdf.select('name', 'amount').filter(pl.col('amount') >= 300)
print(pdf_result)

カラム操作

-- SQL Bigquery
select
    cd
    ,name
    ,amount as amount_alias  -- カラム名変更
    ,amount / 2 as half_amount  -- カラム追加
from
    temp.sample_tbl
;

-- カラム削除(削除ではないですが。。。)
select * except(cd) from sample_tbl;

以下は専用のメソッドを記載していますが、pyspark,pyflink,polars共にSQL同様select内でカラム追加、カラム名変更等が可能です。

# pyspark
# カラム追加
sdf.withColumn('half_amount', col('amount') / 2).show()
sdf.withColumns({'half_amount': col('amount') / 2}).show()

# カラム名変更
sdf.withColumnRenamed('amount', 'amount_alias').show()

# カラム削除
sdf.drop('cd').show()


# pyflink
# カラム追加
ftab.add_columns((col('amount') / 2).alias('half_amount')).execute().print()
ftab.add_or_replace_columns((col('amount') / 2).alias('half_amount')).execute().print()

# カラム名変更
ftab.rename_columns(col('amount').alias('amount_alias')).execute().print()

# カラム削除
ftab.drop_columns(col('amount')).execute().print()

# polars
# カラム追加
pdf_result = pdf.with_columns((pl.col('amount') / 2).alias('half_amount'))
print(pdf_result)

# カラム名変更
print(pdf.rename({'amount': 'amount_alias'}))

# カラム削除
print(pdf.drop('cd'))

集約

GroupBy

-- SQL Bigquery
select
    name
    ,sum(amount) as total_amount
from
    temp.sample_tbl
group by
    name
;
# pyspark
from pyspark.sql.functions import sum

sdf.groupBy('name').sum('amount').withColumnRenamed('sum(amount)', 'total_amount').show()
sdf.groupBy('name').agg(sum('amount').alias('total_amount')).show()

# pyflink
ftab.group_by('name').select(col('name'), col('amount').sum.alias('total_amount')).execute().print()

# polars
pdf_result = pdf.select('name', pl.col('amount').alias('total_amount')).groupby('name').sum()
print(pdf_result)

pdf_result = pdf.groupby('name').agg(pl.col('amount').sum().alias('total_amount'))
print(pdf_result)

Window関数

-- SQL Bigquery
select
    *
    ,sum(amount) over(partition by name order by cd 
            rows between unbounded preceding and current row) as total_amount
from
    temp.sample_tbl
;
# pyspark
from pyspark.sql.functions import sum
from pyspark.sql.window import Window

w = Window.partitionBy('name').orderBy('cd').rowsBetween(Window.unboundedPreceding, Window.currentRow)

sdf.select('*', sum(col('amount')).over(w).alias('total_amount')).show()
sdf.withColumn('amount_total', sum(col('amount')).over(w)).show()


# pyflink
# 以下構文を記載しますが、実際は時間属性を設定したカラムのみでOver句内のOrderByが許可されています
# その為以下を実行してもエラーとなります
# 時間属性に関しては以下を参照してください
# https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/

from pyflink.table.expressions import UNBOUNDED_RANGE, CURRENT_RANGE
from pyflink.table.window import Over

ftab_window = ftab.over_window(Over.partition_by(col('name')).order_by(col('regist_date')).preceding(UNBOUNDED_ROW).following(CURRENT_ROW).alias('w'))
ftab_window.select(col('*'), col('amount').sum.over(col('w')).alias('total_amount')).execute().print()


# polars
pdf_result = pdf.select(
    '*',
    pl.col('amount').sort_by('cd').rolling_sum(0).over('name').alias('total_amount')
)
print(pdf_result)

どうでも良いコラム

Polarsでrolling_sum等の移動集計関数を利用する場合、SQL(Pyspark)とは動きが異なる場合があるのでご注意下さい。具体的には、任意のソート順で移動集計したい場合にどこでソートするかによって動作が異なります。

# sort_no順にamountを累計したい。partitionは不要(テーブル全体)。

# pyspark
test_sdf = ss.createDataFrame([
    [1, 1000],
    [4, 4000],
    [2, 2000],
    [3, 3000],
    [5, 5000],],
    schema='sort_no int, amount int'
)

w = Window.orderBy('sort_no')
test_sdf.withColumn('amount_running_sum', sum(col('amount')).over(w)).show()

# 結果
+-------+------+------------------+
|sort_no|amount|amount_running_sum|
+-------+------+------------------+
|      1|  1000|              1000|
|      2|  2000|              3000|
|      3|  3000|              6000|
|      4|  4000|             10000|
|      5|  5000|             15000|
+-------+------+------------------+


# polars
test_pdf = pl.DataFrame({
    'sort_no': [1, 4, 2, 3, 5],
    'amount': [1000, 4000, 2000, 3000, 5000],
})

# 累計列のみに対してソートした場合
test_pdf_result = test_pdf.with_columns(
    pl.col('amount').sort_by('sort_no').rolling_sum(0).alias('amount_running_sum')
)
print(test_pdf_result)

# 結果
# amont_running_sumカラム単体での計算結果は正しいが、(SQL的な)行の整合性は無視されている
┌─────────┬────────┬────────────────────┐
 sort_no  amount  amount_running_sum 
 ---      ---     ---                
 i64      i64     i64                
╞═════════╪════════╪════════════════════╡
 1        1000    1000               
 4        4000    3000               
 2        2000    6000               
 3        3000    10000              
 5        5000    15000              
└─────────┴────────┴────────────────────┘

# Dataframe全体に対してソートした場合
test_pdf_result2 = (
    test_pdf
    .sort('sort_no')
    .with_columns(
        pl.col('amount').rolling_sum(0).alias('amount_running_sum'),
    )
)
print(test_pdf_result2)

# 結果
# SQLと同等の結果となる
┌─────────┬────────┬────────────────────┐
 sort_no  amount  amount_running_sum 
 ---      ---     ---                
 i64      i64     i64                
╞═════════╪════════╪════════════════════╡
 1        1000    1000               
 2        2000    3000               
 3        3000    6000               
 4        4000    10000              
 5        5000    15000              
└─────────┴────────┴────────────────────┘

Having

-- SQL Bigquery
select
    name
    ,sum(amount) as total_amount
from
    temp.sample_tbl
group by
    name
having
    sum(amount) >= 400
    -- BQでは集計カラム別名でも実行可能
    -- total_amount >= 400
;
# pyspark, pyflink, polars共に
# 集計後のDataframe/Tableに対してフィルターするのでwhere/fileterと完全に同じ
# pyspark
from pyspark.sql.functions import sum

(
    sdf
    .groupBy('name')
    .sum('amount')
    .withColumnRenamed('sum(amount)', 'total_amount')
    .where(col('amount_total') >= 400)
    .show()
)
(
    sdf
    .groupBy('name')
    .agg(sum('amount').alias('total_amount'))
    .where(col('total_amount') >= 400)
    .show()
)


# pyflink
(
    ftab
    .group_by('name')
    .select(col('name'), col('amount').sum.alias('total_amount'))
    .where(col('total_amount') >= 400)
    .execute().print()
)

# polars
pdf_result = (
    pdf
    .select('name', pl.col('amount').alias('total_amount'))
    .groupby('name')
    .sum()
    .filter(pl.col('total_amount') >= 400)
)
print(pdf_result)

pdf_result = (
    pdf
    .groupby('name')
    .agg(pl.col('amount').sum().alias('total_amount'))
    .filter(pl.col('total_amount') >= 400)
)
print(pdf_result)

Distinct

-- SQL Bigquery
select distinct
    name
from
    temp.sample_tbl
;
# pyspark
sdf.select('name').distinct().show()


# pyflink
ftab.select(col('name')).distinct().execute().print()


# polars
print(pdf.select('name').unique())

結合

Inner join

-- SQL Bigquery
select
    t1.name
    ,t2.amount
from
    temp.sample_tbl t1
inner join
    temp.sample_tbl t2
on
    t1.cd = t2.cd
;
# pyspark
sdf = sdf.alias('sdf1')
sdf2 = sdf.alias('sdf2')
(
    sdf
    .join(sdf2, on=col('sdf1.cd') == col('sdf2.cd'), how='inner')  # on='cd'でもOK
    .select('sdf1.name', 'sdf2.amount')
    .show()
)


# pyflink
# pyflinkは結合するテーブル同士でユニークなカラム名が必須
# pysparkとpyflinkだとaliasメソッドの動作が違うので注意
# pyspark: テーブルに別名をつける
# pyflink: カラムに別名をつける

ftab2 = ftab.alias('cd2', 'name2', 'amount2')
(
    ftab
    .join(ftab2, col('cd') == col('cd2'))
    .select(col('name'), col('amount2').alias('amount'))
    .execute().print()
)


# polars
pdf_result = (
    pdf
    .join(pdf, left_on='cd', right_on='cd', how='inner')  # on='cd'でもOK
    .select('cd', pl.col('amount_right').alias('amount'))
)
print(pdf_result)

Outer join

-- SQL Bigquery
-- 左外部結合
select
    t1.name
    ,t2.amount
from
    temp.sample_tbl t1
left outer join
    temp.sample_tbl t2
on
    t1.cd = t2.cd
;

-- 完全外部結合
select
    t1.name
    ,t2.amount
from
    temp.sample_tbl t1
full outer join
    temp.sample_tbl t2
on
    t1.cd = t2.cd
;
# pyspark
# 左外部結合
sdf = sdf.alias('sdf1')
sdf2 = sdf.alias('sdf2')
(
    sdf
    .join(sdf2, on=col('sdf1.cd') == col('sdf2.cd'), how='left')  # on='cd'でもOK
    .select('sdf1.name', 'sdf2.amount')
    .show()
)

# 完全外部結合
(
    sdf
    .join(sdf2, on=col('sdf1.cd') == col('sdf2.cd'), how='full')  # on='cd'でもOK
    .select('sdf1.name', 'sdf2.amount')
    .show()
)


# pyflink
# 左外部結合
ftab2 = ftab.alias('cd2', 'name2', 'amount2')
(
    ftab
    .left_outer_join(ftab2, col('cd') == col('cd2'))
    .select(col('name'), col('amount2').alias('amount'))
    .execute().print()
)

# 完全外部結合
(
    ftab
    .full_outer_join(ftab2, col('cd') == col('cd2'))
    .select(col('name'), col('amount2').alias('amount'))
    .execute().print()
)

# polars
# 左外部結合
pdf_result = (
    pdf
    .join(pdf, left_on='cd', right_on='cd', how='left')  # on='cd'でもOK
    .select('cd', pl.col('amount_right').alias('amount'))
)
print(pdf_result)

# 完全外部結合
pdf_result = (
    pdf
    .join(pdf, left_on='cd', right_on='cd', how='outer')  # on='cd'でもOK
    .select('cd', pl.col('amount_right').alias('amount'))
)
print(pdf_result)

Cross join

-- SQL Bigquery
select
    t1.name
    ,t2.amount
from
    temp.sample_tbl t1
cross join
    temp.sample_tbl t2
;
# pyspark
sdf = sdf.alias('sdf1')
sdf2 = sdf.alias('sdf2')
(
    sdf
    .join(sdf2, how='cross')  # how='cross'は無くてもOK
    .select('sdf1.name', 'sdf2.amount')
    .show()
)


# pyflink
ftab2 = ftab.alias('cd2', 'name2', 'amount2')
(
    ftab
    .join(ftab2)
    .select(col('name'), col('amount2').alias('amount'))
    .execute().print()
)


# polars
pdf_result = (
    pdf
    .join(pdf, how='cross')
    .select('cd', pl.col('amount_right').alias('amount'))
)
print(pdf_result)

集合演算

Union

-- SQL Bigquery
-- Union All
select
    *
from
    temp.sample_tbl
union all
select
    *
from
    temp.sample_tbl
;

-- Union Distinct
select
    *
from
    temp.sample_tbl
union distinct
select
    *
from
    temp.sample_tbl
;
# pyspark
# Union All
# pysparkのunionはunionallのエイリアスなので注意
sdf.unionAll(sdf).show()
sdf.union(sdf).show()

# Union Distinct
sdf.unionAll(sdf).distinct().show()
sdf.union(sdf).distinct().show()


# pyflink
# Union All
ftab.union_all(ftab).execute().print()

# Union Distinct
ftab.union(ftab).execute().print()


# polars
# Union All
pdf2 = pdf.clone()
print(pdf.vstack(pdf2))
print(pdf.extend(pdf2).unique())
print(pl.concat([pdf, pdf]))

# Union Distinct
pdf2 = pdf.clone()
print(pdf.vstack(pdf2).unique())
print(pdf.extend(pdf2).unique())
print(pl.concat([pdf, pdf]).unique())

Except

-- SQL Bigquery
-- Union All
-- Bigqueryでは未対応
select
    *
from
    temp.sample_tbl
except all
select
    *
from
    temp.sample_tbl
;

-- Except Distinct
select
    *
from
    temp.sample_tbl
except distinct
select
    *
from
    temp.sample_tbl
;
# pyspark
# Except All
sdf.exceptAll(sdf).show()

# Except Distinct
# なぜexceptではないのか。。。
sdf.subtract(sdf).show()


# pyflink
# Except All
ftab.minus_all(ftab).execute().print()

# Except Distinct
ftab.minus(ftab).execute().print()


# polars
# 恐らく同等のメソッドは未対応(あったらごめんなさい!!!)

Intersect

-- SQL Bigquery
-- Intersect All
-- Bigqueryでは未対応
select
    *
from
    temp.sample_tbl
intersect all
select
    *
from
    temp.sample_tbl
;

-- Intersect Distinct
select
    *
from
    temp.sample_tbl
intersect distinct
select
    *
from
    temp.sample_tbl
;
# pyspark
# Intersect All
sdf.intersectAll(sdf).show()

# Intersect Distinct
sdf.intersect(sdf).show()


# pyflink
# Intersect All
ftab.intersect_all(ftab).execute().print()

# Intersect Distinct
ftab.intersect(ftab).execute().print()


# polars
# 恐らく同等のメソッドは未対応(あったらごめんなさい!!!)

OrderBy/limit

-- SQL Bigquery
select
    *
from
    temp.sample_tbl t1
order by
    cd
limit 2
;
# pyspark
sdf.orderBy('cd').limit(2).show()


# pyflink
ftab.order_by(col('cd')).limit(2).execute().print()


# polars
print(pdf.sort('cd').limit(2))

その他

Pivot

-- SQL Bigquery
select
    pivot_col.*
from
    temp.sample_tbl t1
pivot
  (
      sum(amount) for name in('a', 'b', 'c')
  ) as pivot_col
;
# pyspark
sdf.groupBy('cd').pivot('name').sum('amount').show()


# pyflink
# 恐らく同等のメソッドは未対応(あったらごめんなさい!!!)

# polars
pdf2 = pdf.pivot(values='amount', index='cd', columns='name')
print(pdf2)

Case

-- SQL Bigquery
select
    amount
    ,case when amount >= 300 then 'expensive' else 'cheep' end as amount_stat
from
    temp.sample_tbl t1
;
# pyspark
from pyspark.sql.functions import when 

sdf.select('amount', 
    when(col('amount') >= 300, 'expensive')
        .otherwise('cheep').alias('amount_stat')
).show()


# pyflink
# Table APIでは未対応。SQLでは実行可能。
# なぜTable APIのみ未対応なのかは謎。。。


# polars
pdf2 = pdf.select('amount',
        pl.when(pl.col('amount') >= 300).then('expensive')
            .otherwise('cheep').alias('amount_stat')
)
print(pdf2)

終わりに

まるっと纏めてみましたが、SQLがわかっていれば学習コストをかけずに書けそうなのが良いですね(とっかかりの抵抗が低いことが新しいことを学ぶ上で重要だと思ってます)。特にSparkはSQLとの親和性がめちゃくちゃ高いので抵抗なく学習できると思います。
Docker(やAnaconda)を利用すればローカルにも簡単に環境を構築できるので、気になる方は利用してみるのも良いのではないでしょうか。
ありがとうございました!

参照

Bigquery SQL公式ドキュメント

Spark 公式ドキュメント

Flink 公式ドキュメント

Polars 公式ドキュメント

2
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
4