1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

pandas を用いて JSON Lines 形式で S3 に保存する

Posted at

最近はじめて AWS Lambda を触り、 Python ライブラリの pandas を用いてデータ処理しました。 JSON データの読み書きや S3 への保存・取得といった基本的なことについて、その方法を書き残しておきます。

本記事で扱う内容は以下のとおりです。

  • pandas で JSON Lines 形式のデータを読み書きする
  • データを S3 に保存、 S3 から取得する
  • S3 から一部のデータのみを取得する

サンプルデータ

本記事では以下の JSON を扱います。7つのレコードを持つ配列です。

[{"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}},
 {"number":"JY05","code":"UEN","name":{"ja":"上野","en":"Ueno"}},
 {"number":"JY13","code":"IKB","name":{"ja":"池袋","en":"Ikebukuro"}},
 {"number":"JY17","code":"SJK","name":{"ja":"新宿","en":"Shinjuku"}},
 {"number":"JY20","code":"SBY","name":{"ja":"渋谷","en":"Shibuya"}},
 {"number":"JY25","code":"SGW","name":{"ja":"品川","en":"Shinagawa"}},
 {"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}}]

JSON Lines の場合は、配列ではなく改行区切りの形になります。例えば上の JSON を jq コマンドで jq -c '.[]' と加工すればいいです。

基本的な入出力

Lambda のサンプルコード

Lambda から S3 へのデータ保存および読み込みを行うコードです。データは Lambda のペイロード(イベント JSON )から入力します。

実際に試す際の設定

実際に試す際は以下の設定が必要です。

  • S3
    • オブジェクトを読み書きするためのバケットを用意する
  • Lambda
    • レイヤーに AWSSDKPandas を追加する
    • 環境変数 S3_BUCKET_NAMES3_OBJECT_NAME を設定する(ここに JSON Lines のファイルを書き出す)
    • Lambda の実行ロールに S3 へのオブジェクトの読み書きを許可する
lambda_function.py
import io
import os
import pandas as pd
import boto3

s3_client = boto3.client('s3')
s3_bucket_name = os.environ['S3_BUCKET_NAME']
s3_object_name = os.environ['S3_OBJECT_NAME']

def lambda_handler(event, context):

    df = pd.DataFrame(event)

    # serialize
    buf = io.BytesIO()
    df.to_json(buf, orient='records', lines=True, force_ascii=False)

    # upload
    buf.seek(0)  # rewind
    options = {'ContentType': 'application/jsonlines'}
    response = s3_client.put_object(Body=buf, Bucket=s3_bucket_name, Key=s3_object_name, **options)
    # response = s3_client.upload_fileobj(buf, s3_bucket_name, s3_object_name, ExtraArgs=options)

    # (debug)
    buf.seek(0)
    print(buf.read().decode())

    # download
    response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name)

    # deserialize
    buf = response['Body']
    df = pd.read_json(buf, orient='records', lines=True)

    return df.to_dict(orient='list')

S3 に保存したデータは、 S3 Select でも確認できます。

SELECT s.name.ja, s.code FROM s3object s
東京,TYO
上野,UEN
池袋,IKB
新宿,SJK
渋谷,SBY
品川,SGW
東京,TYO

データ形式の変換方法

JSON ↔ dict / list ↔ DataFrame を相互変換できる関数には以下のようなものがあります。( pandas モジュールの指定は省略しています)

入力\出力 JSON dict / list DataFrame
JSON json.load
json.loads
read_json
dict / list json.dump
json.dumps
DataFrame
DataFrame.from_dict
json_normalize
DataFrame DataFrame.to_json DataFrame.to_dict
  • JSON ↔ dict / list
    • 標準ライブラリの json モジュールでできます
    • Lambda の入出力に関しては Lambda 側で処理してくれるため、出番は無いかもしれません
  • JSON ↔ DataFrame
    • pandas の read_jsonto_json でできます
    • 表形式のデータを JSON でどう表現するか、 orient などのオプションで指定します
      • オブジェクトの配列(サンプルデータの形)なら orient='records' 、 JSON Lines なら加えて lines=True を指定します
      • gzip などの圧縮形式にも対応していて、 compression='gzip' のように指定します
    • ファイルの読み書きにも対応しています
      • パスを s3://... にして直接 S3 オブジェクトを扱うこともできます…が、 AWS が提供する pandas のレイヤーだとライブラリが削られていて利用できないようです
        https://github.com/aws/aws-sdk-pandas/issues/414
      • ファイルでなくメモリ上の文字列を扱う際は、ストリームを読み書きさせます
        そのため純粋な文字列の場合は StringIO などを噛ませる必要があります
  • DataFrame → dict / list
    • pandas の to_dict でできます
    • JSON のときと同様に、表現方法を orient で指定します
  • dict / list → DataFrame
    • from_dict だと対応している形式が少なく、 to_dict の逆をできないことがあります
    • コンストラクタ DataFrame はいくつかの形式に対応していて、今回の用途にも使えます
    • json_normalizeorient='records' の形式に利用できます
      • 本来は「半構造化された JSON データを平坦なテーブルに正規化する」もので、今回のようなネストしているデータに対しては注意が必要です

S3 オブジェクトの読み書き

AWS が提供する Lambda レイヤー AWSSDKPandas (aws-sdk-pandas) のライブラリだと read_jsonto_json で直接 S3 オブジェクトを扱うことができなかったため、 Boto3 の S3 クライアント で JSON を格納したストリームを使用します。

  • アップロード
    • 一番単純には put_object でできます
    • Body に指定するストリームは、書き出し始めたい位置にシークしておきます(これを忘れたら空のオブジェクトができました)
  • ダウンロード
    • get_object でできます
    • レスポンス内の Body がストリームなので、これを read_json に渡せば DataFrame に変換できます

JSON Lines の連結や部分読み込み

JSON で複数レコードを配列化した場合と異なり、 JSON Lines は複数ファイルを連結しても正しい JSON Lines になります。また、ある範囲の行だけを抽出してもやはり正しい JSON Lines です。

このことを利用して、 S3 オブジェクトから一部区間のデータだけを読み込めるようにしてみます。

( AWS 資格試験の問題集にあった手法をヒントに、単純化して試しています)

索引の作成と利用

サンプルデータの7行を2行+3行+2行と適当に区切って、区間2〜3(3〜7行目)だけを S3 から読み込んでみます。 S3 の GetObject では読み込む範囲を指定できるので、そのバイト数が事前に分かればいいです。

pos[0] =   0 ---
                {"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}}
                {"number":"JY05","code":"UEN","name":{"ja":"上野","en":"Ueno"}}
pos[1] = 133 ---
                {"number":"JY13","code":"IKB","name":{"ja":"池袋","en":"Ikebukuro"}}
                {"number":"JY17","code":"SJK","name":{"ja":"新宿","en":"Shinjuku"}}
                {"number":"JY20","code":"SBY","name":{"ja":"渋谷","en":"Shibuya"}}
pos[2] = 343 ---
                {"number":"JY25","code":"SGW","name":{"ja":"品川","en":"Shinagawa"}}
                {"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}}
pos[3] = 481 ---

JSON Lines を作る際は、区切りたい単位で分割しながら書き出し、何バイト目で区切られたのかを記録しておきます。以下では最小限の情報として、 pos というリストに区切り位置を記録しています。

serialize
    group_keys = (df.index + 1) // 3  # [0, 0, 1, 1, 1, 2, 2]

    buf = io.BytesIO()
    pos = [buf.tell()]
    for _, df_chunk in df.groupby(group_keys):
        df_chunk.to_json(buf, orient='records', lines=True, force_ascii=False)
        pos.append(buf.tell())

    print(pos)

buf に続けて書き込んだことでデータの連結は完了しているため、 S3 へのアップロードは通常と同じようにできます。

S3 から部分的にダウンロードする際は、記録した情報を使って Range の指定を作ります(仕様は RFC9110 に従います)。例えば区間2〜3(3〜7行目)をダウンロードするなら、 pos を参照すると以下のようにできます。

download
    range_spec = 'bytes={0}-{1}'.format(pos[1], pos[3] - 1)
    response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name, Range=range_spec)

あとは response['Body'] を読み込むことで部分的な JSON Lines が手に入ります。

データ圧縮

JSON はテキストデータなので、大きなデータは圧縮すれば容量を大幅に減らせることが多いです。一方でファイルを丸ごと圧縮してしまうと部分的な読み込みができなくなってしまいます。

実は gzip や bzip2 などは、圧縮したデータを連結しても展開でき、その結果は元のデータを連結したものと一致します。
https://manpages.debian.org/testing/manpages-ja/gzip.1.ja.html#高度な使用法

したがって、前節でグループ毎に JSON Lines 化した際に圧縮しても問題ありません。そうして保存した S3 オブジェクトは、 S3 Select でもきちんと全区間を読み込めることが確認できます。

コード全体

変更点は JSON 読み書き時の compression='gzip' の指定(と ContentType の変更)だけです。

lambda_function.py
import io
import os
import pandas as pd
import boto3

s3_client = boto3.client('s3')
s3_bucket_name = os.environ['S3_BUCKET_NAME']
s3_object_name = os.environ['S3_OBJECT_NAME']

def lambda_handler(event, context):

    df = pd.DataFrame(event)

    # serialize
    group_keys = (df.index + 1) // 3  # [0, 0, 1, 1, 1, 2, 2]

    buf = io.BytesIO()
    pos = [buf.tell()]
    for _, df_chunk in df.groupby(group_keys):
        df_chunk.to_json(buf, orient='records', lines=True, compression='gzip', force_ascii=False)
        pos.append(buf.tell())

    print(pos)

    # upload
    buf.seek(0)  # rewind
    options = {'ContentType': 'application/gzip'}
    response = s3_client.put_object(Body=buf, Bucket=s3_bucket_name, Key=s3_object_name, **options)
    # response = s3_client.upload_fileobj(buf, s3_bucket_name, s3_object_name, ExtraArgs=options)

    # download
    range_spec = 'bytes={0}-{1}'.format(pos[1], pos[3] - 1)
    response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name, Range=range_spec)

    # deserialize
    buf = response['Body']
    df = pd.read_json(buf, orient='records', lines=True, compression='gzip')

    return df.to_dict(orient='list')
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?