Futures Factor Mining Based on OpenFE
Introduction
In quantitative trading and data science, feature engineering is a crucial step that directly determines a model's predictive power. OpenFE is an open-source feature engineering framework designed to help researchers and engineers generate high-quality features quickly. However, while the operators in the original version of OpenFE are powerful, they still have limitations in some application scenarios. To better meet the needs of our quantitative research, I rebuilt the OpenFE operators to enrich the generation of derived features, and combined them with XGBoost for feature importance evaluation, which facilitates the subsequent scoring of instruments.
This article describes the rebuild in detail and uses a practical example to show how the improved operators generate derived features and how XGBoost evaluates their importance, so as to improve our quantitative models.
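As a preview of the scoring step, feature importance can be read from a fitted XGBoost model through its standard API. Below is a minimal sketch on synthetic data (the data and hyperparameters are illustrative, not the configuration used later in this article):

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))                      # e.g. three derived features
y = 0.7 * X[:, 0] - 0.2 * X[:, 1] + rng.normal(scale=0.1, size=1000)

model = xgb.XGBRegressor(n_estimators=200, max_depth=3)
model.fit(X, y)
# gain-based importance per feature, later usable for ranking factors
print(model.feature_importances_)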
Overview of OpenFE
OpenFE (Open Feature Engineering) is an open-source library for automated feature engineering that gives quantitative analysts and data scientists a convenient tool for generating derived features. OpenFE follows an Expand-And-Reduce framework. In the expansion step, after the base data is loaded, OpenFE first classifies all base features as numerical or categorical, and then enumerates, as tree structures, all first-order transformations that combine one operator with the base features, building a candidate feature pool. In the reduction step, OpenFE screens the generated features. The difficulty of automated feature generation lies exactly in this expand-then-reduce process: after enumeration the candidate pool can be extremely large, which makes the reduction step especially important. OpenFE therefore proposes a two-stage evaluation framework that quickly prunes the candidate features, and finally the top-ranked candidates are added to the base feature set.
Feature Boosting
To quickly measure the effect of each candidate feature, OpenFE uses a method similar to gradient boosting. Instead of retraining the whole model, OpenFE evaluates the contribution of each new feature through incremental training. Taking GBDT as an example, OpenFE first trains a GBDT model on the existing feature set and records its predictions and performance. Then, for a new feature, OpenFE takes those predictions as the initial prediction of the GBDT model and continues training on the new feature only, obtaining new predictions and their performance. Comparing the performance before and after this incremental training quickly estimates how much the new feature improves the model, without retraining it from scratch. This greatly speeds up training while producing results close to a full retrain.
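Below is a minimal sketch of this feature-boost idea with LightGBM on synthetic data (the data, hyperparameters and boosting rounds are illustrative assumptions, not OpenFE's internal implementation): the base model's raw predictions enter the incremental model as init_score, so only the candidate feature needs to be boosted.

import lightgbm as lgb
import numpy as np
from sklearn.metrics import mean_squared_error

rng = np.random.default_rng(0)
X = rng.normal(size=(2000, 2))                       # existing features
f_new = rng.normal(size=(2000, 1))                   # one candidate feature
y = X[:, 0] - 0.5 * X[:, 1] + 0.8 * f_new[:, 0] + rng.normal(scale=0.1, size=2000)
tr, va = slice(0, 1500), slice(1500, 2000)
params = {"objective": "regression", "verbosity": -1}

# 1) base model trained on the existing feature set only
base = lgb.train(params, lgb.Dataset(X[tr], label=y[tr]), num_boost_round=200)
base_val_pred = base.predict(X[va], raw_score=True)
base_loss = mean_squared_error(y[va], base_val_pred)

# 2) incremental model: base predictions enter as init_score, then a few extra
#    rounds are boosted on the candidate feature alone
delta_set = lgb.Dataset(f_new[tr], label=y[tr],
                        init_score=base.predict(X[tr], raw_score=True))
delta = lgb.train(params, delta_set, num_boost_round=50)
new_loss = mean_squared_error(y[va], base_val_pred + delta.predict(f_new[va], raw_score=True))

# 3) the drop in validation loss approximates the candidate's incremental contribution
print(base_loss - new_loss)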
Feature Screening
OpenFE uses a two-stage feature screening procedure.
Stage one:
The goal of this stage is to quickly remove clearly ineffective candidate features. Borrowing the successive halving algorithm from the multi-armed bandit problem, OpenFE splits the dataset into multiple data blocks. Initially, all new features are evaluated on a single data block, and the worse-performing half is discarded based on the results. The remaining features are then evaluated on more data blocks, and the process repeats. Since this stage is only a coarse screen, OpenFE uses feature boosting to evaluate each new feature's individual incremental contribution, without yet considering interactions between features.
Stage two:
After the first stage, OpenFE moves to a finer screening stage that pays more attention to feature interactions. The candidate features that survive stage one are merged with the existing feature set, and a new model is trained via feature boosting. OpenFE then uses feature importance attribution to measure how much each new feature contributes to the reduction of the model's loss and ranks the features by contribution. Finally, the top-ranked candidates are selected into the final feature set.
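A minimal sketch of the stage-one successive-halving loop (the block schedule and interfaces are illustrative; OpenFE's actual implementation also handles parallelism and feature-boost scoring):

import numpy as np

def successive_halving(candidates, data_blocks, score_fn):
    """Halve the candidate set each round while scoring on more data.

    candidates:  list of candidate features (whatever score_fn accepts)
    data_blocks: list of data blocks; round i uses the first 2**i blocks
    score_fn:    callable(feature, blocks) -> float, higher is better
    """
    n_rounds = int(np.log2(max(len(data_blocks), 1))) + 1
    for i in range(n_rounds):
        blocks = data_blocks[: 2 ** i]                 # more data each round
        scores = [score_fn(f, blocks) for f in candidates]
        order = np.argsort(scores)[::-1]               # best first
        keep = max(1, len(candidates) // 2)            # drop the worse half
        candidates = [candidates[j] for j in order[:keep]]
        if len(candidates) == 1:
            break
    return candidates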
We can therefore use OpenFE to quickly extract the important information in raw data and provide rich features for the subsequent modeling and prediction steps.
Motivation and Main Process of the OpenFE Refactoring
Feature Generation
In the OpenFE package, operators are defined mainly in the calculate function of the FeatureGenerator module, and lists such as num_operators and num_num_operators collect the operator names by category. Tree-structured enumeration then combines operators with base features into all first-order transformations to build the candidate feature pool, and the tree_to_formula function in FeatureGenerator converts each tree into a formula, which makes the synthesized features easier to read. This article extends the way operators are defined and updates the operator name lists.
Main Process
The current OpenFE package has five operator categories: all_operators, num_operators, num_num_operators, cat_num_operators and cat_cat_operators. They cover the basic operations needed for factor generation; for example, a cross-sectional factor can be generated with a tree such as GroupByThenRank(*, date). However, the time-series window operators that are used constantly in feature engineering are missing from these lists, so this article adds a new category, time_series_operators.
all_operators = ['freq']
num_operators = ['abs', 'log', 'sqrt', 'square', 'sigmoid', 'round', 'residual']
num_num_operators = ['min', 'max', '+', '-', '*', '/']
cat_num_operators = ['GroupByThenMin', 'GroupByThenMax', 'GroupByThenMean', 'GroupByThenMedian', 'GroupByThenStd', 'GroupByThenRank']
cat_cat_operators = ['Combine', 'CombineThenFreq', 'GroupByThenNUnique']
First, since factor generation is not an interactive process, the window lengths must be fixed in advance. This article uses window lengths 1 to 10 and three common operators (ma, std and sum); the function generate_time_series_operators builds the windowed operator names, and a new list, time_series_operators, is added alongside the existing operator lists.
def generate_time_series_operators(base_operators, windows):
"""
动态生成带有窗口长度的时间序列操作符。
Args:
base_operators (list): 基础操作符(如 'ma', 'std', 'sum')。
windows (list or range): 窗口长度范围。
Returns:
list: 动态生成的操作符列表。
"""
return [f"{operator}_{window}" for operator in base_operators for window in windows]
time_series_operators_base = ['ma', 'std', 'sum']
windows = range(1, 11)
time_series_operators = generate_time_series_operators(time_series_operators_base, windows)
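With the settings above, the list contains 30 operator names (3 base operators × 10 windows):

print(time_series_operators[:4])   # ['ma_1', 'ma_2', 'ma_3', 'ma_4']
print(len(time_series_operators))  # 30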
Next, this article defines a new factor calculation function, calculate_customized, which overrides the calculate method of the Node class in the original FeatureGenerator module and adds the definitions of the time-series window operators. It keeps the tree structure of the original package and is built mainly with pandas functions such as groupby and rolling.
import numpy as np
import pandas as pd
from openfe.FeatureGenerator import Node
def calculate_customized(self, data, is_root=False):
if self.name in all_operators+num_operators:
d = self.children[0].calculate(data)
if self.name == "abs":
new_data = d.abs()
elif self.name == "log":
new_data = np.log(np.abs(d.replace(0, np.nan)))
elif self.name == "sqrt":
new_data = np.sqrt(np.abs(d))
elif self.name == "square":
new_data = np.square(d)
elif self.name == "sigmoid":
new_data = 1 / (1 + np.exp(-d))
elif self.name == "freq":
value_counts = d.value_counts()
value_counts.loc[np.nan] = np.nan
new_data = d.apply(lambda x: value_counts.loc[x])
elif self.name == "round":
new_data = np.floor(d)
elif self.name == "residual":
new_data = d - np.floor(d)
else:
raise NotImplementedError(f"Unrecognized operator {self.name}.")
elif self.name in num_num_operators:
d1 = self.children[0].calculate(data)
d2 = self.children[1].calculate(data)
if self.name == "max":
new_data = np.maximum(d1, d2)
elif self.name == "min":
new_data = np.minimum(d1, d2)
elif self.name == "+":
new_data = d1 + d2
elif self.name == "-":
new_data = d1 - d2
elif self.name == "*":
new_data = d1 * d2
elif self.name == "/":
new_data = d1 / d2.replace(0, np.nan)
elif self.name in time_series_operators:
d1 = self.children[0].calculate(data)
d2 = self.children[1].calculate(data)
        # split the operator name into the base operator and the window length
operator, window = self.name.split('_')
window = int(window)
        # dispatch on the base operator
if operator == 'ma':
new_data = d1.groupby(d2).rolling(window=window, min_periods=1).mean().reset_index(level=0, drop=True)
elif operator == 'std':
new_data = d1.groupby(d2).rolling(window=window, min_periods=1).std().reset_index(level=0, drop=True)
elif operator == 'sum':
new_data = d1.groupby(d2).rolling(window=window, min_periods=1).sum().reset_index(level=0, drop=True)
else:
d1 = self.children[0].calculate(data)
d2 = self.children[1].calculate(data)
if self.name == "GroupByThenMin":
temp = d1.groupby(d2).min()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenMax":
temp = d1.groupby(d2).max()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenMean":
temp = d1.groupby(d2).mean()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenMedian":
temp = d1.groupby(d2).median()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenStd":
temp = d1.groupby(d2).std()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == 'GroupByThenRank':
new_data = d1.groupby(d2).rank(ascending=True, pct=True)
elif self.name == "GroupByThenFreq":
def _f(x):
value_counts = x.value_counts()
value_counts.loc[np.nan] = np.nan
return x.apply(lambda x: value_counts.loc[x])
new_data = d1.groupby(d2).apply(_f)
elif self.name == "GroupByThenNUnique":
nunique = d1.groupby(d2).nunique()
nunique.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: nunique.loc[x])
elif self.name == "Combine":
temp = d1.astype(str) + '_' + d2.astype(str)
temp[d1.isna() | d2.isna()] = np.nan
temp, _ = temp.factorize()
new_data = pd.Series(temp, index=d1.index).astype("float64")
elif self.name == "CombineThenFreq":
temp = d1.astype(str) + '_' + d2.astype(str)
temp[d1.isna() | d2.isna()] = np.nan
value_counts = temp.value_counts()
value_counts.loc[np.nan] = np.nan
new_data = temp.apply(lambda x: value_counts.loc[x])
else:
raise NotImplementedError(f"Unrecognized operator {self.name}.")
if self.name == 'Combine':
new_data = new_data.astype('category')
else:
new_data = new_data.astype('float')
if is_root:
self.data = new_data
return new_data
Node.calculate = calculate_customized
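With the patch in place, a time-series feature can be evaluated directly on a DataFrame. A minimal usage sketch (the column names are illustrative; it assumes FNode, which wraps a base column, is importable from the same module as Node):

import pandas as pd
from openfe.FeatureGenerator import Node, FNode

df = pd.DataFrame({
    'instrument': ['IF', 'IF', 'IF', 'RB', 'RB', 'RB'],
    'close': [3800.0, 3820.0, 3790.0, 4100.0, 4120.0, 4150.0],
})
# 3-period moving average of close, computed per instrument
feat = Node('ma_3', children=[FNode('close'), FNode('instrument')])
print(feat.calculate(df, is_root=True))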
Finally, the time-series window operators are invoked during OpenFE's operator generation. Operators are initialized and combined mainly in the enumerate and get_candidate_features functions of the openfe package: the _enumerate function initializes tree nodes, calls the Node class from FeatureGenerator to combine operators with base features into derived features, and appends the generated features to num_candidate_features or cat_candidate_features.
def _enumerate(current_order_num_features, lower_order_num_features,
current_order_cat_features, lower_order_cat_features):
num_candidate_features = []
cat_candidate_features = []
for op in all_operators:
for f in current_order_num_features+current_order_cat_features:
num_candidate_features.append(Node(op, children=[deepcopy(f)]))
for op in num_operators:
for f in current_order_num_features:
num_candidate_features.append(Node(op, children=[deepcopy(f)]))
for op in time_series_operators:
for f in current_order_num_features:
ins_f = FNode('instrument')
num_candidate_features.append(Node(op, children=[deepcopy(f), deepcopy(ins_f)]))
for op in num_num_operators:
for i in range(len(current_order_num_features)):
f1 = current_order_num_features[i]
k = i if op in symmetry_operators else 0
for f2 in current_order_num_features[k:] + lower_order_num_features:
if check_xor(f1, f2):
num_candidate_features.append(Node(op, children=[deepcopy(f1), deepcopy(f2)]))
for op in cat_num_operators:
for f in current_order_num_features:
for cat_f in current_order_cat_features + lower_order_cat_features:
if check_xor(f, cat_f):
num_candidate_features.append(Node(op, children=[deepcopy(f), deepcopy(cat_f)]))
for f in lower_order_num_features:
for cat_f in current_order_cat_features:
if check_xor(f, cat_f):
num_candidate_features.append(Node(op, children=[deepcopy(f), deepcopy(cat_f)]))
for op in cat_cat_operators:
for i in range(len(current_order_cat_features)):
f1 = current_order_cat_features[i]
k = i if op in symmetry_operators else 0
for f2 in current_order_cat_features[k:] + lower_order_cat_features:
if check_xor(f1, f2):
if op in ['Combine']:
cat_candidate_features.append(Node(op, children=[deepcopy(f1), deepcopy(f2)]))
else:
num_candidate_features.append(Node(op, children=[deepcopy(f1), deepcopy(f2)]))
return num_candidate_features, cat_candidate_features
After these steps, the time-series window operators are included in derived feature generation.
Refactored Code
Operator generation:
from openfe.FeatureGenerator import all_operators, num_operators, num_num_operators, cat_num_operators, cat_cat_operators
all_operators = ['freq']
num_operators = ['abs', 'log', 'sqrt', 'square', 'sigmoid', 'round', 'residual']
num_num_operators = ['min', 'max', '+', '-', '*', '/']
cat_num_operators = ['GroupByThenMin', 'GroupByThenMax', 'GroupByThenMean', 'GroupByThenMedian', 'GroupByThenStd', 'GroupByThenRank']
cat_cat_operators = ['Combine', 'CombineThenFreq', 'GroupByThenNUnique']
# generate the time-series operators
def generate_time_series_operators(base_operators, windows):
"""
动态生成带有窗口长度的时间序列操作符。
Args:
base_operators (list): 基础操作符(如 'ma', 'std', 'sum')。
windows (list or range): 窗口长度范围。
Returns:
list: 动态生成的操作符列表。
"""
return [f"{operator}_{window}" for operator in base_operators for window in windows]
time_series_operators_base = ['ma', 'std', 'sum']
windows = range(1, 11)
time_series_operators = generate_time_series_operators(time_series_operators_base, windows)
Operator definition:
import numpy as np
import pandas as pd
from openfe.FeatureGenerator import Node
def calculate_customized(self, data, is_root=False):
if self.name in all_operators+num_operators:
d = self.children[0].calculate(data)
if self.name == "abs":
new_data = d.abs()
elif self.name == "log":
new_data = np.log(np.abs(d.replace(0, np.nan)))
elif self.name == "sqrt":
new_data = np.sqrt(np.abs(d))
elif self.name == "square":
new_data = np.square(d)
elif self.name == "sigmoid":
new_data = 1 / (1 + np.exp(-d))
elif self.name == "freq":
value_counts = d.value_counts()
value_counts.loc[np.nan] = np.nan
new_data = d.apply(lambda x: value_counts.loc[x])
elif self.name == "round":
new_data = np.floor(d)
elif self.name == "residual":
new_data = d - np.floor(d)
else:
raise NotImplementedError(f"Unrecognized operator {self.name}.")
elif self.name in num_num_operators:
d1 = self.children[0].calculate(data)
d2 = self.children[1].calculate(data)
if self.name == "max":
new_data = np.maximum(d1, d2)
elif self.name == "min":
new_data = np.minimum(d1, d2)
elif self.name == "+":
new_data = d1 + d2
elif self.name == "-":
new_data = d1 - d2
elif self.name == "*":
new_data = d1 * d2
elif self.name == "/":
new_data = d1 / d2.replace(0, np.nan)
elif self.name in time_series_operators:
d1 = self.children[0].calculate(data)
d2 = self.children[1].calculate(data)
        # split the operator name into the base operator and the window length
operator, window = self.name.split('_')
window = int(window)
        # dispatch on the base operator
if operator == 'ma':
new_data = d1.groupby(d2).rolling(window=window, min_periods=1).mean().reset_index(level=0, drop=True)
elif operator == 'std':
new_data = d1.groupby(d2).rolling(window=window, min_periods=1).std().reset_index(level=0, drop=True)
elif operator == 'sum':
new_data = d1.groupby(d2).rolling(window=window, min_periods=1).sum().reset_index(level=0, drop=True)
else:
d1 = self.children[0].calculate(data)
d2 = self.children[1].calculate(data)
if self.name == "GroupByThenMin":
temp = d1.groupby(d2).min()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenMax":
temp = d1.groupby(d2).max()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenMean":
temp = d1.groupby(d2).mean()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenMedian":
temp = d1.groupby(d2).median()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == "GroupByThenStd":
temp = d1.groupby(d2).std()
temp.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: temp.loc[x])
elif self.name == 'GroupByThenRank':
new_data = d1.groupby(d2).rank(ascending=True, pct=True)
elif self.name == "GroupByThenFreq":
def _f(x):
value_counts = x.value_counts()
value_counts.loc[np.nan] = np.nan
return x.apply(lambda x: value_counts.loc[x])
new_data = d1.groupby(d2).apply(_f)
elif self.name == "GroupByThenNUnique":
nunique = d1.groupby(d2).nunique()
nunique.loc[np.nan] = np.nan
new_data = d2.apply(lambda x: nunique.loc[x])
elif self.name == "Combine":
temp = d1.astype(str) + '_' + d2.astype(str)
temp[d1.isna() | d2.isna()] = np.nan
temp, _ = temp.factorize()
new_data = pd.Series(temp, index=d1.index).astype("float64")
elif self.name == "CombineThenFreq":
temp = d1.astype(str) + '_' + d2.astype(str)
temp[d1.isna() | d2.isna()] = np.nan
value_counts = temp.value_counts()
value_counts.loc[np.nan] = np.nan
new_data = temp.apply(lambda x: value_counts.loc[x])
else:
raise NotImplementedError(f"Unrecognized operator {self.name}.")
if self.name == 'Combine':
new_data = new_data.astype('category')
else:
new_data = new_data.astype('float')
if is_root:
self.data = new_data
return new_data
Node.calculate = calculate_customized
Operator invocation:
def _enumerate(current_order_num_features, lower_order_num_features,
current_order_cat_features, lower_order_cat_features):
num_candidate_features = []
cat_candidate_features = []
for op in all_operators:
for f in current_order_num_features+current_order_cat_features:
num_candidate_features.append(Node(op, children=[deepcopy(f)]))
for op in num_operators:
for f in current_order_num_features:
num_candidate_features.append(Node(op, children=[deepcopy(f)]))
for op in time_series_operators:
for f in current_order_num_features:
ins_f = FNode('instrument')
num_candidate_features.append(Node(op, children=[deepcopy(f), deepcopy(ins_f)]))
for op in num_num_operators:
for i in range(len(current_order_num_features)):
f1 = current_order_num_features[i]
k = i if op in symmetry_operators else 0
for f2 in current_order_num_features[k:] + lower_order_num_features:
if check_xor(f1, f2):
num_candidate_features.append(Node(op, children=[deepcopy(f1), deepcopy(f2)]))
for op in cat_num_operators:
for f in current_order_num_features:
for cat_f in current_order_cat_features + lower_order_cat_features:
if check_xor(f, cat_f):
num_candidate_features.append(Node(op, children=[deepcopy(f), deepcopy(cat_f)]))
for f in lower_order_num_features:
for cat_f in current_order_cat_features:
if check_xor(f, cat_f):
num_candidate_features.append(Node(op, children=[deepcopy(f), deepcopy(cat_f)]))
for op in cat_cat_operators:
for i in range(len(current_order_cat_features)):
f1 = current_order_cat_features[i]
k = i if op in symmetry_operators else 0
for f2 in current_order_cat_features[k:] + lower_order_cat_features:
if check_xor(f1, f2):
if op in ['Combine']:
cat_candidate_features.append(Node(op, children=[deepcopy(f1), deepcopy(f2)]))
else:
num_candidate_features.append(Node(op, children=[deepcopy(f1), deepcopy(f2)]))
return num_candidate_features, cat_candidate_features
Feature Screening
Main Process
In the open-source OpenFE package, the importance evaluation methods of the two-stage screening are defined by the _evaluate and _evaluate_lgbm functions: _evaluate performs the stage-one evaluation and _evaluate_lgbm the stage-two evaluation. Because the stage-two evaluation has to use a specific method to attribute the results of the LightGBM model, this article does not refactor it and focuses on the stage-one evaluation instead. Accordingly, this article defines an _evaluate_customized function that replaces the original function in the openfe module, and a stage1_metric_set that lists the available stage-one evaluation metrics.
To add a new importance evaluation method, add its name to stage1_metric_set and add the corresponding definition in _evaluate_customized.
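The full _evaluate_customized is not reproduced here; the sketch below only illustrates the extension pattern with two hypothetical stage-one metrics (the signature and metric names are assumptions, not openfe's internal API):

import numpy as np
from scipy.stats import spearmanr

# hypothetical registry of stage-one metrics; extend it by adding a name here
# and a matching branch in _evaluate_customized
stage1_metric_set = ['residual_corr', 'spearman_ic']

def _evaluate_customized(candidate_values, label, init_score, metric='residual_corr'):
    """Score one candidate feature on one data block (illustrative only)."""
    if metric == 'residual_corr':
        # how strongly the candidate correlates with what the base predictions
        # (init_score) leave unexplained
        residual = np.asarray(label) - np.asarray(init_score)
        return abs(np.corrcoef(candidate_values, residual)[0, 1])
    elif metric == 'spearman_ic':
        # rank information coefficient between the candidate and the label
        rho, _ = spearmanr(candidate_values, label)
        return abs(rho)
    raise NotImplementedError(f"Unknown stage-one metric {metric}.")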