A Look at Flink's HistoryServer

This post walks through the source code of Flink's HistoryServer.

HistoryServer

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java

public class HistoryServer {

    private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class);

    private final Configuration config;

    private final String webAddress;
    private final int webPort;
    private final long webRefreshIntervalMillis;
    private final File webDir;

    private final HistoryServerArchiveFetcher archiveFetcher;

    @Nullable
    private final SSLHandlerFactory serverSSLFactory;
    private WebFrontendBootstrap netty;

    private final Object startupShutdownLock = new Object();
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private final Thread shutdownHook;

    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        String configDir = pt.getRequired("configDir");

        LOG.info("Loading configuration from {}", configDir);
        final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir);

        try {
            FileSystem.initialize(flinkConfig);
        } catch (IOException e) {
            throw new Exception("Error while setting the default filesystem scheme from configuration.", e);
        }

        // run the history server
        SecurityUtils.install(new SecurityConfiguration(flinkConfig));

        try {
            SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    HistoryServer hs = new HistoryServer(flinkConfig);
                    hs.run();
                    return 0;
                }
            });
            System.exit(0);
        } catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Failed to run HistoryServer.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(1);
        }
    }

    public HistoryServer(Configuration config) throws IOException, FlinkException {
        this(config, new CountDownLatch(0));
    }

    public HistoryServer(Configuration config, CountDownLatch numFinishedPolls) throws IOException, FlinkException {
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(numFinishedPolls);

        this.config = config;
        if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
            LOG.info("Enabling SSL for the history server.");
            try {
                this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSLContext for the history server.", e);
            }
        } else {
            this.serverSSLFactory = null;
        }

        webAddress = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
        webPort = config.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
        webRefreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_WEB_REFRESH_INTERVAL);

        String webDirectory = config.getString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR);
        if (webDirectory == null) {
            webDirectory = System.getProperty("java.io.tmpdir") + File.separator + "flink-web-history-" + UUID.randomUUID();
        }
        webDir = new File(webDirectory);

        String refreshDirectories = config.getString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS);
        if (refreshDirectories == null) {
            throw new FlinkException(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS + " was not configured.");
        }
        List<RefreshLocation> refreshDirs = new ArrayList<>();
        for (String refreshDirectory : refreshDirectories.split(",")) {
            try {
                Path refreshPath = WebMonitorUtils.validateAndNormalizeUri(new Path(refreshDirectory).toUri());
                FileSystem refreshFS = refreshPath.getFileSystem();
                refreshDirs.add(new RefreshLocation(refreshPath, refreshFS));
            } catch (Exception e) {
                // there's most likely something wrong with the path itself, so we ignore it from here on
                LOG.warn("Failed to create Path or FileSystem for directory '{}'. Directory will not be monitored.", refreshDirectory, e);
            }
        }

        if (refreshDirs.isEmpty()) {
            throw new FlinkException("Failed to validate any of the configured directories to monitor.");
        }

        long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
        archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(
            HistoryServer.this::stop,
            HistoryServer.class.getSimpleName(),
            LOG);
    }

    @VisibleForTesting
    int getWebPort() {
        return netty.getServerPort();
    }

    public void run() {
        try {
            start();
            new CountDownLatch(1).await();
        } catch (Exception e) {
            LOG.error("Failure while running HistoryServer.", e);
        } finally {
            stop();
        }
    }

    // ------------------------------------------------------------------------
    // Life-cycle
    // ------------------------------------------------------------------------

    void start() throws IOException, InterruptedException {
        synchronized (startupShutdownLock) {
            LOG.info("Starting history server.");

            Files.createDirectories(webDir.toPath());
            LOG.info("Using directory {} as local cache.", webDir);

            Router router = new Router();
            router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));

            if (!webDir.exists() && !webDir.mkdirs()) {
                throw new IOException("Failed to create local directory " + webDir.getAbsoluteFile() + ".");
            }

            createDashboardConfigFile();

            archiveFetcher.start();

            netty = new WebFrontendBootstrap(router, LOG, webDir, serverSSLFactory, webAddress, webPort, config);
        }
    }

    void stop() {
        if (shutdownRequested.compareAndSet(false, true)) {
            synchronized (startupShutdownLock) {
                LOG.info("Stopping history server.");

                try {
                    netty.shutdown();
                } catch (Throwable t) {
                    LOG.warn("Error while shutting down WebFrontendBootstrap.", t);
                }

                archiveFetcher.stop();

                try {
                    LOG.info("Removing web dashboard root cache directory {}", webDir);
                    FileUtils.deleteDirectory(webDir);
                } catch (Throwable t) {
                    LOG.warn("Error while deleting web root directory {}", webDir, t);
                }

                LOG.info("Stopped history server.");

                // Remove shutdown hook to prevent resource leaks
                ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
            }
        }
    }

    // ------------------------------------------------------------------------
    // File generation
    // ------------------------------------------------------------------------

    static FileWriter createOrGetFile(File folder, String name) throws IOException {
        File file = new File(folder, name + ".json");
        if (!file.exists()) {
            Files.createFile(file.toPath());
        }
        FileWriter fr = new FileWriter(file);
        return fr;
    }

    private void createDashboardConfigFile() throws IOException {
        try (FileWriter fw = createOrGetFile(webDir, "config")) {
            fw.write(createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
            fw.flush();
        } catch (IOException ioe) {
            LOG.error("Failed to write config file.");
            throw ioe;
        }
    }

    private static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException {
        StringWriter writer = new StringWriter();
        JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);

        gen.writeStartObject();
        gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval());
        gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset());
        gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName());
        gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
        gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());

        gen.writeEndObject();

        gen.close();

        return writer.toString();
    }

    /**
     * Container for the {@link Path} and {@link FileSystem} of a refresh directory.
     */
    static class RefreshLocation {
        private final Path path;
        private final FileSystem fs;

        private RefreshLocation(Path path, FileSystem fs) {
            this.path = path;
            this.fs = fs;
        }

        public Path getPath() {
            return path;
        }

        public FileSystem getFs() {
            return fs;
        }
    }
}
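  • HistoryServer serves queries about finished jobs. Its constructor reads historyserver.web.address, historyserver.web.port (default 8082), historyserver.web.refresh-interval (default 10 seconds), historyserver.web.tmpdir, historyserver.archive.fs.dir, and historyserver.archive.fs.refresh-interval (default 10 seconds) from the configuration, then creates the HistoryServerArchiveFetcher. If historyserver.web.tmpdir is not set, it falls back to a flink-web-history-<UUID> directory under java.io.tmpdir; if historyserver.archive.fs.dir is not set, or none of the configured directories can be validated, it throws a FlinkException
  • The run method calls start, then blocks on a CountDownLatch, and calls stop in a finally block. The start method creates webDir, registers a HistoryServerStaticFileServerHandler on the Router, writes the dashboard config file, starts the HistoryServerArchiveFetcher, and then creates the WebFrontendBootstrap
  • The constructor registers a shutdown hook through ShutdownHookUtil.addShutdownHook so that stop runs on shutdown. The stop method is guarded by an AtomicBoolean so its body runs only once: it calls shutdown on the WebFrontendBootstrap and stop on the HistoryServerArchiveFetcher, deletes webDir, and removes the shutdown hook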
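
The shutdown handling described above can be sketched in isolation. This is a minimal illustration using only the JDK, not Flink's code: LifecycleSketch and getStopCount are hypothetical names, and Runtime.addShutdownHook stands in for Flink's ShutdownHookUtil.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a service whose stop() is safe to call more than once. */
class LifecycleSketch {
    private final Object startupShutdownLock = new Object();
    private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    private final Thread shutdownHook;
    private int stopCount = 0;

    LifecycleSketch() {
        // Register stop() as a JVM shutdown hook (plain JDK API in place of ShutdownHookUtil).
        shutdownHook = new Thread(this::stop, "LifecycleSketch-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    void stop() {
        // compareAndSet lets exactly one caller through, whether it is the hook or explicit code.
        if (shutdownRequested.compareAndSet(false, true)) {
            synchronized (startupShutdownLock) {
                stopCount++; // a real service would release its resources here
                try {
                    // Unregister the hook so it does not linger after an explicit stop.
                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
                } catch (IllegalStateException e) {
                    // The JVM is already shutting down, so the hook itself called us.
                }
            }
        }
    }

    int getStopCount() {
        return stopCount;
    }
}
```

Calling stop() twice on the same instance runs the cleanup body once; the second call fails the compareAndSet and returns immediately.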

HistoryServerArchiveFetcher

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java

class HistoryServerArchiveFetcher {

    private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class);

    private static final JsonFactory jacksonFactory = new JsonFactory();
    private static final ObjectMapper mapper = new ObjectMapper();

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
        new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher"));
    private final JobArchiveFetcherTask fetcherTask;
    private final long refreshIntervalMillis;

    HistoryServerArchiveFetcher(long refreshIntervalMillis, List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir, numFinishedPolls);
        if (LOG.isInfoEnabled()) {
            for (HistoryServer.RefreshLocation refreshDir : refreshDirs) {
                LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath());
            }
        }
    }

    void start() {
        executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdown();

        try {
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException ignored) {
            executor.shutdownNow();
        }
    }

    /**
     * {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for
     * new job archives.
     */
    static class JobArchiveFetcherTask extends TimerTask {

        private final List<HistoryServer.RefreshLocation> refreshDirs;
        private final CountDownLatch numFinishedPolls;

        /** Cache of all available jobs identified by their id. */
        private final Set<String> cachedArchives;

        private final File webDir;
        private final File webJobDir;
        private final File webOverviewDir;

        private static final String JSON_FILE_ENDING = ".json";

        JobArchiveFetcherTask(List<HistoryServer.RefreshLocation> refreshDirs, File webDir, CountDownLatch numFinishedPolls) {
            this.refreshDirs = checkNotNull(refreshDirs);
            this.numFinishedPolls = numFinishedPolls;
            this.cachedArchives = new HashSet<>();
            this.webDir = checkNotNull(webDir);
            this.webJobDir = new File(webDir, "jobs");
            webJobDir.mkdir();
            this.webOverviewDir = new File(webDir, "overviews");
            webOverviewDir.mkdir();
        }

        @Override
        public void run() {
            try {
                for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
                    Path refreshDir = refreshLocation.getPath();
                    FileSystem refreshFS = refreshLocation.getFs();

                    // contents of /:refreshDir
                    FileStatus[] jobArchives;
                    try {
                        jobArchives = refreshFS.listStatus(refreshDir);
                    } catch (IOException e) {
                        LOG.error("Failed to access job archive location for path {}.", refreshDir, e);
                        continue;
                    }
                    if (jobArchives == null) {
                        continue;
                    }
                    boolean updateOverview = false;
                    for (FileStatus jobArchive : jobArchives) {
                        Path jobArchivePath = jobArchive.getPath();
                        String jobID = jobArchivePath.getName();
                        try {
                            JobID.fromHexString(jobID);
                        } catch (IllegalArgumentException iae) {
                            LOG.debug("Archive directory {} contained file with unexpected name {}. Ignoring file.",
                                refreshDir, jobID, iae);
                            continue;
                        }
                        if (cachedArchives.add(jobID)) {
                            try {
                                for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
                                    String path = archive.getPath();
                                    String json = archive.getJson();

                                    File target;
                                    if (path.equals(JobsOverviewHeaders.URL)) {
                                        target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                    } else if (path.equals("/joboverview")) { // legacy path
                                        json = convertLegacyJobOverview(json);
                                        target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
                                    } else {
                                        target = new File(webDir, path + JSON_FILE_ENDING);
                                    }

                                    java.nio.file.Path parent = target.getParentFile().toPath();

                                    try {
                                        Files.createDirectories(parent);
                                    } catch (FileAlreadyExistsException ignored) {
                                        // there may be left-over directories from the previous attempt
                                    }

                                    java.nio.file.Path targetPath = target.toPath();

                                    // We overwrite existing files since this may be another attempt at fetching this archive.
                                    // Existing files may be incomplete/corrupt.
                                    Files.deleteIfExists(targetPath);

                                    Files.createFile(target.toPath());
                                    try (FileWriter fw = new FileWriter(target)) {
                                        fw.write(json);
                                        fw.flush();
                                    }
                                }
                                updateOverview = true;
                            } catch (IOException e) {
                                LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
                                // Make sure we attempt to fetch the archive again
                                cachedArchives.remove(jobID);
                                // Make sure we do not include this job in the overview
                                try {
                                    Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());
                                } catch (IOException ioe) {
                                    LOG.debug("Could not delete file from overview directory.", ioe);
                                }

                                // Clean up job files we may have created
                                File jobDirectory = new File(webJobDir, jobID);
                                try {
                                    FileUtils.deleteDirectory(jobDirectory);
                                } catch (IOException ioe) {
                                    LOG.debug("Could not clean up job directory.", ioe);
                                }
                            }
                        }
                    }
                    if (updateOverview) {
                        updateJobOverview(webOverviewDir, webDir);
                    }
                }
            } catch (Exception e) {
                LOG.error("Critical failure while fetching/processing job archives.", e);
            }
            numFinishedPolls.countDown();
        }
    }

    private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
        JsonNode root = mapper.readTree(legacyOverview);
        JsonNode finishedJobs = root.get("finished");
        JsonNode job = finishedJobs.get(0);

        JobID jobId = JobID.fromHexString(job.get("jid").asText());
        String name = job.get("name").asText();
        JobStatus state = JobStatus.valueOf(job.get("state").asText());

        long startTime = job.get("start-time").asLong();
        long endTime = job.get("end-time").asLong();
        long duration = job.get("duration").asLong();
        long lastMod = job.get("last-modification").asLong();

        JsonNode tasks = job.get("tasks");
        int numTasks = tasks.get("total").asInt();
        int pending = tasks.get("pending").asInt();
        int running = tasks.get("running").asInt();
        int finished = tasks.get("finished").asInt();
        int canceling = tasks.get("canceling").asInt();
        int canceled = tasks.get("canceled").asInt();
        int failed = tasks.get("failed").asInt();

        int[] tasksPerState = new int[ExecutionState.values().length];
        // pending is a mix of CREATED/SCHEDULED/DEPLOYING
        // to maintain the correct number of task states we have to pick one of them
        tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
        tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
        tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
        tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
        tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
        tasksPerState[ExecutionState.FAILED.ordinal()] = failed;

        JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
        MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));

        StringWriter sw = new StringWriter();
        mapper.writeValue(sw, multipleJobsDetails);
        return sw.toString();
    }

    /**
     * This method replicates the JSON response that would be given by the JobsOverviewHandler when
     * listing both running and finished jobs.
     *
     * <p>Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on
     * their own however the list of finished jobs only contains a single job.
     *
     * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
     */
    private static void updateJobOverview(File webOverviewDir, File webDir) {
        try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
            File[] overviews = new File(webOverviewDir.getPath()).listFiles();
            if (overviews != null) {
                Collection<JobDetails> allJobs = new ArrayList<>(overviews.length);
                for (File overview : overviews) {
                    MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
                    allJobs.addAll(subJobs.getJobs());
                }
                mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
            }
        } catch (IOException ioe) {
            LOG.error("Failed to update job overview.", ioe);
        }
    }
}
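  • HistoryServerArchiveFetcher pulls job archives from the directories configured in historyserver.archive.fs.dir at the interval given by historyserver.archive.fs.refresh-interval. It creates a JobArchiveFetcherTask to do the work and schedules it with scheduleWithFixedDelay on a single-threaded ScheduledExecutorService
  • JobArchiveFetcherTask extends the JDK's TimerTask. Its run method iterates over refreshDirs, calls FileSystem.listStatus on each, skips entries whose names are not valid job IDs or that are already in cachedArchives, and uses FsJobArchivist.getArchivedJsons to read each ArchivedJson, which it writes to a file chosen by its path. If an IOException occurs, the job is removed from cachedArchives so it is fetched again, and the files already written for it are deleted
  • If the path is /jobs/overview, the JSON is written to webDir/overviews/jobID.json. If the path is /joboverview (the legacy path), the JSON is first converted with convertLegacyJobOverview and then written to webDir/overviews/jobID.json. Any other path is written to webDir/path.json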
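
The path-to-file mapping in the last bullet can be pulled out into a small standalone function. This is a sketch, not Flink's code: ArchiveTargetSketch and resolveTarget are hypothetical names, the value of JobsOverviewHeaders.URL is assumed to be /jobs/overview as described above, and the legacy JSON conversion is left out.

```java
import java.io.File;

/** Hypothetical helper that mirrors how JobArchiveFetcherTask picks a target file. */
class ArchiveTargetSketch {
    static final String JSON_FILE_ENDING = ".json";
    // Assumed value of JobsOverviewHeaders.URL.
    static final String JOBS_OVERVIEW_URL = "/jobs/overview";
    // Legacy overview path; the real task also converts the JSON for this case.
    static final String LEGACY_OVERVIEW_URL = "/joboverview";

    static File resolveTarget(File webDir, String jobId, String archivePath) {
        if (archivePath.equals(JOBS_OVERVIEW_URL) || archivePath.equals(LEGACY_OVERVIEW_URL)) {
            // Per-job overviews are collected in one directory so they can be merged later.
            return new File(new File(webDir, "overviews"), jobId + JSON_FILE_ENDING);
        }
        // Every other archive entry keeps its REST path, with ".json" appended.
        return new File(webDir, archivePath + JSON_FILE_ENDING);
    }
}
```

On a Unix-like system, resolveTarget(new File("/tmp/web"), "abc", "/jobs/abc/config") points at /tmp/web/jobs/abc/config.json, while both overview paths point at /tmp/web/overviews/abc.json.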

WebFrontendBootstrap

flink-1.7.2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/WebFrontendBootstrap.java

public class WebFrontendBootstrap {
    private final Router router;
    private final Logger log;
    private final File uploadDir;
    private final ServerBootstrap bootstrap;
    private final Channel serverChannel;
    private final String restAddress;

    public WebFrontendBootstrap(
            Router router,
            Logger log,
            File directory,
            @Nullable SSLHandlerFactory serverSSLFactory,
            String configuredAddress,
            int configuredPort,
            final Configuration config) throws InterruptedException, UnknownHostException {

        this.router = Preconditions.checkNotNull(router);
        this.log = Preconditions.checkNotNull(log);
        this.uploadDir = directory;

        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) {
                RouterHandler handler = new RouterHandler(WebFrontendBootstrap.this.router, new HashMap<>());

                // SSL should be the first handler in the pipeline
                if (serverSSLFactory != null) {
                    ch.pipeline().addLast("ssl", serverSSLFactory.createNettySSLHandler());
                }

                ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new ChunkedWriteHandler())
                    .addLast(new HttpRequestHandler(uploadDir))
                    .addLast(handler.getName(), handler)
                    .addLast(new PipelineErrorHandler(WebFrontendBootstrap.this.log));
            }
        };

        NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        this.bootstrap = new ServerBootstrap();
        this.bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);

        ChannelFuture ch;
        if (configuredAddress == null) {
            ch = this.bootstrap.bind(configuredPort);
        } else {
            ch = this.bootstrap.bind(configuredAddress, configuredPort);
        }
        this.serverChannel = ch.sync().channel();

        InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();

        InetAddress inetAddress = bindAddress.getAddress();
        final String address;

        if (inetAddress.isAnyLocalAddress()) {
            address = config.getString(JobManagerOptions.ADDRESS, InetAddress.getLocalHost().getHostName());
        } else {
            address = inetAddress.getHostAddress();
        }

        int port = bindAddress.getPort();

        this.log.info("Web frontend listening at {}" + ':' + "{}", address, port);

        final String protocol = serverSSLFactory != null ? "https://" : "http://";

        this.restAddress = protocol + address + ':' + port;
    }

    public ServerBootstrap getBootstrap() {
        return bootstrap;
    }

    public int getServerPort() {
        Channel server = this.serverChannel;
        if (server != null) {
            try {
                return ((InetSocketAddress) server.localAddress()).getPort();
            }
            catch (Exception e) {
                log.error("Cannot access local server port", e);
            }
        }

        return -1;
    }

    public String getRestAddress() {
        return restAddress;
    }

    public void shutdown() {
        if (this.serverChannel != null) {
            this.serverChannel.close().awaitUninterruptibly();
        }
        if (bootstrap != null) {
            if (bootstrap.group() != null) {
                bootstrap.group().shutdownGracefully();
            }
            if (bootstrap.childGroup() != null) {
                bootstrap.childGroup().shutdownGracefully();
            }
        }
    }
}
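  • WebFrontendBootstrap uses Netty to start an HTTP server whose pipeline consists of HttpServerCodec, ChunkedWriteHandler, HttpRequestHandler, RouterHandler, and PipelineErrorHandler, preceded by an SSL handler when SSL is enabled. The Router behind the RouterHandler has a single GET route handled by HistoryServerStaticFileServerHandler, which serves static files for the HistoryServer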
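
To make the "static files over a cache directory" idea concrete, here is a rough sketch that swaps Netty for the JDK's built-in com.sun.net.httpserver.HttpServer. It is not how HistoryServerStaticFileServerHandler is implemented: StaticJsonServerSketch, resolve, and start are hypothetical names, and mapping a request path to webDir/path.json is an assumption based on how the fetcher names the files it writes.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical GET-only server that returns cached JSON files from a directory. */
class StaticJsonServerSketch {

    /** Maps a request path such as /jobs/overview to webDir/jobs/overview.json, or null if absent. */
    static Path resolve(Path webDir, String requestPath) {
        if (!requestPath.startsWith("/")) {
            return null;
        }
        Path root = webDir.toAbsolutePath().normalize();
        Path file = root.resolve(requestPath.substring(1) + ".json").normalize();
        // Reject paths that escape the cache directory, and anything that is not a regular file.
        if (!file.startsWith(root) || !Files.isRegularFile(file)) {
            return null;
        }
        return file;
    }

    static HttpServer start(Path webDir, int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/", exchange -> {
            try {
                if (!"GET".equals(exchange.getRequestMethod())) {
                    exchange.sendResponseHeaders(405, -1); // only GET is routed
                    return;
                }
                Path file = resolve(webDir, exchange.getRequestURI().getPath());
                if (file == null) {
                    exchange.sendResponseHeaders(404, -1);
                    return;
                }
                byte[] body = Files.readAllBytes(file);
                exchange.getResponseHeaders().add("Content-Type", "application/json");
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            } finally {
                exchange.close();
            }
        });
        server.start();
        return server;
    }
}
```

Passing port 0 to start lets the operating system pick a free port, which server.getAddress().getPort() then reports; call server.stop(0) to shut it down.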

Summary

  • HistoryServer serves queries about finished jobs. It is built from two main parts, HistoryServerArchiveFetcher and WebFrontendBootstrap. Its run method calls start, which starts the HistoryServerArchiveFetcher and then creates the WebFrontendBootstrap
  • HistoryServerArchiveFetcher pulls job archives from the directories configured in historyserver.archive.fs.dir at the interval given by historyserver.archive.fs.refresh-interval, using a JobArchiveFetcherTask to do the work. JobArchiveFetcherTask extends the JDK's TimerTask; its run method iterates over refreshDirs, calls FileSystem.listStatus, and uses FsJobArchivist.getArchivedJsons to read each ArchivedJson, which it writes to a file chosen by its path
  • WebFrontendBootstrap uses Netty to start an HTTP server whose pipeline consists of HttpServerCodec, ChunkedWriteHandler, HttpRequestHandler, RouterHandler, and PipelineErrorHandler. The Router behind the RouterHandler has a single GET route handled by HistoryServerStaticFileServerHandler, which serves static files for the HistoryServer
