少し前にgoogle dataflowを触ったので、そのことについてまとめていきます。
対象
- scala初心者~
- GCPを多少触ったことがある/今後触ってみたい
- Big Data処理に興味がある
dataflow とは?
- データのバッチ処理/ストリーミング処理をやってくれる
- 処理に応じて自動でワーカーをスケールしてくれる
- Apache Beam(Java, Python, Go向けのSDK)で動いている
- GCSやBigQueryなどの他のGCPとの連携が容易にできる
scioとは?
- Apache BeamのJava SDKのコレクションなどをラップしたもの
- mapなどのコレクション操作の関数が実装されており、scalaでコレクション操作をするのと同じように処理を記載できる
なぜscioなのか?
- Python, Go向けのSDKはJava向けのSDKと比べて機能が少ない
- Java向けのSDKに関しては書き方が特殊(公式のexampleより)
使用例
共通部分
pipelineを作成するためのoptionを生成するロジックです。ここの情報をもとに、どのGCP projectにデプロイするのか、またどのようなスペックで動かすのかなどが設定されます。
今回は通常のpipelineに加え、cloud sqlも利用するため、DataflowPipelineOptions
とCloudSqlOptions
をミックスインさせたtraitを独自に作成しています。
空欄の部分に関しては個々人のprojectの情報を埋めていただければと思います。
安定運用するにはtemplateを使ったほうが良いかもしれませんが、それは導入するときに考えれば良いかもしれません。
object DataflowOptionFactory {
def createDefaultOption: DataflowOption = {
val options = PipelineOptionsFactory.as(classOf[DataflowOption])
options.setProject("")
options.setRunner(classOf[DataflowRunner])
options.setZone("us-central1-f")
options.setNumWorkers(1) //初期のworker数の指定
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE) //auto scalingの設定。今回は簡単な例なので切っています。
options.setWorkerMachineType("n1-standard-1") //dataflowを動かすmachineのスペック。今回は低スペックで良いので低いものにしています。
options.setCloudSqlInstanceConnectionName("")
options.setCloudSqlDb("")
options.setCloudSqlUsername("")
options.setCloudSqlPassword("")
options
}
}
trait DataflowOption extends DataflowPipelineOptions with CloudSqlOptions
Word Count
ファイル内に含まれる単語数をカウントし、Big Queryに保存します。
@BigQueryType.toTable
case class Output(
word: String,
count: Long)
def main(args: Array[String]): Unit = {
val option = DataflowOptionFactory.createDefaultOption
val inputPath = "" //ここにinputするファイルのpathを入れてください
val outputTable = "" //ここにoutput先のBigQueryのpathを入れてください
val sc = ScioContext(option)
sc.textFile(inputPath)
.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
.countByValue
.map { case (word, count) =>
Output(word = word, count = count)}
.saveAsTypedBigQuery(outputTable, WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_IF_NEEDED)
sc.close().waitUntilFinish()
}
sc.textFile(inputPath)
の部分で、SCollectionというscioで使われるStreamingのコレクションが作成されます。コレクションの中身はinputPathの一行一行です。
このコレクションの要素に対して、単語単位で分割し、flatMapで潰してあげて単語のSCollectionを作成します。
その後、countByValueにて単語の数を数え、map内でtupleをcase classに変換します。
最後にBigQueryに保存する、といった流れになっています。
BigQueryにはcase classをそのまま保存できるので、特別何かを意識する必要はありません。
Credit Card の不正利用検知
pubsubから受け取ったメッセージをもとにアラートをpubsubに飛ばします。
pubsubからはcloud functionにつなげることもできるので、そこから通知するなり何なりできます。
def main(args: Array[String]): Unit = {
private[this] case class CreditUse(
userId: Long,
amount: Long,
DateTime: String,
shopId: Long)
case class Shop(
shopId: Long,
location: String
)
val option = DataflowOptionFactory.createDefaultOption
val sc = ScioContext(option)
val inputTopic = "" //ここにinputとなるpubsubのトピックをいれてください
val outputTopic = "" //ここにoutputとなるpubsubのとぴっくを入れてください
val jdbcUrl = s"jdbc:mysql://google/${option.getCloudSqlDb}?" +
s"cloudSqlInstance=${option.getCloudSqlInstanceConnectionName}&" +
s"socketFactory=com.google.cloud.sql.mysql.SocketFactory"
val connOption = JdbcConnectionOptions(
username = option.getCloudSqlUsername,
password = Option(option.getCloudSqlPassword),
driverClass = classOf[Driver],
connectionUrl = jdbcUrl)
val shops = sc.jdbcSelect(getShops()(connOption))
.keyBy(_.shopId)
sc.pubsubTopic[String](inputTopic)
.withFixedWindows(Duration.standardMinutes(10))
.map {json =>
decode[CreditUse](json).right.get
}
.keyBy(_.shopId)
.join(shops)
.values
.groupBy(_._1.userId)
.values
.filter(_.size > 2)
.filter(_.exists(_._2.location != "Japan"))
.map(_.head._1)
.saveAsPubsub(outputTopic)
sc.close().waitUntilFinish()
}
こちらはstreaming処理になります。
まず、pubsubからjson形式のメッセージを受取、それをwindowで10分単位に分割していきます。
その後、メッセージをcase classに変換し、cloud sqlから別に取得してきたshop一覧をjoinさせます。
joinさせたあとはuserIdでgroup byし、2回以上利用され、なおかつその中に日本以外の国での利用が含まれていればそれをpubsubに出力します。
まとめ
いかがだったでしょうか?今回はmap, flatMap, filter, groupByなど、scalaを普段から書いている人にとっては馴染み深いものを使って例をあげてみました。
実際にはアプリケーションログやアクセスログの監視などに使われることもあるそうなので、興味を持った方はぜひ一度つかってみてはいかがでしょうか?
また、実際のコードは私のgithubにあげていますので、参考になれば幸いです。
参考
- 株式会社ビズリーチで勉強会でdataflowについて発表しましたスライド