1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Oracle Cloud] Streaming に Java Kafka Client を使って、単一ノードで Consume してみる

Last updated at Posted at 2020-05-04

はじめに

Oracle Cloud Infractructure(以下OCI)では、ストリーミングデータをリアルタイムに収集・処理が出来る Streaming というサービスが提供されています。Streaming は Apache Kafka 互換のAPIを持っているため、Kafka Client から接続して、Produce, Consume が出来ます。

前回は、Java で Producer を実装しました。

今回は、Java の Kafka Client から 単独ノードで Consume してデータを出力する方法を確認していきます。

仮想マシン

適当にCentOS 7の仮想マシンを作ります。

Auth Token生成

Streaming に、Kafka API で接続するためには、IAM ユーザーで Auth Token が必要です。自分の IAMユーザーの詳細画面で、Generate します。

1588474181611.png

適当に説明をいれます。

1588474211704.png

Token が表示されるので、メモっておきます。(画像のものは現在使えません)

Tokenの中に、; などの記号が含まれていると、正しく動かない気がします。もしハマって正常に動かない時は、Tokenの入れ替えも検討してみてください。

1588474230416.png

Stream の作成

OCI Console で、Analytics > Streaming を選択します。その後、Create Stream を選びます。

1588519801919.png

適当にパラメータを入れて作成します

1588519908911.png

Stream を作ったときに、Stream Pool が何もない場合は、Default の Stream Pool が自動的に作成されます。teststream01は、DefaultPool に自動配置されます。Stream Pool は、複数のStreamを管理する概念となっていて、Stream への Endpoint を Public にするのか Private にするのか、またデータの暗号に使う鍵は何を使用するのか、といった事をまとめて管理できます。

今回は自動作成にしたので、Endpoint は Public になっていて、暗号化用の鍵は Oracle 側で管理されているものを使う設定です。

1588520017422.png

上記の画面にある「View Kafka Connection Settings」を押します。

1588521024252.png

Copy All を押して、すべての設定値をメモっておきます。

1588521056591.png

Open JDK Install

Kafka Client を Javaで動かすために、OpenJDK をインストールします。

sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

Maven 3.6.3 Install

依存関係を管理するために、Maven を Install します。

以下のURLでダウンロードURLを確認します。
https://maven.apache.org/download.cgi

1587385604751.png

ダウンロードページでコピーしたURLを使って、CentOS 上に tar.gz ファイルをダウンロードします

mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

次の手順に従ってインストールしていきます。
https://maven.apache.org/install.html

tar.gz ファイルを解凍

tar xfvz apache-maven-3.6.3-bin.tar.gz

環境変数設定
bashrc に追記

echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc

bashrc 再読み込み

source ~/.bashrc

mvn コマンドが実行可能確認します

[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$ 

Java Build

ディレクトリを作成します。

mkdir ~/KafkaSingleConsumerApp
mkdir -p ~/KafkaSingleConsumerApp/src/main/java/jp/test/sugi
cd ~/kafkaconsumersingle

Java ソースコードを作成します。

cat <<'EOF' > ~/KafkaSingleConsumerApp/src/main/java/jp/test/sugi/KafkaSingleConsumerApp.java
package jp.test.sugi;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaSingleConsumerApp {
    public static void main(final String[] args) {
        System.out.println("Start.");

        // 接続時の設定値を Properties インスタンスとして構築する
        final Properties properties = new Properties();

        // OCI Streaming に接続するための指定
        properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put("max.partition.fetch.bytes", 1024 * 1024);

        // Consumer の動作指定
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "java-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        // Consumer を構築する
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());

        // Consumer をトピックに割り当てる
        consumer.subscribe(Arrays.asList("teststream01"));

        try {
            while (true) {
                // メッセージをとりだす
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60l));

                // とりだしたメッセージを表示する
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("%s:%s", record.offset(), record.value()));
                }

                // メッセージの読み取り位置である offset を、最後に poll() した位置で(同期処理で)更新する
                consumer.commitSync();
            }
        } finally {
            consumer.close();
            System.out.println("End.");
        }
    }
}

maven で依存関係を指定するために、pom.xml を作成します。

cat <<'EOF' > ~/KafkaSingleConsumerApp/pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jp.test.sugi</groupId>
  <artifactId>KafkaSingleConsumerApp</artifactId>
  <version>1.0</version>
  <name>KafkaSingleConsumerApp</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.13.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.13.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.3.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaSingleConsumerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
          <configuration>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaSingleConsumerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
EOF

依存関係を全て含めた fat jar を作成します。初回実行は、ダウンロードが入るため数分待ちます。

mvn package assembly:single

出来上がった Jar ファイルを実行すると、Stream 上のデータを、Consume します。

java -jar target/KafkaSingleConsumerApp-1.0-jar-with-dependencies.jar

動作確認のため、OCI Console から Produce します。Test Message ボタンを押します。

1588602314673.png

適当なメッセージを入れて、Produce ボタンを押します。

1588602336798.png

実行例

[opc@kafkaconsumer01 KafkaSingleConsumerApp]$ java -jar target/KafkaSingleConsumerApp-1.0-jar-with-dependencies.jar
Start.
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
1076:test1
1077:hello? i am streaming

参考URL

Azure
https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-producer-consumer-api

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?