LoginSignup
2

More than 5 years have passed since last update.

Gearpumpの最小アプリケーションの構成は?

Posted at

こんにちは。

前回サンプルのアプリケーションをデプロイして動作させてみましたが、実際にそれがどのようなコードで構成されているかを見てみます。

デプロイしたアプリケーションのソースコード構成

GitHubより、デプロイしたアプリケーションは下記のソースコードから構成されます。

  • WordCount.scala
    • コマンドライン設定と環境設定を読み込み、SplitとSumを生成してクラスタにアプリケーションをデプロイ
  • Split.scala
    • 固定の文章を生成し、単語に分割して下流に送信
  • Sum.scala
    • 単語を含んだメッセージを受け取り、単語のカウントを実施

1アプリケーションが1クラス、1Processorが1クラスに対応しているのでわかりやすい構成ではありますね。

では、アプリケーションのソースから実際に何をしているかを見てみます。

アプリケーションデプロイコード

WordCount.scalaは下記になります。
ちょっとソースコードと解説が前後してわかりにくいかもしれませんが、ソースコード中にコメントでナンバリングして読む内容を対応づけています。

上から何をしているかを読んでいきます。

1. 起動時パラメータの取得

まずoptions変数で起動時のコマンドラインからオプションを読み込む内容について定義しています。
起動時、下記のような形式で設定を記述することで、アプリケーション起動時に設定を追加することが出来るようです。
remainArgについてはぱっと見わからなかったのでまた今度に。

-option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
2. Processorの生成

起動時のCLIから読み込んだ設定項目を用いてProcessorの生成を行っています。
Processorの並列度もこのタイミングで指定しているようです。

3. Partitionerの生成

メッセージを下流に流す際にProcessorが複数存在した場合にどこのProcessorに流すかを決めるPartitionerの生成を行っています。
HashPartitionerということで、メッセージの値のハッシュ値を基に送信先のProcessorの送信先を決めるもののようです。
今回の場合WordCountですので、同じ単語が同じProcessorに到達するようにしているわけですね。

4. ProcessorとPartitionerを用いてDAGを作成

これまで生成したProcessorとPartitionerを用いてDAGを生成し、Applicationとしています。

その中で、DAGを非常に直観的に下記のような記述で生成可能になっています。

Graph(split ~ partitioner ~> sum)

io.gearpump.util.Graphクラスでパースをしているのですが、「Node ~ Edge ~> Node」という形で解釈されています。
で、Edgeの箇所にPartitionerを入れている形になりますね。

5. System設定読み込み

System Properties > -Dgearpump.config.file > gear.conf > geardefault.conf > reference.confという優先度で
プロパティを読み込み、クラスタ構成やシステム設定として用いているようです。

6. デプロイするアプリケーションを生成

クラスタにデプロイするアプリケーションのオブジェクトをここで生成しています。
定義とDAGを合わせている形ですね。

7. クラスタにSubmit

クラスタにアプリケーションをSubmit。
このタイミングまではgearコマンドを実行したローカルで実行されているようではあります。

8. デプロイするプログラムを停止

クラスタにデプロイした後はクラスタ上での処理に移行するため、
デプロイするクライアントプログラムをここで停止しています。

WordCount.scala
package io.gearpump.streaming.examples.wordcount

import io.gearpump.cluster.local.LocalCluster
import io.gearpump.streaming.{StreamApplication, Processor}
import io.gearpump.cluster.UserConfig
import io.gearpump.cluster.client.ClientContext
import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import io.gearpump.partitioner.HashPartitioner
import io.gearpump.util.Graph.Node
import io.gearpump.util.{AkkaApp, Graph, LogUtil}
import org.slf4j.Logger

object WordCount extends AkkaApp with ArgumentsParser {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  val RUN_FOR_EVER = -1

  // 1. 起動時のCLIから読み込む項目と形式、注釈、必須/オプショナル、デフォルト値を定義
  override val options: Array[(String, CLIOption[Any])] = Array(
    "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
    )

  def application(config: ParseResult) : StreamApplication = {
    // 2. CLIから読み込んだ設定項目を用いてProcessorを生成
    val splitNum = config.getInt("split")
    val sumNum = config.getInt("sum")
    val split = Processor[Split](splitNum)
    val sum = Processor[Sum](sumNum)
    // 3. メッセージのProcessor間の割り振りを行うPartitionerを生成
    val partitioner = new HashPartitioner
    // 4. ProcessorとPartitionerを用いてDAGを作成
    val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
    app
  }

  // 5. System Properties > -Dgearpump.config.file > gear.conf > geardefault.conf > reference.conf
  // という優先度で読み込んだ設定オブジェクトConfigを用いて初期化
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)

    val localCluster = if (System.getProperty("DEBUG") != null) {
      val cluster = new LocalCluster(akkaConf: Config)
      cluster.start
      Some(cluster)
    } else {
      None
    }

    val context: ClientContext = localCluster match {
      case Some(local) => local.newClientContext
      case None => ClientContext(akkaConf)
    }

    // 6. デプロイするアプリケーションを生成
    val app = application(config)
    // 7. クラスタにSubmit
    context.submit(app)
    // 8. デプロイするプログラムを停止
    context.close()
    localCluster.map(_.stop)
  }
}

Processorコード

上記でクラスタにデプロイするアプリケーション全体を組む個所がわかったので、あとは個々のProcessorを見ていきます。

Split.scala

Split、つまり文書と単語を生成するProcessorのコードは下記のようになっていました。

同様にナンバリングをつけています。

1. 自分自身にStartメッセージを通知

起動したタイミングで自分自身にStartメッセージを通知しています。
これは、外部からメッセージを取得しない、かつ自分が起点となるProcessorであるため、実行しているようです。
この処理を行っておくことで、起動後にonNextのメソッドが実行され、動き出す形になります。

2. 文章を単語に分割し、空文字を除去した上で下流に送信

メッセージ受信時、固定の文章を単語に分割し、空文字を除去した上で下流に送信しています。
Split Processorが1回実行されると数十回のメッセージが送信される形になりますね。

3. 次メッセージを自分に対して送信するタスクを仕掛ける

メッセージ送信処理完了後、自分自身に対して一定時間後に自分にメッセージを送信するようタスクを仕掛けます。
こうすることで、一定時間後に自分自身にメッセージが通知され、再度onNextメソッドが実行される流れになります。

Split.scala
package io.gearpump.streaming.examples.wordcount

import java.util.concurrent.TimeUnit

import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig

class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  import taskContext.{output, self}

  // 1. 自分自身にStartメッセージを通知。

  override def onStart(startTime : StartTime) : Unit = {
    self ! Message("start")
  }

  // 2. 文章を単語に分割し、空文字を除去した上で下流に送信
  override def onNext(msg : Message) : Unit = {
    Split.TEXT_TO_SPLIT.lines.foreach { line =>
      line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
        output(new Message(msg, System.currentTimeMillis()))
      }
    }

    // 3. 次メッセージを自分に対して送信するタスクを仕掛ける
    import scala.concurrent.duration._
    taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self ! Message("continue", System.currentTimeMillis()))
  }
}

object Split {
  val TEXT_TO_SPLIT =
    """
      |   Licensed to the Apache Software Foundation (ASF) under one
      |   or more contributor license agreements.  See the NOTICE file
      |   distributed with this work for additional information
      |   regarding copyright ownership.  The ASF licenses this file
      |   to you under the Apache License, Version 2.0 (the
      |   "License"); you may not use this file except in compliance
      |   with the License.  You may obtain a copy of the License at
      |
      |       http://www.apache.org/licenses/LICENSE-2.0
      |
      |   Unless required by applicable law or agreed to in writing, software
      |   distributed under the License is distributed on an "AS IS" BASIS,
      |   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      |   See the License for the specific language governing permissions and
      |   limitations under the License.
    """.stripMargin
}

Sum.scala

Sum、つまり単語をカウントするProcessorは下記のようになっています。

同様にナンバリングをつけています。
Sumが一番単純な構成になりますね。

1. 起動時に状態出力タスクを仕掛ける

起動時に受信数を出力するタスクを定期的に実行するタスクを仕掛けています。

2. 受信したメッセージから単語を取得し、総受信回数と単語ごとの受信回数をカウント

受信したメッセージから単語を抽出して送受信回数と単語ごとの受信回数をマップに保存しています。

3. Processor停止時に状態出力タスクを停止

Processor停止時に状態出力タスクをキャンセルしています。

4. 状態出力タスクにて、前回との差分を基にスループットを算出し、出力

状態出力タスク(メソッド)で、前回との差分を基にスループットを算出し、出力しています。

Sum.scala

package io.gearpump.streaming.examples.wordcount

import java.util.concurrent.TimeUnit

import akka.actor.Cancellable
import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
import io.gearpump.Message
import io.gearpump.cluster.UserConfig

import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  private[wordcount] val map : mutable.HashMap[String, Long] = new mutable.HashMap[String, Long]()

  private[wordcount] var wordCount : Long = 0
  private var snapShotTime : Long = System.currentTimeMillis()
  private var snapShotWordCount : Long = 0

  private var scheduler : Cancellable = null

  override def onStart(startTime : StartTime) : Unit = {
    // 1. 起動時に状態出力タスクを仕掛ける。
    scheduler = taskContext.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
      new FiniteDuration(30, TimeUnit.SECONDS))(reportWordCount)
  }

  override def onNext(msg : Message) : Unit = {
    if (null == msg) {
      return
    }
    // 2. 受信したメッセージから単語を取得し、総受信回数と単語ごとの受信回数をカウント
    val current = map.getOrElse(msg.msg.asInstanceOf[String], 0L)
    wordCount += 1
    map.put(msg.msg.asInstanceOf[String], current + 1)
  }

  // 3. Processor停止時に状態出力タスクを停止
  override def onStop() : Unit = {
    if (scheduler != null) {
      scheduler.cancel()
    }
  }

  // 4. 状態出力タスクにて、前回との差分を基にスループットを算出し、出力
  def reportWordCount() : Unit = {
    val current : Long = System.currentTimeMillis()
    LOG.info(s"Task ${taskContext.taskId} Throughput: ${(wordCount - snapShotWordCount, (current - snapShotTime) / 1000)} (words, second)")
    snapShotWordCount = wordCount
    snapShotTime = current
  }
}

まとめ

Processorと、後はそれをメソッドでつなぐことで非常に直観的にDAGによるストリーム処理を組めるというのは面白いですね。
Processor間はメッセージのみでやり取りがされ、互いに依存していないというのも非常によくわかるコードでした。

後はProcessorの方でもUserConfigが取得できるので、初期化に困るということもなさそうです。

これで基本の基本をまず見てみました。
この後はExampleをあと何個か動かして&ソースを読んだ上で、実際に自前のアプリケーション作成に入っていきます。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2