0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

python0.3

0
Last updated at Posted at 2026-05-16
def _first_entry_after_rsi(df: pd.DataFrame, rsi_idx: int) -> Optional[Dict[str, int]]:
    entry_candidates = (
        set(find_valley_points(df))
        & set(find_lower_shadow_points(df))
        & set(find_volume_spike_points(df))
    )
    rebound_set = set(find_rebound_stronger_than_decline_points(df, rsi_idx))

    for i in range(rsi_idx + 1, len(df) - 1):
        if i not in entry_candidates:
            continue
        if not find_band_walk_points(df, i):
            continue
        if i in rebound_set:
            return {"rsi_idx": rsi_idx, "entry_idx": i}
    return None


def _first_exit_after_entry(df: pd.DataFrame, entry_idx: int) -> Optional[int]:
    exit_candidates = sorted(find_upper_band_exit_points(df))
    for j in exit_candidates:
        if j > entry_idx:
            return j
    return None
    
def analyze_market_before_stages(chart: ChartData) -> Dict[str, Any]:
    base = {
        "latest_date": chart.latest_date,
        "latest_close": chart.latest_close,
        "rsi_pass": False,
        "bb_pass": False,
        "valley_pass": False,
        "exit_pass": False,
        "rsi_idx": None,
        "latest_rsi_idx": None,
        "entry_idx": None,
        "exit_idx": None,
        "rsi_count": 0,
        "entry_count": 0,
        "exit_count": 0,
        "reason": "",
    }

    df = _prepare_df(chart)
    if not _has_required_price_data(df):
        base["reason"] = "Not enough data"
        return base

    df = prepare_technical_indicators(df)
    base["df"] = df

    rsi_indices = find_rsi_cross_under(df, threshold=30)
    base["rsi_count"] = len(rsi_indices)
    if not rsi_indices:
        base["reason"] = "No RSI entry"
        return base

    base["rsi_pass"] = True
    base["latest_rsi_idx"] = rsi_indices[-1]
    chosen = None
    all_entries: List[int] = []
    all_exits: List[int] = []

    for rsi_idx in rsi_indices:
        found = _first_entry_after_rsi(df, rsi_idx)
        if found is None:
            continue

        all_entries.append(found["entry_idx"])
        if chosen is None:
            chosen = found

        exit_idx = _first_exit_after_entry(df, found["entry_idx"])
        if exit_idx is not None:
            all_exits.append(exit_idx)

    base["bb_pass"] = chosen is not None
    base["valley_pass"] = chosen is not None
    base["entry_count"] = len(all_entries)
    base["exit_count"] = len(all_exits)

    if chosen is not None:
        base["rsi_idx"] = chosen["rsi_idx"]
        base["entry_idx"] = chosen["entry_idx"]
        base["exit_idx"] = _first_exit_after_entry(df, chosen["entry_idx"])
        base["exit_pass"] = base["exit_idx"] is not None
        if not base["exit_pass"]:
            base["reason"] = "No +3sigma exit"
    else:
        base["reason"] = "No BB/Valley entry"

    return base


def build_market_before_exclusive_rows(
    ticker: str, company_name: str, chart: ChartData
) -> List[Dict[str, Any]]:
    """RSI検知1件につき4行セット(1st〜4th)を生成する。RSI未検知は空リストを返す。"""
    df = _prepare_df(chart)
    latest_date = chart.latest_date
    latest_close = chart.latest_close

    if not _has_required_price_data(df):
        return []

    df = prepare_technical_indicators(df)

    rsi_indices = find_rsi_cross_under(df, threshold=30)

    def _date_close(idx):
        if idx is None:
            return "", ""
        d = df.at[idx, "Date"]
        if hasattr(d, "strftime"):
            d = d.strftime("%Y-%m-%d")
        return d, df.at[idx, "Close"]

    exclusive_rows = []
    for rsi_idx in rsi_indices:
        first_entry_after_rsi = _first_entry_after_rsi(df, rsi_idx)
        entry_idx = (
            first_entry_after_rsi["entry_idx"] if first_entry_after_rsi else None
        )
        exit_idx = (
            _first_exit_after_entry(df, entry_idx) if entry_idx is not None else None
        )

        bb_pass = entry_idx is not None
        valley_pass = entry_idx is not None
        exit_pass = exit_idx is not None

        rsi_date, rsi_close_val = _date_close(rsi_idx)
        entry_date, entry_close_val = _date_close(entry_idx)
        exit_date, exit_close_val = _date_close(exit_idx)

        # 1st_RSI
        rsi_at = f"At_Detect_Rsi: {df.at[rsi_idx, 'RSI']:.2f}"
        p = rsi_idx - 1
        prev_date = f"Prev_Date: {df.at[p, 'Date'].strftime('%Y-%m-%d')}"
        prev_close = f"Prev_Close: {df.at[p, 'Close']:.2f}"
        prev_rsi = f"Prev_Rsi: {df.at[p, 'RSI']:.2f}"

        # 2nd_BB
        if entry_idx is not None:
            walk_period = df.iloc[rsi_idx : entry_idx + 1]
            walk_days = len(walk_period)
            close_ratio = (walk_period["Close"] <= walk_period["BB_Lower"]).mean()
            touch_ratio = (walk_period["Low"] <= walk_period["BB_Lower"]).mean()
            bb_walk = (
                f"BBWalk: {walk_days}d "
                f"close={close_ratio:.0%} touch={touch_ratio:.0%}"
            )
        else:
            bb_walk = "BBWalk: N/A"

        # 3rd_VALLEY
        if entry_idx is not None:
            low_shadow_pct = (
                (df.at[entry_idx, "Close"] - df.at[entry_idx, "Low"])
                / df.at[entry_idx, "Low"]
                * 100
            )
            lower_shadow = f"LowerShadowPct: {low_shadow_pct:.2f}%"
        else:
            lower_shadow = "LowerShadowPct: N/A"

        avg_v = df.at[entry_idx, "Avg_Volume"] if entry_idx is not None else 0
        vol_ratio = (
            f"VolRatio: {df.at[entry_idx, 'Volume'] / avg_v:.2f}"
            if entry_idx is not None
            else "VolRatio: N/A"
        )

        if entry_idx is not None and entry_idx > rsi_idx:
            rebound = abs(df.at[entry_idx + 1, "Close"] - df.at[entry_idx, "Low"])
            decline_per_day = abs(df.at[entry_idx, "Low"] - df.at[rsi_idx, "Low"]) / (
                entry_idx - rsi_idx
            )
            slope_ratio = f"SlopeRatio: {rebound / (decline_per_day or 1):.2f}"
        else:
            slope_ratio = "SlopeRatio: N/A"

        # 4th_EXIT
        if exit_idx is not None:
            dist_pct = (
                df.at[exit_idx, "High"] / df.at[exit_idx, "BB_Upper_3sigma"] - 1
            ) * 100
            distance = f"DistancePct: {dist_pct:.2f}%"
        else:
            distance = "DistancePct: N/A"

        if exit_idx is not None:
            shadow_pct = (
                (df.at[exit_idx, "High"] - df.at[exit_idx, "Close"])
                / df.at[exit_idx, "High"]
                * 100
            )
            upper_shadow = f"UpperShadowPct: {shadow_pct:.2f}%"
        else:
            upper_shadow = "UpperShadowPct: N/A"

        exclusive_rows.extend(
            [
                {
                    "Ticker": ticker,
                    "Company_Name": company_name,
                    "Last_Date": latest_date,
                    "Last_Price": latest_close,
                    "Detect_Date": rsi_date,
                    "Detect_Price": rsi_close_val,
                    "Stage": "1st_RSI",
                    "Signal": "BID",
                    "extras": ["RSI crossed below 30 (oversold)", rsi_at, prev_date, prev_close, prev_rsi],
                },
                {
                    "Ticker": ticker,
                    "Company_Name": company_name,
                    "Last_Date": latest_date,
                    "Last_Price": latest_close,
                    "Detect_Date": entry_date,
                    "Detect_Price": entry_close_val,
                    "Stage": "2nd_BB",
                    "Signal": "BID" if bb_pass else "NO",
                    "extras": [
                        "close<=-2σ & low touch >=50%",
                        bb_walk,
                    ],
                },
                {
                    "Ticker": ticker,
                    "Company_Name": company_name,
                    "Last_Date": latest_date,
                    "Last_Price": latest_close,
                    "Detect_Date": entry_date,
                    "Detect_Price": entry_close_val,
                    "Stage": "3rd_VALLEY",
                    "Signal": "BID" if valley_pass else "NO",
                    "extras": [
                        "Valley+LowerShadow+VolumeSpike",
                        lower_shadow,
                        vol_ratio,
                        slope_ratio,
                    ],
                },
                {
                    "Ticker": ticker,
                    "Company_Name": company_name,
                    "Last_Date": latest_date,
                    "Last_Price": latest_close,
                    "Detect_Date": exit_date,
                    "Detect_Price": exit_close_val,
                    "Stage": "4th_EXIT",
                    "Signal": "ASK" if exit_pass else "NO",
                    "extras": [
                        "+3σ 95% reach and upper shadow",
                        distance,
                        upper_shadow,
                    ],
                },
            ]
        )

    return exclusive_rows
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?