LoginSignup
5
0

More than 1 year has passed since last update.

【Lambda】Node.js + Stream API によるファイル結合、圧縮、解凍

Posted at

きっかけ

  • とある案件でLambdaでS3 to S3のファイル結合等の操作をするため、StreamAPIを用いたメモリに展開しないファイル変形を実装した。その実装の備忘録のため。
  • 調べながら実装したため、不備等あればご指摘ください!
  • 基本はasync/awaitの構文ですが、一部異なります。

変形内容

  1. ファイル結合
  2. ファイル圧縮
  3. ファイル解凍

1. ファイル結合

  • 大きな流れ
    1. アップロード用の書き込みStream, S3アップロードPromiseを定義
    2. 1つ目ファイルの読み込みファイルStreamを作成
    3. 1つ目読み込みファイルStreamに書き込みStreamをend:falseでpipe ※end:falseをつけないと書き込みファイルStreamが閉じられてしまう
    4. 2つ目ファイルの読み込みファイルStreamを作成
    5. 2つ目読み込みファイルStreamに書き込みStreamをpipe
    6. S3アップロードPromiseをawait
integrate.js
const aws = require("aws-sdk");
const stream = require("stream");

const s3 = new aws.S3();

const integrate = (
  sourceBucket,
  sourceFile,
  sourceintegratedFile,
  outputFilename
) => {
  return new Promise(async () => {
    try {
      const uploadStream = () => {
        const pass = new stream.PassThrough();
        return {
          writeStream: pass,
          promise: s3
            .upload({
              Bucket: sourceBucket,
              Key: outputFilename,
              Body: pass,
            })
            .promise(),
        };
      };
      const { writeStream, promise } = uploadStream();

      const fileStream = s3
        .getObject({ Bucket: sourceBucket, Key: sourceintegratedFile })
        .createReadStream();

      fileStream.pipe(writeStream, { end: false });

      const integratedFileStream = s3
        .getObject({ Bucket: sourceBucket, Key: sourceFile })
        .createReadStream();

      integratedFileStream.pipe(writeStream);

      await promise;
    } catch (error) {
      console.log(error);
    }
  });
};

module.exports.integrate = integrate;

2. ファイル圧縮

  • 大きな流れ
    1. archiverオブジェクト、アップロード用の書き込みStreamを定義
    2. Bodyに書き込みStreamを設定したS3アップロードを実行 ※書き込みStreamにデータが流れてくるまで待機。完了するとresolve内のパラメータがreturnされる。
    3. archiverオブジェクトに書き込みStreamをpipe
    4. ファイルの読み込みファイルStreamを作成
    5. archiverオブジェクトに読み込みファイルStreamをappend ※ここで渡されたデータがzip化されている認識。
    6. archiverオブジェクトをクローズ
zip.js
const aws = require('aws-sdk');
const archiver = require('archiver');
const {PassThrough} = require('stream');

const s3 = new aws.S3();

const zip = (sourceBucket, sourceFile, outputFilename) => {
  return new Promise(async (resolve, reject) => {
    try {
      const streamArchiver = archiver('zip');

      const outputStream = new PassThrough();

      const params = {
        Bucket: sourceBucket,
        Key: outputFilename,
        Body: outputStream,
      };

      s3.upload(params, function(error, data) {
        if (error) {
          reject(error);
        } else {
          resolve({
            s3Bucket: data.Bucket,
            fileKey: data.Key,
            fileSize: streamArchiver.pointer(),
          });
        }
      });

      streamArchiver.pipe(outputStream);

      const fileReadStream = s3
        .getObject({Bucket: sourceBucket, Key: sourceFile})
        .createReadStream();
      streamArchiver.append(fileReadStream, {
        name: sourceFile.substring(sourceFile.lastIndexOf('/') + 1),
      });

      streamArchiver.finalize();
    } catch (error) {
      reject(error);
    }
  });
};

module.exports.zip = zip;

2. ファイル圧縮

  • 大きな流れ
    1. アップロード用の書き込みStream, S3アップロードPromiseを定義
    2. 読み込みファイルStreamを作成
    3. 読み込みファイルStreamにzipperの解凍処理、書き込みStreamの順でpipe
    4. S3アップロードPromiseをawait
unzip.js
const aws = require("aws-sdk");
const unzipper = require("unzipper");
const stream = require("stream");

const s3 = new aws.S3();

const unzip = (sourceBucket, sourceFile, outputFilename) => {
  return new Promise(async () => {
    try {
      const uploadStream = () => {
        const pass = new stream.PassThrough();
        return {
          writeStream: pass,
          promise: s3
            .upload({
              Bucket: sourceBucket,
              Key: outputFilename,
              Body: pass,
            })
            .promise(),
        };
      };
      const { writeStream, promise } = uploadStream();

      const fileStream = s3
        .getObject({ Bucket: sourceBucket, Key: sourceFile })
        .createReadStream();

      fileStream.pipe(unzipper.ParseOne()).pipe(writeStream);

      await promise;
    } catch (error) {
      console.log("upload uncompleted");
    }
  });
};

module.exports.unzip = unzip;
5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0