def _first_entry_after_rsi(df: pd.DataFrame, rsi_idx: int) -> Optional[Dict[str, int]]:
entry_candidates = (
set(find_valley_points(df))
& set(find_lower_shadow_points(df))
& set(find_volume_spike_points(df))
)
rebound_set = set(find_rebound_stronger_than_decline_points(df, rsi_idx))
for i in range(rsi_idx + 1, len(df) - 1):
if i not in entry_candidates:
continue
if not find_band_walk_points(df, i):
continue
if i in rebound_set:
return {"rsi_idx": rsi_idx, "entry_idx": i}
return None
def _first_exit_after_entry(df: pd.DataFrame, entry_idx: int) -> Optional[int]:
exit_candidates = sorted(find_upper_band_exit_points(df))
for j in exit_candidates:
if j > entry_idx:
return j
return None
def analyze_market_before_stages(chart: ChartData) -> Dict[str, Any]:
base = {
"latest_date": chart.latest_date,
"latest_close": chart.latest_close,
"rsi_pass": False,
"bb_pass": False,
"valley_pass": False,
"exit_pass": False,
"rsi_idx": None,
"latest_rsi_idx": None,
"entry_idx": None,
"exit_idx": None,
"rsi_count": 0,
"entry_count": 0,
"exit_count": 0,
"reason": "",
}
df = _prepare_df(chart)
if not _has_required_price_data(df):
base["reason"] = "Not enough data"
return base
df = prepare_technical_indicators(df)
base["df"] = df
rsi_indices = find_rsi_cross_under(df, threshold=30)
base["rsi_count"] = len(rsi_indices)
if not rsi_indices:
base["reason"] = "No RSI entry"
return base
base["rsi_pass"] = True
base["latest_rsi_idx"] = rsi_indices[-1]
chosen = None
all_entries: List[int] = []
all_exits: List[int] = []
for rsi_idx in rsi_indices:
found = _first_entry_after_rsi(df, rsi_idx)
if found is None:
continue
all_entries.append(found["entry_idx"])
if chosen is None:
chosen = found
exit_idx = _first_exit_after_entry(df, found["entry_idx"])
if exit_idx is not None:
all_exits.append(exit_idx)
base["bb_pass"] = chosen is not None
base["valley_pass"] = chosen is not None
base["entry_count"] = len(all_entries)
base["exit_count"] = len(all_exits)
if chosen is not None:
base["rsi_idx"] = chosen["rsi_idx"]
base["entry_idx"] = chosen["entry_idx"]
base["exit_idx"] = _first_exit_after_entry(df, chosen["entry_idx"])
base["exit_pass"] = base["exit_idx"] is not None
if not base["exit_pass"]:
base["reason"] = "No +3sigma exit"
else:
base["reason"] = "No BB/Valley entry"
return base
def build_market_before_exclusive_rows(
ticker: str, company_name: str, chart: ChartData
) -> List[Dict[str, Any]]:
"""RSI検知1件につき4行セット(1st〜4th)を生成する。RSI未検知は空リストを返す。"""
df = _prepare_df(chart)
latest_date = chart.latest_date
latest_close = chart.latest_close
if not _has_required_price_data(df):
return []
df = prepare_technical_indicators(df)
rsi_indices = find_rsi_cross_under(df, threshold=30)
def _date_close(idx):
if idx is None:
return "", ""
d = df.at[idx, "Date"]
if hasattr(d, "strftime"):
d = d.strftime("%Y-%m-%d")
return d, df.at[idx, "Close"]
exclusive_rows = []
for rsi_idx in rsi_indices:
first_entry_after_rsi = _first_entry_after_rsi(df, rsi_idx)
entry_idx = (
first_entry_after_rsi["entry_idx"] if first_entry_after_rsi else None
)
exit_idx = (
_first_exit_after_entry(df, entry_idx) if entry_idx is not None else None
)
bb_pass = entry_idx is not None
valley_pass = entry_idx is not None
exit_pass = exit_idx is not None
rsi_date, rsi_close_val = _date_close(rsi_idx)
entry_date, entry_close_val = _date_close(entry_idx)
exit_date, exit_close_val = _date_close(exit_idx)
# 1st_RSI
rsi_at = f"At_Detect_Rsi: {df.at[rsi_idx, 'RSI']:.2f}"
p = rsi_idx - 1
prev_date = f"Prev_Date: {df.at[p, 'Date'].strftime('%Y-%m-%d')}"
prev_close = f"Prev_Close: {df.at[p, 'Close']:.2f}"
prev_rsi = f"Prev_Rsi: {df.at[p, 'RSI']:.2f}"
# 2nd_BB
if entry_idx is not None:
walk_period = df.iloc[rsi_idx : entry_idx + 1]
walk_days = len(walk_period)
close_ratio = (walk_period["Close"] <= walk_period["BB_Lower"]).mean()
touch_ratio = (walk_period["Low"] <= walk_period["BB_Lower"]).mean()
bb_walk = (
f"BBWalk: {walk_days}d "
f"close={close_ratio:.0%} touch={touch_ratio:.0%}"
)
else:
bb_walk = "BBWalk: N/A"
# 3rd_VALLEY
if entry_idx is not None:
low_shadow_pct = (
(df.at[entry_idx, "Close"] - df.at[entry_idx, "Low"])
/ df.at[entry_idx, "Low"]
* 100
)
lower_shadow = f"LowerShadowPct: {low_shadow_pct:.2f}%"
else:
lower_shadow = "LowerShadowPct: N/A"
avg_v = df.at[entry_idx, "Avg_Volume"] if entry_idx is not None else 0
vol_ratio = (
f"VolRatio: {df.at[entry_idx, 'Volume'] / avg_v:.2f}"
if entry_idx is not None
else "VolRatio: N/A"
)
if entry_idx is not None and entry_idx > rsi_idx:
rebound = abs(df.at[entry_idx + 1, "Close"] - df.at[entry_idx, "Low"])
decline_per_day = abs(df.at[entry_idx, "Low"] - df.at[rsi_idx, "Low"]) / (
entry_idx - rsi_idx
)
slope_ratio = f"SlopeRatio: {rebound / (decline_per_day or 1):.2f}"
else:
slope_ratio = "SlopeRatio: N/A"
# 4th_EXIT
if exit_idx is not None:
dist_pct = (
df.at[exit_idx, "High"] / df.at[exit_idx, "BB_Upper_3sigma"] - 1
) * 100
distance = f"DistancePct: {dist_pct:.2f}%"
else:
distance = "DistancePct: N/A"
if exit_idx is not None:
shadow_pct = (
(df.at[exit_idx, "High"] - df.at[exit_idx, "Close"])
/ df.at[exit_idx, "High"]
* 100
)
upper_shadow = f"UpperShadowPct: {shadow_pct:.2f}%"
else:
upper_shadow = "UpperShadowPct: N/A"
exclusive_rows.extend(
[
{
"Ticker": ticker,
"Company_Name": company_name,
"Last_Date": latest_date,
"Last_Price": latest_close,
"Detect_Date": rsi_date,
"Detect_Price": rsi_close_val,
"Stage": "1st_RSI",
"Signal": "BID",
"extras": ["RSI crossed below 30 (oversold)", rsi_at, prev_date, prev_close, prev_rsi],
},
{
"Ticker": ticker,
"Company_Name": company_name,
"Last_Date": latest_date,
"Last_Price": latest_close,
"Detect_Date": entry_date,
"Detect_Price": entry_close_val,
"Stage": "2nd_BB",
"Signal": "BID" if bb_pass else "NO",
"extras": [
"close<=-2σ & low touch >=50%",
bb_walk,
],
},
{
"Ticker": ticker,
"Company_Name": company_name,
"Last_Date": latest_date,
"Last_Price": latest_close,
"Detect_Date": entry_date,
"Detect_Price": entry_close_val,
"Stage": "3rd_VALLEY",
"Signal": "BID" if valley_pass else "NO",
"extras": [
"Valley+LowerShadow+VolumeSpike",
lower_shadow,
vol_ratio,
slope_ratio,
],
},
{
"Ticker": ticker,
"Company_Name": company_name,
"Last_Date": latest_date,
"Last_Price": latest_close,
"Detect_Date": exit_date,
"Detect_Price": exit_close_val,
"Stage": "4th_EXIT",
"Signal": "ASK" if exit_pass else "NO",
"extras": [
"+3σ 95% reach and upper shadow",
distance,
upper_shadow,
],
},
]
)
return exclusive_rows
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme