BigQuant使用文档

DAI SQL FAQ

由qxiao创建,最终由qxiao 被浏览 221 用户

如何实现自定义的带有窗口的 macro 函数

创建 macro 函数的语法可参见 create macro.

DAI 提供的滚动窗口函数 m_aggregate_func (e.g. m_avg, m_median, etc.) 有这样的模型:

create macro m_aggregate_func(args, win_sz, pb:=instrument, ob:=date) as
    aggregate_func(args) over (partition by pb order by ob rows win_sz-1 preceding)

注意args 如果是多个参数,定义 macro 函数时需要分开指明。默认参数 pb, ob 如果有多个参数也需要分开指明。也就是说每个标识符只能传入一个参数。

类似的,时间截面窗口函数 c_rank, c_avg 实现:

create macro c_rank_asc(arg, pb:=date) as
    rank() over (partition by pb order by arg);

create macro c_rank_desc(arg, pb:=date) as
    rank() over (partition by pb order by arg desc);

-- 不能传入关键字作为参数,asc/desc 是关键字
-- create macro c_rank(arg, pb:=date, order_type:=asc) as
--     rank() over (partition by pb order by arg order_type);

-- workaround
create macro c_rank(arg, pb:=date, ascending:=true) as
    if (ascending, rank() over (partition by pb order by arg), 
                   rank() over (partition by pb order by arg desc));

create macro c_avg(arg, pb:=date) as
    avg(arg) over (partition by pb);

行业市值中性化 c_neutralize 的使用

  1. 为什么求出来基本全是 NaN值呢?

    答:c_neutralize(y, industry_level1_code, market_cap) 中传入的因子值是 y (右端向量),通过industry_level1_code 获取的dummy矩阵及取对数后的市值(以及为constant添加的常向量1)共同组成 X,然后做的线性回归。当 y, X 中有任何元素为NaN时求线性方程时就会全部为NaN。所以算之前需要先把 NaN 过滤掉或者填充,一般y很有可能含有 NaN,比如 y = m_avg(close, 5)

  2. c_neutralize采用的算法:

    def process_factor(factor):
        median = np.median(factor)
        mad = np.median(abs(factor - median))
        mad *= 3*1.4826
        clipped = factor.clip(median - mad, median + mad)
        # pandas std() has ddof=1, while numpy std() has ddof=0
        return (clipped - clipped.mean()) / clipped.std(ddof=1)
    
    def sm_resid(data):
        X = pd.get_dummies(data['industry_level1_code']).astype('float64')
        # X = X.reindex(columns=dummy_cols)
        X['log_marketcap'] = np.log(data['total_market_cap'])
        y = process_factor(data['pb'])
        X = sm.add_constant(X)  # intercept term
        model = sm.OLS(y, X)
        results = model.fit(method='pinv')
        return pd.DataFrame(results.resid, index=data.index)
    
    def np_resid(data):
        X = pd.get_dummies(data['industry_level1_code']).astype('float64')
        # X = X.reindex(columns=dummy_cols)
        X['log_marketcap'] = np.log(data['total_market_cap'])
        y = process_factor(data['pb'])
        beta, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
        y_pred = X.dot(beta)
        return pd.DataFrame(y - y_pred, index=data.index)
    
    t1 = time.time()
    df['sm_resid'] = df.groupby('date').apply(sm_resid).reset_index(drop=True).reindex(df.index)
    

    使用例子和对比参见: https://bigquant.com/codeshare/5739e696-fd64-480c-95ec-793ea9ff889c

  3. 如果想自己处理 factor 后再只算个残差, 可以使用 c_neutralize_resid(y, industry_level1_code, log_marketcap) 函数 (y = processed_factor)。

缺失值填充/替换

当分区后的数据含有NA值时,填充其值为前一个非NA值:

select 
    instrument, date, 
    last(columns(* exclude (date, instrument)) ignore nulls) 
        over (partition by instrument order by date rows between 
              unbounded preceding and current row) as 'columns(*)' 
from cn_stock_bar1d
where date >= '2023-01-01' 
order by instrument, date

columns(* exclude (date, instrument))   会依次扩展成每一个非 date  和 instrument  的列,最好减少不需要操作的数据。as 'columns(*)’ 会保存处理后的数据成原始的列名,as 'columns(*)_suffix_name’ 的话会把处理后的数据依次存成原列名加上后缀名(_suffix_name)的列名。

填充前 填充后

类似的如果想批量替换NA值为其他值,比如 0,可以把 last 函数替换成 ifnull 函数:

select 
    instrument, date, 
    ifnull(columns(* exclude (date, instrument, name)), 0) as 'columns(*)' 
from cn_stock_bar1d
where date >= '2023-01-01' 
order by instrument, date

把当天open, close为空的值填充为当天所有股票的中位数:

select 
    instrument, date, 
    ifnull(columns(['close', 'open']), 
           c_median(columns(['close', 'open']))) as 'columns(*)' 
from cn_stock_bar1d
where date >= '2023-01-01'
order by instrument, date

\