Clojure
Kafka

ClojureでKafkaにアクセスしてみる

More than 3 years have passed since last update.


Kafkaとは

メッセージングミドルウェア

- 公式サイト

- Apache Kafka, 他とは異なるメッセージングシステム

- Qiitaの記事

とりあえず、動くとこまで。


ClojureのKafkaクライアント

利用するライブラリ

- clj-kafka

利用するKafkaのバージョンは0.8.2。ダウンロードとコマンドラインからの動作確認は公式のquick startで確認。


サンプルアプリ


1.leiningenのインストール

公式サイトのインストールに従って、インストール


2.compojureテンプレートの利用

公式サイト

下記のコマンドを実行すると、自動でcompojureテンプレートが展開されプロジェクトの雛形が生成される(このときcompojureテンプレートの最新バージョンがローカルのMavenリポジトリに存在すればそれを利用し、存在しなければダウンロードされる)。

lein new compojure プロジェクト名

プロジェクト名はexwebにする。

下記のコマンドでサーバを起動



lein ring server

下記のURIにブラウザからアクセスすると「Hello World」と表示される。



http://localhost:3000/


3.project.cljの編集


project.clj



(defproject exweb "0.1.0-SNAPSHOT"
:description "compojure sample"
:url "http://example.com/exweb"
:min-lein-version "2.0.0"
:dependencies [[org.clojure/clojure "1.7.0"]
[compojure "1.4.0"]
[ring/ring-defaults "0.1.5"]
[clj-kafka "0.3.4"]] ;; 追加
:plugins [[lein-ring "0.9.7"]]
:ring {:handler exweb.handler/app}
:profiles {:dev {:dependencies [[javax.servlet/servlet-api "2.5"]
[ring/ring-mock "0.3.0"]]}}
)


4.kafkaクライアントの作成


kafka.clj



(ns exweb.kafka
(:require
[clj-kafka.core :as kafka]
[clj-kafka.new.producer :as kafka-pd]
[clj-kafka.consumer.zk :as kafka-zk])
(:import [kafka.serializer StringDecoder]
[kafka.consumer KafkaStream]))

;;Producer用の記述
(def producer-config {"bootstrap.servers" "localhost:9092"})

(defn send-message-to-kafka [msg]
(with-open [p (kafka-pd/producer producer-config (kafka-pd/string-serializer) (kafka-pd/string-serializer))]
@(kafka-pd/send p (kafka-pd/record "test" msg))))

;;Consumer用の記述
(def consumer-config {"zookeeper.connect" "localhost:2181"
"group.id" "consumer-01"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})

(defn to-messages
"ストリームから、値のリストを取り出し、返す。"
[^KafkaStream stream]
(->> (.iterator stream)
iterator-seq
(map kafka/to-clojure)
(map :value)))

(defn string-decoder "文字列デコーダを返す" [] (StringDecoder. nil))

(defn consume-messages
"現在のTopicから全てのメッセージを読みこみ、返す"
[]
(kafka/with-resource [c (kafka-zk/consumer consumer-config)]
kafka-zk/shutdown
(to-messages (kafka-zk/create-message-stream c "test" (string-decoder) (string-decoder)))))



5.handlerの修正


handler.clj

(ns exweb.handler

(:require [compojure.core :refer :all]
[compojure.route :as route]
[ring.middleware.defaults :refer [wrap-defaults site-defaults]]
[exweb.kafka :refer :all]))

(defroutes app-routes
(GET "/" [] "Hello World")
(GET "/send" [msg] (str (send-message-to-kafka msg)))
(GET "/show" [] (consume-messages))
(route/not-found "Not Found"))

(def app
(wrap-defaults app-routes site-defaults))



6.動作確認

サーバを起動



lein ring server

下記のエンドポイントにメッセージを送信



http://localhost/send?msg=Kafka Message



ブラウザ上には「{:topic "test", :partition 0, :offset 1}」と表示。

下記のエンドポイントでメッセージを取得し、表示



http://localhost/show



ブラウザ上に「Kafka Message」と表示される。