初めに
# Load the price table, then derive the two columns the rest of the script
# relies on: a parsed "Date" and "Return" (row-over-row percentage change of
# "Close"). Every missing value -- including the first "Return", which has no
# previous row -- is replaced with 0.
df = pd.read_csv(R"C:\Users\tata5\Desktop\株\data\株.csv")
df = df.assign(
    Date=pd.to_datetime(df["Date"]),
    Return=df["Close"].pct_change(),
).fillna(0)
共通の前処理
# Common preprocessing
def preprocess(data_train, data_test, subset):
    """Split each frame into common and factor columns, then re-join them.

    The factor columns are currently passed through unchanged (copied); the
    marked spot in the body is where factor transformations are meant to go.

    Args:
        data_train: Training DataFrame.
        data_test: Test DataFrame.
        subset: Factor column labels. Every label must exist in both frames,
            otherwise pandas raises ``KeyError``.

    Returns:
        ``(result_train, result_test)``: new DataFrames whose columns are the
        non-factor ("common") columns in their original order, followed by
        the factor columns in ``subset`` order. The inputs are not modified.
    """

    def _split(frame):
        # Common columns are everything that is not listed in `subset`.
        common = frame[[col for col in frame.columns if col not in subset]]
        return common, frame[subset]

    common_train, factor_train = _split(data_train)
    common_test, factor_test = _split(data_test)

    # Placeholder for factor processing: work on copies so the caller's
    # frames are never touched.
    proc_train = factor_train.copy()
    proc_test = factor_test.copy()

    # Re-attach the processed factors to the common columns.
    result_train = pd.concat([common_train, proc_train], axis=1)
    result_test = pd.concat([common_test, proc_test], axis=1)
    return result_train, result_test
予測結果について
# Common preprocessing
# NOTE(review): `preprocess` is also defined earlier in this file with the
# same logic; this later definition is the one that stays bound to the name.
def preprocess(data_train, data_test, subset):
    """Split each frame into common and factor columns, then re-join them.

    The factor columns are currently passed through unchanged (copied); the
    marked spot in the body is where factor transformations are meant to go.

    Args:
        data_train: Training DataFrame.
        data_test: Test DataFrame.
        subset: Factor column labels. Every label must exist in both frames,
            otherwise pandas raises ``KeyError``.

    Returns:
        ``(result_train, result_test)``: new DataFrames whose columns are the
        non-factor ("common") columns in their original order, followed by
        the factor columns in ``subset`` order. The inputs are not modified.
    """

    def _split(frame):
        # Common columns are everything that is not listed in `subset`.
        common = frame[[col for col in frame.columns if col not in subset]]
        return common, frame[subset]

    common_train, factor_train = _split(data_train)
    common_test, factor_test = _split(data_test)

    # Placeholder for factor processing: work on copies so the caller's
    # frames are never touched.
    proc_train = factor_train.copy()
    proc_test = factor_test.copy()

    # Re-attach the processed factors to the common columns.
    result_train = pd.concat([common_train, proc_train], axis=1)
    result_test = pd.concat([common_test, proc_test], axis=1)
    return result_train, result_test
# Chronological hold-out split: rows dated before 2022-08-01 are used for
# training, rows on or after that date for testing.
# Each side is an explicit copy because a "Score" column is assigned to both
# frames later in the script; writing into a boolean-mask slice of `df`
# would trigger pandas' SettingWithCopyWarning.
data_train = df[df["Date"] < "2022-08-01"].copy()
data_test = df[df["Date"] >= "2022-08-01"].copy()
import sklearn
def Ridge_score(x_train, x_test, y_train, y_test, show_result=False):
    """Fit a cross-validated ridge regression and return its predictions.

    Two diagnostic figures are always shown, regardless of ``show_result``:
    a bar chart of the fitted coefficients and a normal Q-Q plot of the
    training residuals.

    Args:
        x_train: Training features. When it exposes ``.columns`` (e.g. a
            DataFrame) those labels are used as tick labels of the
            coefficient chart.
        x_test: Test features, with the same columns as ``x_train``.
        y_train: Training target.
        y_test: Test target; only used for the metrics printed when
            ``show_result`` is true.
        show_result: If true, print coefficients, intercept, test MSE and
            test R2.

    Returns:
        ``(pred_train, pred_test)``: the model's predictions for ``x_train``
        and ``x_test``.
    """
    # Local imports keep the function usable even when the module only did a
    # bare `import sklearn`, which does not guarantee the submodules are loaded.
    import scipy.stats as stats
    from sklearn.linear_model import RidgeCV
    from sklearn.metrics import mean_squared_error, r2_score

    # RidgeCV selects the regularisation strength among the three candidate
    # alphas (0.1, 1, 10) using 5-fold cross-validation.
    model_linear = RidgeCV(alphas=[0.1, 1, 10], cv=5)
    model_linear.fit(x_train, y_train)

    # Predictions for both the training and the test data.
    pred_train = model_linear.predict(x_train)
    pred_test = model_linear.predict(x_test)

    # Process-wide matplotlib default; it applies to figures created without
    # an explicit size (the coefficient chart below sets its own).
    plt.rcParams['figure.figsize'] = 15, 12

    # Coefficient chart. atleast_1d keeps a single-feature model plottable
    # (squeeze alone would collapse it to a 0-d array with no len()).
    coefs = np.atleast_1d(np.squeeze(model_linear.coef_))
    positions = range(len(coefs))
    plt.figure(figsize=(9, 5))
    plt.bar(positions, coefs, color='dimgray')
    # Label the bars with the feature names actually passed in, rather than
    # relying on a module-level list that may not match x_train.
    feature_names = getattr(x_train, "columns", None)
    if feature_names is not None and len(feature_names) == len(coefs):
        plt.xticks(positions, feature_names, rotation=89)
    plt.show()

    # Normal Q-Q plot of the in-sample residuals.
    residual = y_train - pred_train
    stats.probplot(residual, dist="norm", plot=plt)
    plt.show()

    if show_result:
        print("Coef: {}".format(model_linear.coef_))
        print("Intercept: {:.4f}".format(model_linear.intercept_))
        print("MSE: {:.4f}".format(mean_squared_error(y_test, pred_test)))
        print("R2: {:.5f}".format(r2_score(y_test, pred_test)))
    return pred_train, pred_test
# Fit on the training window and store the in-sample / out-of-sample
# predictions as the "Score" column of the corresponding frame.
data_train["Score"], data_test["Score"] = Ridge_score(
    x_train=data_train[subset],
    x_test=data_test[subset],
    y_train=data_train["Return"],
    y_test=data_test["Return"],
    show_result=True,
)
# Sort the rows of each date into q buckets (5 by default) by predicted score
def quantile(data_df, q=5, weighted=False,
             return_col="log_diff_fill_close", weight_col="Liquidity"):
    """Compute per-date returns of score-quantile portfolios.

    Within each date, rows are cut into ``q`` equal-frequency buckets by
    "Score". Labels are assigned in reverse, so "Q1" holds the highest
    scores and "Q{q}" the lowest.

    Args:
        data_df: DataFrame with "Date", "Score" and ``return_col`` columns
            (plus ``weight_col`` when ``weighted`` is true).
        q: Number of buckets.
        weighted: If true, bucket returns are averages weighted by
            ``weight_col``; otherwise plain means.
        return_col: Name of the column holding the realised return.
        weight_col: Name of the weight column; only read when ``weighted``
            is true.

    Returns:
        DataFrame indexed by "Date" with one column per bucket label and an
        extra "LS" column equal to Q1 minus Q{q} (long top, short bottom).
    """
    # Only carry the columns that are actually needed; the weight column must
    # be included here or the weighted average below cannot see it.
    needed = ["Date", return_col, "Score"]
    if weighted:
        needed.append(weight_col)
    score_df = data_df[needed].copy()

    # Quantile label: descending label order makes Q1 the top-score bucket.
    labels = ["Q{}".format(rank) for rank in range(q, 0, -1)]
    score_df["Q"] = (
        score_df.groupby("Date")["Score"]
        .transform(lambda x: pd.qcut(x, q, labels=labels))
        .astype(str)
    )

    # Quantile return: one value per (Date, bucket).
    by_bucket = score_df.groupby(["Date", "Q"])
    if weighted:
        qrtn = by_bucket[[return_col, weight_col]].apply(
            lambda g: np.average(g[return_col], weights=g[weight_col])
        )
    else:
        qrtn = by_bucket[return_col].mean()

    # Buckets become columns; both branches share this step so the weighted
    # path no longer depends on a differently named intermediate column.
    qrtn_df = qrtn.unstack("Q")
    qrtn_df["LS"] = qrtn_df["Q1"] - qrtn_df["Q{}".format(q)]
    return qrtn_df
def calc_perf(qrtn_df, rf_rate=0.0, show_result=False, periods_per_year=12):
    """Summarise periodic portfolio returns into annualised statistics.

    Args:
        qrtn_df: DataFrame of simple per-period returns, one column per
            portfolio, indexed by date in chronological order.
        rf_rate: Risk free rate. (Default=0)
        show_result: If true, print the summary table and plot the
            cumulative value of each portfolio.
        periods_per_year: Number of rows that make up one year; the default
            of 12 corresponds to monthly returns.

    Returns:
        DataFrame with rows "Annual Return", "Annual Risk", "Sharpe Ratio"
        and "Max Drawdown", and one column per column of ``qrtn_df``.
    """
    growth = 1 + qrtn_df

    # Annual return: geometric annualisation of the compounded return. The
    # sign/abs split keeps the fractional power defined when the compounded
    # value is negative.
    num_years = qrtn_df.shape[0] / periods_per_year
    cum_return = growth.prod()
    ann_return = np.sign(cum_return) * cum_return.abs() ** (1 / num_years) - 1

    # Annual risk
    ann_risk = qrtn_df.std() * np.sqrt(periods_per_year)

    # Sharpe ratio
    sharpe_ratio = (ann_return - rf_rate) / ann_risk

    # Max drawdown. The cumulative value is shifted forward by one row so the
    # path starts at 1 on the first date; the extra row needed for the final
    # value is labelled with the business month-end following the last date
    # (rolled forward first when the last date is not itself a business
    # month-end). That label only matters as the x position in the plot.
    cum_price = growth.cumprod()
    month_end = pd.offsets.BMonthEnd()
    last_date = pd.Timestamp(qrtn_df.index[-1])
    closing_date = month_end.rollforward(last_date) + month_end
    cum_price.loc[closing_date] = np.nan
    cum_price = cum_price.sort_index().shift(1).fillna(1)
    max_drawdown = (cum_price / cum_price.cummax() - 1).min()

    perf_df = pd.DataFrame({"Annual Return": ann_return, "Annual Risk": ann_risk,
                            "Sharpe Ratio": sharpe_ratio, "Max Drawdown": max_drawdown}).T

    # Show performance
    if show_result:
        print(perf_df)
        cum_price.plot()
        plt.show()
        plt.close("all")
    return perf_df
# Report performance for the training-period and the test-period quantile
# returns; show_result=True makes each call print its summary table and plot
# the cumulative value.
# NOTE(review): qrtn_train / qrtn_test are not defined in this excerpt --
# presumably the outputs of quantile() on each split; verify.
perf_train = calc_perf(qrtn_train, show_result=True)
perf_test = calc_perf(qrtn_test, show_result=True)