169 lines
6.8 KiB
Python
169 lines
6.8 KiB
Python
import numpy as np
|
||
import pandas as pd
|
||
import yfinance as yf
|
||
from dataclasses import dataclass
|
||
from sentiment import calculate_weighted_sentiment, run_go_scraper
|
||
from datetime import datetime, timedelta
|
||
|
||
# ── Config ────────────────────────────────────────────────────────────────

Q = 0.5                        # Tsallis index (q<1 amplifies fat tails)
LAMBDA_DECAY = 0.97            # Exponential decay for time weighting
WINDOW = 60                    # Rolling window (bars)
N_BINS = 15                    # Histogram bins
BIN_SCALE = 3.5                # Bins span ±3.5 × historical std
MULTI_WINDOWS = (30, 60, 120)  # Multi-scale averaging
PARK_WEIGHT = 0.3              # 0.0 = pure Tsallis, 1.0 = pure Parkinson
MIN_CAL = 120                  # Minimum bars before first prediction
||
|
||
# ── Parkinson Range Volatility ────────────────────────────────────────────
|
||
|
||
def _parkinson(highs: np.ndarray, lows: np.ndarray) -> np.ndarray:
|
||
n = len(highs)
|
||
out = np.full(n, np.nan)
|
||
safe_lows = np.where(lows > 0, lows, highs)
|
||
log_hl_sq = np.log(np.where(highs > safe_lows, highs / safe_lows, 1.0)) ** 2
|
||
for i in range(WINDOW, n):
|
||
out[i] = np.sqrt(np.mean(log_hl_sq[i - WINDOW:i]) / (4.0 * np.log(2.0)))
|
||
return out
|
||
|
||
def _norm(value: float, past: np.ndarray) -> float:
|
||
clean = past[~np.isnan(past)]
|
||
if len(clean) < 20 or np.std(clean) == 0:
|
||
return 0.5
|
||
z = (value - np.mean(clean)) / np.std(clean)
|
||
return float(np.clip(np.tanh(z / 2.0 + 0.5), 0.0, 1.0))
|
||
|
||
# ── Tsallis Entropy Core ─────────────────────────────────────────────────

def _weights(T: int) -> np.ndarray:
    """Exponential-decay weights for T observations, oldest first.

    The oldest bar is weighted LAMBDA_DECAY**T, the newest LAMBDA_DECAY**1.
    """
    exponents = np.arange(T, 0, -1, dtype=np.float64)
    return np.power(LAMBDA_DECAY, exponents)
|
||
|
||
def _weighted_probs(returns: np.ndarray, weights: np.ndarray, edges: np.ndarray) -> np.ndarray:
|
||
n_bins = len(edges) - 1
|
||
wp = np.zeros(n_bins, dtype=np.float64)
|
||
idx = np.clip(np.digitize(returns, edges) - 1, 0, n_bins - 1)
|
||
np.add.at(wp, idx, weights)
|
||
s = weights.sum()
|
||
if s > 0:
|
||
wp /= s
|
||
return wp
|
||
|
||
def _entropy(wp: np.ndarray) -> float:
    """Tsallis entropy S_q of a probability vector (zero-mass bins skipped)."""
    support = wp[wp > 0]
    if support.size == 0:
        return 0.0
    return (1.0 - np.sum(support ** Q)) / (Q - 1.0)
|
||
|
||
def _max_entropy() -> float:
    """S_q of the uniform distribution over N_BINS bins (normalisation cap)."""
    q_complement = 1.0 - Q
    return (N_BINS ** q_complement - 1.0) / q_complement
|
||
|
||
def _tsallis_score(returns: np.ndarray, edges: np.ndarray, window: int) -> float:
    """Normalised Tsallis entropy of the trailing *window* returns, in [0, 1]."""
    tail = returns if len(returns) < window else returns[-window:]
    probs = _weighted_probs(tail, _weights(len(tail)), edges)
    ceiling = _max_entropy()
    if not ceiling:
        return 0.0
    ratio = _entropy(probs) / ceiling
    return float(np.clip(ratio, 0.0, 1.0))
|
||
|
||
# ── Public API ────────────────────────────────────────────────────────────

@dataclass
class Result:
    """Combined volatility/sentiment output of get_score()."""
    score: float         # Volatility/Entropy [0, 1]
    sent: float          # News Sentiment [-1, 1]
    trade_signal: float  # sentiment * score
    regime: str          # LOW / MODERATE / HIGH
|
||
|
||
def get_score(ticker_symbol: str, stock_name: str, parquet_path: str, end_dt_str: str, days_lookback: int, volatility_time: int, interval: str = "1h") -> Result:
    """Score recent volatility (Tsallis + Parkinson blend) for a ticker and
    combine it with news sentiment into a single trade signal.

    Parameters
    ----------
    ticker_symbol : Yahoo Finance ticker to download prices for.
    stock_name : headline search keyword.  NOTE(review): not used inside this
        function — the scraper runs separately in __main__; confirm intent.
    parquet_path : parquet file of scraped mentions, consumed by
        calculate_weighted_sentiment.
    end_dt_str : end date as "YYYY-MM-DD" (exclusive end for yfinance).
    days_lookback : sentiment lookback in days.
    volatility_time : price-history lookback in days.
    interval : yfinance bar interval (default hourly).

    Raises
    ------
    ValueError : when yfinance returns no data, or fewer than MIN_CAL bars
        survive the log-return computation.
    """
    # 1. Resolve the [start, end) date window from the end-date string.
    end_dt = datetime.strptime(end_dt_str, "%Y-%m-%d")
    start_dt = end_dt - timedelta(days=volatility_time)

    # 2. Fetch price data (auto_adjust folds splits/dividends into Close).
    df = yf.Ticker(ticker_symbol).history(
        start=start_dt.strftime("%Y-%m-%d"),
        end=end_dt.strftime("%Y-%m-%d"),
        interval=interval,
        auto_adjust=True
    )

    if df.empty:
        raise ValueError(f"No data for '{ticker_symbol}'.")

    # Per-bar log returns; the first row (NaN from shift) is dropped.
    df["log_return"] = np.log(df["Close"] / df["Close"].shift(1))
    df = df.dropna(subset=["log_return"])

    returns = df["log_return"].values
    highs = df["High"].values
    lows = df["Low"].values

    if len(returns) < MIN_CAL:
        raise ValueError(f"Need {MIN_CAL} bars, got {len(returns)}.")

    # 3. Tsallis volatility: bin edges span mean ± BIN_SCALE stds of all but
    #    the latest return; score is averaged across MULTI_WINDOWS scales.
    r = returns[:-1].astype(np.float64)
    half = BIN_SCALE * np.std(r)
    edges = np.linspace(np.mean(r) - half, np.mean(r) + half, N_BINS + 1)
    ts = [_tsallis_score(returns, edges, w) for w in MULTI_WINDOWS if len(returns) >= w]
    tsallis = float(np.mean(ts)) if ts else 0.5  # neutral if no window fits

    # 4. Parkinson range volatility, normalised against its own history.
    park = 0.5  # neutral fallback when disabled or not yet computable
    if PARK_WEIGHT > 0:
        pv = _parkinson(highs, lows)
        if not np.isnan(pv[-1]):
            park = _norm(pv[-1], pv[:-1])

    # 5. Blend the two volatility estimates, then scale sentiment by it.
    vol_score = float(np.clip((1.0 - PARK_WEIGHT) * tsallis + PARK_WEIGHT * park, 0.0, 1.0))

    # Sentiment over the same end date, looking back days_lookback days.
    sent = calculate_weighted_sentiment(parquet_path, end_date_str=end_dt_str, num_days=days_lookback)

    trade_signal = sent * vol_score
    regime = "LOW" if vol_score < 0.3 else "MODERATE" if vol_score < 0.6 else "HIGH"

    return Result(
        score=vol_score,
        sent=sent,
        trade_signal=trade_signal,
        regime=regime
    )
|
||
|
||
# ── Execution ─────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Collect run parameters interactively (input() already returns str).
    ticker = input("Stock ticker: ")            # Yahoo ticker symbol
    stock_kw = input("Stock name: ")            # search keyword for headlines
    sent_days = int(input("Sentiment lookback: "))
    today_str = datetime.now().strftime("%Y-%m-%d")
    vol_days = int(input("Volatility lookback: "))

    # Refresh the headline dataset via the Go scraper (5-argument form).
    run_go_scraper("yahooscrape.go", today_str, sent_days, ticker, stock_kw)

    # Run the analysis and report; any failure is reported, not raised.
    try:
        result = get_score(
            ticker_symbol=ticker,
            stock_name=stock_kw,
            parquet_path="mentions.parquet",
            end_dt_str=today_str,
            days_lookback=sent_days,
            volatility_time=vol_days
        )

        print(f"\n--- {ticker} ({result.regime} VOLATILITY) ---")
        print(f"Vol Score: {result.score:.4f}")
        print(f"Sentiment: {result.sent:.4f}")
        print(f"Trade Signal: {result.trade_signal:.4f}")

        # Verdict: sit out quiet markets, act with conviction in loud ones.
        if result.score < 0.3:
            print("Verdict: SIT OUT (Low Activity)")
        elif result.score > 0.6:
            action = "BUY" if result.trade_signal > 0 else "SELL"
            print(f"Verdict: FULL CONVICTION {action}")
        else:
            print("Verdict: MODERATE (Monitor/Small Position)")

    except Exception as e:
        print(f"Pipeline Error: {e}")