在backtrader官网文档中,介绍了两种多周期回测方式,第一种是读入多周期的K线数据,第二种是对数据进行resample。本文将对第一种方式进行介绍。
有些策略会使用到多周期的数据,典型的应用为:
使用周线(大周期)数据判断趋势
使用日线(小周期)数据判断买卖点
这就需要同时读入多周期数据进行回测,backtrader内置了对多周期策略回测的支持。
回测时需要遵循以下规则:
小周期数据必须被第一个加入到Cerebro实例中在backtrader中,第一个被添加的数据将被作为时钟数据,因此需要将小周期数据首先添加到系统中,以使得小力度的时间都能被遍历到。
数据必须按照日期时间做好对齐,这样backtrader才能正确地使用数据backtrader在回测过程中,不会对时间进行重新排序,只能按照整理好的数据顺序依次处理,因此需要将不同周期的数据都做好对齐。很幸运,从TuShare及BaoStock下载下来的不同周期的数据均做好对齐,可以直接使用,注意数据需要按时间升序组织。
大周期数据的使用会使得策略的最小周期变大。如果要计算出技术指标的有效值,至少需要若干根K线数据,例如:计算5日均线,则至少需要前5根日K线,才能计算出第一个有效值,也就是至少经过5日,这里的5日就是backtrader里所指的最小周期。不同指标可能具有不同的最小周期,单周期回测时,策略的最小周期就是所有指标最小周期的最大值。而多周期回测中,最小周期则变得复杂。通常,要计算出大周期技术指标的有效值,策略的最小周期会变大,也就是回测开始后,要经过更多的K线来保证大周期技术指标能够计算出第一个有效值。
为了演示多周期策略回测,本文使用以下方案:
策略将先读入日线数据,再读入周线数据。回测股票为000001平安银行,回测周期为2018年1月1日至2019年12月31日。 # 加载数据 def load_data(data_info): fromdate = datetime.datetime(2018, 1, 1) todate = datetime.datetime(2019, 12, 31) datapath = './stk_data/' + data_info[0] +'/' + data_info[1] + '.csv' return bt.feeds.GenericCSVData( dataname = datapath, fromdate = fromdate, todate = todate + datetime.timedelta(days=1), nullvalue = 0.0, dtformat = ('%Y-%m-%d'), datetime = 0, open = 1, high = 2, low = 3, close = 4, volume = 5, openinterest = -1 ) for stk_code in ['sz.000001']: data_list = list(map(load_data, [('d', stk_code), ('w', stk_code)])) for data in data_list: cerebro.adddata(data)平安银行的日线及周线数据分别保存在目录 ‘./stk_data/d/sz.000001.csv’ 和 ‘./stk_data/w/sz.000001.csv’ 中,通过load_data方法先后读入日线和K线数据,保存在data_list之后,依次添加到cerebro中,等待系统回测。
买入条件:日MACD金叉、周RSI小于50;卖出条件:价格较最高收盘价回撤5%卖出。在策略类的init方法中,定义所需的技术指标:
def __init__(self): # 存储不同数据的技术指标 self.inds = dict() # 存储特定股票的订单,key为股票的代码 self.orders = dict() # 遍历所有数据 for i, d in enumerate(self.datas): self.orders[d._name] = None # 为每个数据定义字典,存储技术指标 self.inds[d] = dict() # 判断d是否为日线数据 if 0 == i % 2: self.inds[d]['crossup'] = btind.CrossUp(btind.MACD(d).macd, btind.MACD(d).signal) # d为周线数据 else: self.inds[d]['rsi'] = btind.RSI_Safe(d)定义字典self.inds,来存储不同数据的技术指标。
定义self.orders,来存储特定股票的订单,key为股票的代码。
然后遍历所有的数据,将数据的订单先置为空,并且为每个数据创建字典,来存储技术指标。由于系统是先添加日线数据,再添加周线数据,因此当i % 2等于0时,d为日线数据,那么就计算MACD金叉指标;当i % 2不等于0时,d为周线数据,那么计算RSI指标。
在策略类的next方法中,定义买入卖出条件:
def next(self): #print(self.datetime.date()) for i, d in enumerate(self.datas): # 如果处理周线数据则跳过买卖条件,因为已在日线数据判断处理过 if 1 == i % 2: continue pos = self.getposition(d) # 不在场内,则可以买入 if not len(pos): # 达到买入条件 if self.inds[d]['crossup'][0] and self.inds[self.datas[i + 1]]['rsi'][0] < 50: # 买入手数,如果是多只股票回测,这里需要修改 stake = int(self.broker.cash // (d.close[0] * 100)) * 100 # 买买买 self.buy(data = d, size = stake) elif not self.orders[d._name]: # 下保护点卖单 self.orders[d._name] = self.close(data = d, exectype= bt.Order.StopTrail, trailamount=self.p.trailamount, trailpercent=self.p.trailpercent)这里依然只对i % 2等于0的数据(即日线数据)进行条件判断,然后使用self.inds[self.datas[i + 1]][‘rsi’]的方式对周线技术指标进行访问。
当达到买入条件后下买单,订单将在第二天以开盘价成交。在计算买入仓位大小时,保证资金得到最大程度的使用。需要注意,如果是要对多只股票同时进行回测,这里需要做资金分配。
买单成交当天,下StopTrail卖单,当股价较最高收盘价回撤5%卖出(具体参加笔记20和笔记31)。这里使用close方法而不是sell方法,如果使用sell方法,股票将以1股1股的卖出,使用close则是全部卖出。
输出结果为:
2018-05-15 BUY sz.000001 EXECUTED, Price: 11.18 2018-05-25 SELL sz.000001 EXECUTED, Price: 10.56 2018-06-19 BUY sz.000001 EXECUTED, Price: 10.05 2018-06-25 SELL sz.000001 EXECUTED, Price: 9.41 2018-07-11 BUY sz.000001 EXECUTED, Price: 8.76 2018-08-02 SELL sz.000001 EXECUTED, Price: 8.95 2019-01-08 BUY sz.000001 EXECUTED, Price: 9.73 2019-03-08 SELL sz.000001 EXECUTED, Price: 12.43 2019-06-11 BUY sz.000001 EXECUTED, Price: 12.34 2019-06-25 SELL sz.000001 EXECUTED, Price: 13.11 Final Portfolio Value: 1226710.85两年间共有5笔交易,收益率为22.7%(还可以)。
backtrader可以通过加载不同周期的数据来实现多周期策略回测。
加载不同周期数据进行策略回测时,需要先加载小周期数据,后加载大周期数据。
在策略实现时,通过数据索引的取余运算来区分小、大周期数据。
多周期策略回测程序v1代码:
# 多周期 # 买入条件:日MACD金叉、周RSI小于50 # 卖出条件:价格较最高收盘价回撤5%卖出 from __future__ import (absolute_import, division, print_function, unicode_literals) import backtrader as bt import backtrader.feeds as btfeeds import backtrader.indicators as btind import datetime import pandas as pd from collections import defaultdict class RSIMACDMultiTF(bt.Strategy): params = ( ('trailamount', 0.0), ('trailpercent', 0.05), ) def __init__(self): # 存储不同数据的技术指标 self.inds = dict() # 存储特定股票的订单,key为股票的代码 self.orders = dict() # 遍历所有数据 for i, d in enumerate(self.datas): self.orders[d._name] = None # 为每个数据定义字典,存储技术指标 self.inds[d] = dict() # 判断d是否为日线数据 if 0 == i % 2: self.inds[d]['crossup'] = btind.CrossUp(btind.MACD(d).macd, btind.MACD(d).signal) # d为周线数据 else: self.inds[d]['rsi'] = btind.RSI_Safe(d) def next(self): #print(self.datetime.date()) for i, d in enumerate(self.datas): # 如果处理周线数据则跳过买卖条件,因为已在日线数据判断处理过 if 1 == i % 2: continue pos = self.getposition(d) # 不在场内,则可以买入 if not len(pos): # 达到买入条件 if self.inds[d]['crossup'][0] and self.inds[self.datas[i + 1]]['rsi'][0] < 50: # 买入手数,如果是多只股票回测,这里需要修改 stake = int(self.broker.cash // (d.close[0] * 100)) * 100 # 买买买 self.buy(data = d, size = stake) elif not self.orders[d._name]: # 下保护点卖单 self.orders[d._name] = self.close(data = d, exectype= bt.Order.StopTrail, trailamount=self.p.trailamount, trailpercent=self.p.trailpercent) def notify_order(self, order): if order.status in [order.Completed]: if order.isbuy(): print('{} BUY {} EXECUTED, Price: {:.2f}'.format(self.datetime.date(), order.data._name, order.executed.price)) else: # Sell self.orders[order.data._name] = None print('{} SELL {} EXECUTED, Price: {:.2f}'.format(self.datetime.date(), order.data._name, order.executed.price)) # 加载数据 def load_data(data_info): fromdate = datetime.datetime(2018, 1, 1) todate = datetime.datetime(2019, 12, 31) datapath = './stk_data/' + data_info[0] +'/' + data_info[1] + '.csv' return bt.feeds.GenericCSVData( dataname = datapath, fromdate = fromdate, todate = todate + datetime.timedelta(days=1), nullvalue = 0.0, dtformat = ('%Y-%m-%d'), datetime = 0, open = 1, high = 2, low = 3, close = 4, volume = 5, openinterest = -1 ) def runstrat(): cerebro = bt.Cerebro() cerebro.broker.setcash(1000000.0) cerebro.addstrategy(RSIMACDMultiTF) for stk_code in ['sz.000001']: data_list = list(map(load_data, [('d', stk_code), ('w', stk_code)])) for data in data_list: cerebro.adddata(data) cerebro.addwriter(bt.WriterFile, out = 'log.csv', csv = True) cerebro.run() print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue()) # Plot the result绘制结果 cerebro.plot(start=datetime.date(2018, 1, 1), end=datetime.date(2019, 12, 31), volume = False, style = 'candle', barup = 'red', bardown = 'green') if __name__ == '__main__': runstrat()为了便于相互交流学习,已建微信群,感兴趣的读者请加微信。