0
0

OCIのオブジェクトストレージをOCI Functionsから操作する

Posted at

OCI FunctionsでPythonを使った場合の、オブジェクトストレージを操作する方法について説明します。
ここで紹介するコードを、fn initコマンドで生成されるfunc.pyに追加してください(あくまでとりあえず動かす上での話です)。

必要なポリシー

Allow dynamic-group <dynamic-group-name> to manage objects in compartment <compartment-name> where target.bucket.name='<target-bucket-name>'

requirements.txt

requirements.txtは以下です。デフォルトの物にociを追加してください。

fdk>=0.1.75
oci

クライアントのセットアップ

下記のような感じでオブジェクトストレージを操作するためのクライアントをセットアップします。

import oci
from oci.auth.signers import get_resource_principals_signer

# OCI Object Storageクライアントのセットアップ
def get_oci_client():
    try:
        signer = get_resource_principals_signer()
        object_storage_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
        namespace = object_storage_client.get_namespace().data
        return object_storage_client, namespace
    except Exception as e:
        print(f"Error in get_oci_client: {str(e)}", file=sys.stderr, flush=True)
        raise

object_storage_client, namespace = get_oci_client()

バケットの作成

上記クライアントを使って、バケットを作成するには下のようにします。

# バケットが存在しない場合に作成
def create_bucket_if_not_exists(bucket_name):
    try:
        object_storage_client.head_bucket(namespace, bucket_name)
    except oci.exceptions.ServiceError as e:
        if e.status == 404:
            try:
                request = oci.object_storage.models.CreateBucketDetails()
                request.name = bucket_name
                request.compartment_id = COMPARTMENT_ID
                object_storage_client.create_bucket(namespace, request)
                print(f"Bucket '{bucket_name}' created", flush=True)
            except Exception as e:
                print(f"Error in create_bucket_if_not_exists (creating bucket): {str(e)}", file=sys.stderr, flush=True)
                raise
        else:
            print(f"Error in create_bucket_if_not_exists (head bucket): {str(e)}", file=sys.stderr, flush=True)
            raise

create_bucket_if_not_exists(BUCKET_NAME)

バケットにファイルのアップロード

ファイルのアップロードは以下の様に行えます。

# OCI Object Storageにファイルをアップロードする関数
def upload_to_oci_object_storage(object_storage_client, namespace, bucket_name, object_name, file_content):
    try:
        response = object_storage_client.put_object(
            namespace,
            bucket_name,
            object_name,
            file_content
        )
        print(f"File uploaded successfully: {response}", flush=True)
        return f"File uploaded to OCI Object Storage at {object_name}"
    except oci.exceptions.ServiceError as e:
        print(f"Error uploading to Object Storage: {str(e)}", file=sys.stderr, flush=True)
        return f"Error uploading to Object Storage: {str(e)}"

バケットのファイルをダウンロード

ダウンロードは以下の様に行えます。

# ファイルをダウンロードして確認する関数
def download_from_oci(object_storage_client, namespace, bucket_name, object_name):
    try:
        response = object_storage_client.get_object(namespace, bucket_name, object_name)
        downloaded_file_data = response.data.content
        
        # ダウンロードしたファイルの型とサイズを確認
        print(f"Downloaded file size: {len(downloaded_file_data)} bytes", flush=True)
        print(f"Downloaded file type: {type(downloaded_file_data)}", flush=True)
        
        return downloaded_file_data
    except oci.exceptions.ServiceError as e:
        print(f"Error downloading from Object Storage: {str(e)}", file=sys.stderr, flush=True)
        return None
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0