136 lines
5.4 KiB
Python
136 lines
5.4 KiB
Python
import os
|
|
import yfinance as yf
|
|
import pandas as pd
|
|
import numpy as np
|
|
import feedparser
|
|
from bs4 import BeautifulSoup
|
|
from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression
|
|
from scipy.stats import norm, t
|
|
from datetime import datetime, timedelta
|
|
|
|
class RiskEngine:
|
|
def __init__(self, symbol="ES=F"):
|
|
self.symbol = symbol
|
|
self.lookback_days = 250
|
|
|
|
def get_market_data(self):
|
|
"""Fetch historical ES futures data from Yahoo Finance."""
|
|
end_date = datetime.now()
|
|
start_date = end_date - timedelta(days=self.lookback_days)
|
|
data = yf.download(self.symbol, start=start_date, end=end_date)
|
|
if data.empty:
|
|
raise ValueError("No market data fetched.")
|
|
|
|
# Calculate daily log returns
|
|
# Handle potential multi-index if symbol is a single string but yfinance returns multi-index
|
|
if isinstance(data.columns, pd.MultiIndex):
|
|
close_col = ('Close', self.symbol) if ( 'Close', self.symbol) in data.columns else data.columns[0]
|
|
close_data = data[close_col]
|
|
else:
|
|
close_data = data['Close']
|
|
|
|
log_returns = np.log(close_data / close_data.shift(1))
|
|
|
|
processed_data = pd.DataFrame({
|
|
'Close': close_data,
|
|
'Log_Returns': log_returns
|
|
})
|
|
return processed_data.dropna()
|
|
|
|
def get_sentiment(self):
|
|
"""Collect sentiment from Google News RSS."""
|
|
rss_url = f"https://news.google.com/rss/search?q={self.symbol}+futures+stock+market&hl=en-US&gl=US&ceid=US:en"
|
|
feed = feedparser.parse(rss_url)
|
|
|
|
positive_words = {'bull', 'rally', 'surge', 'growth', 'positive', 'gain', 'strong', 'uptrend', 'recovery', 'high'}
|
|
negative_words = {'bear', 'crash', 'plunge', 'recession', 'negative', 'drop', 'weak', 'downtrend', 'risk', 'low', 'crisis'}
|
|
|
|
scores = []
|
|
for entry in feed.entries[:20]: # Last 20 headlines
|
|
headline = entry.title.lower()
|
|
p_count = sum(1 for w in positive_words if w in headline)
|
|
n_count = sum(1 for w in negative_words if w in headline)
|
|
score = (p_count - n_count) / (p_count + n_count + 1)
|
|
scores.append(score)
|
|
|
|
return np.mean(scores) if scores else 0.0
|
|
|
|
def fit_markov_regime(self, data):
|
|
"""Model daily return dynamics with a 2-state Markov transition framework."""
|
|
# 0: Low Vol, 1: High Vol/Bearish
|
|
model = MarkovRegression(data['Log_Returns'], k_regimes=2, trend='c', switching_variance=True)
|
|
res = model.fit(disp=False)
|
|
|
|
# Latest regime probability
|
|
current_regime = 0 if res.smoothed_marginal_probabilities[0].iloc[-1] > 0.5 else 1
|
|
|
|
# Regime parameters
|
|
regime_params = {
|
|
'mu': res.params[['const[0]', 'const[1]']].values,
|
|
'sigma': np.sqrt(res.params[['sigma2[0]', 'sigma2[1]']].values)
|
|
}
|
|
|
|
return current_regime, regime_params
|
|
|
|
def run_simulation(self):
|
|
"""Main entry point to run the risk simulation."""
|
|
data = self.get_market_data()
|
|
sentiment = self.get_sentiment()
|
|
regime, params = self.fit_markov_regime(data)
|
|
|
|
# Sentiment adjustment
|
|
# Sentiment (negative) increases volatility and jump intensity
|
|
vol_adj = 1.0 - (sentiment * 0.5) # If sentiment is -1, vol_adj is 1.5
|
|
current_close = float(data['Close'].iloc[-1])
|
|
mu = float(params['mu'][regime])
|
|
sigma = float(params['sigma'][regime] * vol_adj)
|
|
|
|
# Monte Carlo Simulation (Intraday - 100 steps for a day)
|
|
n_paths = 5000
|
|
n_steps = 100
|
|
dt = 1.0 / n_steps
|
|
|
|
# Fat-tailed moves (Student's t-distribution)
|
|
df = 5 # Degrees of freedom for fat tails
|
|
shocks = t.rvs(df, size=(n_paths, n_steps)) * sigma * np.sqrt(dt)
|
|
paths = np.zeros((n_paths, n_steps + 1))
|
|
paths[:, 0] = current_close
|
|
|
|
for t_step in range(1, n_steps + 1):
|
|
paths[:, t_step] = paths[:, t_step - 1] * np.exp((mu - 0.5 * sigma**2) * dt + shocks[:, t_step - 1])
|
|
|
|
# Metrics
|
|
intraday_lows = np.min(paths, axis=1)
|
|
expected_low = np.mean(intraday_lows)
|
|
worst_case_5th = np.percentile(intraday_lows, 5)
|
|
|
|
# Prob of 1% drawdown
|
|
drawdowns = (np.min(paths, axis=1) - current_close) / current_close
|
|
prob_1pct_drawdown = np.mean(drawdowns <= -0.01) * 100 # In percentage
|
|
|
|
# Directional Bias & TP/SL
|
|
# Bias is driven by (Sentiment + Mu)
|
|
total_bias_score = sentiment * 0.3 + mu * 0.7
|
|
if total_bias_score > 0.0005:
|
|
bias = "LONG"
|
|
tp = current_close + (2 * sigma * current_close)
|
|
sl = current_close - (1.5 * sigma * current_close)
|
|
elif total_bias_score < -0.0005:
|
|
bias = "SHORT"
|
|
tp = current_close - (2 * sigma * current_close)
|
|
sl = current_close + (1.5 * sigma * current_close)
|
|
else:
|
|
bias = "NEUTRAL"
|
|
tp = current_close + (1 * sigma * current_close)
|
|
sl = current_close - (1 * sigma * current_close)
|
|
|
|
return {
|
|
'expected_low': expected_low,
|
|
'worst_case_5th': worst_case_5th,
|
|
'drawdown_prob': prob_1pct_drawdown,
|
|
'bias': bias,
|
|
'tp': tp,
|
|
'sl': sl,
|
|
'sentiment': sentiment,
|
|
'regime': regime
|
|
} |