0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

keycloackを使ってS3へファイルをアップロードしたときの備忘録

Last updated at Posted at 2024-09-29

はじめ

物ができてしまうと興味が次へ移ってしまうのが悪い癖。
これまで2個記事を書いてきたわけだけども、目的を果たしたため記事を〆ずに次に移ってしまった。

ちょっとこれまでの記事がごちゃごちゃしていたので、削除と整理をした。

前提条件

以下の記事で、Dockerを用いてKeycloakを立ち上げSMALでAWSコンソールに入れるところまで行ってる。

プログラムはpython(python 3.12.1)を用いる。
使用しているパッケージは以下のとおり

pip.list
Package            Version
------------------ -----------
beautifulsoup4     4.12.3
boto3              1.35.29
botocore           1.35.29
certifi            2024.8.30
charset-normalizer 3.3.2
idna               3.10
jmespath           1.0.1
pip                24.2
python-dateutil    2.9.0.post0
requests           2.32.3
s3transfer         0.10.2
six                1.16.0
soupsieve          2.6
urllib3            2.2.3

pip installしたのはrequestsとboto3とbeautifulsoap4くらいだったはず。

プログラム

pythonで作ったプログラムは以下のとおり。
といっても某サンプルプログラムを元にほとんどチャットGPTさんに作ってもらいました。
いい時代です。

mai.py
import requests
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
from bs4 import BeautifulSoup
from urllib.parse import unquote
import os
import datetime
import re

# SSL認証を無視する設定 (必要に応じて)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def get_saml_assertion(sso_url, idp_user, idp_pass):
    try:
        print('++++++ samlアサーションを取得 ++++++')

        # セッションを使用してクッキーを管理
        session = requests.Session()

        # 初期リクエスト (GET)
        # verify=False は SSL証明書を無視するためのもの
        response = session.get(sso_url, verify=False)  

        # BeautifulSoup で HTML をパース
        soup = BeautifulSoup(response.text, 'html.parser')

        # フォームのURLを取得
        form = soup.find('form', {'id': 'kc-form-login'})
        post_url = unquote(form['action'])

        # フォームデータの準備
        form_data = {}
        for input_tag in soup.find_all('input'):
            name = input_tag.get('name')
            if name == 'username':
                form_data[name] = idp_user
            elif name == 'password':
                form_data[name] = idp_pass
            else:
                form_data[name] = input_tag.get('value', '')

        # フォームを送信 (POST)
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        res = session.post(post_url, data=form_data, headers=headers, verify=False)

        # 再度レスポンスをパースしてSAMLアサーションを取得
        soup = BeautifulSoup(res.text, 'html.parser')
        saml_response = soup.find('input', {'name': 'SAMLResponse'})
        saml_assertion = saml_response['value'] if saml_response else None

        print('samlAssertion:', saml_assertion)
        print('\n')

        return saml_assertion

    except Exception as e:
        print('++++++ エラー ++++++')
        print(e)
        raise e

def get_aws_credentials(saml_assertion, account, saml_provider_name, role_name, region='us-east-1', duration_seconds=900):
    try:
        print('++++++ AWSの一時的な認証情報の取得 ++++++')

        # STSクライアントの作成
        client = boto3.client('sts', region_name=region)

        # AssumeRoleWithSAML APIに渡す入力
        response = client.assume_role_with_saml(
            RoleArn=f'arn:aws:iam::{account}:role/{role_name}',
            PrincipalArn=f'arn:aws:iam::{account}:saml-provider/{saml_provider_name}',
            SAMLAssertion=saml_assertion,
            DurationSeconds=duration_seconds
        )

        # レスポンスからクレデンシャルを取得
        credentials = response['Credentials']
        access_key_id = credentials['AccessKeyId']
        secret_access_key = credentials['SecretAccessKey']
        session_token = credentials['SessionToken']

        #改行は見栄え。
        print("+++++++access_key_id+++++++")
        print(access_key_id , "\n")
        print("+++++++secret_access_key+++++++")
        print(secret_access_key, "\n")
        print("\n")
        print("+++++++session_token+++++++")
        print(session_token, "\n")
        print("\n")

        return {
            'accessKeyId': access_key_id,
            'secretAccessKey': secret_access_key,
            'sessionToken': session_token
        }

    except ClientError as e:
        print('++++++ エラー ++++++')
        print(e)
        raise

# S3クライアントを作成
def create_s3_client(region, endpoint, access_key_id, secret_access_key, session_token):
    return boto3.client(
        's3',
        endpoint_url=endpoint,
        region_name=region,
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )

# マルチパートアップロード
def s3_multipart_upload(s3_client, s3_bucket_name, source_file_name, region, code, endpoint):
    print('++++++ S3へのUpload実行開始 ++++++')
    try:
        # boto3のS3アップロードマネージャを使用してマルチパートアップロード
        part_size = 5 * 1024 * 1024  # 5MB
        config = boto3.s3.transfer.TransferConfig(multipart_threshold=part_size, multipart_chunksize=part_size)

        if len(code) > 0:
            key = code + "/" + source_file_name
        else:
            key = source_file_name

        with open(source_file_name, 'rb') as data:
            s3_client.upload_fileobj(data, Bucket=s3_bucket_name, Key = key, Config=config)
        
        print(f"\nアップロード完了 ファイル名: {source_file_name}\n\n")
    
    except NoCredentialsError:
        print("認証が有効じゃないエラー")
    except Exception as e:
        print(f"アップロードエラー: {e}")

if __name__ == '__main__':
    # キークロークにSAMLアサーションを取りに行く。
    sso_url = 'キークロークのURL'
    idp_user = 'ユーザ名'
    idp_pass = 'パスワード'
    # SAML アサーション
    saml_assertion = get_saml_assertion(sso_url, idp_user, idp_pass)

    # キークロークからもらったSAMLアサーションを持って、AWSに対して一時的な権限を発行してもらう。
    # アカウントID、SAMLプロバイダ名、ロール名を指定
    account = 'AWSのアカウントID'
    saml_provider_name = 'AWSで設定したSAMLプロバイダー名'
    role_name = 'ロール名'
    # クレデンシャルを取得
    credentials = get_aws_credentials(saml_assertion, account, saml_provider_name, role_name)

    #アップロードする
    s3_bucket_name = 'バケット名'  # バケット名
    source_file_name = 'test.csv'    # ファイル名(拡張子付き)
    region = 'us-east-1'                   # リージョン
    code = 'フォルダがある場合はフォルダ名'                     #フォルダ名※必要な時(code/source_file_nameになる)
    endpoint = 'https://s3.us-east-1.amazonaws.com'    # DXの場合はインタフェース型のエンドポイントを指定

    print('\n\nアップロード処理開始')
    # S3クライアントを作成
    s3_client = create_s3_client(region, endpoint, credentials['accessKeyId'], credentials['secretAccessKey'], credentials['sessionToken'])
    # マルチパートアップロートする。 
    s3_multipart_upload(s3_client, s3_bucket_name, source_file_name, region, code, endpoint)
        
    print('\n\nアップロード処理完了')

プログラムの動き

①keycloakにSAMLアサーションを取得しにいく。

※SAMLアサーションとは
 ユーザーがサインインしたことをサービスプロバイダーに伝えるメッセージのこと。SAMLアサーションには、アサーションのソース、発行時刻、アサーションを有効にする条件を含む、サービスプロバイダーがユーザーIDを確認するのに必要なすべての情報が含まれる。

②SAMLアサーションを持って、AWSに一時的な認証情報をもらう。

 一時的な認証情報には、「accessKeyId」「secretAccessKey」「sessionToken」が含まれている。
 プログラムを実行するとこれらが表示するようにしている。

③一時的な認証情報をもとに、S3へアップロード

・プログラムと同じフォルダ内にアップロードするファイルがある前提で作成している。
・S3のバケット直下ではなくフォルダの中に入れたい場合はcodeにフォルダ名を記載する。直下でよい場合は、「code = ""」 として文字数0とする。
・専用線接続している場合はインターフェース型のエンドポイントをendpointのところで指定する。
・リージョンは必要に応じて設定する。

アップロードに関しては、以上!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?