LoginSignup
5
0

More than 1 year has passed since last update.

ElixirでS3+ZipStream

Last updated at Posted at 2023-04-03

この記事について

  • ローカルのフォルダをZipStreamで取り込みます。
  • 5MiB単位のchunkにまとめ、S3にマルチパートアップロードします。
  • S3に保存したZIPファイルをストリームとして読み込み、必要なファイル情報に絞って取得します。

動作環境

  • macOS Ventura
  • Erlang 24.3.4.9
  • Elixir 1.14.3

mixプロジェクト作成

mix new zip_stream
cd zip_stream

依存パッケージ

depsを以下のように設定します。

mix.exs
defmodule ZipStream.MixProject do
  use Mix.Project

  def project do
    [
      app: :zip_stream,
      version: "0.1.0",
      elixir: "~> 1.14",
      start_permanent: Mix.env() == :prod,
      deps: deps(),
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger]
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
+     {:ex_aws, "~> 2.4"},
+     {:ex_aws_s3, "~> 2.4"},
+     {:jason, "~> 1.4"},
+     {:hackney, "~> 1.18"},
+     {:sweet_xml, "~> 0.7"},
+     {:packmatic, "~> 1.1.2"},
+     {:unzip, "~> 0.8"}
    ]
  end
end

以下コマンドで取得しておきます。

mix local.hex --force
mix local.rebar --force
mix deps.get

config設定

ファイルを生成します。

mkdir config
touch config/config.exs
touch .env

それぞれ以下のように設定します。
今回は.envファイルから情報を取得するようにしてみました。

config/config.exs
import Config

config :ex_aws,
  access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
  secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY"),
  region: System.get_env("AWS_REGION")
.env
export AWS_ACCESS_KEY_ID="xxxx"
export AWS_SECRET_ACCESS_KEY="xxxx"
export AWS_REGION="ap-northeast-1"
export AWS_BUCKET="xxxx"

モジュール作成

いくつか必要な処理をモジュールとして定義していきます。

BinaryStream

マルチパートアップロードのサイズ制限(5MiB)に対応するため、ストリームを指定したバイトサイズのchunkにまとめる関数を定義します。

詳細は以下記事に記載しています。

touch lib/binary_stream.ex
lib/binary_stream.ex
defmodule BinaryStream do
  @doc """
  byte_sizeのchunkにまとめる関数

  ## Examples:
  iex> Stream.cycle([<<1>>, <<2, 3>>, <<4, 5, 6>>, <<7, 8, 9, 10>>])
  iex> |> BinaryStream.chunk_by_byte(7)
  iex> |> Enum.take(3)
  [<<1, 2, 3, 4, 5, 6, 7>>, <<8, 9, 10, 1, 2, 3, 4>>, <<5, 6, 7, 8, 9, 10, 1>>]

  iex> Stream.cycle([<<195>>, <<166, 195>>, <<167, 195, 168>>])
  iex> |> BinaryStream.chunk_by_byte(4)
  iex> |> Enum.take(3)
  [<<195, 166, 195, 167>>, <<195, 168, 195, 166>>, <<195, 167, 195, 168>>]
  """
  @spec chunk_by_byte(Enumerable.t(), pos_integer()) :: Enumerable.t()
  def chunk_by_byte(enum, chunk_size) when is_integer(chunk_size) and chunk_size > 0 do
    chunk_fn = fn element, acc ->
      binary = acc <> element

      if byte_size(binary) >= chunk_size do
        {chunk_list, rest} = do_chunks(binary, chunk_size, [])
        {:cont, chunk_list, rest}
      else
        {:cont, binary}
      end
    end

    after_fn = fn
      <<>> -> {:cont, nil}
      acc -> {:cont, [acc], nil}
    end

    enum
    |> Stream.chunk_while(<<>>, chunk_fn, after_fn)
    |> Stream.flat_map(& &1)
  end

  defp do_chunks(binary, chunk_size, acc) when byte_size(binary) < chunk_size do
    {Enum.reverse(acc), binary}
  end

  defp do_chunks(binary, chunk_size, acc) do
    <<chunk::binary-size(chunk_size), rest::binary>> = binary
    do_chunks(rest, chunk_size, [chunk | acc])
  end
end

Unzip.S3File

S3のZIPファイルを扱うための関数を定義します。

Unzipのドキュメントに記載されているコードをそのまま持ってきました。

mkdir lib/unzip
touch lib/unzip/s3file.ex
lib/unzip/s3file.ex
defmodule Unzip.S3File do
  defstruct [:bucket, :key, :s3_config]
  alias __MODULE__

  def new(bucket, key, s3_config) do
    %S3File{bucket: bucket, key: key, s3_config: s3_config}
  end
end

defimpl Unzip.FileAccess, for: Unzip.S3File do
  alias ExAws.S3

  def size(file) do
    %{headers: headers} = S3.head_object(file.bucket, file.key) |> ExAws.request!(file.s3_config)

    size =
      headers
      |> Enum.find(fn {k, _} -> String.downcase(k) == "content-length" end)
      |> elem(1)
      |> String.to_integer()

    {:ok, size}
  end

  def pread(file, offset, length) do
    {_, chunk} =
      S3.Download.get_chunk(
        %S3.Download{bucket: file.bucket, path: file.key, dest: nil},
        %{start_byte: offset, end_byte: offset + length - 1},
        file.s3_config
      )

    {:ok, chunk}
  end
end

ZipStream

メインとなる処理を作成します。
今回は(個人的なニーズを満たすため)以下の処理を実装してみました。

  • フォルダをZIPとしてS3へアップロードする処理
  • S3上のZIPを展開しながらダウンロードする処理
  • S3上のZIPに含まれるファイル一覧を取得する処理
  • S3上のZIPに含まれる特定のファイルのみをダウンロードする処理

ファイル単位のアップロードや、複数ファイル指定のダウンロードなんかも簡単に実装できそうです。

lib_zip_stream.ex
defmodule ZipStream do
  @moduledoc """
  ## Functions

  - put_folder/3
  - get_folder/3
  - list_files/2
  - get_file/4
  """

  defp stream_zip_folder(folder_path) do
    entries =
      File.ls!(folder_path)
      |> Enum.sort()
      |> Enum.map(fn file_name ->
        file_path = Path.join([folder_path, file_name])

        zip_path =
          file_path
          |> Path.dirname()
          |> Path.split()
          |> Enum.at(-1)
          |> then(&Path.join([&1, file_name]))

        [source: {:file, file_path}, path: zip_path]
      end)

    Packmatic.build_stream(entries)
    |> Stream.flat_map(&List.flatten(&1))
  end

  @doc """
  フォルダをZIPとしてS3へアップロードする関数

  - folder_path : 対象フォルダのパス
  - bucket : S3バケット名
  - key : S3上の名前

  ## Examples
  iex> ZipStream.put_folder("./target", "my-bucket", "key.zip")
  """
  def put_folder(folder_path, bucket, key) do
    IO.puts("Uploading:  #{folder_path}")

    stream_zip_folder(folder_path)
    |> BinaryStream.chunk_by_byte(5 * 1024 * 1024)
    |> ExAws.S3.upload(bucket, key)
    |> ExAws.request!()
    |> then(fn
      %{status_code: 200} -> IO.puts("Complete :  #{folder_path}")
      _ -> IO.puts("Failed to upload...")
    end)
  end

  defp get_unzip(bucket, key) do
    s3_config =
      ExAws.Config.new(:s3,
        access_key_id: [System.get_env("AWS_ACCESS_KEY_ID"), :instance_role],
        secret_access_key: [System.get_env("AWS_SECRET_ACCESS_KEY"), :instance_role]
      )

    s3_zip_file = Unzip.S3File.new(bucket, key, s3_config)
    {:ok, unzip} = Unzip.new(s3_zip_file)

    unzip
  end

  @doc """
  S3上のZIPを展開しながらダウンロードする関数

  - bucket : S3バケット名
  - key : S3上のZIPのkey
  - to : ダウンロード先を指定(デフォルトはカレントディレクトリ)

  ## Examples
  iex> ZipStream.get_folder("my-bucket", "key.zip")

  iex> ZipStream.get_folder("my-bucket", "key.zip", "./data")
  """
  def get_folder(bucket, key, to \\ ".") do
    unzip = get_unzip(bucket, key)
    file_list = Unzip.list_entries(unzip)

    file_list
    |> Enum.map(fn file ->
      file_name = file.file_name
      dist = "#{to}/#{file_name}"
      dir_path = Path.dirname(dist)

      unless File.exists?(dir_path) do
        File.mkdir_p!(dir_path)
      end

      unless File.exists?(dist) do
        IO.puts("Downloading:  #{file_name}")

        Unzip.file_stream!(unzip, file_name)
        |> Stream.into(File.stream!(dist))
        |> Stream.run()

        IO.puts("Complete   :  #{dist}")
      else
        IO.puts("Already exists.  #{dist}")
      end
    end)
  end

  @doc """
  S3上のZIPに含まれるファイル一覧を取得する関数

  - bucket : S3バケット名
  - key : S3上のZIPのkey

  ## Examples
  iex> ZipStream.list_files("my-bucket", "key.zip")
  """
  def list_files(bucket, key) do
    unzip = get_unzip(bucket, key)

    Unzip.list_entries(unzip)
    |> Enum.map(&Map.get(&1, :file_name))
  end

  @doc """
  S3上のZIPに含まれる特定のファイルのみをダウンロードする関数

  - bucket : S3バケット名
  - key : S3上のZIPのkey
  - file_name : ZIPに含まれるファイル名(`list_file/2`で確認可能)
  - to : ダウンロード先を指定

  ## Examples
  iex> ZipStream.get_file("my-bucket", "key.zip", "example.txt", "./example.txt")
  """
  def get_file(bucket, key, file_name, to) do
    unzip = get_unzip(bucket, key)

    dir_path = Path.dirname(to)

    unless File.exists?(dir_path) do
      File.mkdir_p!(dir_path)
    end

    unless File.exists?(to) do
      IO.puts("Downloading:  #{file_name}")

      Unzip.file_stream!(unzip, file_name)
      |> Stream.into(File.stream!(to))
      |> Stream.run()

      IO.puts("Complete   :  #{to}")
    else
      IO.puts("Already exists.  #{to}")
    end
  end
end

アップロード用のフォルダを作成

実際に動かすため、S3にアップロードするフォルダを作りたいと思います。

mkdir tmp
touch tmp/test.txt
echo "Test txt." >> tmp/test.txt

touch tmp/test.json
echo '{
  "glossary": {
    "title": "example glossary",
    "GlossDiv": {
      "title": "S",
      "GlossList": {
        "GlossEntry": {
          "ID": "SGML",
          "SortAs": "SGML",
          "GlossTerm": "Standard Generalized Markup Language",
          "Acronym": "SGML",
          "Abbrev": "ISO 8879:1986",
          "GlossDef": {
            "para": "A meta-markup language, used to create markup languages such as DocBook.",
            "GlossSeeAlso": ["GML", "XML"]
          },
          "GlossSee": "markup"
        }
      }
    }
  }
}' >> tmp/test.json

ついでに30MB超のpngファイルもtmpフォルダに入れてみました。

動かしてみる

実際にiexで動かしてみます。
source .envを入れないと、.envが読み込まれないので注意です。

source .env && iex -S mix
# 今回利用するbucketとkeyを指定します。
bucket = System.get_env("AWS_BUCKET")
key = "tmp.zip"

import ZipStream

# S3にアップロード
put_folder("./tmp", bucket, key)

AWSコンソールで、ちゃんとアップロードされていることが確認できました。

Screenshot 2023-03-31 13.51.09.jpg

# S3からダウンロード
get_folder(bucket, key, "./download")

# JSONを比較
original_json = File.read!("./tmp/test.json")
zip_json = File.read!("./download/tmp/test.json")
original_json === zip_json
# true

次は、ZIPから特定のファイルだけ取得してみたいと思います。

# ZIPに含まれるファイル名を取得
file_list = list_files(bucket, key)
# ["tmp/test.json", "tmp/test.png", "tmp/test.txt"]

# jsonだけダウンロード
file_list
|> Enum.at(0)
|> then(& get_file(bucket, key, &1, "./download/test.json"))

# 比較
zip_json2 = File.read!("./download/test.json")
original_json === zip_json2
# true

ZIPから特定のファイルだけダウンロードすることができました。

まとめ

ローカルとS3をZipStreamで繋いでみました。

ストリームなので、サイズが大きくてもメモリをあまり使わずに処理できていい感じです。

ZIPの中から特定のファイルだけを選んで取ってこれるのも嬉しいですね。

S3をデータレイクとして使う場合、ファイルに出力せず直接変数として受け取って処理してもいいかも。

ちなみにentriesのところはローカルだけでなくURLも指定することができるので、少し改造すれば色々応用が利きそうです。

参考

余談

  • 自動生成されたプロジェクトにconfigフォルダがなくて、少し迷いました。事情を調べると以下経緯のようです。

    mix newでconfig/config.exsをつくらなくなりました。設定ファイルに依存することは、ライブラリやその作者にとって望ましくないとされてきたからです。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0