はじめに
Oracle Cloud Infractructure(以下OCI)では、ストリーミングデータをリアルタイムに収集・処理が出来る Streaming というサービスが提供されています。Streaming は Apache Kafka 互換のAPIを持っているため、Kafka Client から接続して、Produce, Consume が出来ます。
今回は、Java の Kafka Client から Producer としてデータを入力する方法を確認していきます。
仮想マシン
適当にCentOS 7の仮想マシンを作ります。
Auth Token生成
Streaming に、Kafka API で接続するためには、IAM ユーザーで Auth Token が必要です。自分の IAMユーザーの詳細画面で、Generate します。
適当に説明をいれます。
Token が表示されるので、メモっておきます。(画像のものは現在使えません)
Tokenの中に、; などの記号が含まれていると、正しく動かない気がします。もしハマって正常に動かない時は、Tokenの入れ替えも検討してみてください。
Stream の作成
OCI Console で、Analytics > Streaming を選択します。その後、Create Stream を選びます。
適当にパラメータを入れて作成します
Stream を作ったときに、Stream Pool が何もない場合は、Default の Stream Pool が自動的に作成されます。teststream01
は、DefaultPool に自動配置されます。Stream Pool は、複数のStreamを管理する概念となっていて、Stream への Endpoint を Public にするのか Private にするのか、またデータの暗号に使う鍵は何を使用するのか、といった事をまとめて管理できます。
今回は自動作成にしたので、Endpoint は Public になっていて、暗号化用の鍵は Oracle 側で管理されているものを使う設定です。
上記の画面にある「View Kafka Connection Settings」を押します。
Copy All を押して、すべての設定値をメモっておきます。
Open JDK Install
Kafka Client を Javaで動かすために、OpenJDK をインストールします。
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
Maven 3.6.3 Install
依存関係を管理するために、Maven を Install します。
以下のURLでダウンロードURLを確認します。
https://maven.apache.org/download.cgi
ダウンロードページでコピーしたURLを使って、CentOS 上に tar.gz ファイルをダウンロードします
mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
次の手順に従ってインストールしていきます。
https://maven.apache.org/install.html
tar.gz ファイルを解凍
tar xfvz apache-maven-3.6.3-bin.tar.gz
環境変数設定
bashrc に追記
echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc
bashrc 再読み込み
source ~/.bashrc
mvn コマンドが実行可能確認します
[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$
Java Build
ディレクトリを作成します。
mkdir ~/kafkaproducer
mkdir -p ~/kafkaproducer/src/main/java/jp/test/sugi
cd ~/kafkaproducer
Java ソースコードを作成します。
cat <<'EOF' > ~/kafkaproducer/src/main/java/jp/test/sugi/KafkaProducerApps.java
package jp.test.sugi;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class KafkaProducerApp {
public static void main(final String[] args) {
System.out.println("Start.");
// 接続時の設定値を Properties インスタンスとして構築する
final Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
properties.put("max.request.size", 1024 * 1024);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Producer を構築する
final KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(),
new StringSerializer());
try {
// トピックを指定してメッセージを送信する
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));
}
} catch (final Exception e) {
System.out.println("例外が発生しました。");
System.out.println(e);
} finally {
producer.close();
}
System.out.println("End.");
}
}
EOF
Java ソースコードの中で、大事なポイントを説明します。まず、下の部分は、Kafka Client が接続先の Streaming の Endpoint などを指定しています。Stream Pool からコピーしてきた値を指定します。SaslConfigs.SASL_JAAS_CONFIG
の password は、OCI Auth Token の値に書き換えます。
// 接続時の設定値を Properties インスタンスとして構築する
final Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
properties.put("max.request.size", 1024 * 1024);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
また、次の大事なポイントはこれです。testsream01
という名前が、Kafka Client から見ると Topic名を指定しています。Topic 名は、OCI の Stream の名前と紐づいているため、Stream 作成時に指定した名前を入れます。
producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));
maven で依存関係を指定するために、pom.xml を作成します。
cat <<'EOF' > ~/kafkaproducer/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>jp.test.sugi</groupId>
<artifactId>KafkaProducerApp</artifactId>
<version>1.0</version>
<name>KafkaProducerApp</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.2</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
EOF
依存関係を全て含めた fat jar を作成します。初回実行は、ダウンロードが入るため数分待ちます。
cd ~/kafkaproducer
mvn package assembly:single
出来上がった Jar ファイルを実行すると、OCI Streaming にデータが投入されます。
java -jar target/KafkaProducerApp-1.0-jar-with-dependencies.jar
OCI Console 上で、Load Messages をクリックすると、投入されたデータが見えます。
参考URL
Azure
https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-producer-consumer-api