自从StockQuant
开源以来,受到了很多朋友的喜爱,也有很多朋友提出了一些珍贵的建议,我个人其实一直也是想要继续完善,无奈事务繁多且我的想法总是在变化。我想就让它保持那样的简单的面貌吧。前段时间写了一个StockQuant_Pro
,主要是结合自己在开发量化交易系统方面的经验,做一个股票方面的事件驱动的量化交易系统。当然了,因为A股的交易机制以及政策方面的限制,所以其实我们能够做的还是比较有限,希望以后在这一块能够迎来更为宽松的政策吧,到时候我们就可以更愉快地玩耍啦。
一、模板文件
1.策略文件
from stockquant_pro.entrance import * # 导入模块
class Strategy:
def __init__(self):
self.kline = RestApi.get_history_kline(code="sh600519", interval="1d", start_date="2021-01-01")
Event(
codes=["sh600519", "sh600928", "sz000002"], # 订阅的标的列表
channels=["index", "orderbook", "trade", "kline.1d", "kline.1m"],
index_update_callback=self.on_event_index_update_callback, # 指数更新回调函数
orderbook_update_callback=self.on_event_orderbook_update_callback, # 订单簿数据更新回调函数
kline_update_callback=self.on_event_kline_update_callback, # k线数据更新回调函数
trade_update_callback=self.on_event_trade_update_callback, # 逐笔成交数据更新回调函数
)
@async_method_locker("on_event_index_update_callback.lock", wait=False)
async def on_event_index_update_callback(self, index: Index):
"""指数更新"""
logger.debug("index:", index, caller=self)
@async_method_locker("on_event_orderbook_update_callback.lock", wait=False)
async def on_event_orderbook_update_callback(self, orderbook: Orderbook):
"""订单簿更新"""
logger.debug("orderbook:", orderbook, caller=self)
@async_method_locker("on_event_trade_update_callback.lock", wait=False)
async def on_event_trade_update_callback(self, trade: Trade):
"""成交数据更新"""
logger.debug("trade:", trade, caller=self)
@async_method_locker("on_event_kline_update_callback.lock", wait=False)
async def on_event_kline_update_callback(self, kline: Kline):
"""k线更新"""
logger.debug("kline:", kline, caller=self)
if __name__ == '__main__':
# 启动框架,载入配置文件,初始化日志设置,执行入口函数
Quant.start("config.json", Strategy)
2.配置文件
{
"LOG": {
"level": "debug",
"path": "./logs",
"name": "error.log",
"console": true,
"backup_count": 100000,
"clear": false
},
"DINGTALK": "https://oapi.dingtalk.com/robot/send?access_token=a167ab94db"
}
3.requirements.txt
aiohttp==3.7.4.post0
motor==2.4.0
baostock==0.8.8
pandas==1.2.4
二、简要介绍
策略启动之后,系统底层会自动获取数据(默认1s/1次),并且将数据处理后通过回调的方式推送给策略使用。数据一共有4种:
- 指数数据(上证指数与深成指数)
- 个股5档盘口数据
- 个股tick数据
- K线数据
各种数据的数据结构如下:
-
Index
:symbol: str
code: str
price: float
change: float
ratio: float
volume: float
turnover: float
timestamp: str
-
Trade
:symbol: str
code: str
price: float
quantity: float
timestamp: str
-
Orderbook
:symbol: str
code: str
asks: list
bids: list
timestamp: str
-
Kline
:symbol: str
code: str
interval: str
open: float
high: float
low: float
close: float
volume: float
timestamp: str
klines: list
其中,K线数据可以通过RestApi.get_history_kline()
方法来通过BaoStock
获取历史K线数据,但是它只能获取到截止到昨天的数据,实时的盘中数据是无法获取的,市面上的数据源大体如此,即使有能够获取盘中K线数据的,也肯定价格不菲。所以我们系统底层是创建了一个定时执行的协程任务,每隔30秒
根据tick
数据合成一次实时的盘中K线数据,比如1分钟
k线或者其他的任意的分钟、小时级K线数据,这只需要在订阅频道时指定获取的K线的周期就可以了。除此之外,系统底层会自动地每秒推送一次实时的日k线数据,就像Websocket
推送的实时K线数据一样,它就是最新的数据,我们可以将之与历史K线合成,这样就能获取实时的包含最新k线数据的日K线数据。
三、所有数据自动持久化
订阅的所有行情数据,都会自动持久化到本地磁盘,会在当前项目目录下自动创建对应的文件夹,然后每个文件都会带有日期时间戳,非常适合在服务器上永久运行,从而一直能够保存数据,而不需要手动处理和干预等等。
持久化的数据当然也会保存盘中合成的诸如1分钟K线等等,另外,我们提供了一个接口来获取昨日保存的k线数据,这样即使在开盘时我们也能获取到足够的1分钟k线来计算指标啦。
四、一个简单的双均线策略示例
"""
此策略简单演示了如何编写一个双均线策略,并且推送策略信号到钉钉;
策略启动时获取一次历史日k线数据(不含当日K线),K线更新回调函数会每秒推送一次当日K线,将二者合成即可;
上证指数、深成指数,相关标的的五档盘口数据、tick数据,也是每秒更新,并异步执行相应的回调函数;
非交易日或非交易时间段不会获取数据,会打印相关的debug级别的提示日志;
分钟或小时级别的K线是根据trade数据按指定时间粒度进行采样合成,每30秒推送一次更新数据;
所有数据都自动持久化到当前项目目录下指定的文件夹中;
启动之后永久性运行,除非显式调用`Quant.stop()`方法;
"""
import pandas as pd
from stockquant_pro.entrance import * # 导入模块
class Strategy:
def __init__(self):
""" 类初始化
"""
self.price = 0 # 开仓价格
self.quantity = 0 # 持仓数量
self.asset = 10000 # 总资金
self.cross_over = False
self.cross_down = False
self.kline = RestApi.get_history_kline(code="sh600519", interval="1d", start_date="2021-01-01")
Event(
codes=["sh600519", "sh600928", "sz000002"], # 订阅的标的列表
channels=["index", "orderbook", "trade", "kline.1d", "kline.1m"],
index_update_callback=self.on_event_index_update_callback, # 指数更新回调函数
orderbook_update_callback=self.on_event_orderbook_update_callback, # 订单簿数据更新回调函数
kline_update_callback=self.on_event_kline_update_callback, # k线数据更新回调函数
trade_update_callback=self.on_event_trade_update_callback, # 逐笔成交数据更新回调函数
)
@async_method_locker("on_event_index_update_callback.lock", wait=False)
async def on_event_index_update_callback(self, index: Index):
"""指数更新"""
logger.debug("index:", index, caller=self)
@async_method_locker("on_event_orderbook_update_callback.lock", wait=False)
async def on_event_orderbook_update_callback(self, orderbook: Orderbook):
"""订单簿更新"""
logger.debug("orderbook:", orderbook, caller=self)
if orderbook.code != "sh600519":
return
# 取买一和卖一的平均值作为当前最新价格
price = (orderbook.asks[0][0] + orderbook.bids[0][0]) / 2
if self.cross_over and self.quantity == 0:
self.quantity = self.asset / price
self.price = price
content = "### 策略信号推送\n\n" \
"> **策略名称:** {stg_name}\n\n" \
"> **股票名称:** {code}\n\n" \
"> **信号名称:** {signal}\n\n" \
"> **交易数量:** {amount}\n\n" \
"> **交易价格:** {price}\n\n" \
"> **时间戳:** {timestamp}".format(
stg_name="双均线策略",
code=orderbook.symbol,
signal="金叉买入",
amount=self.quantity,
price=price,
timestamp=orderbook.timestamp
)
await Dingtalk.markdown(content)
elif self.cross_down and self.quantity > 0:
profit = (price - self.price) * self.quantity
self.asset += profit
content = "### 策略信号推送\n\n" \
"> **策略名称:** {stg_name}\n\n" \
"> **股票名称:** {code}\n\n" \
"> **信号名称:** {signal}\n\n" \
"> **交易数量:** {amount}\n\n" \
"> **交易价格:** {price}\n\n" \
"> **交易利润:** {profit}\n\n" \
"> **当前资金:** {asset}\n\n" \
"> **时间戳:** {timestamp}".format(
stg_name="双均线策略",
code=orderbook.symbol,
signal="死叉卖出",
amount=self.quantity,
price=price,
profit=profit,
asset=self.asset,
timestamp=orderbook.timestamp
)
await Dingtalk.markdown(content)
self.quantity, self.price = 0, 0
elif price <= self.price * 0.9 and self.quantity > 0:
profit = (price - self.price) * self.quantity
self.asset += profit
content = "### 策略信号推送\n\n" \
"> **策略名称:** {stg_name}\n\n" \
"> **股票名称:** {code}\n\n" \
"> **信号名称:** {signal}\n\n" \
"> **交易数量:** {amount}\n\n" \
"> **交易价格:** {price}\n\n" \
"> **交易利润:** {profit}\n\n" \
"> **当前资金:** {asset}\n\n" \
"> **时间戳:** {timestamp}".format(
stg_name="双均线策略",
code=orderbook.symbol,
signal="止损卖出",
amount=self.quantity,
price=price,
profit=profit,
asset=self.asset,
timestamp=orderbook.timestamp
)
await Dingtalk.markdown(content)
self.quantity, self.price = 0, 0
@async_method_locker("on_event_trade_update_callback.lock", wait=False)
async def on_event_trade_update_callback(self, trade: Trade):
"""成交数据更新"""
logger.debug("trade:", trade, caller=self)
@async_method_locker("on_event_kline_update_callback.lock", wait=False)
async def on_event_kline_update_callback(self, kline: Kline):
"""k线更新"""
logger.debug("kline:", kline, caller=self)
if kline.code != "sh600519" or kline.interval != "1d":
return
# 合成K线,即每秒都会重新计算指标
self.kline.append([kline.timestamp, kline.open, kline.high, kline.low, kline.close, kline.volume])
# 将获取的数据转换成矩阵,使用pandas直接计算均线指标;使用Talib也可。
df = pd.DataFrame(self.kline, columns=["timestamp", "open", "high", "low", "close", "volume"])
ma20 = df["close"].rolling(20).mean()
ma30 = df["close"].rolling(30).mean()
# 若金叉
if ma20.iloc[-2] < ma30.iloc[-2] and ma20.iloc[-1] >= ma30.iloc[-1]:
self.cross_over = True
# 若死叉
elif ma20.iloc[-2] > ma30.iloc[-2] and ma20.iloc[-1] <= ma30.iloc[-1]:
self.cross_down = True
# 去除列表中的最后一项数据,以便下次合成
self.kline.pop(-1)
if __name__ == '__main__':
# 启动框架,载入配置文件,初始化日志设置,执行入口函数
Quant.start("config.json", Strategy)
策略的简单说明在文件开头部分的多行注释中有一些介绍,可以大致参考一下。
五、回测功能
以上介绍的都是为交易时间段内的实盘情景服务的,在非交易时间日或时间段,策略会打印日志提示当前非交易时间段,直到市场开盘才会继续获取的发布数据等等。
之前我们在使用StockQuant
时,都是盘后才去遍历一些历史数据然后计算指标产生策略信号等等,因为我们使用的数据源无法实时获取盘中的K线数据等等。但是Pro
版是可以盘中获取各种周期的K线数据的,所以我们可以盘中就计算指标,产生策略信号,总之,思路是不一样的。
针对回测,之前写过一个事件驱动类型的回测模块,例如我们在日k线上进行回测,但是我们可以同时使用比如1分钟或者5分钟k线数据来作为底层周期的数据,从而使我们的回测更为精确。后续会将之合成到我们的StockQuant_Pro
中来。
六、A股历史数据服务
针对一些做模型训练或数据分析的需求,我们计划上线A股历史K线数据服务,包括上证50、沪深300、中证500的盘口10档的历史数据和逐笔成交数据等等。敬请期待。
七、展望
后续计划挖掘用户需求进而将这套系统更完善,欢迎提供建议。
Gary-Hertel
2021-06-12