VNPY应用入门-实盘逻辑分析


(cdbdyx) #1

还有很多功能再以后整理为笔记,这篇是近期最后更新关于VNPY的文章了。

实盘逻辑:

首先打开runCtaTrading.py,主函数执行: runChildProcess()

# 创建日志引擎
le = LogEngine()
le.setLogLevel(le.LEVEL_INFO)
le.addConsoleHandler()
le.addFileHandler()

le.info(u'启动CTA策略运行子进程')

ee = EventEngine2()
le.info(u'事件引擎创建成功')

me = MainEngine(ee)
me.addGateway(ctpGateway)
me.addApp(ctaStrategy)
le.info(u'主引擎创建成功')

ee.register(EVENT_LOG, le.processLogEvent)
ee.register(EVENT_CTA_LOG, le.processLogEvent)
ee.register(EVENT_ERROR, processErrorEvent)
le.info(u'注册日志事件监听')

首先这里我们初始化了三个引擎:日志引擎(用来记录交易日志)、事件引擎和主引擎。

日志引擎很简单,我们就不展开了,在temp文件夹下会生成每次运行的日志文件,比如说我们随便打开一个.log文件就可以看到所有的日志记录情况。

事件引擎在eventEngine.py中定义,我们可以看到有EventEngine和EventEngine2两种事件驱动引擎,两个版本的区别在于定时器通过不同的方式实现,这里我们看到初始化的是EventEngine2。

接着初始化主引擎,在vtEngine.py中定义了四个类,我们看看主引擎的初始化参数有哪些: def init(self, eventEngine): “”“Constructor”"" # 记录今日日期 self.todayDate = datetime.now().strftime(’%Y%m%d’)

# 绑定事件引擎
    self.eventEngine = eventEngine
    self.eventEngine.start()

    # 创建数据引擎
    self.dataEngine = DataEngine(self.eventEngine)

    # MongoDB数据库相关
    self.dbClient = None    # MongoDB客户端对象

    # 接口实例
    self.gatewayDict = OrderedDict()
    self.gatewayDetailList = []

    # 应用模块实例
    self.appDict = OrderedDict()
    self.appDetailList = []

    # 风控引擎实例(特殊独立对象)
    self.rmEngine = None

    # 日志引擎实例
    self.logEngine = None
    self.initLogEngine()

在三大引擎初始化完毕以后,连接CTP接口。

me.connect('CTP')
le.info(u'连接CTP接口')

接着调用主引擎下的connect函数:

def connect(self, gatewayName):
    """连接特定名称的接口"""
    gateway = self.getGateway(gatewayName)

    if gateway:
        gateway.connect()
        # 接口连接后自动执行数据库连接的任务
        self.dbConnect()

接着gateway.connect()调用ctpGateway类的connect函数:

def connect(self):
    """连接"""
    try:
        f = file(self.filePath)
    except IOError:
        log = VtLogData()
        log.gatewayName = self.gatewayName
        log.logContent = text.LOADING_ERROR
        self.onLog(log)
        return

    # 解析json文件
    setting = json.load(f)
    try:
        userID = str(setting['userID'])
        password = str(setting['password'])
        brokerID = str(setting['brokerID'])
        tdAddress = str(setting['tdAddress'])
        mdAddress = str(setting['mdAddress'])

        # 如果json文件提供了验证码
        if 'authCode' in setting: 
            authCode = str(setting['authCode'])
            userProductInfo = str(setting['userProductInfo'])
            self.tdApi.requireAuthentication = True
        else:
            authCode = None
            userProductInfo = None

    except KeyError:
        log = VtLogData()
        log.gatewayName = self.gatewayName
        log.logContent = text.CONFIG_KEY_MISSING
        self.onLog(log)
        return            

    # 创建行情和交易接口对象
    self.mdApi.connect(userID, password, brokerID, mdAddress)
    self.tdApi.connect(userID, password, brokerID, tdAddress, authCode, userProductInfo)

    # 初始化并启动查询
    self.initQuery()

首先是读取CTP_connect.json文件路径,即在CtaTrading中的文件,这个文件里面其实就是定义了userID、password、brokerID和交易与行情的服务器地址,使用模拟盘的盆友可以登录simnow注册,就会获得这四个参数,配置进CTP_connect.json中就行。然后创建行情和交易接口对象。 这里我们主要看下行情接口:

self.mdApi.connect(userID, password, brokerID, mdAddress)

这里调用CtpMdApi类的connect函数:

def connect(self, userID, password, brokerID, address):
    """初始化连接"""
    self.userID = userID                # 账号
    self.password = password            # 密码
    self.brokerID = brokerID            # 经纪商代码
    self.address = address              # 服务器地址

    # 如果尚未建立服务器连接,则进行连接
    if not self.connectionStatus:
        # 创建C++环境中的API对象,这里传入的参数是需要用来保存.con文件的文件夹路径
        path = getTempPath(self.gatewayName + '_')
        self.createFtdcMdApi(path)

        # 注册服务器地址
        self.registerFront(self.address)

        # 初始化连接,成功会调用onFrontConnected
        self.init()

    # 若已经连接但尚未登录,则进行登录
    else:
        if not self.loginStatus:
            self.login()

这里完成账户登录,但是并没有开始获取数据。

cta.loadSetting()

调用ctaEngine.py中的loadSetting函数:
def loadSetting(self): “”“读取策略配置”"" with open(self.settingfilePath) as f: l = json.load(f)

for setting in l:
            self.loadStrategy(setting)

这里读取setting的文件路径为’\examples\CtaTrading\CTA_setting.json’,我们需要在这个文件中去定义我们想要实盘交易的策略,这里我们仅配置一个策略,更改品种为IH1810:

{
    "name": "king keltner",
    "className": "KkStrategy",
    "vtSymbol": "IH1810"
}

回到runCtaTrading.py

cta.initAll()

调用ctaEngine的initAll函数:

def initAll(self):
    """全部初始化"""
    for name in self.strategyDict.keys():
        self.initStrategy(name)

调用ctaEngine的initStrategy函数:

def initStrategy(self, name):
    """初始化策略"""
    if name in self.strategyDict:
        strategy = self.strategyDict[name]

        if not strategy.inited:
            self.callStrategyFunc(strategy, strategy.onInit)
            strategy.inited = True

            self.loadSyncData(strategy)                             # 初始化完成后加载同步数据
            self.subscribeMarketData(strategy)                      # 加载同步数据后再订阅行情
        else:
            self.writeCtaLog(u'请勿重复初始化策略实例:%s' %name)
    else:
        self.writeCtaLog(u'策略实例不存在:%s' %name)

首先里面先是调用策略的onInit函数进行初始化,初始化成功以后调用subscribeMarketData函数:

def subscribeMarketData(self, strategy):
    """订阅行情"""
    # 订阅合约
    contract = self.mainEngine.getContract(strategy.vtSymbol)
    if contract:
        req = VtSubscribeReq()
        req.symbol = contract.symbol
        req.exchange = contract.exchange

        # 对于IB接口订阅行情时所需的货币和产品类型,从策略属性中获取
        req.currency = strategy.currency
        req.productClass = strategy.productClass

        self.mainEngine.subscribe(req, contract.gatewayName)
    else:
        self.writeCtaLog(u'%s的交易合约%s无法找到' %(strategy.name, strategy.vtSymbol))

接着里面调用self.mainEngine.subscribe(req, contract.gatewayName),即vtEngine.py中的mainengine的subscribe函数。

def subscribe(self, subscribeReq, gatewayName):
    """订阅特定接口的行情"""
    gateway = self.getGateway(gatewayName) 
    if gateway:
        gateway.subscribe(subscribeReq)

接着调用ctp接口的subscribe函数:

def subscribe(self, subscribeReq):
    """订阅行情"""
    self.mdApi.subscribe(subscribeReq)

接着调用mdApi的subscribe函数:

def subscribe(self, subscribeReq):
    """订阅合约"""
    # 这里的设计是,如果尚未登录就调用了订阅方法
    # 则先保存订阅请求,登录完成后会自动订阅
    if self.loginStatus:
        self.subscribeMarketData(str(subscribeReq.symbol))
    self.subscribedSymbols.add(subscribeReq)

接下来就是引用VNPY的作者写的数据流,具体的可以看文章:

用Python的交易员:一张图看懂VnTrader的数据流​zhuanlan.zhihu.com图标

这里调用底层中调用C++封装的MdApi.subscribeMarketData函数,将订阅行情的请求最终通过底层C++ CTP API发出。

C++ CTP API收到Tick推送,自动回调MdApi.onRtnDepthMarketData函数推送行情数据字典data

MdApi.onRtnDepthMarketData中将data里的数据读取并转化成VtTickData对象,并调用ctpGateway.onTick函数

ctpGateway.onTick函数将VtTickData对象包装成类型为EVENT_TICK的行情事件对象Event,并调用eventEngine.put函数,放入事件引擎的缓冲队列

事件引擎的工作线程,从缓冲队列中读取出最新的行情事件后,根据EVENT_TICK事件类型去查找缓存在内部字典中的处理函数列表,并将事件对象作为入参,遍历调用到列表中的处理函数ctaEngine.processTickEvent

ctaEngine.processTickEvent中查看Tick的代码vtSymbol,并调用交易该代码合约的策略对象strategy.onTick函数,最终去运行策略中的逻辑

这里要注意策略涉及读取历史数据,需要本地数据库存储。

最后界面是在examply->vntrader里运行run.py,但是运行时报错 ,修改代码如下:

qApp.setStyleSheet(qdarkstyle.load_stylesheet_pyqt5())