"""Core: WebSocket market data feed.""" import asyncio from typing import Awaitable, Callable from fyers_api_sdk.fyers_async import FyersAsync from loguru import logger class FyersDataFeed: """Handles WebSocket connection for real-time data.""" def __init__( self, fyers: FyersAsync, on_connect: Callable[[], Awaitable[None]], on_tick: Callable[[dict], Awaitable[None]], on_close: Callable[[], Awaitable[None]] | None = None, on_error: Callable[[str], Awaitable[None]] | None = None, ): """ Initializes the FyersDataFeed. Args: fyers: An authenticated FyersAsync instance. on_connect: Async callback to run on successful connection. on_tick: Async callback to process each incoming tick. on_close: Async callback for when the connection closes. on_error: Async callback to handle errors. """ self._fyers = fyers self._on_connect = on_connect self._on_tick = on_tick self._on_close = on_close self._on_error = on_error self.websocket = None async def connect(self, access_token: str, data_type: str = "symbolData"): """Establishes a WebSocket connection.""" logger.info("Connecting to Fyers WebSocket...") try: self.websocket = self._fyers.fyers_market_socket( access_token=access_token, log_path="", # Disable log file generation on_connect=lambda: asyncio.create_task(self._on_connect()), on_close=lambda: asyncio.create_task(self._handle_close()), on_error=lambda msg: asyncio.create_task(self._handle_error(msg)), on_message=lambda msg: asyncio.create_task(self._on_tick(msg)), ) await asyncio.to_thread(self.websocket.connect) logger.success("Fyers WebSocket connected.") except Exception as e: logger.error(f"Failed to connect to WebSocket: {e}") if self._on_error: await self._on_error(str(e)) async def subscribe(self, symbols: list[str]): """Subscribes to a list of symbols.""" if self.websocket: logger.info(f"Subscribing to symbols: {symbols}") request = {"T": "SUB_L2", "L2_T": "T", "V": symbols} self.websocket.send_message(request) else: logger.warning("WebSocket not connected. Cannot subscribe.") async def unsubscribe(self, symbols: list[str]): """Unsubscribes from a list of symbols.""" if self.websocket: logger.info(f"Unsubscribing from symbols: {symbols}") request = {"T": "UNSUB_L2", "V": symbols} self.websocket.send_message(request) else: logger.warning("WebSocket not connected. Cannot unsubscribe.") async def close(self): """Closes the WebSocket connection.""" if self.websocket: logger.info("Closing WebSocket connection.") self.websocket.close() async def _handle_close(self): """Internal handler for close events.""" logger.info("WebSocket connection closed.") if self._on_close: await self._on_close() async def _handle_error(self, message: str): """Internal handler for error events.""" logger.error(f"WebSocket error: {message}") if self._on_error: await self._on_error(message)