この度はAidemy Premiumの自然言語処理カリキュラム修了課題のため、映画の主人公が持つ手段や動機、目的などからシナリオの方向性を分類するプログラムを作ることにした。
ちなみに、映画好きなので「あわよくば面白そうな映画をたくさん発掘できたら嬉しいな」などとも思っていたが、そんな甘い考えを持つ余裕は割とすぐに消し飛んだ。
目次
目指す機能
学習用データセットの確保
いざ学習!しかし……
対策
実際にあらすじを分類する
反省と総括
参考にした記事
- <開発環境>
- Python 3.10.12
- Windows11
- Chrome
- Google Colaboratory
- T4 GPU
- cassava
目指す機能
映画のあらすじを入力することで、その作品のシナリオを以下の8種類のいずれかに分類させたい。
-
帰還型:意図せずして非日常的で危険な状況に巻き込まれてしまった主人公が、身の安全を取り戻したり「日常」に戻ったりしようとする物語。襲い来る脅威をすべて切り抜けて主人公が安全を確保できれば、観客も共に安堵感を覚える。
-
探求型:危険を冒してでも手に入れたいものがある主人公が、強い動機に基づいて「旅」に出る物語。安定した生活や身の安全よりも優先される目的を果たせた時、強い達成感を得られる。
-
変身型:主人公が心機一転し、その象徴として主人公の装いや名前、立場が変化する物語。主人公の選択がそのような変身を実現することで、観客の変身願望を満たすことができる。
-
半神型:人々から異端と見做されるほど強力で希少な能力や道具を手にしたことで、主人公の人間性が大きな影響を受けてしまう物語。人の身には余る、しかし万能ではない力をいかに支配するか、あるいは支配できないとどのような悲劇が起こるかが見どころ。
-
逆転型:社会やコミュニティー、業界から追放された、あるいは最底辺まで失墜した主人公が、再び界隈の中で身分を取り戻そうとする物語。名声や肩書といった外付けの要素が一度取り払われることで、その界隈で真に求められる技術や能力か、あるいはまったく新しい存在意義を生み出すことが求められる。
-
交流型:主要な複数の登場人物が互いに、相手の心に強く干渉しようとする物語。登場人物同士が相互に精神的勝利条件を満たし合うことで、多重に幸福感を得られる。
-
歯車型:職業や役割などを決められた主人公が、求められる技能を駆使して職務を全うできるかが問われる物語。社会の中に自分の居場所を創り出せるだけの能力を主人公が持っているか問われる。
-
解明型:主要キャラクターの目的や実態が不明確なままシナリオが進み、次第に真相が明らかになっていく物語。好奇心を刺激され、それが満たされることで大きなカタルシスを得られる。
学習用データセットの確保
Wikipediaの記事から「あらすじ」ないしは「ストーリー」の項目をスクレイピングし、辞書を作成する。
import random
import csv
from bs4 import BeautifulSoup
from urllib import request
from google.colab import files
# ヘッダーを修正
header = ['id', 'title','plot', 'label']
# Wikipedia内のテーブル内のリンクを取得
def process_link(link_soup, csv_writer, link_id):
# スクレイピング
link_content = link_soup.find('table', class_="wikitable sortable plainrowheaders jquery-tablesorter")
if link_content:
# テーブル内のすべてのリンクを抽出
links = link_content.find_all('a')
# 各リンクごとに処理
for i, link in enumerate(links):
href = link.get('href')
if href and href.startswith('/wiki/'): # Wikipedia内のリンクのみ処理
# リンク先のページを取得
link_url = 'https://ja.wikipedia.org' + href
link_response = request.urlopen(link_url)
link_soup = BeautifulSoup(link_response, 'html.parser')
link_response.close()
# 各リンクの処理を呼び出し
process_link_detail(link_soup, csv_writer, link_id, i + 1)
# あらすじに関わる箇所を抽出
def process_link_detail(link_soup, csv_writer, link_id, sublink_id):
# スクレイピング
link_content = link_soup.find('div', class_="mw-content-ltr mw-parser-output")
if link_content:
# divタグの中のテキストを取得
link_content_text = link_content.get_text(strip=True)
# タイトルを取得
link_title = link_soup.find('h1', class_="firstHeading mw-first-heading")
link_title_text = link_title.get_text(strip=True)
# "あらすじ"または"ストーリー"から始まる3000文字までの内容を抽出
if "あらすじ" in link_content_text:
start_index = link_content_text.find("あらすじ") + 8
else:
start_index = link_content_text.find("ストーリー") + 9
# キャスト、スタッフ、登場人物、キャラクターのいずれかが含まれていた場合、その単語の直前までのテキストを抽出
keywords = ["キャスト", "スタッフ", "登場人物", "キャラクター"]
end_index = len(link_content_text)
for keyword in keywords:
index = link_content_text.find(keyword)
if index != -1:
end_index = min(end_index, index)
plot_text = link_content_text[start_index:end_index]
# CSVに書き込むデータ
csv_data = [link_id, link_title_text, plot_text, 0]
# CSVファイルに書き込む
csv_writer.writerow(csv_data)
target_page_url = 'https://ja.wikipedia.org/wiki/%E8%88%88%E8%A1%8C%E5%8F%8E%E5%85%A5%E4%B8%8A%E4%BD%8D%E3%81%AE%E3%82%A2%E3%83%8B%E3%83%A1%E3%83%BC%E3%82%B7%E3%83%A7%E3%83%B3%E6%98%A0%E7%94%BB%E4%B8%80%E8%A6%A7'
# CSVファイルを書き込みモードで開く
csv_file_path = 'movie_data.csv'
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
# CSVライターを作成
csv_writer = csv.writer(csv_file)
# ヘッダーを書き込む
csv_writer.writerow(header)
# ページのソースコードを取得
base_response = request.urlopen(target_page_url)
base_soup = BeautifulSoup(base_response, 'html.parser')
base_response.close()
# すべてのリンクを抽出
all_links_1 = base_soup.find_all('a')
# 各リンクごとに処理
for i, link in enumerate(all_links_1):
href = link.get('href')
if href and href.startswith('/wiki/'): # Wikipedia内のリンクのみ処理
# ランダムなIDを生成
link_id = i + 1
# リンク先のページを取得
link_url = 'https://ja.wikipedia.org' + href
link_response = request.urlopen(link_url)
link_soup = BeautifulSoup(link_response, 'html.parser')
link_response.close()
# 各リンクの処理を呼び出し
process_link(link_soup, csv_writer, link_id)
# 映画作品の一覧のURL
page_urls = [
'https://ja.wikipedia.org/wiki/%E6%97%A5%E6%9C%AC%E3%81%AE%E6%98%A0%E7%94%BB%E4%BD%9C%E5%93%81%E4%B8%80%E8%A6%A7',
'https://ja.wikipedia.org/wiki/Category:%E6%97%A5%E6%9C%AC%E3%81%AE%E3%82%AA%E3%83%AA%E3%82%B8%E3%83%8A%E3%83%AB%E3%82%A2%E3%83%8B%E3%83%A1%E6%98%A0%E7%94%BB',
'https://ja.wikipedia.org/wiki/%E6%98%A0%E7%94%BB%E4%BD%9C%E5%93%81%E4%B8%80%E8%A6%A7'
]
num_links_list = [500, 200, 1000]
for page_url, num_links in zip(page_urls, num_links_list):
base_response = request.urlopen(page_url)
base_soup = BeautifulSoup(base_response, 'html.parser')
base_response.close()
all_links_2 = base_soup.select('a[href^="/wiki/"]')
random_links = random.sample(all_links_2, num_links)
for i, link in enumerate(random_links):
href = link.get('href')
if href:
link_id = i + 1
link_url = 'https://ja.wikipedia.org' + href
link_response = request.urlopen(link_url)
link_soup = BeautifulSoup(link_response, 'html.parser')
link_response.close()
process_link_detail(link_soup, csv_writer, link_id, i + 1)
# CSV ファイルに書き込むためのデータリスト
csv_data_list = []
# 2つのリストを結合
all_links = all_links_1 + random_links
# 1からの連番でIDを生成
for i, link in enumerate(all_links):
href = link.get('href')
if href:
# リンク先のページを取得
link_url = 'https://ja.wikipedia.org' + href
try:
# リンクが存在する場合のみページを取得
if not link_url.startswith("https://ja.wikipedia.org//"):
link_response = request.urlopen(link_url)
link_soup = BeautifulSoup(link_response, 'html.parser')
link_response.close()
# 1からの連番でIDを生成
link_id = i + 1
# 各リンクの処理を呼び出し
process_link_detail(link_soup, csv_writer, link_id, 0)
else:
print(f"Skipped link: {link_url}")
except Exception as e:
print(f"Error: {e} - Link: {link_url}")
# CSVファイルをダウンロード
files.download('movie_data.csv')
自動で取得したあらすじは、文字数が多過ぎたり少な過ぎたり、文章が破綻していたり、不必要な描写の記述が多かったり、シナリオとは関係ない役者名などが紛れ込んでいたりするので、下記のコードで大まかに整理。
import os
import urllib.request
import re
import csv
import tarfile
import tensorflow as tf
import numpy as np
import pandas as pd
from google.colab import files
# GPUが利用可能な場合はGPUを使用し、それ以外の場合はCPUを使用
if tf.test.is_gpu_available():
device = '/gpu:0'
else:
device = '/cpu:0'
# movie_data.csvファイルを読み込む
df = pd.read_csv("/hoge/Scenario_Style/movie_data .csv")
# idを1からの連番で上書き
df['id'] = range(1, len(df) + 1)
# 必要なカラムの選択
df = df[['id','title', 'plot','label']]
def normalize(text):
if isinstance(text, str):
# 記号や英数字大文字小文字を除く
brackets_tail = re.compile('【[^】]*】$')
brackets_head = re.compile('^【[^】]*】')
output = re.sub(brackets_head, '', re.sub(brackets_tail, '', text))
replaced_text = re.sub("[0-9a-zA-Z]+", "", output) # 入力のtextではなく、正規化したoutputを使用
# Wikipediaとウィキペディアを含む行を削除
if 'wikipedia' in replaced_text.lower() or 'ウィキペディア' in replaced_text or 'Box Office Mojo' in replaced_text:
return None
return replaced_text
else:
return str(text)
# df の plot カラムに、修正した normalize 関数を適用する
df['plot'] = df['plot'].apply(normalize)
# NaNを含む行を削除
df.dropna(subset=['plot'], inplace=True)
# 指定されたキーワードを含むtitleカラムの行を削除
keywords_to_remove = ["メインページ", "Wikipedia", "Box Office Mojo", "ファイル", "アニメ", "Category", "日本週末", "歴代興行"]
for keyword in keywords_to_remove:
df = df[~df['title'].str.contains(keyword)]
# 指定された文言を含むplotカラムの行を削除
df = df[~df['plot'].str.contains("参考文献や出典が全く示されていないか、不十分です")]
df = df[~df['plot'].str.contains("nan")]
あとは、350~800文字になるよう手作業で要約することになるのだが、ここで地獄を見ることになった。
このクレンジング作業がたいへんな労力を要求するものであり、総作業時間の99.99%はこれに費やされたのではないかというほどだった。
続いて、分類するクラスの特徴を定義し、BERTを用いて特徴量を分析する。
from transformers import BertModel, BertTokenizer
import torch
# BERTのトークナイザーとモデルを読み込む
tokenizer = BertTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
model = BertModel.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# 分類したい種類の対象と数を指定
fname_class_list = {
"demigod_type": [], #半神型
"transformation_type": [], #変身型
"extraordinary_type": [], #帰還型
"exploration_type": [], #探求型
"job_type": [], #歯車型
"reversal_type": [], #逆転型
"interaction_type": [], #交流型
"elucidation_type": [] #解明型
}
#fname_class_list辞書のすべてのキーをリスト化
target_genres = list(fname_class_list.keys())
# 各クラスの文章を定義
class_texts = {
"帰還型": "主人公が危険で未経験の非日常から安全で安定した日常生活に戻ることを目的にアクションを起こし、アクシデントを解決して危機的状況から脱出しようとする。異世界に迷い込む、世界や地球を危機から救う、自分の命を狙う者たちから逃れようとする物語など。",
"探求型": "強い動機に基づいて、安定した生活や進路を放棄してでも危険な旅に出て目的を達成しようとする物語。リスクを冒してでも達成したい目的や手に入れたいものがある、芸術の道に進むため会社を辞める、家族の病を治すために薬を探す旅に出る、自分の命を顧みず復讐を決行しようとする物語など。",
"変身型": "主人公の動機や心持ちの変化に伴い、主人公の装いや名前、立場が象徴的に変化する物語。主人公が自分の知識と技術の活かし方を変えて新しい立場で行動する、自分の立場に相応しい振舞いをする、ファッションに気を遣うことで仕事のパフォーマンスが上がる物語など。",
"半神型": "社会や世界の中でも希少で強力かつ異端の能力や道具を手にした主人公が過ぎたる力に振り回されつつ、人格的に成長できるか否かが問われる物語。魔法の道具を手に入れて世界を変えるため使う、超能力を手に入れて私利私欲のために使おうとする物語など。",
"歯車型": "職業という社会的な役割や立場によって機械的に設定される目的を達成しようと、主人公は求められる能力や技術、社会性を駆使する物語。医者が難病の患者を救う、探偵が事件を解決する、スパイが情報を盗む、部活動のチームが競技に励む物語など。",
"交流型": "複数の主要人物が互いの心理に強く干渉することを目的や手段とする物語。相思相愛になりたい男女がいる物語、友情を尊重する物語など。",
"逆転型": "業界やコミュニティーから追放された、あるいはその最底辺まで落ちぶれた主人公が、その界隈の中で栄光を手に入れたり返り咲いたりすることを目的とする物語。その業界の権威といえる強者を打倒してジャイアントキリングを達成する、あるスポーツの弱小校が全国大会で優勝する、体格に恵まれない主人公が体格差を覆す、スキャンダルに見舞われた料理人が再評価される物語など。",
"解明型": "主人公や主要人物の目的や手段が明らかでない状態から物語が始まり、シナリオの進行とともに主人公や主要人物の目的や手段、動機が明らかになっていく物語。"
}
# 各クラスの文章をBERTに入力し、特徴量を取得する
class_features = {}
for class_name, class_text in class_texts.items():
# テキストをトークン化
inputs = tokenizer(class_text, return_tensors="pt", padding=True, truncation=True)
# BERTモデルにテキストを入力し、特徴量を取得
with torch.no_grad():
outputs = model(**inputs)
# 最終層の特徴量を取得
last_hidden_states = outputs.last_hidden_state
# 平均プーリングして文章の特徴量を得る
mean_pooled = torch.mean(last_hidden_states, dim=1).squeeze()
class_features[class_name] = mean_pooled
# 各特徴量に対応するクラス名を定義
class_labels = {
"帰還型": 0,
"探求型": 1,
"変身型": 2,
"半神型": 3,
"歯車型": 4,
"交流型": 5,
"逆転型": 6,
"解明型": 7
}
# 各特徴量に対してラベルを付ける
labeled_features = []
for class_name, features in class_features.items():
# 特徴量に最も近いクラス名を見つける
closest_class = min(class_labels, key=lambda x: torch.dist(features, class_features[x]))
# クラス名に対応するラベルを取得
label = class_labels[closest_class]
# ラベル付きの特徴量を追加
labeled_features.append((features, label))
いざ学習!しかし……
どうにかデータを1000件余り確保できたので一度学習させてみたのだが、3回学習を繰り返させても30~40%ほどという惨憺たる精度でしか学習できなかった。
だが1000作品もの映画のあらすじを要約する作業があまりに辛過ぎて心が折れていたため、現実逃避する方向で考察を進めていると、一部の分類の出現頻度が少なかったことに思い至った。
そこでひとまず、出現数が最も多い帰還型と同数になるようそれぞれのラベルのデータをランダムで複製してオーバーサンプリング。
試しにこれを学習させてみたところ、今度はすべてのepochで精度が1.000(100%)に。
その数値の高さに一瞬浮かれそうになったが、明らかに過学習である。
さらに思案し、準備したデータセットのあらすじが学習用のデータとして複雑過ぎたかもしれないと推測。
どうにかして手っ取り早く対処できないかと(手作業のあらすじ作成作業にだけは何としても戻りたくないので)色々と調査し着目したのが、ノイズデータの追加とepoch数増加であった。
対策
ノイズデータ追加の効果は、既存のデータセットを利用してデータのバリエーションを増やせることと、過学習対策になること、エラーや外れ値への耐性が増すこと。
そしてepoch数についてだが、350~800文字のあらすじから文章の傾向や性質を読み取るというのはかなり複雑な分析となるため、epoch数が3というのは少な過ぎた可能性がある。
もちろんepoch数をただ増やせばいいという訳ではないので、損失関数を利用して過学習になっていないか確認する。
よって、以下の方針を採ることにした。
- ノイズデータの割合が10%、20%、30%の3パターンで、精度が最も高まるのがどれか検証。
- 損失関数を使い、過学習になる寸前の学習回数を模索する。
この条件で比較検証し、正解率が最も高く、なおかつ訓練損失とバリデーション損失の差が小さくなったモデルを、epoch数を増やして訓練の精度を向上させる。
ということで、まずはデータセットにノイズを加える。
# ノイズを加える関数
def random_word_replacement(text, replacement_rate=0.1):
words = text.split()
num_replacements = int(len(words) * replacement_rate)
for _ in range(num_replacements):
idx = random.randint(0, len(words) - 1)
words[idx] = 'ノイズ'
return ' '.join(words)
# ラベルの出現回数を計測
label_counts = df['label'].value_counts().sort_index()
print("Original label counts:\n", label_counts)
# 最も出現回数が多いラベルの出現回数を取得
max_count = label_counts.max()
print("Max label count:", max_count)
# 出現回数を最も多いラベルの数に合わせて複製する
balanced_df = pd.DataFrame()
for label in label_counts.index:
label_df = df[df['label'] == label]
count = len(label_df)
if count < max_count:
additional_samples = label_df.sample(max_count - count, replace=True, random_state=1)
label_df = pd.concat([label_df, additional_samples])
balanced_df = pd.concat([balanced_df, label_df])
# データフレームの列名を表示して確認
print(balanced_df.columns)
final_df = pd.DataFrame()
# 各ラベルの10%をノイズデータにする
for label in range(8):
label_df = balanced_df[balanced_df['label'] == label]
num_noisy_samples = int(len(label_df) * 0.1) # ここでノイズ割合を変更
noisy_samples = label_df.sample(num_noisy_samples, replace=False, random_state=1).copy()
noisy_samples['plot'] = noisy_samples['plot'].apply(lambda x: random_word_replacement(x))
final_df = pd.concat([final_df, label_df, noisy_samples])
そしてノイズデータを混ぜる割合ごとに分類機でトレーニング。
# データのシャッフル
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
# 訓練用と検証用にデータを分割(80%訓練用、20%検証用)
train_size = int(0.8 * len(df))
train_df = df[:train_size]
val_df = df[train_size:]
# BERTトークナイザーの読み込み
tokenizer = BertTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# データセットクラスの定義
class StyleDataset(Dataset):
def __init__(self, dataframe, tokenizer, max_length=512):
self.data = dataframe
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.data)
def __getitem__(self, index):
text = self.data.iloc[index]['plot']
label = self.data.iloc[index]['label']
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=self.max_length,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
# バッチサイズ、最大シーケンス長を指定
batch_size = 4
max_length = 512
# データセットのインスタンス化
train_dataset = StyleDataset(train_df, tokenizer, max_length=max_length)
val_dataset = StyleDataset(val_df, tokenizer, max_length=max_length)
# データローダーの定義
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
# BERTモデルの読み込み
model = BertForSequenceClassification.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking', num_labels=8)
# GPUが利用可能であればGPUを使用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# 最適化関数、学習率の設定
learning_rate = 2e-6
optimizer = AdamW(model.parameters(), lr=learning_rate)
# 学習
num_epochs = 5
for epoch in range(num_epochs):
model.train()
for batch in train_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
loss = outputs.loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 検証
model.eval()
val_acc = 0
total_val_samples = 0
for batch in val_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
with torch.no_grad():
outputs = model(input_ids, attention_mask=attention_mask)
_, predicted = torch.max(outputs.logits, dim=1)
val_acc += torch.sum(predicted == labels).item()
total_val_samples += labels.size(0)
print(f'Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {val_acc/total_val_samples:.4f}')
ノイズ10%のパターンは5回目でバリデーション損失が訓練損失を上回ったが、これは過学習であることを示す。
ノイズ20%と30%では、学習回数5回では差があまり見られなかったので、今回は20%の方を採用。
次に、epoch数を増やしたことで過学習の兆候が見られそうになった場合は自動で訓練を中止するコードを組み込み、精度が最高となる学習回数を特定する。
# データのシャッフル
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
# 訓練用と検証用にデータを分割(80%訓練用、20%検証用)
train_size = int(0.8 * len(df))
train_df = df[:train_size]
val_df = df[train_size:]
# BERTの読み込み
tokenizer = BertTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# データセットクラスの定義
class StyleDataset(Dataset):
def __init__(self, dataframe, tokenizer, max_length=512):
self.data = dataframe
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.data)
def __getitem__(self, index):
text = self.data.iloc[index]['plot']
label = self.data.iloc[index]['label']
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=self.max_length,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
# バッチサイズ、最大シーケンス長を指定
batch_size = 4
max_length = 512
# データセットのインスタンス化
train_dataset = StyleDataset(train_df, tokenizer, max_length=max_length)
val_dataset = StyleDataset(val_df, tokenizer, max_length=max_length)
# データローダーの定義
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
# BERTの読み込み
model = BertForSequenceClassification.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking', num_labels=8)
# GPUが利用可能であればGPUを使用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# 最適化関数、学習率の設定
learning_rate = 2e-6
optimizer = AdamW(model.parameters(), lr=learning_rate)
# 早期停止クラスの定義
class EarlyStopping:
def __init__(self, patience=3, verbose=False, path='checkpoint.pth'):
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_loss = np.inf
self.early_stop = False
self.path = path
def __call__(self, val_loss, model):
if val_loss < self.best_loss:
self.best_loss = val_loss
self.counter = 0
torch.save(model.state_dict(), self.path)
if self.verbose:
print(f"Validation loss improved. Saving model to {self.path}")
else:
self.counter += 1
if self.counter >= self.patience:
self.early_stop = True
if self.verbose:
print("Early stopping triggered")
# 学習ループ内
early_stopping = EarlyStopping(patience=3, verbose=True, path='/hoge/Scenario_Style/best_scenario_analyst.pth')
# 早期停止の設定(3エポック連続でバリデーション損失が改善しない場合に停止)
early_stopping = EarlyStopping(patience=3, verbose=True)
# 学習と検証損失の記録用リスト
train_losses = []
val_losses = []
# 学習
num_epochs = 20
for epoch in range(num_epochs):
model.train()
epoch_train_loss = 0
for batch in train_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
loss = outputs.loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
epoch_train_loss += loss.item()
# 訓練損失を記録
avg_train_loss = epoch_train_loss / len(train_loader)
train_losses.append(avg_train_loss)
# 検証
model.eval()
epoch_val_loss = 0
val_acc = 0
total_val_samples = 0
with torch.no_grad():
for batch in val_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
loss = outputs.loss
epoch_val_loss += loss.item()
_, predicted = torch.max(outputs.logits, dim=1)
val_acc += torch.sum(predicted == labels).item()
total_val_samples += labels.size(0)
# 検証損失を記録
avg_val_loss = epoch_val_loss / len(val_loader)
val_losses.append(avg_val_loss)
# 精度の表示
val_accuracy = val_acc / total_val_samples
print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
# 早期停止のチェック
early_stopping(avg_val_loss, model)
if early_stopping.early_stop:
print("Early stopping at epoch:", epoch + 1)
break
# 学習曲線をプロット
plt.figure(figsize=(12, 6))
plt.plot(range(len(train_losses)), train_losses, label='Training Loss')
plt.plot(range(len(val_losses)), val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.show()
7回学習した時点でバリデーション損失が訓練損失を上回り、13回目以降はバリデーション損失が増えて精度が落ちていった。そして15回目は過学習の兆候が見られ始めた回数となるので、epoch=13でモデルを準備することに。
以下が、他のパラメータも調整した決定版。
# データのシャッフル
df = df.sample(frac=1, random_state=42).reset_index(drop=True)
# 訓練用と検証用にデータを分割(80%訓練用、20%検証用)
train_size = int(0.8 * len(df))
train_df = df[:train_size]
val_df = df[train_size:]
# BERTトークナイザーの読み込み
tokenizer = BertTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# データセットクラスの定義
class StyleDataset(Dataset):
def __init__(self, dataframe, tokenizer, max_length=512):
self.data = dataframe
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.data)
def __getitem__(self, index):
text = self.data.iloc[index]['plot']
label = self.data.iloc[index]['label']
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=self.max_length,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
# バッチサイズ、最大シーケンス長を指定
batch_size = 8
max_length = 512
# データセットのインスタンス化
train_dataset = StyleDataset(train_df, tokenizer, max_length=max_length)
val_dataset = StyleDataset(val_df, tokenizer, max_length=max_length)
# データローダーの定義
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
# BERTモデルの読み込み
model = BertForSequenceClassification.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking', num_labels=8)
# GPUが利用可能であればGPUを使用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
# 最適化関数、学習率の設定
learning_rate = 2e-6
optimizer = AdamW(model.parameters(), lr=learning_rate)
# 学習
num_epochs = 13 # 確定した最適なepoch数
for epoch in range(num_epochs):
model.train()
for batch in train_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
loss = outputs.loss
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 検証
model.eval()
val_acc = 0
total_val_samples = 0
for batch in val_loader:
input_ids = batch['input_ids'].to(device)
attention_mask = batch['attention_mask'].to(device)
labels = batch['labels'].to(device)
with torch.no_grad():
outputs = model(input_ids, attention_mask=attention_mask)
_, predicted = torch.max(outputs.logits, dim=1)
val_acc += torch.sum(predicted == labels).item()
total_val_samples += labels.size(0)
print(f'Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {val_acc/total_val_samples:.4f}')
# モデルの保存
model_save_path = "/hoge/Scenario_Style/best_model.pth"
model.save_pretrained(model_save_path)
tokenizer.save_pretrained(model_save_path)
実際にあらすじを分類する
映画のあらすじを入力し、それがどの型に分類されるか検証。
# モデルとトークナイザーのロード
model_save_path = "/hoge/Scenario_Style/best_model.pth"
# BERTトークナイザーの読み込み
tokenizer = BertTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
# モデルの定義
model = BertForSequenceClassification.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking', num_labels=8)
# モデルの状態をCPUにマッピングしてロード
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.load_state_dict(torch.load(model_save_path, map_location=device))
# モデルをデバイスに移動
model.to(device)
# テキストデータを分類する関数の定義
def classify_text(text, model, tokenizer, device):
model.eval()
inputs = tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=512,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt'
)
input_ids = inputs['input_ids'].to(device)
attention_mask = inputs['attention_mask'].to(device)
with torch.no_grad():
outputs = model(input_ids, attention_mask=attention_mask)
_, predicted = torch.max(outputs.logits, dim=1)
return predicted.item()
# クラス名の辞書
class_names = {
0: "帰還型", 1: "探求型", 2: "変身型", 3: "半神型",
4: "歯車型", 5: "交流型", 6: "逆転型", 7: "解明型"
}
# 自分が入力した文章を分類する
input_text = input("あらすじを入力してください: ")
predicted_class = classify_text(input_text, model, tokenizer, device)
predicted_class_name = class_names.get(predicted_class, "分類不可")
print(f'分類結果: {predicted_class_name}')
コードを実行すると以下のようにボックスが表示されるので、ここにあらすじを入力。
試しに、好きな映画『シェフ 三ツ星フードトラック始めました』のあらすじ(「有名レストランのシェフである主人公はトラブルを起こして職場を解雇されるが、心機一転し開業したフードトラックがたちまち繁盛する。」)を入力してみる。
見事予想通りの結果が出た!
反省と総括
準備すべきデータセットの内容が複雑過ぎた上、教師データのラベリング自体が難しく作業に時間がかかり、その割に用意できたデータの数が少なかったせいで訓練の精度が低く、オーバーサンプリングで補おうとしたことで過学習が起きる惨劇に部屋で一人呻き声を上げる日々であった。
データセットを準備する労力が非常に大きかったのもそうだが、データの質を確保するためにも、やはり設計段階でデータの入手手段も詳細に設定しておくことが重要であると言える。
次にAIの実装で学習用データセットが必要となる機会があれば、もっとシンプルなデータを大量に用意できるようにする。