node.jsでgcsからファイルを読み込む
gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。
const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
res.status(200).json({'result': data.toString('utf-8')});
});
download()だと、サイズの大きいファイルを読み込むとメモリ不足に
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.
@google-cloud/storageのソースを見ている
download()以外に、createReadStream()なるものが!
const storage = new Storage();
const bucket = storage.bucket('my-bucket');
const fs = require('fs');
const remoteFile = bucket.file('image.png');
const localFilename = '/Users/stephen/Photos/image.png';
remoteFile.createReadStream()
.on('error', function(err) {})
.on('response', function(response) {
// Server connected and responded with the specified status and headers.
})
.on('end', function() {
// The file is fully downloaded.
})
.pipe(fs.createWriteStream(localFilename));
createReadStream()でらファイルを読み込む
gcsからファイルを読み込む方法を探すと、よくdownload()を使用する例が紹介されています。
const storage = new Storage();
const bucket = storage.bucket('test-20180903');
const file = bucket.file('sample');
file.download().then(function(data) {
res.status(200).json({'result': data.toString('utf-8')});
});
download()だと、サイズの大きいファイルを読み込むとメモリ不足に
cloud functionsだと2Gまでしかメモリ拡張できないので、gcs側にファイル配置する際に、ファイルサイズを小さく分割しながら.·゜゜·(/。\)·゜゜·.
@google-cloud/storageのソースを見ている
download()以外に、createReadStream()なるものが!
const storage = new Storage();
const bucket = storage.bucket('my-bucket');
const fs = require('fs');
const remoteFile = bucket.file('image.png');
const localFilename = '/Users/stephen/Photos/image.png';
remoteFile.createReadStream()
.on('error', function(err) {})
.on('response', function(response) {
// Server connected and responded with the specified status and headers.
})
.on('end', function() {
// The file is fully downloaded.
})
.pipe(fs.createWriteStream(localFilename));
createReadStream()で、ストリーム処理に
↑サンプルをもとに、mongodbへの登録処理を実装してみると、なぞのエラーが・・・
responseイベントは、ノンブロッキング(非同期)処理されるので、mongodbへのアクセスが多すぎたみたい
gcsからサイズの大きいjsonlファイルをmongodbに登録する
createReadStream()で作成したストリームを、ブロッキング(同期)処理したら大丈夫でした。ダメ
でした・・・
大量データ処理すると、なぞのエラーが。何がダメなんだろう?
exports.execute = async (event, context) => {
const client = await mongo.connect(process.env.MONGODB_URL, { useNewUrlParser: true, useUnifiedTopology: true })
let rs = null
try {
const db = client.db(process.env.MONGODB_DATABASE)
rs = await storage.bucket(bucket).file(pubsubMessage.name).createReadStream();
for await (const line of readLines(rs)) {
const json = JSON.parse(line)
json.lastModified = new Date()
// 更新日時があたらしかった場合更新する
const result = await db.collection(collection).replaceOne({ _id: json._id, updateDttm: { $lte: json.updateDttm } }, json, { upsert: false })
if (result.matchedCount == 0) {
// 未登録の場合があるので登録してみる。
try {
await db.collection(collection).insertOne(json)
} catch (err) {
if (err.message.indexOf("E11000") < 0) {
throw err
}
}
}
}
} catch (err) {
throw err
} finally {
if (client) {
client.close()
}
if (rs) {
rs.destroy()
}
}
function readLines(rs) {
const output = new stream.PassThrough({ objectMode: true });
const rl = readline.createInterface(rs, {});
rl.on("line", line => {
output.write(line);
});
rl.on("close", () => {
output.push(null);
});
return output;
}
}
for await...of 文を使うことで、ブロッキング(同期処理)にできました!
node v12からは、標準のreadline.createInterface()が、async iterable を返すようになるようなので、自前のreadLines()もいらなくなるようです。スッキリ書けますね。
うまくいかね~
結局どうしたか
gcsから、少しずつローカルにファイルダウンロードして、1行ずつ処理しました!!
cloud functionsは、/tmpは自由に使えます
# ファイルとしてローカルにダウンロードする。(2MBずつ)
local_file = '/tmp/' + filename
bucket \
.blob(file_path, chunk_size=262144) \
.download_to_filename(local_file)
# ダウンロードしたファイルに対し・・・
with open(local_file, encoding='utf-8') as f:
for line in f.readlines():