LoginSignup
0
0

ファクターモデル_一連の流れ

Posted at 2024-05-01

初めに

# Load daily price data and derive simple (close-to-close) returns.
df =pd.read_csv(R"C:\Users\tata5\Desktop\株\data\株.csv")  # NOTE(review): machine-specific absolute path — parameterize before reuse
df["Date"] = pd.to_datetime(df["Date"])
df["Return"] = df["Close"].pct_change()  # first row is NaN by construction
df = df.fillna(0)  # fills the leading NaN return (and any other NaN) with 0

共通の前処理

# Common preprocessing
def preprocess(data_train, data_test, subset):
    """
    Split each frame into common vs. factor columns, transform the factor
    columns (currently a pass-through), then re-attach them on the right.

    data_train, data_test: input DataFrames.
    subset: list of factor column names to split out.

    Returns (result_train, result_test) with columns ordered
    [common columns..., factor columns...].
    """
    # Split: common/factor data (idiomatic `c not in subset`)
    common_train = data_train[[c for c in data_train.columns if c not in subset]]
    common_test = data_test[[c for c in data_test.columns if c not in subset]]
    factor_train = data_train[subset]
    factor_test = data_test[subset]

    # Placeholder for factor-specific processing.
    proc_train = factor_train.copy()
    proc_test = factor_test.copy()

    # Re-attach the (processed) factor columns after the common ones.
    result_train = pd.concat([common_train, proc_train], axis=1)
    result_test = pd.concat([common_test, proc_test], axis=1)
    return result_train, result_test

予測結果について

# Common preprocessing (duplicate definition kept from the article)
def preprocess(data_train, data_test, subset):
    """
    Split each frame into common vs. factor columns, transform the factor
    columns (currently a pass-through), then re-attach them on the right.

    data_train, data_test: input DataFrames.
    subset: list of factor column names to split out.

    Returns (result_train, result_test) with columns ordered
    [common columns..., factor columns...].
    """
    # Split: common/factor data (idiomatic `c not in subset`)
    common_train = data_train[[c for c in data_train.columns if c not in subset]]
    common_test = data_test[[c for c in data_test.columns if c not in subset]]
    factor_train = data_train[subset]
    factor_test = data_test[subset]

    # Placeholder for factor-specific processing.
    proc_train = factor_train.copy()
    proc_test = factor_test.copy()

    # Re-attach the (processed) factor columns after the common ones.
    result_train = pd.concat([common_train, proc_train], axis=1)
    result_test = pd.concat([common_test, proc_test], axis=1)
    return result_train, result_test
# Chronological holdout: train strictly before 2022-08-01, test from that date on.
data_train = df.loc[df["Date"] < "2022-08-01"]
data_test = df.loc[df["Date"] >= "2022-08-01"]
import sklearn
def Ridge_score(x_train, x_test, y_train, y_test, show_result=False, feature_names=None):
    """
    Fit a cross-validated ridge regression and return in-/out-of-sample predictions.

    x_train, x_test: feature matrices (train / test).
    y_train, y_test: target vectors (train / test).
    show_result: If True, print coefficients, intercept, test MSE and test R^2.
    feature_names: labels for the coefficient bar chart; defaults to the
        module-level ``subset`` used at the call site (backward compatible).

    Returns (pred_train, pred_test).
    """
    # Cross-validate over three candidate alphas (0.1, 1, 10) with 5-fold CV.
    model_linear = RidgeCV(alphas=[0.1, 1, 10], cv=5)
    model_linear.fit(x_train, y_train)

    # Predictions for both samples.
    pred_train = model_linear.predict(x_train)
    pred_test = model_linear.predict(x_test)

    if feature_names is None:
        feature_names = subset  # NOTE(review): relies on module-level `subset` — confirm it matches x_train's columns

    plt.rcParams['figure.figsize'] = 15, 12

    coefs_0d = np.squeeze(model_linear.coef_)  # (1, n) -> (n,)

    # Bar chart of fitted coefficients.
    plt.figure(figsize=(9, 5))
    plt.bar(range(len(coefs_0d)), coefs_0d, color='dimgray')
    plt.xticks(range(len(feature_names)), feature_names, rotation=89)
    plt.show()

    # Q-Q plot of training residuals against a normal distribution.
    import scipy.stats as stats
    residual = y_train - pred_train
    stats.probplot(residual, dist="norm", plot=plt)
    plt.show()

    if show_result:
        print("Coef: {}".format(model_linear.coef_))
        print("Intercept: {:.4f}".format(model_linear.intercept_))
        print("MSE: {:.4f}".format(sklearn.metrics.mean_squared_error(y_test, pred_test)))
        # Fixed label: this is the coefficient of determination (R^2), not "R1".
        print("R2: {:.5f}".format(sklearn.metrics.r2_score(y_test, pred_test)))
    return pred_train, pred_test
data_train["Score"], data_test["Score"] = Ridge_score(x_train = data_train[subset], x_test=data_test[subset],y_train=data_train["Return"], y_test=data_test["Return"],show_result=True)
# 予測を元にしてデータを5つに分類
# Split each date's cross-section into q quantile portfolios by Score.
def quantile(data_df, q=5, weighted=False):
    """
    Bucket each date's cross-section into q quantiles by "Score" and compute
    per-quantile returns plus a long-short (Q1 - Qq) spread.

    data_df: DataFrame with "Date", "log_diff_fill_close" and "Score" columns;
        when weighted=True it must also contain a "Liquidity" weight column.
    q: number of quantiles (default 5).
    weighted: if True, Liquidity-weighted mean return per quantile; else equal-weighted.

    Returns a DataFrame indexed by Date with columns Q1..Qq and "LS".
    """
    cols = ["Date", "log_diff_fill_close", "Score"]
    if weighted:
        # Bug fix: the weight column must be carried along for the weighted path.
        cols.append("Liquidity")
    score_df = data_df[cols].copy()

    # Quantile label per date; labels reversed so Q1 is the highest-score bucket.
    labels = ["Q{}".format(i + 1) for i in range(q)][::-1]
    score_df["Q"] = score_df.groupby("Date")["Score"].transform(
        lambda x: pd.qcut(x, q, labels=labels)).astype(str)

    # Quantile return per (Date, Q)
    if weighted:
        qrtn_df = score_df.groupby(["Date", "Q"]).apply(
            lambda x: np.average(x["log_diff_fill_close"], weights=x["Liquidity"])).reset_index()
        # Bug fix: name the value column consistently so the pivot below finds it
        # (the original renamed it to "Return" and pivot then raised KeyError).
        qrtn_df = qrtn_df.rename(columns={0: "log_diff_fill_close"})
    else:
        qrtn_df = score_df.groupby(["Date", "Q"])[["log_diff_fill_close"]].mean().reset_index()
    qrtn_df = qrtn_df.pivot(index="Date", columns="Q", values="log_diff_fill_close")

    # Long-short spread: top-score bucket minus bottom-score bucket.
    qrtn_df["LS"] = qrtn_df["Q1"] - qrtn_df["Q{}".format(q)]
    return qrtn_df

def calc_perf(qrtn_df, rf_rate=0.0, show_result=False):
    """
    Compute annualized performance statistics for monthly portfolio returns.

    qrtn_df: DataFrame of monthly returns (one column per portfolio, e.g. the
        output of quantile()), indexed by month-end dates — assumed monthly.
    rf_rate: Risk free rate. (Default=0)
    show_result: If True, print the stats table and plot cumulative prices.

    Returns a DataFrame with rows Annual Return / Annual Risk / Sharpe Ratio /
    Max Drawdown and one column per input column.
    """
    # Calc performance
    # Annual return (geometric, assuming 12 rows per year)
    num_years = qrtn_df.shape[0] / 12
    cum_return = (1 + qrtn_df).prod()
    # The sign/abs trick keeps the fractional power defined when the cumulative
    # product is negative (a plain negative**fraction would be NaN).
    ann_return = (np.sign(cum_return) * pow(abs(cum_return), 1 / num_years)) - 1
    # Annual risk (monthly stdev scaled by sqrt(12))
    ann_risk = qrtn_df.std() * np.sqrt(12)
    # Sharpe ratio
    sharpe_ratio = (ann_return - rf_rate) / ann_risk
    # Max drawdown
    cum_price = (1 + qrtn_df).cumprod()
    # Append one extra business-month-end stamp, then shift by one so the price
    # path starts at 1.0 before the first return is applied.
    # NOTE(review): the "BM" frequency alias is deprecated in pandas >= 2.2
    # (renamed "BME") — confirm the pandas version this runs against.
    cum_price.loc[pd.date_range(qrtn_df.index[-1], "2100-01-01", freq="BM")[1]] = None
    cum_price = cum_price.sort_index().shift(1).fillna(1)
    max_drawdown = (cum_price / cum_price.cummax() - 1).min()
    perf_df = pd.DataFrame({"Annual Return": ann_return, "Annual Risk": ann_risk,
                            "Sharpe Ratio": sharpe_ratio, "Max Drawdown": max_drawdown}).T

    # Show performance
    if show_result:
        print(perf_df)
        cum_price.plot()
        plt.show()
        plt.close("all")
    return perf_df

# Show performance
# NOTE(review): qrtn_train / qrtn_test are not defined anywhere in this excerpt —
# presumably produced by quantile(data_train) / quantile(data_test); confirm upstream.
perf_train = calc_perf(qrtn_train, show_result=True)
perf_test = calc_perf(qrtn_test, show_result=True)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0