Help us understand the problem. What is going on with this article?

RabbitMQでClojureのデータをやりとりする

More than 5 years have passed since last update.

RabbitMQdata.fressianを使って、Clojureのデータを受け渡しする実験です。

RabbitMQはAMQP実装の代表的なプロダクトで、本格的なキューイングシステムでありながら手軽に試してみることができます。Windowsでもインストールパッケージがあるので楽々インストールできます。(ただしErlangが必要ですが)

data.fressianはFressianフォーマットのデータをClojureで扱うためのライブラリです。

Fressianとは、Datomicを開発する上で作られたデータフォーマットで、

  • バイナリフォーマット
  • 拡張可能
  • スキーマレス

が特長です。まぁこれだけだと尖ったところはなく、one of themな印象です、が…
Clojurianにとっての最大のメリットは、Clojure使いがFressianを開発しているという点に尽きます。そしてこのClojureライブラリであるdata.fressianを使えば、非常に簡単にClojureデータのserialize/deserializeが可能です。

=> (require '[clojure.data.fressian :as fress])
=> (def fress-obj (fress/write {:a 1 :b 2}))
#<HeapByteBuffer java.nio.HeapByteBuffer[pos=0 lim=14 cap=32]>
=> (fress/read fress-obj)
{:a 1, :b 2}

こんな感じです。

さて、このdata.fressianを使ってシリアライズしたデータをRabbitMQに載せてみます。
RabbitMQ用のClojureライブラリは、Langohrという最近バージョン2.x系にメジャーバージョンアップされたものがあります。

それでは、データをシリアライズして送る方を書いてみます。RabbitMQはデフォルトポートで起動している想定です。

(require '[langohr.queue :as lq]
         '[langohr.core :as rmq]
         '[langohr.channel :as lch]
         '[langohr.basic :as lb]
         '[langohr.consumers :as lc]
         '[clojure.data.fressian :as fress])

;; Publish clojure values
(let [conn (rmq/connect {:uri "amqp://localhost"})
      ch   (lch/open conn)]
  (lq/declare ch "test-queue")
  (try
    (lb/publish
     ch "" "test-queue"
     (->> {:a {:b [1 2 3] :c "CC" :d #{4 5 6}}}
          (fress/write)
          (.array)))
    (catch Exception e (.getMessage e))
    (finally (rmq/close ch)))

やり口としてはchennelをオープンして、publish関数でデータを送信してやるだけです。その際data.fressianのwrite関数を使ってシリアライズします。結果はHeapByteBufferになるので、arrayメソッドを呼んでバイト配列を取り出してRabbitMQに投げています。

これを受け取ってみます。メッセージの受け取りはPull型とPush型の両方が可能です。

それぞれこんな感じになります。

;; Consume clojure values (Push API)
(let [conn (rmq/connect {:uri "amqp://localhost"})
      ch   (lch/open conn)]
  (lq/declare ch "test-queue")
  (lb/consume ch "test-queue"
    (lc/create-default ch
      :handle-delivery-fn (fn [ch metadata payload]
                            (println (fress/read payload))))))

;; Consume clojure values (Pull API)
(let [conn (rmq/connect {:uri "amqp://localhost"})
      ch   (lch/open conn)]
  (lq/declare ch "test-queue")
  (try
    (if-let [[metadata msg] (lb/get ch "test-queue")]
      (println (fress/read msg)))
    (catch Exception e (.getMessage e))
    (finally (rmq/close ch))))
=> {:a {:b #<ArrayList [1, 2, 3]>, :c CC, :d #<HashSet [4, 5, 6]>}}

受け取ったメッセージをdata.fressianのread関数を呼んであげると元のClojureデータが復元できます。

現在のdata.fressianのバージョン(0.2.0)では、VectorはJavaのArrayListに、SetはJavaのHashSetにそれぞれ変換されてしまうので注意が必要です。
プルリクエストもあがっているので、次のバージョンではちゃんとVectorにマッピングされるようになるかもしれません。
https://github.com/clojure/data.fressian/pull/2

と、このようにRabbitMQとdata.fressianを使うと、異なるアプリケーション間でClojureのデータを、非常に簡単に受け渡しすることができます。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした