背景
この記事では、ストリーム処理に関する内容をまとめています。
そもそもストリーム処理はどういう用途で使用されているの?
無限に発生し続けるデータを処理する場合
処理が永続的に継続する場合
例えば、Beaconなどから検知して送られてくる位置情報のストリームデータがあれば、仮に対象範囲外の位置データは必要とせず、処理しないとする場合に、必要なデータのみを保管し、それ以外のデータは価値がないのでフィルタリングするケースがあると思います。そのような時に、無限かつ流動的に発生するデータをリアルタイムで処理する為にストリーム処理を使用します。
課題を解決するために
Akka Actor
リアルタイムの処理をするための一つの解決策として、Akka Actorが挙げられます。
(Akka Actorに関しては、次回の記事で詳細にまとめていきます。)
What is Akka Actor
アクターと呼ばれるメッセージの送受信出来るオブジェクトが、非同期でお互いにやりとりするという仕組みです。しかし、Actor間のメッセージングを安定して実現する事は難しいです。
その一つの要因として
Publisher側の処理が早く完了する場合に、Subscriber側のバッファーが溢れてしまう。
Subscriberに対して、Publisher側の処理を抑えた場合は、その分だけ無駄になる
Akka Stream
そこで、Akka Streamを提案します。主な理由としては、
- Back Pressureによりバッファ溢れせず、性能を最大限まで引き出す事ができる。
- 構成要素のInput/Outputに型がついている
- マイクロサービス化の実現
Back Pressureとは
Subscriberが自分が処理できる量をPublisherにリクエストを送ることで、無駄なくSubscriberが処理できる量を処理する仕組み。
Subscriberが自分が処理できる量をPublishserに伝え、PublisherはSubscriberから伝えられた情報に合わせて、適切な量のメッセージをSubscriber側に送り、これによってSubscriberを溢れさせず、Subscriberのリソースを最大限に使って動作させることが可能となります。
BackPressure自体はakka-streamが勝手に裏側で制御してくれているので、特に制御について意識する必要はないです。
実際にakka streamsを実装してみた
Source
データの出力を行う。アウトプットチャンネルをひとつだけ持つ。(Publisher)
Flow
インプットとアウトプットのチャンネルをひとつずつ持つ。Sourceで取得したデータをフィルタリングの処理を実装したりする。
Sink
データの受信側。(Subscriber) インプットのチャンネルをひとつだけ持つ。
RunnableGraph
Source + Flow + Sink の要素を3つを連結したグラフをRunnableGraphと言います。
データを処理する一連の流れを形態を表す物になります。
ソースコード
- フローの流れ
- webページのスクレイピング → 保存した画像サイズフィルター → 1000*1000以下のimgファイル削除
class ScrapeStream extends ImgHelper{
implicit val system = ActorSystem("ScrapeSystem")
implicit val materializer = ActorMaterializer()
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
val targetUrls: List[String] = List (
// スクレーピングしたいページのURL
"http://hoge.com"
)
//webページのスクレイピング
val source = Source(targetUrls).mapAsyncUnordered(3) { url => scrape(url)}
//imgサイズフィルター
val step = Flow[List[File]].mapAsyncUnordered(3) { files =>
Future(for {
file <- files
if (check(file))} yield file)
}
//imgファイルの移動
val sink = Sink.foreachParallel[List[File]](3) { files => files.map(file => delete(file))}
val graph = source via step to sink
def executeScrape(): Unit = {
graph.run()
}
}
trait ImgHelper {
def delete(file: File) = {
file.delete()
}
def scrape(url: String) = Future {
val img = JsoupBrowser().get(url) >?> elementList("img")
img match {
case Some(l) => for {e <- l} yield {save(e.attr("src"))}
case None => List.empty[File]
}
}
private def save(url: String) = {
val file = new File("保存したいファイルパス")
Resource.fromFile(file).write(Resource.fromURL(url).byteArray)
file
}
def check(file: File) = {
val image = ImageIO.read(file)
image.getHeight() <= 1000 && image.getWidth() <= 1000
}
まとめ
Source、Flow、Sinkの独立した要素として実装することで、各々がマイクロサービス化されていることもメリットの一つかと思います。
お互いのフローに影響を与えずに、Sourceを「S3からのファイル読み込み」に書き換えることや、Sinkの部分を「別サーバへのファイルバックアップ処理」などに変更することも出来ます。
それぞれの要素のInput/Outputの型を合わせるだけで、処理要素を追加したり、入れ替えたりすることが容易に書き換えることが出来るのも、akka streamsのメリットかと思います。