该PDF下载器支持从本地txt文件中读取和数据库读取两种方式
数据库ip我乱写的,这里集成了三种情况的数据库(本地数据库,需要验证的数据库,不需要验证的数据库)
# -*- coding: utf-8 -*-
"""
此程序用来多线程下载PDF,提供本地txt文件读取下载和数据库读取下载两种方式
"""
import hashlib
import os
import sys
from multiprocessing import Queue
from threading import Thread, Lock
import pymongo
import requests
g_num = 0 # 创建全局变量
lock = Lock() # 创建全局互斥锁
class DownloadPDF(object):
def __init__(self):
# 创建消息队列
self.q = Queue()
def run(self):
self.mkdir()
self.get_pdf_msg()
pdf_size = self.q.qsize() # 总PDF数
# 开启多线程
all_th = []
for i in range(10):
th = Thread(target=self.download, args=(pdf_size,))
th.start()
all_th.append(th)
for th in all_th:
th.join()
print('\n' + '下载完成!')
def download(self, pdf_size):
while True:
try:
# 获取PDF信息
each_pdf = self.q.get(timeout=2) # 设置超时时间2s。这里如果不设置超时,当队列为空时,q.get()会进入阻塞状态
except:
# 这里队列已经为空,取不到数据就跳出循环
break
# 计数
global g_num
lock.acquire() # 上锁
g_num += 1
# 打印下载进度
sys.stdout.write('\r' + '正在下载:%s / %s' % (g_num, pdf_size))
path = ''
pdf_url = ''
md5_later = ''
pdf_name = '' # 如果没有PDF名,置为空
if is_txt:
# 选择了txt文件方式,只有一个链接
pdf_url = each_pdf.strip()
href_name = pdf_url.split('/')[-1] # 取链接 / 后的值作为PDF名称
if '.pdf' in href_name:
href_name = href_name.split('.')[0]
md5_later = self.pdf_name_md5(href_name) # pdf转码之后的名称
path = 'PDF/' + md5_later + '.pdf'
elif not is_txt:
# 从数据库中读取PDF信息,数据库中包含两个字段(pdf_name, pdf_url)
pdf_name = each_pdf['pdf_name']
pdf_url = each_pdf['pdf_url']
md5_later = self.pdf_name_md5(pdf_name)
path = 'PDF/' + md5_later + '.pdf'
# 下载之前先去重
try:
# 尝试获取该路径下pdf的size,如果还未下载则置为0
size = os.path.getsize(path)
except:
size = 0
# 判断已经下载并且size > 0则为成功下载
if os.path.exists(path) and size != 0:
# 路径存在说明下载过
lock.release() # 释放锁
continue
# 下载PDF
self.download_pdf(pdf_name, each_pdf, path, pdf_url, md5_later)
lock.release() # 释放锁
@staticmethod
def download_pdf(pdf_name, each_pdf, path, pdf_url, md5_later):
"""
下载PDF的主要实现程序
:param pdf_name:
:param each_pdf:
:param path:
:param pdf_url:
:param md5_later:
:return:
"""
try:
# 开始下载
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)'
' Chrome/76.0.3809.100 Safari/537.36'}
r = requests.get(url=pdf_url, headers=headers, timeout=(120, 600)) # 设置connect超时2分钟,read超时10分钟
f = open(path, 'wb')
f.write(r.content)
f.close()
r.close()
s = requests.session()
s.keep_alive = False
if is_insert == 'y':
# 下载成功,插入数据库
if not pdf_name:
msg = {'url': pdf_url, 'md5_name': md5_later, 'relative_path': path}
else:
msg = {'url': pdf_url, 'origin_name': pdf_name, 'md5_name': md5_later, 'relative_path': path}
new_collection.insert(msg)
except:
# 下载失败
print('\r' + '下载失败:%s' % str(each_pdf))
with open('error.txt', 'r') as fr:
content = fr.readlines()
if str(each_pdf) not in content:
with open('error.txt', 'a') as fe:
fe.writelines(str(each_pdf) + '\n')
@staticmethod
def mkdir():
"""
创建下载失败文件、去重文件、PDF文件夹
:return:
"""
if not os.path.exists('error.txt'):
with open('error.txt', 'w') as fr:
fr.write('')
if not os.path.exists('PDF'):
os.mkdir('PDF')
def client_dbs(self):
"""
连接数据库,获取所有pdf信息
:return:
"""
ip = self.choose_ip()
client = pymongo.MongoClient('mongodb://%s:27017' % ip)
# 这里因为该数据库开了验证,所以这里进行判断验证(ip我乱写的)
if ip == '192.168.0.123':
client['admin'].authenticate('用户名', '密码') # 数据库验证(用户名密码根据自己数据库修改)
all_db_names = client.list_database_names()
# 规范打印数据库名
self.pr_datas(all_db_names)
db_name_num = input('\r' + '请选择数据库对应序号:')
db_name = self.choose_db_or_col(all_db_names, db_name_num, '数据库')
while True:
db = client[db_name]
all_col = db.collection_names()
if not all_col:
db_name = input('没有这个数据库,请重新输入:')
else:
break
# 规范打印集合名
self.pr_datas(all_col)
col_num = input('\r' + '请选择集合对应的序号:')
col_name = self.choose_db_or_col(all_col, col_num, '集合')
collection = db[col_name]
return collection, db
@staticmethod
def choose_ip():
"""
选择ip
:return:
"""
ip = input('请选择数据库:' + '\n' + '1. 本地数据库 2. 123数据库 3. 200数据库' + '\n')
while True:
if ip == '1':
ip = '127.0.0.1'
break
elif ip == '2':
ip = '192.168.0.123'
break
elif ip == '3':
ip = '192.168.0.200'
break
else:
ip = input('请正确输入1 或者 2 或者 3:')
return ip
@staticmethod
def choose_db_or_col(name, name_num, d_or_c):
"""
选择数据库或者集合
:return:
"""
while True:
try:
int(name_num)
except ValueError:
name_num = input('请输入正确的%s序号:' % d_or_c)
if 0 <= int(name_num) <= len(name):
break
else:
name_num = input('请输入正确的%s序号:' % d_or_c)
for j, e_name in enumerate(name, 1):
if name_num == str(j):
choose_name = e_name
return choose_name
@staticmethod
def pr_datas(names):
"""
规范打印数据库/集合名称
:param names:
:return:
"""
for i, each_name in enumerate(names, 1):
print(str(i) + '. ' + each_name + ' ' * (40 - len(each_name) - len(str(i))), end='')
if i % 3 == 0:
print('\n')
if len(names) < 3:
print('\n')
def get_pdf_msg(self):
"""
将pdf信息存入消息队列
:return:
"""
all_urls = self.choose_way()
for i in all_urls:
self.q.put(i)
def choose_way(self):
"""
选择PDF的url来源,是在txt文件中还是从数据库中读取
:return:
"""
print('*' * 100)
print(
'注意事项:' + '\n' + '1. 如果选择了txt文件方式下载,txt内容必须为一行一个url格式' + '\n' +
'2. 如果选择了从数据库中读取PDF信息下载,则数据库中包含两个字段(a. 存放url的字段,b. 该PDF的名字字段(可以没有)),其中存放url的字段不能嵌套,只能有一个url' + '\n' +
'3. txt文件只有链接,默认取链接最后 "/" 后的值作为PDF名' + '\n' +
'4. 数据库方式如果没有指定PDF名字,则默认以链接最后一个 / 后的内容作为PDF名')
print('*' * 100)
global is_txt, is_insert, new_collection
the_way = input('请选择PDF的来源(输入1 / 2):' + '\n' + '1. txt文件 2. 从数据库中读取' + '\n')
while True:
if the_way == '1' or the_way == '2':
break
else:
the_way = input('请正确输入1 或者 2:')
urls = []
if the_way == '1':
# txt文件
is_txt = True
print('说明:txt文件中必须为一行一个url!')
txt_name = input('请输入与该程序同级目录下的txt文件名称(如:123.txt):')
while True:
try:
with open(txt_name, 'r') as f:
urls = f.readlines()
break
except FileNotFoundError:
txt_name = input('没有这个txt文件,请重新输入:')
elif the_way == '2':
# 数据库
collection, db = self.client_dbs() # 连接数据库
url_field = input('请输入PDF的url字段名:')
name_field = input('请输入PDF的name字段名(如果没有则不输入):')
if not name_field:
is_txt = True
for d in collection.find():
if d[url_field]:
urls.append(d[url_field])
else:
is_txt = False
for d in collection.find():
if d[url_field]:
msg = {'pdf_url': d[url_field], 'pdf_name': d[name_field]}
urls.append(msg)
# 选择是否要插入数据的数据库
is_insert = input('是否要将下载的PDF信息写入到新的数据库(输入y / n,不输入或输入其他默认不写入):')
if is_insert == 'y':
print('选择插入数据库,字段包含(pdf_url, md5_name, relative_path)')
ip = self.choose_ip()
client = pymongo.MongoClient('mongodb://%s:27017' % ip)
db_name = input('请输入要存入的数据库名(可新建):')
db = client[db_name]
if ip == '192.168.0.123':
db.authenticate(name='用户名', password='密码', source='admin')
new_col = input('请输入要插入的集合名称:')
new_collection = db[new_col]
return urls
@staticmethod
def pdf_name_md5(pdf):
"""
将pdf名字转为md5
:param pdf:
:return:
"""
md = hashlib.md5()
md.update(pdf.encode('utf-8'))
pdf_name = md.hexdigest()
return pdf_name
if __name__ == '__main__':
download = DownloadPDF()
download.run()