0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Flask と Amazon Cognito で実装する認証システム 〜 カスタムLambdaチャレンジとOIDCフロー 〜

Posted at

前段

この記事はGithubCopilotを使用して作成した、動作確認をしたサンプルコードを元にして記事も自動作成しています。

はじめに

Webアプリケーションにおける認証・認可の実装は、セキュリティの観点から非常に重要な課題です。Amazon Cognito は、ユーザー管理や認証機能を簡単に実装できるマネージドサービスですが、より高度な認証フローを実現するためのカスタマイズ方法についてはあまり情報が多くありません。

この記事では、Flask と Amazon Cognito を組み合わせて、カスタムチャレンジ(2要素認証)を含むOIDC認可コードフローを実装する方法を解説します。認証セキュリティを強化したいバックエンド開発者の方に役立つ内容となっています。

このアプリケーションで実現できること

  • ユーザー登録・ログイン機能
  • カスタムチャレンジによる2段階認証
  • OIDC認可コードフロー
  • セキュアなトークン管理

技術スタック

  • バックエンド: Python (Flask)
  • 認証基盤: Amazon Cognito
  • AWS SDK: Boto3

アプリケーションの全体像

このアプリケーションは、以下のコンポーネントで構成されています:

  1. 認証フロー: ユーザー登録、ログイン、カスタムチャレンジ検証
  2. OIDC認可コードフロー: 認可エンドポイント、トークンエンドポイント
  3. Lambda関数: Cognitoのカスタムチャレンジを処理するトリガー

認証フローの流れ

1. ユーザーがログイン情報を入力
2. Cognitoでパスワード認証
3. カスタムチャレンジの生成・提示
4. ユーザーがチャレンジコードを入力
5. チャレンジ検証
6. 認証トークン発行

シーケンス図

実装のポイント

1. Cognitoクライアントの設定

まず、Boto3を使ってCognitoクライアントを初期化します。

import boto3
from botocore.exceptions import ClientError

cognito_client = boto3.client('cognito-idp', region_name=REGION_NAME)

2. シークレットハッシュの計算

Cognitoのクライアントシークレットを使用する場合、認証リクエスト時にシークレットハッシュを計算する必要があります。

def calculate_secret_hash(client_id, client_secret, username):
    message = username + client_id
    dig = hmac.new(str(client_secret).encode('utf-8'),
                   msg=str(message).encode('utf-8'),
                   digestmod=hashlib.sha256).digest()
    return base64.b64encode(dig).decode()

3. 認可エンドポイントの実装

OIDC認可コードフローの開始点となる認可エンドポイントを実装します。

@app.route('/authorize', methods=['GET'])
def authorize():
    client_id = request.args.get('client_id')
    redirect_uri = request.args.get('redirect_uri')
    state = request.args.get('state')

    session['redirect_uri'] = redirect_uri
    session['state'] = state
    session['is_authorization_request'] = True
    return redirect(url_for('sign_in'))

4. ログイン処理とカスタムチャレンジの開始

Cognitoの2段階認証を実現するため、まずパスワード認証をした後、カスタムチャレンジを開始します。

@app.route('/signin', methods=['GET', 'POST'])
def sign_in():
    if request.method == 'POST':
        data = request.form
        try:
            # パスワード認証
            secret_hash = calculate_secret_hash(CLIENT_ID, CLIENT_SECRET, data['username'])
            initiate_response = cognito_client.initiate_auth(
                ClientId=CLIENT_ID,
                AuthFlow='USER_PASSWORD_AUTH',
                AuthParameters={
                    'USERNAME': data['username'],
                    'PASSWORD': data['password'],
                    'SECRET_HASH': secret_hash
                },
            )
            
            # カスタムチャレンジの開始
            custom_response = cognito_client.initiate_auth(
                ClientId=CLIENT_ID,
                AuthFlow='CUSTOM_AUTH',
                AuthParameters={
                    'USERNAME': data['username'],
                    'SECRET_HASH': secret_hash
                },
            )
            
            # セッションを保存
            session_token = custom_response.get('Session')
            if not session_token:
                return render_template('signin.html', error="Custom challenge session not found")
                
            session['session_token'] = session_token
            session['username'] = data['username']
            
            # 検証画面にリダイレクト
            return redirect(url_for('verify'))
            
        except ClientError as e:
            print(e)
            return render_template('signin.html', error="An error occurred. Please try again.")
    return render_template('signin.html')

5. チャレンジ検証とトークン発行

ユーザーが入力したチャレンジコードを検証し、認証が成功したらトークンを発行します。

@app.route('/verify', methods=['GET', 'POST'])
def verify():
    if request.method == 'POST':
        data = request.form
        try:
            challenge_response = data['challenge_response']
            session_token = session.get('session_token')
            username = session.get('username')

            if not session_token or not username:
                return render_template('verify.html', error="Session expired. Please try again.")

            # チャレンジ応答を送信
            response = respond_to_challenge(
                CLIENT_ID,
                session_token,
                username,
                challenge_response
            )

            # チャレンジ結果を確認
            if 'AuthenticationResult' not in response:
                return render_template('verify.html', error="Invalid challenge response. Please try again.")

            # 認可コードリクエストの場合
            if session.pop('is_authorization_request', False):
                authorization_code = str(uuid.uuid4())
                authorization_codes[authorization_code] = {
                    'username': username,
                    'id_token': response['AuthenticationResult']['IdToken'],
                    'access_token': response['AuthenticationResult']['AccessToken'],
                    'refresh_token': response['AuthenticationResult']['RefreshToken']
                }

                redirect_uri = session.pop('redirect_uri', '/')
                state = session.pop('state', '')
                return redirect(f"{redirect_uri}?code={authorization_code}&state={state}")

            # 通常のログイン処理
            session['username'] = username
            return redirect(url_for('home'))

        except ClientError as e:
            print(e)
            return render_template('verify.html', error="An error occurred. Please try again.")
    return render_template('verify.html')

6. チャレンジ応答処理

Cognitoのカスタムチャレンジに対して応答を送信します。

def respond_to_challenge(client_id, session, username, challenge_response):
    secret_hash = calculate_secret_hash(CLIENT_ID, CLIENT_SECRET, username)
    response = cognito_client.respond_to_auth_challenge(
        ClientId=client_id,
        ChallengeName='CUSTOM_CHALLENGE',
        Session=session,
        ChallengeResponses={
            'USERNAME': username,
            'ANSWER': challenge_response,
            'SECRET_HASH': secret_hash
        }
    )
    return response

7. トークンエンドポイントの実装

認可コードを使ってトークンを発行するエンドポイントを実装します。

@app.route('/token', methods=['POST'])
def token():
    code = request.form.get('code')
    client_id = request.form.get('client_id')
    client_secret = request.form.get('client_secret')

    if client_id != CLIENT_ID or client_secret != CLIENT_SECRET:
        return jsonify({'error': 'Invalid client credentials'}), 400

    if code not in authorization_codes:
        return jsonify({'error': 'Invalid authorization code'}), 400

    tokens = authorization_codes.pop(code)
    return jsonify({
        'id_token': tokens['id_token'],
        'access_token': tokens['access_token'],
        'refresh_token': tokens['refresh_token'],
        'token_type': 'Bearer',
        'expires_in': 3600
    })

以下に、Lambda関数の実装部分のみを修正した内容を示します。

カスタムチャレンジを処理するLambda関数

Cognitoのカスタムチャレンジを処理するためには、3つのLambda関数を実装する必要があります。ここでは、それぞれの関数の役割と実装例を示します。

1. DefineAuthChallenge Lambda関数

認証フローの各ステップを定義します。

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event)}")

    trigger_source = event['triggerSource']
    
    if trigger_source == "DefineAuthChallenge_Authentication":
        define_auth_challenge(event)

    logger.info(f"Response event: {json.dumps(event)}")
    return event

def define_auth_challenge(event):
    """
    Determines the next step in the authentication process.
    """
    # 新規セッションまたは前回のチャレンジが失敗した場合は新しいチャレンジを発行
    if not event['request']['session']:
        # 最初のチャレンジを発行
        event['response']['issueTokens'] = False
        event['response']['failAuthentication'] = False
        event['response']['challengeName'] = 'CUSTOM_CHALLENGE'
    elif event['request']['session'][-1]['challengeResult'] == True:
        # 前回のチャレンジが成功した場合、トークンを発行
        event['response']['issueTokens'] = True
        event['response']['failAuthentication'] = False
    else:
        # チャレンジの試行回数をカウント
        challenge_attempts = len([s for s in event['request']['session'] if s['challengeName'] == 'CUSTOM_CHALLENGE'])
        
        if challenge_attempts >= 3:
            # 3回失敗したら認証失敗
            event['response']['issueTokens'] = False
            event['response']['failAuthentication'] = True
        else:
            # 再度チャレンジを発行
            event['response']['issueTokens'] = False
            event['response']['failAuthentication'] = False
            event['response']['challengeName'] = 'CUSTOM_CHALLENGE'

2. CreateAuthChallenge Lambda関数

チャレンジの内容を生成します。

import json
import logging
import random

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event)}")

    trigger_source = event['triggerSource']
    
    if trigger_source == "CreateAuthChallenge_Authentication":
        create_auth_challenge(event)

    logger.info(f"Response event: {json.dumps(event)}")
    return event

def create_auth_challenge(event):
    """
    Creates a custom challenge for the user.
    """
    # 6桁のランダムコードを生成
    challenge_code = str(random.randint(100000, 999999))
    logger.info(f"Generated challenge code: {challenge_code} for user: {event['userName']}")
    
    # パブリックパラメータはユーザーに表示される
    event['response']['publicChallengeParameters'] = {
        'challenge': 'Please enter the 6-digit verification code:',
    }
    
    # プライベートパラメータはバックエンドでのみ使用される
    event['response']['privateChallengeParameters'] = {
        'answer': challenge_code
    }
    
    # メタデータはチャレンジタイプを示す
    event['response']['challengeMetadata'] = 'CUSTOM_CHALLENGE'
    
    # 実際のシステムでは、ここでSMSやメールでコードを送信する処理を追加
    # 例: send_verification_code(event['userName'], challenge_code)

3. VerifyAuthChallengeResponse Lambda関数

ユーザーからの応答を検証します。

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info(f"Received event: {json.dumps(event)}")

    trigger_source = event['triggerSource']
    
    if trigger_source == "VerifyAuthChallengeResponse_Authentication":
        verify_auth_challenge_response(event)

    logger.info(f"Response event: {json.dumps(event)}")
    return event

def verify_auth_challenge_response(event):
    """
    Verifies the user's response to the custom challenge.
    """
    # 期待される回答
    expected_answer = event['request']['privateChallengeParameters']['answer']
    # ユーザーが入力した回答
    user_answer = event['request']['challengeAnswer']
    
    # WebAuthn認証のサポート
    # 特別なフラグを確認して、パスキー認証が成功したケースを処理
    if user_answer == 'WEBAUTHN_SUCCESS':
        logger.info(f"WebAuthn authentication successful for user: {event['userName']}")
        event['response']['answerCorrect'] = True
        return
    
    # 通常のMFA検証
    if user_answer == expected_answer:
        # 正しい回答
        logger.info(f"Challenge answered correctly for user: {event['userName']}")
        event['response']['answerCorrect'] = True
    else:
        # 不正解の場合
        logger.info(f"Challenge answered incorrectly for user: {event['userName']}")
        event['response']['answerCorrect'] = False

これら3つのLambda関数を実装し、Cognitoのユーザープールにトリガーとして設定することで、カスタムチャレンジによる2段階認証を実現できます。さらに、WebAuthn(パスキー認証)にも対応できるように拡張しています。

実際のシステムでは、セキュリティを強化するために以下の点も考慮してください:

  1. チャレンジコードに有効期限を設ける
  2. レート制限を設けて総当たり攻撃を防ぐ
  3. 失敗回数に応じて一時的にアカウントをロックする

また、ログの出力には機密情報を含めないように注意し、実運用環境では適切なログレベルに調整することをお勧めします。

Amazon Cognito の設定

1. ユーザープールの作成

  1. AWS マネジメントコンソールで Cognito サービスを開きます
  2. 「ユーザープールの作成」をクリックします
  3. 必要な設定を行い、ユーザープールを作成します

2. アプリクライアントの設定

  1. ユーザープール内で「アプリの統合」タブを選択します
  2. 「アプリクライアントの追加」をクリックします
  3. 以下のような設定を行います:
    • クライアントタイプ: 「機密クライアント」
    • 認証フロー: 「ALLOW_CUSTOM_AUTH」を有効化
    • クライアントシークレット: 生成する

3. Lambdaトリガーの設定

  1. Lambda関数を3つ作成します:
    • DefineAuthChallenge
    • CreateAuthChallenge
    • VerifyAuthChallengeResponse
  2. ユーザープールの「Lambdaトリガー」セクションでこれらの関数を設定します

運用上の注意点

セキュリティ対策

  1. HTTPS の使用: 本番環境では必ずHTTPSを使用してください
  2. 認可コードの管理: 認可コードは一度だけ使用可能とし、有効期限を短く設定してください
  3. セッション管理: Flaskのセッションを適切に管理し、タイムアウト設定を行ってください

スケーラビリティ

認可コードは現在メモリ内に保存していますが、本番環境では Redis や DynamoDB などを使用して永続化することをお勧めします。

# 認可コードを永続化する例(Redis使用時)
def store_authorization_code(code, data):
    redis_client.setex(f"auth_code:{code}", 600, json.dumps(data))  # 10分間有効

def get_authorization_code(code):
    data = redis_client.get(f"auth_code:{code}")
    if data:
        redis_client.delete(f"auth_code:{code}")  # 一度だけ使用可能
        return json.loads(data)
    return None

カスタムチャレンジの拡張

今回は単純な数字コードによる検証を実装しましたが、以下のように拡張することも可能です:

  1. SMSによる検証コード送信: SNSと連携して携帯電話にコードを送信
  2. メール認証: SESを使用してメールでコードを送信
  3. TOTP: Time-based One-Time Password による認証
  4. パスキー認証: FIDO2/WebAuthnによる生体認証との連携

まとめ

FlaskとAmazon Cognitoを組み合わせることで、セキュアで柔軟なOIDC認証システムを実装できます。カスタムチャレンジ機能を活用することで、多要素認証のような高度なセキュリティ機能も比較的容易に実現できることがわかりました。

このアプリケーションは、自社サービスでの認証基盤として利用したり、SSOプロバイダーとしての機能も果たせるため、様々なシステム構成で活用できます。

今回のコードはプロトタイプとして利用できますが、本番環境では適切なエラーハンドリングやロギング、セキュリティ対策をさらに強化することをお忘れなく。

参考資料


サンプルコードを実際に動かしながら、自社サービスに合わせてカスタマイズしてみてください。コメントやフィードバックがあれば、ぜひお聞かせください!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?