java实现股市行情实时推送

所使用的技术

springcloud,redis,rocketMq,websocket,mysql

架构图

行情推送架构图.png

整体流程说明:交易服务交易产生的行情,存放到redis的队列内,然后行情服务会监听这个队列,获取这个队列里面的行情节点,经过数据处理,如,生成k线数据,然后存入redis内,并且使用mq异步持久化行情数据,同时,将行情数据发布给网关,网关订阅到行情数据后,使用websocket实时推送给前端用户。

问题

1.交易产生的行情数据是怎么样的?
这里要说一下,国际行情不是我们去产生的,而是要去对接第三方(比如对接OTC),拿到行情,然后我们再对国际行情进行处理推送展示。而本文的行情是我们自己系统自己交易产生的,所以会有一个交易服务,给用户进行交易,然后产生行情。
产生的行情数据其实很简单,大部分都包含:成交价格,成交量,成交时间这三个字段,因为是自己产生的行情,所以可以把成交的订单号也一并放到队列里,方便查找行情的出处。

2.什么是分时线,什么是K线?
分时线:每分钟的最后一笔成交价的连线叫分时线。分时线即大盘、个股分时走势图中的白色曲线,它反映的是大盘、个股的实时走势。像下图,横坐标的时间,纵坐标是成交价格,那么每一分钟的最后一笔成交价就的连线就是分时线了,分时线是实时变动的,这里就要使用ws推送行情了。那么这里有几个问题,就是怎么获取分时线数据?分时线数据如何存储?你想,要产生和存储分时图的数据,我们是不是得将每一分钟的最后一口价进行存储,并且存储时得注明这个价格是哪一分钟的价格,这个要如何实现?问题会在后面讲到解决办法。

行情分时图.png

K线图:股市及期货市场中的K线图的画法包含四个数据,即开盘价、最高价、最低价、收盘价,所有的k线都是围绕这四个数据展开,反映大势的状况和价格信息。如果把每日的K线图放在一张纸上,就能得到日K线图,同样也可画出周K线图、月K线图。
从图片上来看,其实不止开盘价、最高价、最低价、收盘价这个数据,还有成交量和成交额。
所以,包括的数据就有:开盘价、最高价、最低价、收盘价,成交量、成交额
比如今天是2020-03-01,那么今天就会产生一个日K数据,数据包含:日期(精确到日,即2020-03-01),开盘价(今天的开始价格)、最高价(今天的最高价格)、最低价(今天的最低价格)、收盘价(今天的最后一口价),成交量(今天的总成交量),成交额(今天的总成交额)同理,如果是周K,则就是以周为单位:日期(这周的结束日期,即这周的最后一天的日期,如今天周日,则这周最后一天就是今天2020-03-01),开盘价(这周的开始价格)、最高价(这周的最高价格)、最低价(这周的最低价格)、收盘价(这周的最后一口价)成交量(这周的总成交量),成交额(这周的总成交额)。
当然,这里的日期怎么显示,要看公司具体需求,比如周K的日期,有些公司是拿这周的开始日期,有些公司是拿这周的结束日期。

行情k线图.png

3.为什么要将行情保存到redis的队列里,然后行情服务去主动获取队列里的行情?而不直接使用redis的发布订阅,将行情发布给行情服务;或者使用rocketMq,将行情异步发送给行情服务?
可能有人会说,可以直接使用redis的发布订阅来将行情发布给行情服务,所以就没必要将行情存放到redis的一个队列里,然后行情服务再主动去获取行情数据。理论上,这样也是可以达到效果的,但是为什么不这样做?主要就是因为redis的发布订阅,是消息不可靠的,也就是说这个消息很可能丢失,假如行情服务全部挂掉了,但是交易服务还是正常的发布消息,那么,这个时候发布的消息将全部丢失。所以才需要将行情数据先保存到一个队列内,然后等待行情服务主动去获取,这样,即使行情服务挂了,行情数据也不会丢失,等到行情服务正常后,行情服务会主动去获取队列里的行情数据,一个一个的处理。
那么,为什么不直接使用rocketMq将行情发送给行情服务呢?其中最大的原因就是消息顺序消费的问题,行情需要保证一个正确的顺序输出的,比如我这一秒产生的行情,不能够比上一秒产生的行情要晚推送给前端,是需要保证顺序的,而rocketMq是很难保证顺序消费的,所以也不会使用rocketmq来实现这一点。

4.websocket是什么?为什么使用它做行情的实时推送?
可能有人会想,要保证页面实时的更新获取最新价格,可以使用http的长轮询,也就是不断的去轮询请求接口获取相应的数据来保证实时性,但是这样是很耗资源和性能的,因此不采用这种方式,而是使用websocket。

ajax轮询和websocket对比.png

从上图可以看出,使用ajax轮询实现实时推送,需要每次都发送一次http请求,然后等待服务器响应。而使用websocket只需要在第一次的时候发送http请求,客户端与服务器建立连接之后,后面客户端就不再需要每次都发送请求,然后等待访问了,服务端会主动发送数据给客户端。
这就像我要去山的另外一边看海,如果使用ajax轮询这种方式,我每次去山的那边都需要在山上挖一条隧道,然后走过去看海,但是使用websocket方式,则只需要第一次的时候,将隧道挖好,以后想去看海都不用再挖隧道了,直接就可以过去看海。当然,关于websocket的原理,大家可以去搜索上网了解,这里只是简单的说下。

代码实现

最上面的架构图就是实现的一个思路,我们需要将行情基础数据保存到redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将行情发布给网关,然后由网关使用websocket将数据推送给前端用户,同时,需要将行情数据处理成k线数据,并保存到redis里以及对行情数据进行持久化。
那么我们只要围绕着这个思路,去实现就可以了:
1.将行情保存到redis队列内
2.行情服务获取队列数据
3.行情服务发布行情消息
4.行情服务处理行情数据,生成K线数据,并保存到redis
5.行情服务队行情数据进行持久化
6.网关订阅行情,并使用websocet将行情推送给前端

1.将行情保存到redis队列内(略)

2.行情服务获取队列数据(分布式锁保证顺序),发布行情消息,生成K线数据并保存到redis,行情数据持久化

/**
 * 系统初始化类
 * 
 * @author Longer
 *
 */
@Component
public class SystemInitBean implements InitializingBean {

    static Logger logger = LoggerFactory.getLogger(SystemInitBean.class);
    @Autowired
    private IKLineService kLineService;
    @Autowired
    private TradeTimeService tradeTimeService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private QuotePeriodUtil quotePeriodUtil;
    @Autowired
    private RocketMQUtil rocketMQUtil;
    @Value("${rocketmq.quotepersistenece.topics}")
    private String persistenceTopics;
    @Value("${product.list}")
    private String productList;//商品列表

    public void afterPropertiesSet() throws Exception {
        logger.info("-------------开始启动初始服务--------------");
        String[] productArr = productList.split(",");
        for (String productTradeNo : productArr) {
            new QuoteListListenerThread(kLineService,tradeTimeService,redisUtil,quotePeriodUtil
                    ,rocketMQUtil,persistenceTopics,productTradeNo).start();
        }
        logger.info("---------启动初始服务完毕-------------------");
    }
}


/**
 * @Classname QuoteListListenerThread
 * @Description 行情队列监听线程
 * @Date 2019/12/23 15:43
 * @Created by Longer
 */
@Slf4j
public class QuoteListListenerThread extends Thread{
    public QuoteListListenerThread(){}
    public QuoteListListenerThread(IKLineService kLineService, TradeTimeService tradeTimeService,
                                   RedisUtil redisUtil, QuotePeriodUtil quotePeriodUtil,
                                   RocketMQUtil rocketMQUtil, String persistenceTopics, String productTradeNo){
        this.kLineService=kLineService;
        this.tradeTimeService=tradeTimeService;
        this.redisUtil=redisUtil;
        this.quotePeriodUtil=quotePeriodUtil;
        this.rocketMQUtil=rocketMQUtil;
        this.persistenceTopics=persistenceTopics;
        this.productTradeNo=productTradeNo;
    }

    private IKLineService kLineService;
    private TradeTimeService tradeTimeService;
    private RedisUtil redisUtil;
    private QuotePeriodUtil quotePeriodUtil;
    private RocketMQUtil rocketMQUtil;
    private String persistenceTopics;
    private String productTradeNo;

 @Override
    public void run() {
        while(true){
            String lockValue = UUID.randomUUID().toString();
            try{
                //加锁(分布式锁),不加锁的话,就无法保证顺序消费,因为有多个节点
                boolean lock = redisUtil.getLock(MessageFormat.format(RedisConstant.QUOTESERVICE_KLINE_LOCK_KEY,productTradeNo)
                        , lockValue, 1);
                if(lock){
                    Object realMsg = redisUtil.rightPop(MessageFormat.format(RedisConstant.QUOTESERVICE_QUOTE_LIST,productTradeNo));
                    if(!StringUtils.isEmpty(realMsg)){
                        log.info("QuoteListListenerThread监听到队列消息:{}",realMsg);
                        String s = realMsg.toString();
                        QuoteDto quoteDto = JSONObject.parseObject(s, QuoteDto.class);
                        
                        //step1.发布行情消息给网关
                        
                        //step2.行情数据处理(K线)和存储到redis
                        
                        //step3.行情数据异步持久化
                        
                    }
                }else{
                    log.info("QuoteListListenerThread:获取锁失败");
                    Thread.sleep(1 * 1000);
                }
            }catch (Exception e){
                log.info("QuoteListListenerThread报错:{}",e);
                try {
                    Thread.sleep(1 * 1000);
                } catch (InterruptedException e1) {
                    log.info("QuoteListListenerThread睡眠报错:{}",e1);
                }
            }finally {
                try {
                    //释放锁
                    redisUtil.releaseLock(MessageFormat.format(RedisConstant.QUOTESERVICE_KLINE_LOCK_KEY,productTradeNo),lockValue);
                }catch (Exception e){
                    log.info("释放锁报错:{}",e);
                }

            }
        }
    }
}

说明:不同的商品会对应不同的行情,所以也会将各自的行情存放到不同的队列。这里区分商品的标识是productTradeNo 字段(商品交易编码),在系统启动的时候,会针对每一个商品,都开启一个线程,进行处理商品的行情。
由于行情服务是多节点,所以这里需要使用分布式锁,来保证每次只有一个节点获取到行情数据,从而保证行情的顺序消费。其实如果要百分百保证顺序消费的话,最好是行情服务只部署一个节点,但是这样就不能保证行情服务的高可用了。

行情服务处理行情数据,生成K线数据,并保存到redis

刚刚在上面提过,分时线就是每一分钟最后一口价的连线,而K线包含的数据有:“开盘价、最高价、最低价、收盘价,成交量、成交额”,那么要获取分时线数据,则只需要获取1分钟k线数据就好了,也就是以每分钟为单位的K线数据。因为1分钟K线的收盘价就是每分钟的最后一口价。
那现在的问题就是怎么样将基本行情数据(成交价格,成交量,成交时间),加工处理成K线数据,并且保存到redis内。
这里就要使用到一个Java类CronSequenceGenerator
二话不说,直接上代码。

/**
 * @Classname PeriodType
 * @Description TODO
 * @Date 2019/5/30 16:57
 * @Created by Longer
 */
public enum PeriodType {
    /** 分时 */
    ONE_MINUTE("1"),

    /** 5分钟 */
    FIVE_MINUTE("5"),

    /** 15分钟 */
    FIFTEEN_MINUTE("15"),

    /** 30分钟 */
    THIRTY_MINUTE("30"),

    /** 60分钟 */
    SIXTY_MINUTE("60"),

    /** 天 */
    DAY("day"),

    /** 4小时*/
    FOUR_HOUR("4"),

    /** 周 */
    WEEK("week"),

    /** 月 */
    MONTH("month");

    private String index;

    // 构造方法
    private PeriodType(String index) {
        this.index = index;
    }

    public String getIndex() {
        return index;
    }
}


/**
 * @Classname QuotePeriodUtil
 * @Description TODO
 * @Date 2019/5/31 14:46
 * @Created by Longer
 */
@Component
@Slf4j
public class QuotePeriodUtil {

    private static CronSequenceGenerator oneMinuteTrigger = new CronSequenceGenerator("0 0/1 * * * ? ");
    private static CronSequenceGenerator fiveMinuteTrigger = new CronSequenceGenerator("0 0/5 *  * * ? ");
    private static CronSequenceGenerator fifteenMinuteTrigger = new CronSequenceGenerator("0 0/15 *  * * ? ");
    private static CronSequenceGenerator thirtyMinuteTrigger = new CronSequenceGenerator("0 0/30 *  * * ? ");
    private static CronSequenceGenerator fourHourTrigger = new CronSequenceGenerator("0 0 0/4  * * ? ");
    private static CronSequenceGenerator sixtyMinuteTrigger = new CronSequenceGenerator("0 0/60 *  * * ? ");
    // 每天上午0:00触发
    private static CronSequenceGenerator dayTrigger = new CronSequenceGenerator("0 0 0 * * ?");
    // 每个星期一上午0点触发
    private static CronSequenceGenerator weekTrigger = new CronSequenceGenerator("0 0 0 ? * MON");
    // 表示在每月的1日的上午0点触发
    private static CronSequenceGenerator monthTrigger = new CronSequenceGenerator("0 0 0 1 * ?");
    /*
    @Autowired
    private ParameterCache parameterCache;*/

    public QuotePeriod getPeriod( PeriodType type, Date dateTime) {
        QuotePeriod period = null;
        Date nextExecutionTime = null;
        switch (type) {
        case ONE_MINUTE:
            nextExecutionTime = oneMinuteTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 1 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case FIVE_MINUTE:
            nextExecutionTime = fiveMinuteTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 5 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case FIFTEEN_MINUTE:
            nextExecutionTime = fifteenMinuteTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 15 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case THIRTY_MINUTE:
            nextExecutionTime = thirtyMinuteTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 30 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case SIXTY_MINUTE:
            nextExecutionTime = sixtyMinuteTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 60 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case FOUR_HOUR:
            nextExecutionTime = fourHourTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 4 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case DAY:
            nextExecutionTime = dayTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 24 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case WEEK:
            nextExecutionTime = weekTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 7 * 24 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
            break;
        case MONTH:
            nextExecutionTime = monthTrigger.next(dateTime);
            // 获取当月1号上午6点
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(new Date(nextExecutionTime.getTime()));
            calendar.add(Calendar.MONTH, -1);// 取上个月
            calendar.set(Calendar.DAY_OF_MONTH, 1);
            calendar.set(Calendar.HOUR_OF_DAY, 0);
            calendar.set(Calendar.MINUTE, 0);
            calendar.set(Calendar.SECOND, 0);
            period = new QuotePeriod(calendar.getTimeInMillis(), nextExecutionTime.getTime(), type);
            break;
        default:
            nextExecutionTime = thirtyMinuteTrigger.next(dateTime);
            period = new QuotePeriod(nextExecutionTime.getTime() - 30 * 60 * 1000, nextExecutionTime.getTime(), PeriodType.THIRTY_MINUTE);
        }

        return period;
    }

    /**
     * 获取当前天的周期前的第 n 个天周期的日期时间
     * @param type
     * @param n
     * @return
     */
    public QuotePeriod getBeforePeriod( PeriodType type, int n) {
        Date nextExecutionTime = dayTrigger.next(new Date());
        QuotePeriod period = new QuotePeriod(nextExecutionTime.getTime() - 24 * 60 * 60 * 1000 * n, nextExecutionTime.getTime(), type);
        return period;
    }

使用上面我写好的类,就可以知道当前时间是属于哪一分钟,或者是属于哪个周,哪个月的。大家可以直接拷贝上面两个类,然后自行进行调试。

public static void main(String[] args) {
        QuotePeriodUtil quotePeriodUtil = new QuotePeriodUtil();
        QuotePeriod period = quotePeriodUtil.getPeriod(PeriodType.WEEK, new Date());
        System.out.println("这周的开始日期"+period.getStartTime()+"这周的结束日期"+period.getEndTime());
    }

3.网关订阅行情,并使用websocet将行情推送给前端(略)

因为本文的重点是整个行情推送的设计实录,并且对一些难点进行说明,而ws推送的代码网上是很多的,大家可以自行上网查找,这里就不贴出来了。

总结

java实现股市行情实时推送,最上面的架构图就是实现的一个思路,我们需要将行情基础数据保存到redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将行情发布给网关,然后由网关使用websocket将数据推送给前端用户,同时,需要将行情数据处理成k线数据,并保存到redis里以及对行情数据进行持久化。
那么我们只要围绕着这个思路,去实现就可以了:
1.将行情保存到redis队列内
2.行情服务获取队列数据
3.行情服务发布行情消息
4.行情服务处理行情数据,生成K线数据,并保存到redis
5.数据持久化

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335

推荐阅读更多精彩内容