Files
JamesTang 6e86da6733 Implement moomoo API connector for OHLCV data
- Added DataConnector base class with OHLCV, InstrumentInfo, Interval
- Implemented MoomooClient with rate limiting, circuit breaker, caching
- Mock mode generates realistic data for development
- Real-time WebSocket subscription support (mock)
- Added examples/demo_moomoo.py showcasing functionality
- Updated requirements.txt with requests, websocket-client, redis, python-dotenv
- Updated README.md with Data Layer documentation
- Added .env.example for configuration
2026-02-04 11:35:33 +08:00

138 lines
3.1 KiB
Python

#!/usr/bin/env python3
"""
Base classes for data connectors.
"""
import abc
from datetime import datetime
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
class Interval(Enum):
    """Trading data intervals.

    Each member's value is the short string code for that interval.
    Codes are case-sensitive: ``"1m"`` is one minute, ``"1M"`` is one month.
    """

    MINUTE_1 = "1m"
    MINUTE_5 = "5m"
    MINUTE_15 = "15m"
    MINUTE_30 = "30m"
    HOUR_1 = "1h"
    HOUR_4 = "4h"
    DAY_1 = "1d"
    WEEK_1 = "1w"
    MONTH_1 = "1M"
@dataclass
class OHLCV:
    """Standard OHLCV data point.

    Holds the open, high, low, close and volume values for one ``symbol``
    at one ``timestamp``, tagged with the ``interval`` it belongs to.
    """

    timestamp: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    symbol: str
    interval: Interval

    def to_dict(self) -> Dict[str, Any]:
        """Return the data point as a plain dictionary.

        ``timestamp`` is rendered with ``datetime.isoformat()`` and
        ``interval`` is replaced by its enum value; every other field is
        copied through unchanged.

        Returns:
            A dict keyed by field name, in field declaration order.
        """
        return {
            "timestamp": self.timestamp.isoformat(),
            "open": self.open,
            "high": self.high,
            "low": self.low,
            "close": self.close,
            "volume": self.volume,
            "symbol": self.symbol,
            "interval": self.interval.value,
        }
@dataclass
class InstrumentInfo:
    """Instrument metadata.

    ``trading_hours`` and ``is_tradable`` are optional when constructing an
    instance; they default to ``None`` and ``True`` respectively. All other
    fields are required.
    """

    symbol: str
    name: str
    exchange: str
    currency: str
    lot_size: int
    min_price_increment: float
    trading_hours: Optional[str] = None
    is_tradable: bool = True
class DataConnector(abc.ABC):
    """Abstract base class for data connectors.

    Every method and property below is abstract, so a concrete connector
    must implement all of them before it can be instantiated.
    """

    @abc.abstractmethod
    def connect(self) -> bool:
        """Establish connection to data source."""

    @abc.abstractmethod
    def disconnect(self) -> None:
        """Close connection."""

    @abc.abstractmethod
    def get_ohlcv(
        self,
        symbol: str,
        interval: Interval,
        start_date: datetime,
        end_date: datetime,
        limit: Optional[int] = None,
    ) -> List[OHLCV]:
        """Fetch historical OHLCV data.

        Args:
            symbol: Trading symbol (e.g., "AAPL").
            interval: Time interval.
            start_date: Start of period.
            end_date: End of period.
            limit: Maximum number of records to return.

        Returns:
            List of OHLCV objects.
        """

    @abc.abstractmethod
    def get_instrument_info(self, symbol: str) -> InstrumentInfo:
        """Get instrument metadata."""

    @abc.abstractmethod
    def subscribe_ohlcv(
        self,
        symbols: List[str],
        interval: Interval,
        callback,
    ) -> None:
        """Subscribe to real-time OHLCV updates.

        Args:
            symbols: List of symbols to subscribe to.
            interval: Interval for updates.
            callback: Function to call with new OHLCV data.
        """

    @abc.abstractmethod
    def unsubscribe(self, symbols: List[str]) -> None:
        """Cancel subscriptions."""

    @property
    @abc.abstractmethod
    def is_connected(self) -> bool:
        """Check if connector is currently connected."""

    @property
    @abc.abstractmethod
    def request_count(self) -> int:
        """Total number of API requests made."""