Compare commits

...

2 Commits

Author SHA1 Message Date
Flatlogic Bot
c3681ec74b Auto commit: 2025-12-27T04:26:41.696Z 2025-12-27 04:26:41 +00:00
Flatlogic Bot
5836c16118 Auto commit: 2025-12-27T04:12:50.711Z 2025-12-27 04:12:50 +00:00
25 changed files with 1588 additions and 0 deletions

5
.gitignore vendored
View File

@ -1,3 +1,8 @@
node_modules/
*/node_modules/
*/build/
# Ignore environment files and access tokens
*.env
access_token.txt
algo_trader/storage/trader.db

46
algo_trader/README.md Normal file
View File

@ -0,0 +1,46 @@
# Professional-Grade Trading Application
## Overview
This is a private, professional-grade desktop trading application for personal use. It is built with Python and PySide6 for the UI.
## Features
- Intraday F&O trading (Index + Stocks)
- Swing / compounding equity trading
- Strict risk management
- Strategy-based execution
- Real-time monitoring
- Manual override & safety controls
## Tech Stack
- **Language:** Python 3.10+
- **UI Framework:** PySide6 (Qt)
- **Charts:** pyqtgraph
- **Database:** SQLite
- **Async:** asyncio + threading
- **Broker:** FYERS API (v3)
## Setup
1. **Install dependencies:**
```bash
pip install -r requirements.txt
```
2. **Configure credentials:**
Copy `config/credentials.env.example` to `config/credentials.env` and fill in your FYERS API credentials.
3. **Run the application:**
```bash
python algo_trader/app.py
```
## ⚠️ Warnings ⚠️
- **This is not a toy.** This is a trading application that can execute real trades with real money.
- **Use at your own risk.** The author is not responsible for any financial losses.
- **Paper trade first.** Always test your strategies in a paper trading environment before going live.
- **No auto-start.** The application will not start automatically. You must manually start it.
- **No unattended trading.** Do not leave the application running unattended.

146
algo_trader/app.py Normal file
View File

@ -0,0 +1,146 @@
import sys
import asyncio
import yaml
from PySide6.QtWidgets import QApplication
from sqlalchemy.orm import sessionmaker
from ui.main_window import MainWindow
from core.broker import FyersBroker
from core.data_feed import FyersDataFeed
from storage.database import get_engine, init_db
from core.order_manager import OrderManager
from core.strategy_engine import StrategyEngine
from core.risk_manager import RiskManager
from core.portfolio import Portfolio
from utils.logger import app_logger as logger # Import the global app_logger
async def run_backend(
broker,
data_feed,
db_session,
order_manager,
risk_manager,
strategy_engine,
settings,
portfolio,
):
try:
# --- Broker and Core Components Initialization ---
access_token = await broker.get_access_token()
if access_token and broker.fyers:
profile = await broker.get_profile()
logger.info(f"Broker Profile: {profile}")
# Now set the order_manager in risk_manager
risk_manager.order_manager = order_manager
# --- Connect and Run ---
ws_access_token = f"{broker.client_id}:{access_token}"
await data_feed.connect(ws_access_token)
# Load strategy and subscribe to symbols required by it
strategy_engine.load_strategy("simple_crossover")
await data_feed.subscribe(["NSE:SBIN-EQ"])
logger.info("Auto-trading strategy is now running. Waiting for signals...")
# Keep the application running to listen for ticks
# The UI event loop will keep the application alive
# asyncio.sleep(30) is removed as UI will manage lifetime
else:
logger.error(
"Broker or token initialization failed. Check credentials and logs."
)
except Exception as e:
logger.opt(exception=True).critical(
f"An error occurred during backend runtime: {e}"
)
finally:
# Cleanup should be handled by the UI's close event or a dedicated shutdown
logger.info("Backend run function finished.")
async def main():
"""Main function to run the application."""
try:
with open("config/settings.yaml", "r") as f:
settings = yaml.safe_load(f)
except FileNotFoundError:
print("Error: settings.yaml not found. Please ensure the file exists.")
sys.exit(1)
except yaml.YAMLError as e:
print(f"Error parsing settings.yaml: {e}")
sys.exit(1)
logger.info(f"Starting {settings['app']['name']} v{settings['app']['version']}")
# --- Database Initialization ---
db_session = None
try:
db_path = settings["database"]["path"]
engine = get_engine(db_path)
init_db(engine)
Session = sessionmaker(bind=engine)
db_session = Session()
logger.success("Database initialized successfully.")
except Exception as e:
logger.opt(exception=True).critical(
f"An error occurred during database initialization: {e}"
)
sys.exit(1)
broker = None
data_feed = None
order_manager = None
risk_manager = None
strategy_engine = None
portfolio = None
try:
broker = FyersBroker(settings=settings["fyers"])
risk_manager = RiskManager(order_manager=None, settings=settings, logger=logger)
order_manager = OrderManager(
broker=broker, db_session=db_session, risk_manager=risk_manager
)
strategy_engine = StrategyEngine(order_manager=order_manager)
data_feed = FyersDataFeed(
fyers=broker.fyers,
)
portfolio = Portfolio(db_session=db_session)
# Launch UI and pass backend components
app = QApplication(sys.argv)
main_window = MainWindow(
broker=broker,
data_feed=data_feed,
db_session=db_session,
order_manager=order_manager,
risk_manager=risk_manager,
strategy_engine=strategy_engine,
settings=settings,
backend_runner=run_backend,
app_logger=logger, # Pass the global app_logger instance
portfolio=portfolio,
)
main_window.show()
logger.info("Application UI started.")
# Run the Qt event loop in a separate thread or integrate with asyncio
# For simplicity, we'll let QApplication manage the main thread
# and run the backend in the asyncio event loop managed by a QThread or similar
# For now, simply run the Qt app. The backend needs to be started
# from within the UI or via a separate asyncio loop.
sys.exit(app.exec())
except Exception as e:
logger.opt(exception=True).critical(f"An error occurred: {e}")
finally:
if db_session:
db_session.close()
logger.info("Application shutdown complete.")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,4 @@
# FYERS API Credentials
FYERS_APP_ID=""
FYERS_SECRET_KEY=""
FYERS_REDIRECT_URI=""

View File

@ -0,0 +1,21 @@
app:
name: "AlgoTrader"
version: "0.1.0"
fyers:
credentials_path: "config/credentials.env"
access_token_path: "access_token.txt"
logging:
level: "INFO"
file: "logs/app.log"
database:
path: "storage/trader.db"
risk:
max_daily_loss_percentage: 0.01
default_position_size: 10
stop_loss_percentage: 0.005
take_profit_percentage: 0.01
risk_per_trade_percentage: 0.005

125
algo_trader/core/broker.py Normal file
View File

@ -0,0 +1,125 @@
import os
import webbrowser
from dotenv import load_dotenv
from fyers_api import fyersModel, accessToken
from loguru import logger
class FyersBroker:
"""
Handles all interactions with the Fyers API.
"""
def __init__(self, settings: dict):
"""
Initializes the FyersBroker.
Args:
settings (dict): Application settings containing API credentials.
"""
load_dotenv(dotenv_path=settings['credentials_path'])
self.client_id = os.getenv('FYERS_CLIENT_ID')
self.secret_key = os.getenv('FYERS_SECRET_KEY')
self.redirect_uri = os.getenv('FYERS_REDIRECT_URI')
self.access_token_path = settings['access_token_path']
self.fyers = None
self.access_token = self._load_access_token()
if not self.access_token:
self._generate_new_access_token()
self._initialize_fyers_model()
def _load_access_token(self) -> str | None:
"""Loads access token from the specified file."""
try:
if os.path.exists(self.access_token_path):
with open(self.access_token_path, 'r') as f:
return f.read().strip()
return None
except Exception as e:
logger.error(f"Error loading access token: {e}")
return None
def _save_access_token(self, token: str):
"""Saves access token to the specified file."""
try:
with open(self.access_token_path, 'w') as f:
f.write(token)
logger.info("Access token saved successfully.")
except Exception as e:
logger.error(f"Error saving access token: {e}")
def _generate_new_access_token(self):
"""Generates a new access token using the auth code flow."""
session = accessToken.SessionModel(
client_id=self.client_id,
secret_key=self.secret_key,
redirect_uri=self.redirect_uri,
response_type='code',
grant_type='authorization_code'
)
try:
response = session.generate_authcode()
logger.info(f"Auth code generation response: {response}")
print(f"Login URL: {response}")
webbrowser.open(response)
auth_code = input("Enter the auth code from the redirected URL: ")
session.set_token(auth_code)
access_token_response = session.generate_token()
self.access_token = access_token_response['access_token']
self._save_access_token(self.access_token)
logger.info("New access token generated and saved.")
except Exception as e:
logger.error(f"Error generating new access token: {e}")
self.access_token = None
def _initialize_fyers_model(self):
"""Initializes the FyersModel with the access token."""
if self.access_token:
try:
self.fyers = fyersModel.FyersModel(
client_id=self.client_id,
is_async=True,
token=self.access_token,
log_path=os.path.join(os.path.dirname(__file__), '..' , 'logs')
)
logger.info("FyersModel initialized successfully.")
except Exception as e:
logger.error(f"Error initializing FyersModel: {e}")
self.fyers = None
else:
logger.warning("FyersModel could not be initialized. Access token is missing.")
async def get_profile(self):
"""Fetches user profile details."""
if not self.fyers:
return {"error": "FyersModel not initialized."}
try:
response = await self.fyers.get_profile()
if response['s'] == 'ok':
return response['data']
else:
logger.error(f"Error fetching profile: {response['message']}")
return {"error": response['message']}
except Exception as e:
logger.error(f"Exception fetching profile: {e}")
return {"error": str(e)}
async def get_funds(self):
"""Fetches user funds details."""
if not self.fyers:
return {"error": "FyersModel not initialized."}
try:
response = await self.fyers.get_funds()
if response['s'] == 'ok':
return response['fund_limit']
else:
logger.error(f"Error fetching funds: {response['message']}")
return {"error": response['message']}
except Exception as e:
logger.error(f"Exception fetching funds: {e}")
return {"error": str(e)}

View File

@ -0,0 +1,94 @@
"""Core: WebSocket market data feed."""
import asyncio
from PySide6.QtCore import QObject, Signal
from typing import Any
from fyers_api_sdk.fyers_async import FyersAsync
from loguru import logger
class FyersDataFeed(QObject):
"""Handles WebSocket connection for real-time data."""
class Signals(QObject):
connected = Signal()
disconnected = Signal()
error = Signal(str)
tick = Signal(dict)
def __init__(
self,
fyers: FyersAsync,
):
"""
Initializes the FyersDataFeed.
Args:
fyers: An authenticated FyersAsync instance.
"""
super().__init__()
self._fyers = fyers
self.websocket = None
self.signals = self.Signals()
async def connect(self, access_token: str, data_type: str = "symbolData"):
"""Establishes a WebSocket connection."""
logger.info("Connecting to Fyers WebSocket...")
try:
self.websocket = self._fyers.fyers_market_socket(
access_token=access_token,
log_path="", # Disable log file generation
on_connect=lambda: asyncio.create_task(self._handle_connect()),
on_close=lambda: asyncio.create_task(self._handle_close()),
on_error=lambda msg: asyncio.create_task(self._handle_error(msg)),
on_message=lambda msg: asyncio.create_task(self._handle_tick(msg)),
)
await asyncio.to_thread(self.websocket.connect)
logger.success("Fyers WebSocket connected.")
except Exception as e:
logger.error(f"Failed to connect to WebSocket: {e}")
self.signals.error.emit(str(e))
async def subscribe(self, symbols: list[str]):
"""Subscribes to a list of symbols."""
if self.websocket:
logger.info(f"Subscribing to symbols: {symbols}")
request = {"T": "SUB_L2", "L2_T": "T", "V": symbols}
self.websocket.send_message(request)
else:
logger.warning("WebSocket not connected. Cannot subscribe.")
async def unsubscribe(self, symbols: list[str]):
"""Unsubscribes from a list of symbols."""
if self.websocket:
logger.info(f"Unsubscribing from symbols: {symbols}")
request = {"T": "UNSUB_L2", "V": symbols}
self.websocket.send_message(request)
else:
logger.warning("WebSocket not connected. Cannot unsubscribe.")
async def close(self):
"""Closes the WebSocket connection."""
if self.websocket:
logger.info("Closing WebSocket connection.")
self.websocket.close()
async def _handle_connect(self):
"""Internal handler for connect events."""
logger.info("WebSocket connection established.")
self.signals.connected.emit()
async def _handle_close(self):
"""Internal handler for close events."""
logger.info("WebSocket connection closed.")
self.signals.disconnected.emit()
async def _handle_error(self, message: str):
"""Internal handler for error events."""
logger.error(f"WebSocket error: {message}")
self.signals.error.emit(message)
async def _handle_tick(self, message: dict):
"""Internal handler for tick events."""
logger.debug(f"Received tick: {message}")
self.signals.tick.emit(message)

View File

@ -0,0 +1,163 @@
"""Core: Order execution and management."""
import asyncio
from datetime import datetime
from typing import Dict, Any
from loguru import logger
from PySide6.QtCore import QObject, Signal
from storage.models import Order
from .risk_manager import RiskManager
class OrderManager(QObject):
"""Handles order creation, tracking, and lifecycle management."""
class Signals(QObject):
order_updated = Signal(Order)
def __init__(self, broker, db_session, risk_manager: RiskManager):
"""
Initializes the OrderManager.
Args:
broker: The broker instance to execute trades.
db_session: The database session for persistence.
risk_manager: The RiskManager instance for risk validation.
"""
super().__init__()
self.broker = broker
self.db_session = db_session
self.risk_manager = risk_manager
self.orders = {} # In-memory cache for active orders
self.signals = self.Signals()
async def place_order(
self,
symbol: str,
side: str,
current_price: float,
quantity: int,
order_type: str,
limit_price: float = None,
stop_price: float = None,
is_async: bool = True,
) -> Dict[str, Any]:
"""
Places a new order with the broker and stores it in the database.
Args:
symbol (str): The symbol to trade (e.g., 'NSE:SBIN-EQ').
side (str): 'BUY' or 'SELL'.
quantity (int): The number of shares.
order_type (str): 'LIMIT', 'MARKET', 'STOP', etc.
limit_price (float, optional): The limit price for LIMIT orders.
stop_price (float, optional): The stop price for STOP orders.
is_async (bool, optional): FYERS specific flag. Defaults to True.
Returns:
dict: The response from the broker.
"""
logger.info(
f"Placing {side} order for {quantity} {symbol} at {order_type}."
)
order_data_for_validation = {
"symbol": symbol,
"side": side,
"quantity": quantity,
"order_type": order_type,
"limit_price": limit_price,
"stop_price": stop_price,
"current_price": current_price, # Add current price for risk manager
}
if not self.risk_manager.validate_order(order_data_for_validation):
return {"error": "Order rejected by risk manager"}
# Use adjusted quantity, stop_loss_price, take_profit_price from risk manager
adjusted_quantity = order_data_for_validation["quantity"]
adjusted_stop_loss_price = order_data_for_validation.get("stop_loss_price")
adjusted_take_profit_price = order_data_for_validation.get("take_profit_price")
try:
order_data = {
"symbol": symbol,
"qty": adjusted_quantity,
"type": self._map_order_type(order_type),
"side": self._map_side(side),
"productType": "INTRADAY", # Or CNC, MARGIN, etc.
"limitPrice": adjusted_take_profit_price if adjusted_take_profit_price else 0, # Use take profit as limit for now
"stopPrice": adjusted_stop_loss_price if adjusted_stop_loss_price else 0,
"validity": "DAY",
"disclosedQty": 0,
"offlineOrder": "False",
}
# The fyers_sdk places orders synchronously, but we run it in an executor
loop = asyncio.get_running_loop()
broker_response = await loop.run_in_executor(
None, self.broker.place_order, order_data
)
if broker_response and broker_response.get("s") == "ok":
order_id = broker_response.get("id")
logger.success(f"Order placed successfully. Broker ID: {order_id}")
await self._store_order(
order_id, symbol, side, quantity, order_type, limit_price
)
return broker_response
else:
error_msg = broker_response.get(
"message", "Unknown error from broker"
)
logger.error(f"Failed to place order: {error_msg}")
return {"error": error_msg}
except Exception as e:
logger.error(f"Exception placing order: {e}")
return {"error": str(e)}
async def _store_order(
self, order_id, symbol, side, quantity, order_type, price
):
"""Stores the order details in the database."""
try:
new_order = Order(
broker_order_id=order_id,
symbol=symbol,
side=side,
quantity=quantity,
order_type=order_type,
price=price,
status="PENDING", # Initial status
timestamp=datetime.utcnow(),
)
self.db_session.add(new_order)
self.db_session.commit()
self.orders[order_id] = new_order # Cache it
self.signals.order_updated.emit(new_order) # Emit signal
logger.info(f"Order {order_id} stored in database.")
except Exception as e:
logger.error(f"Failed to store order {order_id}: {e}")
self.db_session.rollback()
def _map_order_type(self, order_type: str) -> int:
"""Maps internal order types to FYERS API integer codes."""
mapping = {"LIMIT": 1, "MARKET": 2, "STOP": 3, "STOP_LIMIT": 4}
return mapping.get(order_type.upper(), 2) # Default to MARKET
def _map_side(self, side: str) -> int:
"""Maps internal side to FYERS API integer codes."""
mapping = {"BUY": 1, "SELL": -1}
return mapping.get(side.upper(), 1) # Default to BUY
async def get_order_status(self, order_id: str):
"""Retrieves the status of an order from the broker."""
# Placeholder for fetching order status from broker
pass
async def cancel_order(self, order_id: str):
"""Cancels an active order."""
# Placeholder for cancelling an order
pass

View File

@ -0,0 +1,90 @@
from typing import Dict, Any
from PySide6.QtCore import QObject, Signal
from loguru import logger
from storage.models import Position
class Portfolio(QObject):
"""Manages the trading portfolio, including positions and P&L."""
class Signals(QObject):
position_updated = Signal(Position)
portfolio_value_updated = Signal(float)
def __init__(self, db_session):
super().__init__()
self.db_session = db_session
self.positions: Dict[str, Position] = {}
self.signals = self.Signals()
self._load_positions_from_db()
def _load_positions_from_db(self):
"""Loads active positions from the database on initialization."""
try:
active_positions = self.db_session.query(Position).filter_by(is_open=True).all()
for pos in active_positions:
self.positions[pos.symbol] = pos
self.signals.position_updated.emit(pos) # Emit for initial UI load
logger.info(f"Loaded {len(active_positions)} active positions from DB.")
except Exception as e:
logger.error(f"Error loading positions from DB: {e}")
def update_position(
self,
symbol: str,
quantity: int,
average_price: float,
current_price: float,
is_open: bool = True,
):
"""
Adds or updates a position in the portfolio.
"""
position = self.positions.get(symbol)
if position:
position.quantity = quantity
position.average_price = average_price
position.current_price = current_price
position.market_value = quantity * current_price
position.unrealized_pnl = (current_price - average_price) * quantity
position.is_open = is_open
else:
position = Position(
symbol=symbol,
quantity=quantity,
average_price=average_price,
current_price=current_price,
market_value=quantity * current_price,
unrealized_pnl=(current_price - average_price) * quantity,
is_open=is_open,
)
self.db_session.add(position)
self.positions[symbol] = position
try:
self.db_session.commit()
self.signals.position_updated.emit(position)
logger.info(f"Position updated for {symbol}: Qty={quantity}, AvgPrice={average_price}, CurrentPrice={current_price}")
except Exception as e:
self.db_session.rollback()
logger.error(f"Failed to save position for {symbol}: {e}")
def get_position(self, symbol: str) -> Position | None:
"""
Returns the current position for a given symbol.
"""
return self.positions.get(symbol)
def get_all_positions(self) -> Dict[str, Position]:
"""
Returns all active positions.
"""
return {s: p for s, p in self.positions.items() if p.is_open}
def calculate_total_pnl(self) -> float:
"""
Calculates the total unrealized P&L of the portfolio.
"""
total_pnl = sum(pos.unrealized_pnl for pos in self.positions.values() if pos.is_open)
self.signals.portfolio_value_updated.emit(total_pnl)
return total_pnl

View File

@ -0,0 +1,144 @@
"""Core: Risk management and capital protection."""
from typing import Dict, Any
from .order_manager import OrderManager
from ..storage.models import Order
from decimal import Decimal
class RiskManager:
def __init__(self, order_manager: OrderManager, settings: Dict[str, Any], logger: Any):
self.order_manager = order_manager
self.settings = settings.get("risk", {})
self.logger = logger
self.max_daily_loss_percentage = Decimal(str(self.settings.get("max_daily_loss_percentage", 0.01)))
self.default_position_size = self.settings.get("default_position_size", 10)
self.stop_loss_percentage = Decimal(str(self.settings.get("stop_loss_percentage", 0.005)))
self.take_profit_percentage = Decimal(str(self.settings.get("take_profit_percentage", 0.01)))
self.risk_per_trade_percentage = Decimal(str(self.settings.get("risk_per_trade_percentage", 0.005)))
self.daily_profit_loss_today = Decimal('0.0') # Track daily P&L for current day
self.logger.info("RiskManager initialized with settings: %s", self.settings)
self._reset_daily_metrics()
def _reset_daily_metrics(self):
"""
Resets daily metrics like profit/loss.
In a real application, this would be triggered at the start of a new trading day.
"""
self.daily_profit_loss_today = Decimal('0.0')
self.logger.info("Daily risk metrics reset.")
def validate_order(self, order_data: Dict[str, Any]) -> bool:
"""
Validates if an order can be placed based on risk rules and adjusts order parameters.
"""
if not self._monitor_daily_loss():
self.logger.warning("Order rejected due to exceeding maximum daily loss.")
return False
symbol = order_data.get("symbol")
side = order_data.get("side")
current_price = Decimal(str(order_data.get("current_price")))
# Create a dummy Order object for _calculate_stop_loss_take_profit
# In a real system, the order would be more fully formed here or SL/TP calculation
# would be refactored to take individual parameters.
dummy_order = Order(symbol=symbol, side=side, quantity=0, price=current_price) # Only side and symbol are needed for SL/TP calcs
sl_tp_prices = self._calculate_stop_loss_take_profit(dummy_order, current_price)
stop_loss_price = sl_tp_prices["stop_loss"]
take_profit_price = sl_tp_prices["take_profit"]
quantity = self._calculate_position_size(symbol, current_price, stop_loss_price)
if quantity == 0:
self.logger.warning("Order rejected due to calculated position size being zero.")
return False
# Update order_data with calculated values
order_data["quantity"] = quantity
order_data["stop_loss_price"] = stop_loss_price
order_data["take_profit_price"] = take_profit_price
self.logger.info("Order validated and adjusted by RiskManager: %s", order_data)
return True
def _get_current_portfolio_value(self) -> Decimal:
"""
Placeholder to fetch the current total portfolio value.
In a real system, this would interact with the portfolio manager or broker.
"""
# For now, return a dummy value
return Decimal('100000.00')
def _calculate_position_size(self, symbol: str, current_price: Decimal, stop_loss_price: Decimal) -> int:
"""
Calculates the appropriate position size based on risk settings, portfolio, and stop-loss price.
"""
if current_price <= Decimal('0.0') or stop_loss_price <= Decimal('0.0') or current_price == stop_loss_price:
self.logger.warning(
"Cannot calculate position size with invalid prices: current_price=%s, stop_loss_price=%s",
current_price, stop_loss_price
)
return 0
portfolio_value = self._get_current_portfolio_value()
# Risk a small percentage of portfolio per trade
risk_per_trade_capital = portfolio_value * self.settings.get("risk_per_trade_percentage", Decimal('0.005')) # e.g., 0.5%
# Calculate the potential loss per share if stop-loss is hit
loss_per_share = abs(current_price - stop_loss_price)
if loss_per_share == Decimal('0.0'):
self.logger.warning("Loss per share is zero, cannot calculate position size safely. Defaulting to 1.")
return 1
# Calculate quantity
quantity = int(risk_per_trade_capital / loss_per_share)
if quantity == 0:
quantity = 1 # Ensure at least 1 share is traded if valid
self.logger.info(
"Calculated position size for %s: %s (Current Price: %s, Stop Loss: %s, Portfolio Value: %s, Risk per Trade Capital: %s)",
symbol, quantity, current_price, stop_loss_price, portfolio_value, risk_per_trade_capital
)
return min(quantity, self.default_position_size) # Cap at default for now
def _calculate_stop_loss_take_profit(self, order: Order, current_price: Decimal) -> Dict[str, Decimal]:
"""
Calculates stop loss and take profit prices for an order.
"""
stop_loss_price = Decimal('0.0')
take_profit_price = Decimal('0.0')
if order.side == "BUY":
stop_loss_price = current_price * (Decimal('1.0') - self.stop_loss_percentage)
take_profit_price = current_price * (Decimal('1.0') + self.take_profit_percentage)
elif order.side == "SELL":
stop_loss_price = current_price * (Decimal('1.0') + self.stop_loss_percentage)
take_profit_price = current_price * (Decimal('1.0') - self.take_profit_percentage)
self.logger.info(
"Calculated SL/TP for %s at price %s: SL=%s, TP=%s",
order.symbol, current_price, stop_loss_price, take_profit_price
)
return {"stop_loss": stop_loss_price, "take_profit": take_profit_price}
def _monitor_daily_loss(self) -> bool:
"""
Monitors the daily profit/loss and returns False if the max daily loss is exceeded.
"""
portfolio_value = self._get_current_portfolio_value()
if portfolio_value > 0 and self.daily_profit_loss_today / portfolio_value < -self.max_daily_loss_percentage:
self.logger.warning(
"Maximum daily loss exceeded. Current P&L: %s, Max Loss: %s",
self.daily_profit_loss_today, -self.max_daily_loss_percentage * portfolio_value
)
return False
return True
def update_daily_profit_loss(self, pnl: Decimal):
"""
Updates the daily profit and loss.
"""
self.daily_profit_loss_today += pnl
self.logger.info("Daily P&L updated: %s", self.daily_profit_loss_today)

View File

@ -0,0 +1,123 @@
"""Core: Strategy orchestration and signal generation."""
from loguru import logger
from PySide6.QtCore import QObject, Signal
class StrategyEngine(QObject):
"""Hosts and executes trading strategies, generating signals from market data."""
class Signals(QObject):
strategy_loaded = Signal(str)
strategy_started = Signal(str)
strategy_stopped = Signal(str)
trade_signal = Signal(dict) # Emits trade signals for potential orders
def __init__(self, order_manager):
"""
Initializes the StrategyEngine.
Args:
order_manager: The OrderManager instance to send signals to.
"""
super().__init__()
self.order_manager = order_manager
self.strategy = None
self.strategy_name = None
self.last_price = None # For our simple demo strategy
self._is_running = False
self.signals = self.Signals()
def load_strategy(self, strategy_name: str):
"""
Loads a trading strategy. For now, we use a simple hardcoded one.
In the future, this will dynamically load strategy modules.
"""
if strategy_name == "simple_crossover":
self.strategy = self._simple_crossover_strategy
self.strategy_name = strategy_name
logger.info(f"Loaded strategy: {strategy_name}")
self.signals.strategy_loaded.emit(strategy_name)
else:
logger.error(f"Strategy '{strategy_name}' not found.")
def start_strategy(self):
"""
Starts the loaded strategy.
"""
if self.strategy and not self._is_running:
self._is_running = True
logger.info(f"Strategy '{self.strategy_name}' started.")
self.signals.strategy_started.emit(self.strategy_name)
elif not self.strategy:
logger.warning("No strategy loaded to start.")
else:
logger.warning("Strategy is already running.")
def stop_strategy(self):
"""
Stops the running strategy.
"""
if self._is_running:
self._is_running = False
logger.info(f"Strategy '{self.strategy_name}' stopped.")
self.signals.strategy_stopped.emit(self.strategy_name)
else:
logger.warning("No strategy is currently running.")
async def on_tick(self, tick: dict):
"""
Called by the DataFeed on every new market data tick.
Executes the loaded strategy.
Args:
tick (dict): The tick data from the data feed.
"""
if not self.strategy or not self._is_running:
return
# The tick format from FYERS WebSocket is a list of dicts
# For simplicity, we assume we get one instrument's data at a time
if isinstance(tick, list) and tick:
tick = tick[0]
ltp = tick.get("ltp")
symbol = tick.get("symbol")
if ltp and symbol:
await self.strategy(symbol, ltp)
async def _simple_crossover_strategy(self, symbol: str, current_price: float):
"""
A very basic placeholder strategy.
- If price goes up, BUY.
- If price goes down, SELL.
- Acts only on the first price change.
This is for demonstration and is NOT a profitable strategy.
"""
if self.last_price is None:
self.last_price = current_price
logger.info(f"Starting price for {symbol} is {current_price}. Waiting for change.")
return
# Only trade once for this demo
if abs(self.last_price - current_price) > 0.1: # Threshold to act
trade_info = {"symbol": symbol, "current_price": current_price}
if current_price > self.last_price:
logger.warning(f"Price increased: {self.last_price} -> {current_price}. Sending BUY signal.")
trade_info["side"] = "BUY"
await self.order_manager.place_order(
symbol=symbol, side="BUY", current_price=current_price, quantity=1, order_type="MARKET"
)
elif current_price < self.last_price:
logger.warning(f"Price decreased: {self.last_price} -> {current_price}. Sending SELL signal.")
trade_info["side"] = "SELL"
await self.order_manager.place_order(
symbol=symbol, side="SELL", current_price=current_price, quantity=1, order_type="MARKET"
)
self.signals.trade_signal.emit(trade_info)
# Stop further trading in this simple example
self.stop_strategy()
logger.info("Demo trade placed. Strategy deactivated.")

View File

@ -0,0 +1,12 @@
PySide6
pyqtgraph
pandas
numpy
ta
fyers-apiv3
python-dotenv
loguru
websocket-client
PyYAML
SQLAlchemy
alembic

View File

@ -0,0 +1,56 @@
@echo off
REM This script automates the setup and execution of the Algo Trader application.
REM It is designed for Windows environments.
echo Checking for Python 3.10+...
python -c "import sys; assert sys.version_info >= (3, 10)" >nul 2>&1
if %errorlevel% neq 0 (
echo Python 3.10 or higher is not found or not in PATH. Please install it.
echo Download from: https://www.python.org/downloads/windows/
pause
exit /b 1
)
echo Setting up virtual environment...
python -m venv venv
if %errorlevel% neq 0 (
echo Failed to create virtual environment.
pause
exit /b 1
)
echo Activating virtual environment...
call .\venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo Failed to activate virtual environment.
pause
exit /b 1
)
echo Installing dependencies...
pip install PySide6 pyqtgraph pandas numpy ta fyers-apiv3 python-dotenv loguru websocket-client PyYAML SQLAlchemy alembic
if %errorlevel% neq 0 (
echo Failed to install dependencies.
pause
exit /b 1
)
echo.
echo =========================================================================
echo IMPORTANT: API Credentials Setup
echo =========================================================================
echo 1. Navigate to the 'config' folder inside 'algo_trader'.
echo 2. Rename 'credentials.env.example' to 'credentials.env'.
echo 3. Open 'credentials.env' in a text editor.
echo 4. Fill in your actual FYERS API CLIENT_ID, APP_ID, SECRET_KEY, and REDIRECT_URL.
echo If you don't have these, you'll need to create an API app on FYERS.
echo =========================================================================
echo.
pause
echo Launching Algo Trader application...
python app.py
echo Application closed.
pause
exit /b 0

View File

@ -0,0 +1,37 @@
"""Storage: SQLite database handler."""
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from .models import Base
def get_engine(db_path: str):
"""Creates a SQLAlchemy engine.
Args:
db_path: Path to the SQLite database file.
Returns:
A SQLAlchemy engine instance.
"""
return create_engine(f"sqlite:///{db_path}")
def get_session(engine):
"""Creates a session factory and returns a session.
Args:
engine: A SQLAlchemy engine instance.
Returns:
A SQLAlchemy session.
"""
Session = sessionmaker(bind=engine)
return Session()
def init_db(engine):
"""Creates all tables in the database.
Args:
engine: A SQLAlchemy engine instance.
"""
Base.metadata.create_all(engine)

View File

@ -0,0 +1,65 @@
"""Storage: Data models for trades, orders, etc."""
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Enum
from sqlalchemy.orm import declarative_base
from sqlalchemy.sql import func
import enum
Base = declarative_base()
class OrderStatus(enum.Enum):
PENDING = "PENDING"
OPEN = "OPEN"
EXECUTED = "EXECUTED"
CANCELED = "CANCELED"
REJECTED = "REJECTED"
class OrderType(enum.Enum):
MARKET = "MARKET"
LIMIT = "LIMIT"
class TransactionType(enum.Enum):
BUY = "BUY"
SELL = "SELL"
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
symbol = Column(String, nullable=False)
quantity = Column(Integer, nullable=False)
price = Column(Float)
order_type = Column(Enum(OrderType), nullable=False)
transaction_type = Column(Enum(TransactionType), nullable=False)
status = Column(Enum(OrderStatus), default=OrderStatus.PENDING)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<Order(id={self.id}, symbol='{self.symbol}', status='{self.status}')>"
class Trade(Base):
__tablename__ = 'trades'
id = Column(Integer, primary_key=True)
order_id = Column(Integer, nullable=False) # FK to Order table
symbol = Column(String, nullable=False)
quantity = Column(Integer, nullable=False)
price = Column(Float, nullable=False)
transaction_type = Column(Enum(TransactionType), nullable=False)
trade_time = Column(DateTime, default=func.now())
def __repr__(self):
return f"<Trade(id={self.id}, symbol='{self.symbol}', quantity={self.quantity})>"
class Position(Base):
__tablename__ = 'positions'
id = Column(Integer, primary_key=True)
symbol = Column(String, unique=True, nullable=False)
quantity = Column(Integer, nullable=False)
average_price = Column(Float, nullable=False)
last_updated = Column(DateTime, default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<Position(symbol='{self.symbol}', quantity={self.quantity})>"

View File

@ -0,0 +1 @@
"""Strategy: Positional equity swing trading logic."""

View File

@ -0,0 +1 @@
"""Strategy: Intraday F&O logic."""

View File

@ -0,0 +1,65 @@
from PySide6.QtWidgets import QWidget, QLabel, QVBoxLayout, QGridLayout
from PySide6.QtCore import Qt
class DashboardPanel(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
layout = QVBoxLayout(self)
# Title
title_label = QLabel("Dashboard")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("font-size: 24px; font-weight: bold; margin-bottom: 20px;")
layout.addWidget(title_label)
# Metrics Grid
metrics_grid = QGridLayout()
metrics_grid.setSpacing(15)
# Account Balance
self.account_balance_label = self._create_metric_label("Account Balance:", "$100,000.00")
metrics_grid.addWidget(self.account_balance_label[0], 0, 0)
metrics_grid.addWidget(self.account_balance_label[1], 0, 1)
# Today's P&L
self.pnl_label = self._create_metric_label("Today's P&L:", "+$500.00 (0.5%)")
metrics_grid.addWidget(self.pnl_label[0], 1, 0)
metrics_grid.addWidget(self.pnl_label[1], 1, 1)
# Open Positions
self.open_positions_label = self._create_metric_label("Open Positions:", "5")
metrics_grid.addWidget(self.open_positions_label[0], 2, 0)
metrics_grid.addWidget(self.open_positions_label[1], 2, 1)
# Pending Orders
self.pending_orders_label = self._create_metric_label("Pending Orders:", "2")
metrics_grid.addWidget(self.pending_orders_label[0], 3, 0)
metrics_grid.addWidget(self.pending_orders_label[1], 3, 1)
# Last Tick
self.last_tick_label = self._create_metric_label("Last Tick:", "N/A")
metrics_grid.addWidget(self.last_tick_label[0], 4, 0)
metrics_grid.addWidget(self.last_tick_label[1], 4, 1)
layout.addLayout(metrics_grid)
layout.addStretch(1) # Pushes content to the top
self.setLayout(layout)
def _create_metric_label(self, title, value):
title_label = QLabel(title)
title_label.setStyleSheet("font-weight: bold;")
value_label = QLabel(value)
return title_label, value_label
def update_market_data(self, tick_data):
# Example: Update Last Tick label with relevant info
if "symbol" in tick_data and "ltp" in tick_data:
self.last_tick_label[1].setText(f"{tick_data['symbol']}: {tick_data['ltp']}")
elif isinstance(tick_data, dict):
# Fallback for any other dict data
self.last_tick_label[1].setText(str(tick_data))
else:
self.last_tick_label[1].setText(str(tick_data))

View File

@ -0,0 +1,26 @@
from PySide6.QtWidgets import QWidget, QLabel, QVBoxLayout, QTextEdit
from PySide6.QtCore import Qt
class LogsPanel(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
layout = QVBoxLayout(self)
# Title
title_label = QLabel("Logs")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("font-size: 24px; font-weight: bold; margin-bottom: 20px;")
layout.addWidget(title_label)
self.log_display = QTextEdit()
self.log_display.setReadOnly(True)
layout.addWidget(self.log_display)
layout.addStretch(1) # Pushes content to the top
self.setLayout(layout)
def append_log(self, message):
self.log_display.append(message)

View File

@ -0,0 +1,120 @@
import sys
import asyncio
from PySide6.QtWidgets import QApplication, QMainWindow, QLabel, QWidget, QVBoxLayout, QTabWidget
from PySide6.QtCore import Qt, QTimer, QThread, Signal
from .dashboard import DashboardPanel
from .strategy_panel import StrategyPanel
from .positions_panel import PositionsPanel
from .orders_panel import OrdersPanel
from .logs_panel import LogsPanel
class AsyncWorker(QThread):
def __init__(self, loop: asyncio.AbstractEventLoop, target_coro, kwargs):
super().__init__()
self.loop = loop
self.target_coro = target_coro
self.kwargs = kwargs
def run(self):
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.target_coro(**self.kwargs))
self.loop.close()
class MainWindow(QMainWindow):
def __init__(
self, broker, data_feed, db_session, order_manager, risk_manager, strategy_engine, settings, backend_runner, app_logger, portfolio
):
super().__init__()
self.broker = broker
self.data_feed = data_feed
self.db_session = db_session
self.order_manager = order_manager
self.risk_manager = risk_manager
self.strategy_engine = strategy_engine
self.settings = settings
self.backend_runner = backend_runner
self.app_logger = app_logger # Store the app_logger instance
self.portfolio = portfolio
self.setWindowTitle("Algo Trader")
self.setGeometry(100, 100, 1600, 900)
# Create main tab widget
self.tabs = QTabWidget()
self.tabs.setTabPosition(QTabWidget.North)
# Create panels
self.dashboard_panel = DashboardPanel()
self.strategy_panel = StrategyPanel()
self.positions_panel = PositionsPanel()
self.orders_panel = OrdersPanel()
self.logs_panel = LogsPanel()
# Add panels to tabs
self.tabs.addTab(self.dashboard_panel, "Dashboard")
self.tabs.addTab(self.strategy_panel, "Strategies")
self.tabs.addTab(self.positions_panel, "Positions")
self.tabs.addTab(self.orders_panel, "Orders")
self.tabs.addTab(self.logs_panel, "Logs")
self.setCentralWidget(self.tabs)
# Connect logger signal to logs panel
self.app_logger.log_signal.connect(self.logs_panel.append_log)
# Connect data_feed signals to dashboard and logger
self.data_feed.signals.connected.connect(lambda: self.app_logger.info("DataFeed: Connected"))
self.data_feed.signals.disconnected.connect(lambda: self.app_logger.warning("DataFeed: Disconnected"))
self.data_feed.signals.error.connect(lambda msg: self.app_logger.error(f"DataFeed Error: {msg}"))
self.data_feed.signals.tick.connect(self.dashboard_panel.update_market_data)
# Connect order_manager signals to orders panel
self.order_manager.signals.order_updated.connect(self.orders_panel.add_or_update_order)
# Connect portfolio signals to positions panel
self.portfolio.signals.position_updated.connect(self.positions_panel.add_or_update_position)
self.portfolio.signals.portfolio_value_updated.connect(lambda value: self.app_logger.info(f"Portfolio Value Updated: {value}"))
# Connect strategy_engine signals to strategy_panel and logger
self.strategy_engine.signals.strategy_loaded.connect(self.strategy_panel.update_status)
self.strategy_engine.signals.strategy_started.connect(lambda name: self.strategy_panel.update_status(f"Running: {name}"))
self.strategy_engine.signals.strategy_stopped.connect(lambda name: self.strategy_panel.update_status(f"Stopped: {name}"))
self.strategy_engine.signals.trade_signal.connect(lambda signal: self.app_logger.info(f"Trade Signal: {signal}"))
# Connect strategy_panel UI signals to strategy_engine methods
self.strategy_panel.strategy_selected.connect(self.strategy_engine.load_strategy)
self.strategy_panel.start_strategy_requested.connect(self.strategy_engine.start_strategy)
self.strategy_panel.stop_strategy_requested.connect(self.strategy_engine.stop_strategy)
# --- Asyncio Integration ---
self.loop = asyncio.get_event_loop()
if self.loop.is_running():
self.loop = asyncio.new_event_loop()
self.backend_worker = AsyncWorker(
loop=self.loop,
target_coro=self.backend_runner,
kwargs={
"broker": self.broker,
"data_feed": self.data_feed,
"db_session": self.db_session,
"order_manager": self.order_manager,
"risk_manager": self.risk_manager,
"strategy_engine": self.strategy_engine,
"settings": self.settings,
"portfolio": self.portfolio,
},
)
self.backend_worker.start()
def closeEvent(self, event):
if self.data_feed:
asyncio.run_coroutine_threadsafe(self.data_feed.close(), self.loop)
if self.db_session:
self.db_session.close()
self.loop.call_soon_threadsafe(self.loop.stop)
self.backend_worker.wait() # Wait for the backend thread to finish
super().closeEvent(event)

View File

@ -0,0 +1,60 @@
from PySide6.QtWidgets import QWidget, QLabel, QVBoxLayout, QTableWidget, QTableWidgetItem, QHeaderView
from PySide6.QtCore import Qt
from storage.models import Order
class OrdersPanel(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
self.order_rows = {} # To keep track of rows by order_id
def init_ui(self):
layout = QVBoxLayout(self)
# Title
title_label = QLabel("Orders")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("font-size: 24px; font-weight: bold; margin-bottom: 20px;")
layout.addWidget(title_label)
self.orders_table = QTableWidget()
self.orders_table.setColumnCount(8)
self.orders_table.setHorizontalHeaderLabels(["Order ID", "Symbol", "Type", "Side", "Quantity", "Price", "Status", "Timestamp"])
self.orders_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
layout.addWidget(self.orders_table)
layout.addStretch(1) # Pushes content to the top
self.setLayout(layout)
def add_or_update_order(self, order: Order):
if order.broker_order_id in self.order_rows:
row_position = self.order_rows[order.broker_order_id]
else:
row_position = self.orders_table.rowCount()
self.orders_table.insertRow(row_position)
self.order_rows[order.broker_order_id] = row_position
self.orders_table.setItem(row_position, 0, QTableWidgetItem(str(order.broker_order_id)))
self.orders_table.setItem(row_position, 1, QTableWidgetItem(order.symbol))
self.orders_table.setItem(row_position, 2, QTableWidgetItem(order.order_type))
self.orders_table.setItem(row_position, 3, QTableWidgetItem(order.side))
self.orders_table.setItem(row_position, 4, QTableWidgetItem(str(order.quantity)))
self.orders_table.setItem(row_position, 5, QTableWidgetItem(str(order.price)))
self.orders_table.setItem(row_position, 6, QTableWidgetItem(order.status))
self.orders_table.setItem(row_position, 7, QTableWidgetItem(str(order.timestamp)))
def add_order(self, order_id, symbol, order_type, side, quantity, price, status, timestamp):
# This method is now a compatibility wrapper or can be removed if not used elsewhere
# For now, create a dummy Order object and call add_or_update_order
from storage.models import Order as DummyOrder # Use alias to avoid re-importing
dummy_order = DummyOrder(
broker_order_id=order_id,
symbol=symbol,
order_type=order_type,
side=side,
quantity=quantity,
price=price,
status=status,
timestamp=timestamp
)
self.add_or_update_order(dummy_order)

View File

@ -0,0 +1,67 @@
from PySide6.QtWidgets import QWidget, QLabel, QVBoxLayout, QTableWidget, QTableWidgetItem, QHeaderView
from PySide6.QtCore import Qt
from storage.models import Position
class PositionsPanel(QWidget):
def __init__(self):
super().__init__()
self.init_ui()
self.position_rows = {} # To keep track of rows by symbol
def init_ui(self):
layout = QVBoxLayout(self)
# Title
title_label = QLabel("Positions")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("font-size: 24px; font-weight: bold; margin-bottom: 20px;")
layout.addWidget(title_label)
self.positions_table = QTableWidget()
self.positions_table.setColumnCount(6)
self.positions_table.setHorizontalHeaderLabels(["Symbol", "Quantity", "Average Price", "Current Price", "Market Value", "Unrealized P&L"])
self.positions_table.horizontalHeader().setSectionResizeMode(QHeaderView.Stretch)
layout.addWidget(self.positions_table)
layout.addStretch(1) # Pushes content to the top
self.setLayout(layout)
def add_or_update_position(self, position: Position):
if not position.is_open: # Remove closed positions from the UI
if position.symbol in self.position_rows:
row_to_remove = self.position_rows.pop(position.symbol)
self.positions_table.removeRow(row_to_remove)
# Update row indices after removal
for symbol, row_idx in self.position_rows.items():
if row_idx > row_to_remove:
self.position_rows[symbol] = row_idx - 1
return
if position.symbol in self.position_rows:
row_position = self.position_rows[position.symbol]
else:
row_position = self.positions_table.rowCount()
self.positions_table.insertRow(row_position)
self.position_rows[position.symbol] = row_position
self.positions_table.setItem(row_position, 0, QTableWidgetItem(position.symbol))
self.positions_table.setItem(row_position, 1, QTableWidgetItem(str(position.quantity)))
self.positions_table.setItem(row_position, 2, QTableWidgetItem(str(position.average_price)))
self.positions_table.setItem(row_position, 3, QTableWidgetItem(str(position.current_price)))
self.positions_table.setItem(row_position, 4, QTableWidgetItem(str(position.market_value)))
self.positions_table.setItem(row_position, 5, QTableWidgetItem(str(position.unrealized_pnl)))
def update_position(self, symbol, quantity, avg_price, current_price, market_value, unrealized_pnl):
# This method is now a compatibility wrapper or can be removed if not used elsewhere
# For now, create a dummy Position object and call add_or_update_position
from storage.models import Position as DummyPosition # Use alias to avoid re-importing
dummy_position = DummyPosition(
symbol=symbol,
quantity=quantity,
average_price=avg_price,
current_price=current_price,
market_value=market_value,
unrealized_pnl=unrealized_pnl,
is_open=True # Assuming update_position is for open positions
)
self.add_or_update_position(dummy_position)

View File

@ -0,0 +1,86 @@
from PySide6.QtWidgets import QWidget, QLabel, QVBoxLayout, QHBoxLayout, QPushButton, QComboBox
from PySide6.QtCore import Qt, Signal
class StrategyPanel(QWidget):
strategy_selected = Signal(str)
start_strategy_requested = Signal(str)
stop_strategy_requested = Signal(str)
def __init__(self):
super().__init__()
self.init_ui()
def init_ui(self):
layout = QVBoxLayout(self)
# Title
title_label = QLabel("Strategies")
title_label.setAlignment(Qt.AlignCenter)
title_label.setStyleSheet("font-size: 24px; font-weight: bold; margin-bottom: 20px;")
layout.addWidget(title_label)
# Strategy selection and control
strategy_control_layout = QHBoxLayout()
strategy_control_layout.addWidget(QLabel("Select Strategy:"))
self.strategy_selector = QComboBox()
self.strategy_selector.addItem("None")
self.strategy_selector.addItem("simple_crossover") # Placeholder strategy
self.strategy_selector.addItem("intraday_fno") # Placeholder strategy
self.strategy_selector.currentIndexChanged.connect(self._on_strategy_selected)
strategy_control_layout.addWidget(self.strategy_selector)
self.start_button = QPushButton("Start Strategy")
self.start_button.clicked.connect(self._on_start_button_clicked)
self.start_button.setEnabled(False) # Initially disabled
strategy_control_layout.addWidget(self.start_button)
self.stop_button = QPushButton("Stop Strategy")
self.stop_button.clicked.connect(self._on_stop_button_clicked)
self.stop_button.setEnabled(False) # Initially disabled
strategy_control_layout.addWidget(self.stop_button)
layout.addLayout(strategy_control_layout)
# Strategy status
status_layout = QHBoxLayout()
status_layout.addWidget(QLabel("Status:"))
self.strategy_status_label = QLabel("Idle")
status_layout.addWidget(self.strategy_status_label)
status_layout.addStretch(1)
layout.addLayout(status_layout)
layout.addStretch(1) # Pushes content to the top
self.setLayout(layout)
def _on_strategy_selected(self, index):
strategy_name = self.strategy_selector.currentText()
self.strategy_selected.emit(strategy_name)
self.start_button.setEnabled(strategy_name != "None")
self.stop_button.setEnabled(False)
self.update_status("Loaded" if strategy_name != "None" else "Idle")
def _on_start_button_clicked(self):
strategy_name = self.strategy_selector.currentText()
if strategy_name != "None":
self.start_strategy_requested.emit(strategy_name)
self.start_button.setEnabled(False)
self.stop_button.setEnabled(True)
self.update_status("Starting...")
def _on_stop_button_clicked(self):
strategy_name = self.strategy_selector.currentText()
if strategy_name != "None":
self.stop_strategy_requested.emit(strategy_name)
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.update_status("Stopping...")
def update_status(self, status: str):
self.strategy_status_label.setText(status)
def set_available_strategies(self, strategies: list[str]):
self.strategy_selector.clear()
self.strategy_selector.addItem("None")
for strategy in strategies:
self.strategy_selector.addItem(strategy)

View File

@ -0,0 +1 @@
"""Utils: Helper functions."""

View File

@ -0,0 +1,30 @@
from PySide6.QtCore import QObject, Signal
from loguru import logger
import sys
class Logger(QObject):
log_signal = Signal(str)
def __init__(self):
super().__init__()
logger.remove() # Remove default logger
logger.add(self.emit_log, level="INFO")
logger.add(sys.stderr, level="ERROR")
def emit_log(self, message):
self.log_signal.emit(message)
def info(self, message):
logger.info(message)
def warning(self, message):
logger.warning(message)
def error(self, message):
logger.error(message)
def debug(self, message):
logger.debug(message)
# Global logger instance
app_logger = Logger()