Dubbo的服务调用(心跳、编码、解码)

本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
本篇用以分析心跳、编码、解码相关的代码。

文章内容顺序:
1.心跳
  1.1 为什么要有心跳?
  1.2 HeaderExchangeClient
  1.3 HeaderExchangeClient#startHeatbeatTimer
  1.4 HeartBeatTask#run
2.编码
  2.1 编码的链路:
  2.2 消息头的字段的意义
  2.3 ExchangeCodec
  2.4 序列化的多种实现
3.解码
  3.1 InternalDecoder#messageReceived
  3.2 DubboCountCodec#decode
  3.3 DubboCodec.decode(channel,buffer)
  3.4 DubboCodec#decodeBody
  3.5 DecodeableRpcInvocation#decode
  3.6 解码的方法调用顺序

首先是心跳相关

1.心跳

在上一篇服务调用的消费端中,我们介绍到HeaderExchangeClient的构造方法中会有心跳的一些逻辑,在那边一笔带过了,在这篇文章来详细看看。

1.1为什么要有心跳?

心跳间隔,对于长连接,当物理层断开时,比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开

这里仍旧贴一下HeaderExchangeClient的构造方法

1.2HeaderExchangeClient

public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        
        // 创建 HeaderExchangeChannel 对象
        this.channel = new HeaderExchangeChannel(client);
        
        // 以下代码均与心跳检测逻辑有关
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // 开启心跳检测定时器
            startHeartbeatTimer();
        }
    }

此方法最后调用 #startHeatbeatTimer() 方法,发起心跳定时器。直接来看startHeatbeatTimer()方法的实现吧

1.3HeaderExchangeClient#startHeatbeatTimer

    private void startHeatbeatTimer() {
        // 停止原有定时任务
        stopHeartbeatTimer();
        // 发起新的定时任务
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

可以看到这边就是直接调用线程池进行了一个定时任务HeartBeatTask对象,scheduleWithFixedDelay方法意为当当前任务执行完毕后再隔多少秒进行下一个任务。

接下来我们来看一下HeartBeatTask的实现。

1.4HeartBeatTask#run

    public void run() {
        try {
            long now = System.currentTimeMillis();
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
                    Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
                    // 最后读写的时间,任一超过心跳间隔,发送心跳
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true); // 需要响应
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
                    // 最后读的时间,超过心跳超时时间
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        // 客户端侧,重新连接服务端
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect();
                            } catch (Exception e) {
                                //do nothing
                            }
                        // 服务端侧,关闭客户端连接
                        } else {
                            channel.close();
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }
  • 【任务一】:最后读或写的时间,任一超过心跳间隔 heartbeat ,发送心跳。
  • 【任务二】:最后读的时间,超过心跳超时时间 heartbeatTimeout ,分成两种情况:
    客户端侧,重连连接服务端。
    服务端侧,关闭客户端连接。

至此,心跳的分析告一段落。

接下来来看看编码的操作。

2.编码

2.1编码的链路

image.png

看上面这张图,当运行到NettyChannel#send的这一行后,就会跳进Netty的执行逻辑,最后由NettyCodecAdapter的内部类调用编码类执行编码操作

image.png

上图中继承的OneToOneEncoderNetty的抽象方法,那这个InternalEncoder是什么时候传进来的呢?

NettyClient#doOpen()方法中有如下代码,会在编码器、解码器等一并设置进来,从而使得最后编码、解码逻辑能交由Dubbo自己的类实现。

image.png

再贴一张编码的链路。

NettyChannel#send
->一系列Netty内部的方法
->NettyCodecAdapter内部类(继承了netty抽象类)#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest

image.png

可以看到最后是交由ExchangeCodec来执行编码的逻辑了。
那么这个链路就简单介绍到这,直接来看他是怎么实现的编码吧。

2.2消息头的字段的意义

先简单列举一下消息头的内容,其中的魔数是用来分割处理粘包问题的。

image.png

接下来就直接来看看ExchangeCodec的实现

2.3 ExchangeCodec

public class ExchangeCodec extends TelnetCodec {

    // 消息头长度
    protected static final int HEADER_LENGTH = 16;
    // 魔数内容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;
    }

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 对 Request 对象进行编码
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 对 Response 对象进行编码,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 创建消息头字节数组,长度为 16
        byte[] header = new byte[HEADER_LENGTH];

        // 设置魔数
        Bytes.short2bytes(MAGIC, header);

        // 设置数据包类型(Request/Response)和序列化器编号
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 设置通信方式(单向/双向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        }
        
        // 设置事件标识
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;
        }

        // 设置请求编号,8个字节,从第4个字节开始设置
        Bytes.long2bytes(req.getId(), header, 4);

        // 获取 buffer 当前的写位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,为消息头预留 16 个字节的空间
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 创建序列化器,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 对事件数据进行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 对请求数据进行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();
        
        // 获取写入的字节数,也就是消息体长度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 将消息体长度写入到消息头中
        Bytes.int2bytes(len, header, 12);

        // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
        buffer.writerIndex(savedWriteIndex);
        // 从 savedWriteIndex 下标处写入消息头
        buffer.writeBytes(header);
        // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }
    
    // 省略其他方法
}

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request对象的 data字段执行序列化操作,序列化后的数据最终会存储到ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len写入到 header指定位置处。最后再将消息头字节数组 header写入到 ChannelBuffer 中,整个编码过程就结束了。

这里我们可以再来关注一下encodeEventData方法(encodeRequestData也是一样的实现。)

2.4 序列化的多种实现

image.png

通过一系列重载方法,我们可以看到最后调用了out.writeObeject
而这个out,则是在encodeRequest方法中通过url传过来的参数设置的,有多种不同的实现。
image.png

image.png

有关序列化的协议,可以简单参照下这篇博文: Dubbo协议及序列化

说完了编码,再来说说解码。

同样的,还是从解码的链路开始说起

3.解码

3.1 InternalDecoder#messageReceived

NettyCodecAdapter的内部类InternalDecoder#messageReceived方法
同样的,这个类也与编码的类一样,都是通过netty的pipeline设置进来的,上文已经介绍过了。

image.png

调用了DubboCountCodec#decode方法

3.2 DubboCountCodec#decode

public final class DubboCountCodec implements Codec2 {
    private DubboCodec codec = new DubboCodec();
 public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 记录当前读位置
        int save = buffer.readerIndex();
        // 创建 MultiMessage 对象
        MultiMessage result = MultiMessage.create();
        do {
            // 解码
            Object obj = codec.decode(channel, buffer);
            // 输入不够,重置读进度
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                buffer.readerIndex(save);
                break;
            // 解析到消息
            } else {
                // 添加结果消息
                result.addMessage(obj);
                // 记录消息长度到隐式参数集合,用于 MonitorFilter 监控
                logMessageLength(obj, buffer.readerIndex() - save);
                // 记录当前读位置
                save = buffer.readerIndex();
            }
        } while (true);
        // 需要更多的输入
        if (result.isEmpty()) {
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        }
        // 返回解析到的消息
        if (result.size() == 1) {
            return result.get(0);
        }
        return result;
    }
//省略其他代码
}

这边的codec指的就是DubboCodec。调用的是DubboCodec.decode(channel,buffer)

3.3 DubboCodec.decode(channel,buffer)

public class ExchangeCodec extends TelnetCodec {

    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        // 读取 Header 数组
        int readable = buffer.readableBytes();
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        // 解码
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 非 Dubbo 协议,目前是 Telnet 命令。
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
            // 将 buffer 完全复制到 `header` 数组中。因为,上面的 `#decode(channel, buffer)` 方法,可能未读全
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            // 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW ?
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 提交给父类( Telnet ) 处理,目前是 Telnet 命令。
            return super.   decode(channel, buffer, readable, header);
        }
        // Header 长度不够,返回需要更多的输入
        // check length.
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // `[96 - 127]`:Body 的**长度**。通过该长度,读取 Body 。
        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        // 总长度不够,返回需要更多的输入
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // 解析 Header + Body
        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
        try {
      //子类重写的方法
            return decodeBody(channel, is, header);
        } finally {
            // skip 未读完的流,并打印错误日志
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
//省略其他代码
}

这边对于再提一下,实际上这个方法是DubboCodec里的方法,但是ExchangeCodecDubboCodec的父类,并且在DubboCodec没有重写这个方法,所以debug会跳到父类的方法行(因为代码逻辑写在父类里)。

上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody方法进行后续的解码工作。

注意在最后的try块中,会调用到DubboCodec的实现——DubboCodec#decodeBody。注意,从头到尾我们调用的都是DubboCodec类。

3.4 DubboCodec#decodeBody

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2];
        // 获得 Serialization 对象
        byte proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 获得请求||响应编号
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        // 解析响应
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            // 若是心跳事件,进行设置
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 设置状态
            // get status.
            byte status = header[3];
            res.setStatus(status);
            // 正常响应状态
            if (status == Response.OK) {
                try {
                    Object data;
                    // 解码心跳事件
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                    // 解码其它事件
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                    // 解码普通响应
                    } else {
                        DecodeableRpcResult result;
                        // 在通信框架(例如,Netty)的 IO 线程,解码
                        if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
                            result.decode();
                        // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
                        } else {
                            result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    // 设置结果
                    res.setResult(data);
                } catch (Throwable t) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode response failed: " + t.getMessage(), t);
                    }
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            // 异常响应状态
            } else {
                res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        // 解析请求
        } else {
            // decode request.
            Request req = new Request(id);
            req.setVersion("2.0.0");
            // 是否需要响应
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            // 若是心跳事件,进行设置
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(Request.HEARTBEAT_EVENT);
            }
            try {
                Object data;
                // 解码心跳事件
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                // 解码其它事件
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                // 解码普通请求
                } else {
                    // 在通信框架(例如,Netty)的 IO 线程,解码
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    // 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
                    } else {
                        inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // bad request
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }

如上,decodeBody对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation#decode 方法进行后续的解码工作。
要么是在本线程内解码,要么是交由work线程池执行,会在Dubbo的线程模型、handler
讲解如何交给其执行,又是怎么执行的。

再来看一下DecodeableRpcInvocation#decode方法

3.5 DecodeableRpcInvocation#decode

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    /**
     * 是否已经解码完成
     */
    private volatile boolean hasDecoded;

    @Override
    public void decode() {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {
                decode(channel, inputStream);
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
                }
                request.setBroken(true);
                request.setData(e);
            } finally {
                hasDecoded = true;
            }
        }
    }

    
    @Override
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 通过反序列化得到 dubbo version,并保存到 attachments 变量中
        String dubboVersion = in.readUTF();
        request.setVersion(dubboVersion);
        setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

        // 通过反序列化得到 path,version,并保存到 attachments 变量中
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        // 通过反序列化得到调用方法名
        setMethodName(in.readUTF());
        try {
            Object[] args;
            Class<?>[] pts;
            // 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                // 将 desc 解析为参数类型数组
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 解析运行时参数
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            
            // 设置参数类型数组
            setParameterTypes(pts);

            // 通过反序列化得到原 attachment 的内容
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                // 将 map 与当前对象中的 attachment 集合进行融合
                attachment.putAll(map);
                setAttachments(attachment);
            }
            
            // 对 callback 类型的参数进行处理
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            // 设置参数列表
            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
        }
        return this;
    }
}

可以看到DecodeableRpcInvocation #decode会先判断是否已经解码完成(这很重要,在交由Dubbo线程池执行的时候也会进到这个方法,如果已经解码过,就不进行下面的流程,如果已经没解码过,那么就会帮助执行解码操作),如果没有解码过,调用decode的重载方法。
重载方法通过反序列化将诸如 pathversion、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

3.6解码的方法调用顺序

所以解码调用的顺序为:

NettyCodecAdapter的内部类InternalDecoder#messageReceived
  ->DubboCountCodec#decode
    ->DubboCodec#decode(channel,buffer)//父类实现的方法
      ->DubboCodec#decode(channel,buffer,readable,header)//父类实现的方法
        ->Dubbo#decodeBody
          ->DecodeableRpcInvocation#decode//交由本线程或者业务线程池执行
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354