きっかけ
- とある案件でLambdaでS3 to S3のファイル結合等の操作をするため、StreamAPIを用いたメモリに展開しないファイル変形を実装した。その実装の備忘録のため。
- 調べながら実装したため、不備等あればご指摘ください!
- 基本はasync/awaitの構文ですが、一部異なります。
変形内容
- ファイル結合
- ファイル圧縮
- ファイル解凍
1. ファイル結合
- 大きな流れ
- アップロード用の書き込みStream, S3アップロードPromiseを定義
- 1つ目ファイルの読み込みファイルStreamを作成
- 1つ目読み込みファイルStreamに書き込みStreamをend:falseでpipe ※end:falseをつけないと書き込みファイルStreamが閉じられてしまう
- 2つ目ファイルの読み込みファイルStreamを作成
- 2つ目読み込みファイルStreamに書き込みStreamをpipe
- S3アップロードPromiseをawait
integrate.js
const aws = require("aws-sdk");
const stream = require("stream");
const s3 = new aws.S3();
const integrate = (
sourceBucket,
sourceFile,
sourceintegratedFile,
outputFilename
) => {
return new Promise(async () => {
try {
const uploadStream = () => {
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3
.upload({
Bucket: sourceBucket,
Key: outputFilename,
Body: pass,
})
.promise(),
};
};
const { writeStream, promise } = uploadStream();
const fileStream = s3
.getObject({ Bucket: sourceBucket, Key: sourceintegratedFile })
.createReadStream();
fileStream.pipe(writeStream, { end: false });
const integratedFileStream = s3
.getObject({ Bucket: sourceBucket, Key: sourceFile })
.createReadStream();
integratedFileStream.pipe(writeStream);
await promise;
} catch (error) {
console.log(error);
}
});
};
module.exports.integrate = integrate;
2. ファイル圧縮
- 大きな流れ
- archiverオブジェクト、アップロード用の書き込みStreamを定義
- Bodyに書き込みStreamを設定したS3アップロードを実行 ※書き込みStreamにデータが流れてくるまで待機。完了するとresolve内のパラメータがreturnされる。
- archiverオブジェクトに書き込みStreamをpipe
- ファイルの読み込みファイルStreamを作成
- archiverオブジェクトに読み込みファイルStreamをappend ※ここで渡されたデータがzip化されている認識。
- archiverオブジェクトをクローズ
zip.js
const aws = require('aws-sdk');
const archiver = require('archiver');
const {PassThrough} = require('stream');
const s3 = new aws.S3();
const zip = (sourceBucket, sourceFile, outputFilename) => {
return new Promise(async (resolve, reject) => {
try {
const streamArchiver = archiver('zip');
const outputStream = new PassThrough();
const params = {
Bucket: sourceBucket,
Key: outputFilename,
Body: outputStream,
};
s3.upload(params, function(error, data) {
if (error) {
reject(error);
} else {
resolve({
s3Bucket: data.Bucket,
fileKey: data.Key,
fileSize: streamArchiver.pointer(),
});
}
});
streamArchiver.pipe(outputStream);
const fileReadStream = s3
.getObject({Bucket: sourceBucket, Key: sourceFile})
.createReadStream();
streamArchiver.append(fileReadStream, {
name: sourceFile.substring(sourceFile.lastIndexOf('/') + 1),
});
streamArchiver.finalize();
} catch (error) {
reject(error);
}
});
};
module.exports.zip = zip;
2. ファイル圧縮
- 大きな流れ
- アップロード用の書き込みStream, S3アップロードPromiseを定義
- 読み込みファイルStreamを作成
- 読み込みファイルStreamにzipperの解凍処理、書き込みStreamの順でpipe
- S3アップロードPromiseをawait
unzip.js
const aws = require("aws-sdk");
const unzipper = require("unzipper");
const stream = require("stream");
const s3 = new aws.S3();
const unzip = (sourceBucket, sourceFile, outputFilename) => {
return new Promise(async () => {
try {
const uploadStream = () => {
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3
.upload({
Bucket: sourceBucket,
Key: outputFilename,
Body: pass,
})
.promise(),
};
};
const { writeStream, promise } = uploadStream();
const fileStream = s3
.getObject({ Bucket: sourceBucket, Key: sourceFile })
.createReadStream();
fileStream.pipe(unzipper.ParseOne()).pipe(writeStream);
await promise;
} catch (error) {
console.log("upload uncompleted");
}
});
};
module.exports.unzip = unzip;