まえがき
データサイエンティストのためのPython100本ノックについての記事です。NumPyやPandasといったデータサイエンス系でよく用いられるライブラリを扱います。誤りなどがあれば、ご指摘ください。今回はpandasを中心に10問扱います。
Q.16
項目 | 内容 | |
---|---|---|
概要 |
groupby 集約によるカテゴリ別統計量を柔軟に抽出するクラス設計。キー・集約列・関数指定・MultiIndexフラット化・詳細エラーとlog出力に対応。 |
|
問題文 | `GroupSummaryExtractor(by: str | list[str], agg_funcs: dict[str, list[str]]) を設計し、 call(df: pd.DataFrame)` で指定カラムによるgroupby集約+出力を単純インデックス化して返せ。カテゴリ列・集約列存在やagg_func形式などすべて検証し、異常時は例外+loguru記録せよ。 |
要件 | groupby集約/集約関数指定/MultiIndexフラット化/非破壊処理/カテゴリ・列存在チェック/agg_func形式検証/詳細log・例外対応 | |
発展仕様 |
|
|
使用構文 |
groupby , agg , reset_index , try-except , ValueError , loguru.logger , isinstance , issubclass , dict
|
A.16
■ 模範解答
import pandas as pd
from loguru import logger
from typing import Union
class GroupSummaryExtractor:
def __init__(self, by: Union[str, list], agg_funcs: dict):
# 集約キーの検証
if not (isinstance(by, (str, list)) and (isinstance(by, str) or all(isinstance(k, str) for k in by))):
logger.error("by must be str or list[str].")
raise ValueError("by must be str or list of str.")
# agg_funcsが dict[str, list[str]] 形式か検証
if not (isinstance(agg_funcs, dict) and all(isinstance(k, str) for k in agg_funcs.keys()) and
all(isinstance(v, list) and all(isinstance(f, str) for f in v) for v in agg_funcs.values())):
logger.error("agg_funcs must be dict[str, list[str]].")
raise ValueError("agg_funcs must be dict[str, list[str]].")
self.by = by
self.agg_funcs = agg_funcs
logger.info(f"GroupSummaryExtractor initialized (by={by}, agg_funcs={agg_funcs})")
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
try:
# DataFrame型確認
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
# カテゴリ列・集約列の存在検証
keys = [self.by] if isinstance(self.by, str) else self.by
missing_cols = [col for col in keys if col not in df.columns]
if missing_cols:
logger.error(f"Missing groupby key columns: {missing_cols}")
raise ValueError(f"Missing groupby key columns: {missing_cols}")
missing_agg = [col for col in self.agg_funcs if col not in df.columns]
if missing_agg:
logger.error(f"Missing aggregation columns: {missing_agg}")
raise ValueError(f"Missing aggregation columns: {missing_agg}")
logger.debug(f"groupby keys: {keys}, agg columns: {list(self.agg_funcs.keys())}")
# groupby集約処理
grouped = df.groupby(self.by, dropna=False).agg(self.agg_funcs)
logger.debug(f"Aggregated DataFrame (possibly MultiIndex columns):\n{grouped}")
# MultiIndexカラムをフラット化
if isinstance(grouped.columns, pd.MultiIndex):
grouped.columns = ['_'.join([str(a) for a in col if a]) for col in grouped.columns.values]
logger.info(f"Flattened MultiIndex columns: {list(grouped.columns)}")
result = grouped.reset_index() # インデックスを普通の列に戻す
logger.info("GroupSummaryExtractor: aggregation and flattening complete.")
return result
except Exception as e:
logger.exception(f"GroupSummaryExtractor failed: {e}")
raise
実行例1:正常系(カテゴリ別・平均+合計)
import pandas as pd
df = pd.DataFrame({
"category": ["A", "B", "A", "B", "A"],
"value1": [10, 20, 30, 40, 50],
"value2": [1, 2, 3, 4, 5]
})
extractor = GroupSummaryExtractor(
by="category",
agg_funcs={"value1": ["mean", "sum"], "value2": ["max"]}
)
summary = extractor(df)
print(summary)
実行結果1
category value1_mean value1_sum value2_max
0 A 30.000000 90 5
1 B 30.000000 60 4
実行ログ1
INFO GroupSummaryExtractor initialized (by=category, agg_funcs={'value1': ['mean', 'sum'], 'value2': ['max']})
DEBUG groupby keys: ['category'], agg columns: ['value1', 'value2']
DEBUG Aggregated DataFrame (possibly MultiIndex columns):
value1 value2
mean sum max
category
A 30 90 5
B 30 60 4
INFO Flattened MultiIndex columns: ['value1_mean', 'value1_sum', 'value2_max']
INFO GroupSummaryExtractor: aggregation and flattening complete.
実行例2:異常系(列未存在)
df = pd.DataFrame({"foo": [1, 2], "bar": [3, 4]})
extractor = GroupSummaryExtractor(by="category", agg_funcs={"bar": ["sum"]})
extractor(df)
実行結果2
ValueError: Missing groupby key columns: ['category']
実行ログ2
INFO GroupSummaryExtractor initialized (by=category, agg_funcs={'bar': ['sum']})
ERROR Missing groupby key columns: ['category']
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
groupby(by).agg(agg_funcs) |
by列でカテゴリ分割し、指定列・集約関数dictで一括統計 |
reset_index() |
groupbyでindex化された列をふつうの列に戻す |
MultiIndex → ['a_b'...]
|
複数階層のカラム名をフラット化("列_集約関数" のような1階層列名に変換) |
try-except + logger |
すべての異常系・実行状況を詳細にログ記録 |
Q.17
項目 | 内容 |
---|---|
概要 | DataFrame内の欠損値出現パターンをベクトル化・高速集計。連続欠損行・列・閾値超列を抽出し、詳細な集計レポートを辞書で返すクラス。 |
問題文 |
MissingPatternAnalyzer() を設計し、analyze(df) で1. 連続欠損行/列の番号 2. 欠損数・率の集計 3. 欠損率80%以上の列 を全てdictで返却。異常時は例外+loguru記録。 |
要件 | 欠損集計/連続欠損検出(行・列)/高欠損列抽出(80%以上)/欠損率計算/マスクベクトル化/詳細log記録 |
発展仕様 |
|
使用構文 |
pd.isna , np.diff , np.where , sum , shift , loguru.logger , dict , pd.DataFrame , np.any , np.all
|
A.17
■ 模範解答
import pandas as pd
import numpy as np
from loguru import logger
class MissingPatternAnalyzer:
def __init__(self):
logger.info("MissingPatternAnalyzer initialized.")
def analyze(self, df: pd.DataFrame) -> dict:
try:
# DataFrame型検証
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
n_rows, n_cols = df.shape
logger.info(f"Analyzing DataFrame of shape {df.shape}")
# 1. 欠損マスク作成(True=欠損, False=非欠損)
mask = df.isna().values # ndarray型に変換
logger.debug(f"Missing mask:\n{mask.astype(int)}")
# 2. 欠損数・率(全体、行、列ごと)
n_missing = mask.sum()
missing_per_row = mask.sum(axis=1)
missing_per_col = mask.sum(axis=0)
missing_rate = n_missing / (n_rows * n_cols)
missing_rate_per_row = missing_per_row / n_cols
missing_rate_per_col = missing_per_col / n_rows
# 3. 連続欠損行・列の検出
def find_consecutive_missing(mat, axis):
# axis=1: 行方向(各行がすべてTrueならTrue)
all_missing = np.all(mat, axis=axis)
# np.diffで立ち上がり/立ち下がりを検出
starts = np.where(np.diff(np.concatenate(([0], all_missing.astype(int)))) == 1)[0]
ends = np.where(np.diff(np.concatenate((all_missing.astype(int), [0]))) == -1)[0]
blocks = [list(range(s, e)) for s, e in zip(starts, ends)]
return blocks
consecutive_missing_rows = find_consecutive_missing(mask, axis=1)
consecutive_missing_cols = find_consecutive_missing(mask, axis=0)
# 欠損率80%以上の列
high_missing_cols = [df.columns[i] for i, rate in enumerate(missing_rate_per_col) if rate >= 0.8]
logger.info("Missing pattern analysis completed.")
# レポートを辞書形式でまとめる
report = {
"n_missing": int(n_missing),
"missing_rate": float(missing_rate),
"missing_per_row": missing_per_row.tolist(),
"missing_per_col": missing_per_col.tolist(),
"missing_rate_per_row": missing_rate_per_row.tolist(),
"missing_rate_per_col": missing_rate_per_col.tolist(),
"consecutive_missing_rows": consecutive_missing_rows, # 連続欠損区間のリスト
"consecutive_missing_cols": consecutive_missing_cols,
"high_missing_cols": high_missing_cols
}
return report
except Exception as e:
logger.exception(f"MissingPatternAnalyzer failed: {e}")
raise
実行例1:一部欠損と連続欠損があるケース
import pandas as pd
df = pd.DataFrame({
"A": [1, np.nan, np.nan, 4, 5],
"B": [np.nan, np.nan, np.nan, np.nan, 10],
"C": [3, 4, 5, 6, 7]
})
analyzer = MissingPatternAnalyzer()
report = analyzer.analyze(df)
for k, v in report.items():
print(f"{k}: {v}")
実行結果1
n_missing: 6
missing_rate: 0.4
missing_per_row: [1, 2, 2, 1, 0]
missing_per_col: [2, 4, 0]
missing_rate_per_row: [0.3333333333333333, 0.6666666666666666, 0.6666666666666666, 0.3333333333333333, 0.0]
missing_rate_per_col: [0.4, 0.8, 0.0]
consecutive_missing_rows: []
consecutive_missing_cols: [[1, 2, 3, 4]]
high_missing_cols: ['B']
実行ログ1
INFO MissingPatternAnalyzer initialized.
INFO Analyzing DataFrame of shape (5, 3)
DEBUG Missing mask:
[[0 1 0]
[1 1 0]
[1 1 0]
[0 1 0]
[0 0 0]]
INFO Missing pattern analysis completed.
実行例2:全行全欠損のケース
df = pd.DataFrame({"A": [np.nan, np.nan], "B": [np.nan, np.nan]})
analyzer = MissingPatternAnalyzer()
report = analyzer.analyze(df)
print(report)
実行結果2
{'n_missing': 4,
'missing_rate': 1.0,
'missing_per_row': [2, 2],
'missing_per_col': [2, 2],
'missing_rate_per_row': [1.0, 1.0],
'missing_rate_per_col': [1.0, 1.0],
'consecutive_missing_rows': [[0, 1]],
'consecutive_missing_cols': [[0, 1]],
'high_missing_cols': ['A', 'B']}
実行ログ2
INFO MissingPatternAnalyzer initialized.
INFO Analyzing DataFrame of shape (2, 2)
DEBUG Missing mask:
[[1 1]
[1 1]]
INFO Missing pattern analysis completed.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
pd.isna()/df.isna() |
DataFrameで欠損(NaN)位置をTrue/Falseで取得 |
mask.sum(axis=1/0) |
行・列ごとの欠損数(ベクトル化で高速) |
np.all(mask, axis=axis) |
行や列すべてが欠損かどうかを一括判定 |
np.diff , np.where
|
立ち上がり・立ち下がり検出で連続区間を抽出 |
high_missing_cols |
欠損率80%以上の列ラベルを自動抽出 |
logger.info/debug/exception |
loguruで全実行・異常を記録 |
Q.18
項目 | 内容 |
---|---|
概要 | DataFrame 各列を dtype・ユニーク性等の特徴から 'numerical' , 'categorical' , 'datetime' , 'other' の4分類へ自動分類するクラス。未分類列はログ警告。 |
問題文 |
ColumnTypeClassifier() クラスを定義し、classify(df) で列ごとに型推定+カテゴリ分類し、ラベル辞書(列名→分類)を返す。型判定はpandas.api.types利用。未分類列があれば警告をログ出力すること。 |
要件 | dtype判定/ユニーク比活用/カテゴリ定義/分類辞書返却/log記録 |
発展仕様 |
|
使用構文 |
pd.api.types.is_numeric_dtype , is_datetime64_any_dtype , is_categorical_dtype , pd.to_datetime , nunique , dict , loguru.logger , try-except
|
A.18
■ 模範解答
import pandas as pd
from loguru import logger
class ColumnTypeClassifier:
def __init__(self):
logger.info("ColumnTypeClassifier initialized.")
def classify(self, df: pd.DataFrame) -> dict:
try:
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
n_rows = len(df)
results = {}
for col in df.columns:
s = df[col]
col_type = "other" # デフォルト
# 1. datetime判定(明示的型 or 変換可)
if pd.api.types.is_datetime64_any_dtype(s):
col_type = "datetime"
else:
try:
parsed = pd.to_datetime(s, errors="raise")
if not parsed.isnull().all():
col_type = "datetime"
except Exception:
pass
# 2. numerical判定(float, intなど)
if col_type == "other" and pd.api.types.is_numeric_dtype(s):
col_type = "numerical"
# 3. categorical判定(カテゴリ型 or ユニーク率閾値)
if col_type == "other":
nunique = s.nunique(dropna=True)
uniq_ratio = nunique / n_rows if n_rows > 0 else 0
# カテゴリ型 or "ユニーク比0.05未満か50未満ならカテゴリ"のヒューリスティック
if pd.api.types.is_categorical_dtype(s) or uniq_ratio < 0.05 or nunique < 50:
col_type = "categorical"
results[col] = col_type
# 未分類(other)列の警告
others = [col for col, typ in results.items() if typ == "other"]
if others:
logger.warning(f"Some columns could not be classified: {others}")
logger.info(f"Classification result: {results}")
return results
except Exception as e:
logger.exception(f"ColumnTypeClassifier failed: {e}")
raise
実行例1:基本的な型分類
df = pd.DataFrame({
"num": [1.0, 2.0, 3.0],
"cat": ["a", "b", "a"],
"date": pd.to_datetime(["2022-01-01", "2022-01-02", "2022-01-03"]),
"obj": ["x", "y", "z"]
})
clf = ColumnTypeClassifier()
result = clf.classify(df)
print(result)
実行結果1
{'num': 'numerical', 'cat': 'categorical', 'date': 'datetime', 'obj': 'categorical'}
実行ログ1
INFO ColumnTypeClassifier initialized.
INFO Classification result: {'num': 'numerical', 'cat': 'categorical', 'date': 'datetime', 'obj': 'categorical'}
実行例2:未分類列(other)
df = pd.DataFrame({
"misc": [lambda x: x, lambda x: x+1, lambda x: x-1],
"num": [1, 2, 3]
})
clf = ColumnTypeClassifier()
result = clf.classify(df)
print(result)
実行結果2
{'misc': 'other', 'num': 'numerical'}
実行ログ2
INFO ColumnTypeClassifier initialized.
WARNING Some columns could not be classified: ['misc']
INFO Classification result: {'misc': 'other', 'num': 'numerical'}
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
is_numeric_dtype , is_datetime64_any_dtype
|
pandas標準の型自動判定メソッド |
pd.to_datetime(s) |
object列でも日付変換できるか判定 |
nunique()/len(df) |
ユニーク比によるカテゴリ判定ヒューリスティック |
'other' |
どの型にも分類できない列のデフォルトカテゴリ |
logger.info , logger.warning
|
loguruで詳細な分類結果や未分類警告を記録 |
Q.19
項目 | 内容 |
---|---|
概要 | 任意の2つのDataFrameを指定キーで効率的に結合し、競合列名の解決(suffix付与)、内部結合・外部結合切替、キーや方式の異常時例外+log記録にも対応するクラス設計 |
問題文 |
CustomJoiner(on: str, how: str = "inner", suffixes: tuple[str, str] = ("_x", "_y")) を定義し、join(df1, df2) で指定条件に基づき結合し、異常時は例外+loguru記録せよ |
要件 | 結合方式指定/onキー存在チェック/列重複検知・suffix自動付与/try-except例外/全処理log記録/pandas高速ベクトル化結合 |
発展仕様 |
|
使用構文 |
pd.merge , suffixes , on , ValueError , TypeError , loguru.logger , try-except , issubclass , tuple , list
|
A.19
■ 模範解答
import pandas as pd
from loguru import logger
class CustomJoiner:
def __init__(self, on: str, how: str = "inner", suffixes: tuple = ("_x", "_y")):
# 入力検証: on, how, suffixes
if not isinstance(on, str):
logger.error("Join key 'on' must be a string.")
raise ValueError("Join key 'on' must be a string.")
if how not in {"inner", "outer", "left", "right"}:
logger.error(f"Join method '{how}' not supported.")
raise ValueError("Join method must be one of: 'inner', 'outer', 'left', 'right'.")
if not (isinstance(suffixes, tuple) and len(suffixes) == 2):
logger.error("Suffixes must be a tuple of two strings.")
raise ValueError("Suffixes must be a tuple of two strings.")
self.on = on
self.how = how
self.suffixes = suffixes
logger.info(f"CustomJoiner initialized (on={on}, how={how}, suffixes={suffixes})")
def join(self, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
try:
# DataFrame型確認
if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
logger.error("Both inputs must be pandas DataFrame.")
raise TypeError("Both inputs must be pandas DataFrame.")
# onキー存在確認
if self.on not in df1.columns or self.on not in df2.columns:
logger.error(f"Key '{self.on}' not found in both DataFrames.")
raise ValueError(f"Key '{self.on}' must exist in both DataFrames.")
# 重複列名の検出(on以外で共通名がある場合警告)
cols1 = set(df1.columns)
cols2 = set(df2.columns)
overlap = (cols1 & cols2) - {self.on}
if overlap:
logger.warning(f"Overlapping columns detected: {overlap}. Suffixes {self.suffixes} will be applied.")
logger.info(f"Joining on '{self.on}' with method '{self.how}'.")
# merge処理
result = pd.merge(
df1, df2,
on=self.on,
how=self.how,
suffixes=self.suffixes
)
logger.info(f"Join completed. Result shape: {result.shape}")
return result
except Exception as e:
logger.exception(f"CustomJoiner join failed: {e}")
raise
実行例1:正常系(共通列の競合あり・suffix付与)
df1 = pd.DataFrame({"id": [1, 2], "value": [10, 20]})
df2 = pd.DataFrame({"id": [1, 2], "value": [100, 200], "other": ["a", "b"]})
joiner = CustomJoiner(on="id", how="inner", suffixes=("_a", "_b"))
result = joiner.join(df1, df2)
print(result)
実行結果1
id value_a value_b other
0 1 10 100 a
1 2 20 200 b
実行ログ1
INFO CustomJoiner initialized (on=id, how=inner, suffixes=('_a', '_b'))
WARNING Overlapping columns detected: {'value'}. Suffixes ('_a', '_b') will be applied.
INFO Joining on 'id' with method 'inner'.
INFO Join completed. Result shape: (2, 4)
実行例2:異常系(onキーが一方に存在しない)
df1 = pd.DataFrame({"id": [1, 2], "v": [10, 20]})
df2 = pd.DataFrame({"x": [1, 2], "v": [100, 200]})
joiner = CustomJoiner(on="id")
joiner.join(df1, df2)
実行結果2
ValueError: Key 'id' must exist in both DataFrames.
実行ログ2
INFO CustomJoiner initialized (on=id, how=inner, suffixes=('_x', '_y'))
ERROR Key 'id' not found in both DataFrames.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
pd.merge(..., on, how, suffixes) |
pandasの最速・最強の結合関数。key,結合方式,競合列の接尾辞指定が可能 |
set(df1.columns) & set(df2.columns) |
両データフレームの重複列検出 |
logger.warning/info/error/exception |
loguruで警告・進行・異常・例外の詳細記録 |
try-except |
失敗時の異常検出・再現性向上 |
ValueError , TypeError
|
異常な入力やパラメータを即例外化 |
Q.20
項目 | 内容 |
---|---|
概要 |
MultiIndex を持つDataFrameの行または列に対して、階層情報を1階層のstrインデックスに変換して通常のDataFrameとして扱えるよう変換するクラス設計。セパレータや階層深さも柔軟に制御・log記録にも対応。 |
問題文 |
MultiIndexFlattener(separator="_") を定義し、flatten(df) により、列または行のMultiIndexを階層名をセパレータ結合したstr型index/columnsに変換した新DataFrameを返す。元dfは非破壊。全処理・例外をloguruで記録せよ。 |
要件 | 階層確認/MultiIndex→str化/列・行対応/セパレータ指定/ベクトル化高速変換/詳細log記録/例外処理 |
発展仕様 |
|
使用構文 |
df.columns.map , df.index.map , isinstance , pd.MultiIndex , str.join , loguru.logger , try-except
|
A.20
■ 模範解答
import pandas as pd
from loguru import logger
class MultiIndexFlattener:
def __init__(self, separator: str = "_"):
# 階層連結用のセパレータ
self.separator = separator
logger.info(f"MultiIndexFlattener initialized (separator='{separator}')")
def flatten(self, df: pd.DataFrame) -> pd.DataFrame:
try:
# DataFrame型か検証
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
changed = False # 変換フラグ
# columnsがMultiIndexの場合: tuple→str変換
if isinstance(df.columns, pd.MultiIndex):
logger.info("Flattening columns MultiIndex.")
# 各カラムの階層タプルをセパレータ結合しstrへ
new_columns = df.columns.map(lambda tup: self.separator.join(map(str, tup)))
changed = True
else:
new_columns = df.columns
# indexがMultiIndexの場合: tuple→str変換
if isinstance(df.index, pd.MultiIndex):
logger.info("Flattening index MultiIndex.")
new_index = df.index.map(lambda tup: self.separator.join(map(str, tup)))
changed = True
else:
new_index = df.index
# 新DataFrameを返却(元は非破壊)
out_df = df.copy()
out_df.columns = new_columns
out_df.index = new_index
if not changed:
logger.info("No MultiIndex found. DataFrame unchanged.")
logger.info("MultiIndex flattening completed.")
return out_df
except Exception as e:
logger.exception(f"MultiIndexFlattener failed: {e}")
raise
実行例1:列・行の両方がMultiIndex
import pandas as pd
cols = pd.MultiIndex.from_tuples([('A', 'x'), ('A', 'y'), ('B', 'z')])
idx = pd.MultiIndex.from_tuples([('foo', 1), ('foo', 2), ('bar', 1)])
df = pd.DataFrame([[1,2,3],[4,5,6],[7,8,9]], columns=cols, index=idx)
flattener = MultiIndexFlattener(separator=":")
result = flattener.flatten(df)
print(result)
実行結果1
A:x A:y B:z
foo:1 1 2 3
foo:2 4 5 6
bar:1 7 8 9
実行ログ1
INFO MultiIndexFlattener initialized (separator=':')
INFO Flattening columns MultiIndex.
INFO Flattening index MultiIndex.
INFO MultiIndex flattening completed.
実行例2:通常の単層DataFrame
df = pd.DataFrame({"A": [1,2], "B": [3,4]}, index=["x", "y"])
flattener = MultiIndexFlattener()
result = flattener.flatten(df)
print(result)
実行結果2
A B
x 1 3
y 2 4
実行ログ2
INFO MultiIndexFlattener initialized (separator='_')
INFO No MultiIndex found. DataFrame unchanged.
INFO MultiIndex flattening completed.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
isinstance(df.columns, pd.MultiIndex) |
複数階層インデックスかどうか確認 |
df.columns.map(lambda tup: "_".join(map(str, tup))) |
タプル(階層)→文字列連結して1階層へ変換 |
out_df = df.copy() |
非破壊設計(元dfの状態は一切変更しない) |
logger.info , logger.error , logger.exception
|
loguruで正常系・異常系をすべて詳細記録 |
Q.21
項目 | 内容 |
---|---|
概要 | 2つのDataFrameのインデックスまたはカラムを自動でアライン(再インデックス&並び替え)。pandas演算前に完全整合をとるクラス。整合性ログ記録・重複ラベル検証・方式切替・例外制御もすべて網羅する。 |
問題文 |
DataFrameAligner(method='index' | 'columns') クラスを設計し、align(df1, df2) でラベルを自動でreindex・並び替えた2つの新DataFrameを返せ。方式検証・重複エラー・処理ログも出力せよ。 |
要件 |
reindex 利用/index・columnsの方式切替/非破壊/方式バリデーション/重複ラベル例外/処理内容のlog出力 |
発展仕様 |
|
使用構文 |
reindex , sort_index , sort_values , loguru.logger , ValueError , try-except , pd.DataFrame.duplicated
|
A.21
■ 模範解答
import pandas as pd
from loguru import logger
class DataFrameAligner:
def __init__(self, method: str = "index"):
# アライメント方式のバリデーション
if method not in ("index", "columns"):
logger.error("method must be 'index' or 'columns'.")
raise ValueError("method must be 'index' or 'columns'.")
self.method = method
logger.info(f"DataFrameAligner initialized (method='{method}')")
def align(self, df1: pd.DataFrame, df2: pd.DataFrame):
try:
# 型チェック
if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
logger.error("Both inputs must be pandas DataFrame.")
raise TypeError("Both inputs must be pandas DataFrame.")
# ラベル取得
if self.method == "index":
labels1 = df1.index
labels2 = df2.index
axis = 0
else:
labels1 = df1.columns
labels2 = df2.columns
axis = 1
# 重複ラベルチェック
if pd.Index(labels1).duplicated().any() or pd.Index(labels2).duplicated().any():
logger.error("Duplicated labels detected in input DataFrames.")
raise ValueError("Duplicated labels found in DataFrame.")
# 共通ラベル集合を取得してソート(順序性確保)
all_labels = sorted(set(labels1) | set(labels2))
logger.debug(f"All labels after union: {all_labels}")
# reindexで合わせて並び替え
aligned1 = df1.reindex(all_labels, axis=axis)
aligned2 = df2.reindex(all_labels, axis=axis)
logger.info(f"Alignment completed on axis={self.method}.")
logger.debug(f"Aligned df1 {self.method}s: {aligned1.index if axis == 0 else aligned1.columns}")
logger.debug(f"Aligned df2 {self.method}s: {aligned2.index if axis == 0 else aligned2.columns}")
return aligned1, aligned2
except Exception as e:
logger.exception(f"DataFrameAligner.align failed: {e}")
raise
実行例1:行indexアライメント
df1 = pd.DataFrame({"A": [1, 2]}, index=["x", "y"])
df2 = pd.DataFrame({"B": [10, 20]}, index=["y", "z"])
aligner = DataFrameAligner(method="index")
a1, a2 = aligner.align(df1, df2)
print(a1)
print(a2)
実行結果1
A
x 1.0
y 2.0
z NaN
B
x NaN
y 10.0
z 20.0
実行ログ1
INFO DataFrameAligner initialized (method='index')
DEBUG All labels after union: ['x', 'y', 'z']
INFO Alignment completed on axis=index.
DEBUG Aligned df1 indexs: Index(['x', 'y', 'z'], dtype='object')
DEBUG Aligned df2 indexs: Index(['x', 'y', 'z'], dtype='object')
実行例2:列labelアライメント+重複エラー
df1 = pd.DataFrame([[1, 2]], columns=["A", "A"])
df2 = pd.DataFrame([[3, 4]], columns=["A", "B"])
aligner = DataFrameAligner(method="columns")
aligner.align(df1, df2)
実行結果2
ValueError: Duplicated labels found in DataFrame.
実行ログ2
INFO DataFrameAligner initialized (method='columns')
ERROR Duplicated labels detected in input DataFrames.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
reindex(all_labels, axis=...) |
ラベルを揃えて欠損はNaNで補い、順序も強制的に揃える |
sorted(set(...)) |
ラベル集合を結合し昇順に。順序性を担保 |
duplicated().any() |
indexやcolumnsに重複ラベルがないかを検出 |
logger.info/debug/error |
loguruで進行・異常・整合情報を記録 |
try-except |
すべての例外に詳細ログ+re-raiseで安全運用 |
Q.22
項目 | 内容 |
---|---|
概要 | 条件関数に合致するセルを定数や他列値で動的に置換する超汎用セル操作クラス。np.where 構文のDataFrame版。列指定や関数・値の柔軟性、エラー制御・詳細ログ記録も必須。 |
問題文 |
ConditionalReplacer(condition_fn, replacement_value, target_columns=None) を設計し、apply(df) で、condition_fn(row) がTrueなセルをreplacement_value (定数 or 関数 or 別列名指定)で置換した新DataFrameを返せ。target_columns で列指定も可能。異常時は例外+loguru記録必須。 |
要件 | 行ごと条件判定/np.whereベクトル構造/定数・関数・列名での置換柔軟化/callable検証/列指定可/全処理・異常log/非破壊 |
発展仕様 |
|
使用構文 |
apply , np.where , df.loc , callable , loguru.logger , TypeError , ValueError , copy
|
A.22
■ 模範解答
import pandas as pd
import numpy as np
from loguru import logger
from typing import Callable, Any, Optional, List, Union
class ConditionalReplacer:
def __init__(self, condition_fn: Callable, replacement_value: Any, target_columns: Optional[Union[str, List[str]]] = None):
# 条件関数がcallableか検証
if not callable(condition_fn):
logger.error("condition_fn must be callable.")
raise TypeError("condition_fn must be callable.")
self.condition_fn = condition_fn
self.replacement_value = replacement_value
# 列指定の正当性検証
if target_columns is not None and not (isinstance(target_columns, (str, list))):
logger.error("target_columns must be None, a str, or a list of str.")
raise TypeError("target_columns must be None, a str, or a list of str.")
self.target_columns = [target_columns] if isinstance(target_columns, str) else target_columns
logger.info(f"ConditionalReplacer initialized (target_columns={self.target_columns})")
def apply(self, df: pd.DataFrame) -> pd.DataFrame:
try:
# DataFrame型検証
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
# 列存在検証
targets = self.target_columns if self.target_columns is not None else list(df.columns)
for col in targets:
if col not in df.columns:
logger.error(f"Target column '{col}' does not exist in DataFrame.")
raise ValueError(f"Target column '{col}' does not exist in DataFrame.")
logger.info(f"Columns to apply replacement: {targets}")
out = df.copy()
# 行方向に条件判定 (ベクトル化高速化)
mask = df.apply(self.condition_fn, axis=1) # 各行でbool取得
logger.debug(f"Condition mask: {mask.values}")
for col in targets:
# 置換値の決定方式
if callable(self.replacement_value):
# replacement_valueが関数の場合:行→値
new_values = df.apply(self.replacement_value, axis=1)
elif isinstance(self.replacement_value, str) and self.replacement_value in df.columns:
# replacement_valueが列名の場合
new_values = df[self.replacement_value]
else:
# replacement_valueが定数
new_values = self.replacement_value
# np.whereで一括置換 (maskがTrueならnew_values、Falseは現状値)
out[col] = np.where(mask, new_values, df[col])
logger.debug(f"Column '{col}' replaced where condition is True.")
logger.info("Conditional replacement completed.")
return out
except Exception as e:
logger.exception(f"ConditionalReplacer.apply failed: {e}")
raise
実行例1:列A>1のセルをB列の値で置換(全列対象)
df = pd.DataFrame({"A": [1, 2, 3], "B": [100, 200, 300]})
# Aが2より大きい or 2の行のセルをB列値で置換
replacer = ConditionalReplacer(
condition_fn=lambda row: row["A"] >= 2,
replacement_value="B"
)
result = replacer.apply(df)
print(result)
実行結果1
A B
0 1 100
1 200 200
2 300 300
実行ログ1
INFO ConditionalReplacer initialized (target_columns=None)
INFO Columns to apply replacement: ['A', 'B']
DEBUG Condition mask: [False True True]
DEBUG Column 'A' replaced where condition is True.
DEBUG Column 'B' replaced where condition is True.
INFO Conditional replacement completed.
実行例2:特定列のみ、関数による置換(2倍する)
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
# A列が偶数の行のB列のみ、B列値を2倍で置換
replacer = ConditionalReplacer(
condition_fn=lambda row: row["A"] % 2 == 0,
replacement_value=lambda row: row["B"] * 2,
target_columns="B"
)
result = replacer.apply(df)
print(result)
実行結果2
A B
0 1 4
1 2 10
2 3 6
実行ログ2
INFO ConditionalReplacer initialized (target_columns=['B'])
INFO Columns to apply replacement: ['B']
DEBUG Condition mask: [False True False]
DEBUG Column 'B' replaced where condition is True.
INFO Conditional replacement completed.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
df.apply(condition_fn, axis=1) |
各行でbool判定(マスクをベクトル的に作る) |
np.where(mask, new, old) |
マスクTrueの場所だけ置換値、それ以外は元値 |
replacement_value の多様性 |
定数/列名/関数どれも柔軟に許容 |
target_columns |
対象列をstrまたはlist指定で柔軟化 |
logger.info/debug/error/exception |
loguruで正常系・異常系・詳細実行をすべて記録 |
Q.23
項目 | 内容 |
---|---|
概要 | カテゴリ列を「ラベルエンコード」または「one-hotエンコード」で変換し、fit→transform→inverse_transformを連続利用可能なクラス設計。方式切替・ラベル管理・逆変換・未fit時例外・カテゴリ数制限・全処理ログも必須。 |
問題文 |
CategoricalEncoder(method="label" | "onehot") を設計し、fit(df) →transform(df) →inverse_transform(df) を順番に利用できるようにせよ。label方式は LabelEncoder 相当、onehot方式はget_dummies 相当。全エラーと詳細logも記録せよ。 |
要件 | エンコード方式切替/fit→transform必須順守/ラベル辞書・カテゴリ数管理/逆変換実装/未fit時エラー/log記録/DataFrame高速ベクトル変換 |
発展仕様 |
|
使用構文 |
pd.get_dummies , astype('category') , map , dict , ValueError , RuntimeError , loguru.logger
|
A.23
■ 模範解答
import pandas as pd
from loguru import logger
class CategoricalEncoder:
def __init__(self, method: str = "label", max_categories: int = 100):
# methodのバリデーション
if method not in {"label", "onehot"}:
logger.error("method must be 'label' or 'onehot'.")
raise ValueError("method must be 'label' or 'onehot'.")
self.method = method
self.max_categories = max_categories
self.fitted = False
self.label_maps = {} # 列ごとのカテゴリ→整数マップ
self.inverse_maps = {} # 列ごとの整数→カテゴリマップ
self.columns_ = [] # fitしたカラム情報
logger.info(f"CategoricalEncoder initialized (method={method})")
def fit(self, df: pd.DataFrame):
# fit時はカテゴリ列のみ選別
cat_cols = [col for col in df.columns
if pd.api.types.is_object_dtype(df[col]) or pd.api.types.is_categorical_dtype(df[col])]
logger.info(f"Fitting columns: {cat_cols}")
self.columns_ = cat_cols
self.label_maps.clear()
self.inverse_maps.clear()
for col in cat_cols:
cats = pd.Series(df[col].astype("category").cat.categories)
if len(cats) > self.max_categories:
logger.error(f"Column '{col}' exceeds maximum {self.max_categories} categories.")
raise ValueError(f"Too many categories in column '{col}'.")
label_map = {cat: i for i, cat in enumerate(cats)}
inv_map = {i: cat for i, cat in enumerate(cats)}
self.label_maps[col] = label_map
self.inverse_maps[col] = inv_map
self.fitted = True
logger.info("Fit complete.")
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
if not self.fitted:
logger.error("transform called before fit.")
raise RuntimeError("Must fit before transform.")
logger.info("Transforming DataFrame.")
out = df.copy()
if self.method == "label":
for col in self.columns_:
if col in out:
out[col] = out[col].map(self.label_maps[col])
logger.debug(f"Column '{col}' label-encoded.")
elif self.method == "onehot":
# onehotのみ対象列のみ展開、その他はそのまま
ohe = pd.get_dummies(out[self.columns_], prefix=self.columns_, dtype=int)
out = out.drop(columns=self.columns_).reset_index(drop=True)
ohe = ohe.reset_index(drop=True)
out = pd.concat([out, ohe], axis=1)
logger.info("One-hot encoding completed.")
return out
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
if not self.fitted:
logger.error("inverse_transform called before fit.")
raise RuntimeError("Must fit before inverse_transform.")
logger.info("Inverse transforming DataFrame.")
out = df.copy()
if self.method == "label":
for col in self.columns_:
if col in out:
out[col] = out[col].map(self.inverse_maps[col])
logger.debug(f"Column '{col}' label-decoded.")
elif self.method == "onehot":
for col in self.columns_:
# onehotカラム(prefix==col)を抽出
ohe_cols = [c for c in out.columns if c.startswith(col + "_")]
if not ohe_cols:
logger.warning(f"No one-hot columns for '{col}' found.")
continue
idx = out[ohe_cols].values.argmax(axis=1)
inv_map = self.inverse_maps[col]
out[col] = [inv_map[i] for i in idx]
out = out.drop(columns=ohe_cols)
logger.debug(f"Column '{col}' one-hot decoded.")
return out
実行例1:ラベルエンコーディング・逆変換
df = pd.DataFrame({"cat": ["a", "b", "a", "c"], "num": [1, 2, 3, 4]})
enc = CategoricalEncoder(method="label")
enc.fit(df)
df_enc = enc.transform(df)
print(df_enc)
df_dec = enc.inverse_transform(df_enc)
print(df_dec)
実行結果1
cat num
0 0 1
1 1 2
2 0 3
3 2 4
cat num
0 a 1
1 b 2
2 a 3
3 c 4
実行ログ1
INFO CategoricalEncoder initialized (method=label)
INFO Fitting columns: ['cat']
INFO Fit complete.
INFO Transforming DataFrame.
DEBUG Column 'cat' label-encoded.
INFO Inverse transforming DataFrame.
DEBUG Column 'cat' label-decoded.
実行例2:one-hotエンコーディング・逆変換
df = pd.DataFrame({"color": ["red", "green", "red"], "size": ["S", "M", "L"]})
enc = CategoricalEncoder(method="onehot")
enc.fit(df)
df_enc = enc.transform(df)
print(df_enc)
df_dec = enc.inverse_transform(df_enc)
print(df_dec)
実行結果2
size_L size_M size_S color_green color_red
0 0 0 1 0 1
1 0 1 0 1 0
2 1 0 0 0 1
size color
0 S red
1 M green
2 L red
実行ログ2
INFO CategoricalEncoder initialized (method=onehot)
INFO Fitting columns: ['color', 'size']
INFO Fit complete.
INFO Transforming DataFrame.
INFO One-hot encoding completed.
INFO Inverse transforming DataFrame.
DEBUG Column 'color' one-hot decoded.
DEBUG Column 'size' one-hot decoded.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
astype('category').cat.categories |
カテゴリ値を自動取得(順序保証) |
map(self.label_maps[col]) |
カテゴリ→ラベルへの高速一括変換 |
pd.get_dummies |
one-hotエンコーディング(カテゴリごとに新列) |
inverse_maps |
ラベル→カテゴリへの逆写像管理 |
RuntimeError |
fit前にtransform/inverse_transformを呼ぶと必ず例外 |
loguru.logger |
進行・詳細実行・例外・警告をすべて記録 |
Q.24
項目 | 内容 |
---|---|
概要 | Zスコア・IQR法など複数指標での外れ値判定を行い、結果を新たなバイナリ列(is_outlier)として追加するクラス設計。パラメータ切替・列名競合回避・例外・詳細logも必須。 |
問題文 |
OutlierTagger(method='zscore' | 'iqr', threshold=3.0) を設計し、tag(df, column) により指定列で外れ値を検出し、'is_outlier'列を追加して返せ。手法切替・例外・ログ必須。 |
要件 | 指標切替/Zスコア・IQR法実装/バイナリ列追加/列名競合回避/ベクトル演算/全処理・例外log |
発展仕様 |
|
使用構文 |
np.mean , np.std , np.percentile , np.abs , df['new_col'] , loguru.logger , ValueError , copy
|
A.24
■ 模範解答
import pandas as pd
import numpy as np
from loguru import logger
class OutlierTagger:
def __init__(self, method: str = "zscore", threshold: float = 3.0):
# methodのバリデーション
if method not in ("zscore", "iqr"):
logger.error("method must be 'zscore' or 'iqr'.")
raise ValueError("method must be 'zscore' or 'iqr'.")
self.method = method
self.threshold = threshold
logger.info(f"OutlierTagger initialized (method={method}, threshold={threshold})")
def tag(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
try:
# DataFrame型検証
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
# カラム存在検証
if column not in df.columns:
logger.error(f"Column '{column}' does not exist in DataFrame.")
raise ValueError(f"Column '{column}' does not exist.")
# 結果バイナリ列名の競合回避
base_flag = "is_outlier"
flag_col = base_flag
suffix = 1
while flag_col in df.columns:
flag_col = f"{base_flag}_{suffix}"
suffix += 1
if flag_col != base_flag:
logger.warning(f"Flag column name '{base_flag}' conflicts. Using '{flag_col}'.")
# 対象列抽出(欠損除外せずそのまま演算)
x = df[column].values.astype(float)
out_flag = np.zeros_like(x, dtype=bool)
# Zスコア法
if self.method == "zscore":
mu = np.nanmean(x)
sigma = np.nanstd(x)
if sigma == 0 or np.isnan(sigma):
logger.error("Standard deviation is zero or NaN. Cannot compute z-score.")
raise ValueError("Cannot compute z-score: std is zero or NaN.")
z = (x - mu) / sigma
out_flag = np.abs(z) > self.threshold
logger.info(f"Z-score mean={mu}, std={sigma}")
# IQR法
elif self.method == "iqr":
q1 = np.nanpercentile(x, 25)
q3 = np.nanpercentile(x, 75)
iqr = q3 - q1
if iqr == 0 or np.isnan(iqr):
logger.error("IQR is zero or NaN. Cannot compute IQR outliers.")
raise ValueError("Cannot compute IQR: IQR is zero or NaN.")
lower = q1 - self.threshold * iqr
upper = q3 + self.threshold * iqr
out_flag = (x < lower) | (x > upper)
logger.info(f"IQR q1={q1}, q3={q3}, IQR={iqr}, lower={lower}, upper={upper}")
# 結果を新列に格納(非破壊)
out = df.copy()
out[flag_col] = out_flag.astype(int)
logger.info(f"OutlierTagger tagging completed. Added column: '{flag_col}'")
return out
except Exception as e:
logger.exception(f"OutlierTagger.tag failed: {e}")
raise
実行例1:Zスコア法による外れ値フラグ付与
df = pd.DataFrame({"score": [1, 2, 2, 3, 100]})
tagger = OutlierTagger(method="zscore", threshold=2.0)
result = tagger.tag(df, "score")
print(result)
実行結果1
score is_outlier
0 1 0
1 2 0
2 2 0
3 3 0
4 100 1
実行ログ1
INFO OutlierTagger initialized (method=zscore, threshold=2.0)
INFO Z-score mean=21.6, std=43.13258890051631
INFO OutlierTagger tagging completed. Added column: 'is_outlier'
実行例2:IQR法+バイナリ列競合回避
df = pd.DataFrame({"x": [1, 2, 3, 100], "is_outlier": [0, 0, 0, 0]})
tagger = OutlierTagger(method="iqr", threshold=1.5)
result = tagger.tag(df, "x")
print(result)
実行結果2
x is_outlier is_outlier_1
0 1 0 0
1 2 0 0
2 3 0 0
3 100 0 1
実行ログ2
INFO OutlierTagger initialized (method=iqr, threshold=1.5)
WARNING Flag column name 'is_outlier' conflicts. Using 'is_outlier_1'.
INFO IQR q1=1.75, q3=27.25, IQR=25.5, lower=-36.5, upper=65.5
INFO OutlierTagger tagging completed. Added column: 'is_outlier_1'
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
np.mean , np.std
|
平均・標準偏差(zスコア)をNaN無視で算出 |
np.percentile |
四分位点をNaN無視で取得(IQR法) |
np.abs(z) > t |
Zスコアで閾値を超えるか高速ベクトル判定 |
copy |
元dfは不変(非破壊設計) |
flag_col |
競合時は自動でis_outlier_1 , is_outlier_2 …と生成 |
logger.info/error |
loguruで全実行・異常・競合・パラメータ状況を詳細記録 |
Q.25
項目 | 内容 |
---|---|
概要 | 欠損率やユニーク数比率など、複数の基準に従って列を自動で削除する高機能ユーティリティ。削除列はプロパティで管理・log出力。条件記述ミス・異常値も厳密に例外制御。 |
問題文 |
ColumnReducer(criteria: dict[str, float]) を設計し、reduce(df) で NaN比率 > X や ユニーク数比率 < Y のような基準で列を削除せよ。削除列は self.dropped_columns に記録・log出力必須。 |
要件 | 欠損率・ユニーク比率等の削除条件/列単位で判定/削除列の記録・プロパティ化/log出力/条件形式バリデーション |
発展仕様 |
|
使用構文 |
df.isna().mean() , df.nunique() , __call__ , @property , loguru.logger , ValueError
|
A.24
■ 模範解答
import pandas as pd
from loguru import logger
class ColumnReducer:
# サポートされる条件キー
_allowed_criteria = {"nan_ratio", "unique_ratio"}
def __init__(self, criteria: dict[str, float]):
# 条件辞書が正しいか検証
if not isinstance(criteria, dict):
logger.error("criteria must be a dict[str, float].")
raise ValueError("criteria must be a dict[str, float].")
for key, value in criteria.items():
if key not in self._allowed_criteria:
logger.error(f"Unsupported criterion: {key}")
raise ValueError(f"Unsupported criterion: {key}")
if not isinstance(value, float) or not (0.0 <= value <= 1.0):
logger.error(f"Criterion '{key}' must be a float in [0, 1].")
raise ValueError(f"Criterion '{key}' must be a float in [0, 1].")
self.criteria = criteria
self._dropped_columns = []
logger.info(f"ColumnReducer initialized: {criteria}")
def reduce(self, df: pd.DataFrame) -> pd.DataFrame:
try:
# DataFrame型検証
if not isinstance(df, pd.DataFrame):
logger.error("Input must be a pandas DataFrame.")
raise TypeError("Input must be a pandas DataFrame.")
to_drop = set()
n_rows = len(df)
# 1. 欠損率条件
if "nan_ratio" in self.criteria:
nan_ratio = df.isna().mean()
for col, ratio in nan_ratio.items():
if ratio > self.criteria["nan_ratio"]:
logger.info(f"Column '{col}' will be dropped by nan_ratio={ratio:.2f} > {self.criteria['nan_ratio']}")
to_drop.add(col)
# 2. ユニーク比率条件
if "unique_ratio" in self.criteria:
# NaN除くユニーク数
uniq_ratio = df.nunique(dropna=True) / n_rows
for col, ratio in uniq_ratio.items():
if ratio < self.criteria["unique_ratio"]:
logger.info(f"Column '{col}' will be dropped by unique_ratio={ratio:.2f} < {self.criteria['unique_ratio']}")
to_drop.add(col)
# ドロップ列を記録
self._dropped_columns = sorted(to_drop)
if self._dropped_columns:
logger.info(f"Dropped columns: {self._dropped_columns}")
else:
logger.info("No columns dropped.")
# 新DataFrame返却(非破壊)
return df.drop(columns=self._dropped_columns)
except Exception as e:
logger.exception(f"ColumnReducer.reduce failed: {e}")
raise
@property
def dropped_columns(self):
# 外部公開用プロパティ
return self._dropped_columns.copy()
実行例1:欠損率による列削除
df = pd.DataFrame({
"A": [1, None, None],
"B": [1, 2, 3],
"C": [None, None, None]
})
reducer = ColumnReducer({"nan_ratio": 0.7})
result = reducer.reduce(df)
print(result)
print("Dropped:", reducer.dropped_columns)
実行結果1
A B
0 1.0 1
1 NaN 2
2 NaN 3
Dropped: ['C']
実行ログ1
INFO ColumnReducer initialized: {'nan_ratio': 0.7}
INFO Column 'C' will be dropped by nan_ratio=1.00 > 0.7
INFO Dropped columns: ['C']
実行例2:ユニーク数比率による列削除(削除なし)
df = pd.DataFrame({"A": [1, 2, 3], "B": [1, 1, 1]})
reducer = ColumnReducer({"unique_ratio": 0.5})
result = reducer.reduce(df)
print(result)
print("Dropped:", reducer.dropped_columns)
実行結果2
A B
0 1 1
1 2 1
2 3 1
Dropped: []
実行ログ2
INFO ColumnReducer initialized: {'unique_ratio': 0.5}
INFO No columns dropped.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
isna().mean() |
列ごとに欠損(NaN)の割合を高速計算(ベクトル化) |
nunique(dropna=True) |
各列の(NaN除外)ユニーク値の個数 |
/ n_rows |
ユニーク比率に正規化 |
set() |
重複のない削除対象列集合 |
@property |
外部から削除列一覧を安全に参照できるようにするPython標準 |
logger.info/error |
loguruによる進行・異常・削除列など全詳細の記録 |