# Gist oppai/36bd2de6ba3739d7ed539781bf9f275c, created December 3, 2019 08:11.
# S3 select objectのAPIクライアント & デコーダ
# (S3 Select Object API client & event-stream decoder)
defmodule Aws.S3.Selector do
  @moduledoc ~S"""
  Client and decoder for the Amazon S3 `SelectObjectContent` (S3 Select) API.

  `query/3` runs a SQL expression against a Parquet object and requests
  newline-delimited JSON output. `decode/1` splits the returned event-stream body
  into messages, and `convert_messages/1` decodes the JSON records they carry.

  ## Example

      > {:ok, result} = Aws.S3.Selector.query("my-bucket",
                                              "x_report/result.parquet",
                                              "select * from s3object")
      > messages = Aws.S3.Selector.decode(result.body)
      [%{
         header: %{
           ":content-type" => "application/octet-stream",
           ":event-type" => "Records",
           ":message-type" => "event"
         },
         payload: "xxxxxxx\n"
       },
       %{
         header: %{
           ":content-type" => "text/xml",
           ":event-type" => "Stats",
           ":message-type" => "event"
         },
         payload: "<Stats xmlns=\"\"><BytesScanned>4597</BytesScanned><BytesProcessed>543</BytesProcessed><BytesReturned>753</BytesReturned></Stats>"
       },
       %{
         header: %{":event-type" => "End", ":message-type" => "event"},
         payload: ""
       }]
      > messages |> Aws.S3.Selector.convert_messages()
      [%{"xxxx" => "yyy", "yyyy" => "zzz"},
       %{"xxxx" => "yyy", "yyyy" => "zzz"}]

  ## Message structure

  See https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html

      +--------------+---------------+-------------+---------+---------+-------------+---------
      | total_length | header_length | prelude_crc | headers | payload | message_crc | next ...
      | 4 bytes      | 4 bytes       | 4 bytes     | h bytes | p bytes | 4 bytes     | message
      +--------------+---------------+-------------+---------+---------+-------------+---------

  `prelude_crc` is the CRC-32 of the first 8 bytes. `message_crc` is the CRC-32 of
  every preceding byte of the message. Therefore `p = total_length - header_length - 16`.
  """

  defmodule CRCError do
    @moduledoc "Raised by `Aws.S3.Selector.decode/1` when an event-stream CRC-32 check fails."
    defexception message: "crc checksum error"
  end

  @typedoc "A decoded event-stream message: header name => value, plus the raw payload."
  @type message :: %{header: %{optional(String.t()) => String.t()}, payload: binary()}

  @doc ~S"""
  Decodes an S3 Select event-stream response body into its messages, in stream order.

  Each message is returned as `%{header: %{name => value}, payload: binary}`.
  Messages without a payload (for example `Cont` keep-alives and the final `End`
  event) are returned with `payload: ""`, and decoding continues past them.
  An empty body decodes to `[]`.

  Raises `Aws.S3.Selector.CRCError` when a prelude or message checksum does not
  match. Raises `ArgumentError` when the body is truncated or its lengths are
  inconsistent.
  """
  @spec decode(binary()) :: [message()]
  def decode(<<>>), do: []

  def decode(<<prelude::binary-size(8), prelude_crc::big-integer-size(32), _::binary>> = data) do
    verify_crc!(prelude, prelude_crc)
    <<total_len::big-integer-size(32), header_len::big-integer-size(32)>> = prelude

    # The message CRC covers every byte of the message that precedes it.
    message_len = total_len - 4
    # payload = total - total_length(4) - header_length(4) - prelude_crc(4)
    #           - headers - message_crc(4)
    payload_len = total_len - header_len - 16

    case data do
      <<message::binary-size(message_len), message_crc::big-integer-size(32), rest::binary>>
      when payload_len >= 0 ->
        verify_crc!(message, message_crc)

        <<_prelude::binary-size(12), header::binary-size(header_len),
          payload::binary-size(payload_len)>> = message

        # Always recurse, even for zero-length payloads, so that messages following
        # a `Cont`/empty-payload event are not dropped.
        [%{header: decode_header(header), payload: payload} | decode(rest)]

      _ ->
        raise ArgumentError, "malformed or truncated S3 Select event-stream message"
    end
  end

  def decode(_data),
    do: raise(ArgumentError, "malformed or truncated S3 Select event-stream message")

  @doc ~S"""
  Decodes the JSON records carried by the `Records` events in `messages`.

  All `Records` payloads are concatenated before splitting on the `"\n"` record
  delimiter. This is needed because S3 may split a single record across two
  `Records` messages. Every other event type (e.g. `Stats`, `End`) is ignored.
  Raises if a record is not valid JSON.
  """
  @spec convert_messages([message()]) :: [term()]
  def convert_messages(messages) do
    messages
    |> Enum.filter(&(&1.header[":event-type"] == "Records"))
    |> Enum.map(& &1.payload)
    |> IO.iodata_to_binary()
    |> String.split("\n", trim: true)
    |> Enum.map(&Poison.decode!/1)
  end

  @doc """
  Runs the S3 Select SQL `expression` against the Parquet object at `parquet_path`
  in `bucket_name`, requesting newline-delimited JSON output.

  Returns the result of `ExAws.request/1`. On success, the `body` of the result is
  the event stream consumed by `decode/1`.
  """
  @spec query(String.t(), String.t(), String.t()) :: {:ok, term()} | {:error, term()}
  def query(bucket_name, parquet_path, expression) do
    %ExAws.Operation.S3{
      body: body(expression),
      bucket: bucket_name,
      headers: %{},
      http_method: :post,
      params: %{},
      parser: &ExAws.Utils.identity/1,
      path: "#{parquet_path}?select&select-type=2",
      resource: "",
      service: :s3,
      stream_builder: nil
    }
    |> ExAws.request()
  end

  # Decodes the header section into a map of header name => value.
  # Each header is laid out as: name length (1 byte), name, value type (1 byte),
  # value length (2 bytes, big-endian), value. The type byte is skipped and every
  # value is kept as a raw binary. NOTE(review): this assumes all values are
  # string-typed (type 7), as observed in S3 Select responses — confirm if other
  # types must be supported. Trailing bytes that do not form a complete header are
  # ignored. When a name repeats, the later header wins.
  defp decode_header(
         <<name_len::8, name::binary-size(name_len), _type::8,
           value_len::big-integer-size(16), value::binary-size(value_len), rest::binary>>
       ) do
    Map.merge(%{name => value}, decode_header(rest))
  end

  defp decode_header(_rest), do: %{}

  # Raises CRCError unless the CRC-32 of `data` equals `expected`.
  # `:erlang.crc32/1` computes the standard (zlib/IEEE) CRC-32, the same checksum
  # previously obtained from the third-party `CRC.crc_32/1`.
  defp verify_crc!(data, expected) do
    if :erlang.crc32(data) != expected, do: raise(CRCError)
    :ok
  end

  # Builds the SelectObjectContent request XML for a Parquet input with JSON output.
  # The SQL expression is XML-escaped so that operators such as `<`, `>` and `&`
  # do not produce a malformed request document.
  defp body(expression) do
    """
    <?xml version="1.0" encoding="UTF-8"?>
    <SelectRequest>
    <Expression>#{escape_xml(expression)}</Expression>
    <ExpressionType>SQL</ExpressionType>
    <InputSerialization>
    <CompressionType>NONE</CompressionType>
    <Parquet />
    </InputSerialization>
    <OutputSerialization>
    <JSON>
    <CompressionType>NONE</CompressionType>
    <RecordDelimiter>\n</RecordDelimiter>
    </JSON>
    </OutputSerialization>
    <RequestProgress>
    <Enabled>FALSE</Enabled>
    </RequestProgress>
    </SelectRequest>
    """
  end

  # Escapes the characters that are significant in XML text content. `&` is replaced
  # first so the entities introduced for `<` and `>` are not escaped a second time.
  # `to_string/1` keeps accepting anything the original interpolation accepted.
  defp escape_xml(text) do
    text
    |> to_string()
    |> String.replace("&", "&amp;")
    |> String.replace("<", "&lt;")
    |> String.replace(">", "&gt;")
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment