Help us understand the problem. What is going on with this article?

node.jsで、gcsにあるサイズの大きいjsonlファイルを、mongodbに登録する(メモリふっとばさずに)

node.jsでgcsからファイルを読み込む

gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。

const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
  res.status(200).json({'result': data.toString('utf-8')});
});

download()だと、サイズの大きいファイルを読み込むとメモリ不足
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.

@google-cloud/storageのソースを見ている

download()以外に、createReadStream()なるものが!

file.ts
  const storage = new Storage();
  const bucket = storage.bucket('my-bucket');

  const fs = require('fs');
  const remoteFile = bucket.file('image.png');
  const localFilename = '/Users/stephen/Photos/image.png';

  remoteFile.createReadStream()
   .on('error', function(err) {})
   .on('response', function(response) {
     // Server connected and responded with the specified status and headers.
   })
   .on('end', function() {
     // The file is fully downloaded.
   })
   .pipe(fs.createWriteStream(localFilename));

createReadStream()でらファイルを読み込む

gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。

const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
  res.status(200).json({'result': data.toString('utf-8')});
});

download()だと、サイズの大きいファイルを読み込むとメモリ不足
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.

@google-cloud/storageのソースを見ている

download()以外に、createReadStream()なるものが!

file.ts
  const storage = new Storage();
  const bucket = storage.bucket('my-bucket');

  const fs = require('fs');
  const remoteFile = bucket.file('image.png');
  const localFilename = '/Users/stephen/Photos/image.png';

  remoteFile.createReadStream()
   .on('error', function(err) {})
   .on('response', function(response) {
     // Server connected and responded with the specified status and headers.
   })
   .on('end', function() {
     // The file is fully downloaded.
   })
   .pipe(fs.createWriteStream(localFilename));

createReadStream()で、ストリーム処理に

↑サンプルをもとに、mongodbへの登録処理を実装してみると、なぞのエラーが・・・
responseイベントは、ノンブロッキング(非同期)処理されるので、mongodbへのアクセスが多すぎたみたい

gcsからサイズの大きいjsonlファイルをmongodbに登録する

createReadStream()で作成したストリームを、ブロッキング(同期)処理したら大丈夫でした。

exports.execute = async (event, context) => {
    const client = await mongo.connect(process.env.MONGODB_URL, { useNewUrlParser: true, useUnifiedTopology: true })

    let rs = null
    try {
        const db = client.db(process.env.MONGODB_DATABASE)
        rs = await storage.bucket(bucket).file(pubsubMessage.name).createReadStream();
        for await (const line of readLines(rs)) {
            const json = JSON.parse(line)
            json.lastModified = new Date()
            // 更新日時があたらしかった場合更新する 
            const result = await db.collection(collection).replaceOne({ _id: json._id, updateDttm: { $lte: json.updateDttm } }, json, { upsert: false })
            if (result.matchedCount == 0) {
                // 未登録の場合があるので登録してみる。 
                try {
                    await db.collection(collection).insertOne(json)
                } catch (err) {
                    if (err.message.indexOf("E11000") < 0) {
                        throw err
                    }
                }
            }
        }
    } catch (err) {
        throw err
    } finally {
        if (client) {
            client.close()
        }
        if (rs) {
            rs.destroy()
        }
    }

    function readLines(rs) {
        const output = new stream.PassThrough({ objectMode: true });
        const rl = readline.createInterface(rs, {});
        rl.on("line", line => {
            output.write(line);
        });
        rl.on("close", () => {
            output.push(null);
        });
        return output;
    }
}

for await...of 文を使うことで、ブロッキング(同期処理)にできました!
node v12からは、標準のreadline.createInterface()が、async iterable を返すようになるようなので、自前のreadLines()もいらなくなるようです。スッキリ書けますね。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした