はじめに
記事をご覧いただきありがとうございます。
本シリーズでは「Databricks初心者が野球のデータ分析に挑戦する」をテーマに、データ収集 → 加工・分析 → 自動化 の流れを紹介していきます。
全体の概要については、こちらの記事にまとめていますので、あわせてご確認ください。
今回はデータ分析の前段階として、必要なデータをどのように集めるか(スクレイピング) について記載します。
データ収集(スクレイピング)について
ウェブサイトに掲載されている情報を、プログラムを使って自動的に取得・整理することを スクレイピング と呼びます。
Databricksで分析を行うには、まず材料となる「データ」が必要です。
公式のハンズオンやチュートリアルでは練習用のサンプルデータが用意されていますが、今回のテーマである 野球成績データ については、自分で収集する必要があります。
今回利用したサイトは以下の2つです。
スクレイピングに関する注意点
スクレイピングを行う際には、いくつかの注意が必要です。
-
サイトによってはスクレイピングを禁止している場合があります。利用規約や robots.txt を必ず確認してください。
-
今回利用したサイトには明示的な禁止事項は記載されていませんが、利用は自己責任でお願いします。
-
過剰なアクセスはサーバーに負荷をかける行為 となります。取得間隔をあけるなど、相手サイトに配慮した実行を心がけましょう。
今回取得するデータ
今回のデータ分析で取得するデータは以下のとおりです。
- 現時点(スクレイピング実施時点)の野手・投手の成績
- 各チームの試合数
- 各選手の対チーム別成績
- 各選手の球場別成績
- チームごとの打撃・投手・守備成績
これらのデータを取得する理由は、2025年シーズンの各選手の最終成績をDatabricksで予測するためです。
予測の際には、残り試合数を考慮しながら「球場別成績」や「対チーム別成績」を組み合わせることで、より現実的なシミュレーションが可能になります。
※ 本来であれば、より多くの要素(例:打順、対戦投手の相性、天候など)を考慮すべきですが、今回は学習が目的のため、上記のデータに絞って進めています。
スクレイピング処理の実装
今回は AWS 上にスクレイピング用のインスタンス(EC2)を用意し、Python で処理を記述していくことにしました。
インスタンスのスペックは学習・検証用ということで、最小構成の t2.micro を選択しています。
取得したデータは整理しやすいように、スクリプトと出力データを分けて管理しています。
以下が今回のディレクトリ構成です。
ディレクトリ構成
batch/ # スクレイピング関連スクリプト・データ管理用ディレクトリ
├── pitcher_scraping.py # 投手成績スクレイピング
├── batter_scraping.py # 打者成績スクレイピング
├── games_scraping.py # 試合日程スクレイピング
├── fielding_central.py # セ・リーグ守備成績
├── fielding_pacific.py # パ・リーグ守備成績
├── upload_to_s3.sh # S3アップロード用シェルスクリプト
├── team_pitcher.py # チーム単位の投手成績取得
├── team_batting.py # チーム単位の打撃成績取得
├── scrape_hitters_vs_team_all.py # 打者×対チーム成績
├── scrape_pitchers_vs_team_all.py # 投手×対チーム成績
├── scrape_hitters_vs_stadium_all.py # 打者×球場成績
├── scrape_pitchers_vs_stadium_all.py # 投手×球場成績
├── scrape_nf3_schedule_all_teams.py # NPB全チーム試合日程
├── rotate_logs.py # ログローテーションスクリプト
├── data/ # 取得したデータ格納ディレクトリ
│ ├── batter/ # 打者成績(チームごと)
│ ├── matches/ # 試合日程(チームごと)
│ ├── pitcher/ # 投手成績(チームごと)
│ ├── team_batting/ # チーム打撃成績
│ ├── team_defense/ # チーム守備成績
│ ├── team_pitcher/ # チーム投手成績
│ └── team_splits/ # チーム分割データ
└── logs/ # ログ出力先ディレクトリ
打者関連スクリプト
打者成績に関するスクレイピング処理です。
ここでは代表的な batter_scraping.py を全文掲載し、その他のスクリプトは要点だけ紹介します。詳細なコードは GitHub にまとめています。
batter_scraping.py
打者の基本成績(打率・安打・本塁打など)を取得するスクリプトです。
コードを見る
import pandas as pd
import os
from datetime import datetime
# === ログ設定 ===
log_dir = "/home/ec2-user/batch/logs"
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, f"batter_scraping_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
def log(msg):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] {msg}\n")
print(f"[{timestamp}] {msg}")
log("=== バッターデータ取得処理 開始 ===")
# 球団名とURLのマッピング
teams = {
"fighters": "https://baseball-data.com/stats/hitter-f/",
"hawks": "https://baseball-data.com/stats/hitter-h/",
"buffaloes": "https://baseball-data.com/stats/hitter-bs/",
"marines": "https://baseball-data.com/stats/hitter-m/",
"lions": "https://baseball-data.com/stats/hitter-l/",
"eagles": "https://baseball-data.com/stats/hitter-e/",
"giants": "https://baseball-data.com/stats/hitter-g/",
"tigers": "https://baseball-data.com/stats/hitter-t/",
"swallows": "https://baseball-data.com/stats/hitter-s/",
"dragons": "https://baseball-data.com/stats/hitter-d/",
"carp": "https://baseball-data.com/stats/hitter-c/",
"baystars": "https://baseball-data.com/stats/hitter-yb/"
}
# 出力先ディレクトリ
output_dir = "/home/ec2-user/batch/data/batter"
os.makedirs(output_dir, exist_ok=True)
# カラム名定義
columns = [
"背番号", "選手名", "打率", "試合", "打席数", "打数", "安打", "本塁打",
"打点", "盗塁", "四球", "死球", "三振", "犠打", "併殺打",
"出塁率", "長打率", "OPS", "RC27", "XR27"
]
# 各球団のデータ取得&保存
for team_key, url in teams.items():
try:
log(f"[{team_key}] URL取得開始:{url}")
tables = pd.read_html(url)
df = tables[0]
scrape_hitters_vs_team_all.py
- 各打者の 対チーム別成績 を取得
- 出力ファイル例:data/batter/fighters.csv、data/batter/tigers.csv
scrape_hitters_vs_stadium_all.py
- 各打者の 球場別成績 を取得
- 出力ファイル例:data/batter/tokyo_dome.csv、data/batter/koshien.csv
その他のスクリプトはこちらのリポジトリにまとめています:
ログローテーションの設定
スクレイピング処理では実行結果をログファイルとして出力するようにしています。
ログを蓄積していくとディスク容量を圧迫するため、古いログを自動でローテーション(世代管理)する仕組み を導入しておきます。
コードを見る
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import gzip
import shutil
from pathlib import Path
from datetime import datetime, timedelta
# ===== 設定 =====
LOG_DIR = Path("/home/ec2-user/batch/logs")
# 基本ポリシー
UNCOMPRESSED_KEEP_DAYS = 2 # 2日より古い未圧縮ログは圧縮
COMPRESSED_KEEP_DAYS = 45 # 45日より古い圧縮済みログは削除
ACTIVE_GRACE_MINUTES = 10 # 直近10分内に更新のファイルは触らない
MAX_TOTAL_MB = 500 # ログ全体の上限MB(超えたら古い圧縮ログから削除 )
# ===== ここまで設定 =====
def human(n):
for unit in ["B","KB","MB","GB","TB"]:
if n < 1024.0:
return f"{n:3.1f}{unit}"
n /= 1024.0
return f"{n:.1f}PB"
def iter_log_files(root: Path):
# 隠しファイルを除く、ファイルのみ
for p in root.iterdir():
if p.is_file() and not p.name.startswith("."):
yield p
def is_recently_modified(path: Path, minutes: int) -> bool:
mtime = path.stat().st_mtime
return (time.time() - mtime) < (minutes * 60)
def compress_file(path: Path):
gz_path = path.with_suffix(path.suffix + ".gz") if path.suffix != ".gz" else path
if gz_path.suffix == ".gz":
# すでに .gz は圧縮不要
return False
try:
with open(path, "rb") as fin, gzip.open(gz_path, "wb") as fout:
shutil.copyfileobj(fin, fout)
orig_size = path.stat().st_size
os.remove(path)
print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] COMPRESS {path.name} -> {gz_path.name} ({human(orig_size)})")
return True
except Exception as e:
print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] WARN 圧縮失敗: {path.name} ({e})")
return False
def delete_file(path: Path, reason: str):
try:
size = path.stat().st_size
os.remove(path)
print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] DELETE {path.name} ({human(size)}) reason={reason}")
return size
except Exception as e:
print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] WARN 削除失敗: {path.name} ({e})")
return 0
def total_size_bytes(root: Path) -> int:
return sum(p.stat().st_size for p in iter_log_files(root))
def main():
if not LOG_DIR.exists():
print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] INFO 作成: {LOG_DIR}")
LOG_DIR.mkdir(parents=True, exist_ok=True)
now = datetime.now()
compressed_cutoff = now - timedelta(days=COMPRESSED_KEEP_DAYS)
uncompressed_cutoff = now - timedelta(days=UNCOMPRESSED_KEEP_DAYS)
# 1) 未圧縮の古いログを圧縮
for p in iter_log_files(LOG_DIR):
if p.suffix == ".gz":
continue
if is_recently_modified(p, ACTIVE_GRACE_MINUTES):
continue
try:
mtime = datetime.fromtimestamp(p.stat().st_mtime)
except FileNotFoundError:
continue
if mtime < uncompressed_cutoff:
compress_file(p)
# 2) 圧縮済みで古すぎるログ(期限切れ)を削除
for p in iter_log_files(LOG_DIR):
if p.suffix != ".gz":
continue
if is_recently_modified(p, ACTIVE_GRACE_MINUTES):
continue
try:
mtime = datetime.fromtimestamp(p.stat().st_mtime)
except FileNotFoundError:
continue
if mtime < compressed_cutoff:
delete_file(p, reason=f"older_than_{COMPRESSED_KEEP_DAYS}d")
# 3) 容量制限(MAX_TOTAL_MB)を超える場合、古い圧縮ログから削除
max_total_bytes = MAX_TOTAL_MB * 1024 * 1024
取得したCSVデータをS3に配置するための事前作業
scraping
処理で生成された CSV データは、data/
ディレクトリ配下に保存されます。
これらのファイルを Databricks で扱えるようにするため、EC2 から S3 にアップロードします。
そのためには、以下の事前作業が必要です。
- S3 にデータ配置用のバケットを作成する
- EC2 内に AWS 認証情報を設定しておく
それぞれの手順を以下にまとめます。
1. S3 にデータ配置用のバケットを作成する
まず AWS マネジメントコンソールにログインし、「S3」を検索してサービスを選択します。
「バケットを作成」ボタンをクリックします。
バケットの作成画面が表示されます。
- バケット名 は任意(重複しない一意の名前を設定)
- リージョン は任意(東京リージョンを推奨)
- それ以外の項目はデフォルトのままで問題ありません
最後に、指定した名前でバケットが作成されていることを確認してください。
2. S3 へデータを配置するための AWS 認証情報設定(EC2)
作成した S3 バケットにデータをアップロードするため、スクレイピング用の EC2 からアクセスできるように 認証情報の設定 を行います。
方法1. IAMユーザーのアクセスキーを使用する(簡単な方法)
- 事前に IAM ユーザーに S3 へのアクセス権限(例:AmazonS3FullAccess) を付与しておきます。
- IAM ユーザーの アクセスキー ID と シークレットアクセスキー を確認します。
- EC2 にログインし、以下を実行します。
aws configure
対話形式でアクセスキー・シークレットキー・リージョン・出力形式を入力すると、~/.aws/credentials に認証情報が保存されます。
👉 詳しい手順はこちらの記事が参考になります:
方法2. IAMロールをEC2に付与する(推奨)
セキュリティ上の観点からは、アクセスキーを直接 EC2 に保存せず、IAM ロールを EC2 にアタッチする方法 が推奨されます。
この方法では認証情報を手動で設定する必要がなく、EC2 が自動的に S3 にアクセス可能となります。
参考:
実際の設定例
こちらの記事では EC2 から S3 へアクセスする具体的な設定例が紹介されています。
3. 配置用スクリプトの作成
取得したデータを S3 バケットにまとめて配置するため、シェルスクリプトを作成しておきます。
サンプルコードを以下に記載しますので、EC2 内のディレクトリ構成や S3 バケット名は環境に合わせて適宜変更してください。
コードを見る
#!/bin/bash
# .env を読み込み
set -a
source /home/ec2-user/batch/.env
set +a
# ログ出力用
LOGFILE="${LOG_DIR}/upload_s3_$(date '+%Y%m%d_%H%M%S').log"
echo "=== S3 アップロード開始 $(date) ===" > "$LOGFILE"
# バッター成績
/usr/local/bin/aws s3 cp "${LOCAL_DATA_DIR}/batter/" \
"s3://${S3_BUCKET}/${S3_PREFIX}/batting/${S3_YEAR}/" --recursive >> "$LOGFILE" 2>&1
# ピッチャー成績
/usr/local/bin/aws s3 cp "${LOCAL_DATA_DIR}/pitcher/" \
"s3://${S3_BUCKET}/${S3_PREFIX}/pitcher/${S3_YEAR}/" --recursive >> "$LOGFILE" 2>&1
# 試合日程
/usr/local/bin/aws s3 cp "${LOCAL_DATA_DIR}/matches/" \
"s3://${S3_BUCKET}/${S3_PREFIX}/games/" --recursive >> "$LOGFILE" 2>&1
# チーム打撃成績
/usr/local/bin/aws s3 cp "${LOCAL_DATA_DIR}/team_batting/" \
"s3://${S3_BUCKET}/${S3_PREFIX}/team_batting/" --recursive >> "$LOGFILE" 2>&1
# チーム投手成績
/usr/local/bin/aws s3 cp "${LOCAL_DATA_DIR}/team_pitcher/" \
"s3://${S3_BUCKET}/${S3_PREFIX}/team_pitcher/" --recursive >> "$LOGFILE" 2>&1
# チーム守備成績
/usr/local/bin/aws s3 cp "${LOCAL_DATA_DIR}/team_defense/" \
"s3://${S3_BUCKET}/${S3_PREFIX}/team_defense/" --recursive >> "$LOGFILE" 2>&1
実行結果
処理が正常に完了すると、バケットに取得した CSV データが格納されます。
4. スケジュール設定(cron)
スクレイピング処理や S3 への配置スクリプトを cron に登録することで、毎日自動的にデータを更新できるようにします。
以下はサンプル設定です。実際の稼働時間は環境や用途に合わせて調整してください。
コードを見る
# 個人打者・投手 成績取得
00 8 * * * python3 /home/ec2-user/batch/batter_scraping.py
05 8 * * * python3 /home/ec2-user/batch/pitcher_scraping.py
# 消化した試合数の取得
10 8 * * * python3 /home/ec2-user/batch/games_scraping.py
# 残試合数の取得
15 8 * * * python3 /home/ec2-user/batch/scrape_nf3_schedule_all_teams.py
# チーム打撃・投手・守備 成績取得
20 8 * * * python3 /home/ec2-user/batch/team_batting.py
25 8 * * * python3 /home/ec2-user/batch/team_pitcher.py
30 8 * * * python3 /home/ec2-user/batch/fielding_central.py
35 8 * * * python3 /home/ec2-user/batch/fielding_pacific.py
# 個人打撃・投手 対チーム別の成績取得
40 8 * * * python3 /home/ec2-user/batch/scrape_hitters_vs_team_all.py
45 8 * * * python3 /home/ec2-user/batch/scrape_pitchers_vs_team_all.py
# 個人打撃・投手 球場別の成績取得
50 8 * * * python3 /home/ec2-user/batch/scrape_hitters_vs_stadium_all.py
55 8 * * * python3 /home/ec2-user/batch/scrape_pitchers_vs_stadium_all.py
# S3 に CSV データを配置
00 10 * * * /home/ec2-user/batch/upload_to_s3.sh
# ログローテーション(バッチログ)
10 1 * * * /home/ec2-user/batch/rotate_logs.py >> /home/ec2-user/batch/logs/rotate_logs_runner.log 2>&1
これで、野球データの取得から S3 バケットへのアップロードまでを自動化できる流れが完成します。
毎日決まった時間に最新のデータが集約されるため、Databricks 側では常に新しい情報を利用した分析が可能になります。
まとめ
今回の記事では、0からスクレイピング環境を構築し、野球データを自動取得して S3 に配置するところまで を紹介しました。
スクレイピング作業を実際にやってみると、データ項目の検討・処理の実装・エラー修正など試行錯誤の連続で、思った以上に時間がかかる作業でした。
しかし、こうして一連の流れを構築できたことで「分析に使えるデータ基盤」を自分の手で作れたのは大きな経験になったと感じています。
次回は、今回取得したデータを活用し、Databricks 上でのデータ分析や可視化、機械学習による予測 の内容をまとめていきます。