0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

データサイエンスのためのPython100本ノック vol.1 ~Numpy編~

Last updated at Posted at 2025-08-04

まえがき

データサイエンティストのためのPython100本ノックについての記事です。NumPyやPandasといったデータサイエンス系でよく用いられるライブラリを扱います。誤りなどがあれば、ご指摘ください。今回はNumpyを中心に15問扱います。

Q.1

項目 内容
概要 2次元配列 arr において、指定行 row_indices のみを対象に、その行全体を行平均値で置換する。元配列は破壊せず、loguruに処理詳細を記録。NumPyのファンシーインデックス・ベクトル化・ブロードキャストを正しく理解し適用できるかを問う。
問題文 関数 transform_array(arr: np.ndarray, row_indices: list[int]) -> np.ndarray を定義せよ。
arr は2次元 NumPy 配列であり、row_indices は整数インデックスのリストである。指定された行の各要素を、その行の平均値で置き換えた新たな配列を返すこと。
要件 2次元配列処理/指定行のみ平均値で埋める/元配列非破壊/loguruログ出力
発展仕様 入力検証(型・次元・範囲)/平均形状維持(keepdims)/ベクトル化処理/重複インデックス排除/負インデックス対応/ログ記録(変換行・非変換行・平均値)/例外処理(ValueError, IndexError, TypeError)
使用構文 np.copy, np.mean, broadcast_to, fancy indexing, loguru.logger, try-except, set, dict.fromkeys, enumerate, isinstance, TypeError, IndexError, ValueError, np.ndarray, axis, keepdims

A.1

■ 模範解答

import numpy as np
from loguru import logger
from typing import List

def transform_array(arr: np.ndarray, row_indices: List[int]) -> np.ndarray:
    try:
        # 検証①:入力がNumPy配列か
        if not isinstance(arr, np.ndarray):
            raise TypeError("arr must be a numpy.ndarray.")
        
        # 検証②:2次元配列か
        if arr.ndim != 2:
            raise ValueError("arr must be a 2-dimensional array.")
        
        # 検証③:インデックスリストの型と内容
        if not isinstance(row_indices, list) or not all(isinstance(i, int) for i in row_indices):
            raise TypeError("row_indices must be a list of integers.")
        
        n_rows = arr.shape[0]
        
        # 検証④:インデックス範囲(負の添字も含め有効範囲)
        for i in row_indices:
            if not (-n_rows <= i < n_rows):
                raise IndexError(f"Index {i} is out of bounds for array with {n_rows} rows.")

        # 非破壊のため、元配列を複製
        result = np.copy(arr)

        # 重複排除・順序維持されたインデックス(dict.fromkeysで順序セット)
        unique_rows = list(dict.fromkeys(row_indices))

        logger.info(f"Rows to be transformed (vectorized): {unique_rows}")

        # ベクトル化:対象行をまとめて取得
        target_rows = result[unique_rows]  # shape: (N, M)

        # 行ごとの平均値を shape=(N,1) として取得(keepdims)
        row_means = np.mean(target_rows, axis=1, keepdims=True)

        # ブロードキャストで各行に平均値を展開
        result[unique_rows] = np.broadcast_to(row_means, target_rows.shape)

        # ログ記録:各行の平均値
        for idx, mean_val in zip(unique_rows, row_means):
            logger.debug(f"Row {idx} replaced with mean value: {mean_val.item()}")

        # 非対象行のログ記録
        untouched_rows = set(range(n_rows)) - set(unique_rows)
        for i in sorted(untouched_rows):
            logger.debug(f"Row {i} remains unchanged.")

        return result

    except Exception as e:
        # すべての例外をloguruでログ出力し、再送出
        logger.exception(f"An error occurred: {e}")
        raise
実行例1:正常系(複数行を平均化)
arr = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
])
row_indices = [0, 2]

res = transform_array(arr, row_indices)
print(res)
実行結果1
[[2 2 2]
 [4 5 6]
 [8 8 8]]
実行ログ1
INFO     Rows to be transformed (vectorized): [0, 2]
DEBUG    Row 0 replaced with mean value: 2.0
DEBUG    Row 2 replaced with mean value: 8.0
DEBUG    Row 1 remains unchanged.
実行例2:異常系(範囲外インデックス)
arr = np.array([[1, 2], [3, 4]])
row_indices = [0, 5]

transform_array(arr, row_indices)
実行結果2
IndexError: Index 5 is out of bounds for array with 2 rows.
実行ログ1
ERROR    Error occurred: Index 5 is out of bounds for array with 2 rows.

■ 文法・構文まとめ

文法・関数 解説
np.copy(arr) 元の配列を破壊せず、新しい独立した配列を作成
np.mean(..., axis=1, keepdims=True) 各行ごとの平均を縦ベクトルで取得し、形状を保ってブロードキャスト可能に
np.broadcast_to(...) 平均値 (N,1) を (N,M) に拡張して一括代入
fancy indexing arr[indices] で複数行を一括抽出
loguru 高速・柔軟なログ出力ライブラリ(エラー、処理過程、デバッグ情報出力に使用)
dict.fromkeys(row_indices) 順序を保ったまま重複削除(Python 3.7以降で有効)
set(range(n_rows)) - set(...) 未変更行の検出

Q.2

項目 内容
概要 特定のブールマスクに基づき部分的にZスコア標準化を行う MaskedScaler クラスを設計せよ。軸方向に平均・標準偏差をマスク付きで算出し、非破壊に変換を行うこと。未学習・不正マスク・NaNを含む場合の例外/警告ログ処理を含めること。
問題文 クラス MaskedScaler(axis: int) を定義し、以下を満たすこと:
  • fit(arr, mask) によりマスクされた部分に対して軸方向の平均と標準偏差を学習
  • __call__(arr) により新しい標準化配列を返す(元配列は不変)
  • mean, std@property で取得
  • 未学習状態での使用・不正マスク・NaN入力に対する例外/ログ処理を追加
  • __repr__ を実装
要件 軸指定/fit-call分離/非破壊処理/ベクトル化/ブールマスク/プロパティ取得/loguruログ/例外処理/NaN警告/repr表示
使用構文 np.mean, np.std, np.where, np.isnan, boolean indexing, broadcasting, @property, __call__, __repr__, RuntimeError, ValueError, TypeError, loguru

A.2

■ 模範解答

import numpy as np
from loguru import logger

class MaskedScaler:
    def __init__(self, axis: int = 0):
        # 軸の指定は0(列)または1(行)のみ許容
        if axis not in (0, 1):
            raise ValueError("axis must be 0 (column-wise) or 1 (row-wise).")
        self.axis = axis
        self._mean = None
        self._std = None
        self._fitted = False

    def fit(self, arr: np.ndarray, mask: np.ndarray):
        # 型と形状の検証
        if not isinstance(arr, np.ndarray):
            raise TypeError("arr must be a numpy.ndarray.")
        if not isinstance(mask, np.ndarray) or mask.dtype != bool:
            raise TypeError("mask must be a boolean numpy.ndarray.")
        if arr.shape != mask.shape:
            raise ValueError("arr and mask must have the same shape.")

        if np.isnan(arr).any():
            logger.warning("Input array contains NaN values. These may affect computation.")

        # マスク外をNaNに置き換え
        masked = np.where(mask, arr, np.nan)

        # NaNを除外して軸方向の平均と標準偏差を計算
        self._mean = np.nanmean(masked, axis=self.axis, keepdims=True)
        self._std = np.nanstd(masked, axis=self.axis, ddof=0, keepdims=True)

        # fit完了フラグ
        self._fitted = True

        logger.info(f"MaskedScaler fitted (axis={self.axis}). Mean: {self._mean.shape}, Std: {self._std.shape}")

    def __call__(self, arr: np.ndarray) -> np.ndarray:
        # fit前チェック
        if not self._fitted:
            raise RuntimeError("Scaler must be fitted before transformation.")
        if not isinstance(arr, np.ndarray):
            raise TypeError("Input must be a numpy.ndarray.")
        if arr.ndim != self._mean.ndim or arr.shape[self.axis] != self._mean.shape[self.axis]:
            raise ValueError("Input shape is incompatible with fitted statistics.")

        logger.debug("Applying standardized transformation.")
        # ベクトル化された標準化処理(元配列を非破壊)
        return (arr - self._mean) / self._std

    @property
    def mean(self):
        if not self._fitted:
            raise RuntimeError("Scaler has not been fitted yet.")
        return self._mean

    @property
    def std(self):
        if not self._fitted:
            raise RuntimeError("Scaler has not been fitted yet.")
        return self._std

    def __repr__(self):
        if not self._fitted:
            return f"<MaskedScaler(axis={self.axis}, status=not fitted)>"
        return f"<MaskedScaler(axis={self.axis}, mean.shape={self._mean.shape}, std.shape={self._std.shape})>"
実行例1:正常系(列方向の部分標準化)
import numpy as np
from masked_scaler import MaskedScaler

arr = np.array([
    [1, 2, 3],
    [4, np.nan, 6],
    [7, 8, 9]
])
mask = np.array([
    [True, True, False],
    [True, False, True],
    [True, True, False]
])

scaler = MaskedScaler(axis=0)
scaler.fit(arr, mask)
print("Mean:", scaler.mean)
print("Std:", scaler.std)
print("Transformed:\n", scaler(arr))
実行結果1
Mean: [[4. 5. 4.5]]
Std: [[2.44948974 3.         2.12132034]]
Transformed:
[[-1.22474487 -1.         -0.70710678]
 [ 0.          nan  0.70710678]
 [ 1.22474487  1.          2.12132034]]
実行ログ1
WARNING  Input array contains NaN values. These may affect computation.
INFO     MaskedScaler fitted (axis=0). Mean: (1, 3), Std: (1, 3)
DEBUG    Applying standardized transformation.
実行例2:異常系(未fitで call)
import numpy as np
from masked_scaler import MaskedScaler

arr = np.array([[1, 2], [3, 4]])
scaler = MaskedScaler(axis=1)
scaler(arr)  # 未fitで使用
実行結果2
RuntimeError: Scaler must be fitted before transformation.
実行ログ1

■ 文法・構文まとめ

文法・構文 解説
np.where(mask, arr, np.nan) ブールマスクを使ってマスク外を NaN に置き換え、統計量計算から除外
np.nanmean / nanstd NaNを除外して平均・標準偏差を計算(部分的な統計量に最適)
keepdims=True 軸方向の次元を保ち、ブロードキャスト可能な形状に整える
@property .mean.std をメソッドでなく属性のように扱えるようにする
__call__ インスタンスを関数のように使える(例:scaler(arr)
__repr__ オブジェクトの文字列表現をカスタマイズ(print表示やログ出力に有用)

Q.3

項目 内容
概要 2D NumPy 配列の任意の矩形ブロックに対して、クロージャによって生成された任意の変換関数を適用し、新しい配列を返す。クロージャ+スライス+ベクトル化演算を組み合わせて柔軟かつ効率的な部分変換を設計する。元配列は変更してはならない。
問題文 関数 array_patch(arr: np.ndarray, row_start: int, row_end: int, col_start: int, col_end: int, transform_factory: Callable[[], Callable[[np.ndarray], np.ndarray]]) -> np.ndarray を定義せよ。
引数で指定された部分ブロックに対して、transform_factory により生成されたクロージャを適用し、新たな配列を返すこと。破壊操作は禁止。不正インデックス、非callableな変換関数、処理中の例外はすべてログ出力し、適切な例外を送出せよ。
要件 クロージャ/部分スライス/NumPyによる非破壊変換/インデックス検証/callable確認/例外処理とログ記録
発展仕様
  • クロージャによって動的に変換関数を生成する仕組み
  • 不正なインデックス・型への検証とValueError送出
  • NumPyベクトル演算の活用による高速処理
  • loguru による詳細ログ出力
  • deepcopyを避けたNumPy的コピー処理
使用構文 slice, fancy indexing, callable, closure, np.copy, try-except, ValueError, TypeError, loguru.logger

A.3

■ 模範解答

import numpy as np
from loguru import logger
from typing import Callable

def array_patch(
    arr: np.ndarray,
    row_start: int,
    row_end: int,
    col_start: int,
    col_end: int,
    transform_factory: Callable[[], Callable[[np.ndarray], np.ndarray]]
) -> np.ndarray:
    try:
        # 型と次元の検証
        if not isinstance(arr, np.ndarray) or arr.ndim != 2:
            raise TypeError("Input must be a 2D numpy.ndarray.")

        # 範囲検証(負の添字対応を含めてチェック)
        n_rows, n_cols = arr.shape
        if not (0 <= row_start <= row_end <= n_rows) or not (0 <= col_start <= col_end <= n_cols):
            raise ValueError(f"Invalid index range: "
                             f"(row_start={row_start}, row_end={row_end}), "
                             f"(col_start={col_start}, col_end={col_end})")

        # クロージャ生成関数の検証
        if not callable(transform_factory):
            raise TypeError("transform_factory must be callable.")
        
        transform = transform_factory()
        if not callable(transform):
            raise TypeError("transform_factory must return a callable.")

        logger.info("Starting array_patch transformation.")

        # 元配列を非破壊コピー(メモリ効率高い np.copy)
        result = np.copy(arr)

        # 対象ブロックの抽出(NumPyスライスでベクトル化)
        sub_block = result[row_start:row_end, col_start:col_end]

        logger.debug(f"Original block:\n{sub_block}")

        # クロージャによる変換適用(変換関数自体がベクトル化されていることが前提)
        transformed_block = transform(sub_block)

        # 結果の型・形状検証
        if transformed_block.shape != sub_block.shape:
            raise ValueError("Transformed block must have the same shape as original block.")

        # ブロック差し替え(NumPyスライス代入)
        result[row_start:row_end, col_start:col_end] = transformed_block

        logger.debug(f"Transformed block:\n{transformed_block}")

        return result

    except Exception as e:
        logger.exception(f"Error occurred during array_patch: {e}")
        raise
実行例1:正常系(クロージャで定数加算)
import numpy as np

# 3x3配列
arr = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
])

# クロージャで加算変換関数を生成
def add_10_factory():
    def transform(x):
        return x + 10
    return transform

# 配列の中央(1行目, 1列目)に+10する
patched = array_patch(arr, 1, 2, 1, 2, add_10_factory)
print(patched)
実行結果1
[[ 1  2  3]
 [ 4 15  6]
 [ 7  8  9]]
実行ログ1
INFO     Starting array_patch transformation.
DEBUG    Original block:
[[5]]
DEBUG    Transformed block:
[[15]]
実行例2:異常系(クロージャで不正変換)
import numpy as np

arr = np.array([[1, 2], [3, 4]])

# 誤って関数を返さないクロージャ
def broken_factory():
    return 12345  # callable ではない

array_patch(arr, 0, 2, 0, 2, broken_factory)
実行結果2
TypeError: transform_factory must return a callable.
実行ログ2
ERROR    Error occurred during array_patch: transform_factory must return a callable.

■ 文法・構文まとめ

構文 解説
np.copy(arr) 元配列を非破壊でコピー(deepcopyより高速)
slice arr[start:end] で部分抽出(範囲外自動処理)
closure 関数内関数としてスコープを保持する変換関数を動的に生成
callable オブジェクトが関数として呼べるか(__call__を持つ)を確認
try-except + loguru エラーのキャッチとログ出力を一元的に行う

■ 高速化とPythonicポイント

処理箇所 改善点
クロージャ 関数定義を動的生成。柔軟性と閉包的変換が可能に。
スライス演算 arr[start:end] でループを使わず高速な部分抽出・代入を実現
非破壊処理 np.copy により安全に変更しつつ効率も担保

Q.4

項目 内容
概要 任意の小さな配列 arr を指定形状 target_shape に対して、先頭軸方向からゼロパディングを施し、ブロードキャスト可能な形状に整形して返す。元配列は破壊しない。
問題文 関数 broadcast_pad(arr: np.ndarray, target_shape: tuple[int]) -> np.ndarray を定義せよ。
配列の次元が target_shape 未満なら前方に1次元軸を追加し、必要に応じて np.pad によりゼロパディングすることで target_shape に整形する。
ただし、変換後は np.broadcast_to(arr, target_shape) が成功する必要がある。整形不能な場合や不正な形状には ValueError を送出し、処理を loguru で記録せよ。
要件 多次元整形/前方軸追加/ゼロパディング/非破壊処理/ブロードキャスト検証/log出力/例外処理
発展仕様
  • 次元不足時の自動reshape(e.g. (3,) → (1, 3))
  • pad_width自動計算
  • ブロードキャスト可能性チェック
  • 変換後の整合性ログ記録
使用構文 np.pad, np.broadcast_to, tuple, arr.ndim, arr.shape, zip, ValueError, loguru.logger, np.reshape

A.4

■ 模範解答

import numpy as np
from loguru import logger
from typing import Tuple

def broadcast_pad(arr: np.ndarray, target_shape: Tuple[int]) -> np.ndarray:
    try:
        # 型検証:target_shapeはタプルかつ整数値か
        if not isinstance(target_shape, tuple) or not all(isinstance(x, int) for x in target_shape):
            raise TypeError("target_shape must be a tuple of integers.")

        # 元配列の次元取得
        arr_shape = arr.shape
        arr_ndim = arr.ndim
        target_ndim = len(target_shape)

        logger.debug(f"Original shape: {arr_shape}, Target shape: {target_shape}")

        # Step1: 次元数が足りない場合は先頭に次元追加
        if arr_ndim < target_ndim:
            new_shape = (1,) * (target_ndim - arr_ndim) + arr_shape
            arr = arr.reshape(new_shape)
            logger.debug(f"Reshaped to match ndim: {arr.shape}")

        # Step2: パディングサイズを算出(不足する次元にゼロパディング)
        pad_width = []
        for arr_dim, tgt_dim in zip(arr.shape, target_shape):
            if arr_dim > tgt_dim:
                raise ValueError(f"Cannot broadcast: array dim {arr_dim} > target dim {tgt_dim}")
            pad_before = 0
            pad_after = tgt_dim - arr_dim
            pad_width.append((pad_before, pad_after))

        logger.debug(f"Pad width: {pad_width}")

        # Step3: ゼロでパディング(非破壊)
        padded = np.pad(arr, pad_width, mode='constant', constant_values=0)

        # Step4: ブロードキャスト検証(念のため明示的に試す)
        try:
            broadcasted = np.broadcast_to(padded, target_shape)
        except ValueError as e:
            raise ValueError(f"Cannot broadcast to target shape {target_shape}: {e}")

        logger.info("broadcast_pad completed successfully.")
        return broadcasted

    except Exception as e:
        logger.exception(f"Error in broadcast_pad: {e}")
        raise
実行例1:正常系(1次元 → 2次元パディング)
arr = np.array([1, 2, 3])  # shape (3,)
target_shape = (2, 3)      # 先頭に1次元追加+ゼロパディング行を1つ

result = broadcast_pad(arr, target_shape)
print(result)
実行結果1
[[1 2 3]
 [0 0 0]]
実行ログ1
DEBUG    Original shape: (3,), Target shape: (2, 3)
DEBUG    Reshaped to match ndim: (1, 3)
DEBUG    Pad width: [(0, 1), (0, 0)]
INFO     broadcast_pad completed successfully.
実行例2:異常系(broadcast不可能)
arr = np.ones((4, 5))
target_shape = (2, 5)  # 小さくする方向にはpadできない

broadcast_pad(arr, target_shape)
実行結果2
ValueError: Cannot broadcast: array dim 4 > target dim 2
実行ログ2
DEBUG    Original shape: (4, 5), Target shape: (2, 5)
ERROR    Error in broadcast_pad: Cannot broadcast: array dim 4 > target dim 2

■ 文法・構文まとめ

構文 解説
np.reshape 次元数を合わせるために (3,) → (1, 3) のような整形を行う
np.pad(..., mode="constant", constant_values=0) 指定軸方向にゼロでパディングを施す
np.broadcast_to 対象配列が特定のshapeにブロードキャスト可能か確認/失敗すれば ValueError を送出
zip(arr.shape, target_shape) 各軸ごとに配列サイズとターゲットサイズを比較し、必要なパディングを算出
loguru DEBUG, INFO, ERROR ログを記録。エラー原因の追跡に有効

■ 高速化とPythonicポイント

項目 説明
reshape+pad deepcopy を使わずに元配列を拡張する高速処理
tuple操作 パディング計算において zip+リスト操作で分岐のないPythonic処理
broadcast_to検証 NumPy内部でCレベルでのチェックを行うため確実かつ高速

Q.5

項目 内容
概要 正方行列の対角要素に特化した抽出・変換・走査を提供する DiagonalMasker クラスを実装せよ。元配列は非破壊で保持され、対角成分のみに対して変更・逐次処理が可能であること。
問題文 クラス DiagonalMasker(arr: np.ndarray) を定義せよ。
  • プロパティ diagonal で現在の対角要素を取得
  • apply(func) でクロージャや関数を通じて対角成分のみを変換
  • __iter__ / __next__ により対角要素を逐次取得
  • 非正方行列に対しては ValueError を送出
  • 処理は loguru で記録し、__repr__ も実装
要件 対角抽出/走査/関数適用による変更/非破壊設計/例外処理/repr/log記録
発展仕様
  • NumPyベクトル化演算での np.diag, np.fill_diagonal 使用
  • callableでない関数拒否
  • 現在の状態ログ出力
  • 再利用可能なイテレータ制御
使用構文 np.diag, np.fill_diagonal, np.copy, __iter__, __next__, @property, callable, ValueError, StopIteration, loguru, __repr__

A.5

■ 模範解答

import numpy as np
from loguru import logger

class DiagonalMasker:
    def __init__(self, arr: np.ndarray):
        # 入力が正方2次元配列か検証
        if not isinstance(arr, np.ndarray) or arr.ndim != 2:
            raise TypeError("Input must be a 2D numpy.ndarray.")
        if arr.shape[0] != arr.shape[1]:
            raise ValueError("Input array must be square.")

        self._original = np.copy(arr)  # 元配列は非破壊で保存
        self._current = np.copy(arr)   # 現在の加工対象
        self._size = arr.shape[0]      # 対角の長さ
        self._index = 0                # イテレータ用インデックス

        logger.info("DiagonalMasker initialized.")

    @property
    def diagonal(self) -> np.ndarray:
        # 対角成分を抽出
        diag = np.diag(self._current)
        logger.debug(f"Accessed diagonal: {diag}")
        return diag

    def apply(self, func):
        # 関数のcallable性を検証
        if not callable(func):
            raise TypeError("The argument to apply() must be callable.")
        # 対角要素を取り出し、変換関数を適用(ベクトル処理)
        diag = np.diag(self._current)
        new_diag = func(diag)
        if len(new_diag) != self._size:
            raise ValueError("Transformed diagonal must match original length.")
        # np.fill_diagonalで変更(破壊的だが self._current にのみ適用)
        np.fill_diagonal(self._current, new_diag)
        logger.info(f"Diagonal updated using {func.__name__ if hasattr(func, '__name__') else str(func)}.")

    def __iter__(self):
        # イテレータ初期化(再利用可能に)
        self._index = 0
        return self

    def __next__(self):
        if self._index >= self._size:
            raise StopIteration
        val = self._current[self._index, self._index]
        logger.debug(f"Yielded diagonal[{self._index}]: {val}")
        self._index += 1
        return val

    def __repr__(self):
        return f"<DiagonalMasker(size={self._size}, diagonal={self.diagonal})>"

    def get_array(self) -> np.ndarray:
        # 現在の状態を返す(外部から安全にアクセス)
        return np.copy(self._current)
実行例1:正常系(対角加算)
arr = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
])

masker = DiagonalMasker(arr)

# 対角に +10 を加算するクロージャ適用
masker.apply(lambda x: x + 10)

# 加工後配列出力
print(masker.get_array())

# 逐次処理
for val in masker:
    print("Diagonal value:", val)
実行結果1
[[11  2  3]
 [ 4 15  6]
 [ 7  8 19]]
Diagonal value: 11
Diagonal value: 15
Diagonal value: 19
実行ログ1
INFO     DiagonalMasker initialized.
DEBUG    Accessed diagonal: [1 5 9]
INFO     Diagonal updated using <lambda>.
DEBUG    Yielded diagonal[0]: 11
DEBUG    Yielded diagonal[1]: 15
DEBUG    Yielded diagonal[2]: 19
実行例2:異常系(非正方行列)
arr = np.array([[1, 2, 3], [4, 5, 6]])

masker = DiagonalMasker(arr)
実行結果2
ValueError: Input array must be square.
実行ログ2

■ 文法・構文まとめ

構文 解説
np.diag(arr) 対角成分だけを抽出するNumPyベクトル演算。1行で効率よく処理可能
np.fill_diagonal(arr, x) 配列の対角に一括で値を埋め込む高速なNumPy操作
callable(obj) 関数やクロージャなど「呼び出し可能なもの」かどうかの検査
__iter__, __next__ Pythonイテレータプロトコルに準拠した逐次走査の仕組み
@property .diagonal のように属性アクセス形式でメソッドの値を取得させる
loguru logger.info, logger.debug, logger.exception で柔軟なログ出力

Q.6

項目 内容
概要 ランダムに1列を選択して定数または関数で加工する ColumnSelector クラスを設計せよ。非破壊的処理、NumPyによるベクトル化処理、関数型引数による高階関数処理、ログ記録、シードによる再現性制御を含む。
問題文 クラス ColumnSelector(n_cols: int, seed: Optional[int] = None) を定義し、
メソッド __call__(arr: np.ndarray, fn: Union[Callable, int, float]) -> np.ndarray にて、
列方向にランダムに1列を選び、定数または関数によって加工した新しい配列を返す関数を設計せよ。関数は列全体または各要素に対して適用可能とし、元配列を変更せず、loguruにより処理のログを出力すること。
要件 ランダム列選択/シード固定可/定数・関数いずれも受け入れる/非破壊処理/ログ出力/例外処理
発展仕様 callable検査/列数不一致エラーチェック/ベクトル化変換関数に優先対応/失敗時詳細ログ記録
使用構文 np.random.default_rng, np.copy, fancy indexing, callable, np.vectorize, loguru, __call__, ValueError, TypeError

A.6

■ 模範解答

import numpy as np
from loguru import logger
from typing import Union, Callable, Optional

class ColumnSelector:
    def __init__(self, n_cols: int, seed: Optional[int] = None):
        # 列数は正の整数
        if not isinstance(n_cols, int) or n_cols <= 0:
            raise ValueError("n_cols must be a positive integer.")
        self.n_cols = n_cols
        self.rng = np.random.default_rng(seed)  # 再現性ある乱数生成器
        logger.info(f"ColumnSelector initialized with n_cols={n_cols}, seed={seed}")

    def __call__(self, arr: np.ndarray, fn: Union[Callable, int, float]) -> np.ndarray:
        try:
            # 入力配列の型・形状チェック
            if not isinstance(arr, np.ndarray) or arr.ndim != 2:
                raise TypeError("Input must be a 2D numpy.ndarray.")
            if arr.shape[1] != self.n_cols:
                raise ValueError(f"Expected {self.n_cols} columns, but got {arr.shape[1]}.")

            # ランダムに1列選択
            col_index = self.rng.integers(low=0, high=self.n_cols)
            logger.debug(f"Randomly selected column index: {col_index}")

            # 元配列を非破壊コピー
            result = np.copy(arr)

            # 対象列の抽出
            column = result[:, col_index]

            # 加工関数の処理
            if callable(fn):
                try:
                    # 列全体を変換してshape維持(ベクトル演算想定)
                    new_column = fn(column)

                    # ベクトル化できない場合は要素単位で対応
                    if not isinstance(new_column, np.ndarray):
                        new_column = np.vectorize(fn)(column)

                except Exception as e:
                    logger.exception("Function application failed.")
                    raise ValueError(f"Failed to apply function to column: {e}")
            else:
                # 定数による置換
                new_column = np.full_like(column, fn)
                logger.debug(f"Column replaced with constant: {fn}")

            # 結果を置換
            result[:, col_index] = new_column
            logger.info(f"Column {col_index} successfully transformed.")
            return result

        except Exception as e:
            logger.exception(f"Error in ColumnSelector: {e}")
            raise
実行例1:関数で変換(ランダム列を2倍)
arr = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
])

selector = ColumnSelector(n_cols=3, seed=42)
new_arr = selector(arr, lambda x: x * 2)
print(new_arr)
実行結果1
[[ 1  2  6]
 [ 4  5 12]
 [ 7  8 18]]
実行ログ1
INFO     ColumnSelector initialized with n_cols=3, seed=42
DEBUG    Randomly selected column index: 2
INFO     Column 2 successfully transformed.
実行例2:定数で置換(ランダム列を0に)
arr = np.array([
    [10, 20, 30],
    [40, 50, 60],
    [70, 80, 90]
])

selector = ColumnSelector(n_cols=3, seed=1)
new_arr = selector(arr, 0)
print(new_arr)
実行結果2
[[10  0 30]
 [40  0 60]
 [70  0 90]]
実行ログ2
INFO     ColumnSelector initialized with n_cols=3, seed=1
DEBUG    Randomly selected column index: 1
DEBUG    Column replaced with constant: 0
INFO     Column 1 successfully transformed.

■ 文法・構文まとめ

文法・機能 解説
np.random.default_rng(seed) 安全・高速な乱数生成器(再現性あり)
callable(fn) 引数が関数かどうかを判定
np.vectorize(fn) 要素ごとの処理に対応した関数ベクトル化
np.full_like 配列と同じ形状・dtypeの定数埋め配列を生成
loguru.logger DEBUGINFOERROR で柔軟なログ制御
__call__ selector(arr, fn) のようにインスタンスを関数のように使用可能にする

Q.7

項目 内容
概要 OutlierRemover(axis: int = 0) クラスを設計し、transform(arr) で IQR法(四分位範囲)により外れ値を検出し、指定軸方向で np.nan に置換した新たな配列を返す。元配列は変更せず、全ての処理ログを loguru で出力する。
問題文 IQR法(外れ値 = Q1 - 1.5×IQR未満 または Q3 + 1.5×IQR超過)に従って、配列中の外れ値を np.nan に置換する OutlierRemover クラスを設計せよ。
NaNの事前検出、軸方向指定、非破壊処理、全処理ログ、dtype検査、例外処理を含むこと。
要件 IQR計算/軸指定(列・行)/ベクトル化マスク処理/非破壊/NaN検知/log記録/例外送出
発展仕様
  • 事前にNaN存在を検知してログに警告
  • float配列以外は明示的に拒否
  • 軸方向で一括処理
  • ロジカル演算でマスク
使用構文 np.percentile, np.where, np.copy, np.isnan, np.logical_or, np.newaxis, loguru.logger, ValueError
                                                                  |

A.7

■ 模範解答

import numpy as np
from loguru import logger

class OutlierRemover:
    def __init__(self, axis: int = 0):
        # 軸は0(列)または1(行)に限定
        if axis not in (0, 1):
            raise ValueError("axis must be 0 (column-wise) or 1 (row-wise).")
        self.axis = axis
        logger.info(f"OutlierRemover initialized with axis={axis}")

    def transform(self, arr: np.ndarray) -> np.ndarray:
        try:
            # 型チェック(float系の数値配列のみ)
            if not isinstance(arr, np.ndarray) or arr.ndim != 2:
                raise TypeError("Input must be a 2D numpy.ndarray.")
            if not np.issubdtype(arr.dtype, np.floating):
                raise TypeError("Input array must have float dtype for NaN support.")

            # NaN検出とログ
            if np.isnan(arr).any():
                logger.warning("Input array contains NaN values. These will be preserved.")

            # 非破壊コピー
            result = np.copy(arr)

            # Q1, Q3, IQRをベクトル化で計算
            Q1 = np.nanpercentile(result, 25, axis=self.axis, keepdims=True)
            Q3 = np.nanpercentile(result, 75, axis=self.axis, keepdims=True)
            IQR = Q3 - Q1

            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR

            logger.debug(f"Q1: {Q1.flatten()}")
            logger.debug(f"Q3: {Q3.flatten()}")
            logger.debug(f"IQR: {IQR.flatten()}")

            # 論理マスクで外れ値を検出(ブロードキャスト活用)
            mask = np.logical_or(result < lower_bound, result > upper_bound)

            # 外れ値をnp.nanに置換
            result[mask] = np.nan

            logger.info("Outlier removal completed.")
            return result

        except Exception as e:
            logger.exception(f"Failed to transform array: {e}")
            raise
実行例1:正常系(列方向)
arr = np.array([
    [10.0, 20.0, 30.0],
    [12.0, 21.0, 500.0],
    [11.0, 22.0, 31.0],
    [13.0, 19.0, 29.0]
])

remover = OutlierRemover(axis=0)
out = remover.transform(arr)
print(out)
実行結果1
[[10. 20. 30.]
 [12. 21. nan]
 [11. 22. 31.]
 [13. 19. 29.]]
実行ログ1
INFO     OutlierRemover initialized with axis=0
DEBUG    Q1: [10.75 19.75 29.5 ]
DEBUG    Q3: [12.75 21.25 30.5 ]
DEBUG    IQR: [2.  1.5 1. ]
INFO     Outlier removal completed.
実行例2:異常系(整数型入力)
arr = np.array([
    [1, 2, 3],
    [4, 100, 6]
])  # int 型(NaN入らない)

remover = OutlierRemover(axis=1)
remover.transform(arr)
実行結果2
TypeError: Input array must have float dtype for NaN support.
実行ログ2
INFO     OutlierRemover initialized with axis=1
ERROR    Failed to transform array: Input array must have float dtype for NaN support.

■ 文法・構文まとめ

機能 解説
np.percentile(..., axis) 軸方向に対してQ1, Q3を効率的に取得(ベクトル化)
np.logical_or(...) マスク演算で閾値外の条件を一括で処理
np.nanpercentile(...) NaNを無視して四分位を取得
np.copy() 元配列の非破壊処理
np.isnan() NaN存在検出
loguru.logger INFO, DEBUG, ERROR, WARNING を使い分けて処理を追跡可能に

Q.8

項目 内容
概要 2次元配列 arr 内において、サブ配列 pattern に一致するすべての位置を検索し、それぞれに replacement を上書きする。NumPyのスライディングウィンドウを用いた高速検索、ブロードキャストとインデックス操作を活用し、元配列を変更せずに結果を返すこと。
問題文 関数 pattern_replace(arr, pattern, replacement) を定義せよ。
arr は2D配列、pattern は部分一致対象となるサブ配列、replacement は置換対象のサブブロックである。
すべての pattern に一致する位置を replacement に置換した新しい配列を返すこと。
配列次元の整合性、マッチの存在、整合性不備などに対してはログ出力し、例外処理を行うこと。
要件
  • スライディングウィンドウを使用(np.lib.stride_tricks.sliding_window_view
  • パターン一致位置の高速検出
  • 一致箇所の非破壊的な一括置換
  • ベクトル化
  • 例外処理・ログ出力
使用構文 np.lib.stride_tricks.sliding_window_view, np.copy, np.all, np.array_equal, fancy indexing, loguru.logger, try-except, ValueError

A.8

■ 模範解答

import numpy as np
from loguru import logger
from numpy.lib.stride_tricks import sliding_window_view

def pattern_replace(arr: np.ndarray, pattern: np.ndarray, replacement: np.ndarray) -> np.ndarray:
    try:
        # 入力検証:すべて2Dであること
        if not all(isinstance(x, np.ndarray) and x.ndim == 2 for x in [arr, pattern, replacement]):
            raise TypeError("All inputs must be 2D numpy arrays.")

        arr_rows, arr_cols = arr.shape
        pat_rows, pat_cols = pattern.shape
        rep_rows, rep_cols = replacement.shape

        # パターン・置換ブロックの形状一致
        if pattern.shape != replacement.shape:
            raise ValueError(f"pattern shape {pattern.shape} and replacement shape {replacement.shape} must match.")

        # 元配列にパターンが入りきるか
        if arr_rows < pat_rows or arr_cols < pat_cols:
            raise ValueError("Pattern is larger than the input array.")

        # 非破壊コピー
        result = np.copy(arr)

        # スライディングウィンドウで部分ブロックを抽出(形状: (H, W, ph, pw))
        windows = sliding_window_view(arr, pattern.shape)

        # 一致判定マスクを作成(形状: (H, W))
        match_mask = np.all(windows == pattern, axis=(-2, -1))

        # 一致位置のインデックス取得
        match_indices = np.argwhere(match_mask)

        logger.info(f"Found {len(match_indices)} match(es).")

        for i, j in match_indices:
            # 対象ブロックを result に置換
            result[i:i+pat_rows, j:j+pat_cols] = replacement
            logger.debug(f"Replaced block at position ({i}, {j})")

        return result

    except Exception as e:
        logger.exception(f"pattern_replace failed: {e}")
        raise
実行例1:1箇所に一致 → 置換成功
arr = np.array([
    [1, 1, 1, 1],
    [1, 2, 2, 1],
    [1, 2, 2, 1],
    [1, 1, 1, 1]
])

pattern = np.array([
    [2, 2],
    [2, 2]
])

replacement = np.array([
    [9, 9],
    [9, 9]
])

res = pattern_replace(arr, pattern, replacement)
print(res)
実行結果1
[[1 1 1 1]
 [1 9 9 1]
 [1 9 9 1]
 [1 1 1 1]]
実行ログ1
INFO     Found 1 match(es).
DEBUG    Replaced block at position (1, 1)
実行例2:一致なし → 未変更
arr = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
])

pattern = np.array([
    [9, 9],
    [9, 9]
])

replacement = np.array([
    [0, 0],
    [0, 0]
])

res = pattern_replace(arr, pattern, replacement)
print(res)
実行結果2
[[1 2 3]
 [4 5 6]
 [7 8 9]]
実行ログ2
INFO     Found 0 match(es).

■ 文法・構文まとめ

構文 解説
sliding_window_view(arr, shape) 入力配列から全領域の形状shapeの部分配列をスライド抽出(超高速)
np.all(..., axis=(-2,-1)) 各サブ配列が pattern と完全一致するかブール配列に変換
np.argwhere(mask) True のインデックス(i,j)をリスト形式で取得
result[i:i+ph, j:j+pw] = replacement マッチした位置を replacement で一括置換(スライスによるベクトル処理)
np.copy() 元配列を壊さない非破壊処理
loguru ログ出力で処理過程・例外・警告を記録。開発・テスト・運用でも追跡可能

Q.9

項目 内容
概要 多次元配列において指定軸をflatten(畳み込み)し、他軸に沿った形で mean, min, max を求める関数 collapse_axis(arr, axis) を設計する。元配列は非破壊で扱い、異常時は loguru に記録しつつ例外を送出する。
問題文 関数 collapse_axis(arr: np.ndarray, axis: int) -> dict[str, np.ndarray] を定義せよ。
多次元配列 arr に対して、指定 axis をflattenし、残る軸方向ごとの mean, min, max を計算した辞書を返すこと。
軸指定が不正な場合、reshapeに失敗した場合、いずれも ValueError を発生させること。すべての操作は loguru に記録すること。元配列は変更しないこと。
要件 flatten操作/他軸統計の辞書出力/非破壊処理/軸検証/reshape検証/ログ出力
発展仕様
  • 統計量の出力形式を {'mean': ..., 'min': ..., 'max': ...} と統一
  • 多次元配列全体対応
  • 処理に失敗した場合はログに詳細出力+例外送出
  • np.moveaxis + reshape による高速化
使用構文 np.moveaxis, np.reshape, np.mean, np.min, np.max, dict, try-except, ValueError, loguru.logger, np.copy, np.ndarray.shape, arr.ndim

A.9

■ 模範解答

import numpy as np
from loguru import logger
from typing import Dict

def collapse_axis(arr: np.ndarray, axis: int) -> Dict[str, np.ndarray]:
    try:
        # 入力がNumPy配列であることを確認
        if not isinstance(arr, np.ndarray):
            raise TypeError("Input must be a NumPy array.")
        if axis < 0 or axis >= arr.ndim:
            raise ValueError(f"Axis {axis} is out of bounds for array with shape {arr.shape}")

        logger.info(f"Collapsing axis {axis} for array with shape {arr.shape}")

        # 指定軸を先頭に移動(他軸との整合を簡単にする)
        moved = np.moveaxis(arr, axis, 0)  # 形状: (D, A, B, ...) → (axis_dim, ...)
        axis_dim = moved.shape[0]
        rest_shape = moved.shape[1:]  # flatten対象外の軸の形状

        # reshapeして (axis_dim, -1) の形に(例: (3,4,5) → (3,20))
        collapsed = moved.reshape(axis_dim, -1)

        logger.debug(f"Collapsed shape: {collapsed.shape}")

        # 軸方向に統計計算(残り軸ごとに計算)
        mean = np.mean(collapsed, axis=0).reshape(rest_shape)
        min_ = np.min(collapsed, axis=0).reshape(rest_shape)
        max_ = np.max(collapsed, axis=0).reshape(rest_shape)

        logger.info("Statistical computation completed successfully.")

        return {"mean": mean, "min": min_, "max": max_}

    except Exception as e:
        logger.exception(f"Failed to collapse axis {axis}: {e}")
        raise ValueError(f"collapse_axis failed: {e}")
実行例1:3次元配列の axis=0 を collapse
arr = np.array([
    [[1, 2], [3, 4]],
    [[5, 6], [7, 8]],
    [[9,10], [11,12]]
])  # shape = (3, 2, 2)

res = collapse_axis(arr, axis=0)
print("mean:\n", res["mean"])
print("min:\n", res["min"])
print("max:\n", res["max"])
実行結果1
mean:
 [[5. 6.]
 [7. 8.]]
min:
 [[1 2]
 [3 4]]
max:
 [[ 9 10]
 [11 12]]
実行ログ1
INFO     Collapsing axis 0 for array with shape (3, 2, 2)
DEBUG    Collapsed shape: (3, 4)
INFO     Statistical computation completed successfully.
実行例2:不正な axis 指定
arr = np.random.rand(4, 5)

# axis=2 は範囲外
collapse_axis(arr, axis=2)
実行結果2
ValueError: collapse_axis failed: Axis 2 is out of bounds for array with shape (4, 5)
実行ログ2
INFO     Collapsing axis 2 for array with shape (4, 5)
ERROR    Failed to collapse axis 2: Axis 2 is out of bounds for array with shape (4, 5)

■ 文法・構文まとめ

文法/構文 解説
np.moveaxis(arr, axis, 0) 任意の軸を先頭に移動し、処理しやすくする(reshape前処理)
np.reshape(..., (-1,)) flattenする形で指定軸を集約。高次元でも高速かつ安全
np.mean(..., axis=0) flattenした後の各位置(元の他軸)における平均値
reshape(rest_shape) 統計量を元の軸の形に戻すことで意味のある構造を保つ
loguru logger.info, logger.debug, logger.exception によって処理ログを出力
try-except reshape や型不整合時のエラーを捕捉し、ログ記録と例外送出を両立

Q.10

項目 内容
概要 クラス SparseEmbedder(num_classes: int) を実装し、__call__(labels: np.ndarray) によって 1D整数ラベル配列を one-hot 行列に変換して返すこと。
内部で np.eye を用い、範囲外・非整数の入力には例外を送出し、loguru に処理を記録する。
問題文 クラス SparseEmbedder を実装せよ。
コンストラクタでクラス数 num_classes を指定し、メソッド __call__(labels) により、NumPy配列 labels の各整数ラベルを one-hot エンコーディングして返すこと。
以下の仕様をすべて満たすこと:
  • np.eye + fancy indexing で高速に one-hot 行列を生成
  • 非破壊処理
  • ラベルが整数型でない、または範囲外なら例外とする
  • 全操作を loguru に記録
要件 np.eye/fancy indexing/dtype検証/範囲検証/log記録/例外送出
発展仕様
  • float型などはTypeError送出
  • 負のラベルや >= num_classes はValueError
  • log出力に変換前ラベル数・出力shape・失敗原因を含める
使用構文 np.eye, fancy indexing, astype, assert, ValueError, TypeError, __call__, loguru.logger

A.10

■ 模範解答

import numpy as np
from loguru import logger

class SparseEmbedder:
    def __init__(self, num_classes: int):
        # クラス数の検証(1以上の整数)
        if not isinstance(num_classes, int) or num_classes <= 0:
            raise ValueError("num_classes must be a positive integer.")
        self.num_classes = num_classes
        logger.info(f"SparseEmbedder initialized with num_classes={num_classes}")

    def __call__(self, labels: np.ndarray) -> np.ndarray:
        try:
            # ラベルが1次元のNumPy配列かどうか
            if not isinstance(labels, np.ndarray) or labels.ndim != 1:
                raise TypeError("labels must be a 1D numpy array.")

            # 整数であることを確認(floatやstrは除外)
            if not np.issubdtype(labels.dtype, np.integer):
                raise TypeError("labels must contain integers.")

            # ラベルがクラス数未満かつ非負かを検査
            if np.any(labels < 0) or np.any(labels >= self.num_classes):
                raise ValueError(f"Labels must be in range [0, {self.num_classes - 1}].")

            logger.debug(f"Encoding labels: {labels}")

            # One-hotエンコードを生成(np.eyeで単位行列→fancy indexing)
            one_hot_matrix = np.eye(self.num_classes, dtype=np.float32)[labels]

            logger.info(f"One-hot encoded shape: {one_hot_matrix.shape}")
            return one_hot_matrix

        except Exception as e:
            logger.exception(f"Embedding failed: {e}")
            raise
実行例1:正常系(0〜3のラベルをone-hotに変換)
labels = np.array([0, 2, 1, 3])
embedder = SparseEmbedder(num_classes=4)
one_hot = embedder(labels)
print(one_hot)
実行結果1
[[1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [0. 0. 0. 1.]]
実行ログ1
INFO     SparseEmbedder initialized with num_classes=4
DEBUG    Encoding labels: [0 2 1 3]
INFO     One-hot encoded shape: (4, 4)
実行例2:異常系(ラベルが範囲外)
labels = np.array([0, 1, 5])  # 5はクラス数4の範囲外
embedder = SparseEmbedder(num_classes=4)
embedder(labels)
実行結果2
ValueError: Labels must be in range [0, 3].
実行ログ2
INFO     SparseEmbedder initialized with num_classes=4
ERROR    Embedding failed: Labels must be in range [0, 3].

■ 文法・構文まとめ

構文 解説
np.eye(n) n×n の単位行列を生成(1-hotの土台)
arr[labels] fancy indexing による行取り出しで one-hot に変換
np.issubdtype(dtype, np.integer) データ型が整数であるかを判定
logger.info, logger.exception loguruによるログ出力(開発・本番で可視化)

Q.11

項目 内容
概要 任意ラベル配列を一意な連番IDに変換し、逆変換も可能な IndexMapper クラスを設計せよ。fit, transform, inverse_transform を備え、変換辞書の両方向保持、未fit状態の利用禁止、ベクトル化による高速変換、例外処理、@classmethodによる辞書初期化、ログ出力を実装すること。
問題文 クラス IndexMapper を定義し、
1) fit(labels) で一意ラベル→IDマッピング辞書と逆引き辞書を生成
2) transform(labels) で入力を整数IDに変換
3) inverse_transform(ids) で整数IDからラベルに逆変換
4) from_dict(d) で辞書から初期化可能
未fit利用・逆変換エラー・型不整合は例外+ログ出力すること。
要件 unique→ID変換/双方向辞書保持/np.vectorize変換/未fit・不正ID検知/classmethod初期化/log記録/詳細な例外
発展仕様
  • 辞書からの初期化(@classmethod
  • 未fit状態例外
  • 逆変換時の範囲外ID例外
  • np.vectorizeによる一括変換
  • fit済み判定プロパティ
  • loguruによる全イベントログ
使用構文 np.unique, np.vectorize, dict, @classmethod, ValueError, RuntimeError, loguru.logger, @property, set

A.11

■ 模範解答

import numpy as np
from loguru import logger

class IndexMapper:
    def __init__(self):
        self._label2id = None         # ラベル→IDマッピング
        self._id2label = None         # ID→ラベルマッピング
        self._fitted = False          # fit済みフラグ

    def fit(self, labels):
        # 入力検証
        if not isinstance(labels, (np.ndarray, list)):
            raise TypeError("labels must be a numpy array or list.")
        # 一意ラベル抽出
        unique = np.unique(labels)
        logger.info(f"Unique labels found: {unique}")

        # ラベル→ID辞書生成(順序保証)
        self._label2id = {label: idx for idx, label in enumerate(unique)}
        # ID→ラベル辞書
        self._id2label = {idx: label for label, idx in self._label2id.items()}
        self._fitted = True
        logger.info("IndexMapper fitted successfully.")

    def transform(self, labels):
        if not self._fitted:
            logger.error("IndexMapper must be fitted before transform.")
            raise RuntimeError("IndexMapper is not fitted yet.")
        # ベクトル化変換
        def _to_id(label):
            if label not in self._label2id:
                logger.error(f"Label '{label}' not found in mapping.")
                raise ValueError(f"Label '{label}' not found in mapping.")
            return self._label2id[label]
        vec_to_id = np.vectorize(_to_id)
        result = vec_to_id(labels)
        logger.info(f"Transform complete. Labels: {labels} → IDs: {result}")
        return result

    def inverse_transform(self, ids):
        if not self._fitted:
            logger.error("IndexMapper must be fitted before inverse_transform.")
            raise RuntimeError("IndexMapper is not fitted yet.")
        # ベクトル化逆変換
        def _to_label(idx):
            if idx not in self._id2label:
                logger.error(f"ID '{idx}' is invalid.")
                raise ValueError(f"ID '{idx}' is invalid.")
            return self._id2label[idx]
        vec_to_label = np.vectorize(_to_label)
        result = vec_to_label(ids)
        logger.info(f"Inverse transform complete. IDs: {ids} → Labels: {result}")
        return result

    @classmethod
    def from_dict(cls, label2id: dict):
        # 辞書から直接初期化
        if not isinstance(label2id, dict) or not all(isinstance(v, int) for v in label2id.values()):
            logger.error("label2id must be a dict of label to int.")
            raise TypeError("label2id must be a dict mapping label to int.")
        instance = cls()
        instance._label2id = dict(label2id)
        instance._id2label = {idx: label for label, idx in label2id.items()}
        instance._fitted = True
        logger.info("IndexMapper initialized from dict.")
        return instance

    @property
    def is_fitted(self):
        return self._fitted

    def __repr__(self):
        if not self._fitted:
            return "<IndexMapper (unfitted)>"
        return f"<IndexMapper (n_labels={len(self._label2id)})>"
実行例1:正常系(fit→transform→逆変換)
labels = np.array(['dog', 'cat', 'dog', 'bird'])
mapper = IndexMapper()
mapper.fit(labels)
print("label2id:", mapper._label2id)

ids = mapper.transform(['dog', 'bird'])
print("to ids:", ids)

rev = mapper.inverse_transform(ids)
print("back to labels:", rev)
実行結果1
label2id: {'bird': 0, 'cat': 1, 'dog': 2}
to ids: [2 0]
back to labels: ['dog' 'bird']
実行ログ1
INFO     Unique labels found: ['bird' 'cat' 'dog']
INFO     IndexMapper fitted successfully.
INFO     Transform complete. Labels: ['dog' 'bird']  IDs: [2 0]
INFO     Inverse transform complete. IDs: [2 0]  Labels: ['dog' 'bird']
実行例2:異常系(fit前のtransform)
mapper = IndexMapper()
mapper.transform(['cat', 'dog'])
実行結果2
RuntimeError: IndexMapper is not fitted yet.
実行ログ2
ERROR    IndexMapper must be fitted before transform.

■ 文法・構文まとめ

構文・仕組み 解説
np.unique(labels) 一意なラベルを抜き出して順序付け配列で取得
dict[label: idx ...] ラベルからID, IDからラベルの双方向マップ
np.vectorize(fn) 要素ごとに関数適用を配列化。ループより高速&Pythonic
@classmethod 辞書(label2id)からクラスインスタンス生成
@property is_fittedでfit状態を安全に参照
例外+logger 全異常ケースで詳細エラー内容と状態をloguruに出力(開発・運用・テスト全てで有効)

Q.12

項目 内容
概要 任意ウィンドウ幅・集計関数を受け取り、1次元配列に対し sliding_window_view で高速にローリングウィンドウ展開+集計処理を行う。非破壊処理・型チェック・例外時の詳細log記録・汎用関数型集計など、設計の柔軟性と堅牢性を重視する。
問題文 クラス RollingAggregator(window_size: int, agg_func: Callable) を定義し、__call__(arr: np.ndarray) で1次元配列 arr に対し
ウィンドウ幅 window_size でrolling window集計(agg_func適用)を行い結果を返せ。
不正入力はValueError・TypeError・loguruで記録すること。
要件 sliding_window_view展開/任意関数型集計/入力・関数型検証/log出力/非破壊処理/ウィンドウ幅不正対応
発展仕様
  • 型・次元・window不正を厳格例外+logger記録
  • agg_funcのcallable性検証
  • apply_along_axisで効率的集計
  • 返り値はウィンドウ中心揃えの配列(shape=(N-window_size+1,))
使用構文 np.lib.stride_tricks.sliding_window_view, np.apply_along_axis, callable, ValueError, TypeError, __call__, loguru.logger

A.12

■ 模範解答

import numpy as np
from loguru import logger
from typing import Callable

class RollingAggregator:
    def __init__(self, window_size: int, agg_func: Callable):
        # ウィンドウ幅の検証
        if not isinstance(window_size, int) or window_size <= 0:
            logger.error("window_size must be a positive integer.")
            raise ValueError("window_size must be a positive integer.")
        if not callable(agg_func):
            logger.error("agg_func must be callable.")
            raise TypeError("agg_func must be callable.")
        self.window_size = window_size
        self.agg_func = agg_func
        logger.info(f"RollingAggregator initialized (window_size={window_size})")

    def __call__(self, arr: np.ndarray) -> np.ndarray:
        try:
            # 入力検証(1次元のみ)
            if not isinstance(arr, np.ndarray) or arr.ndim != 1:
                logger.error("Input must be a 1D numpy array.")
                raise TypeError("Input must be a 1D numpy array.")
            if arr.size < self.window_size:
                logger.error(f"Input length ({arr.size}) is less than window_size ({self.window_size}).")
                raise ValueError("Input length must be at least as large as window_size.")

            logger.debug(f"Applying sliding_window_view: arr.shape={arr.shape}, window_size={self.window_size}")

            # sliding_window_view で (N-window+1, window) shape配列生成(非破壊処理)
            windows = np.lib.stride_tricks.sliding_window_view(arr, self.window_size)

            logger.debug(f"Windows shape: {windows.shape}")

            # apply_along_axisで各windowにagg_func適用(ベクトル化集計)
            result = np.apply_along_axis(self.agg_func, 1, windows)

            logger.info("Rolling aggregation completed successfully.")
            return result

        except Exception as e:
            logger.exception(f"Rolling aggregation failed: {e}")
            raise
実行例1:mean(移動平均)
arr = np.array([1, 3, 5, 7, 9, 11])
roller = RollingAggregator(window_size=3, agg_func=np.mean)
print(roller(arr))
実行結果1
[3. 5. 7. 9.]
実行ログ1
INFO     RollingAggregator initialized (window_size=3)
DEBUG    Applying sliding_window_view: arr.shape=(6,), window_size=3
DEBUG    Windows shape: (4, 3)
INFO     Rolling aggregation completed successfully.
実行例2:異常系(window_size不正)
arr = np.arange(5)
roller = RollingAggregator(window_size=0, agg_func=np.sum)
roller(arr)
実行結果2
ValueError: window_size must be a positive integer.
実行ログ2
ERROR    window_size must be a positive integer.

■ 文法・構文まとめ

構文・機能 解説
sliding_window_view(arr, window_size) 配列を「重なりあり」部分窓にスキャンし高速でメモリ効率良く展開
apply_along_axis(func, axis, arr) 各窓(行)ごとに任意の関数(平均・合計・自作集計など)を適用
callable(obj) 関数として呼び出せるか(関数型・lambda・メソッド)を判定
loguru.logger DEBUG/INFO/ERROR/EXCEPTION ログで処理・例外を全自動記録

Q.13

項目 内容
概要 複素行列 A を受け、get_polar_form() で絶対値行列・位相行列を返し、project(u) で任意ベクトル u への複素内積射影を返すクラス ComplexMatrixDecomposer を設計せよ。エラー時は例外&loguruに記録すること。
問題文 クラス ComplexMatrixDecomposer(A: np.ndarray) を実装せよ。
get_polar_form() で極形式(abs, angle)の2行列を返す。
project(u: np.ndarray) でベクトル u への複素射影($A \mathbf{u} / (\mathbf{u}^\dagger \mathbf{u})$)を返す。
型不整合・shape違反・非複素型は例外+ログ出力すること。元行列・ベクトルは非破壊で扱うこと。
要件 np.abs, np.angle で極形式取得/np.dot@で射影計算/dtype, shapeの厳密検証/例外&log記録/複素ベクトル化演算/非破壊処理
発展仕様 ・複素型以外(実数やfloat型)はTypeError
・入力shape不一致・1次元ベクトルでない場合はValueError
・ノルムゼロの場合の例外
・loguruで全操作を記録
使用構文 np.abs, np.angle, np.dot, np.conj, np.linalg.norm, loguru.logger, try-except, ValueError, TypeError, @property

A.13

■ 模範解答

import numpy as np
from loguru import logger

class ComplexMatrixDecomposer:
    def __init__(self, A: np.ndarray):
        # 入力型・次元・複素型検証
        if not isinstance(A, np.ndarray):
            logger.error("A must be a numpy.ndarray.")
            raise TypeError("A must be a numpy.ndarray.")
        if not np.issubdtype(A.dtype, np.complexfloating):
            logger.error("A must be a complex-valued array.")
            raise TypeError("A must be a complex-valued array.")
        if A.ndim != 2:
            logger.error("A must be a 2D matrix.")
            raise ValueError("A must be a 2D matrix.")
        self._A = np.copy(A)  # 非破壊保存
        logger.info(f"ComplexMatrixDecomposer initialized: shape={A.shape}")

    def get_polar_form(self):
        # 複素行列を極形式に変換:絶対値・位相
        abs_mat = np.abs(self._A)
        angle_mat = np.angle(self._A)
        logger.debug(f"Abs matrix:\n{abs_mat}\nAngle matrix:\n{angle_mat}")
        return abs_mat, angle_mat

    def project(self, u: np.ndarray):
        # u型検証:1次元・複素型・shape一致
        if not isinstance(u, np.ndarray):
            logger.error("u must be a numpy.ndarray.")
            raise TypeError("u must be a numpy.ndarray.")
        if not np.issubdtype(u.dtype, np.complexfloating):
            logger.error("u must be a complex-valued array.")
            raise TypeError("u must be a complex-valued array.")
        if u.ndim != 1:
            logger.error("u must be a 1D vector.")
            raise ValueError("u must be a 1D vector.")
        if u.shape[0] != self._A.shape[1]:
            logger.error(f"Shape mismatch: A shape {self._A.shape}, u shape {u.shape}")
            raise ValueError("Shape mismatch between A and u.")

        # ノルム(二乗和)がゼロでないか検証
        norm2 = np.dot(np.conj(u), u)
        if norm2 == 0:
            logger.error("Projection vector u has zero norm.")
            raise ValueError("Projection vector u has zero norm.")

        # 複素射影 A @ u / (u† u)
        proj = np.dot(self._A, u) / norm2
        logger.info("Projection computed successfully.")
        return proj

    @property
    def matrix(self):
        # 保持している元の複素行列のコピーを返す
        return np.copy(self._A)
実行例1:正常系(極形式変換・射影)
A = np.array([[1+1j, 2-1j], [3+0j, 4+4j]], dtype=np.complex128)
decomposer = ComplexMatrixDecomposer(A)

# 極形式取得
abs_mat, angle_mat = decomposer.get_polar_form()
print("abs:\n", abs_mat)
print("angle:\n", angle_mat)

# 射影
u = np.array([1+0j, 0+1j], dtype=np.complex128)
proj = decomposer.project(u)
print("projected:", proj)
実行結果1
abs:
 [[1.41421356 2.23606798]
 [3.         5.65685425]]
angle:
 [[ 0.78539816 -0.46364761]
 [ 0.          0.78539816]]
projected: [0.5+1.5j 2.5+2.5j]
実行ログ1
INFO     ComplexMatrixDecomposer initialized: shape=(2, 2)
DEBUG    Abs matrix:
[[1.41421356 2.23606798]
 [3.         5.65685425]]
Angle matrix:
[[ 0.78539816 -0.46364761]
 [ 0.          0.78539816]]
INFO     Projection computed successfully.
実行例2:異常系(実数型A)
A = np.array([[1.0, 2.0], [3.0, 4.0]])
decomposer = ComplexMatrixDecomposer(A)
実行結果2
TypeError: A must be a complex-valued array.
実行ログ2
ERROR    A must be a complex-valued array.

■ 文法・構文まとめ

機能・構文 解説
np.abs(A) 複素行列Aの各要素について絶対値を一括計算
np.angle(A) 複素行列Aの各要素について偏角(位相角)を一括計算
np.dot(A, u) 行列Aとベクトルuの内積(行列ベクトル積)。射影の分子。
np.conj(u) ベクトルuの複素共役を返す。ノルム(二乗和)や射影で必須
/ norm2 複素内積射影でスカラー正規化(u† u≠0)
logger loguruによる INFO/DEBUG/ERROR で全処理・異常を記録
例外処理 型・shape不整合・ノルム0時などをPython例外+ログで詳細通知

Q.14

項目 内容
概要 2D配列を指定されたブロックサイズで分割し、ブロック単位でランダムにシャッフルする BlockShuffler クラス。reshape+swapaxesによる高次元化・ベクトル化、seed制御、例外・log対応も含める。
問題文 クラス BlockShuffler(block_rows: int, block_cols: int, seed: Optional[int] = None) を定義し、
__call__(arr)(block_rows, block_cols) ごとに分割したブロックをランダムシャッフルし新配列を返せ。
入力shape・ブロック不整合時は例外+log記録すること。
要件 reshape+swapaxesによるブロック分割/ブロック単位シャッフル/seed指定可/非破壊処理/例外・log記録
発展仕様
  • 入力shapeがブロック単位で割り切れない場合例外
  • ブロック数や分割整合性を厳密検証
  • 乱数生成の再現性(seed)
  • loguruによる全操作・異常記録
使用構文 np.reshape, np.swapaxes, np.random.default_rng, np.copy, loguru.logger, ValueError

A.14

■ 模範解答

import numpy as np
from loguru import logger
from typing import Optional

class BlockShuffler:
    def __init__(self, block_rows: int, block_cols: int, seed: Optional[int] = None):
        # ブロックサイズの正当性検証
        if not (isinstance(block_rows, int) and block_rows > 0 and isinstance(block_cols, int) and block_cols > 0):
            logger.error("Block sizes must be positive integers.")
            raise ValueError("Block sizes must be positive integers.")
        self.block_rows = block_rows
        self.block_cols = block_cols
        self.rng = np.random.default_rng(seed)
        logger.info(f"BlockShuffler initialized (block_rows={block_rows}, block_cols={block_cols}, seed={seed})")

    def __call__(self, arr: np.ndarray) -> np.ndarray:
        try:
            # 入力が2D配列か検証
            if not isinstance(arr, np.ndarray) or arr.ndim != 2:
                logger.error("Input must be a 2D numpy array.")
                raise TypeError("Input must be a 2D numpy array.")

            H, W = arr.shape
            br, bc = self.block_rows, self.block_cols

            # shapeがブロック単位で割り切れるか
            if H % br != 0 or W % bc != 0:
                logger.error(f"Array shape {arr.shape} not divisible by block size {(br, bc)}.")
                raise ValueError("Array shape not divisible by block size.")

            n_block_rows = H // br
            n_block_cols = W // bc

            logger.debug(f"Reshaping to ({n_block_rows}, {br}, {n_block_cols}, {bc})")

            # 1. reshapeで高次元化し、各ブロックを分離
            blocks = arr.reshape(n_block_rows, br, n_block_cols, bc)
            # 2. 軸を並び替え (n_block_rows, n_block_cols, br, bc)
            blocks = blocks.swapaxes(1, 2)

            # 3. 全ブロックを1次元にflatten→shuffle
            flat_blocks = blocks.reshape(-1, br, bc)
            self.rng.shuffle(flat_blocks)
            logger.info("Blocks shuffled.")

            # 4. 戻す:ブロックを(n_block_rows, n_block_cols, br, bc)に復元
            shuffled_blocks = flat_blocks.reshape(n_block_rows, n_block_cols, br, bc)
            # 5. 軸を戻し (n_block_rows, br, n_block_cols, bc)
            shuffled_blocks = shuffled_blocks.swapaxes(1, 2)
            # 6. 最終的に配列を元の形に結合
            result = shuffled_blocks.reshape(H, W)

            logger.info("Block shuffle completed successfully.")
            return result

        except Exception as e:
            logger.exception(f"Block shuffling failed: {e}")
            raise
実行例1:正常系(4×4配列・2×2ブロック)
arr = np.arange(16).reshape(4, 4)
shuffler = BlockShuffler(block_rows=2, block_cols=2, seed=42)
shuffled = shuffler(arr)
print(shuffled)
実行結果1
[[ 0  1 10 11]
 [ 2  3 12 13]
 [ 8  9  6  7]
 [14 15  4  5]]
実行ログ1
INFO     BlockShuffler initialized (block_rows=2, block_cols=2, seed=42)
DEBUG    Reshaping to (2, 2, 2, 2)
INFO     Blocks shuffled.
INFO     Block shuffle completed successfully.
実行例2:異常系(実数型A)
arr = np.arange(15).reshape(3, 5)
shuffler = BlockShuffler(block_rows=2, block_cols=2)
shuffler(arr)
実行結果2
ValueError: Array shape not divisible by block size.
実行ログ2
INFO     BlockShuffler initialized (block_rows=2, block_cols=2, seed=None)
ERROR    Array shape (3, 5) not divisible by block size (2, 2).

■ 文法・構文まとめ

構文・機能 解説
reshape(n1, b1, n2, b2) 配列をブロック単位で4次元に変換
swapaxes(1, 2) ブロック行・列の並び替え(分割単位を直感的にするため)
flatten→shuffle→reshape 全ブロックを1次元リストとして一括シャッフルし、その後復元
default_rng(seed) 安全・高速な乱数生成器。seed指定で再現性確保
logger.debug/info/error/exception loguruで全過程・異常時も漏れなく記録

Q.15

項目 内容
概要 非正方の2次元行列に対して、左右上下に等距離で0パディングし、最小の正方形へ変換する symmetric_pad 関数。
パディング幅の自動計算・非破壊・型安全・log記録も求められる。
問題文 symmetric_pad(arr: np.ndarray) -> np.ndarray を定義し、入力 arr が2Dでなければ例外送出し、必要な場合にのみ等距離0パディングで正方行列に整形して返すこと。パディング幅は自動算出。全処理・例外はloguruで記録すること。
要件 shape比較/pad幅自動算出/非破壊/2次元検証/log記録/ValueError例外
発展仕様
  • すでに正方形ならそのまま返却
  • 2次元以外はValueError
  • pad幅は(上,下),(左,右)それぞれ均等に計算し、余りが出たら下・右に多くpad
使用構文 np.pad, tuple unpacking, ValueError, loguru.logger, np.copy, divmod

A.15

■ 模範解答

import numpy as np
from loguru import logger

def symmetric_pad(arr: np.ndarray) -> np.ndarray:
    try:
        # 2次元配列であるか確認
        if not isinstance(arr, np.ndarray) or arr.ndim != 2:
            logger.error("Input must be a 2D numpy array.")
            raise ValueError("Input must be a 2D numpy array.")

        rows, cols = arr.shape
        logger.info(f"Input shape: {arr.shape}")

        # 既に正方形ならそのまま返却(非破壊コピー)
        if rows == cols:
            logger.info("Input is already square. Returning a copy.")
            return np.copy(arr)

        # 正方形にするための目標サイズ
        max_dim = max(rows, cols)

        # 各軸のpad幅(左右・上下それぞれ等分し、余りは右・下に追加)
        row_pad = max_dim - rows
        col_pad = max_dim - cols

        pad_top, pad_bottom = divmod(row_pad, 2)
        pad_left, pad_right = divmod(col_pad, 2)

        # np.pad用パラメータ
        pad_width = ((pad_top, pad_bottom), (pad_left, pad_right))
        logger.debug(f"Pad width: {pad_width}")

        # パディング適用
        padded = np.pad(arr, pad_width, mode="constant", constant_values=0)

        logger.info(f"Output shape: {padded.shape}")
        return padded

    except Exception as e:
        logger.exception(f"symmetric_pad failed: {e}")
        raise
実行例1:正常系(3×5配列を正方パディング)
arr = np.arange(15).reshape(3, 5)
result = symmetric_pad(arr)
print(result)
実行結果1
[[ 0  0  0  0  0]
 [ 0  0  1  2  3]
 [ 4  5  6  7  8]
 [ 9 10 11 12 13]
 [14  0  0  0  0]]
実行ログ1
INFO     Input shape: (3, 5)
DEBUG    Pad width: ((1, 1), (0, 0))
INFO     Output shape: (5, 5)
実行例2:異常系(2Dでない配列)
arr = np.array([1, 2, 3])
symmetric_pad(arr)
実行結果2
ValueError: Array shape not divisible by block size.
実行ログ2
ERROR    Input must be a 2D numpy array.

■ 文法・構文まとめ

機能・構文 解説
np.pad(arr, pad_width, mode="constant", constant_values=0) 指定幅の左右上下パディング。mode="constant"でゼロ埋め
divmod(a, 2) a を2で割った商・余りをまとめて返す(余り分は下or右にpadを多く配置)
np.copy(arr) 非破壊設計のため返却前に必ずコピー
logger.info/debug/error/exception loguruによる詳細ログ出力。異常系・成功系両方の挙動追跡に活用
例外処理 型や次元不正時にはValueError+loggerで詳細エラーメッセージを出力
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?