はじめに
Oracle Cloud Infractructure(以下OCI)では、ストリーミングデータをリアルタイムに収集・処理が出来る Streaming というサービスが提供されています。Streaming は Apache Kafka 互換のAPIを持っているため、Kafka Client から接続して、Produce, Consume が出来ます。
前回は、Java で Producer を実装しました。
今回は、Java の Kafka Client から 単独ノードで Consume してデータを出力する方法を確認していきます。
仮想マシン
適当にCentOS 7の仮想マシンを作ります。
Auth Token生成
Streaming に、Kafka API で接続するためには、IAM ユーザーで Auth Token が必要です。自分の IAMユーザーの詳細画面で、Generate します。
適当に説明をいれます。
Token が表示されるので、メモっておきます。(画像のものは現在使えません)
Tokenの中に、; などの記号が含まれていると、正しく動かない気がします。もしハマって正常に動かない時は、Tokenの入れ替えも検討してみてください。
Stream の作成
OCI Console で、Analytics > Streaming を選択します。その後、Create Stream を選びます。
適当にパラメータを入れて作成します
Stream を作ったときに、Stream Pool が何もない場合は、Default の Stream Pool が自動的に作成されます。teststream01
は、DefaultPool に自動配置されます。Stream Pool は、複数のStreamを管理する概念となっていて、Stream への Endpoint を Public にするのか Private にするのか、またデータの暗号に使う鍵は何を使用するのか、といった事をまとめて管理できます。
今回は自動作成にしたので、Endpoint は Public になっていて、暗号化用の鍵は Oracle 側で管理されているものを使う設定です。
上記の画面にある「View Kafka Connection Settings」を押します。
Copy All を押して、すべての設定値をメモっておきます。
Open JDK Install
Kafka Client を Javaで動かすために、OpenJDK をインストールします。
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
Maven 3.6.3 Install
依存関係を管理するために、Maven を Install します。
以下のURLでダウンロードURLを確認します。
https://maven.apache.org/download.cgi
ダウンロードページでコピーしたURLを使って、CentOS 上に tar.gz ファイルをダウンロードします
mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
次の手順に従ってインストールしていきます。
https://maven.apache.org/install.html
tar.gz ファイルを解凍
tar xfvz apache-maven-3.6.3-bin.tar.gz
環境変数設定
bashrc に追記
echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc
bashrc 再読み込み
source ~/.bashrc
mvn コマンドが実行可能確認します
[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$
Java Build
ディレクトリを作成します。
mkdir ~/KafkaSingleConsumerApp
mkdir -p ~/KafkaSingleConsumerApp/src/main/java/jp/test/sugi
cd ~/kafkaconsumersingle
Java ソースコードを作成します。
cat <<'EOF' > ~/KafkaSingleConsumerApp/src/main/java/jp/test/sugi/KafkaSingleConsumerApp.java
package jp.test.sugi;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaSingleConsumerApp {
public static void main(final String[] args) {
System.out.println("Start.");
// 接続時の設定値を Properties インスタンスとして構築する
final Properties properties = new Properties();
// OCI Streaming に接続するための指定
properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
properties.put("max.partition.fetch.bytes", 1024 * 1024);
// Consumer の動作指定
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "java-consumer-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// Consumer を構築する
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
new StringDeserializer());
// Consumer をトピックに割り当てる
consumer.subscribe(Arrays.asList("teststream01"));
try {
while (true) {
// メッセージをとりだす
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60l));
// とりだしたメッセージを表示する
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("%s:%s", record.offset(), record.value()));
}
// メッセージの読み取り位置である offset を、最後に poll() した位置で(同期処理で)更新する
consumer.commitSync();
}
} finally {
consumer.close();
System.out.println("End.");
}
}
}
maven で依存関係を指定するために、pom.xml を作成します。
cat <<'EOF' > ~/KafkaSingleConsumerApp/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.test.sugi</groupId>
<artifactId>KafkaSingleConsumerApp</artifactId>
<version>1.0</version>
<name>KafkaSingleConsumerApp</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.2</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>jp.test.sugi.KafkaSingleConsumerApp</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<mainClass>jp.test.sugi.KafkaSingleConsumerApp</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
EOF
依存関係を全て含めた fat jar を作成します。初回実行は、ダウンロードが入るため数分待ちます。
mvn package assembly:single
出来上がった Jar ファイルを実行すると、Stream 上のデータを、Consume します。
java -jar target/KafkaSingleConsumerApp-1.0-jar-with-dependencies.jar
動作確認のため、OCI Console から Produce します。Test Message ボタンを押します。
適当なメッセージを入れて、Produce ボタンを押します。
実行例
[opc@kafkaconsumer01 KafkaSingleConsumerApp]$ java -jar target/KafkaSingleConsumerApp-1.0-jar-with-dependencies.jar
Start.
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
1076:test1
1077:hello? i am streaming
参考URL
Azure
https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-producer-consumer-api