個人的に作成している株式データベース関連のプログラムをPythonからrustに変更するため,PandasのDataFrameの処理をpolarsのDataFrameに代替させようと考えています.そこで今回はpolarsの勉強もかねてPythonからpolarsを利用し,そのio関連について見ていきます.polarsはrustで実装されていてパフォーマンスを売りにしており,簡単にPythonから利用できるのでもしかしたらpandasの代替として使われていくかもしれません.polarsのバージョンは0.8.8
です.
インポート
インポート関連はpolarsの公式ドキュメントやpyarrowの公式ドキュメントに準じています.
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import pickle
from more_itertools import chunked
pandas <-> polars
pandas to polars
利用するデータをPandasで読み込みます.インデックスがdatetimeとなっており,naiveなものawareなもの二つを利用します.データ中にNaNを含むので,表示の際はそれらを除外します.
with open("aware_stock_df.pickle", "rb") as f:
aware_pandas_df = pickle.load(f)
print(type(aware_pandas_df))
aware_pandas_df.dropna()[:5]
<class 'pandas.core.frame.DataFrame'>
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | |
---|---|---|---|---|---|
timestamp | |||||
2020-11-04 09:00:00+09:00 | 2669.0 | 2670.0 | 2658.0 | 2664.0 | 93000.0 |
2020-11-04 09:01:00+09:00 | 2663.0 | 2664.0 | 2650.0 | 2652.0 | 17600.0 |
2020-11-04 09:02:00+09:00 | 2649.0 | 2655.0 | 2646.0 | 2649.0 | 19200.0 |
2020-11-04 09:03:00+09:00 | 2652.0 | 2670.0 | 2651.0 | 2670.0 | 31200.0 |
2020-11-04 09:04:00+09:00 | 2671.0 | 2674.0 | 2670.0 | 2674.0 | 12800.0 |
polarsはnaiveなdatetimeしか保持できないので(0.8.8現在)utcのnaiveなdatetimeに変更します.
naiveなdatetimeしか保持できないため,以降の関数では関連したwarningがでることがあります.
naive_utc_pandas_df = aware_pandas_df.copy()
naive_utc_pandas_df.index = aware_pandas_df.index.tz_convert("UTC").tz_localize(None)
naive_utc_pandas_df.dropna()[:5]
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | |
---|---|---|---|---|---|
timestamp | |||||
2020-11-04 00:00:00 | 2669.0 | 2670.0 | 2658.0 | 2664.0 | 93000.0 |
2020-11-04 00:01:00 | 2663.0 | 2664.0 | 2650.0 | 2652.0 | 17600.0 |
2020-11-04 00:02:00 | 2649.0 | 2655.0 | 2646.0 | 2649.0 | 19200.0 |
2020-11-04 00:03:00 | 2652.0 | 2670.0 | 2651.0 | 2670.0 | 31200.0 |
2020-11-04 00:04:00 | 2671.0 | 2674.0 | 2670.0 | 2674.0 | 12800.0 |
polarsにpandasのindexのようなものはないので,indexをカラムにしてからpolars.from_pandas
を行います.
naive_utc_polars_df = pl.from_pandas(naive_utc_pandas_df.reset_index())
naive_utc_polars_df.drop_nulls()[:5]
timestamp | Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 |
---|---|---|---|---|---|
date64 | f64 | f64 | f64 | f64 | f64 |
2020-11-04 00:00:00 | 2669 | 2670 | 2658 | 2664 | 9.3e4 |
2020-11-04 00:01:00 | 2663 | 2664 | 2650 | 2652 | 1.76e4 |
2020-11-04 00:02:00 | 2649 | 2655 | 2646 | 2649 | 1.92e4 |
2020-11-04 00:03:00 | 2652 | 2670 | 2651 | 2670 | 3.12e4 |
2020-11-04 00:04:00 | 2671 | 2674 | 2670 | 2674 | 1.28e4 |
実はawareなdatetimeを持つpandas.DataFrameを直接polars.from_pandas
に入力すると,暗黙的にnaive な utcに変換されます.
aware_polars_df = pl.from_pandas(aware_pandas_df.reset_index())
aware_polars_df.drop_nulls()[:5]
c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\utils.py:9: UserWarning: Conversion of (potentially) timezone aware to naive datetimes. TZ information may be lost
warnings.warn(
timestamp | Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 |
---|---|---|---|---|---|
date64 | f64 | f64 | f64 | f64 | f64 |
2020-11-04 00:00:00 | 2669 | 2670 | 2658 | 2664 | 9.3e4 |
2020-11-04 00:01:00 | 2663 | 2664 | 2650 | 2652 | 1.76e4 |
2020-11-04 00:02:00 | 2649 | 2655 | 2646 | 2649 | 1.92e4 |
2020-11-04 00:03:00 | 2652 | 2670 | 2651 | 2670 | 3.12e4 |
2020-11-04 00:04:00 | 2671 | 2674 | 2670 | 2674 | 1.28e4 |
polars to pandas
polars.frame.DataFrame.to_pandas
が利用できます.polarsにはindexのようなものは無いので,timestampカラムをインデックスにします.
naive_utc_pandas_df = naive_utc_polars_df.to_pandas()
naive_utc_pandas_df = naive_utc_pandas_df.set_index("timestamp")
naive_utc_pandas_df.dropna()[:5]
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | |
---|---|---|---|---|---|
timestamp | |||||
2020-11-04 00:00:00 | 2669.0 | 2670.0 | 2658.0 | 2664.0 | 93000.0 |
2020-11-04 00:01:00 | 2663.0 | 2664.0 | 2650.0 | 2652.0 | 17600.0 |
2020-11-04 00:02:00 | 2649.0 | 2655.0 | 2646.0 | 2649.0 | 19200.0 |
2020-11-04 00:03:00 | 2652.0 | 2670.0 | 2651.0 | 2670.0 | 31200.0 |
2020-11-04 00:04:00 | 2671.0 | 2674.0 | 2670.0 | 2674.0 | 12800.0 |
csv <-> polars
csv to polars
naiveなdatetimeを含む場合
勝手にdatetime文字列をdate64にバースしてくれます.
naive_polars_df = pl.read_csv("naive_stock_df.csv")
naive_polars_df.drop_nulls()[:5]
timestamp | Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 |
---|---|---|---|---|---|
date64 | f64 | f64 | f64 | f64 | f64 |
2020-11-04 09:00:00 | 2669 | 2670 | 2658 | 2664 | 9.3e4 |
2020-11-04 09:01:00 | 2663 | 2664 | 2650 | 2652 | 1.76e4 |
2020-11-04 09:02:00 | 2649 | 2655 | 2646 | 2649 | 1.92e4 |
2020-11-04 09:03:00 | 2652 | 2670 | 2651 | 2670 | 3.12e4 |
2020-11-04 09:04:00 | 2671 | 2674 | 2670 | 2674 | 1.28e4 |
awareなdatetimeを含む場合
文字列として受け取ってしまうため,naiveのutcであらかじめ保存して置くのがよさそうです.
aware_polars_df = pl.read_csv("aware_stock_df.csv")
aware_polars_df.drop_nulls()[:5]
timestamp | Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 |
---|---|---|---|---|---|
str | f64 | f64 | f64 | f64 | f64 |
"2020-11-04 09:00:00+09:00" | 2669 | 2670 | 2658 | 2664 | 9.3e4 |
"2020-11-04 09:01:00+09:00" | 2663 | 2664 | 2650 | 2652 | 1.76e4 |
"2020-11-04 09:02:00+09:00" | 2649 | 2655 | 2646 | 2649 | 1.92e4 |
"2020-11-04 09:03:00+09:00" | 2652 | 2670 | 2651 | 2670 | 3.12e4 |
"2020-11-04 09:04:00+09:00" | 2671 | 2674 | 2670 | 2674 | 1.28e4 |
今のところpolars.series.Series
の文字列処理では複雑なことはできないので,どうしても読み込みたい場合は,一度pandasで読み込んでからpolarsに変換するのがよさそうです.
aware_pandas_df = pd.read_csv("aware_stock_df.csv", index_col="timestamp", parse_dates=True)
aware_pandas_df.dropna()[:5]
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | |
---|---|---|---|---|---|
timestamp | |||||
2020-11-04 09:00:00+09:00 | 2669.0 | 2670.0 | 2658.0 | 2664.0 | 93000.0 |
2020-11-04 09:01:00+09:00 | 2663.0 | 2664.0 | 2650.0 | 2652.0 | 17600.0 |
2020-11-04 09:02:00+09:00 | 2649.0 | 2655.0 | 2646.0 | 2649.0 | 19200.0 |
2020-11-04 09:03:00+09:00 | 2652.0 | 2670.0 | 2651.0 | 2670.0 | 31200.0 |
2020-11-04 09:04:00+09:00 | 2671.0 | 2674.0 | 2670.0 | 2674.0 | 12800.0 |
あとはpandas to polarsと同じです.
lazyに読み込む
polarsはpysparkのようにlazyに実行するapiを持っています.ioに関しても
polars.lazy.LazyFrame
を出力し,最適化されて実行されます.
今のところ文字列として読み込むようで,うまく利用するにはdtypeを与えるかcastする必要がありそうです.自分はlazy-apiではdatetimeの文字列のパースはできませんでした.
naive_polars_df2 = (
pl.scan_csv("naive_stock_df.csv",dtype={"Open_6502":pl.Float64})
.select([pl.col("timestamp"), pl.col("Open_6502")])
.collect()
)
naive_polars_df2.drop_nulls()[:5]
timestamp | Open_6502 |
---|---|
str | f64 |
"2020-11-04 09:00:00" | 2669 |
"2020-11-04 09:01:00" | 2663 |
"2020-11-04 09:02:00" | 2649 |
"2020-11-04 09:03:00" | 2652 |
"2020-11-04 09:04:00" | 2671 |
naive_polars_df2 = (
pl.scan_csv("naive_stock_df.csv",dtype={"Open_6502":pl.Float64})
.select((pl.col("Open_6502")+1).alias("a"))
.select(pl.col("a"))
.collect()
)
naive_polars_df2.drop_nulls()[:5]
a |
---|
f64 |
2670 |
2664 |
2650 |
2653 |
2672 |
polars to csv
naive_polars_df.to_csv("naive_stock_df_polars_to_csv.csv")
read_naive_polars_df = pl.read_csv("naive_stock_df_polars_to_csv.csv")
read_naive_polars_df.drop_nulls()[:5]
timestamp | Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 |
---|---|---|---|---|---|
date64 | f64 | f64 | f64 | f64 | f64 |
2020-11-04 09:00:00 | 2669 | 2670 | 2658 | 2664 | 9.3e4 |
2020-11-04 09:01:00 | 2663 | 2664 | 2650 | 2652 | 1.76e4 |
2020-11-04 09:02:00 | 2649 | 2655 | 2646 | 2649 | 1.92e4 |
2020-11-04 09:03:00 | 2652 | 2670 | 2651 | 2670 | 3.12e4 |
2020-11-04 09:04:00 | 2671 | 2674 | 2670 | 2674 | 1.28e4 |
日時の形式は"2020-11-04T00:00:00.000000000"のようになります.
parquet <-> polars
parquet to polars
naive_polars_df = pl.read_parquet("naive_stock_df.parquet")
naive_polars_df.drop_nulls()[:5]
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | timestamp |
---|---|---|---|---|---|
f64 | f64 | f64 | f64 | f64 | date64 |
2669 | 2670 | 2658 | 2664 | 9.3e4 | 2020-11-04 09:00:00 |
2663 | 2664 | 2650 | 2652 | 1.76e4 | 2020-11-04 09:01:00 |
2649 | 2655 | 2646 | 2649 | 1.92e4 | 2020-11-04 09:02:00 |
2652 | 2670 | 2651 | 2670 | 3.12e4 | 2020-11-04 09:03:00 |
2671 | 2674 | 2670 | 2674 | 1.28e4 | 2020-11-04 09:04:00 |
parquetはカラムを指定して読み込みできます.
naive_polars_df = pl.read_parquet("naive_stock_df.parquet", columns=["timestamp", "Open_6502", "High_6502"])
naive_polars_df.drop_nulls()[:5]
timestamp | Open_6502 | High_6502 |
---|---|---|
date64 | f64 | f64 |
2020-11-04 00:00:00 | 2669 | 2670 |
2020-11-04 00:01:00 | 2663 | 2664 |
2020-11-04 00:02:00 | 2649 | 2655 |
2020-11-04 00:03:00 | 2652 | 2670 |
2020-11-04 00:04:00 | 2671 | 2674 |
polars.scan_parquet
はなぜかうまくいきませんでした.とりあえず,ioはeager-apiで行うのがよさそうです.
naive_polars_df_scan = pl.scan_parquet("naive_stock_df.parquet").collect()
naive_pandas_df_scan[:5]
---------------------------------------------------------------------------
PanicException Traceback (most recent call last)
<ipython-input-196-9e99cf9944e1> in <module>
----> 1 naive_polars_df_scan = pl.scan_parquet("naive_stock_df.parquet").collect()
2 naive_pandas_df_scan[:5]
c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\functions.py in scan_parquet(file, stop_after_n_rows, cache)
337 if isinstance(file, Path):
338 file = str(file)
--> 339 return LazyFrame.scan_parquet(
340 file=file, stop_after_n_rows=stop_after_n_rows, cache=cache
341 )
c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\lazy\__init__.py in scan_parquet(file, stop_after_n_rows, cache)
164
165 self = LazyFrame.__new__(LazyFrame)
--> 166 self._ldf = PyLazyFrame.new_from_parquet(file, stop_after_n_rows, cache)
167 return self
168
PanicException: Arrow datatype Timestamp(Nanosecond, None) not supported by Polars
polars to parquet
一度に全部保存
naive_polars_df.to_parquet("naive_stock_df_polars_to_parquet.parquet")
parquet_file = pq.ParquetFile("naive_stock_df_polars_to_parquet.parquet")
print(parquet_file.metadata)
print(parquet_file.schema)
<pyarrow._parquet.FileMetaData object at 0x00000251253EA400>
created_by: parquet-cpp-arrow version 4.0.1
num_columns: 6
num_rows: 38880
num_row_groups: 1
format_version: 1.0
serialized_size: 1401
<pyarrow._parquet.ParquetSchema object at 0x00000251254B3FC0>
required group field_id=0 schema {
optional double field_id=1 Open_6502;
optional double field_id=2 High_6502;
optional double field_id=3 Low_6502;
optional double field_id=4 Close_6502;
optional double field_id=5 Volume_6502;
optional int64 field_id=6 timestamp (Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false));
}
read_naive_polars_df = pl.read_parquet("naive_stock_df_polars_to_parquet.parquet")
read_naive_polars_df.drop_nulls()[:5]
c:\users\---\anaconda3\envs\calc_py39\lib\site-packages\polars\utils.py:9: UserWarning: Conversion of (potentially) timezone aware to naive datetimes. TZ information may be lost
warnings.warn(
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | timestamp |
---|---|---|---|---|---|
f64 | f64 | f64 | f64 | f64 | date64 |
2669 | 2670 | 2658 | 2664 | 9.3e4 | 2020-11-04 09:00:00 |
2663 | 2664 | 2650 | 2652 | 1.76e4 | 2020-11-04 09:01:00 |
2649 | 2655 | 2646 | 2649 | 1.92e4 | 2020-11-04 09:02:00 |
2652 | 2670 | 2651 | 2670 | 3.12e4 | 2020-11-04 09:03:00 |
2671 | 2674 | 2670 | 2674 | 1.28e4 | 2020-11-04 09:04:00 |
(polars to arrow to parquet)
ちなみに,一度pyarrow.Tableに変換してからpqrquet形式で保存するとdatetimeの情報は失われdateのみになってしまいます.
naive_table = naive_polars_df.to_arrow()
pq.write_table(naive_table, "naive_stock_df_polars_to_arrow_to_parquet.parquet")
read_naive_polars_df = pl.read_parquet("naive_stock_df_polars_to_arrow_to_parquet.parquet")
read_naive_polars_df.drop_nulls()[:5]
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | timestamp |
---|---|---|---|---|---|
f64 | f64 | f64 | f64 | f64 | date32 |
2669 | 2670 | 2658 | 2664 | 9.3e4 | 2020-11-04 |
2663 | 2664 | 2650 | 2652 | 1.76e4 | 2020-11-04 |
2649 | 2655 | 2646 | 2649 | 1.92e4 | 2020-11-04 |
2652 | 2670 | 2651 | 2670 | 3.12e4 | 2020-11-04 |
2671 | 2674 | 2670 | 2674 | 1.28e4 | 2020-11-04 |
保存されたparquetファイルの情報を確認してみるとすでにdate32であることから,write_parquetの前に情報が失われているみたいです.
parquet_file = pq.ParquetFile("naive_stock_df_polars_to_arrow_to_parquet.parquet")
print(parquet_file.metadata)
print(parquet_file.schema)
<pyarrow._parquet.FileMetaData object at 0x00000251256804A0>
created_by: parquet-cpp-arrow version 4.0.1
num_columns: 6
num_rows: 38880
num_row_groups: 1
format_version: 1.0
serialized_size: 1378
<pyarrow._parquet.ParquetSchema object at 0x000002512814D440>
required group field_id=0 schema {
optional double field_id=1 Open_6502;
optional double field_id=2 High_6502;
optional double field_id=3 Low_6502;
optional double field_id=4 Close_6502;
optional double field_id=5 Volume_6502;
optional int32 field_id=6 timestamp (Date);
}
そこでdate32 -> date64のキャスト用の関数を作ります.これはpolarsのto_parquetで行われている処理です.
def cast_date32_to_date64(tbl):
data = {}
for i, column in enumerate(tbl):
name = column._name
# parquet casts date64 to date32 for some reason
if column.type == pa.date64():
column = pa.compute.cast(column, pa.timestamp("ms", None))
data[name] = column
tbl = pa.table(data)
return tbl
naive_table = naive_polars_df.to_arrow()
print(naive_table.schema)
# cast
naive_table = cast_date32_to_date64(naive_table)
print(naive_table.schema)
pq.write_table(naive_table, "naive_stock_df_polars_to_arrow_to_parquet_v2.parquet", )
parquet_file = pq.ParquetFile("naive_stock_df_polars_to_arrow_to_parquet_v2.parquet")
print(parquet_file.metadata)
print(parquet_file.schema)
Open_6502: double
High_6502: double
Low_6502: double
Close_6502: double
Volume_6502: double
timestamp: date64[ms]
Open_6502: double
High_6502: double
Low_6502: double
Close_6502: double
Volume_6502: double
timestamp: timestamp[ms]
<pyarrow._parquet.FileMetaData object at 0x00000251254C1F40>
created_by: parquet-cpp-arrow version 4.0.1
num_columns: 6
num_rows: 38880
num_row_groups: 1
format_version: 1.0
serialized_size: 1401
<pyarrow._parquet.ParquetSchema object at 0x0000025128154680>
required group field_id=0 schema {
optional double field_id=1 Open_6502;
optional double field_id=2 High_6502;
optional double field_id=3 Low_6502;
optional double field_id=4 Close_6502;
optional double field_id=5 Volume_6502;
optional int64 field_id=6 timestamp (Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false));
}
read_naive_polars_df = pl.read_parquet("naive_stock_df_polars_to_arrow_to_parquet_v2.parquet")
read_naive_polars_df.drop_nulls()[:5]
Open_6502 | High_6502 | Low_6502 | Close_6502 | Volume_6502 | timestamp |
---|---|---|---|---|---|
f64 | f64 | f64 | f64 | f64 | date64 |
2669 | 2670 | 2658 | 2664 | 9.3e4 | 2020-11-04 09:00:00 |
2663 | 2664 | 2650 | 2652 | 1.76e4 | 2020-11-04 09:01:00 |
2649 | 2655 | 2646 | 2649 | 1.92e4 | 2020-11-04 09:02:00 |
2652 | 2670 | 2651 | 2670 | 3.12e4 | 2020-11-04 09:03:00 |
2671 | 2674 | 2670 | 2674 | 1.28e4 | 2020-11-04 09:04:00 |
rowを部分的に分けて保存する(pyarrowを利用)
parquetのパーティションを利用することで,データの書き込みを複数で行うことができます.以下では実際にはデータベースから取得したり大きいサイズのcsvをchunkしたりして書き込むような形です.自分はparquetファイルについて詳しくないのですが,全部メモリに乗り切らないデータをparquetファイルで保存するときに,パーティションを利用するのはいいのでしょうか?
naive_polars_df.height
38880
one_saved_rows = 10000
for i, one_chunck_indice in enumerate(chunked(range(naive_polars_df.height), one_saved_rows)):
table = naive_polars_df[one_chunck_indice[0]:one_chunck_indice[-1]+1].to_arrow()
table = cast_date32_to_date64(table)
if i == 0:
pqwriter = pa.parquet.ParquetWriter('naive_stock_df_polars_to_parquet_chunck.parquet', table.schema, version="2.0")
pqwriter.write_table(table)
# close the parquet writer
if pqwriter:
pqwriter.close()
parquet_file = pq.ParquetFile('naive_stock_df_polars_to_parquet_chunck.parquet')
print(parquet_file.metadata)
print(parquet_file.schema)
<pyarrow._parquet.FileMetaData object at 0x00000251254C1400>
created_by: parquet-cpp-arrow version 4.0.1
num_columns: 6
num_rows: 38880
num_row_groups: 4
format_version: 2.0
serialized_size: 3396
<pyarrow._parquet.ParquetSchema object at 0x0000025124CF0940>
required group field_id=0 schema {
optional double field_id=1 Open_6502;
optional double field_id=2 High_6502;
optional double field_id=3 Low_6502;
optional double field_id=4 Close_6502;
optional double field_id=5 Volume_6502;
optional int64 field_id=6 timestamp (Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false));
}
read_naive_polars = pl.read_parquet("naive_stock_df_polars_to_parquet_chunck.parquet", columns=["timestamp","Open_6502"])
read_naive_polars.drop_nulls()[:5]
timestamp | Open_6502 |
---|---|
date64 | f64 |
2020-11-04 09:00:00 | 2669 |
2020-11-04 09:01:00 | 2663 |
2020-11-04 09:02:00 | 2649 |
2020-11-04 09:03:00 | 2652 |
2020-11-04 09:04:00 | 2671 |
まとめ
polarsでは今のところデータベースからの読み込みはできないようで,pandas.read_sql
を利用して変換する必要があります.csvやparquetファイルは普通に読み込めるようです.