注意事项
有时间一定要熟悉Requests的文档:Requests: 让 HTTP 服务人类¶
Requests 完全满足今日 web 的需求。
- Keep-Alive & 连接池
- 国际化域名和 URL
- 带持久 Cookie 的会话
- 浏览器式的 SSL 认证
- 自动内容解码
- 基本/摘要式的身份认证
- 优雅的 key/value Cookie
- 自动解压
- Unicode 响应体
- HTTP(S) 代理支持
- 文件分块上传
- 流下载
- 连接超时
- 分块请求
- 支持
.netrc
我会列举一些使用Requests时需要注意的点。
SSL证书认证
Requests 可以为 HTTPS 请求验证 SSL 证书,就像 web 浏览器一样。SSL 验证默认是开启的,如果证书验证失败,Requests 会抛出 SSLError:
>>> requests.get('https://requestb.in')
requests.exceptions.SSLError: hostname 'requestb.in' doesn't match either of '*.herokuapp.com', 'herokuapp.com'
如果你将 verify
设置为 False,Requests 也能忽略对 SSL 证书的验证。
>>> requests.get('https://kennethreitz.org', verify=False)
<Response [200]>
默认情况下, verify
是设置为 True 的。选项 verify
仅应用于主机证书。
对于私有证书,你也可以传递一个 CA_BUNDLE 文件的路径给
verify
。你也可以设置 #REQUEST_CA_BUNDLE
环境变量。
超时设置
为防止服务器不能及时响应,大部分发至外部服务器的请求都应该带着 timeout 参数。在默认情况下,除非显式指定了 timeout 值,requests 是不会自动进行超时处理的。如果没有 timeout,你的代码可能会挂起若干分钟甚至更长时间。
r = requests.get('https://github.com', timeout=10)
我一般会把超时时间设为10秒。
封装
如果你需要频繁使用Requests发送请求是,最好是将其封装成一个通用方法。
例如这里是对post方法的简单封装,当然需要根据自己需求来做。
import requests
import json as json_
class Session(object):
def __init__(self):
self.session = requests.session()
def post(self, url, data=None, json=None, **kwargs):
retry_times = 0
while True:
try:
res = self.session.post(url, data=data, json=json, **kwargs)
except (requests.exceptions.ConnectionError, requests.ReadTimeout):
pass
else:
try:
res.json()
except json_.JSONDecodeError:
retry_times += 1
if retry_times > 20:
return None
else:
continue
return res
还可以看看这里,借鉴别人的思想:Python’s encapsulation of requests
# -*- coding:utf-8 -*-
import requests
from concurrent.futures import ThreadPoolExecutor
Read from Tools.Config import Config configuration file
From Tools. Log import Log # log management
From Tools. tools import decoLOG log decoration
'''
Functions: Requests class
Usage method:
Author: Guo Kechang
Time of completion: 20180224
Update:
Update time:
'''
class Requests(object):
def __init__(self):
self.session = requests.session()
self.header = {}
# By default, URLs come from configuration files to facilitate switching between different test environments, and can also be set dynamically.
self.URL = Config().getURL()
# Default 60s, can be set dynamically
self.timeout = 60
# When HTTP connection is abnormal, the number of reconnections is 3 by default. It can be set dynamically.
self.iRetryNum = 3
self.errorMsg = ""
# Content = {Use Case Number: Response Data}
self.responses = {}
# Content = {Use Case Number: Exception Information}
self.resErr={}
# Retention of original post usage
# bodyData: request's data
@decoLOG
def post(self, bodyData):
response = None
self.errorMsg = ""
try:
response = self.session.post(self.URL, data=bodyData.encode('utf-8'), headers=self.header, timeout=self.timeout)
response.raise_for_status()
except Exception as e:
self.errorMsg = str(e)
Log (). logger. error ("HTTP request exception, exception information:% s"% self. errorMsg)
return response
# Complex request concurrent processing, in the form of thread pool, use case > thread pool capacity: thread pool capacity is concurrent, otherwise, use case number is concurrent
# dicDatas: {Use Case Number: Use Case Data}
@decoLOG
def req_all(self, dicDatas, iThreadNum=5):
if len(dict(dicDatas)) < 1:
Log (). logger. error ("No test object, please confirm and try again..." ""
return self.responses.clear()
# Request use case set conversion (use case number, use case data)
seed = [i for i in dicDatas.items()]
self.responses.clear()
# Thread pool concurrent execution, iThreadNum concurrent number
with ThreadPoolExecutor(iThreadNum) as executor:
executor.map(self.req_single,seed)
# Returns response information for all requests ({use case number: response data}), HTTP connection exception: corresponding to None
return self.responses
# For single use case submission, HTTP connection failures can be reconnected, and the maximum number of reconnections can be dynamically set
def req_single(self, listData, reqType="post", iLoop=1):
response = None
# If the maximum number of reconnections is reached, the submission ends after the connection
if iLoop == self.iRetryNum:
if reqType == "post":
try:
response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
timeout=self.timeout)
response.raise_for_status()
except Exception as e:
# Exception information is saved only when the maximum number of connections is reached, but the maximum number of connections is not reached. Exception information is empty.
self.resErr[listData[0]] = str(e)
Log (). logger. error ("HTTP request exception, exception information:% s [% d]% (str (e), iLoop))
self.responses[listData[0]] = response
else:
# for future: other request method expand
pass
# The maximum number of connections is not reached, and if an exception occurs, a reconnection attempt is made
else:
if reqType == "post":
try:
response = requests.post(self.URL, data=listData[1].encode('utf-8'), headers=self.header,
timeout=self.timeout)
response.raise_for_status()
except Exception as e:
Log (). logger. error ("HTTP request exception, exception information:% s [% d]% (str (e), iLoop))
# Increased number of reconnections
iLoop += 1
# Reconnect
self.req_single(listData, reqType, iLoop)
# Current connection termination
return None
self.responses[listData[0]] = response
else:
# for future: other request method expand
pass
# Set up SoapAction to complete the header setting of web service interface quickly
def setSoapAction(self, soapAction):
self.header["SOAPAction"] = soapAction
self.header["Content-Type"] = "text/xml;charset=UTF-8"
self.header["Connection"] = "Keep-Alive"
self.header["User-Agent"] = "InterfaceAutoTest-run"