Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
OrganizationEventAdvent CalendarQiitadon (β)
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is the problem?

More than 3 years have passed since last update.






  • IntelliJ 2018
  • Scala 2.12.5
  • sbt 1.1.4
  • Akka 2.5.11
  • AkkaHttp 10.1.0
  • Slick 3.2.3
  • Scalaz 7.2.19




akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    provider = "akka.actor.LocalActorRefProvider"



    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
     ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %m%n</pattern>

    <logger name="todo.api" level="INFO"/>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>

APIルート(AkkaHttp)からAskでコマンドを受け取り、レポジトリの処理結果をakka.pattern.pipeで返答するイメージですが、Future[\/[Throwable, T]]をそのまま返すのは個人的にいまいち感があるので、Futureの結果がLeftだった場合はakka.actor.Status.Failureで返して、AkkaHttpのExceptionHandlerで掴めるようにしてみます。

package todo.api.actor

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.PipeToSupport
import scalaz.{-\/, \/, \/-}

// Adds pipe support for Future[\/[L, R]] values so that the outcome of such a
// future can be sent to an actor.
// NOTE(review): several closing braces of this trait are not visible in this excerpt.
trait EitherPipeToSupport extends PipeToSupport {

  // Wraps a Future[\/[L, R]] (the left type must be a Throwable) and offers
  // pipeTo / to for delivering its outcome to an ActorRef.
  final class EitherPipeableFuture[L <: Throwable, R](val future: Future[\/[L, R]])(
      implicit ec: ExecutionContext) {

    // Sends the outcome of the future to `recipient`:
    //   - a right value is sent as-is,
    //   - a left value is wrapped in akka.actor.Status.Failure,
    //   - a failed future is sent as akka.actor.Status.Failure as well.
    // Returns the future produced by `andThen`.
    def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[\/[L, R]] =
      future andThen {
        case Success(\/-(r)) =>
          recipient ! r
        case Success(-\/(f)) =>
          recipient ! Status.Failure(f)
        case Failure(f) =>
          recipient ! Status.Failure(f)

    // Overload that delegates to to(recipient, sender) with Actor.noSender as the sender.
    def to(recipient: ActorRef): EitherPipeableFuture[L, R] = to(recipient, Actor.noSender)

    // NOTE(review): the body of this overload is not visible in this excerpt.
    def to(recipient: ActorRef, sender: ActorRef): EitherPipeableFuture[L, R] = {


  // Implicit conversion: Future[\/[L, R]] -> EitherPipeableFuture
  implicit def eitherPipe[L <: Throwable, R](future: Future[\/[L, R]])(
      implicit ec: ExecutionContext): EitherPipeableFuture[L, R] = new EitherPipeableFuture(future)


また、レポジトリが返すFuture[\/[Throwable, T]]の文脈を保持したまま処理したいのでEitherT(モナドトランスフォーマー)を利用します。

package todo.api.actor

import scala.concurrent.{ExecutionContext, Future}

import akka.actor.{Actor, ActorLogging, Props}
import scalaz.{EitherT, \/}
import scalaz.std.scalaFuture.futureInstance

import todo.api.repository.TodoRepository

// Companion of the TodoActor class: a Props factory plus the message protocol
// (commands the actor receives and replies it sends back).
object TodoActor {

  // Builds the Props for a TodoActor that uses the given repository.
  def props(todoRepository: TodoRepository) = Props(new TodoActor(todoRepository))

  // Incoming messages (commands)
  sealed trait Command

  // Asks for all todos.
  final case object FindAllCommand extends Command

  // Asks for the todo with the given id.
  final case class FindByIdCommand(id: Int) extends Command

  // Asks to create a todo with the given body.
  final case class CreateCommand(body: String) extends Command

  // Asks to update the todo with the given id to the given body.
  final case class UpdateCommand(id: Int, body: String) extends Command

  // Asks to delete the todo with the given id.
  final case class DeleteCommand(id: Int) extends Command

  // Outgoing messages (replies)
  sealed trait Reply

  // A single todo (id and body) returned to the caller.
  final case class TodoReply(id: Int, body: String) extends Reply

  // Carries the id produced by a create command.
  final case class CreatedReply(id: Int) extends Reply

  // Signals that an update command completed.
  final case object UpdatedReply extends Reply

  // Signals that a delete command completed.
  final case object DeletedReply extends Reply


// Actor that handles todo commands; the TodoRepository is injected through the constructor.
class TodoActor(todoRepository: TodoRepository)
    extends Actor
    with ActorLogging
    with EitherPipeToSupport {

  import TodoActor._
  import todo.api.repository.Model._

  // Implicit ExecutionContext taken from this actor's context dispatcher.
  implicit val executor: ExecutionContext = context.dispatcher

  // Extension for Future[\/[A, B]] that adds a conversion to EitherT[Future, A, B].
  // NOTE(review): the closing brace of this class is not visible in this excerpt.
  implicit class RichFutureEither[A, B](self: Future[\/[A, B]]) {

    // Wraps the underlying future in an EitherT.
    def toEitherT: EitherT[Future, A, B] = EitherT[Future, A, B](self)


  // Fetches all todos from the repository and turns each one into a TodoReply,
  // keeping the result wrapped in EitherT.
  private def findAll() =
    todoRepository.findAll().toEitherT.map { found =>
      found.map(item => TodoReply(item.id, item.body))
    }

  // Looks up the todo for cmd.id and maps the repository result into TodoReply,
  // keeping the result wrapped in EitherT.
  private def findById(cmd: FindByIdCommand) =
    todoRepository.findById(cmd.id).toEitherT.map { found =>
      found.map(item => TodoReply(item.id, item.body))
    }

  // Creates a todo from cmd.body (passing 0 as the id of the new Todo) and wraps
  // the value returned by the repository in a CreatedReply.
  private def create(cmd: CreateCommand) = {
    val newTodo = Todo(0, cmd.body)
    todoRepository.create(newTodo).toEitherT.map(newId => CreatedReply(newId))
  }

  // Updates the todo identified by cmd.id with cmd.body; the repository's result
  // value is discarded and replaced by UpdatedReply.
  private def update(cmd: UpdateCommand) = {
    val changed = Todo(cmd.id, cmd.body)
    todoRepository.update(changed).toEitherT.map(_ => UpdatedReply)
  }

  // Deletes the todo identified by cmd.id; the repository's result value is
  // discarded and replaced by DeletedReply.
  private def delete(cmd: DeleteCommand) =
    todoRepository.delete(cmd.id).toEitherT.map(_ => DeletedReply)

  // Lifecycle hook: logs that the actor is starting.
  override def preStart(): Unit = {
    log.info("starting todo actor.")
  }

  // Lifecycle hook: logs that the actor is stopping.
  override def postStop(): Unit = {
    log.info("stopping todo actor.")
  }

  // Message handler: each command is logged, executed through the matching private
  // helper, and the resulting Future[\/[...]] is piped to the sender of the message
  // via eitherPipe. Anything that is not a known command is logged as an error.
  // NOTE(review): the closing brace of this method is not visible in this excerpt.
  override def receive: Receive = {
    case FindAllCommand =>
      log.info("receive find all command.")
      // .run unwraps the EitherT back into a Future[\/[...]] before piping.
      eitherPipe(findAll().run) to sender()
    case cmd: FindByIdCommand =>
      log.info("receive find by id command.")
      eitherPipe(findById(cmd).run) to sender()
    case cmd: CreateCommand =>
      log.info("receive create command.")
      eitherPipe(create(cmd).run) to sender()
    case cmd: UpdateCommand =>
      log.info("receive update command.")
      eitherPipe(update(cmd).run) to sender()
    case cmd: DeleteCommand =>
      log.info("receive delete command.")
      eitherPipe(delete(cmd).run) to sender()
    case unknown =>
      // No reply is sent for unknown messages; only the class name is logged.
      log.error(s"receive unknown type. type: ${unknown.getClass.getName}")





package todo.api

// Package-level constants shared by the actors in todo.api.actor.
// NOTE(review): the closing brace of this package object is not visible in this excerpt.
package object actor {

  // Name under which TodoSupervisor creates and looks up its todo child actor.
  final val TODO_ACTOR_NAME = "todo"



package todo.api.actor

import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy, Terminated}

// Companion of the TodoSupervisor class: a Props factory plus the registration message.
// NOTE(review): the closing brace of this object is not visible in this excerpt.
object TodoSupervisor {

  // Builds the Props for a TodoSupervisor; both arguments are passed to its constructor.
  def props(maxRetries: Int, timeRange: Duration) = Props(new TodoSupervisor(maxRetries, timeRange))

  // Asks the supervisor to create a child actor from the given Props.
  final case class RegistrationCommand(props: Props)


// Supervisor that creates the todo child actor on request and forwards todo commands to it.
// maxRetries and timeRange are handed to the OneForOneStrategy below.
// NOTE(review): several closing braces of this class are not visible in this excerpt.
class TodoSupervisor(maxRetries: Int, timeRange: Duration) extends Actor with ActorLogging {

  import TodoSupervisor._

  // Non-fatal exceptions are logged and processing continues.
  // NOTE(review): `Resume` is imported in this file, but no directive is visible after the
  // log call in this excerpt — confirm that the case ends with Resume.
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = maxRetries, withinTimeRange = timeRange) {
      case NonFatal(ex) =>
        log.error(s"supervisor caught error. resume children. error: ${ex.getMessage}")

  // Message handler for registration, command forwarding and child termination.
  override def receive: Receive = {
    case RegistrationCommand(props) =>
      log.info("receive registration command.")
      // Creates the child under TODO_ACTOR_NAME and watches it for termination.
      context.watch(context.actorOf(props, TODO_ACTOR_NAME))
    case cmd: TodoActor.Command =>
      // Forwarded only when a child named TODO_ACTOR_NAME exists; otherwise nothing happens.
      context.child(TODO_ACTOR_NAME).foreach(_ forward cmd)
    case Terminated(child) =>
      log.warning(s"terminated child. path: ${child.path}")
    case unknown =>
      log.error(s"receive unknown type. type: ${unknown.getClass.getName}")




Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Help us understand the problem. What is the problem?