1. inukai-norio

    No comment

    inukai-norio
Changes in body
Source | HTML | Preview

はじめに

写真のコンバート方法を変更することになりました。 S3 に置かれた時それをトリガにSQSにエンキューする必要がありました。
そこで、コンバート方法の設定を一般的には、S3 Event Notificasions によりエンキューをすれば良いのですが、エンキュー先が複数に必要になりました。yaml として S3 に置きそれをトリガにSQSを経由してfargateでコンバートが走ることになったのですが、そのまま実装してしまうと利用者の要求を満たさない可能性が出てきたため、うまく切り抜けようとした記録です。

前提条件エンキュー先

  • これまでのコンバートでは「時間がかかっても良いコンバート」と「少数の高速なコンバート」で分かれていたが、今回の変更で統一する
  • 新しいコンバートでは yaml には写真のコンバート方法が記載されている。
    • 一つのyamlに対して複数の写真をコンバートできる。

最初の方法

  1. 夜間用SQS(process.env.sqsUriNight)
  2. 昼の低速処理用SQS(process.env.sqsUriNormal)
  3. 昼の高速処理用SQS(process.env.sqsUriFast)

AWSに詳しい方なら分かると思いますが、用途別で分けるだけなら、S3のプレフィックスで制御することもできますが、時間帯により分ける必要があったので、lambda S3を使って実装しました。 に yaml が置かれた時に、SQSにエンキューするのはとても簡単な作業です。
そのため、上記の方法で実装したのですが、写真枚数が多いと次のコンバートまでの待機時間が多くなってしまいます。
そこで問題になってきたのが、利用者である業務の人からの要望で「なるべく早くコンバートが終わって欲しい」を叶えるのが難しいと思うようになりました。

考えた方法(ボツ案)

最初に考えたのはSQS低速高速の分岐はコードを見ての通り、 のエンキューは S3 のプレフィックスで制御することもできるため、それらを用いて枚数ごとにおく場所を変える方法でした。photos の数により判断します。
yamlこれは、確かにこれで問題は解決しそうですが、枚数がアップロード後でないとわからないことや夜間など全てをゆっくりなコンバートで問題ない時などをfargateのタイプを小さくできないので、今回は不採用になりました。内にある 同様のソースを書く場合の参考にしてください。

採用した案

S3からのエンキューはSQS以外にもlambdaに対しても行うことができます。
そのため、これを用いてlambdaを起動しyaml記載の枚数をカウントして閾値を設けエンキューするSQSを分けました。
また、夜間においてもエンキューするSQSを分けることにより変更できるのでこちらの方法を採用しました。

使ったコード

index.js(node8)
'use strict';

const aws = require('aws-sdk');
const yaml = require('yaml');
require('date-utils');

const s3 = new aws.S3({apiVersion: '2006-03-01'});
const sqs = new aws.SQS({apiVersion: '2012-11-05'});
let sqsUriNormal = process.env.sqsUriNormal;
let sqsUriFast = process.env.sqsUriFast;
let sqsUriNight = process.env.sqsUriNight;
let threshold = process.env.threshold;
let nightStart = process.env.nightStart;
let nightEnd = process.env.nightEnd;

let enqueueSQS = (uri, key) => {
  let params = {
    MessageBody: key,
    QueueUrl: uri,
  };

  return sqs.sendMessage(params).promise();
};

let getYaml = (bucket, key) => {
  let yamlParam = {
    Bucket: bucket,
    Key: key,
  };

  return s3.getObject(yamlParam).promise();
};

let isNight = () => {
  let now = Number(new Date().toFormat('HH24MISS'));
  return nightStart <= now && now < nightEnd;
};

let parseYaml = (data) => yaml.parse(data.Body.toString());

let getPutSqs = (data) => {
  if (isNight()) {
    return sqsUriNight;
  }
  if (data.photos.length > threshold) {
    return sqsUriNormal;
  }
  return sqsUriFast;
};

let retmsg = (msg) => {
  console.log(msg);
  return msg;
};

exports.handler = async (event, context) => {
  let bucket = event.Records[0].s3.bucket.name;
  let key = decodeURIComponent(event.Records[0].s3.object.key);

  try {
    let sqsName = getPutSqs(parseYaml(await getYaml(bucket, key)));
    let ret = await enqueueSQS(sqsName, key);
    return retmsg(ret);
  } catch (err) {
    throw new Error(retmsg(err));
  }
};

見ての通り、ただエンキューしているだけです。

最後に

実際このコンバートを使うのはもう少し後なので、問題が発生するかもしれないです。
lambda を使うとこのようなこともできるよという共有でした。

index.js(node8)
'use strict';

const aws = require('aws-sdk');
const yaml = require('yaml');
require('date-utils');

const s3 = new aws.S3({apiVersion: '2006-03-01'});
const sqs = new aws.SQS({apiVersion: '2012-11-05'});
let sqsUriNormal = process.env.sqsUriNormal;
let sqsUriFast = process.env.sqsUriFast;
let sqsUriNight = process.env.sqsUriNight;
let threshold = process.env.threshold;
let nightStart = process.env.nightStart;
let nightEnd = process.env.nightEnd;

let enqueueSQS = (uri, key) => {
  let params = {
    MessageBody: key,
    QueueUrl: uri,
  };

  return sqs.sendMessage(params).promise();
};

let getYaml = (bucket, key) => {
  let yamlParam = {
    Bucket: bucket,
    Key: key,
  };

  return s3.getObject(yamlParam).promise();
};

let isNight = () => {
  let now = Number(new Date().toFormat('HH24MISS'));
  return nightStart <= now && now < nightEnd;
};

let parseYaml = (data) => yaml.parse(data.Body.toString());

let getPutSqs = (data) => {
  if (isNight()) {
    return sqsUriNight;
  }
  if (data.photos.length > threshold) {
    return sqsUriNormal;
  }
  return sqsUriFast;
};

let retmsg = (msg) => {
  console.log(msg);
  return msg;
};

exports.handler = async (event, context) => {
  let bucket = event.Records[0].s3.bucket.name;
  let key = decodeURIComponent(event.Records[0].s3.object.key);

  try {
    let sqsName = getPutSqs(parseYaml(await getYaml(bucket, key)));
    let ret = await enqueueSQS(sqsName, key);
    return retmsg(ret);
  } catch (err) {
    throw new Error(retmsg(err));
  }
};