From 84479ee9ffd52476d1fb8b4190d42b34e9439565 Mon Sep 17 00:00:00 2001 From: liaozhaorun <1300336796@qq.com> Date: Sun, 1 Mar 2026 01:24:39 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=B0=86=E8=A1=A8=E7=BB=93?= =?UTF-8?q?=E6=9E=84=E5=AE=9A=E4=B9=89=E4=BB=8E=20storage=20=E8=BF=81?= =?UTF-8?q?=E7=A7=BB=E5=88=B0=E5=90=84=20API=20=E6=96=87=E4=BB=B6=20-=20?= =?UTF-8?q?=E7=A7=BB=E9=99=A4=20storage.py=20=E9=9B=86=E4=B8=AD=E5=BC=8F?= =?UTF-8?q?=E5=BB=BA=E8=A1=A8=E9=80=BB=E8=BE=91=EF=BC=8C=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E5=90=84=20API=20=E6=96=87=E4=BB=B6=E8=87=AA=E7=AE=A1=E7=90=86?= =?UTF-8?q?=20-=20base=5Fsync.py=20=E6=96=B0=E5=A2=9E=20ensure=5Ftable=5Fe?= =?UTF-8?q?xists()=20=E5=92=8C=E8=A1=A8=E6=8E=A2=E6=B5=8B=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=20-=20api=5Fdaily/api=5Fpro=5Fbar/api=5Fbak=5Fbasic?= =?UTF-8?q?=20=E6=B7=BB=E5=8A=A0=20TABLE=5FSCHEMA=20=E5=AE=9A=E4=B9=89=20-?= =?UTF-8?q?=20api=5Ffinancial=5Fsync=20=E6=B7=BB=E5=8A=A0=E5=AE=8C?= =?UTF-8?q?=E6=95=B4=E5=88=A9=E6=B6=A6=E8=A1=A8=E5=AD=97=E6=AE=B5=E5=AE=9A?= =?UTF-8?q?=E4=B9=89=20-=20sync.py=20=E6=9B=B4=E6=96=B0=E8=81=8C=E8=B4=A3?= =?UTF-8?q?=E6=96=87=E6=A1=A3=EF=BC=8C=E6=98=8E=E7=A1=AE=E4=BB=85=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=AF=8F=E6=97=A5=E6=9B=B4=E6=96=B0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=20-=20AGENTS.md=20=E6=B7=BB=E5=8A=A0=20v2.1=20=E6=9E=B6?= =?UTF-8?q?=E6=9E=84=E5=8F=98=E6=9B=B4=E5=8E=86=E5=8F=B2=E5=92=8C=20AI=20?= =?UTF-8?q?=E8=A1=8C=E4=B8=BA=E5=87=86=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AGENTS.md | 61 ++++- src/data/api_wrappers/api_bak_basic.py | 44 +++- src/data/api_wrappers/api_daily.py | 25 ++ src/data/api_wrappers/api_pro_bar.py | 34 +++ src/data/api_wrappers/base_sync.py | 242 ++++++++++++++++++ .../financial_data/api_financial_sync.py | 169 +++++++++--- src/data/storage.py | 172 ++----------- src/data/sync.py | 141 +++++++--- 8 files changed, 651 insertions(+), 237 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 
bb85ce3..16420c1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -296,6 +296,34 @@ uv run python -c "from src.data.sync import sync_all; sync_all(max_workers=20)" ## 架构变更历史 +### v2.1 (2026-02-28) - 同步模块规范更新 + +#### sync.py 职责划分 + **变更**: 明确 `sync.py` 只包含每日更新的数据同步 + **原因**: 区分高频(每日)和低频(季度/年度)数据,避免不必要的 API 调用 + **规范**: + - `sync.py` / `sync_all_data()`: **仅包含每日更新的数据** + - 日线数据 (`api_daily`) + - Pro Bar 数据 (`api_pro_bar`) + - 交易日历 (`api_trade_cal`) + - 股票基本信息 (`api_stock_basic`) + - 历史股票列表 (`api_bak_basic`) + + - **不应放入 `sync.py` 的季度/低频数据**: + - 财务数据 (`financial_data/` 目录): 利润表、资产负债表、现金流量表等 + - 名称变更 (`api_namechange`): 已移除自动同步,建议手动定期同步 + + - **季度数据同步方式**: + ```python + # 财务数据单独同步(不在 sync_all_data 中) + from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial + sync_financial() # 增量同步利润表 + + # 名称变更手动同步 + from src.data.api_wrappers import sync_namechange + sync_namechange(force=True) + ``` + ### v2.0 (2026-02-23) - 重要更新 #### 存储层重构 @@ -325,7 +353,34 @@ uv run python -c "from src.data.sync import sync_all; sync_all(max_workers=20)" `get_today_date()`、`get_next_date()`、`DEFAULT_START_DATE` 等函数统一在 `src/data/utils.py` 中管理 其他模块应从 `utils.py` 导入这些函数,避免重复定义 -### v1.x (历史版本) - 初始版本,使用 HDF5 存储 - 数据同步逻辑集中在 `sync.py` +## AI 行为准则 + +### LSP 检测报错处理 + +**⚠️ 强制要求:当进行 LSP 检测时报错,必定是代码格式问题。** + +如果 LSP 检测报错,必须按照以下流程处理: + +1. **问题定位** + - 报错必定是由基础格式错误引起:缩进错误、引号括号不匹配、代码格式错误等 + - 必须读取对应的代码行,精确定位错误 + +2. **修复方式** + - ✅ **必须**:读取报错文件,检查具体代码行 + - ✅ **必须**:修复格式错误(缩进、括号匹配、引号闭合等) + - ❌ **禁止**:删除文件重新修改 + - ❌ **禁止**:自行 rollback 文件 + - ❌ **禁止**:新建文件重新修改 + - ❌ **禁止**:忽略错误继续执行 + +3. 
**验证要求** + - 修复后必须重新运行 LSP 检测确认无错误 + - 确保修改仅针对格式问题,不改变代码逻辑 + +**示例场景**: +``` +LSP 报错:Syntax error on line 45 +✅ 正确做法:读取文件第 45 行,发现少了一个右括号,添加后重新检测 +❌ 错误做法:删除文件重新写、或者忽略错误继续 +``` diff --git a/src/data/api_wrappers/api_bak_basic.py b/src/data/api_wrappers/api_bak_basic.py index f56e819..7f36729 100644 --- a/src/data/api_wrappers/api_bak_basic.py +++ b/src/data/api_wrappers/api_bak_basic.py @@ -89,15 +89,43 @@ class BakBasicSync(DateBasedSync): table_name = "bak_basic" default_start_date = "20160101" + # 表结构定义 + TABLE_SCHEMA = { + "trade_date": "DATE NOT NULL", + "ts_code": "VARCHAR(16) NOT NULL", + "name": "VARCHAR(50)", + "industry": "VARCHAR(50)", + "area": "VARCHAR(50)", + "pe": "DOUBLE", + "float_share": "DOUBLE", + "total_share": "DOUBLE", + "total_assets": "DOUBLE", + "liquid_assets": "DOUBLE", + "fixed_assets": "DOUBLE", + "reserved": "DOUBLE", + "reserved_pershare": "DOUBLE", + "eps": "DOUBLE", + "bvps": "DOUBLE", + "pb": "DOUBLE", + "list_date": "VARCHAR(8)", + "undp": "DOUBLE", + "per_undp": "DOUBLE", + "rev_yoy": "DOUBLE", + "profit_yoy": "DOUBLE", + "gpr": "DOUBLE", + "npr": "DOUBLE", + "holder_num": "DOUBLE", + } + + # 索引定义 + TABLE_INDEXES = [ + ("idx_bak_basic_date_code", ["trade_date", "ts_code"]), + ] + + # 主键定义 + PRIMARY_KEY = ("trade_date", "ts_code") + def fetch_single_date(self, trade_date: str) -> pd.DataFrame: - """获取单日的历史股票列表数据。 - - Args: - trade_date: 交易日期(YYYYMMDD) - - Returns: - 包含当日所有股票数据的 DataFrame - """ return get_bak_basic(trade_date=trade_date) diff --git a/src/data/api_wrappers/api_daily.py b/src/data/api_wrappers/api_daily.py index 2cbfebf..1810278 100644 --- a/src/data/api_wrappers/api_daily.py +++ b/src/data/api_wrappers/api_daily.py @@ -92,6 +92,31 @@ class DailySync(StockBasedSync): table_name = "daily" + # 表结构定义 + TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "trade_date": "DATE NOT NULL", + "open": "DOUBLE", + "high": "DOUBLE", + "low": "DOUBLE", + "close": "DOUBLE", + "pre_close": "DOUBLE", + "change": "DOUBLE", + 
"pct_chg": "DOUBLE", + "vol": "DOUBLE", + "amount": "DOUBLE", + "turnover_rate": "DOUBLE", + "volume_ratio": "DOUBLE", + } + + # 索引定义 + TABLE_INDEXES = [ + ("idx_daily_date_code", ["trade_date", "ts_code"]), + ] + + # 主键定义 + PRIMARY_KEY = ("ts_code", "trade_date") + def fetch_single_stock( self, ts_code: str, diff --git a/src/data/api_wrappers/api_pro_bar.py b/src/data/api_wrappers/api_pro_bar.py index 87fd142..dad2e55 100644 --- a/src/data/api_wrappers/api_pro_bar.py +++ b/src/data/api_wrappers/api_pro_bar.py @@ -123,6 +123,14 @@ def get_pro_bar( if "date" in data.columns: data = data.rename(columns={"date": "trade_date"}) + # Rename columns to match database schema + # Tushare API uses 'turnover_rate' and 'volume_ratio', but our DB uses 'tor' and 'vr' + column_mapping = { + "turnover_rate": "tor", + "volume_ratio": "vr", + } + data = data.rename(columns=column_mapping) + return data @@ -141,6 +149,32 @@ class ProBarSync(StockBasedSync): table_name = "pro_bar" + # 表结构定义 + TABLE_SCHEMA = { + "ts_code": "VARCHAR(16) NOT NULL", + "trade_date": "DATE NOT NULL", + "open": "DOUBLE", + "high": "DOUBLE", + "low": "DOUBLE", + "close": "DOUBLE", + "pre_close": "DOUBLE", + "change": "DOUBLE", + "pct_chg": "DOUBLE", + "vol": "DOUBLE", + "amount": "DOUBLE", + "tor": "DOUBLE", + "vr": "DOUBLE", + "adj_factor": "DOUBLE", + } + + # 索引定义 + TABLE_INDEXES = [ + ("idx_pro_bar_date_code", ["trade_date", "ts_code"]), + ] + + # 主键定义 + PRIMARY_KEY = ("ts_code", "trade_date") + def fetch_single_stock( self, ts_code: str, diff --git a/src/data/api_wrappers/base_sync.py b/src/data/api_wrappers/base_sync.py index 41a90ef..118c4ad 100644 --- a/src/data/api_wrappers/base_sync.py +++ b/src/data/api_wrappers/base_sync.py @@ -41,6 +41,7 @@ from src.data.api_wrappers.api_trade_cal import ( get_last_trading_day, sync_trade_cal_cache, ) + from src.data.api_wrappers.api_stock_basic import _get_csv_path, sync_all_stocks @@ -62,6 +63,18 @@ class BaseDataSync(ABC): table_name: str = "" # 子类必须覆盖 
DEFAULT_START_DATE = "20180101" DEFAULT_MAX_WORKERS = get_settings().threads + + # 表结构定义(子类可覆盖) + # 格式: {"column_name": "SQL_TYPE", ...} + TABLE_SCHEMA: Dict[str, str] = {} + + # 索引定义(子类可覆盖) + # 格式: [("index_name", ["col1", "col2"]), ...] + TABLE_INDEXES: List[tuple] = [] + + # 主键定义(子类可覆盖) + # 格式: ("col1", "col2") + PRIMARY_KEY: tuple = () def __init__(self, max_workers: Optional[int] = None): """初始化同步管理器。 @@ -284,6 +297,143 @@ class BaseDataSync(ABC): else: return "partial" + def _probe_table_and_cleanup( + self, + probe_data: pd.DataFrame, + probe_description: str, + ) -> bool: + """探测表结构:插入样本数据、验证查询、清空表。 + + 该步骤仅在表不存在或为空时执行,用于: + 1. 验证表结构是否正确创建 + 2. 验证数据能否正常插入和查询 + 3. 确保正式同步前表处于干净状态 + + Args: + probe_data: 用于探测的样本数据 + probe_description: 探测描述(用于日志) + + Returns: + True if 探测成功,False otherwise + """ + class_name = self.__class__.__name__ + storage = Storage() + + if probe_data.empty: + print(f"[{class_name}] Probe skipped: no sample data available") + return False + + try: + print(f"[{class_name}] Probe: {probe_description}") + print(f"[{class_name}] Probe: Inserting {len(probe_data)} sample records...") + + # 插入样本数据 + storage.save(self.table_name, probe_data, mode="append") + + # 验证查询 + print(f"[{class_name}] Probe: Verifying data query...") + verification = storage._connection.execute( + f'SELECT * FROM "{self.table_name}" LIMIT 1' + ).fetchdf() + + if verification.empty: + print(f"[{class_name}] Probe: FAILED - Cannot query inserted data") + return False + + print(f"[{class_name}] Probe: Query verification OK") + # 清空表(truncate) + print(f"[{class_name}] Probe: Cleaning up sample data...") + storage._connection.execute(f'DELETE FROM "{self.table_name}"') + + # 验证表已清空 + count_result = storage._connection.execute( + f'SELECT COUNT(*) FROM "{self.table_name}"' + ).fetchone() + remaining = count_result[0] if count_result else -1 + + if remaining == 0: + print(f"[{class_name}] Probe: SUCCESS - Table verified and cleaned") + return True + else: + print(f"[{class_name}] 
Probe: WARNING - {remaining} rows remaining after cleanup") + return True # 仍然继续,因为主要目的是验证结构 + + except Exception as e: + print(f"[{class_name}] Probe: FAILED - {e}") + return False + + def _should_probe_table(self) -> bool: + """检查是否需要进行表探测。 + + 仅在以下情况返回 True: + - 表不存在 + - 表存在但为空 + + Returns: + True if 需要探测,False otherwise + """ + storage = Storage() + + # 检查表是否存在 + if not storage.exists(self.table_name): + return True + + # 检查表是否为空 + try: + count_result = storage._connection.execute( + f'SELECT COUNT(*) FROM "{self.table_name}"' + ).fetchone() + row_count = count_result[0] if count_result else 0 + return row_count == 0 + except Exception: + return True # 出错时保守处理,进行探测 + + def ensure_table_exists(self) -> None: + """确保表结构存在,如不存在则创建表和索引。 + + 根据类属性 TABLE_SCHEMA、TABLE_INDEXES、PRIMARY_KEY 创建表。 + 子类可以覆盖此方法以自定义建表逻辑。 + """ + storage = Storage() + + if storage.exists(self.table_name): + return + + if not self.TABLE_SCHEMA: + print(f"[{self.__class__.__name__}] TABLE_SCHEMA not defined, skipping table creation") + return + + # 构建列定义 + columns_def = [] + for col_name, col_type in self.TABLE_SCHEMA.items(): + columns_def.append(f'"{col_name}" {col_type}') + + # 添加主键约束 + if self.PRIMARY_KEY: + pk_cols = ', '.join(f'"{col}"' for col in self.PRIMARY_KEY) + columns_def.append(f"PRIMARY KEY ({pk_cols})") + + columns_sql = ", ".join(columns_def) + create_sql = f'CREATE TABLE IF NOT EXISTS "{self.table_name}" ({columns_sql})' + + try: + storage._connection.execute(create_sql) + print(f"[{self.__class__.__name__}] Created table '{self.table_name}'") + except Exception as e: + print(f"[{self.__class__.__name__}] Error creating table: {e}") + raise + + # 创建索引 + for idx_name, idx_cols in self.TABLE_INDEXES: + try: + idx_cols_sql = ', '.join(f'"{col}"' for col in idx_cols) + storage._connection.execute( + f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON "{self.table_name}"({idx_cols_sql})' + ) + print(f"[{self.__class__.__name__}] Created index '{idx_name}' on {idx_cols}") + except Exception 
as e:
+                print(f"[{self.__class__.__name__}] Error creating index {idx_name}: {e}")
+
     @abstractmethod
     def preview_sync(
         self,
@@ -711,6 +861,24 @@ class StockBasedSync(BaseDataSync):
             print(f"[{class_name}] No stocks found to sync")
             return {}
 
+        # 首次同步探测:验证表结构是否正常
+        if self._should_probe_table():
+            print(f"[{class_name}] Table '{self.table_name}' is empty or doesn't exist, probing...")
+            # 使用第一只股票的完整日期范围数据进行探测
+            probe_stock = stock_codes[0]
+            probe_data = self.fetch_single_stock(
+                probe_stock, sync_start_date, end_date
+            )
+            probe_desc = f"stock={probe_stock}, range={sync_start_date} to {end_date}"
+            probe_success = self._probe_table_and_cleanup(probe_data, probe_desc)
+
+            if not probe_success:
+                print(f"[{class_name}] Probe failed! Stopping sync to prevent data corruption.")
+                raise RuntimeError(
+                    f"Table '{self.table_name}' probe failed. "
+                    "Please check database schema and column mappings."
+                )
+
         print(f"[{class_name}] Total stocks to sync: {len(stock_codes)}")
         print(f"[{class_name}] Using {max_workers or self.max_workers} worker threads")
@@ -1117,6 +1285,16 @@ class DateBasedSync(BaseDataSync):
             else:
                 print(f"[{class_name}] Cannot create table: no sample data available")
                 return pd.DataFrame()
+
+        # 首次同步探测:验证表结构是否正常
+        if self._should_probe_table():
+            print(f"[{class_name}] Table '{self.table_name}' is empty, probing...")
+            # 使用最近一个交易日的完整数据进行探测
+            probe_date = get_last_trading_day(sync_start, sync_end)
+            if probe_date:
+                probe_data = self.fetch_single_date(probe_date)
+                probe_desc = f"date={probe_date}, all stocks"
+                self._probe_table_and_cleanup(probe_data, probe_desc)
 
         # 执行同步
         combined = self._run_date_range_sync(sync_start, sync_end, dry_run)
diff --git a/src/data/api_wrappers/financial_data/api_financial_sync.py b/src/data/api_wrappers/financial_data/api_financial_sync.py
index a85ad4a..a67305e 100644
--- a/src/data/api_wrappers/financial_data/api_financial_sync.py
+++ b/src/data/api_wrappers/financial_data/api_financial_sync.py
@@ -100,6 +100,112 @@ class FinancialSync:
         >>> sync.sync_income()  # 只同步利润表
     """
 
+    # 表结构定义(按表名)
+    TABLE_SCHEMAS = {
+        "financial_income": {
+            "columns": {
+                "ts_code": "VARCHAR(16) NOT NULL",
+                "ann_date": "DATE",
+                "f_ann_date": "DATE",
+                "end_date": "DATE NOT NULL",
+                "report_type": "INTEGER",
+                "comp_type": "INTEGER",
+                "end_type": "VARCHAR(10)",
+                "basic_eps": "DOUBLE",
+                "diluted_eps": "DOUBLE",
+                "total_revenue": "DOUBLE",
+                "revenue": "DOUBLE",
+                "int_income": "DOUBLE",
+                "prem_earned": "DOUBLE",
+                "comm_income": "DOUBLE",
+                "n_commis_income": "DOUBLE",
+                "n_oth_income": "DOUBLE",
+                "n_oth_b_income": "DOUBLE",
+                "prem_income": "DOUBLE",
+                "out_prem": "DOUBLE",
+                "une_prem_reser": "DOUBLE",
+                "reins_income": "DOUBLE",
+                "n_sec_tb_income": "DOUBLE",
+                "n_sec_uw_income": "DOUBLE",
+                "n_asset_mg_income": "DOUBLE",
+                "oth_b_income": "DOUBLE",
+                "fv_value_chg_gain": "DOUBLE",
+                "invest_income": "DOUBLE",
+                "ass_invest_income": "DOUBLE",
+                "forex_gain": "DOUBLE",
+                "total_cogs": "DOUBLE",
+                "oper_cost": "DOUBLE",
+                "int_exp": "DOUBLE",
+                "comm_exp": "DOUBLE",
+                "biz_tax_surchg": "DOUBLE",
+                "sell_exp": "DOUBLE",
+                "admin_exp": "DOUBLE",
+                "fin_exp": "DOUBLE",
+                "assets_impair_loss": "DOUBLE",
+                "prem_refund": "DOUBLE",
+                "compens_payout": "DOUBLE",
+                "reser_insur_liab": "DOUBLE",
+                "div_payt":
"DOUBLE", + "reins_exp": "DOUBLE", + "oper_exp": "DOUBLE", + "compens_payout_refu": "DOUBLE", + "insur_reser_refu": "DOUBLE", + "reins_cost_refund": "DOUBLE", + "other_bus_cost": "DOUBLE", + "operate_profit": "DOUBLE", + "non_oper_income": "DOUBLE", + "non_oper_exp": "DOUBLE", + "nca_disploss": "DOUBLE", + "total_profit": "DOUBLE", + "income_tax": "DOUBLE", + "n_income": "DOUBLE", + "n_income_attr_p": "DOUBLE", + "minority_gain": "DOUBLE", + "oth_compr_income": "DOUBLE", + "t_compr_income": "DOUBLE", + "compr_inc_attr_p": "DOUBLE", + "compr_inc_attr_m_s": "DOUBLE", + "ebit": "DOUBLE", + "ebitda": "DOUBLE", + "insurance_exp": "DOUBLE", + "undist_profit": "DOUBLE", + "distable_profit": "DOUBLE", + "rd_exp": "DOUBLE", + "fin_exp_int_exp": "DOUBLE", + "fin_exp_int_inc": "DOUBLE", + "transfer_surplus_rese": "DOUBLE", + "transfer_housing_imprest": "DOUBLE", + "transfer_oth": "DOUBLE", + "adj_lossgain": "DOUBLE", + "withdra_legal_surplus": "DOUBLE", + "withdra_legal_pubfund": "DOUBLE", + "withdra_biz_devfund": "DOUBLE", + "withdra_rese_fund": "DOUBLE", + "withdra_oth_ersu": "DOUBLE", + "workers_welfare": "DOUBLE", + "distr_profit_shrhder": "DOUBLE", + "prfshare_payable_dvd": "DOUBLE", + "comshare_payable_dvd": "DOUBLE", + "capit_comstock_div": "DOUBLE", + "net_after_nr_lp_correct": "DOUBLE", + "credit_impa_loss": "DOUBLE", + "net_expo_hedging_benefits": "DOUBLE", + "oth_impair_loss_assets": "DOUBLE", + "total_opcost": "DOUBLE", + "amodcost_fin_assets": "DOUBLE", + "oth_income": "DOUBLE", + "asset_disp_income": "DOUBLE", + "continued_net_profit": "DOUBLE", + "end_net_profit": "DOUBLE", + "update_flag": "VARCHAR(1)", + }, + "primary_key": ("ts_code", "end_date"), + "indexes": [ + ("idx_financial_ann", ["ts_code", "ann_date"]), + ], + }, + } + def __init__(self): """初始化同步管理器""" self.storage = Storage() @@ -115,40 +221,43 @@ class FinancialSync: print(f"[FinancialSync] 表 {table_name} 已存在,跳过建表") return + if table_name not in self.TABLE_SCHEMAS: + print(f"[FinancialSync] 表 
{table_name} 没有定义表结构,跳过建表") + return + + schema = self.TABLE_SCHEMAS[table_name] print(f"[FinancialSync] 表 {table_name} 不存在,创建表和索引...") - # 根据表名创建不同的表结构 - if table_name == "financial_income": - self.storage._connection.execute(f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - ts_code VARCHAR(16) NOT NULL, - ann_date DATE, - f_ann_date DATE, - end_date DATE NOT NULL, - report_type INTEGER, - comp_type INTEGER, - basic_eps DOUBLE, - diluted_eps DOUBLE, - PRIMARY KEY (ts_code, end_date) - ) - """) - # 创建索引 - self.storage._connection.execute(f""" - CREATE INDEX IF NOT EXISTS idx_financial_ann - ON {table_name}(ts_code, ann_date) - """) - else: - # 默认表结构 - self.storage._connection.execute(f""" - CREATE TABLE IF NOT EXISTS {table_name} ( - ts_code VARCHAR(16) NOT NULL, - end_date DATE NOT NULL, - PRIMARY KEY (ts_code, end_date) - ) - """) + # 构建列定义 + columns_def = [] + for col_name, col_type in schema["columns"].items(): + columns_def.append(f'"{col_name}" {col_type}') - print(f"[FinancialSync] 表 {table_name} 创建完成") + # 添加主键约束 + if schema.get("primary_key"): + pk_cols = ', '.join(f'"{col}"' for col in schema["primary_key"]) + columns_def.append(f"PRIMARY KEY ({pk_cols})") + columns_sql = ", ".join(columns_def) + create_sql = f'CREATE TABLE IF NOT EXISTS "{table_name}" ({columns_sql})' + + try: + self.storage._connection.execute(create_sql) + print(f"[FinancialSync] 表 {table_name} 创建完成") + except Exception as e: + print(f"[FinancialSync] 创建表 {table_name} 失败: {e}") + raise + + # 创建索引 + for idx_name, idx_cols in schema.get("indexes", []): + try: + idx_cols_sql = ', '.join(f'"{col}"' for col in idx_cols) + self.storage._connection.execute( + f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON "{table_name}"({idx_cols_sql})' + ) + print(f"[FinancialSync] 索引 {idx_name} 创建完成") + except Exception as e: + print(f"[FinancialSync] 创建索引 {idx_name} 失败: {e}") def _get_latest_quarter( self, table_name: str, period_field: str = "end_date" ) -> Optional[str]: diff --git a/src/data/storage.py 
b/src/data/storage.py index 07dcd66..5a7b00c 100644 --- a/src/data/storage.py +++ b/src/data/storage.py @@ -62,164 +62,17 @@ class Storage: self._initialized = True def _init_db(self): - """Initialize database connection and schema.""" + """Initialize database connection and schema. + + 注意:建表语句已迁移到对应的 API 文件中, + 每个同步类负责自己的表结构定义和创建。 + 参见: + - api_daily.py: DailySync.TABLE_SCHEMA + - api_pro_bar.py: ProBarSync.TABLE_SCHEMA + - api_bak_basic.py: BakBasicSync.TABLE_SCHEMA + - api_financial_sync.py: FinancialSync.TABLE_SCHEMAS + """ self._connection = duckdb.connect(str(self.db_path)) - - # Create tables with schema validation - self._connection.execute(""" - CREATE TABLE IF NOT EXISTS daily ( - ts_code VARCHAR(16) NOT NULL, - trade_date DATE NOT NULL, - open DOUBLE, - high DOUBLE, - low DOUBLE, - close DOUBLE, - pre_close DOUBLE, - change DOUBLE, - pct_chg DOUBLE, - vol DOUBLE, - amount DOUBLE, - turnover_rate DOUBLE, - volume_ratio DOUBLE, - PRIMARY KEY (ts_code, trade_date) - ) - """) - - # Create composite index for query optimization (trade_date, ts_code) - self._connection.execute(""" - CREATE INDEX IF NOT EXISTS idx_daily_date_code ON daily(trade_date, ts_code) - """) - - # Create financial_income table for income statement data - # 完整的利润表字段(94列全部) - self._connection.execute(""" - CREATE TABLE IF NOT EXISTS financial_income ( - ts_code VARCHAR(16) NOT NULL, - ann_date DATE, - f_ann_date DATE, - end_date DATE NOT NULL, - report_type INTEGER, - comp_type INTEGER, - end_type VARCHAR(10), - basic_eps DOUBLE, - diluted_eps DOUBLE, - total_revenue DOUBLE, - revenue DOUBLE, - int_income DOUBLE, - prem_earned DOUBLE, - comm_income DOUBLE, - n_commis_income DOUBLE, - n_oth_income DOUBLE, - n_oth_b_income DOUBLE, - prem_income DOUBLE, - out_prem DOUBLE, - une_prem_reser DOUBLE, - reins_income DOUBLE, - n_sec_tb_income DOUBLE, - n_sec_uw_income DOUBLE, - n_asset_mg_income DOUBLE, - oth_b_income DOUBLE, - fv_value_chg_gain DOUBLE, - invest_income DOUBLE, - ass_invest_income 
DOUBLE, - forex_gain DOUBLE, - total_cogs DOUBLE, - oper_cost DOUBLE, - int_exp DOUBLE, - comm_exp DOUBLE, - biz_tax_surchg DOUBLE, - sell_exp DOUBLE, - admin_exp DOUBLE, - fin_exp DOUBLE, - assets_impair_loss DOUBLE, - prem_refund DOUBLE, - compens_payout DOUBLE, - reser_insur_liab DOUBLE, - div_payt DOUBLE, - reins_exp DOUBLE, - oper_exp DOUBLE, - compens_payout_refu DOUBLE, - insur_reser_refu DOUBLE, - reins_cost_refund DOUBLE, - other_bus_cost DOUBLE, - operate_profit DOUBLE, - non_oper_income DOUBLE, - non_oper_exp DOUBLE, - nca_disploss DOUBLE, - total_profit DOUBLE, - income_tax DOUBLE, - n_income DOUBLE, - n_income_attr_p DOUBLE, - minority_gain DOUBLE, - oth_compr_income DOUBLE, - t_compr_income DOUBLE, - compr_inc_attr_p DOUBLE, - compr_inc_attr_m_s DOUBLE, - ebit DOUBLE, - ebitda DOUBLE, - insurance_exp DOUBLE, - undist_profit DOUBLE, - distable_profit DOUBLE, - rd_exp DOUBLE, - fin_exp_int_exp DOUBLE, - fin_exp_int_inc DOUBLE, - transfer_surplus_rese DOUBLE, - transfer_housing_imprest DOUBLE, - transfer_oth DOUBLE, - adj_lossgain DOUBLE, - withdra_legal_surplus DOUBLE, - withdra_legal_pubfund DOUBLE, - withdra_biz_devfund DOUBLE, - withdra_rese_fund DOUBLE, - withdra_oth_ersu DOUBLE, - workers_welfare DOUBLE, - distr_profit_shrhder DOUBLE, - prfshare_payable_dvd DOUBLE, - comshare_payable_dvd DOUBLE, - capit_comstock_div DOUBLE, - net_after_nr_lp_correct DOUBLE, - credit_impa_loss DOUBLE, - net_expo_hedging_benefits DOUBLE, - oth_impair_loss_assets DOUBLE, - total_opcost DOUBLE, - amodcost_fin_assets DOUBLE, - oth_income DOUBLE, - asset_disp_income DOUBLE, - continued_net_profit DOUBLE, - end_net_profit DOUBLE, - update_flag VARCHAR(1), - PRIMARY KEY (ts_code, end_date) - update_flag VARCHAR(1), - PRIMARY KEY (ts_code, end_date) - ) - """) - - # Create pro_bar table for pro bar data (with adj, tor, vr) - self._connection.execute(""" - CREATE TABLE IF NOT EXISTS pro_bar ( - ts_code VARCHAR(16) NOT NULL, - trade_date DATE NOT NULL, - open DOUBLE, - high 
DOUBLE,
-                low DOUBLE,
-                close DOUBLE,
-                pre_close DOUBLE,
-                change DOUBLE,
-                pct_chg DOUBLE,
-                vol DOUBLE,
-                amount DOUBLE,
-                tor DOUBLE,
-                vr DOUBLE,
-                adj_factor DOUBLE,
-                PRIMARY KEY (ts_code, trade_date)
-            )
-        """)
-
-        # Create index for financial_income
-        self._connection.execute("""
-            CREATE INDEX IF NOT EXISTS idx_financial_ann ON financial_income(ts_code, ann_date)
-        """)
-
     def save(self, name: str, data: pd.DataFrame, mode: str = "append") -> dict:
         """Save data to DuckDB.
@@ -271,6 +124,7 @@
         self._connection.execute(f"DELETE FROM {name}")
 
         # UPSERT: INSERT OR REPLACE
-        columns = ", ".join(data.columns)
+        # 给列名加引号,避免与 SQL 关键字(如 change)冲突
+        columns = ', '.join(f'"{col}"' for col in data.columns)
         self._connection.execute(f"""
             INSERT OR REPLACE INTO {name} ({columns})
diff --git a/src/data/sync.py b/src/data/sync.py
index e4ba33b..bf83b28 100644
--- a/src/data/sync.py
+++ b/src/data/sync.py
@@ -1,22 +1,34 @@
 """数据同步调度中心模块。
 
 该模块作为数据同步的调度中心,统一管理各类型数据的同步流程。
-具体的同步逻辑已迁移到对应的 api_xxx.py 文件中:
-- api_daily.py: 日线数据同步 (DailySync 类)
-- api_bak_basic.py: 历史股票列表同步 (BakBasicSync 类)
-- api_pro_bar.py: Pro Bar 数据同步 (ProBarSync 类)
-- api_stock_basic.py: 股票基本信息同步
-- api_trade_cal.py: 交易日历同步
 
-注意:名称变更 (namechange) 已从自动同步中移除,
-因为股票名称变更不频繁,建议手动定期同步。
+【重要规范 - sync.py 职责范围】
+本模块**仅包含每日更新的数据接口**,季度/低频数据不应放入此文件:
+
+✅ 本模块包含的同步逻辑(每日更新):
+  - api_daily.py: 日线数据同步 (DailySync 类)
+  - api_bak_basic.py: 历史股票列表同步 (BakBasicSync 类)
+  - api_pro_bar.py: Pro Bar 数据同步 (ProBarSync 类)
+  - api_stock_basic.py: 股票基本信息同步
+  - api_trade_cal.py: 交易日历同步
+
+❌ 不应包含的同步逻辑(季度/低频更新):
+  - financial_data/: 财务数据(利润表、资产负债表、现金流量表等)
+    使用方式:
+      from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial
+      sync_financial()
+
+  - api_namechange.py: 股票名称变更(不频繁)
+    使用方式:
+      from src.data.api_wrappers import sync_namechange
+      sync_namechange(force=True)
 
 使用方式:
     # 预览同步(检查数据量,不写入)
     from src.data.sync import preview_sync
preview = preview_sync() - # 同步所有数据(不包括 namechange) + # 同步所有每日更新数据(不包括财务数据、namechange) from src.data.sync import sync_all_data result = sync_all_data() @@ -24,7 +36,7 @@ result = sync_all_data(force_full=True) """ -from typing import Optional, Dict +from typing import Optional, Dict, Union, Any import pandas as pd @@ -40,7 +52,7 @@ def preview_sync( end_date: Optional[str] = None, sample_size: int = 3, max_workers: Optional[int] = None, -) -> dict: +) -> dict[str, Any]: """预览日线同步数据量和样本(不实际同步)。 这是推荐的方式,可在实际同步前检查将要同步的内容。 @@ -88,7 +100,7 @@ def sync_all( end_date: Optional[str] = None, max_workers: Optional[int] = None, dry_run: bool = False, -) -> Dict[str, pd.DataFrame]: +) -> dict[str, pd.DataFrame]: """同步所有股票的日线数据。 这是日线数据同步的主要入口点。 @@ -135,16 +147,26 @@ def sync_all_data( force_full: bool = False, max_workers: Optional[int] = None, dry_run: bool = False, -) -> Dict[str, pd.DataFrame]: - """同步所有数据类型(每日同步)。 +) -> dict[str, Any]: + """同步所有每日更新的数据类型。 - 该函数按顺序同步所有可用的数据类型: + 【重要】本函数仅同步每日更新的数据,不包含季度/低频数据。 + + 该函数按顺序同步以下每日更新的数据类型: 1. 交易日历 (sync_trade_cal_cache) 2. 股票基本信息 (sync_all_stocks) - 3. Pro Bar 数据 (sync_pro_bar) - 4. 历史股票列表 (sync_bak_basic) + 3. 日线数据 (sync_daily) + 4. Pro Bar 数据 (sync_pro_bar) + 5. 历史股票列表 (sync_bak_basic) - 注意:名称变更 (namechange) 不在自动同步中,如需同步请手动调用。 + 【不包含的同步(需单独调用)】 + - 财务数据: 利润表、资产负债表、现金流量表(季度更新) + 使用: from src.data.api_wrappers.financial_data.api_financial_sync import sync_financial + 调用: sync_financial() + + - 名称变更 (namechange): 股票曾用名(低频更新) + 使用: from src.data.api_wrappers import sync_namechange + 调用: sync_namechange(force=True) Args: force_full: 若为 True,强制所有数据类型完整重载 @@ -163,68 +185,109 @@ def sync_all_data( >>> # Dry run >>> result = sync_all_data(dry_run=True) """ - results: Dict[str, pd.DataFrame] = {} + results: dict[str, Any] = {} print("\n" + "=" * 60) print("[sync_all_data] Starting full data synchronization...") print("=" * 60) # 1. 
Sync trade calendar (always needed first) - print("\n[1/4] Syncing trade calendar cache...") + print("\n[1/5] Syncing trade calendar cache...") try: from src.data.api_wrappers import sync_trade_cal_cache sync_trade_cal_cache() results["trade_cal"] = pd.DataFrame() - print("[1/4] Trade calendar: OK") + print("[1/5] Trade calendar: OK") except Exception as e: - print(f"[1/4] Trade calendar: FAILED - {e}") + print(f"[1/5] Trade calendar: FAILED - {e}") results["trade_cal"] = pd.DataFrame() # 2. Sync stock basic info - print("\n[2/4] Syncing stock basic info...") + print("\n[2/5] Syncing stock basic info...") try: sync_all_stocks() results["stock_basic"] = pd.DataFrame() - print("[2/4] Stock basic: OK") + print("[2/5] Stock basic: OK") except Exception as e: - print(f"[2/4] Stock basic: FAILED - {e}") + print(f"[2/5] Stock basic: FAILED - {e}") results["stock_basic"] = pd.DataFrame() - # 3. Sync Pro Bar data - print("\n[3/4] Syncing Pro Bar data (with adj, tor, vr)...") + # 3. Sync daily market data + print("\n[3/5] Syncing daily market data...") try: + # 确保表存在 + from src.data.api_wrappers.api_daily import DailySync + + DailySync().ensure_table_exists() + + daily_result = sync_daily( + force_full=force_full, + max_workers=max_workers, + dry_run=dry_run, + ) + results["daily"] = daily_result + total_daily_records = ( + sum(len(df) for df in daily_result.values()) if daily_result else 0 + ) + print( + f"[3/5] Daily data: OK ({total_daily_records} records from {len(daily_result)} stocks)" + ) + except Exception as e: + print(f"[3/5] Daily data: FAILED - {e}") + results["daily"] = pd.DataFrame() + + # 4. 
Sync Pro Bar data + print("\n[4/5] Syncing Pro Bar data (with adj, tor, vr)...") + try: + # 确保表存在 + from src.data.api_wrappers.api_pro_bar import ProBarSync + + ProBarSync().ensure_table_exists() + pro_bar_result = sync_pro_bar( force_full=force_full, max_workers=max_workers, dry_run=dry_run, ) - results["pro_bar"] = ( - pd.concat(pro_bar_result.values(), ignore_index=True) - if pro_bar_result - else pd.DataFrame() + results["pro_bar"] = pro_bar_result + total_pro_bar_records = ( + sum(len(df) for df in pro_bar_result.values()) if pro_bar_result else 0 + ) + print( + f"[4/5] Pro Bar data: OK ({total_pro_bar_records} records from {len(pro_bar_result)} stocks)" ) - print(f"[3/4] Pro Bar data: OK ({len(results['pro_bar'])} records)") except Exception as e: - print(f"[3/4] Pro Bar data: FAILED - {e}") + print(f"[4/5] Pro Bar data: FAILED - {e}") results["pro_bar"] = pd.DataFrame() - # 4. Sync stock historical list (bak_basic) - print("\n[4/4] Syncing stock historical list (bak_basic)...") + # 5. 
Sync stock historical list (bak_basic) + print("\n[5/5] Syncing stock historical list (bak_basic)...") try: + # 确保表存在 + from src.data.api_wrappers.api_bak_basic import BakBasicSync + + BakBasicSync().ensure_table_exists() + bak_basic_result = sync_bak_basic(force_full=force_full) results["bak_basic"] = bak_basic_result - print(f"[4/4] Bak basic: OK ({len(bak_basic_result)} records)") + print(f"[5/5] Bak basic: OK ({len(bak_basic_result)} records)") except Exception as e: - print(f"[4/4] Bak basic: FAILED - {e}") + print(f"[5/5] Bak basic: FAILED - {e}") results["bak_basic"] = pd.DataFrame() # Summary print("\n" + "=" * 60) print("[sync_all_data] Sync Summary") print("=" * 60) - for data_type, df in results.items(): - print(f" {data_type}: {len(df)} records") + for data_type, data in results.items(): + if isinstance(data, dict): + # 日线和 Pro Bar 返回的是 dict[str, DataFrame] + total_records = sum(len(df) for df in data.values()) + print(f" {data_type}: {len(data)} stocks, {total_records} total records") + else: + # bak_basic 返回的是 DataFrame + print(f" {data_type}: {len(data)} records") print("=" * 60) print("\nNote: namechange is NOT in auto-sync. To sync manually:") print(" from src.data.api_wrappers import sync_namechange")