はじめに
PL/SQL SDKを使用して、Autonomous DatabaseからOCI Streamingのストリームに直接アクセスできるか検証してみました。
※こちらの記事の内容はあくまで個人の実験メモ的な内容のため、こちらの内容を利用した場合のトラブルには一切責任を負いません。
また、こちらの記事の内容を元にしたOracleサポートへの問い合わせはご遠慮ください。
1. リソース・プリンシパルの有効化
リソース・プリンシパルを使用すると、Autonomous Databaseに対して、OCIリソースに対するアクセスを許可することができます。
リソース・プリンシパルを使用する手順は、以下の通りです。
1.対象となるAutonomous Databaseを含む動的グループを作成
2.動的グループに対して、OCIリソースへのアクセスを許可するポリシーを作成
3.adminユーザとしてAutonomous Databaseに接続し、リソース・プリンシパルを有効化
まずはじめに、使用するAutonomous Databaseを含む動的グループを作成します。
ここでは、「ADB-dg」という名前の動的グループを作成しました。
一致ルールには、単一のAutonomous Databaseにマッチするルールを以下のように設定しました
All {resource.id = 'Autonomous DatabaseのOCID'}
次に、作成した動的グループに対してOCI Streamigの使用を許可するポリシーを作成します。
ここでは、テナンシ内の全てのストリームの利用を許可するポリシー「ADB-use-streams」をrootコンパートメントに作成しました。
ポリシー・ステートメントは、以下のように設定しました。
allow dynamic-group ADB-dg to use streams in tenancy
最後に、adminユーザとしてAutonomous Databaseに接続し、DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPALプロシージャを実行して、リソース・プリンシパルを有効化します。
EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL();
SQL> EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL();
PL/SQL procedure successfully completed.
SQL>
adminユーザ以外にリソース・プリンシパルの使用を許可する場合には、リソース・プリンシパルを有効化したあとに、以下のようにユーザ名を指定してDBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPALプロシージャを実行します。
EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL(username => 'ADBユーザ名');
SQL> EXEC DBMS_CLOUD_ADMIN.ENABLE_RESOURCE_PRINCIPAL(username => 'adbuser');
PL/SQL procedure successfully completed.
SQL>
2. ストリームの作成
今回は「MyTestStream」という名前のストリームを作成しました。
ストリームが作成できたら、ストリームのOCIDをメモしておきます。
3. Autonomous DatabaseからOCI Streamingにメッセージをパブリッシュする
今回は、ストリームのOCIDおよびメッセージに含めるキーとその値をパラメータとして渡すと、指定したストリームにメッセージをパブリッシュするPL/SQLプロシージャ「put_stream」を作成しました。
CREATE OR REPLACE PROCEDURE put_stream( stream_id IN VARCHAR2, key IN VARCHAR2, value IN VARCHAR2 )
IS
key_base64 VARCHAR2(32767);
value_base64 VARCHAR2(32767);
target_region VARCHAR2(200);
put_message_detail dbms_cloud_oci_streaming_put_messages_details_t := dbms_cloud_oci_streaming_put_messages_details_t();
put_message_detail_entry_tbl dbms_cloud_oci_streaming_put_messages_details_entry_tbl := dbms_cloud_oci_streaming_put_messages_details_entry_tbl();
put_message_detail_entry_t dbms_cloud_oci_streaming_put_messages_details_entry_t := dbms_cloud_oci_streaming_put_messages_details_entry_t();
put_message_res dbms_cloud_oci_st_stream_put_messages_response_t;
BEGIN
DBMS_OUTPUT.PUT_LINE('Key: '||key);
DBMS_OUTPUT.PUT_LINE('Value: '||value);
-- keyおよびvalueの値をBase64でエンコード
key_base64 := utl_raw.cast_to_varchar2(utl_encode.base64_encode(utl_raw.cast_to_raw(key)));
value_base64 := utl_raw.cast_to_varchar2(utl_encode.base64_encode(utl_raw.cast_to_raw(value)));
DBMS_OUTPUT.PUT_LINE('Key(Base64): '||key_base64);
DBMS_OUTPUT.PUT_LINE('Value(Base64): '||value_base64);
-- ADBのリージョン、OCIDを取得
SELECT json_value(cloud_identity, '$.REGION') INTO target_region FROM v$pdbs;
-- ストリームに書き込むメッセージを組み立てる
put_message_detail_entry_t.key := key_base64;
put_message_detail_entry_t.value := value_base64;
put_message_detail_entry_tbl.extend();
put_message_detail_entry_tbl(put_message_detail_entry_tbl.last) := put_message_detail_entry_t;
put_message_detail.messages := put_message_detail_entry_tbl;
-- ストリームにメッセージを書き込む
put_message_res := DBMS_CLOUD_OCI_ST_STREAM.PUT_MESSAGES(
stream_id => stream_id,
put_messages_details => put_message_detail,
region => target_region,
credential_name => 'OCI$RESOURCE_PRINCIPAL'
);
DBMS_OUTPUT.PUT_LINE('Failures:'||put_message_res.response_body.failures);
END;
/
パラメータとして、2.で作成したストリームのOCID、keyとしてKey1、valueとしてValue1を渡して、プロシージャ「put_stream」を実行します。
SQL> EXEC put_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','Key1','Value1')
Key: Key1
Value: Value1
Key(Base64): S2V5MQ==
Value(Base64): VmFsdWUx
Failures:0
PL/SQL procedure successfully completed.
SQL>
パラメータとして、2.で作成したストリームのOCID、keyとしてKey2、valueとしてValue2を渡して、プロシージャ「put_stream」を実行します。
SQL> EXEC put_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','Key2','Value2')
Key: Key2
Value: Value2
Key(Base64): S2V5Mg==
Value(Base64): VmFsdWUy
Failures:0
PL/SQL procedure successfully completed.
SQL>
パラメータとして、2.で作成したストリームのOCID、keyとしてKey3、valueとしてValue3を渡して、プロシージャ「put_stream」を実行します。
SQL> EXEC put_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','Key3','Value3')
Key: Key3
Value: Value3
Key(Base64): S2V5Mw==
Value(Base64): VmFsdWUz
Failures:0
PL/SQL procedure successfully completed.
SQL>
OCIコンソールのストリームの詳細画面で、「メッセージのロード」をクリックして、最近のメッセージを確認します。
(過去1分間にパブリッシュされたメッセージのみしか表示されないので注意が必要です)
Autonomous Databaseからストリームに直接メッセージがパブリッシュされたことを確認できました。
4. OCI Streamingからメッセージを受信する
今回は、ストリームのOCIDおよびパーティションとカーソルのタイプをパラメータとして渡すと、指定したストリームからメッセージを受信するPL/SQLプロシージャ「consume_stream」を作成しました。
CREATE OR REPLACE PROCEDURE consume_stream( stream_id IN VARCHAR2, partition IN VARCHAR2, l_type IN VARCHAR2 )
IS
target_region VARCHAR2(200);
l_cursor_value VARCHAR2(32767);
cursor_detail dbms_cloud_oci_streaming_create_cursor_details_t := dbms_cloud_oci_streaming_create_cursor_details_t();
create_cursor_res dbms_cloud_oci_st_stream_create_cursor_response_t;
create_cursor_res_body dbms_cloud_oci_streaming_cursor_t;
get_messages_res dbms_cloud_oci_st_stream_get_messages_response_t;
get_messages_res_body dbms_cloud_oci_streaming_message_tbl;
key_base64 VARCHAR2(32767);
value_base64 VARCHAR2(32767);
key VARCHAR2(32767);
value VARCHAR2(32767);
BEGIN
-- ADBのリージョン、OCIDを取得
SELECT json_value(cloud_identity, '$.REGION') INTO target_region FROM v$pdbs;
-- カーソル作成のパラメータを設定
cursor_detail.partition := partition;
cursor_detail.l_type := l_type;
-- カーソルの作成
create_cursor_res := DBMS_CLOUD_OCI_ST_STREAM.CREATE_CURSOR(
stream_id => stream_id,
create_cursor_details => cursor_detail,
region => target_region,
credential_name => 'OCI$RESOURCE_PRINCIPAL'
);
-- カーソルを取得
create_cursor_res_body := create_cursor_res.response_body;
l_cursor_value := create_cursor_res_body.value;
-- ストリームからメッセージを取得
get_messages_res := DBMS_CLOUD_OCI_ST_STREAM.GET_MESSAGES(
stream_id => stream_id,
l_cursor => l_cursor_value,
region => target_region,
credential_name => 'OCI$RESOURCE_PRINCIPAL'
);
-- responseからresponse_bodyを取得
get_messages_res_body := get_messages_res.response_body;
-- 取得したresponse_bodyを元にループ
FOR i IN get_messages_res_body.first..get_messages_res_body.last LOOP
-- keyおよびvalueの値(Base64)を取得
key_base64 := get_messages_res_body(i).key;
value_base64 := get_messages_res_body(i).value;
-- keyおよびvalueの値(Base64)をデコード
key := utl_raw.cast_to_varchar2(utl_encode.base64_decode(utl_raw.cast_to_raw(key_base64)));
value := utl_raw.cast_to_varchar2(utl_encode.base64_decode(utl_raw.cast_to_raw(value_base64)));
DBMS_OUTPUT.PUT_LINE('Key: '||key||' Value:'||value);
END LOOP;
END;
/
ストリームとして2.で作成したストリームのOCID、パーティションとして「0」、カーソルのタイプとして「TRIM_HORIZON」を渡して、プロシージャ「consume_stream」を実行します。
SQL> EXEC consume_stream('ocid1.stream.oc1.ap-tokyo-1.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx','0','TRIM_HORIZON');
Key: Key1 Value:Value1
Key: Key2 Value:Value2
Key: Key3 Value:Value3
PL/SQL procedure successfully completed.
SQL>
Autonomous Databaseにストリームから直接メッセージを受信できることを確認できました。
参考情報
・リソース・プリンシパルを使用したOracle Cloud Infrastructureリソースへのアクセス
・ポリシー:ストリーミング・サービスの詳細
・DBMS_CLOUD_OCI_ST_STREAM.CREATE_CURSOR Function
・DBMS_CLOUD_OCI_ST_STREAM.PUT_MESSAGES Function
・DBMS_CLOUD_OCI_ST_STREAM.GET_MESSAGES Function