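Implementing a Ring Server
Hi, it's me.
I've drifted away from the Clojure community lately and couldn't think of anything to write about, so I'd like to talk about a Ring server I built as a prototype a while back.
It's a prototype for use at work, so I won't publish all of it, though I might release it if I feel like it.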
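Ring SPEC
Ring is something like a specification for developing web applications in Clojure.
It is influenced by Python's WSGI and similar interfaces, and the mechanism is very simple.
I'll skip the details here; please refer to the Ring SPEC itself.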
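Server implementation approach
There's a fair amount written about using Ring, but there may not be much information about implementing the server side.
In most cases an implementation follows one of these two patterns: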
- Mapping from the Servlet API to Ring
- A custom implementation
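Mapping from the Servlet API is not very interesting, and it also requires a Servlet container, which is a bit of a hassle.
I want to distribute the server as a single file via uberjar, so I'll go with a custom implementation.
For speed, the core uses netty, and the Ring layer is implemented on top of it.
Parts that are called infrequently, and configuration that I want to change dynamically, are implemented in Clojure; the core is implemented in Java.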
Java -> Clojure
First, the Java side.
Basics of Clojure interop
Java receives a function (the Ring handler) from Clojure, builds the argument in Java, and invokes it.
The following is an example of invoking a Ring handler:
Map<Keyword, Object> response = (Map<Keyword, Object>) handler.invoke(PersistentArrayMap.create(reqMap));
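Clojure functions are represented as IFn.
A function is invoked through its invoke method.
Arguments have to match the types used on the Clojure side, but in most cases helper methods are provided that make the conversion easy.
The Ring response comes back as a Clojure map (PersistentArrayMap), but if you handle it through the Map interface you don't really need to care about the concrete type.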
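To make the call flow concrete, here is a minimal sketch that runs on a plain JDK. It is only an analogy: `java.util.function.Function` stands in for `clojure.lang.IFn`, String keys stand in for Keywords, and the names `HandlerInvokeSketch`, `HELLO_HANDLER` and `dispatch` are made up for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stand-in sketch: Function<Map, Map> plays the role of clojure.lang.IFn,
// and String keys play the role of Clojure Keywords.
final class HandlerInvokeSketch {

    // Hypothetical handler, similar in spirit to
    // (fn [req] {:status 200 :headers {...} :body "..."}).
    static final Function<Map<String, Object>, Map<String, Object>> HELLO_HANDLER = req -> {
        Map<String, Object> res = new HashMap<>();
        res.put("status", 200L); // Clojure integer literals arrive in Java as Long
        res.put("headers", Map.of("Content-Type", "text/plain"));
        res.put("body", "hello " + req.get("uri"));
        return res;
    };

    // Server side: build the request map, invoke the handler,
    // and read the response through the Map interface.
    static Map<String, Object> dispatch(
            Function<Map<String, Object>, Map<String, Object>> handler, String uri) {
        Map<String, Object> req = new HashMap<>();
        req.put("request-method", "get");
        req.put("uri", uri);
        Map<String, Object> res = handler.apply(req);
        if (res == null) {
            throw new IllegalArgumentException("response is null");
        }
        return res;
    }

    public static void main(String[] args) {
        Map<String, Object> res = dispatch(HELLO_HANDLER, "/ring");
        System.out.println(res.get("status") + " " + res.get("body"));
    }
}
```

In the real server the same shape appears as `handler.invoke(PersistentArrayMap.create(reqMap))`, with the result cast to `Map<Keyword, Object>`.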
Keyword
A Ring request is represented as a map whose keys are Keywords.
A Keyword can be created as follows:
public static final Keyword HTTP = Keyword.intern("http");
The Keywords used are almost entirely fixed, so it's better to create them as static fields so they can be reused.
public static final Keyword BODY_KEY = Keyword.intern("body");
public static final Keyword URI_KEY = Keyword.intern("uri");
public static final Keyword QUERY_KEY = Keyword.intern("query-string");
public static final Keyword REQUEST_METHOD_KEY = Keyword.intern("request-method");
public static final Keyword SERVER_NAME_KEY = Keyword.intern("server-name");
public static final Keyword SERVER_PORT_KEY = Keyword.intern("server-port");
public static final Keyword REMOTE_PORT_KEY = Keyword.intern("remote-addr"); // holds the client address; the Ring SPEC key is :remote-addr
public static final Keyword SCHEME_KEY = Keyword.intern("scheme");
public static final Keyword CONTENT_TYPE_KEY = Keyword.intern("content-type");
public static final Keyword CONTENT_LENGTH_KEY = Keyword.intern("content-length");
public static final Keyword CHAR_ENC_KEY = Keyword.intern("character-encoding");
public static final Keyword HEADERS_KEY = Keyword.intern("headers");
public static final Keyword HTTP = Keyword.intern("http");
public static final Keyword STATUS = Keyword.intern("status");
public static final Keyword USE_SEND_FILE = Keyword.intern("use-send-file");
static {
    METHOD_MAP = new HashMap<>();
    METHOD_MAP.put(HttpMethod.GET, Keyword.intern("get"));
    METHOD_MAP.put(HttpMethod.POST, Keyword.intern("post"));
    METHOD_MAP.put(HttpMethod.PUT, Keyword.intern("put"));
    METHOD_MAP.put(HttpMethod.PATCH, Keyword.intern("patch"));
    METHOD_MAP.put(HttpMethod.OPTIONS, Keyword.intern("options"));
    METHOD_MAP.put(HttpMethod.DELETE, Keyword.intern("delete"));
    METHOD_MAP.put(HttpMethod.HEAD, Keyword.intern("head"));
    METHOD_MAP.put(HttpMethod.CONNECT, Keyword.intern("connect"));
    METHOD_MAP.put(HttpMethod.TRACE, Keyword.intern("trace"));
}
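Why static fields help can be shown with a toy intern table. This is only an illustration of the idea of interning, not Clojure's actual implementation; the `Key` class and `InternSketch` are hypothetical names. Interning returns one shared instance per name, so holding that instance in a static field skips the table lookup on every request.

```java
import java.util.concurrent.ConcurrentHashMap;

final class InternSketch {

    // Hypothetical stand-in for clojure.lang.Keyword: one shared instance per name.
    static final class Key {
        private static final ConcurrentHashMap<String, Key> TABLE = new ConcurrentHashMap<>();
        final String name;

        private Key(String name) {
            this.name = name;
        }

        // Returns the existing instance for this name, creating it on first use.
        static Key intern(String name) {
            return TABLE.computeIfAbsent(name, Key::new);
        }

        @Override
        public String toString() {
            return ":" + name;
        }
    }

    // Looked up once at class-load time, then reused for every request.
    static final Key STATUS = Key.intern("status");

    public static void main(String[] args) {
        // Interning the same name again yields the very same object.
        System.out.println(STATUS == Key.intern("status"));
    }
}
```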
Ring Request
As mentioned above, a Ring request is a PersistentArrayMap.
A PersistentArrayMap can be created from a Map, so I build the request as a Map and convert it at the end.
private Map<Keyword, Object> createRingRequest(final ChannelHandlerContext ctx, final FullHttpRequest request) {
    Map<Keyword, Object> m = new HashMap<>();
    // :scheme
    m.put(SCHEME_KEY, HTTP);
    // :uri
    // :query-string
    setURIAndQuery(m, request);
    // :server-name
    // :server-port
    // :content-type
    // :content-length
    // :character-encoding
    setServerNames(m, request);
    // :request-method
    m.put(REQUEST_METHOD_KEY, getHttpMethod(request));
    // :remote-addr
    m.put(REMOTE_PORT_KEY, getRemoteAddress(ctx, request));
    // :headers
    m.put(HEADERS_KEY, getRequestHeaders(request));
    return m;
}
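The helpers called above mostly split strings. As a rough sketch of that part, the following JDK-only example splits the request target into `:uri` and `:query-string` and the Host header into `:server-name` and `:server-port`. The class and method names are made up for illustration, String keys stand in for Keywords, and IPv6 literals in the Host header are not handled. It treats any `?` as the separator (`>= 0`), whereas the full source further down tests `idx > 0`.

```java
import java.util.HashMap;
import java.util.Map;

final class RequestMapSketch {

    // Builds a Ring-like request map from the raw request target and Host header.
    static Map<String, Object> parse(String rawUri, String hostHeader) {
        Map<String, Object> m = new HashMap<>();
        m.put("scheme", "http");

        // Everything before '?' is the path, everything after it is the query string.
        int q = rawUri.indexOf('?');
        m.put("uri", q >= 0 ? rawUri.substring(0, q) : rawUri);
        m.put("query-string", q >= 0 ? rawUri.substring(q + 1) : null);

        // "host:port" or just "host".
        if (hostHeader != null) {
            int c = hostHeader.lastIndexOf(':');
            if (c != -1) {
                m.put("server-name", hostHeader.substring(0, c));
                m.put("server-port", Integer.valueOf(hostHeader.substring(c + 1)));
            } else {
                m.put("server-name", hostHeader);
            }
        }
        return m;
    }

    public static void main(String[] args) {
        System.out.println(parse("/search?q=ring", "example.com:8080"));
    }
}
```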
Ring Response
A Ring response body can be one of several types, so the implementation checks the type and handles each one accordingly.
private void writeRingResponse(Channel ch, Map<Keyword, Object> ringResponse) throws Exception {
    // Read :status defensively: it may be absent, and casting null to Long
    // and calling intValue() would throw a NullPointerException.
    Object rawStatus = ringResponse.get(STATUS);
    int statusCode = (rawStatus instanceof Number) ? ((Number) rawStatus).intValue() : 200;
    HttpResponseStatus status = HttpResponseStatus.valueOf(statusCode);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
    setResponseHeaders(response, ringResponse);
    Object body = ringResponse.get(BODY_KEY);
    if (body != null) {
        sendResponse(body, response, ch, keepAlive);
    } else {
        throw new IllegalArgumentException("response body is null");
    }
}
private void setResponseHeaders(HttpResponse response, Map<Keyword, Object> ringResponse) {
    HttpHeaders headers = response.headers();
    @SuppressWarnings("unchecked")
    final Map<String, Object> headerMap = (Map<String, Object>) ringResponse.get(HEADERS_KEY);
    if (headerMap != null) {
        for (Map.Entry<String, Object> header : headerMap.entrySet()) {
            headers.set(header.getKey(), (String) header.getValue());
        }
    }
    // Set Connection: keep-alive even when the handler returned no :headers.
    if (keepAlive) {
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }
}
private void sendResponse(Object body, FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {
    // System.out.println(Thread.currentThread());
    if (body instanceof String) {
        sendStringResponse((String) body, response, ch, keepAlive);
    } else if (body instanceof ISeq) {
        StringBuilder sb = new StringBuilder();
        for (ISeq s = (ISeq) body; s != null; s = s.next()) {
            String str = (String) s.first();
            sb.append(str);
        }
        sendStringResponse(sb.toString(), response, ch, keepAlive);
    } else if (body instanceof InputStream) {
        sendInputStreamResponse((InputStream) body, response, ch, keepAlive);
    } else if (body instanceof File) {
        sendFileStreamResponse((File) body, response, ch, keepAlive);
    } else {
        //TODO WARN or 404 ?
        sendStringResponse("", response, ch, keepAlive);
    }
}
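The type dispatch can be tried out without netty. The sketch below is a simplified stand-in, not the server's code: it turns each supported body type into bytes instead of writing to a channel, `Iterable` stands in for `ISeq`, UTF-8 is assumed, and unlike the code above it rejects unknown types instead of sending an empty body. `BodyDispatchSketch` and `toBytes` are hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class BodyDispatchSketch {

    // Converts a Ring-style response body into bytes, depending on its runtime type.
    static byte[] toBytes(Object body) {
        try {
            if (body instanceof String) {
                return ((String) body).getBytes(StandardCharsets.UTF_8);
            } else if (body instanceof Iterable) {
                // A sequence body: concatenate the elements in order.
                StringBuilder sb = new StringBuilder();
                for (Object part : (Iterable<?>) body) {
                    sb.append(part);
                }
                return sb.toString().getBytes(StandardCharsets.UTF_8);
            } else if (body instanceof InputStream) {
                // The stream is consumed and closed once the body has been read.
                try (InputStream in = (InputStream) body) {
                    return in.readAllBytes();
                }
            } else if (body instanceof File) {
                return Files.readAllBytes(((File) body).toPath());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        throw new IllegalArgumentException("unsupported body type: " + body);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("hello world").length);
    }
}
```

The real server streams InputStream and File bodies in chunks rather than loading them into memory, which is why it uses ChunkedStream, ChunkedFile and DefaultFileRegion.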
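Clojure -> Java
The Clojure side handles configuration and assembly.
Basically it just defines wrappers.
The assembly part is easier to write in Clojure.
Wrappers
Define the various wrappers so they can be called from Clojure with short names.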
(ns arianrod.netty
(:import
[java.nio.charset Charset]
[io.netty.bootstrap ServerBootstrap Bootstrap]
[io.netty.channel EventLoop EventLoopGroup ChannelHandler ChannelPipeline ChannelHandlerContext]
[arianrod.support CljChannelInHandler BootstrapFactory ChannelHandlerContextUtil CljSimpleChannelHandler UnpooledWrapper
ChunkedInputStream ChannelEventHandler]))
(defn- is-linux []
(= "Linux" (System/getProperty "os.name")))
(defmacro ^EventLoopGroup create-elg []
(if (is-linux)
`(io.netty.channel.epoll.EpollEventLoopGroup.)
`(io.netty.channel.nio.NioEventLoopGroup.)))
(defmacro ^EventLoop create-el []
(if (is-linux)
`(io.netty.channel.epoll.EpollEventLoop.)
`(io.netty.channel.nio.NioEventLoop.)))
(defmacro ^EventLoop create-nssc []
(if (is-linux)
`io.netty.channel.epoll.EpollServerSocketChannel
`io.netty.channel.socket.nio.NioServerSocketChannel))
(defmacro ^EventLoop create-nsc []
(if (is-linux)
`io.netty.channel.epoll.EpollSocketChannel
`io.netty.channel.socket.nio.NioSocketChannel))
(defmacro ^ChannelHandler channel-in-handler [m]
`(CljChannelInHandler. ~m))
(defn ^ChannelHandler make-handler [fn & {:keys [filter] :or {filter [:read :complete :exception]}}]
(ChannelEventHandler. fn (set filter)))
(defmacro ^ChannelHandler simple-handler [m]
`(CljSimpleChannelHandler. ~m))
(defmacro write-ctx [ctx msg]
`(ChannelHandlerContextUtil/writeCtx ~ctx ~msg))
(defmacro flush-ctx [ctx]
`(ChannelHandlerContextUtil/flushCtx ~ctx))
(defmacro write-flush-ctx [ctx msg]
`(ChannelHandlerContextUtil/writeAndFlushCtx ~ctx ~msg))
(defmacro close-ctx [ctx]
`(ChannelHandlerContextUtil/closeCtx ~ctx))
(defn ^ServerBootstrap server-factory [fn & {:keys [filter] :or {filter [:read :complete :exception]}}]
(BootstrapFactory/createSimpleServerBootstrap (create-nssc) fn (set filter)))
(defn ^ServerBootstrap server-factory-init [initializer]
(BootstrapFactory/createServerBootstrap (create-nssc) initializer))
(defn ^Bootstrap client-factory [initializer]
(BootstrapFactory/createBootstrap (create-nsc) initializer))
(defmacro add-last-pipeline [pipeline name fn]
`(BootstrapFactory/addLastPipeline ~pipeline ~name ~fn))
;; Buffer API
(defn wrap-buffer-bytes [a]
(UnpooledWrapper/wrappedBufferBytes a))
(defn copied-buffer [^String string ^Charset charset]
(UnpooledWrapper/copiedBuffer string charset))
(defn create-buf [cap]
(UnpooledWrapper/createBuffer cap))
(defn create-direct-buf [cap]
(UnpooledWrapper/createDirectBuffer cap))
(defn ->byte-array [s]
(byte-array (map (comp byte int) s)))
(defn ->bytes [s]
(bytes (byte-array (map (comp byte int) s))))
Internally, the kind of EventLoop and channel is switched automatically depending on the OS (epoll on Linux, NIO otherwise).
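The same selection logic, written as a small Java sketch. It only returns the class name as a string so it runs without netty on the classpath; `TransportSelectSketch` and its methods are hypothetical names. netty also provides `Epoll.isAvailable()`, which checks whether the native transport can actually be loaded and may be a more robust test than the OS name.

```java
final class TransportSelectSketch {

    // Mirrors the is-linux check in the wrapper namespace.
    static boolean isLinux(String osName) {
        return "Linux".equals(osName);
    }

    // Picks the epoll server channel on Linux and the NIO one everywhere else.
    static String serverChannelClassName(String osName) {
        return isLinux(osName)
                ? "io.netty.channel.epoll.EpollServerSocketChannel"
                : "io.netty.channel.socket.nio.NioServerSocketChannel";
    }

    public static void main(String[] args) {
        System.out.println(serverChannelClassName(System.getProperty("os.name")));
    }
}
```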
Server
The startup part mostly follows the usual netty conventions.
(ns arianrod.server
(:import
[io.netty.bootstrap ServerBootstrap]
[io.netty.channel EventLoopGroup ChannelFuture])
(:require
[arianrod.netty :as netty]))
(def ^:dynamic *debug* false)
(def ^:dynamic *backlog* 1024)
(defn wait-future [^ChannelFuture future ^EventLoopGroup boss ^EventLoopGroup work]
(let [^ChannelFuture cf (-> (.channel future)
(.closeFuture))]
(fn []
(try
(.sync cf)
(finally
(.shutdownGracefully boss)
(.shutdownGracefully work))))))
(defn ^ChannelFuture bind-server [^ServerBootstrap bootstrap ^long port]
(-> (.bind bootstrap port)
(.sync)))
(defn start-server* [factory port & [boss work]]
(let [boss (or boss (netty/create-elg))
work (or work (netty/create-elg))]
(-> (factory)
(.group boss work)
(bind-server port)
(wait-future boss work))))
(defn start-server [factory port & {:keys [wait] :or {wait true}}]
(let [waiter (start-server* factory port)]
(if wait
(waiter)
waiter)))
(defmacro start-server-wait [factory port]
`(start-server ~factory ~port :wait true))
(ns arianrod.ring
(:use arianrod.netty)
(:import
[io.netty.bootstrap ServerBootstrap]
[arianrod.support.ring RingBootstrapFactory RingChannelHandler]))
(def ^:dynamic *option* {:debug false
:backlog 1024
:use-send-file true})
(defn ring-server-factory [handler options]
(RingBootstrapFactory/createRingServerBootstrap
(create-nssc) options handler))
(defn create-ring-server
([handler] (create-ring-server handler *option*))
([handler options]
(ring-server-factory handler (merge *option* options))))
(defn create-http-server []
(create-ring-server
(fn [req]
{:status 200
:headers {"Content-Type" "text/plain"}
:body "hello world"})))
Full source
package arianrod.support.ring;
import clojure.lang.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
public class RingChannelHandler extends SimpleChannelInboundHandler<HttpObject> {
public static final Keyword BODY_KEY = Keyword.intern("body");
public static final Keyword URI_KEY = Keyword.intern("uri");
public static final Keyword QUERY_KEY = Keyword.intern("query-string");
public static final Keyword REQUEST_METHOD_KEY = Keyword.intern("request-method");
public static final Keyword SERVER_NAME_KEY = Keyword.intern("server-name");
public static final Keyword SERVER_PORT_KEY = Keyword.intern("server-port");
public static final Keyword REMOTE_PORT_KEY = Keyword.intern("remote-addr"); // holds the client address; the Ring SPEC key is :remote-addr
public static final Keyword SCHEME_KEY = Keyword.intern("scheme");
public static final Keyword CONTENT_TYPE_KEY = Keyword.intern("content-type");
public static final Keyword CONTENT_LENGTH_KEY = Keyword.intern("content-length");
public static final Keyword CHAR_ENC_KEY = Keyword.intern("character-encoding");
public static final Keyword HEADERS_KEY = Keyword.intern("headers");
public static final Keyword HTTP = Keyword.intern("http");
public static final Keyword STATUS = Keyword.intern("status");
public static final Keyword USE_SEND_FILE = Keyword.intern("use-send-file");
private static final String CHARSET = "charset=";
private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
private static final int BODY_MEMORY_LIMIT = 1024 * 1024 * 16;
private static final Map<HttpMethod, Keyword> METHOD_MAP;
static {
METHOD_MAP = new HashMap<>();
METHOD_MAP.put(HttpMethod.GET, Keyword.intern("get"));
METHOD_MAP.put(HttpMethod.POST, Keyword.intern("post"));
METHOD_MAP.put(HttpMethod.PUT, Keyword.intern("put"));
METHOD_MAP.put(HttpMethod.PATCH, Keyword.intern("patch"));
METHOD_MAP.put(HttpMethod.OPTIONS, Keyword.intern("options"));
METHOD_MAP.put(HttpMethod.DELETE, Keyword.intern("delete"));
METHOD_MAP.put(HttpMethod.HEAD, Keyword.intern("head"));
METHOD_MAP.put(HttpMethod.CONNECT, Keyword.intern("connect"));
METHOD_MAP.put(HttpMethod.TRACE, Keyword.intern("trace"));
}
private final APersistentMap option;
private final IFn handler;
private Map<Keyword, Object> reqMap;
private ByteBuf byteBuf = null;
private Path tempFilePath = null;
private boolean keepAlive;
public RingChannelHandler(final IFn handler, final APersistentMap option) {
this.handler = handler;
this.option = option;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void callRingHandler(ChannelHandlerContext ctx, InputStream input) throws Exception {
//System.out.println(Thread.currentThread());
try (InputStream in = input) {
reqMap.put(BODY_KEY, in);
@SuppressWarnings("unchecked")
Map<Keyword, Object> response = (Map<Keyword, Object>) handler.invoke(PersistentArrayMap.create(reqMap));
if (response != null) {
writeRingResponse(ctx.channel(), response);
} else {
throw new IllegalArgumentException("response is null");
}
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
FullHttpRequest request = (FullHttpRequest) msg;
keepAlive = HttpHeaderUtil.isKeepAlive(request);
Long contentLength = HttpHeaderUtil.getContentLength(request, 0);
HttpMethod method = request.method();
reqMap = this.createRingRequest(ctx, request);
if (method == HttpMethod.POST || method == HttpMethod.PUT) {
if (contentLength > BODY_MEMORY_LIMIT) {
// large data
tempFilePath = Files.createTempFile(null, null);
} else {
// memory
byteBuf = Unpooled.directBuffer(contentLength.intValue());
}
} else {
callRingHandler(ctx, new ByteBufInputStream(request.content()));
}
}
if (msg instanceof HttpContent) {
HttpContent content = (HttpContent) msg;
if (byteBuf != null) {
byteBuf.writeBytes(content.content());
if (msg instanceof LastHttpContent) {
callRingHandler(ctx, new ByteBufInputStream(byteBuf));
}
} else if (tempFilePath != null) {
Files.write(tempFilePath, content.content().array(), StandardOpenOption.APPEND);
if (msg instanceof LastHttpContent) {
callRingHandler(ctx, Files.newInputStream(tempFilePath, StandardOpenOption.DELETE_ON_CLOSE));
}
}
}
}
private Keyword getHttpMethod(final FullHttpRequest request) {
HttpMethod method = request.method();
if (METHOD_MAP.containsKey(method)) {
return METHOD_MAP.get(method);
}
return Keyword.intern(null, String.valueOf(method.name().toLowerCase()));
}
private void setURIAndQuery(final Map<Keyword, Object> m, final FullHttpRequest request) {
String url = request.uri();
int idx = url.indexOf('?');
String uri;
String queryString;
if (idx > 0) {
uri = url.substring(0, idx);
queryString = url.substring(idx + 1);
} else {
uri = url;
queryString = null;
}
m.put(URI_KEY, uri);
m.put(QUERY_KEY, queryString);
}
private void setServerNames(final Map<Keyword, Object> m, final FullHttpRequest request) {
HttpHeaders headers = request.headers();
String serverName = null;
Integer serverPort = null;
String contentType = null;
String charset = null;
String h = (String) headers.get("host");
if (h != null) {
int idx = h.lastIndexOf(':');
if (idx != -1) {
serverName = h.substring(0, idx);
serverPort = Integer.valueOf(h.substring(idx + 1));
} else {
serverName = h;
}
}
String ct = (String) headers.get(HttpHeaderNames.CONTENT_TYPE);
if (ct != null) {
int idx = ct.indexOf(";");
if (idx != -1) {
int cidx = ct.indexOf(CHARSET, idx);
if (cidx != -1) {
contentType = ct.substring(0, idx);
charset = ct.substring(cidx + CHARSET.length());
} else {
contentType = ct;
}
} else {
contentType = ct;
}
}
m.put(SERVER_NAME_KEY, serverName);
m.put(SERVER_PORT_KEY, serverPort);
m.put(CONTENT_TYPE_KEY, contentType);
long length = HttpHeaderUtil.getContentLength(request);
if (length > 0) {
m.put(CONTENT_LENGTH_KEY, length);
} else {
m.put(CONTENT_LENGTH_KEY, null);
}
m.put(CHAR_ENC_KEY, charset);
}
private String getRemoteAddress(final ChannelHandlerContext ctx, final FullHttpRequest request) {
String h = (String) request.headers().get("X-Forwarded-For");
if (h != null) {
int idx = h.indexOf(',');
if (idx == -1) {
return h;
} else {
// X-Forwarded-For: client, proxy1, proxy2
return h.substring(0, idx);
}
}
InetSocketAddress sockAddr = (InetSocketAddress) ctx.channel().remoteAddress();
return sockAddr.getAddress().getHostAddress();
}
private IPersistentMap getRequestHeaders(final FullHttpRequest request) {
HttpHeaders headers = request.headers();
Map<String, String> m = new HashMap<>();
for (Map.Entry<CharSequence, CharSequence> header : headers.entries()) {
String lo = header.getKey().toString().toLowerCase();
m.put(lo, (String)header.getValue());
}
return PersistentArrayMap.create(m);
}
private Map<Keyword, Object> createRingRequest(final ChannelHandlerContext ctx, final FullHttpRequest request) {
Map<Keyword, Object> m = new HashMap<>();
// :scheme
m.put(SCHEME_KEY, HTTP);
// :uri
// :query-string
setURIAndQuery(m, request);
// :server-name
// :server-port
// :content-type
// :content-length
// :character-encoding
setServerNames(m, request);
// :request-method
m.put(REQUEST_METHOD_KEY, getHttpMethod(request));
// :remote-addr
m.put(REMOTE_PORT_KEY, getRemoteAddress(ctx, request));
// :headers
m.put(HEADERS_KEY, getRequestHeaders(request));
return m;
}
private void writeRingResponse(Channel ch, Map<Keyword, Object> ringResponse) throws Exception {
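// Read :status defensively; a missing value would otherwise cause a NullPointerException.
Object rawStatus = ringResponse.get(STATUS);
int statusCode = (rawStatus instanceof Number) ? ((Number) rawStatus).intValue() : 200;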
HttpResponseStatus status = HttpResponseStatus.valueOf(statusCode);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
setResponseHeaders(response, ringResponse);
Object body = ringResponse.get(BODY_KEY);
if (body != null) {
sendResponse(body, response, ch, keepAlive);
} else {
throw new IllegalArgumentException("response body is null");
}
}
private void setResponseHeaders(HttpResponse response, Map<Keyword, Object> ringResponse) {
HttpHeaders headers = response.headers();
@SuppressWarnings("unchecked")
final Map<String, Object> headerMap = (Map<String, Object>) ringResponse.get(HEADERS_KEY);
if (headerMap != null) {
for (Map.Entry<String, Object> header : headerMap.entrySet()) {
headers.set(header.getKey(), (String)header.getValue());
}
}
if (keepAlive) {
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
}
private void sendResponse(Object body, FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {
// System.out.println(Thread.currentThread());
if (body instanceof String) {
sendStringResponse((String) body, response, ch, keepAlive);
} else if (body instanceof ISeq) {
StringBuilder sb = new StringBuilder();
for (ISeq s = (ISeq) body; s != null; s = s.next()) {
String str = (String) s.first();
sb.append(str);
}
sendStringResponse(sb.toString(), response, ch, keepAlive);
} else if (body instanceof InputStream) {
sendInputStreamResponse((InputStream) body, response, ch, keepAlive);
} else if (body instanceof File) {
sendFileStreamResponse((File) body, response, ch, keepAlive);
} else {
//TODO WARN or 404 ?
sendStringResponse("", response, ch, keepAlive);
}
}
private void setContentLength(FullHttpResponse response, long length) {
HttpHeaders headers = response.headers();
CharSequence val = headers.get(HttpHeaderNames.CONTENT_LENGTH);
if (val == null || val.equals("")) {
headers.setLong(HttpHeaderNames.CONTENT_LENGTH, length);
}
}
private Charset getResponseCharset(FullHttpResponse response) {
HttpHeaders headers = response.headers();
String type = (String)headers.get(HttpHeaderNames.CONTENT_TYPE);
if (type != null) {
try {
type = type.toLowerCase();
int i = type.indexOf(CHARSET);
if (i != -1) {
String charset = type.substring(i + CHARSET.length()).trim();
return Charset.forName(charset);
}
} catch (Exception ignore) {
}
}
return DEFAULT_CHARSET;
}
private void sendStringResponse(final String body, final FullHttpResponse response, Channel ch, boolean keepAlive) {
Charset charset = getResponseCharset(response);
ByteBuf buffer = Unpooled.copiedBuffer(body, charset);
setContentLength(response, buffer.readableBytes());
response.content().writeBytes(buffer);
buffer.release();
if (keepAlive) {
ch.write(response);
//ctx.write(response).addListener(ChannelFutureListener.CLOSE);
} else {
ch.write(response).addListener(ChannelFutureListener.CLOSE);
}
}
private void sendInputStreamResponse(final InputStream stream, final FullHttpResponse response, Channel ch, boolean keepAlive) {
ch.write(response);
ChannelFuture future = ch.write(new ChunkedStream(stream));
future.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> f) throws Exception {
stream.close();
}
});
future.addListener(ChannelFutureListener.CLOSE);
}
private void sendFileStreamResponse(final File file, final FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {
Boolean useSendFile = (Boolean) option.get(USE_SEND_FILE);
RandomAccessFile raf;
try {
raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException e) {
return;
}
long fileLength = raf.length();
setContentLength(response, fileLength);
ch.write(response);
// Write the content.
if (useSendFile != null && useSendFile) {
ch.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ch.newProgressivePromise());
} else {
ch.write(new ChunkedFile(raf, 0, fileLength, 8192), ch.newProgressivePromise());
}
ChannelFuture lastContentFuture = ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
static final class RingHandler implements Runnable {
private final RingChannelHandler ringChannelHandler;
private final Channel ch;
private final InputStream in;
public RingHandler(Channel ch, RingChannelHandler handler, InputStream in) {
this.ringChannelHandler = handler;
this.ch = ch;
this.in = in;
}
@Override
public void run() {
try (InputStream input = this.in) {
this.ringChannelHandler.reqMap.put(BODY_KEY, input);
@SuppressWarnings("unchecked")
Map<Keyword, Object> response = (Map<Keyword, Object>) this.ringChannelHandler.handler.invoke(PersistentArrayMap.create(this.ringChannelHandler.reqMap));
if (response != null) {
this.ringChannelHandler.writeRingResponse(this.ch, response);
} else {
throw new IllegalArgumentException("response is null");
}
} catch (Exception e) {
//TODO Logging
e.printStackTrace();
} finally {
this.ch.close();
}
}
}
}
package arianrod.support.ring;
import clojure.lang.APersistentMap;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class RingBootstrapFactory {
public static final Keyword DEBUG_KEY = Keyword.intern("debug");
public static final Keyword BACKLOG_KEY = Keyword.intern("backlog");
public static ServerBootstrap createRingServerBootstrap(Class<? extends ServerChannel> channelClass, final APersistentMap option, final IFn handler) {
final Boolean debug = (Boolean) option.get(DEBUG_KEY);
final int backlog = ((Long) option.get(BACKLOG_KEY)).intValue();
ServerBootstrap b = new ServerBootstrap();
b.channel(channelClass)
.option(ChannelOption.SO_BACKLOG, backlog)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new RingChannelInitializer(handler, option));
if (debug) {
b.handler(new LoggingHandler(LogLevel.DEBUG));
}
return b;
}
static class RingChannelInitializer extends ChannelInitializer<SocketChannel> {
private final IFn handler;
private final APersistentMap option;
public RingChannelInitializer(final IFn handler, final APersistentMap option) {
super();
this.option = option;
this.handler = handler;
}
@Override
public void initChannel(final SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpRequestDecoder())
.addLast(new HttpObjectAggregator(65536))
.addLast(new HttpResponseEncoder())
.addLast(new ChunkedWriteHandler())
.addLast(new RingChannelHandler(handler, option));
}
}
}
package arianrod.support;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CljSimpleChannelHandler extends SimpleChannelInboundHandler {
private static final Logger logger = Logger.getLogger(
CljSimpleChannelHandler.class.getName());
private static final Keyword isSharableKey = Keyword.intern(null, "is-sharable");
private static final Keyword channelReadCompKey = Keyword.intern(null, "channel-read-complete");
private static final Keyword channelActiveKey = Keyword.intern(null, "channel-active");
private static final Keyword messageReceivedKey = Keyword.intern(null, "message-received");
private static final Keyword channelInactiveKey = Keyword.intern(null, "channel-in-active");
private static final Keyword exceptionCaughtKey = Keyword.intern(null, "exception-caught");
private final PersistentArrayMap map;
public CljSimpleChannelHandler(PersistentArrayMap map) {
this.map = map;
}
@Override
public boolean isSharable() {
if (this.map.containsKey(isSharableKey)) {
final IFn f = (IFn) this.map.valAt(isSharableKey);
return (Boolean) f.invoke();
} else {
return super.isSharable();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (this.map.containsKey(channelActiveKey)) {
final IFn f = (IFn) this.map.valAt(channelActiveKey);
f.invoke(ctx);
} else {
super.channelActive(ctx);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (this.map.containsKey(channelInactiveKey)) {
final IFn f = (IFn) this.map.valAt(channelInactiveKey);
f.invoke(ctx);
} else {
super.channelInactive(ctx);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (this.map.containsKey(messageReceivedKey)) {
final IFn f = (IFn) this.map.valAt(messageReceivedKey);
f.invoke(ctx, msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (this.map.containsKey(channelReadCompKey)) {
final IFn f = (IFn) this.map.valAt(channelReadCompKey);
f.invoke(ctx);
} else {
super.channelReadComplete(ctx);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (this.map.containsKey(exceptionCaughtKey)) {
final IFn f = (IFn) this.map.valAt(exceptionCaughtKey);
f.invoke(ctx, cause);
} else {
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
}
package arianrod.support;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.Set;
public class ChannelEventHandler extends ChannelHandlerAdapter {
public static final Keyword INACTIVE_KEY = Keyword.intern("inactive");
public static final Keyword ACTIVE_KEY = Keyword.intern("active");
public static final Keyword READ_KEY = Keyword.intern("read");
public static final Keyword COMPLETE_KEY = Keyword.intern("complete");
public static final Keyword EXCEPTION_KEY = Keyword.intern("exception");
private EventSource eventSource;
private final IFn handler;
private final Set filter;
public ChannelEventHandler(final IFn handler, final Set filter) {
this.handler = handler;
this.filter = filter;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (!this.filter.contains(INACTIVE_KEY)) {
return;
}
if (eventSource == null) {
this.eventSource = new EventSource(ctx, INACTIVE_KEY);
} else {
this.eventSource.ctx = ctx;
this.eventSource.type = INACTIVE_KEY;
this.eventSource.object = null;
}
this.handler.invoke(this.eventSource);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!this.filter.contains(ACTIVE_KEY)) {
return;
}
if (eventSource == null) {
this.eventSource = new EventSource(ctx, ACTIVE_KEY);
} else {
this.eventSource.ctx = ctx;
this.eventSource.type = ACTIVE_KEY;
this.eventSource.object = null;
}
this.handler.invoke(this.eventSource);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!this.filter.contains(COMPLETE_KEY)) {
return;
}
if (eventSource == null) {
this.eventSource = new EventSource(ctx, COMPLETE_KEY);
} else {
this.eventSource.ctx = ctx;
this.eventSource.type = COMPLETE_KEY;
this.eventSource.object = null;
}
this.handler.invoke(this.eventSource);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!this.filter.contains(EXCEPTION_KEY)) {
return;
}
if (eventSource == null) {
this.eventSource = new EventSource(ctx, EXCEPTION_KEY, cause);
} else {
this.eventSource.ctx = ctx;
this.eventSource.type = EXCEPTION_KEY;
this.eventSource.object = cause;
}
this.handler.invoke(this.eventSource);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!this.filter.contains(READ_KEY)) {
return;
}
if (eventSource == null) {
this.eventSource = new EventSource(ctx, READ_KEY, msg);
} else {
this.eventSource.ctx = ctx;
this.eventSource.type = READ_KEY;
this.eventSource.object = msg;
}
this.handler.invoke(this.eventSource);
}
}
package arianrod.support;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
public class ChannelHandlerContextUtil {
public static final ChannelFuture writeCtx(ChannelHandlerContext ctx, Object msg) {
return ctx.write(msg);
}
public static final ChannelHandlerContext flushCtx(ChannelHandlerContext ctx) {
return ctx.flush();
}
public static final ChannelFuture closeCtx(ChannelHandlerContext ctx) {
return ctx.close();
}
public static final ChannelFuture writeAndFlushCtx(ChannelHandlerContext ctx, Object msg) {
return ctx.writeAndFlush(msg);
}
}
package arianrod.support;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;
import java.util.logging.Level;
import java.util.logging.Logger;
public class CljChannelInHandler extends ChannelHandlerAdapter {
private static final Logger logger = Logger.getLogger(
CljChannelInHandler.class.getName());
private static final Keyword isSharableKey = Keyword.intern(null, "is-sharable");
private static final Keyword channelReadKey = Keyword.intern(null, "channel-read");
private static final Keyword channelReadCompKey = Keyword.intern(null, "channel-read-complete");
private static final Keyword channelActiveKey = Keyword.intern(null, "channel-active");
private static final Keyword channelInactiveKey = Keyword.intern(null, "channel-in-active");
private static final Keyword exceptionCaughtKey = Keyword.intern(null, "exception-caught");
private final PersistentArrayMap map;
public CljChannelInHandler(PersistentArrayMap map) {
this.map = map;
}
@Override
public boolean isSharable() {
if (this.map.containsKey(isSharableKey)) {
final IFn f = (IFn) this.map.valAt(isSharableKey);
return (Boolean) f.invoke();
} else {
return super.isSharable();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (this.map.containsKey(channelActiveKey)) {
final IFn f = (IFn) this.map.valAt(channelActiveKey);
f.invoke(ctx);
} else {
super.channelActive(ctx);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (this.map.containsKey(channelInactiveKey)) {
final IFn f = (IFn) this.map.valAt(channelInactiveKey);
f.invoke(ctx);
} else {
super.channelInactive(ctx);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (this.map.containsKey(channelReadKey)) {
final IFn f = (IFn) this.map.valAt(channelReadKey);
f.invoke(ctx, msg);
} else {
super.channelRead(ctx, msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (this.map.containsKey(channelReadCompKey)) {
final IFn f = (IFn) this.map.valAt(channelReadCompKey);
f.invoke(ctx);
} else {
super.channelReadComplete(ctx);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (this.map.containsKey(exceptionCaughtKey)) {
final IFn f = (IFn) this.map.valAt(exceptionCaughtKey);
f.invoke(ctx, cause);
} else {
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
}
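The handler above boils down to one pattern: look up an optional callback in a map, call it if present, and fall back to the default behavior otherwise. Here is a minimal sketch of just that pattern, using only the JDK. `MapDispatchSketch` and its `Callback` interface are hypothetical stand-ins I made up for illustration (for the handler class and `clojure.lang.IFn` respectively), not part of the actual project.

```java
import java.util.HashMap;
import java.util.Map;

public class MapDispatchSketch {
    /** Hypothetical stand-in for a one-argument clojure.lang.IFn. */
    public interface Callback {
        Object invoke(Object arg);
    }

    private final Map<String, Callback> callbacks;

    public MapDispatchSketch(Map<String, Callback> callbacks) {
        this.callbacks = callbacks;
    }

    /** Calls the callback registered under key, or returns fallback when none is registered. */
    public Object dispatch(String key, Object arg, Object fallback) {
        Callback f = callbacks.get(key);
        return f != null ? f.invoke(arg) : fallback;
    }

    public static void main(String[] args) {
        Map<String, Callback> m = new HashMap<>();
        m.put("channel-read", msg -> "read:" + msg);
        MapDispatchSketch d = new MapDispatchSketch(m);
        System.out.println(d.dispatch("channel-read", "hello", "default"));  // read:hello
        System.out.println(d.dispatch("channel-active", null, "default"));   // default
    }
}
```

The nice part of this shape is that the Clojure side only has to supply the keys it cares about; everything else keeps the adapter's default behavior.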
package arianrod.support;
import clojure.lang.IDeref;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.RT;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.ChannelMatcher;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class EventSource implements IDeref {
// Channel groups shared by every EventSource, keyed by namespace.
public static ConcurrentMap<String, DefaultChannelGroup> groupMap = new ConcurrentHashMap<>();
public Keyword type;
public Object object;
public ChannelHandlerContext ctx;
public EventSource(ChannelHandlerContext ctx, Keyword type) {
this.ctx = ctx;
this.type = type;
}
public EventSource(ChannelHandlerContext ctx, Keyword type, Object object) {
this(ctx, type);
this.object = object;
}
public void addGroup(String ns) {
if (!groupMap.containsKey(ns)) {
// putIfAbsent keeps the first group if two threads race to create the same namespace.
groupMap.putIfAbsent(ns, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
}
}
public void joinGroup(String ns) {
this.addGroup(ns);
groupMap.get(ns).add(ctx.channel());
}
public ChannelGroupFuture putGroup(String ns, Object msg) {
if (groupMap.containsKey(ns)) {
return groupMap.get(ns).writeAndFlush(msg);
}
return null;
}
public ChannelGroupFuture putGroup(String ns, Object msg, final IFn matcher) {
if (groupMap.containsKey(ns)) {
return groupMap.get(ns).writeAndFlush(msg, new ChannelMatcher() {
@Override
public boolean matches(Channel channel) {
Object res = matcher.invoke(channel);
return res instanceof Boolean && (Boolean) res;
}
});
}
return null;
}
public ChannelGroupFuture closeGroup(String ns) {
if (groupMap.containsKey(ns)) {
return groupMap.get(ns).close();
}
return null;
}
public ChannelGroupFuture closeGroup(String ns, final IFn matcher) {
if (groupMap.containsKey(ns)) {
return groupMap.get(ns).close(new ChannelMatcher() {
@Override
public boolean matches(Channel channel) {
Object res = matcher.invoke(channel);
return res instanceof Boolean && (Boolean) res;
}
});
}
return null;
}
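public void sendRemote(String host, int port, final IFn initializer, final IFn channelWriter) throws InterruptedException {
// Reuse the current channel's class and event loop for the outgoing connection.
Bootstrap b = BootstrapFactory.createBootstrap(this.ctx.channel().getClass(), initializer);
b = b.group(this.ctx.channel().eventLoop());
// sync() blocks the calling thread until the connection is established.
Channel channel = b.connect(host, port).sync().channel();
channelWriter.invoke(channel);
// Block again until the remote channel is closed.
channel.closeFuture().sync();
}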
@Override
public Object deref() {
return RT.vector(this.type, this.object);
}
public Object get() {
return this.object;
}
public EventSource put(Object o) {
this.ctx.write(o);
return this;
}
public EventSource putFlush(Object o) {
this.ctx.writeAndFlush(o);
return this;
}
public EventSource flush() {
this.ctx.flush();
return this;
}
}
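The group handling in `EventSource` can be sketched without Netty to make two details easier to see. First, a matcher result only counts when it is an actual `Boolean` true (the `res instanceof Boolean && (Boolean) res` check), so a Clojure function returning some other truthy value would match nothing. Second, groups are created on first join. In this sketch `GroupRegistrySketch` is a hypothetical class, plain strings stand in for channels, and `computeIfAbsent` is my substitution for the create-if-missing step, not the listing's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class GroupRegistrySketch {
    // Namespace -> members; strings stand in for Netty channels here.
    private final Map<String, Set<String>> groups = new ConcurrentHashMap<>();

    /** Adds member to the group, creating the group atomically on first use. */
    public void join(String ns, String member) {
        groups.computeIfAbsent(ns, k -> ConcurrentHashMap.newKeySet()).add(member);
    }

    /** Returns the members for which matcher returns a Boolean true. */
    public List<String> select(String ns, Function<String, Object> matcher) {
        List<String> selected = new ArrayList<>();
        Set<String> group = groups.get(ns);
        if (group == null) {
            return selected; // unknown namespace: nothing to select
        }
        for (String member : group) {
            Object res = matcher.apply(member);
            if (res instanceof Boolean && (Boolean) res) {
                selected.add(member);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        GroupRegistrySketch r = new GroupRegistrySketch();
        r.join("room", "a1");
        r.join("room", "b2");
        System.out.println(r.select("room", m -> m.startsWith("a"))); // [a1]
        System.out.println(r.select("room", m -> "yes"));             // []
    }
}
```

If you want Clojure-style truthiness instead, the matcher check would have to accept anything that is neither `null` nor `Boolean.FALSE`.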
package arianrod.support;
import clojure.lang.IFn;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
public class SimpleChannelInitializer extends ChannelInitializer<SocketChannel> {
private final IFn callback;
public SimpleChannelInitializer(IFn callback) {
this.callback = callback;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
this.callback.invoke(ch.pipeline());
}
}
package arianrod.support;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
public class UnpooledWrapper {
public static final ByteBuf wrappedBufferBytes(byte[] array) {
return Unpooled.wrappedBuffer(array);
}
public static final ByteBuf wrappedBuffer(ByteBuf buf) {
return Unpooled.wrappedBuffer(buf);
}
public static final ByteBuf wrappedBuffer(ByteBuffer buf) {
return Unpooled.wrappedBuffer(buf);
}
public static final ByteBuf copiedBuffer(byte[] array) {
return Unpooled.copiedBuffer(array);
}
public static final ByteBuf copiedBuffer(ByteBuf buf) {
return Unpooled.copiedBuffer(buf);
}
public static final ByteBuf copiedBuffer(ByteBuffer buf) {
return Unpooled.copiedBuffer(buf);
}
public static final ByteBuf copiedBuffer(CharSequence string, Charset charset) {
return Unpooled.copiedBuffer(string, charset);
}
public static final ByteBuf createBuffer(int capacity) {
return Unpooled.buffer(capacity);
}
public static final ByteBuf createDirectBuffer(int capacity) {
return Unpooled.directBuffer(capacity);
}
}
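The wrapper above exposes two families of methods, and the difference matters: Netty's `wrappedBuffer` shares the memory it is given, while `copiedBuffer` makes an independent copy. As a rough illustration of that distinction using only the JDK, here is a sketch with `java.nio.ByteBuffer`. This is an analogy, not Netty code, and `WrapVersusCopySketch` is a hypothetical class name.

```java
import java.nio.ByteBuffer;

public class WrapVersusCopySketch {
    /** Shares the caller's array: later writes to the array are visible through the buffer. */
    public static ByteBuffer wrapped(byte[] array) {
        return ByteBuffer.wrap(array);
    }

    /** Copies the bytes first: the buffer is independent of the caller's array. */
    public static ByteBuffer copied(byte[] array) {
        return ByteBuffer.wrap(array.clone());
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        ByteBuffer w = wrapped(data);
        ByteBuffer c = copied(data);
        data[0] = 9; // mutate the original array after creating both buffers
        System.out.println(w.get(0)); // 9
        System.out.println(c.get(0)); // 1
    }
}
```

Wrapping avoids a copy, which is attractive on a hot path, but only when nobody modifies the source afterwards.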
package arianrod.support;
import clojure.lang.IFn;
import clojure.lang.RT;
import clojure.lang.Var;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import java.util.Set;
public class BootstrapFactory {
public static ServerBootstrap createSimpleServerBootstrap(Class<? extends ServerChannel> channelClass, final IFn handler, final Set filter) {
Var var = RT.var("arianrod.server", "*debug*");
Boolean debug = (Boolean) var.get();
var = RT.var("arianrod.server", "*backlog*");
int backlog = ((Long) var.get()).intValue();
ServerBootstrap b = new ServerBootstrap();
b.channel(channelClass)
.option(ChannelOption.SO_BACKLOG, backlog)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelEventHandler(handler, filter));
}
});
if (debug) {
b.handler(new LoggingHandler(LogLevel.DEBUG));
}
return b;
}
public static ServerBootstrap createServerBootstrap(Class<? extends ServerChannel> channelClass, final IFn initializer) {
Var var = RT.var("arianrod.server", "*debug*");
Boolean debug = (Boolean) var.get();
var = RT.var("arianrod.server", "*backlog*");
int backlog = ((Long) var.get()).intValue();
ServerBootstrap b = new ServerBootstrap();
b.channel(channelClass)
.option(ChannelOption.SO_BACKLOG, backlog)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
initializer.invoke(ch.pipeline());
}
});
if (debug) {
b.handler(new LoggingHandler(LogLevel.DEBUG));
}
return b;
}
public static Bootstrap createBootstrap(Class<? extends Channel> channelClass, final IFn initializer) {
Var var = RT.var("arianrod.client", "*debug*");
final Boolean debug = (Boolean) var.get();
Bootstrap b = new Bootstrap();
b.channel(channelClass)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// A client Bootstrap holds a single handler, so calling b.handler(...) again
// would replace this initializer. Add the logging handler to the pipeline instead.
if (debug) {
ch.pipeline().addFirst(new LoggingHandler(LogLevel.DEBUG));
}
initializer.invoke(ch.pipeline());
}
});
return b;
}
public static ChannelPipeline addLastPipeline(ChannelPipeline pipeline, String name, ChannelHandler handler) {
return pipeline.addLast(name, handler);
}
}
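One detail in the factory is worth spelling out: values read from Clojure vars come back as `Object`, and Clojure integer literals are `Long`, which is why the backlog goes through `((Long) var.get()).intValue()`. A direct cast throws if the var holds `nil` or another number type. Below is a slightly more forgiving conversion, sketched with the JDK only. `ConfigValueSketch`, `toInt`, and `toBoolean` are hypothetical helpers for illustration and are not part of the project.

```java
public class ConfigValueSketch {
    /** Converts any Number (Long, Integer, ...) to int; returns fallback for null or non-numbers. */
    public static int toInt(Object value, int fallback) {
        return value instanceof Number ? ((Number) value).intValue() : fallback;
    }

    /** True only for Boolean.TRUE; null and any other object count as false. */
    public static boolean toBoolean(Object value) {
        return Boolean.TRUE.equals(value);
    }

    public static void main(String[] args) {
        Object backlog = Long.valueOf(128); // what a Clojure integer literal looks like from Java
        Object debug = null;                // e.g. an unset var
        System.out.println(toInt(backlog, 1024)); // 128
        System.out.println(toBoolean(debug));     // false
    }
}
```

With helpers like these, an unset `*debug*` var would just mean "off" rather than a `NullPointerException` on unboxing.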
Closing
Honestly, just write Go!