12
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Ring サーバーを実装する

Posted at

Ring サーバーを実装する

どうも、僕です。
最近、Clojure 界隈から遠ざかっておりネタが全く思い浮かばなかったので
過去にプロトタイプでつくった Ring Server の話を書きたいと思います。
社用で使う用のプロトタイプなので全ては公開しませんが、気が向いたら公開するかも
知れません。

Ring SPEC

Ring は Clojure で Web アプリケーション 開発する際の規格のようなものです。
python の wsgi などの影響を受けており非常にシンプルな仕組みになっています。

ここでは詳細は割愛します。以下を参考にして下さい。

サーバーの実装の方針

割と使う側の話はあるのですがサーバー側実装についてはあまり情報ないかも知れません。
多くの場合は以下の2パターンになるでしょう。

  • Servlet API から Ring へのマッピング
  • 独自実装

Servlet API からのマッピングはあまり面白くなく、Servlet コンテナも必要になるので
少々面倒です。

uberjar で 1ファイルで配布してしまいたいので独自実装で実装していきます。
速度面を考え、コアは netty を使用し、その上に Ring 部を実装していきます。
呼び出し回数の少ない箇所や、動的に変更したい設定部分は Clojure で実装、コアは Java
で実装します。

Java -> Clojure

まず Java 側の実装です。

Clojure 連携の基本

Clojure から関数(Ring Handler)を受け取り、引数を Java で作成し実行します。
以下は Ring Handler を実行する例です

Map<Keyword, Object> response = (Map<Keyword, Object>) handler.invoke(PersistentArrayMap.create(reqMap));

Clojure の関数は IFn として表されます。
関数実行は invoke メソッドでできます。
引数部は Clojure 側で使用する型に合わせなければなりませんが、多くの場合は簡単に
変換できるようヘルパーメソッドが用意されています。

Ring Response は Clojure 側の map (PersistentArrayMap) で返ってくるわけですが、
Map インターフェイスで扱えば特に意識する必要はないでしょう。

Keyword

Ring のリクエストは キーが Keyword な map で表します。
Keyword は以下のように作成できます。

public static final Keyword HTTP = Keyword.intern("http");

また使用する Keyword はほぼ固定なので使いまわせるように static で作成しておく
方がよいでしょう。

public static final Keyword BODY_KEY = Keyword.intern("body");
public static final Keyword URI_KEY = Keyword.intern("uri");
public static final Keyword QUERY_KEY = Keyword.intern("query-string");
public static final Keyword REQUEST_METHOD_KEY = Keyword.intern("request-method");
public static final Keyword SERVER_NAME_KEY = Keyword.intern("server-name");

public static final Keyword SERVER_PORT_KEY = Keyword.intern("server-port");
public static final Keyword REMOTE_PORT_KEY = Keyword.intern("remote-port");
public static final Keyword SCHEME_KEY = Keyword.intern("scheme");
public static final Keyword CONTENT_TYPE_KEY = Keyword.intern("content-type");
public static final Keyword CONTENT_LENGTH_KEY = Keyword.intern("content-length");
public static final Keyword CHAR_ENC_KEY = Keyword.intern("charactor-encoding");
public static final Keyword HEADERS_KEY = Keyword.intern("headers");

public static final Keyword HTTP = Keyword.intern("http");
public static final Keyword STATUS = Keyword.intern("status");
public static final Keyword USE_SEND_FILE = Keyword.intern("use-send-file");
static {
    METHOD_MAP = new HashMap<>();
    METHOD_MAP.put(HttpMethod.GET, Keyword.intern("get"));
    METHOD_MAP.put(HttpMethod.POST, Keyword.intern("post"));
    METHOD_MAP.put(HttpMethod.PUT, Keyword.intern("put"));
    METHOD_MAP.put(HttpMethod.PATCH, Keyword.intern("patch"));
    METHOD_MAP.put(HttpMethod.OPTIONS, Keyword.intern("options"));
    METHOD_MAP.put(HttpMethod.DELETE, Keyword.intern("delete"));
    METHOD_MAP.put(HttpMethod.HEAD, Keyword.intern("head"));
    METHOD_MAP.put(HttpMethod.CONNECT, Keyword.intern("connect"));
    METHOD_MAP.put(HttpMethod.TRACE, Keyword.intern("trace"));
}

Ring Request

Ring Request は上記でもあるように PersistentArrayMap です。
PersistentArrayMap は Map から作成できるため、Map で作成後、最後に変換します。

private Map<Keyword, Object> createRingRequest(final ChannelHandlerContext ctx, final FullHttpRequest request) {

    Map<Keyword, Object> m = new HashMap<>();
    // :scheme
    m.put(SCHEME_KEY, HTTP);
    // :uri
    // :query
    setURIAndQuery(m, request);
    // :server-name
    // :server-port
    // :content-type
    // :content-length
    // :charctor-encoding
    setServerNames(m, request);
    // :request-method
    m.put(REQUEST_METHOD_KEY, getHttpMethod(request));
    // :remote-address
    m.put(REMOTE_PORT_KEY, getRemoteAddress(ctx, request));
    // :headers
    m.put(HEADERS_KEY, getRequestHeaders(request));

    return m;
}

Ring Response

Ring Response は複数の型をサポートしています。
そのため型を調べ、実装します。


private void writeRingResponse(Channel ch, Map<Keyword, Object> ringResponse) throws Exception {

    Integer statusCode = ((Long) ringResponse.get(STATUS)).intValue();
    if (statusCode == null) {
        statusCode = 200;
    }
    HttpResponseStatus status = HttpResponseStatus.valueOf(statusCode);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);

    setResponseHeaders(response, ringResponse);

    Object body = ringResponse.get(BODY_KEY);

    if (body != null) {
        sendResponse(body, response, ch, keepAlive);
    } else {
        throw new IllegalArgumentException("response body is null");
    }
}

private void setResponseHeaders(HttpResponse response, Map<Keyword, Object> ringResponse) {
    HttpHeaders headers = response.headers();

    @SuppressWarnings("unchecked")
        final Map<String, Object> headerMap = (Map<String, Object>) ringResponse.get(HEADERS_KEY);

    if (headerMap == null) {
        return;
    }

    for (Map.Entry<String, Object> header : headerMap.entrySet()) {
        headers.set(header.getKey(), (String)header.getValue());
    }

    if (keepAlive) {
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

}

private void sendResponse(Object body, FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {

    // System.out.println(Thread.currentThread());
    if (body instanceof String) {
        sendStringResponse((String) body, response, ch, keepAlive);
    } else if (body instanceof ISeq) {
        StringBuilder sb = new StringBuilder();
        for (ISeq s = (ISeq) body; s != null; s = s.next()) {
            String str = (String) s.first();
            sb.append(str);
        }
        sendStringResponse(sb.toString(), response, ch, keepAlive);
    } else if (body instanceof InputStream) {
        sendInputStreamResponse((InputStream) body, response, ch, keepAlive);
    } else if (body instanceof File) {
        sendFileStreamResponse((File) body, response, ch, keepAlive);
    } else {
        //TODO WARN or 404 ?
        sendStringResponse("", response, ch, keepAlive);
    }
}

Clojure -> Java

Clojure 側は設定、組み立て部です。
基本はラッパーを作るだけです。
組み立て部は Clojure で書いた方が楽でしょう。

ラッパー

各種ラッパーを定義します。
Clojure から短い名前で呼び出せるようにしています。

(ns arianrod.netty
  (:import
   [java.nio.charset Charset]
   [io.netty.bootstrap ServerBootstrap Bootstrap]
   [io.netty.channel EventLoop EventLoopGroup ChannelHandler ChannelPipeline ChannelHandlerContext]
   [arianrod.support CljChannelInHandler BootstrapFactory ChannelHandlerContextUtil CljSimpleChannelHandler UnpooledWrapper
    ChunkedInputStream ChannelEventHandler]))

(defn- is-linux []
  (= "Linux" (System/getProperty "os.name")))

(defmacro ^EventLoopGroup create-elg []
  (if (is-linux)
    `(io.netty.channel.epoll.EpollEventLoopGroup.)
    `(io.netty.channel.nio.NioEventLoopGroup.)))

(defmacro ^EventLoop create-el []
  (if (is-linux)
    `(io.netty.channel.epoll.EpollEventLoop.)
    `(io.netty.channel.nio.NioEventLoop.)))

(defmacro ^EventLoop create-nssc []
  (if (is-linux)
    `io.netty.channel.epoll.EpollServerSocketChannel
    `io.netty.channel.socket.nio.NioServerSocketChannel))

(defmacro ^EventLoop create-nsc []
  (if (is-linux)
    `io.netty.channel.epoll.EpollSocketChannel
    `io.netty.channel.socket.nio.NioSocketChannel))

(defmacro ^ChannelHandler channel-in-handler [m]
  `(CljChannelInHandler. ~m))

(defn ^ChannelHandler make-handler [fn & {:keys [filter] :or {filter [:read :complete :exception]}}]
  (ChannelEventHandler. fn (set filter)))

(defmacro ^ChannelHandler simple-handler [m]
  `(CljSimpleChannelHandler. ~m))

(defmacro write-ctx [ctx msg]
  `(ChannelHandlerContextUtil/writeCtx ~ctx ~msg))

(defmacro flush-ctx [ctx]
  `(ChannelHandlerContextUtil/flushCtx ~ctx))

(defmacro write-flush-ctx [ctx msg]
  `(ChannelHandlerContextUtil/writeAndFlushCtx ~ctx ~msg))

(defmacro close-ctx [ctx]
  `(ChannelHandlerContextUtil/closeCtx ~ctx))

(defn ^ServerBootstrap server-factory [fn & {:keys [filter] :or {filter [:read :complete :exception]}}]
  (BootstrapFactory/createSimpleServerBootstrap (create-nssc) fn (set filter)))

(defn ^ServerBootstrap server-factory-init [initializer]
  (BootstrapFactory/createServerBootstrap (create-nssc) initializer))

(defn ^Bootstrap client-factory [initializer]
  (BootstrapFactory/createBootstrap (create-nsc) initializer))

(defmacro add-last-pipeline [pipline name fn]
  `(BootstrapFactory/addLastPipeline ~pipline ~name ~fn))

;; Buffer API

(defn wrap-buffer-bytes [a]
  (UnpooledWrapper/wrappedBufferBytes a))

(defn copied-buffer [^String string ^Charset charset]
  (UnpooledWrapper/copiedBuffer string charset))

(defn create-buf [cap]
  (UnpooledWrapper/createBuffer cap))

(defn create-direct-buf [cap]
  (UnpooledWrapper/createDirectBuffer cap))

(defn ->byte-array [s]
  (byte-array (map (comp byte int) s)))

(defn ->bytes [s]
  (bytes (byte-array (map (comp byte int) s))))

内部的に EventLoop の種類などを自動で切り替えます。

Server

起動部分はほぼ netty のお作法通りです。

(ns arianrod.server
  (:import
   [io.netty.bootstrap ServerBootstrap]
   [io.netty.channel EventLoopGroup ChannelFuture])
  (:require
   [arianrod.netty :as netty]))

(def ^:dynamic *debug* false)
(def ^:dynamic *backlog* 1024)

(defn wait-future [^ChannelFuture future ^EventLoopGroup boss ^EventLoopGroup work]
  (let [^ChannelFuture cf (-> (.channel future)
                              (.closeFuture))]
    (fn []
      (try
        (.sync cf)
        (finally
          (.shutdownGracefully boss)
          (.shutdownGracefully work))))))

(defn ^ServerBootstrap bind-server [^ServerBootstrap bootstrap ^long port]
  (-> (.bind bootstrap port)
      (.sync)))

(defn start-server* [factory port & [boss work]]
  (let [boss (or boss (netty/create-elg))
        work (or work (netty/create-elg))]
    (-> (factory)
        (.group boss work)
        (bind-server port)
        (wait-future boss work))))

(defn start-server [factory port & {:keys [wait] :or {wait true}}]
  (let [waiter (start-server* factory port)]
    (if wait
      (waiter)
      waiter)))

(defmacro start-server-wait [factory port]
  `(start-server ~factory ~port :wait true))
(ns arianrod.ring
  (:use arianrod.netty)
  (:import
   [io.netty.bootstrap ServerBootstrap]
   [arianrod.support.ring RingBootstrapFactory RingChannelHandler]))

(def ^:dynamic *option* {:debug false
                         :backlog 1024
                         :use-send-file true})

(defn ring-server-factory [handler options]
  (RingBootstrapFactory/createRingServerBootstrap
   (create-nssc) options handler))

(defn create-ring-server
  ([handler] (create-ring-server handler *option*))
  ([handler options]
     (ring-server-factory handler (merge *option* options))))

(defn create-http-server []
  (create-ring-server
   (fn [req]
     {:status 200
      :headers {"Content-Type" "text/plain"}
      :body "hello world"})))

全体

package arianrod.support.ring;

import clojure.lang.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

public class RingChannelHandler extends SimpleChannelInboundHandler<HttpObject> {

    public static final Keyword BODY_KEY = Keyword.intern("body");
    public static final Keyword URI_KEY = Keyword.intern("uri");
    public static final Keyword QUERY_KEY = Keyword.intern("query-string");
    public static final Keyword REQUEST_METHOD_KEY = Keyword.intern("request-method");
    public static final Keyword SERVER_NAME_KEY = Keyword.intern("server-name");

    public static final Keyword SERVER_PORT_KEY = Keyword.intern("server-port");
    public static final Keyword REMOTE_PORT_KEY = Keyword.intern("remote-port");
    public static final Keyword SCHEME_KEY = Keyword.intern("scheme");
    public static final Keyword CONTENT_TYPE_KEY = Keyword.intern("content-type");
    public static final Keyword CONTENT_LENGTH_KEY = Keyword.intern("content-length");
    public static final Keyword CHAR_ENC_KEY = Keyword.intern("charactor-encoding");
    public static final Keyword HEADERS_KEY = Keyword.intern("headers");

    public static final Keyword HTTP = Keyword.intern("http");
    public static final Keyword STATUS = Keyword.intern("status");
    public static final Keyword USE_SEND_FILE = Keyword.intern("use-send-file");

    private static final String CHARSET = "charset=";
    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    private static final int BODY_MEMORY_LIMIT = 1024 * 1024 * 16;

    private static final Map<HttpMethod, Keyword> METHOD_MAP;

    static {
        METHOD_MAP = new HashMap<>();
        METHOD_MAP.put(HttpMethod.GET, Keyword.intern("get"));
        METHOD_MAP.put(HttpMethod.POST, Keyword.intern("post"));
        METHOD_MAP.put(HttpMethod.PUT, Keyword.intern("put"));
        METHOD_MAP.put(HttpMethod.PATCH, Keyword.intern("patch"));
        METHOD_MAP.put(HttpMethod.OPTIONS, Keyword.intern("options"));
        METHOD_MAP.put(HttpMethod.DELETE, Keyword.intern("delete"));
        METHOD_MAP.put(HttpMethod.HEAD, Keyword.intern("head"));
        METHOD_MAP.put(HttpMethod.CONNECT, Keyword.intern("connect"));
        METHOD_MAP.put(HttpMethod.TRACE, Keyword.intern("trace"));
    }

    private final APersistentMap option;
    private final IFn handler;
    private Map<Keyword, Object> reqMap;
    private ByteBuf byteBuf = null;
    private Path tempFilePath = null;
    private boolean keepAlive;

    public RingChannelHandler(final IFn handler, final APersistentMap option) {
        this.handler = handler;
        this.option = option;
    }

    @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void callRingHandler(ChannelHandlerContext ctx, InputStream input) throws Exception {
        //System.out.println(Thread.currentThread());

        try (InputStream in = input) {
                reqMap.put(BODY_KEY, in);

                @SuppressWarnings("unchecked")
                Map<Keyword, Object> response = (Map<Keyword, Object>) handler.invoke(PersistentArrayMap.create(reqMap));
                if (response != null) {
                    writeRingResponse(ctx.channel(), response);
                } else {
                    throw new IllegalArgumentException("response is null");
                }

            }
    }

    @Override
        public void messageReceived(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            keepAlive = HttpHeaderUtil.isKeepAlive(request);

            Long contentLength = HttpHeaderUtil.getContentLength(request, 0);
            HttpMethod method = request.method();
            reqMap = this.createRingRequest(ctx, request);

            if (method == HttpMethod.POST || method == HttpMethod.PUT) {
                if (contentLength > BODY_MEMORY_LIMIT) {
                    // large data
                    tempFilePath = Files.createTempFile(null, null);

                } else {
                    // memory
                    byteBuf = Unpooled.directBuffer(contentLength.intValue());
                }
            } else {
                callRingHandler(ctx, new ByteBufInputStream(request.content()));
            }
        }

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            if (byteBuf != null) {
                byteBuf.writeBytes(content.content());

                if (msg instanceof LastHttpContent) {
                    callRingHandler(ctx, new ByteBufInputStream(byteBuf));
                }
            } else if (tempFilePath != null) {
                Files.write(tempFilePath, content.content().array(), StandardOpenOption.APPEND);

                if (msg instanceof LastHttpContent) {
                    callRingHandler(ctx, Files.newInputStream(tempFilePath, StandardOpenOption.DELETE_ON_CLOSE));
                }
            }
        }
    }

    private Keyword getHttpMethod(final FullHttpRequest request) {
        HttpMethod method = request.method();
        if (METHOD_MAP.containsKey(method)) {
            return METHOD_MAP.get(method);
        }
        return Keyword.intern(null, String.valueOf(method.name().toLowerCase()));
    }

    private void setURIAndQuery(final Map<Keyword, Object> m, final FullHttpRequest request) {
        String url = request.uri();
        int idx = url.indexOf('?');
        String uri;
        String queryString;
        if (idx > 0) {
            uri = url.substring(0, idx);
            queryString = url.substring(idx + 1);
        } else {
            uri = url;
            queryString = null;
        }
        m.put(URI_KEY, uri);
        m.put(QUERY_KEY, queryString);
    }

    private void setServerNames(final Map<Keyword, Object> m, final FullHttpRequest request) {
        HttpHeaders headers = request.headers();
        String serverName = null;
        Integer serverPort = null;
        String contentType = null;
        String charset = null;

        String h = (String) headers.get("host");
        if (h != null) {
            int idx = h.lastIndexOf(':');
            if (idx != -1) {
                serverName = h.substring(0, idx);
                serverPort = Integer.valueOf(h.substring(idx + 1));
            } else {
                serverName = h;
            }
        }

        String ct = (String) headers.get(HttpHeaderNames.CONTENT_TYPE);
        if (ct != null) {
            int idx = ct.indexOf(";");
            if (idx != -1) {
                int cidx = ct.indexOf(CHARSET, idx);
                if (cidx != -1) {
                    contentType = ct.substring(0, idx);
                    charset = ct.substring(cidx + CHARSET.length());
                } else {
                    contentType = ct;
                }
            } else {
                contentType = ct;
            }
        }

        m.put(SERVER_NAME_KEY, serverName);
        m.put(SERVER_PORT_KEY, serverPort);
        m.put(CONTENT_TYPE_KEY, contentType);

        long length = HttpHeaderUtil.getContentLength(request);
        if (length > 0) {
            m.put(CONTENT_LENGTH_KEY, length);
        } else {
            m.put(CONTENT_LENGTH_KEY, null);
        }
        m.put(CHAR_ENC_KEY, charset);
    }

    private String getRemoteAddress(final ChannelHandlerContext ctx, final FullHttpRequest request) {
        String h = (String) request.headers().get("X-Forward-For");
        if (h != null) {
            int idx = h.indexOf(',');
            if (idx == -1) {
                return h;
            } else {
                // X-Forwarded-For: client, proxy1, proxy2
                return h.substring(0, idx);
            }
        }
        InetSocketAddress sockAddr = (InetSocketAddress) ctx.channel().remoteAddress();
        return sockAddr.getAddress().getHostAddress();
    }

    private IPersistentMap getRequestHeaders(final FullHttpRequest request) {
        HttpHeaders headers = request.headers();
        Map<String, String> m = new HashMap<>();
        for (Map.Entry<CharSequence, CharSequence> header : headers.entries()) {
            String lo = header.getKey().toString().toLowerCase();
            m.put(lo, (String)header.getValue());
        }
        return PersistentArrayMap.create(m);
    }

    private Map<Keyword, Object> createRingRequest(final ChannelHandlerContext ctx, final FullHttpRequest request) {

        Map<Keyword, Object> m = new HashMap<>();
        // :scheme
        m.put(SCHEME_KEY, HTTP);
        // :uri
        // :query
        setURIAndQuery(m, request);
        // :server-name
        // :server-port
        // :content-type
        // :content-length
        // :charctor-encoding
        setServerNames(m, request);
        // :request-method
        m.put(REQUEST_METHOD_KEY, getHttpMethod(request));
        // :remote-address
        m.put(REMOTE_PORT_KEY, getRemoteAddress(ctx, request));
        // :headers
        m.put(HEADERS_KEY, getRequestHeaders(request));

        return m;
    }

    private void writeRingResponse(Channel ch, Map<Keyword, Object> ringResponse) throws Exception {

        Integer statusCode = ((Long) ringResponse.get(STATUS)).intValue();
        if (statusCode == null) {
            statusCode = 200;
        }
        HttpResponseStatus status = HttpResponseStatus.valueOf(statusCode);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);

        setResponseHeaders(response, ringResponse);

        Object body = ringResponse.get(BODY_KEY);

        if (body != null) {
            sendResponse(body, response, ch, keepAlive);
        } else {
            throw new IllegalArgumentException("response body is null");
        }
    }

    private void setResponseHeaders(HttpResponse response, Map<Keyword, Object> ringResponse) {
        HttpHeaders headers = response.headers();

        @SuppressWarnings("unchecked")
            final Map<String, Object> headerMap = (Map<String, Object>) ringResponse.get(HEADERS_KEY);

        if (headerMap == null) {
            return;
        }

        for (Map.Entry<String, Object> header : headerMap.entrySet()) {
            headers.set(header.getKey(), (String)header.getValue());
        }

        if (keepAlive) {
            headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

    }

    private void sendResponse(Object body, FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {

        // System.out.println(Thread.currentThread());
        if (body instanceof String) {
            sendStringResponse((String) body, response, ch, keepAlive);
        } else if (body instanceof ISeq) {
            StringBuilder sb = new StringBuilder();
            for (ISeq s = (ISeq) body; s != null; s = s.next()) {
                String str = (String) s.first();
                sb.append(str);
            }
            sendStringResponse(sb.toString(), response, ch, keepAlive);
        } else if (body instanceof InputStream) {
            sendInputStreamResponse((InputStream) body, response, ch, keepAlive);
        } else if (body instanceof File) {
            sendFileStreamResponse((File) body, response, ch, keepAlive);
        } else {
            //TODO WARN or 404 ?
            sendStringResponse("", response, ch, keepAlive);
        }
    }

    private void setContentLength(FullHttpResponse response, long length) {
        HttpHeaders headers = response.headers();
        CharSequence val = headers.get(HttpHeaderNames.CONTENT_LENGTH);
        if (val == null || val.equals("")) {
            headers.setLong(HttpHeaderNames.CONTENT_LENGTH, length);
        }
    }

    private Charset getResponseCharset(FullHttpResponse response) {
        HttpHeaders headers = response.headers();
        String type = (String)headers.get(HttpHeaderNames.CONTENT_TYPE);
        if (type != null) {
            try {
                type = type.toLowerCase();
                int i = type.indexOf(CHARSET);
                if (i != -1) {
                    String charset = type.substring(i + CHARSET.length()).trim();
                    return Charset.forName(charset);
                }
            } catch (Exception ignore) {
            }
        }
        return DEFAULT_CHARSET;
    }

    private void sendStringResponse(final String body, final FullHttpResponse response, Channel ch, boolean keepAlive) {
        Charset charset = getResponseCharset(response);
        ByteBuf buffer = Unpooled.copiedBuffer(body, charset);
        setContentLength(response, buffer.readableBytes());
        response.content().writeBytes(buffer);
        buffer.release();
        if (keepAlive) {
            ch.write(response);
            //ctx.write(response).addListener(ChannelFutureListener.CLOSE);
        } else {
            ch.write(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void sendInputStreamResponse(final InputStream stream, final FullHttpResponse response, Channel ch, boolean keepAlive) {
        ch.write(response);
        ChannelFuture future = ch.write(new ChunkedStream(stream));

        future.addListener(new GenericFutureListener<Future<Void>>() {
                @Override
                    public void operationComplete(Future<Void> f) throws Exception {
                    stream.close();
                }
            });
        future.addListener(ChannelFutureListener.CLOSE);
    }

    private void sendFileStreamResponse(final File file, final FullHttpResponse response, Channel ch, boolean keepAlive) throws Exception {

        Boolean useSendFile = (Boolean) option.get(USE_SEND_FILE);

        RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            return;
        }
        long fileLength = raf.length();
        setContentLength(response, fileLength);
        ch.write(response);

        // Write the content.
        if (useSendFile != null && useSendFile) {
            ch.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ch.newProgressivePromise());
        } else {
            ch.write(new ChunkedFile(raf, 0, fileLength, 8192), ch.newProgressivePromise());
        }

        ChannelFuture lastContentFuture = ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
        if (!keepAlive) {
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }

    }

    static final class RingHandler implements Runnable {

        private final RingChannelHandler ringChannelHandler;
        private final Channel ch;
        private final InputStream in;

        public RingHandler(Channel ch, RingChannelHandler handler, InputStream in) {
            this.ringChannelHandler = handler;
            this.ch = ch;
            this.in = in;
        }

        @Override
        public void run() {

            try (InputStream input = this.in) {
                    this.ringChannelHandler.reqMap.put(BODY_KEY, input);
                    @SuppressWarnings("unchecked")
                        Map<Keyword, Object> response = (Map<Keyword, Object>) this.ringChannelHandler.handler.invoke(PersistentArrayMap.create(this.ringChannelHandler.reqMap));
                    if (response != null) {
                        this.ringChannelHandler.writeRingResponse(this.ch, response);
                    } else {
                        throw new IllegalArgumentException("response is null");
                    }
            } catch (Exception e) {
                //TODO Logging
                e.printStackTrace();
            } finally {
                this.ch.close();
            }

        }
    }

}
package arianrod.support.ring;

import clojure.lang.APersistentMap;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class RingBootstrapFactory {

    public static final Keyword DEBUG_KEY = Keyword.intern("debug");
    public static final Keyword BACKLOG_KEY = Keyword.intern("backlog");

    public static ServerBootstrap createRingServerBootstrap(Class<? extends ServerChannel> channelClass, final APersistentMap option, final IFn handler) {

        final Boolean debug = (Boolean) option.get(DEBUG_KEY);
        final int backlog = ((Long) option.get(BACKLOG_KEY)).intValue();

        ServerBootstrap b = new ServerBootstrap();
        b.channel(channelClass)
                .option(ChannelOption.SO_BACKLOG, backlog)
                .option(ChannelOption.TCP_NODELAY, true)
                .childHandler(new RingChannelInitializer(handler, option));

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }
        return b;
    }

    static class RingChannelInitializer extends ChannelInitializer<SocketChannel> {

        private final IFn handler;
        private final APersistentMap option;

        public RingChannelInitializer(final IFn handler, final APersistentMap option) {
            super();
            this.option = option;
            this.handler = handler;
        }

        @Override
        public void initChannel(final SocketChannel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpRequestDecoder())
                    .addLast(new HttpObjectAggregator(65536))
                    .addLast(new HttpResponseEncoder())
                    .addLast(new ChunkedWriteHandler())
                    .addLast(new RingChannelHandler(handler, option));
        }
    }

}
package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.logging.Level;
import java.util.logging.Logger;

public class CljSimpleChannelHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = Logger.getLogger(
                                                          CljSimpleChannelHandler.class.getName());

    private static final Keyword isSharableKey = Keyword.intern(null, "is-sharable");
    private static final Keyword channelReadCompKey = Keyword.intern(null, "channel-read-complete");
    private static final Keyword channelActiveKey = Keyword.intern(null, "channel-active");
    private static final Keyword messageReceivedKey = Keyword.intern(null, "message-received");
    private static final Keyword channelInactiveKey = Keyword.intern(null, "channel-in-active");
    private static final Keyword exceptionCaughtKey = Keyword.intern(null, "exception-caught");

    private final PersistentArrayMap map;

    public CljSimpleChannelHandler(PersistentArrayMap map) {
        this.map = map;
    }

    @Override
    public boolean isSharable() {
        if (this.map.containsKey(isSharableKey)) {
            final IFn f = (IFn) this.map.valAt(isSharableKey);
            return (Boolean) f.invoke();
        } else {
            return super.isSharable();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelActiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelActiveKey);
            f.invoke(ctx);
        } else {
            super.channelActive(ctx);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelInactiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelInactiveKey);
            f.invoke(ctx);
        } else {
            super.channelInactive(ctx);
        }
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.map.containsKey(messageReceivedKey)) {
            final IFn f = (IFn) this.map.valAt(messageReceivedKey);
            f.invoke(ctx, msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelReadCompKey)) {
            final IFn f = (IFn) this.map.valAt(channelReadCompKey);
            f.invoke(ctx);
        } else {
            super.channelReadComplete(ctx);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (this.map.containsKey(exceptionCaughtKey)) {
            final IFn f = (IFn) this.map.valAt(exceptionCaughtKey);
            f.invoke(ctx, cause);
        } else {

            logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
            ctx.close();
        }
    }

}
package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Set;

public class ChannelEventHandler extends ChannelHandlerAdapter {

    public static final Keyword INACTIVE_KEY = Keyword.intern("inactive");
    public static final Keyword ACTIVE_KEY = Keyword.intern("active");
    public static final Keyword READ_KEY = Keyword.intern("read");
    public static final Keyword COMPLETE_KEY = Keyword.intern("complete");
    public static final Keyword EXCEPTION_KEY = Keyword.intern("exception");

    private EventSource eventSource;

    private final IFn handler;
    private final Set filter;

    public ChannelEventHandler(final IFn handler, final Set filter) {
        this.handler = handler;
        this.filter = filter;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (!this.filter.contains(INACTIVE_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, INACTIVE_KEY);
        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = INACTIVE_KEY;
            this.eventSource.object = null;
        }
        this.handler.invoke(this.eventSource);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (!this.filter.contains(ACTIVE_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, ACTIVE_KEY);
        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = ACTIVE_KEY;
            this.eventSource.object = null;
        }
        this.handler.invoke(this.eventSource);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (!this.filter.contains(COMPLETE_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, COMPLETE_KEY);

        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = COMPLETE_KEY;
            this.eventSource.object = null;
        }
        this.handler.invoke(this.eventSource);


    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (!this.filter.contains(EXCEPTION_KEY)) {
            return;
        }
        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, EXCEPTION_KEY, cause);

        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = EXCEPTION_KEY;
            this.eventSource.object = cause;
        }
        this.handler.invoke(this.eventSource);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!this.filter.contains(COMPLETE_KEY)) {
            return;
        }

        if (eventSource == null) {
            this.eventSource = new EventSource(ctx, READ_KEY, msg);

        } else {
            this.eventSource.ctx = ctx;
            this.eventSource.type = READ_KEY;
            this.eventSource.object = msg;
        }
        this.handler.invoke(this.eventSource);
    }

}
package arianrod.support;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;

public class ChannelHandlerContextUtil {

    public static final ChannelFuture writeCtx(ChannelHandlerContext ctx, Object msg) {
        return ctx.write(msg);
    }

    public static final ChannelHandlerContext flushCtx(ChannelHandlerContext ctx) {
        return ctx.flush();
    }

    public static final ChannelFuture closeCtx(ChannelHandlerContext ctx) {
        return ctx.close();
    }

    public static final ChannelFuture writeAndFlushCtx(ChannelHandlerContext ctx, Object msg) {
        return ctx.writeAndFlush(msg);
    }

}
package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.PersistentArrayMap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerAdapter;

import java.util.logging.Level;
import java.util.logging.Logger;

public class CljChannelInHandler extends ChannelHandlerAdapter {

    private static final Logger logger = Logger.getLogger(
                                                          CljChannelInHandler.class.getName());

    private static final Keyword isSharableKey = Keyword.intern(null, "is-sharable");
    private static final Keyword channelReadKey = Keyword.intern(null, "channel-read");
    private static final Keyword channelReadCompKey = Keyword.intern(null, "channel-read-complete");
    private static final Keyword channelActiveKey = Keyword.intern(null, "channel-active");
    private static final Keyword channelInactiveKey = Keyword.intern(null, "channel-in-active");
    private static final Keyword exceptionCaughtKey = Keyword.intern(null, "exception-caught");

    private final PersistentArrayMap map;

    public CljChannelInHandler(PersistentArrayMap map) {
        this.map = map;
    }

    @Override
    public boolean isSharable() {
        if (this.map.containsKey(isSharableKey)) {
            final IFn f = (IFn) this.map.valAt(isSharableKey);
            return (Boolean) f.invoke();
        } else {
            return super.isSharable();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelActiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelActiveKey);
            f.invoke(ctx);
        } else {
            super.channelActive(ctx);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelInactiveKey)) {
            final IFn f = (IFn) this.map.valAt(channelInactiveKey);
            f.invoke(ctx);
        } else {
            super.channelInactive(ctx);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.map.containsKey(channelReadKey)) {
            final IFn f = (IFn) this.map.valAt(channelReadKey);
            f.invoke(ctx, msg);
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if (this.map.containsKey(channelReadCompKey)) {
            final IFn f = (IFn) this.map.valAt(channelReadCompKey);
            f.invoke(ctx);
        } else {
            super.channelReadComplete(ctx);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (this.map.containsKey(exceptionCaughtKey)) {
            final IFn f = (IFn) this.map.valAt(exceptionCaughtKey);
            f.invoke(ctx, cause);
        } else {

            logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
            ctx.close();
        }
    }
}
package arianrod.support;

import clojure.lang.IDeref;
import clojure.lang.IFn;
import clojure.lang.Keyword;
import clojure.lang.RT;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.ChannelMatcher;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EventSource implements IDeref {

    public static Map<String, DefaultChannelGroup> groupMap = new ConcurrentHashMap<>();

    public Keyword type;
    public Object object;
    public ChannelHandlerContext ctx;


    public EventSource(ChannelHandlerContext ctx, Keyword type) {
        this.ctx = ctx;
        this.type = type;
    }

    public EventSource(ChannelHandlerContext ctx, Keyword type, Object object) {
        this(ctx, type);
        this.object = object;
    }

    public void addGroup(String ns) {
        if (!groupMap.containsKey(ns)) {
            DefaultChannelGroup defaultChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            groupMap.put(ns, defaultChannelGroup);
        }
    }

    public void joinGroup(String ns) {
        this.addGroup(ns);
        groupMap.get(ns).add(ctx.channel());
    }

    public ChannelGroupFuture putGroup(String ns, Object msg) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).writeAndFlush(msg);
        }
        return null;
    }

    public ChannelGroupFuture putGroup(String ns, Object msg, final IFn matcher) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).writeAndFlush(msg, new ChannelMatcher() {
                @Override
                public boolean matches(Channel channel) {
                    Object res = matcher.invoke(channel);
                    return res instanceof Boolean && (Boolean) res;
                }
            });
        }
        return null;
    }

    public ChannelGroupFuture closeGroup(String ns) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).close();
        }
        return null;
    }

    public ChannelGroupFuture closeGroup(String ns, final IFn matcher) {
        if (groupMap.containsKey(ns)) {
            return groupMap.get(ns).close(new ChannelMatcher() {
                @Override
                public boolean matches(Channel channel) {
                    Object res = matcher.invoke(channel);
                    return res instanceof Boolean && (Boolean) res;
                }
            });
        }
        return null;
    }

    public void sendRemote(String host, int port, final IFn initializer, final IFn channelWriter) throws InterruptedException{
        Bootstrap b = BootstrapFactory.createBootstrap(this.ctx.channel().getClass(), initializer);
        b = b.group(this.ctx.channel().eventLoop());
        Channel channel = b.connect(host, port).sync().channel();
        channelWriter.invoke(channel);
        channel.closeFuture().sync();

    }

    @Override
    public Object deref() {
        return RT.vector(this.type, this.object);
    }

    public Object get() {
        return this.object;
    }

    public EventSource put(Object o) {
        this.ctx.write(o);
        return this;
    }

    public EventSource putFlush(Object o) {
        this.ctx.writeAndFlush(o);
        return this;
    }

    public EventSource flush() {
        this.ctx.flush();
        return this;
    }

}
package arianrod.support;

import clojure.lang.IFn;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

public class SimpleChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final IFn callback;

    public SimpleChannelInitializer(IFn callback) {
        this.callback = callback;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        this.callback.invoke(ch.pipeline());
    }
}

package arianrod.support;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

public class UnpooledWrapper {

    public static final ByteBuf wrappedBufferBytes(byte[] array) {
        return Unpooled.wrappedBuffer(array);
    }

    public static final ByteBuf wrappedBuffer(ByteBuf buf) {
        return Unpooled.wrappedBuffer(buf);
    }

    public static final ByteBuf wrappedBuffer(ByteBuffer buf) {
        return Unpooled.wrappedBuffer(buf);
    }

    public static final ByteBuf copiedBuffer(byte[] array) {
        return Unpooled.copiedBuffer(array);
    }

    public static final ByteBuf copiedBuffer(ByteBuf buf) {
        return Unpooled.copiedBuffer(buf);
    }

    public static final ByteBuf copiedBuffer(ByteBuffer buf) {
        return Unpooled.copiedBuffer(buf);
    }

    public static final ByteBuf copiedBuffer(CharSequence string, Charset charset) {
        return Unpooled.copiedBuffer(string, charset);
    }

    public static final ByteBuf createBuffer(int capacity) {
        return Unpooled.buffer(capacity);
    }

    public static final ByteBuf createDirectBuffer(int capacity) {
        return Unpooled.directBuffer(capacity);
    }

}

package arianrod.support;

import clojure.lang.IFn;
import clojure.lang.RT;
import clojure.lang.Var;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.util.Set;


public class BootstrapFactory {


    public static ServerBootstrap createSimpleServerBootstrap(Class<? extends ServerChannel> channelClass, final IFn handler, final Set filter) {
        Var var = RT.var("arianrod.server", "*debug*");
        Boolean debug = (Boolean) var.get();
        var = RT.var("arianrod.server", "*backlog*");
        int backlog = ((Long) var.get()).intValue();

        ServerBootstrap b = new ServerBootstrap();
        b.channel(channelClass)
            .option(ChannelOption.SO_BACKLOG, backlog)
            .option(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ChannelEventHandler(handler, filter));
                    }
                });

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }

        return b;
    }


    public static ServerBootstrap createServerBootstrap(Class<? extends ServerChannel> channelClass, final IFn initializer) {
        Var var = RT.var("arianrod.server", "*debug*");
        Boolean debug = (Boolean) var.get();
        var = RT.var("arianrod.server", "*backlog*");
        int backlog = ((Long) var.get()).intValue();

        ServerBootstrap b = new ServerBootstrap();
        b.channel(channelClass)
            .option(ChannelOption.SO_BACKLOG, (int) backlog)
            .option(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        initializer.invoke(ch.pipeline());
                    }
                });

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }

        return b;
    }

    public static Bootstrap createBootstrap(Class<? extends Channel> channelClass, final IFn initializer) {
        Var var = RT.var("arianrod.client", "*debug*");
        Boolean debug = (Boolean) var.get();

        Bootstrap b = new Bootstrap();
        b.channel(channelClass)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        initializer.invoke(ch.pipeline());
                    }
                });

        if (debug) {
            b.handler(new LoggingHandler(LogLevel.DEBUG));
        }
        return b;
    }

    public static ChannelPipeline addLastPipeline(ChannelPipeline pipeline, String name, ChannelHandler handler) {
        return pipeline.addLast(name, handler);
    }

}

終わりに

素直に Go 書きましょう!

12
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?