本記事の目的
データプロバイダーとしてデータコンシューマに対して適切にフォーマットされたデータを供給する共通機能を設計するにあたり、候補となるAthenaを機能面、非機能面で評価するのがこの記事の目的
データの前処理という観点ではGlue Databrew、Glueという選択肢があるが、こちらは後日評価して記事を投稿する予定。スケールしないRedshiftのリーダーノードを使って実装しようという発想は私にはない。
共通処理の例
- 余計な空白削除処理
- 日付のフォーマット変換(yyyymmdd⇒yyyy/mm/dd)
- 文字列結合
- コードの桁揃え、文字のパディング(数字コードなら0埋め)
評価の観点
- データ変換処理のための標準関数の品ぞろえ
- 標準機能での性能
- 標準関数でカバーできない場合(があったとしてもの趣旨)のユーザ定義関数の開発容易性と性能
- データカタログ機能(Glue)との統合
テストデータの準備
Amazon Athenaはミリ秒を求めるOLTPではなく大量データの分析処理を得意とする。並列分散処理の内部機構のオーバーヘッドが相応に大きいため100万件レベルのデータでは適切に評価ができない。少なくとも1000万件レベル以上のデータで検証するのが望ましい。
本検証で利用したデータ増幅用のプログラムは下記の通り。
Athenaは、列フォーマットや圧縮機能等の内部フォーマッティング機能に優れており、単純文字列の膨らましでは追い風参考記録(POCでは性能がよかったが本番で性能が出ない!)となる。
テストデータのカーディナリティやランダム(=圧縮率)の部分でよくよく注意を払う必要がある。
import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Random;
public class CreateData {
static int rowCount = 10000000; // デフォルトのレコード件数は1000万レコード
static int stringLength = 10;
static String delimiter = ",";
static String blank = " ";
static String nullString = null;
static String outputFile = "data.csv";
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();
// 第一引数が出力するレコード件数
if (args.length >= 1) {
if (args[0] != null && args[0].chars().allMatch(Character::isDigit)) {
rowCount = Integer.parseInt(args[0]);
} else {
System.out.println("args[0] is not number.");
return;
}
}
int divisionNumber = 1000000; // 分割単位の数。100万件レコードずつ処理する。大きすぎるとOutOfMemoryErrorとなる。
int loopCount = rowCount / divisionNumber; // rowCountが4000000でdivisionNumberが1000000だと4回ループ
int lastLoopCount = rowCount % divisionNumber; // 最後のループ数。あまり分。
File file = new File(outputFile);
FileWriter filewriter = new FileWriter(file);
try {
if (loopCount > 0) {
for (int i = 0; i < loopCount; i++) {
getData(0, divisionNumber, filewriter);
System.out.print(((i + 1) * divisionNumber) + "件処理完了\n");
}
}
getData(0, lastLoopCount, filewriter);
if (lastLoopCount != 0) {
System.out.print(((divisionNumber * loopCount) + lastLoopCount) + "件処理完了\n");
}
} catch (Exception e) {
System.out.println(e);
} finally {
filewriter.close();
}
long endTime = System.currentTimeMillis();
System.out.println("実行時間:" + (endTime - startTime) + "msec");
}
public static void getData(int start, int end, FileWriter filewriter) {
int rowCount = end - start;
try {
String lineString = "";
String[] includeBlankString = getIncludeBlankString(rowCount);
String[] dateString1 = getDateString(rowCount);
String[] dateString2 = getDateString(rowCount);
String[] dateString3 = getDateString(rowCount);
String[] fullName = getFullName(rowCount);
String[] number = getNumber(rowCount);
for (int i = 0; i < rowCount; i++) {
lineString = includeBlankString[i] + delimiter;
lineString = lineString + dateString1[i] + delimiter;
lineString = lineString + dateString2[i] + delimiter;
lineString = lineString + dateString3[i] + delimiter;
lineString = lineString + fullName[i] + delimiter;
lineString = lineString + number[i] + delimiter + number[i] + "\n";
// System.out.print(lineString);
filewriter.write(lineString);
}
} catch (Exception e) {
System.out.println(e);
}
}
public static String[] getIncludeBlankString(int rowCount) {
String[] resultString = new String[rowCount];
for (int i = 0; i < rowCount; i++) {
resultString[i] = getIncludeBlankString();
}
return resultString;
}
public static String getIncludeBlankString() {
String resultString = "";
String headBlank = " ";
String tailBlank = " ";
Random rand = new Random();
int blankLength = rand.nextInt(3);
for (int i = 0; i < blankLength; i++) {
headBlank = headBlank + blank;
tailBlank = tailBlank + blank;
}
resultString = getRendomString(stringLength);
resultString = headBlank + resultString;
resultString = resultString + tailBlank;
return resultString;
}
public static String[] getDateString(int rowCount) {
String[] resultString = new String[rowCount];
for (int i = 0; i < rowCount; i++) {
resultString[i] = getDateString();
}
return resultString;
}
public static String getDateString() {
String nullDate = "";
String invalideDate = "88888888";
String maxDate = "99999999";
Random rand = new Random();
int pattern = rand.nextInt(100);
if (pattern == 0) {
return nullDate;
} else if (pattern == 1) {
return invalideDate;
} else if (pattern == 2) {
return maxDate;
} else {
return getRandomDate();
}
}
private static String getRandomDate() {
// 開始日
Calendar dateStart = Calendar.getInstance();
dateStart.set(1997, 1, 1);
// 終了日
Calendar dateEnd = Calendar.getInstance();
// 期間日数
long dateDiff = (dateEnd.getTimeInMillis() - dateStart.getTimeInMillis())
/ (1000 * 60 * 60 * 24);
// ランダムな年月日の生成
Calendar dateRandom = Calendar.getInstance();
dateRandom.add(dateEnd.DATE, -new Random().nextInt((int) dateDiff));
// 取得形式の設定
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
String date = sdf.format(dateRandom.getTime());
return date;
}
public static String[] getFullName(int rowCount) {
String[] resultString = new String[rowCount];
for (int i = 0; i < rowCount; i++) {
resultString[i] = getFullName();
}
return resultString;
}
public static String getFullName() {
String noBlankName = "性" + getRendomString(4) + "名";
String valid = "性" + getRendomString(2) + " " + getRendomString(2) + "名";
String zenkaku = "性" + getRendomString(2) + " " + getRendomString(2) + "名";
Random rand = new Random();
int pattern = rand.nextInt(10);
if (pattern == 1) {
return noBlankName;
} else if (pattern == 2) {
return zenkaku;
} else {
return valid;
}
}
public static String[] getNumber(int rowCount) {
String[] resultString = new String[rowCount];
for (int i = 0; i < rowCount; i++) {
resultString[i] = getNumber();
}
return resultString;
}
public static String getNumber() {
Random rand = new Random();
int number = rand.nextInt(999999);
return Integer.toString(number);
}
public static String getRendomString(int length) {
String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Random r = new Random();
String output = "";
for (int i = 0; i < length; i++) {
char c = alphabet.charAt(r.nextInt(alphabet.length()));
output = output + c;
}
return output;
}
}
$java CreateData
上記プログラムを実行し、生成されたdata.csvをS3に格納しておく。
デフォルトで1000万件レコードを生成する。引数指定でデータ件数変更可能。
$aws s3 cp data.csv s3://xxxxxxxxx/test6/
xxxxはS3バケット
次にAthenaのテーブルを作成する。Athenaの利用に先んじてハンズオンを実施しておくことをお勧めする。
おすすめのAthena Workshop
大きく二つのハンズオンがあり、一つ目の初級編はAthenaの操作に慣れるためにも実施することをお勧めする。二つ目のハンズオンはCloudFormationのスタック注意したい。あらかじめスタック詳細を把握して使わないと余計な従量課金が発生するのとCloud9の使い勝手が悪く非効率なため。
本記事ではworkshopのUDF関数作成手順ではなく、
UDF関数の作成方法
を参考にしている。
まずはAthenaのファイル(テーブル)作成。
上流システムから受け取るファイルを想定し、csvフォーマットとしている。
CREATE EXTERNAL TABLE IF NOT EXISTS `testdb`.`test6` (`col1` string, `col2` string, `col3` string, `col4` string, `col5` string, `col6` string, `col7` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://xxxx/test6/'
TBLPROPERTIES ('classification' = 'csv');
Athenaの武器はPythonやJavaなどの高級言語でデータ処理コードを書かずして、SQLとPresto標準関数のみで簡易なETL処理が組めること。(=SQLで表現できる。)
AthenaではSQL実行結果をS3に出力させるAPIが提供されており、本検証で利用している。
また、OSSのtrino(presto)関数がそのまま利用して相応に複雑なETL処理をAPI1コールで済ますことが可能。
関数一覧
本検証では文字列操作のtrim, lpad, concatを利用している。
元データ(data.csv)を仮想テーブルとみたてて簡易ETL処理で加工後ファイルをS3に格納する処理
#!/bin/bash
log=result.log
#sql="USING EXTERNAL FUNCTION dateconvert(col2 VARCHAR) RETURNS VARCHAR LAMBDA 'customudf' SELECT trim(col1) as col1, col2, col3, col4, trim(col5) as col5, concat(lpad(col6,7,'0'), lpad(col7,7,'0')) as col6 FROM testdb.test6"
sql="USING EXTERNAL FUNCTION dateconvert(col2 VARCHAR) RETURNS VARCHAR LAMBDA 'customudf' SELECT trim(col1) as col1, dateconvert(col2) as col2, col3, col4, trim(col5) as col5, concat(lpad(col6,7,'0'), lpad(col7,7,'0')) as col6 FROM testdb.test6"
#sql="USING EXTERNAL FUNCTION dateconvert(col2 VARCHAR) RETURNS VARCHAR LAMBDA 'customudf' SELECT trim(col1) as col1, dateconvert(col2) as col2, dateconvert(col3) as col3, col4, trim(col5) as col5, concat(lpad(col6,7,'0'), lpad(col7,7,'0')) as col6 FROM testdb.test6"
#sql="USING EXTERNAL FUNCTION dateconvert(col2 VARCHAR) RETURNS VARCHAR LAMBDA 'customudf' SELECT trim(col1) as col1, dateconvert(col2) as col2, dateconvert(col3) as col3, dateconvert(col4) as col4, trim(col5) as col5, concat(lpad(col6,7,'0'), lpad(col7,7,'0')) as col6 FROM testdb.test6"
id=`aws athena start-query-execution --work-group "primary" --query-string "$sql" --result-configuration OutputLocation=s3://xxxxx --output text`
echo $id >> $log
status=`aws athena get-query-execution --query-execution-id $id | grep SUCCEEDED | awk -F " " '{print $2}' | sed s/\"//g | sed s/,//g`
echo $status >> $log
success="SUCCEEDED"
while [[ $status != $success ]]
do
sleep 1
status=`aws athena get-query-execution --query-execution-id $id | grep SUCCEEDED | awk -F " " '{print $2}' | sed s/\"//g | sed s/,//g`
echo $status >> $log
done
bucket=`aws athena get-query-execution --query-execution-id $id | grep OutputLocation | awk -F " " '{print $2}' | sed s/\"//g`
echo $bucket >> $log
executionTime=`aws athena get-query-execution --query-execution-id $id | grep TotalExecutionTimeInMillis`
echo $executionTime >> $log
SQL実行は同期処理ではなく、非同期処理となるため実行ステータスを1秒間隔でポーリングしている。
ステータスがSUCCEEDEDになれば処理が完了し、結果セットもS3へ格納されている。
本検証における実行時間計測はAthena APIで内部保持されるTotalExecutionTimeMillsを採用した。
本検証では自作のUDF関数「dateconvert」を利用している。
利用方法等は上記のリンクが参考になる。
Gitからソースコード一式をcloneし、UserDefinedFunctionHandlerをextendsしたクラスに下記の関数を追加してmvn実行すればLambdaとしてデプロイ可能なjarファイルが生成できる。開発容易性の面では良いと評価。
本検証で追加したJavaのメソッドは下記の2つ。
public static String dateconvert(String input) {
if (input == null || input.equals("") || input.length() != 8) {
return "0001-01-01";
} else if (input.equals("99999999")) {
return "9999-12-31";
} else {
if (!isDate(input)) {
return "0001-01-01";
} else {
return input.substring(0, 4) + "-" + input.substring(4, 6) + "-" + input.substring(6);
}
}
}
public static boolean isDate(String value) {
boolean result = false;
try {
if (value != null) {
String checkDate = value.replace("-", "").replace("/", "");
DateTimeFormatter.ofPattern("uuuuMMdd").withResolverStyle(ResolverStyle.STRICT).parse(checkDate,
LocalDate::from);
result = true;
}
} catch (Exception e) {
result = false;
}
return result;
}
検証結果総評
- SQL同時実行数(1-15)を増やしても性能劣化はみられなかった。
- データ件数比例で実行時間が伸びる傾向ではあるが劣化傾向ではなかった。(むしろ良い結果)
- UDFのオーバーヘッドは標準関数と比較して高いがSQL中の利用回数で性能劣化はみられなかった。
- UDFを利用するケースにおいてlambdaデプロイ直後、一発目等の節目で若干実行時間を要することがある。が使い物にならないレベルではなかった。
- 約1GBの生データをフルスキャンしてデータ加工とファイル出力が約20秒なのは優秀な気がする。(気がするだけで他の方式と比較しないといけない。)
その他
- UDFがCTASで使えなかった。viewでUDFが使えないのはマニュアルに記載されている。
- Lambdaのinvocationカウントの仕様が不明。まとまった回数で呼ばれるとドキュメントに記載されているが正確な値がわからず必要経費が読めない。マネコン情報からは他ソリューションと比較して無視できるレベル。
- 今回のような簡単な処理ではCTASの出番はないがより複雑なケースではCTASが活躍する。(=高度なフォーマットと圧縮環境下でスキャン量が減り処理性能が向上する。)
- Cloud9はどのユースケースで使うのか?(使い勝手悪いだけじゃなく、キラー機能があるかもしれないので今後調査する。)。
- Glueデータカタログとの統合がされている。Data Zoneの登場によって益々S3(Athena)がいいものになりつつある。
- Athenaクォーターがどこまで緩和申請できるか?。大量ユーザがオンラインで利用するシステムにはむいていないと評価しているが大量数ファイルを対象とするバッチの処理並列度をあげることができるとうれしい。
- 追記:自作Java vs Athena