Intro
wslを使いながら、kafkaをホスティングしてwindowsで直接アクセスすることは少し複雑です。 これを解決する方法をご紹介します。
1. WSLでKafkaをインストールして起動
-
Kafkaをインストール
WSL(例:Ubuntu)にKafkaをインストールします。以下は一般的なインストール手順です。# Kafkaのダウンロード wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz tar -xzf kafka_2.13-3.5.1.tgz cd kafka_2.13-3.5.1 # Zookeeperの起動 bin/zookeeper-server-start.sh config/zookeeper.properties & # Kafkaブローカーの起動 bin/kafka-server-start.sh config/server.properties &
-
WSLのIPアドレスを確認
WSLのIPアドレスを確認するには、以下のコマンドを実行します。ip route
出力例:
default via 172.18.208.1 dev eth0 proto kernel scope link src 172.18.208.2
この場合、
src 172.18.208.2
がWSLのIPアドレスです。このアドレスはKafkaブローカーとJavaアプリケーション間の接続に使用されます。 -
Kafkaブローカーの設定を変更
Kafkaのserver.properties
ファイルでadvertised.listeners
を以下のように修正し、WSLのIPを明示的に設定します。listeners=PLAINTEXT://0.0.0.0:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://localhost:9092
その後、Kafkaを再起動します。
bin/kafka-server-stop.sh bin/kafka-server-start.sh config/server.properties &
2. Windowsでポートフォワーディングを設定
-
管理者権限でPowerShellを実行
WindowsからWSLのKafkaポートにアクセスするには、ポートフォワーディングが必要です。PowerShellを管理者権限で実行します。 -
ポートフォワーディングを設定
以下のコマンドを実行してポートをWSLからWindowsにフォワーディングします。netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.18.208.2
ここで、
172.18.208.2
はWSLのIPアドレスです。 -
ポートフォワーディングの確認
ポートフォワーディングが正しく設定されているか確認します。netsh interface portproxy show all
出力例:
Listen on ipv4: Connect to ipv4: Address Port Address Port --------------- ---------- --------------- ---------- 0.0.0.0 9092 172.18.208.2 9092
3. Javaコードを作成して実行
3.1 Kafkaクライアントライブラリを設定
MavenまたはGradleプロジェクトでKafkaクライアントを追加します。
Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
Gradle
implementation 'org.apache.kafka:kafka-clients:3.5.1'
3.2 Javaコードを作成
Kafkaプロデューサーとコンシューマーのコードを作成します。以下は簡単な例です。
Kafkaプロデューサー
public class KStreamJoinGlobalKTable {
private static String APPLICATION_NAME = "global-table-join-application";
private static String BOOTSTRAP_SERVERS = "172.18.208.2:9092";
private static String ADDRESS_GLOBAL_TABLE = "address_v2";
private static String ORDER_STREAM = "order";
private static String ORDER_JOIN_STREAM = "order_join";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressGlobalTable,
(orderKey, orderValue) -> orderKey,
(order, address) -> order + " send to " + address)
.to(ORDER_JOIN_STREAM);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
4. Javaコードを実行
-
Kafkaプロデューサーの実行
プロデューサーコードを実行してKafkaにメッセージを送信します。 -
Kafkaコンシューマーの実行
コンシューマーコードを実行してKafkaからメッセージを読み取ります。
この手順を通じて、WSLでホスティングしたKafkaをWindowsのJavaアプリケーションで利用することが可能です。