🎮 はじめに
前回の記事では、スマブラ対戦結果を記録するDjangoアプリを作成しました。
今回は、天候データを自動収集するために、PythonのBeautifulSoupを使ってスクレイピングを行います!
気象データを取り込むことで、
- 気温による勝率への影響は?
- 晴れの日はどのキャラが強い?
といった分析ができるようになります。
🛠 使用技術
- Python
- requests(HTTP通信)
- BeautifulSoup(HTMLパース)
- pandas(データ整理)
🌤 スクレイピング対象サイト
今回は、気象庁のWEBサイトを対象にスクレイピングします。
https://www.data.jma.go.jp/obd/stats/etrn/index.php?prec_no=&block_no=&year=&month=&day=&view=
場所と日付を選んでいくと、パラメータが埋まっていきます。
最後に、「~1時間毎の値を表示」をクリックすると次の画面に進みます。
URLに注目してください。
year、month、dayに対象の年月日が入っています。
ここに変数を使うと、動的な処理ができそうですね!
https://www.data.jma.go.jp/stats/etrn/view/hourly_s1.php?prec_no=44&block_no=47662&year=2025&month=1&day=1&view=
🧱 スクレイピング処理の流れ
- 取得する期間を設定
- BeautifulSoupでHTMLをパース
- 必要なデータを抽出
- pandasでCSVに保存
🧪 実装
1. 実際のコード
📄 preterite_weather_export.py
import requests
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime
# 固定パスの設定
file_path = 'S3_upload/preterite_weather_export.csv'
def load_data_and_get_dates():
try:
df = pd.read_csv(file_path)
if df.empty:
print("データが存在しないため、指定日から昨日までのデータを取得します。")
# データがない場合、指定日を開始日にする(ここを変更)
start_date = datetime(2025, 3, 30) # ★ここで「指定したい開始日」を設定する
end_date = datetime.now() - pd.Timedelta(days=1)
return None, start_date, end_date, file_path
df.iloc[:, 0] = pd.to_datetime(df.iloc[:, 0]).dt.strftime('%Y-%m-%d')
last_date = pd.to_datetime(df.iloc[-1, 0])
start_date = last_date + pd.Timedelta(days=1)
end_date = datetime.now() - pd.Timedelta(days=1)
return df, start_date, end_date, file_path
except Exception as e:
print(f"ファイルの読み込み中にエラーが発生しました: {e}")
specified_start_date = datetime(2025, 3, 30) # ★ここで「指定したい開始日」を設定する
end_date = datetime.now() - pd.Timedelta(days=1)
return None, specified_start_date, end_date, file_path
def fetch_weather_data(start_date, end_date):
columns = ['date', 'time', 'local_pressure', 'sea_level_pressure', 'precipitation', 'temperature', 'dew_point', 'vapor_pressure', 'humidity', 'wind_speed', 'wind_direction', 'sunshine_duration', 'solar_radiation', 'snowfall', 'snow_depth', 'weather', 'cloud_cover', 'visibility']
df = pd.DataFrame(columns=columns)
# 指定された期間のデータを取得
date_range = pd.date_range(start_date, end_date)
for single_date in date_range:
url = f"https://www.data.jma.go.jp/stats/etrn/view/hourly_s1.php?prec_no=44&block_no=47662&year={single_date.year}&month={single_date.month}&day={single_date.day}&view="
response = requests.get(url)
soup = BeautifulSoup(response.content, "html.parser")
all_data = soup.find_all(class_="data_0_0")
for hour in range(24):
date_str = single_date.strftime('%Y-%m-%d')
hour_str = f"{hour + 1:02d}"
list_data = [date_str, hour_str] + [data.text.strip() for data in all_data[hour * 16:hour * 16 + 16]]
df.loc[len(df)] = list_data
return df
def main():
df, start_date, end_date, file_path = load_data_and_get_dates()
if df is not None and start_date <= end_date:
# 新しいデータを取得して既存のデータに追加
new_data = fetch_weather_data(start_date, end_date)
combined_data = pd.concat([df, new_data], ignore_index=True)
combined_data.to_csv(file_path, index=False, encoding='utf-8-sig')
print(f"更新したファイルはこちらに保存されました: {file_path}")
elif df is not None:
# 最終行が昨日の場合、スクレイピングを行わずに終了
print("最終行の日付が昨日です。スクレイピングは行いません。")
else:
# データが存在しない場合、指定日から昨日のデータを取得して保存
new_data = fetch_weather_data(start_date, end_date)
new_data.to_csv(file_path, index=False, encoding='utf-8-sig')
print(f"新しいファイルはこちらに保存されました: {file_path}")
if __name__ == "__main__":
main()
2. 解説
では、実際のコードについて、解説していきます!
1. 取得する期間を設定
まずは取得する期間を設定していきます。
該当のコードは以下になります。
def load_data_and_get_dates():
try:
df = pd.read_csv(file_path)
if df.empty:
print("データが存在しないため、指定日から昨日までのデータを取得します。")
# データがない場合、指定日を開始日にする(ここを変更)
start_date = datetime(2025, 3, 30) # ★ここで「指定したい開始日」を設定する
end_date = datetime.now() - pd.Timedelta(days=1)
return None, start_date, end_date, file_path
df.iloc[:, 0] = pd.to_datetime(df.iloc[:, 0]).dt.strftime('%Y-%m-%d')
last_date = pd.to_datetime(df.iloc[-1, 0])
start_date = last_date + pd.Timedelta(days=1)
end_date = datetime.now() - pd.Timedelta(days=1)
return df, start_date, end_date, file_path
except Exception as e:
print(f"ファイルの読み込み中にエラーが発生しました: {e}")
specified_start_date = datetime(2025, 3, 30) # ★ここで「指定したい開始日」を設定する
end_date = datetime.now() - pd.Timedelta(days=1)
return None, specified_start_date, end_date, file_path
ロジックを簡単に説明すると、固定パスのcsvを読み込み、
ファイルやデータがない場合は、指定日(Djangoアプリで初めてデータを入力した日)~昨日までの取得、
データが存在する場合は、最終行の次の日~昨日のデータを取得するようにしています。
また、スクレイピングによるWebへのアクセス負荷を下げるため、なるべく差分で取るように設計しています。
2. BeautifulSoupでHTMLをパース
次にBeautifulSoupでHTMLをパース(プログラムで扱えるようなデータ構造の集合体に変換)していきます。
該当のコードは以下になります。
url = f"https://www.data.jma.go.jp/stats/etrn/view/hourly_s1.php?prec_no=44&block_no=47662&year={single_date.year}&month={single_date.month}&day={single_date.day}&view="
response = requests.get(url)
soup = BeautifulSoup(response.content, "html.parser")
all_data = soup.find_all(class_="data_0_0")
ここでは、URLからページを取得し、HTMLをパースして、class="data_0_0"のタグだけをリストにして取り出す という処理をしています!
3. 必要なデータを抽出
次に必要なデータを抽出していきます。
該当のコードは以下になります。
for hour in range(24):
date_str = single_date.strftime('%Y-%m-%d')
hour_str = f"{hour + 1:02d}"
list_data = [date_str, hour_str] + [data.text.strip() for data in all_data[hour * 16:hour * 16 + 16]]
df.loc[len(df)] = list_data
(日付と時間 + 16項目)の形式でデータを抽出しています。
4. pandasでCSVに保存
最後に抽出したデータをCSVに保存していきます。
該当のコードは以下になります。
if df is not None and start_date <= end_date:
# 新しいデータを取得して既存のデータに追加
new_data = fetch_weather_data(start_date, end_date)
combined_data = pd.concat([df, new_data], ignore_index=True)
combined_data.to_csv(file_path, index=False, encoding='utf-8-sig')
print(f"更新したファイルはこちらに保存されました: {file_path}")
elif df is not None:
# 最終行が昨日の場合、スクレイピングを行わずに終了
print("最終行の日付が昨日です。スクレイピングは行いません。")
else:
# データが存在しない場合、指定日から昨日のデータを取得して保存
new_data = fetch_weather_data(start_date, end_date)
new_data.to_csv(file_path, index=False, encoding='utf-8-sig')
print(f"新しいファイルはこちらに保存されました: {file_path}")
以下の3種類の分岐を書いて、それぞれメッセージが出るようにしています!
これで気象データ抽出の処理が完成しました。
- データが存在する場合は、最終行の次の日~昨日のデータを取得
- 最終行が昨日の場合はスクレイピングしない
- ファイルやデータがない場合は、指定日~昨日までのデータを取得
🚀 次回
次回(第4回)は、抽出したデータをbashを使って、AWS S3にアップロードしていきます。
引き続きよろしくお願いいたします!
https://qiita.com/shota1212/items/7341b3c5f970362a7c89