0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FlinkSQL 一時結合開発

Last updated at Posted at 2025-03-12

本記事はこちらのブログを参考にしています。
翻訳にはアリババクラウドのModelStudio(Qwen)を使用しております。

実時間開発における一時結合の経験共有

著者: Tuyuan

実時間開発において、特定の瞬間に該当する属性を取得するために二重ストリーム結合を使用する場合、一時結合がよく活用されます。トラフィックのアップグレードをリアルタイムで反復する際、筆者はトラフィックログと閲覧時間に対応する商品属性を正確にマッチさせる必要があり、一時結合を開発する中でいくつかの課題に直面しました。本記事では、そのような経験をまとめ、参考および議論のために提供します。

1. 背景

この記事を読む前に、FlinkSQLのリアルタイム二重ストリーム結合に関する背景知識を理解しておくことをお勧めします。現在、TTストリームAにはトラフィックログの詳細情報、TTストリームBには商品タグがあります。Flink上でストリームAとBを二重ストリーム結合することは、ストリームAをHBaseディメンションテーブルと結合することに似ています。一時結合には以下の特徴があります:

  • 単一ストリーム駆動型: 二重ストリーム結合であっても、データの出力は一方のストリームによってトリガーされます。バージョン管理されたテーブルを定義し、各時点での属性情報を記録し、二つのストリームを結合する際にパッシブにクエリされる必要があります。例えば、銀行の為替レート表のように、通貨交換時の為替レートを参照する必要があります。

  • クエリは時間バージョン情報を含む: 一時結合は2つのストリームのウォーターマークによってトリガーされます。したがって、取得される属性は関連する時間枠内のものとなります。

1
出典: Jincheng Sun, Blinkシリーズ - Temporal Table JOIN

2. 適用シナリオ & 事例共有

リアルタイムの為替レートに基づいて通貨金額の合計を計算したり、リアルタイムの商品価格に基づいて取引総額を計算する場合、一時結合はしばしば使用され、リアルタイムの為替レートや価格情報を取得します。私のトラフィックアップグレード業務の反復では、リアルタイムの商品タグを取得する必要がありました。そのため、商品タグのバージョン管理されたテーブルを定義する必要があります。構文は次の通りです:sql
CREATE TEMPORARY TABLE tag_ri (
id VARCHAR,
tag VARCHAR,
time VARCHAR,
ts AS TO_TIMESTAMP(time, yyyy-MM-dd HH:flag_mm:ss),
WATERMARK FOR ts AS withOffset(ts, 0) -- ウォーターマークの定義
) WITH (
connector = tt,
router = ******,
topic = tag_ri,
lineDelimiter = \n,
fieldDelimiter = \u0001,
encoding = utf-8
);

-- バージョンテーブルの定義
CREATE TEMPORARY VIEW tag
AS
SELECT id
, tag
, time
, ts
FROM ( SELECT id
, tag
, time
, ts
, ROW_NUMBER() OVER (PARTITION BY id -- 結合主キー
ORDER BY time DESC) AS rownum
FROM tag_ri
)
WHERE rownum = 1;

同様に、トラフィックログの詳細ストリームにもウォーターマークを定義し、二重ストリーム結合を行う必要があります。sql
CREATE TEMPORARY TABLE log_ri (
id VARCHAR,
time VARCHAR,
......
ts AS TO_TIMESTAMP(time, yyyy-MM-dd HH:flag_mm:ss),
WATERMARK FOR ts AS withOffset(ts, 0)
) WITH (
connector = tt,
router = ******,
topic = log_ri,
lineDelimiter = \n,
fieldDelimiter = \u0001,
encoding = utf-8,
);

select a.id
,......
,b.tag
from (
SELECT *
FROM log_ri
) AS a
LEFT JOIN tag FOR SYSTEM_TIME AS OF a.ts AS b ON a.id = b.id

結果は次のようになります:sql
-- 商品タグ情報
12:00> SELECT * FROM tag_ri;

id tag (商品タグ)
======= =======================
t1 A

12:30> SELECT * FROM tag_ri;

id tag (商品タグ)
======= =======================
t1 B

-- トラフィックログ詳細クエリ、商品t1に対する3つのエントリを表示
SELECT * FROM log_ri;

id time
======= ========
t1 12:00
t1 12:15
t1 12:30

-- 一時結合を実行
select a.id
,a.time
,b.tag
from (
SELECT *
FROM log_ri
) AS a
LEFT JOIN tag FOR SYSTEM_TIME AS OF a.ts AS b ON a.id = b.id

id time tag (商品タグ)
======= ======== =======================
t1 12:00 A
t1 12:15 A
t1 12:30 B

3. 開発経験

3.1 スパースデータ処理

一時結合は2つのストリームのウォーターマークによってトリガーされるため、もしバージョン管理されたテーブルがスパースなストリーム(一定期間データが流れ込まない)を表している場合、結合は待機状態になり、データが出力されないことがあります。このような場合、パラメータ set table.exec.source.idle-timeout = 10s を設定することで、ストリームAのデータが待機を回避できます。具体的なパラメータについての詳細は、こちらをご参照ください:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/#table-exec-source-idle-timeout

3.2 遅延データの出力

問題の説明

実際の開発中、一時結合を行った後、データは常に待ち状態となり、整時になってから初めて出力されることを確認しました。

2

原因分析

SQL構文でTTログフローを分析したところ、ロジックは4つのソースのユニオン後に定義されたバージョン管理されたテーブルとの結合であることが判明しました。sql
select a.*
,b.tag
from
(
select * from source_1
union all
select * from source_2
union all
select * from source_3
union all
select * from source_4
) a
temporay join
b stream

source_4は現在の時刻の59分間の一部のデータを整時に処理します。一時結合は両方のストリームのウォーターマークによってトリガーされるため、ストリームAはストリームBが現在の時刻の59分に達するまで待たなければなりません。

3

解決策

source_4の一部で、log_timeが現在の時刻より大きい場合、一時結合中にlog_timeを現在の時刻に設定することでこの問題を解決できます。

4. まとめ

二重ストリーム結合が単一ストリームによって駆

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?