37172-vm/algo_trader/core/strategy_engine.py
2025-12-27 04:26:41 +00:00

123 lines
4.6 KiB
Python

"""Core: Strategy orchestration and signal generation."""
from loguru import logger
from PySide6.QtCore import QObject, Signal
class StrategyEngine(QObject):
"""Hosts and executes trading strategies, generating signals from market data."""
class Signals(QObject):
strategy_loaded = Signal(str)
strategy_started = Signal(str)
strategy_stopped = Signal(str)
trade_signal = Signal(dict) # Emits trade signals for potential orders
def __init__(self, order_manager):
"""
Initializes the StrategyEngine.
Args:
order_manager: The OrderManager instance to send signals to.
"""
super().__init__()
self.order_manager = order_manager
self.strategy = None
self.strategy_name = None
self.last_price = None # For our simple demo strategy
self._is_running = False
self.signals = self.Signals()
def load_strategy(self, strategy_name: str):
"""
Loads a trading strategy. For now, we use a simple hardcoded one.
In the future, this will dynamically load strategy modules.
"""
if strategy_name == "simple_crossover":
self.strategy = self._simple_crossover_strategy
self.strategy_name = strategy_name
logger.info(f"Loaded strategy: {strategy_name}")
self.signals.strategy_loaded.emit(strategy_name)
else:
logger.error(f"Strategy '{strategy_name}' not found.")
def start_strategy(self):
"""
Starts the loaded strategy.
"""
if self.strategy and not self._is_running:
self._is_running = True
logger.info(f"Strategy '{self.strategy_name}' started.")
self.signals.strategy_started.emit(self.strategy_name)
elif not self.strategy:
logger.warning("No strategy loaded to start.")
else:
logger.warning("Strategy is already running.")
def stop_strategy(self):
"""
Stops the running strategy.
"""
if self._is_running:
self._is_running = False
logger.info(f"Strategy '{self.strategy_name}' stopped.")
self.signals.strategy_stopped.emit(self.strategy_name)
else:
logger.warning("No strategy is currently running.")
async def on_tick(self, tick: dict):
"""
Called by the DataFeed on every new market data tick.
Executes the loaded strategy.
Args:
tick (dict): The tick data from the data feed.
"""
if not self.strategy or not self._is_running:
return
# The tick format from FYERS WebSocket is a list of dicts
# For simplicity, we assume we get one instrument's data at a time
if isinstance(tick, list) and tick:
tick = tick[0]
ltp = tick.get("ltp")
symbol = tick.get("symbol")
if ltp and symbol:
await self.strategy(symbol, ltp)
async def _simple_crossover_strategy(self, symbol: str, current_price: float):
"""
A very basic placeholder strategy.
- If price goes up, BUY.
- If price goes down, SELL.
- Acts only on the first price change.
This is for demonstration and is NOT a profitable strategy.
"""
if self.last_price is None:
self.last_price = current_price
logger.info(f"Starting price for {symbol} is {current_price}. Waiting for change.")
return
# Only trade once for this demo
if abs(self.last_price - current_price) > 0.1: # Threshold to act
trade_info = {"symbol": symbol, "current_price": current_price}
if current_price > self.last_price:
logger.warning(f"Price increased: {self.last_price} -> {current_price}. Sending BUY signal.")
trade_info["side"] = "BUY"
await self.order_manager.place_order(
symbol=symbol, side="BUY", current_price=current_price, quantity=1, order_type="MARKET"
)
elif current_price < self.last_price:
logger.warning(f"Price decreased: {self.last_price} -> {current_price}. Sending SELL signal.")
trade_info["side"] = "SELL"
await self.order_manager.place_order(
symbol=symbol, side="SELL", current_price=current_price, quantity=1, order_type="MARKET"
)
self.signals.trade_signal.emit(trade_info)
# Stop further trading in this simple example
self.stop_strategy()
logger.info("Demo trade placed. Strategy deactivated.")