この記事は ミクシィグループ Advent Calendar 2019 の4日目の記事です。
スポーツ事業部のエンジニアの @kodam です。普段はバックエンドのアプリケーションから後ろの部分を大体担当してます。スポーツ事業部ではバックエンドにElixirを採用しており、今年のElixirFestでも発表してきたので興味ある人はそちらを御覧ください。 (参考: XFLAG × スポーツ × Elixir)
はじめに
S3にあるParquetに対して直接S3Selectを叩けないかなと思って調べ、自前でバイナリパーサを書く必要があったのでせっかくなので公開しようと思います。
Amazon S3 SelectはEventStreamという形で返ってくるようです。 まずはExAwsでサポートしてないか調べたところ、リクエストまではできるようですがその先のパースはまだ実装されていないようでした。 (参考: https://github.com/ex-aws/ex_aws_s3/issues/23)
作ったパーサ
defmodule Aws.S3.Selector do
@doc """
S3SelectObject用のモジュール
> {:ok, result} = Aws.S3.Selector.query("my-bucket",
"x_report/result.parquet",
"select * from s3object")
> messages = Aws.S3.Selector.decode(result.body)
[%{
header: %{
":content-type" => "application/octet-stream",
":event-type" => "Records",
":message-type" => "event"
},
payload: "xxxxxxx\n"
},
%{
header: %{
":content-type" => "text/xml",
":event-type" => "Stats",
":message-type" => "event"
},
payload: "<Stats xmlns=\"\"><BytesScanned>4597</BytesScanned><BytesProcessed>543</BytesProcessed><BytesReturned>753</BytesReturned></Stats>"
},
%{
header: %{":event-type" => "End", ":message-type" => "event"},
payload: "failed"
}]
> messages |> Aws.S3.Selector.convert_messages()
[%{
"xxxx" => "yyy",
"yyyy" => "zzz"
},
%{
"xxxx" => "yyy",
"yyyy" => "zzz"
}]
Message Structure
Also See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
+--------------+---------------+-------------+----------+---------+-------------+-------+----
| total_length | header_length | payload_crc | header | payload | message_crc | .. Next
| 32 bit | 32 bits | 32 bit | x bit | y bit | 32 bits | .. Message
+--------------+---------------+-------------+----------+---------+-------------+------------
"""
def decode(
<<
total_len::big-integer-size(32),
header_len::big-integer-size(32),
prelude_crc::big-integer-size(32),
header::binary-size(header_len),
body::binary
>> = data
) do
unless is_valid_crc(<<total_len::big-integer-size(32), header_len::big-integer-size(32)>>, prelude_crc) do
raise Aws.S3.Selector.CRCError
end
# payload_length = total_length - header_length - sizeOf(total_length)
# - sizeOf(header_length) - sizeOf(prelude_crc) - sizeOf(message_crc)
payload_len = total_len - header_len - 16
# message_len = total_len - sizeOf(message_crc)
message_len = total_len - 4
decoded_header = decode_header(header)
body
|> case do
<<payload::binary-size(payload_len), m_crc::big-integer-size(32)>> when payload_len > 0 ->
<<m_data::binary-size(message_len), _::binary>> = data
unless is_valid_crc(m_data, m_crc), do: raise(Aws.S3.Selector.CRCError)
[%{header: decoded_header, payload: payload}]
<<payload::binary-size(payload_len), m_crc::big-integer-size(32), tail::binary>> when payload_len > 0 ->
<<m_data::binary-size(message_len), _::binary>> = data
unless is_valid_crc(m_data, m_crc), do: raise(Aws.S3.Selector.CRCError)
[%{header: decoded_header, payload: payload}] ++ decode(tail)
_ ->
[%{header: decoded_header, payload: ""}]
end
end
defp decode_header(data) do
case data do
<<kl::8, key::binary-size(kl), _type::8, vl::2*8, value::binary-size(vl)>> ->
%{key => value}
<<kl::8, key::binary-size(kl), _type::8, vl::2*8, value::binary-size(vl), tail::binary>> ->
Map.merge(%{key => value}, decode_header(tail))
_ ->
%{}
end
end
defp is_valid_crc(<<data::binary>>, checksum) do
CRC.crc_32(data) === checksum
end
def convert_messages(messages) do
messages
|> Enum.map(fn msg ->
msg.header[":event-type"]
|> case do
"Records" ->
msg.payload |> String.split("\n") |> Enum.reject(&(&1 == ""))
_ ->
[]
end
end)
|> List.flatten()
|> Enum.map(&Poison.decode!/1)
end
def query(bucket_name, parquet_path, expression) do
%ExAws.Operation.S3{
body: body(expression),
bucket: bucket_name,
headers: %{},
http_method: :post,
params: %{},
parser: &ExAws.Utils.identity/1,
path: "#{parquet_path}?select&select-type=2",
resource: "",
service: :s3,
stream_builder: nil
}
|> ExAws.request()
end
defp body(expression),
do: """
<?xml version="1.0" encoding="UTF-8"?>
<SelectRequest>
<Expression>#{expression}</Expression>
<ExpressionType>SQL</ExpressionType>
<InputSerialization>
<CompressionType>NONE</CompressionType>
<Parquet />
</InputSerialization>
<OutputSerialization>
<JSON>
<CompressionType>NONE</CompressionType>
<RecordDelimiter>\n</RecordDelimiter>
</JSON>
</OutputSerialization>
<RequestProgress>
<Enabled>FALSE</Enabled>
</RequestProgress>
</SelectRequest>
"""
defmodule(CRCError, do: defexception(message: "crc checksum error"))
end
依存しているモジュール
- ExAws - https://github.com/ex-aws/ex_aws
- ExAwsS3 - https://github.com/ex-aws/ex_aws_s3
- CRC - https://github.com/TattdCodeMonkey/crc
使い方
処理は3つのパートに別れています。 1.S3にクエリを投げる、2.S3EventStreamをデコードする, 3.デコードされたJSONデータをオブジェクトをMapに変換する
# 1.S3にクエリを投げる
> {:ok, result} = Aws.S3.Selector.query("special-bucket", "development/brian_report/result.parquet", "select * from s3object")
{:ok,
%{
body: <<0, 0, 3, 122, 0, 0, 0, 85, 75, 213, 240, 210, 13, 58, 109, 101, 115,
115, 97, 103, 101, 45, 116, 121, 112, 101, 7, 0, 5, 101, 118, 101, 110,
116, 11, 58, 101, 118, 101, 110, 116, 45, 116, 121, 112, 101, 7, ...>>,
headers: [
{"x-amz-id-2",
"v9/PTmkY229xaJLqsdVDw9WauRlapPhpMcbnwu+ruaqU90lp1YdxbL/IV6jVEZQyq/S3CR3T="},
{"x-amz-request-id", "1616814D8C131CC9"},
{"Date", "Tue, 03 Dec 2019 08:35:13 GMT"},
{"Transfer-Encoding", "chunked"},
{"Server", "AmazonS3"}
],
status_code: 200
}}
# 2.S3EventStreamをデコードする
> messages = result.body |> Aws.S3.Selector.decode()
[
%{
header: %{
":content-type" => "application/octet-stream",
":event-type" => "Records",
":message-type" => "event"
},
payload: "{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"1\",\"point\":\"100\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"2\",\"point\":\"100\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"3\",\"point\":\"500\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"4\",\"point\":\"600\"}\n{\"updated_at\":\"2019-12-03 07:03:16.648 UTC\",\"user_id\":\"5\",\"point\":\"1000\"}\n"
},
%{
header: %{
":content-type" => "text/xml",
":event-type" => "Stats",
":message-type" => "event"
},
payload: "<Stats xmlns=\"\"><BytesScanned>4651</BytesScanned><BytesProcessed>579</BytesProcessed><BytesReturned>789</BytesReturned></Stats>"
},
%{header: %{":event-type" => "End", ":message-type" => "event"}, payload: ""}
]
# 3.デコードされたJSONデータをオブジェクトをMapに変換する
> records = messages |> Aws.S3.Selector.convert_messages()
[
%{
"user_id" => "1",
"point" => "100",
"updated_at" => "2019-12-03 07:03:16.648 UTC"
},
%{
"user_id" => "2",
"point" => "100",
"updated_at" => "2019-12-03 07:03:16.648 UTC"
},
%{
"user_id" => "3",
"point" => "500",
"updated_at" => "2019-12-03 07:03:16.648 UTC"
},
%{
"user_id" => "4",
"point" => "600",
"updated_at" => "2019-12-03 07:03:16.648 UTC"
},
%{
"user_id" => "5",
"point" => "1000",
"updated_at" => "2019-12-03 07:03:16.648 UTC"
}
]
真面目に設計するならそれぞれ別の役割なのでモジュールに分割するかインターナルモジュールにして内部で隠蔽化するのが良いと思います。また、テストは取得したデータをfixtureファイルとして持っておくか、Base64にしてテストファイルに付属させるのが良さそうです。需要があればExAwsS3にPRを出そうと思います。
おわりに
Elixirを使ってAWS S3 EventStreamのバイナリレスポンスをパースしました。Elixirは強力なパタンマッチを使うことができて、バイナリプロトコルを解析するのに向いています。弊社の他のエンジニアがこの辺の詳しい話はしてくれると思うので本質的なところはおまかせします。
再帰とパタンマッチ、前方でマッチした値を使って可変長なデータサイズをマッチすることもできるので、数行のパタンマッチで実装することができました。
バイナリパタンマッチをする時は是非Elixirを使ってください!