Cygwin: strptime: add release note
[newlib-cygwin.git] / winsup / cygwin / aio.cc
blobc4b3389c46aebcdd79693689cfdd3275fbe8437b
1 /* aio.cc: Posix asynchronous i/o functions.
3 This file is part of Cygwin.
5 This software is a copyrighted work licensed under the terms of the
6 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
7 details. */
9 #include "winsup.h"
10 #include "path.h"
11 #include "fhandler.h"
12 #include "dtable.h"
13 #include "cygheap.h"
14 #include "sigproc.h"
15 #include <aio.h>
16 #include <fcntl.h>
17 #include <semaphore.h>
18 #include <unistd.h>
20 #ifdef __cplusplus
21 extern "C" {
22 #endif
24 /* 'aioinitialized' is a thread-safe status of AIO feature initialization:
25 * 0 means uninitialized, >0 means initializing, <0 means initialized
27 static NO_COPY volatile LONG aioinitialized = 0;
29 /* This implementation supports two flavors of asynchronous operation:
30 * "inline" and "queued". Inline AIOs are used when:
31 * (1) fd refers to a local non-locked disk file opened in binary mode,
32 * (2) no more than AIO_MAX inline AIOs will be in progress at same time.
33 * In all other cases queued AIOs will be used.
35 * An inline AIO is performed by the calling app's thread as a pread|pwrite on
36 * a shadow fd that permits Windows asynchronous i/o, with event notification
37 * on completion. Event arrival causes AIO context for the fd to be updated.
39 * A queued AIO is performed in a similar manner, but by an AIO worker thread
40 * rather than the calling app's thread. The queued flavor can also operate
41 * on sockets, pipes, non-binary files, mandatory-locked files, and files
42 * that don't support pread|pwrite. Generally all these cases are handled as
43 * synchronous read|write operations, but still don't delay the app because
44 * they're taken care of by AIO worker threads.
47 /* These variables support inline AIO operations */
48 static NO_COPY HANDLE evt_handles[AIO_MAX];
49 static NO_COPY struct aiocb *evt_aiocbs[AIO_MAX];
50 static NO_COPY CRITICAL_SECTION evt_locks[AIO_MAX]; /* per-slot locks */
51 static NO_COPY CRITICAL_SECTION slotcrit; /* lock for slot variables in toto */
53 /* These variables support queued AIO operations */
54 static NO_COPY sem_t worksem; /* tells whether AIOs are queued */
55 static NO_COPY CRITICAL_SECTION workcrit; /* lock for AIO work queue */
56 TAILQ_HEAD(queue, aiocb) worklist = TAILQ_HEAD_INITIALIZER(worklist);
58 static int
59 aiochkslot (struct aiocb *aio)
61 EnterCriticalSection (&slotcrit);
63 /* Sanity check.. make sure this AIO is not already busy */
64 for (int slot = 0; slot < AIO_MAX; ++slot)
65 if (evt_aiocbs[slot] == aio)
67 debug_printf ("aio %p is already busy in slot %d", aio, slot);
68 LeaveCriticalSection (&slotcrit);
69 return slot;
72 LeaveCriticalSection (&slotcrit);
73 return -1;
76 static int
77 aiogetslot (struct aiocb *aio)
79 EnterCriticalSection (&slotcrit);
81 /* Find free slot for this inline AIO; if none available AIO will be queued */
82 for (int slot = 0; slot < AIO_MAX; ++slot)
83 if (evt_aiocbs[slot] == NULL)
85 /* If aio is NULL this is just an availability check.. no change made */
86 if (aio)
87 evt_aiocbs[slot] = aio;
88 LeaveCriticalSection (&slotcrit);
89 return slot;
92 LeaveCriticalSection (&slotcrit);
93 return -1;
96 static int
97 aiorelslot (struct aiocb *aio)
99 EnterCriticalSection (&slotcrit);
101 /* Find slot associated with this inline AIO and free it */
102 for (int slot = 0; slot < AIO_MAX; ++slot)
103 if (evt_aiocbs[slot] == aio)
105 evt_aiocbs[slot] = NULL;
106 LeaveCriticalSection (&slotcrit);
107 return slot;
110 LeaveCriticalSection (&slotcrit);
111 return -1;
114 static void
115 aionotify_on_pthread (struct sigevent *evp)
117 pthread_attr_t *attr;
118 pthread_attr_t default_attr;
119 int rc;
120 pthread_t vaquita; /* == "little porpoise", endangered, see below */
122 if (evp->sigev_notify_attributes)
123 attr = evp->sigev_notify_attributes;
124 else
126 pthread_attr_init (attr = &default_attr);
127 pthread_attr_setdetachstate (attr, PTHREAD_CREATE_DETACHED);
130 /* A "vaquita" thread is a temporary pthread created to deliver a signal to
131 * the application. We don't wait around for the thread to return from the
132 * app. There's some symbolism here of sending a little creature off to tell
133 * the app something important. If all the vaquitas end up wiped out in the
134 * wild, a distinct near-term possibility, at least this code remembers them.
136 rc = pthread_create (&vaquita, attr,
137 (void * (*) (void *)) evp->sigev_notify_function,
138 evp->sigev_value.sival_ptr);
140 /* The following error is not expected. If seen often, develop a recovery. */
141 if (rc)
142 debug_printf ("aio vaquita thread creation failed, %E");
144 /* Should we wait for the signal delivery thread to finish? We can't: Who
145 * knows what mischief the app coder may have in their handler? Worst case
146 * is they accidentally used non-signal-safe functions in their handler. We
147 * return hoping for the best and finish cleaning up our end of notification.
149 return;
152 static void
153 aionotify (struct aiocb *aio)
155 siginfo_t si = {0};
156 si.si_code = SI_ASYNCIO;
158 /* If signal notification wanted, send AIO-complete signal */
159 switch (aio->aio_sigevent.sigev_notify) {
160 case SIGEV_NONE:
161 break;
163 case SIGEV_SIGNAL:
164 si.si_signo = aio->aio_sigevent.sigev_signo;
165 si.si_value = aio->aio_sigevent.sigev_value;
166 if (si.si_signo)
167 sig_send (myself, si);
168 break;
170 case SIGEV_THREAD:
171 aionotify_on_pthread (&aio->aio_sigevent);
172 break;
175 /* If this op is on LIO list and is last op, send LIO-complete signal */
176 if (aio->aio_liocb)
178 if (1 == InterlockedExchangeAdd (&aio->aio_liocb->lio_count, -1))
180 /* LIO's count has decremented to zero */
181 switch (aio->aio_liocb->lio_sigevent->sigev_notify) {
182 case SIGEV_NONE:
183 break;
185 case SIGEV_SIGNAL:
186 si.si_signo = aio->aio_liocb->lio_sigevent->sigev_signo;
187 si.si_value = aio->aio_liocb->lio_sigevent->sigev_value;
188 if (si.si_signo)
189 sig_send (myself, si);
190 break;
192 case SIGEV_THREAD:
193 aionotify_on_pthread (aio->aio_liocb->lio_sigevent);
194 break;
197 free (aio->aio_liocb);
198 aio->aio_liocb = NULL;
203 static DWORD __attribute__ ((noreturn))
204 aiowaiter (void *unused)
205 { /* One instance, called on its own cygthread; runs until program exits */
206 struct aiocb *aio;
208 while (1)
210 /* Wait forever for at least one event to be set */
211 DWORD res = WaitForMultipleObjects(AIO_MAX, evt_handles, FALSE, INFINITE);
212 switch (res)
214 case WAIT_FAILED:
215 api_fatal ("aiowaiter fatal error, %E");
217 default:
218 if (res < WAIT_OBJECT_0 || res >= WAIT_OBJECT_0 + AIO_MAX)
219 api_fatal ("aiowaiter unexpected WFMO result %d", res);
220 int slot = res - WAIT_OBJECT_0;
222 /* Guard against "saw completion before request finished" gotcha */
223 EnterCriticalSection (&evt_locks[slot]);
224 LeaveCriticalSection (&evt_locks[slot]);
226 aio = evt_aiocbs[slot];
227 debug_printf ("WFMO returns %d, aio %p", res, aio);
229 if (aio->aio_errno == EBUSY)
231 /* Capture Windows status and convert to Cygwin status */
232 NTSTATUS status = (NTSTATUS) aio->aio_wincb.status;
233 if (NT_SUCCESS (status))
235 aio->aio_rbytes = (ssize_t) aio->aio_wincb.info;
236 aio->aio_errno = 0;
238 else
240 aio->aio_rbytes = -1;
241 aio->aio_errno = geterrno_from_nt_status (status);
244 else
246 /* Async operation was simulated; AIO status already updated */
249 /* Send completion signal if user requested it */
250 aionotify (aio);
252 /* Free up the slot used for this inline AIO. We do this
253 * manually rather than calling aiorelslot() because we
254 * already have the slot number handy.
256 EnterCriticalSection (&slotcrit);
257 evt_aiocbs[slot] = NULL;
258 LeaveCriticalSection (&slotcrit);
259 debug_printf ("retired aio %p; slot %d released", aio, slot);
261 /* Notify workers that a slot has opened up */
262 sem_post (&worksem);
267 static ssize_t
268 asyncread (struct aiocb *aio)
269 { /* Try to initiate an asynchronous read, either from app or worker thread */
270 ssize_t res = 0;
272 cygheap_fdget cfd (aio->aio_fildes);
273 if (cfd < 0)
274 res = -1; /* errno has been set to EBADF */
275 else
277 int slot = aiogetslot (aio);
278 debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
279 if (slot >= 0)
281 EnterCriticalSection (&evt_locks[slot]);
282 aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
283 aio->aio_wincb.event = (void *) evt_handles[slot];
284 res = cfd->pread ((void *) aio->aio_buf, aio->aio_nbytes,
285 aio->aio_offset, (void *) aio);
286 LeaveCriticalSection (&evt_locks[slot]);
288 else
290 set_errno (ENOBUFS); /* Internal use only */
291 res = -1;
295 return res;
298 static ssize_t
299 asyncwrite (struct aiocb *aio)
300 { /* Try to initiate an asynchronous write, either from app or worker thread */
301 ssize_t res = 0;
303 cygheap_fdget cfd (aio->aio_fildes);
304 if (cfd < 0)
305 res = -1; /* errno has been set to EBADF */
306 else
308 int slot = aiogetslot (aio);
309 debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
310 if (slot >= 0)
312 EnterCriticalSection (&evt_locks[slot]);
313 aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
314 aio->aio_wincb.event = (void *) evt_handles[slot];
315 res = cfd->pwrite ((void *) aio->aio_buf, aio->aio_nbytes,
316 aio->aio_offset, (void *) aio);
317 LeaveCriticalSection (&evt_locks[slot]);
319 else
321 set_errno (ENOBUFS); /* Internal use only */
322 res = -1;
326 return res;
329 /* Have to forward ref because of chicken v. egg situation */
330 static DWORD __attribute__ ((noreturn)) aioworker (void *);
332 static void
333 aioinit (void)
335 /* First a cheap test to speed processing after initialization completes */
336 if (aioinitialized >= 0)
338 /* Guard against multiple threads initializing at same time */
339 if (0 == InterlockedExchangeAdd (&aioinitialized, 1))
341 int i = AIO_MAX;
342 char *tnames = (char *) malloc (AIO_MAX * 8);
344 if (!tnames)
345 api_fatal ("couldn't create aioworker tname table");
347 InitializeCriticalSection (&slotcrit);
348 InitializeCriticalSection (&workcrit);
349 sem_init (&worksem, 0, 0);
350 TAILQ_INIT(&worklist);
352 /* Create AIO_MAX number of aioworker threads for queued AIOs */
353 while (i--)
355 __small_sprintf (&tnames[i * 8], "aio%d", AIO_MAX - i);
356 if (!new cygthread (aioworker, NULL, &tnames[i * 8]))
357 api_fatal ("couldn't create an aioworker thread, %E");
360 /* Initialize event handles and slot locks arrays for inline AIOs */
361 for (i = 0; i < AIO_MAX; ++i)
363 /* Events are non-inheritable, auto-reset, init unset, unnamed */
364 evt_handles[i] = CreateEvent (NULL, FALSE, FALSE, NULL);
365 if (!evt_handles[i])
366 api_fatal ("couldn't create an event, %E");
368 InitializeCriticalSection (&evt_locks[i]);
371 /* Create aiowaiter thread; waits for inline AIO completion events */
372 if (!new cygthread (aiowaiter, NULL, "aio"))
373 api_fatal ("couldn't create aiowaiter thread, %E");
375 /* Indicate we have completed initialization */
376 InterlockedExchange (&aioinitialized, -1);
378 else
379 /* If 'aioinitialized' is greater than zero, another thread is
380 * initializing for us; wait until 'aioinitialized' goes negative
382 while (InterlockedExchangeAdd (&aioinitialized, 0) >= 0)
383 yield ();
387 static int
388 aioqueue (struct aiocb *aio)
389 { /* Add an AIO to the worklist, to be serviced by a worker thread */
390 if (aioinitialized >= 0)
391 aioinit ();
393 EnterCriticalSection (&workcrit);
394 TAILQ_INSERT_TAIL(&worklist, aio, aio_chain);
395 LeaveCriticalSection (&workcrit);
397 debug_printf ("queued aio %p", aio);
398 sem_post (&worksem);
400 return 0;
403 static DWORD __attribute__ ((noreturn))
404 aioworker (void *unused)
405 { /* Multiple instances; called on own cygthreads; runs 'til program exits */
406 struct aiocb *aio;
408 while (1)
410 /* Park here until there's work to do or a slot becomes available */
411 sem_wait (&worksem);
413 look4work:
414 EnterCriticalSection (&workcrit);
415 if (TAILQ_EMPTY(&worklist))
417 /* Another aioworker picked up the work already */
418 LeaveCriticalSection (&workcrit);
419 continue;
422 /* Make sure a slot is available before starting this AIO */
423 aio = TAILQ_FIRST(&worklist);
424 int slot = aiogetslot (NULL);
425 if (slot >= 0) // a slot is available
426 TAILQ_REMOVE(&worklist, aio, aio_chain);
427 LeaveCriticalSection (&workcrit);
428 if (slot < 0) // no slot is available, so worklist unchanged and we park
429 continue;
431 debug_printf ("starting aio %p", aio);
432 switch (aio->aio_lio_opcode)
434 case LIO_NOP:
435 aio->aio_rbytes = 0;
436 break;
438 case LIO_READ:
439 aio->aio_rbytes = asyncread (aio);
440 break;
442 case LIO_WRITE:
443 aio->aio_rbytes = asyncwrite (aio);
444 break;
446 default:
447 errno = EINVAL;
448 aio->aio_rbytes = -1;
449 break;
452 /* If operation still underway, let aiowaiter hear about its finish */
453 if (aio->aio_rbytes == 0 && aio->aio_nbytes != 0) // not racy
454 continue;
456 /* If operation errored, save error number, else clear it */
457 if (aio->aio_rbytes == -1)
458 aio->aio_errno = get_errno ();
459 else
460 aio->aio_errno = 0;
462 /* If a slot for this queued async AIO was available, but we lost out */
463 if (aio->aio_errno == ENOBUFS)
465 aio->aio_errno = EINPROGRESS;
466 aioqueue (aio); /* Re-queue the AIO */
468 /* Another option would be to fail the AIO with error EAGAIN, but
469 * experience with iozone showed apps might not expect to see a
470 * deferred EAGAIN. I.e. they should expect EAGAIN on their call to
471 * aio_read() or aio_write() but probably not expect to see EAGAIN
472 * on an aio_error() query after they'd previously seen EINPROGRESS
473 * on the initial AIO call.
475 continue;
478 /* If seeks aren't permitted on given fd, or pread|pwrite not legal */
479 if (aio->aio_errno == ESPIPE)
481 ssize_t res = 0;
482 off_t curpos;
484 cygheap_fdget cfd (aio->aio_fildes);
485 if (cfd < 0)
487 res = -1;
488 goto done; /* errno has been set to EBADF */
491 /* If we can get current file position, seek to aio_offset */
492 curpos = cfd->lseek (0, SEEK_CUR);
493 if (curpos < 0 || cfd->lseek (aio->aio_offset, SEEK_SET) < 0)
495 /* Can't seek */
496 res = curpos;
497 set_errno (0); /* Get rid of ESPIPE we've incurred */
498 aio->aio_errno = 0; /* Here too */
501 /* Do the requested AIO operation manually, synchronously */
502 switch (aio->aio_lio_opcode)
504 case LIO_READ:
505 /* 2nd argument to cfd->read() is passed by reference... */
506 cfd->read ((void *) aio->aio_buf, aio->aio_nbytes);
507 /* ...so on return it contains the number of bytes read */
508 res = aio->aio_nbytes;
509 break;
511 case LIO_WRITE:
512 res = cfd->write ((void *) aio->aio_buf, aio->aio_nbytes);
513 break;
516 /* If we had seeked successfully, restore original file position */
517 if (curpos >= 0)
518 if (cfd->lseek (curpos, SEEK_SET) < 0)
519 res = -1;
521 done:
522 /* Update AIO to reflect final result */
523 aio->aio_rbytes = res;
524 aio->aio_errno = res == -1 ? get_errno () : 0;
526 /* Make like the requested async operation completed normally */
527 for (int i = 0; i < AIO_MAX; i++)
528 if (evt_aiocbs[i] == aio)
530 SetEvent (evt_handles[i]);
531 goto truly_done;
534 /* Free up the slot we ended up not using */
535 int slot = aiorelslot (aio);
536 debug_printf ("slot %d released", slot);
539 /* Send completion signal if user requested it */
540 aionotify (aio);
542 truly_done:
543 debug_printf ("completed aio %p", aio);
544 goto look4work;
549 aio_cancel (int fildes, struct aiocb *aio)
551 int aiocount = 0;
552 struct aiocb *ptr;
553 siginfo_t si = {0};
554 si.si_code = SI_ASYNCIO;
556 /* Note 'aio' is allowed to be NULL here; it's used as a wildcard */
557 restart:
558 EnterCriticalSection (&workcrit);
559 TAILQ_FOREACH(ptr, &worklist, aio_chain)
561 if (ptr->aio_fildes == fildes && (!aio || ptr == aio))
563 /* This queued AIO qualifies for cancellation */
564 TAILQ_REMOVE(&worklist, ptr, aio_chain);
565 LeaveCriticalSection (&workcrit);
567 ptr->aio_errno = ECANCELED;
568 ptr->aio_rbytes = -1;
570 /* If signal notification wanted, send AIO-canceled signal */
571 switch (ptr->aio_sigevent.sigev_notify) {
572 case SIGEV_NONE:
573 break;
575 case SIGEV_SIGNAL:
576 si.si_signo = ptr->aio_sigevent.sigev_signo;
577 si.si_value = ptr->aio_sigevent.sigev_value;
578 if (si.si_signo)
579 sig_send (myself, si);
580 break;
582 case SIGEV_THREAD:
583 aionotify_on_pthread (&ptr->aio_sigevent);
584 break;
587 ++aiocount;
588 goto restart;
591 LeaveCriticalSection (&workcrit);
593 /* Note that AIO_NOTCANCELED is not possible in this implementation. That's
594 * because AIOs are dequeued to execute; the worklist search above won't
595 * find an AIO that's been dequeued from the worklist.
597 if (aiocount)
598 return AIO_CANCELED;
599 else
600 return AIO_ALLDONE;
604 aio_error (const struct aiocb *aio)
606 int res;
608 if (!aio)
610 set_errno (EINVAL);
611 return -1;
614 switch (aio->aio_errno)
616 case EBUSY: /* This state for internal use only; not visible to app */
617 case ENOBUFS: /* This state for internal use only; not visible to app */
618 res = EINPROGRESS;
619 break;
621 default:
622 res = aio->aio_errno;
625 return res;
629 aio_fsync (int mode, struct aiocb *aio)
631 if (!aio)
633 set_errno (EINVAL);
634 return -1;
637 switch (mode)
639 #if defined(O_SYNC)
640 case O_SYNC:
641 aio->aio_rbytes = fsync (aio->aio_fildes);
642 break;
644 #if defined(O_DSYNC) && O_DSYNC != O_SYNC
645 case O_DSYNC:
646 aio->aio_rbytes = fdatasync (aio->aio_fildes);
647 break;
648 #endif
649 #endif
651 default:
652 set_errno (EINVAL);
653 return -1;
656 if (aio->aio_rbytes == -1)
657 aio->aio_errno = get_errno ();
659 return aio->aio_rbytes;
663 aio_read (struct aiocb *aio)
665 ssize_t res = 0;
667 if (!aio)
669 set_errno (EINVAL);
670 return -1;
672 if (aioinitialized >= 0)
673 aioinit ();
674 if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
676 set_errno (EAGAIN);
677 return -1;
680 aio->aio_lio_opcode = LIO_READ;
681 aio->aio_errno = EINPROGRESS;
682 aio->aio_rbytes = -1;
684 /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
685 if (aio->aio_sigevent.sigev_signo == 0)
686 aio->aio_sigevent.sigev_notify = SIGEV_NONE;
688 /* Try to launch inline async read; only on ESPIPE/ENOBUFS is it queued */
689 pthread_testcancel ();
690 res = asyncread (aio);
692 /* If async read couldn't be launched, queue the AIO for a worker thread */
693 if (res == -1)
694 switch (get_errno ()) {
695 case ESPIPE:
697 int slot = aiorelslot (aio);
698 if (slot >= 0)
699 debug_printf ("slot %d released", slot);
701 fallthrough;
703 case ENOBUFS:
704 aio->aio_errno = EINPROGRESS;
705 aio->aio_rbytes = -1;
707 res = aioqueue (aio);
708 break;
710 default:
711 ; /* I think this is not possible */
714 return res < 0 ? (int) res : 0; /* return 0 on success */
717 ssize_t
718 aio_return (struct aiocb *aio)
720 if (!aio)
722 set_errno (EINVAL);
723 return -1;
726 switch (aio->aio_errno)
728 case EBUSY: /* AIO is currently underway (internal state) */
729 case ENOBUFS: /* AIO is currently underway (internal state) */
730 case EINPROGRESS: /* AIO has been queued successfully */
731 set_errno (EINPROGRESS);
732 return -1;
734 case EINVAL: /* aio_return() has already been called on this AIO */
735 set_errno (aio->aio_errno);
736 return -1;
738 default: /* AIO has completed, successfully or not */
742 /* This AIO has completed so grab any error status if present */
743 if (aio->aio_rbytes == -1)
744 set_errno (aio->aio_errno);
746 /* Set this AIO's errno so later aio_return() calls on this AIO fail */
747 aio->aio_errno = EINVAL;
749 return aio->aio_rbytes;
752 static int
753 aiosuspend (const struct aiocb *const aiolist[],
754 int nent, const struct timespec *timeout)
756 /* Returns lowest list index of completed aios, else 'nent' if all completed.
757 * If none completed on entry, wait for interval specified by 'timeout'.
759 int res;
760 sigset_t sigmask;
761 siginfo_t si;
762 ULONGLONG nsecs = 0;
763 ULONGLONG time0, time1;
764 struct timespec to = {0};
766 if (timeout)
768 to = *timeout;
769 if (!valid_timespec (to))
771 set_errno (EINVAL);
772 return -1;
774 nsecs = (NSPERSEC * to.tv_sec) + to.tv_nsec;
777 retry:
778 sigemptyset (&sigmask);
779 int aiocount = 0;
780 for (int i = 0; i < nent; ++i)
781 if (aiolist[i] && aiolist[i]->aio_liocb)
783 if (aiolist[i]->aio_errno == EINPROGRESS ||
784 aiolist[i]->aio_errno == ENOBUFS ||
785 aiolist[i]->aio_errno == EBUSY)
787 ++aiocount;
788 if (aiolist[i]->aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
789 aiolist[i]->aio_sigevent.sigev_notify == SIGEV_THREAD)
790 sigaddset (&sigmask, aiolist[i]->aio_sigevent.sigev_signo);
792 else
793 return i;
796 if (aiocount == 0)
797 return nent;
799 if (timeout && nsecs == 0)
801 set_errno (EAGAIN);
802 return -1;
805 time0 = get_clock (CLOCK_MONOTONIC)->nsecs ();
806 /* Note wait below is abortable even w/ empty sigmask and infinite timeout */
807 res = sigtimedwait (&sigmask, &si, timeout ? &to : NULL);
808 if (res == -1)
809 return -1; /* Return with errno set by failed sigtimedwait() */
810 time1 = get_clock (CLOCK_MONOTONIC)->nsecs ();
812 /* Adjust timeout to account for time just waited */
813 time1 -= time0;
814 if (time1 > nsecs)
815 nsecs = 0; // just in case we didn't get rescheduled very quickly
816 else
817 nsecs -= time1;
818 to.tv_sec = nsecs / NSPERSEC;
819 to.tv_nsec = nsecs % NSPERSEC;
821 goto retry;
825 aio_suspend (const struct aiocb *const aiolist[],
826 int nent, const struct timespec *timeout)
828 int res;
830 if (nent < 0)
832 set_errno (EINVAL);
833 return -1;
836 pthread_testcancel ();
837 res = aiosuspend (aiolist, nent, timeout);
839 /* If there was an error, or no AIOs completed before or during timeout */
840 if (res == -1)
841 return res; /* If no AIOs completed, errno has been set to EAGAIN */
843 /* Else if all AIOs have completed */
844 else if (res == nent)
845 return 0;
847 /* Else at least one of the AIOs completed */
848 else
849 return 0;
853 aio_write (struct aiocb *aio)
855 ssize_t res = 0;
857 if (!aio)
859 set_errno (EINVAL);
860 return -1;
862 if (aioinitialized >= 0)
863 aioinit ();
864 if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
866 set_errno (EAGAIN);
867 return -1;
870 aio->aio_lio_opcode = LIO_WRITE;
871 aio->aio_errno = EINPROGRESS;
872 aio->aio_rbytes = -1;
874 /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
875 if (aio->aio_sigevent.sigev_signo == 0)
876 aio->aio_sigevent.sigev_notify = SIGEV_NONE;
878 /* Try to launch inline async write; only on ESPIPE/ENOBUFS is it queued */
879 pthread_testcancel ();
880 res = asyncwrite (aio);
882 /* If async write couldn't be launched, queue the AIO for a worker thread */
883 if (res == -1)
884 switch (get_errno ()) {
885 case ESPIPE:
887 int slot = aiorelslot (aio);
888 if (slot >= 0)
889 debug_printf ("slot %d released", slot);
891 fallthrough;
893 case ENOBUFS:
894 aio->aio_errno = EINPROGRESS;
895 aio->aio_rbytes = -1;
897 res = aioqueue (aio);
898 break;
900 default:
901 ; /* I think this is not possible */
904 return res < 0 ? (int) res : 0; /* return 0 on success */
908 lio_listio (int mode, struct aiocb *__restrict const aiolist[__restrict_arr],
909 int nent, struct sigevent *__restrict sig)
911 struct aiocb *aio;
912 struct __liocb *lio;
914 pthread_testcancel ();
916 if ((mode != LIO_WAIT && mode != LIO_NOWAIT) ||
917 (nent < 0 || nent > AIO_LISTIO_MAX))
919 set_errno (EINVAL);
920 return -1;
923 if (sig && nent && mode == LIO_NOWAIT)
925 lio = (struct __liocb *) malloc (sizeof (struct __liocb));
926 if (!lio)
928 set_errno (ENOMEM);
929 return -1;
932 lio->lio_count = nent;
933 lio->lio_sigevent = sig;
935 else
936 lio = NULL;
938 int aiocount = 0;
939 for (int i = 0; i < nent; ++i)
941 aio = (struct aiocb *) aiolist[i];
942 if (!aio)
944 if (lio)
945 InterlockedDecrement (&lio->lio_count);
946 continue;
949 aio->aio_liocb = lio;
950 switch (aio->aio_lio_opcode)
952 case LIO_NOP:
953 if (lio)
954 InterlockedDecrement (&lio->lio_count);
955 continue;
957 case LIO_READ:
958 aio_read (aio);
959 ++aiocount;
960 continue;
962 case LIO_WRITE:
963 aio_write (aio);
964 ++aiocount;
965 continue;
967 default:
968 break;
971 if (lio)
972 InterlockedDecrement (&lio->lio_count);
973 aio->aio_errno = EINVAL;
974 aio->aio_rbytes = -1;
977 /* mode is LIO_NOWAIT so return some kind of answer immediately */
978 if (mode == LIO_NOWAIT)
980 /* At least one AIO has been launched or queued */
981 if (aiocount)
982 return 0;
984 /* No AIOs have been launched or queued */
985 set_errno (EAGAIN);
986 return -1;
989 /* Else mode is LIO_WAIT so wait for all AIOs to complete or error */
990 while (nent)
992 int i = aiosuspend ((const struct aiocb *const *) aiolist, nent, NULL);
993 if (i >= nent)
994 break;
995 else
996 aiolist[i]->aio_liocb = NULL; /* Avoids repeating notify on this AIO */
999 return 0;
1002 #ifdef __cplusplus
1004 #endif