Introduction
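Oracle Cloud Infrastructure (OCI) provides a service called Streaming that lets you collect and process streaming data in real time. Because Streaming exposes an Apache Kafka-compatible API, you can connect to it with a Kafka client to produce and consume data.
In this post, I consume data from multiple consumers using a Consumer Group. A Consumer Group lets several consumers share the work of reading a topic: the topic's partitions are divided among the group's members, and each partition is read by only one consumer in the group. This article has an illustrated explanation. Here I check whether OCI Streaming behaves the same way.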
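As a rough mental model of how a group splits partitions, here is a minimal, stdlib-only Java sketch of a range-style assignment: the partitions are cut into contiguous ranges, and when there are more consumers than partitions, the extra consumers get nothing. This is a simplified model in the spirit of Kafka's RangeAssignor, not the kafka-clients code. The strategy actually used depends on the client version and the partition.assignment.strategy setting, and real assignors order members by member ID, so which named consumer gets which partition can differ from the runs below. The class RangeAssignmentSketch and its method assign are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a range-style partition assignor (not the kafka-clients implementation). */
public class RangeAssignmentSketch {

    /** Returns, for each consumer index, the partitions it owns; every partition goes to exactly one consumer. */
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        int perConsumer = numPartitions / numConsumers;  // base share for everyone
        int withExtra = numPartitions % numConsumers;    // the first withExtra consumers get one more
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);  // contiguous range of partition numbers
            }
            result.add(owned);      // an empty list means the consumer stays idle
        }
        return result;
    }

    public static void main(String[] args) {
        // Same partition count as teststream01, with 1 to 4 consumers in the group.
        for (int consumers = 1; consumers <= 4; consumers++) {
            System.out.println(consumers + " consumer(s): " + assign(3, consumers));
        }
    }
}
```

Running main prints [[0, 1, 2]] for 1 consumer, [[0, 1], [2]] for 2, [[0], [1], [2]] for 3, and [[0], [1], [2], []] for 4, where the empty list is the idle consumer.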
Number of partitions in the Stream
I created a Stream named teststream01 with 3 partitions.
Java Code
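To run multiple consumers inside a single Java program, I use an ExecutorService. I won't go into the details; the code should be mostly self-explanatory.
This is the main class. Changing the value in int numConsumers = 1; sets how many consumers are started, each on its own thread from a fixed-size pool.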
package jp.test.sugi;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class App {
    public static void main(String[] args) {
        // Number of consumers to start in the group
        int numConsumers = 1;
        // One thread per consumer
        ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);
        IntStream.rangeClosed(1, numConsumers).forEach(i -> {
            ConsumeTask consumeTask = new ConsumeTask("myConsumer" + i, "teststream01");
            executorService.submit(consumeTask);
        });
    }
}
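The main class never shuts the pool down, so the program can only be stopped by killing the process. Below is a stdlib-only sketch of the same fan-out pattern (one fixed-size pool, one looping task per worker) with an orderly stop. It is hypothetical: an AtomicBoolean flag and Thread.sleep stand in for the Kafka calls, and the class PoolShutdownSketch is not part of the original project. In a real ConsumeTask, the usual way to break out of a blocking poll() from another thread is KafkaConsumer.wakeup(), which makes poll() throw WakeupException so the finally block can still call close().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/** Hypothetical sketch: fixed pool of looping workers with a graceful stop. */
public class PoolShutdownSketch {

    /** Starts numWorkers looping tasks, stops them after runMillis, and returns how many exited their loop. */
    static int runAndStop(int numWorkers, long runMillis) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);  // stand-in for calling consumer.wakeup()
        AtomicInteger exited = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        IntStream.rangeClosed(1, numWorkers).forEach(i -> pool.submit(() -> {
            try {
                while (running.get()) {
                    Thread.sleep(10);                      // stand-in for consumer.poll(...)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();        // reached only via shutdownNow()
            } finally {
                exited.incrementAndGet();                  // stand-in for consumer.close()
            }
        }));
        Thread.sleep(runMillis);
        running.set(false);                                // ask every loop to finish
        pool.shutdown();                                   // accept no new tasks
        if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
            pool.shutdownNow();                            // interrupt loops that did not stop in time
        }
        return exited.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("workers stopped: " + runAndStop(3, 100));
    }
}
```

runAndStop(3, 100) returns 3 once all three loops have run their finally blocks. Setting the flag before shutdown() matters: shutdown() alone does not interrupt running tasks, so loops that never check a stop condition would keep the pool alive until the timeout.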
This is the class that actually consumes the data. The Consumer Group name is hard-coded as the string myConsumerGroup (via the two-argument constructor).
package jp.test.sugi;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeTask implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String consumerGroupName;
    private final String consumerName;
    private final String topic;

    public ConsumeTask(String consumerName, String topic) {
        this("myConsumerGroup", consumerName, topic);
    }

    public ConsumeTask(String consumerGroupName, String consumerName, String topic) {
        this.consumerGroupName = consumerGroupName;
        this.consumerName = consumerName;
        this.topic = topic;
        // configuration
        Properties properties = new Properties();
        // Settings for connecting to OCI Streaming
        properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        // Placeholders: username is <tenancyName>/<userName>/<streamPoolOCID>, password is an auth token
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancyName>/<userName>/<streamPoolOCID>\" password=\"<authToken>\";");
        properties.put("max.partition.fetch.bytes", 1024 * 1024);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());
        this.consumer = consumer;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
                // print information about topic record
                records.forEach(record -> {
                    System.out.println("group: " + consumerGroupName + ", consumer: " + consumerName + ", partition: "
                            + record.partition() + ", topic: " + record.topic() + ", key: " + record.key() + ", value: "
                            + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
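The sasl.jaas.config string above packs three values into the username. A small helper keeps that format in one place and lets the auth token come from outside the source code. This sketch assumes the username layout <tenancyName>/<userName>/<streamPoolOCID> with an auth token as the password, which matches the structure of the value in the original code. The class OciSaslConfigSketch and the environment variable name OCI_AUTH_TOKEN are hypothetical. Only java.util.Properties is used, so the result could be passed to new KafkaConsumer<>(properties, ...) as in ConsumeTask.

```java
import java.util.Properties;

/** Hypothetical helper that builds the SASL/PLAIN settings used by ConsumeTask. */
public class OciSaslConfigSketch {

    /** Builds the sasl.jaas.config value, assuming username = tenancy/user/streamPoolOcid. */
    static String jaasConfig(String tenancy, String user, String streamPoolOcid, String authToken) {
        String username = tenancy + "/" + user + "/" + streamPoolOcid;
        if (username.contains("\"") || authToken.contains("\"")) {
            // This sketch does not implement JAAS escaping, so it refuses values with double quotes.
            throw new IllegalArgumentException("double quotes are not supported by this sketch");
        }
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + username + "\""
                + " password=\"" + authToken + "\";";
    }

    /** Same connection settings as ConsumeTask, with the credentials passed in. */
    static Properties connectionProperties(String bootstrapServers, String tenancy, String user,
                                           String streamPoolOcid, String authToken) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config", jaasConfig(tenancy, user, streamPoolOcid, authToken));
        return properties;
    }

    public static void main(String[] args) {
        // OCI_AUTH_TOKEN is a hypothetical variable name; the token never appears in source code.
        String token = System.getenv().getOrDefault("OCI_AUTH_TOKEN", "<authToken>");
        Properties properties = connectionProperties(
                "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092",
                "<tenancyName>", "<userName>", "<streamPoolOCID>", token);
        // Print only non-secret settings.
        System.out.println(properties.getProperty("bootstrap.servers") + " via "
                + properties.getProperty("security.protocol"));
    }
}
```

Keeping the token in an environment variable (or a secrets store) also means the same build can run against different stream pools without code changes.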
Results: 1 consumer
Here is the result with the number of consumers set to 1. myConsumer1 is assigned every partition and consumes all of the data.
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
Results: 2 consumers
Here is the result with the number of consumers set to 2.
The partitions were split between the consumers as follows:
- myConsumer1 : partition 0, partition 1
- myConsumer2 : partition 2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
Results: 3 consumers
Here is the result with the number of consumers set to 3.
The partitions were split among the consumers as follows:
- myConsumer1 : partition 1
- myConsumer2 : partition 2
- myConsumer3 : partition 0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: teststream01, key: null, value: message=9
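Results: 4 consumers
Here is the result with the number of consumers set to 4.
The partitions were split among the consumers as follows:
- myConsumer1 : partition 2
- myConsumer3 : partition 0
- myConsumer4 : partition 1
myConsumer2 did not process anything. Consumers beyond the number of partitions receive no partition and sit idle, so adding them does not spread the load any further.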
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=5
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=1
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=0
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=2
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=7
group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=9
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=4
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=6
group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=8
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=3
group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=5
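The partition lists in each result section were read off the logs by eye. A short stdlib-only sketch that groups the printed lines by consumer makes that check mechanical; it relies only on the "consumer: ..., partition: ...," text that ConsumeTask prints. The class AssignmentFromLogSketch and its method partitionsByConsumer are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: derive the consumer-to-partition assignment from ConsumeTask's output. */
public class AssignmentFromLogSketch {

    // Matches the "consumer: X, partition: N," part of each line printed by ConsumeTask.
    private static final Pattern LINE = Pattern.compile("consumer: (\\S+), partition: (\\d+),");

    /** Maps each consumer name to the sorted set of partitions it printed records from. */
    static Map<String, Set<Integer>> partitionsByConsumer(Iterable<String> lines) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {  // lines in any other format are ignored
                result.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                      .add(Integer.parseInt(m.group(2)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "group: myConsumerGroup, consumer: myConsumer3, partition: 0, topic: teststream01, key: null, value: message=0",
                "group: myConsumerGroup, consumer: myConsumer1, partition: 2, topic: teststream01, key: null, value: message=1",
                "group: myConsumerGroup, consumer: myConsumer4, partition: 1, topic: teststream01, key: null, value: message=2");
        System.out.println(partitionsByConsumer(sample));
    }
}
```

For the three sample lines in main it prints {myConsumer1=[2], myConsumer3=[0], myConsumer4=[1]}, matching the assignment listed for the 4-consumer run. For a saved log, pass Files.readAllLines(Path.of("run.log")) instead, where run.log is a hypothetical file holding the program's output.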
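Summary
Consumer Groups worked as expected through Streaming's Kafka-compatible API. As with Kafka, each partition is read by exactly one consumer in the group, so with 3 partitions at most 3 consumers do useful work.
For production, it is a good idea to decide the number of partitions and consumers by testing them against your performance requirements.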
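As a starting point for that sizing, here is a back-of-the-envelope sketch: take enough partitions to cover the required write and read rates given a per-partition limit, then note that at most one consumer per partition in a group does useful work. The per-partition limits are deliberately parameters; take them from the OCI Streaming service limits documentation and from your own measurements, not from the illustrative numbers in main. The class PartitionSizingSketch and its methods are hypothetical.

```java
/** Hypothetical sizing helper: partition count from throughput, and how many consumers can be busy. */
public class PartitionSizingSketch {

    /** Smallest partition count whose combined per-partition limit covers the required rate. */
    static int partitionsFor(double requiredMBps, double perPartitionMBps) {
        if (requiredMBps <= 0 || perPartitionMBps <= 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        return (int) Math.ceil(requiredMBps / perPartitionMBps);
    }

    /** Partitions needed to satisfy both the write and the read requirement. */
    static int partitionsFor(double writeMBps, double writeLimit, double readMBps, double readLimit) {
        return Math.max(partitionsFor(writeMBps, writeLimit), partitionsFor(readMBps, readLimit));
    }

    /** Consumers in one group beyond the partition count stay idle, as in the 4-consumer run. */
    static int busyConsumers(int consumers, int partitions) {
        return Math.min(consumers, partitions);
    }

    public static void main(String[] args) {
        // Illustrative numbers only; substitute measured throughput and the documented service limits.
        int partitions = partitionsFor(2.5, 1.0, 4.0, 2.0);
        System.out.println("partitions: " + partitions
                + ", busy consumers out of 4: " + busyConsumers(4, partitions));
    }
}
```

With the illustrative inputs in main (2.5 MB/s written against 1.0 MB/s per partition, 4.0 MB/s read against 2.0 MB/s per partition), it prints partitions: 3, busy consumers out of 4: 3. The resulting numbers are only a first guess to validate with load tests.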
References