0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

@aws-sdk/client-sqs SQSジョブキューへのメッセージ送信 📤

Posted at

はじめに

実務にて、@aws-sdk/client-sqsライブラリを用いて、
 SQSジョブキューへのメッセージ送信機能
の実装を行いましたので、備忘録として参考ロジックの記載になります。

目次

  1. 使用パッケージ
  2. メインロジック
  3. インターフェース
  4. まとめ
  5. 参考文献

🪄 使用パッケージ

sqs/packages.json
"dependencies": {
 "@aws-sdk/client-sqs": "^3.830.0",
 "inversify": "^6.0.2"
 "uuid": "^11.1.0"
},

🪄 メインロジック

本ロジックはCSVのインポート/エクスポート機能の中で使われる実装となります。

内容としては、ジョブキューへのメッセージ送信を行うもので、送信したメッセージをトリガーに、その後のCSVインポート/エクスポート処理をLambdaが実行する流れになります。

sqs/repository/index.ts
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { injectable } from 'inversify';
import { v4 as uuidv4 } from 'uuid';
import type { ISQSRepository } from './interface';

@injectable()
export class SQSRepository implements ISQSRepository {
  private readonly sqsClient: SQSClient;

  constructor() {
    const configuration =
      process.env.ENV === 'local'
        ? {
            region: process.env.AWS_REGION || 'ap-northeast-1',
            endpoint: process.env.LOCAL_STACK_ENDPOINT || 'http://localhost:4566',
            credentials: {
              accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'test',
              secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'test',
              sessionToken: process.env.AWS_SESSION_TOKEN || 'test',
            },
            forcePathStyle: true,
          }
        : {};

    this.sqsClient = new SQSClient(configuration);
  }

  /**
   * インポート用キューへのメッセージ登録実行
   *
   * @param historyId - 履歴ID
   * @return ジョブキューへの送信結果
   */
  public async executeImportSendJob(historyId: string): Promise<boolean> {
    const queueUrl = process.env.IMPORT_QUEUE_URL;
    if (!queueUrl) {
      throw new Error('IMPORT_QUEUE_URL is not defined');
    }
    const messageId = await this.sendJob(queueUrl, historyId);
    return !!messageId;
  }

  /**
   * エクスポート用キューへのメッセージ登録実行
   *
   * @param historyId - 履歴ID
   * @return ジョブキューへの送信結果
   */
  public async executeExportSendJob(historyId: string): Promise<boolean> {
    const queueUrl = process.env.EXPORT_QUEUE_URL;
    if (!queueUrl) {
      throw new Error('EXPORT_QUEUE_URL is not defined');
    }
    const messageId = await this.sendJob(queueUrl, historyId);
    return !!messageId;
  }

  /**
   * 対象のキューへのメッセージ登録
   *
   * @param queueUrl 送信先のSQSキューURL
   * @param historyId - 対象ジョブのインポート履歴のID
   * @return メッセージID
   */
  private async sendJob(queueUrl: string, historyId: string): Promise<string | undefined> {
    try {
      const command = new SendMessageCommand({
        QueueUrl: queueUrl,
        MessageBody: historyId,
        MessageGroupId: uuidv4(),
        MessageDeduplicationId: uuidv4(),
      });
      const response = await this.sqsClient.send(command);
      return response.MessageId;
    } catch (error) {
      if (error instanceof Error) {
        throw new Error('sqs sendJob error:', {
          message: error.message,
          stack: error.stack,
          params: { queueUrl, historyId },
        });
      }
      throw error;
    }
  }
}

🪄 インターフェース

sqs/repository/interface.ts
export interface ISQSRepository {
  /**
   * インポート用キューへのメッセージ登録実行
   *
   * @param historyId - 履歴ID
   * @return ジョブキューへの送信結果
   */
  executeImportSendJob(historyId: string): Promise<boolean>;

  /**
   * エクスポート用キューへのメッセージ登録実行
   *
   * @param historyId - 履歴ID
   * @return ジョブキューへの送信結果
   */
  executeExportSendJob(historyId: string): Promise<boolean>;
}

🪄 まとめ

処理概要

  • SQSClientインスタンス化
  • インポート/エクスポート用キューへのメッセージ登録実行
    • 対象のキューへのメッセージ登録

改良点

  • throw new Error箇所は、ロガーライブラリを用いてカスタム可能なエラークラスを別ファイルで作成・参照する形にすると良い!
  • 上記はさらに、@sentry/nodeを用いて、Sentry通知可能にするとより実務的なサービス構成になる

🪄 参考文献

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?