2
2

More than 5 years have passed since last update.

RabbitMQでClojureのデータをやりとりする

Posted at

RabbitMQdata.fressianを使って、Clojureのデータを受け渡しする実験です。

RabbitMQはAMQP実装の代表的なプロダクトで、本格的なキューイングシステムでありながら手軽に試してみることができます。Windowsでもインストールパッケージがあるので楽々インストールできます。(ただしErlangが必要ですが)

data.fressianはFressianフォーマットのデータをClojureで扱うためのライブラリです。

Fressianとは、Datomicを開発する上で作られたデータフォーマットで、

  • バイナリフォーマット
  • 拡張可能
  • スキーマレス

が特長です。まぁこれだけだと尖ったところはなく、one of themな印象です、が…
Clojurianにとっての最大のメリットは、Clojure使いがFressianを開発しているという点に尽きます。そしてこのClojureライブラリであるdata.fressianを使えば、非常に簡単にClojureデータのserialize/deserializeが可能です。

=> (require '[clojure.data.fressian :as fress])
=> (def fress-obj (fress/write {:a 1 :b 2}))
#<HeapByteBuffer java.nio.HeapByteBuffer[pos=0 lim=14 cap=32]>
=> (fress/read fress-obj)
{:a 1, :b 2}

こんな感じです。

さて、このdata.fressianを使ってシリアライズしたデータをRabbitMQに載せてみます。
RabbitMQ用のClojureライブラリは、Langohrという最近バージョン2.x系にメジャーバージョンアップされたものがあります。

それでは、データをシリアライズして送る方を書いてみます。RabbitMQはデフォルトポートで起動している想定です。

(require '[langohr.queue :as lq]
         '[langohr.core :as rmq]
         '[langohr.channel :as lch]
         '[langohr.basic :as lb]
         '[langohr.consumers :as lc]
         '[clojure.data.fressian :as fress])

;; Publish clojure values
(let [conn (rmq/connect {:uri "amqp://localhost"})
      ch   (lch/open conn)]
  (lq/declare ch "test-queue")
  (try
    (lb/publish
     ch "" "test-queue"
     (->> {:a {:b [1 2 3] :c "CC" :d #{4 5 6}}}
          (fress/write)
          (.array)))
    (catch Exception e (.getMessage e))
    (finally (rmq/close ch)))

やり口としてはchennelをオープンして、publish関数でデータを送信してやるだけです。その際data.fressianのwrite関数を使ってシリアライズします。結果はHeapByteBufferになるので、arrayメソッドを呼んでバイト配列を取り出してRabbitMQに投げています。

これを受け取ってみます。メッセージの受け取りはPull型とPush型の両方が可能です。

それぞれこんな感じになります。

;; Consume clojure values (Push API)
(let [conn (rmq/connect {:uri "amqp://localhost"})
      ch   (lch/open conn)]
  (lq/declare ch "test-queue")
  (lb/consume ch "test-queue"
    (lc/create-default ch
      :handle-delivery-fn (fn [ch metadata payload]
                            (println (fress/read payload))))))

;; Consume clojure values (Pull API)
(let [conn (rmq/connect {:uri "amqp://localhost"})
      ch   (lch/open conn)]
  (lq/declare ch "test-queue")
  (try
    (if-let [[metadata msg] (lb/get ch "test-queue")]
      (println (fress/read msg)))
    (catch Exception e (.getMessage e))
    (finally (rmq/close ch))))
=> {:a {:b #<ArrayList [1, 2, 3]>, :c CC, :d #<HashSet [4, 5, 6]>}}

受け取ったメッセージをdata.fressianのread関数を呼んであげると元のClojureデータが復元できます。

現在のdata.fressianのバージョン(0.2.0)では、VectorはJavaのArrayListに、SetはJavaのHashSetにそれぞれ変換されてしまうので注意が必要です。
プルリクエストもあがっているので、次のバージョンではちゃんとVectorにマッピングされるようになるかもしれません。
https://github.com/clojure/data.fressian/pull/2

と、このようにRabbitMQとdata.fressianを使うと、異なるアプリケーション間でClojureのデータを、非常に簡単に受け渡しすることができます。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2