LoginSignup
4
4

More than 3 years have passed since last update.

Cloud Dataflow で Cloud Firestore に書き込む

Last updated at Posted at 2020-08-24

初めに

Google Cloud の Dataflow を使って、Cloud Pub/Sub のデータを Firestore にリアルタイムにストリーム処理で格納してみます。使用言語は Python3 です。

各サービスの詳細な説明は省かせていただきます。それぞれの製品に関する専門用語は解説なして利用させていただきます。ご了承ください。

今回登場するGoogle Cloud のサービス紹介

以下に簡単にですが今回利用する Google Cloud のサービスをご紹介します。

Cloud Pub/Sub

メッセージ配信サービスです、メッセージと呼ばれるデータを送信するパブリッシャーとサブスクライバーを仲介するサービスです。大量のメッセージの受信と配信を非同期にすることができます。リアルタイムに発生するログデータやIoTのデバイスデータなどを受けるサービスとして使われがちです。

Cloud Dataflowとは

Python と Java のプログラムを分散処理させるサービスです。Apache Beam という OSS フレームワークでプログラムは書きます。バッチ処理もストリーム処理も実装できるのが特徴です。分析基盤で大量データのETL処理などのデータパイプラインを実装するのに使われがちです。

Cloud Firestore

ドキュメント指向型のNoSQL データベースサービスです。モバイルバックエンドの Firebase のデータベースサービスで、モバイルアプリやWebサービスのバックエンドデータベースとして使われがちです。

シナリオ

Google Cloud には、公開データセットとして、ニューヨークのタクシーの移動データが、Cloud Pub/Sub にストリームされており利用することが可能です。今回はこのデータをリアルタイムにFirestore に格納してみたいと思います。

Cloud Pub/Sub から Dataflow でデータを読み込む

初めに、開発するための Cloud Shell を立ち上げます。Google Cloud コンソール画面の右上の「Cloud Shellをアクティブにする」で起動します。プロジェクトがセットされていない場合は、適切なプロジェクトをセットしてください。

$ gcloud config set core/project <PROJECT ID>

以降プロジェクトIDを使う部分は全てにリプレースしています。

Pub/Sub サブスクリプションの作成

まずは、公開データセットのトピックからデータを取得するためのサブスクリプションを作成します。

$ gcloud pubsub subscriptions create taxirides-realtime --topic=projects/pubsub-public-data/topics/taxirides-realtime

確認のために以下のコマンドでデータが取得できるか確認してください。
(取得できるまでに少し時間がかかるかもしれませんので、間隔を開けて何度か実行してください。)

$ gcloud pubsub subscriptions pull projects/<PROJECT ID>/subscriptions/taxirides-realtime

とれたらこんな感じの画面が返ってくると思います。(ちょっと長いですが。。)

┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────┬────────────────────────────────────┬──────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                                                                                              DATA                                                                                                            │    MESSAGE_ID    │             ATTRIBUTES             │ DELIVERY_ATTEMPT │                                                                                               ACK_ID                                                                                  │
├────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────────┼────────────────────────────────────┼──────────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"ride_id":"a1dfd8be-7ecd-4d03-a3d8-4efde03f89a1","point_idx":605,"latitude":40.68169,"longitude":-73.80504,"timestamp":"2020-08-23T21:15:13.90025-04:00","meter_reading":13.518753,"meter_increment":0.022345046,"ride_status":"enroute","passenger_count":1} │ 1454842611440512 │ ts=2020-08-23T21:15:13.90025-04:00 │                  │ RFAGFixdRkhRNxkIaFEOT14jPzUgKEUUBQgUBXx9SUFPdV5YfGhRDRlyfWByblsXCVBDBXpcURMOb1hbdwdQBxlgTGFXSlsXBAZNV3xZWhoJbFxZdAV5666Zy-SJj2gJOjqqysKVbju43-MlZiI9XxJLLD5-MTFFQV5AEkw6BERJUytDCypYEU4EISE-MD5FUw │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────────┴────────────────────────────────────┴──────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

一番左の Data というところに JSON 形式で時間や位置情報や乗客数などが入っていることがわかると思います。

Dataflow で PubSub からデータを読み込む

Pythonの環境構築

適当にフォルダを作ってPythonの仮装環境を作っておき、必要なライブラリ(Apache Beam と Cloud Firestoreのクライアント)をインストールします。。

$ mkdir dataflow_pubsub2firestore
$ cd dataflow_pubsub2firestore
$ virtualenv .env
$ source .env/bin/activate
$ pip install apache_beam[gcp]
$ pip install google-cloud-firestore

Dataflowで実行するコードは次のようになります。

pubsub2firestore1.py
import argparse

import apache_beam as beam

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None, save_main_session=True):

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    # streaming=True とすることでDataflowジョブをストリーミングジョブにする
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # ここからがDataflowパイプライン実装する部分
    with beam.Pipeline(options=pipeline_options) as p:

        # ここでPubSubからデータを取得してPCollectionにしている。
        messages = p | 'Read' >> ReadFromPubSub(subscription='projects/<PROJECT ID>/subscriptions/taxirides-realtime').with_output_types(bytes)

        # 文字列に変換して、確認用に画面出するだけの処理
        p = (
            messages
            | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'print' >> beam.Map(print)
            )

        # ここにFirestoreへの格納コードを書く
        pass

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

試しに、DirectRunnerで実行すると、先ほどPubSubから読んだようなメッセージがズラズラ表示されます。DirectRunnerは、実際にDataflowインフラで動かさず、ローカルで動かすモードです。

$ python pubsub2firestore1.py --region='us-central1' --runner DirectRunner

Cloud Firestore へ格納

次に Firestore へ格納するところは、Firestoreのクライアントライブラリを使って実装します。

pubsub2firestore2.py
import argparse
import logging, json

import apache_beam as beam

from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# Firestoreに書き込むデータを作るクラス
class CreateDocumentFn(beam.DoFn):
    def process(self, element):
        items = json.loads(element)
        document_id = items['ride_id']
        document = {"timestamp": items['timestamp'],
                    "point_idx": items['point_idx'],
                    "latitude": items['latitude'],
                    "longitude": items['longitude'],
                    "ride_status": items['ride_status'],
                    "meter_reading": items['meter_reading'],
                    "meter_increment": items['meter_increment'],
                    "passenger_count": items['passenger_count']}

        yield (document_id, document)

# Firestoreに書き込むためのクラス
class WriteToFirestore(beam.DoFn):
    def __init__(self, project, collection):
        self._project = project
        self._collection = collection

    def process(self, element):
        from google.cloud import firestore

        db = firestore.Client(self._project)

        # document_idにride_idを指定することで同じ乗車のデータは上書き更新する
        (document_id, document) = element
        db.collection(self._collection).document(document_id).set(document)


def run(argv=None, save_main_session=True):

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    # streaming=True とすることでDataflowジョブをストリーミングジョブにする
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # ここからがDataflowパイプライン実装する部分
    with beam.Pipeline(options=pipeline_options) as p:

        # ここでPubSubからデータを取得してPCollectionにしている。
        messages = p | 'Read' >> ReadFromPubSub(subscription='projects/<PROJECT ID>/subscriptions/taxirides-realtime').with_output_types(bytes)

        # データを整形して、Firestoreに投げ込むためのDictを作っている。
        p = (
            messages
            | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
            | 'entiry' >> beam.ParDo(CreateEntityFn())
            )

        # ここで、上の方で作っているクラスをParDoで呼んでFirestoreにドキュメントを書き込む
        output = (
            p
            | 'Write to Firestore' >> beam.ParDo(WriteToFirestore('<PROJECT ID>','rides'))
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

これを以下のような感じで動かすとFirestoreのコンソール画面でデータが追加・更新される様子が見れる。firestoreのクライアントがDataflowのVMにデフォルトでない(あってくれよ)ので外部ライブラリとして読み込むようにrequirements.txtを作って実行時のオプションで指定しています。

$ pip freeze | grep google-cloud-firestore > requirements.txt
$ python pubsub2firestore.py --requirements_file requirements.txt --region='us-central1' --runner DataflowRunner --project=<PROJECT ID> --temp_location=gs://<任意のバケット>/temp --staging_location=gs://<任意のバケット>/staging

Firestoreの画面はこんな感じです。たまに更新されたデータが緑に光って見えます。

Screen Shot 2020-08-25 at 0.57.15.png

一件ずつクライアントからFirestoreへsetをコールしているので、お金かかりそうです。バッチ挿入をうまく使って実装するとコスト削減できそうですね。しかも今回のデータソースでは問題になりませんでしたが、データ流量が多いと、FirestoreのAPIコールの制約に引っかかる場合も出てきそうです。

ということで、バッチ版もやってみました。WriteToFirestoreクラスを以下のようにします。

class WriteToFirestore(beam.DoFn):

    def __init__(self, project, collection):
        self._project = project
        self._collection = collection

    def setup(self):
        from google.cloud import firestore
        self._db = firestore.Client(self._project)

    def start_bundle(self):
        self._documents = []

    def process(self, element):
        self._documents.append(element)
        if len(self._documents) >= 500:
            self._commit_batch()

    def finish_bundle(self):
        self._commit_batch()

    def _commit_batch(self):
        batch = self._db.batch()
        for doc in self._documents:
            (document_id, entity) = doc
            ref = self._db.collection(self._collection).document(document_id)
            batch.set(ref, entity)
        self._documents = []
        batch.commit()


setup関数は、DoFnインスタンスの初期化の際に一度だけ呼ばれるので、ここでクライアントを作ります。start_bundleは、バンドルという一連のデータを処理単位に区切ったデータの塊ですが、これの区切りごとに実行されます。先ほどは、processで一件ずつ書き込んでFirestoreに書き込んでいましたが、バッチで書き込むために、start_bundleで用意した、配列_documentsにデータを追加していきます。Firestoreの1回あたりの書き込み条件件数が500なので500になったら、一気に書き出すという動きです。最後、funish_bandleは、start_bundleの逆なので、バンドルの最後に残りを全て書き出すという動きです。

以上です。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4