初めに
Google Cloud の Dataflow を使って、Cloud Pub/Sub のデータを Firestore にリアルタイムにストリーム処理で格納してみます。使用言語は Python3 です。
各サービスの詳細な説明は省かせていただきます。それぞれの製品に関する専門用語は解説なして利用させていただきます。ご了承ください。
今回登場するGoogle Cloud のサービス紹介
以下に簡単にですが今回利用する Google Cloud のサービスをご紹介します。
Cloud Pub/Sub
メッセージ配信サービスです、メッセージと呼ばれるデータを送信するパブリッシャーとサブスクライバーを仲介するサービスです。大量のメッセージの受信と配信を非同期にすることができます。リアルタイムに発生するログデータやIoTのデバイスデータなどを受けるサービスとして使われがちです。
Cloud Dataflowとは
Python と Java のプログラムを分散処理させるサービスです。Apache Beam という OSS フレームワークでプログラムは書きます。バッチ処理もストリーム処理も実装できるのが特徴です。分析基盤で大量データのETL処理などのデータパイプラインを実装するのに使われがちです。
Cloud Firestore
ドキュメント指向型のNoSQL データベースサービスです。モバイルバックエンドの Firebase のデータベースサービスで、モバイルアプリやWebサービスのバックエンドデータベースとして使われがちです。
シナリオ
Google Cloud には、公開データセットとして、ニューヨークのタクシーの移動データが、Cloud Pub/Sub にストリームされており利用することが可能です。今回はこのデータをリアルタイムにFirestore に格納してみたいと思います。
Cloud Pub/Sub から Dataflow でデータを読み込む
初めに、開発するための Cloud Shell を立ち上げます。Google Cloud コンソール画面の右上の「Cloud Shellをアクティブにする」で起動します。プロジェクトがセットされていない場合は、適切なプロジェクトをセットしてください。
$ gcloud config set core/project <PROJECT ID>
以降プロジェクトIDを使う部分は全てにリプレースしています。
Pub/Sub サブスクリプションの作成
まずは、公開データセットのトピックからデータを取得するためのサブスクリプションを作成します。
$ gcloud pubsub subscriptions create taxirides-realtime --topic=projects/pubsub-public-data/topics/taxirides-realtime
確認のために以下のコマンドでデータが取得できるか確認してください。
(取得できるまでに少し時間がかかるかもしれませんので、間隔を開けて何度か実行してください。)
$ gcloud pubsub subscriptions pull projects/<PROJECT ID>/subscriptions/taxirides-realtime
とれたらこんな感じの画面が返ってくると思います。(ちょっと長いですが。。)
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────┬────────────────────────────────────┬──────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │ DELIVERY_ATTEMPT │ ACK_ID │
├────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼──────────────────┼────────────────────────────────────┼──────────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"ride_id":"a1dfd8be-7ecd-4d03-a3d8-4efde03f89a1","point_idx":605,"latitude":40.68169,"longitude":-73.80504,"timestamp":"2020-08-23T21:15:13.90025-04:00","meter_reading":13.518753,"meter_increment":0.022345046,"ride_status":"enroute","passenger_count":1} │ 1454842611440512 │ ts=2020-08-23T21:15:13.90025-04:00 │ │ RFAGFixdRkhRNxkIaFEOT14jPzUgKEUUBQgUBXx9SUFPdV5YfGhRDRlyfWByblsXCVBDBXpcURMOb1hbdwdQBxlgTGFXSlsXBAZNV3xZWhoJbFxZdAV5666Zy-SJj2gJOjqqysKVbju43-MlZiI9XxJLLD5-MTFFQV5AEkw6BERJUytDCypYEU4EISE-MD5FUw │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴──────────────────┴────────────────────────────────────┴──────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
一番左の Data
というところに JSON 形式で時間や位置情報や乗客数などが入っていることがわかると思います。
Dataflow で PubSub からデータを読み込む
Pythonの環境構築
適当にフォルダを作ってPythonの仮装環境を作っておき、必要なライブラリ(Apache Beam と Cloud Firestoreのクライアント)をインストールします。。
$ mkdir dataflow_pubsub2firestore
$ cd dataflow_pubsub2firestore
$ virtualenv .env
$ source .env/bin/activate
$ pip install apache_beam[gcp]
$ pip install google-cloud-firestore
Dataflowで実行するコードは次のようになります。
import argparse
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
# streaming=True とすることでDataflowジョブをストリーミングジョブにする
pipeline_options = PipelineOptions(pipeline_args, streaming=True)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# ここからがDataflowパイプライン実装する部分
with beam.Pipeline(options=pipeline_options) as p:
# ここでPubSubからデータを取得してPCollectionにしている。
messages = p | 'Read' >> ReadFromPubSub(subscription='projects/<PROJECT ID>/subscriptions/taxirides-realtime').with_output_types(bytes)
# 文字列に変換して、確認用に画面出するだけの処理
p = (
messages
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'print' >> beam.Map(print)
)
# ここにFirestoreへの格納コードを書く
pass
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
試しに、DirectRunnerで実行すると、先ほどPubSubから読んだようなメッセージがズラズラ表示されます。DirectRunnerは、実際にDataflowインフラで動かさず、ローカルで動かすモードです。
$ python pubsub2firestore1.py --region='us-central1' --runner DirectRunner
Cloud Firestore へ格納
次に Firestore へ格納するところは、Firestoreのクライアントライブラリを使って実装します。
import argparse
import logging, json
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
# Firestoreに書き込むデータを作るクラス
class CreateDocumentFn(beam.DoFn):
def process(self, element):
items = json.loads(element)
document_id = items['ride_id']
document = {"timestamp": items['timestamp'],
"point_idx": items['point_idx'],
"latitude": items['latitude'],
"longitude": items['longitude'],
"ride_status": items['ride_status'],
"meter_reading": items['meter_reading'],
"meter_increment": items['meter_increment'],
"passenger_count": items['passenger_count']}
yield (document_id, document)
# Firestoreに書き込むためのクラス
class WriteToFirestore(beam.DoFn):
def __init__(self, project, collection):
self._project = project
self._collection = collection
def process(self, element):
from google.cloud import firestore
db = firestore.Client(self._project)
# document_idにride_idを指定することで同じ乗車のデータは上書き更新する
(document_id, document) = element
db.collection(self._collection).document(document_id).set(document)
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
# streaming=True とすることでDataflowジョブをストリーミングジョブにする
pipeline_options = PipelineOptions(pipeline_args, streaming=True)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
# ここからがDataflowパイプライン実装する部分
with beam.Pipeline(options=pipeline_options) as p:
# ここでPubSubからデータを取得してPCollectionにしている。
messages = p | 'Read' >> ReadFromPubSub(subscription='projects/<PROJECT ID>/subscriptions/taxirides-realtime').with_output_types(bytes)
# データを整形して、Firestoreに投げ込むためのDictを作っている。
p = (
messages
| 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
| 'entiry' >> beam.ParDo(CreateEntityFn())
)
# ここで、上の方で作っているクラスをParDoで呼んでFirestoreにドキュメントを書き込む
output = (
p
| 'Write to Firestore' >> beam.ParDo(WriteToFirestore('<PROJECT ID>','rides'))
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
これを以下のような感じで動かすとFirestoreのコンソール画面でデータが追加・更新される様子が見れる。firestoreのクライアントがDataflowのVMにデフォルトでない(あってくれよ)ので外部ライブラリとして読み込むようにrequirements.txtを作って実行時のオプションで指定しています。
$ pip freeze | grep google-cloud-firestore > requirements.txt
$ python pubsub2firestore.py --requirements_file requirements.txt --region='us-central1' --runner DataflowRunner --project=<PROJECT ID> --temp_location=gs://<任意のバケット>/temp --staging_location=gs://<任意のバケット>/staging
Firestoreの画面はこんな感じです。たまに更新されたデータが緑に光って見えます。
一件ずつクライアントからFirestoreへsetをコールしているので、お金かかりそうです。バッチ挿入をうまく使って実装するとコスト削減できそうですね。しかも今回のデータソースでは問題になりませんでしたが、データ流量が多いと、FirestoreのAPIコールの制約に引っかかる場合も出てきそうです。
ということで、バッチ版もやってみました。WriteToFirestoreクラスを以下のようにします。
class WriteToFirestore(beam.DoFn):
def __init__(self, project, collection):
self._project = project
self._collection = collection
def setup(self):
from google.cloud import firestore
self._db = firestore.Client(self._project)
def start_bundle(self):
self._documents = []
def process(self, element):
self._documents.append(element)
if len(self._documents) >= 500:
self._commit_batch()
def finish_bundle(self):
self._commit_batch()
def _commit_batch(self):
batch = self._db.batch()
for doc in self._documents:
(document_id, entity) = doc
ref = self._db.collection(self._collection).document(document_id)
batch.set(ref, entity)
self._documents = []
batch.commit()
setup
関数は、DoFnインスタンスの初期化の際に一度だけ呼ばれるので、ここでクライアントを作ります。start_bundle
は、バンドルという一連のデータを処理単位に区切ったデータの塊ですが、これの区切りごとに実行されます。先ほどは、processで一件ずつ書き込んでFirestoreに書き込んでいましたが、バッチで書き込むために、start_bundle
で用意した、配列_documents
にデータを追加していきます。Firestoreの1回あたりの書き込み条件件数が500なので500になったら、一気に書き出すという動きです。最後、funish_bandle
は、start_bundle
の逆なので、バンドルの最後に残りを全て書き出すという動きです。
以上です。