背景
- Storage Transfer Serviceの実行をスケジューラーで管理したい
- pythonジョブとして実行管理したい
-
StorageTransferServiceClient#run_transfer_job
を使えばpythonから実行をフックできる - すでに作成済みのTransferJobであれば、そのジョブのnameを指定すればいい
-
- 一応、pythonでTransferJobの作成する処理も記載しておく
実装
class StorageTransferServiceUtil:
def __init__(self, project: str):
self.project: str = project
# ジョブ作成用
# ジョブの実行はTransferServiceで管理せず、別のSchedulerで管理する
# そのためパラメータのSchedulerは渡さない
def create_s3_transfer_job(self, description: str,
source_bucket: str, aws_access_key_id: str,
aws_secret_access_key: str,
sink_bucket: str) -> str:
client = storage_transfer.StorageTransferServiceClient()
transfer_job_request = storage_transfer.CreateTransferJobRequest({
'transfer_job': {
'project_id': self.project,
'description': description,
'status': storage_transfer.TransferJob.Status.ENABLED,
'transfer_spec': {
'aws_s3_data_source': {
'bucket_name': source_bucket,
'aws_access_key': {
'access_key_id': aws_access_key_id,
'secret_access_key': aws_secret_access_key,
}
},
'gcs_data_sink': {
'bucket_name': sink_bucket,
}
}
}
})
result = client.create_transfer_job(transfer_job_request)
return result.name
# ジョブの実行用
# ジョブの作成処理で作ったジョブのjob_name(transferJobs/xxxxx)を使用する
def run_s3_transfer_job(self, job_name: str) -> None:
client = storage_transfer.StorageTransferServiceClient()
request = storage_transfer.RunTransferJobRequest(
job_name=job_name,
project_id=self.project,
)
operation = client.run_transfer_job(request=request)
logging.info("Waiting for operation to complete...")
response = operation.result()
logging.info(response)
return
# jobを作成して、一度だけ実行してみる
if __name__ == '__main__':
service = StorageTransferServiceUtil(YOUR_PROJECT)
name = service.create_s3_transfer_job(
description="job_description",
source_bucket=AWS_BUCKET,
aws_access_key_id=YOUR_AWS_ACCESS_KEY,
aws_secret_access_key=YOUR_AWS_SECRET,
sink_bucket=GCS_BUCKET
)
service.run_s3_transfer_job(name)
# 作成ずみのnameを使用すれば、pythonでtransfer_jobの実行をschedulingできる
参照