高并发秒杀之秒杀优化

1 优化分析
前三张基本将秒杀的系统开发完成但是之前那种设计真的可以承受高并发下的秒杀么本篇文章结合该高并发系统考虑,哪些是可能出现的高并发点呢?

这里写图片描述

上图中,所有的红色的部分都可能是出现高并发的点。

1.1为什么单独获取系统时间

在详情页,可能出现用户大量的刷新情况,此时系统应该部署在CDN节点上,此时要做一个静态化处理,当再次刷新时它获取的CDN静态资源(css/js/picture),但是,时间要保持实时的,所以要单独的做一个处理,单独从服务器系统上获取时间,这也就是为什么要在详情页单独获取系统时间了。

1.2 CDN是什么

简介:CDN(内容发布网络),是一个加速用户获取数据的系统;既可以是静态资源,又可以是动态资源,这取决于我们的决策策略。经常大部分视频加速都依赖于CDN,比如优酷,爱奇艺等,据此加速;

原理:CDN部署在距离用户最近的网络节点上,用户上网的时候通过网络运营商(电信,长城等)访问距离用户最近的要给城域网网络地址节点上,然后通过城域网跳到主干网上,主干网则根据访问IP找到访问资源所在服务器,但是,很大一部分内容在上一层节点已经找到,此时不用往下继续查找,直接返回所访问的资源即可,减小了服务器的负担。一般互联网公司都会建立自己的CDN机群或者租用CDN。

1.3 获取系统时间不用优化

获取系统访问时间的操作不用优化,因为访问一次内存Cacheline大约10ns,1秒内可以做很大数据量级的时间获取操作,所以,不用做什么优化!
1.4 秒杀地址(Redis缓存技术)

对于秒杀地址暴露的接口是否可以缓存呢?

秒杀接口是无法缓存在CDN当中的,因为CDN适合缓存不易变化的资源,通常是静态资源,比如css/jquery资源,每一个url对应了一个不变的内容,秒杀的接口地址是在每次都发生变化的,不适合放在CDN缓存。

但是适合放在服务器端做缓存(后端缓存),比如redis等,下一次访问的时候直接去服务器端缓存里面查找,如果服务器端缓存有了就直接拿走,没有的话再做正常数据访问处理;另外一个原因就是,一致性维护成本很低。

秒杀地址接口的优化策略:

请求地址,先访问redis,如果redis缓存中没有所需资源或者访问访问超时,则直接进入mysql获取系统资源,将获取的内容更新在redis当中(策略:超时穿透,主动更新)。

1.5 秒杀操作
1.5.1 秒杀操作分析

(a)秒杀操作优化分析

对于这种写操作,是无法使用CDN优化的,另外,也不适合在后端缓存,因为缓存了其他数据,可能会出现数据不一致的情况。
秒杀数据数据操作的一个困难的点就是一行数据大量用户出现竞争的情况,同时出现大量的(b)update操作,这样该如何优化呢?
(架构+维护点)
设计一个原子计数器(redis/NoSQL来实现)用来记录行为信息(用分布式MQ实现这个消息队列,即把消息放在MQ当中),然后后端服务消费此消息并落地(用Mysql实现,落地:记录购买者,能够扛着很大的访问量)
但是这个而技术的有自己的弱点,也就是成本方面:
运维成本和稳定型:NoSQL,MQ等;开发成本在数据一致性和回滚方案等;幂等性难以保证:重复秒杀的问题;不适合新手的架构。
(c)为什么不用MySql来解决秒杀操作?
因为Mysql执行update的减库存比较低效,一条update操作的压力测试结果是可以抗住4wQPS,也就是说,一个商品在1秒内,可以被买4w次;
看一下Java控制事务的行为分析:
(执行库存减1操作)

Update table set num=num-1 where id=10 andnum>0,紧接着会进行一个inser购买明细的操作,然后commit/rollback;

然后第二个用户Updatetable set num=num-1 where id=10 and num>0,紧接着等待行锁,获得锁lock,来继续执行,然后后面的用户……

这样下来的话,整个秒杀操作可以说是一种串行化的执行序列。

1.5.2 分析瓶颈所在

Update减库存—>insert购买明细—>commit/rollback:这两个过程都存在网路延迟和GC;但并非java和sql本身慢,而是java和通信之间比较慢;

所以,java执行时间+网络延迟时间+GC=这行操作的执行时间(大概在2ms,1秒钟有500次减操作,对于秒杀系统来说这个性能呈指数级下降,并不好)。

1.5.3 优化思路分析

我们知道行级锁是在commit之后释放的,那么我们的优化方向就是减少行级锁的持有时间。
同城机房需要花0.5-2msmax(1000qps),update之后JVM-GC(50ms) max(20qps);
异地机房一次(北京上海之间额一次update Sql需要20ms。

如何判断update更新库存成功?

两个条件:——Update自身不报错,客户端确认影响记录数

优化思路:

把客户端逻辑放在Mysql服务端,避免网络延迟和GC影响。

那么,如何把逻辑放在Mysql服务端呢?

1.5.4 两种优化解决方案

(1)定制SQL方案:update/+[auto_commit]/,需要修改Mysql源码;这样可以在SQL执行完之后,直接在服务端完成commit,不用客户端逻辑判断之后来执行是否commit/rollback。 但是这个增加了修改Mysql源码的成本(不推荐)。

(2)使用存储过程:整个事务在MySQL端完成(把整个热点执行放在一个过程当中一次性完成,只需要返回执行的整个结果就行了,这样可以避免网络延迟和GC干扰)。

1.6 优化分析总结

前端控制:暴露接口(动静态数据分离)

按钮防重复(避免重复请求)

动静态数据分离:CDN缓存,后端缓存(redis技术实现的查询)。

事务竞争优化:减少事务锁时间(用Mysql来解决)。

2 Redis后端缓存优化
2.1 Redis 安装
Redis在通常情况下都是使用机群来维护缓存,此处用一个Redis缓存为例。
此处应用的目的:使用redis优化地址接口,暴露接口。
若想使用Redis作为服务端的缓存机制,则应该首先在服务端安装Redis:具体安装教程
在Linux环境下安装Redis步骤可参照:http://blog.csdn.net/lamp_yang_3533/article/details/52518706
在Window下安装Redis可参照:http://blog.csdn.net/wgw335363240/article/details/24469159

2.2 优化编码

第一,在Pom.xml文件引入Redis在Java环境下的客户端Jedis.

   <!-- redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

第二,添加一个对象序列化的缓存类RedisDao.java:

为什么要使用对象序列化?
序列化的目的是将一个实现了Serializable接口的对象转换成一个字节序列,可以。 把该字节序列保存起来(例如:保存在一个文件里),以后可以随时将该字节序列恢复为原来的对象。
序列化的对象占原有空间的十分之一,压缩速度可以达到两个数量级,同时节省了CPU
Redis 缓存对象时需要将其序列化,而何为序列化,实际上就是将对象以字节形式存储。这样,不管对象的属性是字符串、整型还是图片、视频等二进制类型,
都可以将其保存在字节数组中。对象序列化后便可以持久化保存或网络传输。需要还原对象时,只需将字节数组再反序列化即可。


/**
 * User: cxhc.
 * Date: 2017/10/5.
 * Time: 上午 12:11.
 * Explain: Redis操作的接口
 */

public interface BaseRedisDao<K, V> {

    /**
     * 用户排序通过注册时间的 权重值
     * @param date
     * @return
     */
    double getCreateTimeScore(long date);
    /**
     * 获取Redis中所有的键的key
     * @return
     */
    Set<K> getAllKeys();

    /**
     * 获取所有的普通key-value
     * @return
     */
    Map<K,V> getAllString();

    /**
     * 获取所有的Set -key-value
     * @return
     */
    Map<K,Set<V>> getAllSet();
    /**
     * 获取所有的ZSet正序  -key-value 不获取权重值
     * @return
     */
    Map<K,Set<V>> getAllZSetReverseRange();
    /**
     * 获取所有的ZSet倒序  -key-value 不获取权重值
     * @return
     */
    Map<K,Set<V>> getAllZSetRange();

    /**
     * 获取所有的List -key-value
     * @return
     */
    Map<K,List<V>> getAllList();

    /**
     * 获取所有的Map -key-value
     * @return
     */
    Map<K,Map<K,V>> getAllMap();

    /**
     * 添加一个list
     * @param key
     * @param objectList
     */
    void addList(K key, List<V> objectList);
    /**
     * 向list中增加值
     * @param key
     * @param obj
     * @return 返回在list中的下标
     */
    long addList(K key,V obj);
    /**
     *
     * 向list中增加值
     * @param key
     * @param obj
     * @return 返回在list中的下标
     */
    long addList(K key,V ...obj);

    /**
     *
     * 输出list
     * @param key List的key
     * @param s 开始下标
     * @param e 结束的下标
     * @return
     */
    List<V> getList(K key, long s, long e);
    /**
     * 输出完整的list
     * @param key
     */
    List<V> getList(K key);
    /**
     * 获取list集合中元素的个数
     * @param key
     * @return
     */
    long getListSize(K key);
    /**
     * 移除list中某值
     * 移除list中 count个value为object的值,并且返回移除的数量,
     * 如果count为0,或者大于list中为value为object数量的总和,
     * 那么移除所有value为object的值,并且返回移除数量
     * @param key
     * @param object
     * @return 返回移除数量
     */
    long removeListValue(K key,V object);
    /**
     * 移除list中某值
     * @param key
     * @param object
     * @return 返回移除数量
     */
    long removeListValue(K key,V... object);

    /**
     * 批量删除key对应的value
     * @param keys
     */
    void remove(final K... keys);
    /**
     * 删除缓存
     * 根据key精确匹配删除
     * @param key
     */
    void remove(final K key);

    /**
     * 通过分数删除ZSet中的值
     * @param key
     * @param s
     * @param e
     */
    void removeZSetRangeByScore(String key,double s , double e);
    /**
     * 设置Set的过期时间
     * @param key
     * @param time
     * @return
     */
    Boolean setSetExpireTime(String key,Long time);
    /**
     * 设置ZSet的过期时间
     * @param key
     * @param time
     * @return
     */
    Boolean setZSetExpireTime(String key,Long time);
    /**
     * 判断缓存中是否有key对应的value
     * @param key
     * @return
     */
    boolean exists(final K key);
    /**
     * 读取String缓存 可以是对象
     * @param key
     * @return
     */
    V get(final K key);
    /**
     * 读取String缓存 可以是对象
     * @param key
     * @return
     */
    List<V> get(final K... key);
    /**
     * 读取缓存 可以是对象 根据正则表达式匹配
     * @param regKey
     * @return
     */
    List<Object> getByRegular(final K regKey);
    /**
     * 写入缓存 可以是对象
     * @param key
     * @param value
     */
    void set(final K key, V value);
    /**
     * 写入缓存
     * @param key
     * @param value
     * @param expireTime 过期时间 -单位s
     * @return
     */
    void set(final K key, V value, Long expireTime);

    /**
     * 设置一个key的过期时间(单位:秒)
     * @param key
     * @param expireTime
     * @return
     */
    boolean setExpireTime(K key, Long expireTime);

    /**
     * 获取key的类型
     * @param key
     * @return
     */
    DataType getType(K key);

    /**
     * 删除map中的某个对象
     * @param key   map对应的key
     * @param field map中该对象的key
     */
    void removeMapField(K key, V... field);
    /*
     * 获取map对象
     * @param key map对应的key
     * @return
     */
    Map<K,V> getMap(K key);
    /*
     * 获取map对象
     * @param key map对应的key
     * @return
     */
    Long getMapSize(K key);
    /**
     * 获取map缓存中的某个对象
     * @param key map对应的key
     * @param field map中该对象的key
     * @return
     */
    <T> T getMapField(K key, K field);
    /**
     * 判断map中对应key的key是否存在
     * @param key map对应的key
     * @return
     */
    Boolean hasMapKey(K key,K field);

    /**
     * 获取map对应key的value
     * @param key map对应的key
     * @return
     */
    List<V> getMapFieldValue(K key);
    /**
     * 获取map的key
     * @param key map对应的key
     * @return
     */
    Set<V> getMapFieldKey(K key);
    /**
     * 添加map
     * @param key
     * @param map
     */
    void addMap(K key, Map<K,V> map);
    /**
     * 向key对应的map中添加缓存对象
     * @param key   cache对象key
     * @param field map对应的key
     * @param value     值
     */
    void addMap(K key, K field, Object value);
    /**
     * 向key对应的map中添加缓存对象
     * @param key   cache对象key
     * @param field map对应的key
     * @param time 过期时间-整个MAP的过期时间
     * @param value     值
     */
    void addMap(K key, K field, V value,long time);

    /**
     * 向set中加入对象
     * @param key  对象key
     * @param obj  值
     */
    void addSet(K key, V... obj);

    /**
     * 处理事务时锁定key
     * @param key
     */
    void watch(String key);

    /**
     * 移除set中的某些值
     * @param key  对象key
     * @param obj  值
     */
    long removeSetValue(K key, V obj);
    /**
     * 移除set中的某些值
     * @param key  对象key
     * @param obj  值
     */
    long removeSetValue(K key, V... obj);

    /**
     * 获取set的对象数
     * @param key  对象key
     */
    long getSetSize(K key);

    /**
     * 判断set中是否存在这个值
     * @param key  对象key
     */
    Boolean hasSetValue(K key,V obj);
    /**
     * 获得整个set
     * @param key  对象key
     */
    Set<V> getSet(K key);

    /**
     * 获得set 并集
     * @param key
     * @param otherKey
     * @return
     */
    Set<V> getSetUnion(K key,K otherKey);

    /**
     * 获得set 并集
     * @param key
     * @param set
     * @return
     */
    Set<V> getSetUnion(K key,Set<Object> set);

    /**
     * 获得set 交集
     * @param key
     * @param otherKey
     * @return
     */
    Set<V> getSetIntersect(K key,K otherKey);

    /**
     * 获得set 交集
     * @param key
     * @param set
     * @return
     */
    Set<V> getSetIntersect(K key,Set<Object> set);

    /**
     * 模糊移除 支持*号等匹配移除
     * @param blears
     */
    void removeBlear(K... blears);

    /**
     * 修改key名 如果不存在该key或者没有修改成功返回false
     * @param oldKey
     * @param newKey
     * @return
     */
    Boolean renameIfAbsent(String oldKey,String newKey);
    /**
     * 模糊移除 支持*号等匹配移除
     * @param blear
     */
    void removeBlear(K blear);

    /**
     * 根据正则表达式来移除key-value
     * @param blears
     */
    void removeByRegular(String... blears);

    /**
     * 根据正则表达式来移除key-value
     * @param blears
     */
    void removeByRegular(String blears);

    /**
     * 根据正则表达式来移除 Map中的key-value
     * @param key
     * @param blears
     */
    void removeMapFieldByRegular(K key,K... blears);

    /**
     * 根据正则表达式来移除 Map中的key-value
     * @param key
     * @param blear
     */
    void removeMapFieldByRegular(K key,K blear);

    /**
     * 移除key 对应的value
     * @param key
     * @param value
     * @return
     */
    Long removeZSetValue(K key, V... value);
    /**
     * 移除key ZSet
     * @param key
     * @return
     */
    void removeZSet(K key);
    /**
     *删除,键为K的集合,索引start<=index<=end的元素子集
     * @param key
     * @param start
     * @param end
     * @return
     */
    void removeZSetRange(K key,Long start,Long end);

    /**
     * 并集 将key对应的集合和key1对应的集合合并到key2中
     * 如果分数相同的值,都会保留
     * 原来key2的值会被覆盖
     * @param key
     * @param key1
     * @param key2
     */
    void setZSetUnionAndStore(String key,String key1, String key2);

    /**
     * 获取整个有序集合ZSET,正序
     * @param key
     */
    <T> T getZSetRange(K key);

    /**
     * 获取有序集合ZSET
     * 键为K的集合,索引start<=index<=end的元素子集,正序
     * @param key
     * @param start 开始位置
     * @param end 结束位置
     */
    <T> T getZSetRange(K key,long start,long end);
    /**
     * 获取整个有序集合ZSET,倒序
     * @param key
     */
    Set<Object> getZSetReverseRange(K key);

    /**
     * 获取有序集合ZSET
     * 键为K的集合,索引start<=index<=end的元素子集,倒序
     * @param key
     * @param start 开始位置
     * @param end 结束位置
     */
    Set<V> getZSetReverseRange(K key,long start,long end);

    /**
     * 通过分数(权值)获取ZSET集合 正序 -从小到大
     * @param key
     * @param start
     * @param end
     * @return
     */
    Set<V> getZSetRangeByScore(String key, double start, double end);

    /**
     * 通过分数(权值)获取ZSET集合 倒序 -从大到小
     * @param key
     * @param start
     * @param end
     * @return
     */
    Set<V> getZSetReverseRangeByScore(String key, double start, double end);

    /**
     * 键为K的集合,索引start<=index<=end的元素子集
     * 返回泛型接口(包括score和value),正序
     * @param key
     * @param start
     * @param end
     * @return
     */
    Set<ZSetOperations.TypedTuple<V>> getZSetRangeWithScores(K key, long start, long end);
    /**
     * 键为K的集合,索引start<=index<=end的元素子集
     * 返回泛型接口(包括score和value),倒序
     * @param key
     * @param start
     * @param end
     * @return
     */
    Set<ZSetOperations.TypedTuple<V>> getZSetReverseRangeWithScores(K key, long start, long end);

    /**
     * 键为K的集合
     * 返回泛型接口(包括score和value),正序
     * @param key
     * @return
     */
    Set<ZSetOperations.TypedTuple<V>> getZSetRangeWithScores(K key);
    /**
     * 键为K的集合
     * 返回泛型接口(包括score和value),倒序
     * @param key
     * @return
     */
    Set<ZSetOperations.TypedTuple<V>> getZSetReverseRangeWithScores(K key);

    /**
     * 键为K的集合,sMin<=score<=sMax的元素个数
     * @param key
     * @param sMin
     * @param sMax
     * @return
     */
    long getZSetCountSize(K key,double sMin,double sMax);

    /**
     * 获取Zset 键为K的集合元素个数
     * @param key
     * @return
     */
    long getZSetSize(K key);

    /**
     * 获取键为K的集合,value为obj的元素分数
     * @param key
     * @param value
     * @return
     */
    double getZSetScore(K key,V value);

    /**
     * 元素分数增加,delta是增量
     * @param key
     * @param value
     * @param delta
     * @return
     */
    double incrementZSetScore(K key,V value,double delta);

    /**
     * 添加有序集合ZSET
     * 默认按照score升序排列,存储格式K(1)==V(n),V(1)=S(1)
     * @param key
     * @param score
     * @param value
     * @return
     */
    Boolean addZSet(String key ,double score, Object value);

    /**
     * 添加有序集合ZSET
     * @param key
     * @param value
     * @return
     */
    Long addZSet(K key,TreeSet<V> value);

    /**
     * 添加有序集合ZSET
     * @param key
     * @param score
     * @param value
     * @return
     */
    Boolean addZSet(K key,double[] score, Object[] value);
}

redis dao 实现类


/**
 * User: cxhc.
 * Date: 2017/10/5.
 * Time: 上午 12:51.
 * Explain: Redis的工具类
 * 增删改 -不能在这里面抓取异常 -因为可能有事务处理
 */
@Repository("redisHandle")
public class RedisHandle implements BaseRedisDao<String, Object> {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource(name="redisTemplate")
    protected RedisTemplate redisTemplate;
    /**
     * 出异常,重复操作的次数
     */
    private static Integer times = 5;

    
    public double getCreateTimeScore(long date) {
        return date/ 100000.0;
    }

    
    public Set<String> getAllKeys() {
        return redisTemplate.keys("*");
    }

    
    public Map<String, Object> getAllString() {
        Set<String> stringSet = getAllKeys();
        Map<String, Object> map = new HashMap<String, Object>();
        Iterator<String> iterator = stringSet.iterator();
        while (iterator.hasNext()) {
            String k = iterator.next();
            if (getType(k) == DataType.STRING) {
                map.put(k, get(k));
            }
        }
        return map;
    }

    
    public Map<String, Set<Object>> getAllSet() {
        Set<String> stringSet = getAllKeys();
        Map<String, Set<Object>> map = new HashMap<String, Set<Object>>();
        Iterator<String> iterator = stringSet.iterator();
        while (iterator.hasNext()) {
            String k = iterator.next();
            if (getType(k) == DataType.SET) {
                map.put(k, getSet(k));
            }
        }
        return map;
    }

    
    public Map<String, Set<Object>> getAllZSetRange() {
        Set<String> stringSet = getAllKeys();
        Map<String, Set<Object>> map = new HashMap<String, Set<Object>>();
        Iterator<String> iterator = stringSet.iterator();
        while (iterator.hasNext()) {
            String k = iterator.next();
            if (getType(k) == DataType.ZSET) {
                logger.debug("k:"+k);
                map.put(k, getZSetRange(k));
            }
        }
        return map;
    }

    
    public Map<String, Set<Object>> getAllZSetReverseRange() {
        Set<String> stringSet = getAllKeys();
        Map<String, Set<Object>> map = new HashMap<String, Set<Object>>();
        Iterator<String> iterator = stringSet.iterator();
        while (iterator.hasNext()) {
            String k = iterator.next();
            if (getType(k) == DataType.ZSET) {
                map.put(k, getZSetReverseRange(k));
            }
        }
        return map;
    }

    
    public Map<String, List<Object>> getAllList() {
        Set<String> stringSet = getAllKeys();
        Map<String, List<Object>> map = new HashMap<String, List<Object>>();
        Iterator<String> iterator = stringSet.iterator();
        while (iterator.hasNext()) {
            String k = iterator.next();
            if (getType(k) == DataType.LIST) {
                map.put(k, getList(k));
            }
        }
        return map;
    }

    
    public Map<String, Map<String, Object>> getAllMap() {
        Set<String> stringSet = getAllKeys();
        Map<String, Map<String, Object>> map = new HashMap<String, Map<String, Object>>();
        Iterator<String> iterator = stringSet.iterator();
        while (iterator.hasNext()) {
            String k = iterator.next();
            if (getType(k) == DataType.HASH) {
                map.put(k, getMap(k));
            }
        }
        return map;
    }

    
    public void addList(String key, List<Object> objectList) {
        for (Object obj : objectList) {
            addList(key, obj);
        }
    }

    
    public long addList(String key, Object obj) {
        return redisTemplate.boundListOps(key).rightPush(obj);
    }

    
    public long addList(String key, Object... obj) {
        return redisTemplate.boundListOps(key).rightPushAll(obj);
    }

    
    public List<Object> getList(String key, long s, long e) {
        return redisTemplate.boundListOps(key).range(s, e);
    }

    
    public List<Object> getList(String key) {
        return redisTemplate.boundListOps(key).range(0, getListSize(key));
    }

    
    public long getListSize(String key) {
        return redisTemplate.boundListOps(key).size();
    }

    
    public long removeListValue(String key, Object object) {
        return redisTemplate.boundListOps(key).remove(0, object);
    }

    
    public long removeListValue(String key, Object... objects) {
        long r = 0;
        for (Object object : objects) {
            r += removeListValue(key, object);
        }
        return r;
    }

    
    public void remove(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                remove(key[0]);
            } else {
                redisTemplate.delete(CollectionUtils.arrayToList(key));
            }
        }
    }

    
    public void removeBlear(String... blears) {
        for (String blear : blears) {
            removeBlear(blear);
        }
    }

    
    public Boolean renameIfAbsent(String oldKey, String newKey) {
        return redisTemplate.renameIfAbsent(oldKey, newKey);
    }

    
    public void removeBlear(String blear) {
        redisTemplate.delete(redisTemplate.keys(blear));
    }

    
    public void removeByRegular(String... blears) {
        for (String blear : blears) {
            removeBlear(blear);
        }
    }

    
    public void removeByRegular(String blear) {
        Set<String> stringSet = getAllKeys();
        for (String s : stringSet) {
            if (Pattern.compile(blear).matcher(s).matches()) {
                redisTemplate.delete(s);
            }
        }
    }

    
    public void removeMapFieldByRegular(String key, String... blears) {
        for (String blear : blears) {
            removeMapFieldByRegular(key, blear);
        }
    }

    
    public void removeMapFieldByRegular(String key, String blear) {
        Map<String, Object> map = getMap(key);
        Set<String> stringSet = map.keySet();
        for (String s : stringSet) {
            if (Pattern.compile(blear).matcher(s).matches()) {
                redisTemplate.boundHashOps(key).delete(s);
            }
        }
    }

    
    public Long removeZSetValue(String key, Object... value) {
        return redisTemplate.boundZSetOps(key).remove(value);
    }

    
    public void removeZSet(String key) {
        removeZSetRange(key, 0L, getZSetSize(key));
    }

    
    public void removeZSetRange(String key, Long start, Long end) {
        redisTemplate.boundZSetOps(key).removeRange(start, end);
    }

    
    public void setZSetUnionAndStore(String key,String key1, String key2) {
        redisTemplate.boundZSetOps(key).unionAndStore(key1,key2);
    }

    
    public Set<Object> getZSetRange(String key) {
        return getZSetRange(key, 0, getZSetSize(key));
    }

    
    public Set<Object> getZSetRange(String key, long s, long e) {
        return redisTemplate.boundZSetOps(key).range(s, e);
    }

    
    public Set<Object> getZSetReverseRange(String key) {
        return getZSetReverseRange(key, 0, getZSetSize(key));
    }

    
    public Set<Object> getZSetReverseRange(String key, long start, long end) {
        return redisTemplate.boundZSetOps(key).reverseRange(start, end);
    }

    
    public Set<Object> getZSetRangeByScore(String key, double start, double end) {
        return redisTemplate.boundZSetOps(key).rangeByScore(start, end);
    }
    
    public Set<Object> getZSetReverseRangeByScore(String key, double start, double end) {
        return redisTemplate.boundZSetOps(key).reverseRangeByScore(start, end);
    }

    
    public Set<ZSetOperations.TypedTuple<Object>> getZSetRangeWithScores(String key, long start, long end) {
        return redisTemplate.boundZSetOps(key).rangeWithScores(start, end);
    }

    
    public Set<ZSetOperations.TypedTuple<Object>> getZSetReverseRangeWithScores(String key, long start, long end) {
        return redisTemplate.boundZSetOps(key).reverseRangeWithScores(start, end);
    }

    
    public Set<ZSetOperations.TypedTuple<Object>> getZSetRangeWithScores(String key) {
        return getZSetRangeWithScores(key, 0, getZSetSize(key));
    }

    
    public Set<ZSetOperations.TypedTuple<Object>> getZSetReverseRangeWithScores(String key) {
        return getZSetReverseRangeWithScores(key, 0, getZSetSize(key));
    }

    
    public long getZSetCountSize(String key, double sMin, double sMax) {
        return redisTemplate.boundZSetOps(key).count(sMin, sMax);
    }

    
    public long getZSetSize(String key) {
        return redisTemplate.boundZSetOps(key).size();
    }

    
    public double getZSetScore(String key, Object value) {
        return redisTemplate.boundZSetOps(key).score(value);
    }

    
    public double incrementZSetScore(String key, Object value, double delta) {
        return redisTemplate.boundZSetOps(key).incrementScore(value, delta);
    }

    
    public Boolean addZSet(String key, double score, Object value) {
        return redisTemplate.boundZSetOps(key).add(value, score);
    }

    
    public Long addZSet(String key, TreeSet<Object> value) {
        return redisTemplate.boundZSetOps(key).add(value);
    }

    
    public Boolean addZSet(String key, double[] score, Object[] value) {
        if (score.length != value.length) {
            return false;
        }
        for (int i = 0; i < score.length; i++) {
            if (addZSet(key, score[i], value[i]) == false) {
                return false;
            }
        }
        return true;
    }

    
    public void remove(String key) {
        if (exists(key)) {
            redisTemplate.delete(key);
        }
    }

    
    public void removeZSetRangeByScore(String key,double s , double e) {
        redisTemplate.boundZSetOps(key).removeRangeByScore(s,e);
    }

    
    public Boolean setSetExpireTime(String key, Long time) {
        return redisTemplate.boundSetOps(key).expire(time, TimeUnit.SECONDS);
    }

    
    public Boolean setZSetExpireTime(String key, Long time) {
        return redisTemplate.boundZSetOps(key).expire(time, TimeUnit.SECONDS);
    }

    
    public boolean exists(String key) {
        return redisTemplate.hasKey(key);
    }
    public Object get(int key) {
        return this.get(String.valueOf(key));
    }
    public Object get(long key) {
        return this.get(String.valueOf(key));
    }
    public Object get(String key) {
        return redisTemplate.boundValueOps(key).get();
    }

    
    public List<Object> get(String... keys) {
        List<Object> list = new ArrayList<Object>();
        for (String key : keys) {
            list.add(get(key));
        }
        return list;
    }

    
    public List<Object> getByRegular(String regKey) {
        Set<String> stringSet = getAllKeys();
        List<Object> objectList = new ArrayList<Object>();
        for (String s : stringSet) {
            if (Pattern.compile(regKey).matcher(s).matches() && getType(s) == DataType.STRING) {
                objectList.add(get(s));
            }
        }
        return objectList;
    }

    
    public void set(long key, Object value) {
       this.set(String.valueOf(key) ,value);
    }
    public void set(int key, Object value) {
        this.set(String.valueOf(key) ,value);
     }
    public void set(String key, Object value) {
        redisTemplate.boundValueOps(key).set(value);
    }

    
    public void set(String key, Object value, Long expireTime) {
        redisTemplate.boundValueOps(key).set(value, expireTime, TimeUnit.SECONDS);
    }

    
    public boolean setExpireTime(String key, Long expireTime) {
        return redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
    }


    
    public DataType getType(String key) {
        return redisTemplate.type(key);
    }


    
    public void removeMapField(String key, Object... field) {
        redisTemplate.boundHashOps(key).delete(field);
    }

    
    public Long getMapSize(String key) {
        return redisTemplate.boundHashOps(key).size();
    }

    
    public Map<String, Object> getMap(String key) {
        return redisTemplate.boundHashOps(key).entries();
    }

    
    public <T> T getMapField(String key, String field) {
        return (T) redisTemplate.boundHashOps(key).get(field);
    }

    
    public Boolean hasMapKey(String key, String field) {
        return redisTemplate.boundHashOps(key).hasKey(field);
    }

    
    public List<Object> getMapFieldValue(String key) {
        return redisTemplate.boundHashOps(key).values();
    }

    
    public Set<Object> getMapFieldKey(String key) {
        return redisTemplate.boundHashOps(key).keys();
    }

    
    public void addMap(String key, Map<String, Object> map) {
        redisTemplate.boundHashOps(key).putAll(map);
    }

    
    public void addMap(String key, String field, Object value) {
        redisTemplate.boundHashOps(key).put(field, value);
    }

    
    public void addMap(String key, String field, Object value, long time) {
        redisTemplate.boundHashOps(key).put(field, value);
        redisTemplate.boundHashOps(key).expire(time, TimeUnit.SECONDS);
    }

    
    public void watch(String key) {
        redisTemplate.watch(key);
    }

    
    public void addSet(String key, Object... obj) {
        redisTemplate.boundSetOps(key).add(obj);
    }

    
    public long removeSetValue(String key, Object obj) {
        return redisTemplate.boundSetOps(key).remove(obj);
    }

    
    public long removeSetValue(String key, Object... obj) {
        if (obj != null && obj.length > 0) {
            return redisTemplate.boundSetOps(key).remove(obj);
        }
        return 0L;
    }

    
    public long getSetSize(String key) {
        return redisTemplate.boundSetOps(key).size();
    }

    
    public Boolean hasSetValue(String key, Object obj) {
        Boolean boo = null;
        int t =0;
        while (true){
            try {
                boo = redisTemplate.boundSetOps(key).isMember(obj);
                break;
            } catch (Exception e) {
                logger.error("key[" + key + "],obj[" + obj + "]判断Set中的值是否存在失败,异常信息:" + e.getMessage());
                t++;
            }
            if(t>times){
                break;
            }
        }
        logger.info("key[" + key + "],obj[" + obj + "]是否存在,boo:" + boo);
        return boo;
    }

    
    public Set<Object> getSet(String key) {
        return redisTemplate.boundSetOps(key).members();
    }

    
    public Set<Object> getSetUnion(String key, String otherKey) {
        return redisTemplate.boundSetOps(key).union(otherKey);
    }

    
    public Set<Object> getSetUnion(String key, Set<Object> set) {
        return redisTemplate.boundSetOps(key).union(set);
    }

    
    public Set<Object> getSetIntersect(String key, String otherKey) {
        return redisTemplate.boundSetOps(key).intersect(otherKey);
    }

    
    public Set<Object> getSetIntersect(String key, Set<Object> set) {
        return redisTemplate.boundSetOps(key).intersect(set);
    }

}

第三、配置fastjson序列化:
听说某宝的fastjson的序列化和json库性能挺好的下面整合一下这里只配置一下序列化,具体springboot怎么整合fastjson请看我的博客列表。
首先自定义一个序列化的类


public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T> {

    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    private Class<T> clazz;

    public FastJson2JsonRedisSerializer(Class<T> clazz) {
        super();
        this.clazz = clazz;
    }

    public byte[] serialize(T t) throws SerializationException {
        if (t == null) {
            return new byte[0];
        }
        return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
    }

    public T deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null || bytes.length <= 0) {
            return null;
        }
        String str = new String(bytes, DEFAULT_CHARSET);

        return (T) JSON.parseObject(str, clazz);
    }

}

在redis config中添加一个bean

  @Bean
    public RedisSerializer fastJson2JsonRedisSerializer() {
        ParserConfig.getGlobalInstance().setAutoTypeSupport(true); 
        return new FastJson2JsonRedisSerializer<Object>(Object.class);
    }

配置序列化

@Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory, RedisSerializer fastJson2JsonRedisSerializer) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate(factory);        redisTemplate.setConnectionFactory(redisConnectionFactory());
        //redis   开启事务
        redisTemplate.setEnableTransactionSupport(true);
        //hash  使用jdk  的序列化
        redisTemplate.setHashValueSerializer(fastJson2JsonRedisSerializer/*new JdkSerializationRedisSerializer()*/);
        //StringRedisSerializer  key  序列化
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        //keySerializer  对key的默认序列化器。默认值是StringSerializer
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        //  valueSerializer
        redisTemplate.setValueSerializer(fastJson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

第四,编写测试类

     @Test
    public void testQueryById()  {
        long id = 1000;
        try {
            Object seckillObj= redisHandle.get(id);
            if(seckillObj==null) {
                seckillObj=seckillDao.queryById(id);
            }
            Seckill seckill=(Seckill) seckillObj;
             redisHandle.set(id, seckill);
        
            System.out.println(seckill.getName());
            System.out.println(seckill);
        }catch(Exception e) {
            e.printStackTrace();
        }
    }

测试时,要打开服务端Redis中间件的服务;才能用服务端的缓存,具体的参考上一节的安装步骤。

测试结果都正常;

第五,进一步修改缓存代码,在SeckillServiceImpl.java当中使用缓存优化,例如如下暴露接口的方法代码是通过redis缓存来实现的。

/**
     * 秒杀借口 秒杀开启时输出地址否则输出系统时间和接口时间
     */
    public Exposer  exportSeckillUrl(long seckillId) {
        Object seckillObj= redisHandle.get(String.valueOf(seckillId));
        if(seckillObj==null) {
            seckillObj=seckillDao.queryById(seckillId);
        }
        Seckill seckill=(Seckill) seckillObj;
        if(seckill==null) {
            return new Exposer(false,seckillId);
        }else {
            redisHandle.set(seckillId+"", seckill);
            Date startTime=seckill.getStartTime();
            Date endTime=seckill.getEndTime();
            Date nowTime=new Date();
            if(nowTime.getTime()<startTime.getTime()||nowTime.getTime()>endTime.getTime()) {
                return new Exposer(false,seckillId,nowTime.getTime(),startTime.getTime(),endTime.getTime());
            }else {
                String md5=getMD5(seckillId);
                return new Exposer(true,md5,seckillId);
            } 
        }       
    }

3 并发优化
3.1 优化分析
这一部分主要是针对秒杀操作进行并发优化的;秒杀操作是作为一个事务来执行的。
前面已经分析过:Update减库存—>insert购买明细—>commit/rollback:这个事务作为一个原子,里面两个过程都存在网路延迟和GC。


这里写图片描述

原来的流程:

第一阶段:秒杀开始先Update更新库存,根据结果记录数量决定是否插入明细。这个过程中存在网络延迟,数据库事务持有行级锁。

第二阶段:根据插入insert的结果,最后执行commit/rollback,这个阶段也存在网络延迟,数据库事务持有行级锁。

最终:行级锁经历了两次的java代码执行+网络延迟+GC

方案一:Sql执行顺序调整

第一阶段:先插入Insert明细(同时根据主键判断了是否重复秒杀),根据返回结果判断如果不是重复秒杀则表明插入成功,然后进入第二阶段;该阶段虽然存在网络延迟但是没有持有行级锁;

第二阶段:直接拿到行级锁,然后更新Update库存,最后根据返回结果决定commit/rollback;

该阶段持有网络延迟并且持有行级锁。

最终:行级锁经历了一次的java代码执行+网络延迟+GC;这种策略将只在最后的更新操作中持有行级锁,降低了commit/rollback的持有时间,访问速度提高到了原来的2倍。

方案二:服务端使用存储过程

这种策略直接在服务端使用存储过程将两个阶段insert和update操作直接绑定在一起,这样行级锁commit/rollback的持有在Mysql端就执行完成结束了,然后通过网络返回结果。

最终:该策略相比于方案一,屏蔽掉了所有的网络延迟,大大的提高了访问速度,可以让Mysql获得更高的QPS,所以可以把它叫做深度优化。

3.2 SQL顺序调整优化编码实现

方案一是利用SQL顺序的调整减掉一半的行级锁持有时间,在Service实现类SeckillServiceImpl中调整:

     @Transactional  
             //秒杀是否成功,成功:减库存,增加明细;失败:抛出异常,事务回滚  
            public SeckillExecution executeSeckill(long seckillId, long userPhone, String md5)  
                    throws SeckillException, RepeatKillException, SeckillCloseException {  
                if (md5==null||!md5.equals(getMD5(seckillId)))  
                {  
                    throw new SeckillException("seckill data rewrite");//秒杀数据被重写了  
                }  
                //执行秒杀逻辑:减库存+增加购买明细  
                Date nowTime=new Date();  
           
                 
                //第二个优化点:秒杀操作  
                //调整insert  
                try{  
                     
                  //否则更新了库存,秒杀成功,增加明细  
                    int insertCount=successKilledDao.insertSuccessKilled(seckillId,userPhone);  
                    //看是否该明细被重复插入,即用户是否重复秒杀(唯一主键:seckillId,userPhone)  
                    if (insertCount<=0)  
                    {  
                        throw new RepeatKillException("seckill repeated");  
                    }  
                    else {  
                      //减库存,热点商品竞争  
                        int updateCount=seckillDao.reduceNumber(seckillId,nowTime);  
                        if (updateCount<=0)  
                        {  
                            //没有更新库存记录,说明秒杀结束 ----rollback  
                            throw new SeckillCloseException("seckill is closed");  
                        }  
                        else {  
                            //秒杀成功,得到成功插入的明细记录,并返回成功秒杀的信息---commit  
                            SuccessKilled successKilled=successKilledDao.queryByIdWithSeckill(seckillId,userPhone);  
                            return new SeckillExecution(seckillId,SeckillStateEnum.SUCCESS,successKilled);  
                        }  
                    }  
           
                }catch (SeckillCloseException e1)  
                {  
                    throw e1;  
                }catch (RepeatKillException e2)  
                {  
                    throw e2;  
                }catch (Exception e)  
                {  
                    logger.error(e.getMessage(),e);  
                    //所以编译期异常转化为运行期异常  
                    throw new SeckillException("seckill inner error :"+e.getMessage());  
                }  
           
            }

3.3 深度优化

深度优化是使用方案二:使事务SQL在MySQL端利用存储过程执行,Mysql端只用返回执行的最终结果就行了,这样既可以完全屏蔽掉了网络延迟和GC影响。
3.3.1 创建存储过程

编写SQL语句,建立存储过程:

      --使用存储过程执行秒杀  
        DELIMITER$$ -- console;转换为$$;定义换行符:表示  
           
        -- 定义存储过程  
        -- 参数:in 输入参数;out 输出参数  
        --row_count():返回上一条修改类型sql(delete,insert,update)的影响行数。  
        --row_count():0:未修改数据;>0:表示修改数据的行数;<0:sql错误/未执行修改sql。  
        CREATE PROCEDURE execute_seckill(in v_seckill_id bigint,in v_phone bigint,  
                                                        in v_kill_time timestamp,out r_result int)  
        BEGIN  
            DECLARE insert_count INT DEFAULT 0;  
             
            START TRANSACTION ;  
             
            INSERT ignoresuccess_killed(seckill_id,user_phone,create_time)  
            VALUES(v_seckill_id,v_phone,v_kill_time); -- 先插入购买明细  
             
            SELECT ROW_COUNT() INTO insert_count;  
            IF(insert_count = 0) THEN  
              ROLLBACK ;  
              SET r_result = -1;   -- 重复秒杀  
            ELSEIF(insert_count < 0) THEN  
              ROLLBACK ;  
              SET r_result = -2;   -- 内部错误  
            ELSE  -- 已经插入购买明细,接下来要减少库存  
              update seckill  
              set number = number -1  
              WHERE seckill_id = v_seckill_id  
                     AND start_time < v_kill_time  
                     AND end_time > v_kill_time  
                     AND number > 0;  
               
              select ROW_COUNT() INTO insert_count;  
              IF (insert_count = 0)  THEN  
                ROLLBACK ;  
                SET r_result = 0;   -- 库存没有了,代表秒杀已经关闭  
              ELSEIF (insert_count < 0) THEN  
                ROLLBACK ;  
                SET r_result = -2;   -- 内部错误  
              ELSE  
                COMMIT ;    -- 秒杀成功,事务提交  
                SET r_result = 1;   -- 秒杀成功返回值为1  
              END IF;  
            END IF;  
          END  
        $$  
           
        -- 测试  
        DELIMITER;-- 把DELIMITER重新定义还原成分号;  
           
        SET @r_result =-3;  
        -- 执行存储过程  
        CALLexecute_seckill(1003,18864598658,now(),@r_result);  
        -- 获取结果  
        select @r_result;  
           
           
        drop procedure execute_seckill; -- 删除存储过程
 

按照上述的SQL语句在mysql数据库查询中执行,创建数据库的存储过程execute_seckill,然后用下面的语句执行存储过程测试。

使用存储过程:

1、使用存储过程优化:降低了事务行级锁持有的时间;

2、但是不要过度依赖存储过程,要根据实际需求而定;

3、简单的逻辑可以应用存储过程

4、QPS得到提升,一个秒杀单可以接近6000/qps

3.3.2 Service调用Procedure实现

第一步,(Mybatis)在SeckillDao.java接口中,添加调用存储过程的方法声明:

        /** 
             * 秒杀操作优化: 
             *  使用存储过程执行秒杀 
             * @param paramMap 
             */  
            void killByProcedure(Map<String,Object> paramMap);
第二步,(Mybatis)在SeckillDao.xml配置文件当中,编写SQL语句,带入参数,调用存储过程:
        <!--秒杀操作优化存储部分 -->  
           <!-- mybatis调用存储过程 id和接口中的方法想偶同,传入参数-->  
           <select id="killByProcedure"statementType="CALLABLE">  
           callexecute_seckill(  
           #{seckillId,jdbcType=BIGINT,mode=IN},  
           #{phone,jdbcType=BIGINT,mode=IN},  
           #{killTime,jdbcType=TIMESTAMP,mode=IN},  
           #{result,jdbcType=INTEGER,mode=OUT}  
            )  
           </select> 
第三步,在SeckillService.java接口中声明方法executeSeckillProcedure
        /** 
             * 执行秒杀操作 By存储过程 
             * @param seckillId 
             * @param userPhone 
             * @param md5 
             * @return 
             */  
            SeckillExecution executeSeckillProcedure(long seckillId,long userPhone,String md5)  
                    throws SeckillException,RepeatKillException,SeckillCloseException; 
第四步,在SeckillServiceImpl.java这个实现类中实现上述定义的方法,在Java客户端调用存户过程:
            /** 
             * 通过java客户端调用存储过程 
             * 开发使用存储过程的秒杀逻辑 
             */  
            public SeckillExecution executeSeckillProcedure(long seckillId,  
                    long userPhone, String md5) throws SeckillException,  
                    RepeatKillException, SeckillCloseException {  
                if (md5==null||!md5.equals(getMD5(seckillId)))  
                {  
                    return new SeckillExecution(seckillId,SeckillStateEnum.DATA_REWRITE);  
                }  
                //执行秒杀逻辑:减库存+增加购买明细  
                Date killTime=new Date();  
                Map<String,Object> map=new HashMap<String,Object>();  
                map.put("seckillId", seckillId);  
                map.put("phone", userPhone);  
                map.put("killTime", killTime);  
                map.put("result", null);  
                //执行存储过程,result被复制  
                try{  
                seckillDao.killByProcedure(map);  
                //获取result  
                //此处要在pom.xm,中引入MapUtil用于获取集合内的值 
                int result=MapUtils.getInteger(map,"result",-2);  
                if(result==1)  
                {  
              SuccessKilledsk=successKilledDao.queryByIdWithSeckill(seckillId,userPhone);  
                        return new SeckillExecution(seckillId,SeckillStateEnum.SUCCESS,sk);  
                }  
                else  
                {  
                     returnnew SeckillExecution(seckillId,SeckillStateEnum.stateOf(result));  
                }  
                }  
                catch(Exception e)  
                {  
                logger.error(e.getMessage(),e);  
                returnnew SeckillExecution(seckillId,SeckillStateEnum.INNER_ERROR);  
                }         
            }  
第五步:开启Mysql服务和Redis服务
第六步,在SeckillServiceTest.java类中编写测试方法:
    @Test  
        public void executeSeckillProcedureTest()  
        {  
        long seckillId=1001;  
        long phone=13476191899l;  
   Exposerexposer=seckillService.exportSeckillUrl(seckillId);  
         
        if(exposer.isExposed())  
        {  
             Stringmd5=exposer.getMd5();  
             SeckillExecutionexecution=seckillService.executeSeckillProcedure(seckillId, phone, md5);  
             logger.info(execution.getStateInfo());  
             System.out.println(execution.getStateInfo());  
         
        }         
       }  

输出秒杀重复或者秒杀成功,表示测试成功。
4 系统部署架构
系统可能用到哪些服务?
  CDN:动静态资源分离
 WebServer:Nginx+Jetty服务器容器框架
 Redis:服务端缓存
 Mysql:数据库,事务,保证数据的一致性和完整性。
系统部署架构:
![这里写图片描述](http://upload-images.jianshu.io/upload_images/7822142-aca4f67df4c500a3?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
                  
Jetty逻辑机群:用于存放开发的逻辑代码
可能参与的人员角色分配:
![这里写图片描述](http://upload-images.jianshu.io/upload_images/7822142-7d15e0de255ab97a?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
              
4 总结

数据层

  数据库技术:数据库设计和实现

  Mybatis理解和使用技巧:和数据表对应的entity—--Dao接口--—Dao接口配置sql语句的文件。

 Mybatis和Spring的整合技巧:包扫描/对象的注入

业务层技术回顾

 站在使用者的角度上进行业务接口设计和封装

 SpringIOC配置技巧:注入

  Spring声明式事务的使用和理解
Web技术回顾
Restful接口的运用:post/get
Spring MVC的使用技巧
前端交互分析过程
Bootstrap和JS的使用:使用现有的格式,使用模块/对象类似的划分。
并发优化
系统优化点的分析和抽取

 事务、锁、网络延迟理解

  前端,CDN,缓存等理解和使用

 集群化部署
 文章地址:http://www.haha174.top/article/details/254401
 源码链接:https://github.com/haha174/seckill.git
 教程地址 :http://www.imooc.com/learn/632



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容