まえがき
データサイエンティストのためのPython100本ノックについての記事です。NumPyやPandasといったデータサイエンス系でよく用いられるライブラリを扱います。誤りなどがあれば、ご指摘ください。今回はNumpyを中心に15問扱います。
Q.1
項目 | 内容 |
---|---|
概要 | 2次元配列 arr において、指定行 row_indices のみを対象に、その行全体を行平均値で置換する。元配列は破壊せず、loguruに処理詳細を記録。NumPyのファンシーインデックス・ベクトル化・ブロードキャストを正しく理解し適用できるかを問う。 |
問題文 | 関数 transform_array(arr: np.ndarray, row_indices: list[int]) -> np.ndarray を定義せよ。arr は2次元 NumPy 配列であり、row_indices は整数インデックスのリストである。指定された行の各要素を、その行の平均値で置き換えた新たな配列を返すこと。 |
要件 | 2次元配列処理/指定行のみ平均値で埋める/元配列非破壊/loguruログ出力 |
発展仕様 | 入力検証(型・次元・範囲)/平均形状維持(keepdims)/ベクトル化処理/重複インデックス排除/負インデックス対応/ログ記録(変換行・非変換行・平均値)/例外処理(ValueError, IndexError, TypeError) |
使用構文 |
np.copy , np.mean , broadcast_to , fancy indexing , loguru.logger , try-except , set , dict.fromkeys , enumerate , isinstance , TypeError , IndexError , ValueError , np.ndarray , axis , keepdims
|
A.1
■ 模範解答
import numpy as np
from loguru import logger
from typing import List
def transform_array(arr: np.ndarray, row_indices: List[int]) -> np.ndarray:
try:
# 検証①:入力がNumPy配列か
if not isinstance(arr, np.ndarray):
raise TypeError("arr must be a numpy.ndarray.")
# 検証②:2次元配列か
if arr.ndim != 2:
raise ValueError("arr must be a 2-dimensional array.")
# 検証③:インデックスリストの型と内容
if not isinstance(row_indices, list) or not all(isinstance(i, int) for i in row_indices):
raise TypeError("row_indices must be a list of integers.")
n_rows = arr.shape[0]
# 検証④:インデックス範囲(負の添字も含め有効範囲)
for i in row_indices:
if not (-n_rows <= i < n_rows):
raise IndexError(f"Index {i} is out of bounds for array with {n_rows} rows.")
# 非破壊のため、元配列を複製
result = np.copy(arr)
# 重複排除・順序維持されたインデックス(dict.fromkeysで順序セット)
unique_rows = list(dict.fromkeys(row_indices))
logger.info(f"Rows to be transformed (vectorized): {unique_rows}")
# ベクトル化:対象行をまとめて取得
target_rows = result[unique_rows] # shape: (N, M)
# 行ごとの平均値を shape=(N,1) として取得(keepdims)
row_means = np.mean(target_rows, axis=1, keepdims=True)
# ブロードキャストで各行に平均値を展開
result[unique_rows] = np.broadcast_to(row_means, target_rows.shape)
# ログ記録:各行の平均値
for idx, mean_val in zip(unique_rows, row_means):
logger.debug(f"Row {idx} replaced with mean value: {mean_val.item()}")
# 非対象行のログ記録
untouched_rows = set(range(n_rows)) - set(unique_rows)
for i in sorted(untouched_rows):
logger.debug(f"Row {i} remains unchanged.")
return result
except Exception as e:
# すべての例外をloguruでログ出力し、再送出
logger.exception(f"An error occurred: {e}")
raise
実行例1:正常系(複数行を平均化)
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
row_indices = [0, 2]
res = transform_array(arr, row_indices)
print(res)
実行結果1
[[2 2 2]
[4 5 6]
[8 8 8]]
実行ログ1
INFO Rows to be transformed (vectorized): [0, 2]
DEBUG Row 0 replaced with mean value: 2.0
DEBUG Row 2 replaced with mean value: 8.0
DEBUG Row 1 remains unchanged.
実行例2:異常系(範囲外インデックス)
arr = np.array([[1, 2], [3, 4]])
row_indices = [0, 5]
transform_array(arr, row_indices)
実行結果2
IndexError: Index 5 is out of bounds for array with 2 rows.
実行ログ1
ERROR Error occurred: Index 5 is out of bounds for array with 2 rows.
■ 文法・構文まとめ
文法・関数 | 解説 |
---|---|
np.copy(arr) |
元の配列を破壊せず、新しい独立した配列を作成 |
np.mean(..., axis=1, keepdims=True) |
各行ごとの平均を縦ベクトルで取得し、形状を保ってブロードキャスト可能に |
np.broadcast_to(...) |
平均値 (N,1) を (N,M) に拡張して一括代入 |
fancy indexing |
arr[indices] で複数行を一括抽出 |
loguru |
高速・柔軟なログ出力ライブラリ(エラー、処理過程、デバッグ情報出力に使用) |
dict.fromkeys(row_indices) |
順序を保ったまま重複削除(Python 3.7以降で有効) |
set(range(n_rows)) - set(...) |
未変更行の検出 |
Q.2
項目 | 内容 |
---|---|
概要 | 特定のブールマスクに基づき部分的にZスコア標準化を行う MaskedScaler クラスを設計せよ。軸方向に平均・標準偏差をマスク付きで算出し、非破壊に変換を行うこと。未学習・不正マスク・NaNを含む場合の例外/警告ログ処理を含めること。 |
問題文 | クラス MaskedScaler(axis: int) を定義し、以下を満たすこと:
|
要件 | 軸指定/fit-call分離/非破壊処理/ベクトル化/ブールマスク/プロパティ取得/loguruログ/例外処理/NaN警告/repr表示 |
使用構文 |
np.mean , np.std , np.where , np.isnan , boolean indexing , broadcasting , @property , __call__ , __repr__ , RuntimeError , ValueError , TypeError , loguru
|
A.2
■ 模範解答
import numpy as np
from loguru import logger
class MaskedScaler:
def __init__(self, axis: int = 0):
# 軸の指定は0(列)または1(行)のみ許容
if axis not in (0, 1):
raise ValueError("axis must be 0 (column-wise) or 1 (row-wise).")
self.axis = axis
self._mean = None
self._std = None
self._fitted = False
def fit(self, arr: np.ndarray, mask: np.ndarray):
# 型と形状の検証
if not isinstance(arr, np.ndarray):
raise TypeError("arr must be a numpy.ndarray.")
if not isinstance(mask, np.ndarray) or mask.dtype != bool:
raise TypeError("mask must be a boolean numpy.ndarray.")
if arr.shape != mask.shape:
raise ValueError("arr and mask must have the same shape.")
if np.isnan(arr).any():
logger.warning("Input array contains NaN values. These may affect computation.")
# マスク外をNaNに置き換え
masked = np.where(mask, arr, np.nan)
# NaNを除外して軸方向の平均と標準偏差を計算
self._mean = np.nanmean(masked, axis=self.axis, keepdims=True)
self._std = np.nanstd(masked, axis=self.axis, ddof=0, keepdims=True)
# fit完了フラグ
self._fitted = True
logger.info(f"MaskedScaler fitted (axis={self.axis}). Mean: {self._mean.shape}, Std: {self._std.shape}")
def __call__(self, arr: np.ndarray) -> np.ndarray:
# fit前チェック
if not self._fitted:
raise RuntimeError("Scaler must be fitted before transformation.")
if not isinstance(arr, np.ndarray):
raise TypeError("Input must be a numpy.ndarray.")
if arr.ndim != self._mean.ndim or arr.shape[self.axis] != self._mean.shape[self.axis]:
raise ValueError("Input shape is incompatible with fitted statistics.")
logger.debug("Applying standardized transformation.")
# ベクトル化された標準化処理(元配列を非破壊)
return (arr - self._mean) / self._std
@property
def mean(self):
if not self._fitted:
raise RuntimeError("Scaler has not been fitted yet.")
return self._mean
@property
def std(self):
if not self._fitted:
raise RuntimeError("Scaler has not been fitted yet.")
return self._std
def __repr__(self):
if not self._fitted:
return f"<MaskedScaler(axis={self.axis}, status=not fitted)>"
return f"<MaskedScaler(axis={self.axis}, mean.shape={self._mean.shape}, std.shape={self._std.shape})>"
実行例1:正常系(列方向の部分標準化)
import numpy as np
from masked_scaler import MaskedScaler
arr = np.array([
[1, 2, 3],
[4, np.nan, 6],
[7, 8, 9]
])
mask = np.array([
[True, True, False],
[True, False, True],
[True, True, False]
])
scaler = MaskedScaler(axis=0)
scaler.fit(arr, mask)
print("Mean:", scaler.mean)
print("Std:", scaler.std)
print("Transformed:\n", scaler(arr))
実行結果1
Mean: [[4. 5. 4.5]]
Std: [[2.44948974 3. 2.12132034]]
Transformed:
[[-1.22474487 -1. -0.70710678]
[ 0. nan 0.70710678]
[ 1.22474487 1. 2.12132034]]
実行ログ1
WARNING Input array contains NaN values. These may affect computation.
INFO MaskedScaler fitted (axis=0). Mean: (1, 3), Std: (1, 3)
DEBUG Applying standardized transformation.
実行例2:異常系(未fitで call)
import numpy as np
from masked_scaler import MaskedScaler
arr = np.array([[1, 2], [3, 4]])
scaler = MaskedScaler(axis=1)
scaler(arr) # 未fitで使用
実行結果2
RuntimeError: Scaler must be fitted before transformation.
実行ログ1
■ 文法・構文まとめ
文法・構文 | 解説 |
---|---|
np.where(mask, arr, np.nan) |
ブールマスクを使ってマスク外を NaN に置き換え、統計量計算から除外 |
np.nanmean / nanstd
|
NaNを除外して平均・標準偏差を計算(部分的な統計量に最適) |
keepdims=True |
軸方向の次元を保ち、ブロードキャスト可能な形状に整える |
@property |
.mean や .std をメソッドでなく属性のように扱えるようにする |
__call__ |
インスタンスを関数のように使える(例:scaler(arr) ) |
__repr__ |
オブジェクトの文字列表現をカスタマイズ(print表示やログ出力に有用) |
Q.3
項目 | 内容 |
---|---|
概要 | 2D NumPy 配列の任意の矩形ブロックに対して、クロージャによって生成された任意の変換関数を適用し、新しい配列を返す。クロージャ+スライス+ベクトル化演算を組み合わせて柔軟かつ効率的な部分変換を設計する。元配列は変更してはならない。 |
問題文 | 関数 array_patch(arr: np.ndarray, row_start: int, row_end: int, col_start: int, col_end: int, transform_factory: Callable[[], Callable[[np.ndarray], np.ndarray]]) -> np.ndarray を定義せよ。引数で指定された部分ブロックに対して、transform_factory により生成されたクロージャを適用し、新たな配列を返すこと。破壊操作は禁止。不正インデックス、非callableな変換関数、処理中の例外はすべてログ出力し、適切な例外を送出せよ。 |
要件 | クロージャ/部分スライス/NumPyによる非破壊変換/インデックス検証/callable確認/例外処理とログ記録 |
発展仕様 |
|
使用構文 |
slice , fancy indexing , callable , closure , np.copy , try-except , ValueError , TypeError , loguru.logger
|
A.3
■ 模範解答
import numpy as np
from loguru import logger
from typing import Callable
def array_patch(
arr: np.ndarray,
row_start: int,
row_end: int,
col_start: int,
col_end: int,
transform_factory: Callable[[], Callable[[np.ndarray], np.ndarray]]
) -> np.ndarray:
try:
# 型と次元の検証
if not isinstance(arr, np.ndarray) or arr.ndim != 2:
raise TypeError("Input must be a 2D numpy.ndarray.")
# 範囲検証(負の添字対応を含めてチェック)
n_rows, n_cols = arr.shape
if not (0 <= row_start <= row_end <= n_rows) or not (0 <= col_start <= col_end <= n_cols):
raise ValueError(f"Invalid index range: "
f"(row_start={row_start}, row_end={row_end}), "
f"(col_start={col_start}, col_end={col_end})")
# クロージャ生成関数の検証
if not callable(transform_factory):
raise TypeError("transform_factory must be callable.")
transform = transform_factory()
if not callable(transform):
raise TypeError("transform_factory must return a callable.")
logger.info("Starting array_patch transformation.")
# 元配列を非破壊コピー(メモリ効率高い np.copy)
result = np.copy(arr)
# 対象ブロックの抽出(NumPyスライスでベクトル化)
sub_block = result[row_start:row_end, col_start:col_end]
logger.debug(f"Original block:\n{sub_block}")
# クロージャによる変換適用(変換関数自体がベクトル化されていることが前提)
transformed_block = transform(sub_block)
# 結果の型・形状検証
if transformed_block.shape != sub_block.shape:
raise ValueError("Transformed block must have the same shape as original block.")
# ブロック差し替え(NumPyスライス代入)
result[row_start:row_end, col_start:col_end] = transformed_block
logger.debug(f"Transformed block:\n{transformed_block}")
return result
except Exception as e:
logger.exception(f"Error occurred during array_patch: {e}")
raise
実行例1:正常系(クロージャで定数加算)
import numpy as np
# 3x3配列
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
# クロージャで加算変換関数を生成
def add_10_factory():
def transform(x):
return x + 10
return transform
# 配列の中央(1行目, 1列目)に+10する
patched = array_patch(arr, 1, 2, 1, 2, add_10_factory)
print(patched)
実行結果1
[[ 1 2 3]
[ 4 15 6]
[ 7 8 9]]
実行ログ1
INFO Starting array_patch transformation.
DEBUG Original block:
[[5]]
DEBUG Transformed block:
[[15]]
実行例2:異常系(クロージャで不正変換)
import numpy as np
arr = np.array([[1, 2], [3, 4]])
# 誤って関数を返さないクロージャ
def broken_factory():
return 12345 # callable ではない
array_patch(arr, 0, 2, 0, 2, broken_factory)
実行結果2
TypeError: transform_factory must return a callable.
実行ログ2
ERROR Error occurred during array_patch: transform_factory must return a callable.
■ 文法・構文まとめ
構文 | 解説 |
---|---|
np.copy(arr) |
元配列を非破壊でコピー(deepcopyより高速) |
slice |
arr[start:end] で部分抽出(範囲外自動処理) |
closure |
関数内関数としてスコープを保持する変換関数を動的に生成 |
callable |
オブジェクトが関数として呼べるか(__call__ を持つ)を確認 |
try-except + loguru
|
エラーのキャッチとログ出力を一元的に行う |
■ 高速化とPythonicポイント
処理箇所 | 改善点 |
---|---|
クロージャ | 関数定義を動的生成。柔軟性と閉包的変換が可能に。 |
スライス演算 |
arr[start:end] でループを使わず高速な部分抽出・代入を実現 |
非破壊処理 |
np.copy により安全に変更しつつ効率も担保 |
Q.4
項目 | 内容 |
---|---|
概要 | 任意の小さな配列 arr を指定形状 target_shape に対して、先頭軸方向からゼロパディングを施し、ブロードキャスト可能な形状に整形して返す。元配列は破壊しない。 |
問題文 | 関数 broadcast_pad(arr: np.ndarray, target_shape: tuple[int]) -> np.ndarray を定義せよ。配列の次元が target_shape 未満なら前方に1次元軸を追加し、必要に応じて np.pad によりゼロパディングすることで target_shape に整形する。ただし、変換後は np.broadcast_to(arr, target_shape) が成功する必要がある。整形不能な場合や不正な形状には ValueError を送出し、処理を loguru で記録せよ。 |
要件 | 多次元整形/前方軸追加/ゼロパディング/非破壊処理/ブロードキャスト検証/log出力/例外処理 |
発展仕様 |
|
使用構文 |
np.pad , np.broadcast_to , tuple , arr.ndim , arr.shape , zip , ValueError , loguru.logger , np.reshape
|
A.4
■ 模範解答
import numpy as np
from loguru import logger
from typing import Tuple
def broadcast_pad(arr: np.ndarray, target_shape: Tuple[int]) -> np.ndarray:
try:
# 型検証:target_shapeはタプルかつ整数値か
if not isinstance(target_shape, tuple) or not all(isinstance(x, int) for x in target_shape):
raise TypeError("target_shape must be a tuple of integers.")
# 元配列の次元取得
arr_shape = arr.shape
arr_ndim = arr.ndim
target_ndim = len(target_shape)
logger.debug(f"Original shape: {arr_shape}, Target shape: {target_shape}")
# Step1: 次元数が足りない場合は先頭に次元追加
if arr_ndim < target_ndim:
new_shape = (1,) * (target_ndim - arr_ndim) + arr_shape
arr = arr.reshape(new_shape)
logger.debug(f"Reshaped to match ndim: {arr.shape}")
# Step2: パディングサイズを算出(不足する次元にゼロパディング)
pad_width = []
for arr_dim, tgt_dim in zip(arr.shape, target_shape):
if arr_dim > tgt_dim:
raise ValueError(f"Cannot broadcast: array dim {arr_dim} > target dim {tgt_dim}")
pad_before = 0
pad_after = tgt_dim - arr_dim
pad_width.append((pad_before, pad_after))
logger.debug(f"Pad width: {pad_width}")
# Step3: ゼロでパディング(非破壊)
padded = np.pad(arr, pad_width, mode='constant', constant_values=0)
# Step4: ブロードキャスト検証(念のため明示的に試す)
try:
broadcasted = np.broadcast_to(padded, target_shape)
except ValueError as e:
raise ValueError(f"Cannot broadcast to target shape {target_shape}: {e}")
logger.info("broadcast_pad completed successfully.")
return broadcasted
except Exception as e:
logger.exception(f"Error in broadcast_pad: {e}")
raise
実行例1:正常系(1次元 → 2次元パディング)
arr = np.array([1, 2, 3]) # shape (3,)
target_shape = (2, 3) # 先頭に1次元追加+ゼロパディング行を1つ
result = broadcast_pad(arr, target_shape)
print(result)
実行結果1
[[1 2 3]
[0 0 0]]
実行ログ1
DEBUG Original shape: (3,), Target shape: (2, 3)
DEBUG Reshaped to match ndim: (1, 3)
DEBUG Pad width: [(0, 1), (0, 0)]
INFO broadcast_pad completed successfully.
実行例2:異常系(broadcast不可能)
arr = np.ones((4, 5))
target_shape = (2, 5) # 小さくする方向にはpadできない
broadcast_pad(arr, target_shape)
実行結果2
ValueError: Cannot broadcast: array dim 4 > target dim 2
実行ログ2
DEBUG Original shape: (4, 5), Target shape: (2, 5)
ERROR Error in broadcast_pad: Cannot broadcast: array dim 4 > target dim 2
■ 文法・構文まとめ
構文 | 解説 |
---|---|
np.reshape |
次元数を合わせるために (3,) → (1, 3) のような整形を行う |
np.pad(..., mode="constant", constant_values=0) |
指定軸方向にゼロでパディングを施す |
np.broadcast_to |
対象配列が特定のshapeにブロードキャスト可能か確認/失敗すれば ValueError を送出 |
zip(arr.shape, target_shape) |
各軸ごとに配列サイズとターゲットサイズを比較し、必要なパディングを算出 |
loguru |
DEBUG , INFO , ERROR ログを記録。エラー原因の追跡に有効 |
■ 高速化とPythonicポイント
項目 | 説明 |
---|---|
reshape+pad |
deepcopy を使わずに元配列を拡張する高速処理 |
tuple操作 | パディング計算において zip +リスト操作で分岐のないPythonic処理 |
broadcast_to検証 | NumPy内部でCレベルでのチェックを行うため確実かつ高速 |
Q.5
項目 | 内容 |
---|---|
概要 | 正方行列の対角要素に特化した抽出・変換・走査を提供する DiagonalMasker クラスを実装せよ。元配列は非破壊で保持され、対角成分のみに対して変更・逐次処理が可能であること。 |
問題文 | クラス DiagonalMasker(arr: np.ndarray) を定義せよ。
|
要件 | 対角抽出/走査/関数適用による変更/非破壊設計/例外処理/repr/log記録 |
発展仕様 |
|
使用構文 |
np.diag , np.fill_diagonal , np.copy , __iter__ , __next__ , @property , callable , ValueError , StopIteration , loguru , __repr__
|
A.5
■ 模範解答
import numpy as np
from loguru import logger
class DiagonalMasker:
def __init__(self, arr: np.ndarray):
# 入力が正方2次元配列か検証
if not isinstance(arr, np.ndarray) or arr.ndim != 2:
raise TypeError("Input must be a 2D numpy.ndarray.")
if arr.shape[0] != arr.shape[1]:
raise ValueError("Input array must be square.")
self._original = np.copy(arr) # 元配列は非破壊で保存
self._current = np.copy(arr) # 現在の加工対象
self._size = arr.shape[0] # 対角の長さ
self._index = 0 # イテレータ用インデックス
logger.info("DiagonalMasker initialized.")
@property
def diagonal(self) -> np.ndarray:
# 対角成分を抽出
diag = np.diag(self._current)
logger.debug(f"Accessed diagonal: {diag}")
return diag
def apply(self, func):
# 関数のcallable性を検証
if not callable(func):
raise TypeError("The argument to apply() must be callable.")
# 対角要素を取り出し、変換関数を適用(ベクトル処理)
diag = np.diag(self._current)
new_diag = func(diag)
if len(new_diag) != self._size:
raise ValueError("Transformed diagonal must match original length.")
# np.fill_diagonalで変更(破壊的だが self._current にのみ適用)
np.fill_diagonal(self._current, new_diag)
logger.info(f"Diagonal updated using {func.__name__ if hasattr(func, '__name__') else str(func)}.")
def __iter__(self):
# イテレータ初期化(再利用可能に)
self._index = 0
return self
def __next__(self):
if self._index >= self._size:
raise StopIteration
val = self._current[self._index, self._index]
logger.debug(f"Yielded diagonal[{self._index}]: {val}")
self._index += 1
return val
def __repr__(self):
return f"<DiagonalMasker(size={self._size}, diagonal={self.diagonal})>"
def get_array(self) -> np.ndarray:
# 現在の状態を返す(外部から安全にアクセス)
return np.copy(self._current)
実行例1:正常系(対角加算)
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
masker = DiagonalMasker(arr)
# 対角に +10 を加算するクロージャ適用
masker.apply(lambda x: x + 10)
# 加工後配列出力
print(masker.get_array())
# 逐次処理
for val in masker:
print("Diagonal value:", val)
実行結果1
[[11 2 3]
[ 4 15 6]
[ 7 8 19]]
Diagonal value: 11
Diagonal value: 15
Diagonal value: 19
実行ログ1
INFO DiagonalMasker initialized.
DEBUG Accessed diagonal: [1 5 9]
INFO Diagonal updated using <lambda>.
DEBUG Yielded diagonal[0]: 11
DEBUG Yielded diagonal[1]: 15
DEBUG Yielded diagonal[2]: 19
実行例2:異常系(非正方行列)
arr = np.array([[1, 2, 3], [4, 5, 6]])
masker = DiagonalMasker(arr)
実行結果2
ValueError: Input array must be square.
実行ログ2
■ 文法・構文まとめ
構文 | 解説 |
---|---|
np.diag(arr) |
対角成分だけを抽出するNumPyベクトル演算。1行で効率よく処理可能 |
np.fill_diagonal(arr, x) |
配列の対角に一括で値を埋め込む高速なNumPy操作 |
callable(obj) |
関数やクロージャなど「呼び出し可能なもの」かどうかの検査 |
__iter__ , __next__
|
Pythonイテレータプロトコルに準拠した逐次走査の仕組み |
@property |
.diagonal のように属性アクセス形式でメソッドの値を取得させる |
loguru |
logger.info , logger.debug , logger.exception で柔軟なログ出力 |
Q.6
項目 | 内容 |
---|---|
概要 | ランダムに1列を選択して定数または関数で加工する ColumnSelector クラスを設計せよ。非破壊的処理、NumPyによるベクトル化処理、関数型引数による高階関数処理、ログ記録、シードによる再現性制御を含む。 |
問題文 | クラス ColumnSelector(n_cols: int, seed: Optional[int] = None) を定義し、メソッド __call__(arr: np.ndarray, fn: Union[Callable, int, float]) -> np.ndarray にて、列方向にランダムに1列を選び、定数または関数によって加工した新しい配列を返す関数を設計せよ。関数は列全体または各要素に対して適用可能とし、元配列を変更せず、loguruにより処理のログを出力すること。 |
要件 | ランダム列選択/シード固定可/定数・関数いずれも受け入れる/非破壊処理/ログ出力/例外処理 |
発展仕様 | callable検査/列数不一致エラーチェック/ベクトル化変換関数に優先対応/失敗時詳細ログ記録 |
使用構文 |
np.random.default_rng , np.copy , fancy indexing , callable , np.vectorize , loguru , __call__ , ValueError , TypeError
|
A.6
■ 模範解答
import numpy as np
from loguru import logger
from typing import Union, Callable, Optional
class ColumnSelector:
def __init__(self, n_cols: int, seed: Optional[int] = None):
# 列数は正の整数
if not isinstance(n_cols, int) or n_cols <= 0:
raise ValueError("n_cols must be a positive integer.")
self.n_cols = n_cols
self.rng = np.random.default_rng(seed) # 再現性ある乱数生成器
logger.info(f"ColumnSelector initialized with n_cols={n_cols}, seed={seed}")
def __call__(self, arr: np.ndarray, fn: Union[Callable, int, float]) -> np.ndarray:
try:
# 入力配列の型・形状チェック
if not isinstance(arr, np.ndarray) or arr.ndim != 2:
raise TypeError("Input must be a 2D numpy.ndarray.")
if arr.shape[1] != self.n_cols:
raise ValueError(f"Expected {self.n_cols} columns, but got {arr.shape[1]}.")
# ランダムに1列選択
col_index = self.rng.integers(low=0, high=self.n_cols)
logger.debug(f"Randomly selected column index: {col_index}")
# 元配列を非破壊コピー
result = np.copy(arr)
# 対象列の抽出
column = result[:, col_index]
# 加工関数の処理
if callable(fn):
try:
# 列全体を変換してshape維持(ベクトル演算想定)
new_column = fn(column)
# ベクトル化できない場合は要素単位で対応
if not isinstance(new_column, np.ndarray):
new_column = np.vectorize(fn)(column)
except Exception as e:
logger.exception("Function application failed.")
raise ValueError(f"Failed to apply function to column: {e}")
else:
# 定数による置換
new_column = np.full_like(column, fn)
logger.debug(f"Column replaced with constant: {fn}")
# 結果を置換
result[:, col_index] = new_column
logger.info(f"Column {col_index} successfully transformed.")
return result
except Exception as e:
logger.exception(f"Error in ColumnSelector: {e}")
raise
実行例1:関数で変換(ランダム列を2倍)
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
selector = ColumnSelector(n_cols=3, seed=42)
new_arr = selector(arr, lambda x: x * 2)
print(new_arr)
実行結果1
[[ 1 2 6]
[ 4 5 12]
[ 7 8 18]]
実行ログ1
INFO ColumnSelector initialized with n_cols=3, seed=42
DEBUG Randomly selected column index: 2
INFO Column 2 successfully transformed.
実行例2:定数で置換(ランダム列を0に)
arr = np.array([
[10, 20, 30],
[40, 50, 60],
[70, 80, 90]
])
selector = ColumnSelector(n_cols=3, seed=1)
new_arr = selector(arr, 0)
print(new_arr)
実行結果2
[[10 0 30]
[40 0 60]
[70 0 90]]
実行ログ2
INFO ColumnSelector initialized with n_cols=3, seed=1
DEBUG Randomly selected column index: 1
DEBUG Column replaced with constant: 0
INFO Column 1 successfully transformed.
■ 文法・構文まとめ
文法・機能 | 解説 |
---|---|
np.random.default_rng(seed) |
安全・高速な乱数生成器(再現性あり) |
callable(fn) |
引数が関数かどうかを判定 |
np.vectorize(fn) |
要素ごとの処理に対応した関数ベクトル化 |
np.full_like |
配列と同じ形状・dtypeの定数埋め配列を生成 |
loguru.logger |
DEBUG 〜INFO 〜ERROR で柔軟なログ制御 |
__call__ |
selector(arr, fn) のようにインスタンスを関数のように使用可能にする |
Q.7
項目 | 内容 |
---|---|
概要 |
OutlierRemover(axis: int = 0) クラスを設計し、transform(arr) で IQR法(四分位範囲)により外れ値を検出し、指定軸方向で np.nan に置換した新たな配列を返す。元配列は変更せず、全ての処理ログを loguru で出力する。 |
問題文 | IQR法(外れ値 = Q1 - 1.5×IQR未満 または Q3 + 1.5×IQR超過)に従って、配列中の外れ値を np.nan に置換する OutlierRemover クラスを設計せよ。NaNの事前検出、軸方向指定、非破壊処理、全処理ログ、dtype検査、例外処理を含むこと。 |
要件 | IQR計算/軸指定(列・行)/ベクトル化マスク処理/非破壊/NaN検知/log記録/例外送出 |
発展仕様 |
|
使用構文 |
np.percentile , np.where , np.copy , np.isnan , np.logical_or , np.newaxis , loguru.logger , ValueError
|
|
A.7
■ 模範解答
import numpy as np
from loguru import logger
class OutlierRemover:
def __init__(self, axis: int = 0):
# 軸は0(列)または1(行)に限定
if axis not in (0, 1):
raise ValueError("axis must be 0 (column-wise) or 1 (row-wise).")
self.axis = axis
logger.info(f"OutlierRemover initialized with axis={axis}")
def transform(self, arr: np.ndarray) -> np.ndarray:
try:
# 型チェック(float系の数値配列のみ)
if not isinstance(arr, np.ndarray) or arr.ndim != 2:
raise TypeError("Input must be a 2D numpy.ndarray.")
if not np.issubdtype(arr.dtype, np.floating):
raise TypeError("Input array must have float dtype for NaN support.")
# NaN検出とログ
if np.isnan(arr).any():
logger.warning("Input array contains NaN values. These will be preserved.")
# 非破壊コピー
result = np.copy(arr)
# Q1, Q3, IQRをベクトル化で計算
Q1 = np.nanpercentile(result, 25, axis=self.axis, keepdims=True)
Q3 = np.nanpercentile(result, 75, axis=self.axis, keepdims=True)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
logger.debug(f"Q1: {Q1.flatten()}")
logger.debug(f"Q3: {Q3.flatten()}")
logger.debug(f"IQR: {IQR.flatten()}")
# 論理マスクで外れ値を検出(ブロードキャスト活用)
mask = np.logical_or(result < lower_bound, result > upper_bound)
# 外れ値をnp.nanに置換
result[mask] = np.nan
logger.info("Outlier removal completed.")
return result
except Exception as e:
logger.exception(f"Failed to transform array: {e}")
raise
実行例1:正常系(列方向)
arr = np.array([
[10.0, 20.0, 30.0],
[12.0, 21.0, 500.0],
[11.0, 22.0, 31.0],
[13.0, 19.0, 29.0]
])
remover = OutlierRemover(axis=0)
out = remover.transform(arr)
print(out)
実行結果1
[[10. 20. 30.]
[12. 21. nan]
[11. 22. 31.]
[13. 19. 29.]]
実行ログ1
INFO OutlierRemover initialized with axis=0
DEBUG Q1: [10.75 19.75 29.5 ]
DEBUG Q3: [12.75 21.25 30.5 ]
DEBUG IQR: [2. 1.5 1. ]
INFO Outlier removal completed.
実行例2:異常系(整数型入力)
arr = np.array([
[1, 2, 3],
[4, 100, 6]
]) # int 型(NaN入らない)
remover = OutlierRemover(axis=1)
remover.transform(arr)
実行結果2
TypeError: Input array must have float dtype for NaN support.
実行ログ2
INFO OutlierRemover initialized with axis=1
ERROR Failed to transform array: Input array must have float dtype for NaN support.
■ 文法・構文まとめ
機能 | 解説 |
---|---|
np.percentile(..., axis) |
軸方向に対してQ1, Q3を効率的に取得(ベクトル化) |
np.logical_or(...) |
マスク演算で閾値外の条件を一括で処理 |
np.nanpercentile(...) |
NaNを無視して四分位を取得 |
np.copy() |
元配列の非破壊処理 |
np.isnan() |
NaN存在検出 |
loguru.logger |
INFO , DEBUG , ERROR , WARNING を使い分けて処理を追跡可能に |
Q.8
項目 | 内容 |
---|---|
概要 | 2次元配列 arr 内において、サブ配列 pattern に一致するすべての位置を検索し、それぞれに replacement を上書きする。NumPyのスライディングウィンドウを用いた高速検索、ブロードキャストとインデックス操作を活用し、元配列を変更せずに結果を返すこと。 |
問題文 | 関数 pattern_replace(arr, pattern, replacement) を定義せよ。arr は2D配列、pattern は部分一致対象となるサブ配列、replacement は置換対象のサブブロックである。すべての pattern に一致する位置を replacement に置換した新しい配列を返すこと。配列次元の整合性、マッチの存在、整合性不備などに対してはログ出力し、例外処理を行うこと。 |
要件 |
|
使用構文 |
np.lib.stride_tricks.sliding_window_view , np.copy , np.all , np.array_equal , fancy indexing , loguru.logger , try-except , ValueError
|
A.8
■ 模範解答
import numpy as np
from loguru import logger
from numpy.lib.stride_tricks import sliding_window_view
def pattern_replace(arr: np.ndarray, pattern: np.ndarray, replacement: np.ndarray) -> np.ndarray:
try:
# 入力検証:すべて2Dであること
if not all(isinstance(x, np.ndarray) and x.ndim == 2 for x in [arr, pattern, replacement]):
raise TypeError("All inputs must be 2D numpy arrays.")
arr_rows, arr_cols = arr.shape
pat_rows, pat_cols = pattern.shape
rep_rows, rep_cols = replacement.shape
# パターン・置換ブロックの形状一致
if pattern.shape != replacement.shape:
raise ValueError(f"pattern shape {pattern.shape} and replacement shape {replacement.shape} must match.")
# 元配列にパターンが入りきるか
if arr_rows < pat_rows or arr_cols < pat_cols:
raise ValueError("Pattern is larger than the input array.")
# 非破壊コピー
result = np.copy(arr)
# スライディングウィンドウで部分ブロックを抽出(形状: (H, W, ph, pw))
windows = sliding_window_view(arr, pattern.shape)
# 一致判定マスクを作成(形状: (H, W))
match_mask = np.all(windows == pattern, axis=(-2, -1))
# 一致位置のインデックス取得
match_indices = np.argwhere(match_mask)
logger.info(f"Found {len(match_indices)} match(es).")
for i, j in match_indices:
# 対象ブロックを result に置換
result[i:i+pat_rows, j:j+pat_cols] = replacement
logger.debug(f"Replaced block at position ({i}, {j})")
return result
except Exception as e:
logger.exception(f"pattern_replace failed: {e}")
raise
実行例1:1箇所に一致 → 置換成功
arr = np.array([
[1, 1, 1, 1],
[1, 2, 2, 1],
[1, 2, 2, 1],
[1, 1, 1, 1]
])
pattern = np.array([
[2, 2],
[2, 2]
])
replacement = np.array([
[9, 9],
[9, 9]
])
res = pattern_replace(arr, pattern, replacement)
print(res)
実行結果1
[[1 1 1 1]
[1 9 9 1]
[1 9 9 1]
[1 1 1 1]]
実行ログ1
INFO Found 1 match(es).
DEBUG Replaced block at position (1, 1)
実行例2:一致なし → 未変更
arr = np.array([
[1, 2, 3],
[4, 5, 6],
[7, 8, 9]
])
pattern = np.array([
[9, 9],
[9, 9]
])
replacement = np.array([
[0, 0],
[0, 0]
])
res = pattern_replace(arr, pattern, replacement)
print(res)
実行結果2
[[1 2 3]
[4 5 6]
[7 8 9]]
実行ログ2
INFO Found 0 match(es).
■ 文法・構文まとめ
構文 | 解説 |
---|---|
sliding_window_view(arr, shape) |
入力配列から全領域の形状shape の部分配列をスライド抽出(超高速) |
np.all(..., axis=(-2,-1)) |
各サブ配列が pattern と完全一致するかブール配列に変換 |
np.argwhere(mask) |
True のインデックス(i,j)をリスト形式で取得 |
result[i:i+ph, j:j+pw] = replacement |
マッチした位置を replacement で一括置換(スライスによるベクトル処理) |
np.copy() |
元配列を壊さない非破壊処理 |
loguru |
ログ出力で処理過程・例外・警告を記録。開発・テスト・運用でも追跡可能 |
Q.9
項目 | 内容 |
---|---|
概要 | 多次元配列において指定軸をflatten(畳み込み)し、他軸に沿った形で mean , min , max を求める関数 collapse_axis(arr, axis) を設計する。元配列は非破壊で扱い、異常時は loguru に記録しつつ例外を送出する。 |
問題文 | 関数 collapse_axis(arr: np.ndarray, axis: int) -> dict[str, np.ndarray] を定義せよ。多次元配列 arr に対して、指定 axis をflattenし、残る軸方向ごとの mean , min , max を計算した辞書を返すこと。軸指定が不正な場合、reshapeに失敗した場合、いずれも ValueError を発生させること。すべての操作は loguru に記録すること。元配列は変更しないこと。 |
要件 | flatten操作/他軸統計の辞書出力/非破壊処理/軸検証/reshape検証/ログ出力 |
発展仕様 |
|
使用構文 |
np.moveaxis , np.reshape , np.mean , np.min , np.max , dict , try-except , ValueError , loguru.logger , np.copy , np.ndarray.shape , arr.ndim
|
A.9
■ 模範解答
import numpy as np
from loguru import logger
from typing import Dict
def collapse_axis(arr: np.ndarray, axis: int) -> Dict[str, np.ndarray]:
try:
# 入力がNumPy配列であることを確認
if not isinstance(arr, np.ndarray):
raise TypeError("Input must be a NumPy array.")
if axis < 0 or axis >= arr.ndim:
raise ValueError(f"Axis {axis} is out of bounds for array with shape {arr.shape}")
logger.info(f"Collapsing axis {axis} for array with shape {arr.shape}")
# 指定軸を先頭に移動(他軸との整合を簡単にする)
moved = np.moveaxis(arr, axis, 0) # 形状: (D, A, B, ...) → (axis_dim, ...)
axis_dim = moved.shape[0]
rest_shape = moved.shape[1:] # flatten対象外の軸の形状
# reshapeして (axis_dim, -1) の形に(例: (3,4,5) → (3,20))
collapsed = moved.reshape(axis_dim, -1)
logger.debug(f"Collapsed shape: {collapsed.shape}")
# 軸方向に統計計算(残り軸ごとに計算)
mean = np.mean(collapsed, axis=0).reshape(rest_shape)
min_ = np.min(collapsed, axis=0).reshape(rest_shape)
max_ = np.max(collapsed, axis=0).reshape(rest_shape)
logger.info("Statistical computation completed successfully.")
return {"mean": mean, "min": min_, "max": max_}
except Exception as e:
logger.exception(f"Failed to collapse axis {axis}: {e}")
raise ValueError(f"collapse_axis failed: {e}")
実行例1:3次元配列の axis=0 を collapse
arr = np.array([
[[1, 2], [3, 4]],
[[5, 6], [7, 8]],
[[9,10], [11,12]]
]) # shape = (3, 2, 2)
res = collapse_axis(arr, axis=0)
print("mean:\n", res["mean"])
print("min:\n", res["min"])
print("max:\n", res["max"])
実行結果1
mean:
[[5. 6.]
[7. 8.]]
min:
[[1 2]
[3 4]]
max:
[[ 9 10]
[11 12]]
実行ログ1
INFO Collapsing axis 0 for array with shape (3, 2, 2)
DEBUG Collapsed shape: (3, 4)
INFO Statistical computation completed successfully.
実行例2:不正な axis 指定
arr = np.random.rand(4, 5)
# axis=2 は範囲外
collapse_axis(arr, axis=2)
実行結果2
ValueError: collapse_axis failed: Axis 2 is out of bounds for array with shape (4, 5)
実行ログ2
INFO Collapsing axis 2 for array with shape (4, 5)
ERROR Failed to collapse axis 2: Axis 2 is out of bounds for array with shape (4, 5)
■ 文法・構文まとめ
文法/構文 | 解説 |
---|---|
np.moveaxis(arr, axis, 0) |
任意の軸を先頭に移動し、処理しやすくする(reshape前処理) |
np.reshape(..., (-1,)) |
flattenする形で指定軸を集約。高次元でも高速かつ安全 |
np.mean(..., axis=0) |
flattenした後の各位置(元の他軸)における平均値 |
reshape(rest_shape) |
統計量を元の軸の形に戻すことで意味のある構造を保つ |
loguru |
logger.info , logger.debug , logger.exception によって処理ログを出力 |
try-except |
reshape や型不整合時のエラーを捕捉し、ログ記録と例外送出を両立 |
Q.10
項目 | 内容 |
---|---|
概要 | クラス SparseEmbedder(num_classes: int) を実装し、__call__(labels: np.ndarray) によって 1D整数ラベル配列を one-hot 行列に変換して返すこと。内部で np.eye を用い、範囲外・非整数の入力には例外を送出し、loguru に処理を記録する。 |
問題文 | クラス SparseEmbedder を実装せよ。コンストラクタでクラス数 num_classes を指定し、メソッド __call__(labels) により、NumPy配列 labels の各整数ラベルを one-hot エンコーディングして返すこと。以下の仕様をすべて満たすこと:
|
要件 |
np.eye /fancy indexing/dtype検証/範囲検証/log記録/例外送出 |
発展仕様 |
|
使用構文 |
np.eye , fancy indexing , astype , assert , ValueError , TypeError , __call__ , loguru.logger
|
A.10
■ 模範解答
import numpy as np
from loguru import logger
class SparseEmbedder:
def __init__(self, num_classes: int):
# クラス数の検証(1以上の整数)
if not isinstance(num_classes, int) or num_classes <= 0:
raise ValueError("num_classes must be a positive integer.")
self.num_classes = num_classes
logger.info(f"SparseEmbedder initialized with num_classes={num_classes}")
def __call__(self, labels: np.ndarray) -> np.ndarray:
try:
# ラベルが1次元のNumPy配列かどうか
if not isinstance(labels, np.ndarray) or labels.ndim != 1:
raise TypeError("labels must be a 1D numpy array.")
# 整数であることを確認(floatやstrは除外)
if not np.issubdtype(labels.dtype, np.integer):
raise TypeError("labels must contain integers.")
# ラベルがクラス数未満かつ非負かを検査
if np.any(labels < 0) or np.any(labels >= self.num_classes):
raise ValueError(f"Labels must be in range [0, {self.num_classes - 1}].")
logger.debug(f"Encoding labels: {labels}")
# One-hotエンコードを生成(np.eyeで単位行列→fancy indexing)
one_hot_matrix = np.eye(self.num_classes, dtype=np.float32)[labels]
logger.info(f"One-hot encoded shape: {one_hot_matrix.shape}")
return one_hot_matrix
except Exception as e:
logger.exception(f"Embedding failed: {e}")
raise
実行例1:正常系(0〜3のラベルをone-hotに変換)
labels = np.array([0, 2, 1, 3])
embedder = SparseEmbedder(num_classes=4)
one_hot = embedder(labels)
print(one_hot)
実行結果1
[[1. 0. 0. 0.]
[0. 0. 1. 0.]
[0. 1. 0. 0.]
[0. 0. 0. 1.]]
実行ログ1
INFO SparseEmbedder initialized with num_classes=4
DEBUG Encoding labels: [0 2 1 3]
INFO One-hot encoded shape: (4, 4)
実行例2:異常系(ラベルが範囲外)
labels = np.array([0, 1, 5]) # 5はクラス数4の範囲外
embedder = SparseEmbedder(num_classes=4)
embedder(labels)
実行結果2
ValueError: Labels must be in range [0, 3].
実行ログ2
INFO SparseEmbedder initialized with num_classes=4
ERROR Embedding failed: Labels must be in range [0, 3].
■ 文法・構文まとめ
構文 | 解説 |
---|---|
np.eye(n) |
n×n の単位行列を生成(1-hotの土台) |
arr[labels] |
fancy indexing による行取り出しで one-hot に変換 |
np.issubdtype(dtype, np.integer) |
データ型が整数であるかを判定 |
logger.info , logger.exception
|
loguruによるログ出力(開発・本番で可視化) |
Q.11
項目 | 内容 |
---|---|
概要 | 任意ラベル配列を一意な連番IDに変換し、逆変換も可能な IndexMapper クラスを設計せよ。fit, transform, inverse_transform を備え、変換辞書の両方向保持、未fit状態の利用禁止、ベクトル化による高速変換、例外処理、@classmethodによる辞書初期化、ログ出力を実装すること。 |
問題文 | クラス IndexMapper を定義し、1) fit(labels) で一意ラベル→IDマッピング辞書と逆引き辞書を生成2) transform(labels) で入力を整数IDに変換3) inverse_transform(ids) で整数IDからラベルに逆変換4) from_dict(d) で辞書から初期化可能未fit利用・逆変換エラー・型不整合は例外+ログ出力すること。 |
要件 | unique→ID変換/双方向辞書保持/np.vectorize変換/未fit・不正ID検知/classmethod初期化/log記録/詳細な例外 |
発展仕様 |
|
使用構文 |
np.unique , np.vectorize , dict , @classmethod , ValueError , RuntimeError , loguru.logger , @property , set
|
A.11
■ 模範解答
import numpy as np
from loguru import logger
class IndexMapper:
def __init__(self):
self._label2id = None # ラベル→IDマッピング
self._id2label = None # ID→ラベルマッピング
self._fitted = False # fit済みフラグ
def fit(self, labels):
# 入力検証
if not isinstance(labels, (np.ndarray, list)):
raise TypeError("labels must be a numpy array or list.")
# 一意ラベル抽出
unique = np.unique(labels)
logger.info(f"Unique labels found: {unique}")
# ラベル→ID辞書生成(順序保証)
self._label2id = {label: idx for idx, label in enumerate(unique)}
# ID→ラベル辞書
self._id2label = {idx: label for label, idx in self._label2id.items()}
self._fitted = True
logger.info("IndexMapper fitted successfully.")
def transform(self, labels):
if not self._fitted:
logger.error("IndexMapper must be fitted before transform.")
raise RuntimeError("IndexMapper is not fitted yet.")
# ベクトル化変換
def _to_id(label):
if label not in self._label2id:
logger.error(f"Label '{label}' not found in mapping.")
raise ValueError(f"Label '{label}' not found in mapping.")
return self._label2id[label]
vec_to_id = np.vectorize(_to_id)
result = vec_to_id(labels)
logger.info(f"Transform complete. Labels: {labels} → IDs: {result}")
return result
def inverse_transform(self, ids):
if not self._fitted:
logger.error("IndexMapper must be fitted before inverse_transform.")
raise RuntimeError("IndexMapper is not fitted yet.")
# ベクトル化逆変換
def _to_label(idx):
if idx not in self._id2label:
logger.error(f"ID '{idx}' is invalid.")
raise ValueError(f"ID '{idx}' is invalid.")
return self._id2label[idx]
vec_to_label = np.vectorize(_to_label)
result = vec_to_label(ids)
logger.info(f"Inverse transform complete. IDs: {ids} → Labels: {result}")
return result
@classmethod
def from_dict(cls, label2id: dict):
# 辞書から直接初期化
if not isinstance(label2id, dict) or not all(isinstance(v, int) for v in label2id.values()):
logger.error("label2id must be a dict of label to int.")
raise TypeError("label2id must be a dict mapping label to int.")
instance = cls()
instance._label2id = dict(label2id)
instance._id2label = {idx: label for label, idx in label2id.items()}
instance._fitted = True
logger.info("IndexMapper initialized from dict.")
return instance
@property
def is_fitted(self):
return self._fitted
def __repr__(self):
if not self._fitted:
return "<IndexMapper (unfitted)>"
return f"<IndexMapper (n_labels={len(self._label2id)})>"
実行例1:正常系(fit→transform→逆変換)
labels = np.array(['dog', 'cat', 'dog', 'bird'])
mapper = IndexMapper()
mapper.fit(labels)
print("label2id:", mapper._label2id)
ids = mapper.transform(['dog', 'bird'])
print("to ids:", ids)
rev = mapper.inverse_transform(ids)
print("back to labels:", rev)
実行結果1
label2id: {'bird': 0, 'cat': 1, 'dog': 2}
to ids: [2 0]
back to labels: ['dog' 'bird']
実行ログ1
INFO Unique labels found: ['bird' 'cat' 'dog']
INFO IndexMapper fitted successfully.
INFO Transform complete. Labels: ['dog' 'bird'] → IDs: [2 0]
INFO Inverse transform complete. IDs: [2 0] → Labels: ['dog' 'bird']
実行例2:異常系(fit前のtransform)
mapper = IndexMapper()
mapper.transform(['cat', 'dog'])
実行結果2
RuntimeError: IndexMapper is not fitted yet.
実行ログ2
ERROR IndexMapper must be fitted before transform.
■ 文法・構文まとめ
構文・仕組み | 解説 |
---|---|
np.unique(labels) |
一意なラベルを抜き出して順序付け配列で取得 |
dict[label: idx ...] |
ラベルからID, IDからラベルの双方向マップ |
np.vectorize(fn) |
要素ごとに関数適用を配列化。ループより高速&Pythonic |
@classmethod |
辞書(label2id)からクラスインスタンス生成 |
@property |
is_fitted でfit状態を安全に参照 |
例外+logger | 全異常ケースで詳細エラー内容と状態をloguruに出力(開発・運用・テスト全てで有効) |
Q.12
項目 | 内容 |
---|---|
概要 | 任意ウィンドウ幅・集計関数を受け取り、1次元配列に対し sliding_window_view で高速にローリングウィンドウ展開+集計処理を行う。非破壊処理・型チェック・例外時の詳細log記録・汎用関数型集計など、設計の柔軟性と堅牢性を重視する。 |
問題文 | クラス RollingAggregator(window_size: int, agg_func: Callable) を定義し、__call__(arr: np.ndarray) で1次元配列 arr に対しウィンドウ幅 window_size でrolling window集計(agg_func 適用)を行い結果を返せ。不正入力はValueError・TypeError・loguruで記録すること。 |
要件 | sliding_window_view展開/任意関数型集計/入力・関数型検証/log出力/非破壊処理/ウィンドウ幅不正対応 |
発展仕様 |
|
使用構文 |
np.lib.stride_tricks.sliding_window_view , np.apply_along_axis , callable , ValueError , TypeError , __call__ , loguru.logger
|
A.12
■ 模範解答
import numpy as np
from loguru import logger
from typing import Callable
class RollingAggregator:
def __init__(self, window_size: int, agg_func: Callable):
# ウィンドウ幅の検証
if not isinstance(window_size, int) or window_size <= 0:
logger.error("window_size must be a positive integer.")
raise ValueError("window_size must be a positive integer.")
if not callable(agg_func):
logger.error("agg_func must be callable.")
raise TypeError("agg_func must be callable.")
self.window_size = window_size
self.agg_func = agg_func
logger.info(f"RollingAggregator initialized (window_size={window_size})")
def __call__(self, arr: np.ndarray) -> np.ndarray:
try:
# 入力検証(1次元のみ)
if not isinstance(arr, np.ndarray) or arr.ndim != 1:
logger.error("Input must be a 1D numpy array.")
raise TypeError("Input must be a 1D numpy array.")
if arr.size < self.window_size:
logger.error(f"Input length ({arr.size}) is less than window_size ({self.window_size}).")
raise ValueError("Input length must be at least as large as window_size.")
logger.debug(f"Applying sliding_window_view: arr.shape={arr.shape}, window_size={self.window_size}")
# sliding_window_view で (N-window+1, window) shape配列生成(非破壊処理)
windows = np.lib.stride_tricks.sliding_window_view(arr, self.window_size)
logger.debug(f"Windows shape: {windows.shape}")
# apply_along_axisで各windowにagg_func適用(ベクトル化集計)
result = np.apply_along_axis(self.agg_func, 1, windows)
logger.info("Rolling aggregation completed successfully.")
return result
except Exception as e:
logger.exception(f"Rolling aggregation failed: {e}")
raise
実行例1:mean(移動平均)
arr = np.array([1, 3, 5, 7, 9, 11])
roller = RollingAggregator(window_size=3, agg_func=np.mean)
print(roller(arr))
実行結果1
[3. 5. 7. 9.]
実行ログ1
INFO RollingAggregator initialized (window_size=3)
DEBUG Applying sliding_window_view: arr.shape=(6,), window_size=3
DEBUG Windows shape: (4, 3)
INFO Rolling aggregation completed successfully.
実行例2:異常系(window_size不正)
arr = np.arange(5)
roller = RollingAggregator(window_size=0, agg_func=np.sum)
roller(arr)
実行結果2
ValueError: window_size must be a positive integer.
実行ログ2
ERROR window_size must be a positive integer.
■ 文法・構文まとめ
構文・機能 | 解説 |
---|---|
sliding_window_view(arr, window_size) |
配列を「重なりあり」部分窓にスキャンし高速でメモリ効率良く展開 |
apply_along_axis(func, axis, arr) |
各窓(行)ごとに任意の関数(平均・合計・自作集計など)を適用 |
callable(obj) |
関数として呼び出せるか(関数型・lambda・メソッド)を判定 |
loguru.logger |
DEBUG /INFO /ERROR /EXCEPTION ログで処理・例外を全自動記録 |
Q.13
項目 | 内容 |
---|---|
概要 | 複素行列 A を受け、get_polar_form() で絶対値行列・位相行列を返し、project(u) で任意ベクトル u への複素内積射影を返すクラス ComplexMatrixDecomposer を設計せよ。エラー時は例外&loguruに記録すること。 |
問題文 | クラス ComplexMatrixDecomposer(A: np.ndarray) を実装せよ。① get_polar_form() で極形式(abs, angle)の2行列を返す。② project(u: np.ndarray) でベクトル u への複素射影($A \mathbf{u} / (\mathbf{u}^\dagger \mathbf{u})$)を返す。型不整合・shape違反・非複素型は例外+ログ出力すること。元行列・ベクトルは非破壊で扱うこと。 |
要件 |
np.abs , np.angle で極形式取得/np.dot や@ で射影計算/dtype, shapeの厳密検証/例外&log記録/複素ベクトル化演算/非破壊処理 |
発展仕様 | ・複素型以外(実数やfloat型)はTypeError ・入力shape不一致・1次元ベクトルでない場合はValueError ・ノルムゼロの場合の例外 ・loguruで全操作を記録 |
使用構文 |
np.abs , np.angle , np.dot , np.conj , np.linalg.norm , loguru.logger , try-except , ValueError , TypeError , @property
|
A.13
■ 模範解答
import numpy as np
from loguru import logger
class ComplexMatrixDecomposer:
def __init__(self, A: np.ndarray):
# 入力型・次元・複素型検証
if not isinstance(A, np.ndarray):
logger.error("A must be a numpy.ndarray.")
raise TypeError("A must be a numpy.ndarray.")
if not np.issubdtype(A.dtype, np.complexfloating):
logger.error("A must be a complex-valued array.")
raise TypeError("A must be a complex-valued array.")
if A.ndim != 2:
logger.error("A must be a 2D matrix.")
raise ValueError("A must be a 2D matrix.")
self._A = np.copy(A) # 非破壊保存
logger.info(f"ComplexMatrixDecomposer initialized: shape={A.shape}")
def get_polar_form(self):
# 複素行列を極形式に変換:絶対値・位相
abs_mat = np.abs(self._A)
angle_mat = np.angle(self._A)
logger.debug(f"Abs matrix:\n{abs_mat}\nAngle matrix:\n{angle_mat}")
return abs_mat, angle_mat
def project(self, u: np.ndarray):
# u型検証:1次元・複素型・shape一致
if not isinstance(u, np.ndarray):
logger.error("u must be a numpy.ndarray.")
raise TypeError("u must be a numpy.ndarray.")
if not np.issubdtype(u.dtype, np.complexfloating):
logger.error("u must be a complex-valued array.")
raise TypeError("u must be a complex-valued array.")
if u.ndim != 1:
logger.error("u must be a 1D vector.")
raise ValueError("u must be a 1D vector.")
if u.shape[0] != self._A.shape[1]:
logger.error(f"Shape mismatch: A shape {self._A.shape}, u shape {u.shape}")
raise ValueError("Shape mismatch between A and u.")
# ノルム(二乗和)がゼロでないか検証
norm2 = np.dot(np.conj(u), u)
if norm2 == 0:
logger.error("Projection vector u has zero norm.")
raise ValueError("Projection vector u has zero norm.")
# 複素射影 A @ u / (u† u)
proj = np.dot(self._A, u) / norm2
logger.info("Projection computed successfully.")
return proj
@property
def matrix(self):
# 保持している元の複素行列のコピーを返す
return np.copy(self._A)
実行例1:正常系(極形式変換・射影)
A = np.array([[1+1j, 2-1j], [3+0j, 4+4j]], dtype=np.complex128)
decomposer = ComplexMatrixDecomposer(A)
# 極形式取得
abs_mat, angle_mat = decomposer.get_polar_form()
print("abs:\n", abs_mat)
print("angle:\n", angle_mat)
# 射影
u = np.array([1+0j, 0+1j], dtype=np.complex128)
proj = decomposer.project(u)
print("projected:", proj)
実行結果1
abs:
[[1.41421356 2.23606798]
[3. 5.65685425]]
angle:
[[ 0.78539816 -0.46364761]
[ 0. 0.78539816]]
projected: [0.5+1.5j 2.5+2.5j]
実行ログ1
INFO ComplexMatrixDecomposer initialized: shape=(2, 2)
DEBUG Abs matrix:
[[1.41421356 2.23606798]
[3. 5.65685425]]
Angle matrix:
[[ 0.78539816 -0.46364761]
[ 0. 0.78539816]]
INFO Projection computed successfully.
実行例2:異常系(実数型A)
A = np.array([[1.0, 2.0], [3.0, 4.0]])
decomposer = ComplexMatrixDecomposer(A)
実行結果2
TypeError: A must be a complex-valued array.
実行ログ2
ERROR A must be a complex-valued array.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
np.abs(A) |
複素行列Aの各要素について絶対値を一括計算 |
np.angle(A) |
複素行列Aの各要素について偏角(位相角)を一括計算 |
np.dot(A, u) |
行列Aとベクトルuの内積(行列ベクトル積)。射影の分子。 |
np.conj(u) |
ベクトルuの複素共役を返す。ノルム(二乗和)や射影で必須 |
/ norm2 |
複素内積射影でスカラー正規化(u† u≠0) |
logger |
loguruによる INFO /DEBUG /ERROR で全処理・異常を記録 |
例外処理 | 型・shape不整合・ノルム0時などをPython例外+ログで詳細通知 |
Q.14
項目 | 内容 |
---|---|
概要 | 2D配列を指定されたブロックサイズで分割し、ブロック単位でランダムにシャッフルする BlockShuffler クラス。reshape+swapaxesによる高次元化・ベクトル化、seed制御、例外・log対応も含める。 |
問題文 | クラス BlockShuffler(block_rows: int, block_cols: int, seed: Optional[int] = None) を定義し、__call__(arr) で (block_rows, block_cols) ごとに分割したブロックをランダムシャッフルし新配列を返せ。入力shape・ブロック不整合時は例外+log記録すること。 |
要件 | reshape+swapaxesによるブロック分割/ブロック単位シャッフル/seed指定可/非破壊処理/例外・log記録 |
発展仕様 |
|
使用構文 |
np.reshape , np.swapaxes , np.random.default_rng , np.copy , loguru.logger , ValueError
|
A.14
■ 模範解答
import numpy as np
from loguru import logger
from typing import Optional
class BlockShuffler:
def __init__(self, block_rows: int, block_cols: int, seed: Optional[int] = None):
# ブロックサイズの正当性検証
if not (isinstance(block_rows, int) and block_rows > 0 and isinstance(block_cols, int) and block_cols > 0):
logger.error("Block sizes must be positive integers.")
raise ValueError("Block sizes must be positive integers.")
self.block_rows = block_rows
self.block_cols = block_cols
self.rng = np.random.default_rng(seed)
logger.info(f"BlockShuffler initialized (block_rows={block_rows}, block_cols={block_cols}, seed={seed})")
def __call__(self, arr: np.ndarray) -> np.ndarray:
try:
# 入力が2D配列か検証
if not isinstance(arr, np.ndarray) or arr.ndim != 2:
logger.error("Input must be a 2D numpy array.")
raise TypeError("Input must be a 2D numpy array.")
H, W = arr.shape
br, bc = self.block_rows, self.block_cols
# shapeがブロック単位で割り切れるか
if H % br != 0 or W % bc != 0:
logger.error(f"Array shape {arr.shape} not divisible by block size {(br, bc)}.")
raise ValueError("Array shape not divisible by block size.")
n_block_rows = H // br
n_block_cols = W // bc
logger.debug(f"Reshaping to ({n_block_rows}, {br}, {n_block_cols}, {bc})")
# 1. reshapeで高次元化し、各ブロックを分離
blocks = arr.reshape(n_block_rows, br, n_block_cols, bc)
# 2. 軸を並び替え (n_block_rows, n_block_cols, br, bc)
blocks = blocks.swapaxes(1, 2)
# 3. 全ブロックを1次元にflatten→shuffle
flat_blocks = blocks.reshape(-1, br, bc)
self.rng.shuffle(flat_blocks)
logger.info("Blocks shuffled.")
# 4. 戻す:ブロックを(n_block_rows, n_block_cols, br, bc)に復元
shuffled_blocks = flat_blocks.reshape(n_block_rows, n_block_cols, br, bc)
# 5. 軸を戻し (n_block_rows, br, n_block_cols, bc)
shuffled_blocks = shuffled_blocks.swapaxes(1, 2)
# 6. 最終的に配列を元の形に結合
result = shuffled_blocks.reshape(H, W)
logger.info("Block shuffle completed successfully.")
return result
except Exception as e:
logger.exception(f"Block shuffling failed: {e}")
raise
実行例1:正常系(4×4配列・2×2ブロック)
arr = np.arange(16).reshape(4, 4)
shuffler = BlockShuffler(block_rows=2, block_cols=2, seed=42)
shuffled = shuffler(arr)
print(shuffled)
実行結果1
[[ 0 1 10 11]
[ 2 3 12 13]
[ 8 9 6 7]
[14 15 4 5]]
実行ログ1
INFO BlockShuffler initialized (block_rows=2, block_cols=2, seed=42)
DEBUG Reshaping to (2, 2, 2, 2)
INFO Blocks shuffled.
INFO Block shuffle completed successfully.
実行例2:異常系(実数型A)
arr = np.arange(15).reshape(3, 5)
shuffler = BlockShuffler(block_rows=2, block_cols=2)
shuffler(arr)
実行結果2
ValueError: Array shape not divisible by block size.
実行ログ2
INFO BlockShuffler initialized (block_rows=2, block_cols=2, seed=None)
ERROR Array shape (3, 5) not divisible by block size (2, 2).
■ 文法・構文まとめ
構文・機能 | 解説 |
---|---|
reshape(n1, b1, n2, b2) |
配列をブロック単位で4次元に変換 |
swapaxes(1, 2) |
ブロック行・列の並び替え(分割単位を直感的にするため) |
flatten→shuffle→reshape |
全ブロックを1次元リストとして一括シャッフルし、その後復元 |
default_rng(seed) |
安全・高速な乱数生成器。seed指定で再現性確保 |
logger.debug/info/error/exception |
loguruで全過程・異常時も漏れなく記録 |
Q.15
項目 | 内容 |
---|---|
概要 | 非正方の2次元行列に対して、左右上下に等距離で0パディングし、最小の正方形へ変換する symmetric_pad 関数。パディング幅の自動計算・非破壊・型安全・log記録も求められる。 |
問題文 |
symmetric_pad(arr: np.ndarray) -> np.ndarray を定義し、入力 arr が2Dでなければ例外送出し、必要な場合にのみ等距離0パディングで正方行列に整形して返すこと。パディング幅は自動算出。全処理・例外はloguruで記録すること。 |
要件 | shape比較/pad幅自動算出/非破壊/2次元検証/log記録/ValueError例外 |
発展仕様 |
|
使用構文 |
np.pad , tuple unpacking , ValueError , loguru.logger , np.copy , divmod
|
A.15
■ 模範解答
import numpy as np
from loguru import logger
def symmetric_pad(arr: np.ndarray) -> np.ndarray:
try:
# 2次元配列であるか確認
if not isinstance(arr, np.ndarray) or arr.ndim != 2:
logger.error("Input must be a 2D numpy array.")
raise ValueError("Input must be a 2D numpy array.")
rows, cols = arr.shape
logger.info(f"Input shape: {arr.shape}")
# 既に正方形ならそのまま返却(非破壊コピー)
if rows == cols:
logger.info("Input is already square. Returning a copy.")
return np.copy(arr)
# 正方形にするための目標サイズ
max_dim = max(rows, cols)
# 各軸のpad幅(左右・上下それぞれ等分し、余りは右・下に追加)
row_pad = max_dim - rows
col_pad = max_dim - cols
pad_top, pad_bottom = divmod(row_pad, 2)
pad_left, pad_right = divmod(col_pad, 2)
# np.pad用パラメータ
pad_width = ((pad_top, pad_bottom), (pad_left, pad_right))
logger.debug(f"Pad width: {pad_width}")
# パディング適用
padded = np.pad(arr, pad_width, mode="constant", constant_values=0)
logger.info(f"Output shape: {padded.shape}")
return padded
except Exception as e:
logger.exception(f"symmetric_pad failed: {e}")
raise
実行例1:正常系(3×5配列を正方パディング)
arr = np.arange(15).reshape(3, 5)
result = symmetric_pad(arr)
print(result)
実行結果1
[[ 0 0 0 0 0]
[ 0 0 1 2 3]
[ 4 5 6 7 8]
[ 9 10 11 12 13]
[14 0 0 0 0]]
実行ログ1
INFO Input shape: (3, 5)
DEBUG Pad width: ((1, 1), (0, 0))
INFO Output shape: (5, 5)
実行例2:異常系(2Dでない配列)
arr = np.array([1, 2, 3])
symmetric_pad(arr)
実行結果2
ValueError: Array shape not divisible by block size.
実行ログ2
ERROR Input must be a 2D numpy array.
■ 文法・構文まとめ
機能・構文 | 解説 |
---|---|
np.pad(arr, pad_width, mode="constant", constant_values=0) |
指定幅の左右上下パディング。mode="constant"でゼロ埋め |
divmod(a, 2) |
a を2で割った商・余りをまとめて返す(余り分は下or右にpadを多く配置) |
np.copy(arr) |
非破壊設計のため返却前に必ずコピー |
logger.info/debug/error/exception |
loguruによる詳細ログ出力。異常系・成功系両方の挙動追跡に活用 |
例外処理 | 型や次元不正時にはValueError+loggerで詳細エラーメッセージを出力 |