Preparing S3
Create the S3 bucket
Use CloudFormation to create the following:
- An S3 bucket
- An IAM user for reading data from that bucket
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  S3Bucket:
    Type: "AWS::S3::Bucket"
    Properties:
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerEnforced
      VersioningConfiguration:
        Status: Enabled
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
            BucketKeyEnabled: true
  S3AccessUser:
    Type: "AWS::IAM::User"
    Properties:
      Path: /
      UserName: S3AccessUser
      Policies:
        - PolicyName: S3AccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:ListBucket"
                Resource:
                  - !Sub "arn:aws:s3:::${S3Bucket}"
                  - !Sub "arn:aws:s3:::${S3Bucket}/*"
Outputs:
  BucketName:
    Value: !Ref S3Bucket
    Description: Name of S3 bucket
  UserName:
    Value: !Ref S3AccessUser
    Description: Name of IAM user
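Create the CloudFormation stack from the Management Console. Because the template creates an IAM user with a fixed UserName, the console asks you to acknowledge that the stack may create IAM resources with custom names.
Preparing the data
The test data is a JSON Lines file.
Upload the following file to the bucket that was created.
test.jsonl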
{ "id": 1, "name": "aaaa" }
{ "id": 2, "name": "bbbb" }
{ "id": 3, "name": "cccc" }
{ "id": 4, "name": "dddd" }
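If you want more rows than the four above, the file can also be generated. The following is a minimal sketch using only the standard library; `escape_json` and `to_json_lines` are hypothetical helper names, and the escaping covers only backslashes and double quotes (it assumes names contain no control characters).

```rust
/// Escape the two characters that would break a JSON string literal here.
/// Assumption: names contain no control characters, which would need
/// additional escaping.
fn escape_json(s: &str) -> String {
    s.replace('\\', "\\\\").replace('"', "\\\"")
}

/// Render `(id, name)` pairs as JSON Lines: one JSON object per line,
/// in the same layout as test.jsonl.
fn to_json_lines(rows: &[(i32, &str)]) -> String {
    rows.iter()
        .map(|&(id, name)| {
            format!("{{ \"id\": {}, \"name\": \"{}\" }}\n", id, escape_json(name))
        })
        .collect()
}
```

Writing the result with `std::fs::write("test.jsonl", to_json_lines(&[(1, "aaaa"), (2, "bbbb")]))` produces a file in the same shape as the one above.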
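~/.aws/credentials settings
The template above does not create an access key, so issue one for the created IAM user and register it as a profile.
[test-user]
aws_access_key_id = <access key ID of the created IAM user>
aws_secret_access_key = <secret access key of the created IAM user>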
Rust
Dependencies
Cargo.toml
[dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.40.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.120"
Code
main.rs
use std::{env, error::Error};

use aws_sdk_s3::{
    types::{
        ExpressionType, InputSerialization, JsonInput, JsonOutput, JsonType,
        OutputSerialization, SelectObjectContentEventStream,
    },
    Client,
};
use serde::{de::IgnoredAny, Deserialize};
use serde_json::from_str;

#[derive(Deserialize, Debug)]
pub struct Record {
    pub id: i32,
    pub name: String,
}

type AppError = Box<dyn Error>;

// A record may arrive split across Records events, so an incomplete line is
// accumulated in `buf` until the buffered text parses as JSON.
fn parse_line_buffered(buf: &mut String, line: &str) -> Result<Option<Record>, AppError> {
    if buf.is_empty() && is_valid_json(line) {
        return Ok(Some(from_str(line)?));
    }
    buf.push_str(line);
    if is_valid_json(buf.as_str()) {
        let record: Record = from_str(buf.as_str())?;
        buf.clear();
        Ok(Some(record))
    } else {
        Ok(None)
    }
}

// Checks syntax only; IgnoredAny discards the parsed value.
fn is_valid_json(data: impl AsRef<str>) -> bool {
    from_str::<IgnoredAny>(data.as_ref()).is_ok()
}

async fn handler(client: &Client) -> Result<Vec<Record>, AppError> {
    // Return an error instead of panicking when a variable is missing.
    let bucket_name = env::var("BUCKET_NAME").map_err(|_| "BUCKET_NAME is not set")?;
    let object_key = env::var("OBJECT_KEY").map_err(|_| "OBJECT_KEY is not set")?;

    let mut output = client
        .select_object_content()
        .bucket(bucket_name)
        .key(object_key)
        .expression_type(ExpressionType::Sql)
        .expression("SELECT * FROM s3object s")
        .input_serialization(
            InputSerialization::builder()
                .json(JsonInput::builder().r#type(JsonType::Lines).build())
                .build(),
        )
        .output_serialization(
            OutputSerialization::builder()
                .json(JsonOutput::builder().build())
                .build(),
        )
        .send()
        .await?;

    let mut processed_records: Vec<Record> = vec![];
    let mut buf = String::new();
    // Read the event stream until it ends; only Records events carry data.
    while let Some(event) = output.payload.recv().await? {
        if let SelectObjectContentEventStream::Records(records) = event {
            let bytes: &[u8] = records.payload().map(|p| p.as_ref()).unwrap_or_default();
            let text = std::str::from_utf8(bytes)?;
            for line in text.lines() {
                if let Some(record) = parse_line_buffered(&mut buf, line)? {
                    processed_records.push(record);
                }
            }
        }
    }
    Ok(processed_records)
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    let config = aws_config::load_from_env().await;
    let client = Client::new(&config);
    let records = handler(&client).await?;
    println!("{:?}", records);
    Ok(())
}
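The code above decides that a line is complete by checking whether the buffered text parses as JSON. An alternative worth considering is to buffer the raw bytes and split only at newlines, which also copes with a multi-byte UTF-8 character being cut at a chunk boundary. The following is a minimal sketch of that idea using only the standard library. `LineBuffer` is a hypothetical helper, not part of the AWS SDK, and it assumes records are delimited by `\n`.

```rust
/// Accumulates raw bytes across chunks and yields only complete lines.
/// Assumption: records are separated by '\n'.
#[derive(Default)]
struct LineBuffer {
    pending: Vec<u8>,
}

impl LineBuffer {
    /// Append a chunk and return every line it completes (without the '\n').
    fn push(&mut self, chunk: &[u8]) -> Result<Vec<String>, std::string::FromUtf8Error> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        // Split at each newline; whatever follows the last one stays pending.
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            let rest = self.pending.split_off(pos + 1);
            let mut line = std::mem::replace(&mut self.pending, rest);
            line.pop(); // drop the trailing '\n'
            if !line.is_empty() {
                // Decode only complete lines, so a character split across
                // chunks has been reassembled by the time it is decoded.
                lines.push(String::from_utf8(line)?);
            }
        }
        Ok(lines)
    }
}
```

In `handler`, each Records payload would be passed to `push`, and every returned line could then be handed to `serde_json::from_str::<Record>` directly.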
Running
Run it with the following command.
AWS_PROFILE=test-user \
BUCKET_NAME=<name of the created bucket> \
OBJECT_KEY=test.jsonl \
cargo run
If you see output like the following, it worked.
[Record { id: 1, name: "aaaa" }, Record { id: 2, name: "bbbb" }, Record { id: 3, name: "cccc" }, Record { id: 4, name: "dddd" }]
Let's try WHERE clauses as well.
To change the query, edit the string passed to .expression(...) in handler.
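Editing the source for every query gets tedious, so one option is to take the expression from the environment. The following is a minimal sketch. `resolve_expression` and the `QUERY_EXPRESSION` variable are hypothetical names introduced here for illustration.

```rust
/// Default query used when no override is supplied.
const DEFAULT_EXPRESSION: &str = "SELECT * FROM s3object s";

/// Pick the S3 Select expression: a non-empty override wins,
/// otherwise the default is used.
fn resolve_expression(override_value: Option<String>) -> String {
    override_value
        .map(|v| v.trim().to_string())
        .filter(|v| !v.is_empty())
        .unwrap_or_else(|| DEFAULT_EXPRESSION.to_string())
}
```

In `handler`, this could be wired in as `.expression(resolve_expression(std::env::var("QUERY_EXPRESSION").ok()))`, so a different query only needs a different environment variable on the command line.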
- Get the record whose id is 1
SELECT * FROM s3object s WHERE s.id = 1
[Record { id: 1, name: "aaaa" }]
- Get the records whose id is greater than 1
SELECT * FROM s3object s WHERE s.id > 1
[Record { id: 2, name: "bbbb" }, Record { id: 3, name: "cccc" }, Record { id: 4, name: "dddd" }]
- Get the records whose name contains b
SELECT * FROM s3object s WHERE s.name LIKE '%b%'
[Record { id: 2, name: "bbbb" }]
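To sanity-check the expected results without calling S3, the three conditions above can be mirrored locally with plain iterator filters. This is only a local model of the same predicates, not the S3 Select SQL engine. `Row`, `sample`, and `select_where` are hypothetical names used for this sketch.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: i32,
    name: String,
}

/// The four rows from test.jsonl.
fn sample() -> Vec<Row> {
    [(1, "aaaa"), (2, "bbbb"), (3, "cccc"), (4, "dddd")]
        .iter()
        .map(|&(id, name)| Row { id, name: name.to_string() })
        .collect()
}

/// Keep only the rows matching `pred`, playing the role of a WHERE clause.
fn select_where(rows: &[Row], pred: impl Fn(&Row) -> bool) -> Vec<Row> {
    rows.iter().filter(|r| pred(*r)).cloned().collect()
}
```

For example, `select_where(&sample(), |r| r.id > 1)` corresponds to `WHERE s.id > 1`, and `select_where(&sample(), |r| r.name.contains('b'))` corresponds to `WHERE s.name LIKE '%b%'`.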
Thank you for reading.