はじめに
ASTERIA Warp2312では「RecordGroup」でグループキーによるストリームの分割が可能になりましたが、グループ処理として、今回は同じ結果を出力する処理をサンプルとして、RecordGroupを利用した場合と利用しない場合での構築の違いを見てみたいと思います。
処理内容
下記CSVファイルを、日付毎に動物の種類(哺乳類、爬虫類、鳥類)の動物名に加え、その日の合計金額を計算します。
日付,動物の種類,動物名,金額
2024-01-01,哺乳類,ライオン,2000
2024-01-01,鳥類,ペンギン,800
2024-01-01,爬虫類,ワニ,1500
2024-01-02,哺乳類,ゾウ,3000
2024-01-02,鳥類,フラミンゴ,1200
2024-01-02,爬虫類,ヘビ,1000
2024-01-03,哺乳類,クマ,1800
2024-01-03,鳥類,ハゲワシ,1000
2024-01-03,爬虫類,カメ,800
2024-01-04,哺乳類,キリン,2500
2024-01-04,鳥類,オウム,900
2024-01-04,爬虫類,イグアナ,1200
2024-01-01,哺乳類,カンガルー,1700
2024-01-02,鳥類,カラス,600
2024-01-03,爬虫類,トカゲ,800
2024-01-04,哺乳類,パンダ,2500
2024-01-05,鳥類,フクロウ,800
2024-01-04,爬虫類,ワニ,1500
2024-01-03,哺乳類,チーター,2000
2024-01-04,鳥類,ペリカン,1100
2024-01-05,爬虫類,ヘビ,1000
2024-01-01,哺乳類,クマ,1800
2024-01-02,鳥類,ハヤブサ,1200
2024-01-04,爬虫類,カメレオン,1000
2024-01-05,哺乳類,サイ,2200
2024-01-01,鳥類,カモメ,700
2024-01-02,爬虫類,ワニ,1500
2024-01-03,哺乳類,カバ,2000
2024-01-02,鳥類,カザリカク,600
2024-01-05,爬虫類,イグアナ,1200
このデータを
日付,動物名_哺乳類,動物名_爬虫類,動物名_鳥類,金額
2024-01-01,ライオン・クマ・カンガルー,ワニ,ペンギン・カモメ,8500
2024-01-02,ゾウ,ワニ・ヘビ,フラミンゴ・ハヤブサ・カラス・カザリカク,9100
2024-01-03,チーター・クマ・カバ,トカゲ・カメ,ハゲワシ,8400
2024-01-04,パンダ・キリン,ワニ・カメレオン・イグアナ,ペリカン・オウム,10700
2024-01-05,サイ,ヘビ・イグアナ,フクロウ,5200
この様に加工します。
各日付毎に、動物の種類を結合し、その日のトータルの金額を集計するイメージです。
RecordGroupを利用する場合
処理フロー
【処理概要】
- ファイルの読み込み
- RecordGroupのループ(日付)
- 動物名によるソート、レコードのマッピング
- ファイルの出力
グループキーでのレコード圧縮となるため、グループ後にソートは行っていません。また、動物名の並び順がランダムとなるため、同一出力の比較をしやすいように並び順を動物名順で固定する様グループ分割後にソートをしています。
RecordGroupのループのマッパーではストリーム内の動物名を種類毎に結合、金額の合計を行います。
Mapper
Mapperではレイヤーを5つ指定しています。処理内容は前回縦横変換とほぼ同様です。
マッパー変数
マッパー変数 | データ型 | 初期値 |
---|---|---|
日付 | String | |
動物名_哺乳類 | String | |
動物名_爬虫類 | String | |
動物名_鳥類 | String | |
金額 | Decimal | 0 |
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
合計 | なし |
哺乳類 | $record.動物の種類 = "哺乳類" |
爬虫類 | $record.動物の種類 = "爬虫類" |
鳥類 | $record.動物の種類 = "鳥類" |
移送 | $stream.RecordNo = $stream.RecordCount |
RecordGroupを使用しない場合
処理フロー
【処理概要】
- ファイルの読み込み
- 日付の昇順、動物名の降順でソート
- キー(日付)毎のレコード番号をレコードに登録
- 日付の昇順、レコード番号の降順でソート
- 日付レコードNoが1のレコードをマッピング
- ファイルの出力
ブレイク判定処理の場合、現在レコードが前レコードのグループキーと同一かを判断して、異なる場合はキーブレイク処理を行う、という処理になろうかと思います。このフローでは、グループ内の番号付けする事で、キーブレイクをした最初の行に1を設定します。並び順が降順、つまり最後に処理する行に対して1をつける事で、2回目のソートで日付とそのレコード順を降順でソートする事で、キーブレイクするレコード=1と判断を行えます。
Mapper1(レコード順登録)
前もって【日付(昇順)、動物名(降順)】でソートされている情報に、日付をグループキーと同一かを判定しグループ行カウントの加算か初期化を行います。
マッパー変数
マッパー変数 | データ型 | 初期値 |
---|---|---|
グループキー | String | |
グループ行カウント | Integer |
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
ブレイク判定 | なし |
Mapper2(レコード圧縮)
RecordGroupのループ処理があらかじめキーで分かれていた事に対して、今回明確にブレイクする行が1となっていますので、移送判定が異なる以外はほぼ変わらない内容となっています。移送=最終行のキーブレイクですので、マッパー変数の初期化を行っています。
マッパー変数
マッパー変数 | データ型 | 初期値 |
---|---|---|
日付 | String | |
動物名_哺乳類 | String | |
動物名_爬虫類 | String | |
動物名_鳥類 | String | |
金額 | Decimal | 0 |
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
合計 | なし |
哺乳類 | $record.動物の種類 = "哺乳類" |
爬虫類 | $record.動物の種類 = "爬虫類" |
鳥類 | $record.動物の種類 = "鳥類" |
移送 | $record.日付レコードNo = 1 |
1レコードづつループする場合
処理フロー
【処理概要】
- ファイルの読み込み
- 日付の昇順、動物名の昇順でソート
- ストリームを出力するストリームに合わせて加工
- 1レコードづつのループをし、ブレイクキー(日付)の判定を行う
- False:フロー変数にレコード値の格納
- True:フロー変数に格納した値をレコード化
- ファイルの出力(新規)
- 最終レコードをレコード化
- ファイルの出力(追加)
フロー変数
マッパー変数 | データ型 | 初期値 |
---|---|---|
ブレイクキー | String | |
動物名_哺乳類 | String | |
動物名_爬虫類 | String | |
動物名_鳥類 | String | |
金額 | Decimal | 0 |
1レコードづつループする場合の問題点は「生成レコードの出力タイミング」「最終レコードの出力」です。先読みが行えないため、キーブレイクが発生する場合、そのレコードはすで次のレコードです。そのためループでは前のレコードの情報を保持している必要があり、フロー変数が利用されます。
また、処理の構造上最終レコードの出力を出力ストリーム内にそのまま含める事ができません。それを行うためにはRecordSQLなど利用しストリームの合成を行う必要がありますが、この例はファイル出力ですので単純にファイル出力でレコード追加を行っています。
Mapper1(レコード成形)
最終出力レコードの形に成形し動物名を条件レイヤーによりそれぞれのカラムに格納しています。
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
移送 | なし |
哺乳類 | $record.動物の種類 = "哺乳類" |
爬虫類 | $record.動物の種類 = "爬虫類" |
鳥類 | $record.動物の種類 = "鳥類" |
Mapper2(条件判定)
次コンポーネントのブレイク判定の条件を判定しています。BranchStartのプロパティー式で直接入力できますが条件のデバッグ時の視認性、条件指定方法の対応力の観点で、この様なMapperでの指条件指定を行っています。ここでは、ブレイクの判定と初回判定を行っています。
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
判定 | なし |
Mapper3(ブレイクでない場合の移送)
キーブレイクしない=日付が同一の場合のマッピングで、ストリームには移送合成、合計を行っていフロー変数に移送していますが、レコードにはマッピングしていません。そのためこのマッパーからのストリームは空で出力されます。
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
移送 | なし |
Mapper4(ブレイクの場合の移送)
フロー変数に格納されている前レコードまでの情報をこのマッパーのストリームとしてレコード出力しています。変わりに現在のレコード情報をフロー変数に格納しています。
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
移送 | なし |
Mapper5(最終レコードの移送)
最終レコードキーブレイクしても、しなくてもフロー変数に格納されたままとなっています。そのフロー変数の内容を新たに1行のストリームとして生成します。カラムは直前のものと同一ですが、直前のストリームがヘッダありなのに対して、このストリームはファイルへのレコード追加ですのでヘッダをなしで指定します。
レイヤー
レイヤー名 | レイヤー条件 |
---|---|
移送 | なし |
最後に
今回の3つの同一処理を別の構築方法で構築しましたが、それぞれが異なったロジックで構築されています。また、移送・保持するデータ量やメモリ使用などにも影響があるかと思いますが、構築の参考にはなるかと思います。
ASTEIRA Warpでの構築では、通常のプログラミング言語と同様に3つめのケースの様なキーブレイク処理を構築する事ももちろんできますが、ASTERIA Warpの独自の仕組みを使う事で単純化、簡略化が可能となります。Mapperやレイヤーの特性、Mapper変数を詳しく知る事で、平面的ではなく多層的なレイヤーを利用して立体的なデータ移送を行へ、1フローあたりの利用コンポーネントの減少と処理の高速化が見込めます。
新しいRecordGroupも「あるキーによりストリームを分割できる」という特性は、思いもよらぬなケースでの利用シーンが期待できるコンポーネントではないかと考えます。
蛇足
今回のCSVデータはChatGPTで生成しましたが、データ内にある「カザリカク」という鳥は存在しない様です。