1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ClojureでKafkaにアクセスしてみる

Last updated at Posted at 2016-01-01

Kafkaとは

メッセージングミドルウェア

とりあえず、動くとこまで。

ClojureのKafkaクライアント

利用するライブラリ

利用するKafkaのバージョンは0.8.2。ダウンロードとコマンドラインからの動作確認は公式のquick startで確認。

サンプルアプリ

1.leiningenのインストール

公式サイトのインストールに従って、インストール

2.compojureテンプレートの利用

公式サイト

下記のコマンドを実行すると、自動でcompojureテンプレートが展開されプロジェクトの雛形が生成される(このときcompojureテンプレートの最新バージョンがローカルのMavenリポジトリに存在すればそれを利用し、存在しなければダウンロードされる)。

lein new compojure プロジェクト名

プロジェクト名はexwebにする。
下記のコマンドでサーバを起動

lein ring server

下記のURIにブラウザからアクセスすると「Hello World」と表示される。

http://localhost:3000/

3.project.cljの編集

project.clj

(defproject exweb "0.1.0-SNAPSHOT"
  :description "compojure sample"
  :url "http://example.com/exweb"
  :min-lein-version "2.0.0"
  :dependencies [[org.clojure/clojure "1.7.0"]
                 [compojure "1.4.0"]
                 [ring/ring-defaults "0.1.5"]
                 [clj-kafka "0.3.4"]] ;; 追加
  :plugins [[lein-ring "0.9.7"]]
  :ring {:handler exweb.handler/app}
  :profiles {:dev {:dependencies [[javax.servlet/servlet-api "2.5"]
                                  [ring/ring-mock "0.3.0"]]}}
  )

4.kafkaクライアントの作成

kafka.clj

(ns exweb.kafka
  (:require
    [clj-kafka.core :as kafka]
    [clj-kafka.new.producer :as kafka-pd]
    [clj-kafka.consumer.zk :as kafka-zk])
  (:import [kafka.serializer StringDecoder]
           [kafka.consumer KafkaStream]))

;;Producer用の記述
(def producer-config {"bootstrap.servers" "localhost:9092"})

(defn send-message-to-kafka [msg]
  (with-open [p (kafka-pd/producer producer-config (kafka-pd/string-serializer) (kafka-pd/string-serializer))]
    @(kafka-pd/send p (kafka-pd/record "test" msg))))


;;Consumer用の記述
(def consumer-config {"zookeeper.connect"  "localhost:2181"
                      "group.id"           "consumer-01"
                      "auto.offset.reset"  "smallest"
                      "auto.commit.enable" "false"})

(defn to-messages
  "ストリームから、値のリストを取り出し、返す。"
  [^KafkaStream stream]
  (->> (.iterator stream)
       iterator-seq
       (map kafka/to-clojure)
       (map :value)))

(defn string-decoder "文字列デコーダを返す" [] (StringDecoder. nil))

(defn consume-messages 
  "現在のTopicから全てのメッセージを読みこみ、返す"
  []
  (kafka/with-resource [c (kafka-zk/consumer consumer-config)]
                       kafka-zk/shutdown
                       (to-messages (kafka-zk/create-message-stream c "test" (string-decoder) (string-decoder)))))

5.handlerの修正

handler.clj
(ns exweb.handler
  (:require [compojure.core :refer :all]
            [compojure.route :as route]
            [ring.middleware.defaults :refer [wrap-defaults site-defaults]]
            [exweb.kafka :refer :all]))

(defroutes app-routes
           (GET "/" [] "Hello World")
           (GET "/send" [msg] (str (send-message-to-kafka msg)))
           (GET "/show" [] (consume-messages))
           (route/not-found "Not Found"))

(def app
  (wrap-defaults app-routes site-defaults))

6.動作確認

サーバを起動

lein ring server

下記のエンドポイントにメッセージを送信

http://localhost/send?msg=Kafka Message

ブラウザ上には「{:topic "test", :partition 0, :offset 1}」と表示。

下記のエンドポイントでメッセージを取得し、表示

http://localhost/show

ブラウザ上に「Kafka Message」と表示される。

1
0
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?