0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データサイエンスのためのPython100本ノック vol.2 ~pandas編①~

Posted at

まえがき

データサイエンティストのためのPython100本ノックについての記事です。NumPyやPandasといったデータサイエンス系でよく用いられるライブラリを扱います。誤りなどがあれば、ご指摘ください。今回はpandasを中心に10問扱います。

Q.16

項目 内容
概要 groupby集約によるカテゴリ別統計量を柔軟に抽出するクラス設計。キー・集約列・関数指定・MultiIndexフラット化・詳細エラーとlog出力に対応。
問題文 `GroupSummaryExtractor(by: str list[str], agg_funcs: dict[str, list[str]]) を設計し、call(df: pd.DataFrame)` で
指定カラムによるgroupby集約+出力を単純インデックス化して返せ。カテゴリ列・集約列存在やagg_func形式などすべて検証し、異常時は例外+loguru記録せよ。
要件 groupby集約/集約関数指定/MultiIndexフラット化/非破壊処理/カテゴリ・列存在チェック/agg_func形式検証/詳細log・例外対応
発展仕様
  • 集約関数dictのキー・値型検証
  • カテゴリ/集約列未存在時はValueError
  • 出力DataFrameはreset_index済み(単純index)
  • 全操作・例外をloguruで記録
使用構文 groupby, agg, reset_index, try-except, ValueError, loguru.logger, isinstance, issubclass, dict

A.16

■ 模範解答

import pandas as pd
from loguru import logger
from typing import Union

class GroupSummaryExtractor:
    def __init__(self, by: Union[str, list], agg_funcs: dict):
        # 集約キーの検証
        if not (isinstance(by, (str, list)) and (isinstance(by, str) or all(isinstance(k, str) for k in by))):
            logger.error("by must be str or list[str].")
            raise ValueError("by must be str or list of str.")
        # agg_funcsが dict[str, list[str]] 形式か検証
        if not (isinstance(agg_funcs, dict) and all(isinstance(k, str) for k in agg_funcs.keys()) and
                all(isinstance(v, list) and all(isinstance(f, str) for f in v) for v in agg_funcs.values())):
            logger.error("agg_funcs must be dict[str, list[str]].")
            raise ValueError("agg_funcs must be dict[str, list[str]].")
        self.by = by
        self.agg_funcs = agg_funcs
        logger.info(f"GroupSummaryExtractor initialized (by={by}, agg_funcs={agg_funcs})")

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            # DataFrame型確認
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")
            # カテゴリ列・集約列の存在検証
            keys = [self.by] if isinstance(self.by, str) else self.by
            missing_cols = [col for col in keys if col not in df.columns]
            if missing_cols:
                logger.error(f"Missing groupby key columns: {missing_cols}")
                raise ValueError(f"Missing groupby key columns: {missing_cols}")

            missing_agg = [col for col in self.agg_funcs if col not in df.columns]
            if missing_agg:
                logger.error(f"Missing aggregation columns: {missing_agg}")
                raise ValueError(f"Missing aggregation columns: {missing_agg}")

            logger.debug(f"groupby keys: {keys}, agg columns: {list(self.agg_funcs.keys())}")

            # groupby集約処理
            grouped = df.groupby(self.by, dropna=False).agg(self.agg_funcs)
            logger.debug(f"Aggregated DataFrame (possibly MultiIndex columns):\n{grouped}")

            # MultiIndexカラムをフラット化
            if isinstance(grouped.columns, pd.MultiIndex):
                grouped.columns = ['_'.join([str(a) for a in col if a]) for col in grouped.columns.values]
                logger.info(f"Flattened MultiIndex columns: {list(grouped.columns)}")
            result = grouped.reset_index()  # インデックスを普通の列に戻す
            logger.info("GroupSummaryExtractor: aggregation and flattening complete.")
            return result

        except Exception as e:
            logger.exception(f"GroupSummaryExtractor failed: {e}")
            raise
実行例1:正常系(カテゴリ別・平均+合計)
import pandas as pd

df = pd.DataFrame({
    "category": ["A", "B", "A", "B", "A"],
    "value1": [10, 20, 30, 40, 50],
    "value2": [1, 2, 3, 4, 5]
})

extractor = GroupSummaryExtractor(
    by="category",
    agg_funcs={"value1": ["mean", "sum"], "value2": ["max"]}
)
summary = extractor(df)
print(summary)
実行結果1
  category  value1_mean  value1_sum  value2_max
0        A   30.000000         90           5
1        B   30.000000         60           4
実行ログ1
INFO     GroupSummaryExtractor initialized (by=category, agg_funcs={'value1': ['mean', 'sum'], 'value2': ['max']})
DEBUG    groupby keys: ['category'], agg columns: ['value1', 'value2']
DEBUG    Aggregated DataFrame (possibly MultiIndex columns):
         value1         value2
           mean sum max
category
A           30  90   5
B           30  60   4
INFO     Flattened MultiIndex columns: ['value1_mean', 'value1_sum', 'value2_max']
INFO     GroupSummaryExtractor: aggregation and flattening complete.
実行例2:異常系(列未存在)
df = pd.DataFrame({"foo": [1, 2], "bar": [3, 4]})
extractor = GroupSummaryExtractor(by="category", agg_funcs={"bar": ["sum"]})
extractor(df)
実行結果2
ValueError: Missing groupby key columns: ['category']
実行ログ2
INFO     GroupSummaryExtractor initialized (by=category, agg_funcs={'bar': ['sum']})
ERROR    Missing groupby key columns: ['category']

■ 文法・構文まとめ

機能・構文 解説
groupby(by).agg(agg_funcs) by列でカテゴリ分割し、指定列・集約関数dictで一括統計
reset_index() groupbyでindex化された列をふつうの列に戻す
MultiIndex['a_b'...] 複数階層のカラム名をフラット化("列_集約関数" のような1階層列名に変換)
try-except + logger すべての異常系・実行状況を詳細にログ記録

Q.17

項目 内容
概要 DataFrame内の欠損値出現パターンをベクトル化・高速集計。連続欠損行・列・閾値超列を抽出し、詳細な集計レポートを辞書で返すクラス。
問題文 MissingPatternAnalyzer() を設計し、analyze(df)
1. 連続欠損行/列の番号
2. 欠損数・率の集計
3. 欠損率80%以上の列
を全てdictで返却。異常時は例外+loguru記録。
要件 欠損集計/連続欠損検出(行・列)/高欠損列抽出(80%以上)/欠損率計算/マスクベクトル化/詳細log記録
発展仕様
  • 欠損マスク構築
  • np.diff, pd.isna, np.whereの応用
  • 詳細例外・log記録
使用構文 pd.isna, np.diff, np.where, sum, shift, loguru.logger, dict, pd.DataFrame, np.any, np.all

A.17

■ 模範解答

import pandas as pd
import numpy as np
from loguru import logger

class MissingPatternAnalyzer:
    def __init__(self):
        logger.info("MissingPatternAnalyzer initialized.")

    def analyze(self, df: pd.DataFrame) -> dict:
        try:
            # DataFrame型検証
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")

            n_rows, n_cols = df.shape
            logger.info(f"Analyzing DataFrame of shape {df.shape}")

            # 1. 欠損マスク作成(True=欠損, False=非欠損)
            mask = df.isna().values  # ndarray型に変換
            logger.debug(f"Missing mask:\n{mask.astype(int)}")

            # 2. 欠損数・率(全体、行、列ごと)
            n_missing = mask.sum()
            missing_per_row = mask.sum(axis=1)
            missing_per_col = mask.sum(axis=0)
            missing_rate = n_missing / (n_rows * n_cols)
            missing_rate_per_row = missing_per_row / n_cols
            missing_rate_per_col = missing_per_col / n_rows

            # 3. 連続欠損行・列の検出
            def find_consecutive_missing(mat, axis):
                # axis=1: 行方向(各行がすべてTrueならTrue)
                all_missing = np.all(mat, axis=axis)
                # np.diffで立ち上がり/立ち下がりを検出
                starts = np.where(np.diff(np.concatenate(([0], all_missing.astype(int)))) == 1)[0]
                ends = np.where(np.diff(np.concatenate((all_missing.astype(int), [0]))) == -1)[0]
                blocks = [list(range(s, e)) for s, e in zip(starts, ends)]
                return blocks

            consecutive_missing_rows = find_consecutive_missing(mask, axis=1)
            consecutive_missing_cols = find_consecutive_missing(mask, axis=0)

            # 欠損率80%以上の列
            high_missing_cols = [df.columns[i] for i, rate in enumerate(missing_rate_per_col) if rate >= 0.8]

            logger.info("Missing pattern analysis completed.")

            # レポートを辞書形式でまとめる
            report = {
                "n_missing": int(n_missing),
                "missing_rate": float(missing_rate),
                "missing_per_row": missing_per_row.tolist(),
                "missing_per_col": missing_per_col.tolist(),
                "missing_rate_per_row": missing_rate_per_row.tolist(),
                "missing_rate_per_col": missing_rate_per_col.tolist(),
                "consecutive_missing_rows": consecutive_missing_rows,  # 連続欠損区間のリスト
                "consecutive_missing_cols": consecutive_missing_cols,
                "high_missing_cols": high_missing_cols
            }
            return report

        except Exception as e:
            logger.exception(f"MissingPatternAnalyzer failed: {e}")
            raise
実行例1:一部欠損と連続欠損があるケース
import pandas as pd
df = pd.DataFrame({
    "A": [1, np.nan, np.nan, 4, 5],
    "B": [np.nan, np.nan, np.nan, np.nan, 10],
    "C": [3, 4, 5, 6, 7]
})

analyzer = MissingPatternAnalyzer()
report = analyzer.analyze(df)
for k, v in report.items():
    print(f"{k}: {v}")
実行結果1
n_missing: 6
missing_rate: 0.4
missing_per_row: [1, 2, 2, 1, 0]
missing_per_col: [2, 4, 0]
missing_rate_per_row: [0.3333333333333333, 0.6666666666666666, 0.6666666666666666, 0.3333333333333333, 0.0]
missing_rate_per_col: [0.4, 0.8, 0.0]
consecutive_missing_rows: []
consecutive_missing_cols: [[1, 2, 3, 4]]
high_missing_cols: ['B']
実行ログ1
INFO     MissingPatternAnalyzer initialized.
INFO     Analyzing DataFrame of shape (5, 3)
DEBUG    Missing mask:
[[0 1 0]
 [1 1 0]
 [1 1 0]
 [0 1 0]
 [0 0 0]]
INFO     Missing pattern analysis completed.
実行例2:全行全欠損のケース
df = pd.DataFrame({"A": [np.nan, np.nan], "B": [np.nan, np.nan]})
analyzer = MissingPatternAnalyzer()
report = analyzer.analyze(df)
print(report)
実行結果2
{'n_missing': 4,
 'missing_rate': 1.0,
 'missing_per_row': [2, 2],
 'missing_per_col': [2, 2],
 'missing_rate_per_row': [1.0, 1.0],
 'missing_rate_per_col': [1.0, 1.0],
 'consecutive_missing_rows': [[0, 1]],
 'consecutive_missing_cols': [[0, 1]],
 'high_missing_cols': ['A', 'B']}
実行ログ2
INFO     MissingPatternAnalyzer initialized.
INFO     Analyzing DataFrame of shape (2, 2)
DEBUG    Missing mask:
[[1 1]
 [1 1]]
INFO     Missing pattern analysis completed.

■ 文法・構文まとめ

機能・構文 解説
pd.isna()/df.isna() DataFrameで欠損(NaN)位置をTrue/Falseで取得
mask.sum(axis=1/0) 行・列ごとの欠損数(ベクトル化で高速)
np.all(mask, axis=axis) 行や列すべてが欠損かどうかを一括判定
np.diff, np.where 立ち上がり・立ち下がり検出で連続区間を抽出
high_missing_cols 欠損率80%以上の列ラベルを自動抽出
logger.info/debug/exception loguruで全実行・異常を記録

Q.18

項目 内容
概要 DataFrame 各列を dtype・ユニーク性等の特徴から 'numerical', 'categorical', 'datetime', 'other' の4分類へ自動分類するクラス。未分類列はログ警告。
問題文 ColumnTypeClassifier() クラスを定義し、classify(df) で列ごとに型推定+カテゴリ分類し、ラベル辞書(列名→分類)を返す。型判定はpandas.api.types利用。未分類列があれば警告をログ出力すること。
要件 dtype判定/ユニーク比活用/カテゴリ定義/分類辞書返却/log記録
発展仕様
  • datetime型・自動判定(object列もdate変換試行)
  • カテゴリ型はユニーク比小(例:0.05未満か50未満)
  • 分類不能列はother+警告
使用構文 pd.api.types.is_numeric_dtype, is_datetime64_any_dtype, is_categorical_dtype, pd.to_datetime, nunique, dict, loguru.logger, try-except

A.18

■ 模範解答

import pandas as pd
from loguru import logger

class ColumnTypeClassifier:
    def __init__(self):
        logger.info("ColumnTypeClassifier initialized.")

    def classify(self, df: pd.DataFrame) -> dict:
        try:
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")

            n_rows = len(df)
            results = {}

            for col in df.columns:
                s = df[col]
                col_type = "other"  # デフォルト

                # 1. datetime判定(明示的型 or 変換可)
                if pd.api.types.is_datetime64_any_dtype(s):
                    col_type = "datetime"
                else:
                    try:
                        parsed = pd.to_datetime(s, errors="raise")
                        if not parsed.isnull().all():
                            col_type = "datetime"
                    except Exception:
                        pass

                # 2. numerical判定(float, intなど)
                if col_type == "other" and pd.api.types.is_numeric_dtype(s):
                    col_type = "numerical"

                # 3. categorical判定(カテゴリ型 or ユニーク率閾値)
                if col_type == "other":
                    nunique = s.nunique(dropna=True)
                    uniq_ratio = nunique / n_rows if n_rows > 0 else 0
                    # カテゴリ型 or "ユニーク比0.05未満か50未満ならカテゴリ"のヒューリスティック
                    if pd.api.types.is_categorical_dtype(s) or uniq_ratio < 0.05 or nunique < 50:
                        col_type = "categorical"

                results[col] = col_type

            # 未分類(other)列の警告
            others = [col for col, typ in results.items() if typ == "other"]
            if others:
                logger.warning(f"Some columns could not be classified: {others}")

            logger.info(f"Classification result: {results}")
            return results

        except Exception as e:
            logger.exception(f"ColumnTypeClassifier failed: {e}")
            raise
実行例1:基本的な型分類
df = pd.DataFrame({
    "num": [1.0, 2.0, 3.0],
    "cat": ["a", "b", "a"],
    "date": pd.to_datetime(["2022-01-01", "2022-01-02", "2022-01-03"]),
    "obj": ["x", "y", "z"]
})

clf = ColumnTypeClassifier()
result = clf.classify(df)
print(result)
実行結果1
{'num': 'numerical', 'cat': 'categorical', 'date': 'datetime', 'obj': 'categorical'}
実行ログ1
INFO     ColumnTypeClassifier initialized.
INFO     Classification result: {'num': 'numerical', 'cat': 'categorical', 'date': 'datetime', 'obj': 'categorical'}
実行例2:未分類列(other)
df = pd.DataFrame({
    "misc": [lambda x: x, lambda x: x+1, lambda x: x-1],
    "num": [1, 2, 3]
})
clf = ColumnTypeClassifier()
result = clf.classify(df)
print(result)
実行結果2
{'misc': 'other', 'num': 'numerical'}
実行ログ2
INFO     ColumnTypeClassifier initialized.
WARNING  Some columns could not be classified: ['misc']
INFO     Classification result: {'misc': 'other', 'num': 'numerical'}

■ 文法・構文まとめ

機能・構文 解説
is_numeric_dtype, is_datetime64_any_dtype pandas標準の型自動判定メソッド
pd.to_datetime(s) object列でも日付変換できるか判定
nunique()/len(df) ユニーク比によるカテゴリ判定ヒューリスティック
'other' どの型にも分類できない列のデフォルトカテゴリ
logger.info, logger.warning loguruで詳細な分類結果や未分類警告を記録

Q.19

項目 内容
概要 任意の2つのDataFrameを指定キーで効率的に結合し、競合列名の解決(suffix付与)、内部結合・外部結合切替、キーや方式の異常時例外+log記録にも対応するクラス設計
問題文 CustomJoiner(on: str, how: str = "inner", suffixes: tuple[str, str] = ("_x", "_y")) を定義し、join(df1, df2) で指定条件に基づき結合し、異常時は例外+loguru記録せよ
要件 結合方式指定/onキー存在チェック/列重複検知・suffix自動付与/try-except例外/全処理log記録/pandas高速ベクトル化結合
発展仕様
  • 重複列名の警告記録
  • 不正なキー・方式時の詳細例外
  • 結合処理の成否記録
使用構文 pd.merge, suffixes, on, ValueError, TypeError, loguru.logger, try-except, issubclass, tuple, list

A.19

■ 模範解答

import pandas as pd
from loguru import logger

class CustomJoiner:
    def __init__(self, on: str, how: str = "inner", suffixes: tuple = ("_x", "_y")):
        # 入力検証: on, how, suffixes
        if not isinstance(on, str):
            logger.error("Join key 'on' must be a string.")
            raise ValueError("Join key 'on' must be a string.")
        if how not in {"inner", "outer", "left", "right"}:
            logger.error(f"Join method '{how}' not supported.")
            raise ValueError("Join method must be one of: 'inner', 'outer', 'left', 'right'.")
        if not (isinstance(suffixes, tuple) and len(suffixes) == 2):
            logger.error("Suffixes must be a tuple of two strings.")
            raise ValueError("Suffixes must be a tuple of two strings.")
        self.on = on
        self.how = how
        self.suffixes = suffixes
        logger.info(f"CustomJoiner initialized (on={on}, how={how}, suffixes={suffixes})")

    def join(self, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
        try:
            # DataFrame型確認
            if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
                logger.error("Both inputs must be pandas DataFrame.")
                raise TypeError("Both inputs must be pandas DataFrame.")

            # onキー存在確認
            if self.on not in df1.columns or self.on not in df2.columns:
                logger.error(f"Key '{self.on}' not found in both DataFrames.")
                raise ValueError(f"Key '{self.on}' must exist in both DataFrames.")

            # 重複列名の検出(on以外で共通名がある場合警告)
            cols1 = set(df1.columns)
            cols2 = set(df2.columns)
            overlap = (cols1 & cols2) - {self.on}
            if overlap:
                logger.warning(f"Overlapping columns detected: {overlap}. Suffixes {self.suffixes} will be applied.")

            logger.info(f"Joining on '{self.on}' with method '{self.how}'.")

            # merge処理
            result = pd.merge(
                df1, df2,
                on=self.on,
                how=self.how,
                suffixes=self.suffixes
            )

            logger.info(f"Join completed. Result shape: {result.shape}")
            return result

        except Exception as e:
            logger.exception(f"CustomJoiner join failed: {e}")
            raise
実行例1:正常系(共通列の競合あり・suffix付与)
df1 = pd.DataFrame({"id": [1, 2], "value": [10, 20]})
df2 = pd.DataFrame({"id": [1, 2], "value": [100, 200], "other": ["a", "b"]})
joiner = CustomJoiner(on="id", how="inner", suffixes=("_a", "_b"))
result = joiner.join(df1, df2)
print(result)
実行結果1
   id  value_a  value_b other
0   1       10      100     a
1   2       20      200     b
実行ログ1
INFO     CustomJoiner initialized (on=id, how=inner, suffixes=('_a', '_b'))
WARNING  Overlapping columns detected: {'value'}. Suffixes ('_a', '_b') will be applied.
INFO     Joining on 'id' with method 'inner'.
INFO     Join completed. Result shape: (2, 4)
実行例2:異常系(onキーが一方に存在しない)
df1 = pd.DataFrame({"id": [1, 2], "v": [10, 20]})
df2 = pd.DataFrame({"x": [1, 2], "v": [100, 200]})
joiner = CustomJoiner(on="id")
joiner.join(df1, df2)
実行結果2
ValueError: Key 'id' must exist in both DataFrames.
実行ログ2
INFO     CustomJoiner initialized (on=id, how=inner, suffixes=('_x', '_y'))
ERROR    Key 'id' not found in both DataFrames.

■ 文法・構文まとめ

機能・構文 解説
pd.merge(..., on, how, suffixes) pandasの最速・最強の結合関数。key,結合方式,競合列の接尾辞指定が可能
set(df1.columns) & set(df2.columns) 両データフレームの重複列検出
logger.warning/info/error/exception loguruで警告・進行・異常・例外の詳細記録
try-except 失敗時の異常検出・再現性向上
ValueError, TypeError 異常な入力やパラメータを即例外化

Q.20

項目 内容
概要 MultiIndex を持つDataFrameの行または列に対して、階層情報を1階層のstrインデックスに変換して通常のDataFrameとして扱えるよう変換するクラス設計。セパレータや階層深さも柔軟に制御・log記録にも対応。
問題文 MultiIndexFlattener(separator="_") を定義し、flatten(df) により、列または行のMultiIndexを階層名をセパレータ結合したstr型index/columnsに変換した新DataFrameを返す。元dfは非破壊。全処理・例外をloguruで記録せよ。
要件 階層確認/MultiIndex→str化/列・行対応/セパレータ指定/ベクトル化高速変換/詳細log記録/例外処理
発展仕様
  • MultiIndexでない場合はそのまま返却(log出力のみ)
  • 階層Noneも含め必ずstr型に
  • セパレータは任意文字列
  • 全処理・異常log記録
使用構文 df.columns.map, df.index.map, isinstance, pd.MultiIndex, str.join, loguru.logger, try-except

A.20

■ 模範解答

import pandas as pd
from loguru import logger

class MultiIndexFlattener:
    def __init__(self, separator: str = "_"):
        # 階層連結用のセパレータ
        self.separator = separator
        logger.info(f"MultiIndexFlattener initialized (separator='{separator}')")

    def flatten(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            # DataFrame型か検証
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")

            changed = False  # 変換フラグ

            # columnsがMultiIndexの場合: tuple→str変換
            if isinstance(df.columns, pd.MultiIndex):
                logger.info("Flattening columns MultiIndex.")
                # 各カラムの階層タプルをセパレータ結合しstrへ
                new_columns = df.columns.map(lambda tup: self.separator.join(map(str, tup)))
                changed = True
            else:
                new_columns = df.columns

            # indexがMultiIndexの場合: tuple→str変換
            if isinstance(df.index, pd.MultiIndex):
                logger.info("Flattening index MultiIndex.")
                new_index = df.index.map(lambda tup: self.separator.join(map(str, tup)))
                changed = True
            else:
                new_index = df.index

            # 新DataFrameを返却(元は非破壊)
            out_df = df.copy()
            out_df.columns = new_columns
            out_df.index = new_index

            if not changed:
                logger.info("No MultiIndex found. DataFrame unchanged.")

            logger.info("MultiIndex flattening completed.")
            return out_df

        except Exception as e:
            logger.exception(f"MultiIndexFlattener failed: {e}")
            raise
実行例1:列・行の両方がMultiIndex
import pandas as pd

cols = pd.MultiIndex.from_tuples([('A', 'x'), ('A', 'y'), ('B', 'z')])
idx = pd.MultiIndex.from_tuples([('foo', 1), ('foo', 2), ('bar', 1)])
df = pd.DataFrame([[1,2,3],[4,5,6],[7,8,9]], columns=cols, index=idx)

flattener = MultiIndexFlattener(separator=":")
result = flattener.flatten(df)
print(result)
実行結果1
         A:x  A:y  B:z
foo:1     1    2    3
foo:2     4    5    6
bar:1     7    8    9
実行ログ1
INFO     MultiIndexFlattener initialized (separator=':')
INFO     Flattening columns MultiIndex.
INFO     Flattening index MultiIndex.
INFO     MultiIndex flattening completed.
実行例2:通常の単層DataFrame
df = pd.DataFrame({"A": [1,2], "B": [3,4]}, index=["x", "y"])
flattener = MultiIndexFlattener()
result = flattener.flatten(df)
print(result)
実行結果2
   A  B
x  1  3
y  2  4
実行ログ2
INFO     MultiIndexFlattener initialized (separator='_')
INFO     No MultiIndex found. DataFrame unchanged.
INFO     MultiIndex flattening completed.

■ 文法・構文まとめ

機能・構文 解説
isinstance(df.columns, pd.MultiIndex) 複数階層インデックスかどうか確認
df.columns.map(lambda tup: "_".join(map(str, tup))) タプル(階層)→文字列連結して1階層へ変換
out_df = df.copy() 非破壊設計(元dfの状態は一切変更しない)
logger.info, logger.error, logger.exception loguruで正常系・異常系をすべて詳細記録

Q.21

項目 内容
概要 2つのDataFrameのインデックスまたはカラムを自動でアライン(再インデックス&並び替え)。pandas演算前に完全整合をとるクラス。整合性ログ記録・重複ラベル検証・方式切替・例外制御もすべて網羅する。
問題文 DataFrameAligner(method='index' | 'columns') クラスを設計し、align(df1, df2) でラベルを自動でreindex・並び替えた2つの新DataFrameを返せ。方式検証・重複エラー・処理ログも出力せよ。
要件 reindex利用/index・columnsの方式切替/非破壊/方式バリデーション/重複ラベル例外/処理内容のlog出力
発展仕様
  • ラベル重複時はValueError
  • 整合後のラベル情報をlogで出力
  • 引数・型エラーも詳細記録
使用構文 reindex, sort_index, sort_values, loguru.logger, ValueError, try-except, pd.DataFrame.duplicated

A.21

■ 模範解答

import pandas as pd
from loguru import logger

class DataFrameAligner:
    def __init__(self, method: str = "index"):
        # アライメント方式のバリデーション
        if method not in ("index", "columns"):
            logger.error("method must be 'index' or 'columns'.")
            raise ValueError("method must be 'index' or 'columns'.")
        self.method = method
        logger.info(f"DataFrameAligner initialized (method='{method}')")

    def align(self, df1: pd.DataFrame, df2: pd.DataFrame):
        try:
            # 型チェック
            if not isinstance(df1, pd.DataFrame) or not isinstance(df2, pd.DataFrame):
                logger.error("Both inputs must be pandas DataFrame.")
                raise TypeError("Both inputs must be pandas DataFrame.")

            # ラベル取得
            if self.method == "index":
                labels1 = df1.index
                labels2 = df2.index
                axis = 0
            else:
                labels1 = df1.columns
                labels2 = df2.columns
                axis = 1

            # 重複ラベルチェック
            if pd.Index(labels1).duplicated().any() or pd.Index(labels2).duplicated().any():
                logger.error("Duplicated labels detected in input DataFrames.")
                raise ValueError("Duplicated labels found in DataFrame.")

            # 共通ラベル集合を取得してソート(順序性確保)
            all_labels = sorted(set(labels1) | set(labels2))
            logger.debug(f"All labels after union: {all_labels}")

            # reindexで合わせて並び替え
            aligned1 = df1.reindex(all_labels, axis=axis)
            aligned2 = df2.reindex(all_labels, axis=axis)

            logger.info(f"Alignment completed on axis={self.method}.")
            logger.debug(f"Aligned df1 {self.method}s: {aligned1.index if axis == 0 else aligned1.columns}")
            logger.debug(f"Aligned df2 {self.method}s: {aligned2.index if axis == 0 else aligned2.columns}")
            return aligned1, aligned2

        except Exception as e:
            logger.exception(f"DataFrameAligner.align failed: {e}")
            raise
実行例1:行indexアライメント
df1 = pd.DataFrame({"A": [1, 2]}, index=["x", "y"])
df2 = pd.DataFrame({"B": [10, 20]}, index=["y", "z"])
aligner = DataFrameAligner(method="index")
a1, a2 = aligner.align(df1, df2)
print(a1)
print(a2)
実行結果1
     A
x  1.0
y  2.0
z  NaN
     B
x  NaN
y 10.0
z 20.0
実行ログ1
INFO     DataFrameAligner initialized (method='index')
DEBUG    All labels after union: ['x', 'y', 'z']
INFO     Alignment completed on axis=index.
DEBUG    Aligned df1 indexs: Index(['x', 'y', 'z'], dtype='object')
DEBUG    Aligned df2 indexs: Index(['x', 'y', 'z'], dtype='object')
実行例2:列labelアライメント+重複エラー
df1 = pd.DataFrame([[1, 2]], columns=["A", "A"])
df2 = pd.DataFrame([[3, 4]], columns=["A", "B"])
aligner = DataFrameAligner(method="columns")
aligner.align(df1, df2)
実行結果2
ValueError: Duplicated labels found in DataFrame.
実行ログ2
INFO     DataFrameAligner initialized (method='columns')
ERROR    Duplicated labels detected in input DataFrames.

■ 文法・構文まとめ

機能・構文 解説
reindex(all_labels, axis=...) ラベルを揃えて欠損はNaNで補い、順序も強制的に揃える
sorted(set(...)) ラベル集合を結合し昇順に。順序性を担保
duplicated().any() indexやcolumnsに重複ラベルがないかを検出
logger.info/debug/error loguruで進行・異常・整合情報を記録
try-except すべての例外に詳細ログ+re-raiseで安全運用

Q.22

項目 内容
概要 条件関数に合致するセルを定数や他列値で動的に置換する超汎用セル操作クラス。np.where 構文のDataFrame版。列指定や関数・値の柔軟性、エラー制御・詳細ログ記録も必須。
問題文 ConditionalReplacer(condition_fn, replacement_value, target_columns=None) を設計し、apply(df) で、condition_fn(row)がTrueなセルをreplacement_value(定数 or 関数 or 別列名指定)で置換した新DataFrameを返せ。target_columnsで列指定も可能。異常時は例外+loguru記録必須。
要件 行ごと条件判定/np.whereベクトル構造/定数・関数・列名での置換柔軟化/callable検証/列指定可/全処理・異常log/非破壊
発展仕様
  • condition_fn, replacement_value, target_columns型検証
  • 行方向applyベクトル化
  • replacementが関数/列/定数か自動判定
  • 列名未存在時例外
使用構文 apply, np.where, df.loc, callable, loguru.logger, TypeError, ValueError, copy

A.22

■ 模範解答

import pandas as pd
import numpy as np
from loguru import logger
from typing import Callable, Any, Optional, List, Union

class ConditionalReplacer:
    def __init__(self, condition_fn: Callable, replacement_value: Any, target_columns: Optional[Union[str, List[str]]] = None):
        # 条件関数がcallableか検証
        if not callable(condition_fn):
            logger.error("condition_fn must be callable.")
            raise TypeError("condition_fn must be callable.")
        self.condition_fn = condition_fn
        self.replacement_value = replacement_value

        # 列指定の正当性検証
        if target_columns is not None and not (isinstance(target_columns, (str, list))):
            logger.error("target_columns must be None, a str, or a list of str.")
            raise TypeError("target_columns must be None, a str, or a list of str.")
        self.target_columns = [target_columns] if isinstance(target_columns, str) else target_columns
        logger.info(f"ConditionalReplacer initialized (target_columns={self.target_columns})")

    def apply(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            # DataFrame型検証
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")
            # 列存在検証
            targets = self.target_columns if self.target_columns is not None else list(df.columns)
            for col in targets:
                if col not in df.columns:
                    logger.error(f"Target column '{col}' does not exist in DataFrame.")
                    raise ValueError(f"Target column '{col}' does not exist in DataFrame.")

            logger.info(f"Columns to apply replacement: {targets}")

            out = df.copy()

            # 行方向に条件判定 (ベクトル化高速化)
            mask = df.apply(self.condition_fn, axis=1)  # 各行でbool取得
            logger.debug(f"Condition mask: {mask.values}")

            for col in targets:
                # 置換値の決定方式
                if callable(self.replacement_value):
                    # replacement_valueが関数の場合:行→値
                    new_values = df.apply(self.replacement_value, axis=1)
                elif isinstance(self.replacement_value, str) and self.replacement_value in df.columns:
                    # replacement_valueが列名の場合
                    new_values = df[self.replacement_value]
                else:
                    # replacement_valueが定数
                    new_values = self.replacement_value

                # np.whereで一括置換 (maskがTrueならnew_values、Falseは現状値)
                out[col] = np.where(mask, new_values, df[col])
                logger.debug(f"Column '{col}' replaced where condition is True.")

            logger.info("Conditional replacement completed.")
            return out

        except Exception as e:
            logger.exception(f"ConditionalReplacer.apply failed: {e}")
            raise
実行例1:列A>1のセルをB列の値で置換(全列対象)
df = pd.DataFrame({"A": [1, 2, 3], "B": [100, 200, 300]})
# Aが2より大きい or 2の行のセルをB列値で置換
replacer = ConditionalReplacer(
    condition_fn=lambda row: row["A"] >= 2,
    replacement_value="B"
)
result = replacer.apply(df)
print(result)
実行結果1
     A    B
0    1  100
1  200  200
2  300  300
実行ログ1
INFO     ConditionalReplacer initialized (target_columns=None)
INFO     Columns to apply replacement: ['A', 'B']
DEBUG    Condition mask: [False  True  True]
DEBUG    Column 'A' replaced where condition is True.
DEBUG    Column 'B' replaced where condition is True.
INFO     Conditional replacement completed.
実行例2:特定列のみ、関数による置換(2倍する)
df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
# A列が偶数の行のB列のみ、B列値を2倍で置換
replacer = ConditionalReplacer(
    condition_fn=lambda row: row["A"] % 2 == 0,
    replacement_value=lambda row: row["B"] * 2,
    target_columns="B"
)
result = replacer.apply(df)
print(result)
実行結果2
   A  B
0  1  4
1  2 10
2  3  6
実行ログ2
INFO     ConditionalReplacer initialized (target_columns=['B'])
INFO     Columns to apply replacement: ['B']
DEBUG    Condition mask: [False  True False]
DEBUG    Column 'B' replaced where condition is True.
INFO     Conditional replacement completed.

■ 文法・構文まとめ

機能・構文 解説
df.apply(condition_fn, axis=1) 各行でbool判定(マスクをベクトル的に作る)
np.where(mask, new, old) マスクTrueの場所だけ置換値、それ以外は元値
replacement_valueの多様性 定数/列名/関数どれも柔軟に許容
target_columns 対象列をstrまたはlist指定で柔軟化
logger.info/debug/error/exception loguruで正常系・異常系・詳細実行をすべて記録

Q.23

項目 内容
概要 カテゴリ列を「ラベルエンコード」または「one-hotエンコード」で変換し、fit→transform→inverse_transformを連続利用可能なクラス設計。方式切替・ラベル管理・逆変換・未fit時例外・カテゴリ数制限・全処理ログも必須。
問題文 CategoricalEncoder(method="label" | "onehot") を設計し、fit(df)transform(df)inverse_transform(df) を順番に利用できるようにせよ。
label方式はLabelEncoder相当、onehot方式はget_dummies相当。
全エラーと詳細logも記録せよ。
要件 エンコード方式切替/fit→transform必須順守/ラベル辞書・カテゴリ数管理/逆変換実装/未fit時エラー/log記録/DataFrame高速ベクトル変換
発展仕様
  • カテゴリ数上限エラー
  • カテゴリ列自動判定(objectやcategory型)
  • fit→transform間の整合検証
使用構文 pd.get_dummies, astype('category'), map, dict, ValueError, RuntimeError, loguru.logger

A.23

■ 模範解答

import pandas as pd
from loguru import logger

class CategoricalEncoder:
    def __init__(self, method: str = "label", max_categories: int = 100):
        # methodのバリデーション
        if method not in {"label", "onehot"}:
            logger.error("method must be 'label' or 'onehot'.")
            raise ValueError("method must be 'label' or 'onehot'.")
        self.method = method
        self.max_categories = max_categories
        self.fitted = False
        self.label_maps = {}    # 列ごとのカテゴリ→整数マップ
        self.inverse_maps = {}  # 列ごとの整数→カテゴリマップ
        self.columns_ = []      # fitしたカラム情報
        logger.info(f"CategoricalEncoder initialized (method={method})")

    def fit(self, df: pd.DataFrame):
        # fit時はカテゴリ列のみ選別
        cat_cols = [col for col in df.columns
                    if pd.api.types.is_object_dtype(df[col]) or pd.api.types.is_categorical_dtype(df[col])]
        logger.info(f"Fitting columns: {cat_cols}")
        self.columns_ = cat_cols
        self.label_maps.clear()
        self.inverse_maps.clear()
        for col in cat_cols:
            cats = pd.Series(df[col].astype("category").cat.categories)
            if len(cats) > self.max_categories:
                logger.error(f"Column '{col}' exceeds maximum {self.max_categories} categories.")
                raise ValueError(f"Too many categories in column '{col}'.")
            label_map = {cat: i for i, cat in enumerate(cats)}
            inv_map = {i: cat for i, cat in enumerate(cats)}
            self.label_maps[col] = label_map
            self.inverse_maps[col] = inv_map
        self.fitted = True
        logger.info("Fit complete.")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.fitted:
            logger.error("transform called before fit.")
            raise RuntimeError("Must fit before transform.")
        logger.info("Transforming DataFrame.")
        out = df.copy()
        if self.method == "label":
            for col in self.columns_:
                if col in out:
                    out[col] = out[col].map(self.label_maps[col])
                    logger.debug(f"Column '{col}' label-encoded.")
        elif self.method == "onehot":
            # onehotのみ対象列のみ展開、その他はそのまま
            ohe = pd.get_dummies(out[self.columns_], prefix=self.columns_, dtype=int)
            out = out.drop(columns=self.columns_).reset_index(drop=True)
            ohe = ohe.reset_index(drop=True)
            out = pd.concat([out, ohe], axis=1)
            logger.info("One-hot encoding completed.")
        return out

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.fitted:
            logger.error("inverse_transform called before fit.")
            raise RuntimeError("Must fit before inverse_transform.")
        logger.info("Inverse transforming DataFrame.")
        out = df.copy()
        if self.method == "label":
            for col in self.columns_:
                if col in out:
                    out[col] = out[col].map(self.inverse_maps[col])
                    logger.debug(f"Column '{col}' label-decoded.")
        elif self.method == "onehot":
            for col in self.columns_:
                # onehotカラム(prefix==col)を抽出
                ohe_cols = [c for c in out.columns if c.startswith(col + "_")]
                if not ohe_cols:
                    logger.warning(f"No one-hot columns for '{col}' found.")
                    continue
                idx = out[ohe_cols].values.argmax(axis=1)
                inv_map = self.inverse_maps[col]
                out[col] = [inv_map[i] for i in idx]
                out = out.drop(columns=ohe_cols)
                logger.debug(f"Column '{col}' one-hot decoded.")
        return out
実行例1:ラベルエンコーディング・逆変換
df = pd.DataFrame({"cat": ["a", "b", "a", "c"], "num": [1, 2, 3, 4]})
enc = CategoricalEncoder(method="label")
enc.fit(df)
df_enc = enc.transform(df)
print(df_enc)
df_dec = enc.inverse_transform(df_enc)
print(df_dec)
実行結果1
   cat  num
0    0    1
1    1    2
2    0    3
3    2    4
  cat  num
0   a    1
1   b    2
2   a    3
3   c    4
実行ログ1
INFO     CategoricalEncoder initialized (method=label)
INFO     Fitting columns: ['cat']
INFO     Fit complete.
INFO     Transforming DataFrame.
DEBUG    Column 'cat' label-encoded.
INFO     Inverse transforming DataFrame.
DEBUG    Column 'cat' label-decoded.
実行例2:one-hotエンコーディング・逆変換
df = pd.DataFrame({"color": ["red", "green", "red"], "size": ["S", "M", "L"]})
enc = CategoricalEncoder(method="onehot")
enc.fit(df)
df_enc = enc.transform(df)
print(df_enc)
df_dec = enc.inverse_transform(df_enc)
print(df_dec)
実行結果2
   size_L  size_M  size_S  color_green  color_red
0       0       0       1            0          1
1       0       1       0            1          0
2       1       0       0            0          1
  size color
0    S   red
1    M green
2    L   red
実行ログ2
INFO     CategoricalEncoder initialized (method=onehot)
INFO     Fitting columns: ['color', 'size']
INFO     Fit complete.
INFO     Transforming DataFrame.
INFO     One-hot encoding completed.
INFO     Inverse transforming DataFrame.
DEBUG    Column 'color' one-hot decoded.
DEBUG    Column 'size' one-hot decoded.

■ 文法・構文まとめ

機能・構文 解説
astype('category').cat.categories カテゴリ値を自動取得(順序保証)
map(self.label_maps[col]) カテゴリ→ラベルへの高速一括変換
pd.get_dummies one-hotエンコーディング(カテゴリごとに新列)
inverse_maps ラベル→カテゴリへの逆写像管理
RuntimeError fit前にtransform/inverse_transformを呼ぶと必ず例外
loguru.logger 進行・詳細実行・例外・警告をすべて記録

Q.24

項目 内容
概要 Zスコア・IQR法など複数指標での外れ値判定を行い、結果を新たなバイナリ列(is_outlier)として追加するクラス設計。パラメータ切替・列名競合回避・例外・詳細logも必須。
問題文 OutlierTagger(method='zscore' | 'iqr', threshold=3.0) を設計し、tag(df, column) により指定列で外れ値を検出し、'is_outlier'列を追加して返せ。手法切替・例外・ログ必須。
要件 指標切替/Zスコア・IQR法実装/バイナリ列追加/列名競合回避/ベクトル演算/全処理・例外log
発展仕様
  • methodのバリデーション
  • バイナリ列名の競合解決(自動リネーム)
  • 計算失敗時例外・詳細log
使用構文 np.mean, np.std, np.percentile, np.abs, df['new_col'], loguru.logger, ValueError, copy

A.24

■ 模範解答

import pandas as pd
import numpy as np
from loguru import logger

class OutlierTagger:
    def __init__(self, method: str = "zscore", threshold: float = 3.0):
        # methodのバリデーション
        if method not in ("zscore", "iqr"):
            logger.error("method must be 'zscore' or 'iqr'.")
            raise ValueError("method must be 'zscore' or 'iqr'.")
        self.method = method
        self.threshold = threshold
        logger.info(f"OutlierTagger initialized (method={method}, threshold={threshold})")

    def tag(self, df: pd.DataFrame, column: str) -> pd.DataFrame:
        try:
            # DataFrame型検証
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")

            # カラム存在検証
            if column not in df.columns:
                logger.error(f"Column '{column}' does not exist in DataFrame.")
                raise ValueError(f"Column '{column}' does not exist.")

            # 結果バイナリ列名の競合回避
            base_flag = "is_outlier"
            flag_col = base_flag
            suffix = 1
            while flag_col in df.columns:
                flag_col = f"{base_flag}_{suffix}"
                suffix += 1
            if flag_col != base_flag:
                logger.warning(f"Flag column name '{base_flag}' conflicts. Using '{flag_col}'.")

            # 対象列抽出(欠損除外せずそのまま演算)
            x = df[column].values.astype(float)
            out_flag = np.zeros_like(x, dtype=bool)

            # Zスコア法
            if self.method == "zscore":
                mu = np.nanmean(x)
                sigma = np.nanstd(x)
                if sigma == 0 or np.isnan(sigma):
                    logger.error("Standard deviation is zero or NaN. Cannot compute z-score.")
                    raise ValueError("Cannot compute z-score: std is zero or NaN.")
                z = (x - mu) / sigma
                out_flag = np.abs(z) > self.threshold
                logger.info(f"Z-score mean={mu}, std={sigma}")

            # IQR法
            elif self.method == "iqr":
                q1 = np.nanpercentile(x, 25)
                q3 = np.nanpercentile(x, 75)
                iqr = q3 - q1
                if iqr == 0 or np.isnan(iqr):
                    logger.error("IQR is zero or NaN. Cannot compute IQR outliers.")
                    raise ValueError("Cannot compute IQR: IQR is zero or NaN.")
                lower = q1 - self.threshold * iqr
                upper = q3 + self.threshold * iqr
                out_flag = (x < lower) | (x > upper)
                logger.info(f"IQR q1={q1}, q3={q3}, IQR={iqr}, lower={lower}, upper={upper}")

            # 結果を新列に格納(非破壊)
            out = df.copy()
            out[flag_col] = out_flag.astype(int)
            logger.info(f"OutlierTagger tagging completed. Added column: '{flag_col}'")
            return out

        except Exception as e:
            logger.exception(f"OutlierTagger.tag failed: {e}")
            raise
実行例1:Zスコア法による外れ値フラグ付与
df = pd.DataFrame({"score": [1, 2, 2, 3, 100]})
tagger = OutlierTagger(method="zscore", threshold=2.0)
result = tagger.tag(df, "score")
print(result)
実行結果1
   score  is_outlier
0      1           0
1      2           0
2      2           0
3      3           0
4    100           1
実行ログ1
INFO     OutlierTagger initialized (method=zscore, threshold=2.0)
INFO     Z-score mean=21.6, std=43.13258890051631
INFO     OutlierTagger tagging completed. Added column: 'is_outlier'
実行例2:IQR法+バイナリ列競合回避
df = pd.DataFrame({"x": [1, 2, 3, 100], "is_outlier": [0, 0, 0, 0]})
tagger = OutlierTagger(method="iqr", threshold=1.5)
result = tagger.tag(df, "x")
print(result)
実行結果2
     x  is_outlier  is_outlier_1
0    1           0             0
1    2           0             0
2    3           0             0
3  100           0             1
実行ログ2
INFO     OutlierTagger initialized (method=iqr, threshold=1.5)
WARNING  Flag column name 'is_outlier' conflicts. Using 'is_outlier_1'.
INFO     IQR q1=1.75, q3=27.25, IQR=25.5, lower=-36.5, upper=65.5
INFO     OutlierTagger tagging completed. Added column: 'is_outlier_1'

■ 文法・構文まとめ

機能・構文 解説
np.mean, np.std 平均・標準偏差(zスコア)をNaN無視で算出
np.percentile 四分位点をNaN無視で取得(IQR法)
np.abs(z) > t Zスコアで閾値を超えるか高速ベクトル判定
copy 元dfは不変(非破壊設計)
flag_col 競合時は自動でis_outlier_1, is_outlier_2…と生成
logger.info/error loguruで全実行・異常・競合・パラメータ状況を詳細記録

Q.25

項目 内容
概要 欠損率やユニーク数比率など、複数の基準に従って列を自動で削除する高機能ユーティリティ。削除列はプロパティで管理・log出力。条件記述ミス・異常値も厳密に例外制御。
問題文 ColumnReducer(criteria: dict[str, float]) を設計し、reduce(df)NaN比率 > Xユニーク数比率 < Y のような基準で列を削除せよ。削除列は self.dropped_columns に記録・log出力必須。
要件 欠損率・ユニーク比率等の削除条件/列単位で判定/削除列の記録・プロパティ化/log出力/条件形式バリデーション
発展仕様
  • 条件式・値の妥当性検証
  • 複数条件併用可能(AND組み合わせ)
  • 削除列なし時もlog記録
使用構文 df.isna().mean(), df.nunique(), __call__, @property, loguru.logger, ValueError

A.24

■ 模範解答

import pandas as pd
from loguru import logger

class ColumnReducer:
    # サポートされる条件キー
    _allowed_criteria = {"nan_ratio", "unique_ratio"}

    def __init__(self, criteria: dict[str, float]):
        # 条件辞書が正しいか検証
        if not isinstance(criteria, dict):
            logger.error("criteria must be a dict[str, float].")
            raise ValueError("criteria must be a dict[str, float].")
        for key, value in criteria.items():
            if key not in self._allowed_criteria:
                logger.error(f"Unsupported criterion: {key}")
                raise ValueError(f"Unsupported criterion: {key}")
            if not isinstance(value, float) or not (0.0 <= value <= 1.0):
                logger.error(f"Criterion '{key}' must be a float in [0, 1].")
                raise ValueError(f"Criterion '{key}' must be a float in [0, 1].")
        self.criteria = criteria
        self._dropped_columns = []
        logger.info(f"ColumnReducer initialized: {criteria}")

    def reduce(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            # DataFrame型検証
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")

            to_drop = set()
            n_rows = len(df)

            # 1. 欠損率条件
            if "nan_ratio" in self.criteria:
                nan_ratio = df.isna().mean()
                for col, ratio in nan_ratio.items():
                    if ratio > self.criteria["nan_ratio"]:
                        logger.info(f"Column '{col}' will be dropped by nan_ratio={ratio:.2f} > {self.criteria['nan_ratio']}")
                        to_drop.add(col)

            # 2. ユニーク比率条件
            if "unique_ratio" in self.criteria:
                # NaN除くユニーク数
                uniq_ratio = df.nunique(dropna=True) / n_rows
                for col, ratio in uniq_ratio.items():
                    if ratio < self.criteria["unique_ratio"]:
                        logger.info(f"Column '{col}' will be dropped by unique_ratio={ratio:.2f} < {self.criteria['unique_ratio']}")
                        to_drop.add(col)

            # ドロップ列を記録
            self._dropped_columns = sorted(to_drop)
            if self._dropped_columns:
                logger.info(f"Dropped columns: {self._dropped_columns}")
            else:
                logger.info("No columns dropped.")

            # 新DataFrame返却(非破壊)
            return df.drop(columns=self._dropped_columns)

        except Exception as e:
            logger.exception(f"ColumnReducer.reduce failed: {e}")
            raise

    @property
    def dropped_columns(self):
        # 外部公開用プロパティ
        return self._dropped_columns.copy()
実行例1:欠損率による列削除
df = pd.DataFrame({
    "A": [1, None, None],
    "B": [1, 2, 3],
    "C": [None, None, None]
})
reducer = ColumnReducer({"nan_ratio": 0.7})
result = reducer.reduce(df)
print(result)
print("Dropped:", reducer.dropped_columns)
実行結果1
     A  B
0  1.0  1
1  NaN  2
2  NaN  3
Dropped: ['C']
実行ログ1
INFO     ColumnReducer initialized: {'nan_ratio': 0.7}
INFO     Column 'C' will be dropped by nan_ratio=1.00 > 0.7
INFO     Dropped columns: ['C']
実行例2:ユニーク数比率による列削除(削除なし)
df = pd.DataFrame({"A": [1, 2, 3], "B": [1, 1, 1]})
reducer = ColumnReducer({"unique_ratio": 0.5})
result = reducer.reduce(df)
print(result)
print("Dropped:", reducer.dropped_columns)
実行結果2
   A  B
0  1  1
1  2  1
2  3  1
Dropped: []
実行ログ2
INFO     ColumnReducer initialized: {'unique_ratio': 0.5}
INFO     No columns dropped.

■ 文法・構文まとめ

機能・構文 解説
isna().mean() 列ごとに欠損(NaN)の割合を高速計算(ベクトル化)
nunique(dropna=True) 各列の(NaN除外)ユニーク値の個数
/ n_rows ユニーク比率に正規化
set() 重複のない削除対象列集合
@property 外部から削除列一覧を安全に参照できるようにするPython標準
logger.info/error loguruによる進行・異常・削除列など全詳細の記録
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?