はじめに
どうも、セキュリティ芸人ことアスースン・オンラインの「セキュリティホール」兼相方のそね氏です。今回、脆弱エンジニアのAdvent Calendar 2024に空いた“ポストホール”を埋める役割が回ってきたということで、筆を執りました。
実は、ストレングス・ファインダーで「回復志向」が上位にランクインしている私が、このミッションに選ばれるのはある意味必然と言えるのかもしれません。それでは、ここから本題に入っていきたいと思います。
概要
元来、このアドベント・カレンダーはセキュリティ縛りのネタであることが暗黙の了解であったが、セキュリティホールの肩書きを持つ我が身としては、これは守りたくない規則である。
故に、全く関係のない記事を書きたいと思う。
NOAA-ISDとは、アメリカ国立海洋大気庁(NOAA)が提供する包括的な地表観測データベースである。このようにDailyで更新されるそこそこの大きさの気象データがあると、自動取得するシステムを組んで自分のMySQLに入れたくなるのが人間の性だ。きっとみんなそうしたくなると思う。よって今回はこのデータのETLを作って行きたいと思う。
ゴールは
- 都合の良いフォーマットで
- 自分のAWSアカウントのS3バケットに
- 過去分(2021年~)を取り込み
- Dailyで増分更新する処理を作成する
である。
データソース
先述のドキュメントを見ると、データソースの概略は下記の通り。
- Description: ISD in CSV format
- Resource type: S3 Bucket
- ARN:
arn:aws:s3:::noaa-global-hourly-pds
- AWS Region:
us-east-1
- AWS CLI Access (No AWS account required):
aws s3 ls --no-sign-request s3://noaa-global-hourly-pds/
- Explore: Browse Bucket
要するに、バージニア北部リージョンにある公開S3で用意されているということ。取り上えず2023年分のデータ総量を見てみる。
% aws s3 ls --no-sign-request s3://noaa-global-hourly-pds/2023/ --recursive --human-readable --summarize | grep "Total Size" | awk '{print $3}'
48.6
ざっくり1年で50GBであることが分かった。これを何も考えずにus-east-1
から転送してしまうと、1年分一回あたり$5かかる。よって、バージニア北部でなるべく軽くして(S3は同一リージョン間のデータ転送料はタダ)自分のS3に格納したくなるのが人間の性だ。
さらに面倒なことに、
% aws s3 ls --no-sign-request s3://noaa-global-hourly-pds/2023/
2024-03-28 23:40:39 3025698 01001099999.csv
2024-03-28 23:44:33 1900320 01001499999.csv
2024-03-28 23:40:35 145363 01002099999.csv
2024-03-28 23:40:20 429820 01003099999.csv
2024-03-28 23:42:38 153103 01006099999.csv
...
と、Daily更新は同一ファイル名の.CSVの上書きで行われている様である。つまり、脳死で増分更新を試みるとその時点でのフルデータを毎回取ってくることになる。この点もなんとか上手くかわしたい(コスト的に)。
ここまでのETL仕様のポイントをまとめると以下の通り。
- なるべくバージニア北部でデータを軽量化
- フルデータを参照しないで増分更新する方法はないか
Amazon Athenaを用いたフォーマッティング
AWSにはS3に保存されたデータをクエリして情報を取得するためのサービスとして、Amazon S3 SelectとAmazon Athenaがある。似て非なるものだが、簡単に言えばAthenaの方がリッチなSQL(Prestoエンジン)をサポートし、スキャンデータサイズに応じて課金。一方、S3 SelectはインラインなんちゃってSQLで抽出データに基づいて課金される。
つまり、
- 過去分(2021年~現在)バルク取得→Athena
- 増分更新→S3 Select
が最適解な雰囲気だ。実際にAthenaのスキャンデータ課金料は$5.00/TB、つまり、50GB≒$0.25と脳死転送をした時の$5に比べたら全然可愛い。ここで、必要列だけを抽出したり、圧縮したりなど出来ればかなり良さそうだ。
ちなみに、増分更新のS3 Selectは一日一行×気象ステーション数であり、ハナクソみたいな金額なので無視する。
Athenaによるデータのバルク取得
さてここで、Athenaから実際にクエリを投げて都合の良いフォーマットにしてみる。勿論、Athenaを動かすリージョンはus-east-1
であることに留意
先ずはTableの作成から。
NOAAのドキュメントから各列の意味を見ながら必要なものだけをselect
文で選んでくる。
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`noaa_global_hourly_2023` (
`station` string,
`date` string,
`source` string,
`latitude` string,
`longitude` string,
`elevation` string,
`name` string,
`report_type` string,
`call_sign` string,
`quality_control` string,
`wnd` string,
`cig` string,
`vis` string,
`tmp` string,
`dew` string,
`slp` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '"',
'escapeChar' = '\\'
)
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://noaa-global-hourly-pds/2023/'
TBLPROPERTIES (
'classification' = 'csv',
'skip.header.line.count' = '1'
);
これでs3://noaa-global-hourly-pds/2023/
というTableが作成される。実際にプレビューを見てみると(列が多くて先頭一部しか見えません...m(_ _)m
結果をよくみるとname
やwnd
、cig
などセルにカンマが入っていて煩雑なので、さらに細分化していく。
SELECT
station as station_code,
from_iso8601_timestamp(date) as datetime,
latitude,
longitude,
elevation,
IF(CARDINALITY(SPLIT(name, ',')) > 0, SPLIT(name, ',')[1], NULL) AS station_name,
IF(CARDINALITY(SPLIT(name, ',')) > 1, SPLIT(name, ',')[2], NULL) AS country,
report_type,
call_sign,
quality_control,
cast(IF(CARDINALITY(SPLIT(wnd, ',')) > 0, SPLIT(wnd, ',')[1], NULL) as int) AS wnd_direction_angle,
IF(CARDINALITY(SPLIT(wnd, ',')) > 1, SPLIT(wnd, ',')[2], NULL) AS wnd_direction_quality_code,
IF(CARDINALITY(SPLIT(wnd, ',')) > 2, SPLIT(wnd, ',')[3], NULL) AS wnd_type_code,
cast(IF(CARDINALITY(SPLIT(wnd, ',')) > 3, SPLIT(wnd, ',')[4], NULL) as int) AS wnd_speed_rate,
IF(CARDINALITY(SPLIT(wnd, ',')) > 4, SPLIT(wnd, ',')[5], NULL) AS wnd_speed_quality_code,
cast(IF(CARDINALITY(SPLIT(cig, ',')) > 0, SPLIT(cig, ',')[1], NULL) as int) AS ceiling_height_dimension,
IF(CARDINALITY(SPLIT(cig, ',')) > 1, SPLIT(cig, ',')[2], NULL) AS ceiling_quality_code,
IF(CARDINALITY(SPLIT(cig, ',')) > 2, SPLIT(cig, ',')[3], NULL) AS ceiling_determination_code,
IF(CARDINALITY(SPLIT(cig, ',')) > 3, SPLIT(cig, ',')[4], NULL) AS cavok_code,
cast(IF(CARDINALITY(SPLIT(vis, ',')) > 0, SPLIT(vis, ',')[1], NULL) as int) AS distance_dimension,
IF(CARDINALITY(SPLIT(vis, ',')) > 1, SPLIT(vis, ',')[2], NULL) AS distance_quality_code,
IF(CARDINALITY(SPLIT(vis, ',')) > 2, SPLIT(vis, ',')[3], NULL) AS variability_code,
IF(CARDINALITY(SPLIT(vis, ',')) > 3, SPLIT(vis, ',')[4], NULL) AS quality_variability_code,
cast(IF(CARDINALITY(SPLIT(tmp, ',')) > 0, SPLIT(tmp, ',')[1], NULL) as int) AS air_temperature,
IF(CARDINALITY(SPLIT(tmp, ',')) > 1, SPLIT(tmp, ',')[2], NULL) AS air_temperature_quality_code,
cast(IF(CARDINALITY(SPLIT(dew, ',')) > 0, SPLIT(dew, ',')[1], NULL) as int) AS dew_point_temperature,
IF(CARDINALITY(SPLIT(dew, ',')) > 1, SPLIT(dew, ',')[2], NULL) AS dew_point_quality_code,
cast(IF(CARDINALITY(SPLIT(slp, ',')) > 0, SPLIT(slp, ',')[1], NULL) as int) AS sea_level_pressure,
IF(CARDINALITY(SPLIT(slp, ',')) > 1, SPLIT(slp, ',')[2], NULL) AS sea_level_pressure_quality_code
FROM noaa_global_hourly_2023
LIMIT 5;
これを実行すると綺麗なフォーマットで結果を得られる。
💪過去分のバルク取り込み
脳筋なので、過去分(2021年~現在)は力技で実施する。これについては特に記述することはない。面倒だが4回繰り返せば終わるので、コーディングする方が面倒だ。
ここまで掛かった費用は約$1である。抽出されたデータは一年分が約25GBになっているので、データ量を半分削減($5→$2.5)できている。
Amazon S3 Selectorを用いたデータ抽出
次に、S3 Selectorを用いて増分更新だけを取得する。例えば、2024年12月18日のアメリカ"SELAWIK"(station_code
: 70019726558)のデータを取得するコマンドは以下の通り。
aws s3api select-object-content \
--bucket noaa-global-hourly-pds \
--key 2024/70019726558.csv \
--expression "SELECT * FROM S3Object WHERE _2 LIKE '2024-12-18%'" \
--expression-type SQL \
--input-serialization '{"CSV": {"FileHeaderInfo": "IGNORE"}}' \
--output-serialization '{"CSV": {}}' \
output.csv
ところが厄介なのが、この更新がいつ行われるのか決まっていない点である。場合によっては翌日に反映される場合もあれば、一週間ほど要するケースもある。
面倒であるが、ステーションごとの取得済み日時を辞書化しておいて、増分更新の際に都度それを参照してWHERE句で増分を指定し、S3 Selectorクエリを書けば実現できる。
増分データ取得用のAWS アーキテクチャ
先ずは構想となるAWSアーキテクチャを図に示す。
- EventBridgeスケジュールでDailyなどでLambdaを起動
- Lambdaは先ず、
{station_id: 取得済み最新日時}
形式の取得済み日時辞書をS3から参照 - 参照にない日時を増分としてS3 SelectorでNOAAからデータ抽出
- これを全ステーションに対して実行し、マージしてS3に格納
辞書の初期化
Athenaでバルク取り込みした分から辞書を初期化する。4年分のデータから一気に辞書を作るのはメモリがかなり必要になるので、これもAthenaを用いて作成する。先ずはテーブルの作成から。
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`bulk` (
`station` string,
`date` string,
`source` string,
`latitude` string,
`longitude` string,
`elevation` string,
`name` string,
`report_type` string,
`call_sign` string,
`quality_control` string,
`wnd` string,
`cig` string,
`vis` string,
`tmp` string,
`dew` string,
`slp` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
'separatorChar' = ',',
'quoteChar' = '"',
'escapeChar' = '\\'
)
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION --'s3://自分のS3バケット/NOAA/'
TBLPROPERTIES (
'classification' = 'csv',
'skip.header.line.count' = '1'
);
続いて、station_idと最新dateを抽出
SELECT
station,
MAX(date) as latest_date
FROM bulk
GROUP BY station;
この結果は1MBもしない.CSVとなるので、適当に辞書化して.pkl化&保存する。なお、.pklの利用についてはこちらを参照する。
Lambdaコード
前節で作成した辞書latest_date_by_station.pkl
を読み込んで、全ステーションに対して増分取得するS3 Selectorを投げるLambdaを書く。
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import StringIO
import pickle
import boto3
import pandas as pd
# names for the dataset
COLUMNS = [
"station",
"date",
"source",
"latitude",
"longitude",
"elevation",
"name",
"report_type",
"call_sign",
"quality_control",
"wnd",
"cig",
"vis",
"tmp",
"dew",
"slp",
]
def get_dict(s3, bucket, key="latest_date_by_station.pkl"):
try:
# S3 からファイルを取得
response = s3.get_object(Bucket=bucket, Key=key)
file_content = response["Body"].read()
# pickle ファイルを読み込み
return pickle.loads(file_content)
except Exception as e:
print(f"Error loading .pkl file from S3: {e}")
return {}
def save_dict(s3, bucket, data, key="latest_date_by_station.pkl"):
try:
# 辞書を pickle 形式にシリアライズ
serialized_data = pickle.dumps(data)
# S3 にファイルを保存
s3.put_object(Bucket=bucket, Key=key, Body=serialized_data)
print(f"Successfully saved .pkl file to S3: {bucket}/{key}")
except Exception as e:
print(f"Error saving .pkl file to S3: {e}")
def generate_sql_expression(latest_date):
try:
# Convert to ISO 8601 format
formatted = datetime.strptime(
latest_date, "%Y-%m-%d %H:%M:%S.%f %Z"
).isoformat()
return f"SELECT * FROM S3Object WHERE _2 > '{formatted}'"
except Exception as e:
print(f"Error generate query: {e}")
return None
def extract(s3, bucket, station, latest_date, query):
try:
object_key = latest_date.split("-")[0] + "/" + station + ".csv"
# S3 Selectorを実行
response = s3.select_object_content(
Bucket=bucket,
Key=object_key,
ExpressionType="SQL",
Expression=query,
InputSerialization={"CSV": {"FileHeaderInfo": "IGNORE"}},
OutputSerialization={"CSV": {}},
)
# 結果を取得
result = []
latest_date = datetime.strptime(latest_date, "%Y-%m-%d %H:%M:%S.%f %Z")
max_date = latest_date
for event in response["Payload"]:
if "Records" in event:
# S3 Selectorの結果をDataFrameに変換
_data = event["Records"]["Payload"].decode("utf-8")
df = pd.DataFrame(
[d.split(",")[:16] for d in _data.splitlines()], columns=COLUMNS
)
# datetime列を追加
df["datetime"] = pd.to_datetime(
df["date"], format="%Y-%m-%dT%H:%M:%S", errors="coerce"
)
df.dropna(subset=["datetime"], inplace=True)
# 増分データの確認と辞書の更新
local_max_date = df["datetime"].max()
if local_max_date > max_date:
max_date = local_max_date
df = df[df["datetime"] > latest_date]
result.append(df.drop(columns=["datetime"]))
return result, max_date
except Exception as e:
print(f"Error executing S3 Select: {e}")
return None, None
def format(data):
# Rename 'station' to 'station_code'
data.rename(columns={"station": "station_code"}, inplace=True)
# Convert 'date' column to datetime
data["datetime"] = (
pd.to_datetime(data["date"], format="%Y-%m-%dT%H:%M:%S")
.dt.strftime("%Y-%m-%d %H:%M:%S.%f")
.str[:-3]
+ " UTC"
)
# Extract station_name and country from 'name' column
name_split = data["name"].str.split(",", expand=True)
data["station_name"] = name_split[0]
data["country"] = name_split[1]
# Process 'wnd' column
wnd_split = data["wnd"].str.split(",", expand=True)
data["wnd_direction_angle"] = pd.to_numeric(wnd_split[0], errors="coerce")
data["wnd_direction_quality_code"] = wnd_split[1]
data["wnd_type_code"] = wnd_split[2]
data["wnd_speed_rate"] = pd.to_numeric(wnd_split[3], errors="coerce")
data["wnd_speed_quality_code"] = wnd_split[4]
# Process 'cig' column
cig_split = data["cig"].str.split(",", expand=True)
data["ceiling_height_dimension"] = pd.to_numeric(cig_split[0], errors="coerce")
data["ceiling_quality_code"] = cig_split[1]
data["ceiling_determination_code"] = cig_split[2]
data["cavok_code"] = cig_split[3]
# Process 'vis' column
vis_split = data["vis"].str.split(",", expand=True)
data["distance_dimension"] = pd.to_numeric(vis_split[0], errors="coerce")
data["distance_quality_code"] = vis_split[1]
data["variability_code"] = vis_split[2]
data["quality_variability_code"] = vis_split[3]
# Process 'tmp' column
tmp_split = data["tmp"].str.split(",", expand=True)
data["air_temperature"] = pd.to_numeric(tmp_split[0], errors="coerce")
data["air_temperature_quality_code"] = tmp_split[1]
# Process 'dew' column
dew_split = data["dew"].str.split(",", expand=True)
data["dew_point_temperature"] = pd.to_numeric(dew_split[0], errors="coerce")
data["dew_point_quality_code"] = dew_split[1]
# Process 'slp' column
slp_split = data["slp"].str.split(",", expand=True)
data["sea_level_pressure"] = pd.to_numeric(slp_split[0], errors="coerce")
data["sea_level_pressure_quality_code"] = slp_split[1]
# Select relevant columns for the final output
return data[
[
"station_code",
"datetime",
"latitude",
"longitude",
"elevation",
"station_name",
"country",
"report_type",
"call_sign",
"quality_control",
"wnd_direction_angle",
"wnd_direction_quality_code",
"wnd_type_code",
"wnd_speed_rate",
"wnd_speed_quality_code",
"ceiling_height_dimension",
"ceiling_quality_code",
"ceiling_determination_code",
"cavok_code",
"distance_dimension",
"distance_quality_code",
"variability_code",
"quality_variability_code",
"air_temperature",
"air_temperature_quality_code",
"dew_point_temperature",
"dew_point_quality_code",
"sea_level_pressure",
"sea_level_pressure_quality_code",
]
]
def save_csv_to_s3(s3, bucket, data):
try:
key = f"NOAA_update/{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
# DataFrameをCSV形式に変換
csv_buffer = StringIO()
data.to_csv(csv_buffer, index=False, quoting=1)
csv_buffer.seek(0)
# S3 にファイルを保存
s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())
print(f"Successfully saved CSV file to S3: {bucket}/{key}")
except Exception as e:
print(f"Error saving CSV file to S3: {e}")
def process_station(s3, noaa_bucket, station, latest_date):
try:
# S3 Selector expressionの作成
query = generate_sql_expression(latest_date)
# S3 Selectorによる抽出
data, update_latest_date = extract(s3, noaa_bucket, station, latest_date, query)
return station, data, update_latest_date
except Exception as e:
print(f"Error processing station {station}: {e}")
return station, None, latest_date
def lambda_handler(event, context):
# Bucket名
noaa_bucket = "noaa-global-hourly-pds"
my_bucket = # 自分のS3バケット
# S3クライアントの作成
s3 = boto3.client("s3")
# 辞書の取得
_dict = get_dict(s3, my_bucket)
# 新しい辞書を作成
update_dict = {}
# 全てのステーションに対して増分を取得
data_list = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {
executor.submit(process_station, s3, noaa_bucket, station, latest_date): station
for station, latest_date in _dict.items()
}
for future in as_completed(futures):
station, data, update_latest_date = future.result()
# 辞書を更新
update_dict[station] = update_latest_date
# 増分データリストに追加
if data:
data_list.extend(data)
# 増分データがある場合
if data_list:
# データ整形
data = format(pd.concat(data_list))
# S3に保存
save_csv_to_s3(s3, my_bucket, data)
save_dict(s3, my_bucket, update_dict)
return {
"statusCode": 200,
"body": "Processing completed successfully."
}
最後に
上のLambdaを実行すると実行時間上限15分がギリギリなので、場合によってはECSやAWS Batchを用いると良いかも