0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【第3回】BeautifulSoupで気象データをスクレイピング(データ収集②)

Last updated at Posted at 2025-05-05

🎮 はじめに

前回の記事では、スマブラ対戦結果を記録するDjangoアプリを作成しました。
今回は、天候データを自動収集するために、PythonのBeautifulSoupを使ってスクレイピングを行います!

気象データを取り込むことで、

  • 気温による勝率への影響は?
  • 晴れの日はどのキャラが強い?

といった分析ができるようになります。


🛠 使用技術

  • Python
  • requests(HTTP通信)
  • BeautifulSoup(HTMLパース)
  • pandas(データ整理)

🌤 スクレイピング対象サイト

今回は、気象庁のWEBサイトを対象にスクレイピングします。
https://www.data.jma.go.jp/obd/stats/etrn/index.php?prec_no=&block_no=&year=&month=&day=&view=

場所と日付を選んでいくと、パラメータが埋まっていきます。
最後に、「~1時間毎の値を表示」をクリックすると次の画面に進みます。

気象.png

URLに注目してください。
year、month、dayに対象の年月日が入っています。
ここに変数を使うと、動的な処理ができそうですね!
https://www.data.jma.go.jp/stats/etrn/view/hourly_s1.php?prec_no=44&block_no=47662&year=2025&month=1&day=1&view=


🧱 スクレイピング処理の流れ

  1. 取得する期間を設定
  2. BeautifulSoupでHTMLをパース
  3. 必要なデータを抽出
  4. pandasでCSVに保存

🧪 実装

1. 実際のコード

📄 preterite_weather_export.py

import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime

# 固定パスの設定
file_path = 'S3_upload/preterite_weather_export.csv'

def load_data_and_get_dates():
    try:
        df = pd.read_csv(file_path)

        if df.empty:
            print("データが存在しないため、指定日から昨日までのデータを取得します。")
            # データがない場合、指定日を開始日にする(ここを変更)
            start_date = datetime(2025, 3, 30)  # ★ここで「指定したい開始日」を設定する
            end_date = datetime.now() - pd.Timedelta(days=1)
            return None, start_date, end_date, file_path

        df.iloc[:, 0] = pd.to_datetime(df.iloc[:, 0]).dt.strftime('%Y-%m-%d')
        last_date = pd.to_datetime(df.iloc[-1, 0])
        start_date = last_date + pd.Timedelta(days=1)
        end_date = datetime.now() - pd.Timedelta(days=1)
        return df, start_date, end_date, file_path
    except Exception as e:
        print(f"ファイルの読み込み中にエラーが発生しました: {e}")
        specified_start_date = datetime(2025, 3, 30)  # ★ここで「指定したい開始日」を設定する
        end_date = datetime.now() - pd.Timedelta(days=1)
        return None, specified_start_date, end_date, file_path

def fetch_weather_data(start_date, end_date):
    columns = ['date', 'time', 'local_pressure', 'sea_level_pressure', 'precipitation', 'temperature', 'dew_point', 'vapor_pressure', 'humidity', 'wind_speed', 'wind_direction', 'sunshine_duration', 'solar_radiation', 'snowfall', 'snow_depth', 'weather', 'cloud_cover', 'visibility']
    df = pd.DataFrame(columns=columns)

    # 指定された期間のデータを取得
    date_range = pd.date_range(start_date, end_date)
    for single_date in date_range:
        url = f"https://www.data.jma.go.jp/stats/etrn/view/hourly_s1.php?prec_no=44&block_no=47662&year={single_date.year}&month={single_date.month}&day={single_date.day}&view="
        response = requests.get(url)
        soup = BeautifulSoup(response.content, "html.parser")
        all_data = soup.find_all(class_="data_0_0")

        for hour in range(24):
            date_str = single_date.strftime('%Y-%m-%d')
            hour_str = f"{hour + 1:02d}"
            list_data = [date_str, hour_str] + [data.text.strip() for data in all_data[hour * 16:hour * 16 + 16]]
            df.loc[len(df)] = list_data

    return df

def main():
    df, start_date, end_date, file_path = load_data_and_get_dates()
    if df is not None and start_date <= end_date:
        # 新しいデータを取得して既存のデータに追加
        new_data = fetch_weather_data(start_date, end_date)
        combined_data = pd.concat([df, new_data], ignore_index=True)
        combined_data.to_csv(file_path, index=False, encoding='utf-8-sig')
        print(f"更新したファイルはこちらに保存されました: {file_path}")
    elif df is not None:
        # 最終行が昨日の場合、スクレイピングを行わずに終了
        print("最終行の日付が昨日です。スクレイピングは行いません。")
    else:
        # データが存在しない場合、指定日から昨日のデータを取得して保存
        new_data = fetch_weather_data(start_date, end_date)
        new_data.to_csv(file_path, index=False, encoding='utf-8-sig')
        print(f"新しいファイルはこちらに保存されました: {file_path}")

if __name__ == "__main__":
    main()

2. 解説

では、実際のコードについて、解説していきます!

1. 取得する期間を設定

まずは取得する期間を設定していきます。
該当のコードは以下になります。

def load_data_and_get_dates():
    try:
        df = pd.read_csv(file_path)

        if df.empty:
            print("データが存在しないため、指定日から昨日までのデータを取得します。")
            # データがない場合、指定日を開始日にする(ここを変更)
            start_date = datetime(2025, 3, 30)  # ★ここで「指定したい開始日」を設定する
            end_date = datetime.now() - pd.Timedelta(days=1)
            return None, start_date, end_date, file_path

        df.iloc[:, 0] = pd.to_datetime(df.iloc[:, 0]).dt.strftime('%Y-%m-%d')
        last_date = pd.to_datetime(df.iloc[-1, 0])
        start_date = last_date + pd.Timedelta(days=1)
        end_date = datetime.now() - pd.Timedelta(days=1)
        return df, start_date, end_date, file_path
    except Exception as e:
        print(f"ファイルの読み込み中にエラーが発生しました: {e}")
        specified_start_date = datetime(2025, 3, 30)  # ★ここで「指定したい開始日」を設定する
        end_date = datetime.now() - pd.Timedelta(days=1)
        return None, specified_start_date, end_date, file_path

ロジックを簡単に説明すると、固定パスのcsvを読み込み、
ファイルやデータがない場合は、指定日(Djangoアプリで初めてデータを入力した日)~昨日までの取得、
データが存在する場合は、最終行の次の日~昨日のデータを取得するようにしています。

また、スクレイピングによるWebへのアクセス負荷を下げるため、なるべく差分で取るように設計しています。

2. BeautifulSoupでHTMLをパース

次にBeautifulSoupでHTMLをパース(プログラムで扱えるようなデータ構造の集合体に変換)していきます。
該当のコードは以下になります。

url = f"https://www.data.jma.go.jp/stats/etrn/view/hourly_s1.php?prec_no=44&block_no=47662&year={single_date.year}&month={single_date.month}&day={single_date.day}&view="
response = requests.get(url)
soup = BeautifulSoup(response.content, "html.parser")
all_data = soup.find_all(class_="data_0_0")

ここでは、URLからページを取得し、HTMLをパースして、class="data_0_0"のタグだけをリストにして取り出す という処理をしています!

3. 必要なデータを抽出

次に必要なデータを抽出していきます。
該当のコードは以下になります。

for hour in range(24):
            date_str = single_date.strftime('%Y-%m-%d')       
            hour_str = f"{hour + 1:02d}"
            list_data = [date_str, hour_str] + [data.text.strip() for data in all_data[hour * 16:hour * 16 + 16]]
            df.loc[len(df)] = list_data

(日付と時間 + 16項目)の形式でデータを抽出しています。

4. pandasでCSVに保存

最後に抽出したデータをCSVに保存していきます。
該当のコードは以下になります。

if df is not None and start_date <= end_date:
        # 新しいデータを取得して既存のデータに追加
        new_data = fetch_weather_data(start_date, end_date)
        combined_data = pd.concat([df, new_data], ignore_index=True)
        combined_data.to_csv(file_path, index=False, encoding='utf-8-sig')
        print(f"更新したファイルはこちらに保存されました: {file_path}")
    elif df is not None:
        # 最終行が昨日の場合、スクレイピングを行わずに終了
        print("最終行の日付が昨日です。スクレイピングは行いません。")
    else:
        # データが存在しない場合、指定日から昨日のデータを取得して保存
        new_data = fetch_weather_data(start_date, end_date)
        new_data.to_csv(file_path, index=False, encoding='utf-8-sig')
        print(f"新しいファイルはこちらに保存されました: {file_path}")

以下の3種類の分岐を書いて、それぞれメッセージが出るようにしています!
これで気象データ抽出の処理が完成しました。

  • データが存在する場合は、最終行の次の日~昨日のデータを取得
  • 最終行が昨日の場合はスクレイピングしない
  • ファイルやデータがない場合は、指定日~昨日までのデータを取得

🚀 次回

次回(第4回)は、抽出したデータをbashを使って、AWS S3にアップロードしていきます。
引き続きよろしくお願いいたします!
https://qiita.com/shota1212/items/7341b3c5f970362a7c89


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?