82 lines
3.5 KiB
Python
82 lines
3.5 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
from tqdm import tqdm
|
||
|
||
def _prepare_concept_df(concept_dict: dict) -> pd.DataFrame:
|
||
"""将 concept_dict 转换为长格式的 DataFrame。"""
|
||
records = []
|
||
for date_str, inner_dict in concept_dict.items():
|
||
trade_date = pd.to_datetime(date_str, format='%Y%m%d')
|
||
for concept_name, stock_list in inner_dict.items():
|
||
for ts_code in stock_list:
|
||
records.append((trade_date, concept_name, ts_code))
|
||
|
||
if not records:
|
||
return pd.DataFrame(columns=['trade_date', 'concept_name', 'ts_code'])
|
||
|
||
concept_df = pd.DataFrame(records, columns=['trade_date', 'concept_name', 'ts_code'])
|
||
concept_df = concept_df.drop_duplicates(subset=["trade_date", "ts_code"], keep="first")
|
||
|
||
return concept_df
|
||
|
||
def generate_concept_factors(df: pd.DataFrame, concept_dict: dict) -> pd.DataFrame:
|
||
"""
|
||
基于热门概念数据生成因子。
|
||
|
||
Args:
|
||
df (pd.DataFrame): 所有股票所有日期的数据,需包含 'ts_code', 'trade_date'
|
||
以及用于聚合的列 (如 'pct_chg', 'turnover_rate')。
|
||
concept_dict (dict): 每日热门概念及其股票列表的字典。
|
||
|
||
Returns:
|
||
pd.DataFrame: 添加了概念相关因子的原始 DataFrame。
|
||
"""
|
||
print("开始生成概念相关因子...")
|
||
|
||
# 0. 准备工作,创建副本以避免修改原始df
|
||
df = df.copy()
|
||
df['trade_date'] = pd.to_datetime(df['trade_date'])
|
||
|
||
# 1. 将 concept_dict 转换为适合合并的 DataFrame
|
||
concept_df = _prepare_concept_df(concept_dict).sort_values(by=['trade_date'])
|
||
if concept_df.empty:
|
||
print("警告: concept_dict 为空或格式不正确,无法生成概念因子。")
|
||
return df
|
||
|
||
# 2. 将概念信息合并到主数据 df
|
||
df = pd.merge(df, concept_df, on=['trade_date', 'ts_code'], how='left')
|
||
# --- 因子计算 ---
|
||
|
||
# 因子 1: 是否属于当日热门概念
|
||
df['cat_hot_concept_stock'] = df['concept_name'].notna().astype(np.int8)
|
||
|
||
# 因子 2 & 3: 概念内的截面排序因子
|
||
# 创建一个掩码,只对热门概念股进行后续计算,以提高效率
|
||
hot_mask = df['concept_name'].notna()
|
||
|
||
# 定义需要在概念内部进行截面排序的特征列表
|
||
# 确保这些列存在于你的 df 中
|
||
features_to_rank = ['pct_chg', 'turnover_rate', 'volume_ratio']
|
||
|
||
# 筛选出 df 中实际存在的特征列
|
||
existing_features_to_rank = [f for f in features_to_rank if f in df.columns]
|
||
if not existing_features_to_rank:
|
||
print("警告: df 中缺少用于概念内排序的特征列,跳过相关因子计算。")
|
||
else:
|
||
print(f"开始计算概念内截面排序因子,基于: {existing_features_to_rank}")
|
||
|
||
# 使用 groupby().rank() 高效计算截面排名
|
||
grouped = df[hot_mask].groupby(['trade_date', 'concept_name'])
|
||
|
||
for feature in tqdm(existing_features_to_rank, desc="Ranking Features in Concepts"):
|
||
# 计算百分比排名 (0到1之间),值越大表示排名越靠前
|
||
rank_col_name = f'concept_rank_{feature}'
|
||
df[rank_col_name] = grouped[feature].rank(pct=True)
|
||
|
||
# --- 清理 & 返回 ---
|
||
# `concept_name` 列包含了有用的信息,可以选择保留或删除
|
||
# 这里我们选择保留,以便后续分析。如果不需要,可以取消下面这行注释。
|
||
df.drop(columns=['concept_name'], inplace=True)
|
||
|
||
print("概念相关因子生成完毕。")
|
||
return df |