Edited at

PyArrowでテキストファイルからParquetファイルを作成する方法

PyArrowを利用してParquetを生成する方法についてです。

PyArrowがコーディング量が少なく、Spark環境も用意せずに済むからラクできるかな…

と思いきや、ちょっと一工夫必要だったという話。

※過去記事Redshift Spectrumの実装フローで触れてなかった部分です。


前提条件


  • CentOS 7.6

  • VSCode 1.33.1

  • Python 3.7.3

  • anaconda 4.6.14

  • PyArrow 0.13.0


目次


  1. 事前準備

    1.1 PyArrowのインストール

    1.2 VSCodeのインテリセンス有効化

  2. テキストファイル読込→Parquetファイル作成

    2.1 データをDecimal型に変換不要の場合

    2.2 データをDecimal型に変換必要の場合


  3. 参考


1. 事前準備

PyArrowのインストールとVSCodeのインテリセンスを有効化を実行します


1.1 PyArrowのインストール

PyArrowのインストールは2パターン。

① anacondaを利用する場合

② pipを利用する場合

anacondaをインストールしている場合は、anacondaを使うのが無難


①anacondaを利用する場合

下記コマンドを実行

2019年5月2日時点でpyarrow 0.13.0 がインストールされる

$ conda install -c conda-forge pyarrow


②pipを利用する場合

下記コマンドを実行

$ pip install pyarrow


1.2 VSCodeのインテリセンス有効化

外部モジュールをインポートすると、インテリセンスが効かないケースがあります。

インテリセンスが効かない場合は、この記事を参照し、インテリセンスを有効化してください。


2. テキストファイル読込→Parquetファイル作成

下記のテキストファイルを変換することを想定します。

PyArrowのモジュールでは、テキストファイルを直接読込してPqrquetファイルに変換できたのですが、そこには色々と制限がありました。

id
first_name
last_name
number
gender
date_time

1
Brody
Brickhill
82.782
Male
2018-06-17 22:37:16

2
Gaylord
Pavic
11.978
Male
2019-03-10 19:39:02

3
Ravid
Mountain
51.047
Male
2018-07-19 15:49:45

4
Bourke
Frentz
62.795
Male
2018-11-22 02:58:22

5
Garland
Smitherman
38.383
Female
2019-01-30 16:49:28


2.1 元データにDecimal型が 存在しない or 存在するが精度が落ちてもよい 場合

Parquetファイル作成にあたり、次のポイントのいずれかに該当する場合は

pyarrow.csvのread_csvモジュールで直接変換します。


【ポイント】

・元データにDecimal型のデータが存在しない場合

・Parquet化したときに、数値が浮動小数点型になっても問題ない
 (≒元データと多少数値がずれても問題ない)場合


【手順概略】

    1. テキストファイルを読込む(pyarrow.csvのread_csv関数)

    2. parquetファイルを作成

    3. 作成したparquetファイルを読込、確認


【具体的手順(コード)】

pyarrow.csvモジュールのread_csv関数を利用し、テキストファイルを読込む。


read_csv_test.py

from pyarroy.csv import read_csv, ReadOption, ParseOption, ConvertOption, MemoryPool

from pyarrow import int32, timestamp, string, Table, float32, parquet as pq

""" === 参考 ===
pyarrow.csv.read_csv(
input_file, # ファイルの場所を文字列で渡す
read_options=None, # ReadOptionクラスを渡す
parse_options=None, # ParseOptionクラスを渡す
convert_options=None, # ConvertOptionクラスを渡す
memory_pool=None # MemoryPoolクラスを渡す
)
### 返り値:pyarrow.Tableクラス
"""

def get_convert_options() -> ConvertOptions:
"""
ConvertOptionを返す
column_typesは、Decimal型は非対応
"""

cls = ConvertOptions
opts = cls()
opts.check_utf8 = True
# 列のデータ型を辞書型で渡す
opts.column_types = {
'id': int32(),
'first_name': string(),
'last_name': string(),
'number': float32(),
'gender': string(),
'date_time': timestamp('s')
}
# int型列に''がある場合にNaNとして変換する
opts.null_values = ['']
return(opts)

def get_pyarrow_table(target: str) -> Table:
"""
テキストファイルを読込む

  target:処理対象テキストファイルのパス
返値:Tableクラス
"""
readoptions = ReadOptions(use_threads=True, block_size=1000)
convertoptions = get_convert_options()
partheroptions = ParseOptions(
delimiter='\t', double_quote=False, escape_char='\'', header_rows=1
)
readed_csv = read_csv(input_file=target, read_options=readoptions,
parse_options=partheroptions, convert_options=convertoptions)
return(readed_csv)

# 1. pyarrowでテキストファイルを読込する
print(' ---- pyarrow で テキストファイルを読込、Tableクラスを取得 ----')
arrow_table = get_pyarrow_table('/home/myuser/mock_data.txt')

# 読込んだテーブルクラスのスキーマを確認
print(arrow_table.schema)

# 読込んだテーブルクラスの中身を確認
print(arrow_table.to_pandas())

# 2. parquetファイルの作成
output_file = '/home/myuser/output/test_csv.parquet'
pq.write_table(
arrow_table, output_file, compression='gzip')

# 3. 作成したparquetファイルを読込み、確認する
print('----- parquetを読込、確認 -----')
res = pq.read_table(output_file)
print(res.to_pandas())



【注意点】


  • 元データのint型列にnull(空白)が存在するとエラーが発生する


    • ConvertOptionsクラスのnull_valuesに['']を指定すればNaNとして解釈される




【read_csv関数の特徴】※公式ドキュメント参照


  • シングルスレッド、マルチスレッドの読み込みに対応

  • 圧縮ファイルは自動解凍される(ファイル名に拡張子を付与し明示すること 例:xxx.gz)

  • ヘッダーからカラム名を取得

  • NaNや#N/Aなど、null値の綴りを検出

  • read_csv関数に対応するデータ型は以下(Decimal型は非対応(0.13.0時点))
    1. null
    2. int8, int16, int32, int64
    3. float16, float32, float64
    4. timestamp[s]
    5. string
    6. binary


2.2 元データにDecimal型が存在 or 存在しないがDecimal型にしたい場合

Parquetファイル作成にあたり、次のポイントのいずれかに該当する場合は

pandasのread_csvモジュールで一度読込んでからDecimal型に変換する。


【ポイント】

・元データの数値列にDecimal型が存在する場合

・元データにDecimal型のデータは存在しないが、Parquet化にあたりDecimal型にしたい場合

・Parquet化したときに、元データの数値列とズレを起こしたくない場合


【手順概略】

    1. テキストファイルを読込む(pandasのread_csv関数)

    2. 読込んだデータをpyarrow.Tableクラスに変換する

    3. parquetファイルを作成

    4. 作成したparquetファイルを読込、確認


【具体的手順(コード)】


from_pd_to_parquet.py

import pandas as pd

import decimal as D
import time
from pyarrow import Table, int32, schema, string, decimal128, timestamp, parquet as pq

# 読込データ型を指定する辞書を作成
# int型は、欠損値があるとエラーになる。
# PyArrowでint型に変換するため、いったんfloatで定義。※strだとintにできない
# convertersで指定済みの列は、「convertersで指定されたけど使わないよ」とwarningsがでる
dic_dtype = dict(
[
('id', float),
('first_name', str),
('last_name', str),
# ('number', str), #convertersで指定済みなので、不要
('gender', str)
]
)

# 速度比較用のconvert辞書その1
dic_convert = dict(
[
('number', D.Decimal),
('date_time', pd.Timestamp)
]
)

# 速度比較用のconvert辞書その2
dic_convert2 = dict(
[('number', D.Decimal)
]
)

# CSVの読込(Pandas)
filepath = "/home/myuser/mock_data.txt"

######### pandasのread_csvの代表的オプション ###########
# parse_dates :timestamp型に変換する列を[]で指定
# dtype :データ型を辞書型で指定。decimal型やtimestamp型など
# オブジェクト変換する列がある場合は、その列は記入しない。
# timestamp型は、parse_dates引数で指定、もしくはconverters引数で指定する。
# converters :ファイル読込時に、特定列に適用したいクラスを辞書型で指定。
# 例:dict[('列名A',D.Decimal),('列名B',pd.Timestamp),...]
# infer_datetime_format :parse_datesを指定する場合、Trueにすると高速解析される。
# 指定すると、速度が倍〜10倍くらい向上。
# header :ヘッダーの有無。0=ヘッダーあり None=ヘッダーなし

# テキストファイル読込の関数その1(高速化オプションを利用しない)
def get_res(filepath: str, dic_dtype: dict, dic_convert: dict) -> pd.DataFrame:
res = pd.read_csv(filepath_or_buffer=filepath, header=0,
sep='\t',
na_values='',
# compression='gzip', # gzip圧縮されている場合は、コメントを外す
encoding="utf-8",
dtype=dic_dtype,
converters=dic_convert
)
return(res)

# テキストファイル読込の関数その2(高速化オプションを利用する)
def get_res2(filepath: str, dic_dtype: dict, dic_convert: dict) -> pd.DataFrame:
res2 = pd.read_csv(filepath_or_buffer=filepath, header=0,
encoding="utf-8",
sep='\t',
na_values='',
# compression='gzip', # gzip圧縮されている場合は、コメントを外す
dtype=dic_dtype,
converters=dic_convert,
parse_dates=['date_time'],
infer_datetime_format=True
)
return(res2)

# 1. テキストファイルを読込む --- 高速化オプション利用なし
start = time.time()
res = get_res(filepath=filepath, dic_dtype=dic_dtype, dic_convert=dic_convert)
process_time = time.time() - start
print("resは{f}秒かかりました".format(f=process_time))

# 1. テキストファイルを読込む --- 高速化オプション利用あり
start = time.time()
res2 = get_res2(filepath=filepath, dic_dtype=dic_dtype,
dic_convert=dic_convert2)
process_time = time.time() - start
print("res2は{f}秒かかりました".format(f=process_time))

# 2. pyarrow.Tableクラスに変換する
# pyarrowのTable変換用に、スキーマを作成しておく
pschema = schema(
[
('id', int32()),
('first_name', string()),
('last_name', string()),
('number', decimal128(10, 3)),
('gender', string()),
('date_time', timestamp('s'))
]
)
pyarrow_table = Table.from_pandas(res2, pschema)
print(pyarrow_table)
# int型のid列にNaNが存在すると、pandasに戻した時にint型 -> float型に変換される
# print(pyarrow_table.to_pandas().iloc[0:10, :])

# 3. parquetに変換する
output_file_name = '/home/myuser/test.parquet'
pq.write_table(
pyarrow_table,
output_file_name,
compression='gzip',
# 以下2つのオプションについては、後述のTimeStamp型の注意を参照
flavor=['spark'], # 互換性のためsparkを設定
use_deprecated_int96_timestamps = True # int96のParquet形式で書き込み
)

# 作成したparquetファイルを読込、確認
# 省略します



【注意点】


  • int型でデータ型指定した列にNaNが存在すると、エラーが発生し読込めない。

  • 回避するには、いったんfloat型で定義 -> parquet作成時にint型に指定する。

  • 逆に、parquetファイルでint型の列をpandasに戻した時に、int型列にnullが存在するとfloat型になってしまう

  • 数値精度の影響はほぼないと想定、仕様だと思いましょう。


parquetのnullableについて


  • pyarrowでparquetを作成すると、デフォルトでは nullable = true

  • nullable = false にするとパフォーマンスに変化あるかもしれないが、未検証


TimeStamp型の注意

pyarrow.Tableクラスに格納された列にTimeStamp型がある状態で

write_tableを実行すると、エラーが起きる。

解決するためには、次のいずれか或いは両方を引数に記述してあればよい


  • flavor=['spark'])

  • use_deprecated_int96_timestamps=True

その他オプションに関してはpyarrow.parquet.write_tableのドキュメントを参照


Boolean型について(2019年8月16日追記)


  • boolean型の列にnullが存在すると、falseに変換される

  • float型、int型、string型からbooleanに変換を試みるも変換エラーで変換不可

  • booleanをあきらめてint型に置換して誤魔化すしかなさそう


データ内の改行コードについて(2019年8月29日追記)


  • データ内の改行コード(\nや\r)は、エスケープされていればParquet変換可能

  • 上記は2.2の手順でParquet化したときに確認。2.1の手順では未検証


3. 参考


PyArrowのドキュメント


Decimal型のエラーに関する事例


Pandasのread_csvのドキュメント


Pandas上でのDecimal型への変換


Pandas上でのTimestamp型への変換