LoginSignup
90
88

More than 5 years have passed since last update.

AWS Lambdaをcronの代わりに使ってみるよ

Last updated at Posted at 2014-12-15

この記事は AWS Lambdaアドベントカレンダー 16日目の記事です。

概要

AWS Lambdaはイベント駆動で処理を実行する事ができます。
つまり
「何らかのイベント」>「Lambda実行」>「何らかのイベント」>(繰り返し)
と繰り返すことでcronの代わりに利用できないか?と考えました。
上手く行けばサーバレスであんな事やこんな事ができちゃうかもしれませんね…!
スクリーンショット 2014-12-16 3.03.11.png

今回試してみた構成

S3のイベントをトリガーにしてみました。
とあるファイルをトリガーにして、
その中身のフラグを見て 処理開始(0)/処理中(1)/処理終了(0/1以外) を判断しています。

% cat job
0

また、cronのインターバルは最大でも60秒です。
これはlambdaのtimeout設定が 60秒までしか設定できない からです。
これより小さい数値でsetTimeoutすることにより、擬似ループのようなことを可能にしています。
実際の処理の時間まで含めると、
setTimeout設定はmaxの60秒だとギリギリすぎるので5-10秒くらい余裕を持たせたほうがよいでしょう。

cronの 開始 / 終了 の流れ

  1. (事前準備) Lambdaのtimeout設定を60秒(Max)にしておきます
  2. jobファイルの中身を"0"にしてS3にアップロードします(実は"1"でも動いちゃうんですが、ログ分別のため)
  3. Lambdaは job ファイルの中身を見て 0 or 1 であれば無限ループに入ります
  4. 何か任意の処理を行います
  5. job を "9"(0/1以外であればOK) にしてS3へアップロードすると、ジョブが終了します

何か変な動作になったら中身を "9" にしてアップロードし、cronジョブを停止するとよいでしょう。

ちなみに以下ソースコードも実装もかなり適当です。。。が、一応動作します。
きちんとやるならDynamoDBなどを利用してロック処理を行ったり、
ジョブ管理したほうが良いでしょう。


雛形ソースコード

これが擬似cronの基本形です。間に好きな処理を差し込みましょう。

console.log('Loading event');
var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
var request = require('request');

var jobctl_key_name = "sample/job" // ジョブコントロールを行うファイル名
var interval = 30 * 1000 // 処理のインターバル。ここでは30秒間隔で実行しています。

// s3file ジョブコントロールのステータスをアップデートしています
function change_status(bucket, key, content_type, callback) {
  s3.putObject({
      Bucket: bucket,
      Key: key,
      Body: new Buffer("1", 'binary'), // update running...
      ContentType: content_type
  }, callback);
}

// メインハンドラ
exports.handler = function(event, context) {
   console.log('Received event:' + Date.now());
   console.log(JSON.stringify(event, null, '  '));
   // Get the object from the event and show its content type
   var bucket = event.Records[0].s3.bucket.name;
   var key = event.Records[0].s3.object.key;
   // set timeoutで擬似cronを実現しています
   setTimeout(function() {
     if(key == jobctl_key_name) {
       s3.getObject({Bucket:bucket, Key:key},
          function(err,data) {
            if (err) {
               console.log('error getting object ' + key + ' from bucket ' + bucket +
                   '. Make sure they exist and your bucket is in the same region as this function.');
               context.done('error','error getting file'+err);
            }
            else {
               var content_type = data.ContentType;
               var body = data.Body.toString("utf-8");
               //console.log('BODY:('+body+')');
               if(body == 0) {
                   console.log('- start')
                   change_status(bucket, key, content_type, function(err){
                     if(err) context.done(err,  "- change_status_error");
                     else    context.done(null, "- running...");
                   });
               } else if(body == 1) {
                   console.log('- runnning...')
                   change_status(bucket, key, content_type, function(err){

                   //////////////////////////////////////////////                     
                   // この辺りに好きな処理を入れる


                     if(err) context.done(err,  "- change_status_error");
                     else    context.done(null, "- running...");
                   });
               } else {
                   console.log('- finish')
                   context.done(null, "- finish");
               }
            }
          }
       );
     } else {
        context.done(null, "");
     }
   }, interval);
};


それでは具体的な例を見ていきましょう。
※以下のソースコードでは非標準の request モジュールを利用しているため、
zipファイルの中に含める必要があります。

応用例1) BingAPIを利用して、画像収集するcronを設定

定期的にBingAPIを叩いて画像取得して、s3へ保存します。
アレな画像もどんどん収集できてしまいますね…!!

console.log('Loading event');
var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
var request = require('request');
var fs = require('fs');

var jobctl_key_name = "sample1_aggregate/job"
var interval = 30 * 1000 // 30sec
var dest_bucket = "tori-aggregate-files" // ファイルを収集する先のバケット名

// s3file change_status
function change_status(bucket, key, content_type, callback) {
  s3.putObject({
      Bucket: bucket,
      Key: key,
      Body: new Buffer("1", 'binary'), // update running...
      ContentType: content_type
  }, callback);
}

// s3file upload
function s3_upload (bucket, key, body) {
  s3.putObject({
      Bucket: bucket,
      Key: key,
      Body: body,
  }, function(err, res) {
      if (err) {
        console.error(err);
        context.done('error putting object', err);
      } else {
        console.log('upload ok');
        console.log(JSON.stringify(res, null, 2));
        context.done(null, "success putting object");
      }
  });
}

function shuffle(x){
  x = x || []
  for(var i = 0, y = [], len = x.length; i < len; i++)
    y.push( x.splice( Math.random() * x.length | 0, 1)[0]);
  return y;
}

var http = require("http")
var acctKey = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx';  // bingアクセスキー
var rootUri = 'https://api.datamarket.azure.com/Bing/Search';
var auth    = new Buffer([ acctKey, acctKey ].join(':')).toString('base64');
var option = {
  headers : {
    'Authorization' : 'Basic ' + auth
  }
};
var request = require('request').defaults(option);
var cache_url = {}
function bing_api (bucket, callback) {
  var service_op  = "Image";
  var query       = "chome-chome-gazou";  // 検索文字列
  request.get({
    url : rootUri + '/' + service_op,
    qs  : {
      $format : 'json',
      Query   : "'" + query + "'", // the single quotes are required!
    }
  }, function(err, response, body) {
    if (err) {
      console.log(err)
    }
    if (response.statusCode !== 200) {
      console.log("internal error " + response.statusCode)
    }
    var results = JSON.parse(response.body);
    var sresult = shuffle(results.d.results)
    var len = (sresult.length >= 10) ? 10 : sresult.length
    var plen = 0;
    // ループ
    // 最大10件まで(最大アウトバウンドの件数に合わせている?)
    for(var i=0;i<len;i++) {
      var row = sresult[i];
      var media_url = row.MediaUrl
      var enc_media_url = encodeURIComponent(media_url)
      // 処理済みのURLはスキップ
      if(cache_url[enc_media_url]) {
        plen++
        continue;
      }
      // HTTPのみ処理する(手抜き)
      if(!media_url.match(/^http:\/\//)) {
        plen++
        continue;
      }

      var file_name = "/tmp/" + enc_media_url;
      var file = fs.createWriteStream(file_name);
      // get image
      http.get(media_url, function(res) {
        res.on('data', function(data) {
              file.write(data);
          }).on('end', function() {
              file.end();
              console.log(' downloaded to ' + file_name);
              // 一度取得したURLを保持しておく
              cache_url[enc_media_url] = true;
              // ファイル再読み込み
              var body = fs.readFileSync(file_name);
              // S3アップロード
              s3_upload(dest_bucket, enc_media_url, body);
              plen++
          });
      }).on('error', function(e) {
        plen++
        console.log(e)
      });
    }

    // check
    var timer = setInterval(function(){
      // 全処理終了後にcallback
      if(plen >= len) {
        clearInterval(timer)
        return callback()
      }
    }, 1000);
  });
}

// メインハンドラ
exports.handler = function(event, context) {
   console.log('Received event:' + Date.now());
   console.log(JSON.stringify(event, null, '  '));
   // Get the object from the event and show its content type
   var bucket = event.Records[0].s3.bucket.name;
   var key = event.Records[0].s3.object.key;
   // set timeout
   setTimeout(function() {
     if(key == jobctl_key_name) {
       s3.getObject({Bucket:bucket, Key:key},
          function(err,data) {
            if (err) {
               console.log('error getting object ' + key + ' from bucket ' + bucket +
                   '. Make sure they exist and your bucket is in the same region as this function.');
               context.done('error','error getting file'+err);
            }
            else {
               var content_type = data.ContentType;
               var body = data.Body.toString("utf-8");
               if(body == 0) {
                   console.log('- start')
                   change_status(bucket, key, content_type, function(err) {
                     if(err) context.done(err, "- change status error");

                     bing_api(bucket, function(){
                       context.done(null, "");
                     })
                   });
               } else if(body == 1) {
                   console.log('- runnning...')
                   change_status(bucket, key, content_type, function(err){
                     if(err) context.done(err, "- change status error");

                     bing_api(bucket, function(){
                       context.done(null, "");
                     })
                   });
               } else {
                   console.log('- finish')
                   context.done(null, "- finish");
               }
            }
          }
       );
     } else {
        context.done(null, "");
     }
   }, interval);
};

ダウンロード出来てるっぽい!ヒュ〜!
スクリーンショット 2014-12-16 2.11.38.png

※注意点としては、timeoutがあるので一度に多くは収集できないのと、アウトバウンド制限があるので制限内に収める必要がある、といったところでしょうか。


応用例2) 定期的に外部情報を取得して、Cloudwatchのカスタムメトリクスに登録する

今回は はてなブックマーク件数取得API を定期的に取得し、
Cloudwatchのカスタムメトリクスに自動登録してみました。
例では こちら のブクマ数を取得しています。

console.log('Loading event');
var aws = require('aws-sdk');
var s3 = new aws.S3({apiVersion: '2006-03-01'});
var cloudwatch = new aws.CloudWatch();
var request = require('request');

var jobctl_key_name = "sample2_cloudwatch/job"
var interval = 50 * 1000 // 50sec


// put cloudwatch
function put_cloudwatch(count, callback) {
  var params = {
    MetricData: [ /* required */
      {
        MetricName: 'hatebu_count',
        Dimensions: [
          {
            Name: 'Site',
            Value: 'niconico'
          },
        ],
        Timestamp: new Date,
        Unit: 'None',
        Value: Number(count)
      },
    ],
    Namespace: 'CUSTOM/Hatena'
  };
  // put metrics
  cloudwatch.putMetricData(params, callback)
}


// s3file change_status
function change_status(bucket, key, content_type, callback) {
  s3.putObject({
      Bucket: bucket,
      Key: key,
      Body: new Buffer("1", 'binary'), // update running...
      ContentType: content_type
  }, callback);
}

var http = require("http")
// this somewhere at the top of your code:
var rootUri = 'http://api.b.st-hatena.com/entry.count?url=http%3A%2F%2Fnicocas.com%2Fi%2Finfo%2F3';
var request = require('request').defaults({});
var cache_url = {}
function get_hatebu (callback) {
  request.get(rootUri, function(err, response, body) {
    if (err) {
      console.log(err)
    }
    if (response.statusCode !== 200) {
      console.log("internal error " + response.statusCode)
    }

    // put
    put_cloudwatch(body, callback);
  });
}

// メインハンドラ
exports.handler = function(event, context) {
   console.log('Received event:' + Date.now());
   console.log(JSON.stringify(event, null, '  '));
   // Get the object from the event and show its content type
   var bucket = event.Records[0].s3.bucket.name;
   var key = event.Records[0].s3.object.key;
   // set timeout
   setTimeout(function() {
     if(key == jobctl_key_name) {
       s3.getObject({Bucket:bucket, Key:key},
          function(err,data) {
            if (err) {
               console.log('error getting object ' + key + ' from bucket ' + bucket +
                   '. Make sure they exist and your bucket is in the same region as this function.');
               context.done('error','error getting file'+err);
            }
            else {
               var content_type = data.ContentType;
               var body = data.Body.toString("utf-8");
               if(body == 0) {
                   console.log('- start')
                   change_status(bucket, key, content_type, function(err){
                     if(err) {
                       console.log(err)
                       context.done("change_status error", err);
                     } else {
                       context.done(null, "");
                     }
                   });
               } else if(body == 1) {
                   console.log('- runnning...')
                   change_status(bucket, key, content_type, function(err){
                     if(err) {
                       console.log(err)
                       context.done("change_status error", err);
                     } else {
                       get_hatebu(function(err, data){
                         if(err) {
                           console.log(err)
                           context.done("get hatebu error", err);
                         } else {
                           context.done(null, "");
                         }
                       })
                     }
                   });
               } else {
                   console.log('- finish')
                   context.done(null, "- finish");
               }
            }
          }
       );
     } else {
        context.done(null, "");
     }
   }, interval);
};

しばらく待つと…
おおー!きちんと自動でグラフ化されている!!!
スクリーンショット 2014-12-16 0.59.27.png

注意点としては、CloudwatchへのIAMロールポリシーを設定する必要がありますので
忘れずに設定しておきましょう。

    {
      "Effect": "Allow",
      "Action": ["cloudwatch:*"],
      "Resource": ["*"]
    }


まとめ

Lambdaをcronのように利用してみました。
応用例を見ても分かる通り 一切サーバを使わずにcronっぽいことを実現できている のがポイントですね。
今回のはかなり無理矢理感漂いますが、何かのヒントになれば幸いです。
しかしLambda、アイデアによってはおもしろい使い方ができて楽しいですね!
今後が楽しみ〜

90
88
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
90
88