Python boto3 でS3のディレクトリだけを再帰的に取得する
Athena使用する際に、大量のデータでLoad Partition(MSCK Repair Table)をすると、10分以上実行が走った後、落ちる現象に遭遇しました。
そこで、ディレクトリだけを再帰的に取得し、ALTER TABLEでPARTITIONを追加するクエリを取得するPythonコードを書きました。
import boto3
import pyperclip
DIR_LIST = [] # global変数
BUCKET = '{bucket名を入力}'
ATHENA_TABLE = '{Athenaのテーブル名}'
PARTITIONS = ['partitino1','partition2','partition3'] # HIVE形式で partition1=xxx となっている前提
RootDirectory = 'test/' #ディレクトリを指定
### 再帰的に、s3のディレクトリを取得する
def getS3Dir(prefix):
client = boto3.client('s3')
result = client.list_objects(Bucket=BUCKET, Prefix=prefix, Delimiter='/')
if result.get('CommonPrefixes'):
for o in result.get('CommonPrefixes'):
path = o.get('Prefix')
chLi = getS3Dir(path)
if chLi:
for l in chLi:
getS3Dir(l)
else:
DIR_LIST.append(path)
return chLi
return None
### パーティション追加
def hasAllKeys(keys:dict,partitions:list):
flg=True
for pt in partitions:
if pt not in keys:
flg=False
break
return flg
def customRepairTable():
q1 = []
query = ''
for directory in DIR_LIST:
dirL = directory.split('/')
keys = {d.split('=')[0]:d.split('=')[1] for d in dirL if '=' in d}
if hasAllKeys(keys,PARTITIONS):
part_list = []
for pt in PARTITIONS:
part_list.append("{} = '{}'".format(pt,keys[pt]))
q1.append( "PARTITION ({})".format(','.join(part_list)))
if len(q1) >0:
query = 'ALTER TABLE '+ATHENA_TABLE + ' ADD IF NOT EXISTS ' + ' '.join(q1)
print(query)
return query
getS3Dir(RootDirectory)
q = customRepairTable()
pyperclip.copy(q) # Copy to ClipBoard
print(len(DIR_LIST))
print('クリップボードにコピーしました')
pyperclipでクリップボードにコピーできるのは、便利ですね。