/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.
*/
38 /* this file is #included from collective.c */
40 int tMPI_Alltoall(void* sendbuf
, int sendcount
, tMPI_Datatype sendtype
,
41 void* recvbuf
, int recvcount
, tMPI_Datatype recvtype
,
49 size_t sendsize
=sendtype
->size
*sendcount
;
50 size_t recvsize
=recvtype
->size
*recvcount
;
52 struct tmpi_thread
*cur
=tMPI_Get_current();
55 tMPI_Profile_count_start(cur
);
58 tMPI_Trace_print("tMPI_Alltoall(%p, %d, %p, %p, %d, %p, %p)",
59 sendbuf
, sendcount
, sendtype
,
60 recvbuf
, recvcount
, recvtype
, comm
);
65 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_COMM
);
67 if (!sendbuf
|| !recvbuf
) /* don't do pointer arithmetic on a NULL ptr */
69 return tMPI_Error(comm
, TMPI_ERR_BUF
);
72 myrank
=tMPI_Comm_seek_rank(comm
, cur
);
74 /* we increase our counter, and determine which coll_env we get */
75 cev
=tMPI_Get_cev(comm
, myrank
, &synct
);
77 /* post our pointers */
78 /* we set up multiple posts, so no Post_multi */
79 cev
->met
[myrank
].tag
=TMPI_ALLTOALL_TAG
;
80 cev
->met
[myrank
].datatype
=sendtype
;
81 tMPI_Atomic_set( &(cev
->met
[myrank
].n_remaining
), cev
->N
-1 );
82 for(i
=0;i
<comm
->grp
.N
;i
++)
84 cev
->met
[myrank
].bufsize
[i
]=sendsize
;
85 cev
->met
[myrank
].buf
[i
]=(char*)sendbuf
+sendsize
*i
;
86 cev
->met
[myrank
].read_data
[i
]=FALSE
;
88 tMPI_Atomic_set(&(cev
->met
[myrank
].current_sync
), synct
);
90 /* post availability */
94 tMPI_Event_signal( &(cev
->met
[i
].recv_ev
) );
97 /* we don't do the copy buffer thing here because it's pointless:
98 the processes have to synchronize anyway, because they all
101 /* do root transfer */
102 tMPI_Coll_root_xfer(comm
, sendtype
, recvtype
,
104 (char*)sendbuf
+sendsize
*myrank
,
105 (char*)recvbuf
+recvsize
*myrank
, &ret
);
106 cev
->met
[myrank
].read_data
[myrank
]=TRUE
;
107 /* and poll data availability */
108 n_remaining
=cev
->N
-1;
111 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
112 tMPI_Profile_wait_start(cur
);
114 tMPI_Event_wait( &(cev
->met
[myrank
]).recv_ev
) ;
115 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
116 tMPI_Profile_wait_stop(cur
, TMPIWAIT_Coll_recv
);
118 for(i
=0;i
<cev
->N
;i
++)
120 if ((! cev
->met
[myrank
].read_data
[i
]) &&
121 (tMPI_Atomic_get(&(cev
->met
[i
].current_sync
))==synct
))
123 tMPI_Event_process( &(cev
->met
[myrank
]).recv_ev
, 1) ;
124 tMPI_Mult_recv(comm
, cev
, i
, myrank
, TMPI_ALLTOALL_TAG
,
125 recvtype
, recvsize
, (char*)recvbuf
+recvsize
*i
,
127 if (ret
!=TMPI_SUCCESS
)
129 cev
->met
[myrank
].read_data
[i
]=TRUE
;
136 /* and wait until everybody is done copying our data */
137 tMPI_Wait_for_others(cev
, myrank
);
140 tMPI_Profile_count_stop(cur
, TMPIFN_Alltoall
);
146 int tMPI_Alltoallv(void* sendbuf
, int *sendcounts
, int *sdispls
,
147 tMPI_Datatype sendtype
,
148 void* recvbuf
, int *recvcounts
, int *rdispls
,
149 tMPI_Datatype recvtype
,
154 struct coll_env
*cev
;
156 int ret
=TMPI_SUCCESS
;
159 struct tmpi_thread
*cur
=tMPI_Get_current();
162 tMPI_Profile_count_start(cur
);
165 tMPI_Trace_print("tMPI_Alltoallv(%p, %p, %p, %p, %p, %p, %p, %p, %p, %p)",
166 sendbuf
, sendcounts
, sdispls
, sendtype
,
167 recvbuf
, recvcounts
, rdispls
, recvtype
,
172 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_COMM
);
174 if (!sendbuf
|| !recvbuf
) /* don't do pointer arithmetic on a NULL ptr */
176 return tMPI_Error(comm
, TMPI_ERR_BUF
);
179 myrank
=tMPI_Comm_seek_rank(comm
, cur
);
181 /* we increase our counter, and determine which coll_env we get */
182 cev
=tMPI_Get_cev(comm
, myrank
, &synct
);
184 /* post our pointers */
185 /* we set up multiple posts, so no Post_multi */
186 cev
->met
[myrank
].tag
=TMPI_ALLTOALLV_TAG
;
187 cev
->met
[myrank
].datatype
=sendtype
;
188 tMPI_Atomic_set( &(cev
->met
[myrank
].n_remaining
), cev
->N
-1 );
189 for(i
=0;i
<comm
->grp
.N
;i
++)
191 cev
->met
[myrank
].bufsize
[i
]=sendtype
->size
*sendcounts
[i
];
192 cev
->met
[myrank
].buf
[i
]=(char*)sendbuf
+sendtype
->size
*sdispls
[i
];
193 cev
->met
[myrank
].read_data
[i
]=FALSE
;
195 tMPI_Atomic_set(&(cev
->met
[myrank
].current_sync
), synct
);
197 /* post availability */
198 for(i
=0;i
<cev
->N
;i
++)
201 tMPI_Event_signal( &(cev
->met
[i
].recv_ev
) );
204 /* we don't do the copy buffer thing here because it's pointless:
205 the processes have to synchronize anyway, because they all
208 /* do root transfer */
209 tMPI_Coll_root_xfer(comm
, sendtype
, recvtype
,
210 sendtype
->size
*sendcounts
[myrank
],
211 recvtype
->size
*recvcounts
[myrank
],
212 (char*)sendbuf
+sendtype
->size
*sdispls
[myrank
],
213 (char*)recvbuf
+recvtype
->size
*rdispls
[myrank
], &ret
);
214 cev
->met
[myrank
].read_data
[myrank
]=TRUE
;
216 /* and poll data availability */
217 n_remaining
=cev
->N
-1;
220 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
221 tMPI_Profile_wait_start(cur
);
223 tMPI_Event_wait( &(cev
->met
[myrank
]).recv_ev
) ;
224 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
225 tMPI_Profile_wait_stop(cur
, TMPIWAIT_Coll_recv
);
227 for(i
=0;i
<cev
->N
;i
++)
229 if ((! cev
->met
[myrank
].read_data
[i
]) &&
230 (tMPI_Atomic_get(&(cev
->met
[i
].current_sync
))==synct
) )
232 tMPI_Event_process( &(cev
->met
[myrank
]).recv_ev
, 1) ;
233 tMPI_Mult_recv(comm
, cev
, i
, myrank
, TMPI_ALLTOALLV_TAG
,
234 recvtype
, recvtype
->size
*recvcounts
[i
],
235 (char*)recvbuf
+recvtype
->size
*rdispls
[i
], &ret
);
236 if (ret
!=TMPI_SUCCESS
)
238 cev
->met
[myrank
].read_data
[i
]=TRUE
;
244 /* and wait until everybody is done copying our data */
245 tMPI_Wait_for_others(cev
, myrank
);
248 tMPI_Profile_count_stop(cur
, TMPIFN_Alltoallv
);