闲的无聊爬了下维基百科有关古罗马的数据,爬取模式是分布式+增量爬取。数据爬完了项目却没有停手,因为个人兴趣开始研究python3.5加入的异步特性,经过一段时间的添添补补,一个简单的小爬虫就这样诞生了~
本框架基于asyncio,aiohttp及redis(分布式模式需要)。目前已上架git和pypi,名字取自毕生对抗罗马共和国的迦太基名将汉尼拔。
git地址:JorgenLiu/hannibal
闲话少说,下面是框架的介绍。
Mission
框架以每个url作为一个爬取单位,使用时需将url封装成任务Mission,序列化后放置入爬取队列。
关于此种设计思路的解释:
诚然大部分爬取工作只是不同的url,但目前遇到的一种情况是对某个相同的url通过POST不同的参数来获取不同的数据,故而将url封装成Mission用来爬取。
Mission类的定义:
class Mission(object):
def __init__(self, unique_tag, url, method='GET', data=None, data_type='data'):
self.unique_tag = str(unique_tag)
self.url = url
if method not in ['GET', 'POST', 'PUT', 'DELETE']:
raise ValueError('invalid method')
else:
self.method = method
self.data = data
if data_type not in ['json', 'data']:
raise ValueError('invalid data type')
else:
self.data_type = data_type
unique_tag字段取代了url,作为每个任务的唯一标识。Mission类实现了serialize和deserialize方法,分别负责对任务进行序列化和反序列化,并分别在存入和取出任务时进行调用。以下为定义Mission的示例:
方法为GET时:
Mission(unique_tag='1', url='http://httpbin.org/get?t=1')
方法为POST时:
Mission(unique_tag='4377', url='https://www.xytzq.cn:9443/tzq/pc/project/getProjectInfo',
method='POST',
data={'projectid': 4377})
Mission定义后,需加入采集队列内。可通过调用队列的 init_queue([mission_list]) (初始化queue,将一个Mission列表序列化后加入队列)方法或enqueue (将单个Mission加入队列)方法加入队列。
Collector
Collector类负责进行网页内容的爬取,目前实现的模式有两种:
- 根据预设好的url列表进行抓取。
- 根据一个起点url,抓取后进行解析,将解析出的url列表添加至爬取队列中进行增量抓取。
目前已实现两种Collector:
- LocalCollector,本地运行的单进程采集节点,声明时需传入一个异步的解析函数,负责对页面内容进行解析。
- DistributeCollector,通过redis队列与解析节点进行交互的分布式采集节点,只负责爬取页面,并将爬取的页面内容放入解析队列,由解析节点进行解析。
声明LocalCollector需要以下参数:
- mission_queue,任务队列
- href_pool,链接去重池
- parser_function,一个异步函数,接受一个response对象,可通过调用extract_json或extract_html方法从response对象中提取json或普通html数据。LocalCollector不需要额外的解析节点,会调用这个异步的解析函数对爬取的响应体进行解析,从中提取数据进行操作。
- cache_size,默认为3,控制请求并发数。
DistributeCollector声明所需参数和LocalCollector基本一致,但不需要定义parser_function,取而代之的是一个额外的parse_queue,爬取的结果会被序列化后放入parse_queue这个解析队列中,等待解析节点进行解析。
Collector支持注册错误处理器。注册错误处理器需传入一个错误状态码(int)和一个错误处理函数。错误处理函数接受一个response对象和一个url,可对错误进行处理。
Collector支持注册爬取前中间件。中间件是一个同步函数,接受一个mission对象,将在爬取前执行。可注册多个中间件,执行顺序与注册顺序相同。
以下是声明一个爬取httpbin的LocalCollector的案例:
from hannibal.spider import LocalCollector
from hannibal.util import MemPool, MemQueue, extract_json, Mission
pool = MemPool(name='http_bin')
queue = MemQueue(name='http_bin', limited=True)
async def collect_function(response):
json_obj = await extract_json(response)
print(json_obj)
def demo_handler(response, url):
print('handel 400')
def demo_before_collect_middleware(mission):
print('this is url: %s' % mission.url)
def demo_before_collect_middleware1(mission):
print('this is tag: %s' % mission.unique_tag)
def ping_http_bin():
url_list = [Mission(unique_tag=i, url='http://httpbin.org/get?t=%d' % i) for i in range(1, 500)]
queue.init_queue(url_list)
collector = LocalCollector(mission_queue=queue, href_pool=pool, parse_function=collect_function, cache_size=10)
collector.register_middleware(demo_before_collect_middleware)
collector.register_middleware(demo_before_collect_middleware1)
collector.register_error_handler(400, demo_handler)
collector.conquer()
if __name__ == '__main__':
ping_http_bin()