Real-time stock data with Apache Flink® and Apache Kafka®の翻訳です。
2022年3月2日
Apache Flink®とApache Kafka®によるリアルタイムの株式データ
Apache Kafka®とApache Flink®により、リアルタイムデータ処理とストリーミング分析をコーディング不要でサポートできるようになりました!
2023年2月更新:Aiven for Apache Flinkは一般的に利用可能です!詳しくはブログで**
Apache Kafka®とApache Flinkは、リアルタイムデータ処理とストリーミング分析をサポートできるようになりました!コードを書く必要はなく、FlinkのシンプルなSQLステートメントだけです。1つのFlinkサービスに統合された受信データまたは処理データの複数のソースまたはシンクを持つことができます。
なぜFlinkを使うべきか
Flinkはオープンソースのツールであり、データガバナンスを維持しながらコラボレーションを行うことができます。Flinkを使えば、エンジニアはデータの種類ごとにパイプラインを構築する必要はありません。その代わり、データはFlinkのジョブを使って組織全体のチームやアプリケーションで利用できるようになります。
測定するために作られたデータ
Flinkのジョブを使えば、組織のニーズに応じてデータをフィルター、ウィンドウ、またはエンリッチすることができます。このデータは、保管や処理のために他のシステムに渡すことができます。
早く失敗し、早く成功する
Aiven Consoleを介した新しいジョブの追加は簡単かつ迅速で、データ処理の選択肢が広がります。このため、Flinkはデータ実験を実行するのに理想的なプラットフォームとなっている。
Apache Flinkを使った耐障害性の高いデータパイプライン
Flink は、データパイプラインを展開するための高可用性プラットフォームを提供します。Flinkは、フォールトトレランスのクラスタ再起動戦略で構成されており、Aivenのすべてのビジネスプランには、自動フェイルオーバーで高可用性を実現するマルチノードクラスタが付属しています。詳細はこちらをご覧ください。
マイクロサービスがすべてではない
マイクロサービスで組織のデータニーズを満たそうと考えていませんか?それは悪いアイデアではありませんが、Flinkを使えば、保守、デプロイ、実行するコードがないことに注意してください。つまり、複雑さを減らし、コストを下げ、市場投入までの時間を短縮し、リソースを節約することができる。
複雑なひねりを加えたシンプルさ
Flinkはwindowsとwatermarksを使って、複雑なSQLの結合やデータのウィンドウ化を簡単にします。状態はFlinkのジョブの中で維持されます。
次の例では、Webリクエストのアクセスタイムスタンプを収集するrequestsテーブルがあるので、PostgresSQLとFlinkを使って1時間あたりのリクエスト数をクエリします。
PostgreSQL では
時間を (
select generate_series(
date_trunc('hour', now()) - '1 day'::interval、
date_trunc('hour', now())、
'1時間'::間隔
) as hour
)
select
hours.hour、
count(requests.id)
時間から
left join requests on date_trunc('hour', requests.accesstime) = hours.hour
group by 1`クリップボードにコピーする
... 一方、Flinkでは次のことができます。
セレクト
時間
count(id)
FROM requests
GROUP BY symbol, TUMBLE(accesstime, INTERVAL '1' HOUR)` クリップボードにコピーする
Flink をテストする
エンドツーエンドのソリューションを示すために、実際のユースケースを見てみよう。これはFlinkとApache Kafkaを使って証券取引所のデータ処理をシミュレートしたものです。
この例では、Pythonコードが証券取引所のデータをKafkaトピックに生成します。次にFlinkがそれをピックアップして処理し、処理したデータを別のKafkaトピックに配置します。
以下のFlinkクエリは、これらすべてを実行する:
- 買値の最大値と最小値の差を計算する。
- 一定期間の株式ティックデータ(銘柄)の価格を求める。
- 指定された時間ウィンドウ(この例では9秒に設定されているため、デモのために秒単位で素早くデータが流れる)中の株式の間隔を追跡する。
INSERT INTO ${aiven_flink_table.sink.table_name}.
SELECT
シンボル
max(bid_price)-min(bid_price)、
max(ask_price)-min(ask_price)、
min(bid_price)、
max(bid_price)、
min(ask_price)、
max(ask_price)、
TIMESTAMPDIFF(SECOND, min(ts_ltz),max(ts_ltz))、
現在のタイムスタンプ
FROM ${aiven_flink_table.source.table_name}.
GROUP BY symbol, TUMBLE(ts_ltz, INTERVAL '9' SECOND)`クリップボードにコピーする
しかし、私たちの言葉を鵜呑みにしてはいけない。このソリューションが実際に動いているのを見るには、このコード例を見てほしい。Python 3とTerraformをセットアップし、指示に従ってください。いくつかのコマンドを実行するだけで、処理されたデータを見ることができる。
生産者の例(生成された株式データ):
`{"symbol":"M3", "bid_price":16.89, "ask_price":time_stamp": 16379779770211637977977021}
{"symbol":"シンボル": "KAFKA", "bid_price":891.25, "ask_price":891.75, "time_stamp":1637977977046}
{"symbol":"MYSQL", "bid_price":667.94、"ask_price":669.31, "time_stamp":1637977977072}
{"symbol":"PSQL", "bid_price":792.25, "ask_price":793.62, "time_stamp":1637977977096}
{"symbol":"シンボル": "INFLUX", "bid_price":24.43、"ask_price":24.79, "time_stamp":1637977977124}
{"symbol":"REDIS", "bid_price":12.87, "ask_price":12.57, "time_stamp":1637977977150}
{"symbol":"REDIS", "bid_price":13.33, "ask_price":12.26, "time_stamp":1637977977178}
{"symbol":"REDIS", "bid_price":11.54、"ask_price": 11.2911.29, "time_stamp":1637977979703}
{"symbol":"OS", "bid_price":9.49, "ask_price": 10.3, "time_stamp": 16379779770310.3, "time_stamp":1637977979731}
{"symbol":"シンボル": "INFLUX", "bid_price":24.44, "ask_price": 24.2724.27, "time_stamp":1637977979757}
{"symbol":"CQL", "bid_price":22.67, "ask_price":21.61, "time_stamp":1637977979788}
{"symbol":"OS", "bid_price":10.09, "ask_price":10.49, "time_stamp":1637977979813}
{"symbol":"CQL", "bid_price":22.06, "ask_price":21.28, "time_stamp":1637977979839}
{"symbol":"MYSQL", "bid_price":670.94, "ask_price":669.85, "time_stamp":1637977979864}
{"symbol":"PSQL", "bid_price":792.77, "ask_price":792.24, "time_stamp":1637977979889}`クリップボードにコピー
消費者の例(加工された株式データ):
`Received: b'{"symbol":"PSQL","change\_bid\_price":0,"change\_ask\_price":0,"min\_bid\_price":777.12,"max\_bid\_price":777.12,"min\_ask\_price":778.09,"max\_ask\_price":778.09,"time\_interval":0,"time\_stamp":"2021-11-27 01:52:33.744"}'
Received: b'{"symbol":"INFLUX","change_bid_price":2.25,"change_ask_price":2.02,"min_bid_price":23.96,"max_bid_price":26.21,"min_ask_price":24.66,"max_ask_price":26.68,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.744"}'
Received: b'{"symbol":"M3","change_bid_price":0,"change_ask_price":0,"min_bid_price":26.57,"max_bid_price":26.57,"min_ask_price":25.9,"max_ask_price":25.9,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.744"}'
Received: b'{"symbol":"REDIS","change_bid_price":0,"change_ask_price":0,"min_bid_price":13.3,"max_bid_price":13.3,"min_ask_price":14.48,"max_ask_price":14.48,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.745"}'
Received: b'{"symbol":"OS","change_bid_price":1.52,"change_ask_price":1.92,"min_bid_price":19.04,"max_bid_price":20.56,"min_ask_price":18.61,"max_ask_price":20.53,"time_interval":0,"time_stamp":"2021-11-27 01:52:33.745"}'
Received: b'{"symbol":"OS","change_bid_price":1.74,"change_ask_price":2.27,"min_bid_price":15.24,"max_bid_price":16.98,"min_ask_price":14.45,"max_ask_price":16.72,"time_interval":2,"time_stamp":"2021-11-27 01:52:42.358"}'
Received: b'{"symbol":"MYSQL","change_bid_price":1.43,"change_ask_price":1.08,"min_bid_price":667.17,"max_bid_price":668.6,"min_ask_price":666.92,"max_ask_price":668,"time_interval":3,"time_stamp":"2021-11-27 01:52:42.358"}'
Received: b'{"シンボル": "REDIS", "change_bid_price": "2.7", "change_ask_price": "4.31", "min_bid_price": "12.98", "max_bid_price": "15.68,"min_ask_price":11.55,"max_ask_price":15.86,"time_interval":3,"time_stamp":"2021-11-27 01:52:42.359"}'`Copy to clipboard
続きを読む
- Apache Flink® 入門
- Apache Kafka®とApache Flink®でバッチからストリーミングへ
- データと災害復旧
- Apache Kafka中心のマイクロサービスアーキテクチャの5つの利点
まだAivenのサービスをご利用でない方は、https://console.aiven.io/signupから無料トライアルにお申し込みください!
それまでは、changelogとblogのRSSフィード、またはLinkedInとTwitterのアカウントをフォローして、製品や機能関連の最新情報をご確認ください。