流れ
データの準備:
時系列データとそれに対応するラベルを準備します。データの中でラベルが1の連続する部分を特定し、その間のラベルが0である部分をまとまりとして抽出します。
まとまりの生成:
ラベルが1である連続部分を特定し、その間のラベルが0である部分をまとまりとして抽出します。まとまりは、ラベルが1の連続部分の前後のインデックスを用いて切り出すことができます。
特徴量の抽出:
各まとまりに対して必要な特徴量を抽出します。この場合、まとまり内のデータそのものや統計的な特徴量などを考慮することができます。
機械学習モデルの学習:
特徴量と対応するラベルを使用して機械学習モデルを学習させます。この場合、ラベルが1であるまとまりと0であるまとまりを区別するためのモデルを選択します。
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
# サンプルデータ生成
np.random.seed(42)
time_series_data = np.concatenate([np.zeros(50),
np.ones(30),
np.zeros(20),
np.ones(40),
np.zeros(10)])
labels = [0, 1, 1, 0, 1, 0, 1, 1, 1, 0] # ラベルは1の間に0が連続している部分を示す
# ラベル1のまとまりの特定
start_index = None
segments = []
for i, label in enumerate(labels):
if label == 1 and start_index is None:
start_index = i
elif label == 0 and start_index is not None:
segment = time_series_data[start_index:i]
segments.append(segment)
start_index = None
# 特徴量の抽出
def extract_features(segment):
return [np.mean(segment), np.std(segment)]
X = [extract_features(segment) for segment in segments]
y = [1] * len(segments) # ラベル1のまとまりに対してラベルを1とする
# 学習データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 特徴量の標準化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 機械学習モデルの学習
model = SVC(kernel='linear')
model.fit(X_train_scaled, y_train)
# モデルの評価
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
# サンプルデータ生成
np.random.seed(42)
time_series_data = np.concatenate([np.zeros(50),
np.ones(30),
np.array([2, 3, 4, 5]),
np.ones(40),
np.zeros(10)])
labels = [0, 1, 1, 0, 1, 0, 1, 1, 1, 0] # ラベルは1の間に変動がある部分を示す
# ラベル1のまとまりの特定
start_index = None
segments = []
for i, label in enumerate(labels):
if label == 1 and start_index is None:
start_index = i
elif label == 0 and start_index is not None:
segment = time_series_data[start_index:i]
segments.append(segment)
start_index = None
# 特徴量の抽出
def extract_features(segment):
return [np.mean(segment), np.std(segment)]
X = [extract_features(segment) for segment in segments]
y = [1] * len(segments) # ラベル1のまとまりに対してラベルを1とする
# 学習データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 特徴量の標準化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 機械学習モデルの学習
model = SVC(kernel='linear')
model.fit(X_train_scaled, y_train)
# モデルの評価
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)
変化点検出:
行動の変化点を検出するために、時系列データのセグメンテーション技術を使用します。代表的な手法としては、CUSUMアルゴリズムやSAXアルゴリズムなどがあります。これらの手法は、時系列データの異常検出や変化点検出に利用されます。
クラスタリング:
変化点間のデータをクラスタリングして、同じ行動をしている間のまとまりを特定します。代表的なクラスタリング手法としては、K-means、DBSCAN、階層的クラスタリングなどがあります。クラスタリングによって行動パターンを特定し、それぞれのまとまりに対して行動の推定を行う準備をします。
学習データの生成:
各まとまりに対して、そのまとまり内でのセンサーデータを学習データとして準備します。まとまりごとにラベルを付けることで、教師あり学習のデータセットを作成します。
機械学習モデルの学習:
学習データを用いて、機械学習モデルを学習させます。推定したい行動に合わせて適切な分類器を選択します。例えば、ランダムフォレスト、サポートベクターマシン、ニューラルネットワークなどが考えられます。
# 変化点検出
change_points = detect_change_points(time_series_data)
# まとまりの生成と特徴量の抽出
segments = []
for i in range(len(change_points) - 1):
segment = time_series_data[change_points[i]:change_points[i+1]]
features = extract_features(segment)
segments.append(features)
# ラベル付け
labels = assign_labels_to_segments(segments)
# 学習データの作成
X = segments
y = labels
# 機械学習モデルの学習
model = train_machine_learning_model(X, y)
# 新しいまとまりのラベル予測
new_segments = get_new_segments(new_time_series_data, change_points)
predicted_labels = model.predict(new_segments)
import numpy as np
# サンプルデータ生成
np.random.seed(42)
time_series_data = np.concatenate([np.random.normal(0, 0.5, 50),
np.random.normal(3, 0.5, 50),
np.random.normal(0, 0.5, 50)])
# 変化点検出
def detect_change_points(data, threshold):
change_points = []
for i in range(1, len(data)):
if abs(data[i] - data[i-1]) > threshold:
change_points.append(i)
return change_points
threshold = 1.5
change_points = detect_change_points(time_series_data, threshold)
print("Change Points:", change_points)
# まとまりの生成
segments = []
for i in range(len(change_points) - 1):
segment = time_series_data[change_points[i]:change_points[i+1]]
segments.append(segment)
print("Segments:", segments)
## 変化点検出
クラスタリング
K-means クラスタリングは、与えられたデータを指定されたクラスタ数に分割する手法です。各クラスタの中心とデータ点との距離の二乗和を最小化することを目指します。K-means はデータが比較的等方的な場合に効果的ですが、クラスタ数を事前に指定する必要があります。
from sklearn.cluster import KMeans
# データの前処理や特徴量の選択が必要
X = preprocess_data()
# K-means クラスタリング
kmeans = KMeans(n_clusters=3, random_state=42)
clusters = kmeans.fit_predict(X)
DBSCAN (Density-Based Spatial Clustering of Applications with Noise):
DBSCAN はデータの密度に基づいてクラスタを形成します。近くのデータ点を連結し、クラスタを形成する一方、低密度の領域を外れ値として扱います。クラスタ数を事前に指定する必要はなく、比較的小さなクラスタを検出する際に有効です。
from sklearn.cluster import DBSCAN
# データの前処理や特徴量の選択が必要
X = preprocess_data()
# DBSCAN クラスタリング
dbscan = DBSCAN(eps=0.5, min_samples=5)
clusters = dbscan.fit_predict(X)
Dynamic Time Warping (DTW) クラスタリング:
DTWは、異なる長さの時系列データの間での最適な類似度を計算する手法です。DTWはデータ間の時間的な歪みを許容するため、変化点検出やクラスタリングに適しています。
各時系列データ間のDTW距離行列を計算します。
距離行列を元に階層的クラスタリングを行います。
クラスタリングの階層構造から適切なクラスタ数を選択します。
import numpy as np
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw # 必要なライブラリをインストール: pip install fastdtw
from scipy.cluster.hierarchy import linkage, fcluster
from sklearn.datasets import make_multilabel_classification
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# 例としてランダムな時系列データを生成
X, y = make_multilabel_classification(n_samples=100, n_features=20, n_classes=5, n_labels=1, random_state=42)
# DTW距離を計算する関数
def dtw_distance(x, y):
distance, _ = fastdtw(x, y, dist=euclidean)
return distance
# DTW距離行列を計算
n_samples = X.shape[0]
dist_matrix = np.zeros((n_samples, n_samples))
for i in range(n_samples):
for j in range(n_samples):
dist_matrix[i, j] = dtw_distance(X[i], X[j])
# 階層的クラスタリングを行う
linkage_matrix = linkage(dist_matrix, method='ward')
plt.figure(figsize=(10, 7))
dendrogram = dendrogram(linkage_matrix)
plt.show()
# クラスタリング結果からクラスタ数を選択
n_clusters = 3
clusters = fcluster(linkage_matrix, n_clusters, criterion='maxclust')
# 可視化のために次元削減
pca = PCA(n_components=3)
X_pca = pca.fit_transform(X)
# 3D散布図でクラスタリング結果を表示
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for i in range(1, n_clusters + 1):
indices = np.where(clusters == i)
ax.scatter(X_pca[indices, 0], X_pca[indices, 1], X_pca[indices, 2], label=f'Cluster {i}')
ax.legend()
plt.show()