0
1

EventBridge Scheduler→Amazon SQS→Lambda の連携

Posted at

前回

前回はEC2上のプログラムからAWS SDKを使って、スケジュールタスクを作るというところまで作りました。!

システムなのでその後、実行したい何かがあるはずです。
次のロジックに繋がる出口の部分をAmazon SQS とLambdaを使って構築しました。

Amazon SQS

基本情報技術者試験で学ぶようなキューという概念だったりFIFOというものが活躍します。

Screenshot 2024-06-09 213340.jpg

何かプロセスが始まると別完了するまではプロセスの発行元は処理を待つ必要があります。
また、直接的な繋がりが発生しているので例えば左のコンポーネント別のシステムに転用するとなった際に、移行の手間も発生します。

Screenshot 2024-06-09 214157.jpg

メッセージングサービス(MQ)を使ってサービスを分断することで、処理を待たなくてもいったんプロセスが完了できるといった点やコンポーネント間の疎結合を実現することが可能です。
※シーケンス図を作成していると、マイクロサービスとかで多くのコンポーネントを横断するデータフローが出来上がることと思います。だいぶ楽になったと感じます。

SQSはAWSが提供するMQサービスの一つです。

Lambda

便利なサービスです

今回のアーキテクチャ

Screenshot 2024-06-09 215529.jpg

時間差で実行したい旨を引数とともにリクエストするのが
①EC2 -> EventBridgeのフロー
②MQを経て、Lambda以降で実行したい処理を実行するといった流れです。
(その手前までを記します)

①前回のプログラムの変更

const { SchedulerClient, CreateScheduleCommand, DeleteScheduleCommand } = require("@aws-sdk/client-scheduler");
const config = {
  region: 'Your Region',
};
const client = new SchedulerClient(config);

const express = require('express');
const path = require('path');

const app = express();
const port = 3000;

app.use(express.static('public'));

app.get('/create', async (req, res) => {
  try {
    const currentDate = new Date();
    const offset = 9 * 60 * 60 * 1000;
    const japanTime = new Date(currentDate.getTime() + offset);
    const futureDate = new Date(currentDate.getTime() + 5 * 60000);
    const iso8601String = futureDate.toISOString().slice(0, -5);

    const scheduleExpression_string = `at(${iso8601String})`;
    const scheduleParams = {
      Name: 'test',
      ScheduleExpression: scheduleExpression_string,
      Target: {
        Arn: 'SQS resource',
        RoleArn: 'role resource',
        Input: JSON.stringify({
          text: 'sent for scheduler',
          timestamp: new Date().toISOString(),
          metadata: {
            key1: 'value1',
            key2: 'value2'
          }
        }),
      FlexibleTimeWindow: {
        Mode: 'OFF',
      },
      ActionAfterCompletion: "DELETE"
    };
    const command = new CreateScheduleCommand(scheduleParams);
    const response = await client.send(command);

    console.log(`Response: ${response}`);
    console.log('Schedule created successfully!');
    res.send('Schedule created successfully!');

  } catch (error) {
    console.error('Error creating schedule:', error);
    res.status(500).send('Error creating schedule');
  }
});

app.get('/delete', async (req, res) => {
  try {
    const input = {
      Name: "test",
    };
    const command = new DeleteScheduleCommand(input);
    const response = await client.send(command);
    
    res.send('Schedule deleted successfully!');

  } catch (error) {
    console.error('Error deleting schedule:', error);
    res.status(500).send('Error deleting schedule');
  }
});

app.get('/', (req, res) => {
  res.sendFile(path.join(__dirname, 'public', 'index.html'));
});

app.listen(port, () => {
  console.log(`Server is running`);
});

変更点

Target: {
        Arn: 'SQS resource',
        RoleArn: 'role resource',
        Input: JSON.stringify({
          text: 'sent for scheduler',
          timestamp: new Date().toISOString(),
          metadata: {
            key1: 'value1',
            key2: 'value2'
          }
        }),

Schedulerから実行するAWSサービスがSQSになるので
SQS上で作成したキューを指定します。
それに伴い、SQSサービスにアクセスするための許可ポリシーの追加が必要です。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sqs:*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

テンプレートであった「AmazonSQSFullAccess」をそのまま追加しました。

重要な部分は、Input項目のところです。
ここに自由に渡したい引数をjson形式で記述します。
これが、MessageBodyとして、この後のSQS、そしてLambdaに引き継がれます。
※AWSドキュメントには、SQSsetParameterという項目があったので、
これに気づくまでにかなり時間を割きました。

②-1 SQSの設定

Screenshot 2024-06-09 181202.jpg

test.fifoというキューを作成して、①のTargetに指定してます。
キュータイプを標準か、FIFOの指定が可能ですが、
先にリクエストしたものからさばいて欲しいのでFIFOを選択してます。

②-2 Lambdaの設定

Screenshot 2024-06-09 221614.jpg

polling_testという関数を作成しています。
※左下にあるトリガー設定の部分でSQSを指定しているため、
リクエストがキューされるというイベントを検知してこの関数が実行されるので、
もはやポーリングではなくなりました。

関数のプログラム

※AWS SDKは既に内部に搭載されているらしい

export const handler = async (event) => {
    for (const record of event.Records) {
        
        const messageBody = record.body;
        const message = JSON.stringify(messageBody);
        
        // please write some logic
        await processMessage(message);
    }
    
    return {
        statusCode: 200,
        body: JSON.stringify('Message processed successfully!')
    };
};

const processMessage = async (messageBody) => {
    // process logic
    console.log(`Received message: ${messageBody}`);
};

processMessageというメソッドでSQSから送信されたメッセージのボディリクエスト内容を出力しているだけです。

しかし、要は

for (const record of event.Records) {
        
        const messageBody = record.body;
        const message = JSON.stringify(messageBody);
        
        // please write some logic
        await processMessage(message);
    }

message変数に①のInputで定義した引数が格納されています。

CloudWatch ※念のためMessageの確認

CloudWatchは、AWSサービス全般のメトリクスの収集ツールみたいなイメージです。
今回は具体的に言及しないですが、
Lambdaで作成した関数用のLog Groupsができていると思います。

Screenshot 2024-06-09 222350.jpg

Screenshot 2024-06-09 222545.jpg

Lambdaで、MessageBodyを文字列出力するようにプログラムで書きましたが、それが出力されているのがわかります。
image.png

まとめ

今回のフローをまとめると、
・EC2経由で数分後にSQSにキューするEventを引数を添えて作成して、Amazon EventBridge Schedulerにタスク定義する。
・指定した時間になったら、SchedulerからSQSキューにメッセージを送信する。
・メッセージの通知を受けて、Lambda関数がトリガーされ、今回はコンソール出力するという処理を実行する。

結論

もうちょっとAWS公式APIがわかりやすくなってくれたらな~

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1