4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Autonomous DatabaseからOCI Streamingのストリームに直接アクセスしてみた

Posted at

はじめに

PL/SQL SDKを使用して、Autonomous DatabaseからOCI Streamingのストリームに直接アクセスできるか検証してみました。

※こちらの記事の内容はあくまで個人の実験メモ的な内容のため、こちらの内容を利用した場合のトラブルには一切責任を負いません。
また、こちらの記事の内容を元にしたOracleサポートへの問い合わせはご遠慮ください。

1. リソース・プリンシパルの有効化

リソース・プリンシパルを使用すると、Autonomous Databaseに対して、OCIリソースに対するアクセスを許可することができます。
リソース・プリンシパルを使用する手順は、以下の通りです。

1.対象となるAutonomous Databaseを含む動的グループを作成
2.動的グループに対して、OCIリソースへのアクセスを許可するポリシーを作成
3.adminユーザとしてAutonomous Databaseに接続し、リソース・プリンシパルを有効化

まずはじめに、使用するAutonomous Databaseを含む動的グループを作成します。
ここでは、「ADB-dg」という名前の動的グループを作成しました。
一致ルールには、単一のAutonomous Databaseにマッチするルールを以下のように設定しました

All {resource.id = 'Autonomous DatabaseのOCID'}

スクリーンショット 2025-02-26 9.38.58.png

スクリーンショット 2025-02-26 9.39.10.png

次に、作成した動的グループに対してOCI Streamigの使用を許可するポリシーを作成します。
ここでは、テナンシ内の全てのストリームの利用を許可するポリシー「ADB-use-streams」をrootコンパートメントに作成しました。
ポリシー・ステートメントは、以下のように設定しました。

allow dynamic-group ADB-dg to use streams in tenancy

スクリーンショット 2025-02-26 9.55.31.png

スクリーンショット 2025-02-26 9.55.48.png

最後に、adminユーザとしてAutonomous Databaseに接続し、DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPALプロシージャを実行して、リソース・プリンシパルを有効化します。

EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL();
SQL> EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL();

PL/SQL procedure successfully completed.

SQL>

adminユーザ以外にリソース・プリンシパルの使用を許可する場合には、リソース・プリンシパルを有効化したあとに、以下のようにユーザ名を指定してDBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPALプロシージャを実行します。

EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL(username => 'ADBユーザ名');
SQL> EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL(username => 'adbuser');

PL/SQL procedure successfully completed.

SQL>

2. ストリームの作成

OCIコンソールで「ストリーミング」に移動します。
スクリーンショット 2025-02-25 23.42.04.png

「ストリームの作成」をクリックします。
スクリーンショット 2025-02-25 23.39.15.png

今回は「MyTestStream」という名前のストリームを作成しました。
スクリーンショット 2025-02-25 23.19.08.png

スクリーンショット 2025-02-25 23.19.27.png

ストリームが作成できたら、ストリームのOCIDをメモしておきます。
スクリーンショット 2025-02-25 23.19.27のコピー.png

3. Autonomous DatabaseからOCI Streamingにメッセージをパブリッシュする

今回は、ストリームのOCIDおよびメッセージに含めるキーとその値をパラメータとして渡すと、指定したストリームにメッセージをパブリッシュするPL/SQLプロシージャ「put_stream」を作成しました。

put_streamプロシージャ
CREATE OR REPLACE PROCEDURE put_stream( stream_id IN VARCHAR2, key IN VARCHAR2, value IN VARCHAR2 )
IS
    key_base64     VARCHAR2(32767);
    value_base64   VARCHAR2(32767);
	target_region  VARCHAR2(200);

    put_message_detail            dbms_cloud_oci_streaming_put_messages_details_t := dbms_cloud_oci_streaming_put_messages_details_t();
	put_message_detail_entry_tbl  dbms_cloud_oci_streaming_put_messages_details_entry_tbl := dbms_cloud_oci_streaming_put_messages_details_entry_tbl();
    put_message_detail_entry_t    dbms_cloud_oci_streaming_put_messages_details_entry_t := dbms_cloud_oci_streaming_put_messages_details_entry_t();
    put_message_res               dbms_cloud_oci_st_stream_put_messages_response_t;

BEGIN
    DBMS_OUTPUT.PUT_LINE('Key: '||key);
    DBMS_OUTPUT.PUT_LINE('Value: '||value);

-- keyおよびvalueの値をBase64でエンコード
    key_base64   := utl_raw.cast_to_varchar2(utl_encode.base64_encode(utl_raw.cast_to_raw(key)));
	value_base64 := utl_raw.cast_to_varchar2(utl_encode.base64_encode(utl_raw.cast_to_raw(value)));

    DBMS_OUTPUT.PUT_LINE('Key(Base64): '||key_base64);
    DBMS_OUTPUT.PUT_LINE('Value(Base64): '||value_base64);

-- ADBのリージョン、OCIDを取得
    SELECT json_value(cloud_identity, '$.REGION') INTO target_region FROM v$pdbs;

-- ストリームに書き込むメッセージを組み立てる
    put_message_detail_entry_t.key   := key_base64;
    put_message_detail_entry_t.value := value_base64;

    put_message_detail_entry_tbl.extend();
    put_message_detail_entry_tbl(put_message_detail_entry_tbl.last)  := put_message_detail_entry_t;
    put_message_detail.messages  := put_message_detail_entry_tbl;
    
-- ストリームにメッセージを書き込む
    put_message_res := DBMS_CLOUD_OCI_ST_STREAM.PUT_MESSAGES(
                           stream_id => stream_id,
                           put_messages_details => put_message_detail,
                           region => target_region,
                           credential_name => 'OCI$RESOURCE_PRINCIPAL'
                       );

    DBMS_OUTPUT.PUT_LINE('Failures:'||put_message_res.response_body.failures);

END;
/

パラメータとして、2.で作成したストリームのOCID、keyとしてKey1、valueとしてValue1を渡して、プロシージャ「put_stream」を実行します。

SQL> EXEC put_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','Key1','Value1')
Key: Key1
Value: Value1
Key(Base64): S2V5MQ==
Value(Base64): VmFsdWUx
Failures:0

PL/SQL procedure successfully completed.

SQL> 

パラメータとして、2.で作成したストリームのOCID、keyとしてKey2、valueとしてValue2を渡して、プロシージャ「put_stream」を実行します。

SQL> EXEC put_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','Key2','Value2')
Key: Key2
Value: Value2
Key(Base64): S2V5Mg==
Value(Base64): VmFsdWUy
Failures:0

PL/SQL procedure successfully completed.

SQL> 

パラメータとして、2.で作成したストリームのOCID、keyとしてKey3、valueとしてValue3を渡して、プロシージャ「put_stream」を実行します。

SQL> EXEC put_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','Key3','Value3')
Key: Key3
Value: Value3
Key(Base64): S2V5Mw==
Value(Base64): VmFsdWUz
Failures:0

PL/SQL procedure successfully completed.

SQL>

OCIコンソールのストリームの詳細画面で、「メッセージのロード」をクリックして、最近のメッセージを確認します。
(過去1分間にパブリッシュされたメッセージのみしか表示されないので注意が必要です)
スクリーンショット 2025-02-25 23.20.27.png

Autonomous Databaseからストリームに直接メッセージがパブリッシュされたことを確認できました。

4. OCI Streamingからメッセージを受信する

今回は、ストリームのOCIDおよびパーティションとカーソルのタイプをパラメータとして渡すと、指定したストリームからメッセージを受信するPL/SQLプロシージャ「consume_stream」を作成しました。

consume_streamプロシージャ
CREATE OR REPLACE PROCEDURE consume_stream( stream_id IN VARCHAR2, partition IN VARCHAR2, l_type IN VARCHAR2 )
IS
	target_region   VARCHAR2(200);
    l_cursor_value  VARCHAR2(32767);

    cursor_detail               dbms_cloud_oci_streaming_create_cursor_details_t := dbms_cloud_oci_streaming_create_cursor_details_t();
	create_cursor_res           dbms_cloud_oci_st_stream_create_cursor_response_t;
    create_cursor_res_body      dbms_cloud_oci_streaming_cursor_t;
    get_messages_res            dbms_cloud_oci_st_stream_get_messages_response_t;
    get_messages_res_body       dbms_cloud_oci_streaming_message_tbl;

    key_base64    VARCHAR2(32767);
    value_base64  VARCHAR2(32767);
    key           VARCHAR2(32767);
    value         VARCHAR2(32767);

BEGIN
-- ADBのリージョン、OCIDを取得
    SELECT json_value(cloud_identity, '$.REGION') INTO target_region FROM v$pdbs;

-- カーソル作成のパラメータを設定
    cursor_detail.partition := partition;
    cursor_detail.l_type    := l_type;
	
-- カーソルの作成
    create_cursor_res := DBMS_CLOUD_OCI_ST_STREAM.CREATE_CURSOR(
                             stream_id => stream_id,
                             create_cursor_details => cursor_detail,
                             region => target_region,
                             credential_name => 'OCI$RESOURCE_PRINCIPAL'
                         );

-- カーソルを取得
    create_cursor_res_body := create_cursor_res.response_body;
    l_cursor_value         := create_cursor_res_body.value;

-- ストリームからメッセージを取得
    get_messages_res := DBMS_CLOUD_OCI_ST_STREAM.GET_MESSAGES(
	                        stream_id => stream_id,
                            l_cursor  => l_cursor_value,
                            region => target_region,
                            credential_name => 'OCI$RESOURCE_PRINCIPAL'
                        );
-- responseからresponse_bodyを取得
    get_messages_res_body := get_messages_res.response_body;
	
  -- 取得したresponse_bodyを元にループ
    FOR i IN get_messages_res_body.first..get_messages_res_body.last LOOP
  -- keyおよびvalueの値(Base64)を取得
        key_base64   := get_messages_res_body(i).key;
        value_base64 := get_messages_res_body(i).value;

  -- keyおよびvalueの値(Base64)をデコード
        key    := utl_raw.cast_to_varchar2(utl_encode.base64_decode(utl_raw.cast_to_raw(key_base64)));
        value  := utl_raw.cast_to_varchar2(utl_encode.base64_decode(utl_raw.cast_to_raw(value_base64)));

        DBMS_OUTPUT.PUT_LINE('Key: '||key||'  Value:'||value);
    END LOOP;
END;
/

ストリームとして2.で作成したストリームのOCID、パーティションとして「0」、カーソルのタイプとして「TRIM_HORIZON」を渡して、プロシージャ「consume_stream」を実行します。

SQL> EXEC consume_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','0','TRIM_HORIZON');
Key: Key1  Value:Value1
Key: Key2  Value:Value2
Key: Key3  Value:Value3

PL/SQL procedure successfully completed.

SQL>

Autonomous Databaseにストリームから直接メッセージを受信できることを確認できました。

参考情報

リソース・プリンシパルを使用したOracle Cloud Infrastructureリソースへのアクセス
ポリシー:ストリーミング・サービスの詳細
DBMS_CLOUD_OCI_ST_STREAM.CREATE_CURSOR Function
DBMS_CLOUD_OCI_ST_STREAM.PUT_MESSAGES Function
DBMS_CLOUD_OCI_ST_STREAM.GET_MESSAGES Function

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?