AWSのS3には「署名付きURL」があります。
この機能と似たようなことをSQSでも実現します。
どういうときに使うの?
たとえば、こんな感じのシステムを作りたいとします。
- 端末の画面状態(操作中かどうか、人がいるかどうか)を定期的にAWSに送信します。
- 端末の台数は1万台、送信間隔は5秒、AWSはSQSにデータを溜めておいて、1時間に1回解析します。
- 端末ではAWS-SDKが動かず、なおかつ簡単なHTTPS通信しかできません。
AWSで構成を考えてみます。
<構成A:API Gateway + Lambda>
安いと評判のLambda + API Gatewayですが、大量のリクエストを捌くとさすがに高価になります。
ざっくりした計算ですが、1日に1億7280万リクエストがあるとき、データの蓄積にかかる料金は以下の通りです。
サービス | 100万リクエストあたりの料金 | 1日にかかる料金 | 条件 |
---|---|---|---|
Lambda | 条件による | 387 USD | 平均1秒間処理、メモリ割当128MB |
Api Gateway | 4.25 USD | 731 USD | REST API |
SQS | 0.4 USD | 69 USD |
合計:1日あたり1187ドル
Lambdaの遅延、WAF、CloudFront、認証なんかが入ると、もっと上がります。
<構成B:AWS IoT + ルールエンジン + SQS>
AWS IoTからルールエンジンでSQSに転送する構成なら値段は下がります。ただ今回は…
サービス | 100万リクエストあたりの料金 | 1日にかかる料金 | 条件 |
---|---|---|---|
AWS IoT | 送信:1.2 USD ルール:0.18 USD アクション:0.18 USD |
268 USD | ルール+アクションを実行 |
SQS | 0.4 USD | 69 USD |
合計:1日あたり337ドル
今回は、ポートの都合でMQTTは無理、端末側の都合でWebSocketも無理です。
<構成C:SQSにそのまま入れる>
ところで、端末からSQSに直接送信できれば20分の1くらいの金額、毎日69ドルで済むのですが、
AWS-SDKが使えないのでセキュリティに難があります。なんかもういっそのこと…
そりゃそうですよね。
<構成D:SQS+署名付きURL>
よくよく考えると、SDKを使わない認証を入れて、SQSに直接送信できればうまくできそうです。
- 運用コストが安い
- どんなネットワーク、どんな環境でも実装できる
- セキュリティもそこそこ
そんな仕組みを作るために、SQS + 署名付きURLを実現していきます。
完成図
実施する処理と、それぞれの実行タイミングは以下の通りです
処理 | 対象 | タイミング |
---|---|---|
SQSの署名付きURLの取得 | 端末 → APIGateway | 15分に1回実行 |
SQSに端末状態を送信 | 端末 → SQS | 5秒に1回実行 |
- 署名付きURLは発行から15分で失効するため、定期的に更新が必要です。
SQSで投げるデータについて、今回は仮に以下の通りとします
送信するデータ | データの内容 |
---|---|
SQSのキューの名前 | sqs-send-request-test-0424 |
送信するデータのパターン | Open/Open(端末オン、人感オン) Close/Open(端末オフ、人感オン) Open/Close(端末オン、人感オフ) Close/Close(端末オフ、人感オフ) ※4通りのうち、いずれかを送信 |
SQSの署名付きURLを取得(15分に1回実行)
API Gatwayに署名付きURLをリクエストします 1
curl "https://xxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/url" -s -X POST \
-d '{"que_name":"sqs-send-request-test-0424", "patterns":["Open/Open","Close/Open","Open/Close","Close/Close"]}'
リクエストが成功すれば、データのパターンと同じ件数の署名付きURLが返ってきます
{
"url": {
"Open/Open": "https://sqs.ap-northeast-1.amazonaws.com/xxxxxx/sqs-send-request-test-0424?AWSAccessKeyId=xxxxx&Action=SendMessage&MessageBody=Open%2FOpen&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2020-04-30T10%3A42%3A54&Version=2012-11-05&Signature=xxxxxxxxxxxxxxxxxxxxx",
"Close/Open": "https://sqs.ap-northeast-1.amazonaws.com/xxxxxx/sqs-send-request-test-0424?AWSAccessKeyId=xxxxx&Action=SendMessage&MessageBody=Close%2FOpen&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2020-04-30T10%3A42%3A54&Version=2012-11-05&Signature=xxxxxxxxxxxxxxxxxxxxx",
"Open/Close": "https://sqs.ap-northeast-1.amazonaws.com/xxxxxx/sqs-send-request-test-0424?AWSAccessKeyId=xxxxxx&Action=SendMessage&MessageBody=Open%2FClose&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2020-04-30T10%3A42%3A54&Version=2012-11-05&Signature=xxxxxxxxxxxxxxxxxxxxx",
"Close/Close": "https://sqs.ap-northeast-1.amazonaws.com/xxxxxx/sqs-send-request-test-0424?AWSAccessKeyId=xxxxxx&Action=SendMessage&MessageBody=Close%2FClose&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2020-04-30T10%3A42%3A54&Version=2012-11-05&Signature=xxxxxxxxxxxxxxxxxxxxx"
}
}
SQSに端末状態を送信(5秒に1回実行)
送信するURLは、data["url"][${送りたい状態}]のデータです。
例では「Open/Close」を送っています。
curl "https://sqs.ap-northeast-1.amazonaws.com/xxxxxx/sqs-send-request-test-0424?AWSAccessKeyId=xxxxxx&Action=SendMessage&MessageBody=Open%2FClose&SignatureMethod=HmacSHA256&SignatureVersion=2&Timestamp=2020-04-30T10%3A42%3A54&Version=2012-11-05&Signature=xxxxxxxxxxxxxxxxxxxxx"
実行するたびに、設定したトピックに${送りたい状態}のメッセージが登録されます。
例では「sqs-send-request-test-0424」に「Open/Close」が登録されています。
実装
下準備として、SQSでキューを作っておきます。
好きなキューの名前を入れて、画面下の「キューのクイック作成」をクリックします。
また、SQS専用のIAMユーザーを作っておきます。
ポリシーにはAmazonSQSFullAccessを設定します。SQS以外の権限は不要です。
IAMユーザーを作成したら、アクセスキーIDとシークレットアクセスキーを取得しておきます。
Lambdaを実装する
Python3.8でLambdaを作成します。
Lambdaの環境変数には以下の通り設定します。
環境変数のキー | 設定する値 |
---|---|
AWS_ACCOUNT_NUMBER | AWSアカウントIDの番号 (IAMの画面の左下に出ている12桁の数字のこと) |
SQS_ACCOUNT_ID | SQS専用のIAMユーザーのアクセスキーID |
SQS_SECRET_KEY | SQS専用のIAMユーザーのシークレットアクセスキー |
Lambdaに以下のソースを貼り付けます。
# coding: utf-8
import boto3, json, hashlib, hmac, base64, os
from datetime import datetime, timezone, timedelta
from urllib import parse as url_encode
# 署名情報(SHA256形式、署名バージョンは2)
SIGNATURE_METHOD = "HmacSHA256"
SIGNATURE_VERSION = "2"
ENCODE = "utf-8"
# SQSのメソッド情報(REST APIのGETを使ってQueueにメッセージを送信する)
HTTP_METHOD = "GET"
SQS_METHOD = "SendMessage"
AWS_VERSION = "2012-11-05"
# タイムゾーン
UTC_TIMEZONE = timezone(timedelta(hours = 0), 'UTC')
class Credentials:
"""
IAMユーザーから認証情報を設定する
"""
@staticmethod
def from_iam():
instance = Credentials()
instance.aws_access_key = os.environ.get("SQS_ACCOUNT_ID", DEFAULT.SQS_ACCOUNT_ID)
instance.aws_secret_key = os.environ.get("SQS_SECRET_KEY", DEFAULT.SQS_SECRET_KEY)
return instance
class Endpoint:
"""
SQSのエンドポイントの情報を設定する
"""
def __init__(self, topic_name):
self.protocol = "https"
self.host_name = "sqs.{}.amazonaws.com".format(os.environ.get("AWS_REGION", DEFAULT.AWS_REGION))
self.url_path = "/{}/{}".format(os.environ.get("AWS_ACCOUNT_NUMBER", DEFAULT.AWS_ACCOUNT_NUMBER), topic_name)
@property
def url(self):
return f"{self.protocol}://{self.host_name}{self.url_path}"
# SQSの署名付きURLを作成する
def create_presigned_url(credential, endpoint, message):
# 現在日時をUTCで取得、文字列に変換する
current_date = datetime.now().astimezone(UTC_TIMEZONE)
current_date_str = url_encode.quote(
current_date.strftime('%Y-%m-%dT%H:%M:%S')
)
# 送信するデータをもとに、シークレットアクセスキーでハッシュを作成する
return endpoint.url + "?" + create_query(SQS_METHOD, message, credential.aws_access_key, current_date_str, option = {
# ハッシュデータはバイト配列になっているため、GETで送信できるようにURLエンコード+base64エンコードする
"Signature" : url_encode.quote(
base64.b64encode(
sign(
# 署名に使うシークレットアクセスキーを指定する
credential.aws_secret_key.encode(ENCODE),
# 送信データを指定する(指定したデータはSHA256でハッシュ化される)
create_certificate(endpoint, SQS_METHOD, message, credential.aws_access_key, current_date_str)
)
)
)
})
# シークレットアクセスキーでハッシュ化する
def sign(key, msg):
return hmac.new(key, msg.encode(ENCODE), hashlib.sha256).digest()
# v2の形式で署名データを作成する
# フォーマットは公式ドキュメント参照(https://docs.aws.amazon.com/ja_jp/general/latest/gr/signature-version-2.html)
def create_certificate(endpoint, method, message_body, aws_access_key, current_date_str):
return "\n".join([
HTTP_METHOD,
endpoint.host_name,
endpoint.url_path,
create_query(method, message_body, aws_access_key, current_date_str)
])
# クエリデータを正規化する
# フォーマットは公式ドキュメント参照(https://docs.aws.amazon.com/ja_jp/general/latest/gr/signature-version-2.html)
def create_query(method, message_body, aws_access_key, current_date_str, option = None):
query_map = {
"AWSAccessKeyId" : aws_access_key,
"Action" : method,
"MessageBody" : message_body,
"SignatureMethod" : SIGNATURE_METHOD,
"SignatureVersion" : SIGNATURE_VERSION,
"Timestamp" : current_date_str,
"Version" : AWS_VERSION
}
# Signatureはハッシュには含めず、送信するときにだけ含める必要がある
# もしSignatureが指定されていれば、クエリにデータを追加する
if option is not None:
query_map.update(option)
# GETクエリの形式に変換する
return "&".join([
f"{key}={value}"
for key, value in query_map.items()
])
# 必要な件数だけ署名付きURLを作成する
def request(credential, endpoint, data_patterns):
url = {}
for pattern in data_patterns:
url[pattern] = create_presigned_url(credential, endpoint, url_encode.quote(pattern, safe = ""))
return {
"url" : url
}
# APIGateway(HTTP API)の引数から、POSTされたデータを取得する(データはAPIGateway側でbase64エンコードされる)
def get_payload_from_event(event):
payload_str = ""
if event["isBase64Encoded"]:
payload_str = base64.b64decode(event["body"].encode(ENCODE))
else:
payload_str = event["body"]
return json.loads(payload_str)
# Lambda実行時のエントリポイント
def lambda_handler(event, context):
payload = get_payload_from_event(event)
return {
'statusCode': 200,
'body': json.dumps(request(Credentials.from_iam(), Endpoint(payload["que_name"]), payload["patterns"]))
}
# ---------------------------------------------
# 以下はローカル実行用
# Lambda以外で実行するときに指定する
# ---------------------------------------------
# ローカル実行用変数
class _Default:
def __init__(self):
self.SQS_ACCOUNT_ID = "" # IAMのアクセスキーIDを指定する
self.SQS_SECRET_KEY = "" # IAMのシークレットキーを指定する
self.AWS_ACCOUNT_NUMBER = "" # AWSのアカウントIDを指定する
self.AWS_REGION = "ap-northeast-1" # SQSのあるリージョンを指定する
self.SQS_QUEUE_NAME = "" # 送信先のキュー名を指定する
DEFAULT = _Default()
# ローカル環境デバッグ実行時のエントリポイント
if __name__ == "__main__":
print(json.dumps(request(Credentials.from_iam(), Endpoint(DEFAULT.SQS_QUEUE_NAME), [
"Open/Open", "Close/Open", "Wait/Open",
"Open/Close", "Close/Close", "Wait/Close",
"Open/Wait", "Close/Wait", "Wait/Wait"
])))
作成したLambdaを、APIGatewayのバックエンドに設定します。
※例のAPIGatewayはHTTP APIで作成しています。
動けばOKなら、ここまで読んでいただければ大丈夫です。
APIGatewayをデプロイすれば、「完成図」と同じように動くようになっているはずです。
ここからは詳しい話になります。
詳しい話
SQSにはもともと、AWS-SDKを通さずに、GETやPOSTでメッセージを送信する仕組みがあります。
参考:クエリ API リクエストを行う
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-making-api-requests.html
匿名ユーザーで、全体公開したキューに送信する
一番簡単なリクエストの方法は以下の通りです。
参考サイトにあるURLをたたくと、キュー(MyQueue)にメッセージ(data)が送信されます。2
https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue ?
Version = 2012-11-05 &
Action = SendMessage &
MessageBody = data
なお、このクエリで飛ぶのは匿名ユーザーのメッセージです。デフォルトのキューでは権限エラーで届きません。
この状態のメッセージを受け取るには、全体公開を受け入れるように設定しておく必要があります。
キューのアクセス許可をキャプチャの通り変更して、「全員」にチェックを入れます。
特定のユーザーだけを許可したキューに送信する
全体公開を受け入れる設定をせず、特定のユーザーで送信したいのなら、「自分が誰なのか」を設定します。
https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue ?
Version = 2012-11-05 &
Action = SendMessage &
MessageBody = data &
AWSAccessKeyId = AKIA****************
AWSAccessKeyIDを設定したので、「誰か」が伝わるようになりました。
ただ、これだと本当に本人かどうかがわかりません。自称本人のなりすましかもしれないですよね。
シークレットアクセスキーを使って、本人だと証明してあげます。
ただ、そのままシークレットアクセスキーを送るわけにはいかないので、ハッシュ化したものを送ります。
https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue ?
Version = 2012-11-05 &
Action = SendMessage &
MessageBody = data &
AWSAccessKeyId = AKIA**************** &
Signature = ********************************** &
SignatureMethod = HmacSHA256 &
SignatureVersion = 2 &
Timestamp = 2020-04-30T10:42:54
Signatureは、送信するデータを、シークレットアクセスキーを鍵にしてハッシュ化したものです。
送信するデータは、Signatureを除いたクエリ全体と、ホストの情報を合わせたものです。
SignatureMethodはSignatureのハッシュ化に使ったアルゴリズムです。
HMAC-SHA256なので、そのことをAWSに伝えます。
SignatureVersionは署名バージョンです。今回は署名バージョン2を使っています。
TimeStampはハッシュ化した日時のことです。
参考:署名バージョン 2 の署名プロセス
https://docs.aws.amazon.com/ja_jp/general/latest/gr/signature-version-2.html
ところで本家の署名付きURLとは何が違うの?
本家のS3の「署名付きURL」と比べると、挙動が異なります。
URL発行後、データが変更できる範囲が異なる
SQSでは、送信するメッセージの内容によって署名が変わります。
S3では、送信するファイルの内容がどうだとしても、同じ署名で送信できます。
SQSで違うメッセージを送るには、その数だけSignatureを発行する必要があります。
STSで渡した一時認証情報の生存期間が異なる
S3の場合、Lambdaで発行した一時的なアクセスキーIDは、署名付きURLの期限が切れるまで利用できます。
SQSでも同じように、一時的なアクセスキーIDとシークレットアクセスキーを発行して付与するとどうなるでしょう。
Lambdaが終了するとアクセスキーIDが消えるため、LambdaからAPIGatewayにURLを返した時点で認証が通らなくなります。