スコープ
Web屋さん向け。また、やや実践寄りかも、と思います。
- Akka HTTPからAkka Streamへのつなぎ込み
- GraphDSLを用いて、簡単なフローグラフの構築
とりあえず書いてみた
調査とかトライアンドエラーがけっこう苦労した
libraryDependencies ++= {
val akkaV = "2.4.3"
Seq(
"com.typesafe.akka" %% "akka-stream" % akkaV,
"com.typesafe.akka" %% "akka-http-experimental" % akkaV
)
}
package mananan.eval.akka.stream
import akka.actor.ActorSystem
import akka.http.scaladsl._
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCode}
import akka.stream._
import akka.stream.scaladsl._
import scala.util.{Failure, Success, Try}
/**
* リバースプロクシのサンプルプログラム
* localhost:8080 で受けたリクエストをlocalhost:8081~8083に負荷分散する
*
* Created by まななん on 2016/04/10.
*/
object ReverseProxySample extends App {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
// リバースプロキシからアクセスするエンドポイント一覧
case class Endpoint(host: String, port: Int)
val endpoints = for {
host <- "localhost" :: Nil
port <- 8081 to 8083
} yield Endpoint(host, port)
// サーバーリクエストをエンドポイントへのクライアントリクエストに変換するFlow(手抜き ※ゆえ、warnが出る)
val convertReqFlow = Flow[HttpRequest].map { (_, None) }
// Endpointの分だけrequestを渡すとresponseを戻すFlowをつくる
val httpClientFlows = endpoints.map { case Endpoint(host, port) =>
Http().cachedHostConnectionPool[None.type](host, port)
}
// エンドポイントからのクライアントレスポンスをサーバーレスポンスに変換するFlow(手抜き)
val convertResFlow = Flow[(Try[HttpResponse], None.type)].map {
case (Success(res), _) =>
res
case (Failure(e), _) =>
e.printStackTrace()
HttpResponse(StatusCode.int2StatusCode(500))
}
// リバースプロクシの動きをするFlow Graphをつくる
val reverseProxyGraph = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(convertReqFlow)
val balance = builder.add(Balance[(HttpRequest, None.type)](httpClientFlows.size))
val merge = builder.add(Merge[(Try[HttpResponse], None.type)](httpClientFlows.size))
val output = builder.add(convertResFlow)
input ~> balance
httpClientFlows.foreach { flow =>
balance ~> flow ~> merge
}
merge ~> output
FlowShape(input.in, output.out)
}
val reverseProxyFlow = Flow.fromGraph(reverseProxyGraph)
// リバースプロクシを起てる
Http().bindAndHandle(reverseProxyFlow, "localhost", 8080)
// 動作デモ用にエンドポイントをローカルホストに起てる(どのパスにきてもエンドポイントのポート番号を戻すだけ)
endpoints.foreach { case Endpoint(host, port) =>
Http().bindAndHandle(server.Directives.complete(s"${port}"), host, port)
}
}
※2016.04.12追記 URIでアクセス先を振り分けるFlowGraphなら、 Partition 構文を使えばOK
val reverseProxyGraph = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(convertReqFlow)
val partition = builder.add(Partition[(HttpRequest, None.type)](httpClientFlows.size, { case (req, _) =>
req.uri.toRelative.toString match {
case uri if uri.startsWith("/hoge") => 0
case uri if uri.startsWith("/piyo") => 1
case _ => 2
}
}))
val merge = builder.add(Merge[(Try[HttpResponse], None.type)](httpClientFlows.size))
val output = builder.add(convertResFlow)
input ~> partition
httpClientFlows.foreach { flow =>
partition ~> flow ~> merge
}
merge ~> output
FlowShape(input.in, output.out)
}
要約
- HTTPサーバーは、requestを与えたらresponseを戻す何らか(Flowまたはコールバック)を与えると動いてくれる
- HTTPクライアントは、「request(とVoluntary)を与えたらTry[response](とさっきのVoluntary)を戻すFlow」の形で提供されてる。
- GraphDSLは、builderのインスタンスにFlowを登録すると ~> 構文でつなげられる。
- Fan-out/Fan-inの動作として、Balance以外にもBroadcastやPartition,Zip,Unzipなどいろんな働きをするDSLがある
- GraphDSLは、部分的なGraphも生成できる。例えばPartialFlowの場合、最後にFlowShapeでFlowの始点と終点を指定して〆る
- 参考にした資料について。今回の件でもっと深く知りたければ下記サイトをご確認ください
http://doc.akka.io/docs/akka/2.4.3/scala/http/low-level-server-side-api.html
http://doc.akka.io/docs/akka/2.4.3/scala/http/client-side/index.html
http://doc.akka.io/docs/akka/2.4.3/scala/stream/stream-graphs.html
http://doc.akka.io/docs/akka/2.4.3/scala/stream/stream-composition.html
http://labs.septeni.co.jp/entry/2016/01/13/100955 (中で使ってるDSLも新しいし、一番参考になった感)
個人的な感想
なんか、自分の知ってるアクターシステムのAkkaじゃないw
(けど、素のActorSystemとのつなぎ込みもできるみたい)
しかし、もし自前実装したら面倒くさそうな背圧制御を隠ぺいしてここまで簡単に書けるのはすばらしいと思う。
気になったのが、1つのAkkaHTTPサーバーの待ち受けポートに複数のFlowをバインドしつつ個別にアンバインドといったことができなさそう?そのへん動的にうまくできないかな。
本番で使うなら、設定からFlowDSLをおこし、それをもとに設定の動的リロードを無停止(Graceful的に)でやりたいところ。
それの実装が若干面倒くなりそうな印象を受けた。
(妄想だけど、HTTPサーバとは独立したStreamを作って、設定の入れ替え時にはStreamごと入れ替えとか?)