from __future__ import absolute_import, print_function, division, unicode_literals

import _io
import inspect
import json as json_module
import logging
import re

import six

from collections import namedtuple
from functools import update_wrapper

from requests.adapters import HTTPAdapter
from requests.exceptions import ConnectionError
from requests.sessions import REDIRECT_STATI
from requests.utils import cookiejar_from_dict

try:
    from collections.abc import Sequence, Sized
except ImportError:
    from collections import Sequence, Sized

try:
    from requests.packages.urllib3.response import HTTPResponse
except ImportError:
    from urllib3.response import HTTPResponse

if six.PY2:
    from urlparse import urlparse, parse_qsl, urlsplit, urlunsplit
    from urllib import quote
else:
    from urllib.parse import urlparse, parse_qsl, urlsplit, urlunsplit, quote

if six.PY2:
    try:
        from six import cStringIO as BufferIO
    except ImportError:
        from six import StringIO as BufferIO
else:
    from io import BytesIO as BufferIO

try:
    from unittest import mock as std_mock
except ImportError:
    import mock as std_mock
def _has_unicode(s): return any(ord(char) > 128 for char in s)
def _clean_unicode(url):
    """Return *url* with non-ASCII characters made transport-safe.

    Non-ASCII domain labels are converted to punycode (``xn--`` form) and
    non-ASCII characters in the rest of the URL are percent-encoded.
    """
    # Clean up domain names, which use punycode to handle unicode chars
    urllist = list(urlsplit(url))
    netloc = urllist[1]
    if _has_unicode(netloc):
        domains = netloc.split(".")
        for i, d in enumerate(domains):
            if _has_unicode(d):
                d = "xn--" + d.encode("punycode").decode("ascii")
                domains[i] = d
        urllist[1] = ".".join(domains)
        url = urlunsplit(urllist)

    # Clean up path/query/params, which use url-encoding to handle unicode chars.
    # NOTE(review): on Python 2 ``url.encode("utf8")`` yields a ``str`` which IS
    # in ``six.string_types``, so this branch effectively runs only on PY2 and
    # switches to the encoded byte string there; on PY3 it is a no-op.
    if isinstance(url.encode("utf8"), six.string_types):
        url = url.encode("utf8")
    chars = list(url)
    for i, x in enumerate(chars):
        if ord(x) > 128:
            chars[i] = quote(x)

    # The final return was lost during extraction; without it the function
    # returned None. Restore it so the cleaned URL is actually produced.
    return "".join(chars)
def _cookies_from_headers(headers):
    """Build a ``CookieJar`` from the ``set-cookie`` header in *headers*.

    NOTE(review): the function header and the stdlib branch were lost during
    extraction; they are reconstructed here around the surviving
    ``cookies``-package fallback. Confirm against the original source.
    """
    try:
        # Python 3: parse with the stdlib SimpleCookie.
        import http.cookies as cookies

        resp_cookie = cookies.SimpleCookie()
        resp_cookie.load(headers["set-cookie"])

        cookies_dict = {name: v.value for name, v in resp_cookie.items()}
    except ImportError:
        # Python 2 fallback: third-party ``cookies`` package.
        from cookies import Cookies

        resp_cookies = Cookies.from_request(headers["set-cookie"])
        cookies_dict = {v.name: v.value for _, v in resp_cookies.items()}
    return cookiejar_from_dict(cookies_dict)
_wrapper_template = """\ def wrapper%(wrapper_args)s: with responses: return func%(func_args)s """
def get_wrapped(func, responses):
    """Return *func* wrapped so its body runs inside the *responses* mock.

    The wrapper preserves the original callable's signature so that
    fixture-injection tools such as pytest keep working.

    NOTE(review): the tail of this function (building the wrapper and the
    final ``return``) was lost during extraction and is reconstructed to the
    conventional implementation — confirm against the original source.
    """
    if six.PY2:
        args, a, kw, defaults = inspect.getargspec(func)
        wrapper_args = inspect.formatargspec(args, a, kw, defaults)

        # Preserve the argspec for the wrapped function so that testing
        # tools such as pytest can continue to use their fixture injection.
        if hasattr(func, "__self__"):
            args = args[1:]  # Omit 'self'
        func_args = inspect.formatargspec(args, a, kw, None)

        # Build the wrapper from source so the rendered argspec survives.
        evaldict = {"func": func, "responses": responses}
        six.exec_(
            _wrapper_template
            % {"wrapper_args": wrapper_args, "func_args": func_args},
            evaldict,
        )
        wrapper = evaldict["wrapper"]
        update_wrapper(wrapper, func)
    else:
        signature = inspect.signature(func)
        signature = signature.replace(return_annotation=inspect.Signature.empty)
        # If the function is wrapped, switch to *args, **kwargs for the parameters
        # as we can't rely on the signature to give us the arguments the function will
        # be called with. For example unittest.mock.patch uses required args that are
        # not actually passed to the function when invoked.
        if hasattr(func, "__wrapped__"):
            wrapper_params = [
                inspect.Parameter("args", inspect.Parameter.VAR_POSITIONAL),
                inspect.Parameter("kwargs", inspect.Parameter.VAR_KEYWORD),
            ]
        else:
            wrapper_params = [
                param.replace(annotation=inspect.Parameter.empty)
                for param in signature.parameters.values()
            ]
        signature = signature.replace(parameters=wrapper_params)

        def wrapper(*args, **kwargs):
            with responses:
                return func(*args, **kwargs)

        update_wrapper(wrapper, func)
        # Re-attach the cleaned signature so introspection sees the original.
        wrapper.__signature__ = signature

    return wrapper
def _ensure_url_default_path(url):
    """Return *url* with ``/`` as its path when a string URL has none.

    Non-string URLs (e.g. compiled regex patterns — see ``_is_string``) are
    returned unchanged.
    """
    if _is_string(url):
        url_parts = list(urlsplit(url))
        if url_parts[2] == "":  # index 2 of the split tuple is the path
            url_parts[2] = "/"
        url = urlunsplit(url_parts)
    return url
def _handle_body(body):
    """Normalise a response *body* into a bytes file-like object.

    Text is UTF-8 encoded first; an already-open ``BufferedReader`` is passed
    through untouched so streaming file bodies keep working.
    """
    if isinstance(body, six.text_type):
        body = body.encode("utf-8")
    if isinstance(body, _io.BufferedReader):
        return body

    return BufferIO(body)
# Sentinel default: lets code distinguish "``match_querystring`` was not
# passed" from an explicit ``False`` (used as the default in
# ``BaseResponse.__init__`` below).
_unspecified = object()
# NOTE(review): this class was mangled during extraction — indentation is
# lost and several members are missing (e.g. the ``def __eq__`` header that
# must precede the ``self_url``/``other_url`` lines, and the
# ``_should_match_querystring`` helper called from ``__init__``).  The
# surviving statements are kept byte-for-byte; recover the original layout
# from upstream before running this file.
class BaseResponse(object):
# Class-level defaults; instances/subclasses override these.
content_type = None
headers = None
stream = False
def __init__(self, method, url, match_querystring=_unspecified):
self.method = method # ensure the url has a default path set if the url is a string
self.url = _ensure_url_default_path(url)
self.match_querystring = self._should_match_querystring(match_querystring)
# Times this response has been served — presumably incremented when a
# request matches; the incrementing code is not visible in this chunk.
self.call_count = 0
# NOTE(review): the two lines below reference an ``other`` that is not
# defined here — they look like the body of an ``__eq__`` whose header was
# lost during extraction.
# Can't simply do a equality check on the objects directly here since __eq__ isn't # implemented for regex. It might seem to work as regex is using a cache to return # the same regex instances, but it doesn't in all cases.
self_url = self.url.pattern if isinstance(self.url, Pattern) else self.url
other_url = other.url.pattern if isinstance(other.url, Pattern) else other.url
class Response(BaseResponse):
    """A canned response with a static body, status, and headers."""

    def __init__(
        self,
        method,
        url,
        body="",
        json=None,
        status=200,
        headers=None,
        stream=False,
        content_type=UNSET,
        **kwargs
    ):
        # if we were passed a `json` argument,
        # override the body and content_type
        if json is not None:
            # A caller must not supply both `body` and `json`.
            assert not body
            body = json_module.dumps(json)
            if content_type is UNSET:
                content_type = "application/json"

        if content_type is UNSET:
            content_type = "text/plain"

        # body must be bytes
        if isinstance(body, six.text_type):
            body = body.encode("utf-8")

        self.body = body
        self.status = status
        self.headers = headers
        self.stream = stream
        self.content_type = content_type
        super(Response, self).__init__(method, url, **kwargs)

    def get_response(self, request):
        """Build the urllib3 ``HTTPResponse`` to serve for *request*.

        NOTE(review): the construction of the return value was lost during
        extraction and is reconstructed to the conventional implementation —
        confirm against the original source.
        """
        # An Exception instance stored as the body is raised instead of
        # served, letting tests simulate transport-level errors.
        if self.body and isinstance(self.body, Exception):
            raise self.body

        headers = self.get_headers()
        status = self.status
        body = _handle_body(self.body)

        return HTTPResponse(
            status=status,
            reason=six.moves.http_client.responses.get(status),
            body=body,
            headers=headers,
            preload_content=False,
        )
def remove(self, method_or_response=None, url=None):
    """
    Removes a response previously added using ``add()``, identified
    either by a response object inheriting ``BaseResponse`` or
    ``method`` and ``url``. Removes all matching responses.

    NOTE(review): the lookup that builds ``response`` was lost during
    extraction and is reconstructed to the conventional implementation.
    """
    if isinstance(method_or_response, BaseResponse):
        response = method_or_response
    else:
        # Build a probe response; BaseResponse equality matches on
        # method/url, so this finds the registered entries.
        response = BaseResponse(method=method_or_response, url=url)

    while response in self._matches:
        self._matches.remove(response)
def replace(self, method_or_response=None, url=None, body="", *args, **kwargs):
    """
    Replaces a response previously added using ``add()``. The signature
    is identical to ``add()``. The response is identified using ``method``
    and ``url``, and the first matching response is replaced.

    Raises ``ValueError`` (from ``list.index``) when nothing matches.

    NOTE(review): the construction of ``response`` was lost during
    extraction and is reconstructed to the conventional implementation.
    """
    if isinstance(method_or_response, BaseResponse):
        response = method_or_response
    else:
        response = Response(method=method_or_response, url=url, body=body, *args, **kwargs)

    index = self._matches.index(response)
    self._matches[index] = response
def add_callback(
    self, method, url, callback, match_querystring=False, content_type="text/plain"
):
    """Register *callback* to compute the response for matching requests.

    NOTE(review): the registration body was lost during extraction and is
    reconstructed as the conventional ``CallbackResponse`` append — confirm
    against the original source.
    """
    # ensure the url has a default path set if the url is a string
    # url = _ensure_url_default_path(url, match_querystring)

    self._matches.append(
        CallbackResponse(
            url=url,
            method=method,
            callback=callback,
            content_type=content_type,
            match_querystring=match_querystring,
        )
    )
def _find_match(self, request):
found = None
found_match = None for i, match in enumerate(self._matches): if match.matches(request): if found isNone:
found = i
found_match = match else: # Multiple matches found. Remove & return the first match. return self._matches.pop(found)
# NOTE(review): fragment of an unmatched-request handler — the enclosing
# method header is missing from this chunk (it references ``adapter`` and
# ``kwargs`` defined there) and "isNone" lost its spaces during extraction.
# Intent per the visible code: when no registered response matched and the
# URL starts with a registered passthru prefix, log and delegate to the real
# HTTP transport. Kept byte-for-byte; recover the original layout upstream.
if match isNone: if request.url.startswith(self.passthru_prefixes):
logger.info("request.allowed-passthru", extra={"url": request.url}) return _real_send(adapter, request, **kwargs)
# NOTE(review): fragment of the teardown check that enforces
# ``assert_all_requests_are_fired`` — the enclosing method header is missing
# from this chunk. Responses never served (``call_count == 0``) are collected
# and reported in one AssertionError. Kept byte-for-byte; recover the
# original layout upstream.
not_called = [m for m in self._matches if m.call_count == 0] if not_called: raise AssertionError( "Not all requests have been executed {0!r}".format(
[(match.method, match.url) for match in not_called]
)
)
# expose default mock namespace
mock = _default_mock = RequestsMock(assert_all_requests_are_fired=False)
__all__ = ["CallbackResponse", "Response", "RequestsMock"]

# Re-export every public attribute of the default mock at module level so
# callers can write e.g. ``responses.add(...)`` without instantiating
# anything.  ("ifnot" had lost its space during extraction; restored.)
for __attr in (a for a in dir(_default_mock) if not a.startswith("_")):
    __all__.append(__attr)
    globals()[__attr] = getattr(_default_mock, __attr)
¤ Dauer der Verarbeitung: 0.15 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.