"""个股资金流向 (Moneyflow) interface. Fetch A-share stock money flow data from Tushare. This interface retrieves fund flow data analyzing large and small order transactions. Data starts from 2010. """ import pandas as pd from typing import Optional from src.data.client import TushareClient from src.data.api_wrappers.base_sync import DateBasedSync def get_moneyflow( trade_date: Optional[str] = None, ts_code: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, client: Optional[TushareClient] = None, ) -> pd.DataFrame: """Fetch individual stock money flow data from Tushare. This interface retrieves fund flow data analyzing large and small order transactions for A-share stocks. Data starts from 2010. Order size classification: - Small orders (小单): < 50,000 yuan - Medium orders (中单): 50,000 - 200,000 yuan - Large orders (大单): 200,000 - 1,000,000 yuan - Extra large orders (特大单): >= 1,000,000 yuan Args: trade_date: Specific trade date in YYYYMMDD format ts_code: Stock code filter (optional, e.g., '000001.SZ') start_date: Start date for date range query (YYYYMMDD format) end_date: End date for date range query (YYYYMMDD format) client: Optional TushareClient instance for shared rate limiting. If None, creates a new client. For concurrent sync operations, pass a shared client to ensure proper rate limiting. Returns: pd.DataFrame with columns: - ts_code: Stock code - trade_date: Trade date (YYYYMMDD) - buy_sm_vol: Small order buy volume (hands) - buy_sm_amount: Small order buy amount (10k yuan) - sell_sm_vol: Small order sell volume (hands) - sell_sm_amount: Small order sell amount (10k yuan) - buy_md_vol: Medium order buy volume (hands) - buy_md_amount: Medium order buy amount (10k yuan) - sell_md_vol: Medium order sell volume (hands) - sell_md_amount: Medium order sell amount (10k yuan) - buy_lg_vol: Large order buy volume (hands) - buy_lg_amount: Large order buy amount (10k yuan) - sell_lg_vol: Large order sell volume (hands) - sell_lg_amount: Large order sell amount (10k yuan) - buy_elg_vol: Extra large order buy volume (hands) - buy_elg_amount: Extra large order buy amount (10k yuan) - sell_elg_vol: Extra large order sell volume (hands) - sell_elg_amount: Extra large order sell amount (10k yuan) - net_mf_vol: Net money flow volume (hands) - net_mf_amount: Net money flow amount (10k yuan) Example: >>> # Get all stocks' money flow for a single date >>> data = get_moneyflow(trade_date='20240115') >>> >>> # Get date range data for a specific stock >>> data = get_moneyflow(ts_code='000001.SZ', start_date='20240101', end_date='20240131') >>> >>> # Get specific stock on specific date >>> data = get_moneyflow(ts_code='000001.SZ', trade_date='20240115') """ client = client or TushareClient() # Build parameters params = {} if trade_date: params["trade_date"] = trade_date if ts_code: params["ts_code"] = ts_code if start_date: params["start_date"] = start_date if end_date: params["end_date"] = end_date # Fetch data using moneyflow API data = client.query("moneyflow", **params) # Rename date column if needed if "date" in data.columns: data = data.rename(columns={"date": "trade_date"}) return data class MoneyflowSync(DateBasedSync): """个股资金流向数据批量同步管理器,支持全量/增量同步。 继承自 DateBasedSync,使用按日期并发获取数据。 数据始于 2010 年。 Example: >>> sync = MoneyflowSync() >>> results = sync.sync_all() # 增量同步 >>> results = sync.sync_all(force_full=True) # 全量同步 >>> preview = sync.preview_sync() # 预览 """ table_name = "moneyflow" default_start_date = "20100101" # 表结构定义 - 使用 Tushare API 原始字段名 TABLE_SCHEMA = { "ts_code": "VARCHAR(16) NOT NULL", "trade_date": "DATE NOT NULL", "buy_sm_vol": "INTEGER", # 小单买入量(手) "buy_sm_amount": "DOUBLE", # 小单买入金额(万元) "sell_sm_vol": "INTEGER", # 小单卖出量(手) "sell_sm_amount": "DOUBLE", # 小单卖出金额(万元) "buy_md_vol": "INTEGER", # 中单买入量(手) "buy_md_amount": "DOUBLE", # 中单买入金额(万元) "sell_md_vol": "INTEGER", # 中单卖出量(手) "sell_md_amount": "DOUBLE", # 中单卖出金额(万元) "buy_lg_vol": "INTEGER", # 大单买入量(手) "buy_lg_amount": "DOUBLE", # 大单买入金额(万元) "sell_lg_vol": "INTEGER", # 大单卖出量(手) "sell_lg_amount": "DOUBLE", # 大单卖出金额(万元) "buy_elg_vol": "INTEGER", # 特大单买入量(手) "buy_elg_amount": "DOUBLE", # 特大单买入金额(万元) "sell_elg_vol": "INTEGER", # 特大单卖出量(手) "sell_elg_amount": "DOUBLE", # 特大单卖出金额(万元) "net_mf_vol": "INTEGER", # 净流入量(手) "net_mf_amount": "DOUBLE", # 净流入额(万元) } # 索引定义 TABLE_INDEXES = [ ("idx_moneyflow_date_code", ["trade_date", "ts_code"]), ] # 主键定义 PRIMARY_KEY = ("ts_code", "trade_date") def fetch_single_date(self, trade_date: str) -> pd.DataFrame: """获取单日所有股票的资金流向数据。 Args: trade_date: 交易日期(YYYYMMDD) Returns: 包含当日所有股票资金流向数据的 DataFrame """ return get_moneyflow(trade_date=trade_date, client=self.client) def sync_moneyflow( start_date: Optional[str] = None, end_date: Optional[str] = None, force_full: bool = False, ) -> pd.DataFrame: """同步个股资金流向数据到 DuckDB,支持智能增量同步。 逻辑: - 若表不存在:创建表 + 复合索引 (trade_date, ts_code) + 全量同步 - 若表存在:从 last_date + 1 开始增量同步 Args: start_date: 起始日期(YYYYMMDD 格式,默认全量从 20100101,增量从 last_date+1) end_date: 结束日期(YYYYMMDD 格式,默认为今天) force_full: 若为 True,强制从 20100101 完整重载 Returns: 包含同步数据的 pd.DataFrame Example: >>> # 首次同步(从 20100101 全量加载) >>> result = sync_moneyflow() >>> >>> # 后续同步(增量 - 仅新数据) >>> result = sync_moneyflow() >>> >>> # 强制完整重载 >>> result = sync_moneyflow(force_full=True) >>> >>> # 手动指定日期范围 >>> result = sync_moneyflow(start_date='20240101', end_date='20240131') """ sync_manager = MoneyflowSync() return sync_manager.sync_all( start_date=start_date, end_date=end_date, force_full=force_full, ) def preview_moneyflow_sync( start_date: Optional[str] = None, end_date: Optional[str] = None, force_full: bool = False, sample_size: int = 3, ) -> dict: """预览个股资金流向数据同步数据量和样本(不实际同步)。 Args: start_date: 手动指定起始日期(覆盖自动检测) end_date: 手动指定结束日期(默认为今天) force_full: 若为 True,预览全量同步(从 20100101) sample_size: 预览天数(默认: 3) Returns: 包含预览信息的字典 Example: >>> # 预览将要同步的内容 >>> preview = preview_moneyflow_sync() >>> >>> # 预览全量同步 >>> preview = preview_moneyflow_sync(force_full=True) """ sync_manager = MoneyflowSync() return sync_manager.preview_sync( start_date=start_date, end_date=end_date, force_full=force_full, sample_size=sample_size, )