Posted at
MobingiDay 17

pyarrowを使ってLambdaからS3のparquetファイルを読む


あらすじ

この記事はMobingi Advent Calendar 2018の17日目の記事です。

Lambda Layerのおかげで重めのライブラリをLambdaで使うことが簡単になりました。

だもんでLambdaからpyarrowを使ってparquetファイルを読めるようにしたら色々と捗るのでは?と思い、ちょっと動かしてみました。


pyarrowを含めたLambda Layerを作成する

まずはpyarrowを含めたLambda Layerを作成します。

Lambda Layerを作るサンプルは以下に上げています。

https://github.com/kanga333/lambda-layer-pyarrow

中身を簡単に説明します。

まずLamnda Layerを作るためのSAMテンプレートを作成します。

workdir配下のパッケージをLayerとして上げるだけの設定です。


sam.yml

AWSTemplateFormatVersion: '2010-09-09'

Transform: AWS::Serverless-2016-10-31
Description: Lambda Layer to use pyarrow and related libraries.

Resources:
CpuTrace:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: AWSLambda-Python3-pyarrow
Description: Lambda Layer to use pyarrow and related libraries.
ContentUri: workdir
CompatibleRuntimes:
- python3.6
RetentionPolicy: Retain


次に必要なライブラリをrequirements.txtに書きます


requirements.txt

pandas==0.23.4

pyarrow==0.11.1
s3fs==0.2.0

pyarrowのExampleを見た感じだとpandasも入れた方が良さそうだったのでpandasもついでに含めまています。それからs3上のファイルをpyarrowから直接読むのに必要なのでs3fsも含めています。

これらのライブラリのビルドをAmazonLinux上で行うために次のようなDockerfileを用意してビルド用の環境を作ります。


Dockerfile

FROM amazonlinux:1

RUN yum install -y python36 python36-pip make gcc python36-devel
RUN python3 -m pip install awscli


あとは上のDockerイメージ(kanga333/pylambda-packer)を利用してpip moduleをworkdir/pythonの下に展開します。

docker run -v `pwd`:/tmp/work -w /tmp/work \

kanga333/pylambda-packer \
python3 -m pip install -r requirements.txt -t workdir/python

workdir/pythonの下の容量をみた感じだと250MB未満なので容量は大丈夫そうです。

# du -sm workdir

190 workdir

あとはaws-cliを使ってデプロイします。

aws cloudformation package \

--template-file sam.yml \
--s3-bucket bucket-name \
--s3-prefix pyarrow \
--output-template-file .template.yml
aws cloudformation deploy \
--template-file .template.yml \
--stack-name pyarrow

これでpyarrowをLambdaで使う準備が整いました。


適当なデータを用意する

手元にいい感じのデータがなかったのでapache-loggenを使ってjson形式のダミーログを生成しました。

こんな形式のログが出力されます。

{"host":"80.162.35.40","user":"-","method":"GET","path":"/category/electronics","code":200,"referer":"/category/software?from=20","size":87,"agent":"Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1"}

このログを30万件で吐き出した所70MBくらいのサイズになりました。

今回やりたいのはparquetを読むことなのでローカルのPCで(pyarrowを使って)parquetに変換してからs3上にアップしました。

ちなみにparquetに変換後のサイズは3MBでした。

以下のようなs3のパスに格納します。

s3://bucket-name/date=20181217/apache.parquet

パーティションのリードも試したかったので全く同じログを別パーティションにも配置しました。

s3://bucket-name/date=20181218/apache.parquet


LambdaからS3上のparquetを読む

いよいよLambdaからparquetを読んでみます。

書き捨てなのでコンソールからポチポチとPython3.6のLambdaを作って先程あげたLambdaレイヤーを追加しました。なおメモリがそれなりに要りそうな気がしたので、メモリは1024MBまで上げています。

まずは1つのparquet全体を読んで件数を調べてみます。

import json

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

def lambda_handler(event, context):
dataset = pq.ParquetDataset('s3://bucket-name/date=20181217', filesystem=fs)
table = dataset.read()
df = table.to_pandas()
return len(df)

ちゃんと30万件読めてることが確認できます。


レスポンス

300000


もとのデータのサイズの割には結構メモリを消費しています。(Max Memory Used: 332 MB

START RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5 Version: $LATEST

END RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5
REPORT RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5 Duration: 857.08 ms Billed Duration: 900 ms Memory Size: 1024 MB Max Memory Used: 332 MB

次にせっかくの列指向フォーマットなので読み込む列をcodeに絞り込んでその分布を調べてみます。

import json

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

def lambda_handler(event, context):
dataset = pq.ParquetDataset('s3://bucket-name/date=20181217', filesystem=fs)
table = dataset.read(columns=['code'])
df = table.to_pandas()
return df.groupby('code').size().to_dict()


レスポンス

{

"200": 299472,
"404": 493,
"500": 35
}

列を絞り込んだ分、バッチの実行時間が短くなりメモリの消費量も少なくなっています。

START RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae Version: $LATEST

END RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae
REPORT RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae Duration: 294.74 ms Billed Duration: 300 ms Memory Size: 1024 MB Max Memory Used: 137 MB

最後にディレクトリによるパーティション構造のデータを読み込んでみます。どうやらhiveのディレクトリ形式でパスを切っていれば特別な設定が無くてもパーティションカラムを認識してくれるらしいです。便利。

import s3fs

import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

def lambda_handler(event, context):
dataset = pq.ParquetDataset('s3://bucket-name', filesystem=fs)
table = dataset.read(columns=['code','date'])
df = table.to_pandas()
tuple_key_dict = df.groupby(['code']).date.value_counts().to_dict()
resp = {}
for tuple_key, value in tuple_key_dict.items():
code, date = tuple_key
counts = resp.get(date, {})
counts[code] = value
resp[date] = counts
return resp

pandas力が足りてなくてsieriesをdictにした際にキーがタプルがなってしまったので、ネストしたdictに変換するため最後ごちゃごちゃしてますが、ちゃんとパーティションキーもカラムとして認識されてることが確認できます。


レスポンス

{

"20181217": {
"200": 299472,
"404": 493,
"500": 35
},
"20181218": {
"200": 299472,
"404": 493,
"500": 35
}
}

START RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32 Version: $LATEST

END RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32
REPORT RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32 Duration: 678.82 ms Billed Duration: 700 ms Memory Size: 1024 MB Max Memory Used: 174 MB


おわりに

以上簡単ですが、pyarrowをLambdaで使う一連の流れの紹介でした。

肝心の実務での使い道について、今の所まだ思いつかないですが、生成したデータに対して件数とか分布を調べて検査するバッチとかで使えなくもないかな?とは思いました。