OCI FunctionsでPythonを使った場合の、オブジェクトストレージを操作する方法について説明します。
ここで紹介するコードを、fn init
コマンドで生成されるfunc.py
に追加してください(あくまでとりあえず動かす上での話です)。
必要なポリシー
Allow dynamic-group <dynamic-group-name> to manage objects in compartment <compartment-name> where target.bucket.name='<target-bucket-name>'
requirements.txt
requirements.txtは以下です。デフォルトの物にoci
を追加してください。
fdk>=0.1.75
oci
クライアントのセットアップ
下記のような感じでオブジェクトストレージを操作するためのクライアントをセットアップします。
import oci
from oci.auth.signers import get_resource_principals_signer
# OCI Object Storageクライアントのセットアップ
def get_oci_client():
try:
signer = get_resource_principals_signer()
object_storage_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
namespace = object_storage_client.get_namespace().data
return object_storage_client, namespace
except Exception as e:
print(f"Error in get_oci_client: {str(e)}", file=sys.stderr, flush=True)
raise
object_storage_client, namespace = get_oci_client()
バケットの作成
上記クライアントを使って、バケットを作成するには下のようにします。
# バケットが存在しない場合に作成
def create_bucket_if_not_exists(bucket_name):
try:
object_storage_client.head_bucket(namespace, bucket_name)
except oci.exceptions.ServiceError as e:
if e.status == 404:
try:
request = oci.object_storage.models.CreateBucketDetails()
request.name = bucket_name
request.compartment_id = COMPARTMENT_ID
object_storage_client.create_bucket(namespace, request)
print(f"Bucket '{bucket_name}' created", flush=True)
except Exception as e:
print(f"Error in create_bucket_if_not_exists (creating bucket): {str(e)}", file=sys.stderr, flush=True)
raise
else:
print(f"Error in create_bucket_if_not_exists (head bucket): {str(e)}", file=sys.stderr, flush=True)
raise
create_bucket_if_not_exists(BUCKET_NAME)
バケットにファイルのアップロード
ファイルのアップロードは以下の様に行えます。
# OCI Object Storageにファイルをアップロードする関数
def upload_to_oci_object_storage(object_storage_client, namespace, bucket_name, object_name, file_content):
try:
response = object_storage_client.put_object(
namespace,
bucket_name,
object_name,
file_content
)
print(f"File uploaded successfully: {response}", flush=True)
return f"File uploaded to OCI Object Storage at {object_name}"
except oci.exceptions.ServiceError as e:
print(f"Error uploading to Object Storage: {str(e)}", file=sys.stderr, flush=True)
return f"Error uploading to Object Storage: {str(e)}"
バケットのファイルをダウンロード
ダウンロードは以下の様に行えます。
# ファイルをダウンロードして確認する関数
def download_from_oci(object_storage_client, namespace, bucket_name, object_name):
try:
response = object_storage_client.get_object(namespace, bucket_name, object_name)
downloaded_file_data = response.data.content
# ダウンロードしたファイルの型とサイズを確認
print(f"Downloaded file size: {len(downloaded_file_data)} bytes", flush=True)
print(f"Downloaded file type: {type(downloaded_file_data)}", flush=True)
return downloaded_file_data
except oci.exceptions.ServiceError as e:
print(f"Error downloading from Object Storage: {str(e)}", file=sys.stderr, flush=True)
return None