def _ensure_running(self):
    # type: (...) -> None
    """Make sure a background flush thread exists for the current process.

    The cheap check runs first without the lock; if it fails, the same
    check is repeated while holding ``self._thread_lock`` so that only one
    caller creates the thread.  A thread is considered valid only when it
    was created by the current PID (``self._thread_for_pid``), so a forked
    child process starts its own worker.

    The worker sleeps ``self.flush_interval`` seconds between iterations
    and calls ``self.flush()`` whenever ``self.pending`` is truthy, until
    ``self._running`` becomes false.
    """
    # Imported locally so this block does not depend on how the enclosing
    # module chose to import the threading primitives.
    from threading import Thread

    if self._thread_for_pid == os.getpid() and self._thread is not None:
        return None

    with self._thread_lock:
        # Re-check under the lock: another caller may have started the
        # thread between the unlocked check above and acquiring the lock.
        if self._thread_for_pid == os.getpid() and self._thread is not None:
            return None

        def _thread():
            # type: (...) -> None
            while self._running:
                time.sleep(self.flush_interval)
                # _running is tested again because it may have been
                # cleared while this thread was sleeping.
                if self.pending and self._running:
                    self.flush()

        worker = Thread(target=_thread)
        # Daemon thread: it must not keep the interpreter alive at exit.
        worker.daemon = True
        worker.start()

        # Record the worker and its owning PID so the guards above succeed
        # on subsequent calls from the same process.
        self._thread = worker
        self._thread_for_pid = os.getpid()

    return None
def update(
    self,
    sid=None,  # type: Optional[Union[str, uuid.UUID]]
    did=None,  # type: Optional[str]
    timestamp=None,  # type: Optional[datetime]
    duration=None,  # type: Optional[float]
    status=None,  # type: Optional[SessionStatus]
    release=None,  # type: Optional[str]
    environment=None,  # type: Optional[str]
    user_agent=None,  # type: Optional[str]
    ip_address=None,  # type: Optional[str]
    errors=None,  # type: Optional[int]
    user=None,  # type: Optional[Any]
):
    # type: (...) -> None
    """Update the attributes of this object from the given values.

    Every argument is optional.  An argument left as ``None`` leaves the
    corresponding attribute untouched, with one exception: ``timestamp``
    is always written and defaults to ``datetime.utcnow()``.

    When ``user`` is truthy it is queried with ``.get()`` to fill in
    values the caller did not pass explicitly: ``ip_address`` from the
    ``"ip_address"`` key, and ``did`` from the first truthy value among
    the ``"id"``, ``"email"`` and ``"username"`` keys.  Explicit arguments
    always win over values taken from ``user``.

    ``sid`` is normalised through ``_make_uuid`` and ``did`` is stored as
    ``str(did)``; all other values are stored as given.
    """
    # If a user is supplied we pull some data from it
    if user:
        if ip_address is None:
            ip_address = user.get("ip_address")
        if did is None:
            did = user.get("id") or user.get("email") or user.get("username")

    if sid is not None:
        self.sid = _make_uuid(sid)
    if did is not None:
        self.did = str(did)

    # The timestamp is refreshed on every update, even when not supplied.
    if timestamp is None:
        timestamp = datetime.utcnow()
    self.timestamp = timestamp

    if duration is not None:
        self.duration = duration
    if release is not None:
        self.release = release
    if environment is not None:
        self.environment = environment
    if ip_address is not None:
        self.ip_address = ip_address
    if user_agent is not None:
        self.user_agent = user_agent
    if errors is not None:
        self.errors = errors

    if status is not None:
        self.status = status
def close(
    self, status=None  # type: Optional[SessionStatus]
):
    # type: (...) -> Any
    """Apply a final status to this object via ``self.update``.

    If no ``status`` is passed and the current ``self.status`` is
    ``"ok"``, the status becomes ``"exited"``.  If no ``status`` is passed
    and the current status is anything else, nothing is updated.  An
    explicitly passed ``status`` is always forwarded to ``update``.
    """
    if status is None and self.status == "ok":
        status = "exited"
    if status is not None:
        self.update(status=status)
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.