# 交易引擎:初始化函数,只执行一次
def m3_initialize_bigquant_run(context):
pass
# 交易引擎:每个单位时间开盘前调用一次。
def m3_before_trading_start_bigquant_run(context, data):
pass
# 交易引擎:tick数据处理函数,每个tick执行一次
def m3_handle_tick_bigquant_run(context, tick):
pass
# 交易引擎:bar数据处理函数,每个时间单位执行一次
def m3_handle_data_bigquant_run(context, data):
pass
# 交易引擎:成交回报处理函数,每个成交发生时执行一次
def m3_handle_trade_bigquant_run(context, trade):
pass
# 交易引擎:委托回报处理函数,每个委托变化时执行一次
def m3_handle_order_bigquant_run(context, order):
pass
# 交易引擎:盘后处理函数,每日盘后执行一次
def m3_after_trading_bigquant_run(context, data):
pass
import pandas as pd
import numpy as np
def ZScoreNorm(x, features, axis=0, label=False):
_x = x[features]
arr = _x.values
arr = (arr - np.nanmean(arr, axis=axis, keepdims=True)) / np.nanstd(arr, axis=axis, ddof=1, keepdims=True)
result = pd.DataFrame(arr, columns=_x.columns, index=_x.index)
result["date"] = x["date"]
result["product_code"] = x["product_code"]
if "label" in x.columns:
if label:
_label = x["label"]
_label_arr = _label.values
_label_arr = (_label_arr - np.nanmean(_label_arr, axis=0)) / np.nanstd(_label_arr, axis=0, ddof=1)
result["label"] = _label_arr
else:
result["label"] = x["label"]
return result