1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

朝のバス乗車時間よりも短い時間でリアルタイム分析パイプラインを構築

Last updated at Posted at 2023-10-18

Build a real-time analytics pipeline in less time than your morning bus rideの翻訳です。

2022年10月31日

朝のバスに乗るよりも短い時間でリアルタイム分析パイプラインを構築する

Apache Kafka®とApache Flink®を使用したイベント・ストリーミング・アーキテクチャ、リアルタイム分析データウェアハウスClickHouse®へのデータ着地について詳しくご紹介します。

興味深いデータセットがあり、多くのアイデアや質問があり、時間が刻々と迫っているとしたらどうしますか?数分以内に質の高い洞察を提供し、イノベーションをシンプルにしたい。Digitransitのデータセットを使い、Aiven Platformのツールやオープンソース・サービスを使ったデモで、それがどのように機能するかの一例をお見せします。

また、デイブがこのトピックについて詳しく説明するウェビナーも開催しました。録画はこちらから:

データ分析パイプラインの構築方法

何をカバーするか

  • デジトランスプラットフォーム:**我々のソリューションのデータアーキテクチャコンポーネントとサードパーティAPIからの入力データソースをチェックする。
  • 地下を掘る: 私たちの旅を始めるにあたり、私たちのチューブマップを見てみましょう!
  • 最初の足: MQTTフィードを購読するためのKafka Connect®フレームワークをご覧ください。
  • Apache Flink:でマッピング、フラット化、いくつかの変換を行う。
  • 最終目的地:では、分析クエリを実行できるようにデータを安全に取得します。
  • まとめ: 余分なクレジットを使って自分で試してみてください。Aiven ConsoleでCatch the Bus Challengeに挑戦し、他のリソースもご覧ください。

オープンソースプラットフォームから鉄道プラットフォームへ:私たちのデモの基礎

Digitransitは、フィンランドの旅行者が毎日利用するウェブサイトやモバイルアプリを支えています。このプラットフォームが特別なのは(そしてAivenが気に入っている理由は)、完全なオープンソースであることだ。彼らは、マッピング、ジオコーディング、サービスアラート、トリップアップデート、車両位置のためのほぼリアルタイムのデータフィードのためのAPIの素敵なセットを提供しています。

このデータフィードを取得し、Aiven上でパイプラインを素早く構築し、これらのイベントを使ってフィンランドの公共交通機関の状況について新しいことを学んでみよう!

image

ご覧のように、データパイプラインアーキテクチャは複雑になる可能性があります(さらに深く知りたい方は、future.comの素晴らしい情報をご覧ください)。しかし、ここでは、私たちのソリューションに使用するデータパイプラインアーキテクチャのコンポーネントを強調しました:

そしてこれらはすべてAiven上で稼働しています!

地下を掘り進む

これが建設予定の全マップだ。

image

すでに述べた中核的なサービス以外に、私たちは途中にいくつかの追加的な停車駅があることがおわかりいただけるだろう:

M3DBとGrafanaは、パイプラインの状態を監視し、健全であることを確認するために使用される。メッセージの着信率やコンシューマのラグをチェックし、それらにアラートのしきい値を設定することができる。

トライアルアカウントで、Aivenコンソールにドロップすると、提供されているオープンソースのマネージドサービスがすべて表示されます。

Kafka Connect®ラインの細かい設定

Kafka Connect®サービスの内部で、Stream ReactorのMQTT Source Connectorを実行します。カスタムコーディングは必要なく、正しいエンドポイントを指定し、トピックの詳細を渡し、コネクタに書き込むデータ形式を知らせるための最小限の設定だけです(シンプルにするためにJSONにしていますが、Avroもオプションです)。

これを実行すると、Kafkaトピックにこのようなレコードが表示される。

image

我々は、オペレーターID、車両ID、速度、方位、緯度/経度としての位置、その他さまざまなフィールドを持っている。しかし、ここでいくつか問題がある。

まず、全体がVP(車両位置)というJSONオブジェクトとしてラップされている。第二に、これらのIDはあまり意味がない。私たちは輸送オペレーターの名前を知りたいのだ。


注意: Terraformを使ってGithubでこのすべてを構築する方法を見たいなら、私たちにお任せください。

前処理のために Apache Flink に立ち寄る。

image

それでは、ストリームの前処理をするためにApache Flinkを通してみよう。ここでいくつかやっていることがある:

  • 各レコードをPostgreSQLデータベースにある小さな参照データセットに結合する。
  • 入れ子を取り除き、フラット化された値のセットを別のトピックに書き込みます。
  • 簡単な変換もできます。例えば、速度の値はメートル毎秒で報告されるが、マイル毎時やキロ毎時の方がいいかもしれない。ここで、元のカラムと一緒に新しい計算カラムを書くことができます。

これらの変換はSQLクエリとして定義されます。生データのKafkaトピックとPostgresの参照データテーブルの上にオーバーレイとして機能するFlinkテーブルを定義し、これらから選択し、好きなように結合します。

最後にクリックハウスのAiven

最後に、ClickHouseにデータが到着したらすぐに分析クエリーを実行できるようにしたい。ClickHouseは素晴らしいKafkaインテグレーションを備えているので、ここにKafkaのエンジン・タイプを持つテーブルを定義し、コンシューマー・グループを走らせ、私たちが望むトピックについて到着した新しいデータを常に読み込むことができる。

このテーブルをクエリすると、消費された最新のレコードが表示されるので、それだけでは意味がない。コンシューマー・グループの詳細をリセットしないと、同じデータを2回クエリすることはできない。そこで、このテーブルの上にClickHouseマテリアライズド・ビューを作成します。

image

ClickHouseはバックグラウンドでKafkaから常にデータを読み込んでいる。そのデータはマテリアライズド・ビューにトリクルされ、永続化されるので、何度でもクエリーを実行することができます。

まとめ

以上である。これらのツールとノウハウがあれば、独自のデータシステムで価値を創造し、イノベーションを起こすために、アンダーグラウンドをナビゲートすることができる。

ストリーミング異常検知システムの構築

データ変換にはAiven for Apache Flink®、データストリーミングにはAiven for Apache Kafka®、データストレージ/クエリにはAiven for PostgreSQL®を使用します。

チュートリアルを読む

参考文献

その他、ご意見・ご質問等ございましたら、TwitterまたはLinkedInまでご連絡ください。ブログのRSSフィードをフォローし、ドキュメントをご覧ください。Aivenと私たちのサービスに関する最新ニュース、さらにオープンソース全般に関する情報をご希望の方は、月刊ニュースレターを購読してください!

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?