はじめに
実務にて、@aws-sdk/client-sqsライブラリを用いて、
SQSジョブキューへのメッセージ送信機能
の実装を行いましたので、備忘録として参考ロジックの記載になります。
目次
🪄 使用パッケージ
sqs/packages.json
"dependencies": {
"@aws-sdk/client-sqs": "^3.830.0",
"inversify": "^6.0.2"
"uuid": "^11.1.0"
},
🪄 メインロジック
本ロジックはCSVのインポート/エクスポート機能の中で使われる実装となります。
内容としては、ジョブキューへのメッセージ送信を行うもので、送信したメッセージをトリガーに、その後のCSVインポート/エクスポート処理をLambdaが実行する流れになります。
sqs/repository/index.ts
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { injectable } from 'inversify';
import { v4 as uuidv4 } from 'uuid';
import type { ISQSRepository } from './interface';
@injectable()
export class SQSRepository implements ISQSRepository {
private readonly sqsClient: SQSClient;
constructor() {
const configuration =
process.env.ENV === 'local'
? {
region: process.env.AWS_REGION || 'ap-northeast-1',
endpoint: process.env.LOCAL_STACK_ENDPOINT || 'http://localhost:4566',
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'test',
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'test',
sessionToken: process.env.AWS_SESSION_TOKEN || 'test',
},
forcePathStyle: true,
}
: {};
this.sqsClient = new SQSClient(configuration);
}
/**
* インポート用キューへのメッセージ登録実行
*
* @param historyId - 履歴ID
* @return ジョブキューへの送信結果
*/
public async executeImportSendJob(historyId: string): Promise<boolean> {
const queueUrl = process.env.IMPORT_QUEUE_URL;
if (!queueUrl) {
throw new Error('IMPORT_QUEUE_URL is not defined');
}
const messageId = await this.sendJob(queueUrl, historyId);
return !!messageId;
}
/**
* エクスポート用キューへのメッセージ登録実行
*
* @param historyId - 履歴ID
* @return ジョブキューへの送信結果
*/
public async executeExportSendJob(historyId: string): Promise<boolean> {
const queueUrl = process.env.EXPORT_QUEUE_URL;
if (!queueUrl) {
throw new Error('EXPORT_QUEUE_URL is not defined');
}
const messageId = await this.sendJob(queueUrl, historyId);
return !!messageId;
}
/**
* 対象のキューへのメッセージ登録
*
* @param queueUrl 送信先のSQSキューURL
* @param historyId - 対象ジョブのインポート履歴のID
* @return メッセージID
*/
private async sendJob(queueUrl: string, historyId: string): Promise<string | undefined> {
try {
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: historyId,
MessageGroupId: uuidv4(),
MessageDeduplicationId: uuidv4(),
});
const response = await this.sqsClient.send(command);
return response.MessageId;
} catch (error) {
if (error instanceof Error) {
throw new Error('sqs sendJob error:', {
message: error.message,
stack: error.stack,
params: { queueUrl, historyId },
});
}
throw error;
}
}
}
🪄 インターフェース
sqs/repository/interface.ts
export interface ISQSRepository {
/**
* インポート用キューへのメッセージ登録実行
*
* @param historyId - 履歴ID
* @return ジョブキューへの送信結果
*/
executeImportSendJob(historyId: string): Promise<boolean>;
/**
* エクスポート用キューへのメッセージ登録実行
*
* @param historyId - 履歴ID
* @return ジョブキューへの送信結果
*/
executeExportSendJob(historyId: string): Promise<boolean>;
}
🪄 まとめ
処理概要
- SQSClientインスタンス化
- インポート/エクスポート用キューへのメッセージ登録実行
- 対象のキューへのメッセージ登録
改良点
-
throw new Error
箇所は、ロガーライブラリを用いてカスタム可能なエラークラスを別ファイルで作成・参照する形にすると良い! - 上記はさらに、@sentry/nodeを用いて、Sentry通知可能にするとより実務的なサービス構成になる