Implementing a Ring Server

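Hi, it's me.
Lately I've drifted away from the Clojure community and couldn't come up with a topic,
so I'd like to write about a Ring server I built as a prototype a while back.
It is a prototype for use at work, so I won't publish all of it, though I may release it
if I feel like it.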

Ring SPEC

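Ring is something like a specification for developing web applications in Clojure.
It was influenced by Python's WSGI and similar interfaces, and it is a very simple mechanism.

I'll skip the details here; please refer to the Ring SPEC itself.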

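Server implementation approach

There is a fair amount written about using Ring, but there may not be much information about implementing the server side.
In most cases it comes down to one of the following two patterns:

  • Mapping from the Servlet API to Ring
  • A custom implementation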

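Mapping from the Servlet API is not very interesting, and it also requires a Servlet container,
which is a bit of a hassle.

I want to distribute the server as a single uberjar file, so I'll go with a custom implementation.
For speed, the core uses netty, and the Ring layer is implemented on top of it.
Parts that are called infrequently, and configuration that should be changeable dynamically,
are written in Clojure; the core is written in Java.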

Java -> Clojure

First, the Java side.

Clojure interop basics

We receive a function (the Ring handler) from Clojure, build its argument in Java, and invoke it.
The following is an example of invoking a Ring handler:

Map<Keyword, Object> response = (Map<Keyword, Object>) handler.invoke(PersistentArrayMap.create(reqMap));

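A Clojure function is represented as an IFn.
You call it with the invoke method.
The arguments must match the types used on the Clojure side, but in most cases helper
methods are provided that make the conversion easy.

The Ring response comes back as a Clojure map (typically a PersistentArrayMap), but if you
handle it through the Map interface you rarely need to care about the concrete type.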

Keyword

A Ring request is represented as a map whose keys are Keywords.
A Keyword can be created as follows:

public static final Keyword HTTP = Keyword.intern("http");

The set of Keywords in use is almost fixed, so it is better to create them as static
fields and reuse them.

public static final Keyword BODY_KEY = Keyword.intern("body");
public static final Keyword URI_KEY = Keyword.intern("uri");
public static final Keyword QUERY_KEY = Keyword.intern("query-string");
public static final Keyword REQUEST_METHOD_KEY = Keyword.intern("request-method");
public static final Keyword SERVER_NAME_KEY = Keyword.intern("server-name");

public static final Keyword SERVER_PORT_KEY = Keyword.intern("server-port");
public static final Keyword REMOTE_PORT_KEY = Keyword.intern("remote-addr"); // Ring SPEC key is :remote-addr
public static final Keyword SCHEME_KEY = Keyword.intern("scheme");
public static final Keyword CONTENT_TYPE_KEY = Keyword.intern("content-type");
public static final Keyword CONTENT_LENGTH_KEY = Keyword.intern("content-length");
public static final Keyword CHAR_ENC_KEY = Keyword.intern("character-encoding");
public static final Keyword HEADERS_KEY = Keyword.intern("headers");

public static final Keyword HTTP = Keyword.intern("http");
public static final Keyword STATUS = Keyword.intern("status");
public static final Keyword USE_SEND_FILE = Keyword.intern("use-send-file");
static {
    METHOD_MAP = new HashMap<>();
    METHOD_MAP.put(HttpMethod.GET, Keyword.intern("get"));
    METHOD_MAP.put(HttpMethod.POST, Keyword.intern("post"));
    METHOD_MAP.put(HttpMethod.PUT, Keyword.intern("put"));
    METHOD_MAP.put(HttpMethod.PATCH, Keyword.intern("patch"));
    METHOD_MAP.put(HttpMethod.OPTIONS, Keyword.intern("options"));
    METHOD_MAP.put(HttpMethod.DELETE, Keyword.intern("delete"));
    METHOD_MAP.put(HttpMethod.HEAD, Keyword.intern("head"));
    METHOD_MAP.put(HttpMethod.CONNECT, Keyword.intern("connect"));
    METHOD_MAP.put(HttpMethod.TRACE, Keyword.intern("trace"));
}

Ring Request

As mentioned above, the Ring request is a PersistentArrayMap.
A PersistentArrayMap can be created from a Map, so we build a regular Map first and convert it at the end.

private Map<Keyword, Object> createRingRequest(final ChannelHandlerContext ctx, final FullHttpRequest request) {

    Map<Keyword, Object> m = new HashMap<>();
    // :scheme
    m.put(SCHEME_KEY, HTTP);
    // :uri
    // :query-string
    setURIAndQuery(m, request);
    // :server-name
    // :server-port
    // :content-type
    // :content-length
    // :character-encoding
    setServerNames(m, request);
    // :request-method
    m.put(REQUEST_METHOD_KEY, getHttpMethod(request));
    // :remote-addr
    m.put(REMOTE_PORT_KEY, getRemoteAddress(ctx, request));
    // :headers
    m.put(HEADERS_KEY, getRequestHeaders(request));

    return m;
}
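The helper `setURIAndQuery` used above only appears in the full source at the end of this post. As a rough illustration of that step, here is a minimal sketch of splitting a request target into the `:uri` and `:query-string` values. It assumes plain String keys as a stand-in for the Keyword keys so that it runs without Clojure on the classpath; the class and method names are hypothetical and not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: not the article's class, just the same splitting idea.
final class RequestTargetSketch {

    // Splits a raw request target such as "/search?q=ring" into the values
    // a Ring request map would carry under :uri and :query-string.
    // String keys stand in for clojure.lang.Keyword (assumption).
    static Map<String, Object> splitUriAndQuery(String rawTarget) {
        Map<String, Object> m = new HashMap<>();
        int idx = rawTarget.indexOf('?');
        if (idx >= 0) {
            m.put("uri", rawTarget.substring(0, idx));
            m.put("query-string", rawTarget.substring(idx + 1));
        } else {
            // No '?' present: the whole target is the path, and there is no query string.
            m.put("uri", rawTarget);
            m.put("query-string", null);
        }
        return m;
    }

    public static void main(String[] args) {
        System.out.println(splitUriAndQuery("/search?q=ring"));
    }
}
```

In the real handler the resulting entries would go into the same Map that is later passed to `PersistentArrayMap.create`.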

Ring Response

A Ring response body can be one of several types,
so the implementation checks the type and handles each case.


private void writeRingResponse(Channel ch, Map<Keyword, Object> ringResponse) throws Exception {

    // :status may be absent; default to 200 instead of dereferencing null
    Number statusValue = (Number) ringResponse.get(STATUS);
    int statusCode = (statusValue == null) ? 200 : statusValue.intValue();
    HttpResponseStatus status = HttpResponseStatus.valueOf(statusCode);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);

    setResponseHeaders(response, ringResponse);

    Object body = ringResponse.get(BODY_KEY);

    if (body != null) {
        sendResponse(body, response, ch, keepAlive);
    } else {
        throw new IllegalArgumentException("response body is null");
    }
}

private void setResponseHeaders(HttpResponse response, Map<Keyword, Object> ringResponse) {
    HttpHeaders headers = response.headers();

    @SuppressWarnings("unchecked")
        final Map<String, Object> headerMap = (Map<String, Object>) ringResponse.get(HEADERS_KEY);

    if (headerMap == null) {
        return;
    }

    for (Map.Entry<String, Object> header : headerMap.entrySet()) {
        headers.set(header.getKey(), (String)header.getValue());
    }

    if (keepAlive) {
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

}

private void sendResponse(Object body, FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {

    // System.out.println(Thread.currentThread());
    if (body instanceof String) {
        sendStringResponse((String) body, response, ch, keepAlive);
    } else if (body instanceof ISeq) {
        StringBuilder sb = new StringBuilder();
        for (ISeq s = (ISeq) body; s != null; s = s.next()) {
            String str = (String) s.first();
            sb.append(str);
        }
        sendStringResponse(sb.toString(), response, ch, keepAlive);
    } else if (body instanceof InputStream) {
        sendInputStreamResponse((InputStream) body, response, ch, keepAlive);
    } else if (body instanceof File) {
        sendFileStreamResponse((File) body, response, ch, keepAlive);
    } else {
        //TODO WARN or 404 ?
        sendStringResponse("", response, ch, keepAlive);
    }
}
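The type dispatch above can be tried out in isolation. The following is a minimal sketch of the same idea using only the JDK: it turns each supported body type into bytes. It assumes `Iterable` as a stand-in for Clojure's `ISeq`, it buffers everything in memory rather than streaming through netty, and it throws on an unsupported type where the code above sends an empty body. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch of dispatching on the Ring response body type.
final class BodyDispatchSketch {

    static byte[] toBytes(Object body) {
        try {
            if (body instanceof String) {
                return ((String) body).getBytes(StandardCharsets.UTF_8);
            } else if (body instanceof Iterable) {
                // Stand-in for ISeq (assumption): concatenate every element in order.
                StringBuilder sb = new StringBuilder();
                for (Object part : (Iterable<?>) body) {
                    sb.append(String.valueOf(part));
                }
                return sb.toString().getBytes(StandardCharsets.UTF_8);
            } else if (body instanceof InputStream) {
                // Read to the end, then close the stream.
                try (InputStream in = (InputStream) body) {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buf = new byte[8192];
                    int n;
                    while ((n = in.read(buf)) != -1) {
                        out.write(buf, 0, n);
                    }
                    return out.toByteArray();
                }
            } else if (body instanceof File) {
                return Files.readAllBytes(((File) body).toPath());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String type = (body == null) ? "null" : body.getClass().getName();
        throw new IllegalArgumentException("unsupported body type: " + type);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("hello world").length);
    }
}
```

Buffering is only acceptable for a sketch; for InputStream and File bodies the server code in this post hands the data to netty (ChunkedStream, ChunkedFile, or a file region) instead.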

Clojure -> Java

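The Clojure side handles configuration and assembly.
Basically it just consists of wrappers.
The assembly part is easier to write in Clojure.

Wrappers

Define the various wrappers.
They let the netty pieces be called from Clojure by short names.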

(ns arianrod.netty
  (:import
   [java.nio.charset Charset]
   [io.netty.bootstrap ServerBootstrap Bootstrap]
   [io.netty.channel EventLoop EventLoopGroup ChannelHandler ChannelPipeline ChannelHandlerContext]
   [arianrod.support CljChannelInHandler BootstrapFactory ChannelHandlerContextUtil CljSimpleChannelHandler UnpooledWrapper
    ChunkedInputStream ChannelEventHandler]))

(defn- is-linux []
  (= "Linux" (System/getProperty "os.name")))

(defmacro ^EventLoopGroup create-elg []
  (if (is-linux)
    `(io.netty.channel.epoll.EpollEventLoopGroup.)
    `(io.netty.channel.nio.NioEventLoopGroup.)))

(defmacro ^EventLoop create-el []
  (if (is-linux)
    `(io.netty.channel.epoll.EpollEventLoop.)
    `(io.netty.channel.nio.NioEventLoop.)))

(defmacro create-nssc []
  (if (is-linux)
    `io.netty.channel.epoll.EpollServerSocketChannel
    `io.netty.channel.socket.nio.NioServerSocketChannel))

(defmacro create-nsc []
  (if (is-linux)
    `io.netty.channel.epoll.EpollSocketChannel
    `io.netty.channel.socket.nio.NioSocketChannel))

(defmacro ^ChannelHandler channel-in-handler [m]
  `(CljChannelInHandler. ~m))

(defn ^ChannelHandler make-handler [fn & {:keys [filter] :or {filter [:read :complete :exception]}}]
  (ChannelEventHandler. fn (set filter)))

(defmacro ^ChannelHandler simple-handler [m]
  `(CljSimpleChannelHandler. ~m))

(defmacro write-ctx [ctx msg]
  `(ChannelHandlerContextUtil/writeCtx ~ctx ~msg))

(defmacro flush-ctx [ctx]
  `(ChannelHandlerContextUtil/flushCtx ~ctx))

(defmacro write-flush-ctx [ctx msg]
  `(ChannelHandlerContextUtil/writeAndFlushCtx ~ctx ~msg))

(defmacro close-ctx [ctx]
  `(ChannelHandlerContextUtil/closeCtx ~ctx))

(defn ^ServerBootstrap server-factory [fn & {:keys [filter] :or {filter [:read :complete :exception]}}]
  (BootstrapFactory/createSimpleServerBootstrap (create-nssc) fn (set filter)))

(defn ^ServerBootstrap server-factory-init [initializer]
  (BootstrapFactory/createServerBootstrap (create-nssc) initializer))

(defn ^Bootstrap client-factory [initializer]
  (BootstrapFactory/createBootstrap (create-nsc) initializer))

(defmacro add-last-pipeline [pipeline name fn]
  `(BootstrapFactory/addLastPipeline ~pipeline ~name ~fn))

;; Buffer API

(defn wrap-buffer-bytes [a]
  (UnpooledWrapper/wrappedBufferBytes a))

(defn copied-buffer [^String string ^Charset charset]
  (UnpooledWrapper/copiedBuffer string charset))

(defn create-buf [cap]
  (UnpooledWrapper/createBuffer cap))

(defn create-direct-buf [cap]
  (UnpooledWrapper/createDirectBuffer cap))

(defn ->byte-array [s]
  (byte-array (map (comp byte int) s)))

(defn ->bytes [s]
  (bytes (byte-array (map (comp byte int) s))))

Internally, these wrappers automatically switch the EventLoop type and related classes between epoll and NIO depending on the OS.
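One thing to keep in mind: because the `os.name` check sits inside the macro bodies, the choice is made when the macros are expanded, which for an AOT-compiled uberjar means on the build machine. For comparison, here is a minimal sketch of the same check done at run time. It returns only the class name so that it runs without netty on the classpath; the class and method names are hypothetical.

```java
// Hypothetical sketch: choose the netty transport by OS name at run time.
final class TransportSketch {

    // Mirrors the is-linux check above: epoll on Linux, NIO everywhere else.
    static String eventLoopGroupClassName(String osName) {
        return "Linux".equals(osName)
                ? "io.netty.channel.epoll.EpollEventLoopGroup"
                : "io.netty.channel.nio.NioEventLoopGroup";
    }

    public static void main(String[] args) {
        // The property is read when the program runs, not when it is compiled.
        System.out.println(eventLoopGroupClassName(System.getProperty("os.name")));
    }
}
```

In Clojure the equivalent would be to make the wrappers plain functions rather than macros, at the cost of a small check per call.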

Server

The startup code mostly follows the usual netty conventions.

(ns arianrod.server
  (:import
   [io.netty.bootstrap ServerBootstrap]
   [io.netty.channel EventLoopGroup ChannelFuture])
  (:require
   [arianrod.netty :as netty]))

(def ^:dynamic *debug* false)
(def ^:dynamic *backlog* 1024)

(defn wait-future [^ChannelFuture future ^EventLoopGroup boss ^EventLoopGroup work]
  (let [^ChannelFuture cf (-> (.channel future)
                              (.closeFuture))]
    (fn []
      (try
        (.sync cf)
        (finally
          (.shutdownGracefully boss)
          (.shutdownGracefully work))))))

(defn ^ChannelFuture bind-server [^ServerBootstrap bootstrap ^long port]
  (-> (.bind bootstrap port)
      (.sync)))

(defn start-server* [factory port & [boss work]]
  (let [boss (or boss (netty/create-elg))
        work (or work (netty/create-elg))]
    (-> (factory)
        (.group boss work)
        (bind-server port)
        (wait-future boss work))))

(defn start-server [factory port & {:keys [wait] :or {wait true}}]
  (let [waiter (start-server* factory port)]
    (if wait
      (waiter)
      waiter)))

(defmacro start-server-wait [factory port]
  `(start-server ~factory ~port :wait true))
;; Ring-specific entry points (separate namespace)
(ns arianrod.ring
  (:use arianrod.netty)
  (:import
   [io.netty.bootstrap ServerBootstrap]
   [arianrod.support.ring RingBootstrapFactory RingChannelHandler]))

(def ^:dynamic *option* {:debug false
                         :backlog 1024
                         :use-send-file true})

(defn ring-server-factory [handler options]
  (RingBootstrapFactory/createRingServerBootstrap
   (create-nssc) options handler))

(defn create-ring-server
  ([handler] (create-ring-server handler *option*))
  ([handler options]
     (ring-server-factory handler (merge *option* options))))

(defn create-http-server []
  (create-ring-server
   (fn [req]
     {:status 200
      :headers {"Content-Type" "text/plain"}
      :body "hello world"})))

Full source

package arianrod.support.ring;

import clojure.lang.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

public class RingChannelHandler extends SimpleChannelInboundHandler<HttpObject> {

    public static final Keyword BODY_KEY = Keyword.intern("body");
    public static final Keyword URI_KEY = Keyword.intern("uri");
    public static final Keyword QUERY_KEY = Keyword.intern("query-string");
    public static final Keyword REQUEST_METHOD_KEY = Keyword.intern("request-method");
    public static final Keyword SERVER_NAME_KEY = Keyword.intern("server-name");

    public static final Keyword SERVER_PORT_KEY = Keyword.intern("server-port");
    public static final Keyword REMOTE_PORT_KEY = Keyword.intern("remote-addr"); // Ring SPEC key is :remote-addr
    public static final Keyword SCHEME_KEY = Keyword.intern("scheme");
    public static final Keyword CONTENT_TYPE_KEY = Keyword.intern("content-type");
    public static final Keyword CONTENT_LENGTH_KEY = Keyword.intern("content-length");
    public static final Keyword CHAR_ENC_KEY = Keyword.intern("character-encoding");
    public static final Keyword HEADERS_KEY = Keyword.intern("headers");

    public static final Keyword HTTP = Keyword.intern("http");
    public static final Keyword STATUS = Keyword.intern("status");
    public static final Keyword USE_SEND_FILE = Keyword.intern("use-send-file");

    private static final String CHARSET = "charset=";
    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    private static final int BODY_MEMORY_LIMIT = 1024 * 1024 * 16;

    private static final Map<HttpMethod, Keyword> METHOD_MAP;

    static {
        METHOD_MAP = new HashMap<>();
        METHOD_MAP.put(HttpMethod.GET, Keyword.intern("get"));
        METHOD_MAP.put(HttpMethod.POST, Keyword.intern("post"));
        METHOD_MAP.put(HttpMethod.PUT, Keyword.intern("put"));
        METHOD_MAP.put(HttpMethod.PATCH, Keyword.intern("patch"));
        METHOD_MAP.put(HttpMethod.OPTIONS, Keyword.intern("options"));
        METHOD_MAP.put(HttpMethod.DELETE, Keyword.intern("delete"));
        METHOD_MAP.put(HttpMethod.HEAD, Keyword.intern("head"));
        METHOD_MAP.put(HttpMethod.CONNECT, Keyword.intern("connect"));
        METHOD_MAP.put(HttpMethod.TRACE, Keyword.intern("trace"));
    }

    private final APersistentMap option;
    private final IFn handler;
    private Map<Keyword, Object> reqMap;
    private ByteBuf byteBuf = null;
    private Path tempFilePath = null;
    private boolean keepAlive;

    public RingChannelHandler(final IFn handler, final APersistentMap option) {
        this.handler = handler;
        this.option = option;
    }

    @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void callRingHandler(ChannelHandlerContext ctx, InputStream input) throws Exception {
        //System.out.println(Thread.currentThread());

        try (InputStream in = input) {
                reqMap.put(BODY_KEY, in);

                @SuppressWarnings("unchecked")
                Map<Keyword, Object> response = (Map<Keyword, Object>) handler.invoke(PersistentArrayMap.create(reqMap));
                if (response != null) {
                    writeRingResponse(ctx.channel(), response);
                } else {
                    throw new IllegalArgumentException("response is null");
                }

            }
    }

    @Override
        public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            keepAlive = HttpHeaderUtil.isKeepAlive(request);

            Long contentLength = HttpHeaderUtil.getContentLength(request, 0);
            HttpMethod method = request.method();
            reqMap = this.createRingRequest(ctx, request);

            if (method == HttpMethod.POST || method == HttpMethod.PUT) {
                if (contentLength > BODY_MEMORY_LIMIT) {
                    // large data
                    tempFilePath = Files.createTempFile(null, null);

                } else {
                    // memory
                    byteBuf = Unpooled.directBuffer(contentLength.intValue());
                }
            } else {
                callRingHandler(ctx, new ByteBufInputStream(request.content()));
            }
        }

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            if (byteBuf != null) {
                byteBuf.writeBytes(content.content());

                if (msg instanceof LastHttpContent) {
                    callRingHandler(ctx, new ByteBufInputStream(byteBuf));
                }
            } else if (tempFilePath != null) {
                Files.write(tempFilePath, content.content().array(), StandardOpenOption.APPEND);

                if (msg instanceof LastHttpContent) {
                    callRingHandler(ctx, Files.newInputStream(tempFilePath, StandardOpenOption.DELETE_ON_CLOSE));
                }
            }
        }
    }

    private Keyword getHttpMethod(final FullHttpRequest request) {
        HttpMethod method = request.method();
        if (METHOD_MAP.containsKey(method)) {
            return METHOD_MAP.get(method);
        }
        return Keyword.intern(null, String.valueOf(method.name().toLowerCase()));
    }

    private void setURIAndQuery(final Map<Keyword, Object> m, final FullHttpRequest request) {
        String url = request.uri();
        int idx = url.indexOf('?');
        String uri;
        String queryString;
        if (idx > 0) {
            uri = url.substring(0, idx);
            queryString = url.substring(idx + 1);
        } else {
            uri = url;
            queryString = null;
        }
        m.put(URI_KEY, uri);
        m.put(QUERY_KEY, queryString);
    }

    private void setServerNames(final Map<Keyword, Object> m, final FullHttpRequest request) {
        HttpHeaders headers = request.headers();
        String serverName = null;
        Integer serverPort = null;
        String contentType = null;
        String charset = null;

        String h = (String) headers.get("host");
        if (h != null) {
            int idx = h.lastIndexOf(':');
            if (idx != -1) {
                serverName = h.substring(0, idx);
                serverPort = Integer.valueOf(h.substring(idx + 1));
            } else {
                serverName = h;
            }
        }

        String ct = (String) headers.get(HttpHeaderNames.CONTENT_TYPE);
        if (ct != null) {
            int idx = ct.indexOf(";");
            if (idx != -1) {
                int cidx = ct.indexOf(CHARSET, idx);
                if (cidx != -1) {
                    contentType = ct.substring(0, idx);
                    charset = ct.substring(cidx + CHARSET.length());
                } else {
                    contentType = ct;
                }
            } else {
                contentType = ct;
            }
        }

        m.put(SERVER_NAME_KEY, serverName);
        m.put(SERVER_PORT_KEY, serverPort);
        m.put(CONTENT_TYPE_KEY, contentType);

        long length = HttpHeaderUtil.getContentLength(request);
        if (length > 0) {
            m.put(CONTENT_LENGTH_KEY, length);
        } else {
            m.put(CONTENT_LENGTH_KEY, null);
        }
        m.put(CHAR_ENC_KEY, charset);
    }

    private String getRemoteAddress(final ChannelHandlerContext ctx, final FullHttpRequest request) {
        String h = (String) request.headers().get("X-Forwarded-For");
        if (h != null) {
            int idx = h.indexOf(',');
            if (idx == -1) {
                return h;
            } else {
                // X-Forwarded-For: client, proxy1, proxy2
                return h.substring(0, idx);
            }
        }
        InetSocketAddress sockAddr = (InetSocketAddress) ctx.channel().remoteAddress();
        return sockAddr.getAddress().getHostAddress();
    }

    private IPersistentMap getRequestHeaders(final FullHttpRequest request) {
        HttpHeaders headers = request.headers();
        Map<String, String> m = new HashMap<>();
        for (Map.Entry<CharSequence, CharSequence> header : headers.entries()) {
            String lo = header.getKey().toString().toLowerCase();
            m.put(lo, (String)header.getValue());
        }
        return PersistentArrayMap.create(m);
    }

    private Map<Keyword, Object> createRingRequest(final ChannelHandlerContext ctx, final FullHttpRequest request) {

        Map<Keyword, Object> m = new HashMap<>();
        // :scheme
        m.put(SCHEME_KEY, HTTP);
        // :uri
        // :query-string
        setURIAndQuery(m, request);
        // :server-name
        // :server-port
        // :content-type
        // :content-length
        // :character-encoding
        setServerNames(m, request);
        // :request-method
        m.put(REQUEST_METHOD_KEY, getHttpMethod(request));
        // :remote-addr
        m.put(REMOTE_PORT_KEY, getRemoteAddress(ctx, request));
        // :headers
        m.put(HEADERS_KEY, getRequestHeaders(request));

        return m;
    }

    private void writeRingResponse(Channel ch, Map<Keyword, Object> ringResponse) throws Exception {

        // :status may be absent; default to 200 instead of dereferencing null
        Number statusValue = (Number) ringResponse.get(STATUS);
        int statusCode = (statusValue == null) ? 200 : statusValue.intValue();
        HttpResponseStatus status = HttpResponseStatus.valueOf(statusCode);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);

        setResponseHeaders(response, ringResponse);

        Object body = ringResponse.get(BODY_KEY);

        if (body != null) {
            sendResponse(body, response, ch, keepAlive);
        } else {
            throw new IllegalArgumentException("response body is null");
        }
    }

    private void setResponseHeaders(HttpResponse response, Map<Keyword, Object> ringResponse) {
        HttpHeaders headers = response.headers();

        @SuppressWarnings("unchecked")
            final Map<String, Object> headerMap = (Map<String, Object>) ringResponse.get(HEADERS_KEY);

        if (headerMap == null) {
            return;
        }

        for (Map.Entry<String, Object> header : headerMap.entrySet()) {
            headers.set(header.getKey(), (String)header.getValue());
        }

        if (keepAlive) {
            headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

    }

    private void sendResponse(Object body, FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {

        // System.out.println(Thread.currentThread());
        if (body instanceof String) {
            sendStringResponse((String) body, response, ch, keepAlive);
        } else if (body instanceof ISeq) {
            StringBuilder sb = new StringBuilder();
            for (ISeq s = (ISeq) body; s != null; s = s.next()) {
                String str = (String) s.first();
                sb.append(str);
            }
            sendStringResponse(sb.toString(), response, ch, keepAlive);
        } else if (body instanceof InputStream) {
            sendInputStreamResponse((InputStream) body, response, ch, keepAlive);
        } else if (body instanceof File) {
            sendFileStreamResponse((File) body, response, ch, keepAlive);
        } else {
            //TODO WARN or 404 ?
            sendStringResponse("", response, ch, keepAlive);
        }
    }

    private void setContentLength(FullHttpResponse response, long length) {
        HttpHeaders headers = response.headers();
        CharSequence val = headers.get(HttpHeaderNames.CONTENT_LENGTH);
        if (val == null || val.equals("")) {
            headers.setLong(HttpHeaderNames.CONTENT_LENGTH, length);
        }
    }

    private Charset getResponseCharset(FullHttpResponse response) {
        HttpHeaders headers = response.headers();
        String type = (String)headers.get(HttpHeaderNames.CONTENT_TYPE);
        if (type != null) {
            try {
                type = type.toLowerCase();
                int i = type.indexOf(CHARSET);
                if (i != -1) {
                    String charset = type.substring(i + CHARSET.length()).trim();
                    return Charset.forName(charset);
                }
            } catch (Exception ignore) {
            }
        }
        return DEFAULT_CHARSET;
    }

    private void sendStringResponse(final String body, final FullHttpResponse response, Channel ch, boolean keepAlive) {
        Charset charset = getResponseCharset(response);
        ByteBuf buffer = Unpooled.copiedBuffer(body, charset);
        setContentLength(response, buffer.readableBytes());
        response.content().writeBytes(buffer);
        buffer.release();
        if (keepAlive) {
            ch.write(response);
            //ctx.write(response).addListener(ChannelFutureListener.CLOSE);
        } else {
            ch.write(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void sendInputStreamResponse(final InputStream stream, final FullHttpResponse response, Channel ch, boolean keepAlive) {
        ch.write(response);
        ChannelFuture future = ch.write(new ChunkedStream(stream));

        future.addListener(new GenericFutureListener<Future<Void>>() {
                @Override
                    public void operationComplete(Future<Void> f) throws Exception {
                    stream.close();
                }
            });
        future.addListener(ChannelFutureListener.CLOSE);
    }

    private void sendFileStreamResponse(final File file, final FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {

        Boolean useSendFile = (Boolean) option.get(USE_SEND_FILE);

        RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            return;
        }
        long fileLength = raf.length();
        setContentLength(response, fileLength);
        ch.write(response);

        // Write the content.
        if (useSendFile != null && useSendFile) {
            ch.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ch.newProgressivePromise());
        } else {
            ch.write(new ChunkedFile(raf, 0, fileLength, 8192), ch.newProgressivePromise());
        }

        ChannelFuture lastContentFuture = ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }

    }

    static final class RingHandler implements Runnable {

        private final RingChannelHandler ringChannelHandler;
        private final Channel ch;
        private final InputStream in;

        public RingHandler(Channel ch, RingChannelHandler handler, InputStream in) {
            this.ringChannelHandler = handler;
            this.ch = ch;
            this.in = in;
        }

        @Override
        public void run() {

            try (InputStream input = this.in) {
                    this.ringChannelHandler.reqMap.put(BODY_KEY, input);
                    @SuppressWarnings("unchecked")
                        Map<Keyword, Object> response = (Map<Keyword, Object>) this.ringChannelHandler.handler.invoke(PersistentArrayMap.create(this.ringChannelHandler.reqMap));
                    if (response != null) {
                        this.ringChannelHandler.writeRingResponse(this.ch, response);
                    } else {
                        throw new IllegalArgumentException("response is null");
                    }
            } catch (Exception e) {
                //TODO Logging
                e.printStackTrace();
            } finally {
                this.ch.close();
            }

        }
    }

}

// RingBootstrapFactory.java
package arianrod.support.ring;

import clojure.lang.APersistentMap;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class RingBootstrapFactory {

    public static final Keyword DEBUG_KEY = Keyword.intern("debug");
    public static final Keyword BACKLOG_KEY = Keyword.intern("backlog");

    public static ServerBootstrap createRingServerBootstrap(Class<? extends ServerChannel> channelClass, final APersistentMap option, final IFn handler) {

        final Boolean debug = (Boolean) option.get(DEBUG_KEY);
        final int backlog = ((Long) option.get(BACKLOG_KEY)).intValue();

        ServerBootstrap b = new ServerBootstrap();
        b.channel(channelClass)
                .option(ChannelOption.SO_BACKLOG, backlog)
                .option(ChannelOption.TCP_NODELAY, true)
                .childHandler(new RingChannelInitializer(handler, option));

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }
        return b;
    }

    static class RingChannelInitializer extends ChannelInitializer<SocketChannel> {

        private final IFn handler;
        private final APersistentMap option;

        public RingChannelInitializer(final IFn handler, final APersistentMap option) {
            super();
            this.option = option;
            this.handler = handler;
        }

        @Override
        public void initChannel(final SocketChannel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpRequestDecoder())
                    .addLast(new HttpObjectAggregator(65536))
                    .addLast(new HttpResponseEncoder())
                    .addLast(new ChunkedWriteHandler())
                    .addLast(new RingChannelHandler(handler, option));
        }
    }

}

// CljSimpleChannelHandler.java
package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.logging.Level;
import java.util.logging.Logger;

public class CljSimpleChannelHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = Logger.getLogger(
                                                          CljSimpleChannelHandler.class.getName());

    private static final Keyword isSharableKey = Keyword.intern(null, "is-sharable");
    private static final Keyword channelReadCompKey = Keyword.intern(null, "channel-read-complete");
    private static final Keyword channelActiveKey = Keyword.intern(null, "channel-active");
    private static final Keyword messageReceivedKey = Keyword.intern(null, "message-received");
    private static final Keyword channelInactiveKey = Keyword.intern(null, "channel-in-active");
    private static final Keyword exceptionCaughtKey = Keyword.intern(null, "exception-caught");

    private final PersistentArrayMap map;

    public CljSimpleChannelHandler(PersistentArrayMap map) {
        this.map = map;
    }

    @Override
    public boolean isSharable() {
        if (this.map.containsKey(isSharableKey)) {
            final IFn f = (IFn) this.map.valAt(isSharableKey);
            return (Boolean) f.invoke();
        } else {
            return super.isSharable();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelActiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelActiveKey);
            f.invoke(ctx);
        } else {
            super.channelActive(ctx);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelInactiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelInactiveKey);
            f.invoke(ctx);
        } else {
            super.channelInactive(ctx);
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.map.containsKey(messageReceivedKey)) {
            final IFn f = (IFn) this.map.valAt(messageReceivedKey);
            f.invoke(ctx, msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelReadCompKey)) {
            final IFn f = (IFn) this.map.valAt(channelReadCompKey);
            f.invoke(ctx);
        } else {
            super.channelReadComplete(ctx);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (this.map.containsKey(exceptionCaughtKey)) {
            final IFn f = (IFn) this.map.valAt(exceptionCaughtKey);
            f.invoke(ctx, cause);
        } else {

            logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
            ctx.close();
        }
    }

}
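The handler above follows one pattern throughout: look up an optional callback in a map, and fall back to the superclass behaviour when none is registered. Below is a minimal sketch of that pattern in plain Java, with no Netty or Clojure on the classpath. `CallbackMapSketch`, `DefaultHandler` and `MapBackedHandler` are hypothetical names; a `java.util.Map` of `Function` values stands in for `PersistentArrayMap` and `IFn`, and a `String` stands in for `ChannelHandlerContext`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class CallbackMapSketch {

    /** Hypothetical stand-in for a Netty base class that supplies default behaviour. */
    public static class DefaultHandler {
        public String channelActive(String ctx) {
            return "default:" + ctx;
        }
    }

    /** Delegates to a callback from the map when one is present, else to the superclass. */
    public static class MapBackedHandler extends DefaultHandler {
        private final Map<String, Function<String, String>> callbacks;

        public MapBackedHandler(Map<String, Function<String, String>> callbacks) {
            this.callbacks = callbacks;
        }

        @Override
        public String channelActive(String ctx) {
            Function<String, String> f = callbacks.get("channel-active");
            if (f != null) {
                return f.apply(ctx);
            }
            return super.channelActive(ctx);
        }
    }

    /** Convenience entry point: builds a handler over the map and fires one event. */
    public static String dispatch(Map<String, Function<String, String>> callbacks, String ctx) {
        return new MapBackedHandler(callbacks).channelActive(ctx);
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> callbacks = new HashMap<>();
        System.out.println(dispatch(callbacks, "ctx-1")); // no callback registered

        callbacks.put("channel-active", ctx -> "custom:" + ctx);
        System.out.println(dispatch(callbacks, "ctx-1")); // callback takes over
    }
}
```

Keeping the fallback in the superclass means the Clojure side only has to supply the callbacks it cares about.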
package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Set;

public class ChannelEventHandler extends ChannelHandlerAdapter {

    public static final Keyword INACTIVE_KEY = Keyword.intern("inactive");
    public static final Keyword ACTIVE_KEY = Keyword.intern("active");
    public static final Keyword READ_KEY = Keyword.intern("read");
    public static final Keyword COMPLETE_KEY = Keyword.intern("complete");
    public static final Keyword EXCEPTION_KEY = Keyword.intern("exception");

    private EventSource eventSource;

    private final IFn handler;
    private final Set filter;

    public ChannelEventHandler(final IFn handler, final Set filter) {
        this.handler = handler;
        this.filter = filter;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (!this.filter.contains(INACTIVE_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, INACTIVE_KEY);
        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = INACTIVE_KEY;
            this.eventSource.object = null;
        }
        this.handler.invoke(this.eventSource);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (!this.filter.contains(ACTIVE_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, ACTIVE_KEY);
        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = ACTIVE_KEY;
            this.eventSource.object = null;
        }
        this.handler.invoke(this.eventSource);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (!this.filter.contains(COMPLETE_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, COMPLETE_KEY);

        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = COMPLETE_KEY;
            this.eventSource.object = null;
        }
        this.handler.invoke(this.eventSource);


    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (!this.filter.contains(EXCEPTION_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, EXCEPTION_KEY, cause);

        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = EXCEPTION_KEY;
            this.eventSource.object = cause;
        }
        this.handler.invoke(this.eventSource);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Read events are gated by READ_KEY, not COMPLETE_KEY.
        if (!this.filter.contains(READ_KEY)) {
            return;
        }

        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, READ_KEY, msg);

        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = READ_KEY;
            this.eventSource.object = msg;
        }
        this.handler.invoke(this.eventSource);
    }

}
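ChannelEventHandler funnels every event into a single handler function and uses a filter set to decide which event types are forwarded at all. A rough, self-contained sketch of that gating idea follows. `EventFilterSketch`, `fire` and `run` are hypothetical names; strings stand in for the `Keyword` event types, and a `Consumer` stands in for the `IFn` handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class EventFilterSketch {

    /** Forwards an event to the handler only if its type is in the filter set. */
    public static void fire(Set<String> filter, Consumer<String> handler, String type) {
        if (!filter.contains(type)) {
            return; // not subscribed: the event is dropped
        }
        handler.accept(type);
    }

    /** Fires a fixed sequence of events and returns the ones that reached the handler. */
    public static List<String> run(Set<String> filter) {
        List<String> seen = new ArrayList<>();
        for (String type : Arrays.asList("active", "read", "complete", "inactive")) {
            fire(filter, seen::add, type);
        }
        return seen;
    }

    public static void main(String[] args) {
        Set<String> filter = new HashSet<>(Arrays.asList("read", "inactive"));
        System.out.println(run(filter)); // only the subscribed event types are listed
    }
}
```

Because each event type is checked against its own key, a wrong key in one branch silently disables that event, so the key used in each callback is worth double-checking.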
package arianrod.support;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;

public class ChannelHandlerContextUtil {

    public static final ChannelFuture writeCtx(ChannelHandlerContext ctx, Object msg) {
        return ctx.write(msg);
    }

    public static final ChannelHandlerContext flushCtx(ChannelHandlerContext ctx) {
        return ctx.flush();
    }

    public static final ChannelFuture closeCtx(ChannelHandlerContext ctx) {
        return ctx.close();
    }

    public static final ChannelFuture writeAndFlushCtx(ChannelHandlerContext ctx, Object msg) {
        return ctx.writeAndFlush(msg);
    }

}
package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;

import java.util.logging.Level;
import java.util.logging.Logger;

public class CljChannelInHandler extends ChannelHandlerAdapter {

    private static final Logger logger = Logger.getLogger(
                                                          CljChannelInHandler.class.getName());

    private static final Keyword isSharableKey = Keyword.intern(null, "is-sharable");
    private static final Keyword channelReadKey = Keyword.intern(null, "channel-read");
    private static final Keyword channelReadCompKey = Keyword.intern(null, "channel-read-complete");
    private static final Keyword channelActiveKey = Keyword.intern(null, "channel-active");
    private static final Keyword channelInactiveKey = Keyword.intern(null, "channel-in-active");
    private static final Keyword exceptionCaughtKey = Keyword.intern(null, "exception-caught");

    private final PersistentArrayMap map;

    public CljChannelInHandler(PersistentArrayMap map) {
        this.map = map;
    }

    @Override
    public boolean isSharable() {
        if (this.map.containsKey(isSharableKey)) {
            final IFn f = (IFn) this.map.valAt(isSharableKey);
            return (Boolean) f.invoke();
        } else {
            return super.isSharable();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelActiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelActiveKey);
            f.invoke(ctx);
        } else {
            super.channelActive(ctx);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelInactiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelInactiveKey);
            f.invoke(ctx);
        } else {
            super.channelInactive(ctx);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.map.containsKey(channelReadKey)) {
            final IFn f = (IFn) this.map.valAt(channelReadKey);
            f.invoke(ctx, msg);
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelReadCompKey)) {
            final IFn f = (IFn) this.map.valAt(channelReadCompKey);
            f.invoke(ctx);
        } else {
            super.channelReadComplete(ctx);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (this.map.containsKey(exceptionCaughtKey)) {
            final IFn f = (IFn) this.map.valAt(exceptionCaughtKey);
            f.invoke(ctx, cause);
        } else {

            logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
            ctx.close();
        }
    }
}
package arianrod.support;

import clojure.lang.IDeref;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.RT;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.ChannelMatcher;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EventSource implements IDeref {

    public static Map<String, DefaultChannelGroup> groupMap = new ConcurrentHashMap<>();

    public Keyword type;
    public Object object;
    public ChannelHandlerContext ctx;


    public EventSource(ChannelHandlerContext ctx, Keyword type) {
        this.ctx = ctx;
        this.type = type;
    }

    public EventSource(ChannelHandlerContext ctx, Keyword type, Object object) {
        this(ctx, type);
        this.object = object;
    }

    public void addGroup(String ns) {
        if (!groupMap.containsKey(ns)) {
            DefaultChannelGroup defaultChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            groupMap.put(ns, defaultChannelGroup);
        }
    }

    public void joinGroup(String ns) {
        this.addGroup(ns);
        groupMap.get(ns).add(ctx.channel());
    }

    public ChannelGroupFuture putGroup(String ns, Object msg) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).writeAndFlush(msg);
        }
        return null;
    }

    public ChannelGroupFuture putGroup(String ns, Object msg, final IFn matcher) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).writeAndFlush(msg, new ChannelMatcher() {
                @Override
                public boolean matches(Channel channel) {
                    Object res = matcher.invoke(channel);
                    return res instanceof Boolean && (Boolean) res;
                }
            });
        }
        return null;
    }

    public ChannelGroupFuture closeGroup(String ns) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).close();
        }
        return null;
    }

    public ChannelGroupFuture closeGroup(String ns, final IFn matcher) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).close(new ChannelMatcher() {
                @Override
                public boolean matches(Channel channel) {
                    Object res = matcher.invoke(channel);
                    return res instanceof Boolean && (Boolean) res;
                }
            });
        }
        return null;
    }

    public void sendRemote(String host, int port, final IFn initializer, final IFn channelWriter) throws InterruptedException{
        Bootstrap b = BootstrapFactory.createBootstrap(this.ctx.channel().getClass(), initializer);
        b = b.group(this.ctx.channel().eventLoop());
        Channel channel = b.connect(host, port).sync().channel();
        channelWriter.invoke(channel);
        channel.closeFuture().sync();

    }

    @Override
    public Object deref() {
        return RT.vector(this.type, this.object);
    }

    public Object get() {
        return this.object;
    }

    public EventSource put(Object o) {
        this.ctx.write(o);
        return this;
    }

    public EventSource putFlush(Object o) {
        this.ctx.writeAndFlush(o);
        return this;
    }

    public EventSource flush() {
        this.ctx.flush();
        return this;
    }

}
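EventSource keeps its channel groups in a static map and creates a group the first time someone joins it. The `containsKey` followed by `put` in `addGroup` is not atomic, so two threads joining a new group at the same time could each create one. One possible alternative, assuming Java 8 or later, is `computeIfAbsent`. The sketch below is not the article's implementation: `GroupRegistrySketch`, `join` and `broadcastCount` are hypothetical names, and strings stand in for Netty channels.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class GroupRegistrySketch {

    // Group name -> members; strings stand in for Netty channels here.
    private static final Map<String, Set<String>> GROUPS = new ConcurrentHashMap<>();

    /** Adds a member, creating the group atomically on first use. */
    public static void join(String group, String member) {
        GROUPS.computeIfAbsent(group, k -> ConcurrentHashMap.newKeySet()).add(member);
    }

    /** Returns how many members a broadcast would reach; 0 for an unknown group. */
    public static int broadcastCount(String group) {
        return GROUPS.getOrDefault(group, Collections.<String>emptySet()).size();
    }

    public static void main(String[] args) {
        join("room-a", "ch-1");
        join("room-a", "ch-2");
        System.out.println(broadcastCount("room-a"));
        System.out.println(broadcastCount("room-b"));
    }
}
```

Returning 0 for an unknown group mirrors the way `putGroup` and `closeGroup` quietly do nothing when the group does not exist.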
package arianrod.support;

import clojure.lang.IFn;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

public class SimpleChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final IFn callback;

    public SimpleChannelInitializer(IFn callback) {
        this.callback = callback;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        this.callback.invoke(ch.pipeline());
    }
}

package arianrod.support;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class UnpooledWrapper {

    public static final ByteBuf wrappedBufferBytes(byte[] array) {
        return Unpooled.wrappedBuffer(array);
    }

    public static final ByteBuf wrappedBuffer(ByteBuf buf) {
        return Unpooled.wrappedBuffer(buf);
    }

    public static final ByteBuf wrappedBuffer(ByteBuffer buf) {
        return Unpooled.wrappedBuffer(buf);
    }

    public static final ByteBuf copiedBuffer(byte[] array) {
        return Unpooled.copiedBuffer(array);
    }

    public static final ByteBuf copiedBuffer(ByteBuf buf) {
        return Unpooled.copiedBuffer(buf);
    }

    public static final ByteBuf copiedBuffer(ByteBuffer buf) {
        return Unpooled.copiedBuffer(buf);
    }

    public static final ByteBuf copiedBuffer(CharSequence string, Charset charset) {
        return Unpooled.copiedBuffer(string, charset);
    }

    public static final ByteBuf createBuffer(int capacity) {
        return Unpooled.buffer(capacity);
    }

    public static final ByteBuf createDirectBuffer(int capacity) {
        return Unpooled.directBuffer(capacity);
    }

}

package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.RT;
import clojure.lang.Var;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.util.Set;


public class BootstrapFactory {


    public static ServerBootstrap createSimpleServerBootstrap(Class<? extends ServerChannel> channelClass, final IFn handler, final Set filter) {
        Var var = RT.var("arianrod.server", "*debug*");
        Boolean debug = (Boolean) var.get();
        var = RT.var("arianrod.server", "*backlog*");
        int backlog = ((Long) var.get()).intValue();

        ServerBootstrap b = new ServerBootstrap();
        b.channel(channelClass)
            .option(ChannelOption.SO_BACKLOG, backlog)
            // TCP_NODELAY applies to accepted connections, so it is set as a child option.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ChannelEventHandler(handler, filter));
                    }
                });

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }

        return b;
    }


    public static ServerBootstrap createServerBootstrap(Class<? extends ServerChannel> channelClass, final IFn initializer) {
        Var var = RT.var("arianrod.server", "*debug*");
        Boolean debug = (Boolean) var.get();
        var = RT.var("arianrod.server", "*backlog*");
        int backlog = ((Long) var.get()).intValue();

        ServerBootstrap b = new ServerBootstrap();
        b.channel(channelClass)
            .option(ChannelOption.SO_BACKLOG, backlog)
            // TCP_NODELAY applies to accepted connections, so it is set as a child option.
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        initializer.invoke(ch.pipeline());
                    }
                });

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }

        return b;
    }

    public static Bootstrap createBootstrap(Class<? extends Channel> channelClass, final IFn initializer) {
        Var var = RT.var("arianrod.client", "*debug*");
        final Boolean debug = (Boolean) var.get();

        Bootstrap b = new Bootstrap();
        b.channel(channelClass)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // A client Bootstrap holds a single handler, so a later
                        // b.handler(...) call would replace this initializer.
                        // The logging handler is therefore added to the pipeline here.
                        if (debug) {
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        }
                        initializer.invoke(ch.pipeline());
                    }
                });

        return b;
    }

    public static ChannelPipeline addLastPipeline(ChannelPipeline pipeline, String name, ChannelHandler handler) {
        return pipeline.addLast(name, handler);
    }

}

Closing thoughts

Honestly, just write it in Go!
