"""测试 Bug 修复: 1. 临时因子命名冲突修复验证 2. 逻辑运算符支持验证 """ import sys sys.path.insert(0, "D:/PyProject/ProStock") from src.factors.dsl import Symbol, BinaryOpNode from src.factors.engine.ast_optimizer import ExpressionFlattener, flatten_expression def test_temp_name_uniqueness(): """测试:临时因子名称全局唯一性。""" print("测试 1: 临时因子命名冲突修复") print("-" * 50) close = Symbol("close") open_price = Symbol("open") # 创建两个表达式拍平器实例 flattener1 = ExpressionFlattener() flattener2 = ExpressionFlattener() # 模拟因子 A: cs_rank(ts_delay(close, 1)) from src.factors.dsl import FunctionNode expr_a = FunctionNode("cs_rank", FunctionNode("ts_delay", close, 1)) flat_a, temps_a = flattener1.flatten(expr_a) # 模拟因子 B: cs_mean(ts_delay(open, 2)) expr_b = FunctionNode("cs_mean", FunctionNode("ts_delay", open_price, 2)) flat_b, temps_b = flattener2.flatten(expr_b) # 验证临时名称不冲突 temp_names_a = set(temps_a.keys()) temp_names_b = set(temps_b.keys()) print(f"因子 A 临时名称: {temp_names_a}") print(f"因子 B 临时名称: {temp_names_b}") # 检查是否有名称冲突 common_names = temp_names_a & temp_names_b if common_names: print(f"[失败] 发现命名冲突: {common_names}") return False print("[通过] 临时因子名称全局唯一,无冲突") return True def test_logical_operators(): """测试:逻辑运算符支持。""" print("\n测试 2: 逻辑运算符支持") print("-" * 50) # 测试 DSL 层 close = Symbol("close") open_price = Symbol("open") # 测试 & 运算符(注意 Python 运算符优先级,需要用括号) and_expr = (close > open_price) & (close > 0) print(f"DSL 表达式 ((close > open) & (close > 0)): {and_expr}") assert isinstance(and_expr, BinaryOpNode), "& 应生成 BinaryOpNode" assert and_expr.op == "&", "运算符应为 &" print("[通过] DSL 层支持 & 运算符") # 测试 | 运算符(注意 Python 运算符优先级,需要用括号) or_expr = (close < open_price) | (close < 0) print(f"DSL 表达式 ((close < open) | (close < 0)): {or_expr}") assert isinstance(or_expr, BinaryOpNode), "| 应生成 BinaryOpNode" assert or_expr.op == "|", "运算符应为 |" print("[通过] DSL 层支持 | 运算符") # 测试字符串解析 from src.factors.parser import FormulaParser from src.factors.registry import FunctionRegistry parser = FormulaParser(FunctionRegistry()) # 解析包含 & 的表达式 try: parsed_and = parser.parse("(close > open) & (volume > 0)") print(f"解析器支持 & 运算符: {parsed_and}") print("[通过] Parser 支持 & 运算符") except Exception as e: print(f"[失败] Parser 解析 & 失败: {e}") return False # 解析包含 | 的表达式 try: parsed_or = parser.parse("(close < open) | (volume < 0)") print(f"解析器支持 | 运算符: {parsed_or}") print("[通过] Parser 支持 | 运算符") except Exception as e: print(f"[失败] Parser 解析 | 失败: {e}") return False # 测试翻译到 Polars from src.factors.translator import PolarsTranslator import polars as pl translator = PolarsTranslator() try: polars_and = translator.translate(parsed_and) print(f"Polars 表达式 (&): {polars_and}") print("[通过] Translator 支持 & 运算符") except Exception as e: print(f"[失败] Translator 翻译 & 失败: {e}") return False try: polars_or = translator.translate(parsed_or) print(f"Polars 表达式 (|): {polars_or}") print("[通过] Translator 支持 | 运算符") except Exception as e: print(f"[失败] Translator 翻译 | 失败: {e}") return False return True if __name__ == "__main__": print("=" * 60) print("Bug 修复验证测试") print("=" * 60) test1_passed = test_temp_name_uniqueness() test2_passed = test_logical_operators() print("\n" + "=" * 60) print("测试结果汇总") print("=" * 60) print(f"临时因子命名冲突修复: {'[通过]' if test1_passed else '[失败]'}") print(f"逻辑运算符支持: {'[通过]' if test2_passed else '[失败]'}") if test1_passed and test2_passed: print("\n所有测试通过!") sys.exit(0) else: print("\n存在失败的测试!") sys.exit(1)