生成AIの台頭などなどによって優位性が消えつつあるので色々とコードを公開します。怒られが発生したら消えるかもしれません。
オマケとして本稿の最下部にペアトレードの初期案のコードを掲載しておきます。
上場企業の時価情報取得
上場している銘柄のticker (銘柄コード) をアップロードすれば各社について現在値、予想PER、予想配当利回り、PBR(実績)、普通株式数、時価総額を取得するコードです。
import pandas as pd
import requests
from bs4 import BeautifulSoup
import time
import random
import re
# --- 設定 ---
INPUT_FILE = 'Nikkei.xlsx'
OUTPUT_FILE = 'Nikkei_Scraped_Data_Final.xlsx'
# 必須: User-Agent設定
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
}
def clean_text(text):
"""
テキストから余計な空白や改行を削除し、整形する関数
"""
if not text:
return "-"
# 連続する空白・改行を1つのスペースに置換して前後の空白削除
text = re.sub(r'\s+', ' ', text).strip()
return text
def scrape_nikkei_data(url):
try:
# サーバー負荷軽減とブロック回避のための待機(必須)
time.sleep(random.uniform(1.0, 3.0))
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code != 200:
print(f"Error accessing {url}: Status {response.status_code}")
return None
soup = BeautifulSoup(response.content, 'html.parser')
data = {}
# --- 1. 基本情報の取得 (コード、社名) ---
# URLからコード抽出
code_match = re.search(r'scode=(\d+)', url)
data['ticker'] = code_match.group(1) if code_match else "-"
# タイトルから社名抽出 (例: "極洋【1301】の株価...")
title_tag = soup.select_one('h1.m-headlineLarge_text')
data['社名'] = clean_text(title_tag.text) if title_tag else "-"
# --- 2. 現在値の取得 ---
# クラス: m-stockPriceElm_value now
price_elm = soup.select_one('dd.m-stockPriceElm_value.now')
if price_elm:
# テキストを取得すると "4,590 円" のようになる
# 念のため "円" が重複しないよう一度削除してから付与する、あるいはそのまま使う
raw_price = clean_text(price_elm.text)
data['現在値'] = raw_price # HTML内に「円」が含まれているためそのまま使用
else:
data['現在値'] = "-"
# --- 3. 詳細指標の取得 (PER, PBR, 利回り, 株式数, 時価総額) ---
# 構造: <ul class="m-stockInfo_detail_list"> 内の li 要素を探索
# li > span.m-stockInfo_detail_title (項目名)
# li > span.m-stockInfo_detail_value (値)
# 取得したい項目のマッピング (HTML上の表記 -> 出力したいキー)
target_metrics = {
'予想PER': 'PER',
'予想配当利回り': '利回り',
'PBR(実績)': 'PBR',
'普通株式数': '株式数',
'時価総額': '時価総額'
}
# 初期値を "-" に設定
for key in target_metrics.values():
data[key] = "-"
# ページ内のすべての詳細リスト項目を取得
detail_items = soup.select('.m-stockInfo_detail_list li')
for item in detail_items:
title_span = item.select_one('.m-stockInfo_detail_title')
value_span = item.select_one('.m-stockInfo_detail_value')
if title_span and value_span:
title_text = clean_text(title_span.text)
# マッピングにある項目かチェック(部分一致)
for html_label, output_key in target_metrics.items():
if html_label in title_text:
# 値を取得(単位のspanタグなども含めてテキスト化される)
data[output_key] = clean_text(value_span.text)
break
return data
except Exception as e:
print(f"Exception for {url}: {e}")
return None
# --- メイン実行処理 ---
try:
print("Excelファイルを読み込んでいます...")
# ヘッダーなしと仮定して読み込み(A列=0番目)
df_input = pd.read_excel(INPUT_FILE, header=None)
# URLが含まれている列(A列)をリスト化
urls = df_input.iloc[:, 0].dropna().astype(str).tolist()
# httpを含む有効なURLのみ抽出
valid_urls = [u for u in urls if "http" in u]
print(f"{len(valid_urls)} 件のURLを処理します。")
results = []
# テスト実行用(全てのURLを処理する場合は以下の行を削除またはコメントアウトしてください)
# valid_urls = valid_urls[:5]
for i, url in enumerate(valid_urls):
print(f"[{i+1}/{len(valid_urls)}] Scraping: {url}")
info = scrape_nikkei_data(url)
if info:
# 指定された順序で辞書を作成
row = {
'ticker': info.get('ticker'),
'社名': info.get('社名'),
'現在値': info.get('現在値'),
'PER': info.get('PER'),
'利回り': info.get('利回り'),
'PBR': info.get('PBR'),
'株式数': info.get('株式数'),
'時価総額': info.get('時価総額')
}
results.append(row)
else:
# エラー時はURLだけ残す
results.append({'ticker': 'Error', '社名': url})
# 結果をDataFrame化して保存
if results:
df_output = pd.DataFrame(results)
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"完了しました。 '{OUTPUT_FILE}' を保存しました。")
# Colabで自動ダウンロード
from google.colab import files
files.download(OUTPUT_FILE)
else:
print("データを取得できませんでした。")
except FileNotFoundError:
print(f"エラー: '{INPUT_FILE}' が見つかりません。ファイルをアップロードしてください。")
except Exception as e:
print(f"予期せぬエラー: {e}")
上場企業の決算情報取得
上場している銘柄のticker (銘柄コード) をアップロードすればEDINETのAPIを利用して各社についての決算情報を取得するコードです (「EDINET取得用.xlsx」という名前のexcelファイルを作り、A列に取得したい企業の銘柄コードを入力してcolaboratoryにアップロードしておきます)。
EDINET APIの利用には、以下のURLから登録及びAPIキーの発行が必要です。
https://api.edinet-fsa.go.jp/api/auth/index.aspx?mode=1
ここでは例示的に、資産合計、経常利益、売上高合計、親会社株主に帰属する当期純利益、流動資産、現金同等物及び短期性有価証券、流動負債、短期借入債務、減価償却費、長期借入債務、期末発行済株式数 - 普通株、売上総利益、未払法人税、販売費及び一般管理費、広告宣伝費、研究開発費、有形固定資産の取得を取得しています。
# 必要なライブラリのインストール
!pip install pandas openpyxl requests beautifulsoup4 lxml
import requests
import pandas as pd
import io
import zipfile
import time
import re
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
# ==========================================
# 1. 設定項目
# ==========================================
API_KEY = "YOUR_API_KEY" # ★ここにAPIキーを入れてください
# 取得期間設定 (2023年4月1日~)
# ※データ量が多い場合、期間を短くして試してください
START_DATE = "2023-04-01"
END_DATE = "2025-03-31"
# アップロードされたファイル名
INPUT_FILE = "EDINET取得用.xlsx"
# ==========================================
# 2. 定数・タグ定義
# ==========================================
EDINET_API_ENDPOINT = "https://disclosure.edinet-fsa.go.jp/api/v2"
# 取得対象とする書類コード (120:有報, 130:訂正, 140:四半期, 180:半期)
TARGET_DOC_TYPE_CODES = ['120', '130', '140', '180']
# 取得したい項目の定義 (日本語カラム名: XBRLタグのキーワード)
# ※BS=貸借対照表(時点), PL=損益計算書(期間), CF=キャッシュフロー(期間)
TARGET_COLUMNS = {
"資産合計": {"keys": ["Assets"], "type": "BS"},
"経常利益": {"keys": ["OrdinaryIncome"], "type": "PL"},
"売上高合計": {"keys": ["NetSales", "OperatingRevenues"], "type": "PL"}, # 売上高または営業収益
"親会社株主に帰属する当期純利益": {"keys": ["ProfitLossAttributableToOwnersOfParent"], "type": "PL"},
"流動資産": {"keys": ["CurrentAssets"], "type": "BS"},
"現金同等物及び短期性有価証券": {"keys": ["CashAndDeposits"], "type": "BS"}, # BSの現金及び預金で代用
"流動負債": {"keys": ["CurrentLiabilities"], "type": "BS"},
"短期借入債務": {"keys": ["ShortTermLoansPayable"], "type": "BS"},
"減価償却費": {"keys": ["Depreciation", "DepreciationAndAmortization"], "type": "CF"}, # CF優先
"長期借入債務": {"keys": ["LongTermLoansPayable"], "type": "BS"},
"期末発行済株式数": {"keys": ["TotalNumberOfIssuedSharesCommonStock"], "type": "BS"}, # バランスシートではないがInstant
"売上総利益": {"keys": ["GrossProfit"], "type": "PL"},
"未払法人税": {"keys": ["IncomeTaxesPayable"], "type": "BS"},
"販売費及び一般管理費": {"keys": ["SellingGeneralAndAdministrativeExpenses"], "type": "PL"},
"広告宣伝費": {"keys": ["AdvertisingExpenses"], "type": "PL"}, # 販管費注記のためXBRLにない場合が多い
"研究開発費": {"keys": ["ResearchAndDevelopmentExpenses"], "type": "PL"}, # 販管費注記
"有形固定資産の取得": {"keys": ["PurchaseOfPropertyPlantAndEquipment"], "type": "CF"}
}
# ==========================================
# 3. 関数定義
# ==========================================
def get_documents_by_date(date_str, api_key):
"""指定日の書類一覧を取得"""
url = f"{EDINET_API_ENDPOINT}/documents.json"
params = {"date": date_str, "type": 2, "Subscription-Key": api_key}
try:
res = requests.get(url, params=params)
if res.status_code == 200:
return res.json().get("results", [])
except:
pass
return []
def extract_values_from_xbrl(doc_id, api_key):
"""
XBRL(Zip)をダウンロードし、BeautifulSoupで数値を抽出する
"""
url = f"{EDINET_API_ENDPOINT}/documents/{doc_id}"
params = {"type": 1, "Subscription-Key": api_key} # type=1: XBRL取得
# 結果格納用辞書 (初期値はNone)
data = {col: None for col in TARGET_COLUMNS.keys()}
try:
res = requests.get(url, params=params)
if res.status_code != 200:
return None
# ZIPチェック
if res.content[:2] != b'PK':
return None
with zipfile.ZipFile(io.BytesIO(res.content)) as z:
# XBRLファイル(.xbrl)を探す (PublicDocフォルダ内のもの)
# 優先順位: 財務諸表本表(ix:なし) > インラインXBRL
xbrl_files = [f for f in z.namelist() if f.endswith('.xbrl') and 'PublicDoc' in f]
if not xbrl_files:
# htmファイル(インラインXBRL)の場合もあるが、今回は.xbrlを対象とする
# (API v2では.xbrlが含まれるのが一般的)
return None
# 複数のxbrlがある場合、ファイル名が一番短いもの(メイン)を選ぶなどの工夫が可能だが
# ここでは最初に見つかったものを解析する
target_xbrl = xbrl_files[0]
with z.open(target_xbrl) as f:
soup = BeautifulSoup(f, 'lxml-xml')
# コンテキスト定義を解析 (id -> 期間/時点情報)
contexts = {}
for ctx in soup.find_all('context'):
ctx_id = ctx.get('id')
# 期間(Duration)か時点(Instant)か
period = ctx.find('period')
if not period:
continue
is_duration = period.find('startDate') is not None
is_instant = period.find('instant') is not None
# コンテキストの属性判定 (Current:当期/当四半期, Consolidated:連結)
# ※EDINETのコンテキストID命名規則に依存する簡易判定
is_current = 'Current' in ctx_id
is_consolidated = 'NonConsolidated' not in ctx_id # NonConsolidatedが含まれなければ連結とみなす
contexts[ctx_id] = {
'type': 'Duration' if is_duration else 'Instant',
'is_current': is_current,
'is_consolidated': is_consolidated
}
# 各項目について値を取得
for col_name, settings in TARGET_COLUMNS.items():
target_type = settings['type'] # BS, PL, CF
keys = settings['keys']
# 候補となるタグを検索
for key in keys:
# 名前空間プレフィックス(jppfs_corなど)を問わず、タグ名末尾が一致するものを探す
tags = soup.find_all(re.compile(f":{key}$"))
found_value = None
# 見つかったタグの中から最適なコンテキストのものを選ぶ
for tag in tags:
val_str = tag.text.strip()
if not val_str or not re.match(r'^-?\d+(\.\d+)?$', val_str):
continue
ctx_id = tag.get('contextRef')
if ctx_id not in contexts:
continue
ctx_info = contexts[ctx_id]
# 優先順位: 連結 > 当期 > 期間タイプ一致
if not ctx_info['is_consolidated']:
continue # 個別はスキップ(連結優先)
# BS項目はInstant, PL/CF項目はDuration
if target_type == 'BS':
if ctx_info['type'] == 'Instant' and ctx_info['is_current']:
found_value = float(val_str)
break # 最適なものが見つかったらループを抜ける
else: # PL, CF
if ctx_info['type'] == 'Duration' and ctx_info['is_current']:
found_value = float(val_str)
break
if found_value is not None:
data[col_name] = found_value
break # キーワードのループを抜ける
except Exception as e:
return None
return data
# ==========================================
# 4. メイン処理
# ==========================================
def main():
print("処理を開始します...")
# 1. 入力Excel読み込み
try:
input_df = pd.read_excel(INPUT_FILE)
# カラム名の正規化(Ticker, 社名 がA,B列にあると仮定)
# 強制的にカラム名をリネームして扱う
input_df.columns = ['Ticker', '社名'] + list(input_df.columns[2:])
# Tickerを文字列4桁に変換
input_df['code_4'] = input_df['Ticker'].astype(str).str[:4]
target_codes = set(input_df['code_4'].tolist())
print(f"入力ファイル読込完了: {len(input_df)}社")
except Exception as e:
print(f"Excel読み込みエラー: {e}")
return
# 2. 日付スキャン
results = []
s_date = datetime.strptime(START_DATE, "%Y-%m-%d")
e_date = datetime.strptime(END_DATE, "%Y-%m-%d") # エラーにならないよう未来の日付でも動くように
if e_date > datetime.now():
e_date = datetime.now()
curr = s_date
print(f"データ収集中 ({START_DATE} ~ {e_date.strftime('%Y-%m-%d')})...")
while curr <= e_date:
d_str = curr.strftime("%Y-%m-%d")
# 進捗表示 (5日おき)
if curr.day % 5 == 0:
print(f" 処理中: {d_str} ...")
docs = get_documents_by_date(d_str, API_KEY)
for doc in docs:
# 1. 証券コードがあるか
sec_code_full = doc.get('secCode')
if not sec_code_full:
continue
sec_code_4 = str(sec_code_full)[:4]
# 2. ターゲット企業か
if sec_code_4 not in target_codes:
continue
# 3. 書類種別チェック (有報、四半期、半期)
if doc['docTypeCode'] not in TARGET_DOC_TYPE_CODES:
continue
# 4. 【重要】臨時報告書の除外 (docTypeCodeだけでなくタイトルでも判定)
if "臨時報告書" in doc['docDescription']:
continue
# 5. 取り下げステータスチェック
if doc['withdrawalStatus'] != '0':
continue
# 条件クリア -> データ取得
print(f" 発見: {d_str} [{sec_code_4}] {doc.get('filerName')} - {doc['docDescription']}")
fin_data = extract_values_from_xbrl(doc['docID'], API_KEY)
if fin_data:
# 基本情報を追加
fin_data['Ticker'] = sec_code_4
# 社名は後でマージするが、確認用にAPI取得名も入れておく
fin_data['社名_API'] = doc.get('filerName')
fin_data['提出日'] = d_str
fin_data['決算期'] = doc.get('periodEnd')
fin_data['書類種別'] = doc['docDescription']
results.append(fin_data)
time.sleep(0.5) # API負荷軽減
curr += timedelta(days=1)
time.sleep(0.5) # API負荷軽減
# 3. データ整形と出力
if results:
df_results = pd.DataFrame(results)
# 入力Excelの社名情報をマージ(Left Join)
# これにより、データが見つからなかった企業もリストには残るが、今回は「取得できたデータ」を出力する形式にする
# ユーザー要望は「EDINET取得用.xlsxに準拠」なので、出力は「Ticker, 社名」を左端にする
# 全てのカラムを定義
output_columns = ['Ticker', '社名', '提出日', '決算期', '書類種別'] + list(TARGET_COLUMNS.keys())
# マージ用にTickerをキーにする
df_final = pd.merge(df_results, input_df[['code_4', '社名']], left_on='Ticker', right_on='code_4', how='left')
# 社名カラムの整理 (Excelの社名を優先、なければAPIの社名)
df_final['社名'] = df_final['社名'].fillna(df_final['社名_API'])
# カラムの存在確認と並び替え
final_cols_exists = [c for c in output_columns if c in df_final.columns]
df_final = df_final[final_cols_exists]
# 欠損値を空欄に
df_final = df_final.fillna("")
output_file = "EDINET_Financial_Data_Final.xlsx"
df_final.to_excel(output_file, index=False)
print(f"\n完了しました。 '{output_file}' をダウンロードしてください。")
print(f"取得件数: {len(df_final)}件")
else:
print("\n指定期間内に対象データが見つかりませんでした。期間、APIキー、対象企業の提出状況を確認してください。")
if __name__ == "__main__":
main()
投資ブログの記事の取得
某大物投資家の全てのブログ記事 (https://daken.hatenablog.jp/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from google.colab import files
# --- 設定 ---
BASE_URL = "https://daken.hatenablog.jp"
START_DATE = datetime(2017, 2, 16)
END_DATE = datetime(2023, 7, 2)
OUTPUT_FILENAME = "daken_blog_full_content.xlsx" # Excel形式に変更
def get_date_from_url(url):
# URLから日付を抽出
match = re.search(r'/entry/(\d{4})/(\d{2})/(\d{2})', url)
if match:
year, month, day = map(int, match.groups())
return datetime(year, month, day)
return None
def get_article_body(article_url):
# 記事の個別ページにアクセスして本文テキストを取得
try:
response = requests.get(article_url)
if response.status_code != 200:
return "取得失敗 (Status Error)"
soup = BeautifulSoup(response.text, 'html.parser')
# はてなブログの本文は通常 "entry-content" クラスに含まれる
content_div = soup.find("div", class_="entry-content")
if content_div:
# HTMLタグを除去してテキストのみ取得(strip=Trueで前後の空白削除)
return content_div.get_text(strip=True)
else:
return "本文が見つかりませんでした"
except Exception as e:
return f"エラー: {e}"
def scrape_blog_full():
current_url = f"{BASE_URL}/archive"
collected_data = []
page_count = 1
article_count_total = 0
print(f"スクレイピングを開始します... (期間: {START_DATE.strftime('%Y-%m-%d')} 〜 {END_DATE.strftime('%Y-%m-%d')})")
print("※ 全記事の本文を取得するため、完了まで15分程度かかります。")
while current_url:
print(f"アーカイブページ {page_count} を読み込み中...", end=" ")
try:
response = requests.get(current_url)
if response.status_code != 200:
print(f"アーカイブページの取得に失敗しました。")
break
soup = BeautifulSoup(response.text, 'html.parser')
entries = soup.find_all("section", class_="archive-entry")
# アーカイブページ内の記事をループ
for entry in entries:
title_tag = entry.find("a", class_="entry-title-link")
if not title_tag:
continue
article_url = title_tag.get('href')
article_title = title_tag.text.strip()
if article_url.startswith("/"):
article_url = BASE_URL + article_url
# 日付判定
article_date = get_date_from_url(article_url)
if article_date:
if article_date > END_DATE:
continue
if article_date < START_DATE:
print(f"\n指定開始日より古い記事に到達しました。処理を終了します。")
current_url = None
break
# --- 本文の取得処理 ---
# サーバー負荷軽減のため、記事取得ごとに少し待機
time.sleep(1)
body_text = get_article_body(article_url)
collected_data.append({
"Date": article_date.strftime('%Y-%m-%d'),
"Title": article_title,
"URL": article_url,
"Body": body_text # 本文を追加
})
article_count_total += 1
# 進捗表示 (10件ごとにドットを表示するなど)
if article_count_total % 10 == 0:
print(f"({article_count_total}件完了)", end=" ")
if current_url is None:
break
# 次のページへ
next_link = soup.find("a", rel="next")
if not next_link:
# 予備の検出処理
pager = soup.find(class_="pager-next")
if pager: next_link = pager.find("a")
if next_link:
next_url = next_link.get("href")
if next_url.startswith("/"):
current_url = BASE_URL + next_url
else:
current_url = next_url
page_count += 1
print(f"\n-> 次のページへ移動します。")
time.sleep(1)
else:
print("\n全てのページの確認が完了しました。")
current_url = None
except Exception as e:
print(f"\n予期せぬエラーが発生しました: {e}")
break
# --- Excel出力 ---
print(f"\n完了: 合計 {len(collected_data)} 件の記事を取得しました。Excel作成中...")
if collected_data:
df = pd.DataFrame(collected_data)
# Excelファイルとして出力
df.to_excel(OUTPUT_FILENAME, index=False)
print(f"Excelファイル '{OUTPUT_FILENAME}' をダウンロードします。")
files.download(OUTPUT_FILENAME)
else:
print("データが見つかりませんでした。")
# 実行
scrape_blog_full()
投資ブログの記事の取得 (その2)
某大物投資家の全てのブログ記事 (https://plaza.rakuten.co.jp/kabu1000/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from tqdm import tqdm
# --- 設定 ---
URL_TEMPLATE = "https://plaza.rakuten.co.jp/kabu1000/diary/{}/"
BASE_LIST_URL = "https://plaza.rakuten.co.jp/kabu1000/diaryall/"
START_DATE = datetime(2008, 8, 12)
END_DATE = datetime(2025, 11, 30)
OUTPUT_FILE = "kabu1000_final_v4.xlsx"
SLEEP_TIME = 2.0
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': 'https://plaza.rakuten.co.jp/kabu1000/'
}
def fetch_article_content(url):
"""記事URLからタイトルと本文を取得"""
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code != 200:
return "Error", f"Error: Status {response.status_code}", url
soup = BeautifulSoup(response.content, 'html.parser')
# --- タイトル取得 ---
# h2, headline, あるいは titleタグから推測
title_tag = soup.find('h2') or soup.find('div', class_='headline')
if title_tag:
title_text = title_tag.text.strip().replace('\n', ' ')
else:
# ページタイトルから抽出
if soup.title:
title_text = soup.title.text.replace(' - 楽天ブログ', '').strip()
else:
title_text = "No Title"
# --- 本文取得 ---
body_content = None
# 作戦1: よくあるクラス名を総当たり
candidates = ['diary_body', 'article_body', 'dText', 'description', 'articleText']
for c in candidates:
found = soup.find('div', class_=c)
if found:
body_content = found
break
# 作戦2: キーワード探索 (kabu1000さんのブログ特化)
# 本文に必ず含まれるであろう単語を探し、その親要素を取得する
if not body_content:
keywords = ["今日の成績", "PF", "配当", "前日比", "コメント", "取引"]
for kw in keywords:
# テキストノードを探す
text_node = soup.find(string=re.compile(kw))
if text_node:
# そのテキストを含んでいる親のdivまたはtdを取得
parent = text_node.find_parent(['div', 'td'])
if parent:
body_content = parent
break
# テキスト抽出処理
if body_content:
# <br>を改行に変換
for br in body_content.find_all("br"):
br.replace_with("\n")
# 不要なスクリプトタグなどを除去
for script in body_content(['script', 'style']):
script.decompose()
body_text = body_content.get_text()
# 空白整理
body_text = re.sub(r'\n\s+\n', '\n\n', body_text).strip()
return title_text, body_text, url
else:
# 本当に見つからない場合
return title_text, "Error: Content structure not found (Manual check required)", url
except Exception as e:
return "Error", f"Error: {e}", url
def main():
print(f"収集設定: {START_DATE.date()} ~ {END_DATE.date()}")
print("【フェーズ1】記事IDリストを作成中...")
collected_items = []
seen_ids = set()
# URL収集 (PageId 0〜80)
for page_id in range(0, 80):
target_url = BASE_LIST_URL if page_id == 0 else f"{BASE_LIST_URL}?PageId={page_id}"
try:
response = requests.get(target_url, headers=HEADERS, timeout=20)
if response.status_code == 404: break
soup = BeautifulSoup(response.content, 'html.parser')
# 12桁ID抽出
links = soup.find_all('a', href=True)
id_pattern = re.compile(r'/diary/(\d{12})')
has_data = False
for a in links:
match = id_pattern.search(a['href'])
if match:
has_data = True
aid = match.group(1)
if aid in seen_ids: continue
dt_str = aid[:8]
try:
dt = datetime.strptime(dt_str, "%Y%m%d")
except: continue
if dt > END_DATE: continue
if dt < START_DATE: continue
seen_ids.add(aid)
collected_items.append({
"date": dt,
"url": URL_TEMPLATE.format(aid)
})
if not has_data: break
time.sleep(1) # 一覧取得は少し速くてOK
except Exception as e:
print(f"List Error: {e}")
break
df = pd.DataFrame(collected_items)
if df.empty:
print("記事が見つかりませんでした。")
return
df = df.sort_values('date')
print(f"\n【フェーズ1完了】有効記事数: {len(df)}件")
print("--------------------------------------------------")
print("【フェーズ2】本文取得(キーワード探索モード)")
print("最初の5件をプレビューします。正常にテキストが取れているか確認してください。")
print("--------------------------------------------------")
final_data = []
records = df.to_dict('records')
try:
for i, record in enumerate(tqdm(records)):
t, b, u = fetch_article_content(record['url'])
# プレビュー表示
if i < 5:
status = "FAILED" if "Error:" in b else "OK"
preview = b.replace('\n', '')[:40]
print(f"\n[Check {i+1}] {status}")
print(f" Date: {record['date'].date()}")
print(f" Body: {preview}...")
final_data.append({
"date": record['date'],
"title": t,
"url": u,
"body": b
})
time.sleep(SLEEP_TIME)
except KeyboardInterrupt:
print("\n中断: データを保存します。")
df_result = pd.DataFrame(final_data)
df_result.to_excel(OUTPUT_FILE, index=False)
print(f"\n全処理完了。ファイル '{OUTPUT_FILE}' をダウンロードしてください。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その3)
某大物投資家の全てのブログ記事 (http://blog.livedoor.jp/love_aeria/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from tqdm import tqdm
# --- 設定 ---
BLOG_BASE_URL = "http://blog.livedoor.jp/love_aeria/"
START_DATE = datetime(2007, 10, 29)
END_DATE = datetime(2018, 3, 30)
OUTPUT_FILE = "love_aeria_final_clean.xlsx"
SLEEP_TIME = 1.5
# 偽装ヘッダー
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': BLOG_BASE_URL
}
def generate_month_list(start, end):
"""対象期間の月別アーカイブURL用リスト(YYYY-MM)を生成"""
current = start.replace(day=1)
end_check = end.replace(day=1)
months = []
while current <= end_check:
months.append(current.strftime("%Y-%m"))
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
return months
def get_soup(url):
"""文字化け対策済みBeautifulSoup生成"""
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code == 404:
return None
content = response.content
html = None
# 文字コード判定の優先順位
encodings_to_try = ['euc-jp', 'utf-8', 'shift_jis', 'cp932']
for enc in encodings_to_try:
try:
html = content.decode(enc)
break
except UnicodeDecodeError:
continue
if html is None:
response.encoding = response.apparent_encoding
html = response.text
return BeautifulSoup(html, 'html.parser')
except Exception as e:
return None
def fetch_article_body(url):
"""記事URLからタイトル・日付・本文を取得"""
soup = get_soup(url)
if not soup:
return "Error", None, "Connection Failed"
# --- タイトル ---
title = "No Title"
title_tag = soup.find(class_=re.compile(r'(article-title|entry-title|entry-header)'))
if not title_tag:
title_tag = soup.find('h2') or soup.find('h3')
if title_tag:
title = title_tag.text.strip()
# --- 日付 ---
date_obj = None
date_tag = soup.find(class_=re.compile(r'(article-date|date|entry-date)'))
if date_tag:
date_text = date_tag.text.strip()
nums = re.findall(r'\d+', date_text)
if len(nums) >= 3:
try:
y, m, d = int(nums[0]), int(nums[1]), int(nums[2])
date_obj = datetime(y, m, d)
except:
pass
# --- 本文 ---
body_text = "No Content"
body_div = soup.find(class_=re.compile(r'(article-body-inner|article-body|entry-body|main)'))
if not body_div:
body_div = soup.find('div', class_='entry-content')
if body_div:
for br in body_div.find_all("br"):
br.replace_with("\n")
# 不要タグ除去
for tag in body_div.find_all(['script', 'style', 'iframe']):
tag.decompose()
# エラー回避しながら広告div等を除去
for junk in body_div.find_all(['div', 'span']):
try:
if junk is None or not hasattr(junk, 'get'):
continue
classes = junk.get('class', [])
if classes and any('ad' in c.lower() for c in classes):
junk.decompose()
except Exception:
continue
body_text = body_div.get_text().strip()
body_text = re.sub(r'\n\s+\n', '\n\n', body_text)
return title, date_obj, body_text
def main():
print(f"収集期間: {START_DATE.date()} ~ {END_DATE.date()}")
print("はてなブックマーク除外・文字化け対策済みモードで実行中...")
# 1. 月リスト生成
target_months = generate_month_list(START_DATE, END_DATE)
all_article_urls = []
# 2. URL収集 (Phase 1)
print("\n【フェーズ1】記事URLの収集を開始します...")
for yyyy_mm in tqdm(target_months):
month_url = f"{BLOG_BASE_URL}archives/{yyyy_mm}.html"
page = 1
while True:
target_url = month_url if page == 1 else f"{month_url}?p={page}"
soup = get_soup(target_url)
if not soup:
break
links = soup.find_all('a', href=True)
page_urls = set()
article_pattern = re.compile(r'/archives/(\d+)\.html')
has_article = False
for a in links:
href = a['href']
# --- 修正箇所: フィルタリングを厳格化 ---
# 1. 明確に「はてなブックマーク」を除外
if "b.hatena.ne.jp" in href:
continue
# 2. URLがブログのベースURLで始まっているかチェック (前方一致)
if not href.startswith(BLOG_BASE_URL):
continue
# 3. 記事IDパターンを含むかチェック
if article_pattern.search(href):
clean_url = href.split('#')[0]
page_urls.add(clean_url)
has_article = True
if not has_article:
break
all_article_urls.extend(list(page_urls))
# 次ページ判定
pager = soup.find(class_=re.compile(r'(pager|pagination|paging)'))
next_exists = False
if pager:
if pager.find('a', string=re.compile(r'次|Next|>')):
next_exists = True
elif pager.find('a', href=re.compile(f'p={page+1}')):
next_exists = True
if not next_exists:
break
page += 1
time.sleep(1)
unique_urls = sorted(list(set(all_article_urls)))
# URLチェック(念のためログ出力)
hatena_count = sum(1 for u in unique_urls if "hatena" in u)
print(f"\n【フェーズ1完了】URL収集数: {len(unique_urls)}件")
if hatena_count > 0:
print(f"警告: まだ {hatena_count} 件のはてなURLが含まれています。ロジックを確認してください。")
else:
print("確認: はてなブックマークURLは正常に除外されました。")
# 3. 本文取得 (Phase 2)
print(f"【フェーズ2】本文取得を開始します (予想時間: {len(unique_urls)*SLEEP_TIME/60:.1f}分)")
print("--------------------------------------------------")
final_data = []
try:
for i, url in enumerate(tqdm(unique_urls)):
title, date_obj, body = fetch_article_body(url)
is_target = True
if date_obj:
if date_obj < START_DATE or date_obj > END_DATE:
is_target = False
if is_target:
final_data.append({
"Date": date_obj if date_obj else "Check Manually",
"Title": title,
"URL": url,
"Body": body
})
time.sleep(SLEEP_TIME)
except KeyboardInterrupt:
print("\n中断されました。取得済みのデータのみ保存します。")
# Excel保存
if final_data:
df = pd.DataFrame(final_data)
df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
df = df.sort_values('Date')
# 制御文字除去
def clean_text(text):
if isinstance(text, str):
return "".join(ch for ch in text if ch == '\n' or ch == '\t' or ch >= ' ')
return text
df['Title'] = df['Title'].apply(clean_text)
df['Body'] = df['Body'].apply(clean_text)
df.to_excel(OUTPUT_FILE, index=False)
print(f"\n完了しました。ファイル '{OUTPUT_FILE}' をダウンロードしてください。")
else:
print("データが見つかりませんでした。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その4)
某大物投資家の全てのブログ記事 (http://stfkabu.blog.fc2.com/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from tqdm import tqdm
# --- 設定 ---
BASE_URL_TEMPLATE = "http://stfkabu.blog.fc2.com/blog-entry-{}.html"
START_DATE = datetime(2014, 1, 5)
END_DATE = datetime(2017, 1, 4)
OUTPUT_FILE = "stfkabu_fc2_fixed.xlsx"
SLEEP_TIME = 2.0
# FC2ブログは更新が止まるとトップに広告が出るため、User-AgentをPCに偽装
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': 'http://stfkabu.blog.fc2.com/'
}
def get_soup(url):
"""文字化け対策を行いつつHTMLを取得"""
try:
response = requests.get(url, headers=HEADERS, timeout=15)
# 記事が存在しない(404)場合はNone
if response.status_code == 404:
return None
# ページ遷移系リダイレクトの場合も無視
if "location.href" in response.text and len(response.text) < 500:
return None
# 文字コード判定 (FC2はEUC-JPが多いが、テンプレートによる)
# contentから推測
content = response.content
soup = None
# 優先順位: EUC-JP -> UTF-8 -> Shift_JIS
encodings = ['euc-jp', 'utf-8', 'shift_jis', 'cp932']
for enc in encodings:
try:
html = content.decode(enc)
soup = BeautifulSoup(html, 'html.parser')
# 正しくデコードできたか、タイトルタグ等で簡易チェック
if soup.find('head'):
break
except:
continue
if not soup:
soup = BeautifulSoup(response.content, 'html.parser')
return soup
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
def clean_text(text):
"""テキストの整形"""
if not text: return ""
# 連続する改行や空白を整理
text = re.sub(r'\s+', ' ', text).strip()
return text
def fetch_entry(entry_id):
"""記事IDを指定してデータを取得"""
url = BASE_URL_TEMPLATE.format(entry_id)
soup = get_soup(url)
if not soup:
return None # 存在しないID
# --- 重要: 広告(スポンサーサイト)の排除 ---
# FC2は更新停止ブログのトップに "class=ad_..." や "スポンサーサイト" を含むdivを挿入する
# これらを先にDOMから削除してしまう
# 1. 明確な広告クラスを削除
for ad in soup.find_all(class_=re.compile(r'(ad_table|ad_overlay|fc2_footer)')):
ad.decompose()
# 2. タイトルが「スポンサーサイト」となっているh2/h3とその親要素を削除
for header in soup.find_all(['h2', 'h3']):
if "スポンサーサイト" in header.text:
# その親のdivごと消し去る(構造によるが、通常は親divが広告ブロック)
parent = header.find_parent('div')
if parent:
parent.decompose()
else:
header.decompose()
# --- タイトル取得 ---
# 広告を消した後で、最初に見つかる記事タイトル候補を探す
title = "No Title"
# FC2の一般的なクラス名
title_tag = soup.find(class_=re.compile(r'(entry_header|entry_title|sub_title)'))
if not title_tag:
# クラスがない場合、h2かh3を探す
# ただしサイドバーのタイトル(アーカイブ、プロフィール等)を拾わないよう注意
# 通常、メインカラム(main_contents)の中にあるはず
main_area = soup.find(id=re.compile(r'(main|content)'))
if main_area:
title_tag = main_area.find(['h2', 'h3'])
else:
title_tag = soup.find(['h2', 'h3'])
if title_tag:
title = title_tag.text.strip()
# --- 日付取得 ---
date_obj = None
date_str = ""
# パターンA: 特定のクラスを探す
date_tag = soup.find(class_=re.compile(r'(entry_date|date)'))
if date_tag:
date_str = date_tag.text
else:
# パターンB: タイトル周辺のテキストから正規表現で探す
# ページ全体のテキストから "2014-01-05" や "2014/01/05" を探す
# ただしサイドバーの日付を拾うリスクがあるため、メインカラム優先
target_area = soup.find(id=re.compile(r'(main|content)')) or soup
text_dump = target_area.get_text()
match = re.search(r'(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})', text_dump)
if match:
date_str = f"{match.group(1)}-{match.group(2)}-{match.group(3)}"
# 日付パース
if date_str:
# 2014-01-05, 2014/01/05, 2014.01.05 などに対応
nums = re.findall(r'\d+', date_str)
if len(nums) >= 3:
try:
date_obj = datetime(int(nums[0]), int(nums[1]), int(nums[2]))
except:
pass
# --- 本文取得 ---
body_text = "No Content"
# FC2の本文クラス
body_div = soup.find(class_=re.compile(r'(entry_body|main_text)'))
if not body_div:
# なければメインエリアから推測
main_area = soup.find(id=re.compile(r'(main|content)'))
if main_area:
# タイトルと日付以外の長いテキストを持つdivなどを探す簡易ロジック
# 今回はFC2特有の 'entry_body' が無ければ、構造が変わっている可能性大
# entry_content クラスも試す
body_div = main_area.find(class_='entry-content')
if body_div:
# <br>を改行に
for br in body_div.find_all("br"):
br.replace_with("\n")
# スクリプト除去
for s in body_div(['script', 'style']):
s.decompose()
# 残った広告テキスト除去("上記の広告は...")
full_text = body_div.get_text()
if "上記の広告は" in full_text and "更新のないブログに表示" in full_text:
# これは広告divそのものを取得してしまっている(decompose漏れ)
return None # この記事は無効としてスキップ(または再試行ロジックが必要だが今回はスキップ)
body_text = clean_text(full_text)
return {
"title": title,
"date": date_obj,
"url": url,
"body": body_text
}
def main():
print(f"収集設定: {START_DATE.date()} ~ {END_DATE.date()}")
print("方式: 記事ID (blog-entry-X.html) の総当たりスキャン")
print("--------------------------------------------------")
collected_data = []
# 記事IDの範囲設定
# ユーザー情報: 2017年1月で entry-37 なので、数は少ない。
# 念のため ID 1 から 100 までスキャンすれば十分カバーできる。
# (37以降も存在する可能性があるため余裕を持つ)
SCAN_RANGE = range(1, 101)
for entry_id in tqdm(SCAN_RANGE):
data = fetch_entry(entry_id)
# データが取れなかった(404等)場合
if data is None:
continue
# 日付フィルタリング
# 日付が取得できなかった場合(None)は、IDが若いなら期間内とみなすか、除外するか。
# 今回は厳密に日付が取れたもの、かつ期間内のものだけを出力する
if data['date']:
if START_DATE <= data['date'] <= END_DATE:
collected_data.append(data)
# プレビュー
if len(collected_data) <= 3:
print(f"\n[Preview ID={entry_id}] {data['title']} ({data['date'].date()})")
print(f"Body: {data['body'][:30]}...")
# サーバー負荷軽減
time.sleep(SLEEP_TIME)
if not collected_data:
print("\nデータが見つかりませんでした。")
return
# DataFrame化と保存
df = pd.DataFrame(collected_data)
# 整形
df = df[['date', 'title', 'url', 'body']]
df.columns = ['Date', 'Title', 'URL', 'Body']
df = df.sort_values('Date')
df.to_excel(OUTPUT_FILE, index=False)
print(f"\n全処理完了。合計 {len(df)} 件の記事を取得しました。")
print(f"ファイル '{OUTPUT_FILE}' をダウンロードしてください。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その5)
某大物投資家の全てのブログ記事 (https://plaza.rakuten.co.jp/mikimaru71/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from tqdm import tqdm
# --- 設定 ---
BLOG_USER_ID = "mikimaru71"
URL_TEMPLATE = f"https://plaza.rakuten.co.jp/{BLOG_USER_ID}/diary/{{}}/"
BASE_LIST_URL = f"https://plaza.rakuten.co.jp/{BLOG_USER_ID}/diaryall/"
START_DATE = datetime(2005, 10, 29)
END_DATE = datetime(2025, 12, 5)
OUTPUT_FILE = "mikimaru71_fixed_final.xlsx"
SLEEP_TIME = 2.0
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': f'https://plaza.rakuten.co.jp/{BLOG_USER_ID}/'
}
def clean_text(text):
"""テキスト整形用"""
if not text: return ""
text = re.sub(r'\n\s+\n', '\n\n', text)
return text.strip()
def fetch_article_content(url):
"""記事URLからタイトルと本文を取得(エラー修正・ヘッダー除外版)"""
# 変数を初期化(UnboundLocalError回避のため)
title_text = "No Title"
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code != 200:
return "Error", f"Error: Status {response.status_code}", url
soup = BeautifulSoup(response.content, 'html.parser')
# --- 修正箇所: ヘッダー/サイドバーの安全な削除 ---
# エラーの原因だった「NoneTypeチェック」を追加
for junk in soup.find_all(['div', 'table', 'td']):
try:
if junk is None: continue
# .get()を持たない要素(NavigableStringなど)をスキップ
if not hasattr(junk, 'get'): continue
classes = str(junk.get('class', '')).lower()
ids = str(junk.get('id', '')).lower()
# ヘッダー、フッター、サイドバー、カレンダー、広告などを削除
if any(x in classes or x in ids for x in ['header', 'footer', 'side', 'menu', 'nav', 'banner', 'adbox']):
junk.decompose()
except Exception:
continue
# --- タイトル取得 ---
# 削除されずに残ったh2などを探す
title_tag = soup.find('h2') or \
soup.find('div', class_='headline') or \
soup.find('td', class_='diaryTitle') or \
soup.find('div', class_='diaryTitle')
if title_tag:
title_text = title_tag.text.strip()
else:
# タグがない場合はページタイトルから
if soup.title:
full_title = soup.title.text
parts = full_title.split(' - ')
if len(parts) > 0:
title_text = parts[0].strip()
# --- 本文取得 ---
body_text = "Error: Body Not Found"
body_content = None
# 作戦1: クラス名検索
candidates = ['diary_body', 'article_body', 'plaza_diary_main']
for c in candidates:
found = soup.find('div', class_=c)
if found:
body_content = found
break
# 作戦2: タイトルからの逆算(サンドイッチ法)
if not body_content and title_tag:
# タイトルの次にある要素群をかき集める
siblings = []
curr = title_tag.next_sibling
while curr:
# 終了条件のキーワード
text_content = getattr(curr, 'text', '')
if text_content and any(x in text_content for x in ['コメント(', 'トラックバック(', 'テーマ:', '2005年', '2006年']):
break
siblings.append(curr)
curr = curr.next_sibling
if siblings:
temp_text = ""
for s in siblings:
if hasattr(s, 'get_text'):
temp_text += s.get_text() + "\n"
else:
temp_text += str(s)
# HTMLタグ除去してテキスト化
clean_soup = BeautifulSoup(temp_text, 'html.parser')
body_text = clean_text(clean_soup.get_text())
# 本文が短すぎる場合は採用しない
if len(body_text) > 20:
return title_text, body_text, url
# 作戦3: 最終手段 (文字数最大ブロック)
if not body_content:
max_len = 0
best_tag = None
for tag in soup.find_all(['div', 'td']):
if tag == title_tag: continue
# ここでもNoneチェック
if not hasattr(tag, 'get_text'): continue
t_len = len(tag.get_text())
if t_len > max_len:
max_len = t_len
best_tag = tag
if best_tag and max_len > 50:
body_content = best_tag
# テキスト抽出
if body_content:
# 整形
for br in body_content.find_all("br"):
br.replace_with("\n")
for trash in body_content(['script', 'style', 'iframe', 'form']):
trash.decompose()
raw_text = body_content.get_text()
# タイトルが混ざっていたら削除
if title_text in raw_text and len(title_text) > 2:
raw_text = raw_text.replace(title_text, '', 1)
body_text = clean_text(raw_text)
return title_text, body_text, url
except Exception as e:
# title_textは冒頭で定義済みなのでエラーにならない
return title_text, f"Error: {e}", url
def main():
print(f"収集設定: {START_DATE.date()} ~ {END_DATE.date()}")
print("【フェーズ1】URLリスト生成...")
collected_items = []
seen_ids = set()
MAX_PAGES = 500
for page_id in range(0, MAX_PAGES):
target_url = BASE_LIST_URL if page_id == 0 else f"{BASE_LIST_URL}?PageId={page_id}"
if page_id % 10 == 0:
print(f"Scanning Page {page_id}...", end="\r")
try:
response = requests.get(target_url, headers=HEADERS, timeout=20)
if response.status_code == 404:
print(f"\nPage {page_id} not found. List collection finished.")
break
soup = BeautifulSoup(response.content, 'html.parser')
links = soup.find_all('a', href=True)
id_pattern = re.compile(r'/diary/(\d{12})')
has_article = False
for a in links:
match = id_pattern.search(a['href'])
if match:
has_article = True
aid = match.group(1)
if aid in seen_ids: continue
try:
dt = datetime.strptime(aid[:8], "%Y%m%d")
except: continue
if dt > END_DATE: continue
if dt < START_DATE: continue
correct_url = URL_TEMPLATE.format(aid)
seen_ids.add(aid)
collected_items.append({"date": dt, "url": correct_url})
if not has_article:
print(f"\nNo articles found on page {page_id}. Finished.")
break
time.sleep(1)
except Exception as e:
print(f"\nList Error: {e}")
break
df = pd.DataFrame(collected_items)
if df.empty:
print("\n記事が見つかりませんでした。")
return
df = df.sort_values('date')
print(f"\n【フェーズ1完了】有効記事数: {len(df)}件")
print("--------------------------------------------------")
print("【フェーズ2】本文取得(修正版)")
print("--------------------------------------------------")
final_data = []
records = df.to_dict('records')
try:
for i, record in enumerate(tqdm(records)):
t, b, u = fetch_article_content(record['url'])
final_data.append({
"Date": record['date'],
"Title": t,
"URL": u,
"Body": b
})
# プレビュー
if i < 5:
status = "OK"
if t == b: status = "SAME(BAD)"
elif "Error" in b: status = "ERROR"
elif len(b) < 10: status = "SHORT"
print(f"\n[Preview {status}]")
print(f" Title: {t}")
print(f" Body : {b[:40].replace('\n', '')}...")
time.sleep(SLEEP_TIME)
except KeyboardInterrupt:
print("\n中断: データを保存します。")
df_result = pd.DataFrame(final_data)
df_result.to_excel(OUTPUT_FILE, index=False)
print(f"\n全処理完了。ファイル '{OUTPUT_FILE}' をダウンロードしてください。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その6)
某大物投資家の全てのブログ記事 (http://yuyutoushi.blog.fc2.com/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from tqdm import tqdm
# --- 設定 ---
START_ID = 1
END_ID = 593
BASE_URL_TEMPLATE = "http://yuyutoushi.blog.fc2.com/blog-entry-{}.html"
OUTPUT_FILE = "yuyutoushi_complete_fixed.xlsx"
SLEEP_TIME = 1.0
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': 'http://yuyutoushi.blog.fc2.com/'
}
def clean_text(text):
"""テキスト整形"""
if not text: return ""
text = text.replace('\ufffd', '')
text = re.sub(r'\n\s+\n', '\n\n', text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def get_soup(url):
"""文字コード対策済みHTML取得"""
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code == 404:
return None
content = response.content
soup = None
# HTML内のmetaタグを尊重しつつ、EUC-JPもケアする
encodings = ['utf-8', 'euc-jp', 'shift_jis', 'cp932']
for enc in encodings:
try:
html = content.decode(enc)
soup = BeautifulSoup(html, 'html.parser')
if soup.title and html.count('\ufffd') < 10:
break
except:
continue
if not soup:
soup = BeautifulSoup(response.content, 'html.parser')
return soup
except Exception as e:
return None
def fetch_entry(entry_id):
"""記事データ取得(広告回避・エラー対策済み)"""
url = BASE_URL_TEMPLATE.format(entry_id)
soup = get_soup(url)
if not soup: return None
# --- 1. 本物の記事ブロックを特定する ---
# ページ内には「スポンサーサイト」のヘッダーと「本物の記事」のヘッダーがある。
# 本物を特定してから、それに関連するBodyとDateを取得
real_header = None
real_title = "No Title"
# 全てのヘッダーを取得
headers = soup.find_all('div', class_='entry_header')
for header in headers:
text = header.get_text().strip()
if "スポンサーサイト" in text:
continue # 広告は無視
# これが本物の記事
real_header = header
# タイトル抽出 (h2タグなど)
title_tag = header.find(['h2', 'h3'])
if title_tag:
real_title = title_tag.get_text().strip()
else:
real_title = text
break
if not real_header:
return None # 本物の記事が見つからない場合
# --- 2. 本文 (Body) の取得 ---
# ヘッダーの直後にある entry_body を探す
real_body = "No Content"
body_div = real_header.find_next_sibling('div', class_='entry_body')
if body_div:
# さらにその中の main_text を優先
target_div = body_div.find('div', class_='main_text')
if not target_div:
target_div = body_div
# 不要要素の削除(ここでエラーが出ていた箇所を修正)
# 削除対象: ソーシャルボタン、広告、スクリプト
junk_candidates = target_div.find_all(['div', 'ul', 'script', 'noscript', 'iframe'])
for junk in junk_candidates:
try:
# 安全確認: 要素が存在し、属性にアクセスできるか
if junk is None: continue
# クラス属性の取得(安全に)
classes = junk.get('class', [])
# リストなら文字列に結合、Noneなら空文字に
class_str = str(classes).lower() if classes else ""
id_str = str(junk.get('id', '')).lower()
# 削除判定
if 'fc2_footer' in class_str or 'fc2button' in class_str or \
'twitter' in class_str or 'facebook' in class_str or \
'ad_' in class_str or 'analyzer' in id_str:
junk.decompose()
except Exception:
continue # エラーが出たらその要素は無視して次へ
# <br>を改行に変換
for br in target_div.find_all("br"):
br.replace_with("\n")
real_body = clean_text(target_div.get_text())
# --- 3. 日付 (Date) の取得 ---
# 本文エリアのさらに次にある entry_date を探す
real_date = "Unknown"
date_div = None
if body_div:
date_div = body_div.find_next_sibling('div', class_='entry_date')
if date_div:
# "2013/04/12 (金) ..." から日付のみ抽出
date_text = date_div.get_text()
match = re.search(r'(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})', date_text)
if match:
real_date = f"{match.group(1)}-{match.group(2)}-{match.group(3)}"
# 最終的なタイトルが本文に残っていたら削除
if real_title in real_body[:len(real_title)+20]:
real_body = real_body.replace(real_title, "", 1).strip()
return {
"title": real_title,
"date": real_date,
"url": url,
"body": real_body
}
def main():
print(f"対象範囲: blog-entry-{START_ID} ~ {END_ID}")
print("エラー修正版: 広告スキップ&安全な本文抽出")
print("--------------------------------------------------")
collected_data = []
# ユーザー指定範囲: 1〜593
TARGET_RANGE = range(START_ID, END_ID + 1)
for entry_id in tqdm(TARGET_RANGE):
data = fetch_entry(entry_id)
if data is None:
continue
collected_data.append(data)
# 最初の1件だけプレビューして確認
if entry_id == 1:
print(f"\n[ID={entry_id}] {data['title']} ({data['date']})")
print(f"Body: {data['body'][:50]}...")
time.sleep(SLEEP_TIME)
if not collected_data:
print("\n記事が見つかりませんでした。")
return
# 保存
df = pd.DataFrame(collected_data)
df = df[['date', 'title', 'url', 'body']]
df.columns = ['Date', 'Title', 'URL', 'Body']
# 日付順ソート
try:
df['temp_date'] = pd.to_datetime(df['Date'], errors='coerce')
df = df.sort_values('temp_date').drop(columns=['temp_date'])
except:
pass
df.to_excel(OUTPUT_FILE, index=False)
print(f"\n全処理完了。合計 {len(df)} 件の記事を取得しました。")
print(f"ファイル '{OUTPUT_FILE}' をダウンロードしてください。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その7)
某大物投資家の全てのブログ記事 (http://paris-texas.seesaa.net/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta
from tqdm import tqdm # 進捗バー表示用
from google.colab import files
# --- 設定 ---
BASE_URL = "http://paris-texas.seesaa.net/"
START_DATE = datetime(2006, 1, 29)
END_DATE = datetime(2024, 8, 30)
OUTPUT_FILE = "paris_texas_full_content.xlsx"
# サーバー負荷軽減のための待機時間(秒)
# ※本文取得はアクセス数が多いので、1.5秒以上空ける
SLEEP_TIME = 1.5
# 偽装ヘッダー
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def get_article_body(url):
"""指定された記事URLから本文を取得する"""
try:
response = requests.get(url, headers=HEADERS, timeout=10)
if response.status_code != 200:
return "Error: Status Code " + str(response.status_code)
# 文字化け対策 (Shift_JIS / CP932)
response.encoding = 'cp932'
soup = BeautifulSoup(response.text, 'html.parser')
# Seesaaブログの記事本文は <div class="text"> 内にある
# ただし、記事ページでは .blogbody > .text の構造
# 記事エリアを特定
blog_body = soup.find('div', class_='blogbody')
if not blog_body:
return "Error: Layout Mismatch"
text_div = blog_body.find('div', class_='text')
if text_div:
# 不要な要素(広告タグ、スクリプトなど)を削除
for script in text_div.find_all(['script', 'style']):
script.decompose()
# Seesaa特有の広告div (id="article-ad") を削除
ad_div = text_div.find('div', id='article-ad')
if ad_div:
ad_div.decompose()
# ソーシャルブックマークなどのdivを削除
bookmark_div = text_div.find('div', class_='bookmark')
if bookmark_div:
bookmark_div.decompose()
# テキストを抽出(改行を維持して整形)
return text_div.get_text(separator='\n', strip=True)
else:
return "本文が見つかりませんでした"
except Exception as e:
return f"Error: {e}"
def get_articles_from_archive(year, month):
"""アーカイブからURLリストを取得(フェーズ1用)"""
articles_in_month = []
page = 1
while True:
archive_url = f"{BASE_URL}archives/{year}{month:02d}-{page}.html"
try:
response = requests.get(archive_url, headers=HEADERS, timeout=10)
if response.status_code == 404:
break
response.encoding = 'cp932'
soup = BeautifulSoup(response.text, 'html.parser')
date_elements = soup.find_all('h2', class_='date')
if not date_elements:
break
found_new_articles = False
for date_el in date_elements:
date_text = date_el.text.strip()
try:
article_date = datetime.strptime(date_text, '%Y年%m月%d日')
except ValueError:
continue
blogbody = date_el.find_next_sibling('div', class_='blogbody')
if blogbody:
title_h3 = blogbody.find('h3', class_='title')
if title_h3:
link = title_h3.find('a')
if link:
url = link.get('href')
title = link.text.strip()
if START_DATE <= article_date <= END_DATE:
articles_in_month.append({
"Date": article_date.strftime('%Y-%m-%d'),
"Title": title,
"URL": url,
"Body": "" # 後で取得
})
found_new_articles = True
if not found_new_articles and len(articles_in_month) == 0:
if page == 1: break
break
page += 1
time.sleep(1.0) # アーカイブ巡回は少し速くても可
except Exception:
break
return articles_in_month
def main():
print(f"収集期間: {START_DATE.strftime('%Y-%m-%d')} ~ {END_DATE.strftime('%Y-%m-%d')}")
# --- フェーズ1: URLリストの作成 ---
print("\n【フェーズ1】記事URLのリストを作成しています...")
all_articles = []
current_iter_date = START_DATE.replace(day=1)
end_iter_date = END_DATE.replace(day=1) + relativedelta(months=1)
months_to_scan = []
while current_iter_date < end_iter_date:
months_to_scan.append(current_iter_date)
current_iter_date += relativedelta(months=1)
# 月ごとのアーカイブを巡回
for target_date in tqdm(months_to_scan, desc="URL収集"):
year = target_date.year
month = target_date.month
articles = get_articles_from_archive(year, month)
all_articles.extend(articles)
print(f"URL収集完了: {len(all_articles)} 件の記事が見つかりました。")
if not all_articles:
print("記事が見つかりませんでした。終了します。")
return
# --- フェーズ2: 本文の取得 ---
print("\n【フェーズ2】各記事の本文を取得しています(時間がかかります)...")
# データフレーム作成
df = pd.DataFrame(all_articles)
# URLリストに対してループ処理(進捗バー付き)
bodies = []
urls = df['URL'].tolist()
try:
for url in tqdm(urls, desc="本文取得"):
body_text = get_article_body(url)
bodies.append(body_text)
time.sleep(SLEEP_TIME) # サーバー負荷軽減のため待機
except KeyboardInterrupt:
print("\n中断されました。取得済みのデータまでで保存します。")
# 中断した場合、長さが合わないのでリストを調整
df = df.iloc[:len(bodies)]
# データフレームに本文を追加
df['Body'] = bodies
# 日付順にソート
df = df.sort_values('Date')
# 列の順番を整理 (日付, タイトル, URL, 本文)
df = df[['Date', 'Title', 'URL', 'Body']]
# Excel出力
df.to_excel(OUTPUT_FILE, index=False)
print(f"\n全処理完了。ファイル {OUTPUT_FILE} を保存しました。")
# ダウンロード
try:
files.download(OUTPUT_FILE)
except ImportError:
pass
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その8)
某大物投資家の全てのブログ記事 (http://blog.livedoor.jp/kessanderby/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta
from google.colab import files
from tqdm import tqdm
# --- 設定 ---
BASE_URL = "http://blog.livedoor.jp/kessanderby/"
START_DATE = datetime(2015, 5, 16)
END_DATE = datetime(2018, 8, 25)
OUTPUT_FILENAME = "kessanderby_fixed_utf8.xlsx"
SLEEP_TIME = 1.0
def get_soup(url):
"""URLからBeautifulSoupオブジェクトを作成(文字化け対策)"""
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
# 【重要修正】 文字コードを強制的にUTF-8に設定
# LivedoorブログはUTF-8ですが、自動判定がLatin-1(ISO-8859-1)と誤認して
# 文字化け(第...)することがあるため、ここで明示
response.encoding = 'utf-8'
return BeautifulSoup(response.text, 'html.parser')
except Exception as e:
print(f"Connection Error ({url}): {e}")
return None
def get_article_body(url):
"""記事URLから本文テキストを取得"""
soup = get_soup(url)
if not soup:
return "取得エラー"
# 記事本文エリアの特定
# 提供されたHTML構造に基づく (article-body-inner または article-body)
body_div = soup.find('div', class_='article-body-inner')
if not body_div:
body_div = soup.find('div', class_='article-body')
if body_div:
# テキスト抽出(セパレータに改行を入れる)
return body_div.get_text(separator='\n', strip=True)
else:
return "本文エリアが見つかりませんでした"
def scrape_article_list():
"""フェーズ1: 対象期間の記事URLとタイトルを収集"""
all_articles = []
current_month = START_DATE.replace(day=1)
end_month_limit = END_DATE.replace(day=1) + relativedelta(months=1)
print("【フェーズ1】記事URLリストを作成中(文字化け対策済み)...")
while current_month < end_month_limit:
year_month_str = current_month.strftime('%Y-%m')
archive_url = f"{BASE_URL}archives/{year_month_str}.html"
page_num = 1
has_next_page = True
while has_next_page:
target_url = archive_url if page_num == 1 else f"{archive_url}?p={page_num}"
soup = get_soup(target_url)
if not soup:
break
# 記事一覧の取得
articles = soup.find_all('article', class_='article')
if not articles:
has_next_page = False
break
for article in articles:
# 日付チェック
date_elem = article.find('time')
if not date_elem: continue
date_str = date_elem.get('datetime')
if date_str:
try:
article_date = datetime.fromisoformat(date_str).replace(tzinfo=None)
except:
continue
else:
try:
article_date = datetime.strptime(date_elem.text.strip(), '%Y年%m月%d日')
except:
continue
# 期間判定
check_start = START_DATE.replace(hour=0, minute=0, second=0, microsecond=0)
check_end = END_DATE.replace(hour=23, minute=59, second=59, microsecond=999999)
if check_start <= article_date <= check_end:
title_h1 = article.find('h1', class_='article-title')
if title_h1:
link = title_h1.find('a')
if link:
all_articles.append({
'日付': article_date.strftime('%Y-%m-%d'),
'タイトル': link.text.strip(),
'URL': link['href'],
'本文': '' # フェーズ2で取得
})
# 次ページ判定
pager = soup.find('div', class_='pager')
next_link = None
if pager:
next_link = pager.find('a', string=lambda t: t and '次のページ' in t)
if not next_link:
li_next = pager.find('li', class_='next')
if li_next: next_link = li_next.find('a')
if next_link:
page_num += 1
time.sleep(SLEEP_TIME)
else:
has_next_page = False
current_month += relativedelta(months=1)
time.sleep(SLEEP_TIME)
return pd.DataFrame(all_articles)
# --- メイン処理 ---
# 1. URLリスト取得
df = scrape_article_list()
if not df.empty:
print(f"\n【フェーズ1完了】 {len(df)} 件の記事が見つかりました。")
print(f"【フェーズ2】 本文を取得します(予想時間: 約 {len(df) * SLEEP_TIME / 60:.1f} 分)...")
# 2. 本文取得(プログレスバー表示)
bodies = []
for url in tqdm(df['URL']):
body_text = get_article_body(url)
bodies.append(body_text)
time.sleep(SLEEP_TIME)
df['本文'] = bodies
# 日付順にソート
df = df.sort_values('日付')
# Excel出力
df.to_excel(OUTPUT_FILENAME, index=False)
print(f"\n全処理完了。ファイル {OUTPUT_FILENAME} を保存しました。")
files.download(OUTPUT_FILENAME)
else:
print("指定された期間内に記事が見つかりませんでした。")
投資ブログの記事の取得 (その9)
某大物投資家の全てのブログ記事 (https://plaza.rakuten.co.jp/tachanfund/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import re
from datetime import datetime
from google.colab import files
from tqdm import tqdm
# --- 設定 ---
BLOG_USER_ID = "tachanfund"
BASE_URL = f"https://plaza.rakuten.co.jp/{BLOG_USER_ID}/"
DIARY_ALL_URL = f"{BASE_URL}diaryall/"
# 正しいURLのテンプレート
ARTICLE_URL_TEMPLATE = f"https://plaza.rakuten.co.jp/{BLOG_USER_ID}/diary/{{}}/"
START_DATE = datetime(2006, 3, 21)
END_DATE = datetime(2018, 12, 31)
OUTPUT_FILE = "tachanfund_fixed_complete.xlsx"
SLEEP_TIME = 2.0 # エラー回避のため
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
'Referer': BASE_URL
}
def clean_text(text):
if not text: return ""
text = re.sub(r'\n\s+\n', '\n\n', text)
return text.strip()
def get_article_content(url):
"""正しいURLからタイトルと本文を取得"""
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code != 200:
return "Error", f"Error: Status {response.status_code}"
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.content, 'html.parser')
# タイトル取得
# 楽天ブログの構造: <div class="dTitle"><h2>...</h2></div>
title_div = soup.find('div', class_='dTitle')
title = "No Title"
if title_div and title_div.find('h2'):
title = title_div.find('h2').text.strip()
# 本文取得
# 構造: <div class="dText"> or <div class="diary_body">
body_div = soup.find('div', class_='dText')
if not body_div:
body_div = soup.find('div', class_='diary_body')
if body_div:
# <br>を改行に変換
for br in body_div.find_all("br"):
br.replace_with("\n")
# スクリプトやスタイルを削除
for script in body_div(["script", "style", "iframe"]):
script.decompose()
return title, clean_text(body_div.get_text())
else:
return title, "Error: Body Not Found"
except Exception as e:
return "Error", f"Error: {e}"
def get_article_list_from_page(page_id):
"""一覧ページから記事IDを抽出し、正しいURLリストを生成"""
articles = []
if page_id == 0:
target_url = DIARY_ALL_URL
else:
target_url = f"{DIARY_ALL_URL}?PageId={page_id}"
try:
response = requests.get(target_url, headers=HEADERS, timeout=15)
if response.status_code != 200:
return None, False
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, 'html.parser')
# 記事リンクからID(12桁の数字)を抽出
links = soup.find_all('a', href=re.compile(r'/diary/\d{12}/'))
seen_ids = set()
stop_scanning = False
for link in links:
href = link.get('href')
# 正規表現でID(日付+連番)を抽出
# 例: /diary/200603210000/ -> 200603210000
match = re.search(r'/diary/(\d{12})/', href)
if not match:
continue
article_id = match.group(1)
if article_id in seen_ids:
continue
seen_ids.add(article_id)
# 日付チェック (IDの前半8桁が日付)
date_str = article_id[:8]
try:
article_date = datetime.strptime(date_str, '%Y%m%d')
# 期間判定
if article_date > END_DATE:
continue # まだ新しいので次へ
elif article_date < START_DATE:
stop_scanning = True # 期間より古いので終了
continue
# ★ここで正しいURLを強制生成する★
correct_url = ARTICLE_URL_TEMPLATE.format(article_id)
articles.append({
"Date": article_date.strftime('%Y-%m-%d'),
"URL": correct_url,
# タイトルは後で個別ページから正確に取るため仮置き
"Title": link.text.strip()
})
except ValueError:
continue
return articles, stop_scanning
except Exception as e:
print(f"List Error at page {page_id}: {e}")
return None, False
def main():
print(f"収集期間: {START_DATE.strftime('%Y-%m-%d')} ~ {END_DATE.strftime('%Y-%m-%d')}")
print("【フェーズ1】記事IDから正しいURLリストを生成中...")
all_articles = []
page_id = 0
max_pages = 1000 # 無限ループ防止
while page_id < max_pages:
if page_id % 10 == 0:
print(f"Scanning list page {page_id}...", end="\r")
articles, stop_scanning = get_article_list_from_page(page_id)
if articles is None:
break
# 期間内の記事が見つかった場合のみ追加
if articles:
all_articles.extend(articles)
# 期間外(古い)記事に到達したら終了
if stop_scanning:
print(f"\n指定期間より古い記事に到達しました。(Page {page_id})")
break
# 記事が全くない(最終ページを超えた)
if not articles and not stop_scanning:
# 念のためもう1ページ見るか判断、ここでは終了
# ただし、期間が飛んでいる場合もあるので、articlesが空でもstop_scanningがFalseなら進むべきだが
# 楽天ブログのdiaryallは連続しているので、空なら終了でOK
if page_id > 0 and len(articles) == 0:
# ただし最新の記事が期間より未来の場合は空で返ってくるので、
# stop_scanningがTrueになるまでは回す必要がある。
# ここでは「記事リンクが1つもなかった」場合のみbreak
soup_check = requests.get(f"{DIARY_ALL_URL}?PageId={page_id}", headers=HEADERS)
if 'class="dTitle"' not in soup_check.text:
print("\nこれ以上記事がありません。")
break
page_id += 1
time.sleep(1)
# 重複除去
unique_articles = []
seen = set()
for a in all_articles:
if a['URL'] not in seen:
seen.add(a['URL'])
unique_articles.append(a)
print(f"\n【フェーズ1完了】 {len(unique_articles)} 件の有効なURLを生成しました。")
print("-" * 50)
if not unique_articles:
print("記事が見つかりませんでした。")
return
# --- フェーズ2: 本文取得 ---
print("【フェーズ2】 各記事の本文を取得します。")
print(f"予想所要時間: 約 {len(unique_articles) * SLEEP_TIME / 60:.1f} 分")
final_data = []
try:
# tqdmで進捗表示
for item in tqdm(unique_articles):
title, body = get_article_content(item['URL'])
final_data.append({
"Date": item['Date'],
"Title": title, # 個別ページから取った正確なタイトル
"URL": item['URL'],
"Body": body
})
time.sleep(SLEEP_TIME)
except KeyboardInterrupt:
print("\n中断: 取得済みのデータのみ保存します。")
# DataFrame作成と保存
df = pd.DataFrame(final_data)
if not df.empty:
df = df.sort_values('Date')
df = df[['Date', 'Title', 'URL', 'Body']]
df.to_excel(OUTPUT_FILE, index=False)
print(f"\n全処理完了。ファイル {OUTPUT_FILE} を保存しました。")
files.download(OUTPUT_FILE)
else:
print("データが保存されませんでした。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その10)
某大物投資家の全てのブログ記事 (https://aako01jp.blog.fc2.com/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
import re
from google.colab import files
from tqdm import tqdm
# --- 設定 ---
BASE_URL = "https://aako01jp.blog.fc2.com/blog-entry-{}.html"
START_DATE = datetime(2012, 10, 18)
END_DATE = datetime(2023, 9, 24)
# IDの範囲を大幅に拡大 (ユーザー情報のID 417をカバーし、さらに余裕を持つ)
START_ID = 1
END_ID = 600
OUTPUT_FILE = "aako01jp_fc2_complete_v2.xlsx"
SLEEP_TIME = 1.0
# 偽装ヘッダー
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
def clean_text(text):
if not text: return ""
# 不要な空白除去
text = re.sub(r'\n\s+\n', '\n\n', text)
return text.strip()
def fetch_article(entry_id):
url = BASE_URL.format(entry_id)
try:
response = requests.get(url, headers=HEADERS, timeout=15)
# 404 (存在しないID) はスキップ
if response.status_code == 404:
return None
# 文字化け対策
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, 'html.parser')
# --- 日付取得 ---
# 構造: <p class="info">2012/10/18 (Thu)</p>
date_element = soup.find('p', class_='info')
if not date_element:
return None # 日付がない=有効な記事ページではない可能性
date_text_raw = date_element.text.strip()
date_text = date_text_raw.split('(')[0].strip()
try:
article_date = datetime.strptime(date_text, '%Y/%m/%d')
except ValueError:
return None
# --- タイトル取得 ---
# リンクを含むh2タグを探す
title = "No Title"
h2_tags = soup.find_all('h2')
for h2 in h2_tags:
link = h2.find('a')
if link and f'blog-entry-{entry_id}.html' in link.get('href', ''):
title = link.text.strip()
break
# --- 本文取得 ---
body_text = ""
content_div = soup.find('div', class_='kijibox')
if content_div:
# 不要要素の削除
remove_classes = [
'info', # 日付
'fc2_footer', # フッターボタン
'relate_dl', # 関連記事
'info-footer', # カテゴリリンクなど
'thanks_button_div'
]
for cls in remove_classes:
for tag in content_div.find_all(class_=cls):
tag.decompose()
for tag in content_div.find_all(['script', 'style', 'iframe']):
tag.decompose()
body_text = clean_text(content_div.get_text(separator='\n'))
return {
"date_obj": article_date, # 判定用
"Date": date_text, # 出力用
"Title": title,
"URL": url,
"Body": body_text
}
except Exception as e:
# エラー時はログを出してNoneを返す
print(f"Error at ID {entry_id}: {e}")
return None
def main():
print(f"収集設定: ID {START_ID} ~ {END_ID}")
print(f"期間フィルタ: {START_DATE.date()} ~ {END_DATE.date()}")
print("スクレイピングを開始します(広範囲スキャンモード)...")
collected_data = []
# tqdmで進捗を表示しながらループ
for i in tqdm(range(START_ID, END_ID + 1)):
article = fetch_article(i)
if article:
# 日付フィルタリング
# 範囲内であればリストに追加
if START_DATE <= article['date_obj'] <= END_DATE:
collected_data.append(article)
time.sleep(SLEEP_TIME)
if not collected_data:
print("\n指定期間内の記事が見つかりませんでした。")
return
# DataFrame作成
df = pd.DataFrame(collected_data)
# 日付順に並べ替え
df = df.sort_values('date_obj')
# 出力用カラムのみ抽出
df_output = df[['Date', 'Title', 'URL', 'Body']]
# Excel出力
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"\n完了しました。合計 {len(df)} 件の記事を取得しました。")
print(f"ファイル '{OUTPUT_FILE}' をダウンロードします。")
files.download(OUTPUT_FILE)
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その11)
某大物投資家の全てのブログ記事 (https://aako01jp.com/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
from tqdm import tqdm
from google.colab import files
# --- 設定 ---
BASE_URL_TEMPLATE = "https://aako01jp.com/?p={}"
START_ID = 32
END_ID = 576
TARGET_START_DATE = datetime(2013, 11, 11)
TARGET_END_DATE = datetime(2023, 3, 19)
OUTPUT_FILE = "aako01jp_fixed_content.xlsx"
SLEEP_TIME = 2.0 # エラー回避のため少し待機時間を増やします
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
def clean_text(text):
if not text: return ""
return text.strip()
def fetch_article(post_id):
url = BASE_URL_TEMPLATE.format(post_id)
try:
# エラー発生時の再試行(リトライ)ロジック
response = None
for _ in range(3):
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code == 200:
break
elif response.status_code == 404:
return None # 存在しないページはスキップ
time.sleep(2) # リトライ待機
except:
time.sleep(2)
if not response or response.status_code != 200:
return None # 取得失敗
soup = BeautifulSoup(response.content, 'html.parser')
# --- 1. 日付の取得 (安全対策) ---
date_tag = soup.find('time', class_='entry-date')
if not date_tag:
return None # 日付がないページは無効とみなす
# .get() の前にタグの存在は確認済みだが、念のため安全に取得
datetime_str = date_tag.attrs.get('datetime') if hasattr(date_tag, 'attrs') else None
if not datetime_str:
return None
try:
article_date = datetime.fromisoformat(datetime_str).replace(tzinfo=None)
except ValueError:
return None
# --- 2. タイトルの取得 ---
title_tag = soup.find('h1', class_='entry-title')
title = title_tag.text.strip() if title_tag else "No Title"
# --- 3. 本文の取得 ---
body_text = ""
content_div = soup.find('div', class_='entry-content')
if content_div:
# 削除対象のタグをリストアップしてから削除(ループ中のエラー回避)
tags_to_remove = []
# スクリプトとスタイル
tags_to_remove.extend(content_div.find_all(['script', 'style']))
# 不要なdiv(広告ボタンなど)
for div in content_div.find_all('div'):
# class属性を安全に取得
classes = div.attrs.get('class', []) if hasattr(div, 'attrs') else []
if 'thanks_button_div' in classes or 'addthis_tool' in classes:
tags_to_remove.append(div)
# 一括削除
for tag in tags_to_remove:
tag.decompose()
# テキスト抽出
body_text = content_div.get_text(separator='\n', strip=True)
return {
"id": post_id,
"date": article_date,
"title": title,
"url": url,
"body": body_text
}
except Exception as e:
# エラーが出ても止まらずにログを出して次へ
print(f"Skipping ID {post_id}: {e}")
return None
def main():
print(f"収集設定: ID {START_ID} ~ {END_ID}")
print("処理を開始します(安全モード)...")
collected_data = []
for post_id in tqdm(range(START_ID, END_ID + 1)):
article = fetch_article(post_id)
if article:
check_date = article['date'].date()
start_date = TARGET_START_DATE.date()
end_date = TARGET_END_DATE.date()
if start_date <= check_date <= end_date:
collected_data.append(article)
time.sleep(SLEEP_TIME)
if not collected_data:
print("記事が見つかりませんでした。")
return
# DataFrame作成
df = pd.DataFrame(collected_data)
if not df.empty:
df = df.sort_values('date')
df_output = df[['date', 'title', 'url', 'body']]
df_output.columns = ['日付', 'タイトル', 'URL', '本文']
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"\n完了しました。合計 {len(df)} 件の記事を取得しました。")
print(f"ファイル '{OUTPUT_FILE}' をダウンロードします。")
files.download(OUTPUT_FILE)
else:
print("有効なデータがありませんでした。")
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その12)
某大物投資家の全てのブログ記事 (https://magic1821.blog.fc2.com/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
from tqdm import tqdm
from google.colab import files
# --- 設定 ---
START_ID = 1
END_ID = 1058
BASE_URL_TEMPLATE = "https://magic1821.blog.fc2.com/blog-entry-{}.html"
# 取得対象期間
TARGET_START_DATE = datetime(2005, 10, 10)
TARGET_END_DATE = datetime(2025, 11, 29)
OUTPUT_FILE = "magic1821_full_content.xlsx"
SLEEP_TIME = 1.0 # サーバー負荷軽減のための待機時間(秒)
# 偽装ヘッダー
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
def clean_text(text):
"""テキストを整形"""
if not text:
return ""
return text.strip()
def fetch_article(post_id):
"""指定されたIDの記事情報(本文含む)を取得"""
url = BASE_URL_TEMPLATE.format(post_id)
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code != 200:
return None
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, 'html.parser')
# 1. 日付の取得
date_ul = soup.find('ul', class_='entry_date')
if not date_ul: return None
date_li = date_ul.find('li')
if not date_li: return None
date_text = date_li.text.strip()
try:
article_date = datetime.strptime(date_text, '%Y/%m/%d')
except ValueError:
return None
# 2. タイトルの取得
title_tag = soup.find('h2', class_='entry_header')
title = title_tag.text.strip() if title_tag else "No Title"
# 3. 本文の取得 (追加機能)
body_text = ""
# FC2ブログの本文エリア
body_div = soup.find('div', class_='entry_body')
if body_div:
# 不要な要素(広告、ソーシャルボタン、スクリプト)を削除
for junk in body_div.find_all(['script', 'style']):
junk.decompose()
# fc2_footer (拍手ボタンやツイートボタンのエリア) を削除
# このブログのHTML構造では entry_body の中にこれらが含まれています
for footer in body_div.find_all('div', class_='fc2_footer'):
footer.decompose()
# テキストを抽出(改行を維持)
body_text = body_div.get_text(separator='\n', strip=True)
return {
"id": post_id,
"date_obj": article_date,
"Date": date_text,
"Title": title,
"URL": url,
"Body": body_text
}
except Exception as e:
print(f"Error at ID {post_id}: {e}")
return None
def main():
print(f"収集設定: ID {START_ID} ~ {END_ID}")
print(f"期間指定: {TARGET_START_DATE.strftime('%Y-%m-%d')} ~ {TARGET_END_DATE.strftime('%Y-%m-%d')}")
print("処理を開始します(本文取得モード)...")
collected_data = []
# プログレスバーを表示しながらループ
for post_id in tqdm(range(START_ID, END_ID + 1)):
article = fetch_article(post_id)
if article:
# 指定期間内かチェック
if TARGET_START_DATE <= article['date_obj'] <= TARGET_END_DATE:
collected_data.append(article)
time.sleep(SLEEP_TIME)
if not collected_data:
print("記事が見つかりませんでした。")
return
# DataFrame作成
df = pd.DataFrame(collected_data)
# 日付順にソート
df = df.sort_values('date_obj')
# 出力用データの整理 (D列にBodyを追加)
df_output = df[['Date', 'Title', 'URL', 'Body']]
df_output.columns = ['日付', 'タイトル', 'URL', '本文']
# Excel出力
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"\n完了しました。合計 {len(df)} 件の記事を取得しました。")
print(f"ファイル '{OUTPUT_FILE}' をダウンロードします。")
files.download(OUTPUT_FILE)
if __name__ == "__main__":
main()
投資ブログの記事の取得 (その13)
某大物投資家の全てのブログ記事 (http://itaru2.blog.fc2.com/) の内容を一括で取得するコードです。
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
from datetime import datetime
from tqdm import tqdm
from google.colab import files
# --- 設定 ---
START_ID = 1
END_ID = 180
BASE_URL_TEMPLATE = "http://itaru2.blog.fc2.com/blog-entry-{}.html"
# 取得対象期間
TARGET_START_DATE = datetime(2017, 5, 4)
TARGET_END_DATE = datetime(2025, 11, 28)
OUTPUT_FILE = "itaru2_blog_full_content.xlsx"
SLEEP_TIME = 1.0 # サーバー負荷軽減のための待機時間(秒)
# 偽装ヘッダー
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36',
}
def clean_text(text):
"""テキストを整形する"""
if not text:
return ""
return text.strip()
def fetch_article(post_id):
"""指定されたIDの記事情報(本文含む)を取得する"""
url = BASE_URL_TEMPLATE.format(post_id)
try:
response = requests.get(url, headers=HEADERS, timeout=15)
if response.status_code != 200:
return None
response.encoding = response.apparent_encoding
soup = BeautifulSoup(response.text, 'html.parser')
# 1. 日付の取得
date_ul = soup.find('ul', class_='entry_date')
if not date_ul: return None
date_li = date_ul.find('li')
if not date_li: return None
date_text = date_li.text.strip()
try:
article_date = datetime.strptime(date_text, '%Y/%m/%d')
except ValueError:
return None
# 2. タイトルの取得
title_tag = soup.find('h2', class_='entry_header')
title = title_tag.text.strip() if title_tag else "No Title"
# 3. 本文の取得 (追加機能)
body_text = ""
# FC2ブログの本文エリア
body_div = soup.find('div', class_='entry_body')
if body_div:
# 不要な要素(広告、ソーシャルボタン、スクリプト)を削除
for junk in body_div.find_all(['script', 'style']):
junk.decompose()
# fc2_footer (拍手ボタンやツイートボタンのエリア) を削除
for footer in body_div.find_all('div', class_='fc2_footer'):
footer.decompose()
# テキストを抽出(改行を維持)
# separator='\n' を使うことで、Excel上でも改行が見やすくなります
body_text = body_div.get_text(separator='\n', strip=True)
return {
"id": post_id,
"date_obj": article_date,
"Date": date_text,
"Title": title,
"URL": url,
"Body": body_text
}
except Exception as e:
print(f"Error at ID {post_id}: {e}")
return None
def main():
print(f"収集設定: ID {START_ID} ~ {END_ID}")
print(f"期間指定: {TARGET_START_DATE.strftime('%Y-%m-%d')} ~ {TARGET_END_DATE.strftime('%Y-%m-%d')}")
print("処理を開始します(本文取得モード)...")
collected_data = []
# プログレスバーを表示しながらループ
for post_id in tqdm(range(START_ID, END_ID + 1)):
article = fetch_article(post_id)
if article:
# 指定期間内かチェック
if TARGET_START_DATE <= article['date_obj'] <= TARGET_END_DATE:
collected_data.append(article)
time.sleep(SLEEP_TIME)
if not collected_data:
print("記事が見つかりませんでした。")
return
# DataFrame作成
df = pd.DataFrame(collected_data)
# 日付順にソート
df = df.sort_values('date_obj')
# 出力用データの整理 (D列にBodyを追加)
df_output = df[['Date', 'Title', 'URL', 'Body']]
df_output.columns = ['日付', 'タイトル', 'URL', '本文']
# Excel出力
df_output.to_excel(OUTPUT_FILE, index=False)
print(f"\n完了しました。合計 {len(df)} 件の記事を取得しました。")
print(f"ファイル '{OUTPUT_FILE}' をダウンロードします。")
files.download(OUTPUT_FILE)
if __name__ == "__main__":
main()
おまけ (ペアトレード用コード初期案)
#!/usr/bin/env python3
"""
pairs_trading_kabu_nonblocking.py
- ペアごとの "to_enter" は next_action_time を保持し、メインループで到達判定して再評価・発注する(ブロッキングなし)。
- REST polling での最小実装。実運用では Push API (WebSocket) に切替。
"""
import os
import time
import logging
from datetime import datetime, timedelta, time as dtime
from typing import List, Tuple, Dict
import requests
import pandas as pd
import numpy as np
# ----------------------------
# 環境 / 設定
# ----------------------------
KABU_REST_BASE = os.getenv("KABU_REST_BASE", "http://localhost:18080/kabusapi")
KABU_API_TOKEN = os.getenv("KABU_API_TOKEN", "")
KABU_API_PASSWORD = os.getenv("KABU_API_PASSWORD", "")
HEADERS = {"Content-Type": "application/json", "X-API-KEY": KABU_API_TOKEN}
TOP_N_PAIRS = 20
SIGMA_THRESHOLD = 2.0
DOLLAR_SIZE = 100000
WAIT_ONE_DAY = True
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("pairs_trader_nonblocking")
# ----------------------------
# kabu API wrappers (REST)
# ----------------------------
def kabu_get_quote(symbol: str, exchange: int = 1) -> dict:
url = f"{KABU_REST_BASE}/board/{symbol}@{exchange}"
try:
r = requests.get(url, headers=HEADERS, timeout=5)
if r.status_code != 200:
logger.debug(f"Quote status {r.status_code} for {symbol}")
return None
return r.json()
except Exception as e:
logger.debug(f"Quote fetch failed for {symbol}: {e}")
return None
def kabu_send_order(order: dict) -> dict:
url = f"{KABU_REST_BASE}/sendorder"
r = requests.post(url, headers=HEADERS, json=order, timeout=5)
r.raise_for_status()
return r.json()
# ----------------------------
# ユーティリティ(データ処理)
# ----------------------------
def normalize_series(price: pd.Series) -> pd.Series:
if price.iloc[0] == 0:
raise ValueError("Initial price is zero.")
return price / price.iloc[0]
def pair_distance(s1: pd.Series, s2: pd.Series) -> float:
common = s1.index.intersection(s2.index)
if len(common) == 0:
return float("inf")
diff = normalize_series(s1.loc[common]) - normalize_series(s2.loc[common])
return float(np.sum(diff.values ** 2))
def form_pairs(price_df: pd.DataFrame) -> List[Tuple[str, str, float]]:
tickers = list(price_df.columns)
pairs = []
for i in range(len(tickers)):
for j in range(i+1, len(tickers)):
ti, tj = tickers[i], tickers[j]
try:
d = pair_distance(price_df[ti], price_df[tj])
pairs.append((ti, tj, d))
except Exception:
continue
pairs_sorted = sorted(pairs, key=lambda x: x[2])
return pairs_sorted
def compute_pair_stats(pi: pd.Series, pj: pd.Series):
common = pi.index.intersection(pj.index)
si = normalize_series(pi.loc[common])
sj = normalize_series(pj.loc[common])
spread = si - sj
mu = float(spread.mean())
sigma = float(spread.std(ddof=0))
return mu, sigma
# ----------------------------
# 取引時間関係
# ----------------------------
def is_weekday(dt: datetime) -> bool:
return dt.weekday() < 5
def next_trading_open(now: datetime) -> datetime:
"""
簡易: 当日朝9:00前なら当日9:00を返す。そうでなければ次の平日9:00。
実運用では祝日判定ライブラリを使用(jpholiday等)。
"""
today = now.date()
morning_open = datetime.combine(today, dtime(hour=9, minute=0))
if is_weekday(now) and now < morning_open:
return morning_open
nd = now
while True:
nd = nd + timedelta(days=1)
if is_weekday(nd):
return datetime.combine(nd.date(), dtime(hour=9, minute=0))
# ----------------------------
# kabu sendorder enum / builder (簡易)
# ----------------------------
EXCHANGE_TSE = 1
SECURITY_TYPE_STOCK = 1
SIDE_BUY = "2"
SIDE_SELL = "1"
FRONT_ORDER_MARKET = 10
def build_order_kabu(password: str, symbol: str, side: str, qty: int,
cash_margin: int = 0, margin_trade_type: int = 0,
account_type: int = 4, front_order_type: int = FRONT_ORDER_MARKET) -> dict:
if side not in (SIDE_BUY, SIDE_SELL):
raise ValueError("Invalid side")
return {
"Password": password,
"Symbol": symbol,
"Exchange": EXCHANGE_TSE,
"SecurityType": SECURITY_TYPE_STOCK,
"Side": side,
"CashMargin": cash_margin,
"MarginTradeType": margin_trade_type,
"DelivType": 0,
"AccountType": account_type,
"Qty": qty,
"FrontOrderType": front_order_type,
"Price": 0,
"ExpireDay": 0
}
# ----------------------------
# PairTrader(非ブロッキング版)
# ----------------------------
class PairTrader:
def __init__(self, formation_df: pd.DataFrame, top_n: int = TOP_N_PAIRS):
self.formation_df = formation_df
self.top_n = top_n
self.pairs = form_pairs(formation_df)[:top_n]
self.stats = {} # (i,j) -> {mu,sigma,init_i,init_j}
self.positions = {} # (i,j) -> {"state":..., ...}
self.password = KABU_API_PASSWORD
self._init_stats()
def _init_stats(self):
for i, j, _ in self.pairs:
mu, sigma = compute_pair_stats(self.formation_df[i], self.formation_df[j])
init_i = float(self.formation_df[i].iloc[0])
init_j = float(self.formation_df[j].iloc[0])
self.stats[(i,j)] = {"mu": mu, "sigma": sigma, "init_i": init_i, "init_j": init_j}
self.positions[(i,j)] = {"state": "flat"}
logger.info(f"Pair {(i,j)} mu={mu:.6f} sigma={sigma:.6f}")
def compute_spread(self, pair, price_map):
i,j = pair
s = self.stats[pair]
pi = price_map.get(i)
pj = price_map.get(j)
if pi is None or pj is None:
raise ValueError("Missing price")
norm_i = float(pi) / s["init_i"]
norm_j = float(pj) / s["init_j"]
return norm_i - norm_j
def enter_pair(self, pair, direction, price_map):
i,j = pair
pi = float(price_map[i]); pj = float(price_map[j])
qty_i = max(1, int(DOLLAR_SIZE / pi)); qty_j = max(1, int(DOLLAR_SIZE / pj))
try:
if direction == "i_long":
# Buy i (現物)、Sell j (信用新規)
o_buy_i = build_order_kabu(self.password, i, SIDE_BUY, qty_i, cash_margin=0, margin_trade_type=0)
o_sell_j = build_order_kabu(self.password, j, SIDE_SELL, qty_j, cash_margin=2, margin_trade_type=2)
r1 = kabu_send_order(o_buy_i)
r2 = kabu_send_order(o_sell_j)
else:
o_sell_i = build_order_kabu(self.password, i, SIDE_SELL, qty_i, cash_margin=2, margin_trade_type=2)
o_buy_j = build_order_kabu(self.password, j, SIDE_BUY, qty_j, cash_margin=0, margin_trade_type=0)
r1 = kabu_send_order(o_sell_i)
r2 = kabu_send_order(o_buy_j)
logger.info(f"Entered pair {pair} dir={direction}")
self.positions[pair] = {"state": "open", "direction": direction, "entry_time": datetime.now(), "entry_z": None}
except Exception as e:
logger.exception(f"Entry failed for {pair}: {e}")
self.positions[pair] = {"state": "flat"}
def close_pair(self, pair, price_map):
pos = self.positions.get(pair, {})
if pos.get("state") != "open":
return
direction = pos.get("direction")
i,j = pair
pi = float(price_map[i]); pj = float(price_map[j])
qty_i = max(1, int(DOLLAR_SIZE / pi)); qty_j = max(1, int(DOLLAR_SIZE / pj))
try:
if direction == "i_long":
o_sell_i = build_order_kabu(self.password, i, SIDE_SELL, qty_i, cash_margin=0, margin_trade_type=0)
o_buy_j = build_order_kabu(self.password, j, SIDE_BUY, qty_j, cash_margin=3, margin_trade_type=2)
kabu_send_order(o_sell_i); kabu_send_order(o_buy_j)
else:
o_buy_i = build_order_kabu(self.password, i, SIDE_BUY, qty_i, cash_margin=3, margin_trade_type=2)
o_sell_j = build_order_kabu(self.password, j, SIDE_SELL, qty_j, cash_margin=0, margin_trade_type=0)
kabu_send_order(o_buy_i); kabu_send_order(o_sell_j)
logger.info(f"Closed pair {pair}")
self.positions[pair] = {"state": "flat"}
except Exception as e:
logger.exception(f"Close failed for {pair}: {e}")
# do not force-flat; in production handle partials/alerts
def process_pair_signal(self, pair, price_map, now):
i,j = pair
try:
spread = self.compute_spread(pair, price_map)
except Exception:
return
s = self.stats[pair]; mu, sigma = s["mu"], s["sigma"]
z = (spread - mu) / (sigma if sigma>0 else 1e-9)
state = self.positions[pair].get("state", "flat")
if state == "flat":
if abs(z) >= SIGMA_THRESHOLD:
direction = "i_long" if z < 0 else "i_short"
if WAIT_ONE_DAY:
next_open = next_trading_open(now)
self.positions[pair] = {
"state": "to_enter",
"direction": direction,
"pending_z": z,
"next_action_time": next_open,
"pending_time": now
}
logger.info(f"Pair {pair} -> to_enter until {next_open.isoformat()} (dir={direction}, z={z:.3f})")
else:
self.enter_pair(pair, direction, price_map)
elif state == "to_enter":
# non-blocking: check time, if due then re-evaluate and possibly enter
next_action = self.positions[pair].get("next_action_time")
if next_action is not None and now >= next_action:
# fetch fresh prices for re-eval
q_i = kabu_get_quote(i); q_j = kabu_get_quote(j)
if q_i is None or q_j is None:
logger.warning(f"Failed to fetch prices at pending execution for {pair}; cancel pending.")
self.positions[pair] = {"state": "flat"}
return
price_map_fresh = {i: q_i.get("CurrentPrice") or q_i.get("Close"),
j: q_j.get("CurrentPrice") or q_j.get("Close")}
try:
spread2 = self.compute_spread(pair, price_map_fresh)
except Exception:
logger.warning(f"Cannot compute spread at pending time for {pair}; cancel pending.")
self.positions[pair] = {"state": "flat"}
return
z2 = (spread2 - mu) / (sigma if sigma>0 else 1e-9)
direction = self.positions[pair]["direction"]
# require sign and magnitude still hold
if (direction == "i_long" and z2 < -SIGMA_THRESHOLD) or (direction == "i_short" and z2 > SIGMA_THRESHOLD):
logger.info(f"Pending entry valid for {pair} (z={z2:.3f}). Entering.")
self.enter_pair(pair, direction, price_map_fresh)
else:
logger.info(f"Pending entry invalid for {pair} (z={z2:.3f}). Cancel pending.")
self.positions[pair] = {"state": "flat"}
elif state == "open":
entry_z = self.positions[pair].get("entry_z")
# If entry_z not set, set baseline now (could be improved by saving entry z at entry)
if entry_z is None:
self.positions[pair]["entry_z"] = z
return
# crossing rule
if entry_z * z <= 0:
logger.info(f"Pair {pair} crossing detected (entry_z={entry_z:.3f}, z_now={z:.3f}), closing.")
self.close_pair(pair, price_map)
def is_market_open(self, now: datetime):
if now.weekday() >= 5: return False
t = now.time()
m_open = dtime(9,0); m_close = dtime(11,30); a_open = dtime(12,30); a_close = dtime(15,0)
return (m_open <= t <= m_close) or (a_open <= t <= a_close)
def monitoring_loop(self):
logger.info("Starting monitoring loop (non-blocking).")
while True:
now = datetime.now()
if not self.is_market_open(now):
time.sleep(30)
continue
# gather tickers
tickers = set()
for (i,j,_) in self.pairs:
tickers.add(i); tickers.add(j)
# fetch quotes (naive sequential; in production parallelize or use Push)
price_map = {}
for tk in tickers:
q = kabu_get_quote(tk)
price_map[tk] = None if q is None else (q.get("CurrentPrice") or q.get("Close") or None)
# process each pair (non-blocking)
for (i,j,_) in self.pairs:
self.process_pair_signal((i,j), price_map, now)
# small sleep for rate-limit
time.sleep(2)
# ----------------------------
# main
# ----------------------------
def main():
csv_path = "formation_prices.csv"
if not os.path.exists(csv_path):
logger.error("formation_prices.csv not found.")
return
df = pd.read_csv(csv_path, parse_dates=["date"], index_col="date")
df = df.dropna(axis=1, how="any")
if df.shape[1] < 2:
logger.error("Insufficient tickers.")
return
trader = PairTrader(df, top_n=TOP_N_PAIRS)
try:
trader.monitoring_loop()
except KeyboardInterrupt:
logger.info("Stopped by user.")
except Exception as e:
logger.exception(f"Fatal error: {e}")
if __name__ == "__main__":
main()