10
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Akka HTTPからAkka Streamへのつなぎ込みとGraphDSLで単純なリバースプロクシを構築

Last updated at Posted at 2016-04-10

スコープ

Web屋さん向け。また、やや実践寄りかも、と思います。

  • Akka HTTPからAkka Streamへのつなぎ込み
  • GraphDSLを用いて、簡単なフローグラフの構築

とりあえず書いてみた

調査とかトライアンドエラーがけっこう苦労した :sweat_smile:

build.sbt(一部のみ)
libraryDependencies ++= {
  val akkaV = "2.4.3"
  Seq(
    "com.typesafe.akka" %% "akka-stream" % akkaV,
    "com.typesafe.akka" %% "akka-http-experimental" % akkaV
  )
}
ReverseProxySample.scala
package mananan.eval.akka.stream

import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode}
import akka.stream._
import akka.stream.scaladsl._

import scala.util.{Failure, Success, Try}

/**
  * リバースプロクシのサンプルプログラム
  * localhost:8080 で受けたリクエストをlocalhost:8081~8083に負荷分散する
  * 
  * Created by まななん on 2016/04/10.
  */
object ReverseProxySample extends App {
    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()

    // リバースプロキシからアクセスするエンドポイント一覧
    case class Endpoint(host: String, port: Int)
    val endpoints = for {
        host <- "localhost" :: Nil
        port <- 8081 to 8083
    } yield Endpoint(host, port)

    // サーバーリクエストをエンドポイントへのクライアントリクエストに変換するFlow(手抜き ※ゆえ、warnが出る)
    val convertReqFlow = Flow[HttpRequest].map { (_, None) }

    // Endpointの分だけrequestを渡すとresponseを戻すFlowをつくる
    val httpClientFlows = endpoints.map { case Endpoint(host, port) =>
        Http().cachedHostConnectionPool[None.type](host, port)
    }

    // エンドポイントからのクライアントレスポンスをサーバーレスポンスに変換するFlow(手抜き)
    val convertResFlow = Flow[(Try[HttpResponse], None.type)].map {
        case (Success(res), _) =>
            res
        case (Failure(e), _) =>
            e.printStackTrace()
            HttpResponse(StatusCode.int2StatusCode(500))
    }

    // リバースプロクシの動きをするFlow Graphをつくる
    val reverseProxyGraph = GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        val input = builder.add(convertReqFlow)
        val balance = builder.add(Balance[(HttpRequest, None.type)](httpClientFlows.size))
        val merge = builder.add(Merge[(Try[HttpResponse], None.type)](httpClientFlows.size))
        val output = builder.add(convertResFlow)

        input ~> balance
        httpClientFlows.foreach { flow =>
            balance ~> flow ~> merge
        }
        merge ~> output

        FlowShape(input.in, output.out)
    }
    val reverseProxyFlow = Flow.fromGraph(reverseProxyGraph)

    // リバースプロクシを起てる   
    Http().bindAndHandle(reverseProxyFlow, "localhost", 8080)

    // 動作デモ用にエンドポイントをローカルホストに起てる(どのパスにきてもエンドポイントのポート番号を戻すだけ)
    endpoints.foreach { case Endpoint(host, port) =>
        Http().bindAndHandle(server.Directives.complete(s"${port}"), host, port)
    }
}

※2016.04.12追記 URIでアクセス先を振り分けるFlowGraphなら、 Partition 構文を使えばOK

ReverseProxySample.scala(一部のみ、Partitionの場合)
    val reverseProxyGraph = GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        val input = builder.add(convertReqFlow)
        val partition = builder.add(Partition[(HttpRequest, None.type)](httpClientFlows.size, { case (req, _) =>
            req.uri.toRelative.toString match {
                case uri if uri.startsWith("/hoge") => 0
                case uri if uri.startsWith("/piyo") => 1
                case _ => 2
            }
        }))
        val merge = builder.add(Merge[(Try[HttpResponse], None.type)](httpClientFlows.size))
        val output = builder.add(convertResFlow)

        input ~> partition
        httpClientFlows.foreach { flow =>
            partition ~> flow ~> merge
        }
        merge ~> output

        FlowShape(input.in, output.out)
    }

要約

  • HTTPサーバーは、requestを与えたらresponseを戻す何らか(Flowまたはコールバック)を与えると動いてくれる
  • HTTPクライアントは、「request(とVoluntary)を与えたらTry[response](とさっきのVoluntary)を戻すFlow」の形で提供されてる。
  • GraphDSLは、builderのインスタンスにFlowを登録すると ~> 構文でつなげられる。
  • Fan-out/Fan-inの動作として、Balance以外にもBroadcastやPartition,Zip,Unzipなどいろんな働きをするDSLがある
  • GraphDSLは、部分的なGraphも生成できる。例えばPartialFlowの場合、最後にFlowShapeでFlowの始点と終点を指定して〆る
  • 参考にした資料について。今回の件でもっと深く知りたければ下記サイトをご確認ください

http://doc.akka.io/docs/akka/2.4.3/scala/http/low-level-server-side-api.html
http://doc.akka.io/docs/akka/2.4.3/scala/http/client-side/index.html
http://doc.akka.io/docs/akka/2.4.3/scala/stream/stream-graphs.html
http://doc.akka.io/docs/akka/2.4.3/scala/stream/stream-composition.html
http://labs.septeni.co.jp/entry/2016/01/13/100955 (中で使ってるDSLも新しいし、一番参考になった感)

個人的な感想

なんか、自分の知ってるアクターシステムのAkkaじゃないw
(けど、素のActorSystemとのつなぎ込みもできるみたい)
しかし、もし自前実装したら面倒くさそうな背圧制御を隠ぺいしてここまで簡単に書けるのはすばらしいと思う。

気になったのが、1つのAkkaHTTPサーバーの待ち受けポートに複数のFlowをバインドしつつ個別にアンバインドといったことができなさそう?そのへん動的にうまくできないかな。
本番で使うなら、設定からFlowDSLをおこし、それをもとに設定の動的リロードを無停止(Graceful的に)でやりたいところ。
それの実装が若干面倒くなりそうな印象を受けた。
(妄想だけど、HTTPサーバとは独立したStreamを作って、設定の入れ替え時にはStreamごと入れ替えとか?)

10
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?