More than 1 year has passed since last update.

Creating and running an S3 -> GCS job with GCP's Storage Transfer Service in Python


Background

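  • I want a scheduler to manage when Storage Transfer Service runs
  • I want to manage those runs as Python jobs
    • StorageTransferServiceClient#run_transfer_job lets you trigger a run from Python
    • For a TransferJob that already exists, you only need to pass that job's name
  • For completeness, the code that creates a TransferJob from Python is included as well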
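
As a minimal sketch of the "run an existing job by name" case described above: the helper below takes the client as an argument, so a scheduler task can call it with a real StorageTransferServiceClient while a unit test can pass a stub. The function name run_existing_transfer_job is hypothetical and not part of the library; the only client call it relies on is run_transfer_job, which returns a long-running operation.

```python
from typing import Any


def run_existing_transfer_job(client: Any, project_id: str, job_name: str) -> Any:
    """Start an already-created transfer job and block until the run finishes.

    `client` is assumed to behave like StorageTransferServiceClient:
    run_transfer_job() returns a long-running operation exposing result().
    `job_name` is the full name of the job, e.g. "transferJobs/xxxxx".
    """
    operation = client.run_transfer_job(
        request={"job_name": job_name, "project_id": project_id}
    )
    # result() waits for the operation to complete and returns its response.
    return operation.result()
```

In a scheduler task this would be called with storage_transfer.StorageTransferServiceClient() from google.cloud and the name of a job created beforehand.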

Implementation

import logging

from google.cloud import storage_transfer


class StorageTransferServiceUtil:

    def __init__(self, project: str):
        self.project: str = project

    # Creates a transfer job.
    # Execution is managed by a separate scheduler, not by Storage Transfer Service,
    # so no schedule parameter is passed.
    def create_s3_transfer_job(self, description: str,
                               source_bucket: str, aws_access_key_id: str,
                               aws_secret_access_key: str,
                               sink_bucket: str) -> str:

        client = storage_transfer.StorageTransferServiceClient()

        transfer_job_request = storage_transfer.CreateTransferJobRequest({
            'transfer_job': {
                'project_id': self.project,
                'description': description,
                'status': storage_transfer.TransferJob.Status.ENABLED,
                'transfer_spec': {
                    'aws_s3_data_source': {
                        'bucket_name': source_bucket,
                        'aws_access_key': {
                            'access_key_id': aws_access_key_id,
                            'secret_access_key': aws_secret_access_key,
                        }
                    },
                    'gcs_data_sink': {
                        'bucket_name': sink_bucket,
                    }
                }
            }
        })

        result = client.create_transfer_job(transfer_job_request)
        return result.name

    # Runs a transfer job.
    # Pass the job_name (transferJobs/xxxxx) of a job made by the creation method above.
    def run_s3_transfer_job(self, job_name: str) -> None:
        client = storage_transfer.StorageTransferServiceClient()
        request = storage_transfer.RunTransferJobRequest(
            job_name=job_name,
            project_id=self.project,
        )
        operation = client.run_transfer_job(request=request)
        logging.info("Waiting for operation to complete...")
        response = operation.result()
        logging.info(response)
        return

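# Create a job and run it once.
# YOUR_PROJECT, AWS_BUCKET, YOUR_AWS_ACCESS_KEY, YOUR_AWS_SECRET and GCS_BUCKET
# are placeholders: replace them with your own values before running.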
if __name__ == '__main__':
    service = StorageTransferServiceUtil(YOUR_PROJECT)
    name = service.create_s3_transfer_job(
        description="job_description",
        source_bucket=AWS_BUCKET,
        aws_access_key_id=YOUR_AWS_ACCESS_KEY,
        aws_secret_access_key=YOUR_AWS_SECRET,
        sink_bucket=GCS_BUCKET
    )
    service.run_s3_transfer_job(name)
    # With the name of an already-created job, the transfer_job run can be scheduled from Python
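
The create_s3_transfer_job method above receives the AWS credentials as plain arguments. One possible variation, shown here only as a sketch, is to build the transfer_spec part of the request in a separate function that reads the credentials from environment variables. The function name build_s3_transfer_spec and the use of the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY variable names are assumptions for illustration; the dictionary keys are the ones already used in the code above.

```python
import os
from typing import Any, Dict, Mapping


def build_s3_transfer_spec(source_bucket: str, sink_bucket: str,
                           env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build the 'transfer_spec' dict for an S3 -> GCS transfer job.

    Credentials are read from `env` (assumed variable names:
    AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) instead of being hard-coded.
    """
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))

    return {
        "aws_s3_data_source": {
            "bucket_name": source_bucket,
            "aws_access_key": {
                "access_key_id": env["AWS_ACCESS_KEY_ID"],
                "secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
            },
        },
        "gcs_data_sink": {
            "bucket_name": sink_bucket,
        },
    }
```

The returned dict could be used as the value of 'transfer_spec' when building the CreateTransferJobRequest, which keeps the secret out of the calling script and makes the payload easy to check in a test without contacting GCP.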
