#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0
import subprocess
from shutil import which
from os import pread
class PerfCounterInfo:
def __init__(self, subsys, event):
self.subsys = subsys
self.event = event
def get_perf_event_name(self):
return f'{self.subsys}/{self.event}/'
def get_turbostat_perf_id(self, counter_scope, counter_type, column_name):
return f'perf/{self.subsys}/{self.event},{counter_scope},{counter_type},{column_name}'
PERF_COUNTERS_CANDIDATES = [
PerfCounterInfo('msr' , 'mperf' ),
PerfCounterInfo('msr' , 'aperf' ),
PerfCounterInfo('msr' , 'tsc' ),
PerfCounterInfo('cstate_core' , 'c1-residency' ),
PerfCounterInfo('cstate_core' , 'c6-residency' ),
PerfCounterInfo('cstate_core' , 'c7-residency' ),
PerfCounterInfo('cstate_pkg' , 'c2-residency' ),
PerfCounterInfo('cstate_pkg' , 'c3-residency' ),
PerfCounterInfo('cstate_pkg' , 'c6-residency' ),
PerfCounterInfo('cstate_pkg' , 'c7-residency' ),
PerfCounterInfo('cstate_pkg' , 'c8-residency' ),
PerfCounterInfo('cstate_pkg' , 'c9-residency' ),
PerfCounterInfo('cstate_pkg' , 'c10-residency' ),
]
present_perf_counters = []
def check_perf_access():
perf = which('perf' )
if perf is None :
print('SKIP: Could not find perf binary, thus could not determine perf access.' )
return False
def has_perf_counter_access(counter_name):
proc_perf = subprocess.run([perf, 'stat' , '-e' , counter_name, '--timeout' , '10' ],
capture_output = True )
if proc_perf.returncode != 0:
print(f'SKIP: Could not read {counter_name} perf counter.' )
return False
if b'' in proc_perf.stderr:
print(f'SKIP: Could not read {counter_name} perf counter.' )
return False
return True
for counter in PERF_COUNTERS_CANDIDATES:
if has_perf_counter_access(counter.get_perf_event_name()):
present_perf_counters.append(counter)
if len(present_perf_counters) == 0:
print('SKIP: Could not read any perf counter.' )
return False
if len(present_perf_counters) != len(PERF_COUNTERS_CANDIDATES):
print(f'WARN: Could not access all of the counters - some will be left untested' )
return True
if not check_perf_access():
exit(0)
turbostat_counter_source_opts = ['' ]
turbostat = which('turbostat' )
if turbostat is None :
print('Could not find turbostat binary' )
exit(1)
timeout = which('timeout' )
if timeout is None :
print('Could not find timeout binary' )
exit(1)
proc_turbostat = subprocess.run([turbostat, '--list' ], capture_output = True )
if proc_turbostat.returncode != 0:
print(f'turbostat failed with {proc_turbostat.returncode}' )
exit(1)
EXPECTED_COLUMNS_DEBUG_DEFAULT = [b'usec' , b'Time_Of_Day_Seconds' , b'APIC' , b'X2APIC' ]
expected_columns = [b'CPU' ]
counters_argv = []
for counter in present_perf_counters:
if counter.subsys == 'cstate_core' :
counter_scope = 'core'
elif counter.subsys == 'cstate_pkg' :
counter_scope = 'package'
else :
counter_scope = 'cpu'
counter_type = 'delta'
column_name = counter.event
cparams = counter.get_turbostat_perf_id(
counter_scope = counter_scope,
counter_type = counter_type,
column_name = column_name
)
expected_columns.append(column_name.encode())
counters_argv.extend(['--add' , cparams])
expected_columns_debug = EXPECTED_COLUMNS_DEBUG_DEFAULT + expected_columns
def gen_user_friendly_cmdline(argv_):
argv = argv_[:]
ret = ''
while len(argv) != 0:
arg = argv.pop(0)
arg_next = ''
if arg in ('-i' , '--show' , '--add' ):
arg_next = argv.pop(0) if len(argv) > 0 else ''
ret += f'{arg} {arg_next} \\\n\t'
# Remove the last separator and return
return ret[:-4]
#
# Run turbostat for some time and send SIGINT
#
timeout_argv = [timeout, '--preserve-status' , '-s' , 'SIGINT' , '-k' , '3' , '0.2s' ]
turbostat_argv = [turbostat, '-i' , '0.50' , '--show' , 'CPU' ] + counters_argv
def check_columns_or_fail(expected_columns: list, actual_columns: list):
if len(actual_columns) != len(expected_columns):
print(f'turbostat column check failed\n{expected_columns=}\n{actual_columns=}' )
exit(1)
failed = False
for expected_column in expected_columns:
if expected_column not in actual_columns:
print(f'turbostat column check failed: missing column {expected_column.decode()}' )
failed = True
if failed:
exit(1)
cmdline = gen_user_friendly_cmdline(turbostat_argv)
print(f'Running turbostat with:\n\t{cmdline}\n... ' , end = '' , flush = True )
proc_turbostat = subprocess.run(timeout_argv + turbostat_argv, capture_output = True )
if proc_turbostat.returncode != 0:
print(f'turbostat failed with {proc_turbostat.returncode}' )
exit(1)
actual_columns = proc_turbostat.stdout.split(b'\n' )[0].split(b'\t' )
check_columns_or_fail(expected_columns, actual_columns)
print('OK' )
#
# Same, but with --debug
#
# We explicitly specify '--show CPU' to make sure turbostat
# don't show a bunch of default counters instead.
#
turbostat_argv.append('--debug' )
cmdline = gen_user_friendly_cmdline(turbostat_argv)
print(f'Running turbostat (in debug mode) with:\n\t{cmdline}\n... ' , end = '' , flush = True )
proc_turbostat = subprocess.run(timeout_argv + turbostat_argv, capture_output = True )
if proc_turbostat.returncode != 0:
print(f'turbostat failed with {proc_turbostat.returncode}' )
exit(1)
actual_columns = proc_turbostat.stdout.split(b'\n' )[0].split(b'\t' )
check_columns_or_fail(expected_columns_debug, actual_columns)
print('OK' )
Messung V0.5 C=95 H=94 G=94
¤ Dauer der Verarbeitung: 0.5 Sekunden
¤
*© Formatika GbR, Deutschland