始めに
前回に引き続き、今回はアクター部分を作成します。
構成
- IntelliJ 2018
- Scala 2.12.5
- sbt 1.1.4
- Akka 2.5.11
- AkkaHttp 10.1.0
- Slick 3.2.3
- Scalaz 7.2.19
成果物
アクター
早速、アクターを作ります。まずはsrc/main/resources/application.confに以下を追記します。
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
actor {
provider = "akka.actor.LocalActorRefProvider"
}
}
合わせて、logbackを使っているので以下も追加。
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %m%n</pattern>
</encoder>
</appender>
<logger name="todo.api" level="INFO"/>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
ルートパッケージ配下にactorパッケージを追加します。
APIルート(AkkaHttp)からAskでコマンドを受け取り、レポジトリの処理結果をakka.pattern.pipe
で返答するイメージですが、Future[\(バックスラッシュ)/[Throwable, T]]
をそのまま返すのは個人的にいまいち感があるので、Futureの結果がLeftだった場合はakka.actor.Status.Failure
で返して、AkkaHttpのExceptionHandlerで掴めるようにしてみます。
package todo.api.actor
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.PipeToSupport
import scalaz.{-\/, \/, \/-}
// Future[\/[L, R]]をpipeで送れるようにサポート
trait EitherPipeToSupport extends PipeToSupport {
final class EitherPipeableFuture[L <: Throwable, R](val future: Future[\/[L, R]])(
implicit ec: ExecutionContext) {
// EitherがLeftの場合はakka.actor.Status.Failureへ変換
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[\/[L, R]] =
future andThen {
case Success(\/-(r)) =>
recipient ! r
case Success(-\/(f)) =>
recipient ! Status.Failure(f)
case Failure(f) =>
recipient ! Status.Failure(f)
}
def to(recipient: ActorRef): EitherPipeableFuture[L, R] = to(recipient, Actor.noSender)
def to(recipient: ActorRef, sender: ActorRef): EitherPipeableFuture[L, R] = {
pipeTo(recipient)(sender)
this
}
}
// Future[\/[L, R]] -> EitherPipeableFuture
implicit def eitherPipe[L <: Throwable, R](future: Future[\/[L, R]])(
implicit ec: ExecutionContext): EitherPipeableFuture[L, R] = new EitherPipeableFuture(future)
}
上記をミックスインしてアクターを作りますが、APIルートとレポジトリには依存関係を持たせたくないのでアクターに中間モデルを定義しています。
また、レポジトリが返すFuture[\(バックスラッシュ)/[Throwable, T]]
の文脈を保持したまま処理したいのでEitherT(モナドトランスフォーマー)を利用します。
package todo.api.actor
import scala.concurrent.{ExecutionContext, Future}
import akka.actor.{Actor, ActorLogging, Props}
import scalaz.{EitherT, \/}
import scalaz.std.scalaFuture.futureInstance
import todo.api.repository.TodoRepository
object TodoActor {
def props(todoRepository: TodoRepository) = Props(new TodoActor(todoRepository))
// 受信系
sealed trait Command
final case object FindAllCommand extends Command
final case class FindByIdCommand(id: Int) extends Command
final case class CreateCommand(body: String) extends Command
final case class UpdateCommand(id: Int, body: String) extends Command
final case class DeleteCommand(id: Int) extends Command
// 返信系
sealed trait Reply
final case class TodoReply(id: Int, body: String) extends Reply
final case class CreatedReply(id: Int) extends Reply
final case object UpdatedReply extends Reply
final case object DeletedReply extends Reply
}
// TodoRepositoryをインジェクション
class TodoActor(todoRepository: TodoRepository)
extends Actor
with ActorLogging
with EitherPipeToSupport {
import TodoActor._
import todo.api.repository.Model._
implicit val executor: ExecutionContext = context.dispatcher
// Future[\/[A, B]] -> EitherT[Future, A, B]
implicit class RichFutureEither[A, B](self: Future[\/[A, B]]) {
def toEitherT: EitherT[Future, A, B] = EitherT[Future, A, B](self)
}
private def findAll() =
for {
todos <- todoRepository.findAll().toEitherT
} yield todos.map(t => TodoReply(t.id, t.body))
private def findById(cmd: FindByIdCommand) =
for {
todo <- todoRepository.findById(cmd.id).toEitherT
} yield todo.map(t => TodoReply(t.id, t.body))
private def create(cmd: CreateCommand) =
for {
createdId <- todoRepository.create(Todo(0, cmd.body)).toEitherT
} yield CreatedReply(createdId)
private def update(cmd: UpdateCommand) =
for {
_ <- todoRepository.update(Todo(cmd.id, cmd.body)).toEitherT
} yield UpdatedReply
private def delete(cmd: DeleteCommand) =
for {
_ <- todoRepository.delete(cmd.id).toEitherT
} yield DeletedReply
override def preStart(): Unit = log.info("starting todo actor.")
override def postStop(): Unit = log.info("stopping todo actor.")
override def receive: Receive = {
case FindAllCommand =>
log.info("receive find all command.")
eitherPipe(findAll().run) to sender()
()
case cmd: FindByIdCommand =>
log.info("receive find by id command.")
eitherPipe(findById(cmd).run) to sender()
()
case cmd: CreateCommand =>
log.info("receive create command.")
eitherPipe(create(cmd).run) to sender()
()
case cmd: UpdateCommand =>
log.info("receive update command.")
eitherPipe(update(cmd).run) to sender()
()
case cmd: DeleteCommand =>
log.info("receive delete command.")
eitherPipe(delete(cmd).run) to sender()
()
case unknown =>
log.error(s"receive unknown type. type: ${unknown.getClass.getName}")
}
}
なんだか回りくどい感じもしますが、今の実力ではこれが精一杯・・・。
Supervisor
APIルートとの間にSuperviorを置くので定義します。
まずはTodoActorのアクター名を共有するためにパッケージオブジェクトを作成します。
package todo.api
package object actor {
final val TODO_ACTOR_NAME = "todo"
}
SupervisorはTodoActorのPropsを受け取り、子アクターとして監視下に置きます。TodoActorで例外が発生すると継続(Resume)を指示します。
package todo.api.actor
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
object TodoSupervisor {
def props(maxRetries: Int, timeRange: Duration) = Props(new TodoSupervisor(maxRetries, timeRange))
final case class RegistrationCommand(props: Props)
}
class TodoSupervisor(maxRetries: Int, timeRange: Duration) extends Actor with ActorLogging {
import TodoSupervisor._
// 例外はログに出して継続
override def supervisorStrategy: SupervisorStrategy =
OneForOneStrategy(maxNrOfRetries = maxRetries, withinTimeRange = timeRange) {
case NonFatal(ex) =>
log.error(s"supervisor caught error. resume children. error: ${ex.getMessage}")
Resume
}
override def receive: Receive = {
case RegistrationCommand(props) =>
log.info("receive registration command.")
context.watch(context.actorOf(props, TODO_ACTOR_NAME))
()
case cmd: TodoActor.Command =>
context.child(TODO_ACTOR_NAME).foreach(_ forward cmd)
case Terminated(child) =>
log.warning(s"terminated child. path: ${child.path}")
case unknown =>
log.error(s"receive unknown type. type: ${unknown.getClass.getName}")
}
}
終わりに
次回はAPIルートです。