Anforderungen  |   Konzepte  |   Entwurf  |   Entwicklung  |   Qualitätssicherung  |   Lebenszyklus  |   Steuerung
 
 
 
 


Quelle  provider.c   Sprache: C

 
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */


/*
 *
 * Notes:
 * [1] lth. The call to Sleep() is a hack to get the test case to run
 * on Windows 95. Without it, the test case fails with an error
 * WSAECONNRESET following a recv() call. The error is caused by the
 * server side thread termination without a shutdown() or closesocket()
 * call. Windows docmunentation suggests that this is predicted
 * behavior; that other platforms get away with it is ... serindipity.
 * The test case should shutdown() or closesocket() before
 * thread termination. I didn't have time to figure out where or how
 * to do it. The Sleep() call inserts enough delay to allow the
 * client side to recv() all his data before the server side thread
 * terminates. Whew! ...
 *
 ** Modification History:
 * 14-May-97 AGarcia- Converted the test to accomodate the debug_mode flag.
 *             The debug mode will print all of the printfs associated with this
 *test. The regress mode will be the default mode. Since the regress tool limits
 *           the output to a one line status:PASS or FAIL,all of the printf
 *statements have been handled with an if (debug_mode) statement.
 */


#include "prclist.h"
#include "prcvar.h"
#include "prerror.h"
#include "prinit.h"
#include "prinrval.h"
#include "prio.h"
#include "prlock.h"
#include "prlog.h"
#include "prtime.h"
#include "prmem.h"
#include "prnetdb.h"
#include "prprf.h"
#include "prthread.h"

#include "pprio.h"
#include "primpl.h"

#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"

#include <stdlib.h>
#include <string.h>

#if defined(XP_UNIX)
#  include <math.h>
#endif

/*
** This is the beginning of the test
*/


#ifdef DEBUG
#  define PORT_INC_DO +100
#else
#  define PORT_INC_DO
#endif
#ifdef IS_64
#  define PORT_INC_3264 +200
#else
#  define PORT_INC_3264
#endif

#define RECV_FLAGS 0
#define SEND_FLAGS 0
#define BUFFER_SIZE 1024
#define DEFAULT_BACKLOG 5
#define DEFAULT_PORT 13000 PORT_INC_DO PORT_INC_3264
#define DEFAULT_CLIENTS 1
#define ALLOWED_IN_ACCEPT 1
#define DEFAULT_CLIPPING 1000
#define DEFAULT_WORKERS_MIN 1
#define DEFAULT_WORKERS_MAX 1
#define DEFAULT_SERVER "localhost"
#define DEFAULT_EXECUTION_TIME 10
#define DEFAULT_CLIENT_TIMEOUT 4000
#define DEFAULT_SERVER_TIMEOUT 4000
#define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH

typedef enum CSState_e { cs_init, cs_run, cs_stop, cs_exit } CSState_t;

static void PR_CALLBACK Worker(void* arg);
typedef struct CSPool_s CSPool_t;
typedef struct CSWorker_s CSWorker_t;
typedef struct CSServer_s CSServer_t;
typedef enum Verbosity {
  TEST_LOG_ALWAYS,
  TEST_LOG_ERROR,
  TEST_LOG_WARNING,
  TEST_LOG_NOTICE,
  TEST_LOG_INFO,
  TEST_LOG_STATUS,
  TEST_LOG_VERBOSE
} Verbosity;

static enum {
  thread_nspr,
  thread_pthread,
  thread_sproc,
  thread_win32
} thread_provider;

static PRInt32 domain = AF_INET;
static PRInt32 protocol = 6; /* TCP */
static PRFileDesc* debug_out = NULL;
static PRBool debug_mode = PR_FALSE;
static PRBool pthread_stats = PR_FALSE;
static Verbosity verbosity = TEST_LOG_ALWAYS;
static PRThreadScope thread_scope = PR_LOCAL_THREAD;

struct CSWorker_s {
  PRCList element; /* list of the server's workers */

  PRThread* thread;   /* this worker objects thread */
  CSServer_t* server; /* back pointer to server structure */
};

struct CSPool_s {
  PRCondVar* exiting;
  PRCondVar* acceptComplete;
  PRUint32 accepting, active, workers;
};

struct CSServer_s {
  PRCList list; /* head of worker list */

  PRLock* ml;
  PRThread* thread; /* the main server thread */
  PRCondVar* stateChange;

  PRUint16 port;        /* port we're listening on */
  PRUint32 backlog;     /* size of our listener backlog */
  PRFileDesc* listener; /* the fd accepting connections */

  CSPool_t pool;   /* statistics on worker threads */
  CSState_t state; /* the server's state */
  struct           /* controlling worker counts */
  {
    PRUint32 minimum, maximum, accepting;
  } workers;

  /* statistics */
  PRIntervalTime started, stopped;
  PRUint32 operations, bytesTransferred;
};

typedef struct CSDescriptor_s {
  PRInt32 size;      /* size of transfer */
  char filename[60]; /* filename, null padded */
} CSDescriptor_t;

typedef struct CSClient_s {
  PRLock* ml;
  PRThread* thread;
  PRCondVar* stateChange;
  PRNetAddr serverAddress;

  CSState_t state;

  /* statistics */
  PRIntervalTime started, stopped;
  PRUint32 operations, bytesTransferred;
} CSClient_t;

#define TEST_LOG(l, p, a)                         \
  do {                                            \
    if (debug_mode || (p <= verbosity)) printf a; \
  } while (0)

PRLogModuleInfo* cltsrv_log_file = NULL;

#define MY_ASSERT(_expr) \
  ((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__))

#define TEST_ASSERT(_expr) \
  ((_expr) ? ((void)0) : _MY_Assert(#_expr, __FILE__, __LINE__))

static void _MY_Assert(const char* s, const char* file, PRIntn ln) {
  PL_PrintError(NULL);
  PR_Assert(s, file, ln);
/* _MY_Assert */

static PRBool Aborted(PRStatus rv) {
  return ((PR_FAILURE == rv) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError()))
             ? PR_TRUE
             : PR_FALSE;
}

static void TimeOfDayMessage(const char* msg, PRThread* me) {
  char buffer[100];
  PRExplodedTime tod;
  PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters, &tod);
  (void)PR_FormatTime(buffer, sizeof(buffer), "%H:%M:%S", &tod);

  TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
           ("%s(0x%p): %s\n", msg, me, buffer));
/* TimeOfDayMessage */

static void PR_CALLBACK Client(void* arg) {
  PRStatus rv;
  PRIntn index;
  char buffer[1024];
  PRFileDesc* fd = NULL;
  PRUintn clipping = DEFAULT_CLIPPING;
  CSClient_t* client = (CSClient_t*)arg;
  PRThread* me = client->thread = PR_GetCurrentThread();
  CSDescriptor_t* descriptor = PR_NEW(CSDescriptor_t);
  PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT);

  for (index = 0; index < sizeof(buffer); ++index) {
    buffer[index] = (char)index;
  }

  client->started = PR_IntervalNow();

  PR_Lock(client->ml);
  client->state = cs_run;
  PR_NotifyCondVar(client->stateChange);
  PR_Unlock(client->ml);

  TimeOfDayMessage("Client started at", me);

  while (cs_run == client->state) {
    PRInt32 bytes, descbytes, filebytes, netbytes;

    (void)PR_NetAddrToString(&client->serverAddress, buffer, sizeof(buffer));
    TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
             ("\tClient(0x%p): connecting to server at %s\n", me, buffer));

    fd = PR_Socket(domain, SOCK_STREAM, protocol);
    TEST_ASSERT(NULL != fd);
    rv = PR_Connect(fd, &client->serverAddress, timeout);
    if (PR_FAILURE == rv) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
               ("\tClient(0x%p): conection failed\n", me));
      goto aborted;
    }

    memset(descriptor, 0, sizeof(*descriptor));
    descriptor->size = PR_htonl(descbytes = rand() % clipping);
    PR_snprintf(descriptor->filename, sizeof(descriptor->filename),
                "CS%p%p-%p.dat", client->started, me, client->operations);
    TEST_LOG(
        cltsrv_log_file, TEST_LOG_VERBOSE,
        ("\tClient(0x%p): sending descriptor for %u bytes\n", me, descbytes));
    bytes = PR_Send(fd, descriptor, sizeof(*descriptor), SEND_FLAGS, timeout);
    if (sizeof(CSDescriptor_t) != bytes) {
      if (Aborted(PR_FAILURE)) {
        goto aborted;
      }
      if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\tClient(0x%p): send descriptor timeout\n", me));
        goto retry;
      }
    }
    TEST_ASSERT(sizeof(*descriptor) == bytes);

    netbytes = 0;
    while (netbytes < descbytes) {
      filebytes = sizeof(buffer);
      if ((descbytes - netbytes) < filebytes) {
        filebytes = descbytes - netbytes;
      }
      TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
               ("\tClient(0x%p): sending %d bytes\n", me, filebytes));
      bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
      if (filebytes != bytes) {
        if (Aborted(PR_FAILURE)) {
          goto aborted;
        }
        if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
          TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                   ("\tClient(0x%p): send data timeout\n", me));
          goto retry;
        }
      }
      TEST_ASSERT(bytes == filebytes);
      netbytes += bytes;
    }
    filebytes = 0;
    while (filebytes < descbytes) {
      netbytes = sizeof(buffer);
      if ((descbytes - filebytes) < netbytes) {
        netbytes = descbytes - filebytes;
      }
      TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
               ("\tClient(0x%p): receiving %d bytes\n", me, netbytes));
      bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
      if (-1 == bytes) {
        if (Aborted(PR_FAILURE)) {
          TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                   ("\tClient(0x%p): receive data aborted\n", me));
          goto aborted;
        } else if (PR_IO_TIMEOUT_ERROR == PR_GetError())
          TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                   ("\tClient(0x%p): receive data timeout\n", me));
        else
          TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                   ("\tClient(0x%p): receive error (%d, %d)\n", me,
                    PR_GetError(), PR_GetOSError()));
        goto retry;
      }
      if (0 == bytes) {
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tClient(0x%p): unexpected end of stream\n",
                  PR_GetCurrentThread()));
        break;
      }
      filebytes += bytes;
    }

    rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
    if (Aborted(rv)) {
      goto aborted;
    }
    TEST_ASSERT(PR_SUCCESS == rv);
  retry:
    (void)PR_Close(fd);
    fd = NULL;
    TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
             ("\tClient(0x%p): disconnected from server\n", me));

    PR_Lock(client->ml);
    client->operations += 1;
    client->bytesTransferred += 2 * descbytes;
    rv = PR_WaitCondVar(client->stateChange, rand() % clipping);
    PR_Unlock(client->ml);
    if (Aborted(rv)) {
      break;
    }
  }

aborted:
  client->stopped = PR_IntervalNow();

  PR_ClearInterrupt();
  if (NULL != fd) {
    rv = PR_Close(fd);
  }

  PR_Lock(client->ml);
  client->state = cs_exit;
  PR_NotifyCondVar(client->stateChange);
  PR_Unlock(client->ml);
  PR_DELETE(descriptor);
  TEST_LOG(
      cltsrv_log_file, TEST_LOG_ALWAYS,
      ("\tClient(0x%p): stopped after %u operations and %u bytes\n",
       PR_GetCurrentThread(), client->operations, client->bytesTransferred));

/* Client */

static PRStatus ProcessRequest(PRFileDesc* fd, CSServer_t* server) {
  PRStatus drv, rv;
  char buffer[1024];
  PRFileDesc* file = NULL;
  PRThread* me = PR_GetCurrentThread();
  PRInt32 bytes, descbytes, netbytes, filebytes = 0;
  CSDescriptor_t* descriptor = PR_NEW(CSDescriptor_t);
  PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT);

  TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
           ("\tProcessRequest(0x%p): receiving desciptor\n", me));
  bytes = PR_Recv(fd, descriptor, sizeof(*descriptor), RECV_FLAGS, timeout);
  if (-1 == bytes) {
    rv = PR_FAILURE;
    if (Aborted(rv)) {
      goto exit;
    }
    if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
               ("\tProcessRequest(0x%p): receive timeout\n", me));
    }
    goto exit;
  }
  if (0 == bytes) {
    rv = PR_FAILURE;
    TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
             ("\tProcessRequest(0x%p): unexpected end of file\n", me));
    goto exit;
  }
  descbytes = PR_ntohl(descriptor->size);
  TEST_ASSERT(sizeof(*descriptor) == bytes);

  TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
           ("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n", me,
            descbytes, descriptor->filename));

  file = PR_Open(descriptor->filename, (PR_CREATE_FILE | PR_WRONLY), 0666);
  if (NULL == file) {
    rv = PR_FAILURE;
    if (Aborted(rv)) {
      goto aborted;
    }
    if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
               ("\tProcessRequest(0x%p): open file timeout\n", me));
      goto aborted;
    }
  }
  TEST_ASSERT(NULL != file);

  filebytes = 0;
  while (filebytes < descbytes) {
    netbytes = sizeof(buffer);
    if ((descbytes - filebytes) < netbytes) {
      netbytes = descbytes - filebytes;
    }
    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("\tProcessRequest(0x%p): receive %d bytes\n", me, netbytes));
    bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
    if (-1 == bytes) {
      rv = PR_FAILURE;
      if (Aborted(rv)) {
        goto aborted;
      }
      if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tProcessRequest(0x%p): receive data timeout\n", me));
        goto aborted;
      }
      /*
       * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED)
       * on NT here.  This is equivalent to ECONNRESET on Unix.
       *     -wtc
       */

      TEST_LOG(cltsrv_log_file, TEST_LOG_WARNING,
               ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n", me,
                PR_GetError(), PR_GetOSError()));
      goto aborted;
    }
    if (0 == bytes) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_WARNING,
               ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me));
      rv = PR_FAILURE;
      goto aborted;
    }
    filebytes += bytes;
    netbytes = bytes;
    /* The byte count for PR_Write should be positive */
    MY_ASSERT(netbytes > 0);
    TEST_LOG(
        cltsrv_log_file, TEST_LOG_VERBOSE,
        ("\tProcessRequest(0x%p): write %d bytes to file\n", me, netbytes));
    bytes = PR_Write(file, buffer, netbytes);
    if (netbytes != bytes) {
      rv = PR_FAILURE;
      if (Aborted(rv)) {
        goto aborted;
      }
      if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tProcessRequest(0x%p): write file timeout\n", me));
        goto aborted;
      }
    }
    TEST_ASSERT(bytes > 0);
  }

  PR_Lock(server->ml);
  server->operations += 1;
  server->bytesTransferred += filebytes;
  PR_Unlock(server->ml);

  rv = PR_Close(file);
  file = NULL;
  if (Aborted(rv)) {
    goto aborted;
  }
  TEST_ASSERT(PR_SUCCESS == rv);

  TEST_LOG(
      cltsrv_log_file, TEST_LOG_VERBOSE,
      ("\t\tProcessRequest(0x%p): opening %s\n", me, descriptor->filename));
  file = PR_Open(descriptor->filename, PR_RDONLY, 0);
  if (NULL == file) {
    rv = PR_FAILURE;
    if (Aborted(rv)) {
      goto aborted;
    }
    if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
               ("\t\tProcessRequest(0x%p): open file timeout\n",
                PR_GetCurrentThread()));
      goto aborted;
    }
    TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
             ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n", me,
              PR_GetError(), PR_GetOSError()));
    goto aborted;
  }
  TEST_ASSERT(NULL != file);

  netbytes = 0;
  while (netbytes < descbytes) {
    filebytes = sizeof(buffer);
    if ((descbytes - netbytes) < filebytes) {
      filebytes = descbytes - netbytes;
    }
    TEST_LOG(
        cltsrv_log_file, TEST_LOG_VERBOSE,
        ("\tProcessRequest(0x%p): read %d bytes from file\n", me, filebytes));
    bytes = PR_Read(file, buffer, filebytes);
    if (filebytes != bytes) {
      rv = PR_FAILURE;
      if (Aborted(rv)) {
        goto aborted;
      }
      if (PR_IO_TIMEOUT_ERROR == PR_GetError())
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tProcessRequest(0x%p): read file timeout\n", me));
      else
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n", me,
                  PR_GetError(), PR_GetOSError()));
      goto aborted;
    }
    TEST_ASSERT(bytes > 0);
    netbytes += bytes;
    filebytes = bytes;
    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("\t\tProcessRequest(0x%p): sending %d bytes\n", me, filebytes));
    bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
    if (filebytes != bytes) {
      rv = PR_FAILURE;
      if (Aborted(rv)) {
        goto aborted;
      }
      if (PR_IO_TIMEOUT_ERROR == PR_GetError()) {
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tProcessRequest(0x%p): send data timeout\n", me));
        goto aborted;
      }
      break;
    }
    TEST_ASSERT(bytes > 0);
  }

  PR_Lock(server->ml);
  server->bytesTransferred += filebytes;
  PR_Unlock(server->ml);

  rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
  if (Aborted(rv)) {
    goto aborted;
  }

  rv = PR_Close(file);
  file = NULL;
  if (Aborted(rv)) {
    goto aborted;
  }
  TEST_ASSERT(PR_SUCCESS == rv);

aborted:
  PR_ClearInterrupt();
  if (NULL != file) {
    PR_Close(file);
  }
  drv = PR_Delete(descriptor->filename);
  TEST_ASSERT(PR_SUCCESS == drv);
exit:
  TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
           ("\t\tProcessRequest(0x%p): Finished\n", me));

  PR_DELETE(descriptor);

#if defined(WIN95)
  PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */
#endif
  return rv;
/* ProcessRequest */

typedef void (*StartFn)(void*);
typedef struct StartObject {
  StartFn start;
  void* arg;
} StartObject;

#if defined(_PR_PTHREADS)
#  include "md/_pth.h"
#  include <pthread.h>

static void* pthread_start(void* arg) {
  StartObject* so = (StartObject*)arg;
  StartFn start = so->start;
  void* data = so->arg;
  PR_Free(so);
  start(data);
  return NULL;
/* pthread_start */
#endif /* defined(_PR_PTHREADS) */

#if defined(WIN32)
#  include <process.h> /* for _beginthreadex() */

static PRUintn __stdcall windows_start(void* arg) {
  StartObject* so = (StartObject*)arg;
  StartFn start = so->start;
  void* data = so->arg;
  PR_Free(so);
  start(data);
  return 0;
/* windows_start */
#endif /* defined(WIN32) */

static PRStatus JoinThread(PRThread* thread) {
  PRStatus rv;
  switch (thread_provider) {
    case thread_nspr:
      rv = PR_JoinThread(thread);
      break;
    case thread_pthread:
#if defined(_PR_PTHREADS)
      rv = PR_SUCCESS;
      break;
#endif /* defined(_PR_PTHREADS) */
    case thread_win32:
#if defined(WIN32)
      rv = PR_SUCCESS;
      break;
#endif
    default:
      rv = PR_FAILURE;
      break;
  }
  return rv;
/* JoinThread */

static PRStatus NewThread(StartFn start, void* arg, PRThreadPriority prio,
                          PRThreadState state) {
  PRStatus rv;

  switch (thread_provider) {
    case thread_nspr: {
      PRThread* thread =
          PR_CreateThread(PR_USER_THREAD, start, arg, PR_PRIORITY_NORMAL,
                          PR_GLOBAL_THREAD, PR_JOINABLE_THREAD, 0);
      rv = (NULL == thread) ? PR_FAILURE : PR_SUCCESS;
    } break;
    case thread_pthread:
#if defined(_PR_PTHREADS)
    {
      int rv;
      pthread_t id;
      pthread_attr_t tattr;
      StartObject* start_object;
      start_object = PR_NEW(StartObject);
      PR_ASSERT(NULL != start_object);
      start_object->start = start;
      start_object->arg = arg;

      rv = _PT_PTHREAD_ATTR_INIT(&tattr);
      PR_ASSERT(0 == rv);

      rv = pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
      PR_ASSERT(0 == rv);

      rv = pthread_attr_setstacksize(&tattr, 64 * 1024);
      PR_ASSERT(0 == rv);

      rv = _PT_PTHREAD_CREATE(&id, tattr, pthread_start, start_object);
      (void)_PT_PTHREAD_ATTR_DESTROY(&tattr);
      return (0 == rv) ? PR_SUCCESS : PR_FAILURE;
    }
#else
      PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
      rv = PR_FAILURE;
#endif /* defined(_PR_PTHREADS) */
    break;

    case thread_sproc:
      PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
      rv = PR_FAILURE;
      break;
    case thread_win32:
#if defined(WIN32)
    {
      void* th;
      PRUintn id;
      StartObject* start_object;
      start_object = PR_NEW(StartObject);
      PR_ASSERT(NULL != start_object);
      start_object->start = start;
      start_object->arg = arg;
      th = (void*)_beginthreadex(
          NULL, /* LPSECURITY_ATTRIBUTES - pointer to thread security attributes
                 */

          0U,   /* DWORD - initial thread stack size, in bytes */
          windows_start, /* LPTHREAD_START_ROUTINE - pointer to thread function
                          */

          start_object,  /* LPVOID - argument for new thread */
          STACK_SIZE_PARAM_IS_A_RESERVATION, /*DWORD dwCreationFlags - creation
                                                flags */

          &id /* LPDWORD - pointer to returned thread identifier */);

      rv = (NULL == th) ? PR_FAILURE : PR_SUCCESS;
    }
#else
      PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
      rv = PR_FAILURE;
#endif
    break;
    default:
      PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0);
      rv = PR_FAILURE;
  }
  return rv;
/* NewThread */

static PRStatus CreateWorker(CSServer_t* server, CSPool_t* pool) {
  PRStatus rv;
  CSWorker_t* worker = PR_NEWZAP(CSWorker_t);
  worker->server = server;
  PR_INIT_CLIST(&worker->element);
  rv = NewThread(Worker, worker, DEFAULT_SERVER_PRIORITY, PR_UNJOINABLE_THREAD);
  if (PR_FAILURE == rv) {
    PR_DELETE(worker);
  }

  TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
           ("\tCreateWorker(0x%p): create new worker (0x%p)\n",
            PR_GetCurrentThread(), worker->thread));

  return rv;
/* CreateWorker */

static void PR_CALLBACK Worker(void* arg) {
  PRStatus rv;
  PRNetAddr from;
  PRFileDesc* fd = NULL;
  CSWorker_t* worker = (CSWorker_t*)arg;
  CSServer_t* server = worker->server;
  CSPool_t* pool = &server->pool;

  PRThread* me = worker->thread = PR_GetCurrentThread();

  TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
           ("\t\tWorker(0x%p): started [%u]\n", me, pool->workers + 1));

  PR_Lock(server->ml);
  PR_APPEND_LINK(&worker->element, &server->list);
  pool->workers += 1; /* define our existance */

  while (cs_run == server->state) {
    while (pool->accepting >= server->workers.accepting) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
               ("\t\tWorker(0x%p): waiting for accept slot[%d]\n", me,
                pool->accepting));
      rv = PR_WaitCondVar(pool->acceptComplete, PR_INTERVAL_NO_TIMEOUT);
      if (Aborted(rv) || (cs_run != server->state)) {
        TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
                 ("\tWorker(0x%p): has been %s\n", me,
                  (Aborted(rv) ? "interrupted" : "stopped")));
        goto exit;
      }
    }
    pool->accepting += 1; /* how many are really in accept */
    PR_Unlock(server->ml);

    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("\t\tWorker(0x%p): calling accept\n", me));
    fd = PR_Accept(server->listener, &from, PR_INTERVAL_NO_TIMEOUT);

    PR_Lock(server->ml);
    pool->accepting -= 1;
    PR_NotifyCondVar(pool->acceptComplete);

    if ((NULL == fd) && Aborted(PR_FAILURE)) {
      if (NULL != server->listener) {
        PR_Close(server->listener);
        server->listener = NULL;
      }
      goto exit;
    }

    if (NULL != fd) {
      /*
      ** Create another worker of the total number of workers is
      ** less than the minimum specified or we have none left in
      ** accept() AND we're not over the maximum.
      ** This sort of presumes that the number allowed in accept
      ** is at least as many as the minimum. Otherwise we'll keep
      ** creating new threads and deleting them soon after.
      */

      PRBool another = ((pool->workers < server->workers.minimum) ||
                        ((0 == pool->accepting) &&
                         (pool->workers < server->workers.maximum)))
                           ? PR_TRUE
                           : PR_FALSE;
      pool->active += 1;
      PR_Unlock(server->ml);

      if (another) {
        (void)CreateWorker(server, pool);
      }

      rv = ProcessRequest(fd, server);
      if (PR_SUCCESS != rv)
        TEST_LOG(cltsrv_log_file, TEST_LOG_ERROR,
                 ("\t\tWorker(0x%p): server process ended abnormally\n", me));
      (void)PR_Close(fd);
      fd = NULL;

      PR_Lock(server->ml);
      pool->active -= 1;
    }
  }

exit:
  PR_ClearInterrupt();
  PR_Unlock(server->ml);

  if (NULL != fd) {
    (void)PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
    (void)PR_Close(fd);
  }

  TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
           ("\t\tWorker(0x%p): exiting [%u]\n", PR_GetCurrentThread(),
            pool->workers));

  PR_Lock(server->ml);
  pool->workers -= 1; /* undefine our existance */
  PR_REMOVE_AND_INIT_LINK(&worker->element);
  PR_NotifyCondVar(pool->exiting);
  PR_Unlock(server->ml);

  PR_DELETE(worker); /* destruction of the "worker" object */

/* Worker */

static void PR_CALLBACK Server(void* arg) {
  PRStatus rv;
  PRNetAddr serverAddress;
  CSServer_t* server = (CSServer_t*)arg;
  PRThread* me = server->thread = PR_GetCurrentThread();
  PRSocketOptionData sockOpt;

  server->listener = PR_Socket(domain, SOCK_STREAM, protocol);

  sockOpt.option = PR_SockOpt_Reuseaddr;
  sockOpt.value.reuse_addr = PR_TRUE;
  rv = PR_SetSocketOption(server->listener, &sockOpt);
  TEST_ASSERT(PR_SUCCESS == rv);

  memset(&serverAddress, 0, sizeof(serverAddress));
  rv = PR_InitializeNetAddr(PR_IpAddrAny, DEFAULT_PORT, &serverAddress);

  rv = PR_Bind(server->listener, &serverAddress);
  TEST_ASSERT(PR_SUCCESS == rv);

  rv = PR_Listen(server->listener, server->backlog);
  TEST_ASSERT(PR_SUCCESS == rv);

  server->started = PR_IntervalNow();
  TimeOfDayMessage("Server started at", me);

  PR_Lock(server->ml);
  server->state = cs_run;
  PR_NotifyCondVar(server->stateChange);
  PR_Unlock(server->ml);

  /*
  ** Create the first worker (actually, a thread that accepts
  ** connections and then processes the work load as needed).
  ** From this point on, additional worker threads are created
  ** as they are needed by existing worker threads.
  */

  rv = CreateWorker(server, &server->pool);
  TEST_ASSERT(PR_SUCCESS == rv);

  /*
  ** From here on this thread is merely hanging around as the contact
  ** point for the main test driver. It's just waiting for the driver
  ** to declare the test complete.
  */

  TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
           ("\tServer(0x%p): waiting for state change\n", me));

  PR_Lock(server->ml);
  while ((cs_run == server->state) && !Aborted(rv)) {
    rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
  }
  PR_Unlock(server->ml);
  PR_ClearInterrupt();

  TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
           ("\tServer(0x%p): shutting down workers\n", me));

  /*
  ** Get all the worker threads to exit. They know how to
  ** clean up after themselves, so this is just a matter of
  ** waiting for clorine in the pool to take effect. During
  ** this stage we're ignoring interrupts.
  */

  server->workers.minimum = server->workers.maximum = 0;

  PR_Lock(server->ml);
  while (!PR_CLIST_IS_EMPTY(&server->list)) {
    PRCList* head = PR_LIST_HEAD(&server->list);
    CSWorker_t* worker = (CSWorker_t*)head;
    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker));
    rv = PR_Interrupt(worker->thread);
    TEST_ASSERT(PR_SUCCESS == rv);
    PR_REMOVE_AND_INIT_LINK(head);
  }

  while (server->pool.workers > 0) {
    TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
             ("\tServer(0x%p): waiting for %u workers to exit\n", me,
              server->pool.workers));
    (void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT);
  }

  server->state = cs_exit;
  PR_NotifyCondVar(server->stateChange);
  PR_Unlock(server->ml);

  TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
           ("\tServer(0x%p): stopped after %u operations and %u bytes\n", me,
            server->operations, server->bytesTransferred));

  if (NULL != server->listener) {
    PR_Close(server->listener);
  }
  server->stopped = PR_IntervalNow();

/* Server */

static void WaitForCompletion(PRIntn execution) {
  while (execution > 0) {
    PRIntn dally = (execution > 30) ? 30 : execution;
    PR_Sleep(PR_SecondsToInterval(dally));
    if (pthread_stats) {
      PT_FPrintStats(debug_out, "\nPThread Statistics\n");
    }
    execution -= dally;
  }
/* WaitForCompletion */

static void Help(void) {
  PR_fprintf(debug_out, "cltsrv test program usage:\n");
  PR_fprintf(debug_out,
             "\t-a threads allowed in accept (5)\n");
  PR_fprintf(debug_out,
             "\t-b backlock for listen (5)\n");
  PR_fprintf(debug_out,
             "\t-c number of clients to create (1)\n");
  PR_fprintf(debug_out,
             "\t-w minimal number of server threads (1)\n");
  PR_fprintf(debug_out,
             "\t-W maximum number of server threads (1)\n");
  PR_fprintf(debug_out,
             "\t-e duration of the test in seconds (10)\n");
  PR_fprintf(debug_out,
             "\t-s dsn name of server (localhost)\n");
  PR_fprintf(debug_out,
             "\t-G use GLOBAL threads (LOCAL)\n");
  PR_fprintf(debug_out,
             "\t-T thread provider ('n' | 'p' | 'w')(n)\n");
  PR_fprintf(debug_out,
             "\t-X use XTP as transport (TCP)\n");
  PR_fprintf(debug_out,
             "\t-6 Use IPv6 (IPv4)\n");
  PR_fprintf(debug_out,
             "\t-v verbosity (accumulative) (0)\n");
  PR_fprintf(debug_out,
             "\t-p pthread statistics (FALSE)\n");
  PR_fprintf(debug_out,
             "\t-d debug mode (FALSE)\n");
  PR_fprintf(debug_out, "\t-h this message\n");
/* Help */

static Verbosity IncrementVerbosity(void) {
  PRIntn verboge = (PRIntn)verbosity + 1;
  return (Verbosity)verboge;
/* IncrementVerbosity */

int main(int argc, char** argv) {
  PRUintn index;
  PRBool boolean;
  CSClient_t* client;
  PRStatus rv, joinStatus;
  CSServer_t* server = NULL;
  char* thread_type;

  PRUintn backlog = DEFAULT_BACKLOG;
  PRUintn clients = DEFAULT_CLIENTS;
  const char* serverName = DEFAULT_SERVER;
  PRBool serverIsLocal = PR_TRUE;
  PRUintn accepting = ALLOWED_IN_ACCEPT;
  PRUintn workersMin = DEFAULT_WORKERS_MIN;
  PRUintn workersMax = DEFAULT_WORKERS_MAX;
  PRIntn execution = DEFAULT_EXECUTION_TIME;

  /*
   * -G           use global threads
   * -a <n>       threads allowed in accept
   * -b <n>       backlock for listen
   * -c <threads> number of clients to create
   * -w <threads> minimal number of server threads
   * -W <threads> maximum number of server threads
   * -e <seconds> duration of the test in seconds
   * -s <string>  dsn name of server (implies no server here)
   * -v           verbosity
   */


  PLOptStatus os;
  PLOptState* opt = PL_CreateOptState(argc, argv, "GX6b:a:c:w:W:e:s:T:vdhp");

#if defined(WIN32)
  thread_provider = thread_win32;
#elif defined(_PR_PTHREADS)
  thread_provider = thread_pthread;
#else
  thread_provider = thread_nspr;
#endif

  debug_out = PR_GetSpecialFD(PR_StandardError);

  while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) {
    if (PL_OPT_BAD == os) {
      continue;
    }
    switch (opt->option) {
      case 'G'/* use global threads */
        thread_scope = PR_GLOBAL_THREAD;
        break;
      case 'X'/* use XTP as transport */
        protocol = 36;
        break;
      case '6'/* Use IPv6 */
        domain = PR_AF_INET6;
        break;
      case 'a'/* the value for accepting */
        accepting = atoi(opt->value);
        break;
      case 'b'/* the value for backlock */
        backlog = atoi(opt->value);
        break;
      case 'T'/* the thread provider */
        if ('n' == *opt->value) {
          thread_provider = thread_nspr;
        } else if ('p' == *opt->value) {
          thread_provider = thread_pthread;
        } else if ('w' == *opt->value) {
          thread_provider = thread_win32;
        } else {
          Help();
          return 2;
        }
        break;
      case 'c'/* number of client threads */
        clients = atoi(opt->value);
        break;
      case 'w'/* minimum server worker threads */
        workersMin = atoi(opt->value);
        break;
      case 'W'/* maximum server worker threads */
        workersMax = atoi(opt->value);
        break;
      case 'e'/* program execution time in seconds */
        execution = atoi(opt->value);
        break;
      case 's'/* server's address */
        serverName = opt->value;
        break;
      case 'v'/* verbosity */
        verbosity = IncrementVerbosity();
        break;
      case 'd'/* debug mode */
        debug_mode = PR_TRUE;
        break;
      case 'p'/* pthread mode */
        pthread_stats = PR_TRUE;
        break;
      case 'h':
      default:
        Help();
        return 2;
    }
  }
  PL_DestroyOptState(opt);

  if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) {
    serverIsLocal = PR_FALSE;
  }
  if (0 == execution) {
    execution = DEFAULT_EXECUTION_TIME;
  }
  if (0 == workersMax) {
    workersMax = DEFAULT_WORKERS_MAX;
  }
  if (0 == workersMin) {
    workersMin = DEFAULT_WORKERS_MIN;
  }
  if (0 == accepting) {
    accepting = ALLOWED_IN_ACCEPT;
  }
  if (0 == backlog) {
    backlog = DEFAULT_BACKLOG;
  }

  if (workersMin > accepting) {
    accepting = workersMin;
  }

  PR_STDIO_INIT();
  TimeOfDayMessage("Client/Server started at", PR_GetCurrentThread());

  cltsrv_log_file = PR_NewLogModule("cltsrv_log");
  MY_ASSERT(NULL != cltsrv_log_file);
  boolean = PR_SetLogFile("cltsrv.log");
  MY_ASSERT(boolean);

  if (serverIsLocal) {
    /* Establish the server */
    TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
             ("main(0x%p): starting server\n", PR_GetCurrentThread()));

    server = PR_NEWZAP(CSServer_t);
    PR_INIT_CLIST(&server->list);
    server->state = cs_init;
    server->ml = PR_NewLock();
    server->backlog = backlog;
    server->port = DEFAULT_PORT;
    server->workers.minimum = workersMin;
    server->workers.maximum = workersMax;
    server->workers.accepting = accepting;
    server->stateChange = PR_NewCondVar(server->ml);
    server->pool.exiting = PR_NewCondVar(server->ml);
    server->pool.acceptComplete = PR_NewCondVar(server->ml);

    TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
             ("main(0x%p): creating server thread\n", PR_GetCurrentThread()));

    rv = NewThread(Server, server, PR_PRIORITY_HIGH, PR_JOINABLE_THREAD);
    TEST_ASSERT(PR_SUCCESS == rv);

    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("main(0x%p): waiting for server init\n", PR_GetCurrentThread()));

    PR_Lock(server->ml);
    while (server->state == cs_init) {
      PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
    }
    PR_Unlock(server->ml);

    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("main(0x%p): server init complete (port #%d)\n",
              PR_GetCurrentThread(), server->port));
  }

  if (clients != 0) {
    /* Create all of the clients */
    PRHostEnt host;
    char buffer[BUFFER_SIZE];
    client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t));

    TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
             ("main(0x%p): creating %d client threads\n", PR_GetCurrentThread(),
              clients));

    if (!serverIsLocal) {
      rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host);
      if (PR_SUCCESS != rv) {
        PL_FPrintError(PR_STDERR, "PR_GetHostByName");
        return 2;
      }
    }

    for (index = 0; index < clients; ++index) {
      client[index].state = cs_init;
      client[index].ml = PR_NewLock();
      if (serverIsLocal) {
        (void)PR_InitializeNetAddr(PR_IpAddrLoopback, DEFAULT_PORT,
                                   &client[index].serverAddress);
      } else {
        (void)PR_EnumerateHostEnt(0, &host, DEFAULT_PORT,
                                  &client[index].serverAddress);
      }
      client[index].stateChange = PR_NewCondVar(client[index].ml);
      TEST_LOG(
          cltsrv_log_file, TEST_LOG_INFO,
          ("main(0x%p): creating client threads\n", PR_GetCurrentThread()));
      rv = NewThread(Client, &client[index], PR_PRIORITY_NORMAL,
                     PR_JOINABLE_THREAD);
      TEST_ASSERT(PR_SUCCESS == rv);
      PR_Lock(client[index].ml);
      while (cs_init == client[index].state) {
        PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
      }
      PR_Unlock(client[index].ml);
    }
  }

  /* Then just let them go at it for a bit */
  TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
           ("main(0x%p): waiting for execution interval (%d seconds)\n",
            PR_GetCurrentThread(), execution));

  WaitForCompletion(execution);

  TimeOfDayMessage("Shutting down", PR_GetCurrentThread());

  if (clients != 0) {
    for (index = 0; index < clients; ++index) {
      TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
               ("main(0x%p): notifying client(0x%p) to stop\n",
                PR_GetCurrentThread(), client[index].thread));

      PR_Lock(client[index].ml);
      if (cs_run == client[index].state) {
        client[index].state = cs_stop;
        PR_Interrupt(client[index].thread);
        while (cs_stop == client[index].state)
          PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
      }
      PR_Unlock(client[index].ml);

      TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
               ("main(0x%p): joining client(0x%p)\n", PR_GetCurrentThread(),
                client[index].thread));

      joinStatus = JoinThread(client[index].thread);
      TEST_ASSERT(PR_SUCCESS == joinStatus);
      PR_DestroyCondVar(client[index].stateChange);
      PR_DestroyLock(client[index].ml);
    }
    PR_DELETE(client);
  }

  if (NULL != server) {
    /* All clients joined - retrieve the server */
    TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
             ("main(0x%p): notifying server(0x%p) to stop\n",
              PR_GetCurrentThread(), server->thread));

    PR_Lock(server->ml);
    server->state = cs_stop;
    PR_Interrupt(server->thread);
    while (cs_exit != server->state) {
      PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
    }
    PR_Unlock(server->ml);

    TEST_LOG(cltsrv_log_file, TEST_LOG_NOTICE,
             ("main(0x%p): joining server(0x%p)\n", PR_GetCurrentThread(),
              server->thread));
    joinStatus = JoinThread(server->thread);
    TEST_ASSERT(PR_SUCCESS == joinStatus);

    PR_DestroyCondVar(server->stateChange);
    PR_DestroyCondVar(server->pool.exiting);
    PR_DestroyCondVar(server->pool.acceptComplete);
    PR_DestroyLock(server->ml);
    PR_DELETE(server);
  }

  TEST_LOG(cltsrv_log_file, TEST_LOG_ALWAYS,
           ("main(0x%p): test complete\n", PR_GetCurrentThread()));

  if (thread_provider == thread_win32) {
    thread_type = "\nWin32 Thread Statistics\n";
  } else if (thread_provider == thread_pthread) {
    thread_type = "\npthread Statistics\n";
  } else if (thread_provider == thread_sproc) {
    thread_type = "\nsproc Statistics\n";
  } else {
    PR_ASSERT(thread_provider == thread_nspr);
    thread_type = "\nPRThread Statistics\nn";
  }

  PT_FPrintStats(debug_out, thread_type);

  TimeOfDayMessage("Test exiting at", PR_GetCurrentThread());
  PR_Cleanup();
  return 0;
/* main */

/* cltsrv.c */

Messung V0.5
C=97 H=81 G=89

¤ Dauer der Verarbeitung: 0.5 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.






                                                                                                                                                                                                                                                                                                                                                                                                     


Neuigkeiten

     Aktuelles
     Motto des Tages

Software

     Produkte
     Quellcodebibliothek

Aktivitäten

     Artikel über Sicherheit
     Anleitung zur Aktivierung von SSL

Muße

     Gedichte
     Musik
     Bilder

Jenseits des Üblichen ....

Besucherstatistik

Besucherstatistik

Monitoring

Montastic status badge