[Python] polars I/O (Pandas, csv, parquet)

Posted at 2021-06-29

I am planning to move my personal stock-database programs from Python to Rust, and as part of that I want to replace my pandas DataFrame processing with polars DataFrames. So in this article, partly as a way to learn polars, I use it from Python and look at its I/O. polars is implemented in Rust, advertises its performance, and is easy to use from Python, so it may well come to be used as a pandas alternative. The polars version used here is 0.8.8.

Imports

The imports follow the official polars documentation and the official pyarrow documentation.

import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import pickle
from more_itertools import chunked

pandas <-> polars

pandas to polars

First, load the sample data with pandas. The index is a datetime, and both a naive and an aware version of the data are used. Since the data contains NaN values, those rows are dropped for display.

with open("aware_stock_df.pickle", "rb") as f:
    aware_pandas_df = pickle.load(f)
print(type(aware_pandas_df))
aware_pandas_df.dropna()[:5]
<class 'pandas.core.frame.DataFrame'>
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
timestamp
2020-11-04 09:00:00+09:00 2669.0 2670.0 2658.0 2664.0 93000.0
2020-11-04 09:01:00+09:00 2663.0 2664.0 2650.0 2652.0 17600.0
2020-11-04 09:02:00+09:00 2649.0 2655.0 2646.0 2649.0 19200.0
2020-11-04 09:03:00+09:00 2652.0 2670.0 2651.0 2670.0 31200.0
2020-11-04 09:04:00+09:00 2671.0 2674.0 2670.0 2674.0 12800.0

Since polars can only hold naive datetimes (as of 0.8.8), convert the index to naive UTC datetimes.
Because only naive datetimes are supported, the functions below may emit related warnings.

naive_utc_pandas_df = aware_pandas_df.copy()
naive_utc_pandas_df.index = aware_pandas_df.index.tz_convert("UTC").tz_localize(None)
naive_utc_pandas_df.dropna()[:5]
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
timestamp
2020-11-04 00:00:00 2669.0 2670.0 2658.0 2664.0 93000.0
2020-11-04 00:01:00 2663.0 2664.0 2650.0 2652.0 17600.0
2020-11-04 00:02:00 2649.0 2655.0 2646.0 2649.0 19200.0
2020-11-04 00:03:00 2652.0 2670.0 2651.0 2670.0 31200.0
2020-11-04 00:04:00 2671.0 2674.0 2670.0 2674.0 12800.0

polars has nothing like a pandas index, so move the index into a column and then call polars.from_pandas.

naive_utc_polars_df = pl.from_pandas(naive_utc_pandas_df.reset_index())
naive_utc_polars_df.drop_nulls()[:5]
timestamp Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
date64 f64 f64 f64 f64 f64
2020-11-04 00:00:00 2669 2670 2658 2664 9.3e4
2020-11-04 00:01:00 2663 2664 2650 2652 1.76e4
2020-11-04 00:02:00 2649 2655 2646 2649 1.92e4
2020-11-04 00:03:00 2652 2670 2651 2670 3.12e4
2020-11-04 00:04:00 2671 2674 2670 2674 1.28e4

In fact, if a pandas.DataFrame with aware datetimes is passed directly to polars.from_pandas, it is implicitly converted to naive UTC.

aware_polars_df = pl.from_pandas(aware_pandas_df.reset_index())
aware_polars_df.drop_nulls()[:5]
c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\utils.py:9: UserWarning: Conversion of (potentially) timezone aware to naive datetimes. TZ information may be lost
  warnings.warn(
timestamp Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
date64 f64 f64 f64 f64 f64
2020-11-04 00:00:00 2669 2670 2658 2664 9.3e4
2020-11-04 00:01:00 2663 2664 2650 2652 1.76e4
2020-11-04 00:02:00 2649 2655 2646 2649 1.92e4
2020-11-04 00:03:00 2652 2670 2651 2670 3.12e4
2020-11-04 00:04:00 2671 2674 2670 2674 1.28e4

polars to pandas

polars.frame.DataFrame.to_pandas can be used. Since polars has nothing like an index, set the timestamp column as the index afterwards.

naive_utc_pandas_df = naive_utc_polars_df.to_pandas()
naive_utc_pandas_df = naive_utc_pandas_df.set_index("timestamp")
naive_utc_pandas_df.dropna()[:5]
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
timestamp
2020-11-04 00:00:00 2669.0 2670.0 2658.0 2664.0 93000.0
2020-11-04 00:01:00 2663.0 2664.0 2650.0 2652.0 17600.0
2020-11-04 00:02:00 2649.0 2655.0 2646.0 2649.0 19200.0
2020-11-04 00:03:00 2652.0 2670.0 2651.0 2670.0 31200.0
2020-11-04 00:04:00 2671.0 2674.0 2670.0 2674.0 12800.0
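
If you want the original timezone back after the round trip, the index can be re-localized. This is a minimal sketch, not from the original article, assuming the data was originally Asia/Tokyo (which the +09:00 offsets above suggest):

# re-attach UTC to the naive index, then convert back to the assumed original timezone
aware_again_pandas_df = naive_utc_pandas_df.tz_localize("UTC").tz_convert("Asia/Tokyo")
aware_again_pandas_df.dropna()[:5]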

csv <-> polars

csv to polars

When the data contains naive datetimes

Datetime strings are automatically parsed into date64.

naive_polars_df = pl.read_csv("naive_stock_df.csv")
naive_polars_df.drop_nulls()[:5]
timestamp Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
date64 f64 f64 f64 f64 f64
2020-11-04 09:00:00 2669 2670 2658 2664 9.3e4
2020-11-04 09:01:00 2663 2664 2650 2652 1.76e4
2020-11-04 09:02:00 2649 2655 2646 2649 1.92e4
2020-11-04 09:03:00 2652 2670 2651 2670 3.12e4
2020-11-04 09:04:00 2671 2674 2670 2674 1.28e4

When the data contains aware datetimes

Since aware datetimes are read in as strings, it seems best to save the data as naive UTC beforehand.

aware_polars_df = pl.read_csv("aware_stock_df.csv")
aware_polars_df.drop_nulls()[:5]
timestamp Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
str f64 f64 f64 f64 f64
"2020-11-04 09:00:00+09:00" 2669 2670 2658 2664 9.3e4
"2020-11-04 09:01:00+09:00" 2663 2664 2650 2652 1.76e4
"2020-11-04 09:02:00+09:00" 2649 2655 2646 2649 1.92e4
"2020-11-04 09:03:00+09:00" 2652 2670 2651 2670 3.12e4
"2020-11-04 09:04:00+09:00" 2671 2674 2670 2674 1.28e4

At the moment polars.series.Series string processing cannot do anything complicated, so if you really need to read such a file, it seems best to read it with pandas first and then convert it to polars.

aware_pandas_df = pd.read_csv("aware_stock_df.csv", index_col="timestamp", parse_dates=True)
aware_pandas_df.dropna()[:5]
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
timestamp
2020-11-04 09:00:00+09:00 2669.0 2670.0 2658.0 2664.0 93000.0
2020-11-04 09:01:00+09:00 2663.0 2664.0 2650.0 2652.0 17600.0
2020-11-04 09:02:00+09:00 2649.0 2655.0 2646.0 2649.0 19200.0
2020-11-04 09:03:00+09:00 2652.0 2670.0 2651.0 2670.0 31200.0
2020-11-04 09:04:00+09:00 2671.0 2674.0 2670.0 2674.0 12800.0

The rest is the same as in the pandas to polars section; a minimal sketch of that conversion follows.
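
As a minimal sketch (this only repeats the conversion shown in the pandas to polars section, nothing new): make the index naive UTC, move it into a column, and hand the frame to polars.

# convert the aware index to naive UTC, then reset it into a column for polars
naive_utc_pandas_df = aware_pandas_df.copy()
naive_utc_pandas_df.index = aware_pandas_df.index.tz_convert("UTC").tz_localize(None)
aware_csv_polars_df = pl.from_pandas(naive_utc_pandas_df.reset_index())
aware_csv_polars_df.drop_nulls()[:5]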

Lazy loading

polars has an API that executes lazily, like PySpark. For I/O as well, it returns a
polars.lazy.LazyFrame and execution is optimized before it runs.

At the moment it seems to read columns as strings, so to use it properly you apparently need to supply a dtype or cast. I was not able to parse datetime strings with the lazy API.

naive_polars_df2 = (
    pl.scan_csv("naive_stock_df.csv",dtype={"Open_6502":pl.Float64})
    .select([pl.col("timestamp"), pl.col("Open_6502")])
    .collect()
)
naive_polars_df2.drop_nulls()[:5]
timestamp Open_6502
str f64
"2020-11-04 09:00:00" 2669
"2020-11-04 09:01:00" 2663
"2020-11-04 09:02:00" 2649
"2020-11-04 09:03:00" 2652
"2020-11-04 09:04:00" 2671
naive_polars_df2 = (
    pl.scan_csv("naive_stock_df.csv", dtype={"Open_6502": pl.Float64})
    .select((pl.col("Open_6502")+1).alias("a"))
    .select(pl.col("a"))
    .collect()
)
naive_polars_df2.drop_nulls()[:5]
a
f64
2670
2664
2650
2653
2672
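
Instead of passing dtype to scan_csv, casting inside the lazy query is the other option mentioned above. A minimal sketch of that alternative (an assumption about how Expr.cast behaves in 0.8.8, not something verified in the article):

# cast the inferred columns explicitly inside the lazy pipeline
naive_polars_df3 = (
    pl.scan_csv("naive_stock_df.csv")
    .select([pl.col("Open_6502").cast(pl.Float64), pl.col("Volume_6502").cast(pl.Float64)])
    .collect()
)
naive_polars_df3.drop_nulls()[:5]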

polars to csv

naive_polars_df.to_csv("naive_stock_df_polars_to_csv.csv")
read_naive_polars_df = pl.read_csv("naive_stock_df_polars_to_csv.csv")
read_naive_polars_df.drop_nulls()[:5]
timestamp Open_6502 High_6502 Low_6502 Close_6502 Volume_6502
date64 f64 f64 f64 f64 f64
2020-11-04 09:00:00 2669 2670 2658 2664 9.3e4
2020-11-04 09:01:00 2663 2664 2650 2652 1.76e4
2020-11-04 09:02:00 2649 2655 2646 2649 1.92e4
2020-11-04 09:03:00 2652 2670 2651 2670 3.12e4
2020-11-04 09:04:00 2671 2674 2670 2674 1.28e4

The datetime format in the written file looks like "2020-11-04T00:00:00.000000000".

parquet <-> polars

parquet to polars

naive_polars_df = pl.read_parquet("naive_stock_df.parquet")
naive_polars_df.drop_nulls()[:5]
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502 timestamp
f64 f64 f64 f64 f64 date64
2669 2670 2658 2664 9.3e4 2020-11-04 09:00:00
2663 2664 2650 2652 1.76e4 2020-11-04 09:01:00
2649 2655 2646 2649 1.92e4 2020-11-04 09:02:00
2652 2670 2651 2670 3.12e4 2020-11-04 09:03:00
2671 2674 2670 2674 1.28e4 2020-11-04 09:04:00

With parquet, you can read only selected columns.

naive_polars_df = pl.read_parquet("naive_stock_df.parquet", columns=["timestamp", "Open_6502", "High_6502"])
naive_polars_df.drop_nulls()[:5]
timestamp Open_6502 High_6502
date64 f64 f64
2020-11-04 00:00:00 2669 2670
2020-11-04 00:01:00 2663 2664
2020-11-04 00:02:00 2649 2655
2020-11-04 00:03:00 2652 2670
2020-11-04 00:04:00 2671 2674

polars.scan_parquet did not work for some reason. For now it seems better to do I/O with the eager API; a possible workaround is sketched after the traceback below.

naive_polars_df_scan = pl.scan_parquet("naive_stock_df.parquet").collect()
naive_pandas_df_scan[:5]
---------------------------------------------------------------------------

PanicException                            Traceback (most recent call last)

<ipython-input-196-9e99cf9944e1> in <module>
----> 1 naive_polars_df_scan = pl.scan_parquet("naive_stock_df.parquet").collect()
      2 naive_pandas_df_scan[:5]


c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\functions.py in scan_parquet(file, stop_after_n_rows, cache)
    337     if isinstance(file, Path):
    338         file = str(file)
--> 339     return LazyFrame.scan_parquet(
    340         file=file, stop_after_n_rows=stop_after_n_rows, cache=cache
    341     )


c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\lazy\__init__.py in scan_parquet(file, stop_after_n_rows, cache)
    164 
    165         self = LazyFrame.__new__(LazyFrame)
--> 166         self._ldf = PyLazyFrame.new_from_parquet(file, stop_after_n_rows, cache)
    167         return self
    168 


PanicException: Arrow datatype Timestamp(Nanosecond, None) not supported by Polars
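
As a possible workaround (a sketch under assumptions, not from the original article: it relies on pl.from_arrow being available and on pyarrow's Table.cast), you could read the file eagerly with pyarrow, downcast the nanosecond timestamps that polars rejects, and then convert:

# read with pyarrow, cast timestamp[ns] columns down to timestamp[ms], then hand to polars
table = pq.read_table("naive_stock_df.parquet")
schema = pa.schema(
    [pa.field(f.name, pa.timestamp("ms")) if pa.types.is_timestamp(f.type) else f
     for f in table.schema]
)
naive_polars_df_workaround = pl.from_arrow(table.cast(schema))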

polars to parquet

Saving everything at once

naive_polars_df.to_parquet("naive_stock_df_polars_to_parquet.parquet")
parquet_file = pq.ParquetFile("naive_stock_df_polars_to_parquet.parquet")
print(parquet_file.metadata)
print(parquet_file.schema)
<pyarrow._parquet.FileMetaData object at 0x00000251253EA400>
  created_by: parquet-cpp-arrow version 4.0.1
  num_columns: 6
  num_rows: 38880
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1401
<pyarrow._parquet.ParquetSchema object at 0x00000251254B3FC0>
required group field_id=0 schema {
  optional double field_id=1 Open_6502;
  optional double field_id=2 High_6502;
  optional double field_id=3 Low_6502;
  optional double field_id=4 Close_6502;
  optional double field_id=5 Volume_6502;
  optional int64 field_id=6 timestamp (Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false));
}
read_naive_polars_df = pl.read_parquet("naive_stock_df_polars_to_parquet.parquet")
read_naive_polars_df.drop_nulls()[:5]
c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\utils.py:9: UserWarning: Conversion of (potentially) timezone aware to naive datetimes. TZ information may be lost
  warnings.warn(
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502 timestamp
f64 f64 f64 f64 f64 date64
2669 2670 2658 2664 9.3e4 2020-11-04 09:00:00
2663 2664 2650 2652 1.76e4 2020-11-04 09:01:00
2649 2655 2646 2649 1.92e4 2020-11-04 09:02:00
2652 2670 2651 2670 3.12e4 2020-11-04 09:03:00
2671 2674 2670 2674 1.28e4 2020-11-04 09:04:00

(polars to arrow to parquet)

Incidentally, if you first convert to a pyarrow.Table and then save it in parquet format, the datetime information is lost and only the date remains.

naive_table = naive_polars_df.to_arrow()
pq.write_table(naive_table, "naive_stock_df_polars_to_arrow_to_parquet.parquet")
read_naive_polars_df = pl.read_parquet("naive_stock_df_polars_to_arrow_to_parquet.parquet")
read_naive_polars_df.drop_nulls()[:5]
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502 timestamp
f64 f64 f64 f64 f64 date32
2669 2670 2658 2664 9.3e4 2020-11-04
2663 2664 2650 2652 1.76e4 2020-11-04
2649 2655 2646 2649 1.92e4 2020-11-04
2652 2670 2651 2670 3.12e4 2020-11-04
2671 2674 2670 2674 1.28e4 2020-11-04

Checking the saved parquet file, the timestamp column is already date32, so the information is apparently lost when (or before) write_table writes the file, not when it is read back.

parquet_file = pq.ParquetFile("naive_stock_df_polars_to_arrow_to_parquet.parquet")
print(parquet_file.metadata)
print(parquet_file.schema)
<pyarrow._parquet.FileMetaData object at 0x00000251256804A0>
  created_by: parquet-cpp-arrow version 4.0.1
  num_columns: 6
  num_rows: 38880
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1378
<pyarrow._parquet.ParquetSchema object at 0x000002512814D440>
required group field_id=0 schema {
  optional double field_id=1 Open_6502;
  optional double field_id=2 High_6502;
  optional double field_id=3 Low_6502;
  optional double field_id=4 Close_6502;
  optional double field_id=5 Volume_6502;
  optional int32 field_id=6 timestamp (Date);
}

So let's write a helper that avoids the date32 downcast: before writing, it casts date64 columns to timestamp[ms], which parquet can store with the time of day. This is the same processing that polars performs inside to_parquet.

def cast_date32_to_date64(tbl):
    # cast date64 columns to timestamp[ms] before writing, so that parquet
    # does not downcast them to date32 and drop the time of day
    # (this mirrors what polars' to_parquet does internally)
    data = {}
    for i, column in enumerate(tbl):
        name = column._name

        # parquet casts date64 to date32 for some reason
        if column.type == pa.date64():
            column = pa.compute.cast(column, pa.timestamp("ms", None))
        data[name] = column
    tbl = pa.table(data)
    return tbl
naive_table = naive_polars_df.to_arrow()
print(naive_table.schema)

# cast
naive_table = cast_date32_to_date64(naive_table)
print(naive_table.schema)

pq.write_table(naive_table, "naive_stock_df_polars_to_arrow_to_parquet_v2.parquet", )

parquet_file = pq.ParquetFile("naive_stock_df_polars_to_arrow_to_parquet_v2.parquet")
print(parquet_file.metadata)
print(parquet_file.schema)
Open_6502: double
High_6502: double
Low_6502: double
Close_6502: double
Volume_6502: double
timestamp: date64[ms]
Open_6502: double
High_6502: double
Low_6502: double
Close_6502: double
Volume_6502: double
timestamp: timestamp[ms]
<pyarrow._parquet.FileMetaData object at 0x00000251254C1F40>
  created_by: parquet-cpp-arrow version 4.0.1
  num_columns: 6
  num_rows: 38880
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 1401
<pyarrow._parquet.ParquetSchema object at 0x0000025128154680>
required group field_id=0 schema {
  optional double field_id=1 Open_6502;
  optional double field_id=2 High_6502;
  optional double field_id=3 Low_6502;
  optional double field_id=4 Close_6502;
  optional double field_id=5 Volume_6502;
  optional int64 field_id=6 timestamp (Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false));
}
read_naive_polars_df = pl.read_parquet("naive_stock_df_polars_to_arrow_to_parquet_v2.parquet")
read_naive_polars_df.drop_nulls()[:5]
Open_6502 High_6502 Low_6502 Close_6502 Volume_6502 timestamp
f64 f64 f64 f64 f64 date64
2669 2670 2658 2664 9.3e4 2020-11-04 09:00:00
2663 2664 2650 2652 1.76e4 2020-11-04 09:01:00
2649 2655 2646 2649 1.92e4 2020-11-04 09:02:00
2652 2670 2651 2670 3.12e4 2020-11-04 09:03:00
2671 2674 2670 2674 1.28e4 2020-11-04 09:04:00

Saving rows in separate chunks (using pyarrow)

By writing a parquet file as multiple row groups, the data can be written out in several passes. The code below mimics a situation where the data is actually fetched from a database or read from a large CSV in chunks. I am not very familiar with parquet files, but is using row groups like this a good way to save data that does not all fit in memory at once?

naive_polars_df.height
38880
one_saved_rows = 10000

for i, one_chunck_indice in enumerate(chunked(range(naive_polars_df.height), one_saved_rows)):
    table = naive_polars_df[one_chunck_indice[0]:one_chunck_indice[-1]+1].to_arrow()
    table = cast_date32_to_date64(table)
    if i == 0:
        pqwriter = pa.parquet.ParquetWriter('naive_stock_df_polars_to_parquet_chunck.parquet', table.schema, version="2.0")
        
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()
parquet_file = pq.ParquetFile('naive_stock_df_polars_to_parquet_chunck.parquet')
print(parquet_file.metadata)
print(parquet_file.schema)
<pyarrow._parquet.FileMetaData object at 0x00000251254C1400>
  created_by: parquet-cpp-arrow version 4.0.1
  num_columns: 6
  num_rows: 38880
  num_row_groups: 4
  format_version: 2.0
  serialized_size: 3396
<pyarrow._parquet.ParquetSchema object at 0x0000025124CF0940>
required group field_id=0 schema {
  optional double field_id=1 Open_6502;
  optional double field_id=2 High_6502;
  optional double field_id=3 Low_6502;
  optional double field_id=4 Close_6502;
  optional double field_id=5 Volume_6502;
  optional int64 field_id=6 timestamp (Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false));
}
read_naive_polars = pl.read_parquet("naive_stock_df_polars_to_parquet_chunck.parquet", columns=["timestamp","Open_6502"])
read_naive_polars.drop_nulls()[:5]
timestamp Open_6502
date64 f64
2020-11-04 09:00:00 2669
2020-11-04 09:01:00 2663
2020-11-04 09:02:00 2649
2020-11-04 09:03:00 2652
2020-11-04 09:04:00 2671
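
To address the memory question above, pyarrow can also read such a file back one row group at a time instead of all at once. A minimal sketch (hypothetical, not from the original article), reusing the file written above:

# iterate over the row groups written by the chunked writer and convert each to polars
parquet_file = pq.ParquetFile("naive_stock_df_polars_to_parquet_chunck.parquet")
for i in range(parquet_file.num_row_groups):
    chunk_table = parquet_file.read_row_group(i, columns=["timestamp", "Open_6502"])
    chunk_polars_df = pl.from_arrow(chunk_table)  # assumes pl.from_arrow is available
    print(chunk_polars_df.shape)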

Summary

At the moment polars apparently cannot read from a database directly, so you have to read with pandas.read_sql and convert. CSV and parquet files can be read without any trouble.
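
A minimal sketch of that database path (the sqlite file and table name here are placeholders, not from the article): read with pandas.read_sql and convert with pl.from_pandas, just as in the pandas to polars section.

import sqlite3

# hypothetical database file and table; parse the timestamp column while reading
conn = sqlite3.connect("stock.db")
stock_pandas_df = pd.read_sql("SELECT * FROM stock_6502", conn, parse_dates=["timestamp"])
stock_polars_df = pl.from_pandas(stock_pandas_df)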
