products/Sources/formale Sprachen/Isabelle/Pure/Concurrent image not shown  

Quellcode-Bibliothek

© Kompilation durch diese Firma

[Weder Korrektheit noch Funktionsfähigkeit der Software werden zugesichert.]

Datei: consumer_thread.scala   Sprache: Scala

Original von: Isabelle©

/*  Title:      Pure/Concurrent/consumer_thread.scala
    Author:     Makarius

Consumer thread with unbounded queueing of requests, and optional
acknowledgment.
*/


package isabelle


import scala.annotation.tailrec


object Consumer_Thread
{
  def fork_bulk[A](name: String = "", daemon: Boolean = false)(
      bulk: A => Boolean,
      consume: List[A] => (List[Exn.Result[Unit]], Boolean),
      finish: () => Unit = () => ()): Consumer_Thread[A] =
    new Consumer_Thread[A](name, daemon, bulk, consume, finish)

  def fork[A](name: String = "", daemon: Boolean = false)(
      consume: A => Boolean,
      finish: () => Unit = () => ()): Consumer_Thread[A] =
  {
    def consume_single(args: List[A]): (List[Exn.Result[Unit]], Boolean) =
    {
      assert(args.length == 1)
      Exn.capture { consume(args.head) } match {
        case Exn.Res(continue) => (List(Exn.Res(())), continue)
        case Exn.Exn(exn) => (List(Exn.Exn(exn)), true)
      }
    }

    fork_bulk(name = name, daemon = daemon)(_ => false, consume_single, finish = finish)
  }
}

final class Consumer_Thread[A] private(
  name: String, daemon: Boolean,
  bulk: A => Boolean,
  consume: List[A] => (List[Exn.Result[Unit]], Boolean),
  finish: () => Unit)
{
  /* thread */

  private var active = true
  private val mailbox = Mailbox[Option[Request]]

  private val thread = Isabelle_Thread.fork(name = name, daemon = daemon) { main_loop(Nil) }
  def is_active: Boolean = active && thread.isAlive
  def check_thread: Boolean = Thread.currentThread == thread

  private def failure(exn: Throwable): Unit =
    Output.error_message(
      "Consumer thread failure: " + quote(thread.getName) + "\n" + Exn.message(exn))

  private def robust_finish(): Unit =
    try { finish() } catch { case exn: Throwable => failure(exn) }


  /* requests */

  private class Request(val arg: A, acknowledge: Boolean = false)
  {
    val ack: Option[Synchronized[Option[Exn.Result[Unit]]]] =
      if (acknowledge) Some(Synchronized(None)) else None

    def await
    {
      for (a <- ack) {
        Exn.release(a.guarded_access({ case None => None case res => Some((res.get, res)) }))
      }
    }
  }

  private def request(req: Request)
  {
    synchronized {
      if (is_active) mailbox.send(Some(req))
      else error("Consumer thread not active: " + quote(thread.getName))
    }
    req.await
  }

  @tailrec private def main_loop(msgs: List[Option[Request]]): Unit =
    msgs match {
      case Nil => main_loop(mailbox.receive())
      case None :: _ => robust_finish()
      case _ =>
        val reqs =
          proper_list(msgs.takeWhile(msg => msg.isDefined && bulk(msg.get.arg)))
            .getOrElse(msgs.take(1))
            .map(_.get)

        val (results, continue) = consume(reqs.map(_.arg))

        for { (Some(req), Some(res)) <- reqs.map(Some(_)).zipAll(results.map(Some(_)), None, None) }
        {
          (req.ack, res) match {
            case ((Some(a), _)) => a.change(_ => Some(res))
            case ((None, Exn.Res(_))) =>
            case ((None, Exn.Exn(exn))) => failure(exn)
          }
        }

        if (continue) main_loop(msgs.drop(reqs.length))
        else robust_finish()
    }


  /* main methods */

  assert(is_active)

  def send(arg: A) { request(new Request(arg)) }
  def send_wait(arg: A) { request(new Request(arg, acknowledge = true)) }

  def shutdown()
  {
    synchronized { if (is_active) { active = false; mailbox.send(None) } }
    thread.join
  }
}

¤ Dauer der Verarbeitung: 0.15 Sekunden  (vorverarbeitet)  ¤





Download des
Quellennavigators
Download des
sprechenden Kalenders

in der Quellcodebibliothek suchen




Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.


Bemerkung:

Die farbliche Syntaxdarstellung ist noch experimentell.


Bot Zugriff