はじめに
Clojure Advent Calendar 2021 に間に合った(?)ので25日目として公開します。
そろそろ本格的にClojureに入門したいという思い+生のHTTPデータを扱いたかったので遊んでみました。
ClojureでTCPを扱えるフレームワークとして利用者が多そうなalephというライブラリを使います。alephはNettyを基盤としており、TCP以外にもHTTP, WebSockets, UDPをサポートしています。普通にHTTPサーバーを使いたい人はaleph.httpを使うか、他のHTTPサーバーのフレームワークを選ぶと良いでしょう。
aleph.tcpのsampleは公式サイトの中にドキュメントがあります。
https://aleph.io/aleph/literate.html#aleph.examples.tcp
上記のsampleは独自のプロトコル上に実装されたechoサーバーです。そのため、まずプロトコルを定義し、クライアント自身もaleph.tcpのclientを使用しています。シンプルでaleph.tcpの要素が詰まっているのですが、後述する他のライブラリの知識も必要なので理解するのに時間を要しました。そのような経緯もあり、日本語の記事も無かったので、この記事がaleph.tcpを使う上での足がかりとなれば幸いです。
aleph.tcp
Alephを使用する上でデータをやりとりするインターフェースはManifold streamになります。Manifoldは 非同期プログラミングのためのライブラリでClojureで非同期というとcore.asyncのようですが、aleph.tcp では Manifold を使っていきます。ただし、core.asyncと親和性がないというわけではないようです。
ちなみに私自身はcore.asyncというか非同期について知っていることはあまり無く
TypeScriptでPromiseを少し触ったことがあり、そして既に忘れてしまっている程度です。
話を戻すと、aleph.tcpを扱う上で少なくともManifoldのことを知っておく必要があります。
Manifoldの基本構成は以下の2つです。
- Deferred ― 非同期的な値を表す
- Stream ― 非同期的な値の順序が保証されたシーケンスを表す
先に説明しておくとaleph.tcpでは非同期的な値に対してコールバックを登録していくプログラミングスタイルになってます。
実際に触っていきます。とりあえずLeiningenで遊び場を作っておきます。
lein new aleph-tcp-sample
manifold.deferred
依存関係にmanifoldを追加します。
--- a/project.clj
+++ b/project.clj
@@ -3,5 +3,6 @@
:url "http://example.com/FIXME"
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
:url "https://www.eclipse.org/legal/epl-2.0/"}
- :dependencies [[org.clojure/clojure "1.10.3"]]
+ :dependencies [[org.clojure/clojure "1.10.3"]
+ [manifold "0.2.3"]]
:repl-options {:init-ns aleph-tcp-sample.core})
おもむろにREPLを立ち上げ(cider-jack-in)、動作を確認します。
> (require '[manifold.deferred :as d])
nil
> (def d (d/deferred))
#'aleph-tcp-sample.core/d
> @d
d/deferredでDeferredを1つ作りました。
Deferredを参照するにはderef(のリーダーマクロ(@))を使用します。
さっそくderefで確認しようとしていますが、結果が帰ってきません。
未来に確定する値がまだ確定されてないからだと推測できます。
未来の値に対して確定(現実化)されたかどうか確認するためにはrealized?を使用します。
(realize?はclojure.coreの関数です)
未来の値を現実化するためにはd/success!を使用します。
> (realized? d)
false
> (d/success! d :foo)
true
> @d
:foo
> (realized? d)
true
値が返ってきました。
余談ですが、ManifoldのサンプルはdがDeferred、sがStreamを表すようで最初かなり分かりにくかったです。同じ関数型言語のHaskellも一文字の変数は多いですが、型の情報がドキュメント上にあるため意味を理解しやすいです。少なくともaleph.tcpを使うにあたり関連するドキュメント上から情報を得るのに苦労しました。英語や不慣れな領域だからというのもあるのでしょうか。一旦ライブラリの流儀には従っておきます。
不満を述べてしまいましたが、Deferredについて最後に便利な関数を紹介しておきます。
> (def d (d/deferred))
#'aleph-tcp-sample.core/d
> (d/chain d inc inc println)
#<Deferred@19ea596a: :not-delivered>
> (d/success! d 1)
3
true
chainは第一引数にDeferredをとり、現実化された際に後続の引数の関数たちが順番に適用されます。これが先ほど述べたコールバックを登録していくプログラミングスタイルといった所以です。ちなみに戻り値自身もDeferredになってますね。
manifold.stream
Streamのイメージとしては水道管などの方向性のあるパイプです。
StreamはSinkとSourceから成り、Sinkはメッセージを消費しSourceはメッセージを生産するものと見なすことができます。Streamの外側の視点から見るとSinkにメッセージ送り、Sourceからメッセージを取り出すとも言えます。
早速試してみます。
> (require '[manifold.stream :as s])
nil
> (def s (s/stream))
#'aleph-tcp-sample.core/s
> (s/put! s 1)
#<Deferred@43d957ea: :not-delivered>
> (s/take! s)
#<SuccessDeferred@589c44d8: 1>
> @*1
1
> (s/close! s)
nil
直感的に分かりますが、Streamを定義し1をputで入れてしてtakeで取り出してます。
戻り値はDeferredになっていて、最後にStreamを閉じました。
また、Streamは繋げることができます。
> (def a (s/stream))
#'aleph-tcp-sample.core/a
> (def b (s/stream))
#'aleph-tcp-sample.core/b
> (s/connect a b)
true
> @(s/put! a 1)
true
> @(s/take! b)
1
> (s/close! a)
nil
> (s/closed? b)
true
上流がa, 下流がbとなるようにconnectでStream同士を繋げています。
結果として、aにputされた値をbから取り出すことができます。
最後に、aを閉じるとbも閉じられます。
このSourceが閉じられた時にSinkも閉じられる動作はデフォルトになっており、connectの引数にオプションとして{:downstream? false}を追加で変更することができます。このオプションは遊ぶにあたり特に使わないですが、Streamのコードを追ったりした時に見かけるので触れておきました。
続いての特徴として、Streamはシーケンスのような概念のためmapやreduceといった関数が用意されています。
> (def a (s/stream))
#'aleph-tcp-sample.core/a
> (s/put! a 1)
#<Deferred@7f6f2ffa: :not-delivered>
> (s/put! a 2)
#<Deferred@3e5d958a: :not-delivered>
> (def b (s/map inc a))
#'aleph-tcp-sample.core/b
> (s/take! b)
#<SuccessDeferred@1fc9240e: 2>
> (s/take! b)
#<SuccessDeferred@3c1d8bcd: 3>
aというStreamに対してs/mapを適用した結果、putされた各Deferredに対してincが適用されたことを確認できました。
ここでs/mapのソースの中身を見てみましょう。
615 (defn map
616 "Equivalent to Clojure's `map`, but for streams instead of sequences."
617 ([f s]
618 (let [s' (stream)]
619 (connect-via s
620 (fn [msg]
621 (put! s' (f msg)))
622 s'
623 {:description {:op "map"}})
624 (source-only s')))
625 ([f s & rest]
626 (map #(apply f %)
627 (apply zip s rest))))
ここで確認したかったのはconnect-viaという関数で、これはconnectと同様2つのStreamを繋げるのですが、違うところは上流から流れてくる値に対して処理を行うコールバックを登録できる点です。処理を適用し最終的に下流にputする形になっています。ちなみにコールバックの戻り値はbooleanを返すDeferredであるべきでfalseの場合、下流のSinkは閉じられていると見なされます。
filterも見てみましょう。
述語がfalseであった場合、putせずにsuccess-deferredをtrueに適用しています。完全に理解しました。
674 (defn filter
675 "Equivalent to Clojure's `filter`, but for streams instead of sequences."
676 [pred s]
677 (let [s' (stream)]
678 (connect-via s
679 (fn [msg]
680 (if (pred msg)
681 (put! s' msg)
682 (d/success-deferred true)))
683 s'
684 {:description {:op "filter"}})
685 (source-only s')))
Streamについて、最後に語るべきはduplex stream (splice関数、SplicedStream)ですが、これはaleph.tcpのサーバー起動サンプルを見てからの方が良いでしょう。以下は最も単純なTCP echoサーバーの例で公式のトップページにあったものです。ちなみにaleph.tcp名前空間にはclientとstart-serverの2つの関数しかありません。
(require '[aleph.tcp :as tcp])
(defn echo-handler [s info]
(s/connect s s))
(tcp/start-server echo-handler {:port 10001})
start-serverの引数としてハンドラとオプション(ここではポート番号)を指定しています。
ハンドラの第一引数はduplex streamで、第二引数がクライアントの情報が入ったマップです。
aleph.tcpを使う上でデータをやりとりするインターフェースとしてStreamが使用されていると最初に説明しましたが、このduplex streamがそれです。お気づきかもしれませんが、Streamは単方向にしか流れないのでクライアントとサーバー間でデータをやりとりするためには、Streamが2つ(クライアント→サーバー、サーバー→クライアント)必要になります。これらを接合したものがduplex streamです。このduplex streamに対してtake (クライアントからのデータを受け取る)、put(クライアントにデータを渡す)という操作が可能になります。
2つのStreamを上流と下流で繋げるconnectという関数がありましたが、接合する関数はspliceで戻り値としてSplicedStreamが返されます。spliceは公式のsampleにも出てくるので覚えておくと良いかもしれません。
上記の例はconnectでクライアントの入力がそのままクライアントへ出力(echo)しているわけです。
HTTPサーバーを作るぞい
お膳立てが済んだところでHTTPサーバー作りに取り掛かります。
今回作るechoサーバーの仕様は、"受け取ったHTTPリクエストの生テキストをそのままHTTPレスポンスのボディとしてクライアントに返す"ということにします。本当にそのままクライアントに返したらHTTPレスポンスとしては不正になってしまいますので。
まず、本質的でない部分を作ってしまいます。
--- a/project.clj
+++ b/project.clj
@@ -4,5 +4,6 @@
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
:url "https://www.eclipse.org/legal/epl-2.0/"}
:dependencies [[org.clojure/clojure "1.10.3"]
- [manifold "0.2.3"]]
+ [manifold "0.2.3"]
+ [aleph "0.4.6"]]
:repl-options {:init-ns aleph-tcp-sample.core})
--- a/src/aleph_tcp_sample/core.clj
+++ b/src/aleph_tcp_sample/core.clj
@@ -1,6 +1,32 @@
-(ns aleph-tcp-sample.core)
+(ns aleph-tcp-sample.core
+ (:require [aleph.tcp :as tcp]
+ [manifold.deferred :as d]
+ [manifold.stream :as s]
+ [clojure.string :as str]))
-(defn foo
- "I don't do a whole lot."
- [x]
- (println x "Hello, World!"))
+(def echo-server (atom nil))
+
+(defn response-echo [^String x]
+ (str/join "\r\n"
+ ["HTTP/1.1 200 OK"
+ "Server: echo-server"
+ (str "Date: " (.format java.time.format.DateTimeFormatter/RFC_1123_DATE_TIME
+ (java.time.OffsetDateTime/now java.time.ZoneOffset/UTC)))
+ "Content-Type: text/plain"
+ (str "Content-Length: " (.length x))
+ ""
+ x]))
+
+(defn echo-handler [s info]
+ (s/connect s s))
+
+(defn go []
+ (reset! echo-server (tcp/start-server echo-handler {:port 10001})))
+
+(defn halt []
+ (when @echo-server
+ (.close @echo-server)))
+
+(defn reset []
+ (halt)
+ (go))
start-serverからの戻り値はatomで包み後で停止できるようにして、各種ユーティリティを用意します。goで起動、resetで再起動、haltで停止。HTTPレスポンスの枠もresponse-echo関数を作りました。
ここからまず、クライアントからどのような形のデータがStreamから受け取ることのできるのか確認します。ドキュメントによればバイト配列を受け取れるみたいですが、byte-streamsというライブラリによって他のバイト表現に変換できるみたいですね。
--- a/project.clj
+++ b/project.clj
@@ -5,5 +5,6 @@
:url "https://www.eclipse.org/legal/epl-2.0/"}
:dependencies [[org.clojure/clojure "1.10.3"]
[manifold "0.2.3"]
- [aleph "0.4.6"]]
+ [aleph "0.4.6"]
+ [org.clj-commons/byte-streams "0.2.10"]]
:repl-options {:init-ns aleph-tcp-sample.core})
--- a/src/aleph_tcp_sample/core.clj
+++ b/src/aleph_tcp_sample/core.clj
@@ -2,7 +2,8 @@
(:require [aleph.tcp :as tcp]
[manifold.deferred :as d]
[manifold.stream :as s]
- [clojure.string :as str]))
+ [clojure.string :as str]
+ [byte-streams :as bs]))
(def echo-server (atom nil))
@@ -18,7 +19,14 @@
x]))
(defn echo-handler [s info]
- (s/connect s s))
+ (s/connect-via s
+ (fn [x]
+ (->> x
+ (bs/to-string)
+ (response-echo)
+ (bs/to-byte-buffer)
+ (s/put! s)))
+ s))
とりあえずクライアントから受け取ったデータをStringに変換→HTTPレスポンス生成→バイト列に戻す→Streamにputという流れを実装しました。
お、できてるっぽいです。
うまくいってるようだけど、ずっとKeep-Aliveが続いていました。
例えばApacheだとKeepAliveTimeoutがデフォルトで5秒みたいなので、クライアントから5秒データが来なかったらクローズする処理を入れる必要がありそうですね。
--- a/src/aleph_tcp_sample/core.clj
+++ b/src/aleph_tcp_sample/core.clj
@@ -19,14 +19,19 @@
x]))
(defn echo-handler [s info]
- (s/connect-via s
- (fn [x]
- (->> x
- (bs/to-string)
- (response-echo)
- (bs/to-byte-buffer)
- (s/put! s)))
- s))
+ (d/loop [n 100] ; MaxKeepAliveRequests
+ (-> (d/timeout! (s/take! s) 5000 :timeout) ;KeepAliveTimeout
+ (d/chain (fn [x]
+ (if (= :timeout x)
+ (s/close! s)
+ (do (->> x
+ (bs/to-string)
+ (response-echo)
+ (bs/to-byte-buffer)
+ (s/put! s))
+ (if (pos? n)
+ (d/recur (dec n))
+ (s/close! s)))))))))
一旦connect-viaから離れ、自分でtakeする方式に変更しました。この変更により、connect-viaによって制御されていた仕組み(duplex streamからデータを受け取ってそれをトリガーに処理を進めること)を自分で実装しなくてはなりません。この機能はd/chainで実現できます。また、takeするときはtimeoutの設定をしたいのでd/timeout関数でくるみました。
また、同様にconnect-viaによって自動で行われていたもう1つの仕組み(データを繰り返し受け取る)は d/loop-d/recurへと置き換えることが可能です。こちらは通常のClojureのloop-recurのDeferred版という認識で大きな問題はないと思います。例えばApacheでのMaxKeepAliveRequestsというKeep-Alive内で受け付けることのできるリクエスト数に相当するものと思います。
Wiresharkで再確認します。
ちゃんと1回目のリクエストと2回目のリクエスト(favicon.ico)の受け取ったあと5秒後に切断されてました。
curlでPOSTも試してみます。
% curl -X POST -H "Content-Type: application/json" -d '{"Name":"sensuikan1973", "Age":"100"}' localhost:10001/api/v1/users
POST /api/v1/users HTTP/1.1
Host: localhost:10001
User-Agent: curl/7.76.1
Accept: */*
Content-Type: application/json
Content-Length: 37
{"Name":"sensuikan1973", "Age":"100"}%
はい、問題なく返ってきました。
終わりに
今回作成したソースはgithubに置きました。
https://github.com/tempxla/aleph-tcp-sample
今回説明してないですが、clientもserverと同様にインターフェースがduplex streamとなっていてconnectで繋げると簡単にプロキシサーバーに成ります。以下にプロキシのようなものを作ってみたので公開しています。
https://github.com/tempxla/proxy-router
公式のsampleはglossというバイトフォーマットDSLを使用していて、HTTP以外や独自のプロトコルを組む場合に参考になると思います。
Manifoldに限らずClojureは全般的に難しいけど楽しいという感触を得ることができました。