1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【必読】†JK100人がためになったと言った† NOAA気象データを取ってくる

Last updated at Posted at 2024-12-26

はじめに

どうも、セキュリティ芸人ことアスースン・オンラインの「セキュリティホール」兼相方のそね氏です。今回、脆弱エンジニアのAdvent Calendar 2024に空いた“ポストホール”を埋める役割が回ってきたということで、筆を執りました。

実は、ストレングス・ファインダーで「回復志向」が上位にランクインしている私が、このミッションに選ばれるのはある意味必然と言えるのかもしれません。それでは、ここから本題に入っていきたいと思います。

概要

元来、このアドベント・カレンダーはセキュリティ縛りのネタであることが暗黙の了解であったが、セキュリティホールの肩書きを持つ我が身としては、これは守りたくない規則である。

故に、全く関係のない記事を書きたいと思う。

NOAA-ISDとは、アメリカ国立海洋大気庁(NOAA)が提供する包括的な地表観測データベースである。このようにDailyで更新されるそこそこの大きさの気象データがあると、自動取得するシステムを組んで自分のMySQLに入れたくなるのが人間の性だ。きっとみんなそうしたくなると思う。よって今回はこのデータのETLを作って行きたいと思う。

ゴールは

  1. 都合の良いフォーマットで
  2. 自分のAWSアカウントのS3バケットに
  3. 過去分(2021年~)を取り込み
  4. Dailyで増分更新する処理を作成する

である。

データソース

先述のドキュメントを見ると、データソースの概略は下記の通り。

  • Description: ISD in CSV format
  • Resource type: S3 Bucket
  • ARN: arn:aws:s3:::noaa-global-hourly-pds
  • AWS Region: us-east-1
  • AWS CLI Access (No AWS account required): aws s3 ls --no-sign-request s3://noaa-global-hourly-pds/
  • Explore: Browse Bucket

要するに、バージニア北部リージョンにある公開S3で用意されているということ。取り上えず2023年分のデータ総量を見てみる。

% aws s3 ls --no-sign-request s3://noaa-global-hourly-pds/2023/ --recursive --human-readable --summarize | grep "Total Size" | awk '{print $3}'
48.6

ざっくり1年で50GBであることが分かった。これを何も考えずにus-east-1から転送してしまうと、1年分一回あたり$5かかる。よって、バージニア北部でなるべく軽くして(S3は同一リージョン間のデータ転送料はタダ)自分のS3に格納したくなるのが人間の性だ。

さらに面倒なことに、

% aws s3 ls --no-sign-request s3://noaa-global-hourly-pds/2023/ 
2024-03-28 23:40:39    3025698 01001099999.csv
2024-03-28 23:44:33    1900320 01001499999.csv
2024-03-28 23:40:35     145363 01002099999.csv
2024-03-28 23:40:20     429820 01003099999.csv
2024-03-28 23:42:38     153103 01006099999.csv
...

と、Daily更新は同一ファイル名の.CSVの上書きで行われている様である。つまり、脳死で増分更新を試みるとその時点でのフルデータを毎回取ってくることになる。この点もなんとか上手くかわしたい(コスト的に)。

ここまでのETL仕様のポイントをまとめると以下の通り。

  • なるべくバージニア北部でデータを軽量化
  • フルデータを参照しないで増分更新する方法はないか

Amazon Athenaを用いたフォーマッティング

AWSにはS3に保存されたデータをクエリして情報を取得するためのサービスとして、Amazon S3 SelectとAmazon Athenaがある。似て非なるものだが、簡単に言えばAthenaの方がリッチなSQL(Prestoエンジン)をサポートし、スキャンデータサイズに応じて課金。一方、S3 SelectはインラインなんちゃってSQLで抽出データに基づいて課金される。

つまり、

  • 過去分(2021年~現在)バルク取得→Athena
  • 増分更新→S3 Select

が最適解な雰囲気だ。実際にAthenaのスキャンデータ課金料は$5.00/TB、つまり、50GB≒$0.25と脳死転送をした時の$5に比べたら全然可愛い。ここで、必要列だけを抽出したり、圧縮したりなど出来ればかなり良さそうだ。

ちなみに、増分更新のS3 Selectは一日一行×気象ステーション数であり、ハナクソみたいな金額なので無視する。

Athenaによるデータのバルク取得

さてここで、Athenaから実際にクエリを投げて都合の良いフォーマットにしてみる。勿論、Athenaを動かすリージョンはus-east-1であることに留意

先ずはTableの作成から。
NOAAのドキュメントから各列の意味を見ながら必要なものだけをselect文で選んでくる。

CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`noaa_global_hourly_2023` (
  `station` string,
  `date` string,
  `source` string,
  `latitude` string,
  `longitude` string,
  `elevation` string,
  `name` string,
  `report_type` string,
  `call_sign` string,
  `quality_control` string,
  `wnd` string,
  `cig` string,
  `vis` string,
  `tmp` string,
  `dew` string,
  `slp` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"',
  'escapeChar' = '\\'
)
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://noaa-global-hourly-pds/2023/'
TBLPROPERTIES (
  'classification' = 'csv',
  'skip.header.line.count' = '1'
);

これでs3://noaa-global-hourly-pds/2023/というTableが作成される。実際にプレビューを見てみると(列が多くて先頭一部しか見えません...m(_ _)m

image.png

結果をよくみるとnamewndcigなどセルにカンマが入っていて煩雑なので、さらに細分化していく。

SELECT
  station as station_code,
  from_iso8601_timestamp(date) as datetime,
  latitude,
  longitude,
  elevation,
  IF(CARDINALITY(SPLIT(name, ',')) > 0, SPLIT(name, ',')[1], NULL) AS station_name,
  IF(CARDINALITY(SPLIT(name, ',')) > 1, SPLIT(name, ',')[2], NULL) AS country,
  report_type,
  call_sign,
  quality_control,
  cast(IF(CARDINALITY(SPLIT(wnd, ',')) > 0, SPLIT(wnd, ',')[1], NULL) as int) AS wnd_direction_angle, 
  IF(CARDINALITY(SPLIT(wnd, ',')) > 1, SPLIT(wnd, ',')[2], NULL) AS wnd_direction_quality_code, 
  IF(CARDINALITY(SPLIT(wnd, ',')) > 2, SPLIT(wnd, ',')[3], NULL) AS wnd_type_code,  
  cast(IF(CARDINALITY(SPLIT(wnd, ',')) > 3, SPLIT(wnd, ',')[4], NULL) as int) AS wnd_speed_rate,  
  IF(CARDINALITY(SPLIT(wnd, ',')) > 4, SPLIT(wnd, ',')[5], NULL) AS wnd_speed_quality_code,
  cast(IF(CARDINALITY(SPLIT(cig, ',')) > 0, SPLIT(cig, ',')[1], NULL) as int) AS ceiling_height_dimension,
  IF(CARDINALITY(SPLIT(cig, ',')) > 1, SPLIT(cig, ',')[2], NULL) AS ceiling_quality_code,
  IF(CARDINALITY(SPLIT(cig, ',')) > 2, SPLIT(cig, ',')[3], NULL) AS ceiling_determination_code,
  IF(CARDINALITY(SPLIT(cig, ',')) > 3, SPLIT(cig, ',')[4], NULL) AS cavok_code,
  cast(IF(CARDINALITY(SPLIT(vis, ',')) > 0, SPLIT(vis, ',')[1], NULL) as int) AS distance_dimension,
  IF(CARDINALITY(SPLIT(vis, ',')) > 1, SPLIT(vis, ',')[2], NULL) AS distance_quality_code,
  IF(CARDINALITY(SPLIT(vis, ',')) > 2, SPLIT(vis, ',')[3], NULL) AS variability_code,
  IF(CARDINALITY(SPLIT(vis, ',')) > 3, SPLIT(vis, ',')[4], NULL) AS quality_variability_code,
  cast(IF(CARDINALITY(SPLIT(tmp, ',')) > 0, SPLIT(tmp, ',')[1], NULL) as int) AS air_temperature,
  IF(CARDINALITY(SPLIT(tmp, ',')) > 1, SPLIT(tmp, ',')[2], NULL) AS air_temperature_quality_code,
  cast(IF(CARDINALITY(SPLIT(dew, ',')) > 0, SPLIT(dew, ',')[1], NULL) as int) AS dew_point_temperature,
  IF(CARDINALITY(SPLIT(dew, ',')) > 1, SPLIT(dew, ',')[2], NULL) AS dew_point_quality_code,
  cast(IF(CARDINALITY(SPLIT(slp, ',')) > 0, SPLIT(slp, ',')[1], NULL) as int) AS sea_level_pressure,
  IF(CARDINALITY(SPLIT(slp, ',')) > 1, SPLIT(slp, ',')[2], NULL) AS sea_level_pressure_quality_code
FROM noaa_global_hourly_2023
LIMIT 5;

これを実行すると綺麗なフォーマットで結果を得られる。

💪過去分のバルク取り込み

脳筋なので、過去分(2021年~現在)は力技で実施する。これについては特に記述することはない。面倒だが4回繰り返せば終わるので、コーディングする方が面倒だ。

ここまで掛かった費用は約$1である。抽出されたデータは一年分が約25GBになっているので、データ量を半分削減($5→$2.5)できている。

Amazon S3 Selectorを用いたデータ抽出

次に、S3 Selectorを用いて増分更新だけを取得する。例えば、2024年12月18日のアメリカ"SELAWIK"(station_code: 70019726558)のデータを取得するコマンドは以下の通り。

aws s3api select-object-content \
  --bucket noaa-global-hourly-pds \
  --key 2024/70019726558.csv \
  --expression "SELECT * FROM S3Object WHERE _2 LIKE '2024-12-18%'" \
  --expression-type SQL \
  --input-serialization '{"CSV": {"FileHeaderInfo": "IGNORE"}}' \
  --output-serialization '{"CSV": {}}' \
  output.csv

ところが厄介なのが、この更新がいつ行われるのか決まっていない点である。場合によっては翌日に反映される場合もあれば、一週間ほど要するケースもある。

面倒であるが、ステーションごとの取得済み日時を辞書化しておいて、増分更新の際に都度それを参照してWHERE句で増分を指定し、S3 Selectorクエリを書けば実現できる。

増分データ取得用のAWS アーキテクチャ

先ずは構想となるAWSアーキテクチャを図に示す。

image.png

  • EventBridgeスケジュールでDailyなどでLambdaを起動
  • Lambdaは先ず、{station_id: 取得済み最新日時}形式の取得済み日時辞書をS3から参照
  • 参照にない日時を増分としてS3 SelectorでNOAAからデータ抽出
  • これを全ステーションに対して実行し、マージしてS3に格納

辞書の初期化

Athenaでバルク取り込みした分から辞書を初期化する。4年分のデータから一気に辞書を作るのはメモリがかなり必要になるので、これもAthenaを用いて作成する。先ずはテーブルの作成から。

CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`bulk` (
  `station` string,
  `date` string,
  `source` string,
  `latitude` string,
  `longitude` string,
  `elevation` string,
  `name` string,
  `report_type` string,
  `call_sign` string,
  `quality_control` string,
  `wnd` string,
  `cig` string,
  `vis` string,
  `tmp` string,
  `dew` string,
  `slp` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"',
  'escapeChar' = '\\'
)
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION --'s3://自分のS3バケット/NOAA/'
TBLPROPERTIES (
  'classification' = 'csv',
  'skip.header.line.count' = '1'
);

続いて、station_idと最新dateを抽出

SELECT
    station,
    MAX(date) as latest_date
FROM bulk
GROUP BY station;

この結果は1MBもしない.CSVとなるので、適当に辞書化して.pkl化&保存する。なお、.pklの利用についてはこちらを参照する。

Lambdaコード

前節で作成した辞書latest_date_by_station.pklを読み込んで、全ステーションに対して増分取得するS3 Selectorを投げるLambdaを書く。

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from io import StringIO
import pickle

import boto3
import pandas as pd

# names for the dataset
COLUMNS = [
    "station",
    "date",
    "source",
    "latitude",
    "longitude",
    "elevation",
    "name",
    "report_type",
    "call_sign",
    "quality_control",
    "wnd",
    "cig",
    "vis",
    "tmp",
    "dew",
    "slp",
]


def get_dict(s3, bucket, key="latest_date_by_station.pkl"):
    try:
        # S3 からファイルを取得
        response = s3.get_object(Bucket=bucket, Key=key)
        file_content = response["Body"].read()

        # pickle ファイルを読み込み
        return pickle.loads(file_content)
    except Exception as e:
        print(f"Error loading .pkl file from S3: {e}")
        return {}


def get_NA_isd_set(filepath="./isd-history.csv"):
    df_isd = pd.read_csv(filepath)
    df_isd = df_isd[
        ((df_isd["CTRY"] == "US") | (df_isd["CTRY"] == "CA") | (df_isd["CTRY"] == "MX"))
        & ((df_isd["LAT"] != 0) & (df_isd["LAT"].notna()))
        & ((df_isd["LAT"] >= 14) & (df_isd["LAT"] <= 83))
        & ((df_isd["LON"] >= -170) & (df_isd["LON"] <= -50))
        & (df_isd["END"] >= 20240000)
    ]
    df_isd["station_code"] = df_isd["USAF"] + df_isd["WBAN"].astype(str)
    return set(df_isd["station_code"].values)


def chunk_dict(data_dict, n_jobs=1):
    # Split the dictionary into n chunks
    items = list(data_dict.items())
    chunk_size = len(items) // n_jobs + (len(items) % n_jobs > 0)
    for i in range(0, len(items), chunk_size):
        yield dict(items[i : i + chunk_size])


def generate_sql_expression(latest_date):
    try:
        # Convert to ISO 8601 format
        formatted = datetime.strptime(
            latest_date, "%Y-%m-%d %H:%M:%S.%f %Z"
        ).isoformat()
        return f"SELECT * FROM S3Object WHERE _2 > '{formatted}'"
    except Exception as e:
        print(f"Error generate query: {e}")
        return None


def extract(s3, bucket, station, latest_date, query):
    try:
        object_key = latest_date.split("-")[0] + "/" + station + ".csv"

        # S3 Selectorを実行
        response = s3.select_object_content(
            Bucket=bucket,
            Key=object_key,
            ExpressionType="SQL",
            Expression=query,
            InputSerialization={"CSV": {"FileHeaderInfo": "IGNORE"}},
            OutputSerialization={"CSV": {}},
        )

        # 結果を取得
        result = []
        latest_date = datetime.strptime(latest_date, "%Y-%m-%d %H:%M:%S.%f %Z")
        max_date = latest_date
        for event in response["Payload"]:
            if "Records" in event:
                # S3 Selectorの結果をDataFrameに変換
                df = pd.read_csv(
                    StringIO(event["Records"]["Payload"].decode("utf-8")),
                    header=None,
                    quotechar='"',
                    names=[f"col_{i+1}" for i in range(200)],
                    engine="python",
                    on_bad_lines="skip"
                )
                df = df.iloc[:, :16]
                df.columns = COLUMNS

                # datetime列を追加
                df["datetime"] = pd.to_datetime(
                    df["date"], format="%Y-%m-%dT%H:%M:%S", errors="coerce"
                )
                df.dropna(subset=["datetime"], inplace=True)

                # 増分データの確認と辞書の更新
                local_max_date = df["datetime"].max()
                if local_max_date > max_date:
                    max_date = local_max_date
                    df = df[df["datetime"] > latest_date]
                    result.append(df.drop(columns=["datetime"]))
        
        return result, max_date.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + " UTC"

    except Exception as e:
        print(f"Error executing S3 Select: {e}")
        return None, None


def process(s3, noaa_bucket, chank):
    update_dict_chunk = {}
    data_list_chunk = []

    for station, latest_date in chank.items():
        # S3 Selectorによる抽出
        query = generate_sql_expression(latest_date)
        data, update_latest_date = extract(s3, noaa_bucket, station, latest_date, query)
        update_dict_chunk[station] = update_latest_date
        if data:
            data_list_chunk.append(data)
            
    return update_dict_chunk, data_list_chunk


def format(data):
    # Rename 'station' to 'station_code'
    data.rename(columns={"station": "station_code"}, inplace=True)

    # Convert 'date' column to datetime
    data["datetime"] = (
        pd.to_datetime(data["date"], format="%Y-%m-%dT%H:%M:%S")
        .dt.strftime("%Y-%m-%d %H:%M:%S.%f")
        .str[:-3]
        + " UTC"
    )

    # Extract station_name and country from 'name' column
    name_split = data["name"].str.split(",", expand=True)
    data["station_name"] = name_split[0]
    data["country"] = name_split[1]

    # Process 'wnd' column
    wnd_split = data["wnd"].str.split(",", expand=True)
    data["wnd_direction_angle"] = pd.to_numeric(wnd_split[0], errors="coerce")
    data["wnd_direction_quality_code"] = wnd_split[1]
    data["wnd_type_code"] = wnd_split[2]
    data["wnd_speed_rate"] = pd.to_numeric(wnd_split[3], errors="coerce")
    data["wnd_speed_quality_code"] = wnd_split[4]

    # Process 'cig' column
    cig_split = data["cig"].str.split(",", expand=True)
    data["ceiling_height_dimension"] = pd.to_numeric(cig_split[0], errors="coerce")
    data["ceiling_quality_code"] = cig_split[1]
    data["ceiling_determination_code"] = cig_split[2]
    data["cavok_code"] = cig_split[3]

    # Process 'vis' column
    vis_split = data["vis"].str.split(",", expand=True)
    data["distance_dimension"] = pd.to_numeric(vis_split[0], errors="coerce")
    data["distance_quality_code"] = vis_split[1]
    data["variability_code"] = vis_split[2]
    data["quality_variability_code"] = vis_split[3]

    # Process 'tmp' column
    tmp_split = data["tmp"].str.split(",", expand=True)
    data["air_temperature"] = pd.to_numeric(tmp_split[0], errors="coerce")
    data["air_temperature_quality_code"] = tmp_split[1]

    # Process 'dew' column
    dew_split = data["dew"].str.split(",", expand=True)
    data["dew_point_temperature"] = pd.to_numeric(dew_split[0], errors="coerce")
    data["dew_point_quality_code"] = dew_split[1]

    # Process 'slp' column
    slp_split = data["slp"].str.split(",", expand=True)
    data["sea_level_pressure"] = pd.to_numeric(slp_split[0], errors="coerce")
    data["sea_level_pressure_quality_code"] = slp_split[1]

    # Select relevant columns for the final output
    return data[
        [
            "station_code",
            "datetime",
            "latitude",
            "longitude",
            "elevation",
            "station_name",
            "country",
            "report_type",
            "call_sign",
            "quality_control",
            "wnd_direction_angle",
            "wnd_direction_quality_code",
            "wnd_type_code",
            "wnd_speed_rate",
            "wnd_speed_quality_code",
            "ceiling_height_dimension",
            "ceiling_quality_code",
            "ceiling_determination_code",
            "cavok_code",
            "distance_dimension",
            "distance_quality_code",
            "variability_code",
            "quality_variability_code",
            "air_temperature",
            "air_temperature_quality_code",
            "dew_point_temperature",
            "dew_point_quality_code",
            "sea_level_pressure",
            "sea_level_pressure_quality_code",
        ]
    ]


def flatten_list(nested_list):
    flat_list = []
    for item in nested_list:
        if isinstance(item, list):
            # Split the dictionary into n chunks
            flat_list.extend(flatten_list(item))
        else:
            flat_list.append(item)
    return flat_list


def save_dict(s3, bucket, data, key="latest_date_by_station.pkl"):
    try:
        # 辞書を pickle 形式にシリアライズ
        serialized_data = pickle.dumps(data)

        # S3 にファイルを保存
        s3.put_object(Bucket=bucket, Key=key, Body=serialized_data)
        print(f"Successfully saved .pkl file to S3: {bucket}/{key}")
    except Exception as e:
        print(f"Error saving .pkl file to S3: {e}")


def save_csv_to_s3(s3, bucket, data):
    try:
        key = f"NOAA_update/{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        # DataFrameをCSV形式に変換
        csv_buffer = StringIO()
        data.to_csv(csv_buffer, index=False, quoting=1)
        csv_buffer.seek(0)

        # S3 にファイルを保存
        s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())
        print(f"Successfully saved CSV file to S3: {bucket}/{key}")
    except Exception as e:
        print(f"Error saving CSV file to S3: {e}")


def lambda_handler(event, context):
    # Bucket名
    noaa_bucket = "noaa-global-hourly-pds"
    my_bucket = #自分のバケット名

    # S3クライアントの作成
    s3 = boto3.client("s3")

    # 辞書の取得
    _dict = get_dict(s3, my_bucket)
    update_dict = _dict.copy()

    # (オプション)北米地域のisdステーションデータを読み込み & フィルタリング
    NA_isd_set = get_NA_isd_set()
    filtered_dict = {key: value for key, value in _dict.items() if key in NA_isd_set}

    # チャンクを作成
    _dict_chunks = list(chunk_dict(filtered_dict, n_jobs=4))

    # (並列処理)全てのステーションに対して増分を取得
    results = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(process, s3, noaa_bucket, chunk) for chunk in _dict_chunks
        ]
        for future in futures:
            results.append(future.result())

    # 結果の取得
    data_list = []
    for update_dict_chunk, data_list_chunk in results:
        update_dict.update(update_dict_chunk)
        data_list.extend(data_list_chunk)

    # 増分データがある場合
    if data_list:
        # 平滑化
        data_list = flatten_list(data_list)

        # データ整形
        data = format(pd.concat(data_list))

        # S3に保存
        save_dict(s3, my_bucket, update_dict)
        save_csv_to_s3(s3, my_bucket, data)

最後に

上のLambdaを実行すると実行時間上限15分がギリギリなので、場合によってはECSやAWS Batchを用いると良いかも

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?