はじめに
フルカイテンが提供する在庫分析SaaS「FULL KAITEN V3」(以下FULL KAITEN)では、顧客データをETLを介してデータストアに格納しています。日次バッチとして、ETLでのデータ処理→データストアに処理したデータの投入を実行しています。
AWS Athana移行前のFULL KAITENでは、データストアとして、Elasticsearchを使用していました。多くの顧客データを扱うようになり、Elasticsearchへのデータ投入後から利用可能な状態になるまでに時間がかかってしまう問題が発生しました。そこで、データストアをS3へのデータ投入後に速やかに利用することができるAWS Athanaに移行しています。
本記事では、ElasticsearchからAthenaへデータストアを移行した経緯や理由を紹介します。
FULL KAITENではWebAPIにRustを使用しています。後半ではRustでAthenaにリクエストを投げる際の実装例についても紹介します。
※本記事は筆者が担当者へのヒアリングを行なった情報に基づいて執筆しております。
Elasticsearchの導入
FULL KAITEN V3の前身、FULL KAITEN V2では、データの読み込みに時間がかかっており、画面が表示されるまで5分ほどかかっていたそうです。スキーマレスなデータを投入できるElasticsearchにすることでデータの読み込み時間を削減する狙いがあったそうです。
非正規化されたデータの方がソート、フィルタリングがしやすいだろうという意図もあったそうです。
Elasticsearch使用時の問題
1.データの投入に時間がかかる
Elasticsearchはデータ投入時にindexingを行うため、データ投入から利用可能になるまでに時間を要します。FULL KAITENの日次バッチではデータ投入時間が処理時間の40%ほどを占めていました。顧客が増えるに従いリソース競合も発生し、データ投入時間がさらに伸びる状態でした。
FULL KAITENのプロダクト特性上、顧客は集計処理された結果を朝一番に確認したいという要求があるので最も改善すべき点です。
2.ホットデータとコールドデータが混じっている
スキーマレスな状態のデータをElasticsearchに投入したため、1data内でホットデータとコールドデータがごちゃ混ぜになっており、リクエスト時にリソースの無駄遣いをしてしまっている状態でした。そのため、インスタンスサイズが増大し、コストがかさんでしまっていました。
3.集計(COUNT)された値が概数値である
ElasticsearchのCardinality aggregationは対象の概数値を集計するクエリであり、公式リファレンスには1~6%の誤差があると記載されています。
そのため、正確な値が欲しい場面では活用できない側面があります。
AWS Athena導入にあたっての技術選定ポイント
Elasticsearchに代わる新たなデータストアを検討する際に候補として、snowflakeとAWS Athenaが挙がりました。その中でAthenaの採用に至ったポイントを2点取り上げます。
1.利用可能になるまでの時間が短い
snowflakeはS3上にあるデータをロード処理する必要があり、利用可能までに時間がかかってしまいます。Athenaの場合は、S3上にあるデータに対してクエリを実行できるため、利用可能になるまでの時間がかかりません。
2.パーティションを切ることで参照データを絞り込める
Athenaではパーティションを作成することで、データの読み込みをホットデータのみに絞り込むことができます。適切なパーティション設計を行えば、パフォーマンスの低下を抑えることができるとともにコストを抑えることもできます。パーティションは処理日時を第一パーティションとすると良いです。
FULL KAITENでは処理日時をat
としてパーティションを作成しています。
s3://***backet/pathToTable/at=20220527_121845/some-bucket/***.parquet
Athena導入の注意点
ここまでFULL KAITENでのAthena導入について紹介しました。
読者のみなさまの中には、Athenaを導入してみたいと考えていらっしゃる方もいるかと思います。しかし、Athenaとて万能なツールではありませんので、技術選定の際は注意が必要です。
以下に注意点を挙げます。
- 1クエリあたりの時間は他のデータストアよりも多くかかる
- リクエスト上限がある(AWSサポートに問い合わせをすれば上げてもらえるが条件有)
- (Elasticsearchやsnowflakeのような)統計処理は行われない
FULL KAITENのような(toB向け)バーティカルSaaSや業務システムには向いている傾向にある反面、toCプロダクト・サービスにおいては、特性やユースケースを踏まえて検討する方が無難かと思います。
RustでAhenaに対してクエリを実行する例
ライブラリ
- rusoto_core = "0.43.0”
- rusoto_athena = "0.43.0”
- tokio = "0.2.5”
- actix-web = "3.3.2”
今回の実装例ではactix-web3系上でのWebアプリを想定して、非同期ランタイムはtokio = "0.2.5”を指定しています。rusotoのバージョンがtokioに依存するのでご注意ください。
リクエスト処理の実装例
大まかな処理の流れを説明すると以下のようになります。
StartQueryExecutionでリクエストを送る→GetQueryExecutionでクエリ実行情報を取得する→GetQueryResultsで実行結果を取得する→実行結果に後続データがある場合は再度リクエストを送る
※本プログラムはFULL KAITENで使用されているものの一部抜粋です。
use async_std::task;
use exponential_backoff::Backoff;
use once_cell::sync::Lazy;
use rusoto_athena::*;
use rusoto_core::Region;
use std::time::Duration;
// リトライ処理の閾値
static BACKOFF: Lazy<Backoff> =
Lazy::new(|| Backoff::new(8, Duration::from_millis(100), Duration::from_secs(10)));
pub struct AthenaRequest {
client: AthenaClient, //rusoto-ahenaのclient造体
query_string: String, //Athenaへ投げるクエリ
query_execution_id: String, //クエリ実行ID
csv_path: String, //クエリの実行出力先CSVのS3path
state: String, //クエリの実行状態。SUCCEEDED FAILED CANCELLEDのいずれか。
next_token: Option<String>, //次ページあればnext_tokenに値が入る
}
impl AthenaRequest {
pub fn csv_path(&self) -> &str {
self.csv_path.as_str()
}
pub async fn new(region: Region, query_string: String) -> anyhow::Result<Self> {
Ok(Self::new_with_workgroup(region, query_string, None, None, None).await?)
}
pub async fn new_with_workgroup(
region: Region,
query_string: String,
work_group: Option<String>, //Athenaのワークグループ
output_location: Option<String>, //クエリの実行出力先CSVのS3path
client_request_token: Option<String>,
) -> anyhow::Result<Self> {
let client: AthenaClient = AthenaClient::new(region);
// StartQueryExecutionの実行
let job: StartQueryExecutionOutput = client
.start_query_execution(StartQueryExecutionInput {
client_request_token: client_request_token
.or(Some(uuid::Uuid::new_v4().to_string())),
query_execution_context: None,
query_string: query_string.clone(),
result_configuration: Some(ResultConfiguration {
encryption_configuration: None,
output_location,
}),
work_group,
})
.await?;
let query_execution_id = job.query_execution_id.unwrap();
//クエリの実行情報を取得
let exec_output = client
.get_query_execution(GetQueryExecutionInput {
query_execution_id: query_execution_id.clone(),
})
.await?;
let csv_path: String = exec_output
.query_execution
.as_ref()
.map(|qe| {
qe.result_configuration
.as_ref()
.map(|rc| rc.output_location.clone())
})
.flatten()
.flatten()
.unwrap();
println!("{}", &csv_path);
let state = Self::get_execution_state(&exec_output).unwrap();
Ok(AthenaRequest {
client,
query_string,
query_execution_id,
csv_path,
state,
next_token: None,
})
}
//クエリの実行状態を取得
fn get_execution_state(get_query_execution_output: &GetQueryExecutionOutput) -> Option<String> {
get_query_execution_output
.query_execution
.as_ref()
.map(|qe| qe.status.as_ref().map(|e| e.state.clone()))
.flatten()
.flatten()
}
//finishまでクエリの実行状態をポーリング
pub async fn await_finish(&mut self) -> anyhow::Result<()> {
match self.state.as_str() {
"SUCCEEDED" => return Ok(()),
"FAILED" => anyhow::bail!("request failed"),
"CANCELLED" => anyhow::bail!("request cancelled"),
_ => {}
};
// リトライ処理
for duration in &BACKOFF.clone() {
task::sleep(duration).await;
let ex = self
.client
.get_query_execution(GetQueryExecutionInput {
query_execution_id: self.query_execution_id.clone(),
})
.await?;
let state: String = Self::get_execution_state(&ex).unwrap();
self.state = state.clone();
match state.as_str() {
"SUCCEEDED" => return Ok(()),
"FAILED" => anyhow::bail!("request failed"),
"CANCELLED" => anyhow::bail!("request cancelled"),
_ => {}
};
}
anyhow::bail!("request timeout")
}
//クエリ実行結果を取得
pub async fn get_result(&mut self) -> anyhow::Result<(bool, Option<ResultSet>)> {
let _ = self.await_finish().await?;
let result = self
.client
.get_query_results(GetQueryResultsInput {
max_results: Some(3),
next_token: self.next_token.clone(),
query_execution_id: self.query_execution_id.clone(),
})
.await?;
self.next_token = result.next_token.clone();
Ok((result.next_token.is_some(), result.result_set))
}
}
実行方法
テストモジュールを用いた簡単な実行方法を紹介します。
後続データがなくなるまでリクエストを送り、実行結果を出力する例です。
※本プログラムはFULL KAITENで使用されているものの一部抜粋です。
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn query_test() {
let mut rt = actix_rt::Runtime::new().unwrap();
let result = rt.block_on(query_test_sub());
println!("{:?}", result);
}
async fn query_test_sub() -> anyhow::Result<()> {
let mut request = AthenaRequest::new_with_workgroup(
Region::UsEast1,
"SELECT * FROM \"sample_test_athena\".\"demo\" limit 10".to_string(),
Some("primary".to_string()),
Some("s3://aws-athena-test-us-east-1/workgroups/primary/".to_string()),
None,
)
.await?;
let mut has_next = true;
while has_next {
let result = request.get_result().await?;
println!("awaitResult({:?}): {:?}", &request.next_token, &result.1);
let _result_set = result.1.unwrap();
has_next = result.0
}
Ok(())
}
終わりに
本記事の内容以外にも、FULL KAITENの開発で工夫した点、改善点、苦労した部分などまだまだあります。またの機会に紹介できればと思います。
また、フルカイテンではRust、Athena、PySparkでビッグデータ処理に関わりたいエンジニアを募集しています。