周期性の確認
自己相関関数
import statsmodels.api as sm
# 自己相関係数の計算
p_acf = sm.tsa.stattools.acf(daily_counts)
print(p_acf)
# 自己相関係数の可視化
fig, ax = plt.subplots(figsize=(16, 10))
sm.graphics.tsa.plot_acf(daily_counts, lags=70, ax=ax)
ax.set_title("自己相関", fontsize=30)
ax.set_ylabel("相関係数", fontsize=30)
ax.set_xlabel("日数", fontsize=30)
ax.tick_params(labelsize=25, width=4, length=10)
plt.grid()
パワースペクトル
from scipy.signal import argrelextrema
plt.rcParams['figure.figsize'] = [12, 9]
# データセットの値を取得
# y = dataset.data['value'].values
y = daily_counts_df["オーダー日"]
# データのサイズとサンプリング間隔を設定
N = y.size
#N = len(daily_counts_df)
T = 1.0
# 高速フーリエ変換 (FFT)を実行
yf = np.fft.fft(y)
xf = np.fft.fftfreq(N, d=T)[:N//2]
# パワースペクトルを計算
power_spectrum = 2.0/N * np.abs(yf[0:N//2])
# パワースペクトルのピークを探す
peaks = argrelextrema(power_spectrum, np.greater)
# ピークの周波数とパワーを取得
peak_freqs = xf[peaks]
peak_powers = power_spectrum[peaks]
# ピークの周期を計算
peak_periods = 1 / peak_freqs
x=np.linspace(0,0.5,1000)
y=np.array([-79.5]*1000)
plt.plot(x,y,color="black",linewidth=10,label=f"$f^0$")
# パワースペクトルとピークをプロット
plt.xscale("log")
#plt.yscale("log")
peak_powers_pdf = peak_powers-peak_powers.sum()
power_spectrum_pdf = power_spectrum - power_spectrum.sum()
plt.plot(xf[1:], power_spectrum_pdf[1:], "o-",color="red")
plt.grid()
plt.title('日毎のパワースペクトル',fontsize=30)
plt.xlabel(r'周波数($f$) [1/4年(1239日)]', fontsize=20)
plt.ylabel('Power',fontsize=20)
plt.tick_params(labelsize=25, width=4, length=10)
plt.legend(fontsize=30)
plt.show()
ピークの優位性検定
def optimal_block_size(y, N_BOOTSTRAP, LOWER_PERCENTILE, UPPER_PERCENTILE):
# ブロックサイズの範囲を設定
block_size_range = list(range(2, 24)) # Adjust this to your preferred range
# 各ブロックサイズでブートストラップを実行し、信頼区間の幅の平均を記録
average_interval_widths = []
for block_size in block_size_range:
bootstrap_powers = np.zeros((N_BOOTSTRAP, len(y)//2))
for i in range(N_BOOTSTRAP):
bootstrap_sample = []
while len(bootstrap_sample) < len(y):
start_index = np.random.randint(0, len(y) - block_size + 1)
bootstrap_sample += list(y[start_index: start_index + block_size])
bootstrap_sample = bootstrap_sample[:len(y)] # Adjust the length of bootstrap_sample to N
bootstrap_yf = np.fft.fft(bootstrap_sample)
bootstrap_powers[i, :] = 2.0/len(y) * np.abs(bootstrap_yf[0:len(y)//2])
lower_bound = np.percentile(bootstrap_powers, LOWER_PERCENTILE, axis=0)
upper_bound = np.percentile(bootstrap_powers, UPPER_PERCENTILE, axis=0)
average_interval_width = np.mean(upper_bound - lower_bound)
average_interval_widths.append(average_interval_width)
# 最適なブロックサイズを選択
optimal_block_size = block_size_range[np.argmin(average_interval_widths)]
return optimal_block_size
N_BOOTSTRAP = 1000
LOWER_PERCENTILE = 0.5
UPPER_PERCENTILE = 99.5
block_size = optimal_block_size(y, N_BOOTSTRAP, LOWER_PERCENTILE, UPPER_PERCENTILE)
print("Optimal block size: ", block_size)
# ブートストラップサンプルの数とパーセンタイルを設定
N_BOOTSTRAP = 10000
LOWER_PERCENTILE = 0.5
UPPER_PERCENTILE = 100.5
# ブートストラップ法を用いて信頼区間を計算
n_bootstrap = N_BOOTSTRAP
bootstrap_powers = np.zeros((n_bootstrap, N//2))
# ブロックブートストラップのブロックサイズを設定
block_size = optimal_block_size(y, N_BOOTSTRAP, LOWER_PERCENTILE, UPPER_PERCENTILE)
# ブートストラップ法を用いて信頼区間を計算
n_bootstrap = N_BOOTSTRAP
bootstrap_powers = np.zeros((n_bootstrap, N//2))
for i in range(n_bootstrap):
bootstrap_sample = []
while len(bootstrap_sample) < N:
start_index = np.random.randint(0, N - block_size + 1)
bootstrap_sample += list(y[start_index: start_index + block_size])
bootstrap_sample = bootstrap_sample[:N] # Adjust the length of bootstrap_sample to N
bootstrap_yf = np.fft.fft(bootstrap_sample)
bootstrap_powers[i, :] = 2.0/N * np.abs(bootstrap_yf[0:N//2])
# ブートストラップパワーの下位と上位パーセンタイルを計算
lower_bound = np.percentile(bootstrap_powers, LOWER_PERCENTILE, axis=0)
upper_bound = np.percentile(bootstrap_powers, UPPER_PERCENTILE, axis=0)
# ピークが有意かどうかを判断(ピークパワーが信頼区間外であるか)
significant = np.logical_and(peak_powers > lower_bound[peaks], peak_powers > upper_bound[peaks])
# ピークの情報データフレームに有意情報を追加
peak_df = peak_df.sort_index()
peak_df['Significant'] = significant
# パワーの降順でピーク情報のデータフレームをソートして表示
peak_df = peak_df.sort_values('Power', ascending=False)
print(peak_df)
STL分解
# STL分解
stl=STL(daily_counts, period=7, robust=True)
stl_series = stl.fit()
# STL分解結果のグラフ化
stl_series.plot()
plt.show()
# STL分解結果のデータ
stl_o = stl_series.observed #観測データ(STL分解前の元のデータ)=トレンド+季節性+残差
stl_t = stl_series.trend #トレンド(trend)
stl_s = stl_series.seasonal #季節性(seasonal)
stl_r = stl_series.resid #残差(resid)
stl_t.plot() #トレンド(trend)のグラフ描写
stl_s.plot() #季節性(season)のグラフ描写
stl_r.plot() #残差(resid)のグラフ描写
plt.title('STL分解',fontsize=20) #グラフタイトル
#plt.ylabel('Monthly Number of Airline Passengers') #タテ軸のラベル
plt.xlabel('Date',fontsize=20) #ヨコ軸のラベル
plt.legend(fontsize=20) #凡例表示
plt.tick_params(axis='both', which='major', labelsize=15)
plt.show()
時系列データの前処理
if adf_test(daily_counts_df["オーダー日"].fillna(0), do_print=True):
print("対数価格系列は定常である")
else:
print("対数価格系列は非定常である")
daily_counts_df["オーダー日"].plot()
plt.xlabel('Date', fontsize=20)
plt.ylabel('元データ', fontsize=20)
plt.tick_params(axis='both', which='major', labelsize=15)
plt.title('日毎の来店回数(サンプルスーパーストア)',fontsize=20)
plt.legend()
plt.show()
def getWeights_FFD(d, thres):
w, k = [1.], 1
while True:
w_=-w[-1]/k * (d-k+1)
if abs(w_) < thres:
break
w.append(w_)
k += 1
return np.array(w[::-1]).reshape(-1,1)
def fracDiff_FFD(series:pd.Series, d:float, thres:float=1e-5) -> pd.DataFrame:
"""
固定幅ウィンドウによる分数次差分の実装
series: 価格系列データ
d: 差分の字数
thres: tauに対応する閾値
"""
w= getWeights_FFD(d, thres)
width = len(w)-1
df = {}
for name in series.columns:
seriesF, df_ = series[[name]].fillna(method='ffill').dropna(), pd.Series()
for iloc1 in range(width, seriesF.shape[0]):
loc0, loc1 = seriesF.index[iloc1-width], seriesF.index[iloc1]
if not np.isfinite(series.loc[loc1, name]):
continue
df_[loc1] = np.dot(w.T, seriesF.loc[loc0:loc1])[0,0]
df[name] = df_.copy(deep=True)
df=pd.concat(df, axis=1)
return df
def plot_min_ffd(df, col):
"""
与えられた特徴量を分数次差分で変換する際の適切な次数を調べる関数
Parameters
==========
df: pandas dataframe
特徴量が格納されたdataframe
col: str
特徴量の名前
"""
out = pd.DataFrame(columns=['adfStat', 'pCal', 'lags', 'nobs',
'95% conf', 'corr'])
d_list = np.linspace(0, 1, 11)
for d in d_list:
print(f'd: {d}')
series1 = df[[col]]
series2 = fracDiff_FFD(df[[col]], d, thres=0.01)
corr = np.corrcoef(series1.loc[series2.index, col].values, series2.values.reshape(-1))[0, 1]
stats = adfuller(series2.values.reshape(-1), maxlag=1, regression='c', autolag=None)
out.loc[d] = list(stats[:4]) + [stats[4]['5%']] + [corr]
fig = plt.figure(figsize=(10, 7))
ax1 = fig.add_subplot(111)
ax1.plot(d_list, out['adfStat'].values, color='g', label='adfStats')
ax1.hlines(np.mean(out['95% 信頼区間']), 0, 1, colors='red', linestyle='dashed', linewidth=2, label='95% conf')
ax2 = ax1.twinx()
ax2.plot(d_list, out['corr'].values, color='blue', label='corr')
h1, l1 = ax1.get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
ax1.legend(h1+h2, l1+l2, loc='lower left')
ax1.grid(axis='x')
ax1.set_xlabel(次数D', size='large')
ax1.set_ylabel('ADF統計量', size='large')
ax2.set_ylabel('相関', size='large')
ax1.set_title('次数dに対する相関とADF統計量の変化',fontxize=20)
plt.show()
plot_min_ffd(daily_counts_df, "オーダー日")
多変量時系列モデル
def prepreprocess(ewm_span, threshold, daily_counts_df):
daily_counts_df["diff"] = daily_counts_df["オーダー日"].diff()
daily_counts_df["log_オーダー日"] = np.log(daily_counts_df["オーダー日"])
daily_counts_df["diff_log_オーダー日"] = daily_counts_df["log_オーダー日"].diff()
ewm_span=ewm_span
threshold = threshold
ewm_mean = daily_counts_df.ewm(span=ewm_span).mean() # 指数加重移動平均
ewm_std = daily_counts_df.ewm(span=ewm_span).std() # 指数加重移動標準偏差
daily_counts_df = daily_counts_df.where((daily_counts_df - ewm_mean).abs() < ewm_std * threshold,daily_counts_df.ewm(span=90).mean())
return daily_counts_df
daily_counts_df_kagu = prepreprocess(ewm_span=90, threshold=2, daily_counts_df=daily_counts_df_kagu)
daily_counts_df_zimu = prepreprocess(ewm_span=90, threshold=2, daily_counts_df=daily_counts_df_zimu)
daily_counts_df_kaden = prepreprocess(ewm_span=90, threshold=2, daily_counts_df=daily_counts_df_kaden)
category_df = pd.concat([pd.DataFrame(daily_counts_df_kagu["log_オーダー日"]),
pd.DataFrame(daily_counts_df_zimu["log_オーダー日"]),
pd.DataFrame(daily_counts_df_kaden["log_オーダー日"]),],axis=1)
category_list = ["家具","事務用品","家電"]
category_df.columns=category_list
category_df
# ヒートマップで表してみる。
# ここではラグなし(k=0)の時をしめす
def plot_corr_heatmap(df, figsize=(16, 16)):
'''
相互相関のヒートマップをプロット
Parameters
----------
df: pandas Dataframe
figsize: int
figure size
Returns
-------
'''
df_corr = df.corr() # 相互相関を計算
fig, ax = plt.subplots(figsize=figsize)
fig.tight_layout()
heatmap = ax.pcolor(df_corr, cmap=plt.cm.jet)
ax.set_xticks(np.arange(df_corr.shape[0]) + 0.5, minor=False)
ax.set_yticks(np.arange(df_corr.shape[1]) + 0.5, minor=False)
ax.invert_yaxis()
ax.xaxis.tick_top()
ax.set_xticklabels(df_corr.columns.values, minor=False, rotation=90,fontsize=20)
ax.set_yticklabels(df_corr.index.values, minor=False,fontsize=20)
pp = fig.colorbar(heatmap, aspect=20)
plt.show()
return df_corr
df_corr = plot_corr_heatmap(category_df)
相関係数を距離に変換
df_corr_distance = np.sqrt(2*(1-np.array(df_corr)))
# ネットワークを作成する
graph = nx.from_numpy_array(df_corr_distance)
# 最小全域木を作成する
mst = nx.minimum_spanning_tree(graph)
# グラフを描画する
pos = nx.spring_layout(mst)
fig, ax = plt.subplots(figsize=(20, 20))
nx.draw_networkx_nodes(mst, pos=pos,node_size=300, node_color='r',edgecolors="black",node_shape="s")
nx.draw_networkx_edges(mst, pos=pos)
for idx, (x, y) in pos.items():
ax.text(x, y, category_list[idx], ha='center', va='center',fontsize=40)
plt.axis('off')
plt.show()
時系列モデル
df["オーダー日"]=pd.to_datetime(df["オーダー日"])
daily_counts = df['オーダー日'].value_counts().sort_index()
daily_counts_df = pd.DataFrame(daily_counts)
daily_counts_df["diff"] = daily_counts_df["オーダー日"].diff()
daily_counts_df["log_オーダー日"] = np.log(daily_counts_df["オーダー日"])
daily_counts_df["diff_log_オーダー日"] = daily_counts_df["log_オーダー日"].diff()
# データと予測対象になるラベルの用意
Xy = daily_counts_df[["diff_log_オーダー日"]]
# 予測対象として,diff_log_closeの1ステップ先の値を設定しています.
Xy["y"] = daily_counts_df["diff_log_オーダー日"].shift(-1).astype(float)
Xy.dropna(inplace=True)
Xy.head()
# 関数の用意
# 可視化のための関数
def plot_result(target, pred, title, ylabel):
plt.figure(figsize=(15,5))
plt.plot(target.index, target, c='blue', label='実際', marker='.')
plt.plot(target.index, pred, c='r', label='予測値', marker='.')
plt.ylabel(ylabel, fontsize=17)
plt.legend()
plt.title(title)
plt.show()
# 残差の可視化のための関数
def plot_resid(target, pred, title, ylabel):
plt.figure(figsize=(15,5))
plt.plot(target.index, target-pred, c='green', marker='.')
plt.ylabel(ylabel, fontsize=17)
plt.title(title)
plt.show()
# train validationの分割のための関数
def timeseries_train_val_split(Xy, target="y"):
# 時系列の前半75%を学習, 後半25%を検証に利用する
train, val = Xy[:int(len(Xy)*0.75)], Xy[int(len(Xy)*0.75)+10:]
trainX = train.drop([target],axis=1)
trainy = train[target]
valX = val.drop([target],axis=1)
valy = val[target]
return trainX, trainy, valX, valy
from sklearn.metrics import accuracy_score
# 上昇もしくは下落の予測のaccuracyを測定する関数
def eval_direction(target, pred):
target = np.where(np.array(target) > 0, 1, -1)
pred = np.where(np.array(pred) > 0, 1, -1)
print("accuracy", accuracy_score(target, pred))
# trainとvalidationの分割
trainX, trainy, valX, valy = timeseries_train_val_split(Xy, target="y")
# AR(1)モデルの設定とfitting
lr = LinearRegression()
lr.fit(trainX[["diff_log_オーダー日"]], trainy)
pred = lr.predict(valX[["diff_log_オーダー日"]])
# 結果のプロット
plot_result(target=valy, pred=pred, title="AR(1)モデルの予測結果", ylabel="diff_log_ymd_date")
plot_resid(valy, pred, title="AR(1)モデルの誤差", ylabel="diff_log_ymd_date")