Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  build_cluster.scala   Sprache: Scala

 
/*  Title:      Pure/Build/build_cluster.scala
    Author:     Makarius

Management of build cluster: independent ssh hosts with access to shared
PostgreSQL server.

NB: extensible classes allow quite different implementations in user-space,
via the service class Build.Engine and overridable methods
Build.Engine.open_build_process(), Build_Process.open_build_cluster().
*/


package isabelle


object Build_Cluster {
  /* host specifications */

  object Host {
    private val rfc822_specials = """()<>@,;:\"[]"""

    private val HOSTNAME = "hostname"
    private val USER = "user"
    private val PORT = "port"
    private val JOBS = "jobs"
    private val NUMA = "numa"
    private val DIRS = "dirs"
    private val HOME = "home"
    private val SHARED = "shared"
    private val SETTINGS = "settings"

    val parameters: Options =
      Options.inline("""
        option hostname : string = "" -- "explicit SSH hostname"
        option user : string = ""     -- "explicit SSH user"
        option port : int = 0         -- "explicit SSH port"
        option jobs : int = 1         -- "maximum number of parallel jobs"
        option numa : bool = false    -- "cyclic shuffling of NUMA CPU nodes"
        option dirs : string = ""     -- "additional session directories (separated by colon)"
        option home : string = ""     -- "alternative user home (via $USER_HOME)"
        option shared : bool = false  -- "shared home directory: omit sync + init"
        option settings : string = "" -- "specific host settings"
      """)

    def is_parameter(spec: Options.Spec): Boolean = parameters.defined(spec.name)

    lazy val test_options: Options = Options.init0()

    def apply(
      name: String = "",
      hostname: String = parameters.string(HOSTNAME),
      user: String = parameters.string(USER),
      port: Int = parameters.int(PORT),
      jobs: Int = parameters.int(JOBS),
      numa: Boolean = parameters.bool(NUMA),
      dirs: String = parameters.string(DIRS),
      home: String = parameters.string(HOME),
      shared: Boolean = parameters.bool(SHARED),
      settings: List[String] = split_lines(parameters.string(SETTINGS)),
      options: List[Options.Spec] = Nil
    ): Host = {
      new Host(name, hostname, user, port, jobs, numa, dirs, home, shared, settings, options)
    }

    def parse(registry: Registry, str: String): List[Host] = {
      def err(msg: String): Nothing =
        cat_error(msg, "The error(s) above occurred in host specification " + quote(str))

      val names = str.takeWhile(c => !rfc822_specials.contains(c) || c == ',')
      val more_specs =
        try {
          val n = str.length
          val m = names.length
          val l =
            if (m == n) n
            else if (str(m) == ':') m + 1
            else error("Missing \":\" after host name")
          Options.Spec.parse(str.substring(l))
        }
        catch { case ERROR(msg) => err(msg) }

      def get_registry(a: String): Registry.Cluster.Value =
        Registry.Cluster.try_unqualify(a) match {
          case Some(b) => Registry.Cluster.get(registry, b)
          case None => List(a -> Registry.Host.get(registry, a))
        }

      for (name <- space_explode(',', names); (host, host_specs) <- get_registry(name))
      yield {
        val (params, options) =
          try {
            val (specs1, specs2) = (host_specs ::: more_specs).partition(is_parameter)
            (parameters ++ specs1, { test_options ++ specs2; specs2 })
          }
          catch { case ERROR(msg) => err(msg) }

        apply(name = host,
          hostname = params.string(HOSTNAME),
          user = params.string(USER),
          port = params.int(PORT),
          jobs = params.int(JOBS),
          numa = params.bool(NUMA),
          dirs = params.string(DIRS),
          home = params.string(HOME),
          shared = params.bool(SHARED),
          settings = split_lines(params.string(SETTINGS)),
          options = options)
      }
    }

    def parse_single(registry: Registry, str: String): Host =
      Library.the_single(parse(registry, str), message = "Single host expected: " + quote(str))
  }

  class Host(
    val name: String,
    val hostname: String,
    val user: String,
    val port: Int,
    val jobs: Int,
    val numa: Boolean,
    val dirs: String,
    val home: String,
    val shared: Boolean,
    val settings: List[String],
    val options: List[Options.Spec]
  ) {
    host =>

    Path.split(host.dirs)  // check

    def ssh_host: String = proper_string(host.hostname) getOrElse host.name
    def is_local: Boolean = SSH.is_local(ssh_host)
    def is_remote: Boolean = !is_local

    override def toString: String = print

    def print: String = {
      val params =
        List(
          if (host.hostname.isEmpty) "" else Options.Spec.print(Host.HOSTNAME, host.hostname),
          if (host.user.isEmpty) "" else Options.Spec.print(Host.USER, host.user),
          if (host.port == 0) "" else Options.Spec.print(Host.PORT, host.port.toString),
          if (host.jobs == 1) "" else Options.Spec.print(Host.JOBS, host.jobs.toString),
          if_proper(host.numa, Host.NUMA),
          if_proper(host.dirs, Options.Spec.print(Host.DIRS, host.dirs)),
          if_proper(host.home, Options.Spec.print(Host.HOME, host.home)),
          if_proper(host.shared, Host.SHARED),
          if_proper(host.settings, Options.Spec.print(Host.SETTINGS, cat_lines(host.settings)))
        ).filter(_.nonEmpty)
      val rest = (params ::: host.options.map(_.print)).mkString(",")

      SSH.print_local(host.name) + if_proper(rest, ":" + rest)
    }

    def message(msg: String): String = "Host " + quote(host.name) + if_proper(msg, ": " + msg)

    def open_ssh(options: Options): SSH.System =
      SSH.open_system(options ++ host.options, ssh_host, port = host.port,
        user = host.user, user_home = host.home)

    def open_session(build_context: Build.Context, progress: Progress = new Progress): Session = {
      val ssh_options = build_context.build_options ++ host.options
      val ssh = open_ssh(build_context.build_options)
      new Session(build_context, host, ssh_options, ssh, progress)
    }
  }


  /* SSH sessions */

  class Session(
    val build_context: Build.Context,
    val host: Host,
    val options: Options,
    val ssh: SSH.System,
    val progress: Progress
  ) extends AutoCloseable {
    override def toString: String = ssh.toString

    val build_cluster_identifier: String = options.string("build_cluster_identifier")
    val build_cluster_isabelle_home: Path =
      Path.explode(options.string("build_cluster_root")) + Path.explode("isabelle")

    lazy val build_cluster_isabelle: Other_Isabelle =
      Other_Isabelle(build_cluster_isabelle_home,
        isabelle_identifier = build_cluster_identifier, ssh = ssh)

    def sync(): Other_Isabelle = {
      val context = Rsync.Context(ssh = ssh)
      val target = build_cluster_isabelle_home
      if (Mercurial.Hg_Sync.ok(Path.ISABELLE_HOME)) {
        val source = File.standard_path(Path.ISABELLE_HOME)
        Rsync.exec(context, clean = true,
          args = List("--", Url.direct_path(source), context.target(target))).check
      }
      else {
        Sync.sync(options, context, target,
          purge_heaps = true,
          preserve_jars = true,
          dirs = Sync.afp_dirs(build_context.afp_root))
      }
      build_cluster_isabelle
    }

    def init(): Unit =
      build_cluster_isabelle.init(other_settings =
        build_cluster_isabelle.init_components() ::: build_cluster_isabelle.debug_settings() :::
          host.settings)

    def benchmark(): Unit = {
      val script =
        Build_Benchmark.benchmark_command(options, host, ssh = ssh,
          isabelle_home = build_cluster_isabelle_home)
      build_cluster_isabelle.bash(script).check
    }

    def start(): Process_Result = {
      val build_cluster_ml_platform = build_cluster_isabelle.ml_settings.ml_platform
      if (build_cluster_ml_platform != build_context.ml_platform) {
        error("Bad ML_PLATFORM: found " + quote(build_cluster_ml_platform) +
          ", but expected " + quote(build_context.ml_platform))
      }
      val build_options =
        for { option <- options.iterator if option.for_build_sync } yield options.spec(option.name)
      val script =
        Build.build_worker_command(host,
          ssh = ssh,
          build_options = build_options.toList,
          build_id = build_context.build_uuid,
          isabelle_home = build_cluster_isabelle_home,
          dirs = Path.split(host.dirs).map(build_cluster_isabelle.expand_path),
          quiet = true)
      build_cluster_isabelle.bash(script).check
    }

    override def close(): Unit = ssh.close()
  }

  class Result(val host: Host, val process_result: Exn.Result[Process_Result]) {
    def rc: Int = Process_Result.RC(process_result)
    def ok: Boolean = rc == Process_Result.RC.ok
  }


  /* build clusters */

  object none extends Build_Cluster {
    override def toString: String = "Build_Cluster.none"
  }

  def make(build_context: Build.Context, progress: Progress = new Progress): Build_Cluster = {
    val remote_hosts = build_context.build_hosts.filter(_.is_remote)
    if (remote_hosts.isEmpty) none
    else new Remote_Build_Cluster(build_context, remote_hosts, progress = progress)
  }
}

// NB: extensible via Build.Engine.build_process() and Build_Process.init_cluster()
trait Build_Cluster extends AutoCloseable {
  def rc: Int = Process_Result.RC.ok
  def ok: Boolean = rc == Process_Result.RC.ok
  def return_code(rc: Int): Unit = ()
  def return_code(res: Process_Result): Unit = return_code(res.rc)
  def return_code(exn: Throwable): Unit = return_code(Process_Result.RC(exn))
  def open(): Build_Cluster = this
  def init(): Build_Cluster = this
  def benchmark(): Build_Cluster = this
  def start(): Build_Cluster = this
  def active(): Boolean = false
  def join: List[Build_Cluster.Result] = Nil
  def stop(): Unit = { join; close() }
  override def close(): Unit = ()
}

class Remote_Build_Cluster(
  val build_context: Build.Context,
  val remote_hosts: List[Build_Cluster.Host],
  val progress: Progress = new Progress
extends Build_Cluster {
  require(remote_hosts.nonEmpty && remote_hosts.forall(_.is_remote), "remote hosts required")

  override def toString: String =
    remote_hosts.iterator.map(_.name).mkString("Build_Cluster("", "")")


  /* cumulative return code */

  private val _rc = Synchronized(Process_Result.RC.ok)

  override def rc: Int = _rc.value

  override def return_code(rc: Int): Unit =
    _rc.change(rc0 => Process_Result.RC.merge(rc0, rc))

  def capture[A](host: Build_Cluster.Host, op: String)(
    e: => A,
    msg: String = host.message(op + " ..."),
    err: Throwable => String = { exn =>
      return_code(exn)
      host.message("failed to " + op + "\n" + Exn.print(exn))
    }
  ): Exn.Result[A] = {
    progress.capture(e, msg = msg, err = err)
  }


  /* open remote sessions */

  private var _sessions = List.empty[Build_Cluster.Session]

  override def open(): Build_Cluster = synchronized {
    require(_sessions.isEmpty, "build cluster already open")

    val attempts =
      Par_List.map((host: Build_Cluster.Host) =>
        capture(host, "open") { host.open_session(build_context, progress = progress) },
        remote_hosts, thread = true)

    if (attempts.forall(Exn.is_res)) {
      _sessions = attempts.map(Exn.the_res)
      _sessions
    }
    else {
      for (case Exn.Res(session) <- attempts) session.close()
      error("Failed to connect build cluster")
    }

    this
  }


  /* init and benchmark remote Isabelle distributions */

  private var _init = List.empty[Future[Unit]]

  override def init(): Build_Cluster = synchronized {
    require(_sessions.nonEmpty, "build cluster not yet open")

    if (_init.isEmpty) {
      _init =
        for (session <- _sessions if !session.host.shared) yield {
          Future.thread(session.host.message("session")) {
            capture(session.host, "sync") { session.sync() }
            capture(session.host, "init") { session.init() }
          }
        }
    }

    this
  }

  override def benchmark(): Build_Cluster = synchronized {
    _init.foreach(_.join)
    _init =
      for (session <- _sessions if !session.host.shared) yield {
        Future.thread(session.host.message("session")) {
          capture(session.host, "benchmark") { session.benchmark() }
        }
      }
    _init.foreach(_.join)

    this
  }


  /* workers */

  private var _workers = List.empty[Future[Process_Result]]

  override def active(): Boolean = synchronized { _workers.nonEmpty }

  override def start(): Build_Cluster = synchronized {
    require(_sessions.nonEmpty, "build cluster not yet open")
    require(_workers.isEmpty, "build cluster already active")

    init()
    _init.foreach(_.join)

    _workers =
      for (session <- _sessions) yield {
        Future.thread(session.host.message("work")) {
          Exn.release(capture(session.host, "work") { session.start() })
        }
      }

    this
  }

  override def join: List[Build_Cluster.Result] = synchronized {
    _init.foreach(_.join)
    if (_workers.isEmpty) Nil
    else {
      assert(_sessions.length == _workers.length)
      for ((session, worker) <- _sessions zip _workers)
        yield Build_Cluster.Result(session.host, worker.join_result)
    }
  }


  /* close */

  override def close(): Unit = synchronized {
    _init.foreach(_.join)
    _workers.foreach(_.join_result)
    _sessions.foreach(_.close())

    _init = Nil
    _workers = Nil
    _sessions = Nil
  }
}

97%


¤ Dauer der Verarbeitung: 0.31 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge