问答交流

如何将跨日高频数据抽取为日频

由dawnj创建,最终由small_q 被浏览 25 用户

比如:计算 7 日内分钟数据中 volume 与 close 的相关系数。

高频特征抽取模块只能计算单个交易日内的数据,无法跨日计算。

\

标签

金融数据获取
评论
  • 你好,您知道如何使用FAI算力平台吗?更复杂的高频因子可以使用 FAI来实现。 给您一个例子参考:

```python
import os
import time

import pandas as pd

import fai

# NOTE(review): `DataSource` and `D` are used below but never imported in this
# snippet; presumably the hosting notebook environment injects them as
# globals -- confirm before running elsewhere.


@fai.remote
def cal_single_df(table, instrument, start_date, end_date):
    """Compute daily factors from intraday records for ``instrument``.

    Reads the requested fields from ``table`` through ``DataSource``, groups
    the rows by calendar day and instrument, and reduces every group to one
    row of factors.

    Args:
        table: Name of the data table passed to ``DataSource``.
        instrument: Instrument(s) forwarded to ``DataSource.read``.
        start_date: Start of the date range forwarded to ``DataSource.read``.
        end_date: End of the date range forwarded to ``DataSource.read``.

    Returns:
        A DataFrame with one row per (date_index, instrument) group; the
        group keys are turned back into columns by ``reset_index``.
    """
    fields = ['num_trades', 'date', 'instrument', 'amount', 'price']
    df = DataSource(table).read(
        instruments=instrument,
        start_date=start_date,
        end_date=end_date,
        fields=fields,
        bdb=False,
    )
    # Calendar day of each record; used as the daily grouping key.
    df["date_index"] = df["date"].dt.date

    def handle_factor(df):
        """Reduce one (day, instrument) group to a single-row factor frame."""

        def convert_period(bars, period='60s'):
            """Resample raw records into OHLC/amount/trade-count bars."""
            bars = bars.set_index('date')
            nbars = pd.DataFrame()
            price = bars['price'].resample(rule=period)
            nbars['open'] = price.first()
            nbars['high'] = price.max()
            nbars['low'] = price.min()
            nbars['close'] = price.last()
            nbars['1m_amount'] = bars['amount'].resample(rule=period).sum()
            nbars['1m_num_trades'] = bars['num_trades'].resample(rule=period).sum()
            # Keep only bars with traded amount. Copy so the columns added by
            # the caller are written to an independent frame, not to a slice.
            return nbars[nbars['1m_amount'] > 0].copy()

        minute_df = convert_period(df, period='60s')
        minute_df['ret'] = minute_df['close'].pct_change()

        returns = minute_df['ret'].dropna()
        Var = returns.var()    # variance of bar returns
        Skew = returns.skew()  # skewness of bar returns
        Kurt = returns.kurt()  # kurtosis of bar returns

        # Bars with a positive return are treated as inflow, bars with a
        # negative return as outflow.
        inflow = minute_df[minute_df['ret'] > 0]
        outflow = minute_df[minute_df['ret'] < 0]

        # Average amount per trade over all bars.
        AmtPerTrd = minute_df['1m_amount'].sum() / minute_df['1m_num_trades'].sum()
        # Average amount per trade over inflow bars.
        AmtPerTrd_InFlow = inflow['1m_amount'].sum() / inflow['1m_num_trades'].sum()
        # Average amount per trade over outflow bars.
        AmtPerTrd_OutFlow = outflow['1m_amount'].sum() / outflow['1m_num_trades'].sum()
        # Inflow / outflow average amount per trade relative to the overall average.
        AmtPerTrd_InFlow_ratio = AmtPerTrd_InFlow / AmtPerTrd
        AmtPerTrd_OutFlow_ratio = AmtPerTrd_OutFlow / AmtPerTrd

        minute_df['1m_AmtPerTrd'] = minute_df['1m_amount'] / minute_df['1m_num_trades']
        # "Big order" bars: the bars with the largest amount per trade.
        # NOTE(review): 336 * 0.1 looks like "top 10% of an assumed 336 bars
        # per day" -- confirm the constant against the data's session length.
        bigorder_df = minute_df.sort_values('1m_AmtPerTrd', ascending=False).head(int(336 * 0.1))

        # Net inflow amount of big-order bars.
        Amt_netInFlow_bigOrder = (
            bigorder_df[bigorder_df['ret'] > 0]['1m_amount'].sum()
            - bigorder_df[bigorder_df['ret'] < 0]['1m_amount'].sum()
        )
        # Net inflow of big-order bars as a share of the total amount.
        Amt_netInFlow_bigOrder_ratio = Amt_netInFlow_bigOrder / minute_df['1m_amount'].sum()
        # Compounded return over the big-order bars.
        Mom_bigOrder = (bigorder_df['ret'].dropna() + 1).prod()

        return pd.DataFrame({
            'Mom_bigOrder': [Mom_bigOrder],
            'Var': [Var],
            'Skew': [Skew],
            'Kurt': [Kurt],
            'AmtPerTrd': [AmtPerTrd],
            'AmtPerTrd_InFlow': [AmtPerTrd_InFlow],
            'AmtPerTrd_OutFlow': [AmtPerTrd_OutFlow],
            'AmtPerTrd_InFlow_ratio': [AmtPerTrd_InFlow_ratio],
            'AmtPerTrd_OutFlow_ratio': [AmtPerTrd_OutFlow_ratio],
            'Amt_netInFlow_bigOrder': [Amt_netInFlow_bigOrder],
            'Amt_netInFlow_bigOrder_ratio': [Amt_netInFlow_bigOrder_ratio],
        })

    factor_df = df.groupby(['date_index', 'instrument']).apply(handle_factor).reset_index()
    return factor_df


# The access token is read from the environment instead of being embedded in
# the script, so the credential is not published together with the example.
# Set FAI_TOKEN before running.
fai.init(cluster="fai-xiaoshao-xjqpcksp.fai-xiaoshao", token=os.environ["FAI_TOKEN"])
print('连接集群成功!')

start_date = '2021-01-01'
end_date = '2021-12-31'
# Only the first 40 instruments are used in this example.
ins = D.instruments(start_date=start_date, end_date=end_date)[:40]
table = 'level2_snapshot_CN_STOCK_A'


def gen_parameters(instruments, start_date, end_date):
    """Build one parameter dict (keys ``ins``/``sd``/``ed``) per instrument."""
    return [
        {"ins": instrument, "sd": start_date, "ed": end_date}
        for instrument in instruments
    ]


print('股票数量:', len(ins))
parameters = gen_parameters(ins, start_date, end_date)

time0 = time.time()
fai.log_silent(True)
# Submit one remote task per instrument.
remainings = [
    cal_single_df.remote(table, p["ins"], p["sd"], p["ed"])
    for p in parameters
]
done = 0
ready_list = []
while remainings:
    ready, remainings = fai.wait(remainings)
    ready_list += ready
    done += len(ready)
    if done % 10 == 0:
        print(f"{time.time() - time0}, {done}/{len(remainings) + done}")
print('计算时间:', time.time() - time0)

time1 = time.time()
frames = []
for ref in ready_list:
    try:
        tmp = fai.get(ref)
    except TypeError as e:
        # Best effort: a failed task is reported and skipped, not hidden.
        print(f'获取结果失败,已跳过: {e!r}')
        continue
    if not isinstance(tmp, (pd.DataFrame, pd.Series)):
        # Anything that cannot be concatenated is skipped as well.
        print(f'结果类型无法合并,已跳过: {type(tmp).__name__}')
        continue
    frames.append(tmp)
# Concatenate once at the end instead of re-copying the result per task.
result = pd.concat(frames) if frames else pd.DataFrame()
print('合并时间:', time.time() - time1)
print(result.head())
```

\