Edited at

GoogleDriveからGoogleCloudStorageに階層構造をコピーするのをDataflowで実装した

More than 1 year has passed since last update.


はじめに

Google Drive(以下Drive)と Google Cloud Storage(以下GCS)を同期したいことがしばしばあると思ったので実装しました。

GCSはディレクトリという概念が存在しないため、ファイルパスが分かっていればコピーの並列化が可能です。

[Drive]                            [GCS]

root/ gs:root/
 ├ hoge.txt ├ hoge.txt
 ├ folderA/ ├ folderA/fuga.txt
 │ └ fuga.txt ├ folderB/folderC/hogehoge.txt
 ├ folderB/ -----> └ piyo.txt
 │ └ folderC/
 │   └ hogehoge.txt/
 └ piyo.txt

*Drive上のファイルパスがGCS上でのファイル名になるイメージ


なぜ Dataflow?

最初はGoogle App Engine(以下GAE)で並列コピー処理を書いていた。

しかし、並列コピーのタスクを分散させてしまうと、全てのコピーが終わったことを検知するのが難しい。

また、単純にGAEはバッチ処理をこなすのが苦手ということと、最近仕事でDataflowに触れているため。

Dataflowを利用すれば、分散した処理が終わるのを待つことができます。その後、Pub/SubやらCustomIOを書くなりして後続の処理に繋げれば良いと考えました。


要件

Driveのあるフォルダ(以下ルートフォルダ)直下の階層構造をGCSに並列コピーする。

Spreadsheet等、コピーできないファイルは対象外とする。

同フォルダ内の同ファイル名のファイルに関してはどうする?


実装の概要

ルートフォルダのIDから以下を探索し、ファイルIDとファイルパスを持つオブジェクトのリストを作成する。

作成されたオブジェクトを各タスクに分散し、「Driveからファイルをダウンロードし、GCSにアップロードする」部分を並列化する。


パイプライン設計


  • 標準でDriveIOは存在するの?


    • 存在しません。自前で実装しましょう!



  • カスタムソースやシンクを書かないといけないの?


    • 面倒くさいのでDoFnで実装!

    • 最初に適当な入力を作り、リストを作るためのDoFnのInputとして受け取る




簡単なパイプラインコード

// *ポイント1: 最初に適当な値で入力を作っておくのがミソ

p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())

.apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
.apply("Write GCS", ParDo.of(new WriteStorageDoFn()));

// *ポイント2: 全てのコピー処理が終わるまで待ちたいのでOutputの合計値を取っている
.apply("Combine!", Sum.integersGlobally()))

.apply("コピーが終わってるので、後続の処理をお好きにどうぞ!")

p.run();


  • ReadDriveDoFn: ルートフォルダ直下のファイルリストを作成


    • ルートフォルダIDから再帰的に以下を探索し、ファイルIDとそのパスをオブジェクトとしてリストを作成する

    • リストをループしてOutputすることで分散する



    public class ReadDriveDoFn extends DoFn<String, File> {

private List<File> file;

@ProcessElement
public void processElement(ProcessContext c) {
recursiveSearch(rootFolderID, filePath); // リストを作成
for (File file : fileList) {
c.output(file); // リストを分散!
}
}
}


  • WriteStorageDoFn: ファイルをDriveからダウンロードし、GCSにアップロードする


    • Inputとして受け取ったオブジェクトからファイルIDを取得し、ダウンロード。その後、ファイルパスを元にGCSに保存

    • ここでのOutputは適当に「1」とする(なんでも良いが、コピーしたファイル数を把握できる)



    public class WriteStorageDoFn extends DoFn<File, Integer> {

@ProcessElement
public void processElement(ProcessContext c) {
downloadFromDrive(fileId);
uploadToGCS(filePath);
c.output(1);
}
}


  • Sum.integersGlobally: Outputの要素数を足す > ここではコピーしたファイル数を表す


    • 全てのOutput要素を足すDoFnを挟むことで、ファイルコピーの終了を待つことができ、コピーしたファイル数も把握することができる




さいごに

最初にGAE/Goで書いていた処理と比べると2倍以上早くはなりました。

しかし、G Suite(Apps)系のAPIってめちゃくちゃ脆いんですよね。

今回コピーを分散させることが可能になったわけですが、大量のファイル数を捌こうとすると、かなりエラーがでます。

リトライ処理はちゃんと書きましょう。

Dataflowはまだ細かいことが苦手ではありますが、無限の可能性を秘めていると思うので、今後も色々な用途で使っていきたいと思います。