python多线程实现S3文件分段上传下载

对于大对象的存取,s3提供了分段上传/下载的接口,基于此,可以进一步实现多线程并行传输或者断点续传等功能。

本实现使用了亚马逊的boto库
https://pypi.python.org/pypi/boto

以及filechunkio库
https://pypi.python.org/pypi/filechunkio/

1.分段上传

为了分段上传一个大文件,需要先将文件分段,然后使用云盘提供的Multipart接口上传每个分段即可,最后云盘将在后端把所有分段合并成一个Object。
下面的例子中使用了FileChunkIO分段读取文件:

chunksize=4096*1024
chunkcnt=int(math.ceil(filesize*1.0/chunksize))
mp=bucket.initiate_multipart_upload("object-1")  #创建Multipart对象
for i in range(0,chunkcnt):
offset=chunksize*i
len=min(chunksize,filesize-offset)
fp=FileChunkIO(“/path/to/file”,'r',offset=offset,bytes=len)  #创建文件的分段
mp.upload_part_from_file(fp,part_num=i+1)  #上传每个分段
mp.complete_upload()

完成分段上传之后,需要使用Multipart的complete_upload()或者cancel_upload()结束分段上传,释放Multipart占用的资源。

2.分段下载

为了使用分段下载,需要指定分段在文件中的起始偏移地址和终止偏移地址,然后构造包含Range报文头的HTTP Get请求下载相应的分段。
示例如下:

chunksize=4096*1024
chunkcnt=int(math.ceil(filesize*1.0/chunksize))
for i in range(0,chunkcnt):
 offset=chunksize*i
 len=min(chunksize,filesize-offset)
 resp=conn.make_request("GET",bucket.name,filename,headers={"Range":"bytes=%d-%d" % (offset,offset+len)})
 data=resp.read(len)
 if data == "":
    break
 fp.write(data)

3.多线程的完整实现

import shutil
import math
import string
import io
from io import BytesIO
import os
from os import path
import sys
import traceback
import boto
import boto.s3.connection
from filechunkio import FileChunkIO
import threading
import Queue
import time

class Chunk:
    num = 0
    offset = 0
    len = 0
    def __init__(self, n, o, l):  
        self.num = n
        self.offset = o
        self.len = l

chunksize = 8 << 20

def init_queue(filesize):
    chunkcnt = int(math.ceil(filesize*1.0/chunksize))
    q = Queue.Queue(maxsize = chunkcnt)
    for i in range(0,chunkcnt):
        offset = chunksize*i
        len = min(chunksize, filesize-offset)
        c = Chunk(i+1, offset, len)
        q.put(c)
    return q

def upload_chunk(filepath, mp, q, id):
    while (not q.empty()):
        chunk = q.get()
        fp = FileChunkIO(filepath, 'r', offset=chunk.offset, bytes=chunk.len)
        mp.upload_part_from_file(fp, part_num=chunk.num)
        fp.close()
        q.task_done()

def upload_file_multipart(filepath, keyname, bucket, threadcnt=8):
    filesize = os.stat(filepath).st_size
    mp = bucket.initiate_multipart_upload(keyname)
    q = init_queue(filesize)
    for i in range(0, threadcnt):
        t = threading.Thread(target=upload_chunk, args=(filepath, mp, q, i))
        t.setDaemon(True)
        t.start()
    q.join()
    mp.complete_upload()

def download_chunk(filepath, bucket, key, q, id):
    while (not q.empty()):
        chunk = q.get()
        offset = chunk.offset
        len = chunk.len
        resp = bucket.connection.make_request("GET", bucket.name, key.name, headers={"Range":"bytes=%d-%d" % (offset, offset+len)})
        data = resp.read(len)
        fp = FileChunkIO(filepath, 'r+', offset=offset, bytes=len)
        fp.write(data)
        fp.close()
        q.task_done()

def download_file_multipart(key, bucket, filepath, threadcnt=8):
    if type(key) == str:
        key=bucket.get_key(key)
    filesize=key.size
    if os.path.exists(filepath):
        os.remove(filepath)
    os.mknod(filepath)
    q = init_queue(filesize)
    for i in range(0, threadcnt):
        t = threading.Thread(target=download_chunk, args=(filepath, bucket, key, q, i))
        t.setDaemon(True)
        t.start()
    q.join()

access_key = "test"
secret_key = "123456"
host = "*****"

filepath = "/search/2G.file"
keyname = "2G.file"

threadcnt = 8

conn = boto.connect_s3(
    aws_access_key_id = access_key,
    aws_secret_access_key = secret_key,
    host = host,
    is_secure=False,
    calling_format = boto.s3.connection.OrdinaryCallingFormat(),
    )

bucket = conn.get_bucket("test")

time1= time.time()
upload_file_multipart(filepath, keyname, bucket, threadcnt)
time2= time.time()
print "upload %s with %d threads use %d seconds" % (keyname, threadcnt, time2-time1)

key = bucket.get_key(keyname)

download_filepath = path.join(".", keyname)
time1= time.time()
download_file_multipart(key, bucket, download_filepath, threadcnt)
time2= time.time()
print "download %s with %d threads use %d seconds" % (keyname, threadcnt, time2-time1)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • # Python 资源大全中文版 我想很多程序员应该记得 GitHub 上有一个 Awesome - XXX 系列...
    aimaile阅读 26,563评论 6 427
  • 本文包括:1、文件上传概述2、利用 Commons-fileupload 组件实现文件上传3、核心API——Dis...
    廖少少阅读 12,601评论 5 91
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,915评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 173,242评论 25 708
  • 志行族薯条分享: 第一天: 下午骑马路上,我张开双手举过头顶,抬起头,闭上眼睛迎着太阳,感觉自己想鸟儿一样在空中飞...
    路西法妈妈阅读 329评论 0 1