37172-vm/algo_trader/core/data_feed.py
2025-12-27 04:26:41 +00:00

94 lines
3.4 KiB
Python

"""Core: WebSocket market data feed."""
import asyncio
from PySide6.QtCore import QObject, Signal
from typing import Any
from fyers_api_sdk.fyers_async import FyersAsync
from loguru import logger
class FyersDataFeed(QObject):
    """Handles WebSocket connection for real-time data.

    The socket object is created through the injected ``FyersAsync`` instance
    and its blocking ``connect`` call is run in a worker thread.  Socket
    callbacks are therefore not guaranteed to run on the asyncio event-loop
    thread; every callback is marshalled back onto the loop that called
    :meth:`connect` before the matching Qt signal is emitted.
    """

    class Signals(QObject):
        """Qt signals emitted by the feed's internal event handlers."""

        connected = Signal()  # emitted by _handle_connect
        disconnected = Signal()  # emitted by _handle_close
        error = Signal(str)  # emitted by _handle_error and by a failed connect()
        tick = Signal(dict)  # emitted by _handle_tick with the received message

    def __init__(
        self,
        fyers: FyersAsync,
    ):
        """
        Initializes the FyersDataFeed.

        Args:
            fyers: An authenticated FyersAsync instance.
        """
        super().__init__()
        self._fyers = fyers
        self.websocket = None
        # Event loop that owns the handlers; captured in connect() so that
        # callbacks arriving on other threads can be routed back to it.
        self._loop = None
        self.signals = self.Signals()

    async def connect(self, access_token: str, data_type: str = "symbolData"):
        """Establishes a WebSocket connection.

        Failures are logged and reported through ``signals.error``; the
        exception is not re-raised.

        Args:
            access_token: Token passed to the socket factory.
            data_type: Currently unused by this method; kept for interface
                compatibility.
        """
        logger.info("Connecting to Fyers WebSocket...")
        # connect() is a coroutine, so a loop is guaranteed to be running here.
        self._loop = asyncio.get_running_loop()
        try:
            self.websocket = self._fyers.fyers_market_socket(
                access_token=access_token,
                log_path="",  # Disable log file generation
                # asyncio.create_task() requires a running loop in the calling
                # thread, which a socket/worker thread does not have, so the
                # callbacks go through the thread-safe scheduler instead.
                on_connect=lambda: self._schedule(self._handle_connect),
                on_close=lambda: self._schedule(self._handle_close),
                on_error=lambda msg: self._schedule(self._handle_error, msg),
                on_message=lambda msg: self._schedule(self._handle_tick, msg),
            )
            # The SDK connect call is synchronous; keep it off the event loop.
            await asyncio.to_thread(self.websocket.connect)
            logger.success("Fyers WebSocket connected.")
        except Exception as e:
            logger.error(f"Failed to connect to WebSocket: {e}")
            self.signals.error.emit(str(e))

    async def subscribe(self, symbols: list[str]):
        """Subscribes to a list of symbols.

        Logs a warning and does nothing when no socket has been created.
        """
        if self.websocket is None:
            logger.warning("WebSocket not connected. Cannot subscribe.")
            return
        logger.info(f"Subscribing to symbols: {symbols}")
        request = {"T": "SUB_L2", "L2_T": "T", "V": symbols}
        self.websocket.send_message(request)

    async def unsubscribe(self, symbols: list[str]):
        """Unsubscribes from a list of symbols.

        Logs a warning and does nothing when no socket has been created.
        """
        if self.websocket is None:
            logger.warning("WebSocket not connected. Cannot unsubscribe.")
            return
        logger.info(f"Unsubscribing from symbols: {symbols}")
        request = {"T": "UNSUB_L2", "V": symbols}
        self.websocket.send_message(request)

    async def close(self):
        """Closes the WebSocket connection and drops the socket handle."""
        socket = self.websocket
        if socket is None:
            return
        logger.info("Closing WebSocket connection.")
        try:
            socket.close()
        finally:
            # Clear the handle so subscribe()/unsubscribe() report
            # "not connected" instead of sending on a closed socket.
            self.websocket = None

    def _schedule(self, handler: Any, *args: Any) -> None:
        """Run the coroutine function ``handler(*args)`` on the captured loop.

        Safe to call from any thread, including the event-loop thread itself.
        The event is dropped (with a warning) when no loop is available.
        """
        loop = self._loop
        if loop is None:
            logger.warning("No event loop captured; dropping WebSocket event.")
            return
        coro = handler(*args)
        try:
            future = asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError as e:
            # Raised e.g. when the loop has already been closed; close the
            # coroutine so it does not trigger a "never awaited" warning.
            coro.close()
            logger.warning(f"Could not schedule WebSocket event handler: {e}")
            return
        future.add_done_callback(self._report_handler_failure)

    @staticmethod
    def _report_handler_failure(future: Any) -> None:
        """Log an exception raised inside a scheduled handler, if any."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(f"WebSocket event handler failed: {exc!r}")

    async def _handle_connect(self):
        """Internal handler for connect events."""
        logger.info("WebSocket connection established.")
        self.signals.connected.emit()

    async def _handle_close(self):
        """Internal handler for close events."""
        logger.info("WebSocket connection closed.")
        self.signals.disconnected.emit()

    async def _handle_error(self, message: str):
        """Internal handler for error events."""
        logger.error(f"WebSocket error: {message}")
        # The signal is declared as Signal(str); coerce so a non-string
        # payload is still delivered, matching how connect() emits str(e).
        self.signals.error.emit(str(message))

    async def _handle_tick(self, message: dict):
        """Internal handler for tick events."""
        logger.debug(f"Received tick: {message}")
        self.signals.tick.emit(message)