11
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Firehoseで出力したファイルをAthenaのパーティション形式に変換するLambda

Last updated at Posted at 2017-02-20

2019/02/18 追記

Firehose で出力先の prefix が指定できるようになりました!
これでこの Lambda もいらなくなりますね!素晴らしい。

概要

FirehoseはデフォルトでYYYY/MM/DD/HHというプレフィクスを付けてS3にファイルを格納する。
このままでもAthenaでパーティションとして利用することはできるが、ディレクトリが作成されるたびにADD PARTITIONしなければならない。

Amazon Athenaのパーティションを理解する #reinvent

大量データをクエリする場合はググればEMRを使ったカラムナフォーマットへの変換記事がたくさんヒットするけど、ライトにAthenaを使いたい場合に有効。

新しいパーティションを勝手には読み込んでくれないのでクエリ実行前に以下のSQLを実行する必要があります。冪等なので毎回実行すればOKです。

MSCK REPAIR TABLE database_name.table_name;

Lambda Function

Athenaの自動パーティション認識を有効にするために、Hiveパーティションに対応した形式のディレクトリにコピーするLambda関数を作った。
さらにFirehoseのYYYY/MM/DD/HHはUTCのため、JST(+9:00)のYYYY-MM-DD/HHに変更する。

'use strict';
process.env.TZ = 'Asia/Tokyo'; // Timezoneを明示的にJSTに

const aws = require('aws-sdk');
const s3client = new aws.S3({ apiVersion: '2006-03-01' });
const path = require('path');

const toBucket = process.env.TO_BUCKET; // Lambda Functionの環境変数
const toPrefix = process.env.TO_PREFIX; // Lambda Functionの環境変数

function getS3EventKey(s3obj) {
    return decodeURIComponent(s3obj.object.key.replace(/\+/g, ' '));
}

function getCopySource(s3obj) {
    const srcBucket = s3obj.bucket.name;
    const srcKey = getS3EventKey(s3obj);
    return `${srcBucket}/${srcKey}`;
}

function getTodayPartition() {
    const today = new Date();
    const year = today.getFullYear();
    let month = today.getMonth() + 1;
    let day = today.getDate();
    let hour = today.getHours();
    if (month < 10) month = "0" + month;
    if (day < 10) day = "0" + day;
    if (hour < 10) hour = "0" + hour;
    // ここがキモ。dtとhourというパーティションが自動認識される
    return `dt=${year}-${month}-${day}/hour=${hour}`;
}

function getDestKey(s3obj) {
    const srcKey = getS3EventKey(s3obj);
    const filename = path.basename(srcKey);
    const todayPartition = getTodayPartition();
    return `${toPrefix}${todayPartition}/${filename}`;
}

exports.handler = (event, context, callback) => {
    const s3obj = event.Records[0].s3;
    const copySrc = getCopySource(s3obj);
    // ディレクトリは無視
    if (copySrc.endsWith("/")) {
        callback(null, `Skip, because ${copySrc} is directory.`);
        return;
    }
    const destKey = getDestKey(s3obj);
    const params = { CopySource: copySrc, Bucket: toBucket, Key: destKey };
    console.log(`s3://${copySrc} copy to s3://${toBucket}/${destKey}`);
    s3client.copyObject(params, (err, data) => {
        if (err) {
            console.log(err, err.stack);
            callback(err);
        } else {
            console.log(data);
            callback(null, data);
        }
    });
};

Firehoseが出力するS3バケットのCreate*イベントで実行するようにこのLambda関数を作成する。

このコードは東京リージョンには対応していません。

AWSの設定など

Lambdaの実行ロールに以下のポリシーを設定。(Listはいらんかもしれんけど念のため)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Enable S3 Read Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:List*",
        "s3:Get*"
      ],
      "Resource": [
        "arn:aws:s3:::Firehoseが出力するバケット名",
        "arn:aws:s3:::Firehoseが出力するバケット名/*"
      ]
    },
    {
      "Sid": "Enable S3 Write Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:List*",
        "s3:Put*"
      ],
      "Resource": [
        "arn:aws:s3:::Athenaが参照するバケット名",
        "arn:aws:s3:::Athenaが参照するバケット名/*"
      ]
    }
  ]
}

余談

FirehoseがDynamic Prefix対応してくれればこんなんしなくて済むんだけど。

参考リンク

11
10
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?