はじめ
物ができてしまうと興味が次へ移ってしまうのが悪い癖。
これまで2個記事を書いてきたわけだけども、目的を果たしたため記事を〆ずに次に移ってしまった。
ちょっとこれまでの記事がごちゃごちゃしていたので、削除と整理をした。
前提条件
以下の記事で、Dockerを用いてKeycloakを立ち上げSMALでAWSコンソールに入れるところまで行ってる。
プログラムはpython(python 3.12.1)を用いる。
使用しているパッケージは以下のとおり
Package Version
------------------ -----------
beautifulsoup4 4.12.3
boto3 1.35.29
botocore 1.35.29
certifi 2024.8.30
charset-normalizer 3.3.2
idna 3.10
jmespath 1.0.1
pip 24.2
python-dateutil 2.9.0.post0
requests 2.32.3
s3transfer 0.10.2
six 1.16.0
soupsieve 2.6
urllib3 2.2.3
pip installしたのはrequestsとboto3とbeautifulsoap4くらいだったはず。
プログラム
pythonで作ったプログラムは以下のとおり。
といっても某サンプルプログラムを元にほとんどチャットGPTさんに作ってもらいました。
いい時代です。
import requests
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
from bs4 import BeautifulSoup
from urllib.parse import unquote
import os
import datetime
import re
# SSL認証を無視する設定 (必要に応じて)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_saml_assertion(sso_url, idp_user, idp_pass):
try:
print('++++++ samlアサーションを取得 ++++++')
# セッションを使用してクッキーを管理
session = requests.Session()
# 初期リクエスト (GET)
# verify=False は SSL証明書を無視するためのもの
response = session.get(sso_url, verify=False)
# BeautifulSoup で HTML をパース
soup = BeautifulSoup(response.text, 'html.parser')
# フォームのURLを取得
form = soup.find('form', {'id': 'kc-form-login'})
post_url = unquote(form['action'])
# フォームデータの準備
form_data = {}
for input_tag in soup.find_all('input'):
name = input_tag.get('name')
if name == 'username':
form_data[name] = idp_user
elif name == 'password':
form_data[name] = idp_pass
else:
form_data[name] = input_tag.get('value', '')
# フォームを送信 (POST)
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
res = session.post(post_url, data=form_data, headers=headers, verify=False)
# 再度レスポンスをパースしてSAMLアサーションを取得
soup = BeautifulSoup(res.text, 'html.parser')
saml_response = soup.find('input', {'name': 'SAMLResponse'})
saml_assertion = saml_response['value'] if saml_response else None
print('samlAssertion:', saml_assertion)
print('\n')
return saml_assertion
except Exception as e:
print('++++++ エラー ++++++')
print(e)
raise e
def get_aws_credentials(saml_assertion, account, saml_provider_name, role_name, region='us-east-1', duration_seconds=900):
try:
print('++++++ AWSの一時的な認証情報の取得 ++++++')
# STSクライアントの作成
client = boto3.client('sts', region_name=region)
# AssumeRoleWithSAML APIに渡す入力
response = client.assume_role_with_saml(
RoleArn=f'arn:aws:iam::{account}:role/{role_name}',
PrincipalArn=f'arn:aws:iam::{account}:saml-provider/{saml_provider_name}',
SAMLAssertion=saml_assertion,
DurationSeconds=duration_seconds
)
# レスポンスからクレデンシャルを取得
credentials = response['Credentials']
access_key_id = credentials['AccessKeyId']
secret_access_key = credentials['SecretAccessKey']
session_token = credentials['SessionToken']
#改行は見栄え。
print("+++++++access_key_id+++++++")
print(access_key_id , "\n")
print("+++++++secret_access_key+++++++")
print(secret_access_key, "\n")
print("\n")
print("+++++++session_token+++++++")
print(session_token, "\n")
print("\n")
return {
'accessKeyId': access_key_id,
'secretAccessKey': secret_access_key,
'sessionToken': session_token
}
except ClientError as e:
print('++++++ エラー ++++++')
print(e)
raise
# S3クライアントを作成
def create_s3_client(region, endpoint, access_key_id, secret_access_key, session_token):
return boto3.client(
's3',
endpoint_url=endpoint,
region_name=region,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
aws_session_token=session_token,
)
# マルチパートアップロード
def s3_multipart_upload(s3_client, s3_bucket_name, source_file_name, region, code, endpoint):
print('++++++ S3へのUpload実行開始 ++++++')
try:
# boto3のS3アップロードマネージャを使用してマルチパートアップロード
part_size = 5 * 1024 * 1024 # 5MB
config = boto3.s3.transfer.TransferConfig(multipart_threshold=part_size, multipart_chunksize=part_size)
if len(code) > 0:
key = code + "/" + source_file_name
else:
key = source_file_name
with open(source_file_name, 'rb') as data:
s3_client.upload_fileobj(data, Bucket=s3_bucket_name, Key = key, Config=config)
print(f"\nアップロード完了 ファイル名: {source_file_name}\n\n")
except NoCredentialsError:
print("認証が有効じゃないエラー")
except Exception as e:
print(f"アップロードエラー: {e}")
if __name__ == '__main__':
# キークロークにSAMLアサーションを取りに行く。
sso_url = 'キークロークのURL'
idp_user = 'ユーザ名'
idp_pass = 'パスワード'
# SAML アサーション
saml_assertion = get_saml_assertion(sso_url, idp_user, idp_pass)
# キークロークからもらったSAMLアサーションを持って、AWSに対して一時的な権限を発行してもらう。
# アカウントID、SAMLプロバイダ名、ロール名を指定
account = 'AWSのアカウントID'
saml_provider_name = 'AWSで設定したSAMLプロバイダー名'
role_name = 'ロール名'
# クレデンシャルを取得
credentials = get_aws_credentials(saml_assertion, account, saml_provider_name, role_name)
#アップロードする
s3_bucket_name = 'バケット名' # バケット名
source_file_name = 'test.csv' # ファイル名(拡張子付き)
region = 'us-east-1' # リージョン
code = 'フォルダがある場合はフォルダ名' #フォルダ名※必要な時(code/source_file_nameになる)
endpoint = 'https://s3.us-east-1.amazonaws.com' # DXの場合はインタフェース型のエンドポイントを指定
print('\n\nアップロード処理開始')
# S3クライアントを作成
s3_client = create_s3_client(region, endpoint, credentials['accessKeyId'], credentials['secretAccessKey'], credentials['sessionToken'])
# マルチパートアップロートする。
s3_multipart_upload(s3_client, s3_bucket_name, source_file_name, region, code, endpoint)
print('\n\nアップロード処理完了')
プログラムの動き
①keycloakにSAMLアサーションを取得しにいく。
※SAMLアサーションとは
ユーザーがサインインしたことをサービスプロバイダーに伝えるメッセージのこと。SAMLアサーションには、アサーションのソース、発行時刻、アサーションを有効にする条件を含む、サービスプロバイダーがユーザーIDを確認するのに必要なすべての情報が含まれる。
②SAMLアサーションを持って、AWSに一時的な認証情報をもらう。
一時的な認証情報には、「accessKeyId」「secretAccessKey」「sessionToken」が含まれている。
プログラムを実行するとこれらが表示するようにしている。
③一時的な認証情報をもとに、S3へアップロード
・プログラムと同じフォルダ内にアップロードするファイルがある前提で作成している。
・S3のバケット直下ではなくフォルダの中に入れたい場合はcodeにフォルダ名を記載する。直下でよい場合は、「code = ""」 として文字数0とする。
・専用線接続している場合はインターフェース型のエンドポイントをendpointのところで指定する。
・リージョンは必要に応じて設定する。
アップロードに関しては、以上!