Help us understand the problem. What is going on with this article?

pyarrowを使ってLambdaからS3のparquetファイルを読む

More than 1 year has passed since last update.

あらすじ

この記事はMobingi Advent Calendar 2018の17日目の記事です。

Lambda Layerのおかげで重めのライブラリをLambdaで使うことが簡単になりました。
だもんでLambdaからpyarrowを使ってparquetファイルを読めるようにしたら色々と捗るのでは?と思い、ちょっと動かしてみました。

pyarrowを含めたLambda Layerを作成する

まずはpyarrowを含めたLambda Layerを作成します。
Lambda Layerを作るサンプルは以下に上げています。

https://github.com/kanga333/lambda-layer-pyarrow

中身を簡単に説明します。
まずLamnda Layerを作るためのSAMテンプレートを作成します。
workdir配下のパッケージをLayerとして上げるだけの設定です。

sam.yml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Lambda Layer to use pyarrow and related libraries.

Resources:
  CpuTrace:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: AWSLambda-Python3-pyarrow
      Description: Lambda Layer to use pyarrow and related libraries.
      ContentUri: workdir
      CompatibleRuntimes:
        - python3.6
      RetentionPolicy: Retain

次に必要なライブラリをrequirements.txtに書きます

requirements.txt
pandas==0.23.4
pyarrow==0.11.1
s3fs==0.2.0

pyarrowのExampleを見た感じだとpandasも入れた方が良さそうだったのでpandasもついでに含めまています。それからs3上のファイルをpyarrowから直接読むのに必要なのでs3fsも含めています。

これらのライブラリのビルドをAmazonLinux上で行うために次のようなDockerfileを用意してビルド用の環境を作ります。

Dockerfile
FROM amazonlinux:1

RUN yum install -y python36 python36-pip make gcc python36-devel
RUN python3 -m pip install awscli

あとは上のDockerイメージ(kanga333/pylambda-packer)を利用してpip moduleをworkdir/pythonの下に展開します。

docker run -v `pwd`:/tmp/work -w /tmp/work \
       kanga333/pylambda-packer \
       python3 -m pip install -r requirements.txt -t workdir/python

workdir/pythonの下の容量をみた感じだと250MB未満なので容量は大丈夫そうです。

# du -sm workdir
190 workdir

あとはaws-cliを使ってデプロイします。

aws cloudformation package \
        --template-file sam.yml \
        --s3-bucket bucket-name \
        --s3-prefix pyarrow \
        --output-template-file .template.yml
aws cloudformation deploy \
        --template-file .template.yml \
        --stack-name pyarrow

これでpyarrowをLambdaで使う準備が整いました。

適当なデータを用意する

手元にいい感じのデータがなかったのでapache-loggenを使ってjson形式のダミーログを生成しました。

こんな形式のログが出力されます。

{"host":"80.162.35.40","user":"-","method":"GET","path":"/category/electronics","code":200,"referer":"/category/software?from=20","size":87,"agent":"Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1"}

このログを30万件で吐き出した所70MBくらいのサイズになりました。
今回やりたいのはparquetを読むことなのでローカルのPCで(pyarrowを使って)parquetに変換してからs3上にアップしました。
ちなみにparquetに変換後のサイズは3MBでした。

以下のようなs3のパスに格納します。

s3://bucket-name/date=20181217/apache.parquet

パーティションのリードも試したかったので全く同じログを別パーティションにも配置しました。

s3://bucket-name/date=20181218/apache.parquet

LambdaからS3上のparquetを読む

いよいよLambdaからparquetを読んでみます。
書き捨てなのでコンソールからポチポチとPython3.6のLambdaを作って先程あげたLambdaレイヤーを追加しました。なおメモリがそれなりに要りそうな気がしたので、メモリは1024MBまで上げています。

まずは1つのparquet全体を読んで件数を調べてみます。

import json
import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

def lambda_handler(event, context):
    dataset = pq.ParquetDataset('s3://bucket-name/date=20181217', filesystem=fs)
    table = dataset.read()
    df = table.to_pandas()
    return len(df)

ちゃんと30万件読めてることが確認できます。

レスポンス
300000

もとのデータのサイズの割には結構メモリを消費しています。(Max Memory Used: 332 MB

START RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5 Version: $LATEST
END RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5
REPORT RequestId: 3c14124a-0223-11e9-9cee-fb000aecb4c5  Duration: 857.08 ms Billed Duration: 900 ms     Memory Size: 1024 MB    Max Memory Used: 332 MB 

次にせっかくの列指向フォーマットなので読み込む列をcodeに絞り込んでその分布を調べてみます。

import json
import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

def lambda_handler(event, context):
    dataset = pq.ParquetDataset('s3://bucket-name/date=20181217', filesystem=fs)
    table = dataset.read(columns=['code'])
    df = table.to_pandas()
    return df.groupby('code').size().to_dict()
レスポンス
{
  "200": 299472,
  "404": 493,
  "500": 35
}

列を絞り込んだ分、バッチの実行時間が短くなりメモリの消費量も少なくなっています。

START RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae Version: $LATEST
END RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae
REPORT RequestId: 25d4ec02-0224-11e9-96ed-7f72d7f5cdae  Duration: 294.74 ms Billed Duration: 300 ms     Memory Size: 1024 MB    Max Memory Used: 137 MB 

最後にディレクトリによるパーティション構造のデータを読み込んでみます。どうやらhiveのディレクトリ形式でパスを切っていれば特別な設定が無くてもパーティションカラムを認識してくれるらしいです。便利。

import s3fs
import pyarrow.parquet as pq

fs = s3fs.S3FileSystem()

def lambda_handler(event, context):
    dataset = pq.ParquetDataset('s3://bucket-name', filesystem=fs)
    table = dataset.read(columns=['code','date'])
    df = table.to_pandas()
    tuple_key_dict = df.groupby(['code']).date.value_counts().to_dict()
    resp = {}
    for tuple_key, value in tuple_key_dict.items():
        code, date = tuple_key
        counts = resp.get(date, {})
        counts[code] = value
        resp[date] = counts
    return resp

pandas力が足りてなくてsieriesをdictにした際にキーがタプルがなってしまったので、ネストしたdictに変換するため最後ごちゃごちゃしてますが、ちゃんとパーティションキーもカラムとして認識されてることが確認できます。

レスポンス
{
  "20181217": {
    "200": 299472,
    "404": 493,
    "500": 35
  },
  "20181218": {
    "200": 299472,
    "404": 493,
    "500": 35
  }
}
START RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32 Version: $LATEST
END RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32
REPORT RequestId: 64bdf2fc-022b-11e9-b242-15c22cd54f32  Duration: 678.82 ms Billed Duration: 700 ms     Memory Size: 1024 MB    Max Memory Used: 174 MB 

おわりに

以上簡単ですが、pyarrowをLambdaで使う一連の流れの紹介でした。
肝心の実務での使い道について、今の所まだ思いつかないですが、生成したデータに対して件数とか分布を調べて検査するバッチとかで使えなくもないかな?とは思いました。

kanga
うさんくさいインフラエンジニア
http://kangaat.hatenablog.com/
speee
株式会社Speeeは「解き尽くす。未来を引きよせる。」というミッションを実現すべく、中長期的な目線で企業価値を最大化させていくため、組織・事業のStyleを大切にした永続的な価値創造を目指しています。
https://www.speee.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away