本文地址://www.greatytc.com/p/58781f28904f
在抓取数据的过程中,主要要做的事就是从杂乱的数据中提取出结构化的数据。Scrapy
的Spider
可以把数据提取为一个Python中的字典,虽然字典使用起来非常方便,对我们来说也很熟悉,但是字典有一个缺点:缺少固定结构。在一个拥有许多爬虫的大项目中,字典非常容易造成字段名称上的语法错误,或者是返回不一致的数据。
所以Scrapy
中,定义了一个专门的通用数据结构:Item
。这个Item
对象提供了跟字典相似的API,并且有一个非常方便的语法来声明可用的字段。
声明Item
Item
的声明在items.py
这个文件中,声明Item
时,需要从scrapy.Item
继承:
import scrapy
class Product(scrapy.Item):
name = scrapy.Field()
price = scrapy.Field()
stock = scrapy.Field()
last_updated = scrapy.Field(serializer=str)
在声明这个Item
时,需要同时声明Item
中的字段,声明的方式类似于设置类属性。熟悉Djiango
的话那么你会注意到这与Django Models
非常相似,但是没有多种Field
类型,更加简单。
使用:
In [1]: product = Product(name='Desktop PC', price=1000)
In [2]: product
Out[2]: {'name': 'Desktop PC', 'price': 1000}
Field对象
可以看到,在声明Item
时,声明字段使用的是Field
对象。这个Field
对象其实完全继承自Python的字典,并且没有做任何改动,所以在使用Field
声明字段时,可以传入数据作为这个字段的元数据(metadata
),上方的serializer=str
其实就是一个指定序列化函数的元数据。
字段的元数据与字段的值之间没有必然的联系。如果我们直接查看Item
对象,那么获取的是字段的值;
In [3]: product
Out[3]: {'name': 'Desktop PC', 'price': 1000}
如果使用.fields
,获取的就是字段的元数据了:
In [4]: product.fields
Out[4]: {'last_updated': {'serializer': str}, 'name': {}, 'price': {}, 'stock': {}}
使用方式
由于Item
有跟字典类似的API,所以很多时候可以像字典一样使用:
# 可以像字典一样用字段名取值
In [5]: product['name']
Out[5]: 'Desktop PC'
# 可以使用get方法
In [6]: product.get('name')
Out[6]: 'Desktop PC'
# 可以在获取字段没有值时,设置默认返回的值
In [7]: product.get('last_updated', 'not set')
Out[7]: 'not set'
# 可以像字典一样对存在的字段赋值
In [8]: product['last_updated'] = 'today'
In [9]: product['last_updated']
Out[9]: today
但是有一点区别的是,如果对Item
没有声明的字段操作,会抛出异常:
# 获取没有声明的字段
In [10]: product['lala']
Traceback (most recent call last):
...
KeyError: 'lala'
# 可以对未声明字段使用get方法,设置默认返回的值
In [11]: product.get('lala', 'unknown field')
Out[11]:'unknown field'
# 给没有声明的字段赋值
In [12]: product['lala'] = 'test' # setting unknown field
Traceback (most recent call last):
...
KeyError: 'Product does not support field: lala'
还可以直接从Dict直接创建Item
:
In [13]: Product({'name': 'Laptop PC', 'price': 1500})
Out[13]: Product(price=1500, name='Laptop PC')
In [14]: Product({'name': 'Laptop PC', 'lala': 1500}) # warning: unknown field in dict
Traceback (most recent call last):
...
KeyError: 'Product does not support field: lala'
如果需要扩展或者修改某一个Item
类的话,可以使用继承的方式:
class DiscountedProduct(Product):
discount_percent = scrapy.Field(serializer=str)
discount_expiration_date = scrapy.Field()
class SpecificProduct(Product):
name = scrapy.Field(Product.fields['name'], serializer=my_serializer)
Item Pipeline
在Spider
中返回一个Item
后,这个Item
将会被发送给Item Pipeline
,其主要有以下几种作用:
- 清洗数据
- 验证抓取下来的数据(检查是否含有某些字段)
- 检查去重
- 存储数据到数据库
每个Item Pipeline
都是一个Python类,实现了几个简单的方法。
启用Item Pipeline
启用方式与Middleware
基本相同,优先级的值越小,越先被调用。
ITEM_PIPELINES = {
'myproject.pipelines.PricePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
}
自定义Item Pipeline
自定义Item Pipeline
必须实现以下这个方法:
-
process_item(self, item, spider)
每一个
Item Pipeline
都会调用这个方法,用来处理Item
。参数:
-
item(
Item
对象或者Dict
) - 抓取的Item
。 -
spider(
Spider
对象) - 抓取这个Item
的Spider
。
这个方法需要返回以下两种返回值的一种:
-
Item或者Dict
Scrapy
将会继续调用接下来的Item Pipeline
组件处理下去。 -
抛出一个DropItem异常
将会不再继续调用接下来的
Item Pipeline
。
-
item(
还可以实现以下几种方法之一来实现某些功能:
-
open_spider(self, spider)
这个方法将会在
Spider
打开时调用。 -
close_spider(self, spider)
这个方法将会在
Spider
关闭时调用。 -
from_crawler(cls, crawler)
如果存在这个方法,那么就会调用这个类方法来从
Crawler
创建一个Pipeline
实例。这个方法必须返回一个Pipeline
的新实例。其中的
Crwaler
对象提供了可以访问到Scrapy
核心部件的路径,比如settings和signals。Pipeline
实例可以通过这种方式来连接他们。
Item Pipeline例子
1.验证数据
如果Item
中包含price_excludes_vat
属性,就调整数据中的price
属性,并抛弃那些不包含price
属性的Item。
from scrapy.exceptions import DropItem
class PricePipeline(object):
vat_factor = 1.15
def process_item(self, item, spider):
if item['price']:
if item['price_excludes_vat']:
item['price'] = item['price'] * self.vat_factor
return item
else:
raise DropItem("Missing price in %s" % item)
2.写入Item到JSON文件
import json
class JsonWriterPipeline(object):
def open_spider(self, spider):
self.file = open('items.jl', 'w')
def close_spider(self, spider):
self.file.close()
def process_item(self, item, spider):
line = json.dumps(dict(item)) + "\n"
self.file.write(line)
return item
3.写入Item到MongoDB
import pymongo
class MongoPipeline(object):
collection_name = 'scrapy_items'
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item
4.去重
from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item
系列文章: