1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
15 * The Original Code is the Netscape Portable Runtime (NSPR).
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1998-2000
20 * the Initial Developer. All Rights Reserved.
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
36 * ***** END LICENSE BLOCK ***** */
41 * [1] lth. The call to Sleep() is a hack to get the test case to run
42 * on Windows 95. Without it, the test case fails with an error
43 * WSAECONNRESET following a recv() call. The error is caused by the
44 * server side thread termination without a shutdown() or closesocket()
 * call. Windows documentation suggests that this is predicted
 * behavior; that other platforms get away with it is ... serendipity.
47 * The test case should shutdown() or closesocket() before
48 * thread termination. I didn't have time to figure out where or how
49 * to do it. The Sleep() call inserts enough delay to allow the
50 * client side to recv() all his data before the server side thread
51 * terminates. Whew! ...
53 ** Modification History:
 * 14-May-97 AGarcia- Converted the test to accommodate the debug_mode flag.
55 * The debug mode will print all of the printfs associated with this test.
56 * The regress mode will be the default mode. Since the regress tool limits
57 * the output to a one line status:PASS or FAIL,all of the printf statements
58 * have been handled with an if (debug_mode) statement.
92 #define printf PR_LogPrint
96 ** This is the beginning of the test
/*
** Built-in defaults. main() starts from these values and replaces them
** with whatever is given on the command line.
*/
#define DEFAULT_LOW 0 /* fd cache low water mark (-f), passed to PR_SetFDCacheSize() */
#define DEFAULT_HIGH 0 /* fd cache high water mark (-F), passed to PR_SetFDCacheSize() */
#define BUFFER_SIZE 1024 /* size of the scratch buffer main() gives to PR_GetHostByName() */
#define DEFAULT_BACKLOG 5 /* listen backlog handed to PR_Listen() (-b) */
#define DEFAULT_PORT 12849 /* port the server binds and the clients connect to */
#define DEFAULT_CLIENTS 1 /* number of client threads (-c) */
#define ALLOWED_IN_ACCEPT 1 /* worker threads allowed in accept at once (-a) */
#define DEFAULT_CLIPPING 1000 /* modulus applied to rand() for transfer size and client wait */
#define DEFAULT_WORKERS_MIN 1 /* minimum number of server worker threads (-w) */
#define DEFAULT_WORKERS_MAX 1 /* maximum number of server worker threads (-W) */
#define DEFAULT_SERVER "localhost" /* server name; any other name means the server is not local (-s) */
#define DEFAULT_EXECUTION_TIME 10 /* test duration in seconds (-e) */
#define DEFAULT_CLIENT_TIMEOUT 4000 /* client I/O timeout, converted with PR_MillisecondsToInterval() */
#define DEFAULT_SERVER_TIMEOUT 4000 /* server I/O timeout, converted with PR_MillisecondsToInterval() */
#define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH /* priority given to worker threads in CreateWorker() */
/*
** Life-cycle state shared by the server thread and by every client
** thread. Each transition is made under the owning object's lock and
** announced on its stateChange condition variable.
*/
typedef enum CSState_e
{
    cs_init,    /* assigned by main() before the thread is created */
    cs_run,     /* assigned by the thread itself once it has started */
    cs_stop,    /* assigned by main() to ask the thread to finish */
    cs_exit     /* assigned by the thread just before it returns */
} CSState_t;
/*
** Worker is declared ahead of its definition because CreateWorker()
** hands it to PR_CreateThread() before the function body appears.
*/
static void PR_CALLBACK Worker(void *arg);
/* Short names for the structure tags, usable through pointers from here on. */
typedef struct CSPool_s CSPool_t;
typedef struct CSWorker_s CSWorker_t;
typedef struct CSServer_s CSServer_t;
123 typedef enum Verbosity
/*
** Test-wide settings. Each holds its default here and may be replaced
** by main() while it parses the command line.
*/
static PRInt32 domain = AF_INET; /* address family; -6 switches to PR_AF_INET6 */
static PRInt32 protocol = 6; /* TCP */
static PRFileDesc *debug_out = NULL; /* main() points this at standard error */
static PRBool debug_mode = PR_FALSE; /* -d; forces TEST_LOG output regardless of level */
static PRBool pthread_stats = PR_FALSE; /* -p; print PThread statistics while waiting */
static Verbosity verbosity = TEST_LOG_ALWAYS; /* raised one step by each -v */
static PRThreadScope thread_scope = PR_LOCAL_THREAD; /* -G switches to PR_GLOBAL_THREAD */
144 PRCList element
; /* list of the server's workers */
146 PRThread
*thread
; /* this worker objects thread */
147 CSServer_t
*server
; /* back pointer to server structure */
153 PRCondVar
*acceptComplete
;
154 PRUint32 accepting
, active
, workers
;
159 PRCList list
; /* head of worker list */
162 PRThread
*thread
; /* the main server thread */
163 PRCondVar
*stateChange
;
165 PRUint16 port
; /* port we're listening on */
166 PRUint32 backlog
; /* size of our listener backlog */
167 PRFileDesc
*listener
; /* the fd accepting connections */
169 CSPool_t pool
; /* statistics on worker threads */
170 CSState_t state
; /* the server's state */
171 struct /* controlling worker counts */
173 PRUint32 minimum
, maximum
, accepting
;
177 PRIntervalTime started
, stopped
;
178 PRUint32 operations
, bytesTransferred
;
181 typedef struct CSDescriptor_s
183 PRInt32 size
; /* size of transfer */
184 char filename
[60]; /* filename, null padded */
187 typedef struct CSClient_s
191 PRCondVar
*stateChange
;
192 PRNetAddr serverAddress
;
197 PRIntervalTime started
, stopped
;
198 PRUint32 operations
, bytesTransferred
;
201 #define TEST_LOG(l, p, a) \
203 if (debug_mode || (p <= verbosity)) printf a; \
/* Log module passed as the first argument of TEST_LOG(); created in main(). */
PRLogModuleInfo *cltsrv_log_file = NULL;

/*
** Assertion macros. When _expr is false, its source text together with
** the current file name and line number is handed to _MY_Assert().
** The two macros expand to exactly the same code.
*/
#define MY_ASSERT(_expr) \
    ((_expr)?((void)0):_MY_Assert(# _expr,__FILE__,__LINE__))

#define TEST_ASSERT(_expr) \
    ((_expr)?((void)0):_MY_Assert(# _expr,__FILE__,__LINE__))
214 static void _MY_Assert(const char *s
, const char *file
, PRIntn ln
)
217 PR_Assert(s
, file
, ln
);
220 static PRBool
Aborted(PRStatus rv
)
222 return ((PR_FAILURE
== rv
) && (PR_PENDING_INTERRUPT_ERROR
== PR_GetError())) ?
226 static void TimeOfDayMessage(const char *msg
, PRThread
* me
)
230 PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters
, &tod
);
231 (void)PR_FormatTime(buffer
, sizeof(buffer
), "%H:%M:%S", &tod
);
234 cltsrv_log_file
, TEST_LOG_ALWAYS
,
235 ("%s(0x%p): %s\n", msg
, me
, buffer
));
236 } /* TimeOfDayMessage */
239 static void PR_CALLBACK
Client(void *arg
)
244 PRFileDesc
*fd
= NULL
;
245 PRUintn clipping
= DEFAULT_CLIPPING
;
246 PRThread
*me
= PR_GetCurrentThread();
247 CSClient_t
*client
= (CSClient_t
*)arg
;
248 CSDescriptor_t
*descriptor
= PR_NEW(CSDescriptor_t
);
249 PRIntervalTime timeout
= PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT
);
252 for (index
= 0; index
< sizeof(buffer
); ++index
)
253 buffer
[index
] = (char)index
;
255 client
->started
= PR_IntervalNow();
258 client
->state
= cs_run
;
259 PR_NotifyCondVar(client
->stateChange
);
260 PR_Unlock(client
->ml
);
262 TimeOfDayMessage("Client started at", me
);
264 while (cs_run
== client
->state
)
266 PRInt32 bytes
, descbytes
, filebytes
, netbytes
;
268 (void)PR_NetAddrToString(&client
->serverAddress
, buffer
, sizeof(buffer
));
269 TEST_LOG(cltsrv_log_file
, TEST_LOG_INFO
,
270 ("\tClient(0x%p): connecting to server at %s\n", me
, buffer
));
272 fd
= PR_Socket(domain
, SOCK_STREAM
, protocol
);
273 TEST_ASSERT(NULL
!= fd
);
274 rv
= PR_Connect(fd
, &client
->serverAddress
, timeout
);
275 if (PR_FAILURE
== rv
)
278 cltsrv_log_file
, TEST_LOG_ERROR
,
279 ("\tClient(0x%p): conection failed (%d, %d)\n",
280 me
, PR_GetError(), PR_GetOSError()));
284 memset(descriptor
, 0, sizeof(*descriptor
));
285 descriptor
->size
= PR_htonl(descbytes
= rand() % clipping
);
287 descriptor
->filename
, sizeof(descriptor
->filename
),
288 "CS%p%p-%p.dat", client
->started
, me
, client
->operations
);
290 cltsrv_log_file
, TEST_LOG_VERBOSE
,
291 ("\tClient(0x%p): sending descriptor for %u bytes\n", me
, descbytes
));
293 fd
, descriptor
, sizeof(*descriptor
), SEND_FLAGS
, timeout
);
294 if (sizeof(CSDescriptor_t
) != bytes
)
296 if (Aborted(PR_FAILURE
)) goto aborted
;
297 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
300 cltsrv_log_file
, TEST_LOG_ERROR
,
301 ("\tClient(0x%p): send descriptor timeout\n", me
));
305 TEST_ASSERT(sizeof(*descriptor
) == bytes
);
308 while (netbytes
< descbytes
)
310 filebytes
= sizeof(buffer
);
311 if ((descbytes
- netbytes
) < filebytes
)
312 filebytes
= descbytes
- netbytes
;
314 cltsrv_log_file
, TEST_LOG_VERBOSE
,
315 ("\tClient(0x%p): sending %d bytes\n", me
, filebytes
));
316 bytes
= PR_Send(fd
, buffer
, filebytes
, SEND_FLAGS
, timeout
);
317 if (filebytes
!= bytes
)
319 if (Aborted(PR_FAILURE
)) goto aborted
;
320 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
323 cltsrv_log_file
, TEST_LOG_ERROR
,
324 ("\tClient(0x%p): send data timeout\n", me
));
328 TEST_ASSERT(bytes
== filebytes
);
332 while (filebytes
< descbytes
)
334 netbytes
= sizeof(buffer
);
335 if ((descbytes
- filebytes
) < netbytes
)
336 netbytes
= descbytes
- filebytes
;
338 cltsrv_log_file
, TEST_LOG_VERBOSE
,
339 ("\tClient(0x%p): receiving %d bytes\n", me
, netbytes
));
340 bytes
= PR_Recv(fd
, buffer
, netbytes
, RECV_FLAGS
, timeout
);
343 if (Aborted(PR_FAILURE
))
346 cltsrv_log_file
, TEST_LOG_ERROR
,
347 ("\tClient(0x%p): receive data aborted\n", me
));
350 else if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
352 cltsrv_log_file
, TEST_LOG_ERROR
,
353 ("\tClient(0x%p): receive data timeout\n", me
));
356 cltsrv_log_file
, TEST_LOG_ERROR
,
357 ("\tClient(0x%p): receive error (%d, %d)\n",
358 me
, PR_GetError(), PR_GetOSError()));
364 cltsrv_log_file
, TEST_LOG_ERROR
,
365 ("\t\tClient(0x%p): unexpected end of stream\n",
366 PR_GetCurrentThread()));
372 rv
= PR_Shutdown(fd
, PR_SHUTDOWN_BOTH
);
373 if (Aborted(rv
)) goto aborted
;
374 TEST_ASSERT(PR_SUCCESS
== rv
);
376 (void)PR_Close(fd
); fd
= NULL
;
378 cltsrv_log_file
, TEST_LOG_INFO
,
379 ("\tClient(0x%p): disconnected from server\n", me
));
382 client
->operations
+= 1;
383 client
->bytesTransferred
+= 2 * descbytes
;
384 rv
= PR_WaitCondVar(client
->stateChange
, rand() % clipping
);
385 PR_Unlock(client
->ml
);
386 if (Aborted(rv
)) break;
390 client
->stopped
= PR_IntervalNow();
393 if (NULL
!= fd
) rv
= PR_Close(fd
);
396 client
->state
= cs_exit
;
397 PR_NotifyCondVar(client
->stateChange
);
398 PR_Unlock(client
->ml
);
399 PR_DELETE(descriptor
);
401 cltsrv_log_file
, TEST_LOG_ALWAYS
,
402 ("\tClient(0x%p): stopped after %u operations and %u bytes\n",
403 PR_GetCurrentThread(), client
->operations
, client
->bytesTransferred
));
407 static PRStatus
ProcessRequest(PRFileDesc
*fd
, CSServer_t
*server
)
411 PRFileDesc
*file
= NULL
;
412 PRThread
* me
= PR_GetCurrentThread();
413 PRInt32 bytes
, descbytes
, netbytes
, filebytes
= 0;
414 CSDescriptor_t
*descriptor
= PR_NEW(CSDescriptor_t
);
415 PRIntervalTime timeout
= PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT
);
418 cltsrv_log_file
, TEST_LOG_VERBOSE
,
419 ("\tProcessRequest(0x%p): receiving desciptor\n", me
));
421 fd
, descriptor
, sizeof(*descriptor
), RECV_FLAGS
, timeout
);
425 if (Aborted(rv
)) goto exit
;
426 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
429 cltsrv_log_file
, TEST_LOG_ERROR
,
430 ("\tProcessRequest(0x%p): receive timeout\n", me
));
438 cltsrv_log_file
, TEST_LOG_ERROR
,
439 ("\tProcessRequest(0x%p): unexpected end of file\n", me
));
442 descbytes
= PR_ntohl(descriptor
->size
);
443 TEST_ASSERT(sizeof(*descriptor
) == bytes
);
446 cltsrv_log_file
, TEST_LOG_VERBOSE
,
447 ("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n",
448 me
, descbytes
, descriptor
->filename
));
451 descriptor
->filename
, (PR_CREATE_FILE
| PR_WRONLY
), 0666);
455 if (Aborted(rv
)) goto aborted
;
456 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
459 cltsrv_log_file
, TEST_LOG_ERROR
,
460 ("\tProcessRequest(0x%p): open file timeout\n", me
));
464 TEST_ASSERT(NULL
!= file
);
467 while (filebytes
< descbytes
)
469 netbytes
= sizeof(buffer
);
470 if ((descbytes
- filebytes
) < netbytes
)
471 netbytes
= descbytes
- filebytes
;
473 cltsrv_log_file
, TEST_LOG_VERBOSE
,
474 ("\tProcessRequest(0x%p): receive %d bytes\n", me
, netbytes
));
475 bytes
= PR_Recv(fd
, buffer
, netbytes
, RECV_FLAGS
, timeout
);
479 if (Aborted(rv
)) goto aborted
;
480 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
483 cltsrv_log_file
, TEST_LOG_ERROR
,
484 ("\t\tProcessRequest(0x%p): receive data timeout\n", me
));
488 * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED)
489 * on NT here. This is equivalent to ECONNRESET on Unix.
493 cltsrv_log_file
, TEST_LOG_WARNING
,
494 ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n",
495 me
, PR_GetError(), PR_GetOSError()));
501 cltsrv_log_file
, TEST_LOG_WARNING
,
502 ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me
));
508 /* The byte count for PR_Write should be positive */
509 MY_ASSERT(netbytes
> 0);
511 cltsrv_log_file
, TEST_LOG_VERBOSE
,
512 ("\tProcessRequest(0x%p): write %d bytes to file\n", me
, netbytes
));
513 bytes
= PR_Write(file
, buffer
, netbytes
);
514 if (netbytes
!= bytes
)
517 if (Aborted(rv
)) goto aborted
;
518 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
521 cltsrv_log_file
, TEST_LOG_ERROR
,
522 ("\t\tProcessRequest(0x%p): write file timeout\n", me
));
526 TEST_ASSERT(bytes
> 0);
530 server
->operations
+= 1;
531 server
->bytesTransferred
+= filebytes
;
532 PR_Unlock(server
->ml
);
535 if (Aborted(rv
)) goto aborted
;
536 TEST_ASSERT(PR_SUCCESS
== rv
);
540 cltsrv_log_file
, TEST_LOG_VERBOSE
,
541 ("\t\tProcessRequest(0x%p): opening %s\n", me
, descriptor
->filename
));
542 file
= PR_Open(descriptor
->filename
, PR_RDONLY
, 0);
546 if (Aborted(rv
)) goto aborted
;
547 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
550 cltsrv_log_file
, TEST_LOG_ERROR
,
551 ("\t\tProcessRequest(0x%p): open file timeout\n",
552 PR_GetCurrentThread()));
556 cltsrv_log_file
, TEST_LOG_ERROR
,
557 ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n",
558 me
, PR_GetError(), PR_GetOSError()));
561 TEST_ASSERT(NULL
!= file
);
564 while (netbytes
< descbytes
)
566 filebytes
= sizeof(buffer
);
567 if ((descbytes
- netbytes
) < filebytes
)
568 filebytes
= descbytes
- netbytes
;
570 cltsrv_log_file
, TEST_LOG_VERBOSE
,
571 ("\tProcessRequest(0x%p): read %d bytes from file\n", me
, filebytes
));
572 bytes
= PR_Read(file
, buffer
, filebytes
);
573 if (filebytes
!= bytes
)
576 if (Aborted(rv
)) goto aborted
;
577 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
579 cltsrv_log_file
, TEST_LOG_ERROR
,
580 ("\t\tProcessRequest(0x%p): read file timeout\n", me
));
583 cltsrv_log_file
, TEST_LOG_ERROR
,
584 ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n",
585 me
, PR_GetError(), PR_GetOSError()));
588 TEST_ASSERT(bytes
> 0);
592 cltsrv_log_file
, TEST_LOG_VERBOSE
,
593 ("\t\tProcessRequest(0x%p): sending %d bytes\n", me
, filebytes
));
594 bytes
= PR_Send(fd
, buffer
, filebytes
, SEND_FLAGS
, timeout
);
595 if (filebytes
!= bytes
)
598 if (Aborted(rv
)) goto aborted
;
599 if (PR_IO_TIMEOUT_ERROR
== PR_GetError())
602 cltsrv_log_file
, TEST_LOG_ERROR
,
603 ("\t\tProcessRequest(0x%p): send data timeout\n", me
));
608 TEST_ASSERT(bytes
> 0);
612 server
->bytesTransferred
+= filebytes
;
613 PR_Unlock(server
->ml
);
615 rv
= PR_Shutdown(fd
, PR_SHUTDOWN_BOTH
);
616 if (Aborted(rv
)) goto aborted
;
619 if (Aborted(rv
)) goto aborted
;
620 TEST_ASSERT(PR_SUCCESS
== rv
);
625 if (NULL
!= file
) PR_Close(file
);
626 drv
= PR_Delete(descriptor
->filename
);
627 TEST_ASSERT(PR_SUCCESS
== drv
);
630 cltsrv_log_file
, TEST_LOG_VERBOSE
,
631 ("\t\tProcessRequest(0x%p): Finished\n", me
));
633 PR_DELETE(descriptor
);
636 PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */
639 } /* ProcessRequest */
641 static PRStatus
CreateWorker(CSServer_t
*server
, CSPool_t
*pool
)
643 CSWorker_t
*worker
= PR_NEWZAP(CSWorker_t
);
644 worker
->server
= server
;
645 PR_INIT_CLIST(&worker
->element
);
646 worker
->thread
= PR_CreateThread(
647 PR_USER_THREAD
, Worker
, worker
,
648 DEFAULT_SERVER_PRIORITY
, thread_scope
,
649 PR_UNJOINABLE_THREAD
, 0);
650 if (NULL
== worker
->thread
)
656 TEST_LOG(cltsrv_log_file
, TEST_LOG_STATUS
,
657 ("\tCreateWorker(0x%p): create new worker (0x%p)\n",
658 PR_GetCurrentThread(), worker
->thread
));
663 static void PR_CALLBACK
Worker(void *arg
)
667 PRFileDesc
*fd
= NULL
;
668 PRThread
*me
= PR_GetCurrentThread();
669 CSWorker_t
*worker
= (CSWorker_t
*)arg
;
670 CSServer_t
*server
= worker
->server
;
671 CSPool_t
*pool
= &server
->pool
;
674 cltsrv_log_file
, TEST_LOG_NOTICE
,
675 ("\t\tWorker(0x%p): started [%u]\n", me
, pool
->workers
+ 1));
678 PR_APPEND_LINK(&worker
->element
, &server
->list
);
679 pool
->workers
+= 1; /* define our existance */
681 while (cs_run
== server
->state
)
683 while (pool
->accepting
>= server
->workers
.accepting
)
686 cltsrv_log_file
, TEST_LOG_VERBOSE
,
687 ("\t\tWorker(0x%p): waiting for accept slot[%d]\n",
688 me
, pool
->accepting
));
689 rv
= PR_WaitCondVar(pool
->acceptComplete
, PR_INTERVAL_NO_TIMEOUT
);
690 if (Aborted(rv
) || (cs_run
!= server
->state
))
693 cltsrv_log_file
, TEST_LOG_NOTICE
,
694 ("\tWorker(0x%p): has been %s\n",
695 me
, (Aborted(rv
) ? "interrupted" : "stopped")));
699 pool
->accepting
+= 1; /* how many are really in accept */
700 PR_Unlock(server
->ml
);
703 cltsrv_log_file
, TEST_LOG_VERBOSE
,
704 ("\t\tWorker(0x%p): calling accept\n", me
));
705 fd
= PR_Accept(server
->listener
, &from
, PR_INTERVAL_NO_TIMEOUT
);
708 pool
->accepting
-= 1;
709 PR_NotifyCondVar(pool
->acceptComplete
);
711 if ((NULL
== fd
) && Aborted(PR_FAILURE
))
713 if (NULL
!= server
->listener
)
715 PR_Close(server
->listener
);
716 server
->listener
= NULL
;
        ** Create another worker if the total number of workers is
725 ** less than the minimum specified or we have none left in
726 ** accept() AND we're not over the maximum.
727 ** This sort of presumes that the number allowed in accept
728 ** is at least as many as the minimum. Otherwise we'll keep
729 ** creating new threads and deleting them soon after.
732 ((pool
->workers
< server
->workers
.minimum
) ||
733 ((0 == pool
->accepting
)
734 && (pool
->workers
< server
->workers
.maximum
))) ?
737 PR_Unlock(server
->ml
);
739 if (another
) (void)CreateWorker(server
, pool
);
741 rv
= ProcessRequest(fd
, server
);
742 if (PR_SUCCESS
!= rv
)
744 cltsrv_log_file
, TEST_LOG_ERROR
,
745 ("\t\tWorker(0x%p): server process ended abnormally\n", me
));
746 (void)PR_Close(fd
); fd
= NULL
;
755 PR_Unlock(server
->ml
);
759 (void)PR_Shutdown(fd
, PR_SHUTDOWN_BOTH
);
764 cltsrv_log_file
, TEST_LOG_NOTICE
,
765 ("\t\tWorker(0x%p): exiting [%u]\n", PR_GetCurrentThread(), pool
->workers
));
768 pool
->workers
-= 1; /* undefine our existance */
769 PR_REMOVE_AND_INIT_LINK(&worker
->element
);
770 PR_NotifyCondVar(pool
->exiting
);
771 PR_Unlock(server
->ml
);
773 PR_DELETE(worker
); /* destruction of the "worker" object */
777 static void PR_CALLBACK
Server(void *arg
)
780 PRNetAddr serverAddress
;
781 PRThread
*me
= PR_GetCurrentThread();
782 CSServer_t
*server
= (CSServer_t
*)arg
;
783 PRSocketOptionData sockOpt
;
785 server
->listener
= PR_Socket(domain
, SOCK_STREAM
, protocol
);
787 sockOpt
.option
= PR_SockOpt_Reuseaddr
;
788 sockOpt
.value
.reuse_addr
= PR_TRUE
;
789 rv
= PR_SetSocketOption(server
->listener
, &sockOpt
);
790 TEST_ASSERT(PR_SUCCESS
== rv
);
792 memset(&serverAddress
, 0, sizeof(serverAddress
));
793 if (PR_AF_INET6
!= domain
)
794 rv
= PR_InitializeNetAddr(PR_IpAddrAny
, DEFAULT_PORT
, &serverAddress
);
796 rv
= PR_SetNetAddr(PR_IpAddrAny
, PR_AF_INET6
, DEFAULT_PORT
,
798 rv
= PR_Bind(server
->listener
, &serverAddress
);
799 TEST_ASSERT(PR_SUCCESS
== rv
);
801 rv
= PR_Listen(server
->listener
, server
->backlog
);
802 TEST_ASSERT(PR_SUCCESS
== rv
);
804 server
->started
= PR_IntervalNow();
805 TimeOfDayMessage("Server started at", me
);
808 server
->state
= cs_run
;
809 PR_NotifyCondVar(server
->stateChange
);
810 PR_Unlock(server
->ml
);
813 ** Create the first worker (actually, a thread that accepts
814 ** connections and then processes the work load as needed).
815 ** From this point on, additional worker threads are created
816 ** as they are needed by existing worker threads.
818 rv
= CreateWorker(server
, &server
->pool
);
819 TEST_ASSERT(PR_SUCCESS
== rv
);
822 ** From here on this thread is merely hanging around as the contact
823 ** point for the main test driver. It's just waiting for the driver
824 ** to declare the test complete.
827 cltsrv_log_file
, TEST_LOG_VERBOSE
,
828 ("\tServer(0x%p): waiting for state change\n", me
));
831 while ((cs_run
== server
->state
) && !Aborted(rv
))
833 rv
= PR_WaitCondVar(server
->stateChange
, PR_INTERVAL_NO_TIMEOUT
);
835 PR_Unlock(server
->ml
);
839 cltsrv_log_file
, TEST_LOG_INFO
,
840 ("\tServer(0x%p): shutting down workers\n", me
));
843 ** Get all the worker threads to exit. They know how to
844 ** clean up after themselves, so this is just a matter of
    ** waiting for chlorine in the pool to take effect. During
846 ** this stage we're ignoring interrupts.
848 server
->workers
.minimum
= server
->workers
.maximum
= 0;
851 while (!PR_CLIST_IS_EMPTY(&server
->list
))
853 PRCList
*head
= PR_LIST_HEAD(&server
->list
);
854 CSWorker_t
*worker
= (CSWorker_t
*)head
;
856 cltsrv_log_file
, TEST_LOG_VERBOSE
,
857 ("\tServer(0x%p): interrupting worker(0x%p)\n", me
, worker
));
858 rv
= PR_Interrupt(worker
->thread
);
859 TEST_ASSERT(PR_SUCCESS
== rv
);
860 PR_REMOVE_AND_INIT_LINK(head
);
863 while (server
->pool
.workers
> 0)
866 cltsrv_log_file
, TEST_LOG_NOTICE
,
867 ("\tServer(0x%p): waiting for %u workers to exit\n",
868 me
, server
->pool
.workers
));
869 (void)PR_WaitCondVar(server
->pool
.exiting
, PR_INTERVAL_NO_TIMEOUT
);
872 server
->state
= cs_exit
;
873 PR_NotifyCondVar(server
->stateChange
);
874 PR_Unlock(server
->ml
);
877 cltsrv_log_file
, TEST_LOG_ALWAYS
,
878 ("\tServer(0x%p): stopped after %u operations and %u bytes\n",
879 me
, server
->operations
, server
->bytesTransferred
));
881 if (NULL
!= server
->listener
) PR_Close(server
->listener
);
882 server
->stopped
= PR_IntervalNow();
886 static void WaitForCompletion(PRIntn execution
)
888 while (execution
> 0)
890 PRIntn dally
= (execution
> 30) ? 30 : execution
;
891 PR_Sleep(PR_SecondsToInterval(dally
));
892 if (pthread_stats
) PT_FPrintStats(debug_out
, "\nPThread Statistics\n");
895 } /* WaitForCompletion */
897 static void Help(void)
899 PR_fprintf(debug_out
, "cltsrv test program usage:\n");
900 PR_fprintf(debug_out
, "\t-a <n> threads allowed in accept (5)\n");
901 PR_fprintf(debug_out
, "\t-b <n> backlock for listen (5)\n");
902 PR_fprintf(debug_out
, "\t-c <threads> number of clients to create (1)\n");
903 PR_fprintf(debug_out
, "\t-f <low> low water mark for fd caching (0)\n");
904 PR_fprintf(debug_out
, "\t-F <high> high water mark for fd caching (0)\n");
905 PR_fprintf(debug_out
, "\t-w <threads> minimal number of server threads (1)\n");
906 PR_fprintf(debug_out
, "\t-W <threads> maximum number of server threads (1)\n");
907 PR_fprintf(debug_out
, "\t-e <seconds> duration of the test in seconds (10)\n");
908 PR_fprintf(debug_out
, "\t-s <string> dsn name of server (localhost)\n");
909 PR_fprintf(debug_out
, "\t-G use GLOBAL threads (LOCAL)\n");
910 PR_fprintf(debug_out
, "\t-X use XTP as transport (TCP)\n");
911 PR_fprintf(debug_out
, "\t-6 Use IPv6 (IPv4)\n");
912 PR_fprintf(debug_out
, "\t-v verbosity (accumulative) (0)\n");
913 PR_fprintf(debug_out
, "\t-p pthread statistics (FALSE)\n");
914 PR_fprintf(debug_out
, "\t-d debug mode (FALSE)\n");
915 PR_fprintf(debug_out
, "\t-h this message\n");
918 static Verbosity
IncrementVerbosity(void)
920 PRIntn verboge
= (PRIntn
)verbosity
+ 1;
921 return (Verbosity
)verboge
;
922 } /* IncrementVerbosity */
924 PRIntn
main(PRIntn argc
, char** argv
)
929 PRStatus rv
, joinStatus
;
930 CSServer_t
*server
= NULL
;
932 PRUintn backlog
= DEFAULT_BACKLOG
;
933 PRUintn clients
= DEFAULT_CLIENTS
;
934 const char *serverName
= DEFAULT_SERVER
;
935 PRBool serverIsLocal
= PR_TRUE
;
936 PRUintn accepting
= ALLOWED_IN_ACCEPT
;
937 PRUintn workersMin
= DEFAULT_WORKERS_MIN
;
938 PRUintn workersMax
= DEFAULT_WORKERS_MAX
;
939 PRIntn execution
= DEFAULT_EXECUTION_TIME
;
940 PRIntn low
= DEFAULT_LOW
, high
= DEFAULT_HIGH
;
943 * -G use global threads
944 * -a <n> threads allowed in accept
945 * -b <n> backlock for listen
946 * -c <threads> number of clients to create
947 * -f <low> low water mark for caching FDs
948 * -F <high> high water mark for caching FDs
949 * -w <threads> minimal number of server threads
950 * -W <threads> maximum number of server threads
951 * -e <seconds> duration of the test in seconds
952 * -s <string> dsn name of server (implies no server here)
957 PLOptState
*opt
= PL_CreateOptState(argc
, argv
, "GX6b:a:c:f:F:w:W:e:s:vdhp");
959 debug_out
= PR_GetSpecialFD(PR_StandardError
);
961 while (PL_OPT_EOL
!= (os
= PL_GetNextOpt(opt
)))
963 if (PL_OPT_BAD
== os
) continue;
966 case 'G': /* use global threads */
967 thread_scope
= PR_GLOBAL_THREAD
;
969 case 'X': /* use XTP as transport */
972 case '6': /* Use IPv6 */
973 domain
= PR_AF_INET6
;
975 case 'a': /* the value for accepting */
976 accepting
= atoi(opt
->value
);
978 case 'b': /* the value for backlock */
979 backlog
= atoi(opt
->value
);
981 case 'c': /* number of client threads */
982 clients
= atoi(opt
->value
);
984 case 'f': /* low water fd cache */
985 low
= atoi(opt
->value
);
987 case 'F': /* low water fd cache */
988 high
= atoi(opt
->value
);
990 case 'w': /* minimum server worker threads */
991 workersMin
= atoi(opt
->value
);
993 case 'W': /* maximum server worker threads */
994 workersMax
= atoi(opt
->value
);
996 case 'e': /* program execution time in seconds */
997 execution
= atoi(opt
->value
);
999 case 's': /* server's address */
1000 serverName
= opt
->value
;
1002 case 'v': /* verbosity */
1003 verbosity
= IncrementVerbosity();
1005 case 'd': /* debug mode */
1006 debug_mode
= PR_TRUE
;
1008 case 'p': /* pthread mode */
1009 pthread_stats
= PR_TRUE
;
1017 PL_DestroyOptState(opt
);
1019 if (0 != PL_strcmp(serverName
, DEFAULT_SERVER
)) serverIsLocal
= PR_FALSE
;
1020 if (0 == execution
) execution
= DEFAULT_EXECUTION_TIME
;
1021 if (0 == workersMax
) workersMax
= DEFAULT_WORKERS_MAX
;
1022 if (0 == workersMin
) workersMin
= DEFAULT_WORKERS_MIN
;
1023 if (0 == accepting
) accepting
= ALLOWED_IN_ACCEPT
;
1024 if (0 == backlog
) backlog
= DEFAULT_BACKLOG
;
1026 if (workersMin
> accepting
) accepting
= workersMin
;
1029 TimeOfDayMessage("Client/Server started at", PR_GetCurrentThread());
1031 cltsrv_log_file
= PR_NewLogModule("cltsrv_log");
1032 MY_ASSERT(NULL
!= cltsrv_log_file
);
1033 boolean
= PR_SetLogFile("cltsrv.log");
1037 debug_mode
= PR_TRUE
;
1040 rv
= PR_SetFDCacheSize(low
, high
);
1041 PR_ASSERT(PR_SUCCESS
== rv
);
1045 /* Establish the server */
1047 cltsrv_log_file
, TEST_LOG_INFO
,
1048 ("main(0x%p): starting server\n", PR_GetCurrentThread()));
1050 server
= PR_NEWZAP(CSServer_t
);
1051 PR_INIT_CLIST(&server
->list
);
1052 server
->state
= cs_init
;
1053 server
->ml
= PR_NewLock();
1054 server
->backlog
= backlog
;
1055 server
->port
= DEFAULT_PORT
;
1056 server
->workers
.minimum
= workersMin
;
1057 server
->workers
.maximum
= workersMax
;
1058 server
->workers
.accepting
= accepting
;
1059 server
->stateChange
= PR_NewCondVar(server
->ml
);
1060 server
->pool
.exiting
= PR_NewCondVar(server
->ml
);
1061 server
->pool
.acceptComplete
= PR_NewCondVar(server
->ml
);
1064 cltsrv_log_file
, TEST_LOG_NOTICE
,
1065 ("main(0x%p): creating server thread\n", PR_GetCurrentThread()));
1067 server
->thread
= PR_CreateThread(
1068 PR_USER_THREAD
, Server
, server
, PR_PRIORITY_HIGH
,
1069 thread_scope
, PR_JOINABLE_THREAD
, 0);
1070 TEST_ASSERT(NULL
!= server
->thread
);
1073 cltsrv_log_file
, TEST_LOG_VERBOSE
,
1074 ("main(0x%p): waiting for server init\n", PR_GetCurrentThread()));
1076 PR_Lock(server
->ml
);
1077 while (server
->state
== cs_init
)
1078 PR_WaitCondVar(server
->stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1079 PR_Unlock(server
->ml
);
1082 cltsrv_log_file
, TEST_LOG_VERBOSE
,
1083 ("main(0x%p): server init complete (port #%d)\n",
1084 PR_GetCurrentThread(), server
->port
));
1089 /* Create all of the clients */
1091 char buffer
[BUFFER_SIZE
];
1092 client
= (CSClient_t
*)PR_CALLOC(clients
* sizeof(CSClient_t
));
1095 cltsrv_log_file
, TEST_LOG_VERBOSE
,
1096 ("main(0x%p): creating %d client threads\n",
1097 PR_GetCurrentThread(), clients
));
1101 rv
= PR_GetHostByName(serverName
, buffer
, BUFFER_SIZE
, &host
);
1102 if (PR_SUCCESS
!= rv
)
1104 PL_FPrintError(PR_STDERR
, "PR_GetHostByName");
1109 for (index
= 0; index
< clients
; ++index
)
1111 client
[index
].state
= cs_init
;
1112 client
[index
].ml
= PR_NewLock();
1115 if (PR_AF_INET6
!= domain
)
1116 (void)PR_InitializeNetAddr(
1117 PR_IpAddrLoopback
, DEFAULT_PORT
,
1118 &client
[index
].serverAddress
);
1120 rv
= PR_SetNetAddr(PR_IpAddrLoopback
, PR_AF_INET6
,
1121 DEFAULT_PORT
, &client
[index
].serverAddress
);
1125 (void)PR_EnumerateHostEnt(
1126 0, &host
, DEFAULT_PORT
, &client
[index
].serverAddress
);
1128 client
[index
].stateChange
= PR_NewCondVar(client
[index
].ml
);
1130 cltsrv_log_file
, TEST_LOG_INFO
,
1131 ("main(0x%p): creating client threads\n", PR_GetCurrentThread()));
1132 client
[index
].thread
= PR_CreateThread(
1133 PR_USER_THREAD
, Client
, &client
[index
], PR_PRIORITY_NORMAL
,
1134 thread_scope
, PR_JOINABLE_THREAD
, 0);
1135 TEST_ASSERT(NULL
!= client
[index
].thread
);
1136 PR_Lock(client
[index
].ml
);
1137 while (cs_init
== client
[index
].state
)
1138 PR_WaitCondVar(client
[index
].stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1139 PR_Unlock(client
[index
].ml
);
1143 /* Then just let them go at it for a bit */
1145 cltsrv_log_file
, TEST_LOG_ALWAYS
,
1146 ("main(0x%p): waiting for execution interval (%d seconds)\n",
1147 PR_GetCurrentThread(), execution
));
1149 WaitForCompletion(execution
);
1151 TimeOfDayMessage("Shutting down", PR_GetCurrentThread());
1155 for (index
= 0; index
< clients
; ++index
)
1157 TEST_LOG(cltsrv_log_file
, TEST_LOG_STATUS
,
1158 ("main(0x%p): notifying client(0x%p) to stop\n",
1159 PR_GetCurrentThread(), client
[index
].thread
));
1161 PR_Lock(client
[index
].ml
);
1162 if (cs_run
== client
[index
].state
)
1164 client
[index
].state
= cs_stop
;
1165 PR_Interrupt(client
[index
].thread
);
1166 while (cs_stop
== client
[index
].state
)
1168 client
[index
].stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1170 PR_Unlock(client
[index
].ml
);
1172 TEST_LOG(cltsrv_log_file
, TEST_LOG_VERBOSE
,
1173 ("main(0x%p): joining client(0x%p)\n",
1174 PR_GetCurrentThread(), client
[index
].thread
));
1176 joinStatus
= PR_JoinThread(client
[index
].thread
);
1177 TEST_ASSERT(PR_SUCCESS
== joinStatus
);
1178 PR_DestroyCondVar(client
[index
].stateChange
);
1179 PR_DestroyLock(client
[index
].ml
);
1186 /* All clients joined - retrieve the server */
1188 cltsrv_log_file
, TEST_LOG_NOTICE
,
1189 ("main(0x%p): notifying server(0x%p) to stop\n",
1190 PR_GetCurrentThread(), server
->thread
));
1192 PR_Lock(server
->ml
);
1193 server
->state
= cs_stop
;
1194 PR_Interrupt(server
->thread
);
1195 while (cs_exit
!= server
->state
)
1196 PR_WaitCondVar(server
->stateChange
, PR_INTERVAL_NO_TIMEOUT
);
1197 PR_Unlock(server
->ml
);
1200 cltsrv_log_file
, TEST_LOG_NOTICE
,
1201 ("main(0x%p): joining server(0x%p)\n",
1202 PR_GetCurrentThread(), server
->thread
));
1203 joinStatus
= PR_JoinThread(server
->thread
);
1204 TEST_ASSERT(PR_SUCCESS
== joinStatus
);
1206 PR_DestroyCondVar(server
->stateChange
);
1207 PR_DestroyCondVar(server
->pool
.exiting
);
1208 PR_DestroyCondVar(server
->pool
.acceptComplete
);
1209 PR_DestroyLock(server
->ml
);
1214 cltsrv_log_file
, TEST_LOG_ALWAYS
,
1215 ("main(0x%p): test complete\n", PR_GetCurrentThread()));
1217 PT_FPrintStats(debug_out
, "\nPThread Statistics\n");
1219 TimeOfDayMessage("Test exiting at", PR_GetCurrentThread());