bqljbbfy的知识库

【研究框架-数据运算】Python算子-DAI平台SQL算子对应的Python实现(持续更新)

由bqljbbfy创建,最终由bqljbbfy 被浏览 9 用户

〇、重要说明

本帖中的代码绝大部分为AI辅助编写

小伙伴们想实现什么样的算子,可以在评论区留言,UP会持续更新的

一、研究目的

DAI数据平台支持许多SQL算子(DAI SQL 函数列表),这些算子是直接在数据库层面进行操作,计算效率很高,我个人是十分推荐大家使用的。但是有些小伙伴可能还是对Python代码有更高的需求,可能是因为自己对Python更熟悉,也可能是因为导师或者甲方要求使用Python进行编写。

因此本UP在这里给大家总结一些Python算子,本质为函数,但是我将函数都封装成了算子类里的静态方法了,用法就是对提取出来的Pandas的DataFrame形式的数据进行运算。

Python算子的优缺点包括:

  • 优点:
    • Python函数的受众更广泛,使用群体庞大
    • Python语言可以跨平台使用
    • 用户可以根据不同需求自己编写Python算子,而SQL算子要请求DAI平台后端工程师来封装
  • 缺点:
    • Python本身的计算速度慢
    • Python是在前端运算,所以会耗费更多的内存
    • Python算子无法形成表达式引擎,对于大表达式来说,SQL可以一步到位实现,而Python只能分步运算

后续UP不仅会不断丰富算子,还在探索使用Numpy进行计算效率提升。

二、使用说明

(一)算子类的使用

所有的Python算子我都封装到了一个Operator类中,在这个类中,所有的算子都是静态方法,静态方法的特点就是不加self参数,因此每一个算子单独拿出来,都可以当做一个函数使用。

首先,我将整个算子类Operator都放到了AIStudio中的如下路径的一个Python文件中:/home/aiuser/work/tools/Operator.py。

接着,如果我想在其他NoteBook中使用该类中的算子,我就可以在一个代码块中输入以下代码,表示的是先让程序去找到装有Operator类的Python文件的所在路径,接着从Python文件中引用Operator类,并起别名叫op

import sys
sys.path.append("/home/aiuser/work/tools")
from Operator import Operator as op

之后,如果想使用算子,例如求5日移动平均的算子m_avg,直接调用op.m_avg,把它当作一个函数即可

(二)算子的使用

每一个算子都会以一个pandas的DataFrame为输入,输出一个与原DataFrame行数相同的Series,这样的话,就可以将算子的输出命名为原DataFrame的一个新列,方便后续操作。

1. 标量计算类算子

标量计算类算子以s开头,这些算子以整列为输入进行简单计算,其实就是将一些Numpy函数封装了一下,例如求log、求exp、求绝对值等等,例如求ln:

2. 时序计算类算子

时序计算函数是指对于同一标的来说,不同时间段的数据进行计算,这样的算子要将整个df指定为输入,因为我们必须按照df的instrument列分组再按照date列排序才能做到时序计算,列名在输入中是以字符串的形式传入的,并且通常要指定窗口大小,这里我们还看以下时序求平均:

3. 截面计算类算子

截面计算函数是指对于同一时间点来说,不同标的数据进行计算,这样的算子也要将整个df指定为输入,因为我们必须按照df的date列分组才能做到截面计算,列名在输入中是以字符串的形式传入的,并且截面计算就不需要指定窗口大小了。

需要注意的是,有些截面计算的结果是单一的值,例如2024-07-01这天全市场股票的成交量求和,结果就是一个单一的和而已,我们要将这个值填充至所有标的:

而有些截面计算函数就不是这样的,它是对于不同标的来说,运算结果不同的,例如,截面求排名的操作:

三、完整算子代码

import pandas as pd
import numpy as np
from scipy.stats import skew
from scipy.stats import kurtosis

DATE = 'date'
INST = 'instrument'

class Operator:

    # 一、标量计算
    # 三元表达式
    @staticmethod
    def s_if(condition, A, B):
        return np.where(condition, A, B)

    # 正负
    @staticmethod
    def s_sign(A):
        return np.sign(A)

    # 绝对值
    @staticmethod
    def s_abs(A):
        return np.abs(A)

    # 自然对数
    @staticmethod
    def s_ln(A):
        return np.log(A)

    # 自然指数
    @staticmethod
    def s_exp(A):
        return np.exp(A)

    # 对数
    @staticmethod
    def s_log(A, base):
        return np.log(A) / np.log(base)

    # 指数
    @staticmethod
    def s_power(A, B):
        return np.power(A, B)

    # 取最大值(二元)
    @staticmethod
    def s_min2d(A, B):
        return np.minimum(A, B)

    # 取最小值(二元)
    @staticmethod
    def s_max2d(A, B):
        return np.maximum(A, B)
    
    # 取最大值(三元)
    @staticmethod
    def s_min3d(A, B, C):
        return np.minimum(np.minimum(A, B), C)

    # 取最小值(三元)
    @staticmethod
    def s_max3d(A, B, C):
        return np.maximum(np.maximum(A, B), C)
    
    # 去极值
    @staticmethod
    def s_clip(A, lower_limit, upper_limit):
        return np.clip(A, lower_limit, upper_limit)

    # 二、时序计算
    ## 向前取n行
    @staticmethod
    def m_lag(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].shift(n)
    
    ## 向后取n行
    @staticmethod
    def m_lead(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].shift(-n)

    ## 相对于n天之前的变化值
    @staticmethod
    def m_delta(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].shift(0) - df.sort_values(by=[DATE]).groupby(INST)[arg].shift(n)
    
    ## 相对于n天之前的变化百分比
    @staticmethod
    def m_pct_change(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].shift(0) / df.sort_values(by=[DATE]).groupby(INST)[arg].shift(n) - 1
    
    ## n日滚动平均
    @staticmethod
    def m_avg(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).mean().reset_index(level=0, drop=True)
    
    ## n日滚动平均(去空值)
    @staticmethod
    def m_nanavg(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).mean().reset_index(level=0, drop=True)

    ## n日滚动求和
    @staticmethod
    def m_sum(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).sum().reset_index(level=0, drop=True)
    
    ## n日滚动求和(去空值)
    @staticmethod
    def m_nansum(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).sum().reset_index(level=0, drop=True)

    ## n日滚动求乘积
    @staticmethod
    def m_prod(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).apply(np.prod).reset_index(level=0, drop=True)
    
    ## n日滚动求乘积(去空值)
    @staticmethod
    def m_nanprod(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).apply(np.prod).reset_index(level=0, drop=True)

    ## n日滚动求方差
    @staticmethod
    def m_var(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).var().reset_index(level=0, drop=True)
    
    ## n日滚动求方差(去空值)
    @staticmethod
    def m_nanvar(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).var().reset_index(level=0, drop=True)

    ## n日滚动求标准差
    @staticmethod
    def m_stddev(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).std().reset_index(level=0, drop=True)
    
    ## n日滚动求标准差(去空值)
    @staticmethod
    def m_nanstddev(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).std().reset_index(level=0, drop=True)

    ## n日滚动求偏度
    @staticmethod
    def m_skw(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).apply(skew, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求偏度(去空值)
    @staticmethod
    def m_nanskw(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).apply(skew, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求峰度
    @staticmethod
    def m_krt(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).apply(kurtosis, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求峰度(去空值)
    @staticmethod
    def m_nankrt(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).apply(kurtosis, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求协方差
    @staticmethod
    def m_cov(df, arg1, arg2, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST).apply(lambda x: x[arg1].rolling(window=n).cov(x[arg2])).reset_index(level=0, drop=True)

    ## n日滚动求协方差(去空值)
    @staticmethod
    def m_nancov(df, arg1, arg2, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST).apply(lambda x: x[arg1].rolling(window=n, min_periods=1).cov(x[arg2])).reset_index(level=0, drop=True)

    ## n日滚动求相关系数
    @staticmethod
    def m_corr(df, arg1, arg2, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST).apply(lambda x: x[arg1].rolling(window=n).corr(x[arg2])).reset_index(level=0, drop=True)

    ## n日滚动求相关系数(去空值)
    @staticmethod
    def m_nancorr(df, arg1, arg2, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST).apply(lambda x: x[arg1].rolling(window=n, min_periods=1).corr(x[arg2])).reset_index(level=0, drop=True)

    ## n日滚动求最大值
    @staticmethod
    def m_max(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).max().reset_index(level=0, drop=True)
    
    ## n日滚动求最大值(去空值)
    @staticmethod
    def m_nanmax(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).max().reset_index(level=0, drop=True)

    ## n日滚动求最小值
    @staticmethod
    def m_min(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).min().reset_index(level=0, drop=True)

    ## n日滚动求最小值(去空值)
    @staticmethod
    def m_nanmin(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).min().reset_index(level=0, drop=True)

    ## n日滚动求最大值的索引
    @staticmethod
    def m_argmax(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).apply(lambda x: n - np.argmax(x) - 1, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求最大值的索引(去空值)
    @staticmethod
    def m_nanargmax(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).apply(lambda x: n - np.argmax(x) - 1, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求最小值的索引
    @staticmethod
    def m_argmin(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).apply(lambda x: n - np.argmin(x) - 1, raw=True).reset_index(level=0, drop=True)

    ## n日滚动求最小值的索引(去空值)
    @staticmethod
    def m_nanargmin(df, arg, n):
        n = round(n)
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n, min_periods=1).apply(lambda x: n - np.argmin(x) - 1, raw=True).reset_index(level=0, drop=True)

    ## n日滚动排名
    @staticmethod
    def m_rank(df, arg, n, asc=True):
        n = round(n)
        df_sorted = df.sort_values(by=[DATE])
        ranked = df_sorted.groupby(INST)[arg].rolling(window=n).rank(ascending=asc)
        return ranked.groupby(df_sorted[INST]).shift(0).reset_index(level=0, drop=True)

    ## n日滚动百分比排名
    @staticmethod
    def m_pct_rank(df, arg, n, asc=True):
        n = round(n)
        df_sorted = df.sort_values(by=[DATE])
        pct_ranked = df_sorted.groupby(INST)[arg].rolling(window=n).rank(ascending=asc, pct=True)
        return pct_ranked.groupby(df_sorted[INST]).shift(0).reset_index(level=0, drop=True)
    
    ## n日线性衰减平均
    @staticmethod
    def m_decay_linear(df, arg, n):
        n = round(n)
        weights = np.linspace(1, n, num=n)
        weights = weights / weights.sum() 
        return df.sort_values(by=[DATE]).groupby(INST)[arg].rolling(window=n).apply(lambda x: np.dot(x, weights[:len(x)]), raw=True).reset_index(level=0, drop=True)
    
    # 三、截面计算
    # 截面平均
    @staticmethod
    def c_avg(df, arg):
        return df.groupby(DATE)[arg].transform('mean')
    
    # 截面求和
    @staticmethod
    def c_sum(df, arg):
        return df.groupby(DATE)[arg].transform('sum')

    # 截面求积
    @staticmethod
    def c_prod(df, arg):
        return df.groupby(DATE)[arg].transform('prod')

    # 截面求方差
    @staticmethod
    def c_var(df, arg):
        return df.groupby(DATE)[arg].transform('var')

    # 截面求标准差
    @staticmethod
    def c_stddev(df, arg):
        return df.groupby(DATE)[arg].transform('std')

    # 截面求偏度
    @staticmethod
    def c_skw(df, arg):
        return df.groupby(DATE)[arg].transform('skew')

    # 截面求峰度
    @staticmethod
    def c_krt(df, arg):
        return df.groupby(DATE)[arg].transform(lambda x: kurtosis(x, fisher=True))

    # 截面求协方差
    @staticmethod
    def c_cov(df, arg1, arg2):
        cov_values = df.groupby(DATE).apply(lambda x: x[arg1].cov(x[arg2]))
        return df[DATE].map(cov_values)

    # 截面求相关系数
    @staticmethod
    def c_corr(df, arg1, arg2):
        corr_values = df.groupby(DATE).apply(lambda x: x[arg1].corr(x[arg2]))
        return df[DATE].map(corr_values)

    # 截面求排名
    @staticmethod
    def c_rank(df, arg, asc=True):
        return df.groupby(DATE)[arg].rank(ascending=asc)

    # 截面求百分比排名
    @staticmethod
    def c_pct_rank(df, arg, asc=True):
        return df.groupby(DATE)[arg].rank(ascending=asc, pct=True)

    # 截面规模整体放缩为a(默认1)
    @staticmethod
    def c_scale(df, arg, a=1):
        sum_abs_x = df.groupby(DATE)[arg].apply(lambda group: group.abs().sum())
        scale_factor = df[DATE].map(a / sum_abs_x)
        return df[arg] * scale_factor
    
    # 截面正则化
    @staticmethod
    def c_normalize(df, arg):
        min_val = df.groupby(DATE)[arg].transform('min')
        max_val = df.groupby(DATE)[arg].transform('max')
        return (df[arg] - min_val) / (max_val - min_val)

    # 截面标准化
    @staticmethod
    def c_standardize(df, arg):
        mean_val = df.groupby(DATE)[arg].transform('mean')
        std_val  = df.groupby(DATE)[arg].transform('std')
        return (df[arg] - mean_val) / std_val



\

{link}