0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Intro

wslを使いながら、kafkaをホスティングしてwindowsで直接アクセスすることは少し複雑です。 これを解決する方法をご紹介します。

1. WSLでKafkaをインストールして起動

  1. Kafkaをインストール
    WSL(例:Ubuntu)にKafkaをインストールします。以下は一般的なインストール手順です。

    # Kafkaのダウンロード
    wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
    tar -xzf kafka_2.13-3.5.1.tgz
    cd kafka_2.13-3.5.1
    
    # Zookeeperの起動
    bin/zookeeper-server-start.sh config/zookeeper.properties &
    
    # Kafkaブローカーの起動
    bin/kafka-server-start.sh config/server.properties &
    
  2. WSLのIPアドレスを確認
    WSLのIPアドレスを確認するには、以下のコマンドを実行します。

    ip route
    

    出力例:

    default via 172.18.208.1 dev eth0 proto kernel scope link src 172.18.208.2
    

    この場合、src 172.18.208.2がWSLのIPアドレスです。このアドレスはKafkaブローカーとJavaアプリケーション間の接続に使用されます。

  3. Kafkaブローカーの設定を変更
    Kafkaのserver.propertiesファイルでadvertised.listenersを以下のように修正し、WSLのIPを明示的に設定します。

    listeners=PLAINTEXT://0.0.0.0:9092
    
    # Hostname and port the broker will advertise to producers and consumers. If not set, 
    # it uses the value for "listeners" if configured.  Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    advertised.listeners=PLAINTEXT://localhost:9092
    
    

    その後、Kafkaを再起動します。

    bin/kafka-server-stop.sh
    bin/kafka-server-start.sh config/server.properties &
    

2. Windowsでポートフォワーディングを設定

  1. 管理者権限でPowerShellを実行
    WindowsからWSLのKafkaポートにアクセスするには、ポートフォワーディングが必要です。PowerShellを管理者権限で実行します。

  2. ポートフォワーディングを設定
    以下のコマンドを実行してポートをWSLからWindowsにフォワーディングします。

    netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.18.208.2
    

    ここで、172.18.208.2はWSLのIPアドレスです。

  3. ポートフォワーディングの確認
    ポートフォワーディングが正しく設定されているか確認します。

    netsh interface portproxy show all
    

    出力例:

    Listen on ipv4:             Connect to ipv4:
    Address         Port        Address         Port
    --------------- ----------  --------------- ----------
    0.0.0.0         9092        172.18.208.2    9092
    

3. Javaコードを作成して実行

3.1 Kafkaクライアントライブラリを設定

MavenまたはGradleプロジェクトでKafkaクライアントを追加します。

Maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

Gradle

implementation 'org.apache.kafka:kafka-clients:3.5.1'

3.2 Javaコードを作成

Kafkaプロデューサーとコンシューマーのコードを作成します。以下は簡単な例です。

Kafkaプロデューサー

public class KStreamJoinGlobalKTable {

    private static String APPLICATION_NAME = "global-table-join-application";
    private static String BOOTSTRAP_SERVERS = "172.18.208.2:9092";
    private static String ADDRESS_GLOBAL_TABLE = "address_v2";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
        KStream<String, String> orderStream = builder.stream(ORDER_STREAM);

        orderStream.join(addressGlobalTable,
                        (orderKey, orderValue) -> orderKey,
                        (order, address) -> order + " send to " + address)
                .to(ORDER_JOIN_STREAM);

        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();

    }
}


4. Javaコードを実行

  1. Kafkaプロデューサーの実行
    プロデューサーコードを実行してKafkaにメッセージを送信します。

  2. Kafkaコンシューマーの実行
    コンシューマーコードを実行してKafkaからメッセージを読み取ります。


この手順を通じて、WSLでホスティングしたKafkaをWindowsのJavaアプリケーションで利用することが可能です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?