LoginSignup
4
0

More than 3 years have passed since last update.

Elixirを使ったAWS S3 EventStream バイナリパーサ

Last updated at Posted at 2019-12-03

この記事は ミクシィグループ Advent Calendar 2019 の4日目の記事です。

スポーツ事業部のエンジニアの @kodam です。普段はバックエンドのアプリケーションから後ろの部分を大体担当してます。スポーツ事業部ではバックエンドにElixirを採用しており、今年のElixirFestでも発表してきたので興味ある人はそちらを御覧ください。 (参考: XFLAG × スポーツ × Elixir)

はじめに

S3にあるParquetに対して直接S3Selectを叩けないかなと思って調べ、自前でバイナリパーサを書く必要があったのでせっかくなので公開しようと思います。

Amazon S3 SelectはEventStreamという形で返ってくるようです。 まずはExAwsでサポートしてないか調べたところ、リクエストまではできるようですがその先のパースはまだ実装されていないようでした。 (参考: https://github.com/ex-aws/ex_aws_s3/issues/23)

作ったパーサ

defmodule Aws.S3.Selector do
  @doc """
  S3SelectObject用のモジュール

  > {:ok, result} = Aws.S3.Selector.query("my-bucket",
                      "x_report/result.parquet",
                      "select * from s3object")
  > messages = Aws.S3.Selector.decode(result.body)
  [%{
    header: %{
      ":content-type" => "application/octet-stream",
      ":event-type" => "Records",
      ":message-type" => "event"
    },
    payload: "xxxxxxx\n"
  },
  %{
    header: %{
      ":content-type" => "text/xml",
      ":event-type" => "Stats",
      ":message-type" => "event"
    },
    payload: "<Stats xmlns=\"\"><BytesScanned>4597</BytesScanned><BytesProcessed>543</BytesProcessed><BytesReturned>753</BytesReturned></Stats>"
  },
  %{
    header: %{":event-type" => "End", ":message-type" => "event"},
    payload: "failed"
  }]
  > messages |> Aws.S3.Selector.convert_messages()
  [%{
    "xxxx" => "yyy",
    "yyyy" => "zzz"
  },
  %{
    "xxxx" => "yyy",
    "yyyy" => "zzz"
  }]

  Message Structure
  Also See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
  +--------------+---------------+-------------+----------+---------+-------------+-------+----
  | total_length | header_length | payload_crc |  header  | payload | message_crc |  .. Next
  |   32 bit     |    32 bits    |    32 bit   |   x bit  |  y bit  |   32 bits   |  .. Message
  +--------------+---------------+-------------+----------+---------+-------------+------------
  """
  def decode(
        <<
          total_len::big-integer-size(32),
          header_len::big-integer-size(32),
          prelude_crc::big-integer-size(32),
          header::binary-size(header_len),
          body::binary
        >> = data
      ) do
    unless is_valid_crc(<<total_len::big-integer-size(32), header_len::big-integer-size(32)>>, prelude_crc) do
      raise Aws.S3.Selector.CRCError
    end

    # payload_length = total_length - header_length - sizeOf(total_length)
    #                    - sizeOf(header_length) - sizeOf(prelude_crc) - sizeOf(message_crc)
    payload_len = total_len - header_len - 16

    # message_len = total_len - sizeOf(message_crc)
    message_len = total_len - 4
    decoded_header = decode_header(header)

    body
    |> case do
      <<payload::binary-size(payload_len), m_crc::big-integer-size(32)>> when payload_len > 0 ->
        <<m_data::binary-size(message_len), _::binary>> = data
        unless is_valid_crc(m_data, m_crc), do: raise(Aws.S3.Selector.CRCError)

        [%{header: decoded_header, payload: payload}]

      <<payload::binary-size(payload_len), m_crc::big-integer-size(32), tail::binary>> when payload_len > 0 ->
        <<m_data::binary-size(message_len), _::binary>> = data
        unless is_valid_crc(m_data, m_crc), do: raise(Aws.S3.Selector.CRCError)

        [%{header: decoded_header, payload: payload}] ++ decode(tail)

      _ ->
        [%{header: decoded_header, payload: ""}]
    end
  end

  defp decode_header(data) do
    case data do
      <<kl::8, key::binary-size(kl), _type::8, vl::2*8, value::binary-size(vl)>> ->
        %{key => value}

      <<kl::8, key::binary-size(kl), _type::8, vl::2*8, value::binary-size(vl), tail::binary>> ->
        Map.merge(%{key => value}, decode_header(tail))

      _ ->
        %{}
    end
  end

  defp is_valid_crc(<<data::binary>>, checksum) do
    CRC.crc_32(data) === checksum
  end

  def convert_messages(messages) do
    messages
    |> Enum.map(fn msg ->
      msg.header[":event-type"]
      |> case do
        "Records" ->
          msg.payload |> String.split("\n") |> Enum.reject(&(&1 == ""))

        _ ->
          []
      end
    end)
    |> List.flatten()
    |> Enum.map(&Poison.decode!/1)
  end

  def query(bucket_name, parquet_path, expression) do
    %ExAws.Operation.S3{
      body: body(expression),
      bucket: bucket_name,
      headers: %{},
      http_method: :post,
      params: %{},
      parser: &ExAws.Utils.identity/1,
      path: "#{parquet_path}?select&select-type=2",
      resource: "",
      service: :s3,
      stream_builder: nil
    }
    |> ExAws.request()
  end

  defp body(expression),
    do: """
    <?xml version="1.0" encoding="UTF-8"?>
    <SelectRequest>
       <Expression>#{expression}</Expression>
       <ExpressionType>SQL</ExpressionType>
       <InputSerialization>
          <CompressionType>NONE</CompressionType>
          <Parquet />
       </InputSerialization>
       <OutputSerialization>
          <JSON>
             <CompressionType>NONE</CompressionType>
             <RecordDelimiter>\n</RecordDelimiter>
          </JSON>
       </OutputSerialization>
       <RequestProgress>
          <Enabled>FALSE</Enabled>
       </RequestProgress>
    </SelectRequest>
    """

  defmodule(CRCError, do: defexception(message: "crc checksum error"))
end
依存しているモジュール

使い方

処理は3つのパートに別れています。 1.S3にクエリを投げる、2.S3EventStreamをデコードする, 3.デコードされたJSONデータをオブジェクトをMapに変換する

# 1.S3にクエリを投げる
> {:ok, result} = Aws.S3.Selector.query("special-bucket", "development/brian_report/result.parquet", "select * from s3object")
{:ok,
 %{
   body: <<0, 0, 3, 122, 0, 0, 0, 85, 75, 213, 240, 210, 13, 58, 109, 101, 115,
     115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, 118, 101, 110,
     116, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, ...>>,
   headers: [
     {"x-amz-id-2",
      "v9/PTmkY229xaJLqsdVDw9WauRlapPhpMcbnwu+ruaqU90lp1YdxbL/IV6jVEZQyq/S3CR3T="},
     {"x-amz-request-id", "1616814D8C131CC9"},
     {"Date", "Tue, 03 Dec 2019 08:35:13 GMT"},
     {"Transfer-Encoding", "chunked"},
     {"Server", "AmazonS3"}
   ],
   status_code: 200
 }}

# 2.S3EventStreamをデコードする
> messages = result.body |> Aws.S3.Selector.decode()
[
  %{
    header: %{
      ":content-type" => "application/octet-stream",
      ":event-type" => "Records",
      ":message-type" => "event"
    },
    payload: "{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"1\",\"point\":\"100\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"2\",\"point\":\"100\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"3\",\"point\":\"500\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"4\",\"point\":\"600\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"5\",\"point\":\"1000\"}\n"
  },
  %{
    header: %{
      ":content-type" => "text/xml",
      ":event-type" => "Stats",
      ":message-type" => "event"
    },
    payload: "<Stats xmlns=\"\"><BytesScanned>4651</BytesScanned><BytesProcessed>579</BytesProcessed><BytesReturned>789</BytesReturned></Stats>"
  },
  %{header: %{":event-type" => "End", ":message-type" => "event"}, payload: ""}
]

# 3.デコードされたJSONデータをオブジェクトをMapに変換する
> records = messages |> Aws.S3.Selector.convert_messages()                                                                                  
[
  %{
    "user_id" => "1",
    "point" => "100",
    "updated_at" => "2019-12-03 07:03:16.648 UTC"
  },
  %{
    "user_id" => "2",
    "point" => "100",
    "updated_at" => "2019-12-03 07:03:16.648 UTC"
  },
  %{
    "user_id" => "3",
    "point" => "500",
    "updated_at" => "2019-12-03 07:03:16.648 UTC"
  },
  %{
    "user_id" => "4",
    "point" => "600",
    "updated_at" => "2019-12-03 07:03:16.648 UTC"
  },
  %{
    "user_id" => "5",
    "point" => "1000",
    "updated_at" => "2019-12-03 07:03:16.648 UTC"
  }
]

真面目に設計するならそれぞれ別の役割なのでモジュールに分割するかインターナルモジュールにして内部で隠蔽化するのが良いと思います。また、テストは取得したデータをfixtureファイルとして持っておくか、Base64にしてテストファイルに付属させるのが良さそうです。需要があればExAwsS3にPRを出そうと思います。

おわりに

Elixirを使ってAWS S3 EventStreamのバイナリレスポンスをパースしました。Elixirは強力なパタンマッチを使うことができて、バイナリプロトコルを解析するのに向いています。弊社の他のエンジニアがこの辺の詳しい話はしてくれると思うので本質的なところはおまかせします。

再帰とパタンマッチ、前方でマッチした値を使って可変長なデータサイズをマッチすることもできるので、数行のパタンマッチで実装することができました。

バイナリパタンマッチをする時は是非Elixirを使ってください!

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0