はじめに
フューチャー Advent Calendarの14日目の記事です。
前日は@statatasanさんのデータがあるのでBigQuery MLで分析してみるという風潮を作りたい話でした。
背景
AWS Lambda(以下Lambda)はAmazon Web Service(以下AWS)が提供するFaaS(Function as a Service)であり、簡単にサーバーレスコンピューティングを実現できます。
一般的には、1つのLambda関数が1つのAWSサービスからのイベントを受け取れるように関数を実装します。しかしながら私が経験した事例では、下記の背景からAmazon Simple Queue Service(以下SQS)からのイベントとAmazon Simple Storage Service(以下S3)からのイベント通知を同一のLambda関数で処理できるような構成を取ろうと考えました。
- 非同期処理の実現のためにキューイングサービスとしてSQSキューを、コンシューマとしてLambda関数を利用する
- システム全体の可用性の向上のため、S3バケットを補助的にキューとして利用する1
- メッセージのプロデューサは何らかの理由でメッセージをSQSキューへ送信できなかった場合、S3バケットへメッセージを含むファイルを格納する2
- S3イベント通知機能を利用し、S3バケットへファイルが格納されたことをトリガーとしてLambda関数を起動し、非同期処理を実施する
- SQSキューまたはS3バケットからメッセージを受け取ってからの処理は同一のロジックを利用するため、Lambda関数のメンテナンス性の観点から同一のLambda関数で各々のイベントを受け取れるようにしたい
本記事では、このような事例に対してどのようにLambdaを実装したのかを紹介します。
実装方針
まずはSQSからのイベントとS3からのイベントを区別できるのかを確認するために、SQSとS3からLambdaに送られるイベントの中身を見てみます。
SQSイベントの例
Amazon SQS での Lambda の使用より引用しました。
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
},
{
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
}
]
}
S3イベント通知の例
イベントメッセージの構造より引用しました。
{
"Records":[
{
"eventVersion":"2.1",
"eventSource":"aws:s3",
"awsRegion":"us-west-2",
"eventTime":"1970-01-01T00:00:00.000Z",
"eventName":"ObjectCreated:Put",
"userIdentity":{
"principalId":"AIDAJDPLRKLG7UEXAMPLE"
},
"requestParameters":{
"sourceIPAddress":"127.0.0.1"
},
"responseElements":{
"x-amz-request-id":"C3D13FE58DE4C810",
"x-amz-id-2":"FMyUVURIY8/IgAtTv8xRjskZQpcIZ9KG4V5Wp6S7S/JRWeUWerMUE5JgHvANOjpD"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"testConfigRule",
"bucket":{
"name":"amzn-s3-demo-bucket",
"ownerIdentity":{
"principalId":"A3NL1KOZZKExample"
},
"arn":"arn:aws:s3:::amzn-s3-demo-bucket"
},
"object":{
"key":"HappyFace.jpg",
"size":1024,
"eTag":"d41d8cd98f00b204e9800998ecf8427e",
"versionId":"096fKKXTRTtl3on89fVO.nfljtsv6qko",
"sequencer":"0055AED6DCD90281E5"
}
}
}
]
}
各々のサービスから送られるイベントを見てみると、$.Records[0].eventSource
にどのサービスからのイベントかを示す情報が格納されているようです。
そこで、下のような処理方針とすればメッセージの処理をうまくできそうです。
- Lambdaへの入力値から
$.Records[0].eventSource
を取得する - 取得された情報をもとにメッセージを取得する
- SQSからのイベントの場合、入力値からメッセージを取得する
- S3からのイベントの場合、入力値からバケットとオブジェクトキーの情報を取得し、S3バケットからメッセージを取得する
- 取得したメッセージをもとに処理を実施する
実装例
上記の方針をもとに実装した検証用アプリケーションを示します。
本事例ではプロジェクトの指針として開発言語にJavaを用いることとなっていたため、Java 21とmavenを利用して実装しています。
また、maven依存関係のバージョンは2024年12月現在の最新版を利用しています。
構成
検証用のアプリケーションの構成は下図の通りです。SQSキューからメッセージを受け取ったこと、またはS3バケットにオブジェクトが格納されたことをトリガーにLambda関数を起動し、ログにメッセージを出力します。
ソースコード
javaを利用した実装例を示します3。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.myapp</groupId>
<artifactId>myapp</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<maven.shade.plugin.version>3.6.0</maven.shade.plugin.version>
<maven.compiler.plugin.version>3.6.1</maven.compiler.plugin.version>
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<aws.java.sdk.version>2.29.37</aws.java.sdk.version>
<aws.lambda.java.core.version>1.2.3</aws.lambda.java.core.version>
<aws.lambda.java.events.version>3.14.0</aws.lambda.java.events.version>
<junit5.version>5.11.4</junit5.version>
<jackson.version>2.18.2</jackson.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>${aws.java.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
</exclusion>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-event-notifications</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>${aws.lambda.java.core.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>${aws.lambda.java.events.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>${aws.lambda.java.events.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit5.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.plugin.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.plugin.version}</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<finalName>myapp</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- Suppress module-info.class warning-->
<exclude>module-info.class</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.example.myapp;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import software.amazon.awssdk.eventnotifications.s3.model.S3EventNotification;
import software.amazon.awssdk.eventnotifications.s3.model.S3EventNotificationRecord;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
public class App implements RequestHandler<Map<String, Object>, Void> {
private final S3Client s3Client;
private final ObjectMapper mapper =
JsonMapper.builder().enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES).build();
public App() {
// 連続的なLambdaの呼び出しで使いまわせるように、コンストラクタを利用して初期化する
this.s3Client = S3Client.create();
}
@Override
public Void handleRequest(final Map<String, Object> input, final Context context) {
final LambdaLogger logger = context.getLogger();
final String eventSource = findEventSource(input);
// イベントソースがSQSの場合
if (Objects.equals(eventSource, "aws:sqs")) {
logger.log("SQSキューからのイベントを受け取りました");
// SQSEvent型へ変換
final SQSEvent event = this.mapper.convertValue(input, SQSEvent.class);
// メッセージを取得し、出力する
for (SQSMessage message : event.getRecords()) {
logger.log(message.getBody());
}
return null;
}
// イベントソースがS3の場合
if (Objects.equals(eventSource, "aws:s3")) {
logger.log("S3バケットからのイベント通知を受け取りました");
// S3EventNotification型へ変換
S3EventNotification notification;
try {
notification = S3EventNotification.fromJson(this.mapper.writeValueAsBytes(input));
} catch (JsonProcessingException e) {
logger.log("入力値が不正です");
throw new IllegalArgumentException(e);
}
// イベント通知からS3オブジェクトの情報を取得
final S3EventNotificationRecord s3Record = notification.getRecords().getFirst();
final String bucket = s3Record.getS3().getBucket().getName();
final String key = s3Record.getS3().getObject().getKey();
// S3バケットからオブジェクトを取得し、メッセージを出力する
final String message =
this.s3Client
.getObjectAsBytes(GetObjectRequest.builder().bucket(bucket).key(key).build())
.asString(StandardCharsets.UTF_8);
logger.log(message);
return null;
}
// どちらでもない場合、エラーを返す
logger.log("入力値が不正です");
throw new IllegalArgumentException();
}
private String findEventSource(Map<String, Object> input) {
// "$.Records[0].eventSource"に格納されている値を返す
try {
final JsonNode node = this.mapper.valueToTree(input);
return node.get("Records").get(0).get("eventSource").asText();
} catch (Exception e) {
// 見つからなければ空文字を返す
return "";
}
}
}
上記の実装について抜粋して説明します。
入力値の型について
public Void handleRequest(final Map<String, Object> input, final Context context)
AWSの特定のサービスからLambdaを起動する場合、AWSが公式に提供している各イベントに対応したPOJOクラスのライブラリ(aws-lambda-java-events)を使うことができますが、本事例のように複数のサービスからのイベントをトリガーとする場合には不適です。
今回はJSONを入力として受け付けられるように、Map<String, Object>
型を入力タイプとすることにしました4。
$.Records[0].eventSource
の取得
private String findEventSource(Map<String, Object> input) {
// JSONPath表現で"$.Records[0].eventSource"に格納されている値を返す
try {
final JsonNode node = this.mapper.valueToTree(input);
return node.get("Records").get(0).get("eventSource").asText();
} catch (Exception e) {
// 見つからなければ空文字を返す
return "";
}
}
Map型を使うことでLambda関数への入力としてJSONを扱えるようになりますが、そのまま値を取得しようとすると型キャストが頻発し、ソースの可読性低下につながります。そこで、入力をJsonNode型に変換したうえで値を取得するような方式をとることにしました。
asText()
は対象nodeが存在しない場合には空文字を返すため5、入力のJsonNodeへの変換に失敗したときも同様に空文字を返すようにしています。
やってみた
上記のような実装で複数AWSサービスからのイベントを受け付けられるのか、検証してみました。
1. Lambda関数の作成
マネジメントコンソールからLambda関数を作成します。
上記ソースコードをビルドしてjarを作成し、デプロイします。
ハンドラを変更し、Lambda関数が呼び出されたときに実行するメソッドを指定します。
この関数がSQSおよびS3へアクセスできるようにするため、実行ロールにAmazonS3ReadOnlyAccess
とAWSLambdaSQSQueueExecutionRole
を追加します。
ちなみに、メッセージをログ出力するために必要なロールはLambda関数をマネジメントコンソールから作成すると自動で追加してくれます。便利ですね。
2. トリガーの作成
SQSキューおよびS3バケットを作成し、それぞれのイベントをトリガーとして1. で作成したLambda関数を呼び出すように設定します。詳細は割愛しますが、今回は下図のようにトリガーを設定しました。
3. Lambda関数のテスト
送信完了後、期待通りメッセージがログに出力されていることを確認できました!
期待通り、S3バケットからのメッセージもログに出力されていることを確認できました!
まとめ
SQSおよびS3からのイベントをトリガーとして起動するLambda関数の実装例について紹介しました。
本記事が何らかのお役に立てば幸いです。
-
S3バケットをキューとして利用することで、プロデューサがキューに送信したメッセージの順序とコンシューマが処理するメッセージの順序は一致しない可能性があるため、注意が必要です。幸い、本事例ではメッセージの順序性は重要でなく、アプリケーションのロジックによる制御で対応することができました。 ↩
-
SQSキューへメッセージを送信できない原因の一例として、メッセージサイズの上限超過が挙げられます。AWSのドキュメントにある通り、SQSキューには256 KiBを超えるメッセージを送信できません。
サイズの大きなメッセージをSQSキューで扱うための拡張ライブラリがAWSから提供されています。しかし、このライブラリはメッセージがキューへ送信された際にメッセージをS3バケットへ保存し、保存したオブジェクトの参照情報をSQSキューに送信するというもので、SQS自体の障害には対応できないと判断して採用を見送りました。 ↩ -
mavenプロジェクトの初期構成にはAWSのドキュメントを参考にしました。 ↩