3
0

More than 5 years have passed since last update.

Akka Streams について

Last updated at Posted at 2017-06-30

背景

この記事では、ストリーム処理に関する内容をまとめています。

そもそもストリーム処理はどういう用途で使用されているの?

  1. 無限に発生し続けるデータを処理する場合

  2. 処理が永続的に継続する場合

例えば、Beaconなどから検知して送られてくる位置情報のストリームデータがあれば、仮に対象範囲外の位置データは必要とせず、処理しないとする場合に、必要なデータのみを保管し、それ以外のデータは価値がないのでフィルタリングするケースがあると思います。そのような時に、無限かつ流動的に発生するデータをリアルタイムで処理する為にストリーム処理を使用します。

課題を解決するために

Akka Actor

リアルタイムの処理をするための一つの解決策として、Akka Actorが挙げられます。
(Akka Actorに関しては、次回の記事で詳細にまとめていきます。)

What is Akka Actor

アクターと呼ばれるメッセージの送受信出来るオブジェクトが、非同期でお互いにやりとりするという仕組みです。しかし、Actor間のメッセージングを安定して実現する事は難しいです。

その一つの要因として

  1. Publisher側の処理が早く完了する場合に、Subscriber側のバッファーが溢れてしまう。

  2. Subscriberに対して、Publisher側の処理を抑えた場合は、その分だけ無駄になる

Akka Stream

そこで、Akka Streamを提案します。主な理由としては、

  1. Back Pressureによりバッファ溢れせず、性能を最大限まで引き出す事ができる。
  2. 構成要素のInput/Outputに型がついている
  3. マイクロサービス化の実現

Back Pressureとは

Subscriberが自分が処理できる量をPublisherにリクエストを送ることで、無駄なくSubscriberが処理できる量を処理する仕組み。

Subscriberが自分が処理できる量をPublishserに伝え、PublisherはSubscriberから伝えられた情報に合わせて、適切な量のメッセージをSubscriber側に送り、これによってSubscriberを溢れさせず、Subscriberのリソースを最大限に使って動作させることが可能となります。
BackPressure自体はakka-streamが勝手に裏側で制御してくれているので、特に制御について意識する必要はないです。

実際にakka streamsを実装してみた

Source

データの出力を行う。アウトプットチャンネルをひとつだけ持つ。(Publisher)

Flow

インプットとアウトプットのチャンネルをひとつずつ持つ。Sourceで取得したデータをフィルタリングの処理を実装したりする。

Sink

データの受信側。(Subscriber) インプットのチャンネルをひとつだけ持つ。

RunnableGraph

Source + Flow + Sink の要素を3つを連結したグラフをRunnableGraphと言います。
データを処理する一連の流れを形態を表す物になります。

ソースコード

  • フローの流れ
    • webページのスクレイピング → 保存した画像サイズフィルター → 1000*1000以下のimgファイル削除
class ScrapeStream extends ImgHelper{

  implicit val system = ActorSystem("ScrapeSystem")
  implicit val materializer = ActorMaterializer()
  implicit val ec: ExecutionContext = ExecutionContext.Implicits.global

  val targetUrls: List[String] = List (
    // スクレーピングしたいページのURL
    "http://hoge.com"
  )

  //webページのスクレイピング
  val source = Source(targetUrls).mapAsyncUnordered(3) { url => scrape(url)}

  //imgサイズフィルター
  val step = Flow[List[File]].mapAsyncUnordered(3) { files =>
    Future(for {
        file <- files
        if (check(file))} yield file)
  }

  //imgファイルの移動
  val sink = Sink.foreachParallel[List[File]](3) { files => files.map(file => delete(file))}

  val graph = source via step to sink

  def executeScrape(): Unit = {
    graph.run()
  }
}

trait ImgHelper {

def delete(file: File) = {
 file.delete()
}

def scrape(url: String) = Future {
  val img = JsoupBrowser().get(url) >?> elementList("img")
  img match {
    case Some(l) => for {e <- l} yield {save(e.attr("src"))}
    case None => List.empty[File]
  }
}

private def save(url: String) = {
  val file = new File("保存したいファイルパス")
  Resource.fromFile(file).write(Resource.fromURL(url).byteArray)
  file
}

def check(file: File) = {
  val image = ImageIO.read(file)
  image.getHeight() <= 1000 && image.getWidth() <= 1000
}

まとめ

Source、Flow、Sinkの独立した要素として実装することで、各々がマイクロサービス化されていることもメリットの一つかと思います。
お互いのフローに影響を与えずに、Sourceを「S3からのファイル読み込み」に書き換えることや、Sinkの部分を「別サーバへのファイルバックアップ処理」などに変更することも出来ます。
それぞれの要素のInput/Outputの型を合わせるだけで、処理要素を追加したり、入れ替えたりすることが容易に書き換えることが出来るのも、akka streamsのメリットかと思います。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0