1000字范文,内容丰富有趣,学习的好帮手!
1000字范文 > VNPY量化回测框架源码解析(自顶向下)

VNPY量化回测框架源码解析(自顶向下)

时间:2023-02-11 16:06:58

相关推荐

VNPY量化回测框架源码解析(自顶向下)

前言

最近需要搭建本地的量化回测平台来满足个性化的回测需求,填补聚宽、真格等在线回测平台的缺陷。之前没有做过相关的工作,所以打算先学习一下vn.py的回测模块的框架,读一下vn.py的源码。vnpy的源码可以从github上获取

但vn.py源码的注释比较少,本蒟蒻读起来比较吃力,在参考了网络上相关的解析之后,形成自己的一套解析思路,即自顶向下解析,把学习的过程记录下来,与诸位大佬分析。

在我个人学习的过程中,主要参考了知乎张世玉的vnpy源码解析系列文章。张先生的文章介绍得更为系统和详细,我写的这篇文章主要是希望可以提纲掣领,帮助和我一样的新手快速了解vnpy回测模块的架构。

解析思路

本文是我自己在阅读源码的时候的解析思路的记录,采用自顶向下的思路来逐层剥丝抽茧。也就是先从页面分析,追踪到每一个按钮绑定的槽函数,接着分析槽函数,追踪槽函数中调用的方法,分析数据的流向,知道把整个系统贯穿起来。

主干部分

界面

启动vnpy后,弹出如下界面,点击左侧菜单栏中的回测功能,可打开回测模块。

回测模块的窗口如下,有一个开始回测按钮,我们这里就分析点击开始回测按钮,会触发什么样的一系列程序。

界面源码

回测窗口的界面源码就在vnpy\app\cta_backtester\ui\widget.py中

该文件中定义了一个BacktesterManager类,该类是窗口类的子类,我们可以在其中查找“开始回测”按钮相关的语句。

我们找到了开始回测按钮定义的位置,发现点击开始回测按钮后,触发的是self.start_backtesting函数,于是我们转入分析BacktesterManager.start_backtesting函数。

BacktesterManager.start_backtesting函数

我们很快定位到了start_backtesting函数。

接下来我们分析start_backtesting函数中的代码。

可以看到,这个函数一开始是从窗口的输入框中读取用户输入的配置。对应窗口中左上角的位置。最后的save_json只是把配置以json的格式保存到了一个文件中。

而后面的几行代码,调用了BacktestingSettingEditor,主要是给用户修改策略中的参数。

BacktestingSettingEditor会弹出一个Dialog,内容如下,是配置策略中的参数的。

而在start_backtesting函数的最后,调用了backtester_engine下面的另一个start_backtesting函数。所以我们下面要分析这个新的start_backtesting函数。

BacktesterEngine.start_backtesting

最终新的start_backtesting函数,发现其是定义在vnpy\app\cta_backtester\engine.py文件中的BacktesterEngine类中。

找到该目录下的start_backtesting函数,发现其只是创建了一个子线程,该线程执行的是run_backtesting函数。

def start_backtesting(self,class_name: str,vt_symbol: str,interval: str,start: datetime,end: datetime,rate: float,slippage: float,size: int,pricetick: float,capital: int,inverse: bool,setting: dict):if self.thread:self.write_log("已有任务在运行中,请等待完成")return Falseself.write_log("-" * 40)self.thread = Thread(target=self.run_backtesting,args=(class_name,vt_symbol,interval,start,end,rate,slippage,size,pricetick,capital,inverse,setting))self.thread.start()return True

BacktesterEngine.run_backtesting

继续追踪到run_backtesting函数,这个还是调用了其他的函数。主要是调用了engine对象的add_strategy()、load_data()和run_backtesting()函数。继续追踪,发现engine是BacktestingEngine类的一个对象,而该类是定义在vnpy-master\vnpy\app\cta_strategy\backtesting.py文件中的。

BacktestingEngine.add_strategy

该函数是将策略文件加载进来。在vnpy中,所有的策略文件都放在一个文件夹中,如下:

在上面的介绍的BacktesterEngine中有加载策略类的函数,该函数会在窗口初始化的时候被调用,把该上图目录下的策略文件以“类”的形式存储在字典变量BacktesterEngine.classes中。而add_strategy便是要实例化策略类,生成一个策略对象self.strategy.

上图便是把目录下的策略文件全部加载到self.classes的核心语句,该函数在vnpy-master\vnpy\app\cta_backtester\engine.py中。

下图是BacktestingEngine.add_strategy的代码,可以看到,该函数根据策略类和相关的参数实例化了一个策略对象。

下图是策略类的初始化函数,可以看到,其参数便是add_strategy中传入的参数。

BacktestingEngine.load_data

load_data,顾名思义,就是加载数据。该函数就是从数据库中加载历史数据,保存到self.history_data中。同时该函数调用了load_bar_data和load_tick_data两个函数,这两个函数比较麻烦,被封装的层数比较多,但也就是从数据库中获取数据而已,只是对sql语句的封装,所以这里不对这两个函数展开讲了。

def load_data(self):""""""self.output("开始加载历史数据")if not self.end:self.end = datetime.now()if self.start >= self.end:self.output("起始日期必须小于结束日期")returnself.history_data.clear() # Clear previously loaded history data# Load 30 days of data each time and allow for progress updateprogress_delta = timedelta(days=30)total_delta = self.end - self.startinterval_delta = INTERVAL_DELTA_MAP[self.interval]start = self.startend = self.start + progress_deltaprogress = 0while start < self.end:end = min(end, self.end) # Make sure end time stays within set rangeif self.mode == BacktestingMode.BAR:data = load_bar_data(#这里调用了load_bar_data函数,从数据库中获取数据self.symbol,self.exchange,self.interval,start,end)else:data = load_tick_data(self.symbol,self.exchange,start,end)self.history_data.extend(data)progress += progress_delta / total_deltaprogress = min(progress, 1)progress_bar = "#" * int(progress * 10)self.output(f"加载进度:{progress_bar} [{progress:.0%}]")start = end + interval_deltaend += (progress_delta + interval_delta)self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")

BacktestingEngine.run_backtesting

下面我们看回测引擎中最核心的部分。具体看我添加的中文注释。

def run_backtesting(self):""""""if self.mode == BacktestingMode.BAR:#策略是基于bar还是tick,这里以bar为例func = self.new_barelse:func = self.new_tickself.strategy.on_init()#策略的初始化函数,后面会详细讨论,该函数给self.days和self.callback赋了值。# Use the first [days] of history data for initializing strategyday_count = 1ix = 0for ix, data in enumerate(self.history_data):#遍历历史数据,给策略初始化if self.datetime and data.datetime.day != self.datetime.day:day_count += 1if day_count >= self.days:breakself.datetime = data.datetimetry:self.callback(data)#回调函数,实际上是策略类的on_bar函数except Exception:self.output("触发异常,回测终止")self.output(traceback.format_exc())returnself.strategy.inited = Trueself.output("策略初始化完成")self.strategy.on_start()self.strategy.trading = True#更改状态self.output("开始回放历史数据")# Use the rest of history data for running backtestingfor data in self.history_data[ix:]:#遍历历史数据try:func(data)#调用了self.new_barexcept Exception:self.output("触发异常,回测终止")self.output(traceback.format_exc())returnself.output("历史数据回放结束")

主要由三个关键的函数strategy.on_init()、callback()、func()(如果策略是基于bar的,func也就是self.new_bar),我们一个个分析.

strategy.on_init

我们以双均线策略中的on_init为例,可以看到其主要调用了load_bar函数。

而load_bar函数是在策略模板类(即策略类的父类)中定义的。如下图,我们可以看到,在load_bar函数中,将callback赋值为self.on_bar函数,再调用回测引擎中的load_bar函数。

我们追踪到回测引擎(也就是前面的BacktestingEngine)中的load_bar函数,该函数就是给self.callback和self.days赋值,其中days就是strategy.on_init中调用时传入的10,而callback就是在策略模板类中的load_bar调用时设置成的self.on_bar.

on_init函数是为了初始化策略,提前将10天的数据加载进来,方便后面计算均线等指标。

callback(strategy.on_bar)

上面分析了,callback就是策略类中的on_bar函数,所以我们查看策略类的on_bar函数。这里以双均线策略为例。

def on_bar(self, bar: BarData):"""Callback of new bar data update."""am = self.amam.update_bar(bar)if not am.inited:returnfast_ma = am.sma(self.fast_window, array=True)self.fast_ma0 = fast_ma[-1]self.fast_ma1 = fast_ma[-2]slow_ma = am.sma(self.slow_window, array=True)self.slow_ma0 = slow_ma[-1]self.slow_ma1 = slow_ma[-2]cross_over = self.fast_ma0 > self.slow_ma0 and self.fast_ma1 < self.slow_ma1cross_below = self.fast_ma0 < self.slow_ma0 and self.fast_ma1 > self.slow_ma1if cross_over:if self.pos == 0:self.buy(bar.close_price, 1)elif self.pos < 0:self.cover(bar.close_price, 1)self.buy(bar.close_price, 1)elif cross_below:if self.pos == 0:self.short(bar.close_price, 1)elif self.pos > 0:self.sell(bar.close_price, 1)self.short(bar.close_price, 1)self.put_event()

可以看出,策略类中的on_bar函数里编写的就是我们策略的逻辑。根据信号,进行sell、buy等交易操作。而在on_bar中,还有一个am对象和am.update_bar()函数,这个我们放在后面分析,这里先看一下self.buy等函数。我们以buy为例。

strategy.on_buy

该函数的内容上图所示,其调用了send_order函数。该函数根据trading的值来做选择。而trading的值是在BacktestingEngine.run_backtesting中,策略初始化结束后,正式回测之前被置为True。所以在初始化阶段,该函数返回空列表,在回测阶段,该函数调用回测引擎的send_order函数。

BacktestingEngine.send_order

现在我们回到BacktestingEngine,分析其中的send_order类型。发现这个函数会根据订单的类型(stop还是limit),选择调用不同的函数。

BacktestingEngine.send_limit_order

我们以现价单为例,其函数内容如下:

该函数根据传入的参数,生成了一个订单对象,并将该对象存储到了active_limit_orders和limit_orders中。而vt_orderid属性则是根据订单的序号等信息生成的一个字符串,没有什么特殊的含义。

func

如果策略是基于bar的,那个func函数也就是new_bar函数。该函数会在遍历历史数据的时候被重复调用,我们查看new_bar函数。

func中调用了cross_limit_order和cross_stop_order,这两个函数是根据最新价来撮合成交的,订单信息就保存在上面提到的self.active_limit_orders中,同时更改仓位信息,把成交的信息保存在self.trades中。

def cross_limit_order(self):"""Cross limit order with last bar/tick data."""if self.mode == BacktestingMode.BAR:long_cross_price = self.bar.low_priceshort_cross_price = self.bar.high_pricelong_best_price = self.bar.open_priceshort_best_price = self.bar.open_priceelse:long_cross_price = self.tick.ask_price_1short_cross_price = self.tick.bid_price_1long_best_price = long_cross_priceshort_best_price = short_cross_pricefor order in list(self.active_limit_orders.values()):# Push order update with status "not traded" (pending).if order.status == Status.SUBMITTING:order.status = Status.NOTTRADEDself.strategy.on_order(order)# Check whether limit orders can be filled.long_cross = (order.direction == Direction.LONGand order.price >= long_cross_priceand long_cross_price > 0)short_cross = (order.direction == Direction.SHORTand order.price <= short_cross_priceand short_cross_price > 0)if not long_cross and not short_cross:continue# Push order udpate with status "all traded" (filled).order.traded = order.volumeorder.status = Status.ALLTRADEDself.strategy.on_order(order)self.active_limit_orders.pop(order.vt_orderid)# Push trade updateself.trade_count += 1if long_cross:trade_price = min(order.price, long_best_price)pos_change = order.volumeelse:trade_price = max(order.price, short_best_price)pos_change = -order.volumetrade = TradeData(symbol=order.symbol,exchange=order.exchange,orderid=order.orderid,tradeid=str(self.trade_count),direction=order.direction,offset=order.offset,price=trade_price,volume=order.volume,datetime=self.datetime,gateway_name=self.gateway_name,)self.strategy.pos += pos_change#更改仓位信息self.strategy.on_trade(trade)self.trades[trade.vt_tradeid] = trade#保存交易数据

而同时,new_bar函数还调用了策略的on_bar函数,上面已经分析过,on_bar函数会判断交易信号,产生订单,保存到self.active_limit_orders中。所以这个new_bar函数就是不断地根据新的bar来撮合已有的还未成交的订单成交,同时将bar传送给策略,策略产生新的订单。

new_bar中最后的一行,update_daily_close只是记录每个交易日的收盘价,没有特殊的内容。

至此,回测框架的流程基本上就分析完了。还有上面提到的策略中的am对象和am.update_bar()函数。

ArrayManager

策略中的am对象实际上是ArrayManager的实例化,该类可以存储历史数据列表,同时封装了一些常用的指标函数,如sma等。

而update_bar函数就是把最新的bar添加进去,把较远的bar删掉(也就是移动窗口,窗口的长度默认为100个bar)。

至此,我们已经把vnpy回测框架中最核心的部分都提取出来了。后面就是根据self.trades中的成交数据,计算损益等等指标了,这部分的内容等过几天再更新。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。