/*
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident   "%Z%%M% %I% %E% SMI"

#include "thr_uberdata.h"
#include <sys/param.h>
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int __fcntl(int, int, ...);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);
/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;               /* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;     /* circular list of AIO workers */
aio_worker_t *__nextworker_rw;  /* next worker in list of workers */
int __rw_workerscnt;            /* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;     /* circular list of AIO workers */
aio_worker_t *__nextworker_no;  /* next worker in list of workers */
int __no_workerscnt;            /* number of notification workers */

aio_req_t *_aio_done_tail;      /* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;  /* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;     /* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;      /* wait for userland I/Os */

pid_t __pid = (pid_t)-1;        /* initialize as invalid pid */
int _sigio_enabled = 0;         /* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;          /* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;          /* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;      /* # of outstanding requests */
int _kaio_outstand_cnt = 0;     /* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;      /* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;    /* active kernel kaio calls */
int _aio_suscv_cnt = 0;         /* aio_suspend calls waiting on cv's */

int _max_workers = 256;         /* max number of workers permitted */
int _min_workers = 4;           /* min number of workers */
int _minworkload = 2;           /* min number of requests in q */
int _aio_worker_cnt = 0;        /* number of workers to do requests */
int __uaio_ok = 0;              /* AIO has been enabled */
sigset_t _worker_set;           /* worker's signal mask */

int _aiowait_flag = 0;          /* when set, aiowait() is in progress */
int _aio_flags = 0;             /* see asyncio.h defines for */

aio_worker_t *_kaiowp = NULL;   /* points to kaio cleanup thread */

int hz;                         /* clock ticks per second */
_kaio_supported_init(void)

    if (_kaio_supported != NULL)    /* already initialized */

    size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
    ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
        MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
    if (ptr == MAP_FAILED)

    _kaio_supported = ptr;
/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized, such as the maximum number of workers
 * that the subsystem can create and the minimum number of workers
 * permitted before imposing some restrictions.  Also, some
 * workers are created.
 */

    lmutex_lock(&__aio_initlock);
    (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
    while (__aio_initbusy)
        (void) cond_wait(&__aio_initcv, &__aio_initlock);
    (void) pthread_setcancelstate(cancel_state, NULL);
    if (__uaio_ok) {    /* already initialized */
        lmutex_unlock(&__aio_initlock);

    lmutex_unlock(&__aio_initlock);

    hz = (int)sysconf(_SC_CLK_TCK);

    setup_cancelsig(SIGAIOCANCEL);

    if (_kaio_supported_init() != 0)

    /*
     * Allocate and initialize the hash table.
     * Do this only once, even if __uaio_init() is called twice.
     */
    if (_aio_hash == NULL) {
        /* LINTED pointer cast */
        _aio_hash = mmap(NULL,
            HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
            MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
        if ((void *)_aio_hash == MAP_FAILED) {

        for (i = 0; i < HASHSZ; i++)
            (void) mutex_init(&_aio_hash[i].hash_lock,

    /*
     * Initialize worker's signal mask to only catch SIGAIOCANCEL.
     */
    (void) sigfillset(&_worker_set);
    (void) sigdelset(&_worker_set, SIGAIOCANCEL);

    /*
     * Create one worker to send asynchronous notifications.
     * Do this only once, even if __uaio_init() is called twice.
     */
    if (__no_workerscnt == 0 &&
        (_aio_create_worker(NULL, AIONOTIFY) != 0)) {

    /*
     * Create the minimum number of read/write workers.
     * And later check whether at least one worker is created;
     * lwp_create() calls could fail because of segkp exhaustion.
     */
    for (i = 0; i < _min_workers; i++)
        (void) _aio_create_worker(NULL, AIOREAD);
    if (__rw_workerscnt == 0) {

    lmutex_lock(&__aio_initlock);

    (void) cond_broadcast(&__aio_initcv);
    lmutex_unlock(&__aio_initlock);
/*
 * Called from close() before actually performing the real _close().
 */

    if (fd < 0)    /* avoid cancelling everything */

    /*
     * Cancel all outstanding aio requests for this file descriptor.
     */
    (void) aiocancel_all(fd);

    /*
     * If we have allocated the bit array, clear the bit for this file.
     * The next open may re-use this file descriptor and the new file
     * may have different kaio() behaviour.
     */
    if (_kaio_supported != NULL)
        CLEAR_KAIO_SUPPORTED(fd);
/*
 * special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
_kaio_cleanup_thread(void *arg)

    if (pthread_setspecific(_aio_key, arg) != 0)
        aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
    (void) _kaio(AIOSTART);

    lmutex_lock(&__aio_initlock);
    (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
    while (__aio_initbusy)
        (void) cond_wait(&__aio_initcv, &__aio_initlock);
    (void) pthread_setcancelstate(cancel_state, NULL);
    if (_kaio_ok) {    /* already initialized */
        lmutex_unlock(&__aio_initlock);

    lmutex_unlock(&__aio_initlock);

    if (_kaio_supported_init() != 0)

    else if ((_kaiowp = _aio_worker_alloc()) == NULL)

    else if ((error = (int)_kaio(AIOINIT)) == 0) {
        (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
        error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
            _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
        (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);

    if (error && _kaiowp != NULL) {
        _aio_worker_free(_kaiowp);

    lmutex_lock(&__aio_initlock);

    (void) cond_broadcast(&__aio_initcv);
    lmutex_unlock(&__aio_initlock);
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)

    return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));

aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)

    return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));

aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)

    return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));

aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)

    return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));

#endif  /* !defined(_LP64) */
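
/*
 * Illustrative usage sketch (not part of this library): how an application
 * typically drives the Solaris-style aioread()/aiowait() interface that the
 * wrappers above implement.  The function and variable names below
 * (example_aioread, donep) are hypothetical; the calls themselves are the
 * documented aioread(3AIO)/aiowait(3AIO) interfaces.
 */
#if 0
#include <sys/asynch.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>

static void
example_aioread(int fd)
{
    static char buf[8192];
    aio_result_t result;
    aio_result_t *donep;

    /* Queue an asynchronous read of the first 8K of the file. */
    if (aioread(fd, buf, sizeof (buf), (off_t)0, SEEK_SET, &result) == -1) {
        perror("aioread");
        return;
    }

    /* Block until some outstanding request completes (NULL: no timeout). */
    donep = aiowait(NULL);
    if (donep != NULL && donep != (aio_result_t *)-1)
        (void) printf("read %ld bytes, errno %d\n",
            (long)donep->aio_return, donep->aio_errno);
}
#endif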
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)

    struct stat64 stat64;

        if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)

        if (fstat64(fd, &stat64) == -1)

        loffset = offset + stat64.st_size;

    /* initialize kaio */

    /*
     * _aio_do_request() needs the original request code (mode) to be able
     * to choose the appropriate 32/64 bit function.  All other functions
     * only require the difference between READ and WRITE (umode).
     */
    if (mode == AIOAREAD64 || mode == AIOAWRITE64)
        umode = mode - AIOAREAD64;

    /*
     * Try kernel aio first.
     * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
     */
    if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
        resultp->aio_errno = 0;
        sig_mutex_lock(&__aio_mutex);
        _kaio_outstand_cnt++;
        sig_mutex_unlock(&__aio_mutex);
        kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
            (umode | AIO_POLL_BIT) : umode),
            fd, buf, bufsz, loffset, resultp);

        sig_mutex_lock(&__aio_mutex);
        _kaio_outstand_cnt--;
        sig_mutex_unlock(&__aio_mutex);
        if (errno != ENOTSUP && errno != EBADFD)

        SET_KAIO_NOT_SUPPORTED(fd);

    if (!__uaio_ok && __uaio_init() == -1)

    if ((reqp = _aio_req_alloc()) == NULL) {

    /*
     * _aio_do_request() checks reqp->req_op to differentiate
     * between 32 and 64 bit access.
     */

    reqp->req_resultp = resultp;
    ap = &reqp->req_args;

    ap->offset = loffset;

    if (_aio_hash_insert(resultp, reqp) != 0) {

    /*
     * _aio_req_add() only needs the difference between READ and
     * WRITE to choose the right worker queue.
     */
    _aio_req_add(reqp, &__nextworker_rw, umode);
aiocancel(aio_result_t *resultp)

    sig_mutex_lock(&__aio_mutex);
    reqp = _aio_hash_find(resultp);

        if (_aio_outstand_cnt == _aio_req_done_cnt)

        aiowp = reqp->req_worker;
        sig_mutex_lock(&aiowp->work_qlock1);
        (void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
        sig_mutex_unlock(&aiowp->work_qlock1);

            if (_aio_outstand_cnt == 0 ||
                _aio_outstand_cnt == _aio_req_done_cnt)

    sig_mutex_unlock(&__aio_mutex);

_aiowait_cleanup(void *arg)

    sig_mutex_lock(&__aio_mutex);

    sig_mutex_unlock(&__aio_mutex);
/*
 * This must be asynch safe and cancel safe
 */
aiowait(struct timeval *uwait)

    aio_result_t *uresultp;
    aio_result_t *kresultp;
    aio_result_t *resultp;

    struct timeval twait;
    struct timeval *wait = NULL;

        /*
         * Check for a valid specified wait time.
         * If it is invalid, fail the call right away.
         */
        if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
            uwait->tv_usec >= MICROSEC) {

            return ((aio_result_t *)-1);

        if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
            hrtend = gethrtime() +
                (hrtime_t)uwait->tv_sec * NANOSEC +
                (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);

            sig_mutex_lock(&__aio_mutex);
            if (_kaio_outstand_cnt == 0) {
                kresultp = (aio_result_t *)-1;

                kresultp = (aio_result_t *)_kaio(AIOWAIT,
                    (struct timeval *)-1, 1);
                if (kresultp != (aio_result_t *)-1 &&
                    kresultp != (aio_result_t *)1) {
                    _kaio_outstand_cnt--;
                    sig_mutex_unlock(&__aio_mutex);

            uresultp = _aio_req_done();
            sig_mutex_unlock(&__aio_mutex);
            if (uresultp != NULL &&
                uresultp != (aio_result_t *)-1) {

            if (uresultp == (aio_result_t *)-1 &&
                kresultp == (aio_result_t *)-1) {

                return ((aio_result_t *)-1);

        sig_mutex_lock(&__aio_mutex);
        uresultp = _aio_req_done();
        if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
            sig_mutex_unlock(&__aio_mutex);

        dontblock = (uresultp == (aio_result_t *)-1);
        if (dontblock && _kaio_outstand_cnt == 0) {
            kresultp = (aio_result_t *)-1;

            sig_mutex_unlock(&__aio_mutex);
            pthread_cleanup_push(_aiowait_cleanup, NULL);

            kresultp = (aio_result_t *)_kaio(AIOWAIT,

            pthread_cleanup_pop(0);
            sig_mutex_lock(&__aio_mutex);

        sig_mutex_unlock(&__aio_mutex);
        if (kresultp == (aio_result_t *)1) {
            /* aiowait() awakened by an aionotify() */

        } else if (kresultp != NULL &&
            kresultp != (aio_result_t *)-1) {

            sig_mutex_lock(&__aio_mutex);
            _kaio_outstand_cnt--;
            sig_mutex_unlock(&__aio_mutex);

        } else if (kresultp == (aio_result_t *)-1 &&
            kaio_errno == EINVAL &&
            uresultp == (aio_result_t *)-1) {

            resultp = (aio_result_t *)-1;

        } else if (kresultp == (aio_result_t *)-1 &&
            kaio_errno == EINTR) {

            resultp = (aio_result_t *)-1;

        } else if (timedwait) {
            hres = hrtend - gethrtime();

                /* time is up; return */

                /*
                 * Some time left.  Round up the remaining time
                 * in nanoseconds to microsec.  Retry the call.
                 */
                hres += (NANOSEC / MICROSEC) - 1;
                wait->tv_sec = hres / NANOSEC;
                    (hres % NANOSEC) / (NANOSEC / MICROSEC);

            ASSERT(kresultp == NULL && uresultp == NULL);
/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */
_aio_get_timedelta(timespec_t *end, timespec_t *wait)

    (void) gettimeofday(&cur, NULL);
    curtime.tv_sec = cur.tv_sec;
    curtime.tv_nsec = cur.tv_usec * 1000;    /* convert us to ns */

    if (end->tv_sec >= curtime.tv_sec) {
        wait->tv_sec = end->tv_sec - curtime.tv_sec;
        if (end->tv_nsec >= curtime.tv_nsec) {
            wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
            if (wait->tv_sec == 0 && wait->tv_nsec == 0)
                ret = -1;    /* timer expired */

            if (end->tv_sec > curtime.tv_sec) {

                wait->tv_nsec = NANOSEC -
                    (curtime.tv_nsec - end->tv_nsec);

                ret = -1;    /* timer expired */
/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * aio's and return.  Those aio's in question will have either noticed the
 * cancellation notice before, during, or after initiating io.
 */
aiocancel_all(int fd)

    aio_req_t **reqpp, *last;

    sig_mutex_lock(&__aio_mutex);

    if (_aio_outstand_cnt == 0) {
        sig_mutex_unlock(&__aio_mutex);
        return (AIO_ALLDONE);

    /*
     * Cancel requests from the read/write workers' queues.
     */
    first = __nextworker_rw;

        _aio_cancel_work(next, fd, &canceled, &done);
    } while ((next = next->work_forw) != first);

    /*
     * Finally, check if there are requests on the done queue that
     * should be canceled.
     */

        reqpp = &_aio_done_tail;
        last = _aio_done_tail;
        while ((reqp = *reqpp) != NULL) {
            if (cancelall || reqp->req_args.fd == fd) {
                *reqpp = reqp->req_next;

                    last = reqp->req_next;

                if (_aio_done_head == reqp) {
                    /* this should be the last req in list */
                    _aio_done_head = last;

                _aio_set_result(reqp, -1, ECANCELED);
                (void) _aio_hash_del(reqp->req_resultp);

                reqpp = &reqp->req_next;

            ASSERT(_aio_donecnt == 0);
            _aio_done_head = NULL;

    sig_mutex_unlock(&__aio_mutex);

    if (canceled && done == 0)
        return (AIO_CANCELED);
    else if (done && canceled == 0)
        return (AIO_ALLDONE);
    else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
        return ((int)_kaio(AIOCANCEL, fd, NULL));
    return (AIO_NOTCANCELED);
/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)

    sig_mutex_lock(&aiowp->work_qlock1);
    /*
     * Cancel queued requests first.
     */
    reqp = aiowp->work_tail1;
    while (reqp != NULL) {
        if (fd < 0 || reqp->req_args.fd == fd) {
            if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
                /*
                 * Caller's locks were dropped.
                 * reqp is invalid; start traversing
                 * the list from the beginning again.
                 */
                reqp = aiowp->work_tail1;

            reqp = reqp->req_next;

    /*
     * Since the queued requests have been canceled, there can
     * only be one in-progress request that should be canceled.
     */
    if ((reqp = aiowp->work_req) != NULL &&
        (fd < 0 || reqp->req_args.fd == fd))
        (void) _aio_cancel_req(aiowp, reqp, canceled, done);
    sig_mutex_unlock(&aiowp->work_qlock1);
/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)

    int ostate = reqp->req_state;

    ASSERT(MUTEX_HELD(&__aio_mutex));
    ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
    if (ostate == AIO_REQ_CANCELED)

    if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
        aiowp->work_prev1 == reqp) {
        ASSERT(aiowp->work_done1 != 0);
        /*
         * If not on the done queue yet, just mark it CANCELED;
         * _aio_work_done() will do the necessary clean up.
         * This is required to ensure that aiocancel_all() cancels
         * all the outstanding requests, including this one which
         * is not yet on the done queue but has been marked done.
         */
        _aio_set_result(reqp, -1, ECANCELED);
        (void) _aio_hash_del(reqp->req_resultp);
        reqp->req_state = AIO_REQ_CANCELED;

    if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {

    if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
        ASSERT(POSIX_AIO(reqp));
        /* Cancel the queued aio_fsync() request */
        if (!reqp->req_head->lio_canned) {
            reqp->req_head->lio_canned = 1;

    reqp->req_state = AIO_REQ_CANCELED;
    _aio_req_del(aiowp, reqp, ostate);
    (void) _aio_hash_del(reqp->req_resultp);

    if (reqp == aiowp->work_req) {
        ASSERT(ostate == AIO_REQ_INPROGRESS);
        /*
         * Set the result values now, before _aiodone() is called.
         * We do this because the application can expect aio_return
         * and aio_errno to be set to -1 and ECANCELED, respectively,
         * immediately after a successful return from aiocancel().
         */
        _aio_set_result(reqp, -1, ECANCELED);
        (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);

    if (!POSIX_AIO(reqp)) {

        _aio_set_result(reqp, -1, ECANCELED);

    sig_mutex_unlock(&aiowp->work_qlock1);
    sig_mutex_unlock(&__aio_mutex);
    _aiodone(reqp, -1, ECANCELED);
    sig_mutex_lock(&__aio_mutex);
    sig_mutex_lock(&aiowp->work_qlock1);
_aio_create_worker(aio_req_t *reqp, int mode)

    aio_worker_t *aiowp, **workers, **nextworker;

    void *(*func)(void *);

    /*
     * Put the new worker thread in the right queue.
     */

        workers = &__workers_rw;
        nextworker = &__nextworker_rw;
        aio_workerscnt = &__rw_workerscnt;
        func = _aio_do_request;

        workers = &__workers_no;
        nextworker = &__nextworker_no;
        func = _aio_do_notify;
        aio_workerscnt = &__no_workerscnt;

        aio_panic("_aio_create_worker: invalid mode");

    if ((aiowp = _aio_worker_alloc()) == NULL)

        reqp->req_state = AIO_REQ_QUEUED;
        reqp->req_worker = aiowp;
        aiowp->work_head1 = reqp;
        aiowp->work_tail1 = reqp;
        aiowp->work_next1 = reqp;
        aiowp->work_count1 = 1;
        aiowp->work_minload1 = 1;

    (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
    error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
        THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
    (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);

        reqp->req_worker = NULL;

        _aio_worker_free(aiowp);

    lmutex_lock(&__aio_mutex);

    if (*workers == NULL) {
        aiowp->work_forw = aiowp;
        aiowp->work_backw = aiowp;

        aiowp->work_backw = (*workers)->work_backw;
        aiowp->work_forw = (*workers);
        (*workers)->work_backw->work_forw = aiowp;
        (*workers)->work_backw = aiowp;

    lmutex_unlock(&__aio_mutex);

    (void) thr_continue(aiowp->work_tid);
/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is being done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *    - lock the queue -> remove the request -> unlock the queue
 *    - this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *    - this function first allows the cancellation of the running
 *      request with the flag "work_cancel_flg=1"
 *      see _aio_req_get() -> _aio_cancel_on()
 *      During this phase, it is allowed to interrupt the worker
 *      thread running the request (this thread) using the SIGAIOCANCEL
 *      signal.
 *      Once this thread returns from the kernel (because the request
 *      is just done), then it must disable a possible cancellation
 *      and proceed to finish the request.  To disable the cancellation
 *      this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *      same procedure as in a)
 *
 * This thread uses sigsetjmp() to define the position in the code where
 * it wishes to continue working in the case that a SIGAIOCANCEL signal
 * is detected.
 * Normally this thread should get the cancellation signal during the
 * kernel phase (reading or writing).  In that case the signal handler
 * aiosigcancelhndlr() is activated using the worker thread context,
 * which again will use the siglongjmp() function to break the standard
 * code flow and jump to the "sigsetjmp" position, provided that
 * "work_cancel_flg" is set to "1".
 * Because the "work_cancel_flg" is only manipulated by this worker
 * thread and it can only run on one CPU at a given time, it is not
 * necessary to protect that flag with the queue lock.
 * Returning from the kernel (read or write system call) we must
 * first disable the use of the SIGAIOCANCEL signal and accordingly
 * the use of the siglongjmp() function to prevent a possible deadlock:
 * - It can happen that this worker thread returns from the kernel and
 *   blocks in "work_qlock1",
 * - then a second thread cancels the apparently "in progress" request
 *   and sends the SIGAIOCANCEL signal to the worker thread,
 * - the worker thread gets assigned the "work_qlock1" and will return,
 * - the kernel detects the pending signal and activates the signal
 *   handler,
 * - if the "work_cancel_flg" is still set then the signal handler
 *   should use siglongjmp() to cancel the "in progress" request and
 *   it would try to acquire the same work_qlock1 in _aio_req_get()
 *   for a second time => deadlock.
 * To avoid that situation we disable the cancellation of the request
 * in progress BEFORE we try to acquire the work_qlock1.
 * In that case the signal handler will not call siglongjmp() and the
 * worker thread will continue running the standard code flow.
 * Then this thread must check the AIO_REQ_CANCELED flag to emulate
 * a potentially required siglongjmp(), freeing the work_qlock1 and
 * avoiding a deadlock.
 */
_aio_do_request(void *arglist)

    aio_worker_t *aiowp = (aio_worker_t *)arglist;
    ulwp_t *self = curthread;
    struct aio_args *arg;
    aio_req_t *reqp;        /* current AIO request */

    if (pthread_setspecific(_aio_key, aiowp) != 0)
        aio_panic("_aio_do_request, pthread_setspecific()");
    (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
    ASSERT(aiowp->work_req == NULL);

    /*
     * We resume here when an operation is cancelled.
     * On first entry, aiowp->work_req == NULL, so all
     * we do is block SIGAIOCANCEL.
     */
    (void) sigsetjmp(aiowp->work_jmp_buf, 0);
    ASSERT(self->ul_sigdefer == 0);

    sigoff(self);    /* block SIGAIOCANCEL */
    if (aiowp->work_req != NULL)
        _aio_finish_request(aiowp, -1, ECANCELED);

        /*
         * Put completed requests on aio_done_list.  This has
         * to be done as part of the main loop to ensure that
         * we don't artificially starve any aiowait'ers.
         */
        if (aiowp->work_done1)
            _aio_work_done(aiowp);

        /* consume any deferred SIGAIOCANCEL signal here */

        while ((reqp = _aio_req_get(aiowp)) == NULL) {
            if (_aio_idle(aiowp) != 0)

        arg = &reqp->req_args;
        ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
            reqp->req_state == AIO_REQ_CANCELED);

        switch (reqp->req_op) {

            sigon(self);    /* unblock SIGAIOCANCEL */
            retval = pread(arg->fd, arg->buf,
                arg->bufsz, arg->offset);

                if (errno == ESPIPE) {
                    retval = read(arg->fd,
                        arg->buf, arg->bufsz);

            sigoff(self);    /* block SIGAIOCANCEL */

            /*
             * The SUSv3 POSIX spec for aio_write() states:
             *    If O_APPEND is set for the file descriptor,
             *    write operations append to the file in the
             *    same order as the calls were made.
             * but, somewhat inconsistently, it requires pwrite()
             * to ignore the O_APPEND setting.  So we have to use
             * fcntl() to get the open modes and call write() for
             * the O_APPEND case.
             */
            append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
            sigon(self);    /* unblock SIGAIOCANCEL */

                write(arg->fd, arg->buf, arg->bufsz) :
                pwrite(arg->fd, arg->buf, arg->bufsz,

                if (errno == ESPIPE) {
                    retval = write(arg->fd,
                        arg->buf, arg->bufsz);

            sigoff(self);    /* block SIGAIOCANCEL */

            sigon(self);    /* unblock SIGAIOCANCEL */
            retval = pread64(arg->fd, arg->buf,
                arg->bufsz, arg->offset);

                if (errno == ESPIPE) {
                    retval = read(arg->fd,
                        arg->buf, arg->bufsz);

            sigoff(self);    /* block SIGAIOCANCEL */

            /*
             * The SUSv3 POSIX spec for aio_write() states:
             *    If O_APPEND is set for the file descriptor,
             *    write operations append to the file in the
             *    same order as the calls were made.
             * but, somewhat inconsistently, it requires pwrite()
             * to ignore the O_APPEND setting.  So we have to use
             * fcntl() to get the open modes and call write() for
             * the O_APPEND case.
             */
            append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
            sigon(self);    /* unblock SIGAIOCANCEL */

                write(arg->fd, arg->buf, arg->bufsz) :
                pwrite64(arg->fd, arg->buf, arg->bufsz,

                if (errno == ESPIPE) {
                    retval = write(arg->fd,
                        arg->buf, arg->bufsz);

            sigoff(self);    /* block SIGAIOCANCEL */

#endif  /* !defined(_LP64) */

            if (_aio_fsync_del(aiowp, reqp))

            ASSERT(reqp->req_head == NULL);

            /*
             * All writes for this fsync request are now
             * acknowledged.  Now make these writes visible
             * and put the final request into the hash table.
             */
            if (reqp->req_state == AIO_REQ_CANCELED) {

            } else if (arg->offset == O_SYNC) {
                if ((retval = __fdsync(arg->fd, O_SYNC)) == -1)

                if ((retval = __fdsync(arg->fd, O_DSYNC)) == -1)

            if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
                aio_panic("_aio_do_request(): AIOFSYNC: "
                    "request already in hash table");

            aio_panic("_aio_do_request, bad op");

        _aio_finish_request(aiowp, retval, error);
/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)

    sig_mutex_lock(&aiowp->work_qlock1);
    if ((reqp = aiowp->work_req) == NULL)
        sig_mutex_unlock(&aiowp->work_qlock1);

        aiowp->work_req = NULL;
        if (reqp->req_state == AIO_REQ_CANCELED) {

        if (!POSIX_AIO(reqp)) {

            if (reqp->req_state == AIO_REQ_INPROGRESS) {
                reqp->req_state = AIO_REQ_DONE;
                _aio_set_result(reqp, retval, error);

            sig_mutex_unlock(&aiowp->work_qlock1);
            sig_mutex_lock(&__aio_mutex);
            /*
             * If it was canceled, this request will not be
             * added to done list.  Just free it.
             */
            if (error == ECANCELED) {
                _aio_outstand_cnt--;
                _aio_req_free(reqp);

                _aio_req_done_cnt++;

            /*
             * Notify any thread that may have blocked
             * because it saw an outstanding request.
             */

            if (_aio_outstand_cnt == 0 && _aiowait_flag) {

            sig_mutex_unlock(&__aio_mutex);

                (void) _kaio(AIONOTIFY);

            if (reqp->req_state == AIO_REQ_INPROGRESS)
                reqp->req_state = AIO_REQ_DONE;
            sig_mutex_unlock(&aiowp->work_qlock1);
            _aiodone(reqp, retval, error);
_aio_req_mark_done(aio_req_t *reqp)

    if (reqp->req_largefile)
        ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;

        ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
_aio_delay(int ticks)

    (void) usleep(ticks * (MICROSEC / hz));
/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
send_notification(notif_param_t *npp)

    extern int __sigqueue(pid_t pid, int signo,
        /* const union sigval */ void *value, int si_code, int block);

        (void) __sigqueue(__pid, npp->np_signo, npp->np_user,

    else if (npp->np_port >= 0)
        (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
            npp->np_event, npp->np_object, npp->np_user);

    if (npp->np_lio_signo)
        (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,

    else if (npp->np_lio_port >= 0)
        (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
            npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
/*
 * Asynchronous notification worker.
 */
_aio_do_notify(void *arg)

    aio_worker_t *aiowp = (aio_worker_t *)arg;

    /*
     * This isn't really necessary.  All signals are blocked.
     */
    if (pthread_setspecific(_aio_key, aiowp) != 0)
        aio_panic("_aio_do_notify, pthread_setspecific()");

    /*
     * Notifications are never cancelled.
     * All signals remain blocked, forever.
     */

        while ((reqp = _aio_req_get(aiowp)) == NULL) {
            if (_aio_idle(aiowp) != 0)
                aio_panic("_aio_do_notify: _aio_idle() failed");

        send_notification(&reqp->req_notify);
        _aio_req_free(reqp);
/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
_aiodone(aio_req_t *reqp, ssize_t retval, int error)

    aio_result_t *resultp = reqp->req_resultp;

    /*
     * We call _aiodone() only for Posix I/O.
     */
    ASSERT(POSIX_AIO(reqp));

    np.np_lio_signo = 0;
    np.np_lio_port = -1;

    switch (reqp->req_sigevent.sigev_notify) {

        aio_panic("_aiodone: improper sigev_notify");

    /*
     * Figure out the notification parameters while holding __aio_mutex.
     * Actually perform the notifications after dropping __aio_mutex.
     * This allows us to sleep for a long time (if the notifications
     * incur delays) without impeding other async I/O operations.
     */

    sig_mutex_lock(&__aio_mutex);

        if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)

        np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
    } else if (sigev_thread | sigev_port) {
        if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)

        np.np_event = reqp->req_op;
        if (np.np_event == AIOFSYNC && reqp->req_largefile)
            np.np_event = AIOFSYNC64;
        np.np_object = (uintptr_t)reqp->req_aiocbp;
        np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;

    if (resultp->aio_errno == EINPROGRESS)
        _aio_set_result(reqp, retval, error);

    _aio_outstand_cnt--;

    head = reqp->req_head;
    reqp->req_head = NULL;

        _aio_enq_doneq(reqp);

    (void) _aio_hash_del(resultp);
    _aio_req_mark_done(reqp);

    _aio_waitn_wakeup();

    /*
     * __aio_waitn() sets AIO_WAIT_INPROGRESS and
     * __aio_suspend() increments "_aio_kernel_suspend"
     * when they are waiting in the kernel for completed I/Os.
     *
     * _kaio(AIONOTIFY) awakes the corresponding function
     * in the kernel; then the corresponding __aio_waitn() or
     * __aio_suspend() function could reap the recently
     * completed I/Os (_aiodone()).
     */
    if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
        (void) _kaio(AIONOTIFY);

    sig_mutex_unlock(&__aio_mutex);

        /*
         * If all the lio requests have completed,
         * prepare to notify the waiting thread.
         */
        sig_mutex_lock(&head->lio_mutex);
        ASSERT(head->lio_refcnt == head->lio_nent);
        if (head->lio_refcnt == 1) {

            if (head->lio_mode == LIO_WAIT) {
                if ((waiting = head->lio_waiting) != 0)
                    (void) cond_signal(&head->lio_cond_cv);
            } else if (head->lio_port < 0) {    /* none or signal */
                if ((np.np_lio_signo = head->lio_signo) != 0)

                np.np_lio_user = head->lio_sigval.sival_ptr;
            } else {    /* thread or port */

                np.np_lio_port = head->lio_port;
                np.np_lio_event = head->lio_event;
                    (uintptr_t)head->lio_sigevent;
                np.np_lio_user = head->lio_sigval.sival_ptr;

            head->lio_nent = head->lio_refcnt = 0;
            sig_mutex_unlock(&head->lio_mutex);

                _aio_lio_free(head);

            sig_mutex_unlock(&head->lio_mutex);

    /*
     * The request is completed; now perform the notifications.
     */

            /*
             * We usually put the request on the notification
             * queue because we don't want to block and delay
             * other operations behind us in the work queue.
             * Also we must never block on a cancel notification
             * because we are being called from an application
             * thread in this case and that could lead to deadlock
             * if no other thread is receiving notifications.
             */
            reqp->req_notify = np;
            reqp->req_op = AIONOTIFY;
            _aio_req_add(reqp, &__workers_no, AIONOTIFY);

            /*
             * We already put the request on the done queue,
             * so we can't queue it to the notification queue.
             * Just do the notification directly.
             */
            send_notification(&np);

        _aio_req_free(reqp);
/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)

    aio_lio_t *head = reqp->req_head;

    ASSERT(reqp == aiowp->work_req);
    sig_mutex_lock(&aiowp->work_qlock1);
    sig_mutex_lock(&head->lio_mutex);
    if (head->lio_refcnt > 1) {

        aiowp->work_req = NULL;
        sig_mutex_unlock(&head->lio_mutex);
        sig_mutex_unlock(&aiowp->work_qlock1);
        sig_mutex_lock(&__aio_mutex);
        _aio_outstand_cnt--;
        _aio_waitn_wakeup();
        sig_mutex_unlock(&__aio_mutex);
        _aio_req_free(reqp);

    ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
    reqp->req_head = NULL;
    if (head->lio_canned)
        reqp->req_state = AIO_REQ_CANCELED;
    if (head->lio_mode == LIO_DESTROY) {
        aiowp->work_req = NULL;

    sig_mutex_unlock(&head->lio_mutex);
    sig_mutex_unlock(&aiowp->work_qlock1);

        _aio_lio_free(head);

        _aio_req_free(reqp);
/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
_aio_idle(aio_worker_t *aiowp)

    sig_mutex_lock(&aiowp->work_qlock1);
    if (aiowp->work_count1 == 0) {
        ASSERT(aiowp->work_minload1 == 0);
        aiowp->work_idleflg = 1;
        /*
         * A cancellation handler is not needed here.
         * aio worker threads are never cancelled via pthread_cancel().
         */
        error = sig_cond_wait(&aiowp->work_idle_cv,
            &aiowp->work_qlock1);
        /*
         * The idle flag is normally cleared before worker is awakened
         * by aio_req_add().  On error (EINTR), we clear it ourself.
         */

            aiowp->work_idleflg = 0;

    sig_mutex_unlock(&aiowp->work_qlock1);
/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
_aio_work_done(aio_worker_t *aiowp)

    sig_mutex_lock(&__aio_mutex);
    sig_mutex_lock(&aiowp->work_qlock1);
    reqp = aiowp->work_prev1;
    reqp->req_next = NULL;
    aiowp->work_done1 = 0;
    aiowp->work_tail1 = aiowp->work_next1;
    if (aiowp->work_tail1 == NULL)
        aiowp->work_head1 = NULL;
    aiowp->work_prev1 = NULL;
    _aio_outstand_cnt--;
    _aio_req_done_cnt--;
    if (reqp->req_state == AIO_REQ_CANCELED) {
        /*
         * Request got cancelled after it was marked done.  This can
         * happen because _aio_finish_request() marks it AIO_REQ_DONE
         * and drops all locks.  Don't add the request to the done
         * queue and just discard it.
         */
        sig_mutex_unlock(&aiowp->work_qlock1);
        _aio_req_free(reqp);
        if (_aio_outstand_cnt == 0 && _aiowait_flag) {
            sig_mutex_unlock(&__aio_mutex);
            (void) _kaio(AIONOTIFY);

            sig_mutex_unlock(&__aio_mutex);

    sig_mutex_unlock(&aiowp->work_qlock1);

    ASSERT(_aio_donecnt > 0 &&
        _aio_outstand_cnt >= 0 &&
        _aio_req_done_cnt >= 0);
    ASSERT(reqp != NULL);

    if (_aio_done_tail == NULL) {
        _aio_done_head = _aio_done_tail = reqp;

        _aio_done_head->req_next = reqp;
        _aio_done_head = reqp;

    if (_aiowait_flag) {
        sig_mutex_unlock(&__aio_mutex);
        (void) _kaio(AIONOTIFY);

        sig_mutex_unlock(&__aio_mutex);

            (void) kill(__pid, SIGIO);
/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */

    aio_result_t *resultp;

    ASSERT(MUTEX_HELD(&__aio_mutex));

    if ((reqp = _aio_done_tail) != NULL) {
        if ((_aio_done_tail = reqp->req_next) == NULL)
            _aio_done_head = NULL;
        ASSERT(_aio_donecnt > 0);

        (void) _aio_hash_del(reqp->req_resultp);
        resultp = reqp->req_resultp;
        ASSERT(reqp->req_state == AIO_REQ_DONE);
        _aio_req_free(reqp);

    /* is queue empty? */
    if (reqp == NULL && _aio_outstand_cnt == 0) {
        return ((aio_result_t *)-1);
/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)

    aio_result_t *resultp = reqp->req_resultp;

    if (POSIX_AIO(reqp)) {
        resultp->aio_return = retval;

        resultp->aio_errno = error;

        resultp->aio_errno = error;

        resultp->aio_return = retval;
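
/*
 * Illustrative sketch (not part of this library) of why the store order in
 * _aio_set_result() matters: a caller polling for completion reads the
 * "completion" field first and only then trusts the other one, so the
 * library must update them in the opposite order.  The helper names below
 * are hypothetical; "result" is an aio_result_t being polled.
 */
#if 0
#include <errno.h>
#include <sys/asynch.h>

static int
posix_poll(aio_result_t *result)
{
    /*
     * POSIX callers treat a change of aio_errno away from EINPROGRESS
     * as "done", so aio_return must already hold its final value.
     */
    if (result->aio_errno == EINPROGRESS)
        return (0);        /* still pending */
    return (1);            /* aio_return is now valid */
}

static int
solaris_poll(aio_result_t *result)
{
    /*
     * Solaris callers treat a change of aio_return away from
     * AIO_INPROGRESS as "done", so aio_errno must be stored first.
     */
    if (result->aio_return == AIO_INPROGRESS)
        return (0);        /* still pending */
    return (1);            /* aio_errno is now valid */
}
#endif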
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)

    ulwp_t *self = curthread;
    aio_worker_t *aiowp;
    aio_worker_t *first;
    int load_bal_flg = 1;

    ASSERT(reqp->req_state != AIO_REQ_DONEQ);
    reqp->req_next = NULL;
    /*
     * Try to acquire the next worker's work queue.  If it is locked,
     * then search the list of workers until a queue is found unlocked,
     * or until the list is completely traversed at which point another
     * worker will be created.
     */
    sigoff(self);    /* defer SIGIO */
    sig_mutex_lock(&__aio_mutex);
    first = aiowp = *nextworker;
    if (mode != AIONOTIFY)
        _aio_outstand_cnt++;
    sig_mutex_unlock(&__aio_mutex);

        /* try to find an idle worker */

            if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
                if (aiowp->work_idleflg) {

                sig_mutex_unlock(&aiowp->work_qlock1);

        } while ((aiowp = aiowp->work_forw) != first);

            aiowp->work_minload1++;

        /* try to acquire some worker's queue lock */

            if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {

        } while ((aiowp = aiowp->work_forw) != first);

        /*
         * Create more workers when the workers appear overloaded.
         * Either all the workers are busy draining their queues
         * or no worker's queue lock could be acquired.
         */

            if (_aio_worker_cnt < _max_workers) {
                if (_aio_create_worker(reqp, mode))
                    aio_panic("_aio_req_add: add worker");
                sigon(self);    /* reenable SIGIO */

            /*
             * No worker available and we have created
             * _max_workers, keep going through the
             * list slowly until we get a lock
             */
            while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
                /*
                 * give someone else a chance
                 */

                aiowp = aiowp->work_forw;

        ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
        if (_aio_worker_cnt < _max_workers &&
            aiowp->work_minload1 >= _minworkload) {
            sig_mutex_unlock(&aiowp->work_qlock1);
            sig_mutex_lock(&__aio_mutex);
            *nextworker = aiowp->work_forw;
            sig_mutex_unlock(&__aio_mutex);
            if (_aio_create_worker(reqp, mode))
                aio_panic("aio_req_add: add worker");
            sigon(self);    /* reenable SIGIO */

        aiowp->work_minload1++;

        sig_mutex_lock(&aiowp->work_qlock1);

        aio_panic("_aio_req_add: invalid mode");

    /*
     * Put request onto worker's work queue.
     */
    if (aiowp->work_tail1 == NULL) {
        ASSERT(aiowp->work_count1 == 0);
        aiowp->work_tail1 = reqp;
        aiowp->work_next1 = reqp;

        aiowp->work_head1->req_next = reqp;
        if (aiowp->work_next1 == NULL)
            aiowp->work_next1 = reqp;

    reqp->req_state = AIO_REQ_QUEUED;
    reqp->req_worker = aiowp;
    aiowp->work_head1 = reqp;
    /*
     * Awaken worker if it is not currently active.
     */
    if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
        aiowp->work_idleflg = 0;
        (void) cond_signal(&aiowp->work_idle_cv);

    sig_mutex_unlock(&aiowp->work_qlock1);

        sig_mutex_lock(&__aio_mutex);
        *nextworker = aiowp->work_forw;
        sig_mutex_unlock(&__aio_mutex);

    sigon(self);    /* reenable SIGIO */
/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
_aio_req_get(aio_worker_t *aiowp)

    sig_mutex_lock(&aiowp->work_qlock1);
    if ((reqp = aiowp->work_next1) != NULL) {
        /*
         * Remove a POSIX request from the queue; the
         * request queue is a singly linked list
         * with a previous pointer.  The request is
         * removed by updating the previous pointer.
         *
         * Non-posix requests are left on the queue
         * to eventually be placed on the done queue.
         */
        if (POSIX_AIO(reqp)) {
            if (aiowp->work_prev1 == NULL) {
                aiowp->work_tail1 = reqp->req_next;
                if (aiowp->work_tail1 == NULL)
                    aiowp->work_head1 = NULL;

                aiowp->work_prev1->req_next = reqp->req_next;
                if (aiowp->work_head1 == reqp)
                    aiowp->work_head1 = reqp->req_next;

            aiowp->work_prev1 = reqp;
            ASSERT(aiowp->work_done1 >= 0);
            aiowp->work_done1++;

        ASSERT(reqp != reqp->req_next);
        aiowp->work_next1 = reqp->req_next;
        ASSERT(aiowp->work_count1 >= 1);
        aiowp->work_count1--;
        switch (reqp->req_op) {

            ASSERT(aiowp->work_minload1 > 0);
            aiowp->work_minload1--;

        reqp->req_state = AIO_REQ_INPROGRESS;

    aiowp->work_req = reqp;
    ASSERT(reqp != NULL || aiowp->work_count1 == 0);
    sig_mutex_unlock(&aiowp->work_qlock1);
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)

    ASSERT(aiowp != NULL);
    ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
    if (POSIX_AIO(reqp)) {
        if (ostate != AIO_REQ_QUEUED)

    last = &aiowp->work_tail1;
    lastrp = aiowp->work_tail1;
    ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
    while ((next = *last) != NULL) {

            *last = next->req_next;
            if (aiowp->work_next1 == next)
                aiowp->work_next1 = next->req_next;

            /*
             * If this is the first request on the queue, move
             * the lastrp pointer forward.
             */

                lastrp = next->req_next;

            /*
             * If this request is pointed to by work_head1, then
             * make work_head1 point to the last request that is
             * present on the queue.
             */
            if (aiowp->work_head1 == next)
                aiowp->work_head1 = lastrp;

            /*
             * work_prev1 is used only in the non-POSIX case and it
             * points to the current AIO_REQ_INPROGRESS request.
             * If work_prev1 points to this request which is being
             * deleted, make work_prev1 NULL and set work_done1
             *
             * A worker thread can be processing only one request
             */
            if (aiowp->work_prev1 == next) {
                ASSERT(ostate == AIO_REQ_INPROGRESS &&
                    !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
                aiowp->work_prev1 = NULL;
                aiowp->work_done1--;

            if (ostate == AIO_REQ_QUEUED) {
                ASSERT(aiowp->work_count1 >= 1);
                aiowp->work_count1--;
                ASSERT(aiowp->work_minload1 >= 1);
                aiowp->work_minload1--;

        last = &next->req_next;
_aio_enq_doneq(aio_req_t *reqp)

    if (_aio_doneq == NULL) {

        reqp->req_next = reqp->req_prev = reqp;

        reqp->req_next = _aio_doneq;
        reqp->req_prev = _aio_doneq->req_prev;
        _aio_doneq->req_prev->req_next = reqp;
        _aio_doneq->req_prev = reqp;

    reqp->req_state = AIO_REQ_DONEQ;
/*
 * caller owns the _aio_mutex
 */
_aio_req_remove(aio_req_t *reqp)

    if (reqp && reqp->req_state != AIO_REQ_DONEQ)

        /* request in done queue */
        if (_aio_doneq == reqp)
            _aio_doneq = reqp->req_next;
        if (_aio_doneq == reqp) {
            /* only one request on queue */

            aio_req_t *tmp = reqp->req_next;
            reqp->req_prev->req_next = tmp;
            tmp->req_prev = reqp->req_prev;

    } else if ((reqp = _aio_doneq) != NULL) {
        if (reqp == reqp->req_next) {
            /* only one request on queue */

            reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
            _aio_doneq->req_prev = reqp->req_prev;

        reqp->req_next = reqp->req_prev = reqp;
        reqp->req_state = AIO_REQ_DONE;
/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)

    hashp = _aio_hash + AIOHASH(resultp);
    lmutex_lock(&hashp->hash_lock);
    prev = &hashp->hash_ptr;
    while ((next = *prev) != NULL) {
        if (resultp == next->req_resultp) {
            lmutex_unlock(&hashp->hash_lock);

        prev = &next->req_link;

    ASSERT(reqp->req_link == NULL);
    lmutex_unlock(&hashp->hash_lock);

/*
 * Remove an entry from the hash table.
 */
_aio_hash_del(aio_result_t *resultp)

    aio_req_t *next = NULL;

    if (_aio_hash != NULL) {
        hashp = _aio_hash + AIOHASH(resultp);
        lmutex_lock(&hashp->hash_lock);
        prev = &hashp->hash_ptr;
        while ((next = *prev) != NULL) {
            if (resultp == next->req_resultp) {
                *prev = next->req_link;
                next->req_link = NULL;

            prev = &next->req_link;

        lmutex_unlock(&hashp->hash_lock);

/*
 * find an entry in the hash table
 */
_aio_hash_find(aio_result_t *resultp)

    aio_req_t *next = NULL;

    if (_aio_hash != NULL) {
        hashp = _aio_hash + AIOHASH(resultp);
        lmutex_lock(&hashp->hash_lock);
        prev = &hashp->hash_ptr;
        while ((next = *prev) != NULL) {
            if (resultp == next->req_resultp)

            prev = &next->req_link;

        lmutex_unlock(&hashp->hash_lock);
/*
 * AIO interface for POSIX
 */
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,

    if (aiocbp == NULL) {

    /* initialize kaio */

        aiocbp->aio_state = NOCHECK;

    /*
     * If we have been called because a list I/O
     * kaio() failed, we don't want to repeat the
     * system call.
     */

    if (flg & AIO_KAIO) {
        /*
         * Try kernel aio first.
         * If errno is ENOTSUP/EBADFD,
         * fall back to the thread implementation.
         */
        if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
            aiocbp->aio_resultp.aio_errno = EINPROGRESS;
            aiocbp->aio_state = CHECK;
            kerr = (int)_kaio(mode, aiocbp);

            if (errno != ENOTSUP && errno != EBADFD) {
                aiocbp->aio_resultp.aio_errno = errno;
                aiocbp->aio_resultp.aio_return = -1;
                aiocbp->aio_state = NOCHECK;

            if (errno == EBADFD)
                SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);

    aiocbp->aio_resultp.aio_errno = EINPROGRESS;
    aiocbp->aio_state = USERAIO;

    if (!__uaio_ok && __uaio_init() == -1)

    if ((reqp = _aio_req_alloc()) == NULL) {

    /*
     * If an LIO request, add the list head to the aio request
     */
    reqp->req_head = lio_head;
    reqp->req_type = AIO_POSIX_REQ;
    reqp->req_op = mode;
    reqp->req_largefile = 0;

    if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
        reqp->req_sigevent.sigev_notify = SIGEV_NONE;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
        reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
        port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
        reqp->req_sigevent.sigev_notify = SIGEV_PORT;
        /*
         * Reuse the sigevent structure to contain the port number
         * and the user value.  Same for SIGEV_THREAD, below.
         */
        reqp->req_sigevent.sigev_signo =

        reqp->req_sigevent.sigev_value.sival_ptr =

    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
        reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
        /*
         * The sigevent structure contains the port number
         * and the user value.  Same for SIGEV_PORT, above.
         */
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;

    reqp->req_resultp = &aiocbp->aio_resultp;
    reqp->req_aiocbp = aiocbp;
    ap = &reqp->req_args;
    ap->fd = aiocbp->aio_fildes;
    ap->buf = (caddr_t)aiocbp->aio_buf;
    ap->bufsz = aiocbp->aio_nbytes;
    ap->offset = aiocbp->aio_offset;

    if ((flg & AIO_NO_DUPS) &&
        _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
        aio_panic("_aio_rw(): request already in hash table");
        _aio_req_free(reqp);

    _aio_req_add(reqp, nextworker, mode);
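
/*
 * Illustrative usage sketch (not part of this library): the standard POSIX
 * aio_read()/aio_error()/aio_return() sequence that ultimately funnels into
 * _aio_rw() above.  The helper name example_posix_read is hypothetical; the
 * busy-wait is only for illustration, real callers would use aio_suspend()
 * or a sigevent notification.
 */
#if 0
#include <aio.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>

static void
example_posix_read(int fd)
{
    static char buf[4096];
    struct aiocb cb;
    int err;

    (void) memset(&cb, 0, sizeof (cb));
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = sizeof (buf);
    cb.aio_offset = 0;
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;

    if (aio_read(&cb) != 0) {
        perror("aio_read");
        return;
    }
    while ((err = aio_error(&cb)) == EINPROGRESS)
        ;    /* poll until the request leaves EINPROGRESS */
    (void) printf("aio_error %d, aio_return %ld\n",
        err, (long)aio_return(&cb));
}
#endif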
/*
 * 64-bit AIO interface for POSIX
 */
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,

    if (aiocbp == NULL) {

    /* initialize kaio */

        aiocbp->aio_state = NOCHECK;

    /*
     * If we have been called because a list I/O
     * kaio() failed, we don't want to repeat the
     * system call.
     */

    if (flg & AIO_KAIO) {
        /*
         * Try kernel aio first.
         * If errno is ENOTSUP/EBADFD,
         * fall back to the thread implementation.
         */
        if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
            aiocbp->aio_resultp.aio_errno = EINPROGRESS;
            aiocbp->aio_state = CHECK;
            kerr = (int)_kaio(mode, aiocbp);

            if (errno != ENOTSUP && errno != EBADFD) {
                aiocbp->aio_resultp.aio_errno = errno;
                aiocbp->aio_resultp.aio_return = -1;
                aiocbp->aio_state = NOCHECK;

            if (errno == EBADFD)
                SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);

    aiocbp->aio_resultp.aio_errno = EINPROGRESS;
    aiocbp->aio_state = USERAIO;

    if (!__uaio_ok && __uaio_init() == -1)

    if ((reqp = _aio_req_alloc()) == NULL) {

    /*
     * If an LIO request, add the list head to the aio request
     */
    reqp->req_head = lio_head;
    reqp->req_type = AIO_POSIX_REQ;
    reqp->req_op = mode;
    reqp->req_largefile = 1;

    if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
        reqp->req_sigevent.sigev_notify = SIGEV_NONE;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
        reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
        port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
        reqp->req_sigevent.sigev_notify = SIGEV_PORT;
        reqp->req_sigevent.sigev_signo =

        reqp->req_sigevent.sigev_value.sival_ptr =

    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
        reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;

    reqp->req_resultp = &aiocbp->aio_resultp;
    reqp->req_aiocbp = aiocbp;
    ap = &reqp->req_args;
    ap->fd = aiocbp->aio_fildes;
    ap->buf = (caddr_t)aiocbp->aio_buf;
    ap->bufsz = aiocbp->aio_nbytes;
    ap->offset = aiocbp->aio_offset;

    if ((flg & AIO_NO_DUPS) &&
        _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
        aio_panic("_aio_rw64(): request already in hash table");
        _aio_req_free(reqp);

    _aio_req_add(reqp, nextworker, mode);

#endif  /* !defined(_LP64) */