来公司第一个比较大的业务需求,便是换了新的行情提供商dxfeed,需要把所有K线的处理都重新倒腾一遍。这里说说重写分时K线图的一点心得。
先交代一下dxfeed的情况:
1)股票分时数据采用订阅的方式,可以订阅历史数据
2)每分钟每只股票可能会接收到多条分时数据,也可能一条不推。有些股票在开市过程中每分钟都会有分时数据,有些股票则一个交易日只有几条数据,甚至没有,差别很大
3)推送的分时数据有延迟,从监控获得数据,开市期间大约在60s-120s左右(分时数据看的是趋势,这是可以忍受的)
4)每个订阅对实时数据有qps限制,对历史数据无qps限制(从监控上得到的)
再交代一下股票分时K线数据的情况:
1)股票数目为8400+(我们只做美股相关)
2)每只股票1个交易日的分时线包含391条数据(9:30-16:00, America/New_York),每条数据包含这一分钟的最高价、最低价、均价、昨收价、涨幅、交易量、VWAP等,数据量在150k左右
3)每天存储的分时数据在120w条左右
4)股票分时线如果某一分钟没有点,使用上一分钟的点代替,但无交易量
接下来重点说一下这里的设计:(以AAPL为例)
1)由于分时数据有延迟,无法确定每一分钟的最后一条数据什么时候会来到。因此开市期间并不去保存分时数据。实时的分时数据存储在redis中,计算K线时从redis中取
2)redis中分时数据的存储格式为hash结构,每只股票一个key,为了防止当前交易日使用上个交易日的数据,因此分时数据缓存时是包含日期的,比如:TMT_AAPL0105, key为美东时间,格式:HHmm, value为简单计算后的分时数据。上一个交易日的分时数据通过定时任务在指定时间清理。(这里没有对分时数据做过期设置,是因为刚开市时数据推送量及redis操作量很大,而使用hashes这个结构,是不支持直接传入expire time的,同时即使某天系统出什么问题,也只是多占些硬盘的问题,不会对其他造成影响,对这个操作添加监控及报警就可以了)
3)结构有了,如果dxfeed每次都推送数据过来时,都去操作redis更新,那样redis很可能会吃不消,谁都无法晓得第三方是否会出什么问题疯狂推送一下(事实证明,dxfeed也确实这样有过,可能因网络故障等补偿推送历史数据)。接下来是缓存的的重点:在内存中通过guava cache缓存了最近几分钟的所有symbol的最新数据,key: 股票+HHmm(美东时间), value:一条分时数据,每次推送分时数据过来时,根据股票及分时数据时间进行cache,过期数据(很久以前或非最新数据)或脏数据(相关数据为NaN)直接抛弃,然后通过ScheduledExecutorService维护一个1分钟1执行的job去将最近几分钟的分时数据更新到redis中。这样既解决了分时数据有延迟的问题,又保证了每分钟的redis更新数据量。
4)数据存储结束后,就是处理分时K线的数据了。当请求AAPL时,会先查看是否已经有cache了,比如C_TT_AAPL, 如果有取出cache中数据处理后返回,没有开始计算。计算会先取出上面存储的数据,即TMT_AAPL0105,然后计算当前k线的时间点(开市过程中从1个点慢慢增长到391个点),遍历HHmm去TMT_AAPL0105中取分时数据,如果有直接取出即可,如果没有使用上一个点生成(第一个记得特殊处理一下,没有需要拿上个交易日最后一个点补),计算完后转成json放到redis中,key为计算前查询的key: C_TT_AAPL,然后将结果返回。开市过程中缓存到下一分钟开始,闭市了缓存到下个交易日开始。由于这里缓存的是string,因此直接带上过期时间,之后就不用care了。如此,最核心的分时K线数据绝大部分都是在操作redis,每只股票每分钟只需要计算一次即可,当然这是基于分时K线主要看的是趋势。
5)上面说了,分时数据可以拿历史数据。这里我单独写了一个job,可以在给定开始时间后,取出期望的历史数据,然后更新历史分时数据。当实时推送出问题时,这个job可以用来在秒级获取历史数据来修复redis中的分时数据。实际上,这个job在故障中的表现远远超出预期,之前因为对实时那里做优化,几次都有点问题导致开市后实时数据获取或处理有问题,这时候启动这个拿历史分时的job,几秒内就可以保证redis中的数据变成最新,保证核心的分时图数据一直ok。
6)说到这里,分时数据还没保存。这里借助上面拿历史数据的job,在闭市后(北京时间凌晨5点以后)自动执行,更新redis中数据后开始插入数据库。由于插入量特别大,通过guava的RateLimiter控制写入速度在合理的qps值,慢慢更新就好啦
当然这不是全部,还有些细节,比如动态增加股票,就不多说了。此外,我单独补了许多核心监控,比如定时任务是否正常执行、读取及更新redis时间、开市期间推送的qps、计算分时K线平均时间、平均每分钟推送不同股票数目等等,目前达到的结果是:docker上部署的2个4g的服务来处理所有K线图,分时K线图数据可以在平均10ms左右返回,大量的分时数据在低峰时间端插入,服务器在开市闭市都没什么压力。