0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

MongoDBAdvent Calendar 2019

Day 22

node.jsで(はうまくいかなかったのでpythonで)、gcsにあるサイズの大きいjsonlファイルを、mongodbに登録する(メモリふっとばさずに)

Last updated at Posted at 2019-12-20

node.jsでgcsからファイルを読み込む

gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。

const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
  res.status(200).json({'result': data.toString('utf-8')});
});

download()だと、サイズの大きいファイルを読み込むとメモリ不足
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.

@google-cloud/storageのソースを見ている

download()以外に、createReadStream()なるものが!

file.ts
  const storage = new Storage();
  const bucket = storage.bucket('my-bucket');
  
  const fs = require('fs');
  const remoteFile = bucket.file('image.png');
  const localFilename = '/Users/stephen/Photos/image.png';
  
  remoteFile.createReadStream()
   .on('error', function(err) {})
   .on('response', function(response) {
     // Server connected and responded with the specified status and headers.
   })
   .on('end', function() {
     // The file is fully downloaded.
   })
   .pipe(fs.createWriteStream(localFilename));

createReadStream()でらファイルを読み込む

gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。

const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
  res.status(200).json({'result': data.toString('utf-8')});
});

download()だと、サイズの大きいファイルを読み込むとメモリ不足
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.

@google-cloud/storageのソースを見ている

download()以外に、createReadStream()なるものが!

file.ts
  const storage = new Storage();
  const bucket = storage.bucket('my-bucket');
  
  const fs = require('fs');
  const remoteFile = bucket.file('image.png');
  const localFilename = '/Users/stephen/Photos/image.png';
  
  remoteFile.createReadStream()
   .on('error', function(err) {})
   .on('response', function(response) {
     // Server connected and responded with the specified status and headers.
   })
   .on('end', function() {
     // The file is fully downloaded.
   })
   .pipe(fs.createWriteStream(localFilename));

createReadStream()で、ストリーム処理に

↑サンプルをもとに、mongodbへの登録処理を実装してみると、なぞのエラーが・・・
responseイベントは、ノンブロッキング(非同期)処理されるので、mongodbへのアクセスが多すぎたみたい

gcsからサイズの大きいjsonlファイルをmongodbに登録する

createReadStream()で作成したストリームを、ブロッキング(同期)処理したら大丈夫でした。ダメでした・・・
大量データ処理すると、なぞのエラーが。何がダメなんだろう?

exports.execute = async (event, context) => {
    const client = await mongo.connect(process.env.MONGODB_URL, { useNewUrlParser: true, useUnifiedTopology: true })

    let rs = null
    try {
        const db = client.db(process.env.MONGODB_DATABASE)
        rs = await storage.bucket(bucket).file(pubsubMessage.name).createReadStream();
        for await (const line of readLines(rs)) {
            const json = JSON.parse(line)
            json.lastModified = new Date()
            // 更新日時があたらしかった場合更新する 
            const result = await db.collection(collection).replaceOne({ _id: json._id, updateDttm: { $lte: json.updateDttm } }, json, { upsert: false })
            if (result.matchedCount == 0) {
                // 未登録の場合があるので登録してみる。 
                try {
                    await db.collection(collection).insertOne(json)
                } catch (err) {
                    if (err.message.indexOf("E11000") < 0) {
                        throw err
                    }
                }
            }
        }
    } catch (err) {
        throw err
    } finally {
        if (client) {
            client.close()
        }
        if (rs) {
            rs.destroy()
        }
    }

    function readLines(rs) {
        const output = new stream.PassThrough({ objectMode: true });
        const rl = readline.createInterface(rs, {});
        rl.on("line", line => {
            output.write(line);
        });
        rl.on("close", () => {
            output.push(null);
        });
        return output;
    }
}

for await...of 文を使うことで、ブロッキング(同期処理)にできました!
node v12からは、標準のreadline.createInterface()が、async iterable を返すようになるようなので、自前のreadLines()もいらなくなるようです。スッキリ書けますね。

うまくいかね~

結局どうしたか

gcsから、少しずつローカルにファイルダウンロードして、1行ずつ処理しました!!
cloud functionsは、/tmpは自由に使えます


        # ファイルとしてローカルにダウンロードする。(2MBずつ)
        local_file = '/tmp/' + filename
        bucket \
            .blob(file_path, chunk_size=262144) \
            .download_to_filename(local_file)

        # ダウンロードしたファイルに対し・・・
        with open(local_file, encoding='utf-8') as f:
            for line in f.readlines():
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?