S3の操作をGUIで行いたい!
以前に作成したプログラムで、Fileのアップロードができるようになったものの、アップロードができたのかの確認などができなかった。
それに、コマンドプロンプトでの操作だといまいちだったのもあって、GUIで何とかしたいと思ったわけである。
今回初めてTkinterなるものを使ってみた。
あと、いまいちClassの使い方がわかっていないので、ちょっと一般的な書き方と違うかもしれない。
というかいろんなところを参照しているから、書き方に統一性がないのは今後の課題である。
実際のコード
今回、ファイルを二種類に分けている。
プログラム本体と、設定ファイルを分けてみた。
一つ目は、設定ファイル
以下の情報は設定ファイルに記載する。
#SAML
sso_url = 〇キークロークのURL〇
idp_user = 〇キークローク(IDP)のユーザ名〇
idp_pass = 〇キークローク(IDP)のパスワード〇
#AWS
account = 〇アカウントのID(数字12桁)〇
saml_provider_name = 〇AWSで設定したユーザ〇
role_name = 〇AWSで設定したロール名〇
2つ目はプログラム本体である。
import sys
import json
import time
import hmac
import math
import hashlib
import datetime
from datetime import datetime
from urllib.parse import unquote
import tkinter
from tkinter import ttk, filedialog, messagebox
import requests
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from bs4 import BeautifulSoup
#####################変数###########################
region = ""
s3_file_info = ""
#####################処理関数###########################
def ini_read():
"""
#設定ファイルの読み込み
#エラーで合致しないときに0で返すように変数に初期設定。
"""
ans_sso_url = "0"
ans_idp_user = "0"
ans_idp_pass = "0"
ans_account = "0"
ans_saml_provider_name = "0"
ans_role_name = "0"
try:
file_name = "param.ini"
with open(file_name) as f:
line = f.readlines()
for i in line:
if i[0] == "#":
pass
else:
#各設定値を取得
#スペースを削除して、「=」でSplitする。
data = i.replace(" ","").split("=")
if data[0] == "sso_url":
ans_sso_url = data[1].strip()
elif data[0] == "idp_user":
ans_idp_user = data[1].strip()
elif data[0] == "idp_pass":
ans_idp_pass = data[1].strip()
elif data[0] == "account":
ans_account = data[1].strip()
elif data[0] == "saml_provider_name":
ans_saml_provider_name = data[1].strip()
elif data[0] == "role_name":
ans_role_name = data[1].strip()
return ans_sso_url, ans_idp_user, ans_idp_pass, ans_account, ans_saml_provider_name, ans_role_name
except FileNotFoundError:
print(f"Error: {file_name} not found.")
#エラーの場合は全部文字列のゼロ返し。
return "0","0","0","0","0","0"
except Exception as e:
print("iniファイル読み込みエラー")
print(e)
#エラーの場合は全部文字列のゼロ返し。
return "0","0","0","0","0","0"
def aws_client():
"""
S3クライアントを作る。
"""
global credentials
# S3クライアントを作成
s3_client = create_s3_client(credentials['accessKeyId'], credentials['secretAccessKey'], credentials['sessionToken'])
return s3_client
def create_s3_client(access_key_id, secret_access_key, session_token):
"""
S3クライアントを作る。
複数作るかもだったから分けたけど、分けなくてよかったかも。
"""
global region
#エンドポイントの指定
if region == "us-east-1":
endpoint = "https://s3.us-east-1.amazonaws.com"
elif region == "us-east-2":
endpoint = "https://s3.us-east-2.amazonaws.com"
elif region == "us-us-west-1":
endpoint = "https://s3.us-west-1.amazonaws.com"
elif region == "us-west-2":
endpoint = "https://s3.us-west-2.amazonaws.com"
elif region == "ap-northeast-1":
endpoint = "https://s3.ap-northeast-1.amazonaws.com"
elif region == "ap-northeast-3":
endpoint = "https://s3.ap-northeast-3.amazonaws.com"
#インタフェース型のエンドポイントの場合は上記をコメントアウトして、以下でエンドポイント指定する。
#endpoint = "https://s3.us-east-1.amazonaws.com"
#エンドポイント選択
return boto3.client(
's3',
endpoint_url=endpoint,
region_name=region,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
aws_session_token=session_token,
)
def get_saml_assertion(sso_url, idp_user, idp_pass):
"""
SAMLアサーションを取得。
"""
try:
# セッションを使用してクッキーを管理
session = requests.Session()
# 初期リクエスト (GET)
# verify=False は SSL証明書を無視するためのもの
response = session.get(sso_url, verify=False)
# BeautifulSoup で HTML をパース
soup = BeautifulSoup(response.text, 'html.parser')
# フォームのURLを取得
form = soup.find('form', {'id': 'kc-form-login'})
post_url = unquote(form['action'])
# フォームデータの準備
form_data = {}
for input_tag in soup.find_all('input'):
name = input_tag.get('name')
if name == 'username':
form_data[name] = idp_user
elif name == 'password':
form_data[name] = idp_pass
else:
form_data[name] = input_tag.get('value', '')
# フォームを送信 (POST)
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
res = session.post(post_url, data=form_data, headers=headers, verify=False)
# 再度レスポンスをパースしてSAMLアサーションを取得
soup = BeautifulSoup(res.text, 'html.parser')
saml_response = soup.find('input', {'name': 'SAMLResponse'})
saml_assertion = saml_response['value'] if saml_response else None
flag = 0
print('********** SAML ASSERSION **********')
return saml_assertion
except Exception as e:
print('********** SAML ERROR **********')
print(e)
flag = 1
return e
def get_aws_credentials(saml_assertion, account, saml_provider_name, role_name, region='us-east-1', duration_seconds=900):
"""
一時的なクレデンシャル情報の取得
"""
try:
# STSクライアントの作成
client = boto3.client('sts', region_name=region)
# AssumeRoleWithSAML APIに渡す入力
response = client.assume_role_with_saml(
RoleArn=f'arn:aws:iam::{account}:role/{role_name}',
PrincipalArn=f'arn:aws:iam::{account}:saml-provider/{saml_provider_name}',
SAMLAssertion=saml_assertion,
DurationSeconds=duration_seconds
)
# レスポンスからクレデンシャルを取得
credentials = response['Credentials']
access_key_id = credentials['AccessKeyId']
secret_access_key = credentials['SecretAccessKey']
session_token = credentials['SessionToken']
return {
'accessKeyId': access_key_id,
'secretAccessKey': secret_access_key,
'sessionToken': session_token
}
except Exception as e:
print('********** ERROR **********')
print(e)
raise
def get_filenames_from_s3_response(s3_response):
"""
引数はListObjectのレスポンス
選択したバケット内にあるファイル名と更新時間の一覧を取得し返す。
ただし、ファイルサイズが0以上のもの。
"""
try:
contents = s3_response.get('Contents', [])
file_info = [(item['Key'], item['LastModified'].strftime('%Y/%m/%d %H:%M:%S')) for item in contents if item['Size'] > 0]
return file_info
except KeyError as e:
print(f"Error parsing S3 response: {e}")
return []
#####################画面表示###########################
class GUI(tkinter.Frame):
budgets_name = [] #budgetsの一覧を格納する用の配列
budget_name = "" #参照したいbudgetの名前を格納する用
def __init__(self, master = None):
super().__init__(master)
# GUIを描画するコード
master.geometry("1000x550")
master.title("AWS S3 Operation")
#####Scrollable part of the window
self.frame = tkinter.Frame(master)
self.frame.place(x=10, y=220, width=980, height=290)
self.canvas = tkinter.Canvas(self.frame)
self.v_scrollbar = ttk.Scrollbar(self.frame, orient=tkinter.VERTICAL, command=self.canvas.yview)
self.h_scrollbar = ttk.Scrollbar(self.frame, orient=tkinter.HORIZONTAL, command=self.canvas.xview)
self.canvas.configure(yscrollcommand=self.v_scrollbar.set, xscrollcommand=self.h_scrollbar.set)
self.v_scrollbar.pack(side=tkinter.RIGHT, fill=tkinter.Y)
self.h_scrollbar.pack(side=tkinter.BOTTOM, fill=tkinter.X)
self.canvas.pack(side=tkinter.LEFT, fill=tkinter.BOTH, expand=True)
self.canvas.bind('<Configure>', self.on_configure)
self.inner_frame = tkinter.Frame(self.canvas)
self.canvas.create_window((0, 0), window=self.inner_frame, anchor='nw')
####
self.canvas_fix = tkinter.Canvas(master, width=1000, height=210)
self.canvas_fix.pack()
account_info = tkinter.Label(text="設定されているアカウント:" + account)
account_info.place(x=10, y=35)
role_info = tkinter.Label(text="設定されているrole name:" + role_name)
role_info.place(x=10, y=55)
role_info = tkinter.Label(text="1.リージョンを選択してregion setボタンを押す。 → 2.Get List Bugetsを押す。 →3.操作するBudgetを選択してBuget name setを押す。" ,anchor=tkinter.W)
role_info.place(x=10, y=85)
role_info = tkinter.Label(text="→ 4.アップロードの時はUpload Fileボタンを押すとファイル選択ができる。削除とダウンロードは、ファイルを選択してDelete FileまたはDownload Fileボタンを押す。" ,anchor=tkinter.W)
role_info.place(x=10, y=105)
# regionを選択するプルダウンリスト
aws_region = ["us-east-1","us-east-2","us-west-1","us-west-2","ap-northeast-1","ap-northeast-3"]
self.valuesoption = ttk.Combobox(master,values = aws_region)
self.valuesoption.place(x=10, y=140)
self.valuesoption.set("us-east-1")
#バケット名を選択するプルダウンリスト。内容はバケット一覧取得時にセット
self.budgets_name_select = ttk.Combobox(master)
self.budgets_name_select.place(x=200, y=140)
# ファイル名を選択するプルダウンリスト
self.file_name_select = ttk.Combobox(master, width=90)
self.file_name_select.place(x=400, y=140)
#ボタン類、引数を渡すときはlambda式にしないといけない。
#SAMLリクエストボタン
saml_btn = tkinter.Button(text="Saml request", command = lambda:self.saml_request())
saml_btn.place(x=10, y=10)
#バジェットの情報を取るための事前設定
region_setting_btn = tkinter.Button(self.master,text="region set", command = self.bugets_setting)
region_setting_btn.place(x=95, y=10)
#Budgeetsの一覧を取得し、Bugetsを選択できるように設定。
budgets_listbudgets_btn = tkinter.Button(self.master,text="Get List Bugets", command = self.list_s3_buckets)
budgets_listbudgets_btn.place(x=165, y=10)
#Budgeetsの一覧を取得し、Bugetsを選択できるように設定。
budgetname_set_btn = tkinter.Button(self.master,text="Buget name set", command = self.buget_name_set)
budgetname_set_btn.place(x=260, y=10)
"""
#Bugets内のファイル一覧を取得。Budget name setの時に呼び出すことにしたので削除。
budgets_getinfo_btn = tkinter.Button(self.master,text="Get Bugets Info", command = self.get_info)
budgets_getinfo_btn.place(x=360, y=10)
"""
# ファイルを選択してS3にアップロード
upload_file_btn = tkinter.Button(master, text="Upload File", command=self.upload_file)
upload_file_btn.place(x=460, y=10)
# 選択したファイルをS3から削除
delete_file_btn = tkinter.Button(master, text="Delete File", command=self.delete_file)
delete_file_btn.place(x=535, y=10)
# 選択したファイルをS3からダウンロード
download_file_btn = tkinter.Button(master, text="Download File", command=self.download_file)
download_file_btn.place(x=610, y=10)
def on_configure(self, event):
"""
スクロールを文字長さに併せて更新。
"""
self.canvas.configure(scrollregion=self.canvas.bbox('all'))
def update(self):
"""
表示される文字の更新。
"""
global region
global s3_file_info
self.canvas.delete("all")
self.canvas_fix.delete("all")
self.canvas_fix.create_text(10, 75,text=credentials, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
if region == "":
self.canvas_fix.create_text(10, 170,text="region未設定", anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
else:
self.canvas_fix.create_text(10, 170,text="選択リージョン:"+region, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
if len(GUI.budgets_name) == 0 :
self.canvas_fix.create_text(10, 190,text="budget一覧未取得", anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
else:
self.canvas_fix.create_text(10, 190,text=GUI.budgets_name, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
if GUI.budget_name == "":
self.canvas_fix.create_text(200, 170,text="budget未設定", anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
else:
self.canvas_fix.create_text(200, 170,text="選択budget:"+GUI.budget_name, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
self.canvas.create_text(10, 10,text=s3_file_info, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
#更新(1000ミリ秒)
self.after(100,self.update)
# Update scroll region to fit the content
self.canvas.configure(scrollregion=self.canvas.bbox("all"))
def saml_request(self):
"""
Saml requestのボタンを押したとき動作。
認証の時間が切れたときに再度認証をしに行く。
"""
global saml_as
global credentials
saml_as = get_saml_assertion(sso_url, idp_user, idp_pass)
credentials = get_aws_credentials(saml_as, account, saml_provider_name, role_name)
def bugets_setting(self):
"""
#情報を取得するBugetsの設定をする。Bugets settingのボタンを押したときの動作
"""
global region
global s3_client
region = self.valuesoption.get()
s3_client = aws_client()
def list_s3_buckets(self):
"""
Budgetのリストを取得して、プルダウンリストを更新する。
"""
global s3_client
try:
GUI.budgets_name = []
response = s3_client.list_buckets()
buckets = [bucket['Name'] for bucket in response['Buckets']]
#print(buckets)
for name in buckets:
GUI.budgets_name.append(name)
#return buckets
self.budgets_name_select['values'] = GUI.budgets_name
print(response)
except Exception as e:
messagebox.showerror("Error", f"バケットの一覧が取得できませんでした。: {e}")
return []
def buget_name_set(self):
"""
#情報を取得するBugetsの設定をする。Bugets name setのボタンを押したときの動作
"""
GUI.budget_name = self.budgets_name_select.get()
self.get_info()
#
def get_info(self):
"""
Budget内のfile一覧をListObjectで表示する。
"""
global s3_client
global s3_file_info
try:
s3_file_info = GUI.budget_name + "のファイル名と更新時間\n"
response = s3_client.list_objects_v2(Bucket = GUI.budget_name)
datas = get_filenames_from_s3_response(response)
filenames = []
for data in datas:
s3_file_info = s3_file_info + str(data).replace("(","").replace(")","").replace("'","") + "\n"
filenames.append(data[0])
self.file_name_select['values'] = filenames
except Exception as e:
messagebox.showerror("Error", f"オブジェクトの一覧が取得できませんでした。: {e}")
return []
def upload_file(self):
"""
ファイルを選択してS3にアップロードする。
"""
global s3_client
file_path = filedialog.askopenfilename()
if file_path:
bucket_name = self.budgets_name_select.get()
if not bucket_name:
print("Error: No bucket selected.")
messagebox.showerror("Error", "バケットが選択されていません。")
return
try:
s3_client.upload_file(file_path, bucket_name, file_path.split('/')[-1])
messagebox.showinfo("Success", f"ファイルのアップロードが成功しました!\n「 {file_path} 」の {bucket_name}へのアップロードが成功しました。")
self.get_info() # 更新されたファイルリストを取得
except Exception as e:
messagebox.showerror("Error", f"ファイルのアップロードが失敗しました。\n {e}")
def delete_file(self):
"""
プルダウンリストから選択したファイルをS3から削除する。
"""
global s3_client
file_name = self.file_name_select.get()
if not file_name:
print("Error: No file selected.")
messagebox.showerror("Error", "ファイルが選択されていません。")
return
bucket_name = self.budgets_name_select.get()
if not bucket_name:
print("Error: No bucket selected.")
messagebox.showerror("Error", "バケットが選択されていません。")
return
try:
s3_client.delete_object(Bucket=bucket_name, Key=file_name)
messagebox.showinfo("Success", f"ファイルの削除が成功しました!\n「 {file_name} 」の {bucket_name}からの削除が成功しました。")
self.get_info() # 更新されたファイルリストを取得
except Exception as e:
messagebox.showerror("Error", f"ファイルの削除が失敗しました。\n {e}")
def download_file(self):
"""
プルダウンリストから選択したファイルをS3からダウンロードする。
"""
global s3_client
file_name = self.file_name_select.get()
if not file_name:
print("Error: No file selected.")
messagebox.showerror("Error", "ファイルが選択されていません。")
return
bucket_name = self.budgets_name_select.get()
if not bucket_name:
print("Error: No bucket selected.")
messagebox.showerror("Error", "バケットが選択されていません。")
return
save_path = filedialog.asksaveasfilename(initialfile=file_name)
if not save_path:
return
try:
s3_client.download_file(bucket_name, file_name, save_path)
messagebox.showinfo("Success", f"ファイルのダウンロードが成功しました!\n「 {file_name} 」を {save_path} に保存しました。")
except Exception as e:
messagebox.showerror("Error", f"ファイルのダウンロードが失敗しました。\n {e}")
def main():
gui = tkinter.Tk()
app = GUI(master = gui)
app.update()
app.mainloop()
#######################################################
if __name__ == "__main__":
#初期読み込み
sso_url, idp_user, idp_pass, account, saml_provider_name, role_name= ini_read()
#AWSクレデンシャル取得。
saml_as = get_saml_assertion(sso_url, idp_user, idp_pass)
#print(saml_as)
credentials = get_aws_credentials(saml_as, account, saml_provider_name, role_name)
#print(credentials)
main()
Samlの認証は起動時にかけるが時間がたったら期限切れになるので、その場合はSaml requestボタンを押す。
そうすると認証情報を取り直す。
基本的な操作は、
左のプルダウンリストからリージョンを選択して、region setボタンを押す。
その次に、GetListBudgetsボタンを押すと、真ん中のプルダウンリストからバケットを選択できるようになるので、選択する。
選択したら、Budget name set ボタンを押すと、ファイル一覧を取得する。
ここまでで、S3のファイルを操作する事前準備ができた。
実際にアップロードをしたい場合は、Upload Fileボタンを押すとファイルの選択画面がポップするので、ファイルを選択するとアップロードが開始される。
ファイルの削除やダウンロードを行いたい場合は、右のプルダウンリストからファイルを選択して、目的のボタンを押すことで操作が可能である。
色々改善点はあるが、とりあえずファイル操作はこんな感じでGUIでできるようになった。
次は、コスト周りをGUIで何とか出来たらいいな。