import asyncio
import calendar
import contextlib
import datetime
import heapq
import itertools
import os  # noqa
import pathlib
import pickle
import re
import time
from collections import defaultdict
from http.cookies import BaseCookie, Morsel, SimpleCookie
from typing import (
    DefaultDict,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

from yarl import URL

from .abc import AbstractCookieJar, ClearCookiePredicate
from .helpers import is_ip_address
from .typedefs import LooseCookies, PathLike, StrOrURL
__all__ = ("CookieJar", "DummyCookieJar")
CookieItem = Union[str, "Morsel[str]"]
# We cache these string methods here as their use is in performance critical code.
_FORMAT_PATH = "{}/{}".format
_FORMAT_DOMAIN_REVERSED = "{1}.{0}".format
# The minimum number of scheduled cookie expirations before we start cleaning up # the expiration heap. This is a performance optimization to avoid cleaning up the # heap too often when there are only a few scheduled expirations.
_MIN_SCHEDULED_COOKIE_EXPIRATION = 100
class CookieJar(AbstractCookieJar):
    """Implements cookie storage adhering to RFC 6265."""

    # calendar.timegm() fails for timestamps after datetime.datetime.max
    # Minus one as a loss of precision occurs when timestamp() is called.
    MAX_TIME = (
        int(datetime.datetime.max.replace(tzinfo=datetime.timezone.utc).timestamp()) - 1
    )
    try:
        calendar.timegm(time.gmtime(MAX_TIME))
    except (OSError, ValueError):
        # Hit the maximum representable time on Windows
        # https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/localtime-localtime32-localtime64
        # Throws ValueError on PyPy 3.8 and 3.9, OSError elsewhere
        MAX_TIME = calendar.timegm((3000, 12, 31, 23, 59, 59, -1, -1, -1))
    except OverflowError:
        # #4515: datetime.max may not be representable on 32-bit platforms
        MAX_TIME = 2**31 - 1
    # Avoid minuses in the future, 3x faster
    SUB_MAX_TIME = MAX_TIME - 1
now = time.time()
to_del = [
key for (domain, path), cookie in self._cookies.items() for name, morsel in cookie.items() if (
(key := (domain, path, name)) in self._expirations and self._expirations[key] <= now
) or predicate(morsel)
] if to_del:
self._delete_cookies(to_del)
# If the expiration heap grows larger than the number expirations # times two, we clean it up to avoid keeping expired entries in # the heap and consuming memory. We guard this with a minimum # threshold to avoid cleaning up the heap too often when there are # only a few scheduled expirations. if (
expire_heap_len > _MIN_SCHEDULED_COOKIE_EXPIRATION and expire_heap_len > len(self._expirations) * 2
): # Remove any expired entries from the expiration heap # that do not match the expiration time in the expirations # as it means the cookie has been re-added to the heap # with a different expiration time.
self._expire_heap = [
entry for entry in self._expire_heap if self._expirations.get(entry[1]) == entry[0]
]
heapq.heapify(self._expire_heap)
now = time.time()
to_del: List[Tuple[str, str, str]] = [] # Find any expired cookies and add them to the to-delete list while self._expire_heap:
when, cookie_key = self._expire_heap[0] if when > now: break
heapq.heappop(self._expire_heap) # Check if the cookie hasn't been re-added to the heap # with a different expiration time as it will be removed # later when it reaches the top of the heap and its # expiration time is met. if self._expirations.get(cookie_key) == when:
to_del.append(cookie_key)
if to_del:
self._delete_cookies(to_del)
def _delete_cookies(self, to_del: List[Tuple[str, str, str]]) -> None: for domain, path, name in to_del:
self._host_only_cookies.discard((domain, name))
self._cookies[(domain, path)].pop(name, None)
self._morsel_cache[(domain, path)].pop(name, None)
self._expirations.pop((domain, path, name), None)
def _expire_cookie(self, when: float, domain: str, path: str, name: str) -> None:
cookie_key = (domain, path, name) if self._expirations.get(cookie_key) == when: # Avoid adding duplicates to the heap return
heapq.heappush(self._expire_heap, (when, cookie_key))
self._expirations[cookie_key] = when
ifnot self._unsafe and is_ip_address(hostname): # Don't accept cookies from IPs return
if isinstance(cookies, Mapping):
cookies = cookies.items()
for name, cookie in cookies: ifnot isinstance(cookie, Morsel):
tmp = SimpleCookie()
tmp[name] = cookie # type: ignore[assignment]
cookie = tmp[name]
domain = cookie["domain"]
# ignore domains with trailing dots if domain and domain[-1] == ".":
domain = "" del cookie["domain"]
ifnot domain and hostname isnotNone: # Set the cookie's domain to the response hostname # and set its host-only-flag
self._host_only_cookies.add((hostname, name))
domain = cookie["domain"] = hostname
if domain and domain[0] == ".": # Remove leading dot
domain = domain[1:]
cookie["domain"] = domain
if hostname andnot self._is_domain_match(domain, hostname): # Setting cookies for different domains is not allowed continue
path = cookie["path"] ifnot path or path[0] != "/": # Set the cookie's path to the response path
path = response_url.path ifnot path.startswith("/"):
path = "/" else: # Cut everything from the last slash to the end
path = "/" + path[1 : path.rfind("/")]
cookie["path"] = path
path = path.rstrip("/")
key = (domain, path) if self._cookies[key].get(name) != cookie: # Don't blow away the cache if the same # cookie gets set again
self._cookies[key][name] = cookie
self._morsel_cache[key].pop(name, None)
self._do_expiration()
def filter_cookies(self, request_url: URL = URL()) -> "BaseCookie[str]":
    """Returns this jar's cookies filtered by their attributes."""
    filtered: Union[SimpleCookie, "BaseCookie[str]"] = (
        SimpleCookie() if self._quote_cookie else BaseCookie()
    )
    if not self._cookies:
        # Skip do_expiration() if there are no cookies.
        return filtered
    self._do_expiration()
    if not self._cookies:
        # Skip rest of function if no non-expired cookies.
        return filtered
    request_url = URL(request_url)
    hostname = request_url.raw_host or ""

    is_not_secure = request_url.scheme not in ("https", "wss")
    if is_not_secure and self._treat_as_secure_origin:
        request_origin = URL()
        with contextlib.suppress(ValueError):
            request_origin = request_url.origin()
        is_not_secure = request_origin not in self._treat_as_secure_origin

    # Send shared cookie
    for c in self._cookies[("", "")].values():
        filtered[c.key] = c.value

    if is_ip_address(hostname):
        if not self._unsafe:
            return filtered
        domains: Iterable[str] = (hostname,)
    else:
        # Get all the subdomains that might match a cookie (e.g. "foo.bar.com", "bar.com", "com")
        domains = itertools.accumulate(
            reversed(hostname.split(".")), _FORMAT_DOMAIN_REVERSED
        )

    # Get all the path prefixes that might match a cookie (e.g. "", "/foo", "/foo/bar")
    paths = itertools.accumulate(request_url.path.split("/"), _FORMAT_PATH)
    # Create every combination of (domain, path) pairs.
    pairs = itertools.product(domains, paths)

    path_len = len(request_url.path)
    # NOTE(review): the loop scaffolding below (path_len and the two `for`
    # headers) was reconstructed from the variables the surviving body
    # references (p, name, cookie, domain) — confirm against upstream.
    for p in pairs:
        for name, cookie in self._cookies[p].items():
            domain = cookie["domain"]

            if (domain, name) in self._host_only_cookies and domain != hostname:
                continue

            # Skip edge case when the cookie has a trailing slash but request doesn't.
            if len(cookie["path"]) > path_len:
                continue

            if is_not_secure and cookie["secure"]:
                continue

            # We already built the Morsel so reuse it here
            if name in self._morsel_cache[p]:
                filtered[name] = self._morsel_cache[p][name]
                continue

            # It's critical we use the Morsel so the coded_value
            # (based on cookie version) is preserved
            mrsl_val = cast("Morsel[str]", cookie.get(cookie.key, Morsel()))
            mrsl_val.set(cookie.key, cookie.value, cookie.coded_value)
            self._morsel_cache[p][name] = mrsl_val
            filtered[name] = mrsl_val

    return filtered
@staticmethod
def _is_domain_match(domain: str, hostname: str) -> bool:
    """Implements domain matching adhering to RFC 6265.

    True when *hostname* equals *domain*, or is a subdomain of it (the
    non-matching prefix must end at a label boundary, i.e. with a dot),
    and *hostname* is not an IP address.
    """
    if hostname == domain:
        return True
    if not hostname.endswith(domain):
        return False
    non_matching = hostname[: -len(domain)]
    if not non_matching.endswith("."):
        return False
    return not is_ip_address(hostname)
@classmethod def _parse_date(cls, date_str: str) -> Optional[int]: """Implements date string parsing adhering to RFC 6265.""" ifnot date_str: returnNone
hour = minute = second = 0
day = 0
month = 0
year = 0
for token_match in cls.DATE_TOKENS_RE.finditer(date_str):
token = token_match.group("token")
ifnot found_time:
time_match = cls.DATE_HMS_TIME_RE.match(token) if time_match:
found_time = True
hour, minute, second = (int(s) for s in time_match.groups()) continue
ifnot found_day:
day_match = cls.DATE_DAY_OF_MONTH_RE.match(token) if day_match:
found_day = True
day = int(day_match.group()) continue
The information on this website has been compiled with care and to the best of our knowledge. However, no guarantee is given as to the completeness, correctness,
or quality of the information provided.
Note:
The colored syntax highlighting is still experimental.