概要
AWS IoT CoreのMQTTブローカーをWebアプリケーションから使うときはMQTT over WebSocketを使用します。その際、一般的にはAWS IoT Device SDK for JavaScript v2を使うと思います。
ただし、例えば(あまり多くはないと思いますが)以下のようなケースでは、MQTT.jsなどの汎用MQTTクライアントを使いたいことがあります。
- オンプレミスで構築したMosquittoとAWS IoT Coreを切り替えて使いたい
- 既存のMQTTクライアント実装を流用したい
なお、WebアプリからAWS IoT Coreに接続する方法として、以下の2つが考えられます。
- サーバー側でAWS Security Token Service(STS)により一時認証情報を発行し、Web側でIoT Device SDKを使って接続する方法
- サーバー側で一時認証情報を用いて署名済みWebSocket URLを発行し、Web側はMQTT.jsなどで接続する方法
この記事では、サーバー側(Python)で、AWS Signature Version 4により署名したWebSocket URL(wss://)を発行する手順を説明します。
ポイント
URLへの署名ですので、通常のAPIリクエストへの署名と同様に botocore.auth.SigV4QueryAuth() を使えばできると思っていたのですが、AWS IoT CoreのWebSocket URLの場合は少し異なります。
AWSドキュメント「一時的なセキュリティ認証情報でリクエストに署名する」によると、
一部のサービスでは、正規リクエストに
X-Amz-Security-Tokenを追加する必要があります。その他のサービスでは、署名の計算後に、X-Amz-Security-Tokenを末尾に追加するだけです。
とのことですが、今回の場合は後者の「その他のサービス」に該当するようです。
つまり、
- 署名の計算にはセッショントークンは使わない
- 署名後にクエリパラメータとして
X-Amz-Security-Tokenを追加する
という手順になります。
実装
上記のポイントを理解していれば、実装はシンプルです。ここではbotocoreを使って実装してみます(botocoreを使わない実装も後ほど紹介します)。
なお、botocoreは公式にはエンドユーザー利用を想定したAPIではない(?)との記事もみかけたので、将来的に予告なく仕様が変わる可能性がある点には注意してください。
動作確認
- Rocky Linux 9.4
- Python 3.14.2
- boto3 1.42.8
- botocore 1.42.8
botocoreを使った実装
#!/usr/bin/env python3
import json
from uuid import uuid4
from datetime import datetime, timezone
from urllib.parse import urlencode
import boto3
from botocore.credentials import ReadOnlyCredentials
from botocore.auth import SigV4QueryAuth
from botocore.awsrequest import AWSRequest
def get_credentials(base_role_arn, region, account_id, topic, duration_seconds):
# STSの一時認証情報を得る
session_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["iot:Connect"],
"Resource": "*", # 必要ならClientIDで制限もできるはず
},
# Subscribe / Receive 権限: 指定プレフィックスだけ
{
"Effect": "Allow",
"Action": ["iot:Subscribe", "iot:Receive"],
"Resource": [
f"arn:aws:iot:{region}:{account_id}:topicfilter/{topic}",
f"arn:aws:iot:{region}:{account_id}:topic/{topic}",
]
},
# Publish 権限: 同じプレフィックスだけ
{
"Effect": "Allow",
"Action": ["iot:Publish"],
"Resource": f"arn:aws:iot:{region}:{account_id}:topic/{topic}"
}
]
}
session_name = "web-mqtt-" + str(uuid4())
res = boto3.client("sts").assume_role(
RoleArn=base_role_arn,
RoleSessionName=session_name,
DurationSeconds=duration_seconds,
Policy=json.dumps(session_policy),
)
creds = res["Credentials"]
expiration: datetime = creds["Expiration"]
return {
"accessKeyId": creds["AccessKeyId"],
"secretAccessKey": creds["SecretAccessKey"],
"sessionToken": creds["SessionToken"],
"expiration": expiration.astimezone(timezone.utc).isoformat(),
}
def build_signed_url(region, host, creds, duration_seconds):
# 認証情報はbotocoreが期待する形のReadOnlyCredentialsオブジェクトで渡す
# 署名にセッショントークンを使わないため、token=Noneにしておく
credentials = ReadOnlyCredentials(creds["accessKeyId"], creds["secretAccessKey"], token=None, account_id=None)
# リクエストを作成し、署名、セッショントークンを追加する
request = AWSRequest(method="GET", url=f"wss://{host}/mqtt")
signer = SigV4QueryAuth(credentials, "iotdevicegateway", region, duration_seconds)
signer.add_auth(request)
return request.url + "&" + urlencode({"X-Amz-Security-Token": creds["sessionToken"]})
if __name__ == "__main__":
REGION = "ap-northeast-1"
ACCOUNT_ID = "123456789012"
BASE_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/iot-core"
HOST = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
TOPIC = "test/topic"
DURATION = 900
creds = get_credentials(BASE_ROLE_ARN, REGION, ACCOUNT_ID, TOPIC, DURATION)
signed_url = build_signed_url(REGION, HOST, creds, DURATION)
print(signed_url)
get_credentials() ではIoT Coreへフルアクセス権限をもつロールをベースに、STSのセッションポリシーで権限を絞っています。状況に応じて適切なロールを作成・指定して下さい。
重要なのは build_signed_url() 内で ReadOnlyCredentials を初期化する際に、 token パラメータを None にしていることです。STSによる一時認証情報ではセッショントークンも含まれますが、 SigV4QueryAuth() にセッショントークンを渡してしまうと署名に使われてしまい、有効な署名付きURLとして認識されません。
つまり、
-
SigV4QueryAuth()ではセッショントークンを使わずに署名 - 得られたURLに
X-Amz-Security-Tokenパラメータを追加
という手順を行っています。
SigV4QueryAuth() の仕様上、強制的に X-Amz-Expires が追加されますが、特に問題はないようです(IoT Device SDKで求めたURL場合、X-Amz-Expiresは含まれません)。
求められたURLは以下のようになります(見やすいように改行を入れてあります。実際は一行です)。
wss://xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com/mqtt?
X-Amz-Algorithm=AWS4-HMAC-SHA256&
X-Amz-Credential=AEXAMPLEACCESSIDXXXX%2F20251212%2Fap-northeast-1%2Fiotdevicegateway%2Faws4_request&
X-Amz-Date=20251212T082341Z&
X-Amz-Expires=900&
X-Amz-SignedHeaders=host&
X-Amz-Signature=<SIGNATURE LOWER HEX>&
X-Amz-Security-Token=<URLENCODED SESSION TOKEN>
このURLを使えば、MQTT.jsなどの汎用MQTTクライアントからAWS IoT Coreに接続することができます。なお、一時認証情報を使っているので、有効期限が切れる前に新たにURLを取得して接続し直す、などの処理が必要になります。
botocoreに頼らない実装
Signature V4を自分で実装することになりますが、botocoreに頼らず実装することも可能です。手順自体はそれほど難しいものではなく、この場合は X-Amz-Expires を含まない署名済みURLを出力します。
#!/usr/bin/env python3
# 署名済みURLを作成する
# クライアントに一時認証情報ではなく署名済みwssを渡すため
# 2025-12-12
import json, hmac
from uuid import uuid4
from hashlib import sha256
from urllib.parse import urlencode
from datetime import datetime, timezone
import boto3
class MQTT5SignedURL:
SERVICE_NAME = "iotdevicegateway"
def __init__(self, region, host, creds):
self.dt = datetime.now(tz=timezone.utc)
self.region = region
self.host = host
self.access_key_id = creds["accessKeyId"]
self.secret_access_key = creds["secretAccessKey"]
self.session_token = creds["sessionToken"]
@property
def datetimestr(self):
return self.dt.strftime("%Y%m%dT%H%M%SZ")
@property
def datestr(self):
return self.dt.strftime("%Y%m%d")
@property
def query_params(self):
return {
"X-Amz-Algorithm": "AWS4-HMAC-SHA256",
"X-Amz-Credential": f"{self.access_key_id}/{self.datestr}/{self.region}/{self.SERVICE_NAME}/aws4_request",
"X-Amz-Date": self.datetimestr,
"X-Amz-SignedHeaders": "host",
}
def get_signature_raw(self):
canonical_headers = [
f"host:{self.host}",
]
payload = b""
canonical_request = [
"GET", # HTTP Method
"/mqtt", # URI絶対パス
urlencode(self.query_params), # QueryString
"\n".join(canonical_headers) + "\n", # Canonical Headers(末尾は\n)
"host", # Signed Headers
sha256(payload).hexdigest(), # e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
]
return "\n".join([
"AWS4-HMAC-SHA256",
self.datetimestr,
f"{self.datestr}/{self.region}/{self.SERVICE_NAME}/aws4_request",
sha256("\n".join(canonical_request).encode()).hexdigest(),
]).encode()
def get_signing_key(self):
# SigningKeyを求める
date_key = hmac.new(("AWS4" + self.secret_access_key).encode(), self.datestr.encode(), sha256).digest()
date_region_key = hmac.new(date_key, self.region.encode(), sha256).digest()
date_region_service_key = hmac.new(date_region_key, self.SERVICE_NAME.encode(), sha256).digest()
signing_key = hmac.new(date_region_service_key, b"aws4_request", sha256).digest()
return signing_key
def get_signed_url(self):
signature_raw = self.get_signature_raw()
signing_key = self.get_signing_key()
signature = hmac.new(signing_key, signature_raw, sha256).hexdigest()
# 署名済みURLを作成する
q = self.query_params.copy()
q["X-Amz-Signature"] = signature
q["X-Amz-Security-Token"] = self.session_token
return f"wss://{self.host}/mqtt?{urlencode(q)}"
def get_credentials(base_role_arn, region, account_id, topic, duration_seconds):
# STSの一時認証情報を得る(省略; 上と同じ)
...
def get_websocket_url(region, host, creds):
# WebSocketの署名付きURLを得る
signer = MQTT5SignedURL(region, host, creds)
return signer.get_signed_url()
if __name__ == "__main__":
HOST = "xxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
REGION = "ap-northeast-1"
ACCOUNT_ID = "123456789012"
BASE_ROLE_ARN = f"arn:aws:iam::{ACCOUNT_ID}:role/iot-core"
TOPIC = "test/topic"
DURATION_SECONDS = 900
creds = get_credentials(BASE_ROLE_ARN, REGION, ACCOUNT_ID, TOPIC, DURATION_SECONDS)
signed_url = get_websocket_url(REGION, HOST, creds)
print(signed_url)
まとめ
サーバー側から、
- 一時認証情報そのものを渡す方法
- 署名付きWebSocket URLを渡す方法
のどちらがよいかは状況によりますが、MQTT.jsなどの汎用MQTTクライアントを使える点で、署名付きURLを使う方式にメリットがあるかもしれません。ただし、AWS IoT CoreはMQTT5に完全準拠しているわけではないので、必要な仕様を確認するなど、気を付ける必要がありそうです。