import abc import asyncio import base64 import functools import hashlib import html import inspect import keyword import os import re import sys import warnings from functools import wraps from pathlib import Path from types import MappingProxyType from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Container,
Dict,
Final,
Generator,
Iterable,
Iterator,
List,
Mapping,
NoReturn,
Optional,
Pattern,
Set,
Sized,
Tuple,
Type,
TypedDict,
Union,
cast,
)
from yarl import URL, __version__ as yarl_version
from . import hdrs from .abc import AbstractMatchInfo, AbstractRouter, AbstractView from .helpers import DEBUG from .http import HttpVersion11 from .typedefs import Handler, PathLike from .web_exceptions import (
HTTPException,
HTTPExpectationFailed,
HTTPForbidden,
HTTPMethodNotAllowed,
HTTPNotFound,
) from .web_fileresponse import FileResponse from .web_request import Request from .web_response import Response, StreamResponse from .web_routedef import AbstractRouteDef
if expect_handler isNone:
expect_handler = _default_expect_handler
assert asyncio.iscoroutinefunction(
expect_handler
), f"Coroutine is expected, got {expect_handler!r}"
method = method.upper() ifnot HTTP_METHOD_RE.match(method): raise ValueError(f"{method} is not allowed HTTP method")
assert callable(handler), handler if asyncio.iscoroutinefunction(handler): pass elif inspect.isgeneratorfunction(handler):
warnings.warn( "Bare generators are deprecated, ""use @coroutine wrapper",
DeprecationWarning,
) elif isinstance(handler, type) and issubclass(handler, AbstractView): pass else:
warnings.warn( "Bare functions are deprecated, ""use async ones", DeprecationWarning
)
@wraps(handler)
async def handler_wrapper(request: Request) -> StreamResponse:
result = old_handler(request) # type: ignore[call-arg] if asyncio.iscoroutine(result):
result = await result assert isinstance(result, StreamResponse) return result
@current_app.setter def current_app(self, app: "Application") -> None: if DEBUG: # pragma: no cover if app notin self._apps: raise RuntimeError( "Expected one of the following apps {!r}, got {!r}".format(
self._apps, app
)
)
self._current_app = app
Just send "100 Continue" to client. raise HTTPExpectationFailed if value of header isnot"100-continue" """
expect = request.headers.get(hdrs.EXPECT, "") if request.version == HttpVersion11: if expect.lower() == "100-continue":
await request.writer.write(b"HTTP/1.1 100 Continue\r\n\r\n") # Reset output_size as we haven't started the main body yet.
request.writer.output_size = 0 else: raise HTTPExpectationFailed(text="Unknown Expect: %s" % expect)
for route_obj in self._routes: if route_obj.method == method or route_obj.method == hdrs.METH_ANY: raise RuntimeError( "Added route will never be executed, " "method {route.method} is already " "registered".format(route=route_obj)
)
def _match(self, path: str) -> Optional[Dict[str, str]]: # string comparison is about 10 times faster than regexp matching if self._path == path: return {} returnNone
def _match(self, path: str) -> Optional[Dict[str, str]]:
match = self._pattern.fullmatch(path) if match isNone: returnNone return {
key: _unquote_path_safe(value) for key, value in match.groupdict().items()
}
url = URL.build(path=self._prefix, encoded=True) # filename is not encoded if YARL_VERSION < (1, 6):
url = url / filename.replace("%", "%25") else:
url = url / filename
if append_version:
unresolved_path = self._directory.joinpath(filename) try: if self._follow_symlinks:
normalized_path = Path(os.path.normpath(unresolved_path))
normalized_path.relative_to(self._directory)
filepath = normalized_path.resolve() else:
filepath = unresolved_path.resolve()
filepath.relative_to(self._directory) except (ValueError, FileNotFoundError): # ValueError for case when path point to symlink # with follow_symlinks is False return url # relatively safe if filepath.is_file(): # TODO cache file content # with file watcher for cache invalidation with filepath.open("rb") as f:
file_bytes = f.read()
h = self._get_file_hash(file_bytes)
url = url.with_query({self.VERSION_KEY: h}) return url return url
@staticmethod def _get_file_hash(byte_array: bytes) -> str:
m = hashlib.sha256() # todo sha256 can be configurable param
m.update(byte_array)
b64 = base64.urlsafe_b64encode(m.digest()) return b64.decode("ascii")
async def _handle(self, request: Request) -> StreamResponse:
rel_url = request.match_info["filename"]
filename = Path(rel_url) if filename.anchor: # rel_url is an absolute name like # /static/\\machine_name\c$ or /static/D:\path # where the static dir is totally different raise HTTPForbidden()
def _resolve_path_to_response(self, unresolved_path: Path) -> StreamResponse: """Take the unresolved path and query the file system to form a response.""" # Check for access outside the root directory. For follow symlinks, URI # cannot traverse out, but symlinks can. Otherwise, no access outside # root is permitted. try: if self._follow_symlinks:
normalized_path = Path(os.path.normpath(unresolved_path))
normalized_path.relative_to(self._directory)
file_path = normalized_path.resolve() else:
file_path = unresolved_path.resolve()
file_path.relative_to(self._directory) except (ValueError, *CIRCULAR_SYMLINK_ERROR) as error: # ValueError is raised for the relative check. Circular symlinks # raise here on resolving for python < 3.13. raise HTTPNotFound() from error
# if path is a directory, return the contents if permitted. Note the # directory check will raise if a segment is not readable. try: if file_path.is_dir(): if self._show_index: return Response(
text=self._directory_as_html(file_path),
content_type="text/html",
) else: raise HTTPForbidden() except PermissionError as error: raise HTTPForbidden() from error
# Return the file response, which handles all other checks. return FileResponse(file_path, chunk_size=self._chunk_size)
def _directory_as_html(self, dir_path: Path) -> str: """returns directory's index as html.""" assert dir_path.is_dir()
index_list = []
dir_index = dir_path.iterdir() for _file in sorted(dir_index): # show file url as relative to static path
rel_path = _file.relative_to(self._directory).as_posix()
quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}")
# if file is a directory, add '/' to the end of the name if _file.is_dir():
file_name = f"{_file.name}/" else:
file_name = _file.name
def _add_prefix_to_resources(self, prefix: str) -> None:
router = self._app.router for resource in router.resources(): # Since the canonical path of a resource is about # to change, we need to unindex it and then reindex
router.unindex_resource(resource)
resource.add_prefix(prefix)
router.index_resource(resource)
def url_for(self, *args: str, **kwargs: str) -> URL: raise RuntimeError(".url_for() is not supported ""by sub-application root")
class AbstractRuleMatching(abc.ABC):
@abc.abstractmethod # pragma: no branch
async def match(self, request: Request) -> bool: """Return bool if the request satisfies the criteria"""
@abc.abstractmethod # pragma: no branch def get_info(self) -> _InfoDict: """Return a dict with additional info useful for introspection"""
@property
@abc.abstractmethod # pragma: no branch def canonical(self) -> str: """Return a str"""
class Domain(AbstractRuleMatching):
re_part = re.compile(r"(?!-)[a-z\d-]{1,63}(?)
def _raise_allowed_methods(self) -> NoReturn:
allowed_methods = {m for m in hdrs.METH_ALL if hasattr(self, m.lower())} raise HTTPMethodNotAllowed(self.request.method, allowed_methods)
def __contains__(self, resource: object) -> bool: return resource in self._resources
class RoutesView(Sized, Iterable[AbstractRoute], Container[AbstractRoute]): def __init__(self, resources: List[AbstractResource]):
self._routes: List[AbstractRoute] = [] for resource in resources: for route in resource:
self._routes.append(route)
# Walk the url parts looking for candidates. We walk the url backwards # to ensure the most explicit match is found first. If there are multiple # candidates for a given url part because there are multiple resources # registered for the same canonical path, we resolve them in a linear # fashion to ensure registration order is respected.
url_part = request.rel_url.path_safe while url_part: for candidate in resource_index.get(url_part, ()):
match_dict, allowed = await candidate.resolve(request) if match_dict isnotNone: return match_dict else:
allowed_methods |= allowed if url_part == "/": break
url_part = url_part.rpartition("/")[0] or"/"
# # We didn't find any candidates, so we'll try the matched sub-app # resources which we have to walk in a linear fashion because they # have regex/wildcard match rules and we cannot index them. # # For most cases we do not expect there to be many of these since # currently they are only added by `add_domain` # for resource in self._matched_sub_app_resources:
match_dict, allowed = await resource.resolve(request) if match_dict isnotNone: return match_dict else:
allowed_methods |= allowed
if allowed_methods: return MatchInfoError(HTTPMethodNotAllowed(request.method, allowed_methods))
def register_resource(self, resource: AbstractResource) -> None: assert isinstance(
resource, AbstractResource
), f"Instance of AbstractResource class is required, got {resource!r}" if self.frozen: raise RuntimeError("Cannot register a resource into frozen router.")
name = resource.name
if name isnotNone:
parts = self.NAME_SPLIT_RE.split(name) for part in parts: if keyword.iskeyword(part): raise ValueError(
f"Incorrect route name {name!r}, " "python keywords cannot be used " "for route name"
) ifnot part.isidentifier(): raise ValueError( "Incorrect route name {!r}, " "the name should be a sequence of " "python identifiers separated " "by dash, dot or column".format(name)
) if name in self._named_resources: raise ValueError( "Duplicate {!r}, " "already handled by {!r}".format(name, self._named_resources[name])
)
self._named_resources[name] = resource
self._resources.append(resource)
if isinstance(resource, MatchedSubAppResource): # We cannot index match sub-app resources because they have match rules
self._matched_sub_app_resources.append(resource) else:
self.index_resource(resource)
def _get_resource_index_key(self, resource: AbstractResource) -> str: """Return a key to index the resource in the resource index.""" if"{"in (index_key := resource.canonical): # strip at the first { to allow for variables, and than # rpartition at / to allow for variable parts in the path # For example if the canonical path is `/core/locations{tail:.*}` # the index key will be `/core` since index is based on the # url parts split by `/`
index_key = index_key.partition("{")[0].rpartition("/")[0] return index_key.rstrip("/") or"/"
def index_resource(self, resource: AbstractResource) -> None: """Add a resource to the resource index."""
resource_key = self._get_resource_index_key(resource) # There may be multiple resources for a canonical path # so we keep them in a list to ensure that registration # order is respected.
self._resource_index.setdefault(resource_key, []).append(resource)
def unindex_resource(self, resource: AbstractResource) -> None: """Remove a resource from the resource index."""
resource_key = self._get_resource_index_key(resource)
self._resource_index[resource_key].remove(resource)
def add_resource(self, path: str, *, name: Optional[str] = None) -> Resource: if path andnot path.startswith("/"): raise ValueError("path should be started with / or be empty") # Reuse last added resource if path and name are the same if self._resources:
resource = self._resources[-1] if resource.name == name and resource.raw_match(path): return cast(Resource, resource) ifnot ("{"in path or"}"in path or ROUTE_RE.search(path)):
resource = PlainResource(path, name=name)
self.register_resource(resource) return resource
resource = DynamicResource(path, name=name)
self.register_resource(resource) return resource
If allow_head istrue, another
route is added allowing head requests to the same endpoint. """
resource = self.add_resource(path, name=name) if allow_head:
resource.add_route(hdrs.METH_HEAD, handler, **kwargs) return resource.add_route(hdrs.METH_GET, handler, **kwargs)
def add_view(
self, path: str, handler: Type[AbstractView], **kwargs: Any
) -> AbstractRoute: """Shortcut for add_route with ANY methods for a class-based view.""" return self.add_route(hdrs.METH_ANY, path, handler, **kwargs)
def freeze(self) -> None:
super().freeze() for resource in self._resources:
resource.freeze()
Parameter should be a sequence of RouteDef objects.
Returns a list of registered AbstractRoute instances. """
registered_routes = [] for route_def in routes:
registered_routes.extend(route_def.register(self)) return registered_routes
def _quote_path(value: str) -> str: if YARL_VERSION < (1, 6):
value = value.replace("%", "%25") return URL.build(path=value, encoded=False).raw_path
def _requote_path(value: str) -> str: # Quote non-ascii characters and other characters which must be quoted, # but preserve existing %-sequences.
result = _quote_path(value) if"%"in value:
result = result.replace("%25", "%") return result
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.