Quellcodebibliothek Statistik Leitseite products/sources/formale Sprachen/C/Firefox/nsprpub/pr/src/io/   (Browser von der Mozilla Stiftung Version 136.0.1©)  Datei vom 10.2.2025 mit Größe 39 kB image not shown  

Quelle  prmwait.c   Sprache: C

 
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */


#include "primpl.h"
#include "pprmwait.h"

#define _MW_REHASH_MAX 11

static PRLock* mw_lock = NULL;
static _PRGlobalState* mw_state = NULL;

static PRIntervalTime max_polling_interval;

#ifdef WINNT

typedef struct TimerEvent {
  PRIntervalTime absolute;
  void (*func)(void*);
  void* arg;
  LONG ref_count;
  PRCList links;
} TimerEvent;

#  define TIMER_EVENT_PTR(_qp) \
    ((TimerEvent*)((char*)(_qp) - offsetof(TimerEvent, links)))

struct {
  PRLock* ml;
  PRCondVar* new_timer;
  PRCondVar* cancel_timer;
  PRThread* manager_thread;
  PRCList timer_queue;
} tm_vars;

static PRStatus TimerInit(void);
static void TimerManager(void* arg);
static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
                               void* arg);
static PRBool CancelTimer(TimerEvent* timer);

static void TimerManager(void* arg) {
  PRIntervalTime now;
  PRIntervalTime timeout;
  PRCList* head;
  TimerEvent* timer;

  PR_Lock(tm_vars.ml);
  while (1) {
    if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) {
      PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
    } else {
      now = PR_IntervalNow();
      head = PR_LIST_HEAD(&tm_vars.timer_queue);
      timer = TIMER_EVENT_PTR(head);
      if ((PRInt32)(now - timer->absolute) >= 0) {
        PR_REMOVE_LINK(head);
        /*
         * make its prev and next point to itself so that
         * it's obvious that it's not on the timer_queue.
         */

        PR_INIT_CLIST(head);
        PR_ASSERT(2 == timer->ref_count);
        PR_Unlock(tm_vars.ml);
        timer->func(timer->arg);
        PR_Lock(tm_vars.ml);
        timer->ref_count -= 1;
        if (0 == timer->ref_count) {
          PR_NotifyAllCondVar(tm_vars.cancel_timer);
        }
      } else {
        timeout = (PRIntervalTime)(timer->absolute - now);
        PR_WaitCondVar(tm_vars.new_timer, timeout);
      }
    }
  }
  PR_Unlock(tm_vars.ml);
}

static TimerEvent* CreateTimer(PRIntervalTime timeout, void (*func)(void*),
                               void* arg) {
  TimerEvent* timer;
  PRCList *links, *tail;
  TimerEvent* elem;

  timer = PR_NEW(TimerEvent);
  if (NULL == timer) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    return timer;
  }
  timer->absolute = PR_IntervalNow() + timeout;
  timer->func = func;
  timer->arg = arg;
  timer->ref_count = 2;
  PR_Lock(tm_vars.ml);
  tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
  while (links->prev != tail) {
    elem = TIMER_EVENT_PTR(links);
    if ((PRInt32)(timer->absolute - elem->absolute) >= 0) {
      break;
    }
    links = links->prev;
  }
  PR_INSERT_AFTER(&timer->links, links);
  PR_NotifyCondVar(tm_vars.new_timer);
  PR_Unlock(tm_vars.ml);
  return timer;
}

static PRBool CancelTimer(TimerEvent* timer) {
  PRBool canceled = PR_FALSE;

  PR_Lock(tm_vars.ml);
  timer->ref_count -= 1;
  if (timer->links.prev == &timer->links) {
    while (timer->ref_count == 1) {
      PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
    }
  } else {
    PR_REMOVE_LINK(&timer->links);
    canceled = PR_TRUE;
  }
  PR_Unlock(tm_vars.ml);
  PR_DELETE(timer);
  return canceled;
}

static PRStatus TimerInit(void) {
  tm_vars.ml = PR_NewLock();
  if (NULL == tm_vars.ml) {
    goto failed;
  }
  tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
  if (NULL == tm_vars.new_timer) {
    goto failed;
  }
  tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
  if (NULL == tm_vars.cancel_timer) {
    goto failed;
  }
  PR_INIT_CLIST(&tm_vars.timer_queue);
  tm_vars.manager_thread =
      PR_CreateThread(PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
                      PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
  if (NULL == tm_vars.manager_thread) {
    goto failed;
  }
  return PR_SUCCESS;

failed:
  if (NULL != tm_vars.cancel_timer) {
    PR_DestroyCondVar(tm_vars.cancel_timer);
  }
  if (NULL != tm_vars.new_timer) {
    PR_DestroyCondVar(tm_vars.new_timer);
  }
  if (NULL != tm_vars.ml) {
    PR_DestroyLock(tm_vars.ml);
  }
  return PR_FAILURE;
}

#endif /* WINNT */

/******************************************************************/
/******************************************************************/
/************************ The private portion *********************/
/******************************************************************/
/******************************************************************/
void _PR_InitMW(void) {
#ifdef WINNT
  /*
   * We use NT 4's InterlockedCompareExchange() to operate
   * on PRMWStatus variables.
   */

  PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
  TimerInit();
#endif
  mw_lock = PR_NewLock();
  PR_ASSERT(NULL != mw_lock);
  mw_state = PR_NEWZAP(_PRGlobalState);
  PR_ASSERT(NULL != mw_state);
  PR_INIT_CLIST(&mw_state->group_list);
  max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
/* _PR_InitMW */

void _PR_CleanupMW(void) {
  PR_DestroyLock(mw_lock);
  mw_lock = NULL;
  if (mw_state->group) {
    PR_DestroyWaitGroup(mw_state->group);
    /* mw_state->group is set to NULL as a side effect. */
  }
  PR_DELETE(mw_state);
/* _PR_CleanupMW */

static PRWaitGroup* MW_Init2(void) {
  PRWaitGroup* group = mw_state->group; /* it's the null group */
  if (NULL == group)                    /* there is this special case */
  {
    group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
    if (NULL == group) {
      goto failed_alloc;
    }
    PR_Lock(mw_lock);
    if (NULL == mw_state->group) {
      mw_state->group = group;
      group = NULL;
    }
    PR_Unlock(mw_lock);
    if (group != NULL) {
      (void)PR_DestroyWaitGroup(group);
    }
    group = mw_state->group; /* somebody beat us to it */
  }
failed_alloc:
  return group; /* whatever */
/* MW_Init2 */

static _PR_HashStory MW_AddHashInternal(PRRecvWait* desc, _PRWaiterHash* hash) {
  /*
  ** The entries are put in the table using the fd (PRFileDesc*) of
  ** the receive descriptor as the key. This allows us to locate
  ** the appropriate entry aqain when the poll operation finishes.
  **
  ** The pointer to the file descriptor object is first divided by
  ** the natural alignment of a pointer in the belief that object
  ** will have at least that many zeros in the low order bits.
  ** This may not be a good assuption.
  **
  ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
  ** that we declare defeat and force the table to be reconstructed.
  ** Since some fds might be added more than once, won't that cause
  ** collisions even in an empty table?
  */

  PRIntn rehash = _MW_REHASH_MAX;
  PRRecvWait** waiter;
  PRUintn hidx = _MW_HASH(desc->fd, hash->length);
  PRUintn hoffset = 0;

  while (rehash-- > 0) {
    waiter = &hash->recv_wait;
    if (NULL == waiter[hidx]) {
      waiter[hidx] = desc;
      hash->count += 1;
#if 0
            printf("Adding 0x%x->0x%x ", desc, desc->fd);
            printf(
                "table[%u:%u:*%u]: 0x%x->0x%x\n",
                hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
#endif
      return _prmw_success;
    }
    if (desc == waiter[hidx]) {
      PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
      return _prmw_error;
    }
#if 0
        printf("Failing 0x%x->0x%x ", desc, desc->fd);
        printf(
            "table[*%u:%u:%u]: 0x%x->0x%x\n",
            hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
#endif
    if (0 == hoffset) {
      hoffset = _MW_HASH2(desc->fd, hash->length);
      PR_ASSERT(0 != hoffset);
    }
    hidx = (hidx + hoffset) % (hash->length);
  }
  return _prmw_rehash;
/* MW_AddHashInternal */

static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup* group) {
  PRRecvWait** desc;
  PRUint32 pidx, length;
  _PRWaiterHash *newHash, *oldHash = group->waiter;
  PRBool retry;
  _PR_HashStory hrv;

  static const PRInt32 prime_number[] = {_PR_DEFAULT_HASH_LENGTH,
                                         179,
                                         521,
                                         907,
                                         1427,
                                         2711,
                                         3917,
                                         5021,
                                         8219,
                                         11549,
                                         18911,
                                         26711,
                                         33749,
                                         44771};
  PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));

  /* look up the next size we'd like to use for the hash table */
  for (pidx = 0; pidx < primes; ++pidx) {
    if (prime_number[pidx] == oldHash->length) {
      break;
    }
  }
  /* table size must be one of the prime numbers */
  PR_ASSERT(pidx < primes);

  /* if pidx == primes - 1, we can't expand the table any more */
  while (pidx < primes - 1) {
    /* next size */
    ++pidx;
    length = prime_number[pidx];

    /* allocate the new hash table and fill it in with the old */
    newHash = (_PRWaiterHash*)PR_CALLOC(sizeof(_PRWaiterHash) +
                                        (length * sizeof(PRRecvWait*)));
    if (NULL == newHash) {
      PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
      return _prmw_error;
    }

    newHash->length = length;
    retry = PR_FALSE;
    for (desc = &oldHash->recv_wait; newHash->count < oldHash->count; ++desc) {
      PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
      if (NULL != *desc) {
        hrv = MW_AddHashInternal(*desc, newHash);
        PR_ASSERT(_prmw_error != hrv);
        if (_prmw_success != hrv) {
          PR_DELETE(newHash);
          retry = PR_TRUE;
          break;
        }
      }
    }
    if (retry) {
      continue;
    }

    PR_DELETE(group->waiter);
    group->waiter = newHash;
    group->p_timestamp += 1;
    return _prmw_success;
  }

  PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  return _prmw_error; /* we're hosed */
/* MW_ExpandHashInternal */

#ifndef WINNT
static void _MW_DoneInternal(PRWaitGroup* group, PRRecvWait** waiter,
                             PRMWStatus outcome) {
  /*
  ** Add this receive wait object to the list of finished I/O
  ** operations for this particular group. If there are other
  ** threads waiting on the group, notify one. If not, arrange
  ** for this thread to return.
  */


#  if 0
    printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
#  endif
  (*waiter)->outcome = outcome;
  PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
  PR_NotifyCondVar(group->io_complete);
  PR_ASSERT(0 != group->waiter->count);
  group->waiter->count -= 1;
  *waiter = NULL;
/* _MW_DoneInternal */
#endif /* WINNT */

static PRRecvWait** _MW_LookupInternal(PRWaitGroup* group, PRFileDesc* fd) {
  /*
  ** Find the receive wait object corresponding to the file descriptor.
  ** Only search the wait group specified.
  */

  PRRecvWait** desc;
  PRIntn rehash = _MW_REHASH_MAX;
  _PRWaiterHash* hash = group->waiter;
  PRUintn hidx = _MW_HASH(fd, hash->length);
  PRUintn hoffset = 0;

  while (rehash-- > 0) {
    desc = (&hash->recv_wait) + hidx;
    if ((*desc != NULL) && ((*desc)->fd == fd)) {
      return desc;
    }
    if (0 == hoffset) {
      hoffset = _MW_HASH2(fd, hash->length);
      PR_ASSERT(0 != hoffset);
    }
    hidx = (hidx + hoffset) % (hash->length);
  }
  return NULL;
/* _MW_LookupInternal */

#ifndef WINNT
static PRStatus _MW_PollInternal(PRWaitGroup* group) {
  PRRecvWait** waiter;
  PRStatus rv = PR_FAILURE;
  PRInt32 count, count_ready;
  PRIntervalTime polling_interval;

  group->poller = PR_GetCurrentThread();

  while (PR_TRUE) {
    PRIntervalTime now, since_last_poll;
    PRPollDesc* poll_list;

    while (0 == group->waiter->count) {
      PRStatus st;
      st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
      if (_prmw_running != group->state) {
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        goto aborted;
      }
      if (_MW_ABORTED(st)) {
        goto aborted;
      }
    }

    /*
    ** There's something to do. See if our existing polling list
    ** is large enough for what we have to do?
    */


    while (group->polling_count < group->waiter->count) {
      PRUint32 old_count = group->waiter->count;
      PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
      PRSize new_size = sizeof(PRPollDesc) * new_count;
      PRPollDesc* old_polling_list = group->polling_list;

      PR_Unlock(group->ml);
      poll_list = (PRPollDesc*)PR_CALLOC(new_size);
      if (NULL == poll_list) {
        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
        PR_Lock(group->ml);
        goto failed_alloc;
      }
      if (NULL != old_polling_list) {
        PR_DELETE(old_polling_list);
      }
      PR_Lock(group->ml);
      if (_prmw_running != group->state) {
        PR_DELETE(poll_list);
        PR_SetError(PR_INVALID_STATE_ERROR, 0);
        goto aborted;
      }
      group->polling_list = poll_list;
      group->polling_count = new_count;
    }

    now = PR_IntervalNow();
    polling_interval = max_polling_interval;
    since_last_poll = now - group->last_poll;

    waiter = &group->waiter->recv_wait;
    poll_list = group->polling_list;
    for (count = 0; count < group->waiter->count; ++waiter) {
      PR_ASSERT(waiter < &group->waiter->recv_wait + group->waiter->length);
      if (NULL != *waiter) /* a live one! */
      {
        if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) &&
            (since_last_poll >= (*waiter)->timeout)) {
          _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
        } else {
          if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) {
            (*waiter)->timeout -= since_last_poll;
            if ((*waiter)->timeout < polling_interval) {
              polling_interval = (*waiter)->timeout;
            }
          }
          PR_ASSERT(poll_list < group->polling_list + group->polling_count);
          poll_list->fd = (*waiter)->fd;
          poll_list->in_flags = PR_POLL_READ;
          poll_list->out_flags = 0;
#  if 0
                    printf(
                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
                        poll_list, count, poll_list->fd, (*waiter)->timeout);
#  endif
          poll_list += 1;
          count += 1;
        }
      }
    }

    PR_ASSERT(count == group->waiter->count);

    /*
    ** If there are no more threads waiting for completion,
    ** we need to return.
    */

    if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
        (1 == group->waiting_threads)) {
      break;
    }

    if (0 == count) {
      continue/* wait for new business */
    }

    group->last_poll = now;

    PR_Unlock(group->ml);

    count_ready = PR_Poll(group->polling_list, count, polling_interval);

    PR_Lock(group->ml);

    if (_prmw_running != group->state) {
      PR_SetError(PR_INVALID_STATE_ERROR, 0);
      goto aborted;
    }
    if (-1 == count_ready) {
      goto failed_poll; /* that's a shame */
    } else if (0 < count_ready) {
      for (poll_list = group->polling_list; count > 0; poll_list++, count--) {
        PR_ASSERT(poll_list < group->polling_list + group->polling_count);
        if (poll_list->out_flags != 0) {
          waiter = _MW_LookupInternal(group, poll_list->fd);
          /*
          ** If 'waiter' is NULL, that means the wait receive
          ** descriptor has been canceled.
          */

          if (NULL != waiter) {
            _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
          }
        }
      }
    }
    /*
    ** If there are no more threads waiting for completion,
    ** we need to return.
    ** This thread was "borrowed" to do the polling, but it really
    ** belongs to the client.
    */

    if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) &&
        (1 == group->waiting_threads)) {
      break;
    }
  }

  rv = PR_SUCCESS;

aborted:
failed_poll:
failed_alloc:
  group->poller = NULL; /* we were that, not we ain't */
  if ((_prmw_running == group->state) && (group->waiting_threads > 1)) {
    /* Wake up one thread to become the new poller. */
    PR_NotifyCondVar(group->io_complete);
  }
  return rv; /* we return with the lock held */
/* _MW_PollInternal */
#endif /* !WINNT */

static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup* group) {
  PRMWGroupState rv = group->state;
  /*
  ** Looking at the group's fields is safe because
  ** once the group's state is no longer running, it
  ** cannot revert and there is a safe check on entry
  ** to make sure no more threads are made to wait.
  */

  if ((_prmw_stopping == rv) && (0 == group->waiting_threads)) {
    rv = group->state = _prmw_stopped;
    PR_NotifyCondVar(group->mw_manage);
  }
  return rv;
/* MW_TestForShutdownInternal */

#ifndef WINNT
static void _MW_InitialRecv(PRCList* io_ready) {
  PRRecvWait* desc = (PRRecvWait*)io_ready;
  if ((NULL == desc->buffer.start) || (0 == desc->buffer.length)) {
    desc->bytesRecv = 0;
  } else {
    desc->bytesRecv = (desc->fd->methods->recv)(
        desc->fd, desc->buffer.start, desc->buffer.length, 0, desc->timeout);
    if (desc->bytesRecv < 0) { /* SetError should already be there */
      desc->outcome = PR_MW_FAILURE;
    }
  }
/* _MW_InitialRecv */
#endif

#ifdef WINNT
static void NT_TimeProc(void* arg) {
  _MDOverlapped* overlapped = (_MDOverlapped*)arg;
  PRRecvWait* desc = overlapped->data.mw.desc;
  PRFileDesc* bottom;

  if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_TIMEOUT,
                                 (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) {
    /* This wait recv descriptor has already completed. */
    return;
  }

  /* close the osfd to abort the outstanding async io request */
  /* $$$$
  ** Little late to be checking if NSPR's on the bottom of stack,
  ** but if we don't check, we can't assert that the private data
  ** is what we think it is.
  ** $$$$
  */

  bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
  PR_ASSERT(NULL != bottom);
  if (NULL != bottom) /* now what!?!?! */
  {
    bottom->secret->state = _PR_FILEDESC_CLOSED;
    if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
      fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
      PR_NOT_REACHED("What shall I do?");
    }
  }
  return;
/* NT_TimeProc */

static PRStatus NT_HashRemove(PRWaitGroup* group, PRFileDesc* fd) {
  PRRecvWait** waiter;

  _PR_MD_LOCK(&group->mdlock);
  waiter = _MW_LookupInternal(group, fd);
  if (NULL != waiter) {
    group->waiter->count -= 1;
    *waiter = NULL;
  }
  _PR_MD_UNLOCK(&group->mdlock);
  return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
}

PRStatus NT_HashRemoveInternal(PRWaitGroup* group, PRFileDesc* fd) {
  PRRecvWait** waiter;

  waiter = _MW_LookupInternal(group, fd);
  if (NULL != waiter) {
    group->waiter->count -= 1;
    *waiter = NULL;
  }
  return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
}
#endif /* WINNT */

/******************************************************************/
/******************************************************************/
/********************** The public API portion ********************/
/******************************************************************/
/******************************************************************/
PR_IMPLEMENT(PRStatus)
PR_AddWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
  _PR_HashStory hrv;
  PRStatus rv = PR_FAILURE;
#ifdef WINNT
  _MDOverlapped* overlapped;
  HANDLE hFile;
  BOOL bResult;
  DWORD dwError;
  PRFileDesc* bottom;
#endif

  if (!_pr_initialized) {
    _PR_ImplicitInitialization();
  }
  if ((NULL == group) && (NULL == (group = MW_Init2()))) {
    return rv;
  }

  PR_ASSERT(NULL != desc->fd);

  desc->outcome = PR_MW_PENDING; /* nice, well known value */
  desc->bytesRecv = 0;           /* likewise, though this value is ambiguious */

  PR_Lock(group->ml);

  if (_prmw_running != group->state) {
    /* Not allowed to add after cancelling the group */
    desc->outcome = PR_MW_INTERRUPT;
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
    PR_Unlock(group->ml);
    return rv;
  }

#ifdef WINNT
  _PR_MD_LOCK(&group->mdlock);
#endif

  /*
  ** If the waiter count is zero at this point, there's no telling
  ** how long we've been idle. Therefore, initialize the beginning
  ** of the timing interval. As long as the list doesn't go empty,
  ** it will maintain itself.
  */

  if (0 == group->waiter->count) {
    group->last_poll = PR_IntervalNow();
  }

  do {
    hrv = MW_AddHashInternal(desc, group->waiter);
    if (_prmw_rehash != hrv) {
      break;
    }
    hrv = MW_ExpandHashInternal(group); /* gruesome */
    if (_prmw_success != hrv) {
      break;
    }
  } while (PR_TRUE);

#ifdef WINNT
  _PR_MD_UNLOCK(&group->mdlock);
#endif

  PR_NotifyCondVar(group->new_business); /* tell the world */
  rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
  PR_Unlock(group->ml);

#ifdef WINNT
  overlapped = PR_NEWZAP(_MDOverlapped);
  if (NULL == overlapped) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    NT_HashRemove(group, desc->fd);
    return rv;
  }
  overlapped->ioModel = _MD_MultiWaitIO;
  overlapped->data.mw.desc = desc;
  overlapped->data.mw.group = group;
  if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
    overlapped->data.mw.timer =
        CreateTimer(desc->timeout, NT_TimeProc, overlapped);
    if (0 == overlapped->data.mw.timer) {
      NT_HashRemove(group, desc->fd);
      PR_DELETE(overlapped);
      /*
       * XXX It appears that a maximum of 16 timer events can
       * be outstanding. GetLastError() returns 0 when I try it.
       */

      PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
      return PR_FAILURE;
    }
  }

  /* Reach to the bottom layer to get the OS fd */
  bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
  PR_ASSERT(NULL != bottom);
  if (NULL == bottom) {
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return PR_FAILURE;
  }
  hFile = (HANDLE)bottom->secret->md.osfd;
  if (!bottom->secret->md.io_model_committed) {
    PRInt32 st;
    st = _md_Associate(hFile);
    PR_ASSERT(0 != st);
    bottom->secret->md.io_model_committed = PR_TRUE;
  }
  bResult = ReadFile(hFile, desc->buffer.start, (DWORD)desc->buffer.length,
                     NULL, &overlapped->overlapped);
  if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) {
    if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) {
      if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_FAILURE,
                                     (LONG)PR_MW_PENDING) ==
          (LONG)PR_MW_PENDING) {
        CancelTimer(overlapped->data.mw.timer);
      }
      NT_HashRemove(group, desc->fd);
      PR_DELETE(overlapped);
    }
    _PR_MD_MAP_READ_ERROR(dwError);
    rv = PR_FAILURE;
  }
#endif

  return rv;
/* PR_AddWaitFileDesc */

PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup* group) {
  PRCList* io_ready = NULL;
#ifdef WINNT
  PRThread* me = _PR_MD_CURRENT_THREAD();
  _MDOverlapped* overlapped;
#endif

  if (!_pr_initialized) {
    _PR_ImplicitInitialization();
  }
  if ((NULL == group) && (NULL == (group = MW_Init2()))) {
    goto failed_init;
  }

  PR_Lock(group->ml);

  if (_prmw_running != group->state) {
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
    goto invalid_state;
  }

  group->waiting_threads += 1; /* the polling thread is counted */

#ifdef WINNT
  _PR_MD_LOCK(&group->mdlock);
  while (PR_CLIST_IS_EMPTY(&group->io_ready)) {
    _PR_THREAD_LOCK(me);
    me->state = _PR_IO_WAIT;
    PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
    if (!_PR_IS_NATIVE_THREAD(me)) {
      _PR_SLEEPQ_LOCK(me->cpu);
      _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
      _PR_SLEEPQ_UNLOCK(me->cpu);
    }
    _PR_THREAD_UNLOCK(me);
    _PR_MD_UNLOCK(&group->mdlock);
    PR_Unlock(group->ml);
    _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
    me->state = _PR_RUNNING;
    PR_Lock(group->ml);
    _PR_MD_LOCK(&group->mdlock);
    if (_PR_PENDING_INTERRUPT(me)) {
      PR_REMOVE_LINK(&me->waitQLinks);
      _PR_MD_UNLOCK(&group->mdlock);
      me->flags &= ~_PR_INTERRUPT;
      me->io_suspended = PR_FALSE;
      PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
      goto aborted;
    }
  }
  io_ready = PR_LIST_HEAD(&group->io_ready);
  PR_ASSERT(io_ready != NULL);
  PR_REMOVE_LINK(io_ready);
  _PR_MD_UNLOCK(&group->mdlock);
  overlapped =
      (_MDOverlapped*)((char*)io_ready - offsetof(_MDOverlapped, data));
  io_ready = &overlapped->data.mw.desc->internal;
#else
  do {
    /*
    ** If the I/O ready list isn't empty, have this thread
    ** return with the first receive wait object that's available.
    */

    if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
      /*
      ** Is there a polling thread yet? If not, grab this thread
      ** and use it.
      */

      if (NULL == group->poller) {
        /*
        ** This thread will stay do polling until it becomes the only one
        ** left to service a completion. Then it will return and there will
        ** be none left to actually poll or to run completions.
        **
        ** The polling function should only return w/ failure or
        ** with some I/O ready.
        */

        if (PR_FAILURE == _MW_PollInternal(group)) {
          goto failed_poll;
        }
      } else {
        /*
        ** There are four reasons a thread can be awakened from
        ** a wait on the io_complete condition variable.
        ** 1. Some I/O has completed, i.e., the io_ready list
        **    is nonempty.
        ** 2. The wait group is canceled.
        ** 3. The thread is interrupted.
        ** 4. The current polling thread has to leave and needs
        **    a replacement.
        ** The logic to find a new polling thread is made more
        ** complicated by all the other possible events.
        ** I tried my best to write the logic clearly, but
        ** it is still full of if's with continue and goto.
        */

        PRStatus st;
        do {
          st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
          if (_prmw_running != group->state) {
            PR_SetError(PR_INVALID_STATE_ERROR, 0);
            goto aborted;
          }
          if (_MW_ABORTED(st) || (NULL == group->poller)) {
            break;
          }
        } while (PR_CLIST_IS_EMPTY(&group->io_ready));

        /*
        ** The thread is interrupted and has to leave.  It might
        ** have also been awakened to process ready i/o or be the
        ** new poller.  To be safe, if either condition is true,
        ** we awaken another thread to take its place.
        */

        if (_MW_ABORTED(st)) {
          if ((NULL == group->poller || !PR_CLIST_IS_EMPTY(&group->io_ready)) &&
              group->waiting_threads > 1) {
            PR_NotifyCondVar(group->io_complete);
          }
          goto aborted;
        }

        /*
        ** A new poller is needed, but can I be the new poller?
        ** If there is no i/o ready, sure.  But if there is any
        ** i/o ready, it has a higher priority.  I want to
        ** process the ready i/o first and wake up another
        ** thread to be the new poller.
        */

        if (NULL == group->poller) {
          if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
            continue;
          }
          if (group->waiting_threads > 1) {
            PR_NotifyCondVar(group->io_complete);
          }
        }
      }
      PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
    }
    io_ready = PR_LIST_HEAD(&group->io_ready);
    PR_NotifyCondVar(group->io_taken);
    PR_ASSERT(io_ready != NULL);
    PR_REMOVE_LINK(io_ready);
  } while (NULL == io_ready);

failed_poll:

#endif

aborted:

  group->waiting_threads -= 1;
invalid_state:
  (void)MW_TestForShutdownInternal(group);
  PR_Unlock(group->ml);

failed_init:
  if (NULL != io_ready) {
    /* If the operation failed, record the reason why */
    switch (((PRRecvWait*)io_ready)->outcome) {
      case PR_MW_PENDING:
        PR_ASSERT(0);
        break;
      case PR_MW_SUCCESS:
#ifndef WINNT
        _MW_InitialRecv(io_ready);
#endif
        break;
#ifdef WINNT
      case PR_MW_FAILURE:
        _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
        break;
#endif
      case PR_MW_TIMEOUT:
        PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
        break;
      case PR_MW_INTERRUPT:
        PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
        break;
      default:
        break;
    }
#ifdef WINNT
    if (NULL != overlapped->data.mw.timer) {
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
      CancelTimer(overlapped->data.mw.timer);
    } else {
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
    }
    PR_DELETE(overlapped);
#endif
  }
  return (PRRecvWait*)io_ready;
/* PR_WaitRecvReady */

PR_IMPLEMENT(PRStatus)
PR_CancelWaitFileDesc(PRWaitGroup* group, PRRecvWait* desc) {
#if !defined(WINNT)
  PRRecvWait** recv_wait;
#endif
  PRStatus rv = PR_SUCCESS;
  if (NULL == group) {
    group = mw_state->group;
  }
  PR_ASSERT(NULL != group);
  if (NULL == group) {
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return PR_FAILURE;
  }

  PR_Lock(group->ml);

  if (_prmw_running != group->state) {
    PR_SetError(PR_INVALID_STATE_ERROR, 0);
    rv = PR_FAILURE;
    goto unlock;
  }

#ifdef WINNT
  if (InterlockedCompareExchange((LONG*)&desc->outcome, (LONG)PR_MW_INTERRUPT,
                                 (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
    PRFileDesc* bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
    PR_ASSERT(NULL != bottom);
    if (NULL == bottom) {
      PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
      goto unlock;
    }
    bottom->secret->state = _PR_FILEDESC_CLOSED;
#  if 0
        fprintf(stderr, "cancel wait recv: closing socket\n");
#  endif
    if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
      fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
      exit(1);
    }
  }
#else
  if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) {
    /* it was in the wait table */
    _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
    goto unlock;
  }
  if (!PR_CLIST_IS_EMPTY(&group->io_ready)) {
    /* is it already complete? */
    PRCList* head = PR_LIST_HEAD(&group->io_ready);
    do {
      PRRecvWait* done = (PRRecvWait*)head;
      if (done == desc) {
        goto unlock;
      }
      head = PR_NEXT_LINK(head);
    } while (head != &group->io_ready);
  }
  PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  rv = PR_FAILURE;

#endif
unlock:
  PR_Unlock(group->ml);
  return rv;
/* PR_CancelWaitFileDesc */

PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup* group) {
  PRRecvWait** desc;
  PRRecvWait* recv_wait = NULL;
#ifdef WINNT
  _MDOverlapped* overlapped;
  PRRecvWait** end;
  PRThread* me = _PR_MD_CURRENT_THREAD();
#endif

  if (NULL == group) {
    group = mw_state->group;
  }
  PR_ASSERT(NULL != group);
  if (NULL == group) {
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return NULL;
  }

  PR_Lock(group->ml);
  if (_prmw_stopped != group->state) {
    if (_prmw_running == group->state) {
      group->state = _prmw_stopping; /* so nothing new comes in */
    }
    if (0 == group->waiting_threads) { /* is there anybody else? */
      group->state = _prmw_stopped;    /* we can stop right now */
    } else {
      PR_NotifyAllCondVar(group->new_business);
      PR_NotifyAllCondVar(group->io_complete);
    }
    while (_prmw_stopped != group->state) {
      (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
    }
  }

#ifdef WINNT
  _PR_MD_LOCK(&group->mdlock);
#endif
  /* make all the existing descriptors look done/interrupted */
#ifdef WINNT
  end = &group->waiter->recv_wait + group->waiter->length;
  for (desc = &group->waiter->recv_wait; desc < end; ++desc) {
    if (NULL != *desc) {
      if (InterlockedCompareExchange(
              (LONG*)&(*desc)->outcome, (LONG)PR_MW_INTERRUPT,
              (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) {
        PRFileDesc* bottom =
            PR_GetIdentitiesLayer((*desc)->fd, PR_NSPR_IO_LAYER);
        PR_ASSERT(NULL != bottom);
        if (NULL == bottom) {
          PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
          goto invalid_arg;
        }
        bottom->secret->state = _PR_FILEDESC_CLOSED;
#  if 0
                fprintf(stderr, "cancel wait group: closing socket\n");
#  endif
        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) {
          fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
          exit(1);
        }
      }
    }
  }
  while (group->waiter->count > 0) {
    _PR_THREAD_LOCK(me);
    me->state = _PR_IO_WAIT;
    PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
    if (!_PR_IS_NATIVE_THREAD(me)) {
      _PR_SLEEPQ_LOCK(me->cpu);
      _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
      _PR_SLEEPQ_UNLOCK(me->cpu);
    }
    _PR_THREAD_UNLOCK(me);
    _PR_MD_UNLOCK(&group->mdlock);
    PR_Unlock(group->ml);
    _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
    me->state = _PR_RUNNING;
    PR_Lock(group->ml);
    _PR_MD_LOCK(&group->mdlock);
  }
#else
  for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc) {
    PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
    if (NULL != *desc) {
      _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
    }
  }
#endif

  /* take first element of finished list and return it or NULL */
  if (PR_CLIST_IS_EMPTY(&group->io_ready)) {
    PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
  } else {
    PRCList* head = PR_LIST_HEAD(&group->io_ready);
    PR_REMOVE_AND_INIT_LINK(head);
#ifdef WINNT
    overlapped = (_MDOverlapped*)((char*)head - offsetof(_MDOverlapped, data));
    head = &overlapped->data.mw.desc->internal;
    if (NULL != overlapped->data.mw.timer) {
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT != overlapped->data.mw.desc->timeout);
      CancelTimer(overlapped->data.mw.timer);
    } else {
      PR_ASSERT(PR_INTERVAL_NO_TIMEOUT == overlapped->data.mw.desc->timeout);
    }
    PR_DELETE(overlapped);
#endif
    recv_wait = (PRRecvWait*)head;
  }
#ifdef WINNT
invalid_arg:
  _PR_MD_UNLOCK(&group->mdlock);
#endif
  PR_Unlock(group->ml);

  return recv_wait;
/* PR_CancelWaitGroup */

PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */) {
  PRWaitGroup* wg;

  if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    goto failed;
  }
  /* the wait group itself */
  wg->ml = PR_NewLock();
  if (NULL == wg->ml) {
    goto failed_lock;
  }
  wg->io_taken = PR_NewCondVar(wg->ml);
  if (NULL == wg->io_taken) {
    goto failed_cvar0;
  }
  wg->io_complete = PR_NewCondVar(wg->ml);
  if (NULL == wg->io_complete) {
    goto failed_cvar1;
  }
  wg->new_business = PR_NewCondVar(wg->ml);
  if (NULL == wg->new_business) {
    goto failed_cvar2;
  }
  wg->mw_manage = PR_NewCondVar(wg->ml);
  if (NULL == wg->mw_manage) {
    goto failed_cvar3;
  }

  PR_INIT_CLIST(&wg->group_link);
  PR_INIT_CLIST(&wg->io_ready);

  /* the waiters sequence */
  wg->waiter = (_PRWaiterHash*)PR_CALLOC(
      sizeof(_PRWaiterHash) + (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
  if (NULL == wg->waiter) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
    goto failed_waiter;
  }
  wg->waiter->count = 0;
  wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;

#ifdef WINNT
  _PR_MD_NEW_LOCK(&wg->mdlock);
  PR_INIT_CLIST(&wg->wait_list);
#endif /* WINNT */

  PR_Lock(mw_lock);
  PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
  PR_Unlock(mw_lock);
  return wg;

failed_waiter:
  PR_DestroyCondVar(wg->mw_manage);
failed_cvar3:
  PR_DestroyCondVar(wg->new_business);
failed_cvar2:
  PR_DestroyCondVar(wg->io_complete);
failed_cvar1:
  PR_DestroyCondVar(wg->io_taken);
failed_cvar0:
  PR_DestroyLock(wg->ml);
failed_lock:
  PR_DELETE(wg);
  wg = NULL;

failed:
  return wg;
/* MW_CreateWaitGroup */

PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup* group) {
  PRStatus rv = PR_SUCCESS;
  if (NULL == group) {
    group = mw_state->group;
  }
  PR_ASSERT(NULL != group);
  if (NULL != group) {
    PR_Lock(group->ml);
    if ((group->waiting_threads == 0) && (group->waiter->count == 0) &&
        PR_CLIST_IS_EMPTY(&group->io_ready)) {
      group->state = _prmw_stopped;
    } else {
      PR_SetError(PR_INVALID_STATE_ERROR, 0);
      rv = PR_FAILURE;
    }
    PR_Unlock(group->ml);
    if (PR_FAILURE == rv) {
      return rv;
    }

    PR_Lock(mw_lock);
    PR_REMOVE_LINK(&group->group_link);
    PR_Unlock(mw_lock);

#ifdef WINNT
    /*
     * XXX make sure wait_list is empty and waiter is empty.
     * These must be checked while holding mdlock.
     */

    _PR_MD_FREE_LOCK(&group->mdlock);
#endif

    PR_DELETE(group->waiter);
    PR_DELETE(group->polling_list);
    PR_DestroyCondVar(group->mw_manage);
    PR_DestroyCondVar(group->new_business);
    PR_DestroyCondVar(group->io_complete);
    PR_DestroyCondVar(group->io_taken);
    PR_DestroyLock(group->ml);
    if (group == mw_state->group) {
      mw_state->group = NULL;
    }
    PR_DELETE(group);
  } else {
    /* The default wait group is not created yet. */
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    rv = PR_FAILURE;
  }
  return rv;
/* PR_DestroyWaitGroup */

/**********************************************************************
***********************************************************************
******************** Wait group enumerations **************************
***********************************************************************
**********************************************************************/


PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup* group) {
  PRMWaitEnumerator* enumerator = PR_NEWZAP(PRMWaitEnumerator);
  if (NULL == enumerator) {
    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  } else {
    enumerator->group = group;
    enumerator->seal = _PR_ENUM_SEALED;
  }
  return enumerator;
/* PR_CreateMWaitEnumerator */

PR_IMPLEMENT(PRStatus)
PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) {
  PR_ASSERT(NULL != enumerator);
  PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
  if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    return PR_FAILURE;
  }
  enumerator->seal = _PR_ENUM_UNSEALED;
  PR_Free(enumerator);
  return PR_SUCCESS;
/* PR_DestroyMWaitEnumerator */

PR_IMPLEMENT(PRRecvWait*)
PR_EnumerateWaitGroup(PRMWaitEnumerator* enumerator,
                      const PRRecvWait* previous) {
  PRRecvWait* result = NULL;

  /* entry point sanity checking */
  PR_ASSERT(NULL != enumerator);
  PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
  if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) {
    goto bad_argument;
  }

  /* beginning of enumeration */
  if (NULL == previous) {
    if (NULL == enumerator->group) {
      enumerator->group = mw_state->group;
      if (NULL == enumerator->group) {
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
        return NULL;
      }
    }
    enumerator->waiter = &enumerator->group->waiter->recv_wait;
    enumerator->p_timestamp = enumerator->group->p_timestamp;
    enumerator->thread = PR_GetCurrentThread();
    enumerator->index = 0;
  }
  /* continuing an enumeration */
  else {
    PRThread* me = PR_GetCurrentThread();
    PR_ASSERT(me == enumerator->thread);
    if (me != enumerator->thread) {
      goto bad_argument;
    }

    /* need to restart the enumeration */
    if (enumerator->p_timestamp != enumerator->group->p_timestamp) {
      return PR_EnumerateWaitGroup(enumerator, NULL);
    }
  }

  /* actually progress the enumeration */
#if defined(WINNT)
  _PR_MD_LOCK(&enumerator->group->mdlock);
#else
  PR_Lock(enumerator->group->ml);
#endif
  while (enumerator->index++ < enumerator->group->waiter->length) {
    if (NULL != (result = *(enumerator->waiter)++)) {
      break;
    }
  }
#if defined(WINNT)
  _PR_MD_UNLOCK(&enumerator->group->mdlock);
#else
  PR_Unlock(enumerator->group->ml);
#endif

  return result; /* what we live for */

bad_argument:
  PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  return NULL; /* probably ambiguous */
/* PR_EnumerateWaitGroup */

/* prmwait.c */

Messung V0.5
C=93 H=69 G=81

¤ Dauer der Verarbeitung: 0.17 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.