node.jsでgcsからファイルを読み込む
gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。
const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
  res.status(200).json({'result': data.toString('utf-8')});
});
download()だと、サイズの大きいファイルを読み込むとメモリ不足に
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.
@google-cloud/storageのソースを見ている
download()以外に、createReadStream()なるものが!
  const storage = new Storage();
  const bucket = storage.bucket('my-bucket');
  
  const fs = require('fs');
  const remoteFile = bucket.file('image.png');
  const localFilename = '/Users/stephen/Photos/image.png';
  
  remoteFile.createReadStream()
   .on('error', function(err) {})
   .on('response', function(response) {
     // Server connected and responded with the specified status and headers.
   })
   .on('end', function() {
     // The file is fully downloaded.
   })
   .pipe(fs.createWriteStream(localFilename));
createReadStream()でらファイルを読み込む
gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。
const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
  res.status(200).json({'result': data.toString('utf-8')});
});
download()だと、サイズの大きいファイルを読み込むとメモリ不足に
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.
@google-cloud/storageのソースを見ている
download()以外に、createReadStream()なるものが!
  const storage = new Storage();
  const bucket = storage.bucket('my-bucket');
  
  const fs = require('fs');
  const remoteFile = bucket.file('image.png');
  const localFilename = '/Users/stephen/Photos/image.png';
  
  remoteFile.createReadStream()
   .on('error', function(err) {})
   .on('response', function(response) {
     // Server connected and responded with the specified status and headers.
   })
   .on('end', function() {
     // The file is fully downloaded.
   })
   .pipe(fs.createWriteStream(localFilename));
createReadStream()で、ストリーム処理に
↑サンプルをもとに、mongodbへの登録処理を実装してみると、なぞのエラーが・・・
responseイベントは、ノンブロッキング(非同期)処理されるので、mongodbへのアクセスが多すぎたみたい
gcsからサイズの大きいjsonlファイルをmongodbに登録する
createReadStream()で作成したストリームを、ブロッキング(同期)処理したら大丈夫でした。ダメでした・・・
大量データ処理すると、なぞのエラーが。何がダメなんだろう?
exports.execute = async (event, context) => {
    const client = await mongo.connect(process.env.MONGODB_URL, { useNewUrlParser: true, useUnifiedTopology: true })
    let rs = null
    try {
        const db = client.db(process.env.MONGODB_DATABASE)
        rs = await storage.bucket(bucket).file(pubsubMessage.name).createReadStream();
        for await (const line of readLines(rs)) {
            const json = JSON.parse(line)
            json.lastModified = new Date()
            // 更新日時があたらしかった場合更新する 
            const result = await db.collection(collection).replaceOne({ _id: json._id, updateDttm: { $lte: json.updateDttm } }, json, { upsert: false })
            if (result.matchedCount == 0) {
                // 未登録の場合があるので登録してみる。 
                try {
                    await db.collection(collection).insertOne(json)
                } catch (err) {
                    if (err.message.indexOf("E11000") < 0) {
                        throw err
                    }
                }
            }
        }
    } catch (err) {
        throw err
    } finally {
        if (client) {
            client.close()
        }
        if (rs) {
            rs.destroy()
        }
    }
    function readLines(rs) {
        const output = new stream.PassThrough({ objectMode: true });
        const rl = readline.createInterface(rs, {});
        rl.on("line", line => {
            output.write(line);
        });
        rl.on("close", () => {
            output.push(null);
        });
        return output;
    }
}
for await...of 文を使うことで、ブロッキング(同期処理)にできました!
node v12からは、標準のreadline.createInterface()が、async iterable を返すようになるようなので、自前のreadLines()もいらなくなるようです。スッキリ書けますね。
うまくいかね~
結局どうしたか
gcsから、少しずつローカルにファイルダウンロードして、1行ずつ処理しました!!
cloud functionsは、/tmpは自由に使えます
        # ファイルとしてローカルにダウンロードする。(2MBずつ)
        local_file = '/tmp/' + filename
        bucket \
            .blob(file_path, chunk_size=262144) \
            .download_to_filename(local_file)
        # ダウンロードしたファイルに対し・・・
        with open(local_file, encoding='utf-8') as f:
            for line in f.readlines():
