/* src/gmxlib/thread_mpi/multicast.c */
/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.

To help us fund development, we humbly ask that you cite
any papers on the package - you can find them in the top README file.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#if 0
#include <sys/time.h>
#endif

#include "thread_mpi/threads.h"
#include "thread_mpi/atomic.h"
#include "thread_mpi/tmpi.h"
#include "tmpi_impl.h"

/* do a multicast transfer, with checks */
static void tMPI_Mult_xfer(tMPI_Comm comm, int rank, struct multi_env *rmev,
                           void *recvbuf, size_t recvsize,
                           tMPI_Datatype recvtype, int expected_tag, int *ret);
/* run a single binary reduce operation on src_a and src_b, producing dest.
   dest and src_a may be identical */
static int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
                              tMPI_Datatype datatype, int count, tMPI_Op op,
                              tMPI_Comm comm);
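
/* Synchronization protocol shared by the collectives below:
   1) the thread that owns the data fills its multi_env slot (tag,
      datatype, buffer pointers and sizes) and sets n_remaining to the
      number of expected readers;
   2) it publishes the transfer by atomically setting current_counter
      to its own event counter;
   3) each reader spins until the owner's current_counter matches its
      own msc->counter, copies the data out through tMPI_Mult_xfer, and
      atomically decrements n_remaining;
   4) the owner spins until n_remaining reaches zero, after which the
      slot may be reused. The N_MULTI_SYNC slots per thread let
      consecutive collectives proceed without a full barrier in between. */
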
void tMPI_Multi_sync_init(struct multi_sync *msc, int N)
{
    int i;

    msc->counter=0;
    for(i=0;i<N_MULTI_SYNC;i++)
    {
        tMPI_Atomic_set( &(msc->mev[i].current_counter), 0);
        tMPI_Atomic_set( &(msc->mev[i].n_remaining), 0);
        msc->mev[i].buf=(volatile void**)tMPI_Malloc(sizeof(void*)*N);
        msc->mev[i].bufsize=(size_t*)tMPI_Malloc(sizeof(size_t)*N);
        msc->mev[i].read_data=(bool*)tMPI_Malloc(sizeof(bool)*N);
    }
}

void tMPI_Multi_sync_destroy(struct multi_sync *msc)
{
    int i;

    for(i=0;i<N_MULTI_SYNC;i++)
    {
        free((void*)msc->mev[i].buf);
        free((void*)msc->mev[i].bufsize);
        free((void*)msc->mev[i].read_data);
    }
}

static void tMPI_Mult_xfer(tMPI_Comm comm, int rank, struct multi_env *rmev,
                           void *recvbuf, size_t recvsize,
                           tMPI_Datatype recvtype, int expected_tag, int *ret)
{
    size_t sendsize=rmev->bufsize[rank];
    /* check tags, types */
    if ( (rmev->datatype != recvtype ) || (rmev->tag != expected_tag) )
    {
        *ret=tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
        return; /* don't copy anything on a mismatch */
    }

    if (sendsize) /* we allow NULL ptrs if there's nothing to xmit */
    {
        if ( sendsize > recvsize )
        {
            *ret=tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            return; /* copying would overrun recvbuf */
        }

        if ( rmev->buf[rank] == recvbuf )
        {
            *ret=tMPI_Error(TMPI_COMM_WORLD,TMPI_ERR_XFER_BUF_OVERLAP);
            return;
        }

        /* copy data */
        memcpy((char*)recvbuf, (void*)(rmev->buf[rank]), sendsize);
    }
    /* signal one thread ready */
    tMPI_Atomic_fetch_add( &(rmev->n_remaining), -1);
}

int tMPI_Barrier(tMPI_Comm comm)
{
    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }

    if (comm->grp.N>1)
    {
        tMPI_Spinlock_barrier_wait( &(comm->multicast_barrier[0]));
    }

    return TMPI_SUCCESS;
}

/* multi */
int tMPI_Bcast(void* buffer, int count, tMPI_Datatype datatype, int root,
               tMPI_Comm comm)
{
    struct multi_sync *msc;
    int mevi;
    int myrank;
    int ret=TMPI_SUCCESS;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    if (myrank==root)
    {
        struct multi_env *mev=&(msc->mev[mevi]);

        /* first set up the data */
        mev->tag=TMPI_BCAST_TAG;
        mev->datatype=datatype;
        mev->buf[0]=buffer;
        mev->bufsize[0]=count*datatype->size;
        tMPI_Atomic_set(&(mev->n_remaining), comm->grp.N-1);
        /* we now publish our counter */
        tMPI_Atomic_set(&(mev->current_counter), msc->counter);
        /* and wait until everybody is done copying */
        while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
        {
        }
    }
    else
    {
        /* get the root mev */
        struct multi_env *rmev = &(comm->msc[root].mev[mevi]);
        size_t bufsize=count*datatype->size;
        /* wait until root becomes available */
        while( tMPI_Atomic_get( &(rmev->current_counter)) != msc->counter)
        {
        }
        tMPI_Mult_xfer(comm, 0, rmev, buffer, bufsize, datatype,
                       TMPI_BCAST_TAG, &ret);
    }
    return ret;
}

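/* Usage sketch (illustrative only; example_bcast and fill_data are
   hypothetical, not part of thread_mpi): every thread of the communicator
   must call tMPI_Bcast with matching count, datatype and root; only the
   root's buffer is read, all other buffers are overwritten. */
#if 0
static void example_bcast(tMPI_Comm comm)
{
    double data[16];
    int    myrank;

    tMPI_Comm_rank(comm, &myrank);
    if (myrank == 0)
    {
        fill_data(data, 16); /* hypothetical: root fills the buffer */
    }
    /* collective call, made by all threads: */
    tMPI_Bcast(data, 16, TMPI_DOUBLE, 0, comm);
}
#endif
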
int tMPI_Gather(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                int root, tMPI_Comm comm)
{
    struct multi_sync *msc;
    int mevi;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct multi_env *mev;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    mev=&(msc->mev[mevi]);
    if (myrank==root)
    {
        int i;
        int n_remaining=comm->grp.N-1;
        /* do root transfer */
        {
            size_t recvsize=recvtype->size*recvcount;
            size_t sendsize=sendtype->size*sendcount;
            if (recvsize < sendsize)
            {
                return tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            }
            if (recvtype != sendtype)
            {
                return tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
            }
            if ( sendbuf == (char*)recvbuf+myrank*recvcount*recvtype->size )
            {
                return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
            }

            memcpy((char*)recvbuf+myrank*recvcount*recvtype->size, sendbuf,
                   sendsize);
        }
        /* poll data availability */
        while(n_remaining>0)
        {
            for(i=0;i<comm->grp.N;i++)
            {
                struct multi_env *rmev=&(comm->msc[i].mev[mevi]);
                if ((i!=myrank) &&
                    (tMPI_Atomic_get(&(rmev->current_counter))==msc->counter)&&
                    (tMPI_Atomic_get(&(rmev->n_remaining)) > 0) )
                {
                    tMPI_Mult_xfer(comm, 0, rmev,
                                   (char*)recvbuf+i*recvcount*recvtype->size,
                                   recvcount*recvtype->size,
                                   recvtype, TMPI_GATHERV_TAG,&ret);
                    if (ret!=TMPI_SUCCESS)
                        return ret;
                    n_remaining--;
                }
            }
        }
    }
    else
    {
        size_t bufsize = sendcount*sendtype->size;

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* first set up the data */
        mev->tag=TMPI_GATHERV_TAG;
        mev->datatype=sendtype;
        mev->buf[0]=sendbuf;
        mev->bufsize[0]=bufsize;

        tMPI_Atomic_set(&(mev->n_remaining), 1);
        /* we now publish our counter */
        tMPI_Atomic_set(&(mev->current_counter), msc->counter);

        /* and wait until root is done copying */
        while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
        {
        }
    }
    return ret;
}

int tMPI_Gatherv(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int *recvcounts, int *displs,
                 tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    struct multi_sync *msc;
    int mevi;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct multi_env *mev;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    mev=&(msc->mev[mevi]);
    if (myrank==root)
    {
        int i;
        int n_remaining=comm->grp.N-1;
        /* do root transfer */
        {
            size_t recvsize=recvtype->size*recvcounts[myrank];
            size_t sendsize=sendtype->size*sendcount;
            if (recvsize < sendsize)
            {
                return tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            }
            if (recvtype != sendtype)
            {
                return tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
            }
            if ( sendbuf == (char*)recvbuf+displs[myrank]*recvtype->size )
            {
                return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
            }

            memcpy((char*)recvbuf+displs[myrank]*recvtype->size,
                   sendbuf, sendsize);
        }
        /* poll data availability */
        while(n_remaining>0)
        {
            for(i=0;i<comm->grp.N;i++)
            {
                struct multi_env *rmev=&(comm->msc[i].mev[mevi]);
                if ((i!=myrank) &&
                    (tMPI_Atomic_get(&(rmev->current_counter))==msc->counter)&&
                    (tMPI_Atomic_get(&(rmev->n_remaining)) > 0) )
                {
                    tMPI_Mult_xfer(comm, 0, rmev,
                                   (char*)recvbuf+displs[i]*recvtype->size,
                                   recvcounts[i]*recvtype->size,
                                   recvtype, TMPI_GATHERV_TAG,&ret);
                    if (ret!=TMPI_SUCCESS)
                        return ret;
                    n_remaining--;
                }
            }
        }
    }
    else
    {
        size_t bufsize = sendcount*sendtype->size;

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* first set up the data */
        mev->tag=TMPI_GATHERV_TAG;
        mev->datatype=sendtype;
        mev->buf[0]=sendbuf;
        mev->bufsize[0]=bufsize;

        tMPI_Atomic_set(&(mev->n_remaining), 1);
        /* we now publish our counter */
        tMPI_Atomic_set(&(mev->current_counter), msc->counter);

        /* and wait until root is done copying */
        while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
        {
        }
    }
    return ret;
}

int tMPI_Scatter(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                 int root, tMPI_Comm comm)
{
    struct multi_sync *msc;
    int mevi;
    int myrank;
    int ret=TMPI_SUCCESS;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    if (myrank==root)
    {
        int i;
        struct multi_env *mev=&(msc->mev[mevi]);
        size_t bufsize = sendcount*sendtype->size;

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* first set up the data */
        mev->tag=TMPI_SCATTER_TAG;
        mev->datatype=sendtype;
        for(i=0;i<comm->grp.N;i++)
        {
            mev->buf[i]=(char*)sendbuf + bufsize*i;
            mev->bufsize[i]=bufsize;
        }
        tMPI_Atomic_set(&(mev->n_remaining), comm->grp.N-1);
        /* we now publish our counter */
        tMPI_Atomic_set(&(mev->current_counter), msc->counter);

        /* do the root transfer */
        {
            size_t recvsize=recvtype->size*recvcount;
            if (recvsize < bufsize)
            {
                return tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            }
            if (recvtype != sendtype)
            {
                return tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
            }
            if ( recvbuf == mev->buf[myrank] )
            {
                return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
            }

            memcpy(recvbuf, (void*)(mev->buf[myrank]), bufsize);
        }
        /* and wait until everybody is done copying */
        while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
        {
        }
    }
    else
    {
        /* get the root mev */
        struct multi_env *rmev = &(comm->msc[root].mev[mevi]);
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        while( tMPI_Atomic_get( &(rmev->current_counter)) != msc->counter)
        {
        }
        tMPI_Mult_xfer(comm, myrank, rmev, recvbuf, bufsize, recvtype,
                       TMPI_SCATTER_TAG, &ret);
    }
    return ret;
}

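/* Usage sketch (illustrative only; example_scatter and make_full_array are
   hypothetical): the root hands each of the N threads one contiguous chunk
   of its send buffer; thread i receives elements [4*i, 4*i+4). */
#if 0
static void example_scatter(tMPI_Comm comm, int N)
{
    int *all=NULL;
    int  mine[4];
    int  myrank;

    tMPI_Comm_rank(comm, &myrank);
    if (myrank == 0)
    {
        all = make_full_array(4*N); /* hypothetical: only root needs data */
    }
    tMPI_Scatter(all, 4, TMPI_INT, mine, 4, TMPI_INT, 0, comm);
}
#endif
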
int tMPI_Scatterv(void* sendbuf, int *sendcounts, int *displs,
                  tMPI_Datatype sendtype, void* recvbuf, int recvcount,
                  tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    struct multi_sync *msc;
    int mevi;
    int myrank;
    int ret=TMPI_SUCCESS;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    if (myrank==root)
    {
        int i;
        struct multi_env *mev=&(msc->mev[mevi]);

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* first set up the data */
        mev->tag=TMPI_SCATTERV_TAG;
        mev->datatype=sendtype;
        for(i=0;i<comm->grp.N;i++)
        {
            mev->buf[i]=(char*)sendbuf + displs[i]*sendtype->size;
            mev->bufsize[i]=sendtype->size*sendcounts[i];
        }
        tMPI_Atomic_set(&(mev->n_remaining), comm->grp.N-1);
        /* we now publish our counter */
        tMPI_Atomic_set(&(mev->current_counter), msc->counter);

        /* do the root transfer */
        {
            size_t recvsize=recvtype->size*recvcount;
            if (recvsize < mev->bufsize[myrank])
            {
                return tMPI_Error(comm, TMPI_ERR_XFER_BUFSIZE);
            }
            if (recvtype != sendtype)
            {
                return tMPI_Error(comm, TMPI_ERR_MULTI_MISMATCH);
            }
            if ( recvbuf == mev->buf[myrank] )
            {
                return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_XFER_BUF_OVERLAP);
            }

            memcpy(recvbuf, (void*)(mev->buf[myrank]), mev->bufsize[myrank]);
        }
        /* and wait until everybody is done copying */
        while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
        {
        }
    }
    else
    {
        /* get the root mev */
        struct multi_env *rmev = &(comm->msc[root].mev[mevi]);
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        while( tMPI_Atomic_get( &(rmev->current_counter)) != msc->counter)
        {
        }
        tMPI_Mult_xfer(comm, myrank, rmev, recvbuf, bufsize, recvtype,
                       TMPI_SCATTERV_TAG, &ret);
    }
    return ret;
}

int tMPI_Alltoall(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                  void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                  tMPI_Comm comm)
{
    struct multi_sync *msc;
    struct multi_env *mev;
    int mevi;
    int i;
    int myrank;
    int n_remaining;
    int ret=TMPI_SUCCESS;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    if (!sendbuf || !recvbuf) /* don't do pointer arithmetic on a NULL ptr */
    {
        return tMPI_Error(comm, TMPI_ERR_BUF);
    }

    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    mev=&(msc->mev[mevi]);

    /* post our pointers */

    /* first set up the data */
    mev->tag=TMPI_ALLTOALLV_TAG;
    mev->datatype=sendtype;
    for(i=0;i<comm->grp.N;i++)
    {
        mev->buf[i]=(char*)sendbuf+i*sendcount*sendtype->size;
        mev->bufsize[i]=sendcount*sendtype->size;
        mev->read_data[i]=FALSE;
    }

    tMPI_Atomic_set(&(mev->n_remaining), comm->grp.N-1);
    /* we now publish our counter */
    tMPI_Atomic_set(&(mev->current_counter), msc->counter);

    /* do the self-to-self transfer */
    {
        size_t recvsize=recvtype->size*recvcount;
        tMPI_Mult_xfer(comm, myrank, mev,
                       (char*)recvbuf+myrank*recvcount*recvtype->size,
                       recvsize, recvtype, TMPI_ALLTOALLV_TAG, &ret);
        if (ret!=TMPI_SUCCESS)
            return ret;

        mev->read_data[myrank]=TRUE;
    }

    n_remaining=comm->grp.N-1;
    /* poll data availability */
    while(n_remaining>0)
    {
        for(i=0;i<comm->grp.N;i++)
        {
            struct multi_env *rmev=&(comm->msc[i].mev[mevi]);
            if ((!mev->read_data[i]) &&
                (tMPI_Atomic_get(&(rmev->current_counter))==msc->counter) )
            {
                tMPI_Mult_xfer(comm, myrank, rmev,
                               (char*)recvbuf+i*recvcount*recvtype->size,
                               recvcount*recvtype->size,
                               recvtype, TMPI_ALLTOALLV_TAG,&ret);
                if (ret!=TMPI_SUCCESS)
                    return ret;

                mev->read_data[i]=TRUE;
                n_remaining--;
            }
        }
    }

    /* and wait until everybody is done copying our data */
    while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
    {
    }

    return ret;
}

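/* Usage sketch (illustrative only; example_alltoall and make_array are
   hypothetical): tMPI_Alltoall transposes blocks across threads, so block
   j of thread i's sendbuf becomes block i of thread j's recvbuf. With one
   int per peer: */
#if 0
static void example_alltoall(tMPI_Comm comm, int N)
{
    int *tosend = make_array(N); /* hypothetical: one element per peer */
    int *torecv = make_array(N);

    tMPI_Alltoall(tosend, 1, TMPI_INT, torecv, 1, TMPI_INT, comm);
}
#endif
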
int tMPI_Alltoallv(void* sendbuf, int *sendcounts, int *sdispls,
                   tMPI_Datatype sendtype,
                   void* recvbuf, int *recvcounts, int *rdispls,
                   tMPI_Datatype recvtype,
                   tMPI_Comm comm)
{
    struct multi_sync *msc;
    struct multi_env *mev;
    int mevi;
    int i;
    int myrank;
    int n_remaining;
    int ret=TMPI_SUCCESS;

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    if (!sendbuf || !recvbuf) /* don't do pointer arithmetic on a NULL ptr */
    {
        return tMPI_Error(comm, TMPI_ERR_BUF);
    }

    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());

    msc=&(comm->msc[myrank]);
    /* we increase our counter. */
    msc->counter++;
    mevi=msc->counter % N_MULTI_SYNC;
    mev=&(msc->mev[mevi]);

    /* post our pointers */

    /* first set up the data */
    mev->tag=TMPI_ALLTOALLV_TAG;
    mev->datatype=sendtype;
    for(i=0;i<comm->grp.N;i++)
    {
        mev->buf[i]=(char*)sendbuf+sdispls[i]*sendtype->size;
        mev->bufsize[i]=sendcounts[i]*sendtype->size;
        mev->read_data[i]=FALSE;
    }

    tMPI_Atomic_set(&(mev->n_remaining), comm->grp.N-1);
    /* we now publish our counter */
    tMPI_Atomic_set(&(mev->current_counter), msc->counter);

    /* do the self-to-self transfer */
    {
        size_t recvsize=recvtype->size*recvcounts[myrank];
        tMPI_Mult_xfer(comm, myrank, mev,
                       (char*)recvbuf+rdispls[myrank]*recvtype->size,
                       recvsize, recvtype, TMPI_ALLTOALLV_TAG, &ret);
        if (ret!=TMPI_SUCCESS)
            return ret;

        mev->read_data[myrank]=TRUE;
    }

    n_remaining=comm->grp.N-1;
    /* poll data availability */
    while(n_remaining>0)
    {
        for(i=0;i<comm->grp.N;i++)
        {
            struct multi_env *rmev=&(comm->msc[i].mev[mevi]);
            if ((!mev->read_data[i]) &&
                (tMPI_Atomic_get(&(rmev->current_counter))==msc->counter) )
            {
                tMPI_Mult_xfer(comm, myrank, rmev,
                               (char*)recvbuf+rdispls[i]*recvtype->size,
                               recvcounts[i]*recvtype->size,
                               recvtype, TMPI_ALLTOALLV_TAG,&ret);
                if (ret!=TMPI_SUCCESS)
                    return ret;

                mev->read_data[i]=TRUE;
                n_remaining--;
            }
        }
    }

    /* and wait until everybody is done copying our data */
    while (tMPI_Atomic_get( &(mev->n_remaining) ) > 0)
    {
    }

    return ret;
}

static int tMPI_Reduce_run_op(void *dest, void *src_a, void *src_b,
                              tMPI_Datatype datatype, int count, tMPI_Op op,
                              tMPI_Comm comm)
{
    tMPI_Op_fn fn=datatype->op_functions[op];

    if (src_a==src_b)
    {
        return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
    }
    fn(dest, src_a, src_b, count);
    return TMPI_SUCCESS;
}

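/* Sketch of the reduction kernel contract used above (illustrative only;
   example_sum_int is hypothetical): a tMPI_Op_fn combines count elements
   of src_a and src_b into dest, where dest may alias src_a. */
#if 0
static void example_sum_int(void *dest, void *src_a, void *src_b, int count)
{
    int *d=(int*)dest;
    int *a=(int*)src_a;
    int *b=(int*)src_b;
    int i;

    for(i=0;i<count;i++)
    {
        d[i] = a[i] + b[i]; /* element-wise sum; aliasing d==a is safe */
    }
}
#endif
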
int tMPI_Reduce_fast(void* sendbuf, void* recvbuf, int count,
                     tMPI_Datatype datatype, tMPI_Op op, int root, tMPI_Comm comm)
{
    int myrank;
    int i;

    /* this function uses a binary tree-like reduction algorithm: */
    int N;
    int myrank_rtr; /* my rank relative to root */
    int Nnbrs;      /* number of neighbours to communicate with
                       (decreases exponentially) */
    int nbr_dist=1; /* distance between communicating neighbours
                       (increases exponentially) */
    int stepping=2; /* distance between non-communicating neighbours
                       (increases exponentially) */
    int iteration=0;

    if (!comm) /* check the communicator before dereferencing it */
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
    N=tMPI_Comm_N(comm);
    myrank_rtr=(N+myrank-root)%N;
    Nnbrs=N;
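
    /* Example for N=8, root=0: in iteration 0, ranks 0,2,4,6 each reduce
       with the neighbour at distance 1 (ranks 1,3,5,7); in iteration 1,
       ranks 0 and 4 combine the partial results at distance 2 (from ranks
       2 and 6); in iteration 2, rank 0 combines rank 4's result at
       distance 4. Nnbrs goes 8 -> 4 -> 2 -> 1 while nbr_dist and stepping
       double each round, so the reduction completes in ceil(log2(N))
       iterations. */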
    if (count==0)
        return TMPI_SUCCESS;
    if (!recvbuf)
    {
        return tMPI_Error(comm, TMPI_ERR_BUF);
    }
    if ( (!datatype->op_functions) || (!datatype->op_functions[op]) )
    {
        return tMPI_Error(comm, TMPI_ERR_OP_FN);
    }

    if (!sendbuf) /* i.e. sendbuf == TMPI_IN_PLACE */
    {
        sendbuf=recvbuf;
    }
    comm->sendbuf[myrank]=sendbuf;
    comm->recvbuf[myrank]=recvbuf;
    /* there's a barrier to wait for all the processes to put their
       send/recvbuf in the global list */
    tMPI_Spinlock_barrier_wait( &(comm->multicast_barrier[0]));

    /* check the buffers */
    for(i=0;i<N;i++)
    {
        if ( (i!=myrank) && ( (comm->recvbuf[i]==recvbuf) ||
                              (comm->sendbuf[i]==sendbuf) ) )
        {
            return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
        }
    }

    while(Nnbrs>1)
    {
        int nbr;
        /* reduce between myrank and myrank+nbr_dist, if there is such
           a neighbour. */
#ifdef TMPI_DEBUG
        printf("%d: iteration=%d, Nnbrs=%d, barrier size=%d and I'm still in here\n",
               myrank, iteration, comm->N_multicast_barrier[iteration], Nnbrs);
        fflush(0);
#endif
        if (myrank_rtr%stepping == 0 )
        {
            if (myrank_rtr+nbr_dist<N)
            {
                void *a,*b;
                int ret;
#ifdef TMPI_DEBUG
                printf("%d: reducing with %d, iteration=%d\n",
                       myrank, myrank+nbr_dist, iteration);
                fflush(0);
#endif
                /* we reduce with our neighbour */
                nbr=(N+myrank+nbr_dist)%N; /* here we use the real rank */
                if (iteration==0)
                {
                    a=sendbuf;
                    b=(void*)(comm->sendbuf[nbr]);
                }
                else
                {
                    a=recvbuf;
                    b=(void*)(comm->recvbuf[nbr]);
                }
                if ((ret=tMPI_Reduce_run_op(recvbuf, a, b, datatype,
                                            count, op, comm)) != TMPI_SUCCESS)
                    return ret;
            }
            else
            {
                /* we still need to put things in the right buffer for the
                   next iteration */
                if (iteration==0)
                    memcpy(recvbuf, sendbuf, datatype->size*count);
            }
            /* split barrier */
            tMPI_Spinlock_barrier_wait( &(comm->multicast_barrier[iteration]));
        }
        else
        {
            /* this barrier is split because the threads not actually
               calculating still need to be waiting for the thread using its
               data to finish calculating */
#ifdef TMPI_DEBUG
            printf("%d: barrier waiting, iteration=%d\n", myrank, iteration);
            fflush(0);
#endif
            tMPI_Spinlock_barrier_wait( &(comm->multicast_barrier[iteration]));
            break;
        }
#ifdef TMPI_DEBUG
        printf("%d: barrier over, iteration=%d\n", myrank, iteration);
        fflush(0);
#endif
        Nnbrs = Nnbrs/2 + Nnbrs%2;
        nbr_dist*=2;
        stepping*=2;
        iteration++;
    }
    return TMPI_SUCCESS;
}

int tMPI_Reduce(void* sendbuf, void* recvbuf, int count,
                tMPI_Datatype datatype, tMPI_Op op, int root, tMPI_Comm comm)
{
    int myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
    int ret;

    if (myrank==root)
    {
        if (!sendbuf) /* i.e. sendbuf == TMPI_IN_PLACE */
        {
            sendbuf=recvbuf;
        }
    }
    else
    {
#ifdef TMPI_WARN_MALLOC
        fprintf(stderr, "Warning: malloc during tMPI_Reduce\n");
#endif
        recvbuf=(void*)tMPI_Malloc(datatype->size*count);
    }
    ret=tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, root, comm);
    if (myrank!=root)
    {
        free(recvbuf);
    }
    return ret;
}

int tMPI_Allreduce(void* sendbuf, void* recvbuf, int count,
                   tMPI_Datatype datatype, tMPI_Op op, tMPI_Comm comm)
{
    void *rootbuf=NULL; /* root process' receive buffer */
    int myrank=tMPI_Comm_seek_rank(comm, tMPI_Get_current());
    int ret;

    if (count==0)
        return TMPI_SUCCESS;
    if (!recvbuf)
    {
        return tMPI_Error(comm, TMPI_ERR_BUF);
    }
    if (!sendbuf) /* i.e. sendbuf == TMPI_IN_PLACE */
    {
        sendbuf=recvbuf;
    }

    ret=tMPI_Reduce_fast(sendbuf, recvbuf, count, datatype, op, 0, comm);
    /* distribute rootbuf */
    rootbuf=(void*)comm->recvbuf[0];

    tMPI_Spinlock_barrier_wait( &(comm->multicast_barrier[0]));
    /* and now we just copy things back inefficiently. We should make
       a better tMPI_Scatter, and use that. */
    if (myrank != 0)
    {
        if (rootbuf==recvbuf)
        {
            return tMPI_Error(comm, TMPI_ERR_XFER_BUF_OVERLAP);
        }
        memcpy(recvbuf, rootbuf, datatype->size*count );
    }
    tMPI_Spinlock_barrier_wait( &(comm->multicast_barrier[0]));

    return ret;
}

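/* Usage sketch (illustrative only; example_allreduce and
   compute_local_energy are hypothetical): every thread contributes one
   value and every thread receives the reduced result. */
#if 0
static void example_allreduce(tMPI_Comm comm)
{
    double local = compute_local_energy(); /* hypothetical per-thread value */
    double total;

    tMPI_Allreduce(&local, &total, 1, TMPI_DOUBLE, TMPI_SUM, comm);
    /* all threads now hold the same total; passing sendbuf==NULL
       (TMPI_IN_PLACE) would instead reduce into recvbuf directly. */
}
#endif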