はじめに
Oracle SQL Access to Kafka(OSaK)は、SQLでKafkaトピックに動的にクエリを実行できるOracle Database 23ai Freeの新機能です。
概要やユースケースはこちらを参照ください。
Oracle SQL Access to Kafkaには主に3つの機能があります。
本記事では、その中でもKafkaレコードをSQLやPL/SQLを使って順番にループ処理するストリーミング機能の使用手順を紹介します。
前提条件
- Oracle Database 23ai環境
- Kafkaクラスタを作成していること
本記事ではOracle Cloud Infrastructure(OCI) 上で提供されるフルマネージドな分散メッセージングサービスである、OCI Streaming Service(OSS)を使ってKafkaクラスタを作成しています。
OSSについては、OCI Streaming を動かしてみようというチュートリアルが参考になります。
Kafkaクラスタの登録
まずはKafkaクラスタの登録をする必要があります。
この手順については、こちらの記事を参照ください。
KafkaデータをPL/SQLで順番にループ処理するストリーミング・アプリケーションの作成
DBMS_KAFKAパッケージを使ってストリーミング・アプリケーションを作成することで、登録したKafkaクラスタのレコードをロードする一時表やビューが自動的に作成されます。
作成された一時表を使用したSQLクエリが、Kafkaレコードに順次アクセスし、ループ内で処理していくようなストリーミング・アプリケーションを実行することができます。
-
ロードするKafkaレコードの形式を決定する表を作成します。今回は1桁の数字がストリーミングするので、NUMBER型の1列のみを持つ表を作成します。
create table sample(col_num number);
-
ストリーミング・アプリケーションSampleStreamAppを作成します。
※OSaKsumStreamというトピック名を指定しておきます。(OCI Streaming Serviceのストリーム名)DECLARE v_options VARCHAR2(50); BEGIN v_options := '{"fmt" : "DSV", "reftable" : "sample"}'; DBMS_KAFKA.CREATE_STREAMING_APP ( 'KAFKACLUS1', 'SampleStreamApp', 'OSaKsumStream', v_options, 1); END; /
-
ここで内部的に作成された一時表を確認してみます。Oracle SQL Access to Kafkaで作成されたオブジェクトは、固有の
ORA$
接頭辞を持っています。
ORA$DKX
とORA$DKV
は Oracle SQL Access to Kafka が生成したビューと外部表の接頭辞で、Kafka からユーザー所有のテーブルまたはグローバル一時表にデータをロードするための DBMS_KAFKA の呼び出しに対応しています。ORA$DKVGTT
は、ストリーミングまたはシーキングアプリからロードされるグローバル一時表であることを指定する接頭辞です。この一時表は、DBMS_KAFKA.LOAD_TEMP_TABLEを呼び出すと透過的にロードされます。select object_name, object_type from user_objects where object_name like '%ORA$DK%';
OBJECT_NAME OBJECT_TYPE -------------------------------- --------------- ORA$DKVGTT_KAFKACLUS1_SAMPLESTREAMAPP_0 TABLE ORA$DKV_KAFKACLUS1_PARTITIONS VIEW ORA$DKV_KAFKACLUS1_SAMPLESTREAMAPP_0 VIEW ORA$DKX_KAFKACLUS1_PARTITIONS TABLE ORA$DKX_KAFKACLUS1_SAMPLESTREAMAPP TABLE ORA$DKX_KAFKACLUS1_SAMPLESTREAMAPP TABLE PARTITION 6 rows selected.
-
実際にKafkaレコードを処理するストリーミング・アプリケーションを実行してみます。今回は、単純にロードしたKafkaレコードを順番に足していき、SUMが正しい値になったらOFFSETを更新し、COMMITするアプリケーションにします。
出力結果を確認するため、serveroutputをonにしておきます。set serveroutput on
DECLARE record_sum number := 0; tmp_num number; BEGIN --Kafkaレコードをグローバル一時表にロード DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0'); --ロードした行数(レコード数)だけループ FOR kafka_record IN (SELECT KAFKA_OFFSET offset FROM ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0) LOOP DBMS_OUTPUT.PUT_LINE('Processing record offset:'|| kafka_record.offset); SELECT COL_NUM INTO tmp_num FROM ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0 WHERE KAFKA_OFFSET = kafka_record.offset; record_sum := record_sum + tmp_num; DBMS_OUTPUT.PUT_LINE('record_sum:'|| record_sum); END LOOP; IF record_sum = 45 THEN DBMS_OUTPUT.PUT_LINE(record_sum); DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_KAFKACLUS1_SampleStreamAPP_0'); COMMIT; --ELSE --エラー処理を記述 END IF; END; /
本来は、LOAD_TEMP_TABLEもLOOPさせて常に発生しているレコードを処理し続けるアプリケーションが多いと思いますが、今回は
record_sum
が45になったら終了するようにしました。 -
メッセージをPublishします。
$KAFKA_HOME/bin/kafka-console-producer.sh \ --bootstrap-server cell-1.streaming.ap-Tokyo-1.oci.oraclecloud.com:9092 \ --topic OSaKsumStream \ --producer.config $KAFKA_HOME/config/producer.properties >1 >2 >3 >4 >5 >6 >7 >8 >9
-
実行結果を確認してみます。
Processing record offset:0 record_sum:1 Processing record offset:1 record_sum:3 Processing record offset:2 record_sum:6 Processing record offset:3 record_sum:10 Processing record offset:4 record_sum:15 Processing record offset:5 record_sum:21 Processing record offset:6 record_sum:28 Processing record offset:7 record_sum:36 Processing record offset:8 record_sum:45 45 Number of Kafka records processed = 9 PL/SQL procedure successfully completed.
1レコードずつ足されてrecord_sumが45となり、9レコードを処理した、と表示がされています。今回はコンソールからの手入力で検証を行いましたが、実際にProducerアプリを作成してメッセージをPublish⇒Oracle SQL Access to Kafkaのストリーミング・アプリケーションで処理することもできます。