LoginSignup
1
0

More than 5 years have passed since last update.

Akka+AkkaHttp+SlickでTODOアプリAPI(その3)

Last updated at Posted at 2018-04-12

始めに

前回に引き続き、今回はアクター部分を作成します。

構成

  • IntelliJ 2018
  • Scala 2.12.5
  • sbt 1.1.4
  • Akka 2.5.11
  • AkkaHttp 10.1.0
  • Slick 3.2.3
  • Scalaz 7.2.19

成果物

アクター

早速、アクターを作ります。まずはsrc/main/resources/application.confに以下を追記します。

application.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    provider = "akka.actor.LocalActorRefProvider"
  }
}

合わせて、logbackを使っているので以下も追加。

logback.xml
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
     ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %m%n</pattern>
        </encoder>
    </appender>

    <logger name="todo.api" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

ルートパッケージ配下にactorパッケージを追加します。
APIルート(AkkaHttp)からAskでコマンドを受け取り、レポジトリの処理結果をakka.pattern.pipeで返答するイメージですが、Future[\(バックスラッシュ)/[Throwable, T]]をそのまま返すのは個人的にいまいち感があるので、Futureの結果がLeftだった場合はakka.actor.Status.Failureで返して、AkkaHttpのExceptionHandlerで掴めるようにしてみます。

EitherPipeToSupport.scala
package todo.api.actor

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.PipeToSupport
import scalaz.{-\/, \/, \/-}

// Future[\/[L, R]]をpipeで送れるようにサポート
trait EitherPipeToSupport extends PipeToSupport {

  final class EitherPipeableFuture[L <: Throwable, R](val future: Future[\/[L, R]])(
      implicit ec: ExecutionContext) {

    // EitherがLeftの場合はakka.actor.Status.Failureへ変換
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[\/[L, R]] =
      future andThen {
        case Success(\/-(r)) =>
          recipient ! r
        case Success(-\/(f)) =>
          recipient ! Status.Failure(f)
        case Failure(f) =>
          recipient ! Status.Failure(f)
      }

    def to(recipient: ActorRef): EitherPipeableFuture[L, R] = to(recipient, Actor.noSender)

    def to(recipient: ActorRef, sender: ActorRef): EitherPipeableFuture[L, R] = {
      pipeTo(recipient)(sender)
      this
    }

  }

  // Future[\/[L, R]] -> EitherPipeableFuture
  implicit def eitherPipe[L <: Throwable, R](future: Future[\/[L, R]])(
      implicit ec: ExecutionContext): EitherPipeableFuture[L, R] = new EitherPipeableFuture(future)

}

上記をミックスインしてアクターを作りますが、APIルートとレポジトリには依存関係を持たせたくないのでアクターに中間モデルを定義しています。
また、レポジトリが返すFuture[\(バックスラッシュ)/[Throwable, T]]の文脈を保持したまま処理したいのでEitherT(モナドトランスフォーマー)を利用します。

TodoActor.scala
package todo.api.actor

import scala.concurrent.{ExecutionContext, Future}

import akka.actor.{Actor, ActorLogging, Props}
import scalaz.{EitherT, \/}
import scalaz.std.scalaFuture.futureInstance

import todo.api.repository.TodoRepository

object TodoActor {

  def props(todoRepository: TodoRepository) = Props(new TodoActor(todoRepository))

  // 受信系
  sealed trait Command

  final case object FindAllCommand extends Command

  final case class FindByIdCommand(id: Int) extends Command

  final case class CreateCommand(body: String) extends Command

  final case class UpdateCommand(id: Int, body: String) extends Command

  final case class DeleteCommand(id: Int) extends Command

  // 返信系
  sealed trait Reply

  final case class TodoReply(id: Int, body: String) extends Reply

  final case class CreatedReply(id: Int) extends Reply

  final case object UpdatedReply extends Reply

  final case object DeletedReply extends Reply

}

// TodoRepositoryをインジェクション
class TodoActor(todoRepository: TodoRepository)
    extends Actor
    with ActorLogging
    with EitherPipeToSupport {

  import TodoActor._
  import todo.api.repository.Model._

  implicit val executor: ExecutionContext = context.dispatcher

  // Future[\/[A, B]] -> EitherT[Future, A, B]
  implicit class RichFutureEither[A, B](self: Future[\/[A, B]]) {

    def toEitherT: EitherT[Future, A, B] = EitherT[Future, A, B](self)

  }

  private def findAll() =
    for {
      todos <- todoRepository.findAll().toEitherT
    } yield todos.map(t => TodoReply(t.id, t.body))

  private def findById(cmd: FindByIdCommand) =
    for {
      todo <- todoRepository.findById(cmd.id).toEitherT
    } yield todo.map(t => TodoReply(t.id, t.body))

  private def create(cmd: CreateCommand) =
    for {
      createdId <- todoRepository.create(Todo(0, cmd.body)).toEitherT
    } yield CreatedReply(createdId)

  private def update(cmd: UpdateCommand) =
    for {
      _ <- todoRepository.update(Todo(cmd.id, cmd.body)).toEitherT
    } yield UpdatedReply

  private def delete(cmd: DeleteCommand) =
    for {
      _ <- todoRepository.delete(cmd.id).toEitherT
    } yield DeletedReply

  override def preStart(): Unit = log.info("starting todo actor.")

  override def postStop(): Unit = log.info("stopping todo actor.")

  override def receive: Receive = {
    case FindAllCommand =>
      log.info("receive find all command.")
      eitherPipe(findAll().run) to sender()
      ()
    case cmd: FindByIdCommand =>
      log.info("receive find by id command.")
      eitherPipe(findById(cmd).run) to sender()
      ()
    case cmd: CreateCommand =>
      log.info("receive create command.")
      eitherPipe(create(cmd).run) to sender()
      ()
    case cmd: UpdateCommand =>
      log.info("receive update command.")
      eitherPipe(update(cmd).run) to sender()
      ()
    case cmd: DeleteCommand =>
      log.info("receive delete command.")
      eitherPipe(delete(cmd).run) to sender()
      ()
    case unknown =>
      log.error(s"receive unknown type. type: ${unknown.getClass.getName}")
  }

}

なんだか回りくどい感じもしますが、今の実力ではこれが精一杯・・・。

Supervisor

APIルートとの間にSuperviorを置くので定義します。
まずはTodoActorのアクター名を共有するためにパッケージオブジェクトを作成します。

actor.scala
package todo.api

package object actor {

  final val TODO_ACTOR_NAME = "todo"

}

SupervisorはTodoActorのPropsを受け取り、子アクターとして監視下に置きます。TodoActorで例外が発生すると継続(Resume)を指示します。

TodoSupervisor.scala
package todo.api.actor

import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated}

object TodoSupervisor {

  def props(maxRetries: Int, timeRange: Duration) = Props(new TodoSupervisor(maxRetries, timeRange))

  final case class RegistrationCommand(props: Props)

}

class TodoSupervisor(maxRetries: Int, timeRange: Duration) extends Actor with ActorLogging {

  import TodoSupervisor._

  // 例外はログに出して継続
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = maxRetries, withinTimeRange = timeRange) {
      case NonFatal(ex) =>
        log.error(s"supervisor caught error. resume children. error: ${ex.getMessage}")
        Resume
    }

  override def receive: Receive = {
    case RegistrationCommand(props) =>
      log.info("receive registration command.")
      context.watch(context.actorOf(props, TODO_ACTOR_NAME))
      ()
    case cmd: TodoActor.Command =>
      context.child(TODO_ACTOR_NAME).foreach(_ forward cmd)
    case Terminated(child) =>
      log.warning(s"terminated child. path: ${child.path}")
    case unknown =>
      log.error(s"receive unknown type. type: ${unknown.getClass.getName}")
  }

}

終わりに

次回はAPIルートです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0