def _lookup_matches(self, line: str, matches: List[str]) -> bool: for m in matches: if re.match(m, line): returnTrue returnFalse
def _lookup_lognos(self, line: str, lognos: set) -> bool: if len(lognos) > 0:
m = self.RE_APLOGNO.match(line) if m and m.group('aplogno') in lognos: returnTrue returnFalse
def add_ignored_lognos(self, lognos: List[str]): for l in lognos:
self._ignored_lognos.add(l)
def _is_ignored(self, line: str) -> bool: if self._lookup_matches(line, self._ignored_matches): returnTrue if self._lookup_lognos(line, self._ignored_lognos): returnTrue returnFalse
def ignore_recent(self, lognos: List[str] = [], matches: List[str] = []): """After a test case triggered errors/warnings on purpose, add
those to our 'caught' list so the do not get reported as'missed'. """
self._recent_errors = []
self._recent_warnings = [] if os.path.isfile(self._path): with open(self._path) as fd:
fd.seek(self._recent_pos, os.SEEK_SET)
lognos_set = set(lognos) for line in fd: if self._is_ignored(line): continue if self._lookup_matches(line, matches):
self._caught_matches.add(line) continue
m = self.RE_ERRLOG_WARN.match(line) if m and self._lookup_lognos(line, lognos_set):
self._caught_warnings.add(line) continue
m = self.RE_ERRLOG_ERROR.match(line) if m and self._lookup_lognos(line, lognos_set):
self._caught_errors.add(line) continue
self._recent_pos = fd.tell()
def get_missed(self) -> Tuple[List[str], List[str]]:
errors = []
warnings = []
self._recent_errors = []
self._recent_warnings = [] if os.path.isfile(self._path): with open(self._path) as fd:
fd.seek(self._start_pos, os.SEEK_SET) for line in fd: if self._is_ignored(line): continue if line in self._caught_matches: continue
m = self.RE_ERRLOG_WARN.match(line) if m and line notin self._caught_warnings:
warnings.append(line) continue
m = self.RE_ERRLOG_ERROR.match(line) if m and line notin self._caught_errors:
errors.append(line) continue
self._start_pos = self._recent_pos = fd.tell()
self._caught_errors = set()
self._caught_warnings = set()
self._caught_matches = set() return errors, warnings
def scan_recent(self, pattern: re.Pattern, timeout=10): ifnot os.path.isfile(self.path): returnFalse with open(self.path) as fd:
end = datetime.now() + timedelta(seconds=timeout) whileTrue:
fd.seek(self._recent_pos, os.SEEK_SET) for line in fd: if pattern.match(line): returnTrue if datetime.now() > end: raise TimeoutError(f"pattern not found in error log after {timeout} seconds")
time.sleep(.1) returnFalse
¤ Dauer der Verarbeitung: 0.12 Sekunden
(vorverarbeitet)
¤
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung ist noch experimentell.