2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Oracle Cloud] Streaming に Java Kafka Client を使って Producer してみる

Last updated at Posted at 2020-05-04

はじめに

Oracle Cloud Infractructure(以下OCI)では、ストリーミングデータをリアルタイムに収集・処理が出来る Streaming というサービスが提供されています。Streaming は Apache Kafka 互換のAPIを持っているため、Kafka Client から接続して、Produce, Consume が出来ます。

今回は、Java の Kafka Client から Producer としてデータを入力する方法を確認していきます。

仮想マシン

適当にCentOS 7の仮想マシンを作ります。

Auth Token生成

Streaming に、Kafka API で接続するためには、IAM ユーザーで Auth Token が必要です。自分の IAMユーザーの詳細画面で、Generate します。

1588474181611.png

適当に説明をいれます。

1588474211704.png

Token が表示されるので、メモっておきます。(画像のものは現在使えません)

Tokenの中に、; などの記号が含まれていると、正しく動かない気がします。もしハマって正常に動かない時は、Tokenの入れ替えも検討してみてください。

1588474230416.png

Stream の作成

OCI Console で、Analytics > Streaming を選択します。その後、Create Stream を選びます。

1588519801919.png

適当にパラメータを入れて作成します

1588519908911.png

Stream を作ったときに、Stream Pool が何もない場合は、Default の Stream Pool が自動的に作成されます。teststream01は、DefaultPool に自動配置されます。Stream Pool は、複数のStreamを管理する概念となっていて、Stream への Endpoint を Public にするのか Private にするのか、またデータの暗号に使う鍵は何を使用するのか、といった事をまとめて管理できます。

今回は自動作成にしたので、Endpoint は Public になっていて、暗号化用の鍵は Oracle 側で管理されているものを使う設定です。

1588520017422.png

上記の画面にある「View Kafka Connection Settings」を押します。

1588521024252.png

Copy All を押して、すべての設定値をメモっておきます。

1588521056591.png

Open JDK Install

Kafka Client を Javaで動かすために、OpenJDK をインストールします。

sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

Maven 3.6.3 Install

依存関係を管理するために、Maven を Install します。

以下のURLでダウンロードURLを確認します。
https://maven.apache.org/download.cgi

1587385604751.png

ダウンロードページでコピーしたURLを使って、CentOS 上に tar.gz ファイルをダウンロードします

mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

次の手順に従ってインストールしていきます。
https://maven.apache.org/install.html

tar.gz ファイルを解凍

tar xfvz apache-maven-3.6.3-bin.tar.gz

環境変数設定
bashrc に追記

echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc

bashrc 再読み込み

source ~/.bashrc

mvn コマンドが実行可能確認します

[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$ 

Java Build

ディレクトリを作成します。

mkdir ~/kafkaproducer
mkdir -p ~/kafkaproducer/src/main/java/jp/test/sugi
cd ~/kafkaproducer

Java ソースコードを作成します。

cat <<'EOF' > ~/kafkaproducer/src/main/java/jp/test/sugi/KafkaProducerApps.java
package jp.test.sugi;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerApp {
    public static void main(final String[] args) {
        System.out.println("Start.");

        // 接続時の設定値を Properties インスタンスとして構築する
        final Properties properties = new Properties();

        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
        properties.put("max.request.size", 1024 * 1024);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Producer を構築する
        final KafkaProducer<String, String> producer = new KafkaProducer<>(properties, new StringSerializer(),
                new StringSerializer());

        try {
            // トピックを指定してメッセージを送信する
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));
            }
        } catch (final Exception e) {
            System.out.println("例外が発生しました。");
            System.out.println(e);
        } finally {
            producer.close();
        }

        System.out.println("End.");
    }
}
EOF

Java ソースコードの中で、大事なポイントを説明します。まず、下の部分は、Kafka Client が接続先の Streaming の Endpoint などを指定しています。Stream Pool からコピーしてきた値を指定します。SaslConfigs.SASL_JAAS_CONFIG の password は、OCI Auth Token の値に書き換えます。


        // 接続時の設定値を Properties インスタンスとして構築する
        final Properties properties = new Properties();

        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put(CommonClientConfigs.RETRIES_CONFIG, 5);
        properties.put("max.request.size", 1024 * 1024);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

また、次の大事なポイントはこれです。testsream01 という名前が、Kafka Client から見ると Topic名を指定しています。Topic 名は、OCI の Stream の名前と紐づいているため、Stream 作成時に指定した名前を入れます。

                producer.send(new ProducerRecord<String, String>("teststream01", String.format("message%02d", i)));

maven で依存関係を指定するために、pom.xml を作成します。


cat <<'EOF' > ~/kafkaproducer/pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jp.test.sugi</groupId>
  <artifactId>KafkaProducerApp</artifactId>
  <version>1.0</version>
  <name>KafkaProducerApp</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.13.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.13.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.3.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
          <configuration>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaProducerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
EOF

依存関係を全て含めた fat jar を作成します。初回実行は、ダウンロードが入るため数分待ちます。

cd ~/kafkaproducer
mvn package assembly:single

出来上がった Jar ファイルを実行すると、OCI Streaming にデータが投入されます。

java -jar target/KafkaProducerApp-1.0-jar-with-dependencies.jar

OCI Console 上で、Load Messages をクリックすると、投入されたデータが見えます。

1588563837576.png

参考URL

Azure
https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-producer-consumer-api

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?