はじめに
Digest認証について検索してもほとんど情報がなかったので Pythonを使ったBasic認証の設定@Lambda の続編として書いてみた。
Digest認証自体ほとんど触ったことがなく、その仕組みを勉強するのも兼ねて。
コード
import os
import ctypes
import json
import base64
import time
import hashlib
import copy
from Crypto.Cipher import AES
accounts = [
{
"user": "user1",
"pass": "pass1"
},
{
"user": "user2",
"pass": "pass2"
}
]
realm = "sample@test.com"
qop = "auth"
# Basic認証と違って、認証後のタイムアウトを設定できるので入れてみた
timeout = 30 * (10 ** 9) # 30 seconds
# AES暗号化で使うための情報の準備
raw_key = "password1234567890"
raw_iv = "12345678"
key = hashlib.sha256(raw_key.encode()).digest()
iv = hashlib.md5(raw_iv.encode()).digest()
def lambda_handler(event, context):
request = event.get("Records")[0].get("cf").get("request")
if not check_authorization_header(request):
return {
'headers': {
'www-authenticate': [
{
'key': 'WWW-Authenticate',
'value': create_digest_header()
}
]
},
'status': 401,
'body': 'Unauthorized'
}
return request
def check_authorization_header(request: dict) -> bool:
headers = request.get("headers")
authorization_header = headers.get("authorization")
if not authorization_header:
return False
data = {
"method": request.get("method"),
"uri": request.get("uri")
}
header_value = authorization_header[0].get("value")
# Digest認証のデータは、「Digest 〜」と言う形式で来るので、まずは不要な部分を削除する
header_value = header_value[len("Digest "):]
# 各値がカンマで区切られてるので、分割する
values = header_value.split(",")
data = {
"method": request.get("method"),
"uri": request.get("uri")
}
# 各値を扱いやすいようにまたまた分割
for v in values:
# nonceをBase64エンコードしているので、単純に`=`で分割するとおかしくなるので、このような対応をしている
idx = v.find("=")
vv = [v[0:idx], v[idx+1:]]
# 前後に半角スペースが入るので削除する
vv[0] = vv[0].strip()
vv[1] = vv[1].strip()
# 値によってはダブルクォーテーションで括られているので、削除する
if vv[1].startswith("\""):
vv[1] = vv[1][1:]
if vv[1].endswith("\""):
vv[1] = vv[1][:len(vv[1])-1]
data[vv[0]] = vv[1]
for account in accounts:
if account.get("user") != data.get("username"):
continue
d = copy.deepcopy(data)
d["user"] = account.get("user")
d["pass"] = account.get("pass")
encoded_value = create_validation_data(d)
if d.get("response") == encoded_value:
if check_timeout(data.get("nonce")):
return True
return False
def check_timeout(nonce: str) -> bool:
aes = AES.new(key, AES.MODE_CBC, iv)
value = aes.decrypt(base64.b64decode(nonce.encode())).decode()
# AESで暗号化する時にpaddingで`_`を追加しているので、その分を削除する
while value.endswith("_"):
value = value[:len(value)-1]
return int(value) + timeout > time.time_ns()
def create_validation_data(data: dict) -> str:
v1 = "{}:{}:{}".format(data.get("user"), realm, data.get("pass"))
vv1 = hashlib.md5(v1.encode()).hexdigest()
v2 = "{}:{}".format(data.get("method"), data.get("uri"))
vv2 = hashlib.md5(v2.encode()).hexdigest()
v3 = "{}:{}:{}:{}:{}:{}".format(vv1, data.get("nonce"), data.get("nc"), data.get("cnonce"), qop, vv2)
return hashlib.md5(v3.encode()).hexdigest()
def create_digest_header() -> str:
aes = AES.new(key, AES.MODE_CBC, iv)
timestamp = "{}".format(time.time_ns()).encode()
# 暗号化する時に長さが16の倍数じゃないとダメなので、paddingで詰めている
while len(timestamp) % 16 != 0:
timestamp += "_".encode()
header = "Digest "
values = {
"realm": '"' + realm + '"',
"qop": '"auth,auth-int"',
"algorithm": 'MD5',
"nonce": '"' + base64.b64encode(aes.encrypt(timestamp)).decode() + '"'
}
idx = 0
for k, v in values.items():
if idx != 0:
header += ","
header += '{}={}'.format(k, v)
idx += 1
return header
動かすための準備
LambdaやCloudFrontの設定はBasic認証の時と同じなので、記述することはない。
ただ、AESの暗号化するライブラリが pip
でインストールする必要があるものなので、ちょっとだけ対応が必要。
ライブラリをzipファイルにする
Lambda上で pip
を実行することができないので、ローカルPCなどで pip install
したものを zip ファイル化してアップする必要がある。
この際気をつけないといけないのが、AWS LambdaのOSは、Amazon Linuxだと言うこと。
「あ〜単純にzipしたら良いのね」ってローカルのMacでzipを作成しても動かないので注意。
Amazon LinuxのEC2を作成してzipファイルを作成しても良いけど、高々zipファイル作成するだけなので、Dockerで十分。
と言うことで、Dockerを使って作成。
# Amazon Linux2のイメージをpullして起動
$ docker run -it amazonlinux:2 bash
# Dockerイメージ上で必要なパッケージをインストール
$ yum install -y gcc python3 pip3 python3-devel.x86_64
# Lambda上で使うパッケージをインストール
$ pip install pycrypto -t ./
# zipファイル作成
$ zip -r pycrypto.zip Crypto/
lambda_handlerの作成
zipファイルをアップロードすると、lambda_function.py
はなくなっているので改めて lambda_function.py
を作成し、lambda_handler
を記述する