0
0

S3 Selectをaws-sdk-rustで行う

Posted at

S3を準備

S3 Bucketを作成

CloudFormationを用いて以下を作成します。

  • S3 Bucket
  • 作成したS3 Bucketからデータを取得するためのIAMユーザー
# Creates a private S3 bucket and an IAM user that is allowed to read from it.
# NOTE: no access key (AWS::IAM::AccessKey) is created here; issue one for
# S3AccessUser separately before filling in ~/.aws/credentials.
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  S3Bucket:
    Type: "AWS::S3::Bucket"
    Properties:
      # Block every form of public access to the bucket.
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      # BucketOwnerEnforced: ACLs disabled, the bucket owner owns all objects.
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerEnforced
      VersioningConfiguration:
        Status: Enabled
      # Default server-side encryption with S3-managed keys (AES256).
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
            # NOTE(review): S3 Bucket Keys target SSE-KMS request costs; with
            # AES256 this flag is presumably a no-op — confirm before relying on it.
            BucketKeyEnabled: true
  S3AccessUser:
    Type: "AWS::IAM::User"
    Properties:
      Path: /
      UserName: S3AccessUser
      Policies:
        - PolicyName: S3AccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                # Read-only access. s3:ListBucket applies to the bucket ARN and
                # s3:GetObject to the object ARN (/*); S3 Select
                # (SelectObjectContent) is authorized by s3:GetObject per the
                # S3 API reference.
                Action:
                  - "s3:GetObject"
                  - "s3:ListBucket"
                Resource:
                  - !Sub "arn:aws:s3:::${S3Bucket}"
                  - !Sub "arn:aws:s3:::${S3Bucket}/*"

# BucketName is what the Rust program later reads from BUCKET_NAME.
Outputs:
  BucketName:
    Value: !Ref S3Bucket
    Description: Name of S3 bucket
  UserName:
    Value: !Ref S3AccessUser
    Description: Name of IAM user

マネジメントコンソールからCloudFormation Stackを作成します。
スタック作成後、作成されたIAMユーザー（S3AccessUser）のアクセスキーをIAMコンソールから発行しておきます（このテンプレートではアクセスキーは作成していません）。

データの準備

テストデータはJSON Linesファイルで用意してみます。
以下のファイルを作成されたBucketにアップロードします。

test.jsonl
{ "id": 1, "name": "aaaa" }
{ "id": 2, "name": "bbbb" }
{ "id": 3, "name": "cccc" }
{ "id": 4, "name": "dddd" }

~/.aws/credentials の設定

[test-user]
aws_access_key_id = 作成されたIAMユーザーのアクセスキー
aws_secret_access_key = 作成されたIAMユーザーのシークレットアクセスキー

Rust

依存関係

Cargo.toml
[dependencies]
# "behavior-version-latest" lets main.rs call aws_config::load_from_env()
# without picking a BehaviorVersion explicitly.
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.40.0"
# "macros" provides #[tokio::main]; "rt-multi-thread" provides the runtime it starts.
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
# serde + serde_json deserialize each JSON row returned by S3 Select into `Record`.
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.120"

コード

main.rs
use std::{env, error::Error};

use aws_sdk_s3::{
    types::{
        InputSerialization, JsonInput, JsonOutput, OutputSerialization,
        SelectObjectContentEventStream,
    },
    Client,
};
use serde::{de::IgnoredAny, Deserialize};
use serde_json::from_str;

/// One JSON row of the sample object (`{ "id": ..., "name": ... }`), as
/// returned by the S3 Select query.
#[derive(Debug, Deserialize)]
pub struct Record {
    /// The row's `id` field.
    pub id: i32,
    /// The row's `name` field.
    pub name: String,
}

/// Catch-all error type for this example: any error, boxed as a trait object.
type AppError = Box<dyn Error + 'static>;

/// Feeds one line of newline-delimited JSON into the reassembly buffer `buf`.
///
/// Returns `Ok(Some(record))` as soon as a complete JSON document is
/// available: either `line` on its own when nothing is pending, or the
/// accumulated `buf` after `line` has been appended (the buffer is then
/// cleared). Returns `Ok(None)` while the accumulated text is still not valid
/// JSON.
///
/// # Errors
///
/// Returns an error if a syntactically complete JSON document cannot be
/// deserialized into a `Record`; in that case `buf` is left as it was after
/// appending `line`.
fn parse_line_bufferd(buf: &mut String, line: &str) -> Result<Option<Record>, AppError> {
    // Fast path: nothing pending and the line is a whole document by itself.
    if buf.is_empty() && is_valid_json(line) {
        let record: Record = from_str(line)?;
        return Ok(Some(record));
    }

    buf.push_str(line);
    if !is_valid_json(buf.as_str()) {
        // Still a fragment; wait for more input.
        return Ok(None);
    }

    let record: Record = from_str(buf.as_str())?;
    buf.clear();
    Ok(Some(record))
}

/// Reports whether `data` is exactly one complete JSON value.
///
/// The text is deserialized into `IgnoredAny`, which accepts any JSON value
/// and discards it, so only well-formedness is checked — not whether the
/// value fits `Record`.
fn is_valid_json(data: impl AsRef<str>) -> bool {
    let text: &str = data.as_ref();
    let outcome: Result<IgnoredAny, _> = from_str(text);
    outcome.is_ok()
}

/// Decodes `bytes` as UTF-8 and feeds each line through `parse_line_bufferd`,
/// appending every completed `Record` to `out`.
///
/// `bytes` must end on a line boundary (or be the final bytes of the stream)
/// so that no UTF-8 character is cut in half.
///
/// # Errors
///
/// Returns an error if `bytes` is not valid UTF-8 or if a line cannot be
/// deserialized into a `Record`.
fn collect_records_from_lines(
    bytes: &[u8],
    buf: &mut String,
    out: &mut Vec<Record>,
) -> Result<(), AppError> {
    let text = std::str::from_utf8(bytes)?;
    for line in text.lines() {
        if let Some(record) = parse_line_bufferd(buf, line)? {
            out.push(record);
        }
    }
    Ok(())
}

/// Runs an S3 Select query over a JSON Lines object and deserializes every
/// returned row into a `Record`.
///
/// The bucket and key are read from the `BUCKET_NAME` and `OBJECT_KEY`
/// environment variables.
///
/// # Errors
///
/// Returns an error if either environment variable is missing, if the
/// `SelectObjectContent` request or its event stream fails, if the stream
/// ends without an `End` event, if the returned bytes are not valid UTF-8,
/// if a row cannot be deserialized into a `Record`, or if the stream ends in
/// the middle of a row.
async fn handler(client: &Client) -> Result<Vec<Record>, AppError> {
    let bucket_name = env::var("BUCKET_NAME").map_err(|_| "BUCKET_NAME is not set")?;
    let object_key = env::var("OBJECT_KEY").map_err(|_| "OBJECT_KEY is not set")?;

    let mut output = client
        .select_object_content()
        .bucket(bucket_name)
        .key(object_key)
        .expression_type(aws_sdk_s3::types::ExpressionType::Sql)
        .expression("SELECT * FROM s3object s")
        .input_serialization(
            InputSerialization::builder()
                .json(
                    JsonInput::builder()
                        .r#type(aws_sdk_s3::types::JsonType::Lines)
                        .build(),
                )
                .build(),
        )
        .output_serialization(
            OutputSerialization::builder()
                .json(JsonOutput::builder().build())
                .build(),
        )
        .send()
        .await?;

    let mut processed_records: Vec<Record> = vec![];
    // Reassembly state used by `parse_line_bufferd`.
    let mut buf = String::new();
    // Bytes received after the last '\n'. A `Records` event may carry only part
    // of a row, so its payload can end mid-line, even in the middle of a
    // multi-byte UTF-8 character. Only bytes up to the last '\n' are decoded;
    // '\n' never occurs inside a multi-byte UTF-8 sequence, so that cut is safe.
    let mut pending: Vec<u8> = Vec::new();
    // S3 only guarantees the result is complete once an `End` event arrives.
    let mut saw_end = false;

    while let Some(event) = output.payload.recv().await? {
        match event {
            SelectObjectContentEventStream::Records(records) => {
                let chunk: &[u8] = records.payload().map(|p| p.as_ref()).unwrap_or_default();
                pending.extend_from_slice(chunk);
                if let Some(last_newline) = pending.iter().rposition(|&b| b == b'\n') {
                    let complete: Vec<u8> = pending.drain(..=last_newline).collect();
                    collect_records_from_lines(&complete, &mut buf, &mut processed_records)?;
                }
            }
            SelectObjectContentEventStream::End(_) => saw_end = true,
            // Stats, Progress and Cont events carry no row data.
            _ => {}
        }
    }

    if !saw_end {
        return Err(
            "S3 Select stream ended without an End event; the result may be incomplete".into(),
        );
    }

    // The final row is not necessarily followed by a newline.
    collect_records_from_lines(&pending, &mut buf, &mut processed_records)?;

    // Anything still buffered never became valid JSON; report it instead of
    // silently dropping a row.
    if !buf.trim().is_empty() {
        return Err(format!("S3 Select stream ended with an incomplete record: {buf}").into());
    }

    Ok(processed_records)
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    let config = aws_config::load_from_env().await;
    let client = aws_sdk_s3::Client::new(&config);

    let records = handler(&client).await?;
    println!("{:?}", records);
    Ok(())
}

実行

以下のコマンドで実行します。

AWS_PROFILE=test-user \
BUCKET_NAME=作成されたBucket名 \
OBJECT_KEY=test.jsonl \
cargo run

以下のように出力されれば成功です。

[Record { id: 1, name: "aaaa" }, Record { id: 2, name: "bbbb" }, Record { id: 3, name: "cccc" }, Record { id: 4, name: "dddd" }]

WHERE句も試してみます。
クエリは`handler`関数内の`.expression("SELECT * FROM s3object s")`に渡しているSQL文を変更します。

  • idが1のレコードを取得
SELECT * FROM s3object s WHERE s.id = 1
[Record { id: 1, name: "aaaa" }]
  • idが1より大きいレコードを取得
SELECT * FROM s3object s WHERE s.id > 1
[Record { id: 2, name: "bbbb" }, Record { id: 3, name: "cccc" }, Record { id: 4, name: "dddd" }]
  • nameにbが含まれるレコードを取得
SELECT * FROM s3object s WHERE s.name LIKE '%b%'
[Record { id: 2, name: "bbbb" }]

読んでいただきありがとうございました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0