"""
This module provides a pool manager that uses Google App Engine's
`URLFetch Service <https://cloud.google.com/appengine/docs/python/urlfetch>`_.

Example usage::

    from urllib3 import PoolManager
    from urllib3.contrib.appengine import AppEngineManager, is_appengine_sandbox

    if is_appengine_sandbox():
        # AppEngineManager uses AppEngine's URLFetch API behind the scenes
        http = AppEngineManager()
    else:
        # PoolManager uses a socket-level API behind the scenes
        http = PoolManager()

    r = http.request('GET', 'https://google.com/')

There are `limitations <https://cloud.google.com/appengine/docs/python/\
urlfetch/#Python_Quotas_and_limits>`_ to the URLFetch service and it may not be
the best choice for your application. There are three options for using
urllib3 on Google App Engine:

1. You can use :class:`AppEngineManager` with URLFetch. URLFetch is
   cost-effective in many circumstances as long as your usage is within the
   limitations.
2. You can use a normal :class:`~urllib3.PoolManager` by enabling sockets.
   Sockets also have `limitations and restrictions
   <https://cloud.google.com/appengine/docs/python/sockets/\
#limitations-and-restrictions>`_ and have a lower free quota than URLFetch.
   To use sockets, be sure to specify the following in your ``app.yaml``::

        env_variables:
            GAE_USE_SOCKETS_HTTPLIB : 'true'

3. If you are using `App Engine Flexible
   <https://cloud.google.com/appengine/docs/flexible/>`_, you can use the
   standard :class:`PoolManager` without any configuration or special
   environment variables.
"""
from __future__
import absolute_import
import io
import logging
import warnings
from ..exceptions
import (
HTTPError,
HTTPWarning,
MaxRetryError,
ProtocolError,
SSLError,
TimeoutError,
)
from ..packages.six.moves.urllib.parse
import urljoin
from ..request
import RequestMethods
from ..response
import HTTPResponse
from ..util.retry
import Retry
from ..util.timeout
import Timeout
from .
import _appengine_environ
try:
    # The google.appengine package may be absent; the import is attempted
    # optimistically and the failure is handled below.
    from google.appengine.api import urlfetch
except ImportError:
    # Keep a falsy placeholder so this module can still be imported.
    # AppEngineManager.__init__ checks it and raises AppEnginePlatformError.
    urlfetch = None

# Module-level logger; used by AppEngineManager.urlopen for redirect and retry
# debug messages.
log = logging.getLogger(__name__)
class AppEnginePlatformWarning(HTTPWarning):
    """Warning category used by :class:`AppEngineManager`.

    Emitted when URLFetch is used in place of sockets, and when a granular
    timeout or a connect/read/redirect retry setting is requested that
    URLFetch does not support.
    """
class AppEnginePlatformError(HTTPError):
    """Error raised by :class:`AppEngineManager` for URLFetch-specific failures.

    Raised when URLFetch is not available, when a request or response is too
    large for URLFetch, and when the request method is not supported.
    """
class AppEngineManager(RequestMethods):
    """
    Connection manager for Google App Engine sandbox applications.

    This manager uses the URLFetch service directly instead of using the
    emulated httplib, and is subject to URLFetch limitations as described in
    the App Engine documentation `here
    <https://cloud.google.com/appengine/docs/python/urlfetch>`_.

    Notably it will raise an :class:`AppEnginePlatformError` if:
        * URLFetch is not available.
        * If you attempt to use this on App Engine Flexible, as full socket
          support is available.
        * If a request size is more than 10 megabytes.
        * If a response size is more than 32 megabytes.
        * If you use an unsupported request method such as OPTIONS.

    Beyond those cases, it will raise normal urllib3 errors.
    """

    def __init__(
        self,
        headers=None,
        retries=None,
        validate_certificate=True,
        urlfetch_retries=True,
    ):
        """
        :param headers: Passed through to ``RequestMethods.__init__``.
        :param retries: Default retry configuration for requests that do not
            supply their own; falls back to ``Retry.DEFAULT`` when falsy.
        :param validate_certificate: Forwarded to ``urlfetch.fetch`` on every
            request.
        :param urlfetch_retries: When true, URLFetch itself is allowed to
            follow redirects (subject to the per-request retry settings).
        :raises AppEnginePlatformError: If the ``urlfetch`` module could not
            be imported.
        """
        if not urlfetch:
            raise AppEnginePlatformError(
                "URLFetch is not available in this environment."
            )

        warnings.warn(
            "urllib3 is using URLFetch on Google App Engine sandbox instead "
            "of sockets. To use sockets directly instead of URLFetch see "
            "https://urllib3.readthedocs.io/en/1.26.x/reference/urllib3.contrib.html.",
            AppEnginePlatformWarning,
        )

        RequestMethods.__init__(self, headers)
        self.validate_certificate = validate_certificate
        self.urlfetch_retries = urlfetch_retries

        self.retries = retries or Retry.DEFAULT

    def __enter__(self):
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the ``with`` block without suppressing exceptions."""
        # Return False to re-raise any potential exceptions
        return False

    def urlopen(
        self,
        method,
        url,
        body=None,
        headers=None,
        retries=None,
        redirect=True,
        timeout=Timeout.DEFAULT_TIMEOUT,
        **response_kw
    ):
        """
        Perform a request through ``urlfetch.fetch`` and return an
        :class:`HTTPResponse`.

        URLFetch exceptions are translated into urllib3 exceptions. Redirects
        that URLFetch did not follow, and responses that the retry
        configuration marks as retryable, are handled by calling this method
        again with an incremented retry object.

        :param method: HTTP method name.
        :param url: URL to fetch.
        :param body: Request payload, passed as ``payload`` to URLFetch.
        :param headers: Request headers; an empty dict is sent when falsy.
        :param retries: A ``Retry`` instance, or a value accepted by
            ``Retry.from_int``.
        :param redirect: Whether redirect responses should be followed.
        :param timeout: A number, a ``Timeout`` instance, or
            ``Timeout.DEFAULT_TIMEOUT`` to use URLFetch's default deadline.
        :param response_kw: Extra keyword arguments passed to
            :class:`HTTPResponse`.
        :raises TimeoutError: URLFetch deadline exceeded.
        :raises AppEnginePlatformError: Request or response too large, or
            the method is not supported by URLFetch.
        :raises ProtocolError: Other invalid-URL or download errors.
        :raises MaxRetryError: Too many redirects or retries exhausted.
        :raises SSLError: Certificate validation failed.
        """
        retries = self._get_retries(retries, redirect)

        try:
            # URLFetch may follow redirects itself only when the caller wants
            # redirects, redirect retries are not disabled and there are
            # retries left. Coerce to a real boolean: ``retries.total`` may be
            # an int or None.
            follow_redirects = bool(
                redirect and retries.redirect != 0 and retries.total
            )
            response = urlfetch.fetch(
                url,
                payload=body,
                method=method,
                headers=headers or {},
                allow_truncated=False,
                follow_redirects=bool(self.urlfetch_retries and follow_redirects),
                deadline=self._get_absolute_timeout(timeout),
                validate_certificate=self.validate_certificate,
            )
        except urlfetch.DeadlineExceededError as e:
            raise TimeoutError(self, e)

        except urlfetch.InvalidURLError as e:
            # URLFetch reports oversized requests as an invalid-URL error;
            # the message text is the only way to tell them apart here.
            if "too large" in str(e):
                raise AppEnginePlatformError(
                    "URLFetch request too large, URLFetch only "
                    "supports requests up to 10mb in size.",
                    e,
                )
            raise ProtocolError(e)

        except urlfetch.DownloadError as e:
            if "Too many redirects" in str(e):
                raise MaxRetryError(self, url, reason=e)
            raise ProtocolError(e)

        except urlfetch.ResponseTooLargeError as e:
            raise AppEnginePlatformError(
                "URLFetch response too large, URLFetch only supports "
                "responses up to 32mb in size.",
                e,
            )

        except urlfetch.SSLCertificateError as e:
            raise SSLError(e)

        except urlfetch.InvalidMethodError as e:
            raise AppEnginePlatformError(
                "URLFetch does not support method: %s" % method, e
            )

        http_response = self._urlfetch_response_to_http_response(
            response, retries=retries, **response_kw
        )

        # Handle redirect?
        redirect_location = redirect and http_response.get_redirect_location()
        if redirect_location:
            # A redirect response is still visible here even though URLFetch
            # was allowed to follow redirects: treat it as exhausted.
            if self.urlfetch_retries and retries.raise_on_redirect:
                raise MaxRetryError(self, url, "too many redirects")

            if http_response.status == 303:
                method = "GET"

            try:
                retries = retries.increment(
                    method, url, response=http_response, _pool=self
                )
            except MaxRetryError:
                if retries.raise_on_redirect:
                    raise MaxRetryError(self, url, "too many redirects")
                # Not raising on redirect: hand the redirect response back.
                return http_response

            retries.sleep_for_retry(http_response)
            log.debug("Redirecting %s -> %s", url, redirect_location)
            redirect_url = urljoin(url, redirect_location)
            return self.urlopen(
                method,
                redirect_url,
                body,
                headers,
                retries=retries,
                redirect=redirect,
                timeout=timeout,
                **response_kw
            )

        # Check if we should retry the HTTP response.
        has_retry_after = bool(http_response.headers.get("Retry-After"))
        if retries.is_retry(method, http_response.status, has_retry_after):
            retries = retries.increment(method, url, response=http_response, _pool=self)
            log.debug("Retry: %s", url)
            retries.sleep(http_response)
            return self.urlopen(
                method,
                url,
                body=body,
                headers=headers,
                retries=retries,
                redirect=redirect,
                timeout=timeout,
                **response_kw
            )

        return http_response

    def _urlfetch_response_to_http_response(self, urlfetch_resp, **response_kw):
        """
        Wrap a URLFetch response object in an :class:`HTTPResponse`.

        The URLFetch response's headers are adjusted in place first: on
        production App Engine a ``deflate`` content-encoding header is
        removed, and ``chunked`` is removed from the transfer-encoding
        header because the full content is already available.

        :param urlfetch_resp: Response object returned by ``urlfetch.fetch``.
        :param response_kw: Extra keyword arguments passed to both
            :class:`HTTPResponse` instances that are created.
        :return: An :class:`HTTPResponse` whose ``original_response`` is a
            second :class:`HTTPResponse` carrying ``header_msg``.
        """
        if is_prod_appengine():
            # Production GAE handles deflate encoding automatically, but does
            # not remove the encoding header.
            content_encoding = urlfetch_resp.headers.get("content-encoding")

            if content_encoding == "deflate":
                del urlfetch_resp.headers["content-encoding"]

        transfer_encoding = urlfetch_resp.headers.get("transfer-encoding")
        # We have a full response's content,
        # so let's make sure we don't report ourselves as chunked data.
        # The header is a comma-separated list, so "chunked" is dropped
        # wherever it appears (any letter case) and the other codings kept.
        if transfer_encoding:
            encodings = [enc.strip() for enc in transfer_encoding.split(",")]
            remaining = [enc for enc in encodings if enc.lower() != "chunked"]
            if len(remaining) != len(encodings):
                urlfetch_resp.headers["transfer-encoding"] = ",".join(remaining)

        original_response = HTTPResponse(
            # In order for decoding to work, we must present the content as
            # a file-like object.
            body=io.BytesIO(urlfetch_resp.content),
            msg=urlfetch_resp.header_msg,
            headers=urlfetch_resp.headers,
            status=urlfetch_resp.status_code,
            **response_kw
        )

        return HTTPResponse(
            body=io.BytesIO(urlfetch_resp.content),
            headers=urlfetch_resp.headers,
            status=urlfetch_resp.status_code,
            original_response=original_response,
            **response_kw
        )

    def _get_absolute_timeout(self, timeout):
        """
        Convert a urllib3 timeout value into a URLFetch ``deadline``.

        :param timeout: ``Timeout.DEFAULT_TIMEOUT``, a ``Timeout`` instance,
            or any other value (returned unchanged).
        :return: ``None`` for the default sentinel, ``timeout.total`` for a
            ``Timeout`` instance, otherwise ``timeout`` itself.
        """
        if timeout is Timeout.DEFAULT_TIMEOUT:
            return None  # Defer to URLFetch's default.
        if isinstance(timeout, Timeout):
            if timeout._read is not None or timeout._connect is not None:
                warnings.warn(
                    "URLFetch does not support granular timeout settings, "
                    "reverting to total or default URLFetch timeout.",
                    AppEnginePlatformWarning,
                )
            return timeout.total
        return timeout

    def _get_retries(self, retries, redirect):
        """
        Normalize ``retries`` into a ``Retry`` instance.

        Non-``Retry`` values are converted with ``Retry.from_int``, using this
        manager's ``retries`` attribute as the default. A warning is issued
        when connect, read or redirect retry counts are set.

        :param retries: A ``Retry`` instance or a value for ``Retry.from_int``.
        :param redirect: Forwarded to ``Retry.from_int``.
        :return: The resulting ``Retry`` instance.
        """
        if not isinstance(retries, Retry):
            retries = Retry.from_int(retries, redirect=redirect, default=self.retries)

        if retries.connect or retries.read or retries.redirect:
            warnings.warn(
                "URLFetch only supports total retries and does not "
                "recognize connect, read, or redirect retry parameters.",
                AppEnginePlatformWarning,
            )

        return retries
# Alias methods from _appengine_environ to maintain public API interface.
# The functions themselves live in the sibling ``_appengine_environ`` module;
# re-binding them here keeps them importable from this module (the usage
# example in the module docstring imports ``is_appengine_sandbox`` from here).
# ``is_prod_appengine`` is also called by
# AppEngineManager._urlfetch_response_to_http_response.
is_appengine = _appengine_environ.is_appengine
is_appengine_sandbox = _appengine_environ.is_appengine_sandbox
is_local_appengine = _appengine_environ.is_local_appengine
is_prod_appengine = _appengine_environ.is_prod_appengine
is_prod_appengine_mvms = _appengine_environ.is_prod_appengine_mvms