0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データサイエンスのためのPython100本ノック vol.4 ~pandas編③~

Posted at

まえがき

データサイエンティストのためのPython100本ノックについての記事です。NumPyやPandasといったデータサイエンス系でよく用いられるライブラリを扱います。誤りなどがあれば、ご指摘ください。今回はpandasを中心に10問扱います。

Q.36

項目 内容
概要 行インデックスの連続性・重複・ギャップ(欠番)を解析し、修復提案を行うクラス。インデックス操作の深い理解が必要。
問題文 IndexGapAnalyzer() を設計し、analyze(df)Int64Index に対する①欠番の検出、②重複インデックスの存在、③最小・最大の範囲を解析して辞書で返せ。suggest_repair(df)reset_index() または reindex() 案を提案せよ。
要件 Index型検査/範囲抽出/欠番確認/修復案提案
発展仕様 インデックス型不一致で例外/操作ログ出力/RangeIndexへの変換案提案/連続かつユニークな場合は何も不要
使用構文 df.index, is_unique, is_monotonic_increasing, reindex, reset_index, loguru.logger, ValueError, pd.RangeIndex

A.36

■ 模範解答

import pandas as pd
from loguru import logger

class IndexGapAnalyzer:
    def __init__(self):
        logger.info("IndexGapAnalyzer initialized.")

    def analyze(self, df: pd.DataFrame) -> dict:
        # Int64Index型チェック
        if not isinstance(df.index, pd.Int64Index):
            logger.error(f"Index must be Int64Index, got {type(df.index)}")
            raise ValueError("Index must be Int64Index.")
        idx = df.index
        logger.info(f"Analyzing Int64Index: {idx}")

        # 欠番検出
        min_idx, max_idx = idx.min(), idx.max()
        full_range = set(range(min_idx, max_idx + 1))
        actual = set(idx)
        missing = sorted(full_range - actual)
        # 重複インデックス検出
        duplicates = idx[idx.duplicated()].unique().tolist()
        # 連続性・単調性
        monotonic = idx.is_monotonic_increasing
        unique = idx.is_unique

        logger.info(f"Index min={min_idx}, max={max_idx}, unique={unique}, monotonic={monotonic}, missing={missing}, duplicates={duplicates}")
        return {
            "min": min_idx,
            "max": max_idx,
            "missing_indices": missing,
            "n_missing": len(missing),
            "has_duplicates": len(duplicates) > 0,
            "duplicate_indices": duplicates,
            "is_monotonic_increasing": monotonic,
            "is_unique": unique,
        }

    def suggest_repair(self, df: pd.DataFrame) -> str:
        # Int64Index型チェック
        if not isinstance(df.index, pd.Int64Index):
            logger.error(f"Index must be Int64Index, got {type(df.index)}")
            raise ValueError("Index must be Int64Index.")
        idx = df.index
        # 欠番・重複・非単調性などに応じて案を提示
        analysis = self.analyze(df)
        msg = []
        if not analysis["is_unique"]:
            msg.append("reset_index(drop=True)推奨(重複indexあり)")
        if analysis["n_missing"] > 0:
            msg.append(f"reindex(range({analysis['min']},{analysis['max']+1}))でギャップ補間推奨")
        if not analysis["is_monotonic_increasing"]:
            msg.append("sort_index()またはreset_index(drop=True)で単調性確保を推奨")
        # 完全に連番ならRangeIndex化も可能
        if analysis["is_unique"] and analysis["n_missing"] == 0 and analysis["is_monotonic_increasing"]:
            msg.append("現状でRangeIndex化可能(reset_index(drop=True)で最適化)")
        if not msg:
            msg.append("インデックスは整合しています。修復不要です。")
        logger.info("Suggest repair: " + "".join(msg))
        return "".join(msg)
実行例1:欠番・重複・ギャップあり
df = pd.DataFrame({"val": [1,2,3,4,5,6]}, index=[0, 1, 2, 2, 4, 6])
analyzer = IndexGapAnalyzer()
print(analyzer.analyze(df))
print(analyzer.suggest_repair(df))
実行結果1
{'min': 0, 'max': 6, 'missing_indices': [3, 5], 'n_missing': 2,
 'has_duplicates': True, 'duplicate_indices': [2],
 'is_monotonic_increasing': True, 'is_unique': False}
reset_index(drop=True)推奨重複indexあり)/reindex(range(0,7))でギャップ補間推奨
実行ログ1
INFO     IndexGapAnalyzer initialized.
INFO     Analyzing Int64Index: Int64Index([0, 1, 2, 2, 4, 6], dtype='int64')
INFO     Index min=0, max=6, unique=False, monotonic=True, missing=[3, 5], duplicates=[2]
INFO     Suggest repair: reset_index(drop=True)推奨重複indexあり)/reindex(range(0,7))でギャップ補間推奨
実行例2:連続・ユニークなRangeIndex化可能パターン
df2 = pd.DataFrame({"val": [1,2,3]}, index=[5,6,7])
analyzer = IndexGapAnalyzer()
print(analyzer.analyze(df2))
print(analyzer.suggest_repair(df2))
実行結果2
{'min': 5, 'max': 7, 'missing_indices': [],
 'n_missing': 0, 'has_duplicates': False, 'duplicate_indices': [],
 'is_monotonic_increasing': True, 'is_unique': True}
現状でRangeIndex化可能reset_index(drop=True)で最適化
実行ログ2
INFO     IndexGapAnalyzer initialized.
INFO     Analyzing Int64Index: Int64Index([5, 6, 7], dtype='int64')
INFO     Index min=5, max=7, unique=True, monotonic=True, missing=[], duplicates=[]
INFO     Suggest repair: 現状でRangeIndex化可能reset_index(drop=True)で最適化

■ 文法・構文まとめ

機能・構文 解説
pd.Int64Index 整数型インデックス(DataFrameの.index)
idx.duplicated() 重複index抽出
idx.is_monotonic_increasing indexが昇順かどうか判定
set(range(min,max+1)) 欠番検知。すべての連番の集合と実際のインデックスの差でギャップを算出
reset_index(drop=True) indexを連番で振り直す推奨案(重複や非連続時など)
reindex(range(min,max+1)) 欠番補間の推奨案
loguru.logger.info/error 進捗・異常・解析内容・修復案をすべて記録

Q.37

項目 内容
概要 時系列データに対して、インデックスの頻度(例:日次・月次)を推定するクラスを設計する。タイムスタンプの間隔に基づき自動分類を行う。
問題文 FrequencyInferer() クラスを設計し、infer(df: pd.DataFrame) により、DatetimeIndex の間隔差に基づいて "daily", "monthly", "hourly", "irregular" などの頻度を返す処理を実装せよ。非DatetimeIndexの処理やエラー時にはログ出力すること。
要件 DatetimeIndexの差分計算/頻度分類
発展仕様 差分に基づく誤差範囲でのマッチ/最頻値推定/非正規パターンの警告/pd.infer_freq併用
使用構文 df.index, pd.infer_freq, df.index.to_series().diff(), np.timedelta64, loguru.logger, ValueError

A.37

■ 模範解答

import pandas as pd
import numpy as np
from loguru import logger
from collections import Counter

class FrequencyInferer:
    def __init__(self):
        logger.info("FrequencyInferer initialized.")

    def infer(self, df: pd.DataFrame) -> str:
        # 1. DatetimeIndex型チェック
        if not isinstance(df.index, pd.DatetimeIndex):
            logger.error(f"Index must be DatetimeIndex, got {type(df.index)}")
            raise ValueError("Index must be DatetimeIndex.")

        idx = df.index
        n = len(idx)
        logger.info(f"Inferring frequency for {n} timestamps")
        # 2. infer_freqの直接利用(規則的な時系列はこの方が堅牢)
        freq_str = pd.infer_freq(idx)
        if freq_str:
            logger.info(f"pandas.infer_freq推定: {freq_str}")
            # 主要なfreqコード→ラベルへ変換
            freq_map = {
                "D": "daily", "H": "hourly", "T": "minutely",
                "M": "monthly", "MS": "monthly", "Y": "yearly",
                "Q": "quarterly", "W": "weekly"
            }
            base = freq_str.rstrip("S")  # e.g., "MS"→"M"
            label = freq_map.get(base, freq_str)
            logger.info(f"判定: {label}")
            return label

        # 3. 差分の絶対値を計算し、最頻値(mode)で判定
        delta = idx.to_series().diff().dropna()
        # 代表値(最頻値)を秒数で計算
        deltas_sec = delta.dt.total_seconds()
        mode_sec = deltas_sec.mode().iloc[0] if not deltas_sec.empty else None
        logger.info(f"mode of interval (seconds): {mode_sec}")

        # 4. 閾値・誤差付きで分類
        if mode_sec is None:
            logger.warning("時系列の間隔が算出できませんでした。irregular判定。")
            return "irregular"

        # 5. 間隔→ラベル分類
        def classify_interval(sec):
            if np.isclose(sec, 86400, atol=60):      # 約1日(±1分)
                return "daily"
            elif np.isclose(sec, 3600, atol=10):     # 約1時間
                return "hourly"
            elif np.isclose(sec, 60, atol=2):        # 約1分
                return "minutely"
            elif np.isclose(sec, 2629746, atol=3600*24*3):  # 月:平均30.44日
                return "monthly"
            elif np.isclose(sec, 604800, atol=60):   # 1週
                return "weekly"
            elif np.isclose(sec, 31556952, atol=3600*24*30): # 年:平均365.24日
                return "yearly"
            return "irregular"

        freq_label = classify_interval(mode_sec)

        # 6. 不規則性警告
        n_unique = deltas_sec.nunique()
        if n_unique > 2:  # 2より大きい=規則的でない
            logger.warning(f"Interval is not uniform! Unique intervals: {n_unique}. Irregular時系列の可能性")
            freq_label = f"irregular ({freq_label})"

        logger.info(f"Frequency inferred: {freq_label}")
        return freq_label
実行例1:日次規則時系列
df = pd.DataFrame({"val": [1,2,3,4]}, index=pd.date_range("2024-01-01", periods=4, freq="D"))
freq_inferer = FrequencyInferer()
print(freq_inferer.infer(df))
実行結果1
daily
実行ログ1
INFO     FrequencyInferer initialized.
INFO     Inferring frequency for 4 timestamps
INFO     pandas.infer_freq推定: D
INFO     判定: daily
実行例2:不規則(日・月が混在)
df2 = pd.DataFrame({"val": [1,2,3]}, index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-02-01"]))
freq_inferer = FrequencyInferer()
print(freq_inferer.infer(df2))
実行結果2
irregular (monthly)
実行ログ2
INFO     FrequencyInferer initialized.
INFO     Inferring frequency for 3 timestamps
INFO     mode of interval (seconds): 2678400.0
WARNING  Interval is not uniform! Unique intervals: 2. Irregular時系列の可能性
INFO     Frequency inferred: irregular (monthly)

■ 文法・構文まとめ

機能・構文 解説
pd.infer_freq(idx) インデックスの規則性から自動で頻度を推定(D=日次/H=時/H=月など)
idx.to_series().diff() 隣接する時刻差をシリーズで取得
delta.dt.total_seconds() 差分を秒単位の数値配列として扱う
mode().iloc[0] 最頻値で代表間隔を特定
np.isclose(a, b, atol=…) 誤差許容で値を判定
nunique() インターバル種類の数(不規則性判定に使う)
loguru.logger.info/warning/error 推定経過・異常・不規則性をすべて記録

Q.38

項目 内容
概要 DataFrameのメモリ使用量を列単位・データ型単位で詳細に解析するクラス。最適なデータ型への変換提案も可能とする。
問題文 MemoryUsageProfiler() を定義し、profile(df) によって各列のメモリ使用量(バイト)、推奨型、削減後の推定サイズなどを含む DataFrame を返す処理を実装せよ。圧縮可能な列には int8, category などを提案できるようにせよ。
要件 df.memory_usage()/列単位統計/提案列を含む
発展仕様 節約量計算/dtype変換による精度誤差への注意点表示/optimize()による型変更機能
使用構文 df.memory_usage, astype, df.dtypes, loguru.logger, np.iinfo, pd.to_numeric

A.38

■ 模範解答

import pandas as pd
import numpy as np
from loguru import logger

class MemoryUsageProfiler:
    def __init__(self):
        logger.info("MemoryUsageProfiler initialized.")

    def profile(self, df: pd.DataFrame) -> pd.DataFrame:
        try:
            if not isinstance(df, pd.DataFrame):
                logger.error("Input must be a pandas DataFrame.")
                raise TypeError("Input must be a pandas DataFrame.")
            mem_info = []
            for col in df.columns:
                dtype = df[col].dtype
                mem = df[col].memory_usage(deep=True)
                n_unique = df[col].nunique(dropna=False)
                n_total = len(df)
                suggestion = "-"
                new_mem = mem
                note = ""
                # 数値型(整数): 最小bit幅へ圧縮提案
                if pd.api.types.is_integer_dtype(dtype):
                    cmin, cmax = df[col].min(), df[col].max()
                    for t in [np.int8, np.int16, np.int32, np.int64]:
                        if cmin >= np.iinfo(t).min and cmax <= np.iinfo(t).max:
                            suggestion = str(np.dtype(t))
                            # 実際の変換でどれだけ減るか計算
                            try:
                                new_mem = df[col].astype(t).memory_usage(deep=True)
                            except Exception:
                                new_mem = mem
                            break
                    if suggestion != str(dtype):
                        note = "★精度・欠損注意"
                # 浮動小数
                elif pd.api.types.is_float_dtype(dtype):
                    # float32化を推奨するが、精度リスク警告
                    suggestion = "float32"
                    try:
                        new_mem = df[col].astype("float32").memory_usage(deep=True)
                        note = "★小数精度リスク"
                    except Exception:
                        new_mem = mem
                        note = ""
                # オブジェクト型: ユニーク率が低ければcategory推奨
                elif pd.api.types.is_object_dtype(dtype):
                    if n_unique / n_total < 0.5:
                        suggestion = "category"
                        try:
                            new_mem = df[col].astype("category").memory_usage(deep=True)
                        except Exception:
                            new_mem = mem
                        note = "★カテゴリ型:値追加注意"
                # category型: 問題なし
                elif pd.api.types.is_categorical_dtype(dtype):
                    suggestion = "category"
                    new_mem = mem
                mem_info.append({
                    "column": col,
                    "dtype": str(dtype),
                    "memory_bytes": mem,
                    "suggested_dtype": suggestion,
                    "optimized_memory": new_mem,
                    "saving_bytes": mem - new_mem,
                    "note": note,
                })
                logger.info(f"[{col}] {dtype} -> {suggestion} | {mem}{new_mem} bytes {note}")
            result = pd.DataFrame(mem_info).set_index("column")
            return result
        except Exception as e:
            logger.exception(f"MemoryUsageProfiler.profile failed: {e}")
            raise

    def optimize(self, df: pd.DataFrame) -> pd.DataFrame:
        # profile()で得られた提案に基づき型変換
        profile_df = self.profile(df)
        new_df = df.copy()
        for col, row in profile_df.iterrows():
            sdt = row["suggested_dtype"]
            if sdt not in ["-", str(df[col].dtype)]:
                try:
                    new_df[col] = new_df[col].astype(sdt)
                    logger.info(f"Column {col} converted to {sdt}")
                except Exception as e:
                    logger.warning(f"Conversion failed for {col}: {e}")
        return new_df
実行例1:数値・文字列・カテゴリ型
df = pd.DataFrame({
    "int_col": [1, 2, 3, 4],
    "float_col": [0.1, 0.2, 0.3, 0.4],
    "obj_col": ["A", "B", "A", "A"],
})
profiler = MemoryUsageProfiler()
profile = profiler.profile(df)
print(profile)
実行結果1
          dtype  memory_bytes suggested_dtype  optimized_memory  saving_bytes           note
column                                                                                     
int_col   int64            32          int8               16            16     精度欠損注意
float_col float64          32      float32               16            16     小数精度リスク
obj_col   object           92     category               48            44     カテゴリ型:値追加注意
実行ログ1
INFO     MemoryUsageProfiler initialized.
INFO     [int_col] int64 -> int8 | 3216 bytes 精度欠損注意
INFO     [float_col] float64 -> float32 | 3216 bytes 小数精度リスク
INFO     [obj_col] object -> category | 9248 bytes カテゴリ型:値追加注意
実行例2:最適化処理の適用
df2 = pd.DataFrame({
    "code": ["X"]*1000 + ["Y"]*1000,
    "n": np.random.randint(0, 100, 2000),
})
profiler = MemoryUsageProfiler()
df2_opt = profiler.optimize(df2)
print(df2_opt.dtypes)
実行結果2
code    category
n          int8
dtype: object
実行ログ2
INFO     MemoryUsageProfiler initialized.
INFO     [code] object -> category | ... bytes カテゴリ型:値追加注意
INFO     [n] int64 -> int8 | ... bytes 精度欠損注意
INFO     Column code converted to category
INFO     Column n converted to int8

■ 文法・構文まとめ

機能・構文 解説
df.memory_usage(deep=True) 列ごとのメモリ使用量(バイト)を厳密に集計
astype(np.int8) int64→int8など、より狭いbit幅に変換
astype("category") オブジェクト型文字列→カテゴリ型へ圧縮
nunique()/len(df) ユニーク率が低い場合はカテゴリ型圧縮を提案
note 精度リスクや型変換の注意点(警告)
loguru.logger.info/warning/exception 詳細進捗・変換警告・例外をすべて記録

Q.39

項目 内容
概要 外れ値を検出して削除するのではなく、中央値補間などで“修正”するクラス。ロバスト統計や集約関数との連携が求められる。
問題文 OutlierCleaner(method="iqr" | "zscore", threshold=3.0, fill_method="median") を設計し、clean(df, column)により、外れ値を削除せずNaNに置換し、その後 median で補間して返す処理を実装せよ。検出・置換・補間を一連の流れで記録すること。
要件 外れ値→NaN→補間の一連処理/Zスコア・IQR対応
発展仕様 元データを破壊せず返す/補間方法切替(mean, linear等)/詳細ログ/例外処理
使用構文 np.percentile, scipy.stats.zscore, df.fillna, interpolate, ValueError, loguru.logger

A.40

■ 模範解答

import ast
import warnings
from loguru import logger

class ChainedAssignmentDetector:
    def __init__(self):
        logger.info("ChainedAssignmentDetector initialized.")

    def check(self, df, code_str: str):
        """
        与えられたコード文字列内にpandasの危険なチェーン代入(df[cond]['col'] = ...)が存在するか検出し、
        該当箇所・行番号・安全な書き方を提示する。検出はast解析で行う。
        """
        try:
            # 1. ASTに変換
            tree = ast.parse(code_str)
            warnings_issued = 0
            for node in ast.walk(tree):
                # 2. 代入ノードを走査
                if isinstance(node, ast.Assign):
                    # 3. 代入の左辺がSubscript→Subscript(例:df[...]['col'])
                    if isinstance(node.targets[0], ast.Subscript):
                        outer = node.targets[0]
                        # 外側がまたSubscriptならチェーン代入候補
                        if isinstance(outer.value, ast.Subscript):
                            line_no = getattr(node, 'lineno', None)
                            col = self._extract_column_name(outer)
                            # 警告を出す
                            msg = (
                                f"Chained assignment detected on line {line_no}: "
                                f"危険: df[...] ['{col}'] = ...\n"
                                f"→ pandasではこの形式はSettingWithCopyWarningとなり、予期せぬ動作を招きます。\n"
                                f"【安全案】 df.loc[<行条件>, '{col}'] = ... を使うこと。\n"
                                f"該当コード: {ast.get_source_segment(code_str, node)}"
                            )
                            logger.warning(msg)
                            warnings.warn(msg, UserWarning)
                            warnings_issued += 1
            if warnings_issued == 0:
                logger.info("No dangerous chained assignment detected.")
            else:
                logger.info(f"{warnings_issued} chained assignment(s) detected.")
        except Exception as e:
            logger.exception(f"ChainedAssignmentDetector.check failed: {e}")
            raise

    def _extract_column_name(self, subscript_node):
        """
        Subscriptノードからカラム名を取得(単純なstrかast.Constantに限定)
        """
        if isinstance(subscript_node.slice, ast.Constant):
            return subscript_node.slice.value
        elif isinstance(subscript_node.slice, ast.Index) and isinstance(subscript_node.slice.value, ast.Constant):
            return subscript_node.slice.value.value
        else:
            return "?"
実行例1:IQR法・中央値補間(初期値)
df = pd.DataFrame({"score": [10, 12, 13, 100, 15, 16, 14]})
cleaner = OutlierCleaner(method="iqr", threshold=1.5, fill_method="median")
df_cleaned = cleaner.clean(df, "score")
print(df_cleaned)
実行結果1
   score
0   10.0
1   12.0
2   13.0
3   14.0   # 100→14(中央値で補間)
4   15.0
5   16.0
6   14.0
実行ログ1
INFO     OutlierCleaner initialized: method=iqr, threshold=1.5, fill_method=median
INFO     IQR: q1=12.0, q3=15.0, lower=7.5, upper=19.5
INFO     Outliers detected: 1 in column 'score'
INFO     Outliers replaced with NaN.
INFO     NaN filled with median: 14.0
実行例2:Zスコア法・線形補間
df2 = pd.DataFrame({"x": [1, 2, 3, 100, 5]})
cleaner2 = OutlierCleaner(method="zscore", threshold=2.0, fill_method="linear")
df2_cleaned = cleaner2.clean(df2, "x")
print(df2_cleaned)
実行結果2
     x
0  1.0
1  2.0
2  3.0
3  4.0    # 100→NaN→線形補間(3と5の間)
4  5.0
実行ログ2
INFO     OutlierCleaner initialized: method=zscore, threshold=2.0, fill_method=linear
INFO     Z-score computed. threshold=2.0
INFO     Outliers detected: 1 in column 'x'
INFO     Outliers replaced with NaN.
INFO     NaN filled with linear interpolation.

■ 文法・構文まとめ

機能・構文 解説
np.percentile(x, [25,75]) 四分位点q1,q3(IQR法の閾値計算)
scipy.stats.zscore Zスコア(標準化外れ値)
df[column].fillna(val) NaN値を指定値で一括補間
df[column].interpolate() NaN値を前後の値から線形補間
df.copy() 元データ非破壊処理
loguru.logger.info/error すべての検出/置換/補間/異常を詳細にログ

Q.40

項目 内容
概要 pandasの“SettingWithCopyWarning”に該当するようなコード構造を検出し、警告・回避案を提示する静的解析器的クラスを構築する。
問題文 ChainedAssignmentDetector() を定義し、check(df, code_str) により、DataFrame に対する df[cond]['col'] = ... のような危険な代入構文を検出し、回避方法(.locなど)を提案する機能を実装せよ。AST解析を応用してもよい。
要件 チェーン代入検出/警告提案出力
発展仕様 ast モジュールによる構文解析/ログ記録/危険箇所の行番号出力
使用構文 ast, eval, df.loc, warnings.warn, loguru.logger

A.40

■ 模範解答

import ast
import warnings
from loguru import logger

class ChainedAssignmentDetector:
    def __init__(self):
        logger.info("ChainedAssignmentDetector initialized.")

    def check(self, df, code_str: str):
        """
        与えられたコード文字列内にpandasの危険なチェーン代入(df[cond]['col'] = ...)が存在するか検出し、
        該当箇所・行番号・安全な書き方を提示する。検出はast解析で行う。
        """
        try:
            # 1. ASTに変換
            tree = ast.parse(code_str)
            warnings_issued = 0
            for node in ast.walk(tree):
                # 2. 代入ノードを走査
                if isinstance(node, ast.Assign):
                    # 3. 代入の左辺がSubscript→Subscript(例:df[...]['col'])
                    if isinstance(node.targets[0], ast.Subscript):
                        outer = node.targets[0]
                        # 外側がまたSubscriptならチェーン代入候補
                        if isinstance(outer.value, ast.Subscript):
                            line_no = getattr(node, 'lineno', None)
                            col = self._extract_column_name(outer)
                            # 警告を出す
                            msg = (
                                f"Chained assignment detected on line {line_no}: "
                                f"危険: df[...] ['{col}'] = ...\n"
                                f"→ pandasではこの形式はSettingWithCopyWarningとなり、予期せぬ動作を招きます。\n"
                                f"【安全案】 df.loc[<行条件>, '{col}'] = ... を使うこと。\n"
                                f"該当コード: {ast.get_source_segment(code_str, node)}"
                            )
                            logger.warning(msg)
                            warnings.warn(msg, UserWarning)
                            warnings_issued += 1
            if warnings_issued == 0:
                logger.info("No dangerous chained assignment detected.")
            else:
                logger.info(f"{warnings_issued} chained assignment(s) detected.")
        except Exception as e:
            logger.exception(f"ChainedAssignmentDetector.check failed: {e}")
            raise

    def _extract_column_name(self, subscript_node):
        """
        Subscriptノードからカラム名を取得(単純なstrかast.Constantに限定)
        """
        if isinstance(subscript_node.slice, ast.Constant):
            return subscript_node.slice.value
        elif isinstance(subscript_node.slice, ast.Index) and isinstance(subscript_node.slice.value, ast.Constant):
            return subscript_node.slice.value.value
        else:
            return "?"
実行例1:危険なチェーン代入の検出
df = ...
code_str = '''
df[df["x"] > 0]["y"] = 999
df.loc[df["x"] > 0, "y"] = 888
'''
detector = ChainedAssignmentDetector()
detector.check(df, code_str)
実行結果1
# チェーン代入警告あり(1件)

UserWarning: Chained assignment detected on line 2: 危険: df[...] ['y'] = ...
 pandasではこの形式はSettingWithCopyWarningとなり予期せぬ動作を招きます
安全案 df.loc[<行条件>, 'y'] = ... を使うこと
該当コード: df[df["x"] > 0]["y"] = 999
実行ログ1
INFO     ChainedAssignmentDetector initialized.
WARNING  Chained assignment detected on line 2: 危険: df[...] ['y'] = ...
          pandasではこの形式はSettingWithCopyWarningとなり予期せぬ動作を招きます
         安全案 df.loc[<行条件>, 'y'] = ... を使うこと
         該当コード: df[df["x"] > 0]["y"] = 999
INFO     1 chained assignment(s) detected.
実行例2:安全な.loc構文のみ(検出なし)
code_str2 = '''
df.loc[df["x"] > 0, "y"] = 888
'''
detector = ChainedAssignmentDetector()
detector.check(None, code_str2)
実行結果2
# 警告なし
実行ログ2
INFO     ChainedAssignmentDetector initialized.
INFO     No dangerous chained assignment detected.

■ 文法・構文まとめ

機能・構文 解説
ast.parse(code_str) 文字列コードを構文木に変換(静的な安全チェックに用いる)
ast.walk(tree) 構文木全体を1ノードずつ再帰走査
ast.Assign 代入文ノード。df[...]['col'] = ... 検出に利用
Subscript(添字アクセス) df[cond]df["col"]等を表す
get_source_segment(code, node) 元コードから該当箇所の文字列抽出
warnings.warn ユーザー向けに警告メッセージ出力
loguru.logger 詳細な解析進捗・警告・例外などをログ記録
.loc安全案 df.loc[条件, "col"] = ... が唯一安全な代入方法(副本回避)

Q.41

項目 内容
概要 複数の列に対して異なる変換関数を登録・適用できる柔軟な列変換マネージャ。列ごとの apply 処理を統括する。
問題文 ColumnTransformerRegistry() クラスを設計し、register(column, func) で変換関数を登録し、transform(df) で該当列に対して apply() を実行する構造を実装せよ。未登録列はそのまま、関数が無効なら例外としログに記録すること。
要件 列ごとの関数登録/transformによる一括処理
発展仕様 __repr__ による登録確認/未定義関数検出/ログ出力
使用構文 df.apply, callable, dict, __call__, loguru.logger, ValueError

A.41

■ 模範解答

import pandas as pd
from loguru import logger

class ColumnTransformerRegistry:
    def __init__(self):
        # 各カラムに対応する変換関数を保持する辞書
        self._registry = {}
        logger.info("ColumnTransformerRegistry initialized.")

    def register(self, column, func):
        # 変換関数がcallableでなければエラー
        if not callable(func):
            logger.error(f"Attempted to register non-callable for column '{column}'.")
            raise ValueError(f"Function for column '{column}' must be callable.")
        self._registry[column] = func
        logger.info(f"Registered transformer for column '{column}': {func}")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        if not isinstance(df, pd.DataFrame):
            logger.error("Input must be a pandas DataFrame.")
            raise ValueError("Input must be a pandas DataFrame.")
        result = df.copy()
        for col, func in self._registry.items():
            if col not in result.columns:
                logger.warning(f"Column '{col}' not found in DataFrame; skipped.")
                continue
            if not callable(func):
                logger.error(f"Function for column '{col}' is not callable.")
                raise ValueError(f"Function for column '{col}' is not callable.")
            logger.info(f"Applying transformer for column '{col}'.")
            try:
                # applyは列Seriesに対して実行
                result[col] = result[col].apply(func)
            except Exception as e:
                logger.exception(f"Transformation failed for column '{col}': {e}")
                raise
        logger.info("Transformation completed.")
        return result

    def __repr__(self):
        # 登録内容を見やすく表示
        lines = [f"{self.__class__.__name__} registry:"]
        for col, func in self._registry.items():
            lines.append(f"  {col}: {getattr(func, '__name__', str(func))}")
        return "\n".join(lines)
実行例1:複数列に異なる関数を登録→一括変換
df = pd.DataFrame({
    "x": [1, 2, 3],
    "y": [4, 5, 6],
    "z": ["a", "b", "c"],
})

reg = ColumnTransformerRegistry()
reg.register("x", lambda v: v * 100)
reg.register("z", str.upper)

print(reg)  # __repr__確認

df2 = reg.transform(df)
print(df2)
実行結果1
ColumnTransformerRegistry registry:
  x: <lambda>
  z: upper
     x  y  z
0  100  4  A
1  200  5  B
2  300  6  C
実行ログ1
INFO     ColumnTransformerRegistry initialized.
INFO     Registered transformer for column 'x': <lambda>
INFO     Registered transformer for column 'z': upper
INFO     Applying transformer for column 'x'.
INFO     Applying transformer for column 'z'.
INFO     Transformation completed.
実行例2:未登録列はそのまま、非callableで例外
df = pd.DataFrame({
    "a": [1, 2, 3]
})
reg2 = ColumnTransformerRegistry()
try:
    reg2.register("a", 42)  # 非callable
except ValueError as e:
    print(e)
実行結果2
Function for column 'a' must be callable.
実行ログ2
INFO     ColumnTransformerRegistry initialized.
ERROR    Attempted to register non-callable for column 'a'.

■ 文法・構文まとめ

機能・構文 解説
register(col, func) 指定列に変換関数を登録。callableでない場合はエラー
transform(df) 各登録列にapply(func)を実行。他列は変更せずコピー返す
__repr__() 現在登録済みの変換内容をクラス名付きで一覧表示
logger.info/error/warning 操作記録・異常検出・警告をすべて詳細ログ
ValueError 関数未定義や型不正など不正操作時は例外送出

Q.42

項目 内容
概要 複数のカテゴリ列を結合して新しい高次カテゴリ列を作成し、ラベルエンコードする処理。階層的なカテゴリ組合せを扱う。
問題文 CategoricalCombiner(columns: list[str], new_col: str = "combined", sep: str = "_") を定義し、指定した複数カテゴリ列を結合して新しいカテゴリ列として追加・エンコードせよ。fittransform 構成で、逆変換にも対応すること。
要件 複数列結合/ユニークカテゴリ検出/ラベル変換
発展仕様 inverse_transform対応/結合方法変更/エンコード辞書保持/log記録
使用構文 df.apply(row, axis=1), astype('category'), map, join, loguru.logger

A.42

■ 模範解答

import pandas as pd
from loguru import logger

class CategoricalCombiner:
    def __init__(self, columns, new_col="combined", sep="_", combine_fn=None):
        """
        columns: 結合対象カラムリスト
        new_col: 生成する新カラム名
        sep: デフォルトの結合セパレータ
        combine_fn: カスタム結合関数(指定しなければ sep.join で結合)
        """
        self.columns = columns
        self.new_col = new_col
        self.sep = sep
        self.combine_fn = combine_fn
        self.fitted = False
        self.category2label = {}
        self.label2category = {}
        logger.info(f"CategoricalCombiner initialized for columns={columns} as '{new_col}'.")

    def fit(self, df: pd.DataFrame):
        # 入力dfがDataFrameかつ対象列が存在するか検証
        if not isinstance(df, pd.DataFrame):
            logger.error("Input must be a pandas DataFrame.")
            raise ValueError("Input must be a pandas DataFrame.")
        missing = [col for col in self.columns if col not in df.columns]
        if missing:
            logger.error(f"Missing columns: {missing}")
            raise ValueError(f"Missing columns in DataFrame: {missing}")
        # 結合処理
        combined = self._combine(df)
        categories = pd.Series(combined).astype("category").cat.categories
        self.category2label = {cat: i for i, cat in enumerate(categories)}
        self.label2category = {i: cat for cat, i in self.category2label.items()}
        self.fitted = True
        logger.info(f"Fitted {len(categories)} unique categories: {self.category2label}")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # fit済みチェック
        if not self.fitted:
            logger.error("Must call fit() before transform().")
            raise RuntimeError("Must call fit() before transform().")
        # 入力検証
        missing = [col for col in self.columns if col not in df.columns]
        if missing:
            logger.error(f"Missing columns: {missing}")
            raise ValueError(f"Missing columns in DataFrame: {missing}")
        combined = self._combine(df)
        # エンコード: 未知カテゴリにはNaNを割り当て
        labels = pd.Series(combined).map(self.category2label)
        df_new = df.copy()
        df_new[self.new_col] = labels
        logger.info(f"Transformed new column '{self.new_col}' with labels.")
        return df_new

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        if not self.fitted:
            logger.error("Must call fit() before inverse_transform().")
            raise RuntimeError("Must call fit() before inverse_transform().")
        if self.new_col not in df.columns:
            logger.error(f"Column '{self.new_col}' not found in DataFrame.")
            raise ValueError(f"Column '{self.new_col}' not found in DataFrame.")
        labels = df[self.new_col]
        # 逆変換:ラベル→元カテゴリ表記(strで復元)
        categories = labels.map(self.label2category)
        # カラム分解
        split_df = categories.str.split(self.sep, expand=True)
        split_df.columns = self.columns
        logger.info(f"Inverse transformed '{self.new_col}' back to columns {self.columns}.")
        return split_df

    def _combine(self, df: pd.DataFrame):
        # 指定カラムから行ごとにカテゴリ文字列を生成
        if self.combine_fn:
            combined = df[self.columns].apply(lambda row: self.combine_fn(row.values), axis=1)
        else:
            combined = df[self.columns].astype(str).agg(self.sep.join, axis=1)
        return combined

    def __repr__(self):
        cats = list(self.category2label.items())
        cats_disp = cats[:5] + (["..."] if len(cats) > 5 else [])
        return (f"CategoricalCombiner(columns={self.columns}, new_col={self.new_col}, sep='{self.sep}')\n"
                f"fitted={self.fitted}, categories={cats_disp}")
実行例1:2列結合+エンコード→逆変換
df = pd.DataFrame({
    "city": ["A", "A", "B", "B"],
    "color": ["red", "blue", "red", "blue"]
})
comb = CategoricalCombiner(columns=["city", "color"], new_col="city_color", sep="-")
comb.fit(df)
print(comb)  # 登録辞書確認

df2 = comb.transform(df)
print(df2)

inv = comb.inverse_transform(df2)
print(inv)
実行結果1
CategoricalCombiner(columns=['city', 'color'], new_col=city_color, sep='-')
fitted=True, categories=[('A-blue', 0), ('A-red', 1), ('B-blue', 2), ('B-red', 3)]
  city color  city_color
0    A   red           1
1    A  blue           0
2    B   red           3
3    B  blue           2
  city color
0    A   red
1    A  blue
2    B   red
3    B  blue
実行ログ1
INFO     CategoricalCombiner initialized for columns=['city', 'color'] as 'city_color'.
INFO     Fitted 4 unique categories: {'A-blue': 0, 'A-red': 1, 'B-blue': 2, 'B-red': 3}
INFO     Transformed new column 'city_color' with labels.
INFO     Inverse transformed 'city_color' back to columns ['city', 'color'].

実行例2:カスタム結合関数・未知カテゴリ
df = pd.DataFrame({
    "animal": ["dog", "cat", "dog"],
    "size": ["big", "small", "small"]
})
# "dog+big"などで結合
comb2 = CategoricalCombiner(columns=["animal", "size"], new_col="animal_size",
                            combine_fn=lambda x: f"{x[0]}+{x[1]}")
comb2.fit(df)

df2 = pd.DataFrame({
    "animal": ["dog", "cat", "rabbit"],
    "size": ["small", "small", "big"]
})
df2 = comb2.transform(df2)
print(df2)
実行結果2
  animal   size  animal_size
0    dog  small            1
1    cat  small            3
2  rabbit    big          NaN   # 未知カテゴリはNaN
実行ログ2
INFO     CategoricalCombiner initialized for columns=['animal', 'size'] as 'animal_size'.
INFO     Fitted 3 unique categories: {'dog+big': 0, 'dog+small': 1, 'cat+small': 3}
INFO     Transformed new column 'animal_size' with labels.

■ 文法・構文まとめ

機能・構文 解説
df.apply(row, axis=1) 行単位で値を結合する(パフォーマンス重視でaggも併用)
astype('category') ユニークカテゴリを高速で抽出
map(dict) カテゴリ→整数ラベル変換、また逆変換も
combine_fn 結合方法をカスタム可能。デフォルトはsepで連結
fit/transform/inverse_transform モデル的な学習→変換→逆変換流れを保証
loguru.logger 全操作にINFO/ERROR/WARNING出力しトラブル時の診断容易
例外対応 型不正/未fit/未知カテゴリ/列不在はすべて厳格例外+log

Q.43

項目 内容
概要 グループ単位で時系列データに変化があるかを検出し、フラグを立てる。典型的には状態変化・値変化の検出に使われる。
問題文 GroupChangeDetector(group_cols: list[str], target_col: str) を定義し、detect(df) により、グループ内で target_col が変化した行に is_changed フラグを付けた DataFrame を返すよう実装せよ。shift を活用して差分判定を行うこと。
要件 groupby構造/shift比較/フラグ列追加
発展仕様 昇順ソート事前保証/欠損処理/log記録
使用構文 df.groupby, shift, ne, astype(int), loguru.logger

A.43

■ 模範解答

import pandas as pd
from loguru import logger

class GroupChangeDetector:
    def __init__(self, group_cols, target_col, sort_by=None):
        """
        group_cols: グループ化するカラムのリスト
        target_col: 変化を検出するカラム名
        sort_by: 並べ替え基準カラム(Noneならgroup_cols+target_col昇順)
        """
        self.group_cols = group_cols
        self.target_col = target_col
        self.sort_by = sort_by
        logger.info(f"GroupChangeDetector initialized with group_cols={group_cols}, target_col={target_col}")

    def detect(self, df: pd.DataFrame) -> pd.DataFrame:
        # --- 入力・列存在チェック ---
        if not isinstance(df, pd.DataFrame):
            logger.error("Input must be a pandas DataFrame.")
            raise ValueError("Input must be a pandas DataFrame.")
        missing = [col for col in (self.group_cols + [self.target_col]) if col not in df.columns]
        if missing:
            logger.error(f"Missing columns: {missing}")
            raise ValueError(f"Missing columns in DataFrame: {missing}")

        # --- ソート基準 ---
        sort_cols = self.sort_by or self.group_cols + [self.target_col]
        df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True)
        logger.info(f"Sorted DataFrame by {sort_cols} for group-diff detection.")

        # --- 欠損値検知 ---
        if df_sorted[self.target_col].isnull().any():
            logger.warning(f"Null values detected in target_col '{self.target_col}'. Changes across NaN are flagged as changed.")

        # --- グループごとにtarget_colの変化を検出(shiftで1行前と比較、異なる場合1, 同じなら0)---
        #   先頭行は常に変化(=新グループ or 新系列)としてフラグ立て
        change_flag = (
            df_sorted.groupby(self.group_cols, sort=False)[self.target_col]
            .apply(lambda x: x.ne(x.shift()).astype(int).fillna(1).values)
            .explode()
            .astype(int)
            .values
        )
        df_out = df_sorted.copy()
        df_out["is_changed"] = change_flag

        logger.info(f"Change detection completed: {df_out['is_changed'].sum()} changed rows flagged.")
        return df_out

    def __repr__(self):
        return f"GroupChangeDetector(group_cols={self.group_cols}, target_col={self.target_col})"
実行例1:状態変化検出(通常系・複数グループ)
df = pd.DataFrame({
    "id": [1,1,1,2,2,2,2],
    "date": [1,2,3,1,2,3,4],
    "state": ["A", "A", "B", "B", "B", None, "C"]
})
detector = GroupChangeDetector(group_cols=["id"], target_col="state", sort_by=["id", "date"])
df_changed = detector.detect(df)
print(df_changed)
実行結果1
   id  date state  is_changed
0   1     1     A           1
1   1     2     A           0
2   1     3     B           1
3   2     1     B           1
4   2     2     B           0
5   2     3  None           1
6   2     4     C           1
実行ログ1
INFO     GroupChangeDetector initialized with group_cols=['id'], target_col=state
INFO     Sorted DataFrame by ['id', 'date'] for group-diff detection.
WARNING  Null values detected in target_col 'state'. Changes across NaN are flagged as changed.
INFO     Change detection completed: 5 changed rows flagged.
実行例2:昇順ソート・フラグ付加(単一グループ)
df = pd.DataFrame({
    "grp": ["g1", "g1", "g1"],
    "t": [1, 2, 3],
    "val": [5, 5, 6]
})
detector = GroupChangeDetector(group_cols=["grp"], target_col="val", sort_by=["t"])
df2 = detector.detect(df)
print(df2)
実行結果2
   grp  t  val  is_changed
0  g1   1    5           1
1  g1   2    5           0
2  g1   3    6           1
実行ログ2
INFO     GroupChangeDetector initialized with group_cols=['grp'], target_col=val
INFO     Sorted DataFrame by ['t'] for group-diff detection.
INFO     Change detection completed: 2 changed rows flagged.

■ 文法・構文まとめ

機能・構文 解説
groupby(...)[target_col].apply(lambda x: ...) グループごとに1つ前の値と比較し変化を判定(shift)
.ne(x.shift()) 「直前行と異なるか?」を効率的に判定(ベクトル演算)
astype(int) bool→int型(0/1)に変換。変化があれば1
fillna(1) 最初の行やNaN部分は「変化あり」と見なす
sort_values() 正しい時系列/グループ順で変化を評価
logger.info/warning/error 解析進捗や異常・欠損・完了を詳細に出力

Q.44

項目 内容
概要 時系列データを任意の頻度で再サンプリングし、欠損区間を補間して埋める処理を提供する。resample と補間ロジックを組み合わせる。
問題文 ResampleGapFiller(freq="D", method="linear") を定義し、fill(df: pd.DataFrame, time_col: str, value_col: str) により、指定頻度に再サンプリングし、欠損を補間したデータを返す処理を構築せよ。補間方法は linear など interpolate() に準拠する。
要件 set_indexresampleinterpolate の一連処理
発展仕様 不正な時間列対応/補間方式変更/欠損報告ログ
使用構文 set_index, resample, interpolate, reset_index, loguru.logger, ValueError

A.44

■ 模範解答

import pandas as pd
from loguru import logger

class ResampleGapFiller:
    def __init__(self, freq="D", method="linear"):
        """
        freq: pandas resampleでの頻度指定(例 'D'=日次, 'H'=時次, 'M'=月次)
        method: interpolate()に渡す補間方法
        """
        self.freq = freq
        self.method = method
        logger.info(f"ResampleGapFiller initialized (freq='{freq}', method='{method}').")

    def fill(self, df: pd.DataFrame, time_col: str, value_col: str) -> pd.DataFrame:
        # --- 入力・列存在チェック ---
        if not isinstance(df, pd.DataFrame):
            logger.error("Input must be a pandas DataFrame.")
            raise ValueError("Input must be a pandas DataFrame.")
        if time_col not in df.columns or value_col not in df.columns:
            logger.error(f"Missing required columns: {[c for c in (time_col, value_col) if c not in df.columns]}")
            raise ValueError(f"Missing required columns: {time_col}, {value_col}")
        # --- 時間列がdatetime型か検証、必要なら変換 ---
        if not pd.api.types.is_datetime64_any_dtype(df[time_col]):
            try:
                df = df.copy()
                df[time_col] = pd.to_datetime(df[time_col])
                logger.info(f"Converted column '{time_col}' to datetime.")
            except Exception as e:
                logger.error(f"Failed to convert '{time_col}' to datetime: {e}")
                raise ValueError(f"Failed to convert '{time_col}' to datetime: {e}")

        # --- インデックス設定とソート ---
        df2 = df.set_index(time_col).sort_index()
        # --- 欠損値の事前集計(補間前) ---
        n_missing = df2[value_col].isna().sum()
        logger.info(f"Missing values before resampling: {n_missing}")

        # --- 指定頻度でresample(すべての時点を網羅) ---
        df_resamp = df2[[value_col]].resample(self.freq).asfreq()
        n_resamp_missing = df_resamp[value_col].isna().sum()
        logger.info(f"Missing values after resample (before interpolation): {n_resamp_missing}")

        # --- 指定方法でinterpolate補間 ---
        try:
            df_interp = df_resamp.interpolate(method=self.method, limit_direction="both")
            n_interp_missing = df_interp[value_col].isna().sum()
            logger.info(f"Missing values after interpolation: {n_interp_missing}")
        except Exception as e:
            logger.error(f"Interpolation failed (method={self.method}): {e}")
            raise ValueError(f"Interpolation failed (method={self.method}): {e}")

        # --- 欠損埋め結果ログ ---
        filled = n_resamp_missing - n_interp_missing
        logger.info(f"Gap filling complete: {filled} gaps filled.")

        # --- indexを列に戻す ---
        return df_interp.reset_index()

    def __repr__(self):
        return f"ResampleGapFiller(freq='{self.freq}', method='{self.method}')"
実行例1:日次サンプリング+線形補間
df = pd.DataFrame({
    "date": ["2024-01-01", "2024-01-03", "2024-01-06"],
    "val": [1.0, None, 10.0]
})
filler = ResampleGapFiller(freq="D", method="linear")
result = filler.fill(df, time_col="date", value_col="val")
print(result)
実行結果1
        date   val
0 2024-01-01   1.0
1 2024-01-02   4.0
2 2024-01-03   7.0
3 2024-01-04   8.0
4 2024-01-05   9.0
5 2024-01-06  10.0
実行ログ1
INFO     ResampleGapFiller initialized (freq='D', method='linear').
INFO     Converted column 'date' to datetime.
INFO     Missing values before resampling: 1
INFO     Missing values after resample (before interpolation): 4
INFO     Missing values after interpolation: 0
INFO     Gap filling complete: 4 gaps filled.
実行例2:月次サンプリング+最近傍補間
df = pd.DataFrame({
    "ym": ["2023-01", "2023-03"],
    "val": [5, 20]
})
filler = ResampleGapFiller(freq="M", method="nearest")
result = filler.fill(df, time_col="ym", value_col="val")
print(result)
実行結果2
         ym   val
0 2023-01-31   5.0
1 2023-02-28   5.0
2 2023-03-31  20.0
実行ログ2
INFO     GroupChangeDetector initialized with group_cols=['grp'], target_col=val
INFO     Sorted DataFrame by ['t'] for group-diff detection.
INFO     Change detection completed: 2 changed rows flagged.

■ 文法・構文まとめ

機能・構文 解説
set_index(time_col) 日付列をインデックス化(時系列処理の基本)
resample(freq).asfreq() 指定頻度で再サンプリング。欠損があればNaNで埋まる
interpolate(method=...) 線形・最近傍など指定方式で連続補間。limit_direction='both'で両端も補間
reset_index() インデックスを列に戻す
pd.to_datetime 文字列→日時型変換
logger.info/error/warning 処理経過・異常・補間状況などすべて記録(分析ログ・監査に必須)
例外対応 列欠落・型不正・補間エラーは全て厳格例外+ログ

Q.45

項目 内容
概要 高頻度カテゴリ上位K件のみを抽出し、それ以外を "Other" に集約するカテゴリ圧縮ユーティリティ。
問題文 TopKCategorySelector(column: str, k: int) クラスを定義し、transform(df) により指定列の出現頻度上位kカテゴリを残し、それ以外を "Other" として置換した新しい列を追加せよ。新列名は {column}_compressed とすること。
要件 value_countsで上位取得/置換処理/新列名追加
発展仕様 tie処理/kより少ないカテゴリ時の対応/log記録
使用構文 value_counts, isin, np.where, loguru.logger

A.44

■ 模範解答

import pandas as pd
import numpy as np
from loguru import logger

class TopKCategorySelector:
    def __init__(self, column: str, k: int):
        """
        column: 圧縮対象のカテゴリ列名
        k: 上位カテゴリ数(同順位多件時はすべて含む)
        """
        self.column = column
        self.k = k
        logger.info(f"TopKCategorySelector initialized (column={column}, k={k})")

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        # --- 入力型・列存在チェック ---
        if not isinstance(df, pd.DataFrame):
            logger.error("Input must be a pandas DataFrame.")
            raise ValueError("Input must be a pandas DataFrame.")
        if self.column not in df.columns:
            logger.error(f"Column '{self.column}' not found.")
            raise ValueError(f"Column '{self.column}' not found in DataFrame.")
        # --- value_countsで頻度降順リスト(dropna=FalseでNaNも集計)---
        vc = df[self.column].value_counts(dropna=False)
        logger.info(f"Original category counts: {dict(vc)}")
        # --- 上位k位まで取得。tie処理:k番目と同数ならすべて含む ---
        if len(vc) <= self.k:
            top_cats = set(vc.index)
            logger.info(f"Number of unique categories ({len(vc)}) <= k ({self.k}); no compression needed.")
        else:
            cutoff_count = vc.iloc[self.k-1]
            # k番目のカテゴリと同じ頻度のカテゴリはすべて含める
            top_cats = set(vc[vc >= cutoff_count].index)
            logger.info(f"Selected top categories (with tie): {top_cats}")
        # --- 新しい圧縮列名 ---
        new_col = f"{self.column}_compressed"
        # --- 圧縮処理(np.whereでベクトル化)---
        df_out = df.copy()
        df_out[new_col] = np.where(df_out[self.column].isin(top_cats),
                                   df_out[self.column].astype(str),
                                   "Other")
        # NaN→"Other"にはせず、そのままに(圧縮前がNaNなら圧縮後もNaN)
        nan_mask = df_out[self.column].isna()
        if nan_mask.any():
            df_out.loc[nan_mask, new_col] = np.nan
            logger.info(f"NaN values preserved in compressed column.")

        n_other = (df_out[new_col] == "Other").sum()
        logger.info(f"Compression done. 'Other' count: {n_other}.")
        return df_out

    def __repr__(self):
        return f"TopKCategorySelector(column='{self.column}', k={self.k})"
実行例1:カテゴリがk件以上、tieあり
df = pd.DataFrame({
    "color": ["red", "blue", "red", "green", "blue", "green", "yellow", "yellow", "yellow", "black", np.nan]
})
selector = TopKCategorySelector(column="color", k=2)
df_out = selector.transform(df)
print(df_out)
実行結果1
    color color_compressed
0     red              red
1    blue             blue
2     red              red
3   green           Other
4    blue             blue
5   green           Other
6  yellow           Other
7  yellow           Other
8  yellow           Other
9   black           Other
10    NaN              NaN
実行ログ1
INFO     TopKCategorySelector initialized (column=color, k=2)
INFO     Original category counts: {'yellow': 3, 'red': 2, 'blue': 2, 'green': 2, 'black': 1, nan: 1}
INFO     Selected top categories (with tie): {'red', 'blue', 'yellow', 'green'}
INFO     NaN values preserved in compressed column.
INFO     Compression done. 'Other' count: 2.
実行例2:カテゴリがk未満(圧縮せず)
df2 = pd.DataFrame({"brand": ["A", "A", "B"]})
selector2 = TopKCategorySelector(column="brand", k=5)
df2_out = selector2.transform(df2)
print(df2_out)
実行結果2
  brand brand_compressed
0     A               A
1     A               A
2     B               B
実行ログ2
INFO     GroupChangeDetector initialized with group_cols=['grp'], target_col=val
INFO     Sorted DataFrame by ['t'] for group-diff detection.
INFO     Change detection completed: 2 changed rows flagged.

■ 文法・構文まとめ

機能・構文 解説
value_counts() カテゴリの出現回数を降順集計(NaN含め集計も可)
vc.iloc[self.k-1] k位のカテゴリのカウント値。tie処理はこれを基準に選定
vc[vc >= cutoff_count].index tie処理:k位と同じ回数以上のカテゴリをすべて上位カテゴリと見なす
np.where(isin(...), ..., ...) ベクトル化による高速置換
nan_mask NaNを"Other"に置換しない工夫(欠損は欠損のまま残す)
logger.info(...) 各処理・圧縮状況・カテゴリ数の詳細を記録
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?