Help us understand the problem. What is going on with this article?

GoogleDriveからGoogleCloudStorageに階層構造をコピーするのをDataflowで実装した

More than 1 year has passed since last update.

はじめに

Google Drive(以下Drive)と Google Cloud Storage(以下GCS)を同期したいことがしばしばあると思ったので実装しました。
GCSはディレクトリという概念が存在しないため、ファイルパスが分かっていればコピーの並列化が可能です。

[Drive]                            [GCS]

root/                              gs:root/
 ├ hoge.txt                         ├ hoge.txt
 ├ folderA/                         ├ folderA/fuga.txt
 │  └ fuga.txt                      ├ folderB/folderC/hogehoge.txt 
 ├ folderB/               ----->    └ piyo.txt
 │  └ folderC/                      
 │   └ hogehoge.txt/             
 └ piyo.txt

*Drive上のファイルパスがGCS上でのファイル名になるイメージ                         

なぜ Dataflow?

最初はGoogle App Engine(以下GAE)で並列コピー処理を書いていた。
しかし、並列コピーのタスクを分散させてしまうと、全てのコピーが終わったことを検知するのが難しい。
また、単純にGAEはバッチ処理をこなすのが苦手ということと、最近仕事でDataflowに触れているため。
Dataflowを利用すれば、分散した処理が終わるのを待つことができます。その後、Pub/SubやらCustomIOを書くなりして後続の処理に繋げれば良いと考えました。

要件

Driveのあるフォルダ(以下ルートフォルダ)直下の階層構造をGCSに並列コピーする。
Spreadsheet等、コピーできないファイルは対象外とする。
同フォルダ内の同ファイル名のファイルに関してはどうする?

実装の概要

ルートフォルダのIDから以下を探索し、ファイルIDとファイルパスを持つオブジェクトのリストを作成する。
作成されたオブジェクトを各タスクに分散し、「Driveからファイルをダウンロードし、GCSにアップロードする」部分を並列化する。

パイプライン設計

  • 標準でDriveIOは存在するの?
    • 存在しません。自前で実装しましょう!
  • カスタムソースやシンクを書かないといけないの?
    • 面倒くさいのでDoFnで実装!
    • 最初に適当な入力を作り、リストを作るためのDoFnのInputとして受け取る

簡単なパイプラインコード

// *ポイント1: 最初に適当な値で入力を作っておくのがミソ
p.apply("First Input",Create.of("A")).setCoder(StringUtf8Coder.of())

 .apply("Read Drive", ParDo.of(new ReadDriveDoFn(rootFolderId)))
 .apply("Write GCS", ParDo.of(new WriteStorageDoFn()));

 // *ポイント2: 全てのコピー処理が終わるまで待ちたいのでOutputの合計値を取っている
 .apply("Combine!", Sum.integersGlobally()))

 .apply("コピーが終わってるので、後続の処理をお好きにどうぞ!")

p.run();
  • ReadDriveDoFn: ルートフォルダ直下のファイルリストを作成
    • ルートフォルダIDから再帰的に以下を探索し、ファイルIDとそのパスをオブジェクトとしてリストを作成する
    • リストをループしてOutputすることで分散する
    public class ReadDriveDoFn extends DoFn<String, File> {

        private List<File> file;

        @ProcessElement
        public void processElement(ProcessContext c) {
            recursiveSearch(rootFolderID, filePath); // リストを作成
            for (File file : fileList) {
                c.output(file); // リストを分散!
            }
        }
    }
  • WriteStorageDoFn: ファイルをDriveからダウンロードし、GCSにアップロードする
    • Inputとして受け取ったオブジェクトからファイルIDを取得し、ダウンロード。その後、ファイルパスを元にGCSに保存
    • ここでのOutputは適当に「1」とする(なんでも良いが、コピーしたファイル数を把握できる)
    public class WriteStorageDoFn extends DoFn<File, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            downloadFromDrive(fileId);
            uploadToGCS(filePath);
            c.output(1);
        }
    }
  • Sum.integersGlobally: Outputの要素数を足す > ここではコピーしたファイル数を表す
    • 全てのOutput要素を足すDoFnを挟むことで、ファイルコピーの終了を待つことができ、コピーしたファイル数も把握することができる

さいごに

最初にGAE/Goで書いていた処理と比べると2倍以上早くはなりました。
しかし、G Suite(Apps)系のAPIってめちゃくちゃ脆いんですよね。
今回コピーを分散させることが可能になったわけですが、大量のファイル数を捌こうとすると、かなりエラーがでます。
リトライ処理はちゃんと書きましょう。
Dataflowはまだ細かいことが苦手ではありますが、無限の可能性を秘めていると思うので、今後も色々な用途で使っていきたいと思います。

mako0715
GCPがだ〜いすき!
topgate
Google技術を中心に取り扱う技術者集団
https://www.topgate.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away