这是我的公众号获取原创保护的首篇文章,原创的肯定将支持我继续前行。现在写这篇文章的时间是晚上11:30,写完就回寝室休息了,希望更多的朋友与我一起同行(当然需要一个善良的妹子的救济)。(我的新书《Python爬虫开发与项目实战》发布了,大家在这里可以看到样章)
好了,废话不多说,咱们进入今天的主题。上一篇咱们讲解了代理ip上篇,本篇咱们继续讲解代理ip。这一篇是上一篇的扩展和优化,主要的改动是使用scrapy来进行爬取代理ip,同时演示在scrapy框架中怎么使用mongodb数据库,最后使用多线程批量验证代理ip的合理性,大大加快了速度。
这次我选择的依然是www.xicidaili.com/nn/,我之后打算做一个大的代理ip池,方便之后做分布式爬虫。
使用firebug审查元素,查看如何解析html,上一篇我已经讲过了,所以就不详细说了,大家不明白的可以看看代理ip上篇。
下面咱们可以写代码了,由于咱们使用的是scrapy框架进行爬取,所以首先先生成scrapy工程,在cmd中 输入scrapy startproject proxySpider_scrapy,然后使用pycharm打开。
工程结构如下:
db包中db_helper:实现的是mongodb的增删改查。和代理ip上篇增加了proxyId字段。
detect包中
1.detect_proxy:验证代理ip的可用性的线程
2.detect_manager: 用来管理验证线程,监控线程状态
spiders包中 proxySpider:主要实现爬虫的逻辑和html解析
items:主要是描述了ip和port
pipelines:里面主要是将爬取到的ip和port存储到数据库中
main:主要是完成参数的判断和爬虫的启动(咱们使用脚本来启动爬虫不使用命令行)
还要说一下检测:我是用 ip.chinaz.com/getip.aspx作为检测网址,只要使用代理访问不超时,而且响应码为200,咱们就认为是成功的代理。
接下来运行程序看看效果:
在windows下切换到工程目录,运行python main.py -h,会看到我定义的使用说明和参数设置。和上一篇基本上完全一样。
接着运行python main.py -c 1 5 (意思是爬取1-5页的ip地址):
这时候如果想验证ip的正确性:运行python main.py -t db
使用多线程验证的速度非常快,我设置了5个线程。两分钟不到,就验证结束。124个ip是可以使用的。
看一下mongodb数据库:
大家注意到那个proxyId字段了吗?这个在我们进行多线程分段验证的时候是很有用的。详细的使用,请看代码。
** 当咱们下一篇讲解突破反爬虫的时候就可以使用这些ip了**。
下面把解析和验证的代码贴一下:
proxySpider.py:
#coding:utf-8
import scrapy
from proxySpider_scrapy.db.db_helper import DB_Helper
from proxySpider_scrapy.detect.detect_proxy import Detect_Proxy
from proxySpider_scrapy.detect.detect_manager import Detect_Manager
from proxySpider_scrapy.items import ProxyItem
'''
这个类的作用是将代理数据进行爬取
'''
class ProxySpider(scrapy.Spider):
name = 'proxy'
start_urls = ["http://www.xicidaili.com/nn/"]
allowed_domains = []
db_helper = DB_Helper()
detecter = Detect_Manager(5)
Page_Start = 1
Page_End = 4
headers = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en',
'Referer':'http://www.xicidaili.com/'
}
def parse(self, response):
'''
解析出其中的ip和端口
:param response:
:return:
'''
trs = response.xpath('//tr[@class="odd" or @class=""]')
for tr in trs:
item = ProxyItem()
tds = tr.xpath('./td/text()').extract()
for td in tds:
content = td.strip()
if len(content)>0:
if content.isdigit():
item['port'] = content
print 'ip:',item['ip']
print 'port:',item['port']
break
if content.find('.')!= -1:
item['ip'] = content
yield item
if self.Page_Start < self.Page_End:
new_url = self.start_urls[0]+str(self.Page_Start)
self.Page_Start += 1
yield scrapy.Request(new_url,headers=self.headers,callback=self.parse)
pipelines.py:
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from proxySpider_scrapy.spiders.proxySpider import ProxySpider
class ProxyPipeline(object):
proxyId = 1 #设置一个ID号,方便多线程验证
def process_item(self, item, spider):
'''
:param item:
:param spider:
:return:
'''
if spider.name == 'proxy':#需要判断是哪个爬虫
proxySpider = ProxySpider(spider)
proxy = {'ip':item['ip'],'port':item['port']}
proxy_all = {'ip':item['ip'],'port':item['port'],'proxyId':self.proxyId}
if proxySpider.db_helper.insert(proxy,proxy_all) == True:#插入数据
self.proxyId += 1
return item
else:
return item
detect_manager.py:
#coding:utf-8
from threading import Thread
import time
from proxySpider_scrapy.db.db_helper import DB_Helper
from proxySpider_scrapy.detect.detect_proxy import Detect_Proxy
'''
定义一个管理线程,来管理产生的线程
'''
class Detect_Manager(Thread):
def __init__(self,threadSum):
Thread.__init__(self)
sqldb = DB_Helper()#将序号重新恢复
sqldb.updateID()
self.pool =[]
for i in range(threadSum):
self.pool.append(Detect_Proxy(DB_Helper(),i+1,threadSum))
def run(self):
self.startManager()
self.checkState()
def startManager(self):
for thread in self.pool:
thread.start()
def checkState(self):
'''
这个函数是用来检测线程的状态
:return:
'''
now = 0
while now < len(self.pool):
for thread in self.pool:
if thread.isAlive():
now = 0
break
else:
now+=1
time.sleep(0.1)
goodNum=0
badNum =0
for i in self.pool:
goodNum += i.goodNum
badNum += i.badNum
sqldb = DB_Helper()#将序号重新恢复
sqldb.updateID()
print 'proxy good Num ---',goodNum
print 'proxy bad Num ---',badNum
detect_proxy.py:
#coding:utf-8
import socket
from threading import Thread
import urllib
'''
这个类主要是用来检测代理的可用性
'''
class Detect_Proxy(Thread):
url = 'http://ip.chinaz.com/getip.aspx'
def __init__(self,db_helper,part,sum):
Thread.__init__(self)
self.db_helper = db_helper
self.part = part#检测的分区
self.sum = sum#检索的总区域
self.counts = self.db_helper.proxys.count()
socket.setdefaulttimeout(2)
self.__goodNum = 0
self.__badNum = 0
@property
def goodNum(self):
return self.__goodNum
@goodNum.setter
def goodNum(self,value):
self.__goodNum = value
@property
def badNum(self):
return self.__badNum
@badNum.setter
def badNum(self,value):
self.__badNum = value
def run(self):
self.detect()#开始检测
def detect(self):
'''
http://ip.chinaz.com/getip.aspx 作为检测目标
:return:
'''
if self.counts < self.sum:
return
pre = self.counts/self.sum
start = pre * (self.part-1)
end = pre * self.part
if self.part == self.sum:#如果是最后一部分,结束就是末尾
end = self.counts
# print 'pre-%d-start-%d-end-%d'%(pre,start,end)
proxys = self.db_helper.proxys.find({'proxyId':{'$gt':start,'$lte':end}})#大于start小于等于end,很重要
for proxy in proxys:
ip = proxy['ip']
port = proxy['port']
try:
proxy_host ="http://ha:ha@"+ip+':'+port #随便添加了账户名和密码,只是为了防止填写账户名密码暂停的情况
response = urllib.urlopen(self.url,proxies={"http":proxy_host})
if response.getcode()!=200:
self.db_helper.delete({'ip':ip,'port':port})
self.__badNum += 1
print proxy_host,'bad proxy'
else:
self.__goodNum += 1
print proxy_host,'success proxy'
except Exception,e:
print proxy_host,'bad proxy'
self.db_helper.delete({'ip':ip,'port':port})
self.__badNum += 1
continue
完整的代码我已经上传到github上
今天的分享就到这里,已经晚上12:15了,如果大家觉得还可以呀,请打赏我。提前透露一下,下一篇会讲解突破反爬虫。