2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Apache Flink®とApache Kafka®によるリアルタイムの株式データ

Posted at

Real-time stock data with Apache Flink® and Apache Kafka®の翻訳です。

2022年3月2日

Apache Flink®とApache Kafka®によるリアルタイムの株式データ

Apache Kafka®とApache Flink®により、リアルタイムデータ処理とストリーミング分析をコーディング不要でサポートできるようになりました!


2023年2月更新:Aiven for Apache Flinkは一般的に利用可能です!詳しくはブログで**

Apache Kafka®とApache Flinkは、リアルタイムデータ処理とストリーミング分析をサポートできるようになりました!コードを書く必要はなく、FlinkのシンプルなSQLステートメントだけです。1つのFlinkサービスに統合された受信データまたは処理データの複数のソースまたはシンクを持つことができます。

なぜFlinkを使うべきか

Flinkはオープンソースのツールであり、データガバナンスを維持しながらコラボレーションを行うことができます。Flinkを使えば、エンジニアはデータの種類ごとにパイプラインを構築する必要はありません。その代わり、データはFlinkのジョブを使って組織全体のチームやアプリケーションで利用できるようになります。

測定するために作られたデータ

Flinkのジョブを使えば、組織のニーズに応じてデータをフィルターウィンドウ、またはエンリッチすることができます。このデータは、保管や処理のために他のシステムに渡すことができます。

早く失敗し、早く成功する

Aiven Consoleを介した新しいジョブの追加は簡単かつ迅速で、データ処理の選択肢が広がります。このため、Flinkはデータ実験を実行するのに理想的なプラットフォームとなっている。

Apache Flinkを使った耐障害性の高いデータパイプライン

Flink は、データパイプラインを展開するための高可用性プラットフォームを提供します。Flinkは、フォールトトレランスのクラスタ再起動戦略で構成されており、Aivenのすべてのビジネスプランには、自動フェイルオーバーで高可用性を実現するマルチノードクラスタが付属しています。詳細はこちらをご覧ください。

マイクロサービスがすべてではない

マイクロサービスで組織のデータニーズを満たそうと考えていませんか?それは悪いアイデアではありませんが、Flinkを使えば、保守、デプロイ、実行するコードがないことに注意してください。つまり、複雑さを減らし、コストを下げ、市場投入までの時間を短縮し、リソースを節約することができる。

複雑なひねりを加えたシンプルさ

Flinkはwindowswatermarksを使って、複雑なSQLの結合やデータのウィンドウ化を簡単にします。状態はFlinkのジョブの中で維持されます。

次の例では、Webリクエストのアクセスタイムスタンプを収集するrequestsテーブルがあるので、PostgresSQLとFlinkを使って1時間あたりのリクエスト数をクエリします。

PostgreSQL では

時間を (
 select generate_series(
 date_trunc('hour', now()) - '1 day'::interval、
 date_trunc('hour', now())、
 '1時間'::間隔
 ) as hour
)

select
 hours.hour、
 count(requests.id)
時間から
left join requests on date_trunc('hour', requests.accesstime) = hours.hour
group by 1`クリップボードにコピーする

... 一方、Flinkでは次のことができます。

 セレクト
 時間
 count(id)
 FROM requests
 GROUP BY symbol, TUMBLE(accesstime, INTERVAL '1' HOUR)` クリップボードにコピーする

Flink をテストする

エンドツーエンドのソリューションを示すために、実際のユースケースを見てみよう。これはFlinkとApache Kafkaを使って証券取引所のデータ処理をシミュレートしたものです。

この例では、Pythonコードが証券取引所のデータをKafkaトピックに生成します。次にFlinkがそれをピックアップして処理し、処理したデータを別のKafkaトピックに配置します。

Python、Kafka、Flink

以下のFlinkクエリは、これらすべてを実行する:

  • 買値の最大値と最小値の差を計算する。
  • 一定期間の株式ティックデータ(銘柄)の価格を求める。
  • 指定された時間ウィンドウ(この例では9秒に設定されているため、デモのために秒単位で素早くデータが流れる)中の株式の間隔を追跡する。
 INSERT INTO ${aiven_flink_table.sink.table_name}.
 SELECT
 シンボル
 max(bid_price)-min(bid_price)、
 max(ask_price)-min(ask_price)、
 min(bid_price)、
 max(bid_price)、
 min(ask_price)、
 max(ask_price)、
 TIMESTAMPDIFF(SECOND, min(ts_ltz),max(ts_ltz))、
 現在のタイムスタンプ
 FROM ${aiven_flink_table.source.table_name}.
 GROUP BY symbol, TUMBLE(ts_ltz, INTERVAL '9' SECOND)`クリップボードにコピーする

しかし、私たちの言葉を鵜呑みにしてはいけない。このソリューションが実際に動いているのを見るには、このコード例を見てほしい。Python 3とTerraformをセットアップし、指示に従ってください。いくつかのコマンドを実行するだけで、処理されたデータを見ることができる。

生産者の例(生成された株式データ):

`{"symbol":"M3", "bid_price":16.89, "ask_price":time_stamp": 16379779770211637977977021}
{"symbol":"シンボル": "KAFKA", "bid_price":891.25, "ask_price":891.75, "time_stamp":1637977977046}
{"symbol":"MYSQL", "bid_price":667.94、"ask_price":669.31, "time_stamp":1637977977072}
{"symbol":"PSQL", "bid_price":792.25, "ask_price":793.62, "time_stamp":1637977977096}
{"symbol":"シンボル": "INFLUX", "bid_price":24.43、"ask_price":24.79, "time_stamp":1637977977124}
{"symbol":"REDIS", "bid_price":12.87, "ask_price":12.57, "time_stamp":1637977977150}
{"symbol":"REDIS", "bid_price":13.33, "ask_price":12.26, "time_stamp":1637977977178}
{"symbol":"REDIS", "bid_price":11.54、"ask_price": 11.2911.29, "time_stamp":1637977979703}
{"symbol":"OS", "bid_price":9.49, "ask_price": 10.3, "time_stamp": 16379779770310.3, "time_stamp":1637977979731}
{"symbol":"シンボル": "INFLUX", "bid_price":24.44, "ask_price": 24.2724.27, "time_stamp":1637977979757}
{"symbol":"CQL", "bid_price":22.67, "ask_price":21.61, "time_stamp":1637977979788}
{"symbol":"OS", "bid_price":10.09, "ask_price":10.49, "time_stamp":1637977979813}
{"symbol":"CQL", "bid_price":22.06, "ask_price":21.28, "time_stamp":1637977979839}
{"symbol":"MYSQL", "bid_price":670.94, "ask_price":669.85, "time_stamp":1637977979864}
{"symbol":"PSQL", "bid_price":792.77, "ask_price":792.24, "time_stamp":1637977979889}`クリップボードにコピー

消費者の例(加工された株式データ):

`Received: b'{"symbol":"PSQL","change\_bid\_price":0,"change\_ask\_price":0,"min\_bid\_price":777.12,"max\_bid\_price":777.12,"min\_ask\_price":778.09,"max\_ask\_price":778.09,"time\_interval":0,"time\_stamp":"2021-11-27 01:52:33.744"}'
Received: b'{"symbol":"INFLUX","change_bid_price":2.25,"change_ask_price":2.02,"min_bid_price":23.96,"max_bid_price":26.21,"min_ask_price":24.66,"max_ask_price":26.68,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.744"}'
Received: b'{"symbol":"M3","change_bid_price":0,"change_ask_price":0,"min_bid_price":26.57,"max_bid_price":26.57,"min_ask_price":25.9,"max_ask_price":25.9,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.744"}'
Received: b'{"symbol":"REDIS","change_bid_price":0,"change_ask_price":0,"min_bid_price":13.3,"max_bid_price":13.3,"min_ask_price":14.48,"max_ask_price":14.48,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.745"}'
Received: b'{"symbol":"OS","change_bid_price":1.52,"change_ask_price":1.92,"min_bid_price":19.04,"max_bid_price":20.56,"min_ask_price":18.61,"max_ask_price":20.53,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.745"}'
Received: b'{"symbol":"OS","change_bid_price":1.74,"change_ask_price":2.27,"min_bid_price":15.24,"max_bid_price":16.98,"min_ask_price":14.45,"max_ask_price":16.72,"time_interval":2,"time_stamp":"2021-11-27 01:52:42.358"}'
Received: b'{"symbol":"MYSQL","change_bid_price":1.43,"change_ask_price":1.08,"min_bid_price":667.17,"max_bid_price":668.6,"min_ask_price":666.92,"max_ask_price":668,"time_interval":3,"time_stamp":"2021-11-27 01:52:42.358"}'
Received: b'{"シンボル": "REDIS", "change_bid_price": "2.7", "change_ask_price": "4.31", "min_bid_price": "12.98", "max_bid_price": "15.68,"min_ask_price":11.55,"max_ask_price":15.86,"time_interval":3,"time_stamp":"2021-11-27 01:52:42.359"}'`Copy to clipboard

続きを読む

まだAivenのサービスをご利用でない方は、https://console.aiven.io/signupから無料トライアルにお申し込みください!

それまでは、changelogblogのRSSフィード、またはLinkedInTwitterのアカウントをフォローして、製品や機能関連の最新情報をご確認ください。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?