Edited at

Akka Streams でたこ焼き屋さん


はじめに

最近、Akka Streams に触れる機会があったので、以前勉強会で作成した「Akka Streams でたこ焼き屋さん」をまとめてみた。

なお、今回のコードは下記で公開しています。

https://github.com/imahiro-t/takoyaki/blob/master/src/main/scala/Main.scala


お題


  • たこ焼き屋さんの店員は注文担当、焼き担当、レジ・パッキング担当の3人いる

  • たこ焼きを焼く鉄板には1度に8個焼けるレーンが4レーンある

  • 注文はパック単位で1パック8個入りで、最大6パックまでランダムに注文が入る

  • たこ焼きには1/100の確率で虫が混入し、その場合は、パックのたこ焼きを全て破棄し焼き直す必要がある

  • お客さんを効率よく捌くため最初に鉄板一面(4パック)分を作り置く


実装


Akka Streams を使用しない実装について考えてみる

Akka Streams を使用しない単純な実装としては、注文をインプットに、注文数分たこ焼きを焼き、検品し、パック詰して提供する感じだと思う。

ただ、この場合だと、最初に作り置く要件を満たせないし、例えば3パックの注文が入った際に、焼かないレーンが1レーンあり、仮に次のお客さんが1パックの注文だった場合などに、効率が宜しくない。

なので、最初に作り置く実装を考えてみるが、お客さんには注文分ができたらそこで提供する必要があるため、注文分が揃っているのに作り置きが完了するまでお客さんを待たせるような処理は組めず、焼き担当は別スレッドでの管理となるだろう。

また、注文を受けた際にも、作り置きが完了しているかどうかを確認して待つ必要があるため、焼き担当の別ワーカーに加え、レジ・パッキング担当も別スレッドでの管理となるだろう。

などと、諸々考えてみても、Akka Streams を使用しない実装はかなり大変だということがわかる。


Akka Streams を使用すると


ダイアグラム

で、Akka Streams を使用するとどうなるのかなのだが、まずは、全体のダイアグラムについて考えてみる。

takoyaki_diagram.png


  • 注文を受けた注文担当が焼き担当とレジ・パッキング担当に注文を伝える

  • 焼き担当は注文数分の空パックを用意し、空パック分のたこ焼きを焼いていき、焼き終わったらたこ焼きの入ったパックをレジ・パッキング担当に渡していく


    • 虫が入っていた場合は、レーン毎に焼き直す

    • 最初に4パックたこ焼きを焼いていたり、焼き直しが発生するので注文時の空パックと提供するパックは異なる



  • レジ・パッキング担当は、注文書にしたがってお会計を行った後、注文数分の輪ゴムを用意し、焼き担当から渡されたたこ焼き入りのパックを輪ゴムの数だけパッキングしたらお客さんに商品を提供する


注文を受ける(注文担当)

final case class OrderCount(value: Int)

final case class OrderNumber(value: Int)
final case class Order(number: OrderNumber, count: OrderCount)

object Order {
private val ORDER_MAX = 6
private var number = 0
def create: Order = {
number += 1
Order(OrderNumber(number), OrderCount(scala.util.Random.nextInt(ORDER_MAX) + 1))
}
}

object Main extends App {
lazy private val order: Source[Order, Cancellable] =
Source.tick(2.seconds, (scala.util.Random.nextInt(10) + 1).seconds, Order).map(_ => Order.create)
.wireTap(x => println(s"${x.count.value} pack(s) ordered [${"%010d".format(x.number.value)}]"))
}

注文が一つ目のソースとなるが、今回は簡易にランダムな間隔で注文が入るようにしている。


最初の4つの空パックを用意する(焼き担当)

trait Takoball

final case class NormalTakoball() extends Takoball
final case class BugTakoball() extends Takoball
object Takoball {
def create: Takoball =
if (scala.util.Random.nextInt(100) == 0) BugTakoball() else NormalTakoball()
}
final case class Pack(private val takoballs: Seq[Takoball]) {
def valid: Boolean = takoballs.forall(_.isInstanceOf[NormalTakoball])
override def toString: String = s"[${takoballs.map(_ => "").mkString(",")}]"
}
object Pack {
val MAX_TAKOBALL_COUNT = 8
}
final case class EmptyPack() {
def fill(takoballs: Seq[Takoball]): Pack = Pack(takoballs)
}

object Main extends App {
lazy private val INIT_COOK_COUNT = 4
lazy private val prepareForFirstCook: Source[Seq[EmptyPack], NotUsed] =
Source.single(Seq.fill(INIT_COOK_COUNT)(EmptyPack()))
}

最初に作り置くため、注文とは別のソースより鉄板に4つの空のパックを流す。


空パックを用意する(焼き担当)

object Main extends App {

lazy private val prepareForCook: Flow[Order, Seq[EmptyPack], NotUsed] =
Flow[Order].map(x => List.fill(x.count.value)(EmptyPack()))
}

注文担当から注文が伝えられると、鉄板に注文数分の空のパックを流す。


たこ焼きを焼く(焼き担当)

final case class BugInsideException() extends RuntimeException {

override def printStackTrace(s: PrintWriter): Unit = s.println("sorry... bug inside... retrying to cook...")
}

object Main extends App {
lazy private val fillTakoball: EmptyPack => Pack = emptyPack => {
emptyPack.fill(Seq.fill(Pack.MAX_TAKOBALL_COUNT)(Takoball.create))
}

lazy private val cookPerPack: Flow[EmptyPack, Pack, NotUsed] =
Flow[EmptyPack].flatMapConcat(x => RestartSource.onFailuresWithBackoff(
minBackoff = 1.seconds,
maxBackoff = 2.seconds,
randomFactor = 0.2,
maxRestarts = 20
) {
() => Source.single(x).delay(3.seconds).map(fillTakoball)
.map(x => if (x.valid) x else throw BugInsideException())
})

lazy private val cook: Flow[Seq[EmptyPack], Pack, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
lazy val LANE_SIZE = 4
lazy val preparePack = builder.add(Flow[Seq[EmptyPack]].mapConcat(_.toList))
lazy val balance = builder.add(Balance[EmptyPack](LANE_SIZE).async)
lazy val merge = builder.add(Merge[Pack](LANE_SIZE))
preparePack ~> balance
for (i <- 1 to LANE_SIZE) {
balance ~> cookPerPack.async ~> merge
}
FlowShape(preparePack.in, merge.out)
})
}

4つのレーンに分散してたこ焼きを焼く。焼けたたこ焼きをパックに詰めた後に虫が入っていないかをチェックし、虫が入っていた場合はバックオフでリトライする。

ここでは注文内容を意識せず、用意された空パック分のたこ焼きをひたすら焼いていき、パック詰できたものからパック単位でレジ・パッキング担当に渡していく。(空パックの配列がインプットで、たこ焼きの入ったパックが1個単位でアウトプットになっている。)


お会計する(レジ・パッキング担当)

trait Band

final case class NormalBand() extends Band
final case class TopBand(order: Order) extends Band

object Main extends App {
lazy private val checkout: Flow[Order, Band, NotUsed] =
Flow[Order].mapConcat(x => TopBand(x) +: List.fill(x.count.value - 1)(NormalBand()))
}

お会計した後に、注文数分の輪ゴムを用意する。一つ目の輪ゴムには注文書を付けてパッキングしやすくしておく。


商品を提供する(レジ・パッキング担当)

final case class BufferQueue[T](private var size: Int = 0) {

private var queue: Queue[T] = Queue.empty[T]
def setSize(size: Int): Unit = {
this.size = size
}
def add(value: T): List[T] = {
this.queue = this.queue.enqueue(value)
this.queue.toList
}
def clear(): Unit = {
this.queue = Queue.empty[T]
}
def isEmpty: Boolean = this.queue.isEmpty
def isComplete: Boolean = !isEmpty && this.queue.size == this.size
}

class PackingFlow extends GraphStage[FlowShape[(Band, Pack), Seq[Pack]]] {
val in: Inlet[(Band, Pack)] = Inlet("PackingFlowIn")
val out: Outlet[Seq[Pack]] = Outlet("PackingFlowOut")
override val shape: FlowShape[(Band, Pack), Seq[Pack]] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private val queue: BufferQueue[Pack] = BufferQueue[Pack]()
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val grabin = grab(in)
grabin._1 match {
case b: TopBand =>
queue.setSize(b.order.count.value)
case _ =>
()
}
val list = queue.add(grabin._2)
println(s"${Seq.fill(list.size)(".").mkString}")
if (queue.isComplete) {
push(out, list)
queue.clear()
} else {
pull(in)
}
}
})
}
}

object Main extends App {
lazy private val packing: Flow[(Band, Pack), Seq[Pack], NotUsed] = Flow.fromGraph(new PackingFlow)
lazy private val serve: Sink[(Band, Pack), NotUsed] = Flow[(Band, Pack)].via(packing)
.wireTap(x => println(s"serve ${x.size} pack(s) {${x.mkString(" ")}}")).to(Sink.ignore)
}

焼き担当からは、たこ焼きがパック単位で送られてくるので、レジからの輪ゴムもその単位に合わせている。ただ、お客さんには注文したパックが揃ってから提供する必要があるので、カスタムフローを使ってINとOUTを調整する。(パックが注文数に満たない場合は、上流からプッシュされても下流にプッシュせずに、再度上流にプルする。)


各フローをグラフで繋げる

object Main extends App {

lazy private val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
lazy val broadcast = builder.add(Broadcast[Order](2))
lazy val merge = builder.add(Merge[Seq[EmptyPack]](2))
lazy val zip = builder.add(ZipWith[Pack, Band, (Band, Pack)]((x, y) => (y, x)))
order ~> broadcast ~> prepareForCook ~> merge ~> cook ~> zip.in0
prepareForFirstCook ~> merge
broadcast ~> checkout ~> zip.in1
zip.out ~> serve
ClosedShape
})

graph.run()
}

これまでのフローを全て繋げて完成。


さいごに

今回の要件は、Akka Streams を使わずに実装しようとするとかなり大変で、Akka Streams を使うと、登場人物の役割に応じた形で、とてもわかりやすく実装できる。

こういった処理が Akka Streams に向いているのだろうなと思う。

バランスやブロードキャスト、バックオフにバックプレッシャー、カスタムフローなどが良い感じに使われていて、少ないコード量ながら、Akka Streams のサンプルとしてはありな気がしている。