SNSのJSONのパラメータからSignature Version 1で定義された手順でパラメータをカノニカライズして文字列(signin input)を作ります。
NOTIFICATION_SIGNING_INPUT_KEY = [
"Message",
"MessageId",
"Subject",
"SubscribeURL",
"Timestamp",
"Token",
"TopicArn",
"Type",
]
NOTIFICATION_SIGNING_INPUT = lambda jobj:\
"".join([
"%s\n%s\n" % (k, jobj.get(k))
for k in NOTIFICATION_SIGNING_INPUT_KEY
if k in jobj
])
SNS サービスハンドラに検証メソッドを実装します:
import json
import requests
def verify_notification(self, notification):
# Notification モデル message にSNSのJSONはいっています
jobj = json.loads(notification.message)
sinput = NOTIFICATION_SIGNING_INPUT(jobj)
if not self.service.public_key:
# Service オブジェクトがPEMを持っていなかったら取得します
# 厳密にはSSL Certificate をチェックします
res = requests.get(jobj['SigningCertURL'])
self.service.public_key = res.text
self.service.save()
# 証明書、singing input , シグネチャーを pycryptoで署名検証します
return verify_pycrypto(
self.service.public_key, sinput, jobj['Signature'])
検証ルーチン:
from Crypto.Signature import PKCS1_v1_5
from Crypto.Hash import SHA
from base64 import standard_b64decode
def verify_pycrypto(pem, signing_input, b64signature):
pub = import_pubkey_from_x509(pem) # パブリックキーを抜く(下)
verifier = PKCS1_v1_5.new(pub) # PKCS 1.5 パディングサイナー
sig = standard_b64decode(b64signature)
if isinstance(signing_input, unicode):
signing_input = signing_input.encode('utf8')
dig = SHA.new(signing_input)
return verifier.verify(dig, sig)
指定されたPEMの証明書からパブリックキーを抜きます。なぜかRSA.importKey(pem) で書式エラーするので:
from Crypto.PublicKey import RSA
from Crypto.Util.asn1 import DerSequence
def import_pubkey_from_x509(pem):
b64der = ''.join(pem.split('\n')[1:][:-2])
cert = DerSequence()
cert.decode(b64decode(b64der))
tbs_certificate = DerSequence()
tbs_certificate.decode(cert[0])
subject_public_key_info = tbs_certificate[6]
return RSA.importKey(subject_public_key_info)
ちなみに、 pyOpenSSLバージョン:
from OpenSSL.crypto import verify
from OpenSSL import crypto
def verify_openssl(pem, signing_input, b64signature):
if isinstance(signing_input, unicode):
signing_input = signing_input.encode('utf8')
cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
sig = b64decode(b64signature)
return verify(cert, sig, signing_input, 'sha1') is None