2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 /* this is the header file for the implementation side of the thread_mpi
40 library. It contains the definitions for all the internal data structures
41 and the prototypes for all the internal functions that aren't static. */
48 #ifdef HAVE_SYS_TIME_H
59 #include "thread_mpi/atomic.h"
60 #include "thread_mpi/threads.h"
61 #include "thread_mpi/event.h"
62 #include "thread_mpi/tmpi.h"
63 #include "thread_mpi/collective.h"
64 #include "thread_mpi/barrier.h"
65 #include "thread_mpi/hwinfo.h"
75 /**************************************************************************
79 **************************************************************************/
100 #ifdef USE_COLLECTIVE_COPY_BUFFER
101 /**************************************************************************
103 PRE-ALLOCATED COMMUNICATION BUFFERS
105 **************************************************************************/
/* Buffer structure. Every thread structure has several of these ready to
   be used when the data transmission is small enough for double copying to
   occur (i.e. the size of the transmission is less than N*MAX_COPY_BUFFER_SIZE,
   where N is the number of receiving threads).

   These buffers come in two sizes: one pre-allocated to MAX_COPY_BUFFER_SIZE
   (for point-to-point transmissions), and one pre-allocated to
   Nthreads*MAX_COPY_BUFFER_SIZE. */
struct copy_buffer
{
    void               *buf;  /* the actual buffer */
    struct copy_buffer *next; /* pointer to next free buffer in buffer_list */
    size_t              size; /* allocated size of buffer */
};
/* a list of copy_buffers of a specific size. */
struct copy_buffer_list
{
    struct copy_buffer *cb;       /* pointer to the first copy_buffer */
    size_t              size;     /* allocated size of buffers in this list */
    struct copy_buffer *cb_alloc; /* the list as it was originally allocated */
    int                 Nbufs;    /* number of allocated buffers */
};
145 /**************************************************************************
147 POINT-TO-POINT COMMUNICATION DATA STRUCTURES
149 **************************************************************************/
151 /* the message envelopes (as described in the MPI standard).
152 These fully describe the message, and make each message unique (enough).
154 Transmitting data works by having the sender put a pointer to an envelope
155 onto the receiver's new envelope list corresponding to the originating
157 The sender then waits until the receiver finishes the transmission, while
158 matching all incoming new envelopes against its own list of receive
161 The receiver either directly matches its receiving envelope against
162 all previously un-matched sending envelopes, or, if no suitable envelope
163 is found, it puts the receive envelope on a receive list.
164 Once waiting for completion, the receiver matches against all incoming
/* the state of an individual point-to-point transmission */
/* NOTE(review): enum tag reconstructed from the 'enum_envelope_state'
   reference in the envelope state comment — confirm against the original
   header. */
enum envelope_state
{
    env_unmatched    = 0, /* the envelope has not had a match yet */
    env_copying      = 1, /* busy copying (only used for a send envelope
                             by the receiver if using_cpbuf is true,
                             but cb was still NULL). */
    env_cb_available = 2, /* the copy buffer is available. Set by
                             the sender on a send_buffer. */
    env_finished     = 3  /* the transmission has finished */
};
180 /* the envelope. Held in tmpi_thread->evs[src_thread] for send envelopes,
181 or in tmpi_thread->evl for receive envelopes */
184 int tag
; /* the tag */
185 tMPI_Comm comm
; /* this is a structure shared across threads, so we
186 can test easily whether two threads are talking
187 about the same comm. */
189 struct tmpi_thread
*src
, *dest
; /* these are pretty obvious */
191 void *buf
; /* buffer to be sent */
192 size_t bufsize
; /* the size of the data to be transmitted */
193 tMPI_Datatype datatype
; /* the data type */
195 bool nonblock
; /* whether the receiver is non-blocking */
197 /* state, values from enum_envelope_state .
198 (there's a few busy-waits relying on this flag).
199 status=env_unmatched is the initial state.*/
202 /* the error condition */
205 /* the message status */
206 /*tMPI_Status *status;*/
208 /* prev and next envelopes in the send/recv_envelope_list linked list */
209 struct envelope
*prev
,*next
;
211 bool send
; /* whether this is a send envelope (if TRUE), or a receive
212 envelope (if FALSE) */
213 #ifdef USE_SEND_RECV_COPY_BUFFER
214 bool using_cb
; /* whether a copy buffer is (going to be) used */
215 void* cb
;/* the allocated copy buffer pointer */
217 /* the next and previous envelopes in the request list */
218 struct envelope
*prev_req
, *next_req
;
220 /* the list I'm in */
221 struct recv_envelope_list
*rlist
;
222 struct send_envelope_list
*slist
;
/* singly linked lists of free send & receive envelopes belonging to a
   thread (original comment truncated here — presumably "thread"). */
struct free_envelope_list
{
    struct envelope *head_recv;       /* the first element in the linked list */
    struct envelope *recv_alloc_head; /* the recv list as it was allocated */
};
235 /* collection of send envelopes to a specific thread */
236 struct send_envelope_list
238 struct envelope
*head_free
; /* singly linked list with free send
239 envelopes. A single-thread LIFO.*/
240 #ifdef TMPI_LOCK_FREE_LISTS
241 tMPI_Atomic_ptr_t head_new
; /* singly linked list with the new send
242 envelopes (i.e. those that are put there by
243 the sending thread, but not yet checked by
244 the receiving thread). This is a lock-free
245 shared detachable list.*/
246 tMPI_Atomic_ptr_t head_rts
; /* singly linked list with free send
247 envelopes returned by the other thread.
248 This is a lock-free shared LIFO.*/
250 struct envelope
*head_new
; /* singly linked list with the new send
251 envelopes (i.e. those that are put there by
252 the sending thread, but not yet checked by
253 the receiving thread). */
254 struct envelope
*head_rts
; /* singly linked list with free send envelopes */
255 tMPI_Spinlock_t lock_new
; /* this locks head_new */
256 tMPI_Spinlock_t lock_rts
; /* this locks head_rts */
258 struct envelope
*head_old
; /* the old send envelopes, in a circular doubly
259 linked list. These have been checked by the
260 receiving thread against the existing
261 recv_envelope_list. */
263 struct envelope
*alloc_head
; /* the allocated send list */
264 size_t Nalloc
; /* number of allocted sends */
267 struct recv_envelope_list
269 struct envelope
*head
; /* first envelope in this list */
270 struct envelope dummy
; /* the dummy element for the list */
274 /* the request object for asynchronious operations. */
277 bool finished
; /* whether it's finished */
278 struct envelope
*ev
; /* the envelope */
280 struct tmpi_thread
*source
; /* the message source (for receives) */
281 tMPI_Comm comm
; /* the comm */
282 int tag
; /* the tag */
283 int error
; /* error code */
284 size_t transferred
; /* the number of transferred bytes */
285 bool cancelled
; /* whether the transmission was canceled */
287 struct tmpi_req_
*next
,*prev
; /* next,prev request in linked list,
288 used in the req_list, but also in
/* pre-allocated request object list */
struct req_list
{
    struct tmpi_req_ *head;       /* pre-allocated singly linked list of
                                     requests (i.e. reqs->prev is undefined). */
    struct tmpi_req_ *alloc_head; /* the allocated block */
};
315 /**************************************************************************
317 MULTICAST COMMUNICATION DATA STRUCTURES
319 **************************************************************************/
321 /* these are data structures meant for keeping track of multicast operations
322 (tMPI_Bcast, tMPI_Gather, etc.). Because these operations are all collective
323 across the comm, and are always blocking, the protocol can be much simpler
324 than that for point-to-point communication through tMPI_Send/Recv, etc. */
/* unique tags for multicast & collective operations */
#define TMPI_BCAST_TAG     1
#define TMPI_GATHER_TAG    2
#define TMPI_GATHERV_TAG   3
#define TMPI_SCATTER_TAG   4
#define TMPI_SCATTERV_TAG  5
#define TMPI_REDUCE_TAG    6
#define TMPI_ALLTOALL_TAG  7
#define TMPI_ALLTOALLV_TAG 8
337 /* thread-specific part of the coll_env */
338 struct coll_env_thread
340 tMPI_Atomic_t current_sync
; /* sync counter value for the current
342 tMPI_Atomic_t n_remaining
; /* remaining threads count for each thread */
344 int tag
; /* collective communication type */
345 tMPI_Datatype datatype
; /* datatype */
347 void **buf
; /* array of send/recv buffer values */
348 size_t *bufsize
; /* array of number of bytes to send/recv */
350 #ifdef USE_COLLECTIVE_COPY_BUFFER
351 bool using_cb
; /* whether a copy buffer is (going to be) used */
352 tMPI_Atomic_t buf_readcount
; /* Number of threads reading from buf
353 while using_cpbuf is true, but cpbuf
355 tMPI_Atomic_ptr_t
*cpbuf
; /* copy_buffer pointers. */
356 struct copy_buffer
*cb
; /* the copy buffer cpbuf points to */
359 tMPI_Event send_ev
; /* event associated with being the sending thread.
360 Triggered when last receiving thread is ready,
361 and the coll_env_thread is ready for re-use. */
362 tMPI_Event recv_ev
; /* event associated with being a receiving thread. */
364 bool *read_data
; /* whether we read data from a specific thread. */
367 /* Collective communications once sync. These run in parallel with
368 the collection of coll_env_threads*/
371 /* collective sync data */
372 tMPI_Atomic_t current_sync
; /* sync counter value for the current
374 tMPI_Atomic_t n_remaining
; /* remaining threads count */
376 void *res
; /* result data for once calls. */
379 /* the collective communication envelope. There's a few of these per
380 comm, and each one stands for one collective communication call. */
383 struct coll_env_thread
*met
; /* thread-specific collective envelope data.*/
385 struct coll_env_coll coll
;
389 /* multicast synchronization data structure. There's one of these for
390 each thread in each tMPI_Comm structure */
393 int synct
; /* sync counter for coll_env_thread. */
394 int syncs
; /* sync counter for coll_env_coll. */
396 tMPI_Event
*events
; /* One event for each other thread */
397 int N
; /* the number of threads */
410 /**************************************************************************
412 THREAD DATA STRUCTURES
414 **************************************************************************/
416 /* information about a running thread. This structure is put in a
417 globally available array; the envelope exchange, etc. are all done through
418 the elements of this array.*/
421 tMPI_Thread_t thread_id
; /* this thread's id */
423 /* p2p communication structures: */
425 /* the receive envelopes posted for other threads to check */
426 struct recv_envelope_list evr
;
427 /* the send envelopes posted by other threads */
428 struct send_envelope_list
*evs
;
429 /* free send and receive envelopes */
430 struct free_envelope_list envelopes
;
431 /* number of finished send envelopes */
432 tMPI_Atomic_t ev_outgoing_received
;
433 /* the p2p communication events (incoming envelopes + finished send
434 envelopes generate events) */
435 tMPI_Event p2p_event
;
436 TMPI_YIELD_WAIT_DATA
/* data associated with waiting */
437 struct req_list rql
; /* list of pre-allocated requests */
439 /* collective communication structures: */
440 #ifdef USE_COLLECTIVE_COPY_BUFFER
441 /* copy buffer list for multicast communications */
442 struct copy_buffer_list cbl_multi
;
445 /* miscellaneous data: */
447 tMPI_Comm self_comm
; /* comms for MPI_COMM_SELF */
449 /* the per-thread profile structure that keeps call counts & wait times. */
450 struct tmpi_profile profile
;
452 /* The start function (or NULL, if main() is to be called) */
453 void (*start_fn
)(void*);
454 /* the argument to the start function, if it's not main()*/
457 /* we copy these for each thread (providing these to main() is not
458 required by the MPI standard, but it's convenient). Note that we copy,
459 because some programs (like Gromacs) like to manipulate these. */
469 /**************************************************************************
471 ERROR HANDLER DATA STRUCTURES
473 **************************************************************************/
476 /* the error handler */
477 struct tmpi_errhandler_
480 tMPI_Errhandler_fn fn
;
483 /* standard error handler functions */
484 void tmpi_errors_are_fatal_fn(tMPI_Comm
*comm
, int *err
);
485 void tmpi_errors_return_fn(tMPI_Comm
*comm
, int *err
);
491 /**************************************************************************
493 GLOBAL DATA STRUCTURE
495 **************************************************************************/
497 /* global MPI information */
500 /* list of pointers to all user-defined types */
501 struct tmpi_datatype_
**usertypes
;
503 int Nalloc_usertypes
;
505 /* spinlock/mutex for manipulating tmpi_user_types */
506 tMPI_Spinlock_t datatype_lock
;
508 /* barrier for tMPI_Finalize(), etc. */
509 tMPI_Thread_barrier_t barrier
;
511 /* the timer for tMPI_Wtime() */
512 tMPI_Thread_mutex_t timer_mutex
;
513 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
514 /* the time at initialization. */
515 struct timeval timer_init
;
517 /* the time at initialization. */
536 /**************************************************************************
538 COMMUNICATOR DATA STRUCTURES
540 **************************************************************************/
545 int N
; /* the number of threads */
546 struct tmpi_thread
**peers
; /* the list of peers to communicate with */
548 int Nrefs
; /* the number of references to this structure */
553 /* the communicator objects are globally shared. */
556 struct tmpi_group_ grp
; /* the communicator group */
558 /* the barrier for tMPI_Barrier() */
559 tMPI_Spinlock_barrier_t barrier
;
562 /* List of barriers for reduce operations.
563 reduce_barrier[0] contains a list of N/2 barriers for N threads
564 reduce_barrier[1] contains a list of N/4 barriers for N/2 threads
565 reduce_barrier[2] contains a list of N/8 barriers for N/4 threads
566 and so on. (until N/x reaches 1)
567 This is to facilitate tree-based algorithms for tMPI_Reduce, etc. */
568 tMPI_Spinlock_barrier_t
**reduce_barrier
;
569 int *N_reduce
; /* the number of barriers in each iteration */
570 int N_reduce_iter
; /* the number of iterations */
573 struct coll_env
*cev
; /* list of multicast envelope objects */
574 struct coll_sync
*csync
; /* list of multicast sync objects */
576 /* lists of globally shared send/receive buffers for tMPI_Reduce. */
577 tMPI_Atomic_ptr_t
*reduce_sendbuf
, *reduce_recvbuf
;
579 /* mutex for communication object creation. Traditional mutexes are
580 better here because communicator creation should not be done in
581 time-critical sections of code. */
582 tMPI_Thread_mutex_t comm_create_lock
;
583 tMPI_Thread_cond_t comm_create_prep
;
584 tMPI_Thread_cond_t comm_create_finish
;
586 tMPI_Comm
*new_comm
; /* newly created communicators */
588 /* the split structure is shared among the comm threads and is
589 allocated & deallocated during tMPI_Comm_split */
590 struct tmpi_split
*split
;
592 /* the topologies (only cartesian topology is currently implemented) */
593 struct cart_topol
*cart
;
594 /*struct tmpi_graph_topol_ *graph;*/
598 /* links for a global circular list of all comms that starts at
599 TMPI_COMM_WORLD. Used to de-allocate the comm structures after
601 struct tmpi_comm_
*next
,*prev
;
605 /* specific for tMPI_Split: */
608 volatile int Ncol_init
;
609 volatile int Ncol_destroy
;
610 volatile bool can_finish
;
611 volatile int *colors
;
/* cartesian topology */
struct cart_topol
{
    int  ndims;   /* number of dimensions */
    int *dims;    /* procs per coordinate */
    int *periods; /* whether the grid is periodic, per dimension */
};
625 struct tmpi_graph_topol_
634 /**************************************************************************
636 DATA TYPE DATA STRUCTURES
638 **************************************************************************/
/* tMPI_Reduce Op functions */
typedef void (*tMPI_Op_fn)(void*, void*, void*, int);
644 struct tmpi_datatype_component
646 struct tmpi_datatype_
*type
;
650 /* we don't support datatypes with holes (yet) */
651 struct tmpi_datatype_
653 size_t size
; /* full extent of type. */
654 tMPI_Op_fn
*op_functions
; /* array of op functions for this datatype */
655 int N_comp
; /* number of components */
656 struct tmpi_datatype_component
*comps
; /* the components */
657 bool committed
; /* whether the data type is committed */
659 /* just as a shorthand: */
660 typedef struct tmpi_datatype_ tmpi_dt
;
669 /**************************************************************************
673 **************************************************************************/
676 /* the threads themselves (tmpi_comm only contains lists of pointers to this
678 extern struct tmpi_thread
*threads
;
682 extern tMPI_Thread_key_t id_key
; /* the key to get the thread id */
684 /* misc. global information about MPI */
685 extern struct tmpi_global
*tmpi_global
;
694 /**************************************************************************
696 FUNCTION PROTOTYPES & MACROS
698 **************************************************************************/
void tMPI_Trace_print(const char *fmt, ...);

/* error-checking malloc/realloc: */
void *tMPI_Malloc(size_t size);
void *tMPI_Realloc(void *p, size_t size);
/* get the current thread structure pointer */
#define tMPI_Get_current() ((struct tmpi_thread*) \
                            tMPI_Thread_getspecific(id_key))

/* get the number of this thread */
/*#define tMPI_This_threadnr() (tMPI_Get_current() - threads)*/

/* get the number of a specific thread. We convert the resulting pointer
   difference to int, which is unlikely to cause problems in the
   foreseeable future. */
#define tMPI_Threadnr(th) (int)(th - threads)

/* get thread associated with rank */
#define tMPI_Get_thread(comm, rank) (comm->grp.peers[rank])
725 /* get the current thread structure pointer */
726 struct tmpi_thread
*tMPI_Get_current(void);
727 /* get the thread belonging to comm with rank rank */
728 struct tmpi_thread
*tMPI_Get_thread(tMPI_Comm comm
, int rank
);
732 /* handle an error, returning the errorcode */
733 int tMPI_Error(tMPI_Comm comm
, int tmpi_errno
);
737 /* check whether we're the main thread */
738 bool tMPI_Is_master(void);
739 /* check whether the current process is in a group */
740 bool tMPI_In_group(tMPI_Group group
);
742 /* find the rank of a thread in a comm */
743 int tMPI_Comm_seek_rank(tMPI_Comm comm
, struct tmpi_thread
*th
);
744 /* find the size of a comm */
745 int tMPI_Comm_N(tMPI_Comm comm
);
747 /* allocate a comm object, making space for N threads */
748 tMPI_Comm
tMPI_Comm_alloc(tMPI_Comm parent
, int N
);
749 /* de-allocate a comm object */
750 void tMPI_Comm_destroy(tMPI_Comm comm
);
751 /* allocate a group object */
752 tMPI_Group
tMPI_Group_alloc(void);
754 /* topology functions */
755 /* de-allocate a cartesian topology structure. (it is allocated with
756 the internal function tMPI_Cart_init()) */
757 void tMPI_Cart_destroy(struct cart_topol
*top
);
760 #ifdef USE_COLLECTIVE_COPY_BUFFER
761 /* initialize a copy_buffer_list */
762 void tMPI_Copy_buffer_list_init(struct copy_buffer_list
*cbl
, int Nbufs
,
/* destroy a copy_buffer_list
   (the original comment said "initialize", which mismatched the function) */
void tMPI_Copy_buffer_list_destroy(struct copy_buffer_list *cbl);
/* get a copy buffer from a list */
struct copy_buffer *tMPI_Copy_buffer_list_get(struct copy_buffer_list *cbl);
/* return a copy buffer to a list */
void tMPI_Copy_buffer_list_return(struct copy_buffer_list *cbl,
                                  struct copy_buffer *cb);
/* initialize a copy buffer */
void tMPI_Copy_buffer_init(struct copy_buffer *cb, size_t size);
/* destroy a copy buffer */
void tMPI_Copy_buffer_destroy(struct copy_buffer *cb);
/* initialize a free envelope list with N envelopes */
void tMPI_Free_env_list_init(struct free_envelope_list *evl, int N);
/* destroy a free envelope list */
void tMPI_Free_env_list_destroy(struct free_envelope_list *evl);

/* initialize a send envelope list */
void tMPI_Send_env_list_init(struct send_envelope_list *evl, int N);
/* destroy a send envelope list */
void tMPI_Send_env_list_destroy(struct send_envelope_list *evl);

/* initialize a recv envelope list */
void tMPI_Recv_env_list_init(struct recv_envelope_list *evl);
/* destroy a recv envelope list */
void tMPI_Recv_env_list_destroy(struct recv_envelope_list *evl);

/* initialize a request list with N_reqs pre-allocated requests */
void tMPI_Req_list_init(struct req_list *rl, int N_reqs);
/* destroy a request list */
void tMPI_Req_list_destroy(struct req_list *rl);
/* multicast functions */
/* initialize a coll env structure */
void tMPI_Coll_env_init(struct coll_env *mev, int N);
/* destroy a coll env structure */
void tMPI_Coll_env_destroy(struct coll_env *mev);

/* initialize a coll sync structure */
void tMPI_Coll_sync_init(struct coll_sync *msc, int N);
/* destroy a coll sync structure */
void tMPI_Coll_sync_destroy(struct coll_sync *msc);

/* and we need this prototype */
int main(int argc, char **argv);