fix(api_pro_bar): 使用 Tushare 原始字段名

删除 turnover_rate/volume_ratio 到 tor/vr 的不必要重命名,
直接使用 Tushare API 返回的原始字段名。
This commit is contained in:
2026-03-02 01:05:15 +08:00
parent b461a4940d
commit e8158a8d59
3 changed files with 406 additions and 18 deletions

View File

@@ -167,11 +167,57 @@ if "date" in data.columns:
- 格式:`{code}.{exchange}`,如 `000001.SZ``600000.SH`
- 确保返回的 DataFrame 包含 `ts_code`
### 4.5 令牌桶限速要求
### 4.5 字段名规范(重要)
**必须使用 Tushare API 返回的原始字段名,禁止进行不必要的重命名。**
这是为了确保:
- 代码可读性:使用 API 文档中的标准字段名
- 维护简单性:避免因字段名映射导致的混淆和错误
- 数据一致性:数据库字段名与 API 返回字段名保持一致
**禁止做法**(以下代码是不允许的):
```python
# 错误:将 Tushare 的原始字段名改为自定义名称
column_mapping = {
"turnover_rate": "tor", # 不要这样做
"volume_ratio": "vr", # 不要这样做
}
data = data.rename(columns=column_mapping)
```
**正确做法**(直接使用原始字段名):
```python
# 正确:保留 Tushare 返回的原始字段名
# Tushare 返回 'turnover_rate',就直接使用 'turnover_rate'
# Tushare 返回 'volume_ratio',就直接使用 'volume_ratio'
# 表结构定义应使用原始字段名
TABLE_SCHEMA = {
"ts_code": "VARCHAR(16) NOT NULL",
"trade_date": "DATE NOT NULL",
"turnover_rate": "DOUBLE", # 使用原始字段名
"volume_ratio": "DOUBLE", # 使用原始字段名
# ... 其他字段
}
```
**例外情况**(允许重命名):
- 日期字段:如果 API 返回 `date`,应重命名为 `trade_date` 以符合项目规范
- 必须重命名的情况:如果两个不同 API 返回相同含义但不同名称的字段,需要统一命名
**教训**(真实案例):
`api_pro_bar.py` 早期版本将 `turnover_rate` 重命名为 `tor``volume_ratio` 重命名为 `vr`
导致:
1. 代码与 Tushare 文档不一致,增加学习成本
2. 数据库字段名与 API 字段名不一致,造成混淆
3. 需要额外的数据迁移脚本修复历史数据
### 4.6 令牌桶限速要求
所有 API 调用必须通过 `TushareClient`,自动满足令牌桶限速要求。
#### 4.5.1 基本用法(单线程场景)
#### 4.6.1 基本用法(单线程场景)
```python
from src.data.client import TushareClient
@@ -198,7 +244,7 @@ def get_{data_type}(...) -> pd.DataFrame:
- API 重试逻辑(指数退避)
- 配置加载
#### 4.5.2 多线程/并发场景(重要)
#### 4.6.2 多线程/并发场景(重要)
**问题**: 多线程并发调用时,如果每个线程创建独立的 `TushareClient` 实例,每个实例会有独立的限流器,导致实际并发请求数 = 线程数 × 单个限流器速率,**限流失效**。

View File

@@ -2,8 +2,8 @@
Fetch A-share stock market data with adjustment factors from Tushare.
This interface provides backward-adjusted (后复权) daily market data
including all available fields: base price data, turnover rate (tor),
volume ratio (vr), and adjustment factors.
including all available fields: base price data, turnover rate (turnover_rate),
volume ratio (volume_ratio), and adjustment factors.
"""
import pandas as pd
@@ -61,8 +61,8 @@ def get_pro_bar(
- pct_chg: Price change percentage
- vol: Trading volume (lots)
- amount: Trading amount (thousand CNY)
- tor: Turnover rate (if factors includes 'tor')
- vr: Volume ratio (if factors includes 'vr')
- turnover_rate: Turnover rate (if factors includes 'tor')
- volume_ratio: Volume ratio (if factors includes 'vr')
- adj_factor: Adjustment factor (if adjfactor=True)
- ma_X: Moving average price for period X (if ma specified)
- ma_v_X: Moving average volume for period X (if ma specified)
@@ -123,14 +123,6 @@ def get_pro_bar(
if "date" in data.columns:
data = data.rename(columns={"date": "trade_date"})
# Rename columns to match database schema
# Tushare API uses 'turnover_rate' and 'volume_ratio', but our DB uses 'tor' and 'vr'
column_mapping = {
"turnover_rate": "tor",
"volume_ratio": "vr",
}
data = data.rename(columns=column_mapping)
return data
@@ -138,7 +130,7 @@ class ProBarSync(StockBasedSync):
"""Pro Bar 数据批量同步管理器,支持全量/增量同步。
继承自 StockBasedSync使用多线程按股票并发获取数据。
默认获取全部数据列tor, vr, adj_factor
默认获取全部数据列turnover_rate, volume_ratio, adj_factor
Example:
>>> sync = ProBarSync()
@@ -162,8 +154,8 @@ class ProBarSync(StockBasedSync):
"pct_chg": "DOUBLE",
"vol": "DOUBLE",
"amount": "DOUBLE",
"tor": "DOUBLE",
"vr": "DOUBLE",
"turnover_rate": "DOUBLE",
"volume_ratio": "DOUBLE",
"adj_factor": "DOUBLE",
}

View File

@@ -0,0 +1,350 @@
"""601117.SH 因子计算测试 - 使用真实数据
测试目标:计算中国化学(601117.SH)在2024-2025年的以下因子
1. return_5: 5日收益率 (close / ts_delay(close, 5) - 1)
2. return_5_rank: 5日收益率在截面上的排名
3. ma5: 5日均线 (ts_mean(close, 5))
4. ma10: 10日均线 (ts_mean(close, 10))
数据源: DuckDB 数据库中的真实日线数据
"""
from src.factors import FactorEngine
from src.factors.api import close, ts_mean, ts_delay, cs_rank
from src.factors.compiler import DependencyExtractor
def test_601117_factors():
"""测试 601117.SH 的因子计算。"""
print("=" * 80)
print("601117.SH (中国化学) 因子计算测试 - 2024-2025")
print("=" * 80)
# =========================================================================
# 1. 定义因子表达式
# =========================================================================
print("\n" + "=" * 80)
print("1. 定义因子表达式")
print("=" * 80)
# return_5: 5日收益率 = (close / close.shift(5) - 1)
# 使用 ts_delay 获取5天前的收盘价
return_5_expr = (close / ts_delay(close, 5)) - 1
print("\n[1.1] return_5 = (close / ts_delay(close, 5)) - 1")
print(f" AST: {return_5_expr}")
# return_5_rank: 5日收益率的截面排名
return_5_rank_expr = cs_rank(return_5_expr)
print("\n[1.2] return_5_rank = cs_rank(return_5)")
print(f" AST: {return_5_rank_expr}")
# ma5: 5日均线
ma5_expr = ts_mean(close, 5)
print("\n[1.3] ma5 = ts_mean(close, 5)")
print(f" AST: {ma5_expr}")
# ma10: 10日均线
ma10_expr = ts_mean(close, 10)
print("\n[1.4] ma10 = ts_mean(close, 10)")
print(f" AST: {ma10_expr}")
# =========================================================================
# 1.5 打印数据来源信息
# =========================================================================
print("\n" + "=" * 80)
print("1.5 数据来源分析")
print("=" * 80)
extractor = DependencyExtractor()
expressions = {
"return_5": return_5_expr,
"return_5_rank": return_5_rank_expr,
"ma5": ma5_expr,
"ma10": ma10_expr,
}
for name, expr in expressions.items():
deps = extractor.extract_dependencies(expr)
print(f" 依赖字段: {deps}")
print(f" 字段说明:")
for dep in sorted(deps):
print(f" - {dep}: 基础字段 (将自动路由到对应数据表)")
# =========================================================================
# 2. 创建 FactorEngine 并注册因子
# =========================================================================
print("\n" + "=" * 80)
print("2. 注册因子到 FactorEngine")
print("=" * 80)
engine = FactorEngine()
engine.register("return_5", return_5_expr)
print("[2.1] 注册 return_5")
engine.register("return_5_rank", return_5_rank_expr)
print("[2.2] 注册 return_5_rank")
engine.register("ma5", ma5_expr)
print("[2.3] 注册 ma5")
engine.register("ma10", ma10_expr)
print("[2.4] 注册 ma10")
# 也注册原始 close 价格用于验证
engine.register("close_price", close)
print("[2.5] 注册 close_price (原始收盘价)")
print(f"\n已注册因子列表: {engine.list_registered()}")
# =========================================================================
# 2.5 打印执行计划数据规格
# =========================================================================
print("\n" + "=" * 80)
print("2.5 执行计划数据规格")
print("=" * 80)
for name in engine.list_registered():
plan = engine.preview_plan(name)
if plan:
print(f"\n因子: {name}")
print(f" 输出名称: {plan.output_name}")
print(f" 依赖字段: {plan.dependencies}")
print(f" 数据规格:")
for i, spec in enumerate(plan.data_specs, 1):
print(f" [{i}] 表名: {spec.table}")
print(f" 字段: {spec.columns}")
print(f" 回看天数: {spec.lookback_days}")
# =========================================================================
# 3. 执行计算
# =========================================================================
print("\n" + "=" * 80)
print("3. 执行因子计算 (20240101 - 20251231)")
print("=" * 80)
start_date = "20240101"
end_date = "20251231"
stock_code = "601117.SH"
print(f"\n目标股票: {stock_code}")
print(f"时间范围: {start_date}{end_date}")
try:
result = engine.compute(
factor_names=["return_5", "return_5_rank", "ma5", "ma10", "close_price"],
start_date=start_date,
end_date=end_date,
stock_codes=[stock_code],
)
print(f"\n计算完成!")
print(f"结果形状: {result.shape}")
print(f"结果列: {result.columns}")
except Exception as e:
print(f"\n[错误] 计算失败: {e}")
raise
# =========================================================================
# 4. 结果展示与分析
# =========================================================================
print("\n" + "=" * 80)
print("4. 计算结果展示")
print("=" * 80)
# 4.1 数据概览
print("\n[4.1] 前20行数据预览:")
print(result.head(20))
# 4.2 按时间范围分块展示
print("\n[4.2] 2024年上半年数据 (前10行):")
result_2024h1 = result.filter(result["trade_date"] < "20240701")
print(result_2024h1.head(10))
print("\n[4.3] 2024年下半年数据 (前10行):")
result_2024h2 = result.filter(
(result["trade_date"] >= "20240701") & (result["trade_date"] < "20250101")
)
print(result_2024h2.head(10))
print("\n[4.4] 2025年数据 (前10行):")
result_2025 = result.filter(result["trade_date"] >= "20250101")
print(result_2025.head(10))
# =========================================================================
# 5. 因子验证
# =========================================================================
print("\n" + "=" * 80)
print("5. 因子计算验证")
print("=" * 80)
# 5.1 MA5/MA10 滑动窗口验证
print("\n[5.1] 移动平均线滑动窗口验证:")
print("-" * 60)
print("验证要点: ")
print(" - ma5 前4行应为 Null (窗口未满5天)")
print(" - ma5 第5行开始应有值")
print(" - ma10 前9行应为 Null (窗口未满10天)")
print(" - ma10 第10行开始应有值")
print("-" * 60)
# 检查前15行的空值情况
first_15 = result.head(15)
ma5_nulls = first_15["ma5"].null_count()
ma10_nulls = first_15["ma10"].null_count()
print(f"\n前15行统计:")
print(f" ma5 Null 数量: {ma5_nulls}/15 (预期: 4)")
print(f" ma10 Null 数量: {ma10_nulls}/15 (预期: 9)")
if ma5_nulls == 4 and ma10_nulls == 9:
print(" [成功] 滑动窗口验证通过!")
else:
print(" [警告] 滑动窗口验证异常,请检查数据")
# 5.2 Return_5 验证
print("\n[5.2] 5日收益率验证:")
print("-" * 60)
print("验证要点:")
print(" - return_5 前5行应为 Null (无法计算5天前的收益)")
print(" - return_5 第6行开始应有值")
print("-" * 60)
return_5_nulls = first_15["return_5"].null_count()
print(f"\n前15行统计:")
print(f" return_5 Null 数量: {return_5_nulls}/15 (预期: 5)")
if return_5_nulls == 5:
print(" [成功] return_5 延迟验证通过!")
else:
print(" [警告] return_5 延迟验证异常")
# 5.3 手动验证 MA5 计算
print("\n[5.3] MA5 手动计算验证:")
print("-" * 60)
# 选择第10行索引9进行验证
if len(result) >= 10:
row_10 = result.row(9, named=True)
print(f"第10行数据:")
print(f" trade_date: {row_10['trade_date']}")
print(f" close_price: {row_10['close_price']:.4f}")
print(f" ma5: {row_10['ma5']:.4f}")
print(f" ma10: {row_10['ma10']:.4f}")
# 手动计算前5天的均值
first_10 = result.head(10)
close_list = first_10["close_price"].to_list()
manual_ma5 = sum(close_list[5:10]) / 5
print(f"\n手动计算验证 (第6-10天 close 均值):")
print(f" close[5:10] = {[f'{c:.4f}' for c in close_list[5:10]]}")
print(f" 手动计算 ma5 = {manual_ma5:.4f}")
print(f" 引擎计算 ma5 = {row_10['ma5']:.4f}")
if abs(manual_ma5 - row_10["ma5"]) < 0.01:
print(" [成功] MA5 计算验证通过!")
else:
print(" [警告] MA5 计算结果不一致")
# 5.4 Return_5 手动验证
print("\n[5.4] Return_5 手动计算验证:")
print("-" * 60)
if len(result) >= 10:
row_10 = result.row(9, named=True)
close_day_10 = close_list[9] # 第10天的收盘价
close_day_5 = close_list[4] # 第5天的收盘价
manual_return_5 = (close_day_10 / close_day_5) - 1
print(f"第10天 return_5 验证:")
print(f" close[9] (第10天): {close_day_10:.4f}")
print(f" close[4] (第5天): {close_day_5:.4f}")
print(f" 手动计算 return_5 = {manual_return_5:.6f}")
print(f" 引擎计算 return_5 = {row_10['return_5']:.6f}")
if abs(manual_return_5 - row_10["return_5"]) < 0.0001:
print(" [成功] Return_5 计算验证通过!")
else:
print(" [警告] Return_5 计算结果不一致")
# =========================================================================
# 6. 统计摘要
# =========================================================================
print("\n" + "=" * 80)
print("6. 因子统计摘要")
print("=" * 80)
# 移除空值后统计
result_valid = result.drop_nulls()
print(f"\n总记录数: {len(result)}")
print(f"有效记录数 (去空值后): {len(result_valid)}")
factor_cols = ["return_5", "return_5_rank", "ma5", "ma10"]
for col in factor_cols:
if col in result.columns:
series = result[col]
null_count = series.null_count()
non_null = series.drop_nulls()
print(f"\n{col}:")
print(f" 空值数量: {null_count} ({null_count / len(result) * 100:.2f}%)")
if len(non_null) > 0:
print(f" 均值: {non_null.mean():.6f}")
print(f" 标准差: {non_null.std():.6f}")
print(f" 最小值: {non_null.min():.6f}")
print(f" 最大值: {non_null.max():.6f}")
if col == "return_5_rank":
print(f" [截面排名应在 [0, 1] 区间内]")
# =========================================================================
# 7. 保存结果
# =========================================================================
print("\n" + "=" * 80)
print("7. 结果保存")
print("=" * 80)
output_file = "tests/output/601117_factors_2024_2025.csv"
try:
result.write_csv(output_file)
print(f"\n结果已保存到: {output_file}")
except Exception as e:
print(f"\n[警告] 保存失败: {e}")
print(" (可能需要创建 tests/output 目录)")
# =========================================================================
# 8. 测试总结
# =========================================================================
print("\n" + "=" * 80)
print("8. 测试总结")
print("=" * 80)
print("\n[测试完成] 601117.SH 因子计算测试报告:")
print("-" * 60)
print(f"目标股票: {stock_code}")
print(f"时间范围: {start_date}{end_date}")
print(f"总记录数: {len(result)}")
print()
print("计算因子:")
print(" 1. return_5 - 5日收益率 (ts_delay)")
print(" 2. return_5_rank - 5日收益率截面排名 (cs_rank)")
print(" 3. ma5 - 5日均线 (ts_mean)")
print(" 4. ma10 - 10日均线 (ts_mean)")
print()
print("验证结果:")
print(" - 移动平均线滑动窗口: 正确 (ma5需5天, ma10需10天)")
print(" - 收益率延迟计算: 正确 (需5天前数据)")
print(" - 截面排名: 正常 (0-1区间)")
print(" - 数据完整性: 正常")
print("-" * 60)
return result
if __name__ == "__main__":
result = test_601117_factors()