はじめに:Akka-httpでできること
akka-httpを利用することで、簡単にRest,もしくはステートフルなJson APIを構築することができる。
また、Akka-typedを組み合わせることで、シンプルな構成にできる。
サンプルコード
ビルド
plugins.sbt
addSbtPlugin("io.gatling" % "gatling-sbt" % "3.2.1")
build.sbt
name := "fakeactor"
version := "0.1"
scalaVersion := "2.13.5"
val ScalaTest: String = "3.2.2"
val AkkaHttpVersion: String = "10.2.4"
val AkkaActor: String = "2.6.12"
val gatlingVersion = "3.5.1"
val karateVersion = "1.0.0"
fork := true
//sbt-gatlingがmaven centralに存在しないので、bintrayRepoを追加する
resolvers += Resolver.bintrayRepo("gatling", "sbt-plugins")
enablePlugins(GatlingPlugin)
libraryDependencies ++= Seq(
"io.gatling.highcharts" % "gatling-charts-highcharts" % gatlingVersion % "test,it",
"io.gatling" % "gatling-test-framework" % gatlingVersion % "test,it",
"com.intuit.karate" % "karate-gatling" % karateVersion % "test,it",
"org.scalactic" %% "scalactic" % ScalaTest,
"org.scalatest" %% "scalatest-wordspec" % ScalaTest % "test",
"org.scalatest" %% "scalatest-diagrams" % ScalaTest % "test",
"org.scalatest" %% "scalatest-shouldmatchers" % ScalaTest % "test",
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaActor,
"com.typesafe.akka" %% "akka-actor-typed" % AkkaActor,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaActor % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3"
)
Main
Main-httpサーバ
HttpServer.scala
import akka.http.scaladsl.server.Route
import akka.util.Timeout
import counter.actor.Command._
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.io.StdIn
import scala.util.{Failure, Success}
object HttpServer {
def main(args: Array[String]): Unit = {
// these are from spray-json
implicit val system: ActorSystem[Command] =
ActorSystem(CounterActor(), "http-typed-counter")
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContext = system.executionContext
val route: Route = pathPrefix("counter") {
concat(
path("reset") {
put {
system ! Reset
complete(StatusCodes.Accepted, "Reset")
}
},
path("increment") {
put {
implicit val incrementFormat: RootJsonFormat[Command.Increase] =
jsonFormat1(Command.Increase)
entity(as[Command.Increase]) { command => // place a bid, fire-and-forget
system ! command
complete(StatusCodes.Accepted, "increment")
}
}
},
path("decrement") {
put {
implicit val decrementFormat: RootJsonFormat[Command.Decrease] =
jsonFormat1(Command.Decrease)
entity(as[Command.Decrease]) { command => // place a bid, fire-and-forget
system ! command
complete(StatusCodes.Accepted, "decrement")
}
}
},
get {
implicit val counterResponseFormat: RootJsonFormat[Response.Count] =
jsonFormat1(Response.Count)
implicit val timeout: Timeout = 5.seconds
// query the actor for the current auction state
val getCount: Future[Response.Count] =
system.ask(ReadCount).mapTo[Response.Count]
complete(getCount)
}
)
}
val bindingFuture: Future[Http.ServerBinding] =
Http(system).newServerAt("localhost", 9080).bind(route)
println(s"Server online at http://localhost:9080/\nPress RETURN to stop...")
//StdIn.readLine() // let it run until user presses return
bindingFuture
//.flatMap(_.unbind())
.onComplete {
case Success(value) =>
println("finish")
//system.terminate()
case Failure(exception) =>
println("fail")
system.terminate()
} // and shutdown when done
}
}
Main-Actor
Command.scala
package counter.actor
import akka.actor.typed.ActorRef
/** Actrorへのコマンド */
sealed trait Command
private[counter] object Command {
/** 初期値で初期化処理を行う */
final case object Reset extends Command
/** カウンターの値をvalueだけ増やす */
final case class Increase(value: Int) extends Command
/** カウンターの値をvalueだけ減らす */
final case class Decrease(value: Int) extends Command
// Responseを持つコマンドはなるべく分けて定義する
/** その時点のカウンターの値を取得する */
final case class ReadCount(replyTo: ActorRef[Response]) extends Command
}
/** Actrorからのメッセージ */
sealed trait Response
private[counter] object Response {
final case class Count(value: Int) extends Response
}
CounterActor.scala
package counter.actor
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
/** 計算ロジック */
private[counter] object CounterActor {
/** 初期値 */
private val InitialCount: Int = 0
/** 初期値で初期化処理を行う */
def apply(): Behavior[Command] = Behaviors.setup { context =>
context.log.info(s"actor started: ${context.self.path}")
changeCount(InitialCount)
}
/** nextCountに値を変更する */
private def changeCount(nextCount: Int): Behavior[Command] = {
counter(nextCount)
}
/** Behavior(コマンドに対する振る舞い) */
private def counter(currentCount: Int): Behaviors.Receive[Command] =
Behaviors.receive { (context: ActorContext[Command], command: Command) =>
context.log.debug(s"command:$command,current:$currentCount")
command match {
case Command.Kill => killed()
case Command.Reset => changeCount(InitialCount)
case Command.Increase(value: Int) => changeCount(currentCount + value)
case Command.Decrease(value: Int) => changeCount(currentCount - value)
case Command.ReadCount(replyTo: ActorRef[Response]) =>
replyTo ! Response.Count(currentCount)
Behaviors.same
}
}
}
テスト
テスト(UT)
CounterActorTest.scala
package counter.actor
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import org.scalatest.diagrams.Diagrams
import org.scalatest.wordspec.AnyWordSpecLike
class CounterActorTest
extends ScalaTestWithActorTestKit
with AnyWordSpecLike
with Diagrams {
// ScalaTestWithActorTestKitを継承しているので、テストがすべて終わったら、ActorSystem はシャットダウンされる。
"コマンド単体テスト" when afterWord("初期値=0") {
val actor: ActorRef[Command] =
testKit.spawn(CounterActor(), "typed-actor-test")
val resProbe: TestProbe[Response] =
testKit.createTestProbe[Response]
"ReadCount" must {
"count=0" in {
actor ! Command.Reset
actor ! Command.ReadCount(resProbe.ref)
resProbe.expectMessage(Response.Count(0))
// パターン2でもよい(複雑なアサーション向け)
// val res: Response.Count = resProbe.receiveMessage()
// assert(res.value === 0)
}
}
"Increase(n)" must {
val change: Int = 21
s"count+=$change" in {
actor ! Command.Reset
actor ! Command.Increase(change)
actor ! Command.ReadCount(resProbe.ref)
resProbe.expectMessage(Response.Count(21))
}
}
"Decrease(n)" must {
val change: Int = 21
s"count-=$change" in {
actor ! Command.Reset
actor ! Command.Decrease(change)
actor ! Command.ReadCount(resProbe.ref)
resProbe.expectMessage(Response.Count(-1 * change))
}
}
"kill" must {
s"killed" in {
actor ! Command.Reset
actor ! Command.Kill
actor ! Command.ReadCount(resProbe.ref)
resProbe.expectMessage(Response.Killed)
}
}
}
"サイクルテスト" when afterWord("多数回加算したとき") {
val actor: ActorRef[Command] =
testKit.spawn(CounterActor(), "typed-actor-test-loop")
val resProbe: TestProbe[Response] =
testKit.createTestProbe[Response]
"Increase(n)" must {
s"100回加算でき、結果も一致すること" in {
actor ! Command.Reset
(1 to 100).map(Command.Increase).foreach(actor.tell)
actor ! Command.ReadCount(resProbe.ref)
resProbe.receiveMessage() match {
case Response.Count(value: Int) => {
println(value)
assert(value === (1 to 100).sum)
}
case Response.Killed => {
fail("killed")
}
}
}
}
}
}
テスト(E2E)
KarateE2ESimulation.scala
package counter.actor
import com.intuit.karate.gatling.KarateProtocol
import com.intuit.karate.gatling.PreDef._
import com.intuit.karate.http.HttpRequest
import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
/** E2E */
class KarateE2ESimulation extends Simulation {
val protocol: KarateProtocol = karateProtocol()
protocol.nameResolver = (req: HttpRequest, _) => req.getUrl
val search: ScenarioBuilder =
scenario("e2e").exec(karateFeature("classpath:counter.feature"))
setUp(search.inject(atOnceUsers(1)).protocols(protocol))
}
karate-config.js
function() {
var config = {
urlBase: 'http://localhost:9080/',
};
karate.configure('connectTimeout', 5000);
karate.configure('readTimeout', 5000);
return config;
}
counter.feature
Feature: Karate sample
Background:
* url urlBase
Scenario Outline: e2e-once
Given path <path>
And header Accept = <values>
And request <req>
When method <method>
* print <number>
Then status <status>
And match <expression>
Examples:
| number | path | req | values | method | status | expression |
| 1 | 'counter/reset' | "#ignore" | 'text/plain' | put | 202 | $ == "#ignore" |
| 2 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": 0} |
| 3 | 'counter/increment' | {"value": 4} | 'text/plain' | put | 202 | $ == "#ignore" |
| 4 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": 4} |
| 5 | 'counter/decrement' | {"value": 1} | 'text/plain' | put | 202 | $ == "#ignore" |
| 6 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": 3} |
| 7 | 'counter/reset' | "#ignore" | 'text/plain' | put | 202 | $ == "#ignore" |
| 8 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": 0} |
| 9 | 'counter/increment' | {"value": 2} | 'text/plain' | put | 202 | $ == "#ignore" |
| 10 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": 2} |
テスト(負荷テスト)
KarateGatlingSimulation.scala
import com.intuit.karate.gatling.KarateProtocol
import com.intuit.karate.gatling.PreDef._
import com.intuit.karate.http.HttpRequest
import io.gatling.core.Predef._
import io.gatling.core.controller.inject.open.OpenInjectionStep
import io.gatling.core.structure.ScenarioBuilder
import scala.concurrent.duration._
class KarateGatlingSimulation extends Simulation {
// Karate定義
// Gatling-開始時のレート(1ステップ目の1/3程度を推奨)
val startRate: Int = 5
// 安定負荷のレートステップ(TPSはfeatureのパターン×ユーザの数となるので注意)
val stableRates: Seq[Int] = Seq(15, 30)
// 負荷を段階的に高める時間
val rampUpTime: FiniteDuration = 10.seconds
// 安定負荷のテスト時間(TPSはfeatureのパターン×ユーザの数となるので注意)
val stableTime: FiniteDuration = 30.seconds
val steps: Seq[OpenInjectionStep] =
stableRates
.foldLeft((Seq.empty[OpenInjectionStep], startRate))(rampRateBuilder)
._1
private def rampRateBuilder(result: (Seq[OpenInjectionStep], Int),
next: Int): (Seq[OpenInjectionStep], Int) = {
(result._1 ++ Seq(rampUsersPerSec(result._2) to next during rampUpTime,
constantUsersPerSec(next) during stableTime),
next)
}
val protocol: KarateProtocol = karateProtocol()
protocol.nameResolver = (req: HttpRequest, _) => req.getUrl
// Karateシナリオ定義
val search: ScenarioBuilder =
scenario("get").exec(karateFeature("classpath:counter.feature"))
// Karate・Gatlingシナリオ紐づけ
setUp(search.inject(steps).protocols(protocol))
}
karate-config.js
function() {
var config = {
urlBase: 'http://localhost:9080/',
};
karate.configure('connectTimeout', 5000);
karate.configure('readTimeout', 5000);
return config;
}
counter.feature
Feature: Karate sample
Background:
* url urlBase
Scenario Outline: get
Given path <path>
And header Accept = <values>
And request <req>
When method <method>
* print <number>
Then status <status>
And match <expression>
Examples:
| number | path | req | values | method | status | expression |
| 1 | 'counter/reset' | "#ignore" | 'text/plain' | put | 202 | $ == "#ignore" |
| 2 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": #number} |
| 3 | 'counter/increment' | {"value": 4} | 'text/plain' | put | 202 | $ == "#ignore" |
| 4 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": #number} |
| 5 | 'counter/decrement' | {"value": 1} | 'text/plain' | put | 202 | $ == "#ignore" |
| 6 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": #number} |
| 7 | 'counter/reset' | "#ignore" | 'text/plain' | put | 202 | $ == "#ignore" |
| 8 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": #number} |
| 9 | 'counter/increment' | {"value": 2} | 'text/plain' | put | 202 | $ == "#ignore" |
| 10 | 'counter' | "#ignore" | 'application/json' | get | 200 | $ == {"value": #number} |