# 显式导入 BigQuant 相关 SDK 模块
from bigdatasource.api import DataSource
from bigdata.api.datareader import D
from biglearning.api import M
from biglearning.api import tools as T
from biglearning.module2.common.data import Outputs
import pandas as pd
import numpy as np
import time
import math
import warnings
import datetime
import dai
import uuid
from zipline.finance.commission import PerOrder
from zipline.api import get_open_orders
from zipline.api import symbol
from bigtrader.sdk import *
from bigtrader.utils.my_collections import NumPyDeque
from bigtrader.constant import OrderType
from bigtrader.constant import Direction
# <aistudiograph>
# @param(id="m7", name="initialize")
# 交易引擎:初始化函数,只执行一次
def m7_initialize_bigquant_run(context):
    """Configure the strategy once, before any bar is processed.

    Installs the per-order commission schedule, precomputes the allocation
    weights for the instruments bought on each rebalance, and stores the
    per-instrument cash cap and the holding period on ``context``.

    Args:
        context: Trading-engine context object; mutated in place.
    """
    # The engine ships with default fees and slippage; override the fees here.
    fee_schedule = PerOrder(buy_cost=0.0003, sell_cost=0.0013, min_cost=5)
    context.set_commission(fee_schedule)

    # Number of top-ranked instruments to buy on each rebalance.
    top_n = 1
    # Raw weight 1 / ln(rank + 2) shrinks as rank grows, so better-ranked
    # instruments get a larger share of the cash.
    # NOTE(review): T.norm presumably rescales the list to sum to 1 - confirm.
    raw_weights = [1 / math.log(rank + 2) for rank in range(top_n)]
    context.stock_weights = T.norm(raw_weights)

    # Maximum fraction of capital a single instrument may take up.
    context.max_cash_per_instrument = 0.51
    # Rebalancing period in trading days (read back in handle_data).
    context.options['hold_days'] = 1
# @param(id="m7", name="before_trading_start")
# 交易引擎:每个单位时间开盘前调用一次。
def m7_before_trading_start_bigquant_run(context, data):
    """Pre-market hook; deliberately does nothing in this strategy.

    This is the place for pre-open work such as subscribing to quotes.
    Both arguments are ignored and the function returns ``None``.
    """
# @param(id="m7", name="handle_tick")
# 交易引擎:tick数据处理函数,每个tick执行一次
def m7_handle_tick_bigquant_run(context, tick):
    """Tick hook; deliberately does nothing in this strategy.

    Both arguments are ignored and the function returns ``None``.
    """
# @param(id="m7", name="handle_data")
# 回测引擎:每日数据处理函数,每天执行一次
def m7_handle_data_bigquant_run(context, data):
    """Rebalance the portfolio toward today's top predictions.

    Runs once per bar. On a rebalance day it closes every held instrument
    that is not among today's first ``len(context.stock_weights)`` predicted
    instruments, then buys the predicted instruments that are not yet held.
    The k-th instrument bought (in prediction order) receives
    ``cash_for_buy * context.stock_weights[k]``.

    Args:
        context: Engine context. Reads ``portfolio`` (``positions``,
            ``portfolio_value``, ``cash``), ``options["hold_days"]``,
            ``trading_day_index``, ``data`` (a table with ``date`` and
            ``instrument`` columns) and ``stock_weights``; places orders via
            ``order_target`` / ``order_value``.
        data: Bar data; only ``current_dt`` is used.

    Returns:
        None.
    """
    held = list(context.portfolio.positions.keys())
    hold_days = context.options["hold_days"]

    # Rebalance only every ``hold_days`` trading days. An empty portfolio is
    # always allowed through so the strategy can open its first positions.
    if held and context.trading_day_index % hold_days != 0:
        return

    # Keep only the predictions made for the current trading day.
    today = data.current_dt.strftime("%Y-%m-%d")
    todays_predictions = context.data[context.data.date == today]

    # NOTE: a market-wide risk switch (liquidate everything and return when a
    # benchmark risk flag is set for today) was sketched here but is disabled.

    # Spend at most half of the portfolio value, and never more than the
    # cash actually available.
    cash_for_buy = min(context.portfolio.portfolio_value / 2,
                       context.portfolio.cash)

    top_n = len(context.stock_weights)
    # De-duplicate while keeping prediction order. The original code used a
    # set here, which made the instrument-to-weight pairing depend on hash
    # order instead of rank.
    # NOTE(review): assumes today's rows are ordered best-first - confirm.
    targets = list(dict.fromkeys(list(todays_predictions.instrument)[:top_n]))
    target_set = set(targets)
    held_set = set(held)

    # Close positions that dropped out of today's target list.
    for instrument in held:
        if instrument not in target_set:
            context.order_target(context.symbol(instrument), 0)

    # Open positions for targets not yet held, best-ranked first so the
    # largest weight goes to the best-ranked new instrument.
    to_buy = [instrument for instrument in targets
              if instrument not in held_set]
    for ix, instrument in enumerate(to_buy):
        context.order_value(context.symbol(instrument),
                            cash_for_buy * context.stock_weights[ix])
# @param(id="m7", name="handle_trade")
# 交易引擎:成交回报处理函数,每个成交发生时执行一次
def m7_handle_trade_bigquant_run(context, trade):
    """Trade-fill hook; deliberately does nothing in this strategy.

    Both arguments are ignored and the function returns ``None``.
    """
# @param(id="m7", name="handle_order")
# 交易引擎:委托回报处理函数,每个委托变化时执行一次
def m7_handle_order_bigquant_run(context, order):
    """Order-status hook; deliberately does nothing in this strategy.

    Both arguments are ignored and the function returns ``None``.
    """
# @param(id="m7", name="after_trading")
# 交易引擎:盘后处理函数,每日盘后执行一次
def m7_after_trading_bigquant_run(context, data):
    """Post-market hook; deliberately does nothing in this strategy.

    Both arguments are ignored and the function returns ``None``.
    """
def m6_run_bigquant_run(input_1, input_2, input_3):
    """Expose a fixed, hard-coded id as the ``data_1`` output.

    The literal id string is written with ``DataSource.write_pickle`` and the
    resulting datasource is returned as ``data_1``; downstream it is passed
    as the ``model`` input of the ranker prediction module. The three input
    ports are not used.
    """
    return Outputs(
        data_1=DataSource.write_pickle("6d2b36501e4f11efa79c22690a76d9da")
    )
# 后处理函数,可选。
def m6_post_run_bigquant_run(outputs):
    """Optional post-processing step; hands ``outputs`` back unchanged."""
    return outputs
# general_rankvol
# Builds a (date, instrument, rankVol) table holding only rows whose rankVol
# flag is set. It is inner-joined with the model features further down, so it
# acts as the candidate filter for prediction.

# rankVol is 1 when rank(-1*return_0) <= 0.5 and price_limit_status_0 == 2,
# otherwise 0.
gr12 = M.input_features.v1(
    features="rankVol=where((rank(-1*return_0)<=0.5)&(price_limit_status_0==2),1,0)"
)
# Instrument universe and date range, shared by both feature extractors.
# NOTE(review): T.live_run_param presumably returns the live "trading_date"
# and falls back to the literal date otherwise - confirm.
gr20 = M.instruments.v2(
    start_date=T.live_run_param("trading_date", "2024-05-01"),
    end_date=T.live_run_param("trading_date", "2024-05-28"),
    market='CN_STOCK_A',
    instrument_list='',
    max_count=0
)
# Extract the base fields needed by the rankVol expression.
# NOTE(review): before_start_days=5 presumably loads 5 extra days of history
# ahead of start_date - confirm.
gr10 = M.general_feature_extractor.v7(
    instruments=gr20.data,
    features=gr12.data,
    start_date='',
    end_date='',
    before_start_days=5
)
# Evaluate the rankVol expression; drop_na=True is set on this step.
gr11 = M.derived_feature_extractor.v3(
    input_data=gr10.data,
    features=gr12.data,
    date_col='date',
    instrument_col='instrument',
    drop_na=True,
    remove_extra_columns=False
)
# Keep only the join keys and the flag itself.
gr22 = M.select_columns.v3(
    input_ds=gr11.data,
    columns='date,instrument,rankVol',
    reverse_select=False
)
# Keep flagged rows dated after 2023-01-01.
gr23 = M.filter.v3(
    input_data=gr22.data,
    expr='(rankVol>0)&(date>"2023-01-01")',
    output_left_data=False
)
# Debug aid: print('gr23=', gr23.data.read_df())
# Model input features, one "name=expression" definition per line. The text
# inside the triple-quoted string is passed to the platform as-is and must
# not be re-indented.
m3 = M.input_features.v1(
    features=
"""
h03=-1*rank(hf_ret_in_7)
ns01=rank(-1*(open_0/2+close_0/2-low_0)/open_0)
ns02=rank(-1*decay_linear((high_0-low_0)/close_1,4))
ns03=rank(-1*decay_linear((high_0-low_0)/close_1,3))
nf10=rank(-1*ta_rsi_28_0)
nf15=rank(-1*ta_cci_14_0)
nf20=rank(-1*ta_atr_14_0/ta_atr_28_0)
nh01=rank(-1*hf_real_std_5m)
nh23=rank(-1*hf_trend_str_8)
nt27=group_rank(industry_sw_level1_0,-1*rank(ta_bbands_upperband_14_0/ta_bbands_upperband_28_0))
fs01=in_csi300_0+in_csi500_0+in_csi800_0
fs07=list_board_0
nh24=rank(-1*(hf_trend_str_6+turn_0))
nh26=rank(-1*(hf_trend_str_8+turn_0))
"""
)
# Disabled: a separate instrument/date range for this pipeline. The
# extractor below reuses gr20 instead.
# m9 = M.instruments.v2(
# start_date=T.live_run_param("trading_date", "2023-01-01"),
# end_date=T.live_run_param("trading_date", "2024-04-30"),
# market="CN_STOCK_A",
# instrument_list="",
# max_count=0
# )
# Extract the base fields for the model features over the gr20 universe.
# NOTE(review): before_start_days=120 presumably loads 120 extra days of
# history ahead of start_date - confirm.
m17 = M.general_feature_extractor.v7(
    instruments=gr20.data,
    features=m3.data,
    start_date="",
    end_date="",
    before_start_days=120
)
# Evaluate the feature expressions. drop_na is False here; rows with missing
# values are removed by the separate dropnan step that follows.
m18 = M.derived_feature_extractor.v3(
    input_data=m17.data,
    features=m3.data,
    date_col="date",
    instrument_col="instrument",
    drop_na=False,
    remove_extra_columns=False
)
m14 = M.dropnan.v1(
    input_data=m18.data
)
# Disabled: date filter on the feature table.
# m11 = M.filter.v3(
# input_data=m14.data,
# expr="(date>'2023-1-1')",
# output_left_data=False
# )
# Stock filter. The condition values are runtime strings consumed by the
# module: "全部" = all, "正常" = normal, "非退市" = not delisted.
m19 = M.chinaa_stock_filter.v1(
    input_data=m14.data,
    index_constituent_cond=["全部"],
    board_cond=["全部"],
    industry_cond=["全部"],
    st_cond=["正常"],
    delist_cond=["非退市"],
    output_left_data=False
)
# Disabled: dump the last 3000 feature rows to CSV for inspection.
# m19_1 = DataSource.write_df(m18.data.read_df().tail(3000))
# m2 = M.df_save_to_csv.v1(
# input_1=m19_1,
# name='T-266963.full.csv'
# )
# Debug aid: print('m19.data=', m19.data.read_df())
# Run m6_run_bigquant_run through the cached-module wrapper; its data_1
# output is used as the model input of the prediction step below.
m6 = M.cached.v3(
    run=m6_run_bigquant_run,
    post_run=m6_post_run_bigquant_run,
    input_ports="",
    params="{}",
    output_ports=""
)
# Inner join on (date, instrument): only feature rows that also appear in the
# filtered rankVol table (gr23) are kept for prediction.
m7_0 = M.join.v3(
    data1=m19.data,
    data2=gr23.data,
    on='date,instrument',
    how='inner',
    sort=False
)
# Debug aid: print('after join:', m7_0.data.read_df())
# Score the joined rows with the model provided by m6.
m8 = M.stock_ranker_predict.v5(
    model=m6.data_1,
    data=m7_0.data,
    m_lazy_run=False
)
# Rewrite the exchange suffixes in the prediction table (SHA -> SH,
# SZA -> SZ, BJA -> BJ) with successive string replacements.
df = m8.predictions.read()
codes = df['instrument']
for old_suffix, new_suffix in (('SHA', 'SH'), ('SZA', 'SZ'), ('BJA', 'BJ')):
    codes = codes.str.replace(old_suffix, new_suffix)
df['instrument'] = codes

# Datasource ids are globally unique, may contain lowercase letters, digits
# and underscores, and must start with a letter - hence the "t" prefix in
# front of the 32 hex digits of a random UUID.
uid = 't' + uuid.uuid4().hex
# Debug aid: print('df=', df, 'uid=', uid)
ds = dai.DataSource.write_bdb(
    data=df,
    id=uid,  # previously the fixed id "predict_data_for_old"
    # Rows duplicated on unique_together are de-duplicated on insert. When
    # the data is partitioned, the indexes argument must be supplied.
    unique_together=["date", "instrument"],
    indexes=["date"],
)
# Alternative kept for reference: ds = dai.DataSource.write_df(df)
print('ds=', ds)
# Run the strategy: feed the prediction datasource written above to the
# trading engine together with the m7_* callbacks defined at the top of the
# file.
m7 = M.bigtrader.v9(
    data=ds,  # previously m14.predictions
    start_date='',
    end_date='',
    initialize=m7_initialize_bigquant_run,
    before_trading_start=m7_before_trading_start_bigquant_run,
    handle_tick=m7_handle_tick_bigquant_run,
    handle_data=m7_handle_data_bigquant_run,
    handle_trade=m7_handle_trade_bigquant_run,
    handle_order=m7_handle_order_bigquant_run,
    after_trading=m7_after_trading_bigquant_run,
    capital_base=200000,
    frequency='daily',
    # Runtime string consumed by the engine ("股票" = stocks).
    product_type='股票',
    before_start_days=0,
    volume_limit=1,
    # Buy orders are priced on the 'open' field, sell orders on 'close'.
    order_price_field_buy='open',
    order_price_field_sell='close',
    benchmark='000300.SH',
    plot_charts=True,
    disable_cache=False,
    debug=False,
    backtest_only=False
)