7
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ScalaでKafkaにメッセージを送る

Last updated at Posted at 2015-09-09

表題のアプリを実装しないといけなくなったのですが、どんぴしゃな情報が無くて1日苦労しました。

  • 急にKafkaを使わないといけなくなったが、Kafkaのことをよく知らない
  • 急にScalaを使わないといけなくなったが、Scalaのことをよく知らない

という人の参考になるかもしれないので、ここにまとめておきます。

Kafkaのインストール

とりあえず現時点で最新の0.8.2.1を入れます。諸事情により、使うScalaのバージョンは2.10系です。

# cp  kafka_2.10-0.8.2.1.tgz /usr/local/
# tar zxf kafka_2.10-0.8.2.1.tgz
# ln -s /usr/local/kafka_2.10-0.8.2.1 /usr/local/kafka
# useradd kafka
# passwd kafka
# chown -R kafka:kafka /usr/local/kafka_2.10-0.8.2.1

ZooKeeperのインストール

現時点でcurrent stableな3.4.6を入れます。3.5系いつ正式版が出るのだろうか。。。

# cp zookeeper-3.4.6.tar.gz /usr/local/
# tar zxf zookeeper-3.4.6.tar.gz
# ln -s /usr/local/zookeeper-3.4.6 /usr/local/zookeeper
# useradd zookeeper
# passwd zookeeper
# chown -R zookeeper:zookeeper /usr/local/zookeeper-3.4.6.tar.gz

Kafkaサーバの起動

とりあえず1台サーバ構成で。

先にZooKeeperサーバを立ち上げておきます。設定はKafkaにデフォルトで付いているものを使えばOKです。

$ su - kafka
$ cd /usr/local/kafka
$ bin/zookeeper-server-start.sh config/zookeeper.properties

次にKafkaサーバを立ち上げます。

$ su - kafka
$ cd /usr/local/kafka
$ bin/kafka-server-start.sh config/server.properties

あとで使うトピックを作っておきます。

$ bin/kafka-topics.sh --zookeeper localhost:2181 --topic testTopic --create --partitions 1 --replication-factor 1

Producerクライアントを作る

ここが辛かったです。

まず、Kafkaのドキュメントを見ると、

As of the 0.8.2 release we encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client.

と書いてあり、かなりあせりました。

なんで、Scala APIなくなったのー

しかも、

For those interested in the legacy Scala producer api, information can be found here.

とあるのだが、リンク先はJava APIの説明という罠Σ(´д`*)
うそでしょ・・・

ネット上の情報を探しつつ、色々な先人の方のサンプルコードをかき集めて、自分なりに整理したコードがこちら。

import java.util.Properties
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}

object SimpleProducer {
  def main(args: Array[String]): Unit = {

	val topicName = "testTopic"
	println("connecting to %s".format(topicName))

	val props = new Properties()
	props.put("metadata.broker.list", "localhost:9092")
	props.put("serializer.class", "kafka.serializer.StringEncoder")
	props.put("request.required.acks", "1")

	val config = new ProducerConfig(props)
	val producer = new Producer[String, String](config)

	val data = new KeyedMessage[String, String](topicName, "one", "This is message from my simple producer") 
	producer.send(data)

	producer.close()
  }
}

Scalaのこと1ミリも知らなかったので、Javaの知識とコンパイルエラーをググって男気コンパイルしてます。
あと、パラメータとかは全部埋め込みです。
エラーハンドリングも無し!フゥ~

明日以降の自分に託します(;´Д⊂)

ビルド&実行確認

SBTもしくはTypesafe Activatorを使ってビルドしてあげます。
今回は、Activatorを使いました。

# cp typesafe-activator-1.3.6.zip /usr/local/
# unzip typesafe-activator-1.3.6.zip
# ln -s /usr/local/typesafe-activator-1.3.6 /usr/local/activator

Activatorを使うユーザにPATHを通します。
この時、/usr/local/activator/binにPATHを通すのではなく、/usr/local/activatorにしないといけないです。ややキモい。

とにかく、インストールすれば後はactivator newコマンドを叩くと、Scalaアプリのためのファイルやディレクトリのひな形を作ってくれます。便利(´・∀・`)

$ activator new simple-kafka
$ cd simple-kafka

src/main/scalaディレクトリの下に、自分で書いたScalaソースファイルを置きます。
そして、activatorが作ったbuild.sbtをカスタマイズします。

build.sbt
name := """simple-kafka"""

version := "1.0"

scalaVersion := "2.10.5"  ここを念のためKafkaサーバのScalaとバージョンを合わせておく

// Change this to another test framework if you prefer
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.4" % "test"
libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.2.1"   Kafkaのライブラリを追加

// Uncomment to use Akka
//libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.11"

あとは、ビルド&実行です。関連ライブラリのダウンロードに時間がかかる。。。

$ activator run
(略)
[info] Running com.example.SimpleProducer
connecting to testTopic
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 12 s, completed 2015/09/09 23:56:36

実際にメッセージがKafkaに送信されたかどうかを確認しておきます。

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
This is message from my simple producer

やった!

補足1

上にも書きましたが、ScalaからKafkaにメッセージを送るAPIが最新のKafkaのバージョンからは非推奨になっています。自分は、Scala APIを使い続けることが正解なのか判断できていません。

素直にJava APIを使うほうが良いかも。

補足2

Kafkaサーバを終了させる前にZooKeeperサーバを終了させてはいけません。

もしやってしまうと、Kafkaサーバが終了するためにZooKeeperサーバにアクセスしようとして失敗する怨嗟(エラー)をあげ続ける生霊と化します。

その時は、落ち着いてZooKeeperサーバを立ち上げると、成仏してくれます。(-∧-;) ナムナム

7
8
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?