More than 5 years have passed since last update.

[Oracle Cloud] Trying Out Consumer Groups with Streaming

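Introduction

Oracle Cloud Infrastructure (OCI) provides a service called Streaming that collects and processes streaming data in real time. Streaming exposes an Apache Kafka compatible API, so a Kafka client can connect to it to produce and consume data.

This time I use a Consumer Group to read data with multiple Consumers. A Consumer Group is a feature that balances load by dividing the work of reading data among several Consumers. The article linked here explains it with diagrams. I check whether OCI Streaming behaves the same way.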
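To make the load-balancing idea concrete, here is a minimal sketch that simulates a range-style split of a topic's partitions among the members of one Consumer Group. It does not connect to Kafka or OCI. The class name `RangeAssignmentSketch` and the method `assign` are hypothetical names chosen for illustration, and the real assignment depends on the assignor the client is configured with and on how the group orders its members, so which Consumer ends up with which partition may differ from this simulation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only: simulates a range-style partition split. */
final class RangeAssignmentSketch {

    /**
     * Splits partitions 0..numPartitions-1 into contiguous ranges, one per consumer.
     * The first (numPartitions % numConsumers) consumers receive one extra partition.
     */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numPartitions < 0 || numConsumers <= 0) {
            throw new IllegalArgumentException("numPartitions must be >= 0 and numConsumers > 0");
        }
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;

        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three partitions shared by one to four simulated consumers.
        for (int consumers = 1; consumers <= 4; consumers++) {
            System.out.println(consumers + " consumer(s): " + assign(3, consumers));
        }
    }
}
```

With three partitions, the four-consumer case leaves one member with an empty list: a Consumer that joins the group but has nothing to read.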

Number of partitions in the Stream

I created a Stream named teststream01. It has 3 partitions.

Java Code

To start multiple Consumers inside a single Java program, I use an Executor. I will skip the details; the code should be fairly self-explanatory.

This is the main class. Changing the value in int numConsumers = 1; adjusts the number of Consumers.

package jp.test.sugi;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class App {
    public static void main(String[] args) {
        int numConsumers = 1;
        ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);

        IntStream.rangeClosed(1, numConsumers).forEach(i -> {
            ConsumeTask consumeTask = new ConsumeTask("myConsumer" + i, "teststream01");
            executorService.submit(consumeTask);
        });
    }
}
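Since the Executor details are skipped above, here is a minimal, Kafka-free sketch of the same pattern: one task per worker on a fixed thread pool, followed by an orderly shutdown. The class `ExecutorFanOutSketch` and the method `runWorkers` are hypothetical, and the tasks are finite stand-ins. The Consumer tasks in this article loop forever, so their pool never reaches the shutdown step shown here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/** Hypothetical illustration: one task per worker on a fixed thread pool. */
final class ExecutorFanOutSketch {

    /** Submits numWorkers placeholder tasks and returns how many of them ran. */
    static int runWorkers(int numWorkers) {
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        AtomicInteger completed = new AtomicInteger();

        IntStream.rangeClosed(1, numWorkers).forEach(i ->
                pool.submit(() -> {
                    // A real task would poll a consumer here; this stand-in only counts.
                    completed.incrementAndGet();
                }));

        pool.shutdown(); // stop accepting new tasks; submitted ones still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt tasks that did not finish in time
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("workers finished: " + runWorkers(3));
    }
}
```

Sizing the pool to the number of workers matters for long-running tasks: with fewer threads than tasks, the extra tasks would wait in the queue and never start.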

This is the class that actually consumes the data. The Consumer Group name is hard-coded as the string myConsumerGroup.

package jp.test.sugi;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeTask implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String consumerGroupName;
    private final String consumerName;
    private final String topic;

    public ConsumeTask(String consumerName, String topic) {
        this("myConsumerGroup", consumerName, topic);
    }

    public ConsumeTask(String consumerGroupName, String consumerName, String topic) {
        this.consumerGroupName = consumerGroupName;
        this.consumerName = consumerName;
        this.topic = topic;

        // configuration
        Properties properties = new Properties();

        // Settings for connecting to OCI Streaming
        properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put("max.partition.fetch.bytes", 1024 * 1024);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());

        this.consumer = consumer;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                // print information about topic record
                records.forEach(record -> {
                    System.out.println("group: " + consumerGroupName + ", consumer: " + consumerName + ", partition: "
                            + record.partition() + ", topic: " + record.topic() + ", key: " + record.key() + ", value: "
                            + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
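The class above embeds the user name and password directly in the sasl.jaas.config string. As one possible alternative, the sketch below assembles that string from separate values so they can come from environment variables instead of source code. Everything here is an assumption for illustration: the class `JaasConfigSketch`, the environment variable names, the fallback values, and my reading of the user name as three slash-joined parts (tenancy, user, stream pool OCID), which is inferred only from the shape of the string above. Rather than guess at escaping rules, the helper rejects values containing a double quote or backslash.

```java
/** Hypothetical helper: assembles the sasl.jaas.config value from separate parts. */
final class JaasConfigSketch {

    /** Rejects empty values and characters that would need escaping inside a quoted option. */
    private static String checked(String name, String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
        if (value.indexOf('"') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException(name + " must not contain a double quote or backslash");
        }
        return value;
    }

    static String build(String tenancy, String user, String streamPoolOcid, String password) {
        String username = checked("tenancy", tenancy) + "/" + checked("user", user)
                + "/" + checked("streamPoolOcid", streamPoolOcid);
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + checked("password", password) + "\";";
    }

    public static void main(String[] args) {
        // Environment variable names and fallback values are placeholders for this sketch.
        String jaas = build(
                System.getenv().getOrDefault("OCI_TENANCY", "example-tenancy"),
                System.getenv().getOrDefault("OCI_USER", "example-user"),
                System.getenv().getOrDefault("OCI_STREAM_POOL_OCID", "example-stream-pool-ocid"),
                System.getenv().getOrDefault("OCI_PASSWORD", "example-password"));
        // In ConsumeTask this value would be passed as: properties.put("sasl.jaas.config", jaas);
        System.out.println("JAAS config length: " + jaas.length());
    }
}
```

The main method prints only the length of the result so that the secret itself never reaches the console.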

Results: 1 Consumer

This is the result with one Consumer. myConsumer1 is assigned every partition and consumes the data from all of them.

group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9

Results: 2 Consumers

This is the result with two Consumers.
The partitions are spread across them as follows.

  • myConsumer1: partition 0, partition 1
  • myConsumer2: partition 2

group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
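The per-Consumer partition lists in these result sections can be derived mechanically from the output lines. The following sketch groups partition numbers by Consumer name using a regular expression that matches the line format printed by ConsumeTask. The class `AssignmentSummarySketch` and the method `summarize` are hypothetical names for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration: summarizes which consumer read which partitions. */
final class AssignmentSummarySketch {

    // Matches the "consumer: <name>, partition: <n>," part of each output line.
    private static final Pattern LINE = Pattern.compile("consumer: ([^,\\s]+), partition: (\\d+),");

    static Map<String, SortedSet<Integer>> summarize(List<String> lines) {
        Map<String, SortedSet<Integer>> byConsumer = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                byConsumer.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                        .add(Integer.parseInt(m.group(2)));
            }
        }
        return byConsumer;
    }

    public static void main(String[] args) {
        // Three lines copied from the two-Consumer output in this article.
        List<String> sample = List.of(
                "group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0",
                "group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1",
                "group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4");
        System.out.println(summarize(sample));
    }
}
```

A Consumer that never appears in the output, such as an idle group member, is simply absent from the resulting map.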

Results: 3 Consumers

This is the result with three Consumers.
The partitions are spread across them as follows.

  • myConsumer1: partition 1
  • myConsumer2: partition 2
  • myConsumer3: partition 0

group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9

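Results: 4 Consumers

This is the result with four Consumers.
The partitions are spread across them as follows.

  • myConsumer1: partition 2
  • myConsumer3: partition 0
  • myConsumer4: partition 1

myConsumer2 does not process anything. This shows that Consumers beyond the number of partitions receive no partition, so adding them does not spread the load any further.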

group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5

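Summary

Consumer Groups worked correctly through Streaming's Kafka compatible API. As with Kafka, each partition is read by a single Consumer in the group.
For production use, it is a good idea to test and then decide the number of partitions and the number of Consumers based on your performance requirements.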
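As a starting point for that sizing exercise, the arithmetic can be sketched as follows. The class `PartitionSizingSketch`, its method names, and the throughput figures in main are placeholders I made up for illustration, not measured or documented limits; the per-partition throughput should come from your own tests.

```java
/** Hypothetical illustration: back-of-the-envelope partition and consumer sizing. */
final class PartitionSizingSketch {

    /** Smallest partition count whose combined throughput covers the target (at least 1). */
    static int partitionsNeeded(double targetMbPerSec, double perPartitionMbPerSec) {
        if (targetMbPerSec < 0 || perPartitionMbPerSec <= 0) {
            throw new IllegalArgumentException("target must be >= 0 and per-partition rate > 0");
        }
        return Math.max(1, (int) Math.ceil(targetMbPerSec / perPartitionMbPerSec));
    }

    /** Consumers beyond the partition count stay idle, so only min(partitions, consumers) do work. */
    static int activeConsumers(int partitions, int consumers) {
        return Math.min(partitions, consumers);
    }

    public static void main(String[] args) {
        // Placeholder numbers: a 5 MB/s target with a measured 2 MB/s per partition.
        int partitions = partitionsNeeded(5.0, 2.0);
        System.out.println("partitions needed: " + partitions);
        System.out.println("active consumers out of 4: " + activeConsumers(partitions, 4));
    }
}
```

The second method restates the four-Consumer result from this article: with three partitions, only three of the four Consumers can be active.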
