1 /* Copyright (C) 2018-2024 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
4 This file is part of the GNU Fortran runtime library (libgfortran).
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
25 #include "libgfortran.h"
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
36 #include <sys/types.h>
/* File-scope state.  Every declaration below except thread_unit is
   wrapped in DEBUG_LINE.
   NOTE(review): DEBUG_LINE is defined outside this view; presumably it
   keeps its argument only in debugging builds -- confirm.  */

/* Per-thread (__thread) prefix string, initialized to MPREFIX.  */
41 DEBUG_LINE (__thread
const char *aio_prefix
= MPREFIX
);
/* Mutex, statically initialized with __GTHREAD_MUTEX_INIT.
   NOTE(review): by its name it protects the debug lock list below --
   confirm against the users of aio_debug_head.  */
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock
= __GTHREAD_MUTEX_INIT
;)
/* Pointer to an aio_lock_debug record; starts out as NULL.  */
44 DEBUG_LINE (aio_lock_debug
*aio_debug_head
= NULL
;)
/* The two rwlock declarations are only compiled when the gthread layer
   provides __GTHREAD_RWLOCK_INIT.  */
45 #ifdef __GTHREAD_RWLOCK_INIT
/* Pointer to an aio_rwlock_debug record; starts out as NULL.  */
46 DEBUG_LINE (aio_rwlock_debug
*aio_rwlock_debug_head
= NULL
;)
/* Read-write lock, statically initialized with __GTHREAD_RWLOCK_INIT.  */
47 DEBUG_LINE (__gthread_rwlock_t debug_queue_rwlock
= __GTHREAD_RWLOCK_INIT
;)
50 /* Current unit for asynchronous I/O. Needed for error reporting.
   Declared __thread, so each thread has its own copy; initially NULL.  */
52 __thread gfc_unit
*thread_unit
= NULL
;
54 /* Queue entry for an asynchronous I/O request. */
55 typedef struct transfer_queue
58 struct transfer_queue
*next
;
59 struct st_parameter_dt
*new_pdt
;
70 /* Helper function to exchange the old vs. a new PDT. */
73 update_pdt (st_parameter_dt
**old
, st_parameter_dt
*new) {
74 st_parameter_dt
*temp
;
75 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u
.p
.current_unit
));
81 /* Destroy an adv_cond structure. */
84 destroy_adv_cond (struct adv_cond
*ac
)
86 T_ERROR (__gthread_cond_destroy
, &ac
->signal
);
89 /* Function invoked as start routine for a new asynchronous I/O unit.
90 Contains the main loop for accepting requests and handling them. */
95 DEBUG_LINE (aio_prefix
= TPREFIX
);
96 transfer_queue
*ctq
= NULL
, *prev
= NULL
;
97 gfc_unit
*u
= (gfc_unit
*) arg
;
98 async_unit
*au
= u
->au
;
101 au
->thread
= __gthread_self ();
104 /* Main loop. At this point, au->lock is always held. */
105 WAIT_SIGNAL_MUTEX (&au
->work
, au
->tail
!= NULL
, &au
->lock
);
109 /* Loop over the queue entries until they are finished. */
114 if (!au
->error
.has_error
)
121 NOTE ("Finalizing write");
122 st_write_done_worker (au
->pdt
, false);
123 UNLOCK (&au
->io_lock
);
127 NOTE ("Finalizing read");
128 st_read_done_worker (au
->pdt
, false);
129 UNLOCK (&au
->io_lock
);
132 case AIO_DATA_TRANSFER_INIT
:
133 NOTE ("Data transfer init");
135 update_pdt (&au
->pdt
, ctq
->new_pdt
);
136 data_transfer_init_worker (au
->pdt
, ctq
->read_flag
);
139 case AIO_TRANSFER_SCALAR
:
140 NOTE ("Starting scalar transfer");
141 ctq
->arg
.scalar
.transfer (au
->pdt
, ctq
->arg
.scalar
.arg_bt
,
142 ctq
->arg
.scalar
.data
,
148 case AIO_TRANSFER_ARRAY
:
149 NOTE ("Starting array transfer");
150 NOTE ("ctq->arg.array.desc = %p",
151 (void *) (ctq
->arg
.array
.desc
));
152 transfer_array_inner (au
->pdt
, ctq
->arg
.array
.desc
,
154 ctq
->arg
.array
.charlen
);
155 free (ctq
->arg
.array
.desc
);
159 NOTE ("Received AIO_CLOSE");
164 internal_error (NULL
, "Invalid queue type");
168 if (unlikely (au
->error
.has_error
))
169 au
->error
.last_good_id
= au
->id
.low
- 1;
173 if (ctq
->type
== AIO_WRITE_DONE
|| ctq
->type
== AIO_READ_DONE
)
175 UNLOCK (&au
->io_lock
);
177 else if (ctq
->type
== AIO_CLOSE
)
179 NOTE ("Received AIO_CLOSE during error condition");
184 NOTE ("Next ctq, current id: %d", au
->id
.low
);
185 if (ctq
->has_id
&& au
->id
.waiting
== au
->id
.low
++)
186 SIGNAL (&au
->id
.done
);
193 SIGNAL (&au
->emptysignal
);
199 SIGNAL (&au
->emptysignal
);
205 /* Free an asynchronous unit. */
208 free_async_unit (async_unit
*au
)
211 internal_error (NULL
, "Trying to free nonempty asynchronous unit");
213 destroy_adv_cond (&au
->work
);
214 destroy_adv_cond (&au
->emptysignal
);
215 destroy_adv_cond (&au
->id
.done
);
216 T_ERROR (__gthread_mutex_destroy
, &au
->lock
);
220 /* Initialize an adv_cond structure. */
223 init_adv_cond (struct adv_cond
*ac
)
226 __GTHREAD_COND_INIT_FUNCTION (&ac
->signal
);
229 /* Initialize an asynchronous unit, returning zero on success,
230 nonzero on failure. It also sets u->au. */
233 init_async_unit (gfc_unit
*u
)
236 if (!__gthread_active_p ())
242 au
= (async_unit
*) xmalloc (sizeof (async_unit
));
244 init_adv_cond (&au
->work
);
245 init_adv_cond (&au
->emptysignal
);
246 __GTHREAD_MUTEX_INIT_FUNCTION (&au
->lock
);
247 __GTHREAD_MUTEX_INIT_FUNCTION (&au
->io_lock
);
249 T_ERROR (__gthread_create
, &au
->thread
, &async_io
, (void *) u
);
257 au
->error
.fatal_error
= 0;
258 au
->error
.has_error
= 0;
259 au
->error
.last_good_id
= 0;
260 init_adv_cond (&au
->id
.done
);
264 /* Enqueue a transfer statement. */
267 enqueue_transfer (async_unit
*au
, transfer_args
*arg
, enum aio_do type
)
269 transfer_queue
*tq
= calloc (1, sizeof (transfer_queue
));
279 REVOKE_SIGNAL (&(au
->emptysignal
));
285 /* Enqueue an st_write_done or st_read_done which contains an ID. */
288 enqueue_done_id (async_unit
*au
, enum aio_do type
)
291 transfer_queue
*tq
= calloc (1, sizeof (transfer_queue
));
301 REVOKE_SIGNAL (&(au
->emptysignal
));
304 NOTE ("Enqueue id: %d", ret
);
310 /* Enqueue an st_write_done or st_read_done without an ID. */
313 enqueue_done (async_unit
*au
, enum aio_do type
)
315 transfer_queue
*tq
= calloc (1, sizeof (transfer_queue
));
324 REVOKE_SIGNAL (&(au
->emptysignal
));
330 /* Enqueue a CLOSE statement. */
333 enqueue_close (async_unit
*au
)
335 transfer_queue
*tq
= calloc (1, sizeof (transfer_queue
));
337 tq
->type
= AIO_CLOSE
;
344 REVOKE_SIGNAL (&(au
->emptysignal
));
350 /* The asynchronous unit keeps the currently active PDT around.
351 This function changes that to the current one. */
354 enqueue_data_transfer_init (async_unit
*au
, st_parameter_dt
*dt
, int read_flag
)
356 st_parameter_dt
*new = xmalloc (sizeof (st_parameter_dt
));
357 transfer_queue
*tq
= xmalloc (sizeof (transfer_queue
));
359 memcpy ((void *) new, (void *) dt
, sizeof (st_parameter_dt
));
361 NOTE ("dt->internal_unit_desc = %p", dt
->internal_unit_desc
);
362 NOTE ("common.flags & mask = %d", dt
->common
.flags
& IOPARM_LIBRETURN_MASK
);
364 tq
->type
= AIO_DATA_TRANSFER_INIT
;
365 tq
->read_flag
= read_flag
;
375 REVOKE_SIGNAL (&(au
->emptysignal
));
381 /* Collect the errors that may have happened asynchronously. Return true if
382 an error has been encountered. */
385 collect_async_errors (st_parameter_common
*cmp
, async_unit
*au
)
387 bool has_error
= au
->error
.has_error
;
391 if (generate_error_common (cmp
, au
->error
.family
, au
->error
.message
))
393 au
->error
.has_error
= 0;
394 au
->error
.cmp
= NULL
;
398 /* The program will exit later. */
399 au
->error
.fatal_error
= true;
405 /* Perform a wait operation on an asynchronous unit with an ID specified,
406 which means collecting the errors that may have happened asynchronously.
407 Return true if an error has been encountered. */
410 async_wait_id (st_parameter_common
*cmp
, async_unit
*au
, int i
)
420 if (au
->error
.has_error
)
422 if (i
<= au
->error
.last_good_id
)
425 return collect_async_errors (cmp
, au
);
431 generate_error_common (cmp
, LIBERROR_BAD_WAIT_ID
, NULL
);
436 NOTE ("Waiting for id %d", i
);
437 if (au
->id
.waiting
< i
)
439 SIGNAL (&(au
->work
));
440 WAIT_SIGNAL_MUTEX (&(au
->id
.done
),
441 (au
->id
.low
>= au
->id
.waiting
|| au
->empty
), &au
->lock
);
443 ret
= collect_async_errors (cmp
, au
);
448 /* Perform a wait operation on an asynchronous unit without an ID. */
451 async_wait (st_parameter_common
*cmp
, async_unit
*au
)
462 SIGNAL (&(au
->work
));
466 ret
= collect_async_errors (cmp
, au
);
471 WAIT_SIGNAL_MUTEX (&(au
->emptysignal
), (au
->empty
), &au
->lock
);
472 ret
= collect_async_errors (cmp
, au
);
476 /* Close an asynchronous unit. */
479 async_close (async_unit
*au
)
484 NOTE ("Closing async unit");
486 T_ERROR (__gthread_join
, au
->thread
, NULL
);
487 free_async_unit (au
);
492 /* Only set u->au to NULL so no async I/O will happen. */
495 init_async_unit (gfc_unit
*u
)
501 /* Do-nothing function, which will not be called. */
504 enqueue_transfer (async_unit
*au
, transfer_args
*arg
, enum aio_do type
)
509 /* Do-nothing function, which will not be called. */
512 enqueue_done_id (async_unit
*au
, enum aio_do type
)
517 /* Do-nothing function, which will not be called. */
520 enqueue_done (async_unit
*au
, enum aio_do type
)
525 /* Do-nothing function, which will not be called. */
528 enqueue_close (async_unit
*au
)
533 /* Do-nothing function, which will not be called. */
536 enqueue_data_transfer_init (async_unit
*au
, st_parameter_dt
*dt
, int read_flag
)
541 /* Do-nothing function, which will not be called. */
544 collect_async_errors (st_parameter_common
*cmp
, async_unit
*au
)
549 /* Do-nothing function, which will not be called. */
552 async_wait_id (st_parameter_common
*cmp
, async_unit
*au
, int i
)
557 /* Do-nothing function, which will not be called. */
560 async_wait (st_parameter_common
*cmp
, async_unit
*au
)
565 /* Do-nothing function, which will not be called. */
568 async_close (async_unit
*au
)