個人的メモです
よく使うパイプライン式、変数など
パイプラインパラメータ/変数設定
パラメータ | 名称 | 値の例や説明 |
---|---|---|
source_system_type | ソースシステムの分類 | transactional / master_and_reference (統合マスター管理システムなど)/ log / telemetry |
source_system_name | ソースシステム名 | - |
entity_name | 対象エンティティ名 | スキーマ名_テーブルのように検討 |
version | ファイルのバージョン | version_{メジャーバージョン}.{マイナーバージョン}として、マイナーバージョンはメジャーバージョンデータに対して加工が必要となった場合に、加工結果を配置する。メジャーバージョン0.xを初期移行データとして保管する |
load_type | 連携種別 | delta (差分連携) / full (全件連携) |
partition_type | パーティション種別 | hive(※推奨値 列名=値の形式にフォルダ名が管理されるhiveパーティション形式で保存する場合に使用)/customer(それ以外のディレクトリ形式で保存する場合) |
audit__run_at | ジョブ実行タイムスタンプ | audit__run_at_id , audit__run_date の基となる |
変数 | 名称 | 値の例や説明 |
---|---|---|
audit__run_date | ジョブ実行日 | audit__run_atから導出。audit__run_date={yyyy-MM-dd} として、hiveパーティションが機能するように扱う |
audit__run_at_id | ジョブ実行タイムスタンプ(yyyyMMddhhmmss形式) | audit__run_atから導出。audit__run_at={yyyyMMddHHmmss}として、 hiveパーティションが機能するように扱う |
ファイルの配置先
@concat(
'landing/',
pipeline().parameters.source_system_type,'/',
pipeline().parameters.source_system_name,'/',
pipeline().parameters.entity_name,'/',
pipeline().parameters.version,'/',
pipeline().parameters.load_type,'/',
pipeline().parameters.partition_type,'/',
'audit__run_date=',variables('audit__run_date'),'/',
'audit__run_a_id=',variables('audit__run_at_id')
)
アクティビティの実行時間を日本時間、yyyyMMddhhmmss形式で記録
formatDateTime(convertFromUtc(utcnow(),'Tokyo Standard Time'),'yyyyMMddHHmmss')
audit__run_at 導出の場合
formatDateTime(convertFromUtc(pipeline().parameters.audit__run_at,'Tokyo Standard Time'),'yyyyMMddHHmmss')
アクティビティの実行日を日本時間で記録
formatDateTime(convertFromUtc(utcnow(),'Tokyo Standard Time'),'yyyy-MM-dd')
audit__run_at 導出の場合
formatDateTime(convertFromUtc(pipeline().parameters.audit__run_at,'Tokyo Standard Time'),'yyyy-MM-dd')
ファイル有無チェック
ファイル並び替え
アクティビティには今はないのでストアドで。
更新日時や、ファイル名取込で対応も