あらすじ
この記事はMobingi Advent Calendar 2018の17日目の記事です。
Lambda Layerのおかげで重めのライブラリをLambdaで使うことが簡単になりました。
だもんでLambdaからpyarrowを使ってparquetファイルを読めるようにしたら色々と捗るのでは?と思い、ちょっと動かしてみました。
pyarrowを含めたLambda Layerを作成する
まずはpyarrowを含めたLambda Layerを作成します。
Lambda Layerを作るサンプルは以下に上げています。
中身を簡単に説明します。
まずLamnda Layerを作るためのSAMテンプレートを作成します。
workdir配下のパッケージをLayerとして上げるだけの設定です。
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Lambda Layer to use pyarrow and related libraries.
Resources:
CpuTrace:
Type: AWS::Serverless::LayerVersion
Properties:
LayerName: AWSLambda-Python3-pyarrow
Description: Lambda Layer to use pyarrow and related libraries.
ContentUri: workdir
CompatibleRuntimes:
- python3.6
RetentionPolicy: Retain
次に必要なライブラリをrequirements.txtに書きます
pandas==0.23.4
pyarrow==0.11.1
s3fs==0.2.0
pyarrowのExampleを見た感じだとpandasも入れた方が良さそうだったのでpandasもついでに含めまています。それからs3上のファイルをpyarrowから直接読むのに必要なのでs3fsも含めています。
これらのライブラリのビルドをAmazonLinux上で行うために次のようなDockerfileを用意してビルド用の環境を作ります。
FROM amazonlinux:1
RUN yum install -y python36 python36-pip make gcc python36-devel
RUN python3 -m pip install awscli
あとは上のDockerイメージ(kanga333/pylambda-packer
)を利用してpip moduleをworkdir/python
の下に展開します。
docker run -v `pwd`:/tmp/work -w /tmp/work \
kanga333/pylambda-packer \
python3 -m pip install -r requirements.txt -t workdir/python
workdir/pythonの下の容量をみた感じだと250MB未満なので容量は大丈夫そうです。
# du -sm workdir
190 workdir
あとはaws-cliを使ってデプロイします。
aws cloudformation package \
--template-file sam.yml \
--s3-bucket bucket-name \
--s3-prefix pyarrow \
--output-template-file .template.yml
aws cloudformation deploy \
--template-file .template.yml \
--stack-name pyarrow
これでpyarrowをLambdaで使う準備が整いました。
適当なデータを用意する
手元にいい感じのデータがなかったのでapache-loggenを使ってjson形式のダミーログを生成しました。
こんな形式のログが出力されます。
{"host":"80.162.35.40","user":"-","method":"GET","path":"/category/electronics","code":200,"referer":"/category/software?from=20","size":87,"agent":"Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1"}
このログを30万件で吐き出した所70MBくらいのサイズになりました。
今回やりたいのはparquetを読むことなのでローカルのPCで(pyarrowを使って)parquetに変換してからs3上にアップしました。
ちなみにparquetに変換後のサイズは3MBでした。
以下のようなs3のパスに格納します。
s3://bucket-name/date=20181217/apache.parquet
パーティションのリードも試したかったので全く同じログを別パーティションにも配置しました。
s3://bucket-name/date=20181218/apache.parquet
LambdaからS3上のparquetを読む
いよいよLambdaからparquetを読んでみます。
書き捨てなのでコンソールからポチポチとPython3.6のLambdaを作って先程あげたLambdaレイヤーを追加しました。なおメモリがそれなりに要りそうな気がしたので、メモリは1024MBまで上げています。
まずは1つのparquet全体を読んで件数を調べてみます。
import json
import s3fs
import pyarrow.parquet as pq
fs = s3fs.S3FileSystem()
def lambda_handler(event, context):
dataset = pq.ParquetDataset('s3://bucket-name/date=20181217', filesystem=fs)
table = dataset.read()
df = table.to_pandas()
return len(df)
ちゃんと30万件読めてることが確認できます。
300000
もとのデータのサイズの割には結構メモリを消費しています。(Max Memory Used: 332 MB
)
START RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5 Version: $LATEST
END RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5
REPORT RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5 Duration: 857.08 ms Billed Duration: 900 ms Memory Size: 1024 MB Max Memory Used: 332 MB
次にせっかくの列指向フォーマットなので読み込む列をcode
に絞り込んでその分布を調べてみます。
import json
import s3fs
import pyarrow.parquet as pq
fs = s3fs.S3FileSystem()
def lambda_handler(event, context):
dataset = pq.ParquetDataset('s3://bucket-name/date=20181217', filesystem=fs)
table = dataset.read(columns=['code'])
df = table.to_pandas()
return df.groupby('code').size().to_dict()
{
"200": 299472,
"404": 493,
"500": 35
}
列を絞り込んだ分、バッチの実行時間が短くなりメモリの消費量も少なくなっています。
START RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae Version: $LATEST
END RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae
REPORT RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae Duration: 294.74 ms Billed Duration: 300 ms Memory Size: 1024 MB Max Memory Used: 137 MB
最後にディレクトリによるパーティション構造のデータを読み込んでみます。どうやらhiveのディレクトリ形式でパスを切っていれば特別な設定が無くてもパーティションカラムを認識してくれるらしいです。便利。
import s3fs
import pyarrow.parquet as pq
fs = s3fs.S3FileSystem()
def lambda_handler(event, context):
dataset = pq.ParquetDataset('s3://bucket-name', filesystem=fs)
table = dataset.read(columns=['code','date'])
df = table.to_pandas()
tuple_key_dict = df.groupby(['code']).date.value_counts().to_dict()
resp = {}
for tuple_key, value in tuple_key_dict.items():
code, date = tuple_key
counts = resp.get(date, {})
counts[code] = value
resp[date] = counts
return resp
pandas力が足りてなくてsieriesをdictにした際にキーがタプルがなってしまったので、ネストしたdictに変換するため最後ごちゃごちゃしてますが、ちゃんとパーティションキーもカラムとして認識されてることが確認できます。
{
"20181217": {
"200": 299472,
"404": 493,
"500": 35
},
"20181218": {
"200": 299472,
"404": 493,
"500": 35
}
}
START RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32 Version: $LATEST
END RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32
REPORT RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32 Duration: 678.82 ms Billed Duration: 700 ms Memory Size: 1024 MB Max Memory Used: 174 MB
おわりに
以上簡単ですが、pyarrowをLambdaで使う一連の流れの紹介でした。
肝心の実務での使い道について、今の所まだ思いつかないですが、生成したデータに対して件数とか分布を調べて検査するバッチとかで使えなくもないかな?とは思いました。