Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  stdtasks.g   Sprache: unbekannt

 
#############################################################################
##
##  This file is part of GAP, a system for computational discrete algebra.
##
##  Copyright of GAP belongs to its developers, whose names are too numerous
##  to list here. Please refer to the COPYRIGHT file for details.
##
##  SPDX-License-Identifier: GPL-2.0-or-later
##

# To avoid deadlock, object locks are acquired in the following
# priority order:
#
# Triggers > Tasks/Milestones > TASK_QUEUE

BindGlobal("TASK_QUEUE", ShareSpecialObj( rec (
  ready_tasks := NewQueue(),
  workers := NewQueue(),
  active_count := 0,
  max_active := GAPInfo.KernelInfo.NUM_CPUS,
) ) );

CURRENT_TASK := fail;
MakeThreadLocal("CURRENT_TASK");

TASKS := AtomicRecord( rec (

  WorkerThread := function(context)
    local task;
    BreakOnError := false;
    SilentNonInteractiveErrors := true;
    WaitSemaphore(context.semaphore);
    while true do
      WaitSemaphore(context.semaphore);
      atomic context.task_container do
        task := context.task_container.task;
        context.task_container.task := fail;
      od;
      if not IsIdenticalObj(task, fail) then
        CURRENT_TASK := task;
        TASKS.ExecuteTask(task, context);
        CURRENT_TASK := fail;
        atomic TASK_QUEUE do
          PushQueue(TASK_QUEUE.workers, context);
          TASK_QUEUE.active_count := TASK_QUEUE.active_count - 1;
        od;
        TASKS.WakeWorker();
      fi;
    od;
  end,

  WakeWorker := function()
    local task;
    atomic TASK_QUEUE do
      while TASK_QUEUE.active_count < TASK_QUEUE.max_active and
            not EmptyQueue(TASK_QUEUE.ready_tasks) do
        task := PopQueue(TASK_QUEUE.ready_tasks);
        atomic task do
          if IsIdenticalObj(task.worker, fail) then
            if not EmptyQueue(TASK_QUEUE.workers) then
              task.worker := PopQueue(TASK_QUEUE.workers);
            else
              task.worker := TASKS.NewWorker();
            fi;
            atomic task.worker.task_container do
              task.worker.task_container.task := task;
            od;
          fi;
          if not IsIdenticalObj(task.body, fail) then
            TASK_QUEUE.active_count := TASK_QUEUE.active_count + 1;
          fi;
          SignalSemaphore(task.worker.semaphore);
        od;
      od;
    od;
  end,

  ExecuteTask := function(task, context)
    local error, result, async, notify, trigger, i, body, args;
    atomic task do
      task.started := true;
      async := task.async;
      atomic task.region do
        for i in [1..Length(task.adopt)] do
          if task.adopt[i] then
            AdoptObj(task.args[i]);
          fi;
        od;
      od;
      body := task.body;
      args := ShallowCopy(task.args);
    od;
    result := CALL_WITH_CATCH(body, args);
    error := not result[1];
    if not error and IsBound(result[2]) then
      result := result[2];
    else
      result := fail;
    fi;
    atomic task do
      task.complete := true;
      if IsIdenticalObj(result, fail) and
          not IsIdenticalObj(task.result, fail) then
        result := task.result;
      fi;
      if not async then
        if IsThreadLocal(result) then
          atomic task.region do
            task.result := MigrateObj(result, task.region);
          od;
          task.adopt_result := true;
        else
          task.result := result;
          task.adopt_result := false;
        fi;
      fi;
      if error then
        task.error := LastErrorMessage;
      fi;
      notify := AdoptSingleObj(task.notify);
      task.notify := MigrateSingleObj([], task);
    od;
    for trigger in notify do
      TASKS.FireTrigger(trigger);
    od;
  end,

  NewWorker := function()
    local context, thread;
    context := MakeWriteOnceAtomic(rec(
      semaphore := CreateSemaphore(),
      task_container := ShareSingleSpecialObj(rec(task := fail)),
    ));
    thread := CreateThread(TASKS.WorkerThread, context);
    context.thread := thread;
    SignalSemaphore(context.semaphore);
    return context;
  end,

  NewTask := function(func, args)
    local task, arg;
    task := rec(
      args := [],
      adopt := [],
      result := fail,
      error := fail,
      adopt_result := fail,
      region := NewSpecialRegion(),
      complete := false,
      started := false,
      async := false,
      cancel := false,
      cancelled := false,
      body := func,
      worker := fail,
      conditions := [],
      notify := [],
    );
    for arg in args do
      if IsThreadLocal(arg) then
        Add(task.args, LockAndMigrateObj(CopyRegion(arg), task.region));
        Add(task.adopt, true);
      else
        Add(task.args, arg);
        Add(task.adopt, false);
      fi;
    od;
    return task;
  end,

  MakePseudoTask := function()
    local task;
    task := TASKS.NewTask(fail, []);
    task.worker := TASKS.PseudoWorker();
    CURRENT_TASK := task;
  end,


  CurrentTask := function()
    if IsIdenticalObj(CURRENT_TASK, fail) then
      TASKS.MakePseudoTask();
    fi;
    return CURRENT_TASK;
  end,

  PseudoWorker := function()
    local context;
    context := MakeWriteOnceAtomic(rec(
      region := NewSpecialRegion(),
      semaphore := CreateSemaphore(),
      thread := fail,
    ));
    return context;
  end,

  NewMilestone := function(contributions)
    local c;
    c := Set(contributions);
    return rec(
      achieved := Set([]),
      targets := Immutable(c),
      complete := Length(c) = 0,
      notify := [],
    );
  end,

  # Triggers are built from sets of conditions.
  # Conditions are shared records with the requirement
  # that they have a boolean 'complete' element, which
  # is true if the condition is met and a 'notify' list
  # of triggers, which gets invoked when the condition
  # has been met.
  #
  # Triggers can be either a conjunction or a disjunction
  # of conditions right now. Later on, it may be useful
  # to have more complex ways of composing them.

  BuildTrigger := function(task, is_conjunction)
    return rec(
      task := task,
      conditions := task.conditions,
      done := false,
      is_conjunction := is_conjunction,
    );
  end,

  FireTrigger := function(trigger)
    local cond, done, task, tasks;
    done := false;
    atomic readwrite trigger do
      if trigger.done then
        return;
      fi;
      if trigger.is_conjunction then
        done := true;
        for cond in trigger.conditions do
          atomic readonly cond do
            if not cond.complete then
              done := false;
              break;
            fi;
          od;
        od;
      else
        done := false;
        for cond in trigger.conditions do
          atomic readonly cond do
            if cond.complete then
              done := true;
              break;
            fi;
          od;
        od;
      fi;
      trigger.done := done;
      if not done then
        return;
      fi;
      task := trigger.task;
    od;
    atomic task, TASK_QUEUE do
      if IsIdenticalObj(task.worker, fail) then
        PushQueue(TASK_QUEUE.ready_tasks, task);
        TASKS.WakeWorker();
      else
        tasks := [ task ];
        MigrateSingleObj(tasks, TASK_QUEUE);
        PushQueueFront(TASK_QUEUE.ready_tasks, task);
        TASKS.WakeWorker();
      fi;
    od;
  end,

  ActivateTrigger := function(trigger)
    local cond;
    atomic trigger do
      for cond in trigger.conditions do
        atomic cond do
          if not cond.complete then
            Add(cond.notify, trigger);
          fi;
        od;
      od;
    od;
    # Recheck conditions, because they may have fired
    # in the meantime.
    TASKS.FireTrigger(trigger);
  end,

  QueueTask := function(task)
    local trigger;
    atomic task, TASK_QUEUE do
      if Length(task.conditions) = 0 then
        PushQueue(TASK_QUEUE.ready_tasks, task);
        TASKS.WakeWorker();
        return;
      else
        trigger := TASKS.BuildTrigger(task, true);
        if trigger.done then
          PushQueue(TASK_QUEUE.ready_tasks, task);
          TASKS.WakeWorker();
          return;
        fi;
      fi;
    od;
    # We get here if we have a triggered task with
    # unmet conditions.
    ShareSpecialObj(trigger);
    TASKS.ActivateTrigger(trigger);
  end,
) );

MakeReadOnlyGVar("TASKS");

BindGlobal("NewMilestone", function(arg)
  local milestone;
  if Length(arg) = 0 then
    milestone := TASKS.NewMilestone([0]);
  elif Length(arg) = 1 and not IS_STRING(arg[1]) and IS_LIST(arg[1]) then
    milestone := TASKS.NewMilestone(arg[1]);
  else
    milestone := TASKS.NewMilestone(arg);
  fi;
  return ShareSpecialObj(milestone);
end);

BindGlobal("ContributeToMilestone", function(milestone, contribution)
  local trigger, notify;
  atomic milestone do
    if not contribution in milestone.targets then
      Error("ContributeToMilestone: Milestone does not have such a contribution");
    fi;
    if milestone.complete then
      return;
    fi;
    AddSet(milestone.achieved, Immutable(contribution));
    if Length(milestone.achieved) < Length(milestone.targets) then
      return;
    fi;
    milestone.complete := true;
    notify := AdoptSingleObj(milestone.notify);
    milestone.notify := MigrateSingleObj([], milestone);
  od;
  for trigger in notify do
    TASKS.FireTrigger(trigger);
  od;
end);

BindGlobal("AchieveMilestone", function(milestone)
  local trigger, notify;
  atomic milestone do
    milestone.achieved := CopyRegion(milestone.targets);
    milestone.complete := true;
    notify := AdoptSingleObj(milestone.notify);
    milestone.notify := MigrateSingleObj([], milestone);
  od;
  for trigger in notify do
    TASKS.FireTrigger(trigger);
  od;
end);

BindGlobal("IsMilestoneAchieved", atomic function(readonly milestone)
  return milestone.complete;
end);

BindGlobal("RunTask", function(arg)
  local task;
  task := TASKS.NewTask(arg[1], arg{[2..Length(arg)]});
  task.started := true;
  ShareSpecialObj(task, "task descriptor");
  TASKS.QueueTask(task);
  return task;
end);

BindGlobal("RunAsyncTask", function(arg)
  local task;
  task := TASKS.NewTask(arg[1], arg{[2..Length(arg)]});
  task.async := true;
  task.started := true;
  ShareSpecialObj(task);
  TASKS.QueueTask(task);
  return task;
end);

BindGlobal("DelayTask", function(arg)
  local task;
  task := TASKS.NewTask(arg[1], arg{[2..Length(arg)]});
  ShareSpecialObj(task);
  return task;
end);

BindGlobal("ExecuteTask", atomic function(readwrite task)
  if not task.started and Length(task.conditions) = 0 then
    task.started := true;
    TASKS.QueueTask(task);
  fi;
end);

BindGlobal("MakeAsyncTask", atomic function(readwrite task)
  if not task.started then
    task.async := true;
  else
    Error("Cannot make a task asynchronous after it has been started");
  fi;
end);

BindGlobal("ScheduleTask", function(arg)
  local cond, task;
  cond := arg[1];
  atomic readonly cond do
    if IS_LIST(cond) then
      cond := ShallowCopy(cond);
    else
      cond := [ cond ];
    fi;
  od;
  task := TASKS.NewTask(arg[2], arg{[3..Length(arg)]});
  task.conditions := MakeReadOnlySingleObj(cond);
  ShareSpecialObj(task);
  TASKS.QueueTask(task);
  return task;
end);

BindGlobal("ScheduleAsyncTask", function(arg)
  local cond, task;
  cond := arg[1];
  atomic readonly cond do
    if IS_LIST(cond) then
      cond := ShallowCopy(cond);
    else
      cond := [ cond ];
    fi;
  od;
  task := TASKS.NewTask(arg[2], arg{[3..Length(arg)]});
  task.async := true;
  task.conditions := MakeReadOnlySingleObj(cond);
  ShareSpecialObj(task);
  TASKS.QueueTask(task);
  return task;
end);

BindGlobal("WAIT_TASK", function(conditions, is_conjunction)
  local task, trigger, suspend, semaphore, cond, pending;
  task := TASKS.CurrentTask();
  pending := [];
  for cond in conditions do
    atomic cond do
      # are we dealing with a delayed task?
      if IsBound(cond.started) and not cond.started then
        if Length(cond.conditions) = 0 then
          Add(pending, cond);
        fi;
      fi;
    od;
  od;
  for cond in pending do
    ExecuteTask(cond);
  od;
  atomic task do
    suspend := not IsIdenticalObj(task.body, fail);
    task.conditions := MakeReadOnlySingleObj(conditions);
    semaphore := task.worker.semaphore;
    trigger := TASKS.BuildTrigger(task, is_conjunction);
  od;
  if not trigger.done then
    if suspend then
      atomic TASK_QUEUE do
        TASK_QUEUE.active_count := TASK_QUEUE.active_count - 1;
      od;
    fi;
    ShareSpecialObj(trigger);
    TASKS.ActivateTrigger(trigger);
    while true do
      TASKS.WakeWorker();
      WaitSemaphore(semaphore);
      atomic readonly trigger do
        if trigger.done then
          return;
        fi;
      od;
    od;
  fi;
end);

BindGlobal("WaitTask", function(arg)
  if Length(arg) = 1 and IsThreadLocal(arg[1]) and IS_LIST(arg[1]) then
    WAIT_TASK(ShallowCopy(arg[1]), true);
  else
    WAIT_TASK(arg, true);
  fi;
end);

BindGlobal("WaitTasks", WaitTask);

BindGlobal("WaitAnyTask", function(arg)
  local which, task, tasks;
  if Length(arg) = 1 and IsThreadLocal(arg[1]) and IS_LIST(arg[1]) then
    tasks := ShallowCopy(arg[1]);
  else
    tasks := arg;
  fi;
  WAIT_TASK(tasks, false);
  which := 1;
  for task in tasks do
    atomic readonly task do
      if task.complete then
        return which;
      fi;
    od;
    which := which + 1;
  od;
end);

BindGlobal("TaskResult", function(task)
  local complete;
  atomic readonly task do
    complete := task.complete;
  od;
  if not complete then
    WAIT_TASK([task], true);
  fi;
  atomic task do
    if task.adopt_result then
      atomic task.region do
        return AdoptObj(CopyRegion(task.result));
      od;
    else
      return task.result;
    fi;
  od;
end);

BindGlobal("SetTaskResult", function(result)
  local task;
  task := TASKS.CurrentTask();
  atomic task do
    task.result := result;
  od;
end);

BindGlobal("ImmediateTask", function(arg)
  local result, task;
  result := CALL_WITH_CATCH(arg[1], arg{[2..Length(arg)]});
  if Length(result) = 1 or not result[1] then
    if Length(result) > 1 then
      Print("Task Error: ", result[2], "\n");
    fi;
    result := fail;
  else
    result := result[2];
  fi;
  task := ShareSingleSpecialObj (rec( started := true, complete := true,
                  async := false, result := result, adopt_result := false ));
  return task;
end);

BindGlobal("TaskIsAsync", atomic function(readonly task)
  return task.async;
end);

BindGlobal("TaskStarted", atomic function(readonly task)
  return task.started;
end);

BindGlobal("TaskFinished", atomic function(readonly task)
  return task.complete;
end);

BindGlobal("TaskError", function(task)
  local complete;
  atomic readonly task do
    complete := task.complete;
    if complete then
      return task.error;
    fi;
  od;
  WAIT_TASK([task], true);
  atomic readonly task do
    return task.error;
  od;
end);

BindGlobal("TaskSuccess", function(task)
  return TaskError(task) = fail;
end);



BindGlobal("TaskCancellationRequested", atomic function(readonly task)
  return task.cancel;
end);

BindGlobal("TaskCancelled", atomic function(readonly task)
  return task.cancelled;
end);

BindGlobal("CancelTask", atomic function(readwrite task)
  if not task.complete then
    task.cancel := true;
  fi;
end);

BindGlobal("OnTaskCancellation", function(exit)
  local task, cancel, result;
  task := TASKS.CurrentTask();
  atomic task do
    if task.cancel and not task.cancelled then
      cancel := true;
      task.cancelled := true;
    else
      cancel := false;
    fi;
  od;
  if cancel then
    result := CALL_WITH_CATCH(exit, []);
    if result[1] and IsBound(result[2]) then
      atomic task do
        task.result := result[2];
      od;
    fi;
    JUMP_TO_CATCH(0);
  fi;
end);

BindGlobal("OnTaskCancellationReturn", function(value)
  OnTaskCancellation({}->value);
end);


BindGlobal("TaskInfo", atomic function(readonly task)
  return CopyRegion(task);
end);

BindGlobal("SetMaxTaskWorkers", function(count)
  atomic TASK_QUEUE do
    TASK_QUEUE.max_active := count;
  od;
end);

BindGlobal("RunningTasks", function()
  atomic TASK_QUEUE do
    return TASK_QUEUE.active_count;
  od;
end);

BindGlobal("CullIdleTasks", function()
end);

[ Dauer der Verarbeitung: 0.36 Sekunden  (vorverarbeitet)  ]

                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge