0
Help us understand the problem. What are the problem?

posted at

updated at

Organization

Cloudianオブジェクトストレージ/マルチスレッドによるファイルのアップロード

今回は、Python(boto3) のサンプル編です

マルチスレッドによるファイルのアップロード

指定したディレクトリ(このサンプルでは 「./log」)にある全てのファイルを、Cloudian上のバケット「logs」の中の実行日のフォルダ (YYYY-MM-DD)にアップロードする Python プログラムです。

アップロードするファイル毎にスレッドを作成して、マルチスレッドで Cloudianにファイルをアップロードします。アップロード時にファイルに対して、ACL に「private」を設定し、メタデータとしてアップロードした日付を付与し、AES256 のサーバーサイド暗号化を有効にしています。

MultipartUpload.py
import os
import glob
import threading
import sys
from datetime import datetime

import boto3, botocore
from boto3.s3.transfer import S3Transfer
from boto3.s3.transfer import TransferConfig


######### マルチスレッド関数 ##########
def multithreads_upload(upfile):

    print("Thread %s is uploading file: %s" % (threading.current_thread(), upfile))

    transfer.upload_file(
        upfile,
       bucket,
       today + '/' + os.path.basename(upfile),

       extra_args={
            'ACL': 'private',
            'Metadata': {'Stored': today},
            'ServerSideEncryption': 'AES256'
       }
    )

    print("Thread %s done uploading file: %s" % (threading.current_thread(), upfile))



########## メイン処理 ##########
# 変数セット
dir = './fileup/'

bucket = 'logs'
region = 'region1'
today = datetime.now().strftime('%Y-%m-%d')


# boto3オブジェクト作成
client = boto3.client(
    's3',
    endpoint_url='http://s3-region1.admin-tech.tokyo',

    # 認証情報の直接記述は推奨されない(テスト目的)
    aws_access_key_id='アクセスキーを記述',
    aws_secret_access_key='シークレットキーを記述'
)

# 転送オブジェクト作成
transfer = S3Transfer(client)



try:
    # バケットの存在確認
    client.head_bucket(Bucket = bucket)

except botocore.exceptions.ClientError:
    # バケットが存在しなければ、新規作成
    client.create_bucket(
        ACL='private',
        Bucket=bucket,
        CreateBucketConfiguration={
            'LocationConstraint': 'region1'
        }
)


# マルチスレッドによるファイルのアップロード
upfiles = [dir + file for file in os.listdir(dir) if os.path.isfile(dir + file)]
threads = []

for upfile in upfiles:
    t = threading.Thread(target=multithreads_upload, args=(upfile,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

プログラム実行の出力ログから、アップロードするファイルサイズにより各スレッドの開始と終了に差があることが分かります。
このサンプルでは各スレッドの終了を threading.join()を使って、全てのファイルのアップロードが終了するのを待機します。

下図はこのサンプルプログラムによりアップロードされたファイルを、CMC から確認した画面です

4-2-1.png

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
0
Help us understand the problem. What are the problem?