LoginSignup
10
7

More than 3 years have passed since last update.

PyArrowでテキストファイルからParquetファイルを作成する方法

Last updated at Posted at 2019-05-19

PyArrowを利用してParquetを生成する方法についてです。
PyArrowがコーディング量が少なく、Spark環境も用意せずに済むからラクできるかな…
と思いきや、ちょっと一工夫必要だったという話。

※過去記事Redshift Spectrumの実装フローで触れてなかった部分です。

前提条件

  • CentOS 7.6
  • VSCode 1.33.1
  • Python 3.7.3
  • anaconda 4.6.14
  • PyArrow 0.13.0

目次

  1. 事前準備
    1.1 PyArrowのインストール
    1.2 VSCodeのインテリセンス有効化
  2. テキストファイル読込→Parquetファイル作成
    2.1 データをDecimal型に変換不要の場合
    2.2 データをDecimal型に変換必要の場合
  3. 参考

1. 事前準備

PyArrowのインストールとVSCodeのインテリセンスを有効化を実行します

1.1 PyArrowのインストール

PyArrowのインストールは2パターン。
① anacondaを利用する場合
② pipを利用する場合
anacondaをインストールしている場合は、anacondaを使うのが無難

①anacondaを利用する場合

下記コマンドを実行
2019年5月2日時点でpyarrow 0.13.0 がインストールされる
$ conda install -c conda-forge pyarrow

②pipを利用する場合

下記コマンドを実行
$ pip install pyarrow

1.2 VSCodeのインテリセンス有効化

外部モジュールをインポートすると、インテリセンスが効かないケースがあります。
インテリセンスが効かない場合は、この記事を参照し、インテリセンスを有効化してください。

2. テキストファイル読込→Parquetファイル作成

下記のテキストファイルを変換することを想定します。
PyArrowのモジュールでは、テキストファイルを直接読込してPqrquetファイルに変換できたのですが、そこには色々と制限がありました。

id first_name last_name number gender date_time
1 Brody Brickhill 82.782 Male 2018-06-17 22:37:16
2 Gaylord Pavic 11.978 Male 2019-03-10 19:39:02
3 Ravid Mountain 51.047 Male 2018-07-19 15:49:45
4 Bourke Frentz 62.795 Male 2018-11-22 02:58:22
5 Garland Smitherman 38.383 Female 2019-01-30 16:49:28

2.1 元データにDecimal型が 存在しない or 存在するが精度が落ちてもよい 場合

Parquetファイル作成にあたり、次のポイントのいずれかに該当する場合は
pyarrow.csvのread_csvモジュールで直接変換します。

【ポイント】

・元データにDecimal型のデータが存在しない場合
・Parquet化したときに、数値が浮動小数点型になっても問題ない
 (≒元データと多少数値がずれても問題ない)場合

【手順概略】

    1. テキストファイルを読込む(pyarrow.csvのread_csv関数)
    2. parquetファイルを作成
    3. 作成したparquetファイルを読込、確認

【具体的手順(コード)】

pyarrow.csvモジュールのread_csv関数を利用し、テキストファイルを読込む。

read_csv_test.py
from pyarroy.csv import read_csv, ReadOption, ParseOption, ConvertOption, MemoryPool
from pyarrow import int32, timestamp, string, Table, float32, parquet as pq


""" === 参考 ===
pyarrow.csv.read_csv(
 input_file, # ファイルの場所を文字列で渡す
 read_options=None, # ReadOptionクラスを渡す
 parse_options=None, # ParseOptionクラスを渡す
 convert_options=None, # ConvertOptionクラスを渡す
 memory_pool=None # MemoryPoolクラスを渡す
 ) 
### 返り値:pyarrow.Tableクラス
"""

def get_convert_options() -> ConvertOptions:
    """
    ConvertOptionを返す
    column_typesは、Decimal型は非対応
    """
    cls = ConvertOptions
    opts = cls()
    opts.check_utf8 = True
    # 列のデータ型を辞書型で渡す
    opts.column_types = {
        'id': int32(),
        'first_name': string(),
        'last_name': string(), 
        'number': float32(),
        'gender': string(),
        'date_time': timestamp('s')
    }
    # int型列に''がある場合にNaNとして変換する
    opts.null_values = ['']
    return(opts)

def get_pyarrow_table(target: str) -> Table:
    """
    テキストファイルを読込む

  target:処理対象テキストファイルのパス
    返値:Tableクラス
    """
    readoptions = ReadOptions(use_threads=True, block_size=1000)
    convertoptions = get_convert_options()
    partheroptions = ParseOptions(
        delimiter='\t', double_quote=False, escape_char='\'', header_rows=1
    )
    readed_csv = read_csv(input_file=target, read_options=readoptions,
                          parse_options=partheroptions, convert_options=convertoptions)
    return(readed_csv)


# 1. pyarrowでテキストファイルを読込する
print(' ---- pyarrow で テキストファイルを読込、Tableクラスを取得 ----')
arrow_table = get_pyarrow_table('/home/myuser/mock_data.txt')

# 読込んだテーブルクラスのスキーマを確認
print(arrow_table.schema)

# 読込んだテーブルクラスの中身を確認
print(arrow_table.to_pandas())

# 2. parquetファイルの作成
output_file = '/home/myuser/output/test_csv.parquet'
pq.write_table(
    arrow_table, output_file, compression='gzip')

# 3. 作成したparquetファイルを読込み、確認する
print('----- parquetを読込、確認 -----')
res = pq.read_table(output_file)
print(res.to_pandas())

【注意点】

  • 元データのint型列にnull(空白)が存在するとエラーが発生する
    • ConvertOptionsクラスのnull_valuesに['']を指定すればNaNとして解釈される

【read_csv関数の特徴】※公式ドキュメント参照

  • シングルスレッド、マルチスレッドの読み込みに対応
  • 圧縮ファイルは自動解凍される(ファイル名に拡張子を付与し明示すること 例:xxx.gz)
  • ヘッダーからカラム名を取得
  • NaNや#N/Aなど、null値の綴りを検出
  • read_csv関数に対応するデータ型は以下(Decimal型は非対応(0.13.0時点))
    1. null
    2. int8, int16, int32, int64
    3. float16, float32, float64
    4. timestamp[s]
    5. string
    6. binary

2.2 元データにDecimal型が存在 or 存在しないがDecimal型にしたい場合

Parquetファイル作成にあたり、次のポイントのいずれかに該当する場合は
pandasのread_csvモジュールで一度読込んでからDecimal型に変換する。

【ポイント】

・元データの数値列にDecimal型が存在する場合
・元データにDecimal型のデータは存在しないが、Parquet化にあたりDecimal型にしたい場合
・Parquet化したときに、元データの数値列とズレを起こしたくない場合

【手順概略】

    1. テキストファイルを読込む(pandasのread_csv関数)
    2. 読込んだデータをpyarrow.Tableクラスに変換する
    3. parquetファイルを作成
    4. 作成したparquetファイルを読込、確認

【具体的手順(コード)】

from_pd_to_parquet.py
import pandas as pd
import decimal as D
import time
from pyarrow import Table, int32, schema, string, decimal128, timestamp, parquet as pq

# 読込データ型を指定する辞書を作成
# int型は、欠損値があるとエラーになる。
# PyArrowでint型に変換するため、いったんfloatで定義。※strだとintにできない
# convertersで指定済みの列は、「convertersで指定されたけど使わないよ」とwarningsがでる
dic_dtype = dict(
    [
        ('id', float),  
        ('first_name', str),
        ('last_name', str),
        # ('number', str), #convertersで指定済みなので、不要
        ('gender', str)
    ]
)

# 速度比較用のconvert辞書その1
dic_convert = dict(
    [
        ('number', D.Decimal),
        ('date_time', pd.Timestamp)
    ]
)

# 速度比較用のconvert辞書その2
dic_convert2 = dict(
    [('number', D.Decimal)
     ]
)


# CSVの読込(Pandas)
filepath = "/home/myuser/mock_data.txt"

######### pandasのread_csvの代表的オプション ###########
# parse_dates :timestamp型に変換する列を[]で指定
# dtype       :データ型を辞書型で指定。decimal型やtimestamp型など
#               オブジェクト変換する列がある場合は、その列は記入しない。
#               timestamp型は、parse_dates引数で指定、もしくはconverters引数で指定する。
# converters  :ファイル読込時に、特定列に適用したいクラスを辞書型で指定。 
#               例:dict[('列名A',D.Decimal),('列名B',pd.Timestamp),...]
# infer_datetime_format :parse_datesを指定する場合、Trueにすると高速解析される。
#                         指定すると、速度が倍〜10倍くらい向上。
# header      :ヘッダーの有無。0=ヘッダーあり None=ヘッダーなし

# テキストファイル読込の関数その1(高速化オプションを利用しない)
def get_res(filepath: str, dic_dtype: dict, dic_convert: dict) -> pd.DataFrame:
    res = pd.read_csv(filepath_or_buffer=filepath, header=0,
                      sep='\t',
                      na_values='',
                      # compression='gzip', # gzip圧縮されている場合は、コメントを外す
                      encoding="utf-8",
                      dtype=dic_dtype,
                      converters=dic_convert
                      )
    return(res)

# テキストファイル読込の関数その2(高速化オプションを利用する)
def get_res2(filepath: str, dic_dtype: dict, dic_convert: dict) -> pd.DataFrame:
    res2 = pd.read_csv(filepath_or_buffer=filepath, header=0,
                       encoding="utf-8",
                       sep='\t',
                       na_values='',
                       # compression='gzip', # gzip圧縮されている場合は、コメントを外す
                       dtype=dic_dtype,
                       converters=dic_convert,
                       parse_dates=['date_time'],
                       infer_datetime_format=True
                       )
    return(res2)

# 1. テキストファイルを読込む --- 高速化オプション利用なし
start = time.time()
res = get_res(filepath=filepath, dic_dtype=dic_dtype, dic_convert=dic_convert)
process_time = time.time() - start
print("resは{f}秒かかりました".format(f=process_time))

# 1. テキストファイルを読込む --- 高速化オプション利用あり
start = time.time()
res2 = get_res2(filepath=filepath, dic_dtype=dic_dtype,
                dic_convert=dic_convert2)
process_time = time.time() - start
print("res2は{f}秒かかりました".format(f=process_time))

# 2. pyarrow.Tableクラスに変換する
# pyarrowのTable変換用に、スキーマを作成しておく
pschema = schema(
    [
        ('id', int32()),
        ('first_name', string()),
        ('last_name', string()),
        ('number', decimal128(10, 3)),
        ('gender', string()),
        ('date_time', timestamp('s'))
    ]
)
pyarrow_table = Table.from_pandas(res2, pschema)
print(pyarrow_table)
# int型のid列にNaNが存在すると、pandasに戻した時にint型 -> float型に変換される
# print(pyarrow_table.to_pandas().iloc[0:10, :])

# 3. parquetに変換する
output_file_name = '/home/myuser/test.parquet'
pq.write_table(
        pyarrow_table, 
        output_file_name,
        compression='gzip', 
        # 以下2つのオプションについては、後述のTimeStamp型の注意を参照
        flavor=['spark'], # 互換性のためsparkを設定 
        use_deprecated_int96_timestamps = True # int96のParquet形式で書き込み
        )

# 作成したparquetファイルを読込、確認
# 省略します

【注意点】

  • int型でデータ型指定した列にNaNが存在すると、エラーが発生し読込めない。
  • 回避するには、いったんfloat型で定義 -> parquet作成時にint型に指定する。
  • 逆に、parquetファイルでint型の列をpandasに戻した時に、int型列にnullが存在するとfloat型になってしまう
  • 数値精度の影響はほぼないと想定、仕様だと思いましょう。

parquetのnullableについて

  • pyarrowでparquetを作成すると、デフォルトでは nullable = true
  • nullable = false にするとパフォーマンスに変化あるかもしれないが、未検証

TimeStamp型の注意

pyarrow.Tableクラスに格納された列にTimeStamp型がある状態で
write_tableを実行すると、エラーが起きる。
解決するためには、次のいずれか或いは両方を引数に記述してあればよい

  • flavor=['spark'])
  • use_deprecated_int96_timestamps=True

その他オプションに関してはpyarrow.parquet.write_tableのドキュメントを参照

Boolean型について(2019年8月16日追記)

  • boolean型の列にnullが存在すると、falseに変換される
  • float型、int型、string型からbooleanに変換を試みるも変換エラーで変換不可
  • booleanをあきらめてint型に置換して誤魔化すしかなさそう

データ内の改行コードについて(2019年8月29日追記)

  • データ内の改行コード(\nや\r)は、エスケープされていればParquet変換可能
  • 上記は2.2の手順でParquet化したときに確認。2.1の手順では未検証

3. 参考

PyArrowのドキュメント

Decimal型のエラーに関する事例

Pandasのread_csvのドキュメント

Pandas上でのDecimal型への変換

Pandas上でのTimestamp型への変換

10
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
7