最近はじめて AWS Lambda を触り、 Python ライブラリの pandas を用いてデータ処理しました。 JSON データの読み書きや S3 への保存・取得といった基本的なことについて、その方法を書き残しておきます。
本記事で扱う内容は以下のとおりです。
- pandas で JSON Lines 形式のデータを読み書きする
- データを S3 に保存、 S3 から取得する
- S3 から一部のデータのみを取得する
サンプルデータ
本記事では以下の JSON を扱います。7つのレコードを持つ配列です。
[{"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}},
{"number":"JY05","code":"UEN","name":{"ja":"上野","en":"Ueno"}},
{"number":"JY13","code":"IKB","name":{"ja":"池袋","en":"Ikebukuro"}},
{"number":"JY17","code":"SJK","name":{"ja":"新宿","en":"Shinjuku"}},
{"number":"JY20","code":"SBY","name":{"ja":"渋谷","en":"Shibuya"}},
{"number":"JY25","code":"SGW","name":{"ja":"品川","en":"Shinagawa"}},
{"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}}]
JSON Lines の場合は、配列ではなく改行区切りの形になります。例えば上の JSON を jq コマンドで jq -c '.[]'
と加工すればいいです。
基本的な入出力
Lambda のサンプルコード
Lambda から S3 へのデータ保存および読み込みを行うコードです。データは Lambda のペイロード(イベント JSON )から入力します。
実際に試す際の設定
実際に試す際は以下の設定が必要です。
- S3
- オブジェクトを読み書きするためのバケットを用意する
- Lambda
- レイヤーに AWSSDKPandas を追加する
- 環境変数
S3_BUCKET_NAME
とS3_OBJECT_NAME
を設定する(ここに JSON Lines のファイルを書き出す) - Lambda の実行ロールに S3 へのオブジェクトの読み書きを許可する
import io
import os
import pandas as pd
import boto3
s3_client = boto3.client('s3')
s3_bucket_name = os.environ['S3_BUCKET_NAME']
s3_object_name = os.environ['S3_OBJECT_NAME']
def lambda_handler(event, context):
df = pd.DataFrame(event)
# serialize
buf = io.BytesIO()
df.to_json(buf, orient='records', lines=True, force_ascii=False)
# upload
buf.seek(0) # rewind
options = {'ContentType': 'application/jsonlines'}
response = s3_client.put_object(Body=buf, Bucket=s3_bucket_name, Key=s3_object_name, **options)
# response = s3_client.upload_fileobj(buf, s3_bucket_name, s3_object_name, ExtraArgs=options)
# (debug)
buf.seek(0)
print(buf.read().decode())
# download
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name)
# deserialize
buf = response['Body']
df = pd.read_json(buf, orient='records', lines=True)
return df.to_dict(orient='list')
S3 に保存したデータは、 S3 Select でも確認できます。
SELECT s.name.ja, s.code FROM s3object s
東京,TYO
上野,UEN
池袋,IKB
新宿,SJK
渋谷,SBY
品川,SGW
東京,TYO
データ形式の変換方法
JSON ↔ dict / list ↔ DataFrame を相互変換できる関数には以下のようなものがあります。( pandas モジュールの指定は省略しています)
入力\出力 | JSON | dict / list | DataFrame |
---|---|---|---|
JSON | json.load json.loads |
read_json | |
dict / list | json.dump json.dumps |
DataFrame DataFrame.from_dict json_normalize |
|
DataFrame | DataFrame.to_json | DataFrame.to_dict |
- JSON ↔ dict / list
- 標準ライブラリの json モジュールでできます
- Lambda の入出力に関しては Lambda 側で処理してくれるため、出番は無いかもしれません
- JSON ↔ DataFrame
- pandas の
read_json
とto_json
でできます - 表形式のデータを JSON でどう表現するか、
orient
などのオプションで指定します- オブジェクトの配列(サンプルデータの形)なら
orient='records'
、 JSON Lines なら加えてlines=True
を指定します - gzip などの圧縮形式にも対応していて、
compression='gzip'
のように指定します
- オブジェクトの配列(サンプルデータの形)なら
- ファイルの読み書きにも対応しています
- パスを
s3://...
にして直接 S3 オブジェクトを扱うこともできます…が、 AWS が提供する pandas のレイヤーだとライブラリが削られていて利用できないようです
https://github.com/aws/aws-sdk-pandas/issues/414 - ファイルでなくメモリ上の文字列を扱う際は、ストリームを読み書きさせます
そのため純粋な文字列の場合はStringIO
などを噛ませる必要があります
- パスを
- pandas の
- DataFrame → dict / list
- pandas の
to_dict
でできます - JSON のときと同様に、表現方法を
orient
で指定します
- pandas の
- dict / list → DataFrame
-
from_dict
だと対応している形式が少なく、to_dict
の逆をできないことがあります - コンストラクタ
DataFrame
はいくつかの形式に対応していて、今回の用途にも使えます -
json_normalize
もorient='records'
の形式に利用できます- 本来は「半構造化された JSON データを平坦なテーブルに正規化する」もので、今回のようなネストしているデータに対しては注意が必要です
-
S3 オブジェクトの読み書き
AWS が提供する Lambda レイヤー AWSSDKPandas (aws-sdk-pandas) のライブラリだと read_json
や to_json
で直接 S3 オブジェクトを扱うことができなかったため、 Boto3 の S3 クライアント で JSON を格納したストリームを使用します。
- アップロード
- 一番単純には
put_object
でできます -
Body
に指定するストリームは、書き出し始めたい位置にシークしておきます(これを忘れたら空のオブジェクトができました)
- 一番単純には
- ダウンロード
-
get_object
でできます - レスポンス内の
Body
がストリームなので、これをread_json
に渡せば DataFrame に変換できます
-
JSON Lines の連結や部分読み込み
JSON で複数レコードを配列化した場合と異なり、 JSON Lines は複数ファイルを連結しても正しい JSON Lines になります。また、ある範囲の行だけを抽出してもやはり正しい JSON Lines です。
このことを利用して、 S3 オブジェクトから一部区間のデータだけを読み込めるようにしてみます。
( AWS 資格試験の問題集にあった手法をヒントに、単純化して試しています)
索引の作成と利用
サンプルデータの7行を2行+3行+2行と適当に区切って、区間2〜3(3〜7行目)だけを S3 から読み込んでみます。 S3 の GetObject
では読み込む範囲を指定できるので、そのバイト数が事前に分かればいいです。
pos[0] = 0 ---
{"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}}
{"number":"JY05","code":"UEN","name":{"ja":"上野","en":"Ueno"}}
pos[1] = 133 ---
{"number":"JY13","code":"IKB","name":{"ja":"池袋","en":"Ikebukuro"}}
{"number":"JY17","code":"SJK","name":{"ja":"新宿","en":"Shinjuku"}}
{"number":"JY20","code":"SBY","name":{"ja":"渋谷","en":"Shibuya"}}
pos[2] = 343 ---
{"number":"JY25","code":"SGW","name":{"ja":"品川","en":"Shinagawa"}}
{"number":"JY01","code":"TYO","name":{"ja":"東京","en":"Tokyo"}}
pos[3] = 481 ---
JSON Lines を作る際は、区切りたい単位で分割しながら書き出し、何バイト目で区切られたのかを記録しておきます。以下では最小限の情報として、 pos
というリストに区切り位置を記録しています。
group_keys = (df.index + 1) // 3 # [0, 0, 1, 1, 1, 2, 2]
buf = io.BytesIO()
pos = [buf.tell()]
for _, df_chunk in df.groupby(group_keys):
df_chunk.to_json(buf, orient='records', lines=True, force_ascii=False)
pos.append(buf.tell())
print(pos)
buf
に続けて書き込んだことでデータの連結は完了しているため、 S3 へのアップロードは通常と同じようにできます。
S3 から部分的にダウンロードする際は、記録した情報を使って Range
の指定を作ります(仕様は RFC9110 に従います)。例えば区間2〜3(3〜7行目)をダウンロードするなら、 pos
を参照すると以下のようにできます。
range_spec = 'bytes={0}-{1}'.format(pos[1], pos[3] - 1)
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name, Range=range_spec)
あとは response['Body']
を読み込むことで部分的な JSON Lines が手に入ります。
データ圧縮
JSON はテキストデータなので、大きなデータは圧縮すれば容量を大幅に減らせることが多いです。一方でファイルを丸ごと圧縮してしまうと部分的な読み込みができなくなってしまいます。
実は gzip や bzip2 などは、圧縮したデータを連結しても展開でき、その結果は元のデータを連結したものと一致します。
https://manpages.debian.org/testing/manpages-ja/gzip.1.ja.html#高度な使用法
したがって、前節でグループ毎に JSON Lines 化した際に圧縮しても問題ありません。そうして保存した S3 オブジェクトは、 S3 Select でもきちんと全区間を読み込めることが確認できます。
コード全体
変更点は JSON 読み書き時の compression='gzip'
の指定(と ContentType
の変更)だけです。
import io
import os
import pandas as pd
import boto3
s3_client = boto3.client('s3')
s3_bucket_name = os.environ['S3_BUCKET_NAME']
s3_object_name = os.environ['S3_OBJECT_NAME']
def lambda_handler(event, context):
df = pd.DataFrame(event)
# serialize
group_keys = (df.index + 1) // 3 # [0, 0, 1, 1, 1, 2, 2]
buf = io.BytesIO()
pos = [buf.tell()]
for _, df_chunk in df.groupby(group_keys):
df_chunk.to_json(buf, orient='records', lines=True, compression='gzip', force_ascii=False)
pos.append(buf.tell())
print(pos)
# upload
buf.seek(0) # rewind
options = {'ContentType': 'application/gzip'}
response = s3_client.put_object(Body=buf, Bucket=s3_bucket_name, Key=s3_object_name, **options)
# response = s3_client.upload_fileobj(buf, s3_bucket_name, s3_object_name, ExtraArgs=options)
# download
range_spec = 'bytes={0}-{1}'.format(pos[1], pos[3] - 1)
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_object_name, Range=range_spec)
# deserialize
buf = response['Body']
df = pd.read_json(buf, orient='records', lines=True, compression='gzip')
return df.to_dict(orient='list')