まえがき
データサイエンティストのためのPython100本ノックについての記事です。NumPyやPandasといったデータサイエンス系でよく用いられるライブラリを扱います。誤りなどがあれば、ご指摘ください。今回はpandasを中心に10問扱います。
Q.36
項目 | 内容 |
---|---|
概要 | 行インデックスの連続性・重複・ギャップ(欠番)を解析し、修復提案を行うクラス。インデックス操作の深い理解が必要。 |
問題文 |
IndexGapAnalyzer() を設計し、analyze(df) で Int64Index に対する①欠番の検出、②重複インデックスの存在、③最小・最大の範囲を解析して辞書で返せ。suggest_repair(df) で reset_index() または reindex() 案を提案せよ。 |
要件 |
Index 型検査/範囲抽出/欠番確認/修復案提案 |
発展仕様 | インデックス型不一致で例外/操作ログ出力/RangeIndex への変換案提案/連続かつユニークな場合は何も不要 |
使用構文 |
df.index , is_unique , is_monotonic_increasing , reindex , reset_index , loguru.logger , ValueError , pd.RangeIndex
|
A.36
■ 模範解答
import pandas as pd
from loguru import logger
class IndexGapAnalyzer:
def __init__(self):
logger.info("IndexGapAnalyzer initialized.")
def analyze(self, df: pd.DataFrame) -> dict:
# Int64Index型チェック
if not isinstance(df.index, pd.Int64Index):
logger.error(f"Index must be Int64Index, got {type(df.index)}")
raise ValueError("Index must be Int64Index.")
idx = df.index
logger.info(f"Analyzing Int64Index: {idx}")
# 欠番検出
min_idx, max_idx = idx.min(), idx.max()
full_range = set(range(min_idx, max_idx + 1))
actual = set(idx)
missing = sorted(full_range - actual)
# 重複インデックス検出
duplicates = idx[idx.duplicated()].unique().tolist()
# 連続性・単調性
monotonic = idx.is_monotonic_increasing
unique = idx.is_unique
logger.info(f"Index min={min_idx}, max={max_idx}, unique={unique}, monotonic={monotonic}, missing={missing}, duplicates={duplicates}")
return {
"min": min_idx,
"max": max_idx,
"missing_indices": missing,
"n_missing": len(missing),
"has_duplicates": len(duplicates) > 0,
"duplicate_indices": duplicates,
"is_monotonic_increasing": monotonic,
"is_unique": unique,
}
def suggest_repair(self, df: pd.DataFrame) -> str:
# Int64Index型チェック
if not isinstance(df.index, pd.Int64Index):
logger.error(f"Index must be Int64Index, got {type(df.index)}")
raise ValueError("Index must be Int64Index.")
idx = df.index
# 欠番・重複・非単調性などに応じて案を提示
analysis = self.analyze(df)
msg = []
if not analysis["is_unique"]:
msg.append("reset_index(drop=True)推奨(重複indexあり)")
if analysis["n_missing"] > 0:
msg.append(f"reindex(range({analysis['min']},{analysis['max']+1}))でギャップ補間推奨")
if not analysis["is_monotonic_increasing"]:
msg.append("sort_index()またはreset_index(drop=True)で単調性確保を推奨")
# 完全に連番ならRangeIndex化も可能
if analysis["is_unique"] and analysis["n_missing"] == 0 and analysis["is_monotonic_increasing"]:
msg.append("現状でRangeIndex化可能(reset_index(drop=True)で最適化)")
if not msg:
msg.append("インデックスは整合しています。修復不要です。")
logger.info("Suggest repair: " + "/".join(msg))
return "/".join(msg)
実行例1:欠番・重複・ギャップあり
df = pd.DataFrame({"val": [1,2,3,4,5,6]}, index=[0, 1, 2, 2, 4, 6])
analyzer = IndexGapAnalyzer()
print(analyzer.analyze(df))
print(analyzer.suggest_repair(df))
実行結果1
{'min': 0, 'max': 6, 'missing_indices': [3, 5], 'n_missing': 2,
'has_duplicates': True, 'duplicate_indices': [2],
'is_monotonic_increasing': True, 'is_unique': False}
reset_index(drop=True)推奨(重複indexあり)/reindex(range(0,7))でギャップ補間推奨
実行ログ1
INFO IndexGapAnalyzer initialized.
INFO Analyzing Int64Index: Int64Index([0, 1, 2, 2, 4, 6], dtype='int64')
INFO Index min=0, max=6, unique=False, monotonic=True, missing=[3, 5], duplicates=[2]
INFO Suggest repair: reset_index(drop=True)推奨(重複indexあり)/reindex(range(0,7))でギャップ補間推奨
実行例2:連続・ユニークなRangeIndex化可能パターン
df2 = pd.DataFrame({"val": [1,2,3]}, index=[5,6,7])
analyzer = IndexGapAnalyzer()
print(analyzer.analyze(df2))
print(analyzer.suggest_repair(df2))
実行結果2
{'min': 5, 'max': 7, 'missing_indices': [],
'n_missing': 0, 'has_duplicates': False, 'duplicate_indices': [],
'is_monotonic_increasing': True, 'is_unique': True}
現状でRangeIndex化可能(reset_index(drop=True)で最適化)
実行ログ2
INFO IndexGapAnalyzer initialized.
INFO Analyzing Int64Index: Int64Index([5, 6, 7], dtype='int64')
INFO Index min=5, max=7, unique=True, monotonic=True, missing=[], duplicates=[]
INFO Suggest repair: 現状でRangeIndex化可能(reset_index(drop=True)で最適化)
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
pd.Int64Index |
整数型インデックス(DataFrameの.index) |
idx.duplicated() |
重複index抽出 |
idx.is_monotonic_increasing |
indexが昇順かどうか判定 |
set(range(min,max+1)) |
欠番検知。すべての連番の集合と実際のインデックスの差でギャップを算出 |
reset_index(drop=True) |
indexを連番で振り直す推奨案(重複や非連続時など) |
reindex(range(min,max+1)) |
欠番補間の推奨案 |
loguru.logger.info/error |
進捗・異常・解析内容・修復案をすべて記録 |
Q.37
項目 | 内容 |
---|---|
概要 | 時系列データに対して、インデックスの頻度(例:日次・月次)を推定するクラスを設計する。タイムスタンプの間隔に基づき自動分類を行う。 |
問題文 |
FrequencyInferer() クラスを設計し、infer(df: pd.DataFrame) により、DatetimeIndex の間隔差に基づいて "daily" , "monthly" , "hourly" , "irregular" などの頻度を返す処理を実装せよ。非DatetimeIndexの処理やエラー時にはログ出力すること。 |
要件 | DatetimeIndexの差分計算/頻度分類 |
発展仕様 | 差分に基づく誤差範囲でのマッチ/最頻値推定/非正規パターンの警告/pd.infer_freq併用 |
使用構文 |
df.index , pd.infer_freq , df.index.to_series().diff() , np.timedelta64 , loguru.logger , ValueError
|
A.37
■ 模範解答
import pandas as pd
import numpy as np
from loguru import logger
from collections import Counter
class FrequencyInferer:
def __init__(self):
logger.info("FrequencyInferer initialized.")
def infer(self, df: pd.DataFrame) -> str:
# 1. DatetimeIndex型チェック
if not isinstance(df.index, pd.DatetimeIndex):
logger.error(f"Index must be DatetimeIndex, got {type(df.index)}")
raise ValueError("Index must be DatetimeIndex.")
idx = df.index
n = len(idx)
logger.info(f"Inferring frequency for {n} timestamps")
# 2. infer_freqの直接利用(規則的な時系列はこの方が堅牢)
freq_str = pd.infer_freq(idx)
if freq_str:
logger.info(f"pandas.infer_freq推定: {freq_str}")
# 主要なfreqコード→ラベルへ変換
freq_map = {
"D": "daily", "H": "hourly", "T": "minutely",
"M": "monthly", "MS": "monthly", "Y": "yearly",
"Q": "quarterly", "W": "weekly"
}
base = freq_str.rstrip("S") # e.g., "MS"→"M"
label = freq_map.get(base, freq_str)
logger.info(f"判定: {label}")
return label
# 3. 差分の絶対値を計算し、最頻値(mode)で判定
delta = idx.to_series().diff().dropna()
# 代表値(最頻値)を秒数で計算
deltas_sec = delta.dt.total_seconds()
mode_sec = deltas_sec.mode().iloc[0] if not deltas_sec.empty else None
logger.info(f"mode of interval (seconds): {mode_sec}")
# 4. 閾値・誤差付きで分類
if mode_sec is None:
logger.warning("時系列の間隔が算出できませんでした。irregular判定。")
return "irregular"
# 5. 間隔→ラベル分類
def classify_interval(sec):
if np.isclose(sec, 86400, atol=60): # 約1日(±1分)
return "daily"
elif np.isclose(sec, 3600, atol=10): # 約1時間
return "hourly"
elif np.isclose(sec, 60, atol=2): # 約1分
return "minutely"
elif np.isclose(sec, 2629746, atol=3600*24*3): # 月:平均30.44日
return "monthly"
elif np.isclose(sec, 604800, atol=60): # 1週
return "weekly"
elif np.isclose(sec, 31556952, atol=3600*24*30): # 年:平均365.24日
return "yearly"
return "irregular"
freq_label = classify_interval(mode_sec)
# 6. 不規則性警告
n_unique = deltas_sec.nunique()
if n_unique > 2: # 2より大きい=規則的でない
logger.warning(f"Interval is not uniform! Unique intervals: {n_unique}. Irregular時系列の可能性")
freq_label = f"irregular ({freq_label})"
logger.info(f"Frequency inferred: {freq_label}")
return freq_label
実行例1:日次規則時系列
df = pd.DataFrame({"val": [1,2,3,4]}, index=pd.date_range("2024-01-01", periods=4, freq="D"))
freq_inferer = FrequencyInferer()
print(freq_inferer.infer(df))
実行結果1
daily
実行ログ1
INFO FrequencyInferer initialized.
INFO Inferring frequency for 4 timestamps
INFO pandas.infer_freq推定: D
INFO 判定: daily
実行例2:不規則(日・月が混在)
df2 = pd.DataFrame({"val": [1,2,3]}, index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-02-01"]))
freq_inferer = FrequencyInferer()
print(freq_inferer.infer(df2))
実行結果2
irregular (monthly)
実行ログ2
INFO FrequencyInferer initialized.
INFO Inferring frequency for 3 timestamps
INFO mode of interval (seconds): 2678400.0
WARNING Interval is not uniform! Unique intervals: 2. Irregular時系列の可能性
INFO Frequency inferred: irregular (monthly)
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
pd.infer_freq(idx) |
インデックスの規則性から自動で頻度を推定(D=日次/H=時/H=月など) |
idx.to_series().diff() |
隣接する時刻差をシリーズで取得 |
delta.dt.total_seconds() |
差分を秒単位の数値配列として扱う |
mode().iloc[0] |
最頻値で代表間隔を特定 |
np.isclose(a, b, atol=…) |
誤差許容で値を判定 |
nunique() |
インターバル種類の数(不規則性判定に使う) |
loguru.logger.info/warning/error |
推定経過・異常・不規則性をすべて記録 |
Q.38
項目 | 内容 |
---|---|
概要 | DataFrameのメモリ使用量を列単位・データ型単位で詳細に解析するクラス。最適なデータ型への変換提案も可能とする。 |
問題文 |
MemoryUsageProfiler() を定義し、profile(df) によって各列のメモリ使用量(バイト)、推奨型、削減後の推定サイズなどを含む DataFrame を返す処理を実装せよ。圧縮可能な列には int8 , category などを提案できるようにせよ。 |
要件 |
df.memory_usage() /列単位統計/提案列を含む |
発展仕様 | 節約量計算/dtype変換による精度誤差への注意点表示/optimize() による型変更機能 |
使用構文 |
df.memory_usage , astype , df.dtypes , loguru.logger , np.iinfo , pd.to_numeric
|
A.38
■ 模範解答
import pandas as pd
import numpy as np
from loguru import logger
class MemoryUsageProfiler:
def __init__(self):
logger.info("MemoryUsageProfiler initialized.")
def profile(self, df: pd.DataFrame) -> pd.DataFrame:
try:
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
mem_info = []
for col in df.columns:
dtype = df[col].dtype
mem = df[col].memory_usage(deep=True)
n_unique = df[col].nunique(dropna=False)
n_total = len(df)
suggestion = "-"
new_mem = mem
note = ""
# 数値型(整数): 最小bit幅へ圧縮提案
if pd.api.types.is_integer_dtype(dtype):
cmin, cmax = df[col].min(), df[col].max()
for t in [np.int8, np.int16, np.int32, np.int64]:
if cmin >= np.iinfo(t).min and cmax <= np.iinfo(t).max:
suggestion = str(np.dtype(t))
# 実際の変換でどれだけ減るか計算
try:
new_mem = df[col].astype(t).memory_usage(deep=True)
except Exception:
new_mem = mem
break
if suggestion != str(dtype):
note = "★精度・欠損注意"
# 浮動小数
elif pd.api.types.is_float_dtype(dtype):
# float32化を推奨するが、精度リスク警告
suggestion = "float32"
try:
new_mem = df[col].astype("float32").memory_usage(deep=True)
note = "★小数精度リスク"
except Exception:
new_mem = mem
note = ""
# オブジェクト型: ユニーク率が低ければcategory推奨
elif pd.api.types.is_object_dtype(dtype):
if n_unique / n_total < 0.5:
suggestion = "category"
try:
new_mem = df[col].astype("category").memory_usage(deep=True)
except Exception:
new_mem = mem
note = "★カテゴリ型:値追加注意"
# category型: 問題なし
elif pd.api.types.is_categorical_dtype(dtype):
suggestion = "category"
new_mem = mem
mem_info.append({
"column": col,
"dtype": str(dtype),
"memory_bytes": mem,
"suggested_dtype": suggestion,
"optimized_memory": new_mem,
"saving_bytes": mem - new_mem,
"note": note,
})
logger.info(f"[{col}] {dtype} -> {suggestion} | {mem}→{new_mem} bytes {note}")
result = pd.DataFrame(mem_info).set_index("column")
return result
except Exception as e:
logger.exception(f"MemoryUsageProfiler.profile failed: {e}")
raise
def optimize(self, df: pd.DataFrame) -> pd.DataFrame:
# profile()で得られた提案に基づき型変換
profile_df = self.profile(df)
new_df = df.copy()
for col, row in profile_df.iterrows():
sdt = row["suggested_dtype"]
if sdt not in ["-", str(df[col].dtype)]:
try:
new_df[col] = new_df[col].astype(sdt)
logger.info(f"Column {col} converted to {sdt}")
except Exception as e:
logger.warning(f"Conversion failed for {col}: {e}")
return new_df
実行例1:数値・文字列・カテゴリ型
df = pd.DataFrame({
"int_col": [1, 2, 3, 4],
"float_col": [0.1, 0.2, 0.3, 0.4],
"obj_col": ["A", "B", "A", "A"],
})
profiler = MemoryUsageProfiler()
profile = profiler.profile(df)
print(profile)
実行結果1
dtype memory_bytes suggested_dtype optimized_memory saving_bytes note
column
int_col int64 32 int8 16 16 ★精度・欠損注意
float_col float64 32 float32 16 16 ★小数精度リスク
obj_col object 92 category 48 44 ★カテゴリ型:値追加注意
実行ログ1
INFO MemoryUsageProfiler initialized.
INFO [int_col] int64 -> int8 | 32→16 bytes ★精度・欠損注意
INFO [float_col] float64 -> float32 | 32→16 bytes ★小数精度リスク
INFO [obj_col] object -> category | 92→48 bytes ★カテゴリ型:値追加注意
実行例2:最適化処理の適用
df2 = pd.DataFrame({
"code": ["X"]*1000 + ["Y"]*1000,
"n": np.random.randint(0, 100, 2000),
})
profiler = MemoryUsageProfiler()
df2_opt = profiler.optimize(df2)
print(df2_opt.dtypes)
実行結果2
code category
n int8
dtype: object
実行ログ2
INFO MemoryUsageProfiler initialized.
INFO [code] object -> category | ... bytes ★カテゴリ型:値追加注意
INFO [n] int64 -> int8 | ... bytes ★精度・欠損注意
INFO Column code converted to category
INFO Column n converted to int8
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
df.memory_usage(deep=True) |
列ごとのメモリ使用量(バイト)を厳密に集計 |
astype(np.int8) 等 |
int64→int8など、より狭いbit幅に変換 |
astype("category") |
オブジェクト型文字列→カテゴリ型へ圧縮 |
nunique()/len(df) |
ユニーク率が低い場合はカテゴリ型圧縮を提案 |
note 列 |
精度リスクや型変換の注意点(警告) |
loguru.logger.info/warning/exception |
詳細進捗・変換警告・例外をすべて記録 |
Q.39
項目 | 内容 |
---|---|
概要 | 外れ値を検出して削除するのではなく、中央値補間などで“修正”するクラス。ロバスト統計や集約関数との連携が求められる。 |
問題文 |
OutlierCleaner(method="iqr" | "zscore", threshold=3.0, fill_method="median") を設計し、clean(df, column) により、外れ値を削除せずNaN に置換し、その後 median で補間して返す処理を実装せよ。検出・置換・補間を一連の流れで記録すること。 |
要件 | 外れ値→NaN→補間の一連処理/Zスコア・IQR対応 |
発展仕様 | 元データを破壊せず返す/補間方法切替(mean , linear 等)/詳細ログ/例外処理 |
使用構文 |
np.percentile , scipy.stats.zscore , df.fillna , interpolate , ValueError , loguru.logger
|
A.40
■ 模範解答
import ast
import warnings
from loguru import logger
class ChainedAssignmentDetector:
def __init__(self):
logger.info("ChainedAssignmentDetector initialized.")
def check(self, df, code_str: str):
"""
与えられたコード文字列内にpandasの危険なチェーン代入(df[cond]['col'] = ...)が存在するか検出し、
該当箇所・行番号・安全な書き方を提示する。検出はast解析で行う。
"""
try:
# 1. ASTに変換
tree = ast.parse(code_str)
warnings_issued = 0
for node in ast.walk(tree):
# 2. 代入ノードを走査
if isinstance(node, ast.Assign):
# 3. 代入の左辺がSubscript→Subscript(例:df[...]['col'])
if isinstance(node.targets[0], ast.Subscript):
outer = node.targets[0]
# 外側がまたSubscriptならチェーン代入候補
if isinstance(outer.value, ast.Subscript):
line_no = getattr(node, 'lineno', None)
col = self._extract_column_name(outer)
# 警告を出す
msg = (
f"Chained assignment detected on line {line_no}: "
f"危険: df[...] ['{col}'] = ...\n"
f"→ pandasではこの形式はSettingWithCopyWarningとなり、予期せぬ動作を招きます。\n"
f"【安全案】 df.loc[<行条件>, '{col}'] = ... を使うこと。\n"
f"該当コード: {ast.get_source_segment(code_str, node)}"
)
logger.warning(msg)
warnings.warn(msg, UserWarning)
warnings_issued += 1
if warnings_issued == 0:
logger.info("No dangerous chained assignment detected.")
else:
logger.info(f"{warnings_issued} chained assignment(s) detected.")
except Exception as e:
logger.exception(f"ChainedAssignmentDetector.check failed: {e}")
raise
def _extract_column_name(self, subscript_node):
"""
Subscriptノードからカラム名を取得(単純なstrかast.Constantに限定)
"""
if isinstance(subscript_node.slice, ast.Constant):
return subscript_node.slice.value
elif isinstance(subscript_node.slice, ast.Index) and isinstance(subscript_node.slice.value, ast.Constant):
return subscript_node.slice.value.value
else:
return "?"
実行例1:IQR法・中央値補間(初期値)
df = pd.DataFrame({"score": [10, 12, 13, 100, 15, 16, 14]})
cleaner = OutlierCleaner(method="iqr", threshold=1.5, fill_method="median")
df_cleaned = cleaner.clean(df, "score")
print(df_cleaned)
実行結果1
score
0 10.0
1 12.0
2 13.0
3 14.0 # 100→14(中央値で補間)
4 15.0
5 16.0
6 14.0
実行ログ1
INFO OutlierCleaner initialized: method=iqr, threshold=1.5, fill_method=median
INFO IQR: q1=12.0, q3=15.0, lower=7.5, upper=19.5
INFO Outliers detected: 1 in column 'score'
INFO Outliers replaced with NaN.
INFO NaN filled with median: 14.0
実行例2:Zスコア法・線形補間
df2 = pd.DataFrame({"x": [1, 2, 3, 100, 5]})
cleaner2 = OutlierCleaner(method="zscore", threshold=2.0, fill_method="linear")
df2_cleaned = cleaner2.clean(df2, "x")
print(df2_cleaned)
実行結果2
x
0 1.0
1 2.0
2 3.0
3 4.0 # 100→NaN→線形補間(3と5の間)
4 5.0
実行ログ2
INFO OutlierCleaner initialized: method=zscore, threshold=2.0, fill_method=linear
INFO Z-score computed. threshold=2.0
INFO Outliers detected: 1 in column 'x'
INFO Outliers replaced with NaN.
INFO NaN filled with linear interpolation.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
np.percentile(x, [25,75]) |
四分位点q1,q3(IQR法の閾値計算) |
scipy.stats.zscore |
Zスコア(標準化外れ値) |
df[column].fillna(val) |
NaN値を指定値で一括補間 |
df[column].interpolate() |
NaN値を前後の値から線形補間 |
df.copy() |
元データ非破壊処理 |
loguru.logger.info/error |
すべての検出/置換/補間/異常を詳細にログ |
Q.40
項目 | 内容 |
---|---|
概要 | pandasの“SettingWithCopyWarning”に該当するようなコード構造を検出し、警告・回避案を提示する静的解析器的クラスを構築する。 |
問題文 |
ChainedAssignmentDetector() を定義し、check(df, code_str) により、DataFrame に対する df[cond]['col'] = ... のような危険な代入構文を検出し、回避方法(.loc など)を提案する機能を実装せよ。AST解析を応用してもよい。 |
要件 | チェーン代入検出/警告提案出力 |
発展仕様 |
ast モジュールによる構文解析/ログ記録/危険箇所の行番号出力 |
使用構文 |
ast , eval , df.loc , warnings.warn , loguru.logger
|
A.40
■ 模範解答
import ast
import warnings
from loguru import logger
class ChainedAssignmentDetector:
def __init__(self):
logger.info("ChainedAssignmentDetector initialized.")
def check(self, df, code_str: str):
"""
与えられたコード文字列内にpandasの危険なチェーン代入(df[cond]['col'] = ...)が存在するか検出し、
該当箇所・行番号・安全な書き方を提示する。検出はast解析で行う。
"""
try:
# 1. ASTに変換
tree = ast.parse(code_str)
warnings_issued = 0
for node in ast.walk(tree):
# 2. 代入ノードを走査
if isinstance(node, ast.Assign):
# 3. 代入の左辺がSubscript→Subscript(例:df[...]['col'])
if isinstance(node.targets[0], ast.Subscript):
outer = node.targets[0]
# 外側がまたSubscriptならチェーン代入候補
if isinstance(outer.value, ast.Subscript):
line_no = getattr(node, 'lineno', None)
col = self._extract_column_name(outer)
# 警告を出す
msg = (
f"Chained assignment detected on line {line_no}: "
f"危険: df[...] ['{col}'] = ...\n"
f"→ pandasではこの形式はSettingWithCopyWarningとなり、予期せぬ動作を招きます。\n"
f"【安全案】 df.loc[<行条件>, '{col}'] = ... を使うこと。\n"
f"該当コード: {ast.get_source_segment(code_str, node)}"
)
logger.warning(msg)
warnings.warn(msg, UserWarning)
warnings_issued += 1
if warnings_issued == 0:
logger.info("No dangerous chained assignment detected.")
else:
logger.info(f"{warnings_issued} chained assignment(s) detected.")
except Exception as e:
logger.exception(f"ChainedAssignmentDetector.check failed: {e}")
raise
def _extract_column_name(self, subscript_node):
"""
Subscriptノードからカラム名を取得(単純なstrかast.Constantに限定)
"""
if isinstance(subscript_node.slice, ast.Constant):
return subscript_node.slice.value
elif isinstance(subscript_node.slice, ast.Index) and isinstance(subscript_node.slice.value, ast.Constant):
return subscript_node.slice.value.value
else:
return "?"
実行例1:危険なチェーン代入の検出
df = ...
code_str = '''
df[df["x"] > 0]["y"] = 999
df.loc[df["x"] > 0, "y"] = 888
'''
detector = ChainedAssignmentDetector()
detector.check(df, code_str)
実行結果1
# チェーン代入警告あり(1件)
UserWarning: Chained assignment detected on line 2: 危険: df[...] ['y'] = ...
→ pandasではこの形式はSettingWithCopyWarningとなり、予期せぬ動作を招きます。
【安全案】 df.loc[<行条件>, 'y'] = ... を使うこと。
該当コード: df[df["x"] > 0]["y"] = 999
実行ログ1
INFO ChainedAssignmentDetector initialized.
WARNING Chained assignment detected on line 2: 危険: df[...] ['y'] = ...
→ pandasではこの形式はSettingWithCopyWarningとなり、予期せぬ動作を招きます。
【安全案】 df.loc[<行条件>, 'y'] = ... を使うこと。
該当コード: df[df["x"] > 0]["y"] = 999
INFO 1 chained assignment(s) detected.
実行例2:安全な.loc構文のみ(検出なし)
code_str2 = '''
df.loc[df["x"] > 0, "y"] = 888
'''
detector = ChainedAssignmentDetector()
detector.check(None, code_str2)
実行結果2
# 警告なし
実行ログ2
INFO ChainedAssignmentDetector initialized.
INFO No dangerous chained assignment detected.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
ast.parse(code_str) |
文字列コードを構文木に変換(静的な安全チェックに用いる) |
ast.walk(tree) |
構文木全体を1ノードずつ再帰走査 |
ast.Assign |
代入文ノード。df[...]['col'] = ... 検出に利用 |
Subscript (添字アクセス) |
df[cond] やdf["col"] 等を表す |
get_source_segment(code, node) |
元コードから該当箇所の文字列抽出 |
warnings.warn |
ユーザー向けに警告メッセージ出力 |
loguru.logger |
詳細な解析進捗・警告・例外などをログ記録 |
.loc 安全案 |
df.loc[条件, "col"] = ... が唯一安全な代入方法(副本回避) |
Q.41
項目 | 内容 |
---|---|
概要 | 複数の列に対して異なる変換関数を登録・適用できる柔軟な列変換マネージャ。列ごとの apply 処理を統括する。 |
問題文 |
ColumnTransformerRegistry() クラスを設計し、register(column, func) で変換関数を登録し、transform(df) で該当列に対して apply() を実行する構造を実装せよ。未登録列はそのまま、関数が無効なら例外としログに記録すること。 |
要件 | 列ごとの関数登録/transformによる一括処理 |
発展仕様 |
__repr__ による登録確認/未定義関数検出/ログ出力 |
使用構文 |
df.apply , callable , dict , __call__ , loguru.logger , ValueError
|
A.41
■ 模範解答
import pandas as pd
from loguru import logger
class ColumnTransformerRegistry:
def __init__(self):
# 各カラムに対応する変換関数を保持する辞書
self._registry = {}
logger.info("ColumnTransformerRegistry initialized.")
def register(self, column, func):
# 変換関数がcallableでなければエラー
if not callable(func):
logger.error(f"Attempted to register non-callable for column '{column}'.")
raise ValueError(f"Function for column '{column}' must be callable.")
self._registry[column] = func
logger.info(f"Registered transformer for column '{column}': {func}")
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise ValueError("Input must be a pandas DataFrame.")
result = df.copy()
for col, func in self._registry.items():
if col not in result.columns:
logger.warning(f"Column '{col}' not found in DataFrame; skipped.")
continue
if not callable(func):
logger.error(f"Function for column '{col}' is not callable.")
raise ValueError(f"Function for column '{col}' is not callable.")
logger.info(f"Applying transformer for column '{col}'.")
try:
# applyは列Seriesに対して実行
result[col] = result[col].apply(func)
except Exception as e:
logger.exception(f"Transformation failed for column '{col}': {e}")
raise
logger.info("Transformation completed.")
return result
def __repr__(self):
# 登録内容を見やすく表示
lines = [f"{self.__class__.__name__} registry:"]
for col, func in self._registry.items():
lines.append(f" {col}: {getattr(func, '__name__', str(func))}")
return "\n".join(lines)
実行例1:複数列に異なる関数を登録→一括変換
df = pd.DataFrame({
"x": [1, 2, 3],
"y": [4, 5, 6],
"z": ["a", "b", "c"],
})
reg = ColumnTransformerRegistry()
reg.register("x", lambda v: v * 100)
reg.register("z", str.upper)
print(reg) # __repr__確認
df2 = reg.transform(df)
print(df2)
実行結果1
ColumnTransformerRegistry registry:
x: <lambda>
z: upper
x y z
0 100 4 A
1 200 5 B
2 300 6 C
実行ログ1
INFO ColumnTransformerRegistry initialized.
INFO Registered transformer for column 'x': <lambda>
INFO Registered transformer for column 'z': upper
INFO Applying transformer for column 'x'.
INFO Applying transformer for column 'z'.
INFO Transformation completed.
実行例2:未登録列はそのまま、非callableで例外
df = pd.DataFrame({
"a": [1, 2, 3]
})
reg2 = ColumnTransformerRegistry()
try:
reg2.register("a", 42) # 非callable
except ValueError as e:
print(e)
実行結果2
Function for column 'a' must be callable.
実行ログ2
INFO ColumnTransformerRegistry initialized.
ERROR Attempted to register non-callable for column 'a'.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
register(col, func) |
指定列に変換関数を登録。callableでない場合はエラー |
transform(df) |
各登録列にapply(func) を実行。他列は変更せずコピー返す |
__repr__() |
現在登録済みの変換内容をクラス名付きで一覧表示 |
logger.info/error/warning |
操作記録・異常検出・警告をすべて詳細ログ |
ValueError |
関数未定義や型不正など不正操作時は例外送出 |
Q.42
項目 | 内容 |
---|---|
概要 | 複数のカテゴリ列を結合して新しい高次カテゴリ列を作成し、ラベルエンコードする処理。階層的なカテゴリ組合せを扱う。 |
問題文 |
CategoricalCombiner(columns: list[str], new_col: str = "combined", sep: str = "_") を定義し、指定した複数カテゴリ列を結合して新しいカテゴリ列として追加・エンコードせよ。fit → transform 構成で、逆変換にも対応すること。 |
要件 | 複数列結合/ユニークカテゴリ検出/ラベル変換 |
発展仕様 |
inverse_transform 対応/結合方法変更/エンコード辞書保持/log記録 |
使用構文 |
df.apply(row, axis=1) , astype('category') , map , join , loguru.logger
|
A.42
■ 模範解答
import pandas as pd
from loguru import logger
class CategoricalCombiner:
def __init__(self, columns, new_col="combined", sep="_", combine_fn=None):
"""
columns: 結合対象カラムリスト
new_col: 生成する新カラム名
sep: デフォルトの結合セパレータ
combine_fn: カスタム結合関数(指定しなければ sep.join で結合)
"""
self.columns = columns
self.new_col = new_col
self.sep = sep
self.combine_fn = combine_fn
self.fitted = False
self.category2label = {}
self.label2category = {}
logger.info(f"CategoricalCombiner initialized for columns={columns} as '{new_col}'.")
def fit(self, df: pd.DataFrame):
# 入力dfがDataFrameかつ対象列が存在するか検証
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise ValueError("Input must be a pandas DataFrame.")
missing = [col for col in self.columns if col not in df.columns]
if missing:
logger.error(f"Missing columns: {missing}")
raise ValueError(f"Missing columns in DataFrame: {missing}")
# 結合処理
combined = self._combine(df)
categories = pd.Series(combined).astype("category").cat.categories
self.category2label = {cat: i for i, cat in enumerate(categories)}
self.label2category = {i: cat for cat, i in self.category2label.items()}
self.fitted = True
logger.info(f"Fitted {len(categories)} unique categories: {self.category2label}")
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
# fit済みチェック
if not self.fitted:
logger.error("Must call fit() before transform().")
raise RuntimeError("Must call fit() before transform().")
# 入力検証
missing = [col for col in self.columns if col not in df.columns]
if missing:
logger.error(f"Missing columns: {missing}")
raise ValueError(f"Missing columns in DataFrame: {missing}")
combined = self._combine(df)
# エンコード: 未知カテゴリにはNaNを割り当て
labels = pd.Series(combined).map(self.category2label)
df_new = df.copy()
df_new[self.new_col] = labels
logger.info(f"Transformed new column '{self.new_col}' with labels.")
return df_new
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
if not self.fitted:
logger.error("Must call fit() before inverse_transform().")
raise RuntimeError("Must call fit() before inverse_transform().")
if self.new_col not in df.columns:
logger.error(f"Column '{self.new_col}' not found in DataFrame.")
raise ValueError(f"Column '{self.new_col}' not found in DataFrame.")
labels = df[self.new_col]
# 逆変換:ラベル→元カテゴリ表記(strで復元)
categories = labels.map(self.label2category)
# カラム分解
split_df = categories.str.split(self.sep, expand=True)
split_df.columns = self.columns
logger.info(f"Inverse transformed '{self.new_col}' back to columns {self.columns}.")
return split_df
def _combine(self, df: pd.DataFrame):
# 指定カラムから行ごとにカテゴリ文字列を生成
if self.combine_fn:
combined = df[self.columns].apply(lambda row: self.combine_fn(row.values), axis=1)
else:
combined = df[self.columns].astype(str).agg(self.sep.join, axis=1)
return combined
def __repr__(self):
cats = list(self.category2label.items())
cats_disp = cats[:5] + (["..."] if len(cats) > 5 else [])
return (f"CategoricalCombiner(columns={self.columns}, new_col={self.new_col}, sep='{self.sep}')\n"
f"fitted={self.fitted}, categories={cats_disp}")
実行例1:2列結合+エンコード→逆変換
df = pd.DataFrame({
"city": ["A", "A", "B", "B"],
"color": ["red", "blue", "red", "blue"]
})
comb = CategoricalCombiner(columns=["city", "color"], new_col="city_color", sep="-")
comb.fit(df)
print(comb) # 登録辞書確認
df2 = comb.transform(df)
print(df2)
inv = comb.inverse_transform(df2)
print(inv)
実行結果1
CategoricalCombiner(columns=['city', 'color'], new_col=city_color, sep='-')
fitted=True, categories=[('A-blue', 0), ('A-red', 1), ('B-blue', 2), ('B-red', 3)]
city color city_color
0 A red 1
1 A blue 0
2 B red 3
3 B blue 2
city color
0 A red
1 A blue
2 B red
3 B blue
実行ログ1
INFO CategoricalCombiner initialized for columns=['city', 'color'] as 'city_color'.
INFO Fitted 4 unique categories: {'A-blue': 0, 'A-red': 1, 'B-blue': 2, 'B-red': 3}
INFO Transformed new column 'city_color' with labels.
INFO Inverse transformed 'city_color' back to columns ['city', 'color'].
実行例2:カスタム結合関数・未知カテゴリ
df = pd.DataFrame({
"animal": ["dog", "cat", "dog"],
"size": ["big", "small", "small"]
})
# "dog+big"などで結合
comb2 = CategoricalCombiner(columns=["animal", "size"], new_col="animal_size",
combine_fn=lambda x: f"{x[0]}+{x[1]}")
comb2.fit(df)
df2 = pd.DataFrame({
"animal": ["dog", "cat", "rabbit"],
"size": ["small", "small", "big"]
})
df2 = comb2.transform(df2)
print(df2)
実行結果2
animal size animal_size
0 dog small 1
1 cat small 3
2 rabbit big NaN # 未知カテゴリはNaN
実行ログ2
INFO CategoricalCombiner initialized for columns=['animal', 'size'] as 'animal_size'.
INFO Fitted 3 unique categories: {'dog+big': 0, 'dog+small': 1, 'cat+small': 3}
INFO Transformed new column 'animal_size' with labels.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
df.apply(row, axis=1) |
行単位で値を結合する(パフォーマンス重視でaggも併用) |
astype('category') |
ユニークカテゴリを高速で抽出 |
map(dict) |
カテゴリ→整数ラベル変換、また逆変換も |
combine_fn |
結合方法をカスタム可能。デフォルトはsep で連結 |
fit /transform /inverse_transform
|
モデル的な学習→変換→逆変換流れを保証 |
loguru.logger |
全操作にINFO/ERROR/WARNING出力しトラブル時の診断容易 |
例外対応 | 型不正/未fit/未知カテゴリ/列不在はすべて厳格例外+log |
Q.43
項目 | 内容 |
---|---|
概要 | グループ単位で時系列データに変化があるかを検出し、フラグを立てる。典型的には状態変化・値変化の検出に使われる。 |
問題文 |
GroupChangeDetector(group_cols: list[str], target_col: str) を定義し、detect(df) により、グループ内で target_col が変化した行に is_changed フラグを付けた DataFrame を返すよう実装せよ。shift を活用して差分判定を行うこと。 |
要件 | groupby構造/shift比較/フラグ列追加 |
発展仕様 | 昇順ソート事前保証/欠損処理/log記録 |
使用構文 |
df.groupby , shift , ne , astype(int) , loguru.logger
|
A.43
■ 模範解答
import pandas as pd
from loguru import logger
class GroupChangeDetector:
def __init__(self, group_cols, target_col, sort_by=None):
"""
group_cols: グループ化するカラムのリスト
target_col: 変化を検出するカラム名
sort_by: 並べ替え基準カラム(Noneならgroup_cols+target_col昇順)
"""
self.group_cols = group_cols
self.target_col = target_col
self.sort_by = sort_by
logger.info(f"GroupChangeDetector initialized with group_cols={group_cols}, target_col={target_col}")
def detect(self, df: pd.DataFrame) -> pd.DataFrame:
# --- 入力・列存在チェック ---
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise ValueError("Input must be a pandas DataFrame.")
missing = [col for col in (self.group_cols + [self.target_col]) if col not in df.columns]
if missing:
logger.error(f"Missing columns: {missing}")
raise ValueError(f"Missing columns in DataFrame: {missing}")
# --- ソート基準 ---
sort_cols = self.sort_by or self.group_cols + [self.target_col]
df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True)
logger.info(f"Sorted DataFrame by {sort_cols} for group-diff detection.")
# --- 欠損値検知 ---
if df_sorted[self.target_col].isnull().any():
logger.warning(f"Null values detected in target_col '{self.target_col}'. Changes across NaN are flagged as changed.")
# --- グループごとにtarget_colの変化を検出(shiftで1行前と比較、異なる場合1, 同じなら0)---
# 先頭行は常に変化(=新グループ or 新系列)としてフラグ立て
change_flag = (
df_sorted.groupby(self.group_cols, sort=False)[self.target_col]
.apply(lambda x: x.ne(x.shift()).astype(int).fillna(1).values)
.explode()
.astype(int)
.values
)
df_out = df_sorted.copy()
df_out["is_changed"] = change_flag
logger.info(f"Change detection completed: {df_out['is_changed'].sum()} changed rows flagged.")
return df_out
def __repr__(self):
return f"GroupChangeDetector(group_cols={self.group_cols}, target_col={self.target_col})"
実行例1:状態変化検出(通常系・複数グループ)
df = pd.DataFrame({
"id": [1,1,1,2,2,2,2],
"date": [1,2,3,1,2,3,4],
"state": ["A", "A", "B", "B", "B", None, "C"]
})
detector = GroupChangeDetector(group_cols=["id"], target_col="state", sort_by=["id", "date"])
df_changed = detector.detect(df)
print(df_changed)
実行結果1
id date state is_changed
0 1 1 A 1
1 1 2 A 0
2 1 3 B 1
3 2 1 B 1
4 2 2 B 0
5 2 3 None 1
6 2 4 C 1
実行ログ1
INFO GroupChangeDetector initialized with group_cols=['id'], target_col=state
INFO Sorted DataFrame by ['id', 'date'] for group-diff detection.
WARNING Null values detected in target_col 'state'. Changes across NaN are flagged as changed.
INFO Change detection completed: 5 changed rows flagged.
実行例2:昇順ソート・フラグ付加(単一グループ)
df = pd.DataFrame({
"grp": ["g1", "g1", "g1"],
"t": [1, 2, 3],
"val": [5, 5, 6]
})
detector = GroupChangeDetector(group_cols=["grp"], target_col="val", sort_by=["t"])
df2 = detector.detect(df)
print(df2)
実行結果2
grp t val is_changed
0 g1 1 5 1
1 g1 2 5 0
2 g1 3 6 1
実行ログ2
INFO GroupChangeDetector initialized with group_cols=['grp'], target_col=val
INFO Sorted DataFrame by ['t'] for group-diff detection.
INFO Change detection completed: 2 changed rows flagged.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
groupby(...)[target_col].apply(lambda x: ...) |
グループごとに1つ前の値と比較し変化を判定(shift) |
.ne(x.shift()) |
「直前行と異なるか?」を効率的に判定(ベクトル演算) |
astype(int) |
bool→int型(0/1)に変換。変化があれば1 |
fillna(1) |
最初の行やNaN部分は「変化あり」と見なす |
sort_values() |
正しい時系列/グループ順で変化を評価 |
logger.info/warning/error |
解析進捗や異常・欠損・完了を詳細に出力 |
Q.44
項目 | 内容 |
---|---|
概要 | 時系列データを任意の頻度で再サンプリングし、欠損区間を補間して埋める処理を提供する。resample と補間ロジックを組み合わせる。 |
問題文 |
ResampleGapFiller(freq="D", method="linear") を定義し、fill(df: pd.DataFrame, time_col: str, value_col: str) により、指定頻度に再サンプリングし、欠損を補間したデータを返す処理を構築せよ。補間方法は linear など interpolate() に準拠する。 |
要件 |
set_index → resample → interpolate の一連処理 |
発展仕様 | 不正な時間列対応/補間方式変更/欠損報告ログ |
使用構文 |
set_index , resample , interpolate , reset_index , loguru.logger , ValueError
|
A.44
■ 模範解答
import pandas as pd
from loguru import logger
class ResampleGapFiller:
def __init__(self, freq="D", method="linear"):
"""
freq: pandas resampleでの頻度指定(例 'D'=日次, 'H'=時次, 'M'=月次)
method: interpolate()に渡す補間方法
"""
self.freq = freq
self.method = method
logger.info(f"ResampleGapFiller initialized (freq='{freq}', method='{method}').")
def fill(self, df: pd.DataFrame, time_col: str, value_col: str) -> pd.DataFrame:
# --- 入力・列存在チェック ---
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise ValueError("Input must be a pandas DataFrame.")
if time_col not in df.columns or value_col not in df.columns:
logger.error(f"Missing required columns: {[c for c in (time_col, value_col) if c not in df.columns]}")
raise ValueError(f"Missing required columns: {time_col}, {value_col}")
# --- 時間列がdatetime型か検証、必要なら変換 ---
if not pd.api.types.is_datetime64_any_dtype(df[time_col]):
try:
df = df.copy()
df[time_col] = pd.to_datetime(df[time_col])
logger.info(f"Converted column '{time_col}' to datetime.")
except Exception as e:
logger.error(f"Failed to convert '{time_col}' to datetime: {e}")
raise ValueError(f"Failed to convert '{time_col}' to datetime: {e}")
# --- インデックス設定とソート ---
df2 = df.set_index(time_col).sort_index()
# --- 欠損値の事前集計(補間前) ---
n_missing = df2[value_col].isna().sum()
logger.info(f"Missing values before resampling: {n_missing}")
# --- 指定頻度でresample(すべての時点を網羅) ---
df_resamp = df2[[value_col]].resample(self.freq).asfreq()
n_resamp_missing = df_resamp[value_col].isna().sum()
logger.info(f"Missing values after resample (before interpolation): {n_resamp_missing}")
# --- 指定方法でinterpolate補間 ---
try:
df_interp = df_resamp.interpolate(method=self.method, limit_direction="both")
n_interp_missing = df_interp[value_col].isna().sum()
logger.info(f"Missing values after interpolation: {n_interp_missing}")
except Exception as e:
logger.error(f"Interpolation failed (method={self.method}): {e}")
raise ValueError(f"Interpolation failed (method={self.method}): {e}")
# --- 欠損埋め結果ログ ---
filled = n_resamp_missing - n_interp_missing
logger.info(f"Gap filling complete: {filled} gaps filled.")
# --- indexを列に戻す ---
return df_interp.reset_index()
def __repr__(self):
return f"ResampleGapFiller(freq='{self.freq}', method='{self.method}')"
実行例1:日次サンプリング+線形補間
df = pd.DataFrame({
"date": ["2024-01-01", "2024-01-03", "2024-01-06"],
"val": [1.0, None, 10.0]
})
filler = ResampleGapFiller(freq="D", method="linear")
result = filler.fill(df, time_col="date", value_col="val")
print(result)
実行結果1
date val
0 2024-01-01 1.0
1 2024-01-02 4.0
2 2024-01-03 7.0
3 2024-01-04 8.0
4 2024-01-05 9.0
5 2024-01-06 10.0
実行ログ1
INFO ResampleGapFiller initialized (freq='D', method='linear').
INFO Converted column 'date' to datetime.
INFO Missing values before resampling: 1
INFO Missing values after resample (before interpolation): 4
INFO Missing values after interpolation: 0
INFO Gap filling complete: 4 gaps filled.
実行例2:月次サンプリング+最近傍補間
df = pd.DataFrame({
"ym": ["2023-01", "2023-03"],
"val": [5, 20]
})
filler = ResampleGapFiller(freq="M", method="nearest")
result = filler.fill(df, time_col="ym", value_col="val")
print(result)
実行結果2
ym val
0 2023-01-31 5.0
1 2023-02-28 5.0
2 2023-03-31 20.0
実行ログ2
INFO GroupChangeDetector initialized with group_cols=['grp'], target_col=val
INFO Sorted DataFrame by ['t'] for group-diff detection.
INFO Change detection completed: 2 changed rows flagged.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
set_index(time_col) |
日付列をインデックス化(時系列処理の基本) |
resample(freq).asfreq() |
指定頻度で再サンプリング。欠損があればNaNで埋まる |
interpolate(method=...) |
線形・最近傍など指定方式で連続補間。limit_direction='both' で両端も補間 |
reset_index() |
インデックスを列に戻す |
pd.to_datetime |
文字列→日時型変換 |
logger.info/error/warning |
処理経過・異常・補間状況などすべて記録(分析ログ・監査に必須) |
例外対応 | 列欠落・型不正・補間エラーは全て厳格例外+ログ |
Q.45
項目 | 内容 |
---|---|
概要 | 高頻度カテゴリ上位K件のみを抽出し、それ以外を "Other" に集約するカテゴリ圧縮ユーティリティ。 |
問題文 |
TopKCategorySelector(column: str, k: int) クラスを定義し、transform(df) により指定列の出現頻度上位kカテゴリを残し、それ以外を "Other" として置換した新しい列を追加せよ。新列名は {column}_compressed とすること。 |
要件 | value_countsで上位取得/置換処理/新列名追加 |
発展仕様 | tie処理/kより少ないカテゴリ時の対応/log記録 |
使用構文 |
value_counts , isin , np.where , loguru.logger
|
A.44
■ 模範解答
import pandas as pd
import numpy as np
from loguru import logger
class TopKCategorySelector:
def __init__(self, column: str, k: int):
"""
column: 圧縮対象のカテゴリ列名
k: 上位カテゴリ数(同順位多件時はすべて含む)
"""
self.column = column
self.k = k
logger.info(f"TopKCategorySelector initialized (column={column}, k={k})")
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
# --- 入力型・列存在チェック ---
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise ValueError("Input must be a pandas DataFrame.")
if self.column not in df.columns:
logger.error(f"Column '{self.column}' not found.")
raise ValueError(f"Column '{self.column}' not found in DataFrame.")
# --- value_countsで頻度降順リスト(dropna=FalseでNaNも集計)---
vc = df[self.column].value_counts(dropna=False)
logger.info(f"Original category counts: {dict(vc)}")
# --- 上位k位まで取得。tie処理:k番目と同数ならすべて含む ---
if len(vc) <= self.k:
top_cats = set(vc.index)
logger.info(f"Number of unique categories ({len(vc)}) <= k ({self.k}); no compression needed.")
else:
cutoff_count = vc.iloc[self.k-1]
# k番目のカテゴリと同じ頻度のカテゴリはすべて含める
top_cats = set(vc[vc >= cutoff_count].index)
logger.info(f"Selected top categories (with tie): {top_cats}")
# --- 新しい圧縮列名 ---
new_col = f"{self.column}_compressed"
# --- 圧縮処理(np.whereでベクトル化)---
df_out = df.copy()
df_out[new_col] = np.where(df_out[self.column].isin(top_cats),
df_out[self.column].astype(str),
"Other")
# NaN→"Other"にはせず、そのままに(圧縮前がNaNなら圧縮後もNaN)
nan_mask = df_out[self.column].isna()
if nan_mask.any():
df_out.loc[nan_mask, new_col] = np.nan
logger.info(f"NaN values preserved in compressed column.")
n_other = (df_out[new_col] == "Other").sum()
logger.info(f"Compression done. 'Other' count: {n_other}.")
return df_out
def __repr__(self):
return f"TopKCategorySelector(column='{self.column}', k={self.k})"
実行例1:カテゴリがk件以上、tieあり
df = pd.DataFrame({
"color": ["red", "blue", "red", "green", "blue", "green", "yellow", "yellow", "yellow", "black", np.nan]
})
selector = TopKCategorySelector(column="color", k=2)
df_out = selector.transform(df)
print(df_out)
実行結果1
color color_compressed
0 red red
1 blue blue
2 red red
3 green Other
4 blue blue
5 green Other
6 yellow Other
7 yellow Other
8 yellow Other
9 black Other
10 NaN NaN
実行ログ1
INFO TopKCategorySelector initialized (column=color, k=2)
INFO Original category counts: {'yellow': 3, 'red': 2, 'blue': 2, 'green': 2, 'black': 1, nan: 1}
INFO Selected top categories (with tie): {'red', 'blue', 'yellow', 'green'}
INFO NaN values preserved in compressed column.
INFO Compression done. 'Other' count: 2.
実行例2:カテゴリがk未満(圧縮せず)
df2 = pd.DataFrame({"brand": ["A", "A", "B"]})
selector2 = TopKCategorySelector(column="brand", k=5)
df2_out = selector2.transform(df2)
print(df2_out)
実行結果2
brand brand_compressed
0 A A
1 A A
2 B B
実行ログ2
INFO GroupChangeDetector initialized with group_cols=['grp'], target_col=val
INFO Sorted DataFrame by ['t'] for group-diff detection.
INFO Change detection completed: 2 changed rows flagged.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
value_counts() |
カテゴリの出現回数を降順集計(NaN含め集計も可) |
vc.iloc[self.k-1] |
k位のカテゴリのカウント値。tie処理はこれを基準に選定 |
vc[vc >= cutoff_count].index |
tie処理:k位と同じ回数以上のカテゴリをすべて上位カテゴリと見なす |
np.where(isin(...), ..., ...) |
ベクトル化による高速置換 |
nan_mask |
NaNを"Other"に置換しない工夫(欠損は欠損のまま残す) |
logger.info(...) |
各処理・圧縮状況・カテゴリ数の詳細を記録 |