Help us understand the problem. What is going on with this article?

scalaではじめるGoogle Dataflow入門

少し前にgoogle dataflowを触ったので、そのことについてまとめていきます。

対象

  • scala初心者~
  • GCPを多少触ったことがある/今後触ってみたい
  • Big Data処理に興味がある

dataflow とは?

  • データのバッチ処理/ストリーミング処理をやってくれる
  • 処理に応じて自動でワーカーをスケールしてくれる
  • Apache Beam(Java, Python, Go向けのSDK)で動いている
  • GCSやBigQueryなどの他のGCPとの連携が容易にできる

scioとは?

  • Apache BeamのJava SDKのコレクションなどをラップしたもの
  • mapなどのコレクション操作の関数が実装されており、scalaでコレクション操作をするのと同じように処理を記載できる

なぜscioなのか?

  • Python, Go向けのSDKはJava向けのSDKと比べて機能が少ない
  • Java向けのSDKに関しては書き方が特殊(公式のexampleより)

使用例

共通部分

pipelineを作成するためのoptionを生成するロジックです。ここの情報をもとに、どのGCP projectにデプロイするのか、またどのようなスペックで動かすのかなどが設定されます。
今回は通常のpipelineに加え、cloud sqlも利用するため、DataflowPipelineOptionsCloudSqlOptionsをミックスインさせたtraitを独自に作成しています。
空欄の部分に関しては個々人のprojectの情報を埋めていただければと思います。
安定運用するにはtemplateを使ったほうが良いかもしれませんが、それは導入するときに考えれば良いかもしれません。

DataflowOptionFactory.createDefaultOption
object DataflowOptionFactory {

  def createDefaultOption: DataflowOption = {

    val options = PipelineOptionsFactory.as(classOf[DataflowOption])

    options.setProject("")
    options.setRunner(classOf[DataflowRunner])
    options.setZone("us-central1-f")
    options.setNumWorkers(1) //初期のworker数の指定
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE) //auto scalingの設定。今回は簡単な例なので切っています。
    options.setWorkerMachineType("n1-standard-1") //dataflowを動かすmachineのスペック。今回は低スペックで良いので低いものにしています。
    options.setCloudSqlInstanceConnectionName("")
    options.setCloudSqlDb("")
    options.setCloudSqlUsername("")
    options.setCloudSqlPassword("")

    options
  }
}

trait DataflowOption extends DataflowPipelineOptions with CloudSqlOptions

Word Count

ファイル内に含まれる単語数をカウントし、Big Queryに保存します。

WordCount.scala
@BigQueryType.toTable
case class Output(
                 word: String,
                 count: Long)

def main(args: Array[String]): Unit = {

  val option = DataflowOptionFactory.createDefaultOption
  val inputPath = "" //ここにinputするファイルのpathを入れてください
  val outputTable = "" //ここにoutput先のBigQueryのpathを入れてください
  val sc = ScioContext(option)

  sc.textFile(inputPath)
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
    .countByValue
    .map { case (word, count) =>
        Output(word = word, count = count)}
    .saveAsTypedBigQuery(outputTable, WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_IF_NEEDED)

  sc.close().waitUntilFinish()
}

sc.textFile(inputPath)の部分で、SCollectionというscioで使われるStreamingのコレクションが作成されます。コレクションの中身はinputPathの一行一行です。
このコレクションの要素に対して、単語単位で分割し、flatMapで潰してあげて単語のSCollectionを作成します。
その後、countByValueにて単語の数を数え、map内でtupleをcase classに変換します。
最後にBigQueryに保存する、といった流れになっています。

BigQueryにはcase classをそのまま保存できるので、特別何かを意識する必要はありません。

Credit Card の不正利用検知

pubsubから受け取ったメッセージをもとにアラートをpubsubに飛ばします。
pubsubからはcloud functionにつなげることもできるので、そこから通知するなり何なりできます。

CreditCardAlert.scala
def main(args: Array[String]): Unit = {

  private[this] case class CreditUse(
                      userId: Long,
                      amount: Long,
                      DateTime: String,
                      shopId: Long)

  case class Shop(
                               shopId: Long,
                               location: String
                               )

  val option = DataflowOptionFactory.createDefaultOption
  val sc = ScioContext(option)
  val inputTopic = "" //ここにinputとなるpubsubのトピックをいれてください
  val outputTopic = "" //ここにoutputとなるpubsubのとぴっくを入れてください

  val jdbcUrl = s"jdbc:mysql://google/${option.getCloudSqlDb}?" +
    s"cloudSqlInstance=${option.getCloudSqlInstanceConnectionName}&" +
    s"socketFactory=com.google.cloud.sql.mysql.SocketFactory"

  val connOption = JdbcConnectionOptions(
    username = option.getCloudSqlUsername,
    password = Option(option.getCloudSqlPassword),
    driverClass = classOf[Driver],
    connectionUrl = jdbcUrl)

  val shops = sc.jdbcSelect(getShops()(connOption))
    .keyBy(_.shopId)

  sc.pubsubTopic[String](inputTopic)
    .withFixedWindows(Duration.standardMinutes(10))
    .map {json =>
      decode[CreditUse](json).right.get
    }
    .keyBy(_.shopId)
    .join(shops)
    .values
    .groupBy(_._1.userId)
    .values
    .filter(_.size > 2)
    .filter(_.exists(_._2.location != "Japan"))
    .map(_.head._1)
    .saveAsPubsub(outputTopic)

  sc.close().waitUntilFinish()
}

こちらはstreaming処理になります。
まず、pubsubからjson形式のメッセージを受取、それをwindowで10分単位に分割していきます。
その後、メッセージをcase classに変換し、cloud sqlから別に取得してきたshop一覧をjoinさせます。
joinさせたあとはuserIdでgroup byし、2回以上利用され、なおかつその中に日本以外の国での利用が含まれていればそれをpubsubに出力します。

まとめ

いかがだったでしょうか?今回はmap, flatMap, filter, groupByなど、scalaを普段から書いている人にとっては馴染み深いものを使って例をあげてみました。
実際にはアプリケーションログやアクセスログの監視などに使われることもあるそうなので、興味を持った方はぜひ一度つかってみてはいかがでしょうか?

また、実際のコードは私のgithubにあげていますので、参考になれば幸いです。

参考

  • 株式会社ビズリーチで勉強会でdataflowについて発表しましたスライド
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away