1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
15 * The Original Code is the Netscape Portable Runtime (NSPR).
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 1998-2000
20 * the Initial Developer. All Rights Reserved.
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
36 * ***** END LICENSE BLOCK ***** */
41 #define _MW_REHASH_MAX 11
43 static PRLock
*mw_lock
= NULL
;
44 static _PRGlobalState
*mw_state
= NULL
;
46 static PRIntervalTime max_polling_interval
;
50 typedef struct TimerEvent
{
51 PRIntervalTime absolute
;
58 #define TIMER_EVENT_PTR(_qp) \
59 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
64 PRCondVar
*cancel_timer
;
65 PRThread
*manager_thread
;
69 static PRStatus
TimerInit(void);
70 static void TimerManager(void *arg
);
71 static TimerEvent
*CreateTimer(PRIntervalTime timeout
,
72 void (*func
)(void *), void *arg
);
73 static PRBool
CancelTimer(TimerEvent
*timer
);
75 static void TimerManager(void *arg
)
78 PRIntervalTime timeout
;
85 if (PR_CLIST_IS_EMPTY(&tm_vars
.timer_queue
))
87 PR_WaitCondVar(tm_vars
.new_timer
, PR_INTERVAL_NO_TIMEOUT
);
91 now
= PR_IntervalNow();
92 head
= PR_LIST_HEAD(&tm_vars
.timer_queue
);
93 timer
= TIMER_EVENT_PTR(head
);
94 if ((PRInt32
) (now
- timer
->absolute
) >= 0)
98 * make its prev and next point to itself so that
99 * it's obvious that it's not on the timer_queue.
102 PR_ASSERT(2 == timer
->ref_count
);
103 PR_Unlock(tm_vars
.ml
);
104 timer
->func(timer
->arg
);
106 timer
->ref_count
-= 1;
107 if (0 == timer
->ref_count
)
109 PR_NotifyAllCondVar(tm_vars
.cancel_timer
);
114 timeout
= (PRIntervalTime
)(timer
->absolute
- now
);
115 PR_WaitCondVar(tm_vars
.new_timer
, timeout
);
119 PR_Unlock(tm_vars
.ml
);
122 static TimerEvent
*CreateTimer(
123 PRIntervalTime timeout
,
124 void (*func
)(void *),
128 PRCList
*links
, *tail
;
131 timer
= PR_NEW(TimerEvent
);
134 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
137 timer
->absolute
= PR_IntervalNow() + timeout
;
140 timer
->ref_count
= 2;
142 tail
= links
= PR_LIST_TAIL(&tm_vars
.timer_queue
);
143 while (links
->prev
!= tail
)
145 elem
= TIMER_EVENT_PTR(links
);
146 if ((PRInt32
)(timer
->absolute
- elem
->absolute
) >= 0)
152 PR_INSERT_AFTER(&timer
->links
, links
);
153 PR_NotifyCondVar(tm_vars
.new_timer
);
154 PR_Unlock(tm_vars
.ml
);
158 static PRBool
CancelTimer(TimerEvent
*timer
)
160 PRBool canceled
= PR_FALSE
;
163 timer
->ref_count
-= 1;
164 if (timer
->links
.prev
== &timer
->links
)
166 while (timer
->ref_count
== 1)
168 PR_WaitCondVar(tm_vars
.cancel_timer
, PR_INTERVAL_NO_TIMEOUT
);
173 PR_REMOVE_LINK(&timer
->links
);
176 PR_Unlock(tm_vars
.ml
);
181 static PRStatus
TimerInit(void)
183 tm_vars
.ml
= PR_NewLock();
184 if (NULL
== tm_vars
.ml
)
188 tm_vars
.new_timer
= PR_NewCondVar(tm_vars
.ml
);
189 if (NULL
== tm_vars
.new_timer
)
193 tm_vars
.cancel_timer
= PR_NewCondVar(tm_vars
.ml
);
194 if (NULL
== tm_vars
.cancel_timer
)
198 PR_INIT_CLIST(&tm_vars
.timer_queue
);
199 tm_vars
.manager_thread
= PR_CreateThread(
200 PR_SYSTEM_THREAD
, TimerManager
, NULL
, PR_PRIORITY_NORMAL
,
201 PR_LOCAL_THREAD
, PR_UNJOINABLE_THREAD
, 0);
202 if (NULL
== tm_vars
.manager_thread
)
209 if (NULL
!= tm_vars
.cancel_timer
)
211 PR_DestroyCondVar(tm_vars
.cancel_timer
);
213 if (NULL
!= tm_vars
.new_timer
)
215 PR_DestroyCondVar(tm_vars
.new_timer
);
217 if (NULL
!= tm_vars
.ml
)
219 PR_DestroyLock(tm_vars
.ml
);
226 /******************************************************************/
227 /******************************************************************/
228 /************************ The private portion *********************/
229 /******************************************************************/
230 /******************************************************************/
231 void _PR_InitMW(void)
235 * We use NT 4's InterlockedCompareExchange() to operate
236 * on PRMWStatus variables.
238 PR_ASSERT(sizeof(LONG
) == sizeof(PRMWStatus
));
241 mw_lock
= PR_NewLock();
242 PR_ASSERT(NULL
!= mw_lock
);
243 mw_state
= PR_NEWZAP(_PRGlobalState
);
244 PR_ASSERT(NULL
!= mw_state
);
245 PR_INIT_CLIST(&mw_state
->group_list
);
246 max_polling_interval
= PR_MillisecondsToInterval(MAX_POLLING_INTERVAL
);
249 void _PR_CleanupMW(void)
251 PR_DestroyLock(mw_lock
);
253 if (mw_state
->group
) {
254 PR_DestroyWaitGroup(mw_state
->group
);
255 /* mw_state->group is set to NULL as a side effect. */
258 } /* _PR_CleanupMW */
260 static PRWaitGroup
*MW_Init2(void)
262 PRWaitGroup
*group
= mw_state
->group
; /* it's the null group */
263 if (NULL
== group
) /* there is this special case */
265 group
= PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH
);
266 if (NULL
== group
) goto failed_alloc
;
268 if (NULL
== mw_state
->group
)
270 mw_state
->group
= group
;
274 if (group
!= NULL
) (void)PR_DestroyWaitGroup(group
);
275 group
= mw_state
->group
; /* somebody beat us to it */
278 return group
; /* whatever */
281 static _PR_HashStory
MW_AddHashInternal(PRRecvWait
*desc
, _PRWaiterHash
*hash
)
284 ** The entries are put in the table using the fd (PRFileDesc*) of
285 ** the receive descriptor as the key. This allows us to locate
286 ** the appropriate entry aqain when the poll operation finishes.
288 ** The pointer to the file descriptor object is first divided by
289 ** the natural alignment of a pointer in the belief that object
290 ** will have at least that many zeros in the low order bits.
291 ** This may not be a good assuption.
293 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
294 ** that we declare defeat and force the table to be reconstructed.
295 ** Since some fds might be added more than once, won't that cause
296 ** collisions even in an empty table?
298 PRIntn rehash
= _MW_REHASH_MAX
;
300 PRUintn hidx
= _MW_HASH(desc
->fd
, hash
->length
);
305 waiter
= &hash
->recv_wait
;
306 if (NULL
== waiter
[hidx
])
311 printf("Adding 0x%x->0x%x ", desc
, desc
->fd
);
313 "table[%u:%u:*%u]: 0x%x->0x%x\n",
314 hidx
, hash
->count
, hash
->length
, waiter
[hidx
], waiter
[hidx
]->fd
);
316 return _prmw_success
;
318 if (desc
== waiter
[hidx
])
320 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0); /* desc already in table */
324 printf("Failing 0x%x->0x%x ", desc
, desc
->fd
);
326 "table[*%u:%u:%u]: 0x%x->0x%x\n",
327 hidx
, hash
->count
, hash
->length
, waiter
[hidx
], waiter
[hidx
]->fd
);
331 hoffset
= _MW_HASH2(desc
->fd
, hash
->length
);
332 PR_ASSERT(0 != hoffset
);
334 hidx
= (hidx
+ hoffset
) % (hash
->length
);
337 } /* MW_AddHashInternal */
339 static _PR_HashStory
MW_ExpandHashInternal(PRWaitGroup
*group
)
342 PRUint32 pidx
, length
;
343 _PRWaiterHash
*newHash
, *oldHash
= group
->waiter
;
347 static const PRInt32 prime_number
[] = {
348 _PR_DEFAULT_HASH_LENGTH
, 179, 521, 907, 1427,
349 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
350 PRUintn primes
= (sizeof(prime_number
) / sizeof(PRInt32
));
352 /* look up the next size we'd like to use for the hash table */
353 for (pidx
= 0; pidx
< primes
; ++pidx
)
355 if (prime_number
[pidx
] == oldHash
->length
)
360 /* table size must be one of the prime numbers */
361 PR_ASSERT(pidx
< primes
);
363 /* if pidx == primes - 1, we can't expand the table any more */
364 while (pidx
< primes
- 1)
368 length
= prime_number
[pidx
];
370 /* allocate the new hash table and fill it in with the old */
371 newHash
= (_PRWaiterHash
*)PR_CALLOC(
372 sizeof(_PRWaiterHash
) + (length
* sizeof(PRRecvWait
*)));
375 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
379 newHash
->length
= length
;
381 for (desc
= &oldHash
->recv_wait
;
382 newHash
->count
< oldHash
->count
; ++desc
)
384 PR_ASSERT(desc
< &oldHash
->recv_wait
+ oldHash
->length
);
387 hrv
= MW_AddHashInternal(*desc
, newHash
);
388 PR_ASSERT(_prmw_error
!= hrv
);
389 if (_prmw_success
!= hrv
)
399 PR_DELETE(group
->waiter
);
400 group
->waiter
= newHash
;
401 group
->p_timestamp
+= 1;
402 return _prmw_success
;
405 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
406 return _prmw_error
; /* we're hosed */
407 } /* MW_ExpandHashInternal */
410 static void _MW_DoneInternal(
411 PRWaitGroup
*group
, PRRecvWait
**waiter
, PRMWStatus outcome
)
414 ** Add this receive wait object to the list of finished I/O
415 ** operations for this particular group. If there are other
416 ** threads waiting on the group, notify one. If not, arrange
417 ** for this thread to return.
421 printf("Removing 0x%x->0x%x\n", *waiter
, (*waiter
)->fd
);
423 (*waiter
)->outcome
= outcome
;
424 PR_APPEND_LINK(&((*waiter
)->internal
), &group
->io_ready
);
425 PR_NotifyCondVar(group
->io_complete
);
426 PR_ASSERT(0 != group
->waiter
->count
);
427 group
->waiter
->count
-= 1;
429 } /* _MW_DoneInternal */
432 static PRRecvWait
**_MW_LookupInternal(PRWaitGroup
*group
, PRFileDesc
*fd
)
435 ** Find the receive wait object corresponding to the file descriptor.
436 ** Only search the wait group specified.
439 PRIntn rehash
= _MW_REHASH_MAX
;
440 _PRWaiterHash
*hash
= group
->waiter
;
441 PRUintn hidx
= _MW_HASH(fd
, hash
->length
);
446 desc
= (&hash
->recv_wait
) + hidx
;
447 if ((*desc
!= NULL
) && ((*desc
)->fd
== fd
)) return desc
;
450 hoffset
= _MW_HASH2(fd
, hash
->length
);
451 PR_ASSERT(0 != hoffset
);
453 hidx
= (hidx
+ hoffset
) % (hash
->length
);
456 } /* _MW_LookupInternal */
459 static PRStatus
_MW_PollInternal(PRWaitGroup
*group
)
462 PRStatus rv
= PR_FAILURE
;
463 PRInt32 count
, count_ready
;
464 PRIntervalTime polling_interval
;
466 group
->poller
= PR_GetCurrentThread();
470 PRIntervalTime now
, since_last_poll
;
471 PRPollDesc
*poll_list
;
473 while (0 == group
->waiter
->count
)
476 st
= PR_WaitCondVar(group
->new_business
, PR_INTERVAL_NO_TIMEOUT
);
477 if (_prmw_running
!= group
->state
)
479 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
482 if (_MW_ABORTED(st
)) goto aborted
;
486 ** There's something to do. See if our existing polling list
487 ** is large enough for what we have to do?
490 while (group
->polling_count
< group
->waiter
->count
)
492 PRUint32 old_count
= group
->waiter
->count
;
493 PRUint32 new_count
= PR_ROUNDUP(old_count
, _PR_POLL_COUNT_FUDGE
);
494 PRSize new_size
= sizeof(PRPollDesc
) * new_count
;
495 PRPollDesc
*old_polling_list
= group
->polling_list
;
497 PR_Unlock(group
->ml
);
498 poll_list
= (PRPollDesc
*)PR_CALLOC(new_size
);
499 if (NULL
== poll_list
)
501 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
505 if (NULL
!= old_polling_list
)
506 PR_DELETE(old_polling_list
);
508 if (_prmw_running
!= group
->state
)
510 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
513 group
->polling_list
= poll_list
;
514 group
->polling_count
= new_count
;
517 now
= PR_IntervalNow();
518 polling_interval
= max_polling_interval
;
519 since_last_poll
= now
- group
->last_poll
;
521 waiter
= &group
->waiter
->recv_wait
;
522 poll_list
= group
->polling_list
;
523 for (count
= 0; count
< group
->waiter
->count
; ++waiter
)
525 PR_ASSERT(waiter
< &group
->waiter
->recv_wait
526 + group
->waiter
->length
);
527 if (NULL
!= *waiter
) /* a live one! */
529 if ((PR_INTERVAL_NO_TIMEOUT
!= (*waiter
)->timeout
)
530 && (since_last_poll
>= (*waiter
)->timeout
))
531 _MW_DoneInternal(group
, waiter
, PR_MW_TIMEOUT
);
534 if (PR_INTERVAL_NO_TIMEOUT
!= (*waiter
)->timeout
)
536 (*waiter
)->timeout
-= since_last_poll
;
537 if ((*waiter
)->timeout
< polling_interval
)
538 polling_interval
= (*waiter
)->timeout
;
540 PR_ASSERT(poll_list
< group
->polling_list
541 + group
->polling_count
);
542 poll_list
->fd
= (*waiter
)->fd
;
543 poll_list
->in_flags
= PR_POLL_READ
;
544 poll_list
->out_flags
= 0;
547 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
548 poll_list
, count
, poll_list
->fd
, (*waiter
)->timeout
);
556 PR_ASSERT(count
== group
->waiter
->count
);
559 ** If there are no more threads waiting for completion,
560 ** we need to return.
562 if ((!PR_CLIST_IS_EMPTY(&group
->io_ready
))
563 && (1 == group
->waiting_threads
)) break;
565 if (0 == count
) continue; /* wait for new business */
567 group
->last_poll
= now
;
569 PR_Unlock(group
->ml
);
571 count_ready
= PR_Poll(group
->polling_list
, count
, polling_interval
);
575 if (_prmw_running
!= group
->state
)
577 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
580 if (-1 == count_ready
)
582 goto failed_poll
; /* that's a shame */
584 else if (0 < count_ready
)
586 for (poll_list
= group
->polling_list
; count
> 0;
587 poll_list
++, count
--)
590 poll_list
< group
->polling_list
+ group
->polling_count
);
591 if (poll_list
->out_flags
!= 0)
593 waiter
= _MW_LookupInternal(group
, poll_list
->fd
);
595 ** If 'waiter' is NULL, that means the wait receive
596 ** descriptor has been canceled.
599 _MW_DoneInternal(group
, waiter
, PR_MW_SUCCESS
);
604 ** If there are no more threads waiting for completion,
605 ** we need to return.
606 ** This thread was "borrowed" to do the polling, but it really
607 ** belongs to the client.
609 if ((!PR_CLIST_IS_EMPTY(&group
->io_ready
))
610 && (1 == group
->waiting_threads
)) break;
618 group
->poller
= NULL
; /* we were that, not we ain't */
619 if ((_prmw_running
== group
->state
) && (group
->waiting_threads
> 1))
621 /* Wake up one thread to become the new poller. */
622 PR_NotifyCondVar(group
->io_complete
);
624 return rv
; /* we return with the lock held */
625 } /* _MW_PollInternal */
628 static PRMWGroupState
MW_TestForShutdownInternal(PRWaitGroup
*group
)
630 PRMWGroupState rv
= group
->state
;
632 ** Looking at the group's fields is safe because
633 ** once the group's state is no longer running, it
634 ** cannot revert and there is a safe check on entry
635 ** to make sure no more threads are made to wait.
637 if ((_prmw_stopping
== rv
)
638 && (0 == group
->waiting_threads
))
640 rv
= group
->state
= _prmw_stopped
;
641 PR_NotifyCondVar(group
->mw_manage
);
644 } /* MW_TestForShutdownInternal */
647 static void _MW_InitialRecv(PRCList
*io_ready
)
649 PRRecvWait
*desc
= (PRRecvWait
*)io_ready
;
650 if ((NULL
== desc
->buffer
.start
)
651 || (0 == desc
->buffer
.length
))
655 desc
->bytesRecv
= (desc
->fd
->methods
->recv
)(
656 desc
->fd
, desc
->buffer
.start
,
657 desc
->buffer
.length
, 0, desc
->timeout
);
658 if (desc
->bytesRecv
< 0) /* SetError should already be there */
659 desc
->outcome
= PR_MW_FAILURE
;
661 } /* _MW_InitialRecv */
665 static void NT_TimeProc(void *arg
)
667 _MDOverlapped
*overlapped
= (_MDOverlapped
*)arg
;
668 PRRecvWait
*desc
= overlapped
->data
.mw
.desc
;
671 if (InterlockedCompareExchange((LONG
*)&desc
->outcome
,
672 (LONG
)PR_MW_TIMEOUT
, (LONG
)PR_MW_PENDING
) != (LONG
)PR_MW_PENDING
)
674 /* This wait recv descriptor has already completed. */
678 /* close the osfd to abort the outstanding async io request */
680 ** Little late to be checking if NSPR's on the bottom of stack,
681 ** but if we don't check, we can't assert that the private data
682 ** is what we think it is.
685 bottom
= PR_GetIdentitiesLayer(desc
->fd
, PR_NSPR_IO_LAYER
);
686 PR_ASSERT(NULL
!= bottom
);
687 if (NULL
!= bottom
) /* now what!?!?! */
689 bottom
->secret
->state
= _PR_FILEDESC_CLOSED
;
690 if (closesocket(bottom
->secret
->md
.osfd
) == SOCKET_ERROR
)
692 fprintf(stderr
, "closesocket failed: %d\n", WSAGetLastError());
693 PR_ASSERT(!"What shall I do?");
699 static PRStatus
NT_HashRemove(PRWaitGroup
*group
, PRFileDesc
*fd
)
703 _PR_MD_LOCK(&group
->mdlock
);
704 waiter
= _MW_LookupInternal(group
, fd
);
707 group
->waiter
->count
-= 1;
710 _PR_MD_UNLOCK(&group
->mdlock
);
711 return (NULL
!= waiter
) ? PR_SUCCESS
: PR_FAILURE
;
714 PRStatus
NT_HashRemoveInternal(PRWaitGroup
*group
, PRFileDesc
*fd
)
718 waiter
= _MW_LookupInternal(group
, fd
);
721 group
->waiter
->count
-= 1;
724 return (NULL
!= waiter
) ? PR_SUCCESS
: PR_FAILURE
;
728 /******************************************************************/
729 /******************************************************************/
730 /********************** The public API portion ********************/
731 /******************************************************************/
732 /******************************************************************/
733 PR_IMPLEMENT(PRStatus
) PR_AddWaitFileDesc(
734 PRWaitGroup
*group
, PRRecvWait
*desc
)
737 PRStatus rv
= PR_FAILURE
;
739 _MDOverlapped
*overlapped
;
746 if (!_pr_initialized
) _PR_ImplicitInitialization();
747 if ((NULL
== group
) && (NULL
== (group
= MW_Init2())))
752 PR_ASSERT(NULL
!= desc
->fd
);
754 desc
->outcome
= PR_MW_PENDING
; /* nice, well known value */
755 desc
->bytesRecv
= 0; /* likewise, though this value is ambiguious */
759 if (_prmw_running
!= group
->state
)
761 /* Not allowed to add after cancelling the group */
762 desc
->outcome
= PR_MW_INTERRUPT
;
763 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
764 PR_Unlock(group
->ml
);
769 _PR_MD_LOCK(&group
->mdlock
);
773 ** If the waiter count is zero at this point, there's no telling
774 ** how long we've been idle. Therefore, initialize the beginning
775 ** of the timing interval. As long as the list doesn't go empty,
776 ** it will maintain itself.
778 if (0 == group
->waiter
->count
)
779 group
->last_poll
= PR_IntervalNow();
783 hrv
= MW_AddHashInternal(desc
, group
->waiter
);
784 if (_prmw_rehash
!= hrv
) break;
785 hrv
= MW_ExpandHashInternal(group
); /* gruesome */
786 if (_prmw_success
!= hrv
) break;
790 _PR_MD_UNLOCK(&group
->mdlock
);
793 PR_NotifyCondVar(group
->new_business
); /* tell the world */
794 rv
= (_prmw_success
== hrv
) ? PR_SUCCESS
: PR_FAILURE
;
795 PR_Unlock(group
->ml
);
798 overlapped
= PR_NEWZAP(_MDOverlapped
);
799 if (NULL
== overlapped
)
801 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
802 NT_HashRemove(group
, desc
->fd
);
805 overlapped
->ioModel
= _MD_MultiWaitIO
;
806 overlapped
->data
.mw
.desc
= desc
;
807 overlapped
->data
.mw
.group
= group
;
808 if (desc
->timeout
!= PR_INTERVAL_NO_TIMEOUT
)
810 overlapped
->data
.mw
.timer
= CreateTimer(
814 if (0 == overlapped
->data
.mw
.timer
)
816 NT_HashRemove(group
, desc
->fd
);
817 PR_DELETE(overlapped
);
819 * XXX It appears that a maximum of 16 timer events can
820 * be outstanding. GetLastError() returns 0 when I try it.
822 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR
, GetLastError());
827 /* Reach to the bottom layer to get the OS fd */
828 bottom
= PR_GetIdentitiesLayer(desc
->fd
, PR_NSPR_IO_LAYER
);
829 PR_ASSERT(NULL
!= bottom
);
832 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
835 hFile
= (HANDLE
)bottom
->secret
->md
.osfd
;
836 if (!bottom
->secret
->md
.io_model_committed
)
839 st
= _md_Associate(hFile
);
841 bottom
->secret
->md
.io_model_committed
= PR_TRUE
;
843 bResult
= ReadFile(hFile
,
845 (DWORD
)desc
->buffer
.length
,
847 &overlapped
->overlapped
);
848 if (FALSE
== bResult
&& (dwError
= GetLastError()) != ERROR_IO_PENDING
)
850 if (desc
->timeout
!= PR_INTERVAL_NO_TIMEOUT
)
852 if (InterlockedCompareExchange((LONG
*)&desc
->outcome
,
853 (LONG
)PR_MW_FAILURE
, (LONG
)PR_MW_PENDING
)
854 == (LONG
)PR_MW_PENDING
)
856 CancelTimer(overlapped
->data
.mw
.timer
);
858 NT_HashRemove(group
, desc
->fd
);
859 PR_DELETE(overlapped
);
861 _PR_MD_MAP_READ_ERROR(dwError
);
867 } /* PR_AddWaitFileDesc */
869 PR_IMPLEMENT(PRRecvWait
*) PR_WaitRecvReady(PRWaitGroup
*group
)
871 PRCList
*io_ready
= NULL
;
873 PRThread
*me
= _PR_MD_CURRENT_THREAD();
874 _MDOverlapped
*overlapped
;
877 if (!_pr_initialized
) _PR_ImplicitInitialization();
878 if ((NULL
== group
) && (NULL
== (group
= MW_Init2()))) goto failed_init
;
882 if (_prmw_running
!= group
->state
)
884 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
888 group
->waiting_threads
+= 1; /* the polling thread is counted */
891 _PR_MD_LOCK(&group
->mdlock
);
892 while (PR_CLIST_IS_EMPTY(&group
->io_ready
))
895 me
->state
= _PR_IO_WAIT
;
896 PR_APPEND_LINK(&me
->waitQLinks
, &group
->wait_list
);
897 if (!_PR_IS_NATIVE_THREAD(me
))
899 _PR_SLEEPQ_LOCK(me
->cpu
);
900 _PR_ADD_SLEEPQ(me
, PR_INTERVAL_NO_TIMEOUT
);
901 _PR_SLEEPQ_UNLOCK(me
->cpu
);
903 _PR_THREAD_UNLOCK(me
);
904 _PR_MD_UNLOCK(&group
->mdlock
);
905 PR_Unlock(group
->ml
);
906 _PR_MD_WAIT(me
, PR_INTERVAL_NO_TIMEOUT
);
907 me
->state
= _PR_RUNNING
;
909 _PR_MD_LOCK(&group
->mdlock
);
910 if (_PR_PENDING_INTERRUPT(me
)) {
911 PR_REMOVE_LINK(&me
->waitQLinks
);
912 _PR_MD_UNLOCK(&group
->mdlock
);
913 me
->flags
&= ~_PR_INTERRUPT
;
914 me
->io_suspended
= PR_FALSE
;
915 PR_SetError(PR_PENDING_INTERRUPT_ERROR
, 0);
919 io_ready
= PR_LIST_HEAD(&group
->io_ready
);
920 PR_ASSERT(io_ready
!= NULL
);
921 PR_REMOVE_LINK(io_ready
);
922 _PR_MD_UNLOCK(&group
->mdlock
);
923 overlapped
= (_MDOverlapped
*)
924 ((char *)io_ready
- offsetof(_MDOverlapped
, data
));
925 io_ready
= &overlapped
->data
.mw
.desc
->internal
;
930 ** If the I/O ready list isn't empty, have this thread
931 ** return with the first receive wait object that's available.
933 if (PR_CLIST_IS_EMPTY(&group
->io_ready
))
936 ** Is there a polling thread yet? If not, grab this thread
939 if (NULL
== group
->poller
)
942 ** This thread will stay do polling until it becomes the only one
943 ** left to service a completion. Then it will return and there will
944 ** be none left to actually poll or to run completions.
946 ** The polling function should only return w/ failure or
947 ** with some I/O ready.
949 if (PR_FAILURE
== _MW_PollInternal(group
)) goto failed_poll
;
954 ** There are four reasons a thread can be awakened from
955 ** a wait on the io_complete condition variable.
956 ** 1. Some I/O has completed, i.e., the io_ready list
958 ** 2. The wait group is canceled.
959 ** 3. The thread is interrupted.
960 ** 4. The current polling thread has to leave and needs
962 ** The logic to find a new polling thread is made more
963 ** complicated by all the other possible events.
964 ** I tried my best to write the logic clearly, but
965 ** it is still full of if's with continue and goto.
970 st
= PR_WaitCondVar(group
->io_complete
, PR_INTERVAL_NO_TIMEOUT
);
971 if (_prmw_running
!= group
->state
)
973 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
976 if (_MW_ABORTED(st
) || (NULL
== group
->poller
)) break;
977 } while (PR_CLIST_IS_EMPTY(&group
->io_ready
));
980 ** The thread is interrupted and has to leave. It might
981 ** have also been awakened to process ready i/o or be the
982 ** new poller. To be safe, if either condition is true,
983 ** we awaken another thread to take its place.
987 if ((NULL
== group
->poller
988 || !PR_CLIST_IS_EMPTY(&group
->io_ready
))
989 && group
->waiting_threads
> 1)
990 PR_NotifyCondVar(group
->io_complete
);
995 ** A new poller is needed, but can I be the new poller?
996 ** If there is no i/o ready, sure. But if there is any
997 ** i/o ready, it has a higher priority. I want to
998 ** process the ready i/o first and wake up another
999 ** thread to be the new poller.
1001 if (NULL
== group
->poller
)
1003 if (PR_CLIST_IS_EMPTY(&group
->io_ready
))
1005 if (group
->waiting_threads
> 1)
1006 PR_NotifyCondVar(group
->io_complete
);
1009 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group
->io_ready
));
1011 io_ready
= PR_LIST_HEAD(&group
->io_ready
);
1012 PR_NotifyCondVar(group
->io_taken
);
1013 PR_ASSERT(io_ready
!= NULL
);
1014 PR_REMOVE_LINK(io_ready
);
1015 } while (NULL
== io_ready
);
1023 group
->waiting_threads
-= 1;
1025 (void)MW_TestForShutdownInternal(group
);
1026 PR_Unlock(group
->ml
);
1029 if (NULL
!= io_ready
)
1031 /* If the operation failed, record the reason why */
1032 switch (((PRRecvWait
*)io_ready
)->outcome
)
1039 _MW_InitialRecv(io_ready
);
1044 _PR_MD_MAP_READ_ERROR(overlapped
->data
.mw
.error
);
1048 PR_SetError(PR_IO_TIMEOUT_ERROR
, 0);
1050 case PR_MW_INTERRUPT
:
1051 PR_SetError(PR_PENDING_INTERRUPT_ERROR
, 0);
1056 if (NULL
!= overlapped
->data
.mw
.timer
)
1058 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1059 != overlapped
->data
.mw
.desc
->timeout
);
1060 CancelTimer(overlapped
->data
.mw
.timer
);
1064 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1065 == overlapped
->data
.mw
.desc
->timeout
);
1067 PR_DELETE(overlapped
);
1070 return (PRRecvWait
*)io_ready
;
1071 } /* PR_WaitRecvReady */
1073 PR_IMPLEMENT(PRStatus
) PR_CancelWaitFileDesc(PRWaitGroup
*group
, PRRecvWait
*desc
)
1076 PRRecvWait
**recv_wait
;
1078 PRStatus rv
= PR_SUCCESS
;
1079 if (NULL
== group
) group
= mw_state
->group
;
1080 PR_ASSERT(NULL
!= group
);
1083 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1089 if (_prmw_running
!= group
->state
)
1091 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
1097 if (InterlockedCompareExchange((LONG
*)&desc
->outcome
,
1098 (LONG
)PR_MW_INTERRUPT
, (LONG
)PR_MW_PENDING
) == (LONG
)PR_MW_PENDING
)
1100 PRFileDesc
*bottom
= PR_GetIdentitiesLayer(desc
->fd
, PR_NSPR_IO_LAYER
);
1101 PR_ASSERT(NULL
!= bottom
);
1104 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1107 bottom
->secret
->state
= _PR_FILEDESC_CLOSED
;
1109 fprintf(stderr
, "cancel wait recv: closing socket\n");
1111 if (closesocket(bottom
->secret
->md
.osfd
) == SOCKET_ERROR
)
1113 fprintf(stderr
, "closesocket failed: %d\n", WSAGetLastError());
1118 if (NULL
!= (recv_wait
= _MW_LookupInternal(group
, desc
->fd
)))
1120 /* it was in the wait table */
1121 _MW_DoneInternal(group
, recv_wait
, PR_MW_INTERRUPT
);
1124 if (!PR_CLIST_IS_EMPTY(&group
->io_ready
))
1126 /* is it already complete? */
1127 PRCList
*head
= PR_LIST_HEAD(&group
->io_ready
);
1130 PRRecvWait
*done
= (PRRecvWait
*)head
;
1131 if (done
== desc
) goto unlock
;
1132 head
= PR_NEXT_LINK(head
);
1133 } while (head
!= &group
->io_ready
);
1135 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1140 PR_Unlock(group
->ml
);
1142 } /* PR_CancelWaitFileDesc */
1144 PR_IMPLEMENT(PRRecvWait
*) PR_CancelWaitGroup(PRWaitGroup
*group
)
1147 PRRecvWait
*recv_wait
= NULL
;
1149 _MDOverlapped
*overlapped
;
1151 PRThread
*me
= _PR_MD_CURRENT_THREAD();
1154 if (NULL
== group
) group
= mw_state
->group
;
1155 PR_ASSERT(NULL
!= group
);
1158 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1163 if (_prmw_stopped
!= group
->state
)
1165 if (_prmw_running
== group
->state
)
1166 group
->state
= _prmw_stopping
; /* so nothing new comes in */
1167 if (0 == group
->waiting_threads
) /* is there anybody else? */
1168 group
->state
= _prmw_stopped
; /* we can stop right now */
1171 PR_NotifyAllCondVar(group
->new_business
);
1172 PR_NotifyAllCondVar(group
->io_complete
);
1174 while (_prmw_stopped
!= group
->state
)
1175 (void)PR_WaitCondVar(group
->mw_manage
, PR_INTERVAL_NO_TIMEOUT
);
1179 _PR_MD_LOCK(&group
->mdlock
);
1181 /* make all the existing descriptors look done/interrupted */
1183 end
= &group
->waiter
->recv_wait
+ group
->waiter
->length
;
1184 for (desc
= &group
->waiter
->recv_wait
; desc
< end
; ++desc
)
1188 if (InterlockedCompareExchange((LONG
*)&(*desc
)->outcome
,
1189 (LONG
)PR_MW_INTERRUPT
, (LONG
)PR_MW_PENDING
)
1190 == (LONG
)PR_MW_PENDING
)
1192 PRFileDesc
*bottom
= PR_GetIdentitiesLayer(
1193 (*desc
)->fd
, PR_NSPR_IO_LAYER
);
1194 PR_ASSERT(NULL
!= bottom
);
1197 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1200 bottom
->secret
->state
= _PR_FILEDESC_CLOSED
;
1202 fprintf(stderr
, "cancel wait group: closing socket\n");
1204 if (closesocket(bottom
->secret
->md
.osfd
) == SOCKET_ERROR
)
1206 fprintf(stderr
, "closesocket failed: %d\n",
1213 while (group
->waiter
->count
> 0)
1215 _PR_THREAD_LOCK(me
);
1216 me
->state
= _PR_IO_WAIT
;
1217 PR_APPEND_LINK(&me
->waitQLinks
, &group
->wait_list
);
1218 if (!_PR_IS_NATIVE_THREAD(me
))
1220 _PR_SLEEPQ_LOCK(me
->cpu
);
1221 _PR_ADD_SLEEPQ(me
, PR_INTERVAL_NO_TIMEOUT
);
1222 _PR_SLEEPQ_UNLOCK(me
->cpu
);
1224 _PR_THREAD_UNLOCK(me
);
1225 _PR_MD_UNLOCK(&group
->mdlock
);
1226 PR_Unlock(group
->ml
);
1227 _PR_MD_WAIT(me
, PR_INTERVAL_NO_TIMEOUT
);
1228 me
->state
= _PR_RUNNING
;
1230 _PR_MD_LOCK(&group
->mdlock
);
1233 for (desc
= &group
->waiter
->recv_wait
; group
->waiter
->count
> 0; ++desc
)
1235 PR_ASSERT(desc
< &group
->waiter
->recv_wait
+ group
->waiter
->length
);
1237 _MW_DoneInternal(group
, desc
, PR_MW_INTERRUPT
);
1241 /* take first element of finished list and return it or NULL */
1242 if (PR_CLIST_IS_EMPTY(&group
->io_ready
))
1243 PR_SetError(PR_GROUP_EMPTY_ERROR
, 0);
1246 PRCList
*head
= PR_LIST_HEAD(&group
->io_ready
);
1247 PR_REMOVE_AND_INIT_LINK(head
);
1249 overlapped
= (_MDOverlapped
*)
1250 ((char *)head
- offsetof(_MDOverlapped
, data
));
1251 head
= &overlapped
->data
.mw
.desc
->internal
;
1252 if (NULL
!= overlapped
->data
.mw
.timer
)
1254 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1255 != overlapped
->data
.mw
.desc
->timeout
);
1256 CancelTimer(overlapped
->data
.mw
.timer
);
1260 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1261 == overlapped
->data
.mw
.desc
->timeout
);
1263 PR_DELETE(overlapped
);
1265 recv_wait
= (PRRecvWait
*)head
;
1269 _PR_MD_UNLOCK(&group
->mdlock
);
1271 PR_Unlock(group
->ml
);
1274 } /* PR_CancelWaitGroup */
1276 PR_IMPLEMENT(PRWaitGroup
*) PR_CreateWaitGroup(PRInt32 size
/* ignored */)
1279 #pragma unused (size)
1283 if (NULL
== (wg
= PR_NEWZAP(PRWaitGroup
)))
1285 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
1288 /* the wait group itself */
1289 wg
->ml
= PR_NewLock();
1290 if (NULL
== wg
->ml
) goto failed_lock
;
1291 wg
->io_taken
= PR_NewCondVar(wg
->ml
);
1292 if (NULL
== wg
->io_taken
) goto failed_cvar0
;
1293 wg
->io_complete
= PR_NewCondVar(wg
->ml
);
1294 if (NULL
== wg
->io_complete
) goto failed_cvar1
;
1295 wg
->new_business
= PR_NewCondVar(wg
->ml
);
1296 if (NULL
== wg
->new_business
) goto failed_cvar2
;
1297 wg
->mw_manage
= PR_NewCondVar(wg
->ml
);
1298 if (NULL
== wg
->mw_manage
) goto failed_cvar3
;
1300 PR_INIT_CLIST(&wg
->group_link
);
1301 PR_INIT_CLIST(&wg
->io_ready
);
1303 /* the waiters sequence */
1304 wg
->waiter
= (_PRWaiterHash
*)PR_CALLOC(
1305 sizeof(_PRWaiterHash
) +
1306 (_PR_DEFAULT_HASH_LENGTH
* sizeof(PRRecvWait
*)));
1307 if (NULL
== wg
->waiter
)
1309 PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
1312 wg
->waiter
->count
= 0;
1313 wg
->waiter
->length
= _PR_DEFAULT_HASH_LENGTH
;
1316 _PR_MD_NEW_LOCK(&wg
->mdlock
);
1317 PR_INIT_CLIST(&wg
->wait_list
);
1321 PR_APPEND_LINK(&wg
->group_link
, &mw_state
->group_list
);
1326 PR_DestroyCondVar(wg
->mw_manage
);
1328 PR_DestroyCondVar(wg
->new_business
);
1330 PR_DestroyCondVar(wg
->io_complete
);
1332 PR_DestroyCondVar(wg
->io_taken
);
1334 PR_DestroyLock(wg
->ml
);
1341 } /* MW_CreateWaitGroup */
1343 PR_IMPLEMENT(PRStatus
) PR_DestroyWaitGroup(PRWaitGroup
*group
)
1345 PRStatus rv
= PR_SUCCESS
;
1346 if (NULL
== group
) group
= mw_state
->group
;
1347 PR_ASSERT(NULL
!= group
);
1351 if ((group
->waiting_threads
== 0)
1352 && (group
->waiter
->count
== 0)
1353 && PR_CLIST_IS_EMPTY(&group
->io_ready
))
1355 group
->state
= _prmw_stopped
;
1359 PR_SetError(PR_INVALID_STATE_ERROR
, 0);
1362 PR_Unlock(group
->ml
);
1363 if (PR_FAILURE
== rv
) return rv
;
1366 PR_REMOVE_LINK(&group
->group_link
);
1371 * XXX make sure wait_list is empty and waiter is empty.
1372 * These must be checked while holding mdlock.
1374 _PR_MD_FREE_LOCK(&group
->mdlock
);
1377 PR_DELETE(group
->waiter
);
1378 PR_DELETE(group
->polling_list
);
1379 PR_DestroyCondVar(group
->mw_manage
);
1380 PR_DestroyCondVar(group
->new_business
);
1381 PR_DestroyCondVar(group
->io_complete
);
1382 PR_DestroyCondVar(group
->io_taken
);
1383 PR_DestroyLock(group
->ml
);
1384 if (group
== mw_state
->group
) mw_state
->group
= NULL
;
1389 /* The default wait group is not created yet. */
1390 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1394 } /* PR_DestroyWaitGroup */
1396 /**********************************************************************
1397 ***********************************************************************
1398 ******************** Wait group enumerations **************************
1399 ***********************************************************************
1400 **********************************************************************/
1402 PR_IMPLEMENT(PRMWaitEnumerator
*) PR_CreateMWaitEnumerator(PRWaitGroup
*group
)
1404 PRMWaitEnumerator
*enumerator
= PR_NEWZAP(PRMWaitEnumerator
);
1405 if (NULL
== enumerator
) PR_SetError(PR_OUT_OF_MEMORY_ERROR
, 0);
1408 enumerator
->group
= group
;
1409 enumerator
->seal
= _PR_ENUM_SEALED
;
1412 } /* PR_CreateMWaitEnumerator */
1414 PR_IMPLEMENT(PRStatus
) PR_DestroyMWaitEnumerator(PRMWaitEnumerator
* enumerator
)
1416 PR_ASSERT(NULL
!= enumerator
);
1417 PR_ASSERT(_PR_ENUM_SEALED
== enumerator
->seal
);
1418 if ((NULL
== enumerator
) || (_PR_ENUM_SEALED
!= enumerator
->seal
))
1420 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1423 enumerator
->seal
= _PR_ENUM_UNSEALED
;
1424 PR_Free(enumerator
);
1426 } /* PR_DestroyMWaitEnumerator */
1428 PR_IMPLEMENT(PRRecvWait
*) PR_EnumerateWaitGroup(
1429 PRMWaitEnumerator
*enumerator
, const PRRecvWait
*previous
)
1431 PRRecvWait
*result
= NULL
;
1433 /* entry point sanity checking */
1434 PR_ASSERT(NULL
!= enumerator
);
1435 PR_ASSERT(_PR_ENUM_SEALED
== enumerator
->seal
);
1436 if ((NULL
== enumerator
)
1437 || (_PR_ENUM_SEALED
!= enumerator
->seal
)) goto bad_argument
;
1439 /* beginning of enumeration */
1440 if (NULL
== previous
)
1442 if (NULL
== enumerator
->group
)
1444 enumerator
->group
= mw_state
->group
;
1445 if (NULL
== enumerator
->group
)
1447 PR_SetError(PR_GROUP_EMPTY_ERROR
, 0);
1451 enumerator
->waiter
= &enumerator
->group
->waiter
->recv_wait
;
1452 enumerator
->p_timestamp
= enumerator
->group
->p_timestamp
;
1453 enumerator
->thread
= PR_GetCurrentThread();
1454 enumerator
->index
= 0;
1456 /* continuing an enumeration */
1459 PRThread
*me
= PR_GetCurrentThread();
1460 PR_ASSERT(me
== enumerator
->thread
);
1461 if (me
!= enumerator
->thread
) goto bad_argument
;
1463 /* need to restart the enumeration */
1464 if (enumerator
->p_timestamp
!= enumerator
->group
->p_timestamp
)
1465 return PR_EnumerateWaitGroup(enumerator
, NULL
);
1468 /* actually progress the enumeration */
1470 _PR_MD_LOCK(&enumerator
->group
->mdlock
);
1472 PR_Lock(enumerator
->group
->ml
);
1474 while (enumerator
->index
++ < enumerator
->group
->waiter
->length
)
1476 if (NULL
!= (result
= *(enumerator
->waiter
)++)) break;
1479 _PR_MD_UNLOCK(&enumerator
->group
->mdlock
);
1481 PR_Unlock(enumerator
->group
->ml
);
1484 return result
; /* what we live for */
1487 PR_SetError(PR_INVALID_ARGUMENT_ERROR
, 0);
1488 return NULL
; /* probably ambiguous */
1489 } /* PR_EnumerateWaitGroup */