3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Pythonを使ったDigest認証の設定@Lambda

Last updated at Posted at 2020-03-17

はじめに

Digest認証について検索してもほとんど情報がなかったので Pythonを使ったBasic認証の設定@Lambda の続編として書いてみた。
Digest認証自体ほとんど触ったことがなく、その仕組みを勉強するのも兼ねて。

コード

import os
import ctypes
import json
import base64
import time
import hashlib
import copy
from Crypto.Cipher import AES

accounts = [
    {
        "user": "user1",
        "pass": "pass1"
    },
    {
        "user": "user2",
        "pass": "pass2"
    }
    ]

realm = "sample@test.com"
qop = "auth"
# Basic認証と違って、認証後のタイムアウトを設定できるので入れてみた
timeout = 30 * (10 ** 9) # 30 seconds
# AES暗号化で使うための情報の準備
raw_key = "password1234567890"
raw_iv = "12345678"
key = hashlib.sha256(raw_key.encode()).digest()
iv = hashlib.md5(raw_iv.encode()).digest()

def lambda_handler(event, context):
    request = event.get("Records")[0].get("cf").get("request")
    
    if not check_authorization_header(request):
        return {
            'headers': {
                'www-authenticate': [
                    {
                        'key': 'WWW-Authenticate',
                        'value': create_digest_header()
                    }
                ]
            },
            'status': 401,
            'body': 'Unauthorized'
        }
            
        
    return request

def check_authorization_header(request: dict) -> bool:
    headers = request.get("headers")
    authorization_header = headers.get("authorization")
    
    if not authorization_header:
        return False
    
    data = {
        "method": request.get("method"),
        "uri": request.get("uri")
    }
    header_value = authorization_header[0].get("value")
    # Digest認証のデータは、「Digest 〜」と言う形式で来るので、まずは不要な部分を削除する
    header_value = header_value[len("Digest "):]
    
    # 各値がカンマで区切られてるので、分割する
    values = header_value.split(",")
    data = {
        "method": request.get("method"),
        "uri": request.get("uri")
    }
    # 各値を扱いやすいようにまたまた分割
    for v in values:
        # nonceをBase64エンコードしているので、単純に`=`で分割するとおかしくなるので、このような対応をしている
        idx = v.find("=")
        vv = [v[0:idx], v[idx+1:]]
        # 前後に半角スペースが入るので削除する
        vv[0] = vv[0].strip()
        vv[1] = vv[1].strip()
        # 値によってはダブルクォーテーションで括られているので、削除する
        if vv[1].startswith("\""):
            vv[1] = vv[1][1:]
        if vv[1].endswith("\""):
            vv[1] = vv[1][:len(vv[1])-1]
            
        data[vv[0]] = vv[1]

    for account in accounts:
        if account.get("user") != data.get("username"):
            continue
        
        d = copy.deepcopy(data)
        d["user"] = account.get("user")
        d["pass"] = account.get("pass")
        
        encoded_value = create_validation_data(d)

        if d.get("response") == encoded_value:
            if check_timeout(data.get("nonce")):
                return True

    return False

def check_timeout(nonce: str) -> bool:
    aes = AES.new(key, AES.MODE_CBC, iv)
    value = aes.decrypt(base64.b64decode(nonce.encode())).decode()
    # AESで暗号化する時にpaddingで`_`を追加しているので、その分を削除する
    while value.endswith("_"):
        value = value[:len(value)-1]

    return int(value) + timeout > time.time_ns()
    
def create_validation_data(data: dict) -> str:
    v1 = "{}:{}:{}".format(data.get("user"), realm, data.get("pass"))
    vv1 = hashlib.md5(v1.encode()).hexdigest()
    v2 = "{}:{}".format(data.get("method"), data.get("uri"))
    vv2 = hashlib.md5(v2.encode()).hexdigest()
    
    v3 = "{}:{}:{}:{}:{}:{}".format(vv1, data.get("nonce"), data.get("nc"), data.get("cnonce"), qop, vv2)

    return hashlib.md5(v3.encode()).hexdigest()

def create_digest_header() -> str:
    aes = AES.new(key, AES.MODE_CBC, iv)
    timestamp = "{}".format(time.time_ns()).encode()
    # 暗号化する時に長さが16の倍数じゃないとダメなので、paddingで詰めている
    while len(timestamp) % 16 != 0:
        timestamp += "_".encode()
        
    header = "Digest "
    values = {
        "realm": '"' + realm + '"',
        "qop": '"auth,auth-int"',
        "algorithm": 'MD5',
        "nonce": '"' + base64.b64encode(aes.encrypt(timestamp)).decode() + '"'
    }
    
    idx = 0
    for k, v in values.items():
        if idx != 0:
            header += ","
        header += '{}={}'.format(k, v)
        idx += 1
        
    return header

動かすための準備

LambdaやCloudFrontの設定はBasic認証の時と同じなので、記述することはない。
ただ、AESの暗号化するライブラリが pip でインストールする必要があるものなので、ちょっとだけ対応が必要。

ライブラリをzipファイルにする

Lambda上で pip を実行することができないので、ローカルPCなどで pip install したものを zip ファイル化してアップする必要がある。

この際気をつけないといけないのが、AWS LambdaのOSは、Amazon Linuxだと言うこと。
「あ〜単純にzipしたら良いのね」ってローカルのMacでzipを作成しても動かないので注意。

Amazon LinuxのEC2を作成してzipファイルを作成しても良いけど、高々zipファイル作成するだけなので、Dockerで十分。
と言うことで、Dockerを使って作成。

# Amazon Linux2のイメージをpullして起動
$ docker run -it amazonlinux:2 bash
# Dockerイメージ上で必要なパッケージをインストール
$ yum install -y gcc python3 pip3 python3-devel.x86_64
# Lambda上で使うパッケージをインストール
$ pip install pycrypto -t ./
# zipファイル作成
$ zip -r pycrypto.zip Crypto/

lambda_handlerの作成

zipファイルをアップロードすると、lambda_function.pyはなくなっているので改めて lambda_function.pyを作成し、lambda_handlerを記述する

その他参考にしたサイト

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?