はじめに
Azure Data Factory は、データの抽出、変換、読み込みを GUI で実装可能なオーケストレーション サービスです。今回は、まだプレビューですが、Azure Storage アカウントの SFTP 機能でアップロードしたファイルをきっかけに動作するパイプラインを作成してみたいと思います。「イヤイヤ、それではちょっと」とか「もっとこうすればいいよ」などのお気づきの点があればコメントで教えてください。
全体像
社内外を問わず、システム間の連携では FTP が利用されているケースが多くあると思います。今回はそのようなシナリオに準拠した想定で、SFTP 機能が有効化された Azure Storage アカウントに、SFTP クライアントで、ファイルが転送されたことをきっかけとして動作するパイプライントリガー(と後続の処理のアイデア)を実装してみます。
いいね!と思うポイント
送信される側から転送の完了を検知することができるので便利 (ファイル転送のあるあるなお悩みポイントが解消される)
おしい!と思うポイント
利用する機能が投稿時点でパブリック プレビューであること
Update on 2023/1/5: Storage アカウントの SFTP 機能は一般提供開始となりました。
Data Factory でパイプラインを発行するたびに、ストレージ トリガーの設定は引き続き初期化されます。
事前準備
実際に試すためには以下のものが必要になります。
- Azure サブスクリプション
- Azure Data Factory リソース(V2を使用します。作成時の Git 統合は不要です。こちら の手順が参考になります)
- Azure Storage アカウント (「階層型名前空間を有効にする」、「SFTP を有効にする」にチェックを入れて作成します。作成は こちら の手順が参考になります)
- 今回はデータを移動させたり、取り込んだりしないのでその他のリソースは不要です。
作ってみる
それでは作ってみましょう。手順書を目指しているわけではないので、完全なステップごとのスクリーンショットはありません。ご了承ください。また、RBAC、IAM に関しても商用レベルを想定していません。実際の利用にあたっては、十分ご検討いただき、皆さんのシステム要件に合った設定を採用してください。
SFTP のユーザー構成
SFTP の機能を有効化しただけで、SFTP は利用可能とはなりません。
- ストレージ アカウントにコンテナーを作成します
- SFTP ユーザーを作成します
今回は、簡単に試すことが目的なので、証明書は使用せずにパスワードでSSH接続するように構成します。
ホーム ディレクトリはアクセス許可を設定したコンテナー以下の任意のディレクトリを指定することに注意してください。(/ とだけ書いてしまうと、ADLS のルートの指定と認識されるので、/{コンテナー名}/{ディレクトリ名}/ での指定が必要です。
ユーザーの作成が終わると、パスワードが生成されて通知されます。確実にコピーを取って控えておいてください。(もし紛失した場合は再生成が可能です)
パイプラインを作成する
今回は、パイプラインはどのようなものでも差し支えありません。このため、待機だけの非常にシンプルな構成にしてみました。
パラメーターを2つ追加していますが、これは後で動作確認のタイミングで使用します。
ストレージ イベント トリガーを作成する
あまり深く考えずにストレージ、コンテナーを選択しただけの状態にします。実際のシステムでご利用いただく場合は、どのパスに対してのファイル作成なのかを厳密に設計して決めておくことが重要です。
本記事投稿時点においては、SFTP でファイルをアップロードした場合のイベントをトリガー作成時に指定できないので、トリガーを後で変更する必要があります。このため、この時点では適当に設定してしまって問題ありません。
トリガーを作成し終えたら、「すべて発行」を選択して、パイプラインとトリガーを有効化します。
発行に失敗した場合
典型的な失敗のパターンは、EventGrid のリソース プロバイダーが登録済みになっていない事です。
サブスクリプションのリソース プロバイダーの設定を確認してください。
緑色の Registered になっていない場合は対象行を選択し、「登録」をクリックすることで登録できます。(登録する権限がない場合は、サブスクリプションの管理者に問い合わせてください)
問題が解消したあとは、対象のトリガーを開始状態にして再度「公開」します。
ストレージ アカウントのイベント設定変更
Data Factory で公開に成功すると、指定したストレージ アカウントに対してイベント サブスクリプションが作成されます。ストレージ側で起こった変更のイベントが登録され、条件に一致したものが Data Factory のトリガーで検知されます。
GUIDが名前になっているイベント サブスクリプションを選択し、フィルターのタブを選択します。
ストレージ イベント トリガーの作成時に指定した内容で、api 名とファイルサイズがフィルター条件になっていることが確認できます。
SFTP を使用してファイルをアップロードした場合は、data.api は SFTP 固有のものとなります。どのようなapi が利用されるかは、以下のドキュメントにスキーマが纏まっています。ドキュメントとにらめっこして設定します。
Event Grid ソースとしての Azure Blob Storage
data.api キーは文字列 SftpCreate または SftpCommit に設定されます。
今回は SftpCommit のみを利用してみます。かなりすっきりしました。変更した後は忘れず保存します。
動作確認
それでは、意図した通りに動作するか確認してみましょう。トリガーが発火したかどうかを見ていきます。
対象のストレージアカウントに SFTP クライアントでファイルを配置します。本記事の投稿時点では、対応している SFTP クライアントのリストにあるものをご利用いただく方が無難だと思います。
SFTP でのファイル転送が終了してイベントが発生したことは、イベント サブスクリプションのメトリックで確認できます。
ファイルを1つだけ転送したので、フィルター条件に合ったイベントの数と配信されたイベントの数がそれぞれ1になっています。
また、Data Factory のモニター画面でトリガーが実行されたことも確認できます。SFTP でファイルが転送された場合に対象ファイルを処理できる状況が見えてきました。
パイプラインの中でファイルはどのように取り扱う?
ここからはオプションです。最初に変数を作成して動作状況を確認していきます。
Data Factory のトリガーは起動時の情報を保持していますので、システム変数を利用して取り出してみます。
トリガーのシステム変数はトリガーの設定時のみ利用可能であるため、トリガーの設定を変更してパラメーターにファイル名、フォルダパスを引き渡すようにします。
この後、トリガーを再び発行するのですが、このタイミングでストレージのイベント サブスクリプションが上書きされてしまいます。
このため、上記で変更したdata.apiのフィルター条件を再度変更する必要があります。
トリガーの設定変更後、改めてファイルを SFTP で送信し、パイプラインの実行結果を確認します。
パイプラインの実行時に引き渡されたパラメーターを確認すると、ファイル名、フォルダパスが値として設定されていることが確認できます。
まとめ
今回は Azure Storage アカウントを SFTP サーバーとして利用してファイル転送を受けた場合に起動するパイプラインを作成してみました。
Storage アカウントの SFTP 機能は、本記事投稿時点でプレビューの状態です。また、 Data Factory 側からはイベント サブスクリプションが上書きされてしまいます。
SFTP 機能が一般提供になり、 Data Factory 側の GUI で指定可能となるのが待ち遠しいところです。