import os
import re
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import feedparser
import numpy as np
import pandas as pd
import yfinance as yf
from bs4 import BeautifulSoup
from scipy.stats import norm, t
from statsmodels.tsa.regime_switching.markov_regression import MarkovRegression

# Sentiment lexicon used to score news headlines.
_POSITIVE_WORDS = ('bull', 'rally', 'surge', 'growth', 'positive', 'gain',
                   'strong', 'uptrend', 'recovery', 'high')
_NEGATIVE_WORDS = ('bear', 'crash', 'plunge', 'recession', 'negative', 'drop',
                   'weak', 'downtrend', 'risk', 'low', 'crisis')


def _compile_word_starts(words):
    """Return one compiled pattern per word, anchored at a word start."""
    return tuple(re.compile(r"\b" + re.escape(word)) for word in words)


# Anchoring at the start of a word keeps inflected forms ("gains", "bullish",
# "lower") while rejecting mid-word hits ("again" -> "gain", "slow" -> "low").
_POSITIVE_PATTERNS = _compile_word_starts(_POSITIVE_WORDS)
_NEGATIVE_PATTERNS = _compile_word_starts(_NEGATIVE_WORDS)


class RiskEngine:
    """Intraday risk estimator built on prices, news sentiment and regimes.

    The engine downloads daily closes for ``symbol``, scores recent news
    headlines, fits a two-regime Markov switching model to daily log returns
    and runs a fat-tailed Monte Carlo simulation of the next session.
    """

    # Student-t degrees of freedom used for the fat-tailed shocks.
    TAIL_DF = 5
    # Bias scores inside +/- this band are reported as NEUTRAL.
    BIAS_THRESHOLD = 0.0005
    # Number of most recent feed entries that are scored.
    MAX_HEADLINES = 20

    def __init__(self, symbol="ES=F"):
        """Create an engine for ``symbol`` (a Yahoo Finance ticker)."""
        self.symbol = symbol
        # Calendar days of history requested from Yahoo Finance.
        self.lookback_days = 250

    def get_market_data(self):
        """Fetch daily closes from Yahoo Finance and compute log returns.

        Returns:
            DataFrame with columns ``Close`` and ``Log_Returns``; rows with
            missing values (including the first return) are dropped.

        Raises:
            ValueError: if the download is empty, has no ``Close`` column in
                a multi-index frame, or leaves no usable return observations.
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=self.lookback_days)
        data = yf.download(self.symbol, start=start_date, end=end_date)
        if data.empty:
            raise ValueError("No market data fetched.")

        close_data = self._extract_close(data)
        log_returns = np.log(close_data / close_data.shift(1))
        processed_data = pd.DataFrame({
            'Close': close_data,
            'Log_Returns': log_returns,
        }).dropna()
        if processed_data.empty:
            raise ValueError("Not enough market data to compute returns.")
        return processed_data

    def _extract_close(self, data):
        """Return the closing-price Series from a yfinance download frame."""
        columns = data.columns
        if isinstance(columns, pd.MultiIndex):
            # yfinance may put the price field on either column level
            # depending on its grouping mode, so search every level instead
            # of falling back to an arbitrary first column.
            for level in range(columns.nlevels):
                if 'Close' in columns.get_level_values(level):
                    close = data.xs('Close', axis=1, level=level)
                    break
            else:
                raise ValueError("No 'Close' column in market data.")
        else:
            close = data['Close']

        if isinstance(close, pd.DataFrame):
            if self.symbol in close.columns:
                close = close[self.symbol]
            else:
                close = close.iloc[:, 0]
        return close

    @staticmethod
    def _score_headline(headline):
        """Score one headline in (-1, 1) from lexicon hits.

        Each lexicon word counts at most once per headline. The ``+ 1`` in
        the denominator damps headlines with very few hits and avoids a
        division by zero when nothing matches.
        """
        text = headline.lower()
        p_count = sum(1 for pat in _POSITIVE_PATTERNS if pat.search(text))
        n_count = sum(1 for pat in _NEGATIVE_PATTERNS if pat.search(text))
        return (p_count - n_count) / (p_count + n_count + 1)

    def get_sentiment(self):
        """Collect sentiment from Google News RSS.

        Returns:
            Mean headline score over the first ``MAX_HEADLINES`` feed
            entries, or ``0.0`` when the feed yields no entries.
        """
        # The symbol can contain reserved characters (e.g. "ES=F"), so the
        # whole search phrase is URL-encoded rather than interpolated raw.
        query = quote_plus(f"{self.symbol} futures stock market")
        rss_url = (
            f"https://news.google.com/rss/search?q={query}"
            "&hl=en-US&gl=US&ceid=US:en"
        )
        feed = feedparser.parse(rss_url)

        scores = [
            # Entries without a title score as neutral instead of raising.
            self._score_headline(entry.get('title', '') or '')
            for entry in feed.entries[:self.MAX_HEADLINES]
        ]
        return float(np.mean(scores)) if scores else 0.0

    def fit_markov_regime(self, data):
        """Fit a 2-state Markov switching model to daily log returns.

        Regimes are re-labelled after fitting so that regime 0 is always the
        lower-volatility state and regime 1 the higher-volatility state; the
        optimizer itself does not guarantee any ordering.

        Args:
            data: DataFrame with a ``Log_Returns`` column.

        Returns:
            Tuple ``(current_regime, regime_params)`` where
            ``current_regime`` is 0 or 1 and ``regime_params`` holds arrays
            ``mu`` and ``sigma`` indexed by regime.
        """
        model = MarkovRegression(
            data['Log_Returns'],
            k_regimes=2,
            trend='c',
            switching_variance=True,
        )
        res = model.fit(disp=False)

        # Most likely regime for the latest observation, in the fit's own
        # (arbitrary) labelling.
        latest_prob_0 = res.smoothed_marginal_probabilities[0].iloc[-1]
        raw_regime = 0 if latest_prob_0 > 0.5 else 1

        mu = res.params[['const[0]', 'const[1]']].to_numpy(dtype=float)
        sigma = np.sqrt(
            res.params[['sigma2[0]', 'sigma2[1]']].to_numpy(dtype=float)
        )

        # order[i] is the raw label of the regime placed at position i.
        order = np.argsort(sigma, kind='stable')
        current_regime = int(np.flatnonzero(order == raw_regime)[0])
        regime_params = {'mu': mu[order], 'sigma': sigma[order]}
        return current_regime, regime_params

    @classmethod
    def _simulate_paths(cls, current_close, mu, sigma, n_paths, n_steps, seed):
        """Simulate intraday price paths with Student-t shocks.

        Returns an array of shape ``(n_paths, n_steps + 1)`` whose first
        column is ``current_close``.
        """
        dt = 1.0 / n_steps
        # A Student-t variate has variance df / (df - 2); rescale to unit
        # variance so that ``sigma`` remains the per-day standard deviation.
        unit_scale = np.sqrt((cls.TAIL_DF - 2) / cls.TAIL_DF)
        shocks = (
            t.rvs(cls.TAIL_DF, size=(n_paths, n_steps), random_state=seed)
            * unit_scale * sigma * np.sqrt(dt)
        )
        drift = (mu - 0.5 * sigma ** 2) * dt

        paths = np.empty((n_paths, n_steps + 1))
        paths[:, 0] = current_close
        paths[:, 1:] = current_close * np.exp(np.cumsum(drift + shocks, axis=1))
        return paths

    @classmethod
    def _trade_levels(cls, current_close, sigma, bias_score):
        """Return ``(bias, tp, sl)`` for a bias score and volatility."""
        move = sigma * current_close
        if bias_score > cls.BIAS_THRESHOLD:
            return "LONG", current_close + 2 * move, current_close - 1.5 * move
        if bias_score < -cls.BIAS_THRESHOLD:
            return "SHORT", current_close - 2 * move, current_close + 1.5 * move
        return "NEUTRAL", current_close + move, current_close - move

    def run_simulation(self, *, n_paths=5000, n_steps=100, seed=None):
        """Main entry point to run the risk simulation.

        Args:
            n_paths: number of Monte Carlo paths.
            n_steps: number of intraday steps per path.
            seed: optional seed or NumPy random generator passed to SciPy
                for reproducible draws; ``None`` keeps runs random.

        Returns:
            Dict with keys ``expected_low``, ``worst_case_5th``,
            ``drawdown_prob`` (percent), ``bias``, ``tp``, ``sl``,
            ``sentiment`` and ``regime``.

        Raises:
            ValueError: if ``n_paths`` or ``n_steps`` is less than 1, or if
                market data cannot be fetched.
        """
        if n_paths < 1 or n_steps < 1:
            raise ValueError("n_paths and n_steps must be at least 1.")

        data = self.get_market_data()
        sentiment = self.get_sentiment()
        regime, params = self.fit_markov_regime(data)

        # Negative sentiment widens volatility: sentiment -1 gives 1.5x.
        vol_adj = 1.0 - (sentiment * 0.5)

        current_close = float(data['Close'].iloc[-1])
        mu = float(params['mu'][regime])
        sigma = float(params['sigma'][regime] * vol_adj)

        paths = self._simulate_paths(
            current_close, mu, sigma, n_paths, n_steps, seed
        )

        intraday_lows = np.min(paths, axis=1)
        expected_low = float(np.mean(intraday_lows))
        worst_case_5th = float(np.percentile(intraday_lows, 5))

        # Share of paths whose low is at least 1% below the close, in percent.
        drawdowns = (intraday_lows - current_close) / current_close
        prob_1pct_drawdown = float(np.mean(drawdowns <= -0.01) * 100)

        # NOTE(review): sentiment and mu live on very different scales, so
        # sentiment dominates this score in practice; confirm the weighting.
        total_bias_score = sentiment * 0.3 + mu * 0.7
        bias, tp, sl = self._trade_levels(current_close, sigma, total_bias_score)

        return {
            'expected_low': expected_low,
            'worst_case_5th': worst_case_5th,
            'drawdown_prob': prob_1pct_drawdown,
            'bias': bias,
            'tp': tp,
            'sl': sl,
            'sentiment': sentiment,
            'regime': regime,
        }