StockQuant_Pro——事件驱动的A股量化交易系统

自从StockQuant开源以来,受到了很多朋友的喜爱,也有很多朋友提出了一些珍贵的建议,我个人其实一直也是想要继续完善,无奈事务繁多且我的想法总是在变化。我想就让它保持那样的简单的面貌吧。前段时间写了一个StockQuant_Pro,主要是结合自己在开发量化交易系统方面的经验,做一个股票方面的事件驱动的量化交易系统。当然了,因为A股的交易机制以及政策方面的限制,所以其实我们能够做的还是比较有限,希望以后在这一块能够迎来更为宽松的政策吧,到时候我们就可以更愉快地玩耍啦。

一、模板文件

1.策略文件

from stockquant_pro.entrance import *         # 导入模块


class Strategy:

    def __init__(self):

        self.kline = RestApi.get_history_kline(code="sh600519", interval="1d", start_date="2021-01-01")

        Event(
            codes=["sh600519", "sh600928", "sz000002"],                          # 订阅的标的列表
            channels=["index", "orderbook", "trade", "kline.1d", "kline.1m"],
            index_update_callback=self.on_event_index_update_callback,           # 指数更新回调函数
            orderbook_update_callback=self.on_event_orderbook_update_callback,   # 订单簿数据更新回调函数
            kline_update_callback=self.on_event_kline_update_callback,           # k线数据更新回调函数
            trade_update_callback=self.on_event_trade_update_callback,           # 逐笔成交数据更新回调函数
        )

    @async_method_locker("on_event_index_update_callback.lock", wait=False)
    async def on_event_index_update_callback(self, index: Index):
        """指数更新"""
        logger.debug("index:", index, caller=self)

    @async_method_locker("on_event_orderbook_update_callback.lock", wait=False)
    async def on_event_orderbook_update_callback(self, orderbook: Orderbook):
        """订单簿更新"""
        logger.debug("orderbook:", orderbook, caller=self)

    @async_method_locker("on_event_trade_update_callback.lock", wait=False)
    async def on_event_trade_update_callback(self, trade: Trade):
        """成交数据更新"""
        logger.debug("trade:", trade, caller=self)

    @async_method_locker("on_event_kline_update_callback.lock", wait=False)
    async def on_event_kline_update_callback(self, kline: Kline):
        """k线更新"""
        logger.debug("kline:", kline, caller=self)


if __name__ == '__main__':

    # 启动框架,载入配置文件,初始化日志设置,执行入口函数
    Quant.start("config.json", Strategy)

2.配置文件

{
    "LOG": {
        "level": "debug",
        "path": "./logs",
        "name": "error.log",
        "console": true,
        "backup_count": 100000,
        "clear": false
    },
    "DINGTALK": "https://oapi.dingtalk.com/robot/send?access_token=a167ab94db"
}

3.requirements.txt

aiohttp==3.7.4.post0
motor==2.4.0
baostock==0.8.8
pandas==1.2.4

二、简要介绍

策略启动之后,系统底层会自动获取数据(默认1s/1次),并且将数据处理后通过回调的方式推送给策略使用。数据一共有4种:

  • 指数数据(上证指数与深成指数)
  • 个股5档盘口数据
  • 个股tick数据
  • K线数据

各种数据的数据结构如下:

  • Index
    • symbol: str
    • code: str
    • price: float
    • change: float
    • ratio: float
    • volume: float
    • turnover: float
    • timestamp: str
  • Trade
    • symbol: str
    • code: str
    • price: float
    • quantity: float
    • timestamp: str
  • Orderbook
    • symbol: str
    • code: str
    • asks: list
    • bids: list
    • timestamp: str
  • Kline
    • symbol: str
    • code: str
    • interval: str
    • open: float
    • high: float
    • low: float
    • close: float
    • volume: float
    • timestamp: str
    • klines: list

其中,K线数据可以通过RestApi.get_history_kline()方法来通过BaoStock获取历史K线数据,但是它只能获取到截止到昨天的数据,实时的盘中数据是无法获取的,市面上的数据源大体如此,即使有能够获取盘中K线数据的,也肯定价格不菲。所以我们系统底层是创建了一个定时执行的协程任务,每隔30秒根据tick数据合成一次实时的盘中K线数据,比如1分钟k线或者其他的任意的分钟、小时级K线数据,这只需要在订阅频道时指定获取的K线的周期就可以了。除此之外,系统底层会自动地每秒推送一次实时的日k线数据,就像Websocket推送的实时K线数据一样,它就是最新的数据,我们可以将之与历史K线合成,这样就能获取实时的包含最新k线数据的日K线数据。

三、所有数据自动持久化

订阅的所有行情数据,都会自动持久化到本地磁盘,会在当前项目目录下自动创建对应的文件夹,然后每个文件都会带有日期时间戳,非常适合在服务器上永久运行,从而一直能够保存数据,而不需要手动处理和干预等等。持久化的数据当然也会保存盘中合成的诸如1分钟K线等等,另外,我们提供了一个接口来获取昨日保存的k线数据,这样即使在开盘时我们也能获取到足够的1分钟k线来计算指标啦。

四、一个简单的双均线策略示例

"""
此策略简单演示了如何编写一个双均线策略,并且推送策略信号到钉钉;
策略启动时获取一次历史日k线数据(不含当日K线),K线更新回调函数会每秒推送一次当日K线,将二者合成即可;
上证指数、深成指数,相关标的的五档盘口数据、tick数据,也是每秒更新,并异步执行相应的回调函数;
非交易日或非交易时间段不会获取数据,会打印相关的debug级别的提示日志;
分钟或小时级别的K线是根据trade数据按指定时间粒度进行采样合成,每30秒推送一次更新数据;
所有数据都自动持久化到当前项目目录下指定的文件夹中;
启动之后永久性运行,除非显式调用`Quant.stop()`方法;
"""
import pandas as pd

from stockquant_pro.entrance import *         # 导入模块


class Strategy:

    def __init__(self):
        """ 类初始化
        """
        self.price = 0          # 开仓价格
        self.quantity = 0       # 持仓数量
        self.asset = 10000      # 总资金
        self.cross_over = False
        self.cross_down = False

        self.kline = RestApi.get_history_kline(code="sh600519", interval="1d", start_date="2021-01-01")

        Event(
            codes=["sh600519", "sh600928", "sz000002"],                          # 订阅的标的列表
            channels=["index", "orderbook", "trade", "kline.1d", "kline.1m"],
            index_update_callback=self.on_event_index_update_callback,           # 指数更新回调函数
            orderbook_update_callback=self.on_event_orderbook_update_callback,   # 订单簿数据更新回调函数
            kline_update_callback=self.on_event_kline_update_callback,           # k线数据更新回调函数
            trade_update_callback=self.on_event_trade_update_callback,           # 逐笔成交数据更新回调函数
        )

    @async_method_locker("on_event_index_update_callback.lock", wait=False)
    async def on_event_index_update_callback(self, index: Index):
        """指数更新"""
        logger.debug("index:", index, caller=self)

    @async_method_locker("on_event_orderbook_update_callback.lock", wait=False)
    async def on_event_orderbook_update_callback(self, orderbook: Orderbook):
        """订单簿更新"""
        logger.debug("orderbook:", orderbook, caller=self)

        if orderbook.code != "sh600519":
            return

        # 取买一和卖一的平均值作为当前最新价格
        price = (orderbook.asks[0][0] + orderbook.bids[0][0]) / 2

        if self.cross_over and self.quantity == 0:
            self.quantity = self.asset / price
            self.price = price
            content = "### 策略信号推送\n\n" \
                      "> **策略名称:** {stg_name}\n\n" \
                      "> **股票名称:** {code}\n\n" \
                      "> **信号名称:** {signal}\n\n" \
                      "> **交易数量:** {amount}\n\n" \
                      "> **交易价格:** {price}\n\n" \
                      "> **时间戳:** {timestamp}".format(
                        stg_name="双均线策略",
                        code=orderbook.symbol,
                        signal="金叉买入",
                        amount=self.quantity,
                        price=price,
                        timestamp=orderbook.timestamp
            )
            await Dingtalk.markdown(content)

        elif self.cross_down and self.quantity > 0:
            profit = (price - self.price) * self.quantity
            self.asset += profit
            content = "### 策略信号推送\n\n" \
                      "> **策略名称:** {stg_name}\n\n" \
                      "> **股票名称:** {code}\n\n" \
                      "> **信号名称:** {signal}\n\n" \
                      "> **交易数量:** {amount}\n\n" \
                      "> **交易价格:** {price}\n\n" \
                      "> **交易利润:** {profit}\n\n" \
                      "> **当前资金:** {asset}\n\n" \
                      "> **时间戳:** {timestamp}".format(
                        stg_name="双均线策略",
                        code=orderbook.symbol,
                        signal="死叉卖出",
                        amount=self.quantity,
                        price=price,
                        profit=profit,
                        asset=self.asset,
                        timestamp=orderbook.timestamp
            )
            await Dingtalk.markdown(content)
            self.quantity, self.price = 0, 0

        elif price <= self.price * 0.9 and self.quantity > 0:
            profit = (price - self.price) * self.quantity
            self.asset += profit
            content = "### 策略信号推送\n\n" \
                      "> **策略名称:** {stg_name}\n\n" \
                      "> **股票名称:** {code}\n\n" \
                      "> **信号名称:** {signal}\n\n" \
                      "> **交易数量:** {amount}\n\n" \
                      "> **交易价格:** {price}\n\n" \
                      "> **交易利润:** {profit}\n\n" \
                      "> **当前资金:** {asset}\n\n" \
                      "> **时间戳:** {timestamp}".format(
                        stg_name="双均线策略",
                        code=orderbook.symbol,
                        signal="止损卖出",
                        amount=self.quantity,
                        price=price,
                        profit=profit,
                        asset=self.asset,
                        timestamp=orderbook.timestamp
            )
            await Dingtalk.markdown(content)
            self.quantity, self.price = 0, 0

    @async_method_locker("on_event_trade_update_callback.lock", wait=False)
    async def on_event_trade_update_callback(self, trade: Trade):
        """成交数据更新"""
        logger.debug("trade:", trade, caller=self)

    @async_method_locker("on_event_kline_update_callback.lock", wait=False)
    async def on_event_kline_update_callback(self, kline: Kline):
        """k线更新"""
        logger.debug("kline:", kline, caller=self)

        if kline.code != "sh600519" or kline.interval != "1d":
            return

        # 合成K线,即每秒都会重新计算指标
        self.kline.append([kline.timestamp, kline.open, kline.high, kline.low, kline.close, kline.volume])

        # 将获取的数据转换成矩阵,使用pandas直接计算均线指标;使用Talib也可。
        df = pd.DataFrame(self.kline, columns=["timestamp", "open", "high", "low", "close", "volume"])
        ma20 = df["close"].rolling(20).mean()
        ma30 = df["close"].rolling(30).mean()

        # 若金叉
        if ma20.iloc[-2] < ma30.iloc[-2] and ma20.iloc[-1] >= ma30.iloc[-1]:
            self.cross_over = True

        # 若死叉
        elif ma20.iloc[-2] > ma30.iloc[-2] and ma20.iloc[-1] <= ma30.iloc[-1]:
            self.cross_down = True

        # 去除列表中的最后一项数据,以便下次合成
        self.kline.pop(-1)


if __name__ == '__main__':

    # 启动框架,载入配置文件,初始化日志设置,执行入口函数
    Quant.start("config.json", Strategy)

策略的简单说明在文件开头部分的多行注释中有一些介绍,可以大致参考一下。

五、回测功能

以上介绍的都是为交易时间段内的实盘情景服务的,在非交易时间日或时间段,策略会打印日志提示当前非交易时间段,直到市场开盘才会继续获取的发布数据等等。

之前我们在使用StockQuant时,都是盘后才去遍历一些历史数据然后计算指标产生策略信号等等,因为我们使用的数据源无法实时获取盘中的K线数据等等。但是Pro版是可以盘中获取各种周期的K线数据的,所以我们可以盘中就计算指标,产生策略信号,总之,思路是不一样的。

针对回测,之前写过一个事件驱动类型的回测模块,例如我们在日k线上进行回测,但是我们可以同时使用比如1分钟或者5分钟k线数据来作为底层周期的数据,从而使我们的回测更为精确。后续会将之合成到我们的StockQuant_Pro中来。

六、A股历史数据服务

针对一些做模型训练或数据分析的需求,我们计划上线A股历史K线数据服务,包括上证50、沪深300、中证500的盘口10档的历史数据和逐笔成交数据等等。敬请期待。

七、展望

后续计划挖掘用户需求进而将这套系统更完善,欢迎提供建议。

Gary-Hertel
2021-06-12

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容