2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 #ifdef HAVE_TMPI_CONFIG_H
40 #include "tmpi_config.h"
55 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
72 /* there are a few global variables that maintain information about the
73 running threads. Some are defined by the MPI standard: */
74 tMPI_Comm TMPI_COMM_WORLD
=NULL
;
75 tMPI_Group tMPI_GROUP_EMPTY
=NULL
;
78 /* the threads themselves (tmpi_comm only contains lists of pointers to this
80 struct tmpi_thread
*threads
=NULL
;
84 tMPI_Thread_key_t id_key
; /* the key to get the thread id */
88 /* whether MPI has finalized (we need this to distinguish pre-inited from
89 post-finalized states */
90 static bool tmpi_finalized
=FALSE
;
92 /* misc. global information about MPI */
93 struct tmpi_global
*tmpi_global
=NULL
;
/* Start N threads. argc/argv are copied to every thread (tMPI_Init path);
   start_fn/start_arg, when start_fn is non-NULL, are what each thread runs
   instead of main() (tMPI_Init_fn path). */
static void tMPI_Start_threads(int N, int *argc, char ***argv,
                               void (*start_fn)(void*), void *start_arg);

/* Thread entry point. 'arg' points to the thread's struct tmpi_thread;
   the thread runs main() when it has no start function, and its start
   function otherwise. */
static void* tMPI_Thread_starter(void *arg);

/* Allocate and initialize the data belonging to one thread structure. */
static void tMPI_Thread_init(struct tmpi_thread *th);

/* Release the data belonging to one thread structure. */
static void tMPI_Thread_destroy(struct tmpi_thread *th);
/* Debug trace output: print a message prefixed either with the calling
   thread's number ("THREAD NN: ", computed as th - threads) or with
   "THREAD main: ".
   A function-local static mutex is held around the output so that trace
   lines from concurrent threads are not interleaved.
   NOTE(review): the lines that choose between the two prefixes and that
   print 'fmt' with the variadic arguments are not visible in this copy of
   the file; presumably the "main" prefix is used when no thread data
   exists yet - confirm. */
118 void tMPI_Trace_print(const char *fmt
, ...)
121 struct tmpi_thread
* th
=tMPI_Get_current();
/* statically initialized, so no runtime init call is needed */
122 static tMPI_Thread_mutex_t mtx
=TMPI_THREAD_MUTEX_INITIALIZER
;
124 tMPI_Thread_mutex_lock(&mtx
);
126 printf("THREAD %02d: ", (int)(th
-threads
));
128 printf("THREAD main: ");
134 tMPI_Thread_mutex_unlock(&mtx
);
139 void *tMPI_Malloc(size_t size
)
141 void *ret
=(void*)malloc(size
);
145 tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_MALLOC
);
150 void *tMPI_Realloc(void *p
, size_t size
)
152 void *ret
=(void*)realloc(p
, size
);
155 tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_MALLOC
);
/* Return the calling thread's struct tmpi_thread, i.e. the pointer that
   tMPI_Thread_starter stored under id_key with tMPI_Thread_setspecific().
   NOTE(review): the getter is spelled tMPI_thread_getspecific (lower-case
   't') while the setter is tMPI_Thread_setspecific - check against the
   thread API header. Lines between the signature and the return are not
   visible in this copy (possibly a guard for the uninitialized case). */
162 struct tmpi_thread
*tMPI_Get_current(void)
167 return (struct tmpi_thread
*)tMPI_thread_getspecific(id_key
);
/* Thread number of 'thr'.
   NOTE(review): the body is not visible in this copy; by analogy with
   tMPI_This_threadnr() it is presumably the index of thr in 'threads'
   (thr - threads) - confirm. */
171 unsigned int tMPI_Threadnr(struct tmpi_thread
*thr
)
177 unsigned int tMPI_This_threadnr(void)
179 return tMPI_Get_current()-threads
;
182 struct tmpi_thread
*tMPI_Get_thread(tMPI_Comm comm
, int rank
)
184 /* check destination */
185 if ( (rank
< 0) || (rank
> comm
->grp
.N
) )
187 tMPI_Error(comm
, TMPI_ERR_GROUP_RANK
);
190 return comm
->grp
.peers
[rank
];
194 bool tMPI_Is_master(void)
196 /* if there are no other threads, we're the main thread */
197 if ( (!TMPI_COMM_WORLD
) || TMPI_COMM_WORLD
->grp
.N
==0)
200 /* otherwise we know this through thread specific data: */
201 /* whether the thread pointer points to the head of the threads array */
202 return (bool)(tMPI_Get_current() == threads
);
205 tMPI_Comm
tMPI_Get_comm_self(void)
207 struct tmpi_thread
* th
=tMPI_Get_current();
208 return th
->self_comm
;
/* Determine the number of threads to start.
   argc/argv : the program's arguments (as passed to tMPI_Init).
   optname   : the option that introduces the thread count; tMPI_Init
               passes "-nt".
   nthreads  : receives the result.
   The argument following 'optname' is parsed with strtol() in base 10;
   the check on 'end' rejects arguments with trailing non-digit characters.
   Under conditions not visible here (presumably when the option is
   absent), tMPI_Get_recommended_nthreads() supplies the count.
   'ret' starts as TMPI_SUCCESS and is presumably the return value.
   NOTE(review): the argv scanning loop, any bounds check on (*argv)[i+1],
   the declarations of i/end and the error path are not visible in this
   copy - confirm. strtol() returns long, which is stored in an int
   without a range check, and '!end' can never be true once strtol() has
   run (it always sets the end pointer). */
212 int tMPI_Get_N(int *argc
, char ***argv
, const char *optname
, int *nthreads
)
215 int ret
=TMPI_SUCCESS
;
226 if (strcmp(optname
, (*argv
)[i
]) == 0)
234 /* the number of processes is an argument */
236 *nthreads
=strtol((*argv
)[i
+1], &end
, 10);
237 if ( !end
|| (*end
!= 0) )
245 *nthreads
=tMPI_Get_recommended_nthreads();
/* Allocate and initialize the per-thread data of 'th' (each thread calls
   this from tMPI_Thread_starter):
   - a one-member "self" communicator whose only peer is th itself;
   - the free-envelope pool ((Nthreads+1)*N_EV_ALLOC envelopes), the
     receive-envelope list, and one send-envelope list per thread
     (N_EV_ALLOC envelopes each);
   - an atomically zeroed ev_outgoing_received counter, the point-to-point
     event, and the request list ((Nthreads+1)*N_EV_ALLOC requests);
   - optionally (USE_COLLECTIVE_COPY_BUFFER) the collective copy buffers;
   - the profiling data.
   It then waits on the global barrier so that no thread starts the MPI
   program before all threads have initialized.
   NOTE(review): the declaration of 'i' and the #endif closing
   USE_COLLECTIVE_COPY_BUFFER are not visible in this copy - confirm. */
251 static void tMPI_Thread_init(struct tmpi_thread
*th
)
253 int N_envelopes
=(Nthreads
+1)*N_EV_ALLOC
;
254 int N_send_envelopes
=N_EV_ALLOC
;
255 int N_reqs
=(Nthreads
+1)*N_EV_ALLOC
;
258 /* allocate comm.self */
259 th
->self_comm
=tMPI_Comm_alloc(TMPI_COMM_WORLD
, 1);
260 th
->self_comm
->grp
.peers
[0]=th
;
262 /* allocate envelopes */
263 tMPI_Free_env_list_init( &(th
->envelopes
), N_envelopes
);
265 tMPI_Recv_env_list_init( &(th
->evr
));
/* one send-envelope list per destination thread */
267 th
->evs
=(struct send_envelope_list
*)tMPI_Malloc(
268 sizeof(struct send_envelope_list
)*Nthreads
);
269 for(i
=0;i
<Nthreads
;i
++)
271 tMPI_Send_env_list_init( &(th
->evs
[i
]), N_send_envelopes
);
274 tMPI_Atomic_set( &(th
->ev_outgoing_received
), 0);
276 tMPI_Event_init( &(th
->p2p_event
) );
278 /* allocate requests */
279 tMPI_Req_list_init(&(th
->rql
), N_reqs
);
281 #ifdef USE_COLLECTIVE_COPY_BUFFER
282 /* allocate copy_buffer list */
283 tMPI_Copy_buffer_list_init(&(th
->cbl_multi
), (Nthreads
+1)*(N_COLL_ENV
+1),
284 Nthreads
*COPY_BUFFER_SIZE
);
288 tMPI_Profile_init(&(th
->profile
));
290 /* now wait for all other threads to come on line, before we
291 start the MPI program */
292 tMPI_Thread_barrier_wait( &(tmpi_global
->barrier
) );
/* Release the per-thread data created by tMPI_Thread_init: the receive
   envelope list, each of the Nthreads send-envelope lists, the free
   envelope pool, the point-to-point event, the request list and (with
   USE_COLLECTIVE_COPY_BUFFER) the copy-buffer list. It then walks the
   th->argc entries of the thread's private argv copy.
   NOTE(review): not visible in this copy are the loop body over th->argc
   (presumably freeing the strdup'ed strings made in tMPI_Start_threads),
   the #endif, and any free() of the th->evs array allocated in
   tMPI_Thread_init - confirm there is no leak. */
296 static void tMPI_Thread_destroy(struct tmpi_thread
*th
)
300 tMPI_Recv_env_list_destroy( &(th
->evr
));
301 for(i
=0;i
<Nthreads
;i
++)
303 tMPI_Send_env_list_destroy( &(th
->evs
[i
]));
306 tMPI_Free_env_list_destroy( &(th
->envelopes
) );
307 tMPI_Event_destroy( &(th
->p2p_event
) );
308 tMPI_Req_list_destroy( &(th
->rql
) );
310 #ifdef USE_COLLECTIVE_COPY_BUFFER
311 tMPI_Copy_buffer_list_destroy(&(th
->cbl_multi
));
314 for(i
=0;i
<th
->argc
;i
++)
/* Initialize the global MPI state 'g' for a run with Nthreads threads:
   set Nalloc_usertypes to 0, create the timer mutex and the datatype
   spinlock, create a barrier for Nthreads threads, and record the
   reference time that tMPI_Wtime() measures from (gettimeofday() on Unix,
   GetTickCount() on Windows).
   NOTE(review): the parameter Nthreads shadows the file-level Nthreads
   used elsewhere in this file. The #else/#endif lines of the platform
   switch are not visible in this copy. */
320 static void tMPI_Global_init(struct tmpi_global
*g
, int Nthreads
)
324 g
->Nalloc_usertypes
=0;
325 tMPI_Thread_mutex_init(&(g
->timer_mutex
));
326 tMPI_Spinlock_init(&(g
->datatype_lock
));
328 tMPI_Thread_barrier_init( &(g
->barrier
), Nthreads
);
330 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
331 /* the time at initialization. */
332 gettimeofday( &(g
->timer_init
), NULL
);
334 /* the time at initialization. */
335 g
->timer_init
=GetTickCount();
/* Tear down global MPI state created by tMPI_Global_init.
   NOTE(review): only the timer mutex is destroyed in the lines visible
   here. The barrier and datatype spinlock created in tMPI_Global_init are
   not visibly released, and 'g' itself is not freed here - confirm whether
   the hidden lines or the caller handle that. */
340 static void tMPI_Global_destroy(struct tmpi_global
*g
)
342 tMPI_Thread_mutex_destroy(&(g
->timer_mutex
));
/* Entry point of every thread, including thread 0, which
   tMPI_Start_threads calls directly. 'arg' points to the thread's element
   of 'threads'.
   The thread stores that pointer under id_key (so that tMPI_Get_current()
   can find it), then initializes its per-thread data (which ends in a
   barrier with all other threads). Finally it runs either the program's
   main() with its private argc/argv copy or th->start_fn(th->start_arg).
   NOTE(review): not visible in this copy are the condition choosing main()
   vs start_fn (presumably main() when start_fn is NULL), the #ifdef around
   the trace call, and the return value. Calling main() recursively is
   legal C but not C++. */
348 static void* tMPI_Thread_starter(void *arg
)
350 struct tmpi_thread
*th
=(struct tmpi_thread
*)arg
;
351 /* we set our thread id, as a thread-specific piece of global data. */
352 tMPI_Thread_setspecific(id_key
, arg
);
355 tMPI_Trace_print("Created thread nr. %d", (int)(th
-threads
));
358 tMPI_Thread_init(th
);
360 /* start_fn, start_arg, argc and argv were set by the calling function */
363 main(th
->argc
, th
->argv
);
367 th
->start_fn(th
->start_arg
);
/* Create the thread-MPI world and start N threads.
   argc/argv          : program arguments (from tMPI_Init) or 0 (from
                        tMPI_Init_fn); each thread receives its own
                        strdup'ed copy of argv.
   start_fn/start_arg : function each thread runs instead of main(), or
                        NULL (from tMPI_Init).
   Steps: clear the finalized flag; allocate and initialize tmpi_global
   (including an N-thread barrier); allocate the 'threads' array, the
   world communicator and the empty group; create the thread-specific key;
   fill in each thread's peer slot, argv copy and start function; create
   threads 1..N-1. Finally the calling thread becomes thread 0 by entering
   tMPI_Thread_starter itself.
   NOTE(review): not visible in this copy are the loop headers over i, the
   declarations of i/j, the #else/#endif lines and the argument-copy
   conditions. N is not validated here: tMPI_Init_fn checks N>=1 before
   calling, but tMPI_Init does not visibly do so. */
376 void tMPI_Start_threads(int N
, int *argc
, char ***argv
,
377 void (*start_fn
)(void*), void *start_arg
)
380 tMPI_Trace_print("tMPI_Start_threads(%d, %p, %p, %p, %p)", N
, argc
,
381 argv
, start_fn
, start_arg
);
387 tmpi_finalized
=FALSE
;
390 /* allocate global data */
391 tmpi_global
=(struct tmpi_global
*)
392 tMPI_Malloc(sizeof(struct tmpi_global
));
393 tMPI_Global_init(tmpi_global
, N
);
395 /* allocate world and thread data */
396 threads
=(struct tmpi_thread
*)tMPI_Malloc(sizeof(struct tmpi_thread
)*N
);
397 TMPI_COMM_WORLD
=tMPI_Comm_alloc(NULL
, N
);
398 tMPI_GROUP_EMPTY
=tMPI_Group_alloc();
/* the key through which each thread later finds its own 'threads' entry */
400 if (tMPI_Thread_key_create(&id_key
, NULL
))
402 tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_INIT
);
404 /*printf("thread keys created\n"); fflush(NULL);*/
407 TMPI_COMM_WORLD
->grp
.peers
[i
]=&(threads
[i
]);
409 /* copy argc, argv */
413 threads
[i
].argc
=*argc
;
414 threads
[i
].argv
=(char**)tMPI_Malloc(threads
[i
].argc
*
416 for(j
=0;j
<threads
[i
].argc
;j
++)
418 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
419 threads
[i
].argv
[j
]=strdup( (*argv
)[j
] );
421 threads
[i
].argv
[j
]=_strdup( (*argv
)[j
] );
428 threads
[i
].argv
=NULL
;
430 threads
[i
].start_fn
=start_fn
;
431 threads
[i
].start_arg
=start_arg
;
/* only threads 1..N-1 are spawned; the caller itself becomes thread 0 */
433 for(i
=1;i
<N
;i
++) /* zero is the main thread */
435 if (tMPI_Thread_create(&(threads
[i
].thread_id
),
437 (void*)&(threads
[i
]) ) )
439 tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_INIT
);
442 /* the main thread now also runs start_fn */
443 /*threads[0].thread_id=NULL; we can't count on this being a pointer*/
444 tMPI_Thread_starter((void*)&(threads
[0]));
/* MPI_Init equivalent. On the main thread (TMPI_COMM_WORLD not yet set),
   it reads the thread count from the "-nt" command-line option via
   tMPI_Get_N and starts the threads with a NULL start function, so the
   threads presumably run main() again with a private copy of argc/argv.
   On threads where TMPI_COMM_WORLD is already set, nothing further is
   done.
   NOTE(review): the declaration of N, the braces/else and the return
   statement are not visible in this copy. */
449 int tMPI_Init(int *argc
, char ***argv
)
452 tMPI_Trace_print("tMPI_Init(%p, %p)", argc
, argv
);
456 if (TMPI_COMM_WORLD
==0) /* we're the main process */
459 tMPI_Get_N(argc
, argv
, "-nt", &N
);
460 tMPI_Start_threads(N
, argc
, argv
, NULL
, NULL
);
464 /* if we're a sub-thread we don't need to do anything, because
465 everything has already been set up by either the main thread,
466 or the thread runner function.*/
/* Start N threads that each run start_function(arg) instead of main().
   A visible assignment replaces N with tMPI_Get_recommended_nthreads()
   (presumably when the caller passes N<1; the condition is not visible in
   this copy). Threads are only started from the main thread, i.e. when
   TMPI_COMM_WORLD is not yet set, and only if N>=1.
   NOTE(review): printing a function pointer with %p is not strictly
   portable C (function pointers need not convert to void*). */
471 int tMPI_Init_fn(int N
, void (*start_function
)(void*), void *arg
)
474 tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N
, start_function
, arg
);
479 N
=tMPI_Get_recommended_nthreads();
482 if (TMPI_COMM_WORLD
==0 && N
>=1) /* we're the main process */
484 tMPI_Start_threads(N
, 0, 0, start_function
, arg
);
/* MPI_Initialized equivalent: *flag becomes nonzero when the world
   communicator exists and tmpi_finalized is not set.
   NOTE(review): the return statement is not visible in this copy. */
489 int tMPI_Initialized(int *flag
)
492 tMPI_Trace_print("tMPI_Initialized(%p)", flag
);
495 *flag
=(TMPI_COMM_WORLD
&& !tmpi_finalized
);
/* MPI_Finalize equivalent. Every thread has to call it, because it waits
   on the N-thread global barrier.
   Each thread stops its profiling and meets the others at the barrier.
   The master then summarizes the profiles. After a second barrier the
   master joins threads 1..Nthreads-1, destroys every thread's data,
   deletes the thread key, destroys all communicators in the ring that
   starts at TMPI_COMM_WORLD, frees the empty group, resets the globals and
   tears down tmpi_global.
   NOTE(review): the following are not visible in this copy:
   - the declaration of 'i' and the braces;
   - the loop advance (presumably cur=next;), without which the
     communicator loop would not terminate;
   - the setting of tmpi_finalized;
   - the return value.
   Also, the inner 'tMPI_Comm cur' shadows the outer
   'struct tmpi_thread *cur', and "%5d" is used to print the unsigned
   result of tMPI_This_threadnr(). */
500 int tMPI_Finalize(void)
504 tMPI_Trace_print("tMPI_Finalize()");
507 printf("%5d: tMPI_Finalize called\n", tMPI_This_threadnr());
513 struct tmpi_thread
*cur
=tMPI_Get_current();
515 tMPI_Profile_stop( &(cur
->profile
) );
516 tMPI_Thread_barrier_wait( &(tmpi_global
->barrier
) );
518 if (tMPI_Is_master())
520 tMPI_Profiles_summarize(Nthreads
, threads
);
/* second barrier: nobody proceeds to teardown before the summary is done */
524 tMPI_Thread_barrier_wait( &(tmpi_global
->barrier
) );
526 if (tMPI_Is_master())
529 /* we just wait for all threads to finish; the order isn't very
530 relevant, as all threads should arrive at their endpoints soon. */
531 for(i
=1;i
<Nthreads
;i
++)
533 if (tMPI_Thread_join(threads
[i
].thread_id
, NULL
))
535 tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_FINALIZE
);
537 tMPI_Thread_destroy(&(threads
[i
]));
539 /* at this point, we are the only thread left, so we can
540 destroy the global structures with impunity. */
541 tMPI_Thread_destroy(&(threads
[0]));
544 tMPI_Thread_key_delete(id_key
);
545 /* de-allocate all the comm structures. */
547 tMPI_Comm cur
=TMPI_COMM_WORLD
->next
;
548 while(cur
&& (cur
!=TMPI_COMM_WORLD
) )
550 tMPI_Comm next
=cur
->next
;
551 tMPI_Comm_destroy(cur
);
554 tMPI_Comm_destroy(TMPI_COMM_WORLD
);
557 tMPI_Group_free(&tMPI_GROUP_EMPTY
);
559 TMPI_COMM_WORLD
=NULL
;
560 tMPI_GROUP_EMPTY
=NULL
;
563 /* deallocate the 'global' structure */
564 tMPI_Global_destroy(tmpi_global
);
/* MPI_Finalized equivalent: *flag receives tmpi_finalized.
   NOTE(review): no line visible in this copy sets tmpi_finalized to TRUE
   (presumably the hidden tail of tMPI_Finalize does), and the return
   statement is not visible. */
577 int tMPI_Finalized(int *flag
)
580 tMPI_Trace_print("tMPI_Finalized(%p)", flag
);
582 *flag
=tmpi_finalized
;
/* MPI_Abort equivalent. Reports the error code on stderr, mentioning
   TMPI_COMM_WORLD when comm is the world communicator.
   Two alternative strategies appear below; the preprocessor lines that
   select between them are not visible in this copy:
   - abort the whole process so a debugger can inspect it (the abort()
     call itself is not visible);
   - kill threads rather than the process: the master thread prints a
     report (what it does next is not visible), and any other thread
     prints its thread number and ends only itself via tMPI_Thread_exit().
   NOTE(review): the int passed to tMPI_Thread_exit() is heap-allocated,
   never initialized, and the malloc() result is not checked. */
589 int tMPI_Abort(tMPI_Comm comm
, int errorcode
)
592 tMPI_Trace_print("tMPI_Abort(%p, %d)", comm
, errorcode
);
595 /* we abort(). This way we can run a debugger on it */
596 fprintf(stderr
, "tMPI_Abort called with error code %d",errorcode
);
597 if (comm
==TMPI_COMM_WORLD
)
598 fprintf(stderr
, " on TMPI_COMM_WORLD");
599 fprintf(stderr
,"\n");
604 /* we just kill all threads, but not the main process */
606 if (tMPI_Is_master())
608 if (comm
==TMPI_COMM_WORLD
)
610 "tMPI_Abort called on TMPI_COMM_WORLD main with errorcode=%d\n",
613 fprintf(stderr
, "tMPI_Abort called on main thread with errorcode=%d\n",
622 fprintf(stderr
, "tMPI_Abort called with error code %d on thread %d\n",
623 errorcode
, tMPI_This_threadnr());
625 ret
=(int*)malloc(sizeof(int));
626 tMPI_Thread_exit(ret
);
633 int tMPI_Get_processor_name(char *name
, int *resultlen
)
635 int nr
=tMPI_Threadnr(tMPI_Get_current());
636 unsigned int digits
=0;
637 const unsigned int base
=10;
640 tMPI_Trace_print("tMPI_Get_processor_name(%p, %p)", name
, resultlen
);
642 /* we don't want to call sprintf here (it turns out to be not entirely
643 thread-safe on Mac OS X, for example), so we do it our own way: */
645 /* first determine number of digits */
656 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
657 strcpy(name
, "thread #");
659 strncpy_s(name
, TMPI_MAX_PROCESSOR_NAME
, "thread #", TMPI_MAX_PROCESSOR_NAME
);
661 /* now construct the number */
663 size_t len
=strlen(name
);
667 for(i
=0;i
<digits
;i
++)
669 size_t pos
=len
+ (digits
-i
-1);
670 if (pos
< (TMPI_MAX_PROCESSOR_NAME
-1) )
671 name
[ pos
]=(char)('0' + rest
%base
);
674 if ( (digits
+len
) < TMPI_MAX_PROCESSOR_NAME
)
675 name
[digits
+ len
]='\0';
677 name
[TMPI_MAX_PROCESSOR_NAME
]='\0';
681 *resultlen
=(int)strlen(name
); /* For some reason the MPI standard
682 uses ints instead of size_ts for
691 /* TODO: there must be better ways to do this */
/* MPI_Wtime equivalent: wall-clock seconds elapsed since tMPI_Global_init
   recorded tmpi_global->timer_init (gettimeofday() on Unix, GetTickCount()
   milliseconds on Windows).
   NOTE(review): reads tmpi_global without a NULL check, so it must not be
   called before initialization. The declarations, #else/#endif and the
   return statement are not visible in this copy. */
692 double tMPI_Wtime(void)
697 tMPI_Trace_print("tMPI_Wtime()");
700 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
706 gettimeofday(&tv
, NULL
);
/* seconds and microseconds are subtracted separately; a negative usecdiff
   is fine because both are combined into one double below */
707 secdiff
= tv
.tv_sec
- tmpi_global
->timer_init
.tv_sec
;
708 usecdiff
= tv
.tv_usec
- tmpi_global
->timer_init
.tv_usec
;
710 ret
=(double)secdiff
+ 1e-6*usecdiff
;
714 DWORD tv
=GetTickCount();
716 /* the windows absolute time GetTickCount() wraps around in ~49 days,
717 so it's safer to always use differences, and assume that our
718 program doesn't run that long.. */
719 ret
=1e-3*((unsigned int)(tv
- tmpi_global
->timer_init
));
/* MPI_Wtick equivalent: resolution of tMPI_Wtime() in seconds.
   NOTE(review): the per-platform return values and the #else/#endif lines
   are not visible in this copy. */
725 double tMPI_Wtick(void)
727 #if ! (defined( _WIN32 ) || defined( _WIN64 ) )
728 /* In Unix, we don't really know. Any modern OS should be at least
729 this precise, though */
732 /* According to the Windows documentation, this is about right: */
/* MPI_Get_count equivalent: *count becomes the number of whole 'datatype'
   elements transferred according to 'status' (status->transferred divided
   by datatype->size, truncated).
   An error TMPI_ERR_STATUS is reported through tMPI_Error under a
   condition not visible here (presumably a NULL status - confirm).
   NOTE(review): this definition continues past the end of the visible
   source. A datatype with size 0 would cause a division by zero. */
743 int tMPI_Get_count(tMPI_Status
*status
, tMPI_Datatype datatype
, int *count
)
746 tMPI_Trace_print("tMPI_Get_count(%p, %p, %p)", status
, datatype
, count
);
750 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_STATUS
);
752 *count
= (int)(status
->transferred
/datatype
->size
);