Merge branch 'master' of git://git.gromacs.org/gromacs
[gromacs/adressmacs.git] / src / gmxlib / thread_mpi / comm.c
blob99747beac91a127922cb8696e1b5fc06d37d93dd
1 /*
2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
38 #ifdef HAVE_TMPI_CONFIG_H
39 #include "tmpi_config.h"
40 #endif
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
46 #ifdef HAVE_UNISTD_H
47 #include <unistd.h>
48 #endif
50 #include <errno.h>
51 #include <stdlib.h>
52 #include <stdio.h>
53 #include <stdarg.h>
54 #include <string.h>
57 #include "impl.h"
/* Helper for tMPI_Comm_split: partitions N entities by color.
   The output consists of *Ngroups groups, each holding the entities that
   share one color; entities whose color is TMPI_UNDEFINED are left out.
     grp_color[g]  the color of group g
     grp_N[g]      the number of entities in group g
     group[N*g+m]  the index of the m-th entity of group g (ordered by key) */
static void tMPI_Split_colors(int        N,
                              const int *color,
                              const int *key,
                              int       *Ngroups,
                              int       *grp_N,
                              int       *grp_color,
                              int       *group);
71 /* communicator query&manipulation functions */
72 int tMPI_Comm_N(tMPI_Comm comm)
74 #ifdef TMPI_TRACE
75 tMPI_Trace_print("tMPI_Comm_N(%p)", comm);
76 #endif
77 if (!comm)
78 return 0;
79 return comm->grp.N;
82 int tMPI_Comm_size(tMPI_Comm comm, int *size)
84 #ifdef TMPI_TRACE
85 tMPI_Trace_print("tMPI_Comm_size(%p, %p)", comm, size);
86 #endif
87 return tMPI_Group_size(&(comm->grp), size);
90 int tMPI_Comm_rank(tMPI_Comm comm, int *rank)
92 #ifdef TMPI_TRACE
93 tMPI_Trace_print("tMPI_Comm_rank(%p, %p)", comm, rank);
94 #endif
95 return tMPI_Group_rank(&(comm->grp), rank);
99 int tMPI_Comm_compare(tMPI_Comm comm1, tMPI_Comm comm2, int *result)
101 int i,j;
102 #ifdef TMPI_TRACE
103 tMPI_Trace_print("tMPI_Comm_compare(%p, %p, %p)", comm1, comm2, result);
104 #endif
105 if (comm1 == comm2)
107 *result=TMPI_IDENT;
108 return TMPI_SUCCESS;
111 if ( (!comm1) || (!comm2) )
113 *result=TMPI_UNEQUAL;
114 return TMPI_SUCCESS;
117 if (comm1->grp.N != comm2->grp.N)
119 *result=TMPI_UNEQUAL;
120 return TMPI_SUCCESS;
123 *result=TMPI_CONGRUENT;
124 /* we assume that there are two identical comm members within a comm */
125 for(i=0;i<comm1->grp.N;i++)
127 if (comm1->grp.peers[i] != comm2->grp.peers[i])
129 tmpi_bool found=FALSE;
131 *result=TMPI_SIMILAR;
132 for(j=0;j<comm2->grp.N;j++)
134 if (comm1->grp.peers[i] == comm2->grp.peers[j])
136 found=TRUE;
137 break;
140 if (!found)
142 *result=TMPI_UNEQUAL;
143 return TMPI_SUCCESS;
147 return TMPI_SUCCESS;
151 tMPI_Comm tMPI_Comm_alloc(tMPI_Comm parent, int N)
153 struct tmpi_comm_ *ret;
154 int i;
156 ret=(struct tmpi_comm_*)tMPI_Malloc(sizeof(struct tmpi_comm_));
157 ret->grp.peers=(struct tmpi_thread**)tMPI_Malloc(
158 sizeof(struct tmpi_thread*)*Nthreads);
159 ret->grp.N=N;
161 tMPI_Thread_mutex_init( &(ret->comm_create_lock) );
162 tMPI_Thread_cond_init( &(ret->comm_create_prep) );
163 tMPI_Thread_cond_init( &(ret->comm_create_finish) );
165 ret->split = NULL;
166 ret->new_comm = NULL;
167 /* we have no topology to start out with */
168 ret->cart=NULL;
169 /*ret->graph=NULL;*/
172 /* initialize the main barrier */
173 tMPI_Barrier_init(&(ret->barrier), N);
175 /* the reduce barriers */
177 /* First calculate the number of reduce barriers */
178 int Niter=0; /* the iteration number */
179 int Nred=N; /* the number of reduce barriers for this iteration */
180 while(Nred>1) {
181 /* Nred is now Nred/2 + a rest term because solitary
182 process at the end of the list must still be accounter for */
183 Nred = Nred/2 + Nred%2;
184 Niter+=1;
187 ret->N_reduce_iter=Niter;
188 /* allocate the list */
189 ret->reduce_barrier=(tMPI_Barrier_t**)
190 tMPI_Malloc(sizeof(tMPI_Barrier_t*)*(Niter+1));
191 ret->N_reduce=(int*)tMPI_Malloc(sizeof(int)*(Niter+1));
193 /* we re-set Nred to N */
194 Nred=N;
195 for(i=0;i<Niter;i++)
197 int j;
199 Nred = Nred/2 + Nred%2;
200 ret->N_reduce[i] = Nred;
201 /* allocate the sub-list */
202 ret->reduce_barrier[i]=(tMPI_Barrier_t*)
203 tMPI_Malloc(sizeof(tMPI_Barrier_t)*(Nred));
204 for(j=0;j<Nred;j++)
206 tMPI_Barrier_init(&(ret->reduce_barrier[i][j]),2);
211 /* the reduce buffers */
212 #if 0
213 ret->sendbuf=(volatile void**)tMPI_Malloc(sizeof(void*)*Nthreads);
214 ret->recvbuf=(volatile void**)tMPI_Malloc(sizeof(void*)*Nthreads);
215 #else
216 ret->reduce_sendbuf=(tMPI_Atomic_ptr_t*)
217 tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*Nthreads);
218 ret->reduce_recvbuf=(tMPI_Atomic_ptr_t*)
219 tMPI_Malloc(sizeof(tMPI_Atomic_ptr_t)*Nthreads);
220 #endif
223 if (parent)
225 ret->erh=parent->erh;
227 else
229 ret->erh=TMPI_ERRORS_ARE_FATAL;
232 /* coll_env objects */
233 ret->cev=(struct coll_env*)tMPI_Malloc(sizeof(struct coll_env)*N_COLL_ENV);
234 for(i=0;i<N_COLL_ENV;i++)
235 tMPI_Coll_env_init( &(ret->cev[i]), N);
236 /* multi_sync objects */
237 ret->csync=(struct coll_sync*)tMPI_Malloc(sizeof(struct coll_sync)*N);
238 for(i=0;i<N;i++)
239 tMPI_Coll_sync_init( &(ret->csync[i]), N);
241 /* we insert ourselves in the circular list, after TMPI_COMM_WORLD */
242 if (TMPI_COMM_WORLD)
244 ret->next=TMPI_COMM_WORLD;
245 ret->prev=TMPI_COMM_WORLD->prev;
247 TMPI_COMM_WORLD->prev->next = ret;
248 TMPI_COMM_WORLD->prev = ret;
250 else
252 ret->prev=ret->next=ret;
255 return ret;
258 void tMPI_Comm_destroy(tMPI_Comm comm)
260 int i;
262 free(comm->grp.peers);
263 #if 0
264 free(comm->reduce_barrier);
265 free(comm->N_reduce_barrier);
266 #endif
267 for(i=0;i<comm->N_reduce_iter;i++)
268 free(comm->reduce_barrier[i]);
269 free(comm->reduce_barrier);
270 free(comm->N_reduce);
272 for(i=0;i<N_COLL_ENV;i++)
273 tMPI_Coll_env_destroy( &(comm->cev[i]) );
274 for(i=0;i<comm->grp.N;i++)
275 tMPI_Coll_sync_destroy( &(comm->csync[i]) );
276 free(comm->cev);
277 free(comm->csync);
279 tMPI_Thread_mutex_destroy( &(comm->comm_create_lock) );
280 tMPI_Thread_cond_destroy( &(comm->comm_create_prep) );
281 tMPI_Thread_cond_destroy( &(comm->comm_create_finish) );
283 free((void*)comm->reduce_sendbuf);
284 free((void*)comm->reduce_recvbuf);
286 if ( comm->cart )
288 tMPI_Cart_destroy( comm->cart );
289 free(comm->cart);
292 /* remove ourselves from the circular list */
293 if (comm->next)
294 comm->next->prev=comm->prev;
295 if (comm->prev)
296 comm->prev->next=comm->next;
298 free(comm);
301 int tMPI_Comm_free(tMPI_Comm *comm)
303 int myrank=tMPI_Comm_seek_rank(*comm, tMPI_Get_current());
304 #ifdef TMPI_TRACE
305 tMPI_Trace_print("tMPI_Comm_free(%p)", comm);
306 #endif
307 #ifndef TMPI_STRICT
308 if (! *comm)
309 return TMPI_SUCCESS;
311 if ((*comm)->grp.N > 1)
313 /* we remove ourselves from the comm. */
314 tMPI_Thread_mutex_lock(&((*comm)->comm_create_lock));
315 (*comm)->grp.peers[myrank] = (*comm)->grp.peers[(*comm)->grp.N-1];
316 (*comm)->grp.N--;
317 tMPI_Thread_mutex_unlock(&((*comm)->comm_create_lock));
319 else
321 /* we're the last one so we can safely destroy it */
322 tMPI_Comm_destroy(*comm);
324 #else
325 /* This is correct if programs actually treat Comm_free as a
326 collective call */
327 /* we need to barrier because the comm is a shared structure and
328 we have to be sure that nobody else is using it
329 (for example, to get its rank, like above) before destroying it*/
330 tMPI_Barrier(*comm);
331 /* this is a collective call on a shared data structure, so only
332 one process (rank[0] in this case) should do anything */
333 if (myrank==0)
335 tMPI_Comm_destroy(*comm);
337 #endif
338 return TMPI_SUCCESS;
341 int tMPI_Comm_dup(tMPI_Comm comm, tMPI_Comm *newcomm)
343 #ifdef TMPI_TRACE
344 tMPI_Trace_print("tMPI_Comm_dup(%p, %p)", comm, newcomm);
345 #endif
346 /* we just call Comm_split because it already contains all the
347 neccesary synchronization constructs. */
348 return tMPI_Comm_split(comm, 0, tMPI_Comm_seek_rank(comm,
349 tMPI_Get_current()), newcomm);
353 int tMPI_Comm_create(tMPI_Comm comm, tMPI_Group group, tMPI_Comm *newcomm)
355 int color=TMPI_UNDEFINED;
356 int key=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
358 #ifdef TMPI_TRACE
359 tMPI_Trace_print("tMPI_Comm_create(%p, %p, %p)", comm, group, newcomm);
360 #endif
361 if (tMPI_In_group(group))
363 color=1;
365 /* the MPI specs specifically say that this is equivalent */
366 return tMPI_Comm_split(comm, color, key, newcomm);
369 static void tMPI_Split_colors(int N, const int *color, const int *key,
370 int *Ngroups, int *grp_N, int *grp_color,
371 int *group)
373 int i,j;
374 tmpi_bool found;
376 /* reset groups */
377 for(i=0;i<N;i++)
378 grp_N[i]=0;
379 for(i=0;i<N;i++)
381 if (color[i] != TMPI_UNDEFINED)
383 found=FALSE;
384 for(j=0;j<(*Ngroups);j++)
386 if (grp_color[j] == color[i])
388 /* we insert where we need to, by counting back */
389 int k=grp_N[j];
391 while (k>0 && ( key[group[N*j + k-1]]>key[i]) )
393 /* shift up */
394 group[N*j + k]=group[N*j + k-1];
395 k--;
397 group[N*j+k]=i;
398 grp_N[j]++;
399 found=TRUE;
402 if (!found)
404 /* not found. just add a new color */
405 grp_N[(*Ngroups)]=1;
406 grp_color[(*Ngroups)]=color[i];
407 group[N*(*Ngroups) + 0]=i;
408 (*Ngroups)++;
414 /* this is the main comm creation function. All other functions that create
415 comms use this*/
416 int tMPI_Comm_split(tMPI_Comm comm, int color, int key, tMPI_Comm *newcomm)
418 int i,j;
419 int N=tMPI_Comm_N(comm);
420 volatile tMPI_Comm *newcomm_list;
421 volatile int colors[MAX_PREALLOC_THREADS]; /* array with the colors
422 of each thread */
423 volatile int keys[MAX_PREALLOC_THREADS]; /* same for keys (only one of
424 the threads actually suplies
425 these arrays to the comm
426 structure) */
427 tmpi_bool i_am_first=FALSE;
428 int myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
429 struct tmpi_split *spl;
431 #ifdef TMPI_TRACE
432 tMPI_Trace_print("tMPI_Comm_split(%p, %d, %d, %p)", comm, color, key,
433 newcomm);
434 #endif
435 if (!comm)
437 *newcomm=NULL;
438 return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
441 tMPI_Thread_mutex_lock(&(comm->comm_create_lock));
442 /* first get the colors */
443 if (!comm->new_comm)
445 /* i am apparently first */
446 comm->split=(struct tmpi_split*)tMPI_Malloc(sizeof(struct tmpi_split));
447 comm->new_comm=(tMPI_Comm*)tMPI_Malloc(N*sizeof(tMPI_Comm));
448 if (N<=MAX_PREALLOC_THREADS)
450 comm->split->colors=colors;
451 comm->split->keys=keys;
453 else
455 comm->split->colors=(int*)tMPI_Malloc(N*sizeof(int));
456 comm->split->keys=(int*)tMPI_Malloc(N*sizeof(int));
458 comm->split->Ncol_init=tMPI_Comm_N(comm);
459 comm->split->can_finish=FALSE;
460 i_am_first=TRUE;
461 /* the main communicator contains a list the size of grp.N */
463 newcomm_list=comm->new_comm; /* we copy it to the local stacks because
464 we can later erase comm->new_comm safely */
465 spl=comm->split; /* we do the same for spl */
466 spl->colors[myrank] = color;
467 spl->keys[myrank] = key;
468 spl->Ncol_init--;
470 if (spl->Ncol_init == 0)
471 tMPI_Thread_cond_signal(&(comm->comm_create_prep));
473 if (!i_am_first)
475 /* all other threads can just wait until the creator thread is
476 finished */
477 while(! spl->can_finish )
479 tMPI_Thread_cond_wait(&(comm->comm_create_finish) ,
480 &(comm->comm_create_lock) );
483 else
485 int Ncomms=0;
486 int comm_color_[MAX_PREALLOC_THREADS];
487 int comm_N_[MAX_PREALLOC_THREADS];
488 int *comm_color=comm_color_; /* there can't be more comms than N*/
489 int *comm_N=comm_N_; /* the number of procs in a group */
491 int *comm_groups; /* the groups */
492 tMPI_Comm *comms; /* the communicators */
494 /* wait for the colors to be done */
495 /*if (N>1)*/
496 while(spl->Ncol_init > 0)
498 tMPI_Thread_cond_wait(&(comm->comm_create_prep),
499 &(comm->comm_create_lock));
502 /* reset the state so that a new comm creating function can run */
503 spl->Ncol_destroy=N;
504 comm->new_comm=0;
505 comm->split=0;
507 comm_groups=(int*)tMPI_Malloc(N*N*sizeof(int));
508 if (N>MAX_PREALLOC_THREADS)
510 comm_color=(int*)tMPI_Malloc(N*sizeof(int));
511 comm_N=(int*)tMPI_Malloc(N*sizeof(int));
514 /* count colors, allocate and split up communicators */
515 tMPI_Split_colors(N, (int*)spl->colors,
516 (int*)spl->keys,
517 &Ncomms,
518 comm_N, comm_color, comm_groups);
521 /* allocate a bunch of communicators */
522 comms=(tMPI_Comm*)tMPI_Malloc(Ncomms*sizeof(tMPI_Comm));
523 for(i=0;i<Ncomms;i++)
524 comms[i]=tMPI_Comm_alloc(comm, comm_N[i]);
526 /* now distribute the comms */
527 for(i=0;i<Ncomms;i++)
529 comms[i]->grp.N=comm_N[i];
530 for(j=0;j<comm_N[i];j++)
531 comms[i]->grp.peers[j]=
532 comm->grp.peers[comm_groups[i*comm->grp.N + j]];
534 /* and put them into the newcomm_list */
535 for(i=0;i<N;i++)
537 newcomm_list[i]=TMPI_COMM_NULL;
538 for(j=0;j<Ncomms;j++)
540 if (spl->colors[i] == comm_color[j])
542 newcomm_list[i] = comms[j];
543 break;
548 #ifdef TMPI_DEBUG
549 /* output */
550 for(i=0;i<Ncomms;i++)
552 printf("Group %d (color %d) has %d members: ",
553 i, comm_color[i], comm_N[i]);
554 for(j=0;j<comm_N[i];j++)
555 printf(" %d ",comm_groups[comm->grp.N*i + j]);
557 printf(" rank: ");
558 for(j=0;j<comm_N[i];j++)
559 printf(" %d ",spl->keys[comm_groups[N*i + j]]);
560 printf(" color: ");
561 for(j=0;j<comm_N[i];j++)
562 printf(" %d ",spl->colors[comm_groups[N*i + j]]);
563 printf("\n");
565 #endif
566 if (N>MAX_PREALLOC_THREADS)
568 free((int*)spl->colors);
569 free((int*)spl->keys);
570 free(comm_color);
571 free(comm_N);
573 free(comm_groups);
574 free(comms);
575 spl->can_finish=TRUE;
577 /* tell the waiting threads that there's a comm ready */
578 tMPI_Thread_cond_broadcast(&(comm->comm_create_finish));
580 /* here the individual threads get their comm object */
581 *newcomm=newcomm_list[myrank];
583 /* free when we have assigned them all, so we can reuse the object*/
584 spl->Ncol_destroy--;
585 if (spl->Ncol_destroy==0)
587 free((void*)newcomm_list);
588 free(spl);
591 tMPI_Thread_mutex_unlock(&(comm->comm_create_lock));
593 return TMPI_SUCCESS;
596 int tMPI_Comm_seek_rank(tMPI_Comm comm, struct tmpi_thread *th)
598 int i;
599 if (!comm)
600 return -1;
602 for(i=0;i<comm->grp.N;i++)
604 if (comm->grp.peers[i] == th)
605 return i;
607 return -1;