PyArrowを利用してParquetを生成する方法についてです。
PyArrowがコーディング量が少なく、Spark環境も用意せずに済むからラクできるかな…
と思いきや、ちょっと一工夫必要だったという話。
※過去記事Redshift Spectrumの実装フローで触れてなかった部分です。
前提条件
- CentOS 7.6
- VSCode 1.33.1
- Python 3.7.3
- anaconda 4.6.14
- PyArrow 0.13.0
目次
- 事前準備
1.1 PyArrowのインストール
1.2 VSCodeのインテリセンス有効化 - テキストファイル読込→Parquetファイル作成
2.1 データをDecimal型に変換不要の場合
2.2 データをDecimal型に変換必要の場合
- 参考
1. 事前準備
PyArrowのインストールとVSCodeのインテリセンスを有効化を実行します
1.1 PyArrowのインストール
PyArrowのインストールは2パターン。
① anacondaを利用する場合
② pipを利用する場合
anacondaをインストールしている場合は、anacondaを使うのが無難
①anacondaを利用する場合
下記コマンドを実行
2019年5月2日時点でpyarrow 0.13.0 がインストールされる
$ conda install -c conda-forge pyarrow
②pipを利用する場合
下記コマンドを実行
$ pip install pyarrow
1.2 VSCodeのインテリセンス有効化
外部モジュールをインポートすると、インテリセンスが効かないケースがあります。
インテリセンスが効かない場合は、この記事を参照し、インテリセンスを有効化してください。
2. テキストファイル読込→Parquetファイル作成
下記のテキストファイルを変換することを想定します。
PyArrowのモジュールでは、テキストファイルを直接読込してPqrquetファイルに変換できたのですが、そこには色々と制限がありました。
id | first_name | last_name | number | gender | date_time |
---|---|---|---|---|---|
1 | Brody | Brickhill | 82.782 | Male | 2018-06-17 22:37:16 |
2 | Gaylord | Pavic | 11.978 | Male | 2019-03-10 19:39:02 |
3 | Ravid | Mountain | 51.047 | Male | 2018-07-19 15:49:45 |
4 | Bourke | Frentz | 62.795 | Male | 2018-11-22 02:58:22 |
5 | Garland | Smitherman | 38.383 | Female | 2019-01-30 16:49:28 |
2.1 元データにDecimal型が 存在しない or 存在するが精度が落ちてもよい 場合
Parquetファイル作成にあたり、次のポイントのいずれかに該当する場合は
pyarrow.csvのread_csvモジュールで直接変換します。
【ポイント】
・元データにDecimal型のデータが存在しない場合
・Parquet化したときに、数値が浮動小数点型になっても問題ない
(≒元データと多少数値がずれても問題ない)場合
【手順概略】
1. テキストファイルを読込む(pyarrow.csvのread_csv関数)
2. parquetファイルを作成
3. 作成したparquetファイルを読込、確認
【具体的手順(コード)】
pyarrow.csvモジュールのread_csv関数を利用し、テキストファイルを読込む。
from pyarroy.csv import read_csv, ReadOption, ParseOption, ConvertOption, MemoryPool
from pyarrow import int32, timestamp, string, Table, float32, parquet as pq
""" === 参考 ===
pyarrow.csv.read_csv(
input_file, # ファイルの場所を文字列で渡す
read_options=None, # ReadOptionクラスを渡す
parse_options=None, # ParseOptionクラスを渡す
convert_options=None, # ConvertOptionクラスを渡す
memory_pool=None # MemoryPoolクラスを渡す
)
### 返り値:pyarrow.Tableクラス
"""
def get_convert_options() -> ConvertOptions:
"""
ConvertOptionを返す
column_typesは、Decimal型は非対応
"""
cls = ConvertOptions
opts = cls()
opts.check_utf8 = True
# 列のデータ型を辞書型で渡す
opts.column_types = {
'id': int32(),
'first_name': string(),
'last_name': string(),
'number': float32(),
'gender': string(),
'date_time': timestamp('s')
}
# int型列に''がある場合にNaNとして変換する
opts.null_values = ['']
return(opts)
def get_pyarrow_table(target: str) -> Table:
"""
テキストファイルを読込む
target:処理対象テキストファイルのパス
返値:Tableクラス
"""
readoptions = ReadOptions(use_threads=True, block_size=1000)
convertoptions = get_convert_options()
partheroptions = ParseOptions(
delimiter='\t', double_quote=False, escape_char='\'', header_rows=1
)
readed_csv = read_csv(input_file=target, read_options=readoptions,
parse_options=partheroptions, convert_options=convertoptions)
return(readed_csv)
# 1. pyarrowでテキストファイルを読込する
print(' ---- pyarrow で テキストファイルを読込、Tableクラスを取得 ----')
arrow_table = get_pyarrow_table('/home/myuser/mock_data.txt')
# 読込んだテーブルクラスのスキーマを確認
print(arrow_table.schema)
# 読込んだテーブルクラスの中身を確認
print(arrow_table.to_pandas())
# 2. parquetファイルの作成
output_file = '/home/myuser/output/test_csv.parquet'
pq.write_table(
arrow_table, output_file, compression='gzip')
# 3. 作成したparquetファイルを読込み、確認する
print('----- parquetを読込、確認 -----')
res = pq.read_table(output_file)
print(res.to_pandas())
【注意点】
- 元データのint型列にnull(空白)が存在するとエラーが発生する
- ConvertOptionsクラスのnull_valuesに['']を指定すればNaNとして解釈される
【read_csv関数の特徴】※公式ドキュメント参照
- シングルスレッド、マルチスレッドの読み込みに対応
- 圧縮ファイルは自動解凍される(ファイル名に拡張子を付与し明示すること 例:xxx.gz)
- ヘッダーからカラム名を取得
- NaNや#N/Aなど、null値の綴りを検出
- read_csv関数に対応するデータ型は以下(Decimal型は非対応(0.13.0時点))
1. null
2. int8, int16, int32, int64
3. float16, float32, float64
4. timestamp[s]
5. string
6. binary
2.2 元データにDecimal型が存在 or 存在しないがDecimal型にしたい場合
Parquetファイル作成にあたり、次のポイントのいずれかに該当する場合は
pandasのread_csvモジュールで一度読込んでからDecimal型に変換する。
【ポイント】
・元データの数値列にDecimal型が存在する場合
・元データにDecimal型のデータは存在しないが、Parquet化にあたりDecimal型にしたい場合
・Parquet化したときに、元データの数値列とズレを起こしたくない場合
【手順概略】
1. テキストファイルを読込む(pandasのread_csv関数)
2. 読込んだデータをpyarrow.Tableクラスに変換する
3. parquetファイルを作成
4. 作成したparquetファイルを読込、確認
【具体的手順(コード)】
import pandas as pd
import decimal as D
import time
from pyarrow import Table, int32, schema, string, decimal128, timestamp, parquet as pq
# 読込データ型を指定する辞書を作成
# int型は、欠損値があるとエラーになる。
# PyArrowでint型に変換するため、いったんfloatで定義。※strだとintにできない
# convertersで指定済みの列は、「convertersで指定されたけど使わないよ」とwarningsがでる
dic_dtype = dict(
[
('id', float),
('first_name', str),
('last_name', str),
# ('number', str), #convertersで指定済みなので、不要
('gender', str)
]
)
# 速度比較用のconvert辞書その1
dic_convert = dict(
[
('number', D.Decimal),
('date_time', pd.Timestamp)
]
)
# 速度比較用のconvert辞書その2
dic_convert2 = dict(
[('number', D.Decimal)
]
)
# CSVの読込(Pandas)
filepath = "/home/myuser/mock_data.txt"
######### pandasのread_csvの代表的オプション ###########
# parse_dates :timestamp型に変換する列を[]で指定
# dtype :データ型を辞書型で指定。decimal型やtimestamp型など
# オブジェクト変換する列がある場合は、その列は記入しない。
# timestamp型は、parse_dates引数で指定、もしくはconverters引数で指定する。
# converters :ファイル読込時に、特定列に適用したいクラスを辞書型で指定。
# 例:dict[('列名A',D.Decimal),('列名B',pd.Timestamp),...]
# infer_datetime_format :parse_datesを指定する場合、Trueにすると高速解析される。
# 指定すると、速度が倍〜10倍くらい向上。
# header :ヘッダーの有無。0=ヘッダーあり None=ヘッダーなし
# テキストファイル読込の関数その1(高速化オプションを利用しない)
def get_res(filepath: str, dic_dtype: dict, dic_convert: dict) -> pd.DataFrame:
res = pd.read_csv(filepath_or_buffer=filepath, header=0,
sep='\t',
na_values='',
# compression='gzip', # gzip圧縮されている場合は、コメントを外す
encoding="utf-8",
dtype=dic_dtype,
converters=dic_convert
)
return(res)
# テキストファイル読込の関数その2(高速化オプションを利用する)
def get_res2(filepath: str, dic_dtype: dict, dic_convert: dict) -> pd.DataFrame:
res2 = pd.read_csv(filepath_or_buffer=filepath, header=0,
encoding="utf-8",
sep='\t',
na_values='',
# compression='gzip', # gzip圧縮されている場合は、コメントを外す
dtype=dic_dtype,
converters=dic_convert,
parse_dates=['date_time'],
infer_datetime_format=True
)
return(res2)
# 1. テキストファイルを読込む --- 高速化オプション利用なし
start = time.time()
res = get_res(filepath=filepath, dic_dtype=dic_dtype, dic_convert=dic_convert)
process_time = time.time() - start
print("resは{f}秒かかりました".format(f=process_time))
# 1. テキストファイルを読込む --- 高速化オプション利用あり
start = time.time()
res2 = get_res2(filepath=filepath, dic_dtype=dic_dtype,
dic_convert=dic_convert2)
process_time = time.time() - start
print("res2は{f}秒かかりました".format(f=process_time))
# 2. pyarrow.Tableクラスに変換する
# pyarrowのTable変換用に、スキーマを作成しておく
pschema = schema(
[
('id', int32()),
('first_name', string()),
('last_name', string()),
('number', decimal128(10, 3)),
('gender', string()),
('date_time', timestamp('s'))
]
)
pyarrow_table = Table.from_pandas(res2, pschema)
print(pyarrow_table)
# int型のid列にNaNが存在すると、pandasに戻した時にint型 -> float型に変換される
# print(pyarrow_table.to_pandas().iloc[0:10, :])
# 3. parquetに変換する
output_file_name = '/home/myuser/test.parquet'
pq.write_table(
pyarrow_table,
output_file_name,
compression='gzip',
# 以下2つのオプションについては、後述のTimeStamp型の注意を参照
flavor=['spark'], # 互換性のためsparkを設定
use_deprecated_int96_timestamps = True # int96のParquet形式で書き込み
)
# 作成したparquetファイルを読込、確認
# 省略します
【注意点】
- int型でデータ型指定した列にNaNが存在すると、エラーが発生し読込めない。
- 回避するには、いったんfloat型で定義 -> parquet作成時にint型に指定する。
- 逆に、parquetファイルでint型の列をpandasに戻した時に、int型列にnullが存在するとfloat型になってしまう
- 数値精度の影響はほぼないと想定、仕様だと思いましょう。
parquetのnullableについて
- pyarrowでparquetを作成すると、デフォルトでは nullable = true
- nullable = false にするとパフォーマンスに変化あるかもしれないが、未検証
TimeStamp型の注意
pyarrow.Tableクラスに格納された列にTimeStamp型がある状態で
write_table
を実行すると、エラーが起きる。
解決するためには、次のいずれか或いは両方を引数に記述してあればよい
- flavor=['spark'])
- use_deprecated_int96_timestamps=True
その他オプションに関してはpyarrow.parquet.write_tableのドキュメントを参照
Boolean型について(2019年8月16日追記)
- boolean型の列にnullが存在すると、falseに変換される
- float型、int型、string型からbooleanに変換を試みるも変換エラーで変換不可
- booleanをあきらめてint型に置換して誤魔化すしかなさそう
データ内の改行コードについて(2019年8月29日追記)
- データ内の改行コード(\nや\r)は、エスケープされていればParquet変換可能
- 上記は2.2の手順でParquet化したときに確認。2.1の手順では未検証
3. 参考
PyArrowのドキュメント
- Docs » Python bindings
- Docs » Python bindings » API Reference
- Docs » Python bindings » Reading CSV files
Decimal型のエラーに関する事例
Pandasのread_csvのドキュメント
Pandas上でのDecimal型への変換
- Using Pandas with Python Decimal for accurate currency arithmetic
- pandas read_csv column dtype is set to decimal but converts to string