はじめに
Google Drive(以下Drive)と Google Cloud Storage(以下GCS)を同期したいことがしばしばあると思ったので実装しました。
GCSはディレクトリという概念が存在しないため、ファイルパスが分かっていればコピーの並列化が可能です。
[Drive] [GCS]
root/ gs:root/
├ hoge.txt ├ hoge.txt
├ folderA/ ├ folderA/fuga.txt
│ └ fuga.txt ├ folderB/folderC/hogehoge.txt
├ folderB/ -----> └ piyo.txt
│ └ folderC/
│ └ hogehoge.txt/
└ piyo.txt
*Drive上のファイルパスがGCS上でのファイル名になるイメージ
なぜ Dataflow?
最初はGoogle App Engine(以下GAE)で並列コピー処理を書いていた。
しかし、並列コピーのタスクを分散させてしまうと、全てのコピーが終わったことを検知するのが難しい。
また、単純にGAEはバッチ処理をこなすのが苦手ということと、最近仕事でDataflowに触れているため。
Dataflowを利用すれば、分散した処理が終わるのを待つことができます。その後、Pub/SubやらCustomIOを書くなりして後続の処理に繋げれば良いと考えました。
要件
Driveのあるフォルダ(以下ルートフォルダ)直下の階層構造をGCSに並列コピーする。
Spreadsheet等、コピーできないファイルは対象外とする。
同フォルダ内の同ファイル名のファイルに関してはどうする?
実装の概要
ルートフォルダのIDから以下を探索し、ファイルIDとファイルパスを持つオブジェクトのリストを作成する。
作成されたオブジェクトを各タスクに分散し、「Driveからファイルをダウンロードし、GCSにアップロードする」部分を並列化する。
パイプライン設計
- 標準でDriveIOは存在するの?
- 存在しません。自前で実装しましょう!
- カスタムソースやシンクを書かないといけないの?
- 面倒くさいのでDoFnで実装!
- 最初に適当な入力を作り、リストを作るためのDoFnのInputとして受け取る
簡単なパイプラインコード
// *ポイント1: 最初に適当な値で入力を作っておくのがミソ
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())
.apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
.apply("Write GCS", ParDo.of(new WriteStorageDoFn()));
// *ポイント2: 全てのコピー処理が終わるまで待ちたいのでOutputの合計値を取っている
.apply("Combine!", Sum.integersGlobally()))
.apply("コピーが終わってるので、後続の処理をお好きにどうぞ!")
p.run();
- ReadDriveDoFn: ルートフォルダ直下のファイルリストを作成
- ルートフォルダIDから再帰的に以下を探索し、ファイルIDとそのパスをオブジェクトとしてリストを作成する
- リストをループしてOutputすることで分散する
public class ReadDriveDoFn extends DoFn<String, File> {
private List<File> file;
@ProcessElement
public void processElement(ProcessContext c) {
recursiveSearch(rootFolderID, filePath); // リストを作成
for (File file : fileList) {
c.output(file); // リストを分散!
}
}
}
- WriteStorageDoFn: ファイルをDriveからダウンロードし、GCSにアップロードする
- Inputとして受け取ったオブジェクトからファイルIDを取得し、ダウンロード。その後、ファイルパスを元にGCSに保存
- ここでのOutputは適当に「1」とする(なんでも良いが、コピーしたファイル数を把握できる)
public class WriteStorageDoFn extends DoFn<File, Integer> {
@ProcessElement
public void processElement(ProcessContext c) {
downloadFromDrive(fileId);
uploadToGCS(filePath);
c.output(1);
}
}
- Sum.integersGlobally: Outputの要素数を足す > ここではコピーしたファイル数を表す
- 全てのOutput要素を足すDoFnを挟むことで、ファイルコピーの終了を待つことができ、コピーしたファイル数も把握することができる
さいごに
最初にGAE/Goで書いていた処理と比べると2倍以上早くはなりました。
しかし、G Suite(Apps)系のAPIってめちゃくちゃ脆いんですよね。
今回コピーを分散させることが可能になったわけですが、大量のファイル数を捌こうとすると、かなりエラーがでます。
リトライ処理はちゃんと書きましょう。
Dataflowはまだ細かいことが苦手ではありますが、無限の可能性を秘めていると思うので、今後も色々な用途で使っていきたいと思います。