附件分片下载、断点续传

附件分片下载、断点续传

场景引入

在最近接触的一个项目中碰到这样的场景一个加密过的文件存储在FastDFS中,在业务处理过程中需要将对应的附件下载下来并解密之后做相应的业务处理。

原有的处理方式是单线程模式下载并做相应的业务处理,当文件比较大时例如10G+的文件此时就会出现性能瓶颈。故此引入分片下载处理方案,开启多个线程同步下载附件,提高下载效率。

以下示例是是剥离业务功能后写的一个附件分片下载、断点续传demo 涵盖了核心实现原理

附件分片下载、断点续传 实现原理

核心实现原理

  1. 获取附件信息:附件大小、是否需要分片处理、详细分片信息、附件MD5

    • 更具附件ID获取附件信息

    • 通过分片阈值判断是否需要进行分片处理,需要分片处理则获取详细的分片信息

    • 计算附件MD5或每个分片的MD5,并缓存起来(相同附件ID直接从缓存中获取MD5)

  2. 根据步骤1获取的信息请求附件下载接口下载附件

    • 判断磁盘空间是否充裕

    • 根据分片信息调用下载接口下载附件,使用线程池执行下载合理高效使用计算机处理能力

    • 比对每个分片的MD5,所有分片下载成功后合并附件(可每个分片下载成功直接进入合并附件的执行步骤)。

  3. 分片下载完成后合并附件,并通过附件MD5校验附件下载的一致性

  4. 相应的业务处理

工业级代码考虑点

  1. duild once ,Run everywhere 为适应不同服务器公共的性能参数做到可配置:下载缓冲区大小、分片阈值、每个分片大小、下载存储临时目录、线程池大小、线程池队列大小

  2. 下载前做磁盘空间预判

  3. 保证每个分片下载的正确性(校验分片MD5) 并保证最终合并附件的正确性

  4. 利用HTTP 206 Partial Content实现断点续传

代码实现

获取附件信息

/**
     * 获取附件信息
     *
     * @return 附件详细信息
     */
    @RequestMapping("/getFileDetailInfo")
    public FileDetailInfo getFileDetailInfo(String fileId) {
        FileDetailInfo fileDetailInfo = new FileDetailInfo();
        //附件id 获取附件信息
        String path = getFilePath(fileId);
        File file = new File(path);
        if (file.exists() && file.isFile()) {
            fileDetailInfo.setSize(file.length());
            fileDetailInfo.setFilePath(path);
            fileDetailInfo.setFileName(file.getName());
            fileDetailInfo.setFileMd5(FileUtils.getFileMd5(path));
            if (file.length() > fileOperationProperties.getMultipartSizeLimit()) {
                fileDetailInfo.setMultipart(true);
                //超过分块阈值限制 进行附件分块
                long alreadyPartLength = 0;
                int part = 0;
                while (alreadyPartLength<file.length()) {
                    //起始分块位置
                    long off = alreadyPartLength;
                    //已分块的长度
                    alreadyPartLength = (alreadyPartLength + fileOperationProperties.getMultipartSize()) > file.length() ?
                            file.length() : alreadyPartLength + fileOperationProperties.getMultipartSize();
                    MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
                    multiPartFileInfo.setFileName(file.getName()+part);
                    multiPartFileInfo.setLen(alreadyPartLength - off);
                    //分块的MD5
                    multiPartFileInfo.setFileMd5(FileUtils.getFileMd5(file,multiPartFileInfo.getFileName() ,
                                off ,multiPartFileInfo.getLen() ));
                    multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
                    part += 1;
                    multiPartFileInfo.setOff(off);

                    fileDetailInfo.addMultiPart(multiPartFileInfo);
                }
            }else{
                fileDetailInfo.setMultipart(false);
            }
        }else {
            throw new RuntimeException("操作异常!非文件或文件不存在");
        }
        return fileDetailInfo;
    }

下载功能 包含断点续传

 /**
     * 附件下载
     */
    @RequestMapping("/downloadFile")
    public void downloadFile(@RequestBody MultiPartFileInfo multiPartFileInfo , HttpServletResponse response , HttpServletRequest request) {
        //FIXME 优化代码
        File file = new File(multiPartFileInfo.getFilePath());
        if (file.exists() && file.isFile()) {
            OutputStream out = null;
            RandomAccessFile in = null;
            //下载起始位置
            long off = 0;
            int downloadSize = 0;
            try {
                response.setContentType("application/octet-stream");
                response.setHeader("Content-disposition", String.format("attachment; filename=\"%s\"",
                        new String(multiPartFileInfo.getFileName().getBytes("UTF-8"), "ISO8859-1")));

                response.setHeader("Accept-Ranges", "bytes");
                if (request.getHeader("Range") == null) {
                    response.setHeader("Content-Length", String.valueOf(multiPartFileInfo.getLen()));
                }else {
                    //解析断点续传
                    String range = request.getHeader("Range");
                    String[] bytes = range.replaceAll("bytes", "").split("-");
                    off = Long.parseLong(bytes[0]);
                    long end = 0;
                    if (bytes.length == 2) {
                        end = Long.parseLong(bytes[1]);
                    }
                    int length = 0;
                    if (end != 0 && end>off) {
                        length = Math.toIntExact(end - off);
                    }else{
                        length = Math.toIntExact(multiPartFileInfo.getLen() - off);
                    }
                    response.setHeader("Content-Length", String.valueOf(length));
                    downloadSize = length;
                }

                in = new RandomAccessFile(file,"rw");
                 out = response.getOutputStream();
                if (off == 0) {
                    off = multiPartFileInfo.getOff();
                }
                if (downloadSize == 0) {
                    downloadSize = Math.toIntExact(multiPartFileInfo.getLen());
                }
                byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
                int length = 0;
                //设置下载起始位置
                if (multiPartFileInfo.getOff() > 0) {
                    in.seek(off);
                }

                //预防读取超出分块范围大小
                long readContentLen = 0;
                if ((readContentLen + fileOperationProperties.getReadBufLenSize()) > downloadSize) {
                    bytes = new byte[Math.toIntExact(multiPartFileInfo.getLen() - readContentLen)];
                }
                while ((length = in.read(bytes)) !=-1 ) {
                    out.write(bytes,0,length);
                    readContentLen += length;
                }
                out.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                    if (out != null) {
                        out.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }

    }

Junit下载测试用例

@Test
    public void testFileUploadNotPart() {

        try {
            //Step1 查询分片信息
            MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.post("/fileOperation/getFileDetailInfo").param("fileId", "1"))
                    .andExpect(MockMvcResultMatchers.status().isOk())
                    .andDo(MockMvcResultHandlers.print())
                    .andReturn();
            String body = mvcResult.getResponse().getContentAsString();
            FileDetailInfo fileDetailInfo = new JSONObject().parseObject(body, FileDetailInfo.class);

            //创建临时存储文件夹
            File directFile = new File(fileOperationProperties.getTempDirect());
            if (!directFile.exists()) {
                directFile.mkdirs();
            }

            if (FileUtils.enoughFreeSpace(fileOperationProperties.getTempDirect(), (long) (fileDetailInfo.getSize() * 1.3))) {
                //Step2 下载附件
                if (fileDetailInfo.isMultipart()) {
                    //Step2.1 分片下载
                    int nThreads = fileOperationProperties.getNThreads();
                    int threadPoolQueueCapacity = fileOperationProperties.getThreadPoolQueueCapacity();
                    ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads, 0L,
                            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPoolQueueCapacity));
                    //Step2.1.1 线程池中下载
                    //FIXME 优化:1、获取的分片信息是个有序集合,按照一定的顺序下载。-->及时处理已下载完成的分片,尽可能合理利用存储空间
                    //FIXME 优化:2、下载完成一个分片 合并处理一个分片  不适用阻塞
                    Queue<Future<FileUploadResultModel>> futureQueue = new ConcurrentLinkedQueue<Future<FileUploadResultModel>>();
                    for (MultiPartFileInfo multiPartFileInfo : fileDetailInfo.getMultiPartFileInfos()) {
                        Future<FileUploadResultModel> futreTask = executorService.submit(new FileJob(restTemplate,
                                multiPartFileInfo, fileOperationProperties));
                        futureQueue.add(futreTask);
                    }

                    int sizeOfSuccessPartFile = 0;
                    for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
                        FileUploadResultModel resultModel = resultModelFuture.get();
                        if (resultModel.isSuccess()) {
                            sizeOfSuccessPartFile += 1;
                        } else {
                            //下载失败处理--记录下载失败原因/重试下载
                        }
                    }

                    if (sizeOfSuccessPartFile == fileDetailInfo.getMultiPartFileInfos().size()) {
                        //Step2.1.2 下载成功 合并附件
                        String filePath = fileOperationProperties.getTempDirect() + File.separator + fileDetailInfo.getFileName();
                        //预创建与源文件相同大小的文件
                        File file = new File(filePath);
                        if (file.exists() && file.isFile()) {
                            file.delete();
                            file.createNewFile();
                        } else {
                            file.createNewFile();
                        }
                        //FIXME 此处使用多线程合并文件,提高合并处理效率
                        RandomAccessFile rFile = new RandomAccessFile(file, "rw");
                        rFile.setLength(fileDetailInfo.getSize());
                        for (Future<FileUploadResultModel> resultModelFuture : futureQueue) {
                            FileUploadResultModel fileUploadResultModel = resultModelFuture.get();
                            MultiPartFileInfo multiPartFileInfo = fileUploadResultModel.getMultiPartFileInfo();
                            //设置写入起始位置
                            rFile.seek(multiPartFileInfo.getOff());
                            byte[] bytes = new byte[fileOperationProperties.getReadBufLenSize()];
                            int length = 0;
                            File tempFile = new File(fileUploadResultModel.getLocalFilePath());
                            InputStream TempFileInputStream = new FileInputStream(tempFile);
                            while ((length = TempFileInputStream.read(bytes)) != -1) {
                                rFile.write(bytes, 0, length);
                            }
                            TempFileInputStream.close();
                            tempFile.delete();
                        }
                        //Step2.1.3 校验附件
                        if (FileUtils.checkFile(filePath, fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
                            log.info("附件下载成功!附件本地目录 {}", filePath);
                        }

                    }
                } else {
                    //step2.2 整个附件下载
                    MultiPartFileInfo multiPartFileInfo = new MultiPartFileInfo();
                    multiPartFileInfo.setFilePath(fileDetailInfo.getFilePath());
                    multiPartFileInfo.setOff(0);
                    multiPartFileInfo.setFileName(fileDetailInfo.getFileName());
                    multiPartFileInfo.setFileMd5(fileDetailInfo.getFileMd5());
                    multiPartFileInfo.setLen(fileDetailInfo.getSize());
                    //下载附件
                    FileJob fileJob = new FileJob(restTemplate,
                            multiPartFileInfo, fileOperationProperties);
                    FileUploadResultModel resultModel = fileJob.uploadFile();
                    //step3 校验附件MD5
                    if (FileUtils.checkFile(resultModel.getLocalFilePath(), fileDetailInfo.getFileMd5(), fileDetailInfo.getSize())) {
                        log.info("附件下载成功!附件本地目录 {}", resultModel.getLocalFilePath());
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

结尾

以上是整个分片下载、断点续传的实现原理及其实现。

在上诉实现中还有可优化的点

  • 获取的分片信息时使用有序集合,按照一定的顺序下载。-->及时处理已下载完成的分片,尽可能合理利用存储空间

  • 下载完成一个分片 合并处理一个分片 不使用阻塞,减少磁盘空间的占用

  • 可使用多线程处理附件合并

源码地址

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353