PHP
aws-sdk
AmazonSQS

AWS SDK for PHPでAmazon SQSを使う

新しいサービスにメッセージキューを使ってみたかったのでAmazon SQSについて調べました。

メッセージキューとは何か

時間がかかる処理があるとして、

・メインのプログラムでは処理に使用するデータを一時保存して次の処理に進む
・他のプログラムから一時保存したデータを取り出して処理を行う

このような仕組みがメッセージキューです。キューにデータを保存するプログラムをclient、キューを監視してデータを取り出して実行するプログラムをworkerと呼んだりします。

非同期で処理を実行することで、メインのプログラムでは素早くレスポンスを返すことができるようになります。
また、マイクロサービスやサーバレスアーキテクチャのような分散型のシステムで複数のサービスをつなぐ際にも使用します。

Amazon SQSについて

https://aws.amazon.com/jp/sqs/

Amazon SQSはAmazonが提供しているメッセージキューのサービスです。
毎月100万リクエストまでは無料。その後は100万リクエストあたり0.4$。その上、AWSお馴染みの転送量課金があります。

Amazon SQSの機能

Amazon SQSの機能はシンプルです。

・キューを作成する
・キューを削除する
・キューにメッセージを保存する
・キューからメッセージを取り出す
・メッセージを削除する

PHPにAWS SDKを導入する

PHPでAmazon SQSのclient、workerを実装していきます。といっても大したことはなくて、AWSのSDKを使えば簡単に実装できます。

https://aws.amazon.com/jp/sdk-for-php/

SDKのインストールはcomposerで行います。

composer require aws/aws-sdk-php

AWSのSDKを使うためには認証情報(credential)を取得する必要があります。credentialの取得方法は
https://aws.amazon.com/jp/developers/access-keys/
を参照してください。

credentialはプログラムの中にハードコードすることも可能ですが、それはセキュリティー的にも作法的にもよろしくありません。今回はVirtualBox上のCentOSで動かしているので\$HOME/.aws/credentialsに認証情報を書いて参照します。$HOMEというのは環境変数でHOMEに設定されているディレクトリということです。

[default]
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

最初、nginx + php-fpmの環境でプログラムを実行したら、全然動きませんでした。phpinfoで確認したところ$HOMEが/var/lib/php/fpmになっていました。実行しているのがphp-fpmなので当たり前と言えば当たり前ですが、あまり気にしたことがなかったので少しはまりました。

キューの作成

キュー自体をsdkから作成することもできますが、今回はConsoleから作成します。ConsoleからAmazon SQSを選べば流れで作成できると思います。作成したキューのURLを保存します。このURLがapiのエンドポイントとなります。以下のサンプルではQUEUE_URLとか称されています。

メッセージを送る

メッセージを送ってみます。

<?php
require 'vendor/autoload.php';
use Aws\Sqs\SqsClient;
use Aws\Exception\AwsException;

define('QUEUE_URL', 'QUEUE_URL');

try{
    $client = new SqsClient([
        'profile' => 'default',
        'region' => 'ap-northeast-1',
        'version' => 'latest',
    ]);

    $params = [
        'DelaySeconds' => 0,
        'MessageAttributes' => [
            'Title' => [
                'DataType' => 'String',
                'StringValue' => 'Amazon SQS Test',
            ]
        ],
        'MessageBody' => 'This is Amazon SQS Test',
        'QueueUrl' => QUEUE_URL,
    ];

    $result = $client->sendMessage($params);
    var_dump($result);
} catch(AwsException $e){
    var_dump($e->getMessage());
}

メッセージのサイズは最大256kbです。

MessageAttributesでメッセージに最大10個まで属性をつけることができます。サンプルのDataTypeはStringですが、DataTypeにbinaryを指定すればbase64でエンコードしたバイナリを与えることもできます。

DelaySecondsは取り出せるまでの秒数を設定します。30と入れた場合、30秒後までは取り出すことができません。0-900まで指定できるので、ちょっとしたタイマーとして使用できます。

メッセージを受け取る

メッセージを受け取ります。

<?php

require 'vendor/autoload.php';
use Aws\Sqs\SqsClient;
use Aws\Exception\AwsException;

define('QUEUE_URL', 'QUEUE_URL');
try{
    $client = new SqsClient([
        'profile' => 'default',
        'region' => 'ap-northeast-1',
        'version' => 'latest',
    ]);
    $receive = [
        'AttributeNames' => ['All'],
        'MessageAttributeNames' => ['All'],
        'MaxNumberOfMessages' => 10,
        'QueueUrl' => QUEUE_URL,
        'WaitTimeSeconds' => 20,
        'VisibilityTimeout' => 60,
    ];
    while(true){
        $result = $client->receiveMessage($receive);
        $data = $result->get('Messages');
        if($data){
            foreach($data as $item){
                echo $item['Body'];
                //受け取って処理が終わったら削除する
                $client->deleteMessage([
                    'QueueUrl' => QUEUE_URL,
                    'ReceiptHandle' => $item['ReceiptHandle'],
                ]);
            }
        }
    }
} catch(AwsException $e){
    var_dump($e->getMessage());
}

Amazon SQSを監視するためにwhile文でループさせています。サンプルなので適当な作りですが、実際はそれなりに作り込んでsupervisordなどでデーモン化させるといいでしょう。

AttributeNamesMessageAttributeNamesは取得したい属性を指定します。AttributesはタイムスタンプなどSQSが付けてくる属性で、MessageAttributesは送信時に指定した属性が含まれます。AttributeNamesMessageAttributeNamesを指定しない場合、属性の情報は何も返ってきません。

VisibilityTimeoutを指定すると、指定した時間内は他のプログラムからは見えないようになります。複数のプログラムがAmazon SQSを叩いて同時に処理を実行してしまうことを防ぐためですね。

VisibilityTimeoutを超えそうになったらどうするのかという話は当然あるわけですが、VisibilityTimeoutは以下のような感じで変更することができます。

$result = $client->changeMessageVisibility([
    'QueueUrl' => QUEUE_URL,
    'ReceiptHandle' => $item['ReceiptHandle'],
    'VisibilityTimeout' => 1000,
]);

処理が終わったらメッセージは削除します。そうしないと何度でも返ってきます。

WaitTimeSecondsは0から20まで秒数を指定し、レスポンスを指定した時間待機する機能です。接続したタイミングでメッセージが存在しなくても、待っている間にメッセージが登録されれば即座にレスポンスが返る仕組みです。そうすることで、上のサンプルのようにwhile文の無限ループなどで監視している場合、APIを叩く回数を削減できます。

MaxNumberOfMessagesは一度に受け取るメッセージの数を1-10で指定します。普通ならば、一度に沢山取ってきた方がよいわけですが、Amazon SQSにはここに落とし穴があります。

Amazon SQSの落とし穴

落とし穴というのは、Amazon SQSは平然と同じメッセージを複数個返してくることです。MaxNumberOfMessagesに10を指定すると、同じメッセージが10個返ってくることもあります。

正確には、標準キューはメッセージが少なくとも一回配信されることしか保証していないのです。なので、同じメッセージを複数のサーバに保存して幾つも取ってくることがあります。また、メッセージの順番も保証されません。

当然、二回叩いて同じメッセージが返ってくることもあり得るわけですが、これは先ほど紹介したVisibilityTimeoutである程度回避することができます。

なので、Amazon SQSは順番が違ったり同じメッセージが複数返ってくることを前提として処理を作り込む必要があります。

一回限りの処理を保証するFIFOキュー

それらの落とし穴を回避する新しいQueueであるFIFOキューが一部のリージョンで実装されています。ちなみに東京リージョンでは現在まだ実装されていません。FIFOはFirst In First Out、先に入れたデータを先に取り出す、という意味ですね。

FIFOキューでは何度も同じメッセージが取り出される事はなく、入れたメッセージが入れた順番で取得できます。その代わり、メッセージ単位でのDelaySecondsの指定ができないとった制約もあります。

まとめ

AWS SDK for PHPを使ってAmazon SQSの実装をしました。SDKを使うと簡単にメッセージを保存したり受け取ったりできます。ただし、Amazon SQSは少なくとも一回配信されることしか保証されないという落とし穴もあるので、実装に気をつける必要もあります。