1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Oracle SQL Access to Kafkaを使ってKafkaレコードをSQLとPL/SQLで処理する

Last updated at Posted at 2023-06-08

はじめに

Oracle SQL Access to Kafka(OSaK)は、SQLでKafkaトピックに動的にクエリを実行できるOracle Database 23ai Freeの新機能です。

概要やユースケースはこちらを参照ください。

Oracle SQL Access to Kafkaには主に3つの機能があります。
本記事では、その中でもKafkaレコードをSQLやPL/SQLを使って順番にループ処理するストリーミング機能の使用手順を紹介します。

前提条件

  • Oracle Database 23ai環境
  • Kafkaクラスタを作成していること
    本記事ではOracle Cloud Infrastructure(OCI) 上で提供されるフルマネージドな分散メッセージングサービスである、OCI Streaming Service(OSS)を使ってKafkaクラスタを作成しています。
    OSSについては、OCI Streaming を動かしてみようというチュートリアルが参考になります。

Kafkaクラスタの登録

まずはKafkaクラスタの登録をする必要があります。
この手順については、こちらの記事を参照ください。

KafkaデータをPL/SQLで順番にループ処理するストリーミング・アプリケーションの作成

DBMS_KAFKAパッケージを使ってストリーミング・アプリケーションを作成することで、登録したKafkaクラスタのレコードをロードする一時表やビューが自動的に作成されます。
作成された一時表を使用したSQLクエリが、Kafkaレコードに順次アクセスし、ループ内で処理していくようなストリーミング・アプリケーションを実行することができます。

  1. ロードするKafkaレコードの形式を決定する表を作成します。今回は1桁の数字がストリーミングするので、NUMBER型の1列のみを持つ表を作成します。

    create table sample(col_num number);
    
  2. ストリーミング・アプリケーションSampleStreamAppを作成します。
    ※OSaKsumStreamというトピック名を指定しておきます。(OCI Streaming Serviceのストリーム名)

    DECLARE
    v_options  VARCHAR2(50);
    BEGIN
      v_options := '{"fmt" : "DSV", "reftable" : "sample"}';
      DBMS_KAFKA.CREATE_STREAMING_APP (
                'KAFKACLUS1',
                'SampleStreamApp',
                'OSaKsumStream',
                v_options, 
                1);
    END;
    /
    
  3. ここで内部的に作成された一時表を確認してみます。Oracle SQL Access to Kafkaで作成されたオブジェクトは、固有のORA$接頭辞を持っています。
    ORA$DKXORA$DKV は Oracle SQL Access to Kafka が生成したビューと外部表の接頭辞で、Kafka からユーザー所有のテーブルまたはグローバル一時表にデータをロードするための DBMS_KAFKA の呼び出しに対応しています。ORA$DKVGTTは、ストリーミングまたはシーキングアプリからロードされるグローバル一時表であることを指定する接頭辞です。この一時表は、DBMS_KAFKA.LOAD_TEMP_TABLEを呼び出すと透過的にロードされます。

    select object_name, object_type from user_objects where object_name like '%ORA$DK%';
    
    OBJECT_NAME                                OBJECT_TYPE
    --------------------------------           ---------------
    ORA$DKVGTT_KAFKACLUS1_SAMPLESTREAMAPP_0    TABLE
    ORA$DKV_KAFKACLUS1_PARTITIONS              VIEW
    ORA$DKV_KAFKACLUS1_SAMPLESTREAMAPP_0       VIEW
    ORA$DKX_KAFKACLUS1_PARTITIONS              TABLE
    ORA$DKX_KAFKACLUS1_SAMPLESTREAMAPP         TABLE
    ORA$DKX_KAFKACLUS1_SAMPLESTREAMAPP         TABLE PARTITION
    
    6 rows selected.
    
  4. 実際にKafkaレコードを処理するストリーミング・アプリケーションを実行してみます。今回は、単純にロードしたKafkaレコードを順番に足していき、SUMが正しい値になったらOFFSETを更新し、COMMITするアプリケーションにします。
    出力結果を確認するため、serveroutputをonにしておきます。

    set serveroutput on
    
    DECLARE
    record_sum number := 0;
    tmp_num number;
    BEGIN
       --Kafkaレコードをグローバル一時表にロード
       DBMS_KAFKA.LOAD_TEMP_TABLE('ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0');
       --ロードした行数(レコード数)だけループ
       FOR kafka_record IN (SELECT KAFKA_OFFSET offset FROM ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0) LOOP
           DBMS_OUTPUT.PUT_LINE('Processing record offset:'|| kafka_record.offset);
           SELECT COL_NUM INTO tmp_num FROM ORA$DKVGTT_KAFKACLUS1_SampleStreamAPP_0 WHERE KAFKA_OFFSET = kafka_record.offset;
           record_sum := record_sum + tmp_num;
           DBMS_OUTPUT.PUT_LINE('record_sum:'|| record_sum);
       END LOOP;
       IF record_sum = 45 THEN
            DBMS_OUTPUT.PUT_LINE(record_sum);
            DBMS_KAFKA.UPDATE_OFFSET('ORA$DKV_KAFKACLUS1_SampleStreamAPP_0');
            COMMIT;
       --ELSE
           --エラー処理を記述
       END IF;
     END;
     /
    

    本来は、LOAD_TEMP_TABLEもLOOPさせて常に発生しているレコードを処理し続けるアプリケーションが多いと思いますが、今回はrecord_sumが45になったら終了するようにしました。

  5. メッセージをPublishします。

    $KAFKA_HOME/bin/kafka-console-producer.sh \
    --bootstrap-server cell-1.streaming.ap-Tokyo-1.oci.oraclecloud.com:9092 \
    --topic OSaKsumStream \
    --producer.config $KAFKA_HOME/config/producer.properties
    
    >1
    >2
    >3
    >4
    >5
    >6
    >7
    >8
    >9
    
  6. 実行結果を確認してみます。

    Processing record offset:0
    record_sum:1
    Processing record offset:1
    record_sum:3
    Processing record offset:2
    record_sum:6
    Processing record offset:3
    record_sum:10
    Processing record offset:4
    record_sum:15
    Processing record offset:5
    record_sum:21
    Processing record offset:6
    record_sum:28
    Processing record offset:7
    record_sum:36
    Processing record offset:8
    record_sum:45
    45
    Number of Kafka records processed = 9
    
    PL/SQL procedure successfully completed.
    

    1レコードずつ足されてrecord_sumが45となり、9レコードを処理した、と表示がされています。今回はコンソールからの手入力で検証を行いましたが、実際にProducerアプリを作成してメッセージをPublish⇒Oracle SQL Access to Kafkaのストリーミング・アプリケーションで処理することもできます。

参考情報

Oracle SQL Access to Kafka

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?