Fork me on GitHub

Scrapy中如何提高数据的插入速度

这是崔斯特的第三十九篇原创文章

长期更新 (๑• . •๑)

速度问题

最近工作中遇到这么一个问题,全站抓取时采用分布式:爬虫A与爬虫B,爬虫A给爬虫B喂饼,爬虫B由于各种原因运行的比较慢,达不到预期效果,所以必须对爬虫B进行优化。

提升Scrapy运行速度有很多方法,国外有大佬说过

Speed up web scraper

Here’s a collection of things to try:

  1. use latest scrapy version (if not using already)
  2. check if non-standard middlewares are used
  3. try to increase CONCURRENT_REQUESTS_PER_DOMAIN, CONCURRENT_REQUESTS settings (docs)
    turn off logging LOG_ENABLED = False (docs)
  4. try yielding an item in a loop instead of collecting items into the items list and returning them
    use local cache DNS (see this thread)
  5. check if this site is using download threshold and limits your download speed (see this thread)
    log cpu and memory usage during the spider run - see if there are any problems there
  6. try run the same spider under scrapyd service
  7. see if grequests + lxml will perform better (ask if you need any help with implementing this solution)
  8. try running Scrapy on pypy, see Running Scrapy on PyPy

大致看了下,确实可以提高爬虫运行速度,但是对于海量数据(这里说的是百万级)还需要考虑一点的就是数据插入问题,这里我们使用的是 Mongo。

官方示例

让我们先从官方文档开始 Write items to MongoDB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import pymongo
class MongoPipeline(object):
collection_name = 'scrapy_items'
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item

比较简单,这里插入使用的方法是 insert_one,继续文档:

insert_one(document, bypass_document_validation=False, session=None)

Insert a single document.

1
2
3
4
5
6
7
>>> db.test.count({'x': 1})
0
>>> result = db.test.insert_one({'x': 1})
>>> result.inserted_id
ObjectId('54f112defba522406c9cc208')
>>> db.test.find_one({'x': 1})
{u'x': 1, u'_id': ObjectId('54f112defba522406c9cc208')}

以前经常使用的 insert 方法,已经不被赞同

insert(doc_or_docs, manipulate=True, check_keys=True, continue_on_error=False, **kwargs)

Insert a document(s) into this collection.

1
2
3
DEPRECATED - Use insert_one() or insert_many() instead.
Changed in version 3.0: Removed the safe parameter. Pass w=0 for unacknowledged write operations.

insert 简单理解就是插入,把我们采集到的 item 插入到数据库,这样存在一个很严重的问题,就是去重

去重

晚上有一种很流行的写法,使用 update命令,如:

self.db[self.collection_name].update({'id': item['id']}, {'$set': dict(item)}, True)

解释为:

比较重要的一点就在于process_item,在这里使用了update方法,第一个参数传入查询条件,这里使用的是id,第二个参数传入字典类型的对象,就是我们的item,第三个参数传入True,这样就可以保证,如果查询数据存在的话就更新,不存在的话就插入。这样就可以保证去重了。

这确实是一种很简单的方法,其实原理很简单,就是在每次插入数据前,对数据库中查询,是否有该 ID,如果没有就插入,如果有就放弃。

对于数据量比较少的项目,这确实是一种很简单的方法,很简单就完成了目标。

但是,我们现在说的是百万级数据,如果每一条数据在插入前,都需要去查询该数据是否在数据库,那会多么耗时,效率会大大较低,那么还有什么好办法呢?

索引

MongoDB 索引

索引能够实现高效地查询。没有索引,MongoDB 就必须扫描集合中的所有文档,才能找到匹配查询语句的文档。这种扫描毫无效率可言,需要处理大量的数据。

索引是一种特殊的数据结构,将一小块数据集保存为容易遍历的形式。索引能够存储某种特殊字段或字段集的值,并按照索引指定的方式将字段值进行排序。

我们可以借助索引,使用 insert_one方法提高效率。代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MongoDBPipeline(object):
def open_spider(self, spider):
self.client = mongodb_client
self.db = self.client.get_database()
self.collection = self.db['test']
# 添加唯一索引
self.collection.create_index('id', unique=True)
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
try:
self.collection.insert_one(dict(item))
return item
except DuplicateKeyError:
spider.logger.debug(' duplicate key error collection')
return item

其实很简单,就是在 open_spider先创建唯一索引,然后再插入数据。注意需要在process_item中使用异常处理,因为很有可能插入重复数据,到时候就会输出日志。

其他方法

mongo 除了 insert_one方法还有一种,insert_many

insert_many(documents, ordered=True, bypass_document_validation=False, session=None)

Insert an iterable of documents.

1
2
3
4
5
6
7
>>> db.test.count()
0
>>> result = db.test.insert_many([{'x': i} for i in range(2)])
>>> result.inserted_ids
[ObjectId('54f113fffba522406c9cc20e'), ObjectId('54f113fffba522406c9cc20f')]
>>> db.test.count()
2

这样插入的数据不再是一条,而是很多,

What’s the difference between insert(), insertOne() and insertMany() methods on MongoDB

大佬有写到,可以去看看。

同时插入多条数据,减轻数据库压力。但是这个“多”到底还是多少,目前不得而知。

结语

除了更多机器和更多节点,还有很多方法可以提升 Scrapy运行速度。

今天说到的是管道阻塞问题,还有其他地方也可以优化,还需要努力。