// Median of the elapsed times over all recorded results of this facet.
def median_time: Time = Timing_Data.median_time(results.map(_.elapsed))
// Result with the smallest elapsed time.
// NOTE(review): minBy fails on an empty list — presumably `results` is nonempty by construction; confirm against full file.
def best_result: Result = results.minBy(_.elapsed.ms)
}
}
/* Timing data over recorded build results, used to estimate job run times.
   NOTE(review): restored "private val"/"private def" — the keywords had been
   fused into "privateval"/"privatedef" by a formatting accident. */
class Timing_Data private(
  private val facet: Timing_Data.Facet,
  val host_infos: Host_Infos,
  val sessions_structure: Sessions.Structure
) {
  /* Midpoint between the last monotonically-improving thread count and the
     next measured one: the estimated inflection point of the speedup curve. */
  private def inflection_point(last_mono: Int, next: Int): Int =
    last_mono + ((next - last_mono) / 2)
/* Upper bound on the number of useful threads for a job: never more than
   max_threads, and never beyond the inflection point past the best observed
   thread count on any host that has data for this job. */
def best_threads(job_name: String, max_threads: Int): Int = {
  val per_host_limits =
    for {
      job_facet <- facet.by_job.get(job_name).toList
      (_, host_facet) <- job_facet.by_hostname
      best = host_facet.best_result.threads
      next_worse <- host_facet.by_threads.keys.toList.sorted.find(best < _)
    } yield inflection_point(best, next_worse)
  (max_threads :: per_host_limits).min
}
/* Relative speed factor when transferring a timing from one host to another.
   NOTE(review): restored "private def" — keyword had been fused ("privatedef"). */
private def hostname_factor(from: String, to: String): Double =
  host_infos.host_factor(host_infos.the_host(from), host_infos.the_host(to))
/* Approximate the time for the given thread count from measured
   (threads, time) entries: Amdahl's-law model on the monotone prefix of the
   speedup curve, piecewise-linear interpolation otherwise.
   NOTE(review): restored statements that had been fused into comments and
   previous lines ("// Model with Amdahl's law val t_p =" etc.); the tail of
   this method (the final result of the piecewise branch and the closing
   braces) is missing from this chunk. */
private def approximate_threads(entries_unsorted: List[(Int, Time)], threads: Int): Time = {
  val entries = entries_unsorted.sortBy(_._1)

  // Longest prefix of xs that is non-decreasing under f.
  def sorted_prefix[A](xs: List[A], f: A => Long): List[A] =
    xs match {
      case x1 :: x2 :: xs =>
        if (f(x1) <= f(x2)) x1 :: sorted_prefix(x2 :: xs, f) else x1 :: Nil
      case xs => xs
    }

  // Linear inter-/extrapolation through points p0 and p1, clamped at zero.
  def linear(p0: (Int, Time), p1: (Int, Time)): Time = {
    val a = (p1._2 - p0._2).scale(1.0 / (p1._1 - p0._1))
    val b = p0._2 - a.scale(p0._1)
    (a.scale(threads) + b) max Time.zero
  }

  val mono_prefix = sorted_prefix(entries, e => -e._2.ms)

  val is_mono = entries == mono_prefix
  val in_prefix = mono_prefix.length > 1 && threads <= mono_prefix.last._1
  val in_inflection =
    !is_mono && mono_prefix.length > 1 && threads < entries.drop(mono_prefix.length).head._1
  if (is_mono || in_prefix || in_inflection) {
    // Model with Amdahl's law
    val t_p =
      Timing_Data.median_time(for {
        (n, t0) <- mono_prefix
        (m, t1) <- mono_prefix if m != n
      } yield (t0 - t1).scale(n.toDouble * m / (m - n)))
    val t_c =
      Timing_Data.median_time(for ((n, t) <- mono_prefix) yield t - t_p.scale(1.0 / n))

    def model(threads: Int): Time = (t_c + t_p.scale(1.0 / threads)) max Time.zero

    if (is_mono || in_prefix) model(threads)
    else {
      val post_inflection = entries.drop(mono_prefix.length).head
      val inflection_threads = inflection_point(mono_prefix.last._1, post_inflection._1)

      if (threads <= inflection_threads) model(threads)
      else linear((inflection_threads, model(inflection_threads)), post_inflection)
    }
  } else {
    // Piecewise linear
    val (p0, p1) =
      if (entries.head._1 < threads && threads < entries.last._1) {
        val split = entries.partition(_._1 < threads)
        (split._1.last, split._2.head)
      }
      else {
        val piece = if (threads < entries.head._1) entries.take(2) else entries.takeRight(2)
        (piece.head, piece.last)
      }
/* Estimate the time for a job on a host with the given thread count:
   exact measurement if present, otherwise interpolated over the measured
   thread counts of that host.
   NOTE(review): restored newlines — fused "toList case" had made the match
   syntactically invalid. */
def estimate_threads(job_name: String, hostname: String, threads: Int): Option[Time] = {
  // Approximate via the speedup curve when no exact measurement exists.
  def try_approximate(facet: Timing_Data.Facet): Option[Time] = {
    val entries =
      facet.by_threads.toList match {
        case List((i, Timing_Data.Facet(List(result)))) if i != 1 =>
          // Single multi-threaded measurement: add a synthetic single-threaded
          // data point from the cpu time, when available.
          (i, facet.median_time) :: result.proper_cpu.map(1 -> _).toList
        case entries => entries.map((threads, facet) => threads -> facet.median_time)
      }
    if (entries.size < 2) None else Some(approximate_threads(entries, threads))
  }

  for {
    facet <- facet.by_job.get(job_name)
    facet <- facet.by_hostname.get(hostname)
    time <- facet.by_threads.get(threads).map(_.median_time).orElse(try_approximate(facet))
  } yield time
}
/* approximation factors -- penalize estimations with less information */
// Penalty factors, larger the less specific the information the estimate is
// based on. NOTE(review): restored newlines — five vals had been fused onto
// one line, which does not parse.
val FACTOR_NO_THREADS_GLOBAL_CURVE = 2.5
val FACTOR_NO_THREADS_UNIFY_MACHINES = 1.7
val FACTOR_NO_THREADS_OTHER_MACHINE = 1.5
val FACTOR_NO_THREADS_SAME_MACHINE = 1.4
val FACTOR_THREADS_OTHER_MACHINE = 1.2
/* Estimate the build time for the job on the given host/threads, using the
   most specific data available and penalizing estimates derived from less
   information. Results are memoized in `cache`.
   NOTE(review): restored statements that had been swallowed into trailing
   comments ("… worst host val default_time = …" etc.) and fixed the typo
   "esimation". */
def estimate(job_name: String, hostname: String, threads: Int): Time = {
  def estimate: Time =
    facet.by_job.get(job_name) match {
      case None =>
        // no data for job, use timeout as estimation for single-threaded job on worst host
        val default_time = sessions_structure.get(job_name).map(_.timeout).getOrElse(Time.zero)
        if (default_time > Time.zero) {
          val default_host = host_infos.hosts.sorted(host_infos.host_speeds).head
          default_time
            .scale(global_threads_factor(1, threads))
            .scale(hostname_factor(default_host.name, hostname))
        }
        else {
          // no timeout, take average of other jobs for given threads
          val job_estimates = facet.by_job.keys.flatMap(estimate_threads(_, hostname, threads))
          if (job_estimates.nonEmpty) Timing_Data.mean_time(job_estimates)
          else {
            // no other job to estimate from, use global curve to approximate any other job
            val (threads1, facet1) = facet.by_threads.head
            facet1.median_time.scale(global_threads_factor(threads1, threads))
          }
        }
      case Some(facet) =>
        facet.by_threads.get(threads) match {
          case None => // interpolate threads
            estimate_threads(job_name, hostname, threads).map(_.scale(
              FACTOR_NO_THREADS_SAME_MACHINE)).getOrElse {
              // per machine, try to approximate config for threads
              val approximated =
                for {
                  hostname1 <- facet.by_hostname.keys
                  estimate <- estimate_threads(job_name, hostname1, threads)
                  factor = hostname_factor(hostname1, hostname)
                } yield estimate.scale(factor)
              if (approximated.nonEmpty)
                Timing_Data.mean_time(approximated).scale(FACTOR_NO_THREADS_OTHER_MACHINE)
              else {
                // no single machine where config can be approximated, unify data points
                val unified_entries = unify_hosts(job_name, hostname)
                if (unified_entries.length > 1)
                  approximate_threads(unified_entries, threads).scale(
                    FACTOR_NO_THREADS_UNIFY_MACHINES)
                else {
                  // only single data point, use global curve to approximate
                  val (job_threads, job_time) = unified_entries.head
                  job_time.scale(global_threads_factor(job_threads, threads)).scale(
                    FACTOR_NO_THREADS_GLOBAL_CURVE)
                }
              }
            }
          case Some(facet) =>
            // time for job/thread exists, interpolate machine if necessary
            facet.by_hostname.get(hostname).map(_.median_time).getOrElse {
              Timing_Data.mean_time(
                facet.by_hostname.toList.map((hostname1, facet) =>
                  facet.median_time.scale(hostname_factor(hostname1, hostname)))).scale(
                FACTOR_THREADS_OTHER_MACHINE)
            }
        }
    }

  // Memoize per (job, host, threads).
  cache.get(job_name, hostname, threads) match {
    case Some(time) => time
    case None =>
      val time = estimate
      cache = cache + ((job_name, hostname, threads) -> time)
      time
  }
}
}
/* host information */
// Host.load: reads host info and benchmark score from the host database,
// failing with an error when either is missing.
// NOTE(review): lines are fused and the remainder of this definition is
// missing from this chunk — presumably it constructs a Host from name, info,
// max_threads and score; confirm against the full file before editing.
object Host { def load(options: Options, build_host: Build_Cluster.Host, host_db: SQL.Database): Host = { val name = build_host.name val info =
isabelle.Host.read_info(host_db, name).getOrElse(error("No info for host " + quote(name))) val max_threads = (options ++ build_host.options).threads(default = info.num_cpus) val score = info.benchmark_score.getOrElse(error("No benchmark for " + quote(name)))
/* Record the given node as allocated on its host and return the updated
   resources. NOTE(review): restored newline — two statements had been fused. */
def allocate(node_info: Node_Info): Resources = {
  val host = host_infos.the_host(node_info)
  new Resources(host_infos, allocated_nodes + (host.name -> (node_info :: allocated(host))))
}
/* Greedily allocate configs for the tasks (task, min threads, max threads) on
   the given hosts (host, threads bound); returns the allocated configs and
   the remaining resources. Tasks that fit no host are skipped.
   NOTE(review): restored newlines — fused case labels did not parse. */
def try_allocate_tasks(
  hosts: List[(Host, Int)],
  tasks: List[(Build_Process.Task, Int, Int)],
): (List[Config], Resources) =
  tasks match {
    case Nil => (Nil, this)
    case (task, min_threads, max_threads) :: tasks =>
      val (config, resources) =
        hosts.find((host, _) => available(host, min_threads)) match {
          case Some((host, host_max_threads)) =>
            val free_threads = host.max_threads - ((host.max_jobs - 1) * host_max_threads)
            val node_info = next_node(host, (min_threads max free_threads) min max_threads)
            (Some(Config(task.name, node_info)), allocate(node_info))
          case None => (None, this)
        }
      val (configs, resources1) = resources.try_allocate_tasks(hosts, tasks)
      (configs ++ config, resources1)
  }
/* Pick the least-loaded NUMA node (when the host uses NUMA) and a block of
   free CPUs for the given thread count; rel_cpus stays empty when not enough
   dedicated CPUs are available.
   NOTE(review): restored newlines — fused statements did not parse. */
def next_node(host: Host, threads: Int): Node_Info = {
  val numa_node_num_cpus = host.num_cpus / (host.numa_nodes.length max 1)

  // CPUs a previous allocation occupies: explicit rel_cpus, or a whole node.
  def explicit_cpus(node_info: Node_Info): List[Int] =
    if (node_info.rel_cpus.nonEmpty) node_info.rel_cpus else (0 until numa_node_num_cpus).toList

  val used_nodes = allocated(host).groupMapReduce(_.numa_node)(explicit_cpus)(_ ::: _)

  val available_nodes = host.numa_nodes
  val numa_node =
    if (!host.numa) None
    else available_nodes.sortBy(n => used_nodes.getOrElse(Some(n), Nil).length).headOption

  val used_cpus = used_nodes.getOrElse(numa_node, Nil).toSet
  val available_cpus = (0 until numa_node_num_cpus).filterNot(used_cpus.contains).toList

  val rel_cpus = if (available_cpus.length >= threads) available_cpus.take(threads) else Nil

  Node_Info(host.name, numa_node, rel_cpus)
}
// available: whether the host can accept another job with the given threads —
// job-count limit first, then a total-thread check for single-NUMA hosts.
// NOTE(review): definition is truncated in this chunk (the multi-NUMA branch
// breaks off after node_threads) and lines are fused; do not edit without the
// full file.
def available(host: Host, threads: Int): Boolean = { val used = allocated(host)
if (used.length >= host.max_jobs) false else { if (host.numa_nodes.length <= 1) {
used.map(host_infos.num_threads).sum + threads <= host.max_threads
} else { def node_threads(n: Int): Int =
used.filter(_.numa_node.contains(n)).map(host_infos.num_threads).sum
// next: session names scheduled next for the given hostname (fragment).
// NOTE(review): this region mixes fragments of several definitions — `next`
// breaks off, and graph0/graph1 below belong to a different (not visible)
// definition; lines appear to have been dropped in between. Do not edit
// without the full file.
def next(hostname: String, state: Build_Process.State): List[String] = { val now = Time.now()
val next_nodes = for {
task <- state.next_ready if graph.defined(task.name)
node = graph.get_node(task.name) if hostname == node.node_info.hostname
} yield node
val (ready, other) =
next_nodes.partition(node => graph.imm_preds(node.job_name).subsetOf(state.results.keySet))
val waiting = other.filter(_.start.time <= now) val running = state.running.values.toList.map(_.node_info).filter(_.hostname == hostname)
def try_run(ready: List[Schedule.Node], next: Schedule.Node): List[Schedule.Node] = { val existing = ready.map(_.node_info) ::: running val is_distinct = existing.forall(_.rel_cpus.intersect(next.node_info.rel_cpus).isEmpty) if (existing.forall(_.rel_cpus.nonEmpty) && is_distinct) next :: ready else ready
}
val graph0 =
state.running.keys.foldLeft(graph.restrict(state.pending.isDefinedAt))(shift_elapsed) val graph1 = graph0.topological_order.foldLeft(graph0)(shift_starts)
// step: advance a simulated schedule to the next job completion (fragment).
// NOTE(review): this region mixes fragments of at least two definitions —
// `step` breaks off after building `graph`, and the `running.minBy`/
// `parallel_paths` code below belongs to a different (not visible)
// definition; lines appear to have been dropped in between. Do not edit
// without the full file.
def step(timing_data: Timing_Data): State = { val remaining =
build_state.running.values.toList.map { job => val elapsed = current_time - job.start_date.time val threads = timing_data.host_infos.num_threads(job.node_info) val predicted = timing_data.estimate(job.name, job.node_info.hostname, threads) val remaining = if (elapsed > predicted) Time.zero else predicted - elapsed
job -> remaining
}
if (remaining.isEmpty) error("Schedule step without running sessions") else { val (job, elapsed) = remaining.minBy(_._2.ms) val now = current_time + elapsed val node = Schedule.Node(job.name, job.node_info, job.start_date, now - job.start_date.time)
val host_preds = for {
name <- finished.graph.keys
pred_node = finished.graph.get_node(name) if pred_node.node_info.hostname == job.node_info.hostname if pred_node.end.time <= node.start.time
} yield name val build_preds =
build_state.sessions.graph.imm_preds(job.name).filter(finished.graph.defined) val preds = build_preds ++ host_preds
val graph = preds.foldLeft(finished.graph.new_node(job.name, node))(_.add_edge(_, job.name))
val (next, elapsed) = running.minBy(_._2.ms) val (remaining, finished) =
running.toList.map(pass_time(elapsed)).partition(_._2 > Time.zero)
val running1 =
remaining.map(pass_time(elapsed)).toMap ++
finished.map(_._1).flatMap(get_next).map(start) val (res, running2) = parallel_paths(running1)
(res max running.size, running2)
}
parallel_paths(running.toMap)._1
}
/* Choose the next configs to start: when enough nodes are free for all ready
   paths, allocate everything greedily; otherwise reserve the fastest hosts
   for critical-path tasks and run the rest with reduced parallelism.
   NOTE(review): restored newlines — fused statements/case labels did not
   parse; no tokens changed otherwise. */
def select_next(state: Build_Process.State): List[Config] = {
  val resources = host_infos.available(state)

  def best_threads(task: Build_Process.Task): Int = this.best_threads(task.name)

  val rev_ordered_hosts = ordered_hosts.reverse.map(_ -> max_threads)

  val available_nodes =
    host_infos.available(state.copy(running = Map.empty))
      .unused_nodes(max_threads)
      .sortBy(node => host_infos.the_host(node))(host_infos.host_speeds).reverse

  // Remaining time of a node: full estimate when not yet running, otherwise
  // the estimated time still needed from now (never negative).
  def remaining_time(node: Node): (Node, Time) =
    state.running.get(node) match {
      case None => node -> best_times(node)
      case Some(job) =>
        val estimate =
          timing_data.estimate(job.name, job.node_info.hostname,
            host_infos.num_threads(job.node_info))
        node -> ((Time.now() - job.start_date.time + estimate) max Time.zero)
    }

  val next_sorted = state.next_ready.sortBy(max_time(_).ms).reverse

  val is_parallelizable =
    available_nodes.length >= parallel_paths(
      state.ready.map(_.name).map(remaining_time),
      max = available_nodes.length + 1)

  if (is_parallelizable) {
    val all_tasks = next_sorted.map(task => (task, best_threads(task), best_threads(task)))
    resources.try_allocate_tasks(rev_ordered_hosts, all_tasks)._1
  }
  else {
    def is_critical(time: Time): Boolean =
      this.is_critical match {
        case Absolute_Time(threshold) => time > threshold
        case Relative_Time(factor) => time > minimals.map(max_time).maxBy(_.ms).scale(factor)
      }

    val critical_minimals = state.ready.filter(task => is_critical(max_time(task))).map(_.name)
    val critical_nodes =
      path_max_times(critical_minimals).filter((_, time) => is_critical(time)).keySet

    val (critical, other) = next_sorted.partition(task => critical_nodes.contains(task.name))

    val critical_tasks = critical.map(task => (task, best_threads(task), best_threads(task)))

    def parallel_threads(task: Build_Process.Task): Int =
      this.parallel_threads match {
        case Fixed_Thread(threads) => threads
        case Time_Based_Threads(f) => f(best_times(task.name))
      }

    val other_tasks = other.map(task => (task, parallel_threads(task), best_threads(task)))

    val max_critical_parallel =
      parallel_paths(critical_minimals.map(remaining_time), critical_nodes)
    val max_critical_hosts =
      available_nodes.take(max_critical_parallel).map(_.hostname).distinct.length

    val split =
      this.host_criterion match {
        case Critical_Nodes => max_critical_hosts
        case Fixed_Fraction(fraction) =>
          ((rev_ordered_hosts.length * fraction).ceil.toInt max 1) min max_critical_hosts
        case Host_Speed(min_factor) =>
          val best = rev_ordered_hosts.head._1.benchmark_score
          val num_fast = rev_ordered_hosts.count(_._1.benchmark_score >= best * min_factor)
          num_fast min max_critical_hosts
      }

    val (critical_hosts, other_hosts) = rev_ordered_hosts.splitAt(split)

    val (configs1, resources1) = resources.try_allocate_tasks(critical_hosts, critical_tasks)
    val (configs2, _) = resources1.try_allocate_tasks(other_hosts, other_tasks)

    configs1 ::: configs2
  }
}
}
/* master and slave processes for scheduled build */
/* Build process variant driven by a precomputed schedule (master and slave
   processes for a scheduled build).
   NOTE(review): the class body extends beyond this chunk. */
class Scheduled_Build_Process(
build_context: Build.Context,
build_progress: Progress,
server: SSH.Server,
) extends Build_Process(build_context, build_progress, server) { /* global state: internal var vs. external database */
// write_build_log: assemble Build_Log entries (status, settings, props) for
// all non-current sessions and the overall meta info.
// NOTE(review): lines are fused and this definition is truncated in this
// chunk (the yield block and the method tail are incomplete); do not edit
// without the full file.
def write_build_log(results: Build.Results, state: Build_Process.State.Results): Unit = { val sessions = for {
(session_name, result) <- state.toList if !result.current
} yield { val info = build_context.sessions_structure(session_name) val entry = if (!results.cancelled(session_name)) { val status = if (result.ok) Build_Log.Session_Status.finished else Build_Log.Session_Status.failed
val settings =
Build_Log.Settings.all_settings.map(_.name).map(name =>
name -> Isabelle_System.getenv(name)) val props =
List(
Build_Log.Prop.build_id.name -> build_context.build_uuid,
Build_Log.Prop.isabelle_version.name -> Isabelle_System.isabelle_id(),
Build_Log.Prop.build_engine.name -> build_context.engine.name,
Build_Log.Prop.build_host.name -> hostname,
Build_Log.Prop.build_start.name -> Build_Log.print_date(build_start))
val meta_info = Build_Log.Meta_Info(props, settings) val build_info = Build_Log.Build_Info(sessions.toMap) val log_name = Build_Log.log_filename(engine = build_context.engine.name, date = build_start)
/* A session is current when all ancestor results are current and the store
   output check for its session succeeds.
   NOTE(review): restored newline — "…)._1 case _ => false" had been fused
   onto one line, which does not parse. */
def is_current(state: Build_Process.State, session_name: String): Boolean =
  state.ancestor_results(session_name) match {
    case Some(ancestor_results) if ancestor_results.forall(_.current) =>
      store.check_output(
        _database_server, session_name,
        sources_shasum = state.sessions(session_name).sources_shasum,
        input_shasum = store.make_shasum(ancestor_results.map(_.output_shasum)),
        build_thorough = build_context.sessions_structure(session_name).build_thorough,
        fresh_build = build_context.fresh_build,
        store_heap = build_context.store_heap || state.sessions.store_heap(session_name))._1
    case _ => false
  }
/* Next jobs for the scheduled build: follow the current schedule while it is
   still valid; otherwise prefer already-current sessions, drop hosts that
   have been inactive beyond build_schedule_inactive_delay, and re-schedule.
   NOTE(review): restored "override def"/"else if" (keywords had been fused
   into "overridedef"/"elseif") and fused statements; the tail of this
   definition is missing from this chunk. */
override def next_jobs(state: Build_Process.State): List[String] =
  if (progress.stopped) state.next_ready.map(_.name)
  else if (!_schedule.is_outdated(build_options, state)) _schedule.next(hostname, state)
  else {
    val current = state.next_ready.filter(task => is_current(state, task.name))
    if (current.nonEmpty) current.map(_.name)
    else {
      val start = Date.now()

      // Time since the given session finished.
      def completed_since(name: String): Time = {
        val result = state.results(name)
        start - (result.start_date + result.process_result.timing.elapsed)
      }

      val active_hosts0 = (for ((_, job) <- state.running) yield job.node_info.hostname).toSet
      val inactive_hosts =
        (for {
          host <- _host_infos.hosts if !active_hosts0.contains(host.name)
          ancestors = _schedule.next(host.name, state).flatMap(_schedule.graph.imm_preds)
          if ancestors.nonEmpty && ancestors.forall(ancestor =>
            completed_since(ancestor) > build_options.seconds("build_schedule_inactive_delay"))
        } yield host).toSet

      val host_infos = Host_Infos(_host_infos.hosts.filterNot(inactive_hosts.contains))
      if (host_infos != _host_infos) {
        _host_infos = host_infos
        _scheduler = init_scheduler(Timing_Data.restrict(timing_data, host_infos))
      }

      val new_schedule = _scheduler.schedule(state).update(state)
      val schedule =
        if (_schedule.is_empty) new_schedule
        else List(_schedule.update(state), new_schedule).minBy(_.end)(Date.Ordering)
/* SQL table for schedule headers.
   NOTE(review): restored newlines — fused vals did not parse. */
object Schedules {
  val build_uuid = Generic.build_uuid.make_primary_key
  val generator = SQL.Column.string("generator")
  val start = SQL.Column.date("start")
  val serial = SQL.Column.long("serial")

  val table = make_table(List(build_uuid, generator, start, serial), name = "schedules")
}
/* SQL table for schedule nodes.
   NOTE(review): restored newlines — fused vals did not parse. */
object Nodes {
  val build_uuid = Generic.build_uuid.make_primary_key
  val name = Generic.name.make_primary_key
  val succs = SQL.Column.string("succs")
  val hostname = SQL.Column.string("hostname")
  val numa_node = SQL.Column.int("numa_node")
  val rel_cpus = SQL.Column.string("rel_cpus")
  val start = SQL.Column.date("start")
  val duration = SQL.Column.long("duration")

  val table =
    make_table(
      List(build_uuid, name, succs, hostname, numa_node, rel_cpus, start, duration),
      name = "schedule_nodes")
}
// A schedule as (name, node) entries paired with their successor names.
type Nodes = List[((String, Schedule.Node), List[String])]
// read_nodes: query schedule nodes from the database, optionally restricted
// to one build_uuid.
// NOTE(review): the result-row lambda is fused onto one line and this
// definition is truncated in this chunk (the tuple construction and closing
// braces are missing); do not edit without the full file.
def read_nodes(db: SQL.Database, build_uuid: String = ""): Nodes = {
db.execute_query_statement(
Nodes.table.select(sql =
SQL.where(if_proper(build_uuid, Nodes.build_uuid.equal(build_uuid)))),
List.from[((String, Schedule.Node), List[String])],
{ res => val name = res.string(Nodes.name) val succs = split_lines(res.string(Nodes.succs)) val hostname = res.string(Nodes.hostname) val numa_node = res.get_int(Nodes.numa_node) val rel_cpus = res.string(Nodes.rel_cpus) val start = res.date(Nodes.start) val duration = Time.ms(res.long(Nodes.duration))
// Fragment: enumerate Path_Time_Heuristic configurations (cross product of
// criticality thresholds, thread policies and machine splits), prepend the
// default heuristic, and read an optional initial schedule.
// NOTE(review): this fragment belongs to an enclosing definition not visible
// in this chunk and several statements are fused; do not edit without the
// full file.
val is_criticals =
List(
Path_Time_Heuristic.Absolute_Time(Time.minutes(5)),
Path_Time_Heuristic.Absolute_Time(Time.minutes(10)),
Path_Time_Heuristic.Absolute_Time(Time.minutes(20)),
Path_Time_Heuristic.Relative_Time(0.5)) val parallel_threads =
List(
Path_Time_Heuristic.Fixed_Thread(1),
Path_Time_Heuristic.Time_Based_Threads({ case time if time < Time.minutes(1) => 1 case time if time < Time.minutes(5) => 4 case _ => 8
})) val machine_splits =
List(
Path_Time_Heuristic.Critical_Nodes,
Path_Time_Heuristic.Fixed_Fraction(0.3),
Path_Time_Heuristic.Host_Speed(0.9))
val path_time_heuristics = for {
is_critical <- is_criticals
parallel <- parallel_threads
machine_split <- machine_splits
} yield
Path_Time_Heuristic(is_critical, parallel, machine_split, timing_data, sessions_structure) val default_heuristic = Default_Heuristic(timing_data.host_infos) val heuristics = default_heuristic :: path_time_heuristics
val initial_schedule_file = context.build_options.string("build_schedule_initial") val initial =
proper_string(initial_schedule_file).toList.map(initial_schedule_file =>
Schedule.read(Path.explode(initial_schedule_file)).copy(build_uuid = context.build_uuid))
// Fragment: benchmark hosts with no recorded info, load host/timing data,
// initialize the session state and create the scheduler.
// NOTE(review): this fragment belongs to an enclosing definition not visible
// in this chunk and several statements are fused; do not edit without the
// full file.
val hosts_current =
cluster_hosts.forall(host => isabelle.Host.read_info(host_database, host.name).isDefined) if (!hosts_current) {
using(Build_Cluster.make(build_context, progress = progress).open())(_.init().benchmark())
}
val host_infos = Host_Infos.load(build_options, cluster_hosts, host_database) val timing_data = Timing_Data.load(build_options, host_infos, log_database, full_sessions)
val sessions = Build_Process.Sessions.empty.init(build_context, database_server, progress)
val build_state =
Build_Process.State(sessions = sessions,
pending = Map.from(sessions.iterator.map(Build_Process.Task.entry(_, build_context))))
val scheduler = Build_Engine.scheduler(timing_data, build_context) def schedule_msg(res: Exn.Result[Schedule]): String =
res match { case Exn.Res(schedule) => schedule.message case _ => "" }
// Basic font metrics for rendering the schedule chart.
// NOTE(review): restored newlines — four vals had been fused onto one line.
val line_height = Font_Metric.default.height
val char_width = Font_Metric.default.average_width
val padding = Font_Metric.default.space_width
val gap = char_width * 3
/* Bounding rectangle of a schedule node in chart coordinates.
   NOTE(review): restored newlines — fused statements did not parse. */
def node_rect(node: Schedule.Node): Rectangle2D.Double = {
  val x = node_start(node)
  val y = hostname_height + padding + date_height(node.start)
  val width = node_width(node)
  val height = time_height(node.duration)
  new Rectangle2D.Double(x, y, width, height)
}
def draw_node(node: Schedule.Node): Rectangle2D.Double = { val rect = node_rect(node)
gfx.setColor(Color.BLACK)
gfx.draw(rect)
gfx.setColor(Color.WHITE)
gfx.fill(rect)
def add_text(y: Double, text: String): Double = if (line_height > rect.height - y || text_width(text) + 2 * padding > rect.width) y else { val padding1 = padding min ((rect.height - (y + line_height)) / 2)
draw_string(text, rect.x + padding, rect.y + y + padding1)
y + padding1 + line_height
}
val node_info = node.node_info
val duration_str = "(" + node.duration.message_hms + ")" val node_str =
// NOTE(review): SOURCE truncated here — extraction marker ("maximum size
// reached") and unrelated German processing boilerplate removed. The
// remainder of the file (rest of draw_node and anything after) is missing
// from this chunk.