本稿は個人の見解および確認によるもので、所属する団体等とは関係がありません。
はじめに
前回投稿したこちらの記事では、streamingオプションやdeferredオプションがどのようにテンポラリファイルに影響するか調査しました。
今回は少々極端な例ですが、DataWeaveを記述するTransformコンポーネントを複数並べた場合および、前回Non repeatable streamに固定していたFile:ReadのStream StrategyオプションをデフォルトであるRepeatableにした場合の挙動について考察を行います。
実装
早速ですが、以下のような、5連続のTransform Messageコンポーネントを走らせてみます。
各々のコンポーネントは以下のようなDataWeave式となっています。
(以下の例は、最初のTransform Messageコンポーネントの内容で、CSVの1番目と2番目の項目はupdate関数を使用して変更し、最後の項目にタイムスタンプを追加しています。
その他のコンポーネントについても、同様にmapを使用して各行を処理しています。
ここで注目するポイントしては、このフローにはdeferred=trueはそれぞれのDataWeave式でつけられていますが、その1つ前のコンポーネントからは、streaming=trueというオプションが渡されてこないといった点です。
%dw 2.0
import * from dw::util::Values
@StreamCapable
input payload csv header=false
output application/csv deferred=true,header=false
---
payload map ((r, idx) -> do {
r
update "column_0" with r.column_1
update "column_1" with r.column_0
update "column_71" with now()
})
それではこのフローを実行させた状態をみてみましょう。
尚、フローの呼び出しは、Flow Referenceを利用して以下のフローから呼び出してFile:Readで読み込んだCSVデータを流している形となります。
また、File::Readには、Output MIMEにstreaming=trueはつけていない状態です。
フロー実行 (Transform x 5のパターン)
それではフローを実行して、/tmpをみてみると、ずらっと、indexファイルとinputファイルができていることがわかります。
inputは合計6つ、indexは合計12個存在しています。5つのTransformコンポーネントとFile::Writeの際に出力されていると推測されます。
尚、deferred=trueオプションが聞いて、outputはでていないということがわかります。
$ \ls /tmp/dw* -l
-rw-r--r-- 1 myst myst 275670848 Jun 17 13:52 /tmp/dw-buffer-index-14.tmp
-rw-r--r-- 1 myst myst 275661760 Jun 17 13:52 /tmp/dw-buffer-index-16.tmp
-rw-r--r-- 1 myst myst 275639040 Jun 17 13:52 /tmp/dw-buffer-index-18.tmp
-rw-r--r-- 1 myst myst 275629952 Jun 17 13:52 /tmp/dw-buffer-index-20.tmp
-rw-r--r-- 1 myst myst 275608368 Jun 17 13:52 /tmp/dw-buffer-index-22.tmp
-rw-r--r-- 1 myst myst 275581104 Jun 17 13:52 /tmp/dw-buffer-index-24.tmp
-rw-r--r-- 1 myst myst 1572864 Jun 17 13:51 /tmp/dw-buffer-index-25.tmp
-rw-r--r-- 1 myst myst 1572864 Jun 17 13:51 /tmp/dw-buffer-index-26.tmp
-rw-r--r-- 1 myst myst 1572864 Jun 17 13:51 /tmp/dw-buffer-index-27.tmp
-rw-r--r-- 1 myst myst 1572864 Jun 17 13:51 /tmp/dw-buffer-index-28.tmp
-rw-r--r-- 1 myst myst 1572864 Jun 17 13:51 /tmp/dw-buffer-index-29.tmp
-rw-r--r-- 1 myst myst 1572864 Jun 17 13:51 /tmp/dw-buffer-index-30.tmp
-rw-r--r-- 1 myst myst 184025088 Jun 17 13:52 /tmp/dw-buffer-input-13.tmp
-rw-r--r-- 1 myst myst 183181312 Jun 17 13:52 /tmp/dw-buffer-input-15.tmp
-rw-r--r-- 1 myst myst 183164928 Jun 17 13:52 /tmp/dw-buffer-input-17.tmp
-rw-r--r-- 1 myst myst 184614912 Jun 17 13:52 /tmp/dw-buffer-input-19.tmp
-rw-r--r-- 1 myst myst 185327616 Jun 17 13:52 /tmp/dw-buffer-input-21.tmp
-rw-r--r-- 1 myst myst 192102400 Jun 17 13:52 /tmp/dw-buffer-input-23.tmp
それでは、前回学んだとおり、これらのinput、indexの出力を抑制するためには1つ前のコンポーネントにstreaming=trueをOutput MIMEに指定すればよいということで設定をしてみます。
1つ目のTransform Messageコンポーネント用には、File::ReadのOutput MIMEへ設定します。
2つ目以降には、DataWeaveのOutput MIMEへstreaming=trueを設定してみます。
これで、index、inputの出力を抑制して無駄なI/Oがなくなりめでたしめでたし。
となるはずが、しかし
いざHTTPでリクエストをなげると以下のようなエラーが!
HTTP/1.1 500 Server Error
Connection: close
Content-Length: 747
Content-Type: text/plain; charset=UTF-8
Date: Sat, 17 Jun 2023 05:12:31 GMT
"Option `streaming` is not valid. Valid options are: `separator`, `encoding`, `quote`, `escape`, `lineSeparator`, `bufferSize`, `bodyStartLineNumber`, `ignoreEmptyLine`, `header`, `quoteHeader`, `headerLineNumber`, `quoteValues`, `deferred`
5| output application/csv deferred=true,header=false,streaming=true
^^^^^^^^^^^^^^
Trace:
at anonymous::main (line: 5, column: 51)" evaluating expression: "%dw 2.0
import * from dw::util::Values
@StreamCapable
input payload csv header=false
output application/csv deferred=true,header=false,streaming=true
---
payload map ((r, idx) -> do {
r
update "column_0" with r.column_1
update "column_1" with r.column_0
update "column_70" with now()
})".
DataWeaveのOutput MIMEではstreaming=trueが使えない模様。
まぁ、そもそもTransformを連続で並べるのがいけないのはわかってますけど、、、とも思いつつマニュアルをみてみると、以下のような記述がありました。
HTTP Listener 操作、HTTP Request 操作、On New or Updated File 操作、Set Payload コンポーネントなど、データを生成する任意のコネクタ操作または Mule コンポーネントでプロパティを設定できます。
なるほど、Set Payloadではstreamingオプションを付与できるのかということで早速試してみましょう。
フロー実行 (Transform x 5 + Set Payload パターン)
かなり不格好ではありますが、以下のようにSet Payloadを1つ前において、Output MIMEにstreaming=trueをつけてみました。
それでは実行してみます。
$ ls /tmp/dw* -l
ls: cannot access '/tmp/dw*': No such file or directory
ちゃんと出力が抑制されていることが確認されました。
簡易ベンチマーク
実際この2つのフローでどれくらい時間の差があったのか計測してみました。ファイルは755MB (100万レコード)のCSVでローカルファイルからのリード・ライトです。
環境
- CPU: Ryzen 9 6900HX 4.7GHz 8C16T
- Memory: 32GB
- Storage: SSD
- OS: Linux kernel version 6.1.0
- File System: Btrfs
- Java: OpenJDK 8
(Mule Runtimeの実行においては、CPUはtasksetにより1つに制限し、JVMが使用するHeapについても1024MBに上限を設定)
結果
フロー | 時間 | 備考 |
---|---|---|
Transform x 5 | 7分54秒 | 3回計測した平均値 |
Transform x 5 (Set Payload付き) | 6分27秒 | 3回計測した平均値 |
今ファイル出力の抑制効果で、20%ほど高速化しているようです。また必要となるディスクについても、index、inputでは元のファイルサイズ以上が出力されるため、必要となるディスク容量の削減も期待できます。
まとめ
本稿では、streamingオプションの挙動を観察するために、Transformを5つ並べた場合での挙動をみてきました。ランダムアクセスが必要でない簡易な変換の場合は、streamingオプションやdeferredオプションを付与してI/Oの抑制という観点でMule アプリケーションのチューニングを行うことができますので、ぜひみなさんもお試しください。
最後に1つ
今回はテンポラリファイルの抑制という観点での投稿となりましたためふれていませんが、チューニングという観点ではそもそもTransform Messageコンポーネントを集約して1つにするということのほうが絶大な効果がありますので、DataWeave処理系を扱う場合にはできるだけ集約するということをおすすめします。