163 lines
6.1 KiB
Python
163 lines
6.1 KiB
Python
"""Core: Order execution and management."""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Dict, Any
|
|
from loguru import logger
|
|
from PySide6.QtCore import QObject, Signal
|
|
|
|
from storage.models import Order
|
|
from .risk_manager import RiskManager
|
|
|
|
|
|
class OrderManager(QObject):
    """Handles order creation, tracking, and lifecycle management.

    Orders are validated by the injected risk manager, sent to the injected
    broker, persisted through the injected database session, and cached in
    ``self.orders`` keyed by the broker order id. Each successfully stored
    order is announced via ``self.signals.order_updated``.
    """

    class Signals(QObject):
        # Emitted with the Order instance right after it has been committed.
        order_updated = Signal(Order)

    def __init__(self, broker, db_session, risk_manager: RiskManager):
        """
        Initializes the OrderManager.

        Args:
            broker: The broker instance to execute trades. Must expose a
                synchronous ``place_order(order_data)`` method.
            db_session: The database session for persistence. Must expose
                ``add``, ``commit`` and ``rollback``.
            risk_manager: The RiskManager instance for risk validation.
        """
        super().__init__()
        self.broker = broker
        self.db_session = db_session
        self.risk_manager = risk_manager
        self.orders = {}  # In-memory cache for active orders
        self.signals = self.Signals()

    async def place_order(
        self,
        symbol: str,
        side: str,
        current_price: float,
        quantity: int,
        order_type: str,
        limit_price: float = None,
        stop_price: float = None,
        is_async: bool = True,
    ) -> Dict[str, Any]:
        """
        Places a new order with the broker and stores it in the database.

        The order is first passed to ``risk_manager.validate_order`` as a
        dict. After validation, ``quantity``, ``stop_loss_price`` and
        ``take_profit_price`` are read back from that same dict, so any
        in-place adjustments made by the risk manager are what gets sent.

        Args:
            symbol (str): The symbol to trade (e.g., 'NSE:SBIN-EQ').
            side (str): 'BUY' or 'SELL'.
            current_price (float): Current market price, forwarded to the
                risk manager as part of the validation payload.
            quantity (int): The number of shares requested.
            order_type (str): 'LIMIT', 'MARKET', 'STOP', etc.
            limit_price (float, optional): The limit price for LIMIT orders.
                Used for LIMIT / STOP_LIMIT orders when the risk manager
                supplies no take-profit price.
            stop_price (float, optional): The stop price for STOP orders.
                Used for STOP / STOP_LIMIT orders when the risk manager
                supplies no stop-loss price.
            is_async (bool, optional): FYERS specific flag. Defaults to True.
                Currently not forwarded to the broker by this method.

        Returns:
            dict: The broker response on success, otherwise a dict of the
            form ``{"error": <message>}``. This method does not raise for
            broker failures; they are logged and returned as an error dict.
        """
        logger.info(
            f"Placing {side} order for {quantity} {symbol} at {order_type}."
        )

        order_data_for_validation = {
            "symbol": symbol,
            "side": side,
            "quantity": quantity,
            "order_type": order_type,
            "limit_price": limit_price,
            "stop_price": stop_price,
            "current_price": current_price,  # Add current price for risk manager
        }

        if not self.risk_manager.validate_order(order_data_for_validation):
            return {"error": "Order rejected by risk manager"}

        # Use adjusted quantity, stop_loss_price, take_profit_price from risk manager
        adjusted_quantity = order_data_for_validation["quantity"]
        adjusted_stop_loss_price = order_data_for_validation.get("stop_loss_price")
        adjusted_take_profit_price = order_data_for_validation.get("take_profit_price")

        try:
            normalized_type = order_type.upper()

            # Risk-manager prices keep priority (take profit is used as the
            # limit for now). When the risk manager supplies none, fall back
            # to the caller's explicit price for order types that need one,
            # instead of sending 0 and dropping the caller's value.
            broker_limit_price = adjusted_take_profit_price
            if not broker_limit_price and normalized_type in ("LIMIT", "STOP_LIMIT"):
                broker_limit_price = limit_price

            broker_stop_price = adjusted_stop_loss_price
            if not broker_stop_price and normalized_type in ("STOP", "STOP_LIMIT"):
                broker_stop_price = stop_price

            order_data = {
                "symbol": symbol,
                "qty": adjusted_quantity,
                "type": self._map_order_type(order_type),
                "side": self._map_side(side),
                "productType": "INTRADAY",  # Or CNC, MARGIN, etc.
                "limitPrice": broker_limit_price if broker_limit_price else 0,
                "stopPrice": broker_stop_price if broker_stop_price else 0,
                "validity": "DAY",
                "disclosedQty": 0,
                "offlineOrder": "False",
            }

            # The fyers_sdk places orders synchronously, but we run it in an executor
            loop = asyncio.get_running_loop()
            broker_response = await loop.run_in_executor(
                None, self.broker.place_order, order_data
            )

            if broker_response and broker_response.get("s") == "ok":
                order_id = broker_response.get("id")
                logger.success(f"Order placed successfully. Broker ID: {order_id}")
                # Persist the quantity that was actually sent to the broker,
                # not the originally requested one.
                await self._store_order(
                    order_id, symbol, side, adjusted_quantity, order_type, limit_price
                )
                return broker_response

            # An empty/None response has no "message" to read; report it
            # explicitly instead of tripping an AttributeError on None.
            if broker_response:
                error_msg = broker_response.get(
                    "message", "Unknown error from broker"
                )
            else:
                error_msg = "No response from broker"
            logger.error(f"Failed to place order: {error_msg}")
            return {"error": error_msg}

        except Exception as e:
            # Boundary handler: callers receive an error dict, never a raise.
            logger.error(f"Exception placing order: {e}")
            return {"error": str(e)}

    async def _store_order(
        self, order_id, symbol, side, quantity, order_type, price
    ):
        """Stores the order details in the database.

        Creates an ``Order`` row with status "PENDING", commits it, caches it
        in ``self.orders`` under ``order_id`` and emits ``order_updated``.
        Any exception is logged and the session is rolled back; nothing is
        re-raised to the caller.
        """
        try:
            new_order = Order(
                broker_order_id=order_id,
                symbol=symbol,
                side=side,
                quantity=quantity,
                order_type=order_type,
                price=price,
                status="PENDING",  # Initial status
                # NOTE(review): utcnow() returns a naive datetime and is
                # deprecated since Python 3.12; kept so the stored value
                # stays the same kind as in existing rows.
                timestamp=datetime.utcnow(),
            )
            self.db_session.add(new_order)
            self.db_session.commit()
            self.orders[order_id] = new_order  # Cache it
            self.signals.order_updated.emit(new_order)  # Emit signal
            logger.info(f"Order {order_id} stored in database.")
        except Exception as e:
            logger.error(f"Failed to store order {order_id}: {e}")
            self.db_session.rollback()

    def _map_order_type(self, order_type: str) -> int:
        """Maps internal order types to FYERS API integer codes.

        Matching is case-insensitive. Unrecognised types fall back to
        MARKET (2); a warning is logged so the fallback is not silent.
        """
        mapping = {"LIMIT": 1, "MARKET": 2, "STOP": 3, "STOP_LIMIT": 4}
        key = order_type.upper()
        if key not in mapping:
            logger.warning(
                f"Unknown order type '{order_type}'; defaulting to MARKET."
            )
        return mapping.get(key, 2)  # Default to MARKET

    def _map_side(self, side: str) -> int:
        """Maps internal side to FYERS API integer codes.

        Matching is case-insensitive. Unrecognised sides fall back to
        BUY (1); a warning is logged so the fallback is not silent.
        """
        mapping = {"BUY": 1, "SELL": -1}
        key = side.upper()
        if key not in mapping:
            logger.warning(f"Unknown order side '{side}'; defaulting to BUY.")
        return mapping.get(key, 1)  # Default to BUY

    async def get_order_status(self, order_id: str):
        """Retrieves the status of an order from the broker.

        Not implemented yet: currently does nothing and returns None.
        """
        # Placeholder for fetching order status from broker
        pass

    async def cancel_order(self, order_id: str):
        """Cancels an active order.

        Not implemented yet: currently does nothing and returns None.
        """
        # Placeholder for cancelling an order
        pass