LoginSignup
23
17

More than 5 years have passed since last update.

GoogleDriveからGoogleCloudStorageに階層構造をコピーするのをDataflowで実装した

Last updated at Posted at 2018-02-16

はじめに

Google Drive(以下Drive)と Google Cloud Storage(以下GCS)を同期したいことがしばしばあると思ったので実装しました。
GCSはディレクトリという概念が存在しないため、ファイルパスが分かっていればコピーの並列化が可能です。

[Drive]                            [GCS]

root/                              gs:root/
 ├ hoge.txt                         ├ hoge.txt
 ├ folderA/                         ├ folderA/fuga.txt
 │  └ fuga.txt                      ├ folderB/folderC/hogehoge.txt 
 ├ folderB/               ----->    └ piyo.txt
 │  └ folderC/                      
 │   └ hogehoge.txt/             
 └ piyo.txt

*Drive上のファイルパスがGCS上でのファイル名になるイメージ                         

なぜ Dataflow?

最初はGoogle App Engine(以下GAE)で並列コピー処理を書いていた。
しかし、並列コピーのタスクを分散させてしまうと、全てのコピーが終わったことを検知するのが難しい。
また、単純にGAEはバッチ処理をこなすのが苦手ということと、最近仕事でDataflowに触れているため。
Dataflowを利用すれば、分散した処理が終わるのを待つことができます。その後、Pub/SubやらCustomIOを書くなりして後続の処理に繋げれば良いと考えました。

要件

Driveのあるフォルダ(以下ルートフォルダ)直下の階層構造をGCSに並列コピーする。
Spreadsheet等、コピーできないファイルは対象外とする。
同フォルダ内の同ファイル名のファイルに関してはどうする?

実装の概要

ルートフォルダのIDから以下を探索し、ファイルIDとファイルパスを持つオブジェクトのリストを作成する。
作成されたオブジェクトを各タスクに分散し、「Driveからファイルをダウンロードし、GCSにアップロードする」部分を並列化する。

パイプライン設計

  • 標準でDriveIOは存在するの?
    • 存在しません。自前で実装しましょう!
  • カスタムソースやシンクを書かないといけないの?
    • 面倒くさいのでDoFnで実装!
    • 最初に適当な入力を作り、リストを作るためのDoFnのInputとして受け取る

簡単なパイプラインコード

// *ポイント1: 最初に適当な値で入力を作っておくのがミソ
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())

 .apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
 .apply("Write GCS", ParDo.of(new WriteStorageDoFn()));

 // *ポイント2: 全てのコピー処理が終わるまで待ちたいのでOutputの合計値を取っている
 .apply("Combine!", Sum.integersGlobally()))

 .apply("コピーが終わってるので、後続の処理をお好きにどうぞ!")

p.run();
  • ReadDriveDoFn: ルートフォルダ直下のファイルリストを作成
    • ルートフォルダIDから再帰的に以下を探索し、ファイルIDとそのパスをオブジェクトとしてリストを作成する
    • リストをループしてOutputすることで分散する
    public class ReadDriveDoFn extends DoFn<String, File> {

        private List<File> file;

        @ProcessElement
        public void processElement(ProcessContext c) {
            recursiveSearch(rootFolderID, filePath); // リストを作成
            for (File file : fileList) {
                c.output(file); // リストを分散!
            }
        }
    }
  • WriteStorageDoFn: ファイルをDriveからダウンロードし、GCSにアップロードする
    • Inputとして受け取ったオブジェクトからファイルIDを取得し、ダウンロード。その後、ファイルパスを元にGCSに保存
    • ここでのOutputは適当に「1」とする(なんでも良いが、コピーしたファイル数を把握できる)
    public class WriteStorageDoFn extends DoFn<File, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            downloadFromDrive(fileId);
            uploadToGCS(filePath);
            c.output(1);
        }
    }
  • Sum.integersGlobally: Outputの要素数を足す > ここではコピーしたファイル数を表す
    • 全てのOutput要素を足すDoFnを挟むことで、ファイルコピーの終了を待つことができ、コピーしたファイル数も把握することができる

さいごに

最初にGAE/Goで書いていた処理と比べると2倍以上早くはなりました。
しかし、G Suite(Apps)系のAPIってめちゃくちゃ脆いんですよね。
今回コピーを分散させることが可能になったわけですが、大量のファイル数を捌こうとすると、かなりエラーがでます。
リトライ処理はちゃんと書きましょう。
Dataflowはまだ細かいことが苦手ではありますが、無限の可能性を秘めていると思うので、今後も色々な用途で使っていきたいと思います。

23
17
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
17