Scala
Akka
slick

Akka+AkkaHttp+SlickでTODOアプリAPI(その3)

始めに

前回に引き続き、今回はアクター部分を作成します。

構成

  • IntelliJ 2018
  • Scala 2.12.5
  • sbt 1.1.4
  • Akka 2.5.11
  • AkkaHttp 10.1.0
  • Slick 3.2.3
  • Scalaz 7.2.19

成果物

https://github.com/lightstaff/scala-todo-api

アクター

早速、アクターを作ります。まずはsrc/main/resources/application.confに以下を追記します。

application.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    provider = "akka.actor.LocalActorRefProvider"
  }
}

合わせて、logbackを使っているので以下も追加。

logback.xml
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
     ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %m%n</pattern>
        </encoder>
    </appender>

    <logger name="todo.api" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

ルートパッケージ配下にactorパッケージを追加します。
APIルート(AkkaHttp)からAskでコマンドを受け取り、レポジトリの処理結果をakka.pattern.pipeで返答するイメージですが、Future[\(バックスラッシュ)/[Throwable, T]]をそのまま返すのは個人的にいまいち感があるので、Futureの結果がLeftだった場合はakka.actor.Status.Failureで返して、AkkaHttpのExceptionHandlerで掴めるようにしてみます。

EitherPipeToSupport.scala
package todo.api.actor

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.PipeToSupport
import scalaz.{-\/, \/, \/-}

// Future[\/[L, R]]をpipeで送れるようにサポート
trait EitherPipeToSupport extends PipeToSupport {

  final class EitherPipeableFuture[L <: Throwable, R](val future: Future[\/[L, R]])(
      implicit ec: ExecutionContext) {

    // EitherがLeftの場合はakka.actor.Status.Failureへ変換
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[\/[L, R]] =
      future andThen {
        case Success(\/-(r)) =>
          recipient ! r
        case Success(-\/(f)) =>
          recipient ! Status.Failure(f)
        case Failure(f) =>
          recipient ! Status.Failure(f)
      }

    def to(recipient: ActorRef): EitherPipeableFuture[L, R] = to(recipient, Actor.noSender)

    def to(recipient: ActorRef, sender: ActorRef): EitherPipeableFuture[L, R] = {
      pipeTo(recipient)(sender)
      this
    }

  }

  // Future[\/[L, R]] -> EitherPipeableFuture
  implicit def eitherPipe[L <: Throwable, R](future: Future[\/[L, R]])(
      implicit ec: ExecutionContext): EitherPipeableFuture[L, R] = new EitherPipeableFuture(future)

}

上記をミックスインしてアクターを作りますが、APIルートとレポジトリには依存関係を持たせたくないのでアクターに中間モデルを定義しています。
また、レポジトリが返すFuture[\(バックスラッシュ)/[Throwable, T]]の文脈を保持したまま処理したいのでEitherT(モナドトランスフォーマー)を利用します。

TodoActor.scala
package todo.api.actor

import scala.concurrent.{ExecutionContext, Future}

import akka.actor.{Actor, ActorLogging, Props}
import scalaz.{EitherT, \/}
import scalaz.std.scalaFuture.futureInstance

import todo.api.repository.TodoRepository

object TodoActor {

  def props(todoRepository: TodoRepository) = Props(new TodoActor(todoRepository))

  // 受信系
  sealed trait Command

  final case object FindAllCommand extends Command

  final case class FindByIdCommand(id: Int) extends Command

  final case class CreateCommand(body: String) extends Command

  final case class UpdateCommand(id: Int, body: String) extends Command

  final case class DeleteCommand(id: Int) extends Command

  // 返信系
  sealed trait Reply

  final case class TodoReply(id: Int, body: String) extends Reply

  final case class CreatedReply(id: Int) extends Reply

  final case object UpdatedReply extends Reply

  final case object DeletedReply extends Reply

}

// TodoRepositoryをインジェクション
class TodoActor(todoRepository: TodoRepository)
    extends Actor
    with ActorLogging
    with EitherPipeToSupport {

  import TodoActor._
  import todo.api.repository.Model._

  implicit val executor: ExecutionContext = context.dispatcher

  // Future[\/[A, B]] -> EitherT[Future, A, B]
  implicit class RichFutureEither[A, B](self: Future[\/[A, B]]) {

    def toEitherT: EitherT[Future, A, B] = EitherT[Future, A, B](self)

  }

  private def findAll() =
    for {
      todos <- todoRepository.findAll().toEitherT
    } yield todos.map(t => TodoReply(t.id, t.body))

  private def findById(cmd: FindByIdCommand) =
    for {
      todo <- todoRepository.findById(cmd.id).toEitherT
    } yield todo.map(t => TodoReply(t.id, t.body))

  private def create(cmd: CreateCommand) =
    for {
      createdId <- todoRepository.create(Todo(0, cmd.body)).toEitherT
    } yield CreatedReply(createdId)

  private def update(cmd: UpdateCommand) =
    for {
      _ <- todoRepository.update(Todo(cmd.id, cmd.body)).toEitherT
    } yield UpdatedReply

  private def delete(cmd: DeleteCommand) =
    for {
      _ <- todoRepository.delete(cmd.id).toEitherT
    } yield DeletedReply

  override def preStart(): Unit = log.info("starting todo actor.")

  override def postStop(): Unit = log.info("stopping todo actor.")

  override def receive: Receive = {
    case FindAllCommand =>
      log.info("receive find all command.")
      eitherPipe(findAll().run) to sender()
      ()
    case cmd: FindByIdCommand =>
      log.info("receive find by id command.")
      eitherPipe(findById(cmd).run) to sender()
      ()
    case cmd: CreateCommand =>
      log.info("receive create command.")
      eitherPipe(create(cmd).run) to sender()
      ()
    case cmd: UpdateCommand =>
      log.info("receive update command.")
      eitherPipe(update(cmd).run) to sender()
      ()
    case cmd: DeleteCommand =>
      log.info("receive delete command.")
      eitherPipe(delete(cmd).run) to sender()
      ()
    case unknown =>
      log.error(s"receive unknown type. type: ${unknown.getClass.getName}")
  }

}

なんだか回りくどい感じもしますが、今の実力ではこれが精一杯・・・。

Supervisor

APIルートとの間にSuperviorを置くので定義します。
まずはTodoActorのアクター名を共有するためにパッケージオブジェクトを作成します。

actor.scala
package todo.api

package object actor {

  final val TODO_ACTOR_NAME = "todo"

}

SupervisorはTodoActorのPropsを受け取り、子アクターとして監視下に置きます。TodoActorで例外が発生すると継続(Resume)を指示します。

TodoSupervisor.scala
package todo.api.actor

import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated}

object TodoSupervisor {

  def props(maxRetries: Int, timeRange: Duration) = Props(new TodoSupervisor(maxRetries, timeRange))

  final case class RegistrationCommand(props: Props)

}

class TodoSupervisor(maxRetries: Int, timeRange: Duration) extends Actor with ActorLogging {

  import TodoSupervisor._

  // 例外はログに出して継続
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = maxRetries, withinTimeRange = timeRange) {
      case NonFatal(ex) =>
        log.error(s"supervisor caught error. resume children. error: ${ex.getMessage}")
        Resume
    }

  override def receive: Receive = {
    case RegistrationCommand(props) =>
      log.info("receive registration command.")
      context.watch(context.actorOf(props, TODO_ACTOR_NAME))
      ()
    case cmd: TodoActor.Command =>
      context.child(TODO_ACTOR_NAME).foreach(_ forward cmd)
    case Terminated(child) =>
      log.warning(s"terminated child. path: ${child.path}")
    case unknown =>
      log.error(s"receive unknown type. type: ${unknown.getClass.getName}")
  }

}

終わりに

次回はAPIルートです。