
Quickly Writing a Crawler

This is 崔斯特's 57th original article.

Fast, and then faster (๑• . •๑)

How It Came About

Just before leaving work today, my boss asked me to help scrape some data. A quick look showed the site requires login, but it didn't seem hard. Back home I registered an account, copied the URL, and had Postman convert the request into code. Simple and crude, straight to it.

A note on a Postman bug: sending a request returned no data at all. If you run into this, upgrade Postman to the latest version.

Code

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import pymongo
import requests
import traceback
from concurrent import futures
from urllib.parse import urljoin
from scrapy.selector import Selector

logging.basicConfig(level=logging.INFO)

client = pymongo.MongoClient()
coll = client['table']['collection']  # placeholder database/collection names
# coll.create_index('url', unique=True)

url = "your urls"      # list-page URL template containing a '{}' page placeholder
proxy = "your proxy"   # proxy address used below, e.g. "http://host:port"
headers = {
    'user-agent': "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36",
    'accept': "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
    'accept-encoding': "gzip, deflate, br",
    'accept-language': "zh-CN,zh;q=0.9,en;q=0.8",
}


def fetch(page):
    """Fetch one list page and crawl every detail link it contains."""
    try:
        proxies = {
            'http': proxy,
            'https': proxy
        }
        response = requests.get(url.format(page), headers=headers, timeout=20, proxies=proxies)
        if response.status_code == 200:
            s = Selector(text=response.text)
            for i in s.xpath('//*[@id="search-results"]/tbody/tr'):
                url_ = i.xpath('td[4]/a/@href').extract_first()
                detail_url = urljoin(url, url_)
                data = get_detail(detail_url)
                logging.info('success save data {}'.format(data['url']))
                save_mongo(data)
    except Exception:
        logging.error(traceback.format_exc())


def get_detail(detail_url):
    """Fetch a detail page and extract the fields we care about."""
    try:
        proxies = {
            'http': proxy,
            'https': proxy
        }
        response = requests.get(detail_url, headers=headers, timeout=20, proxies=proxies)
        if response.status_code == 200:
            s = Selector(text=response.text)
            data = dict()
            data['url'] = detail_url
            data['SMILES'] = s.xpath('//*[@id="smiles"]/text()').extract_first()
            img = s.xpath('//*[@id="molecule-image"]/img/@src').extract_first()
            data['img'] = urljoin(detail_url, img)
            data['formula'] = s.xpath('//*[@id="name-structure"]/tbody/tr[2]/td[2]/text()').extract_first()
            data['Mass'] = s.xpath('//*[@id="name-structure"]/tbody/tr[3]/td[2]/text()').extract_first()
            return data
    except Exception:
        logging.error(traceback.format_exc())


def save_mongo(data):
    """Insert one document; with a unique index on 'url', duplicates become a no-op."""
    try:
        coll.insert_one(data)
    except pymongo.errors.DuplicateKeyError:
        pass


if __name__ == '__main__':
    # single-threaded version
    # for i in range(1, 11):
    #     fetch(str(i))

    # multithreaded version
    with futures.ThreadPoolExecutor(max_workers=50) as executor:
        to_do = []
        for i in range(1, 51):
            future = executor.submit(fetch, str(i))
            to_do.append(future)

The code is quite simple: fetch grabs the list pages, get_detail grabs the detail pages, and save_mongo writes the results to the database. The only part that needs explaining is the multithreading at the end, done here with concurrent.futures. Rather than lecture about it, let's look at the documentation.

Speeding It Up

concurrent.futures is a package added to the Python 3 standard library (since 3.2) for concurrent execution; it provides both thread-based and process-based pools behind the same Executor interface.

Thread Pool

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
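
Applied back to the crawler, the same as_completed pattern lets you watch the submitted futures instead of just firing and forgetting them. A minimal sketch, assuming the fetch(page) function from the script above; note that fetch swallows its own exceptions, so result() only re-raises errors that escape it:

from concurrent import futures

# Sketch only: fetch() is the list-page function from the crawler above.
with futures.ThreadPoolExecutor(max_workers=50) as executor:
    to_do = {executor.submit(fetch, str(i)): i for i in range(1, 51)}
    for future in futures.as_completed(to_do):
        page = to_do[future]
        try:
            future.result()  # re-raises anything that escaped fetch()
            print('page %s finished' % page)
        except Exception as exc:
            print('page %s failed: %s' % (page, exc))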

Process Pool

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
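
One note on choosing between the two: the crawler above is I/O-bound, spending most of its time waiting on the network (during which the GIL is released), so ThreadPoolExecutor is the natural fit; the prime check here is CPU-bound, which is exactly where ProcessPoolExecutor pays off by sidestepping the GIL with separate processes.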

On the database side, my recommendation is to add a unique index with coll.create_index('url', unique=True): it deduplicates records and it speeds up lookups by url.
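
A minimal sketch of that setup, reusing the placeholder client and collection names from the crawler script: create the index once, then let MongoDB reject duplicates on insert.

import pymongo

client = pymongo.MongoClient()
coll = client['table']['collection']  # same placeholder names as in the crawler above

# One-time setup: a unique index on 'url' rejects duplicate documents
# and lets lookups by url hit the index instead of scanning the collection.
coll.create_index('url', unique=True)

def save_mongo(data):
    try:
        coll.insert_one(data)
    except pymongo.errors.DuplicateKeyError:
        # this url has already been saved; skip it
        pass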