LoginSignup
0
0

前回と前々回

EC2 → Amazon EventBridge → Amazon SQS → Lambda

という処理の流れをこれまで構築してきました。
これで時間差で実行したい処理を疎結合気味に実行することができます。

問題はこのLambda以降の流れになります。

Lambdaについて

SQS → Lambda
のようなAWSサービスをトリガーにしてLambda関数が実行される仕組みのことをイベントソースマッピングといいます。

そしてこのイベントソースマッピングによるLambda関数の同時実行数が1000と記述があります。
それ以上のLambdaイベントが失われてしまいますので結構厄介な仕様です
開封されるSQSのバッチサイズ、このLambda関数によるトランザクションの処理時間の調整を頑張れば余程発生しなさそうですが、ちょっと心許ないです。

この心配を解消するために、Lambdaはイベントのトリガーにのみ任せることして、
後の処理はEC2などのサービスに託して早々にプロセスを終了するのがよいのではと考えました。

よって、LambdaでEC2上のAPIを実行 → EC2で処理を開始 → 処理の完了を待たずにLambdaにreturnしてLambdaのプロセスを終了 → EC2での処理は継続 → やがて終了

これを実現するための機能が非同期処理になります。

今回は、Lambdaのプロセスが完了した後も、EC2で処理が継続されているか検証するためのプログラムを作成します。

Lambda プログラム

import.mjs →import.js に変更

const http = require('http');

exports.handler = async (event) => {
    for (const record of event.Records) {
        const messageBody = record.body;
        await processMessage(messageBody);
    }
    const currentDate = new Date();
    const currentTime = new Date(currentDate.getTime());
    const iso8601String = currentTime.toISOString();
    console.log(`Finish! ${iso8601String}`);
    return {
        statusCode: 200,
        body: JSON.stringify('Messages processed successfully!')
    };
};

const processMessage = async (messageBody) => {
    // Message processing logic
    console.log(`Received message: ${messageBody}`);
    
    // Call EC2 API endpoint
    const ec2Endpoint = "EC2 Endpoint";
    try {
        const response = await makeHttpPostRequest(ec2Endpoint, { message: messageBody });
        console.log('EC2 API response:', response);
    } catch (error) {
        console.error('Error calling EC2 API:', error);
    }
};

const makeHttpPostRequest = (url, postData) => {
    return new Promise((resolve, reject) => {
        const req = http.request(url, {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
        }, (res) => {
            res.on('data', (chunk) => {
                // This part is omitted
            });

            res.on('end', () => {
                if (res.statusCode === 200) {
                    resolve();
                } else {
                    reject(new Error(`Status code: ${res.statusCode}`));
                }
            });
        });

        req.on('error', (err) => {
            reject(err);
        });

        req.write(JSON.stringify(postData));
        req.end();
    });
};

EC2 プログラム

リクエストを受けた後、30秒のカウントダウンプログラムを実行して、早々にLambdaへreturnをする。

//for parse of json 
app.use(express.json());

// Endpoint to handle POST requests
app.post('/start-task', (req, res) => {
  // Log the message from request body
  console.log('Received message:', req.body);

  // Job processing
  // For example, running a countdown for 100 seconds
  countdown(30);

  // Send response
  res.status(200).send('Job started successfully!');
});

// Function to run a countdown for 100 seconds
const countdown = (seconds) => {
  const interval = setInterval(() => {
      console.log(seconds + ' seconds remaining...');
      seconds--;
      if (seconds < 0) {
          clearInterval(interval);
          console.log('Countdown complete!');
          const currentDate = new Date();
          const currentTime = new Date(currentDate.getTime())
          const iso8601String = currentTime.toISOString()
          console.log(iso8601String)
      }
  }, 1000);
};

今回使用した非同期関数は、setInterval
引数に指定した1000ミリ秒(1秒)ごとに内部処理を繰り返す。

Lambdaの関数のプロセス終了時刻(っぽいところ)をcloud watchログで確認
image.png

EC2でのカウントダウンプログラムの終了時刻をコンソールで確認
image.png

先に始まったLambdaが終了し、約30秒後までEC2で処理が継続していたことが確認できる。

このロジックの可能性は検証できたが・・・

非同期処理の細かい仕組みまでは理解していないので、あまりコードに言及はできず
もう少し勉強が必要

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0