nspr: import 3.0 RC1 cutoff from CVS
[mozilla-nspr.git] / nsprpub / pr / src / io / prmwait.c
blob6b3c46a34b1240e5d25ee2392431123efbab245e
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
13 * License.
15 * The Original Code is the Netscape Portable Runtime (NSPR).
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1998-2000
20 * the Initial Developer. All Rights Reserved.
22 * Contributor(s):
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
36 * ***** END LICENSE BLOCK ***** */
38 #include "primpl.h"
39 #include "pprmwait.h"
41 #define _MW_REHASH_MAX 11
43 static PRLock *mw_lock = NULL;
44 static _PRGlobalState *mw_state = NULL;
46 static PRIntervalTime max_polling_interval;
48 #ifdef WINNT
50 typedef struct TimerEvent {
51 PRIntervalTime absolute;
52 void (*func)(void *);
53 void *arg;
54 LONG ref_count;
55 PRCList links;
56 } TimerEvent;
58 #define TIMER_EVENT_PTR(_qp) \
59 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
61 struct {
62 PRLock *ml;
63 PRCondVar *new_timer;
64 PRCondVar *cancel_timer;
65 PRThread *manager_thread;
66 PRCList timer_queue;
67 } tm_vars;
69 static PRStatus TimerInit(void);
70 static void TimerManager(void *arg);
71 static TimerEvent *CreateTimer(PRIntervalTime timeout,
72 void (*func)(void *), void *arg);
73 static PRBool CancelTimer(TimerEvent *timer);
75 static void TimerManager(void *arg)
77 PRIntervalTime now;
78 PRIntervalTime timeout;
79 PRCList *head;
80 TimerEvent *timer;
82 PR_Lock(tm_vars.ml);
83 while (1)
85 if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
87 PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
89 else
91 now = PR_IntervalNow();
92 head = PR_LIST_HEAD(&tm_vars.timer_queue);
93 timer = TIMER_EVENT_PTR(head);
94 if ((PRInt32) (now - timer->absolute) >= 0)
96 PR_REMOVE_LINK(head);
98 * make its prev and next point to itself so that
99 * it's obvious that it's not on the timer_queue.
101 PR_INIT_CLIST(head);
102 PR_ASSERT(2 == timer->ref_count);
103 PR_Unlock(tm_vars.ml);
104 timer->func(timer->arg);
105 PR_Lock(tm_vars.ml);
106 timer->ref_count -= 1;
107 if (0 == timer->ref_count)
109 PR_NotifyAllCondVar(tm_vars.cancel_timer);
112 else
114 timeout = (PRIntervalTime)(timer->absolute - now);
115 PR_WaitCondVar(tm_vars.new_timer, timeout);
119 PR_Unlock(tm_vars.ml);
122 static TimerEvent *CreateTimer(
123 PRIntervalTime timeout,
124 void (*func)(void *),
125 void *arg)
127 TimerEvent *timer;
128 PRCList *links, *tail;
129 TimerEvent *elem;
131 timer = PR_NEW(TimerEvent);
132 if (NULL == timer)
134 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
135 return timer;
137 timer->absolute = PR_IntervalNow() + timeout;
138 timer->func = func;
139 timer->arg = arg;
140 timer->ref_count = 2;
141 PR_Lock(tm_vars.ml);
142 tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
143 while (links->prev != tail)
145 elem = TIMER_EVENT_PTR(links);
146 if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
148 break;
150 links = links->prev;
152 PR_INSERT_AFTER(&timer->links, links);
153 PR_NotifyCondVar(tm_vars.new_timer);
154 PR_Unlock(tm_vars.ml);
155 return timer;
158 static PRBool CancelTimer(TimerEvent *timer)
160 PRBool canceled = PR_FALSE;
162 PR_Lock(tm_vars.ml);
163 timer->ref_count -= 1;
164 if (timer->links.prev == &timer->links)
166 while (timer->ref_count == 1)
168 PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
171 else
173 PR_REMOVE_LINK(&timer->links);
174 canceled = PR_TRUE;
176 PR_Unlock(tm_vars.ml);
177 PR_DELETE(timer);
178 return canceled;
181 static PRStatus TimerInit(void)
183 tm_vars.ml = PR_NewLock();
184 if (NULL == tm_vars.ml)
186 goto failed;
188 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
189 if (NULL == tm_vars.new_timer)
191 goto failed;
193 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
194 if (NULL == tm_vars.cancel_timer)
196 goto failed;
198 PR_INIT_CLIST(&tm_vars.timer_queue);
199 tm_vars.manager_thread = PR_CreateThread(
200 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
201 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
202 if (NULL == tm_vars.manager_thread)
204 goto failed;
206 return PR_SUCCESS;
208 failed:
209 if (NULL != tm_vars.cancel_timer)
211 PR_DestroyCondVar(tm_vars.cancel_timer);
213 if (NULL != tm_vars.new_timer)
215 PR_DestroyCondVar(tm_vars.new_timer);
217 if (NULL != tm_vars.ml)
219 PR_DestroyLock(tm_vars.ml);
221 return PR_FAILURE;
224 #endif /* WINNT */
226 /******************************************************************/
227 /******************************************************************/
228 /************************ The private portion *********************/
229 /******************************************************************/
230 /******************************************************************/
231 void _PR_InitMW(void)
233 #ifdef WINNT
235 * We use NT 4's InterlockedCompareExchange() to operate
236 * on PRMWStatus variables.
238 PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
239 TimerInit();
240 #endif
241 mw_lock = PR_NewLock();
242 PR_ASSERT(NULL != mw_lock);
243 mw_state = PR_NEWZAP(_PRGlobalState);
244 PR_ASSERT(NULL != mw_state);
245 PR_INIT_CLIST(&mw_state->group_list);
246 max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
247 } /* _PR_InitMW */
249 void _PR_CleanupMW(void)
251 PR_DestroyLock(mw_lock);
252 mw_lock = NULL;
253 if (mw_state->group) {
254 PR_DestroyWaitGroup(mw_state->group);
255 /* mw_state->group is set to NULL as a side effect. */
257 PR_DELETE(mw_state);
258 } /* _PR_CleanupMW */
260 static PRWaitGroup *MW_Init2(void)
262 PRWaitGroup *group = mw_state->group; /* it's the null group */
263 if (NULL == group) /* there is this special case */
265 group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
266 if (NULL == group) goto failed_alloc;
267 PR_Lock(mw_lock);
268 if (NULL == mw_state->group)
270 mw_state->group = group;
271 group = NULL;
273 PR_Unlock(mw_lock);
274 if (group != NULL) (void)PR_DestroyWaitGroup(group);
275 group = mw_state->group; /* somebody beat us to it */
277 failed_alloc:
278 return group; /* whatever */
279 } /* MW_Init2 */
281 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
284 ** The entries are put in the table using the fd (PRFileDesc*) of
285 ** the receive descriptor as the key. This allows us to locate
286 ** the appropriate entry aqain when the poll operation finishes.
288 ** The pointer to the file descriptor object is first divided by
289 ** the natural alignment of a pointer in the belief that object
290 ** will have at least that many zeros in the low order bits.
291 ** This may not be a good assuption.
293 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
294 ** that we declare defeat and force the table to be reconstructed.
295 ** Since some fds might be added more than once, won't that cause
296 ** collisions even in an empty table?
298 PRIntn rehash = _MW_REHASH_MAX;
299 PRRecvWait **waiter;
300 PRUintn hidx = _MW_HASH(desc->fd, hash->length);
301 PRUintn hoffset = 0;
303 while (rehash-- > 0)
305 waiter = &hash->recv_wait;
306 if (NULL == waiter[hidx])
308 waiter[hidx] = desc;
309 hash->count += 1;
310 #if 0
311 printf("Adding 0x%x->0x%x ", desc, desc->fd);
312 printf(
313 "table[%u:%u:*%u]: 0x%x->0x%x\n",
314 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
315 #endif
316 return _prmw_success;
318 if (desc == waiter[hidx])
320 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
321 return _prmw_error;
323 #if 0
324 printf("Failing 0x%x->0x%x ", desc, desc->fd);
325 printf(
326 "table[*%u:%u:%u]: 0x%x->0x%x\n",
327 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
328 #endif
329 if (0 == hoffset)
331 hoffset = _MW_HASH2(desc->fd, hash->length);
332 PR_ASSERT(0 != hoffset);
334 hidx = (hidx + hoffset) % (hash->length);
336 return _prmw_rehash;
337 } /* MW_AddHashInternal */
339 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
341 PRRecvWait **desc;
342 PRUint32 pidx, length;
343 _PRWaiterHash *newHash, *oldHash = group->waiter;
344 PRBool retry;
345 _PR_HashStory hrv;
347 static const PRInt32 prime_number[] = {
348 _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
349 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
350 PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
352 /* look up the next size we'd like to use for the hash table */
353 for (pidx = 0; pidx < primes; ++pidx)
355 if (prime_number[pidx] == oldHash->length)
357 break;
360 /* table size must be one of the prime numbers */
361 PR_ASSERT(pidx < primes);
363 /* if pidx == primes - 1, we can't expand the table any more */
364 while (pidx < primes - 1)
366 /* next size */
367 ++pidx;
368 length = prime_number[pidx];
370 /* allocate the new hash table and fill it in with the old */
371 newHash = (_PRWaiterHash*)PR_CALLOC(
372 sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
373 if (NULL == newHash)
375 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
376 return _prmw_error;
379 newHash->length = length;
380 retry = PR_FALSE;
381 for (desc = &oldHash->recv_wait;
382 newHash->count < oldHash->count; ++desc)
384 PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
385 if (NULL != *desc)
387 hrv = MW_AddHashInternal(*desc, newHash);
388 PR_ASSERT(_prmw_error != hrv);
389 if (_prmw_success != hrv)
391 PR_DELETE(newHash);
392 retry = PR_TRUE;
393 break;
397 if (retry) continue;
399 PR_DELETE(group->waiter);
400 group->waiter = newHash;
401 group->p_timestamp += 1;
402 return _prmw_success;
405 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
406 return _prmw_error; /* we're hosed */
407 } /* MW_ExpandHashInternal */
409 #ifndef WINNT
410 static void _MW_DoneInternal(
411 PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
414 ** Add this receive wait object to the list of finished I/O
415 ** operations for this particular group. If there are other
416 ** threads waiting on the group, notify one. If not, arrange
417 ** for this thread to return.
420 #if 0
421 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
422 #endif
423 (*waiter)->outcome = outcome;
424 PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
425 PR_NotifyCondVar(group->io_complete);
426 PR_ASSERT(0 != group->waiter->count);
427 group->waiter->count -= 1;
428 *waiter = NULL;
429 } /* _MW_DoneInternal */
430 #endif /* WINNT */
432 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
435 ** Find the receive wait object corresponding to the file descriptor.
436 ** Only search the wait group specified.
438 PRRecvWait **desc;
439 PRIntn rehash = _MW_REHASH_MAX;
440 _PRWaiterHash *hash = group->waiter;
441 PRUintn hidx = _MW_HASH(fd, hash->length);
442 PRUintn hoffset = 0;
444 while (rehash-- > 0)
446 desc = (&hash->recv_wait) + hidx;
447 if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
448 if (0 == hoffset)
450 hoffset = _MW_HASH2(fd, hash->length);
451 PR_ASSERT(0 != hoffset);
453 hidx = (hidx + hoffset) % (hash->length);
455 return NULL;
456 } /* _MW_LookupInternal */
458 #ifndef WINNT
459 static PRStatus _MW_PollInternal(PRWaitGroup *group)
461 PRRecvWait **waiter;
462 PRStatus rv = PR_FAILURE;
463 PRInt32 count, count_ready;
464 PRIntervalTime polling_interval;
466 group->poller = PR_GetCurrentThread();
468 while (PR_TRUE)
470 PRIntervalTime now, since_last_poll;
471 PRPollDesc *poll_list;
473 while (0 == group->waiter->count)
475 PRStatus st;
476 st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
477 if (_prmw_running != group->state)
479 PR_SetError(PR_INVALID_STATE_ERROR, 0);
480 goto aborted;
482 if (_MW_ABORTED(st)) goto aborted;
486 ** There's something to do. See if our existing polling list
487 ** is large enough for what we have to do?
490 while (group->polling_count < group->waiter->count)
492 PRUint32 old_count = group->waiter->count;
493 PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
494 PRSize new_size = sizeof(PRPollDesc) * new_count;
495 PRPollDesc *old_polling_list = group->polling_list;
497 PR_Unlock(group->ml);
498 poll_list = (PRPollDesc*)PR_CALLOC(new_size);
499 if (NULL == poll_list)
501 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
502 PR_Lock(group->ml);
503 goto failed_alloc;
505 if (NULL != old_polling_list)
506 PR_DELETE(old_polling_list);
507 PR_Lock(group->ml);
508 if (_prmw_running != group->state)
510 PR_SetError(PR_INVALID_STATE_ERROR, 0);
511 goto aborted;
513 group->polling_list = poll_list;
514 group->polling_count = new_count;
517 now = PR_IntervalNow();
518 polling_interval = max_polling_interval;
519 since_last_poll = now - group->last_poll;
521 waiter = &group->waiter->recv_wait;
522 poll_list = group->polling_list;
523 for (count = 0; count < group->waiter->count; ++waiter)
525 PR_ASSERT(waiter < &group->waiter->recv_wait
526 + group->waiter->length);
527 if (NULL != *waiter) /* a live one! */
529 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
530 && (since_last_poll >= (*waiter)->timeout))
531 _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
532 else
534 if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
536 (*waiter)->timeout -= since_last_poll;
537 if ((*waiter)->timeout < polling_interval)
538 polling_interval = (*waiter)->timeout;
540 PR_ASSERT(poll_list < group->polling_list
541 + group->polling_count);
542 poll_list->fd = (*waiter)->fd;
543 poll_list->in_flags = PR_POLL_READ;
544 poll_list->out_flags = 0;
545 #if 0
546 printf(
547 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
548 poll_list, count, poll_list->fd, (*waiter)->timeout);
549 #endif
550 poll_list += 1;
551 count += 1;
556 PR_ASSERT(count == group->waiter->count);
559 ** If there are no more threads waiting for completion,
560 ** we need to return.
562 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
563 && (1 == group->waiting_threads)) break;
565 if (0 == count) continue; /* wait for new business */
567 group->last_poll = now;
569 PR_Unlock(group->ml);
571 count_ready = PR_Poll(group->polling_list, count, polling_interval);
573 PR_Lock(group->ml);
575 if (_prmw_running != group->state)
577 PR_SetError(PR_INVALID_STATE_ERROR, 0);
578 goto aborted;
580 if (-1 == count_ready)
582 goto failed_poll; /* that's a shame */
584 else if (0 < count_ready)
586 for (poll_list = group->polling_list; count > 0;
587 poll_list++, count--)
589 PR_ASSERT(
590 poll_list < group->polling_list + group->polling_count);
591 if (poll_list->out_flags != 0)
593 waiter = _MW_LookupInternal(group, poll_list->fd);
595 ** If 'waiter' is NULL, that means the wait receive
596 ** descriptor has been canceled.
598 if (NULL != waiter)
599 _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
604 ** If there are no more threads waiting for completion,
605 ** we need to return.
606 ** This thread was "borrowed" to do the polling, but it really
607 ** belongs to the client.
609 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
610 && (1 == group->waiting_threads)) break;
613 rv = PR_SUCCESS;
615 aborted:
616 failed_poll:
617 failed_alloc:
618 group->poller = NULL; /* we were that, not we ain't */
619 if ((_prmw_running == group->state) && (group->waiting_threads > 1))
621 /* Wake up one thread to become the new poller. */
622 PR_NotifyCondVar(group->io_complete);
624 return rv; /* we return with the lock held */
625 } /* _MW_PollInternal */
626 #endif /* !WINNT */
628 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
630 PRMWGroupState rv = group->state;
632 ** Looking at the group's fields is safe because
633 ** once the group's state is no longer running, it
634 ** cannot revert and there is a safe check on entry
635 ** to make sure no more threads are made to wait.
637 if ((_prmw_stopping == rv)
638 && (0 == group->waiting_threads))
640 rv = group->state = _prmw_stopped;
641 PR_NotifyCondVar(group->mw_manage);
643 return rv;
644 } /* MW_TestForShutdownInternal */
646 #ifndef WINNT
647 static void _MW_InitialRecv(PRCList *io_ready)
649 PRRecvWait *desc = (PRRecvWait*)io_ready;
650 if ((NULL == desc->buffer.start)
651 || (0 == desc->buffer.length))
652 desc->bytesRecv = 0;
653 else
655 desc->bytesRecv = (desc->fd->methods->recv)(
656 desc->fd, desc->buffer.start,
657 desc->buffer.length, 0, desc->timeout);
658 if (desc->bytesRecv < 0) /* SetError should already be there */
659 desc->outcome = PR_MW_FAILURE;
661 } /* _MW_InitialRecv */
662 #endif
664 #ifdef WINNT
665 static void NT_TimeProc(void *arg)
667 _MDOverlapped *overlapped = (_MDOverlapped *)arg;
668 PRRecvWait *desc = overlapped->data.mw.desc;
669 PRFileDesc *bottom;
671 if (InterlockedCompareExchange((LONG *)&desc->outcome,
672 (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
674 /* This wait recv descriptor has already completed. */
675 return;
678 /* close the osfd to abort the outstanding async io request */
679 /* $$$$
680 ** Little late to be checking if NSPR's on the bottom of stack,
681 ** but if we don't check, we can't assert that the private data
682 ** is what we think it is.
683 ** $$$$
685 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
686 PR_ASSERT(NULL != bottom);
687 if (NULL != bottom) /* now what!?!?! */
689 bottom->secret->state = _PR_FILEDESC_CLOSED;
690 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
692 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
693 PR_ASSERT(!"What shall I do?");
696 return;
697 } /* NT_TimeProc */
699 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
701 PRRecvWait **waiter;
703 _PR_MD_LOCK(&group->mdlock);
704 waiter = _MW_LookupInternal(group, fd);
705 if (NULL != waiter)
707 group->waiter->count -= 1;
708 *waiter = NULL;
710 _PR_MD_UNLOCK(&group->mdlock);
711 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
714 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
716 PRRecvWait **waiter;
718 waiter = _MW_LookupInternal(group, fd);
719 if (NULL != waiter)
721 group->waiter->count -= 1;
722 *waiter = NULL;
724 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
726 #endif /* WINNT */
728 /******************************************************************/
729 /******************************************************************/
730 /********************** The public API portion ********************/
731 /******************************************************************/
732 /******************************************************************/
733 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
734 PRWaitGroup *group, PRRecvWait *desc)
736 _PR_HashStory hrv;
737 PRStatus rv = PR_FAILURE;
738 #ifdef WINNT
739 _MDOverlapped *overlapped;
740 HANDLE hFile;
741 BOOL bResult;
742 DWORD dwError;
743 PRFileDesc *bottom;
744 #endif
746 if (!_pr_initialized) _PR_ImplicitInitialization();
747 if ((NULL == group) && (NULL == (group = MW_Init2())))
749 return rv;
752 PR_ASSERT(NULL != desc->fd);
754 desc->outcome = PR_MW_PENDING; /* nice, well known value */
755 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
757 PR_Lock(group->ml);
759 if (_prmw_running != group->state)
761 /* Not allowed to add after cancelling the group */
762 desc->outcome = PR_MW_INTERRUPT;
763 PR_SetError(PR_INVALID_STATE_ERROR, 0);
764 PR_Unlock(group->ml);
765 return rv;
768 #ifdef WINNT
769 _PR_MD_LOCK(&group->mdlock);
770 #endif
773 ** If the waiter count is zero at this point, there's no telling
774 ** how long we've been idle. Therefore, initialize the beginning
775 ** of the timing interval. As long as the list doesn't go empty,
776 ** it will maintain itself.
778 if (0 == group->waiter->count)
779 group->last_poll = PR_IntervalNow();
783 hrv = MW_AddHashInternal(desc, group->waiter);
784 if (_prmw_rehash != hrv) break;
785 hrv = MW_ExpandHashInternal(group); /* gruesome */
786 if (_prmw_success != hrv) break;
787 } while (PR_TRUE);
789 #ifdef WINNT
790 _PR_MD_UNLOCK(&group->mdlock);
791 #endif
793 PR_NotifyCondVar(group->new_business); /* tell the world */
794 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
795 PR_Unlock(group->ml);
797 #ifdef WINNT
798 overlapped = PR_NEWZAP(_MDOverlapped);
799 if (NULL == overlapped)
801 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
802 NT_HashRemove(group, desc->fd);
803 return rv;
805 overlapped->ioModel = _MD_MultiWaitIO;
806 overlapped->data.mw.desc = desc;
807 overlapped->data.mw.group = group;
808 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
810 overlapped->data.mw.timer = CreateTimer(
811 desc->timeout,
812 NT_TimeProc,
813 overlapped);
814 if (0 == overlapped->data.mw.timer)
816 NT_HashRemove(group, desc->fd);
817 PR_DELETE(overlapped);
819 * XXX It appears that a maximum of 16 timer events can
820 * be outstanding. GetLastError() returns 0 when I try it.
822 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
823 return PR_FAILURE;
827 /* Reach to the bottom layer to get the OS fd */
828 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
829 PR_ASSERT(NULL != bottom);
830 if (NULL == bottom)
832 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
833 return PR_FAILURE;
835 hFile = (HANDLE)bottom->secret->md.osfd;
836 if (!bottom->secret->md.io_model_committed)
838 PRInt32 st;
839 st = _md_Associate(hFile);
840 PR_ASSERT(0 != st);
841 bottom->secret->md.io_model_committed = PR_TRUE;
843 bResult = ReadFile(hFile,
844 desc->buffer.start,
845 (DWORD)desc->buffer.length,
846 NULL,
847 &overlapped->overlapped);
848 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
850 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
852 if (InterlockedCompareExchange((LONG *)&desc->outcome,
853 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
854 == (LONG)PR_MW_PENDING)
856 CancelTimer(overlapped->data.mw.timer);
858 NT_HashRemove(group, desc->fd);
859 PR_DELETE(overlapped);
861 _PR_MD_MAP_READ_ERROR(dwError);
862 rv = PR_FAILURE;
864 #endif
866 return rv;
867 } /* PR_AddWaitFileDesc */
869 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
871 PRCList *io_ready = NULL;
872 #ifdef WINNT
873 PRThread *me = _PR_MD_CURRENT_THREAD();
874 _MDOverlapped *overlapped;
875 #endif
877 if (!_pr_initialized) _PR_ImplicitInitialization();
878 if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
880 PR_Lock(group->ml);
882 if (_prmw_running != group->state)
884 PR_SetError(PR_INVALID_STATE_ERROR, 0);
885 goto invalid_state;
888 group->waiting_threads += 1; /* the polling thread is counted */
890 #ifdef WINNT
891 _PR_MD_LOCK(&group->mdlock);
892 while (PR_CLIST_IS_EMPTY(&group->io_ready))
894 _PR_THREAD_LOCK(me);
895 me->state = _PR_IO_WAIT;
896 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
897 if (!_PR_IS_NATIVE_THREAD(me))
899 _PR_SLEEPQ_LOCK(me->cpu);
900 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
901 _PR_SLEEPQ_UNLOCK(me->cpu);
903 _PR_THREAD_UNLOCK(me);
904 _PR_MD_UNLOCK(&group->mdlock);
905 PR_Unlock(group->ml);
906 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
907 me->state = _PR_RUNNING;
908 PR_Lock(group->ml);
909 _PR_MD_LOCK(&group->mdlock);
910 if (_PR_PENDING_INTERRUPT(me)) {
911 PR_REMOVE_LINK(&me->waitQLinks);
912 _PR_MD_UNLOCK(&group->mdlock);
913 me->flags &= ~_PR_INTERRUPT;
914 me->io_suspended = PR_FALSE;
915 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
916 goto aborted;
919 io_ready = PR_LIST_HEAD(&group->io_ready);
920 PR_ASSERT(io_ready != NULL);
921 PR_REMOVE_LINK(io_ready);
922 _PR_MD_UNLOCK(&group->mdlock);
923 overlapped = (_MDOverlapped *)
924 ((char *)io_ready - offsetof(_MDOverlapped, data));
925 io_ready = &overlapped->data.mw.desc->internal;
926 #else
930 ** If the I/O ready list isn't empty, have this thread
931 ** return with the first receive wait object that's available.
933 if (PR_CLIST_IS_EMPTY(&group->io_ready))
936 ** Is there a polling thread yet? If not, grab this thread
937 ** and use it.
939 if (NULL == group->poller)
942 ** This thread will stay do polling until it becomes the only one
943 ** left to service a completion. Then it will return and there will
944 ** be none left to actually poll or to run completions.
946 ** The polling function should only return w/ failure or
947 ** with some I/O ready.
949 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
951 else
954 ** There are four reasons a thread can be awakened from
955 ** a wait on the io_complete condition variable.
956 ** 1. Some I/O has completed, i.e., the io_ready list
957 ** is nonempty.
958 ** 2. The wait group is canceled.
959 ** 3. The thread is interrupted.
960 ** 4. The current polling thread has to leave and needs
961 ** a replacement.
962 ** The logic to find a new polling thread is made more
963 ** complicated by all the other possible events.
964 ** I tried my best to write the logic clearly, but
965 ** it is still full of if's with continue and goto.
967 PRStatus st;
970 st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
971 if (_prmw_running != group->state)
973 PR_SetError(PR_INVALID_STATE_ERROR, 0);
974 goto aborted;
976 if (_MW_ABORTED(st) || (NULL == group->poller)) break;
977 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
980 ** The thread is interrupted and has to leave. It might
981 ** have also been awakened to process ready i/o or be the
982 ** new poller. To be safe, if either condition is true,
983 ** we awaken another thread to take its place.
985 if (_MW_ABORTED(st))
987 if ((NULL == group->poller
988 || !PR_CLIST_IS_EMPTY(&group->io_ready))
989 && group->waiting_threads > 1)
990 PR_NotifyCondVar(group->io_complete);
991 goto aborted;
995 ** A new poller is needed, but can I be the new poller?
996 ** If there is no i/o ready, sure. But if there is any
997 ** i/o ready, it has a higher priority. I want to
998 ** process the ready i/o first and wake up another
999 ** thread to be the new poller.
1001 if (NULL == group->poller)
1003 if (PR_CLIST_IS_EMPTY(&group->io_ready))
1004 continue;
1005 if (group->waiting_threads > 1)
1006 PR_NotifyCondVar(group->io_complete);
1009 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
1011 io_ready = PR_LIST_HEAD(&group->io_ready);
1012 PR_NotifyCondVar(group->io_taken);
1013 PR_ASSERT(io_ready != NULL);
1014 PR_REMOVE_LINK(io_ready);
1015 } while (NULL == io_ready);
1017 failed_poll:
1019 #endif
1021 aborted:
1023 group->waiting_threads -= 1;
1024 invalid_state:
1025 (void)MW_TestForShutdownInternal(group);
1026 PR_Unlock(group->ml);
1028 failed_init:
1029 if (NULL != io_ready)
1031 /* If the operation failed, record the reason why */
1032 switch (((PRRecvWait*)io_ready)->outcome)
1034 case PR_MW_PENDING:
1035 PR_ASSERT(0);
1036 break;
1037 case PR_MW_SUCCESS:
1038 #ifndef WINNT
1039 _MW_InitialRecv(io_ready);
1040 #endif
1041 break;
1042 #ifdef WINNT
1043 case PR_MW_FAILURE:
1044 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
1045 break;
1046 #endif
1047 case PR_MW_TIMEOUT:
1048 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
1049 break;
1050 case PR_MW_INTERRUPT:
1051 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
1052 break;
1053 default: break;
1055 #ifdef WINNT
1056 if (NULL != overlapped->data.mw.timer)
1058 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1059 != overlapped->data.mw.desc->timeout);
1060 CancelTimer(overlapped->data.mw.timer);
1062 else
1064 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1065 == overlapped->data.mw.desc->timeout);
1067 PR_DELETE(overlapped);
1068 #endif
1070 return (PRRecvWait*)io_ready;
1071 } /* PR_WaitRecvReady */
1073 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
1075 #if !defined(WINNT)
1076 PRRecvWait **recv_wait;
1077 #endif
1078 PRStatus rv = PR_SUCCESS;
1079 if (NULL == group) group = mw_state->group;
1080 PR_ASSERT(NULL != group);
1081 if (NULL == group)
1083 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1084 return PR_FAILURE;
1087 PR_Lock(group->ml);
1089 if (_prmw_running != group->state)
1091 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1092 rv = PR_FAILURE;
1093 goto unlock;
1096 #ifdef WINNT
1097 if (InterlockedCompareExchange((LONG *)&desc->outcome,
1098 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
1100 PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
1101 PR_ASSERT(NULL != bottom);
1102 if (NULL == bottom)
1104 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1105 goto unlock;
1107 bottom->secret->state = _PR_FILEDESC_CLOSED;
1108 #if 0
1109 fprintf(stderr, "cancel wait recv: closing socket\n");
1110 #endif
1111 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1113 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1114 exit(1);
1117 #else
1118 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
1120 /* it was in the wait table */
1121 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1122 goto unlock;
1124 if (!PR_CLIST_IS_EMPTY(&group->io_ready))
1126 /* is it already complete? */
1127 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1130 PRRecvWait *done = (PRRecvWait*)head;
1131 if (done == desc) goto unlock;
1132 head = PR_NEXT_LINK(head);
1133 } while (head != &group->io_ready);
1135 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1136 rv = PR_FAILURE;
1138 #endif
1139 unlock:
1140 PR_Unlock(group->ml);
1141 return rv;
1142 } /* PR_CancelWaitFileDesc */
1144 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
1146 PRRecvWait **desc;
1147 PRRecvWait *recv_wait = NULL;
1148 #ifdef WINNT
1149 _MDOverlapped *overlapped;
1150 PRRecvWait **end;
1151 PRThread *me = _PR_MD_CURRENT_THREAD();
1152 #endif
1154 if (NULL == group) group = mw_state->group;
1155 PR_ASSERT(NULL != group);
1156 if (NULL == group)
1158 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1159 return NULL;
1162 PR_Lock(group->ml);
1163 if (_prmw_stopped != group->state)
1165 if (_prmw_running == group->state)
1166 group->state = _prmw_stopping; /* so nothing new comes in */
1167 if (0 == group->waiting_threads) /* is there anybody else? */
1168 group->state = _prmw_stopped; /* we can stop right now */
1169 else
1171 PR_NotifyAllCondVar(group->new_business);
1172 PR_NotifyAllCondVar(group->io_complete);
1174 while (_prmw_stopped != group->state)
1175 (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
1178 #ifdef WINNT
1179 _PR_MD_LOCK(&group->mdlock);
1180 #endif
1181 /* make all the existing descriptors look done/interrupted */
1182 #ifdef WINNT
1183 end = &group->waiter->recv_wait + group->waiter->length;
1184 for (desc = &group->waiter->recv_wait; desc < end; ++desc)
1186 if (NULL != *desc)
1188 if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
1189 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
1190 == (LONG)PR_MW_PENDING)
1192 PRFileDesc *bottom = PR_GetIdentitiesLayer(
1193 (*desc)->fd, PR_NSPR_IO_LAYER);
1194 PR_ASSERT(NULL != bottom);
1195 if (NULL == bottom)
1197 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1198 goto invalid_arg;
1200 bottom->secret->state = _PR_FILEDESC_CLOSED;
1201 #if 0
1202 fprintf(stderr, "cancel wait group: closing socket\n");
1203 #endif
1204 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1206 fprintf(stderr, "closesocket failed: %d\n",
1207 WSAGetLastError());
1208 exit(1);
1213 while (group->waiter->count > 0)
1215 _PR_THREAD_LOCK(me);
1216 me->state = _PR_IO_WAIT;
1217 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
1218 if (!_PR_IS_NATIVE_THREAD(me))
1220 _PR_SLEEPQ_LOCK(me->cpu);
1221 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
1222 _PR_SLEEPQ_UNLOCK(me->cpu);
1224 _PR_THREAD_UNLOCK(me);
1225 _PR_MD_UNLOCK(&group->mdlock);
1226 PR_Unlock(group->ml);
1227 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
1228 me->state = _PR_RUNNING;
1229 PR_Lock(group->ml);
1230 _PR_MD_LOCK(&group->mdlock);
1232 #else
1233 for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
1235 PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
1236 if (NULL != *desc)
1237 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
1239 #endif
1241 /* take first element of finished list and return it or NULL */
1242 if (PR_CLIST_IS_EMPTY(&group->io_ready))
1243 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1244 else
1246 PRCList *head = PR_LIST_HEAD(&group->io_ready);
1247 PR_REMOVE_AND_INIT_LINK(head);
1248 #ifdef WINNT
1249 overlapped = (_MDOverlapped *)
1250 ((char *)head - offsetof(_MDOverlapped, data));
1251 head = &overlapped->data.mw.desc->internal;
1252 if (NULL != overlapped->data.mw.timer)
1254 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1255 != overlapped->data.mw.desc->timeout);
1256 CancelTimer(overlapped->data.mw.timer);
1258 else
1260 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1261 == overlapped->data.mw.desc->timeout);
1263 PR_DELETE(overlapped);
1264 #endif
1265 recv_wait = (PRRecvWait*)head;
1267 #ifdef WINNT
1268 invalid_arg:
1269 _PR_MD_UNLOCK(&group->mdlock);
1270 #endif
1271 PR_Unlock(group->ml);
1273 return recv_wait;
1274 } /* PR_CancelWaitGroup */
1276 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
1278 #ifdef XP_MAC
1279 #pragma unused (size)
1280 #endif
1281 PRWaitGroup *wg;
1283 if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
1285 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1286 goto failed;
1288 /* the wait group itself */
1289 wg->ml = PR_NewLock();
1290 if (NULL == wg->ml) goto failed_lock;
1291 wg->io_taken = PR_NewCondVar(wg->ml);
1292 if (NULL == wg->io_taken) goto failed_cvar0;
1293 wg->io_complete = PR_NewCondVar(wg->ml);
1294 if (NULL == wg->io_complete) goto failed_cvar1;
1295 wg->new_business = PR_NewCondVar(wg->ml);
1296 if (NULL == wg->new_business) goto failed_cvar2;
1297 wg->mw_manage = PR_NewCondVar(wg->ml);
1298 if (NULL == wg->mw_manage) goto failed_cvar3;
1300 PR_INIT_CLIST(&wg->group_link);
1301 PR_INIT_CLIST(&wg->io_ready);
1303 /* the waiters sequence */
1304 wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1305 sizeof(_PRWaiterHash) +
1306 (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1307 if (NULL == wg->waiter)
1309 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1310 goto failed_waiter;
1312 wg->waiter->count = 0;
1313 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1315 #ifdef WINNT
1316 _PR_MD_NEW_LOCK(&wg->mdlock);
1317 PR_INIT_CLIST(&wg->wait_list);
1318 #endif /* WINNT */
1320 PR_Lock(mw_lock);
1321 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1322 PR_Unlock(mw_lock);
1323 return wg;
1325 failed_waiter:
1326 PR_DestroyCondVar(wg->mw_manage);
1327 failed_cvar3:
1328 PR_DestroyCondVar(wg->new_business);
1329 failed_cvar2:
1330 PR_DestroyCondVar(wg->io_complete);
1331 failed_cvar1:
1332 PR_DestroyCondVar(wg->io_taken);
1333 failed_cvar0:
1334 PR_DestroyLock(wg->ml);
1335 failed_lock:
1336 PR_DELETE(wg);
1337 wg = NULL;
1339 failed:
1340 return wg;
1341 } /* MW_CreateWaitGroup */
1343 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
1345 PRStatus rv = PR_SUCCESS;
1346 if (NULL == group) group = mw_state->group;
1347 PR_ASSERT(NULL != group);
1348 if (NULL != group)
1350 PR_Lock(group->ml);
1351 if ((group->waiting_threads == 0)
1352 && (group->waiter->count == 0)
1353 && PR_CLIST_IS_EMPTY(&group->io_ready))
1355 group->state = _prmw_stopped;
1357 else
1359 PR_SetError(PR_INVALID_STATE_ERROR, 0);
1360 rv = PR_FAILURE;
1362 PR_Unlock(group->ml);
1363 if (PR_FAILURE == rv) return rv;
1365 PR_Lock(mw_lock);
1366 PR_REMOVE_LINK(&group->group_link);
1367 PR_Unlock(mw_lock);
1369 #ifdef WINNT
1371 * XXX make sure wait_list is empty and waiter is empty.
1372 * These must be checked while holding mdlock.
1374 _PR_MD_FREE_LOCK(&group->mdlock);
1375 #endif
1377 PR_DELETE(group->waiter);
1378 PR_DELETE(group->polling_list);
1379 PR_DestroyCondVar(group->mw_manage);
1380 PR_DestroyCondVar(group->new_business);
1381 PR_DestroyCondVar(group->io_complete);
1382 PR_DestroyCondVar(group->io_taken);
1383 PR_DestroyLock(group->ml);
1384 if (group == mw_state->group) mw_state->group = NULL;
1385 PR_DELETE(group);
1387 else
1389 /* The default wait group is not created yet. */
1390 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1391 rv = PR_FAILURE;
1393 return rv;
1394 } /* PR_DestroyWaitGroup */
/**********************************************************************
***********************************************************************
******************** Wait group enumerations **************************
***********************************************************************
**********************************************************************/
1402 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
1404 PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
1405 if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1406 else
1408 enumerator->group = group;
1409 enumerator->seal = _PR_ENUM_SEALED;
1411 return enumerator;
1412 } /* PR_CreateMWaitEnumerator */
1414 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
1416 PR_ASSERT(NULL != enumerator);
1417 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1418 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
1420 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1421 return PR_FAILURE;
1423 enumerator->seal = _PR_ENUM_UNSEALED;
1424 PR_Free(enumerator);
1425 return PR_SUCCESS;
1426 } /* PR_DestroyMWaitEnumerator */
1428 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
1429 PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
1431 PRRecvWait *result = NULL;
1433 /* entry point sanity checking */
1434 PR_ASSERT(NULL != enumerator);
1435 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1436 if ((NULL == enumerator)
1437 || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
1439 /* beginning of enumeration */
1440 if (NULL == previous)
1442 if (NULL == enumerator->group)
1444 enumerator->group = mw_state->group;
1445 if (NULL == enumerator->group)
1447 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1448 return NULL;
1451 enumerator->waiter = &enumerator->group->waiter->recv_wait;
1452 enumerator->p_timestamp = enumerator->group->p_timestamp;
1453 enumerator->thread = PR_GetCurrentThread();
1454 enumerator->index = 0;
1456 /* continuing an enumeration */
1457 else
1459 PRThread *me = PR_GetCurrentThread();
1460 PR_ASSERT(me == enumerator->thread);
1461 if (me != enumerator->thread) goto bad_argument;
1463 /* need to restart the enumeration */
1464 if (enumerator->p_timestamp != enumerator->group->p_timestamp)
1465 return PR_EnumerateWaitGroup(enumerator, NULL);
1468 /* actually progress the enumeration */
1469 #if defined(WINNT)
1470 _PR_MD_LOCK(&enumerator->group->mdlock);
1471 #else
1472 PR_Lock(enumerator->group->ml);
1473 #endif
1474 while (enumerator->index++ < enumerator->group->waiter->length)
1476 if (NULL != (result = *(enumerator->waiter)++)) break;
1478 #if defined(WINNT)
1479 _PR_MD_UNLOCK(&enumerator->group->mdlock);
1480 #else
1481 PR_Unlock(enumerator->group->ml);
1482 #endif
1484 return result; /* what we live for */
1486 bad_argument:
1487 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1488 return NULL; /* probably ambiguous */
1489 } /* PR_EnumerateWaitGroup */
/* prmwait.c */