0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GUI(TKinter)でS3を操作する。

Posted at

S3の操作をGUIで行いたい!

以前に作成したプログラムで、Fileのアップロードができるようになったものの、アップロードができたのかの確認などができなかった。

それに、コマンドプロンプトでの操作だといまいちだったのもあって、GUIで何とかしたいと思ったわけである。

今回初めてTkinterなるものを使ってみた。
あと、いまいちClassの使い方がわかっていないので、ちょっと一般的な書き方と違うかもしれない。
というかいろんなところを参照しているから、書き方に統一性がないのは今後の課題である。

実際のコード

今回、ファイルを二種類に分けている。
プログラム本体と、設定ファイルを分けてみた。

一つ目は、設定ファイル
以下の情報は設定ファイルに記載する。

param.ini
#SAML
sso_url = 〇キークロークのURL〇
idp_user = 〇キークローク(IDP)のユーザ名〇
idp_pass = 〇キークローク(IDP)のパスワード〇
#AWS
account = 〇アカウントのID(数字12桁)〇
saml_provider_name = 〇AWSで設定したユーザ〇
role_name = 〇AWSで設定したロール名〇

2つ目はプログラム本体である。

main.py
import sys
import json
import time
import hmac
import math
import hashlib
import datetime
from datetime import datetime
from urllib.parse import unquote
import tkinter
from tkinter import ttk, filedialog, messagebox
import requests
import boto3
from botocore.exceptions import ClientError, NoCredentialsError
from bs4 import BeautifulSoup

#####################変数###########################
region = ""
s3_file_info = ""

#####################処理関数###########################
def ini_read():
    """
    #設定ファイルの読み込み
    #エラーで合致しないときに0で返すように変数に初期設定。
    """
    ans_sso_url = "0"
    ans_idp_user = "0"
    ans_idp_pass = "0"
    ans_account = "0"
    ans_saml_provider_name = "0"
    ans_role_name = "0"
    try:
        file_name = "param.ini"
        with open(file_name) as f:
            line = f.readlines()

        for i in line:
            if i[0] == "#":
                pass
            else:
                #各設定値を取得
                #スペースを削除して、「=」でSplitする。
                data = i.replace(" ","").split("=")
                if data[0] == "sso_url":
                    ans_sso_url = data[1].strip()
                elif data[0] == "idp_user":
                    ans_idp_user = data[1].strip()
                elif data[0] == "idp_pass":
                    ans_idp_pass = data[1].strip()
                elif data[0] == "account":
                    ans_account = data[1].strip()
                elif data[0] == "saml_provider_name":
                    ans_saml_provider_name = data[1].strip()
                elif data[0] == "role_name":
                    ans_role_name = data[1].strip()
        return ans_sso_url, ans_idp_user, ans_idp_pass, ans_account, ans_saml_provider_name, ans_role_name

    except FileNotFoundError:
        print(f"Error: {file_name} not found.")
        #エラーの場合は全部文字列のゼロ返し。
        return "0","0","0","0","0","0"

    except Exception as e:
        print("iniファイル読み込みエラー")
        print(e)
        #エラーの場合は全部文字列のゼロ返し。
        return "0","0","0","0","0","0"

def aws_client():
    """
    S3クライアントを作る。
    """
    global credentials
    # S3クライアントを作成
    s3_client = create_s3_client(credentials['accessKeyId'], credentials['secretAccessKey'], credentials['sessionToken'])
    return s3_client

def create_s3_client(access_key_id, secret_access_key, session_token):
    """
    S3クライアントを作る。
    複数作るかもだったから分けたけど、分けなくてよかったかも。
    """
    global region
    #エンドポイントの指定
    if region == "us-east-1":
        endpoint = "https://s3.us-east-1.amazonaws.com"
    elif region == "us-east-2":
        endpoint = "https://s3.us-east-2.amazonaws.com"
    elif region == "us-us-west-1":
        endpoint = "https://s3.us-west-1.amazonaws.com"
    elif region == "us-west-2":
        endpoint = "https://s3.us-west-2.amazonaws.com"
    elif region == "ap-northeast-1":
        endpoint = "https://s3.ap-northeast-1.amazonaws.com"
    elif region == "ap-northeast-3":
        endpoint = "https://s3.ap-northeast-3.amazonaws.com"
    
    #インタフェース型のエンドポイントの場合は上記をコメントアウトして、以下でエンドポイント指定する。
    #endpoint = "https://s3.us-east-1.amazonaws.com"
    
    #エンドポイント選択
    return boto3.client(
        's3',
        endpoint_url=endpoint,
        region_name=region,
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )

def get_saml_assertion(sso_url, idp_user, idp_pass):
    """
    SAMLアサーションを取得。
    """
    try:
        # セッションを使用してクッキーを管理
        session = requests.Session()

        # 初期リクエスト (GET)
        # verify=False は SSL証明書を無視するためのもの
        response = session.get(sso_url, verify=False)  

        # BeautifulSoup で HTML をパース
        soup = BeautifulSoup(response.text, 'html.parser')

        # フォームのURLを取得
        form = soup.find('form', {'id': 'kc-form-login'})
        post_url = unquote(form['action'])

        # フォームデータの準備
        form_data = {}
        for input_tag in soup.find_all('input'):
            name = input_tag.get('name')
            if name == 'username':
                form_data[name] = idp_user
            elif name == 'password':
                form_data[name] = idp_pass
            else:
                form_data[name] = input_tag.get('value', '')

        # フォームを送信 (POST)
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        res = session.post(post_url, data=form_data, headers=headers, verify=False)

        # 再度レスポンスをパースしてSAMLアサーションを取得
        soup = BeautifulSoup(res.text, 'html.parser')
        saml_response = soup.find('input', {'name': 'SAMLResponse'})
        saml_assertion = saml_response['value'] if saml_response else None
        flag = 0
        print('********** SAML ASSERSION **********')
        return saml_assertion

    except Exception as e:
        print('********** SAML ERROR **********')
        print(e)
        flag = 1
        return e

def get_aws_credentials(saml_assertion, account, saml_provider_name, role_name, region='us-east-1', duration_seconds=900):
    """
    一時的なクレデンシャル情報の取得
    """
    try:
        # STSクライアントの作成
        client = boto3.client('sts', region_name=region)

        # AssumeRoleWithSAML APIに渡す入力
        response = client.assume_role_with_saml(
            RoleArn=f'arn:aws:iam::{account}:role/{role_name}',
            PrincipalArn=f'arn:aws:iam::{account}:saml-provider/{saml_provider_name}',
            SAMLAssertion=saml_assertion,
            DurationSeconds=duration_seconds
        )

        # レスポンスからクレデンシャルを取得
        credentials = response['Credentials']
        access_key_id = credentials['AccessKeyId']
        secret_access_key = credentials['SecretAccessKey']
        session_token = credentials['SessionToken']
        return {
            'accessKeyId': access_key_id,
            'secretAccessKey': secret_access_key,
            'sessionToken': session_token
        }

    except Exception as e:
        print('********** ERROR **********')
        print(e)
        raise

def get_filenames_from_s3_response(s3_response):
    """
    引数はListObjectのレスポンス
    選択したバケット内にあるファイル名と更新時間の一覧を取得し返す。
    ただし、ファイルサイズが0以上のもの。
    """
    try:
        contents = s3_response.get('Contents', [])
        file_info = [(item['Key'], item['LastModified'].strftime('%Y/%m/%d %H:%M:%S')) for item in contents if item['Size'] > 0]
        return file_info
    except KeyError as e:
        print(f"Error parsing S3 response: {e}")
        return []


#####################画面表示###########################
class GUI(tkinter.Frame):
    budgets_name = []   #budgetsの一覧を格納する用の配列
    budget_name = ""    #参照したいbudgetの名前を格納する用

    def __init__(self, master = None):
        super().__init__(master)
       # GUIを描画するコード
        master.geometry("1000x550") 
        master.title("AWS S3 Operation")

        #####Scrollable part of the window
        self.frame = tkinter.Frame(master)
        self.frame.place(x=10, y=220, width=980, height=290)
        self.canvas = tkinter.Canvas(self.frame)
        self.v_scrollbar = ttk.Scrollbar(self.frame, orient=tkinter.VERTICAL, command=self.canvas.yview)
        self.h_scrollbar = ttk.Scrollbar(self.frame, orient=tkinter.HORIZONTAL, command=self.canvas.xview)
        self.canvas.configure(yscrollcommand=self.v_scrollbar.set, xscrollcommand=self.h_scrollbar.set)

        self.v_scrollbar.pack(side=tkinter.RIGHT, fill=tkinter.Y)
        self.h_scrollbar.pack(side=tkinter.BOTTOM, fill=tkinter.X)
        self.canvas.pack(side=tkinter.LEFT, fill=tkinter.BOTH, expand=True)
        self.canvas.bind('<Configure>', self.on_configure)
        self.inner_frame = tkinter.Frame(self.canvas)
        self.canvas.create_window((0, 0), window=self.inner_frame, anchor='nw')
        ####

        self.canvas_fix = tkinter.Canvas(master, width=1000, height=210)
        self.canvas_fix.pack()

        account_info = tkinter.Label(text="設定されているアカウント:" + account)
        account_info.place(x=10, y=35)
        role_info = tkinter.Label(text="設定されているrole name:" + role_name)
        role_info.place(x=10, y=55)
        role_info = tkinter.Label(text="1.リージョンを選択してregion setボタンを押す。 → 2.Get List Bugetsを押す。 →3.操作するBudgetを選択してBuget name setを押す。" ,anchor=tkinter.W)
        role_info.place(x=10, y=85)
        role_info = tkinter.Label(text="→ 4.アップロードの時はUpload Fileボタンを押すとファイル選択ができる。削除とダウンロードは、ファイルを選択してDelete FileまたはDownload Fileボタンを押す。" ,anchor=tkinter.W)
        role_info.place(x=10, y=105)


        # regionを選択するプルダウンリスト
        aws_region = ["us-east-1","us-east-2","us-west-1","us-west-2","ap-northeast-1","ap-northeast-3"]
        self.valuesoption = ttk.Combobox(master,values = aws_region)
        self.valuesoption.place(x=10, y=140)
        self.valuesoption.set("us-east-1")
        #バケット名を選択するプルダウンリスト。内容はバケット一覧取得時にセット
        self.budgets_name_select = ttk.Combobox(master)
        self.budgets_name_select.place(x=200, y=140)

        # ファイル名を選択するプルダウンリスト
        self.file_name_select = ttk.Combobox(master, width=90)
        self.file_name_select.place(x=400, y=140)
        
        #ボタン類、引数を渡すときはlambda式にしないといけない。
        #SAMLリクエストボタン
        saml_btn = tkinter.Button(text="Saml request", command = lambda:self.saml_request())
        saml_btn.place(x=10, y=10)
        #バジェットの情報を取るための事前設定
        region_setting_btn = tkinter.Button(self.master,text="region set", command = self.bugets_setting)
        region_setting_btn.place(x=95, y=10)
        #Budgeetsの一覧を取得し、Bugetsを選択できるように設定。
        budgets_listbudgets_btn = tkinter.Button(self.master,text="Get List Bugets", command = self.list_s3_buckets)
        budgets_listbudgets_btn.place(x=165, y=10)
        #Budgeetsの一覧を取得し、Bugetsを選択できるように設定。
        budgetname_set_btn = tkinter.Button(self.master,text="Buget name set", command = self.buget_name_set)
        budgetname_set_btn.place(x=260, y=10)
        """
        #Bugets内のファイル一覧を取得。Budget name setの時に呼び出すことにしたので削除。
        budgets_getinfo_btn = tkinter.Button(self.master,text="Get Bugets Info", command = self.get_info)
        budgets_getinfo_btn.place(x=360, y=10)
        """
        # ファイルを選択してS3にアップロード
        upload_file_btn = tkinter.Button(master, text="Upload File", command=self.upload_file)
        upload_file_btn.place(x=460, y=10)
        # 選択したファイルをS3から削除
        delete_file_btn = tkinter.Button(master, text="Delete File", command=self.delete_file)
        delete_file_btn.place(x=535, y=10)
        # 選択したファイルをS3からダウンロード
        download_file_btn = tkinter.Button(master, text="Download File", command=self.download_file)
        download_file_btn.place(x=610, y=10)

    def on_configure(self, event):
        """
        スクロールを文字長さに併せて更新。
        """
        self.canvas.configure(scrollregion=self.canvas.bbox('all'))


    def update(self):
        """
        表示される文字の更新。
        """
        global region
        global s3_file_info
        self.canvas.delete("all")
        self.canvas_fix.delete("all")
        self.canvas_fix.create_text(10, 75,text=credentials, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000")
        if region == "":
            self.canvas_fix.create_text(10, 170,text="region未設定", anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 
        else:
            self.canvas_fix.create_text(10, 170,text="選択リージョン:"+region, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 
        if len(GUI.budgets_name) == 0 :
            self.canvas_fix.create_text(10, 190,text="budget一覧未取得", anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 
        else:
            self.canvas_fix.create_text(10, 190,text=GUI.budgets_name, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 
        if GUI.budget_name == "":
            self.canvas_fix.create_text(200, 170,text="budget未設定", anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 
        else:        
            self.canvas_fix.create_text(200, 170,text="選択budget:"+GUI.budget_name, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 
        
        self.canvas.create_text(10, 10,text=s3_file_info, anchor= "nw", font=("MSゴシック体", "9", "normal"), fill="#000000") 

        #更新(1000ミリ秒)
        self.after(100,self.update)
        # Update scroll region to fit the content
        self.canvas.configure(scrollregion=self.canvas.bbox("all"))


    def saml_request(self):
        """
        Saml requestのボタンを押したとき動作。
        認証の時間が切れたときに再度認証をしに行く。
        """
        global saml_as
        global credentials
        saml_as = get_saml_assertion(sso_url, idp_user, idp_pass)
        credentials = get_aws_credentials(saml_as, account, saml_provider_name, role_name)

 
    def bugets_setting(self):
        """
        #情報を取得するBugetsの設定をする。Bugets settingのボタンを押したときの動作

        """
        global region
        global s3_client
        region = self.valuesoption.get()
        s3_client = aws_client()

    def list_s3_buckets(self):
        """
        Budgetのリストを取得して、プルダウンリストを更新する。
        """
        global s3_client
        try:
            GUI.budgets_name = []
            response = s3_client.list_buckets()
            buckets = [bucket['Name'] for bucket in response['Buckets']]
            #print(buckets)
            for name in buckets:
                GUI.budgets_name.append(name)
            #return buckets
            self.budgets_name_select['values'] = GUI.budgets_name
            print(response)

        except Exception as e:
            messagebox.showerror("Error", f"バケットの一覧が取得できませんでした。: {e}")
            return []

    def buget_name_set(self):
        """
        #情報を取得するBugetsの設定をする。Bugets name setのボタンを押したときの動作

        """
        GUI.budget_name = self.budgets_name_select.get()
        self.get_info()
    #
    def get_info(self):
        """
        Budget内のfile一覧をListObjectで表示する。        
        """
        global s3_client
        global s3_file_info
        try:
            s3_file_info = GUI.budget_name + "のファイル名と更新時間\n"        
            response = s3_client.list_objects_v2(Bucket = GUI.budget_name)
            datas = get_filenames_from_s3_response(response)
            filenames = []
            for data in datas:
                s3_file_info = s3_file_info + str(data).replace("(","").replace(")","").replace("'","")  + "\n"
                filenames.append(data[0])
            self.file_name_select['values'] = filenames

        except Exception as e:
            messagebox.showerror("Error", f"オブジェクトの一覧が取得できませんでした。: {e}")
            return []

    def upload_file(self):
        """
        ファイルを選択してS3にアップロードする。
        """
        global s3_client
        file_path = filedialog.askopenfilename()
        if file_path:
            bucket_name = self.budgets_name_select.get()
            if not bucket_name:
                print("Error: No bucket selected.")
                messagebox.showerror("Error", "バケットが選択されていません。")
                return
            try:
                s3_client.upload_file(file_path, bucket_name, file_path.split('/')[-1])
                messagebox.showinfo("Success", f"ファイルのアップロードが成功しました!\n{file_path} 」の {bucket_name}へのアップロードが成功しました。")
                self.get_info()  # 更新されたファイルリストを取得
            except Exception as e:
                messagebox.showerror("Error", f"ファイルのアップロードが失敗しました。\n {e}")

    def delete_file(self):
        """
        プルダウンリストから選択したファイルをS3から削除する。
        """
        global s3_client
        file_name = self.file_name_select.get()
        if not file_name:
            print("Error: No file selected.")
            messagebox.showerror("Error", "ファイルが選択されていません。")
            return
        bucket_name = self.budgets_name_select.get()
        if not bucket_name:
            print("Error: No bucket selected.")
            messagebox.showerror("Error", "バケットが選択されていません。")
            return
        try:
            s3_client.delete_object(Bucket=bucket_name, Key=file_name)
            messagebox.showinfo("Success", f"ファイルの削除が成功しました!\n{file_name} 」の {bucket_name}からの削除が成功しました。")
            self.get_info()  # 更新されたファイルリストを取得
        except Exception as e:
            messagebox.showerror("Error", f"ファイルの削除が失敗しました。\n {e}")

    def download_file(self):
        """
        プルダウンリストから選択したファイルをS3からダウンロードする。
        """
        global s3_client
        file_name = self.file_name_select.get()
        if not file_name:
            print("Error: No file selected.")
            messagebox.showerror("Error", "ファイルが選択されていません。")
            return
        bucket_name = self.budgets_name_select.get()
        if not bucket_name:
            print("Error: No bucket selected.")
            messagebox.showerror("Error", "バケットが選択されていません。")
            return
        save_path = filedialog.asksaveasfilename(initialfile=file_name)
        if not save_path:
            return
        try:
            s3_client.download_file(bucket_name, file_name, save_path)
            messagebox.showinfo("Success", f"ファイルのダウンロードが成功しました!\n{file_name} 」を {save_path} に保存しました。")
        except Exception as e:
            messagebox.showerror("Error", f"ファイルのダウンロードが失敗しました。\n {e}")


def main():
   gui = tkinter.Tk()
   app = GUI(master = gui)
   app.update()
   app.mainloop()

#######################################################

if __name__ == "__main__":
    #初期読み込み
    sso_url, idp_user, idp_pass, account, saml_provider_name, role_name= ini_read()
    #AWSクレデンシャル取得。
    saml_as = get_saml_assertion(sso_url, idp_user, idp_pass)
    #print(saml_as)
    credentials = get_aws_credentials(saml_as, account, saml_provider_name, role_name)
    #print(credentials)
    main()

これを実行すると、以下のような画面が立ち上がる。
画面.png

Samlの認証は起動時にかけるが時間がたったら期限切れになるので、その場合はSaml requestボタンを押す。
そうすると認証情報を取り直す。

基本的な操作は、
左のプルダウンリストからリージョンを選択して、region setボタンを押す。
その次に、GetListBudgetsボタンを押すと、真ん中のプルダウンリストからバケットを選択できるようになるので、選択する。
選択したら、Budget name set ボタンを押すと、ファイル一覧を取得する。

ここまでで、S3のファイルを操作する事前準備ができた。

実際にアップロードをしたい場合は、Upload Fileボタンを押すとファイルの選択画面がポップするので、ファイルを選択するとアップロードが開始される。

ファイルの削除やダウンロードを行いたい場合は、右のプルダウンリストからファイルを選択して、目的のボタンを押すことで操作が可能である。

色々改善点はあるが、とりあえずファイル操作はこんな感じでGUIでできるようになった。

次は、コスト周りをGUIで何とか出来たらいいな。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?