1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

RedshiftでUNLOADしたgzipファイルをLambdaのPythonで処理をして再びgzipしてS3へアップする

Posted at

やりたいこと

タイトルの通りですが、Redshiftにデータウェアハウスがあり、普段ELTで処理することが多いのですが、プログラミングによるデータ加工が必要な場合もあります。

RedshiftのUNLOADを利用することでRedshiftからSQLの結果をS3にgzipファイルを作成することが出来ますので、S3へのputイベントをトリガーにLambdaで処理して再びgzipした状態でS3にアップ、ということをしてみました。

UNLOAD

Lambdaは現時点では3008MBが最大となります。今回のような処理はファイルサイズが増えると必然利用メモリ量が増えてしまいます。
そこで、MAXFILESIZE パラメータ を設定することでLambdaへ渡すファイルサイズを調整します。
完全なるケースバイケースですが、今回は50MBで設定してみました。

Lambda上のコード

トリガ設定は割愛します。

import json
import boto3
import urllib.parse
import os
import sys
import csv
import re
import traceback
import gzip
import subprocess

s3client = boto3.client('s3')
s3resource = boto3.resource('s3')
SEP = '\t'
L_SEP = '\n'
S3OUTBACKET='XXXXXXXX'
S3OUTBASE='athena/preprocessing/XXXXXXtmp/'

def lambda_handler(event, context):
    
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    taragetfile=os.path.split(key)[1]
    outputprefixA=os.path.split(key)[0].split("/")[-1]
    outputprefixB=os.path.split(key)[0].split("/")[-2]

    outputdata = "";
    try:
        dlfilename ='/tmp/'+key.replace("/","")
        s3client.download_file(bucket, key, dlfilename)
        gzipfile = gzip.open(dlfilename, 'rt') 
        csvreader = csv.reader(gzipfile, delimiter=SEP, lineterminator=L_SEP, quoting=csv.QUOTE_NONE)
        for line in csvreader:
            # 1行ずつ様々な処理をして、outputdataに格納していく。
            # 割愛された処理の中に、利用しているimportがございます。
            # ご了承ください
    except Exception as e:
        print(e)
        raise e
        
    print("memory size at outputdata:"+str(sys.getsizeof(outputdata)))
    os.remove(dlfilename)
    uploadbinary = gzip.compress(bytes(outputdata , 'utf-8'))
    print("memory size at uploadbinary:"+str(sys.getsizeof(uploadbinary)))
    uploadfilename='processed_'+taragetfile

    try:
        bucket = S3OUTBACKET
        key = S3OUTBASE+outputprefixA+"/"+outputprefixB+"/"+uploadfilename
        obj = s3resource.Object(bucket,key)
        obj.put( Body=uploadbinary ) 
    except Exception as e:
        print(e)
        raise e
 
    return 0

チューニングしてゆく

実際のファイルでテストをしたところメモリエラーとなってしまいました。
コードの途中に挟んでいる str(sys.getsizeof(outputdata)) はその確認用で、メモリサイズを見て状況を把握しました。コードには書いてませんが、gzip自体の対象データへの圧縮率も見ておくと良いと思います。
なお私が今回扱ったデータは、gzip圧縮後50MBだったのですが、処理後のデータ+圧縮後のデータで1000MBものメモリを要してしまいました。やはり実際にやってみないと分からないものですね。Pythonのメモリ事情をもう少し調べた方が良いかもしれません。

なおLambdaはメモリサイズを増やすとCPUリソースなども増えますので、処理内容とファイルサイズ次第ですが、一度最大の3008MBだとどのくらい処理が速くなるのかは確認すると良いです。今回も、メモリを倍にしたら処理時間が半分に、というケースすらありました。

もし定常的に行う処理の場合、ここでのチューニングがランニングコストに直結するので重要度が高いです。

語彙の無い感想

Lambdaめっちゃ便利

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?