/*
   This source code file is part of thread_mpi.
   Written by Sander Pronk, Erik Lindahl, and possibly others.

   Copyright (c) 2009, Sander Pronk, Erik Lindahl.

   Redistribution and use in source and binary forms, with or without
   modification, are permitted provided that the following conditions are met:
   1) Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
   2) Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
   3) Neither the name of the copyright holders nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

   THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
   EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
   WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
   DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
   (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

   If you want to redistribute modifications, please consider that
   scientific software is very special. Version control is crucial -
   bugs must be traceable. We will be happy to consider code for
   inclusion in the official distribution, but derived work should not
   be called official thread_mpi. Details are found in the README & COPYING
   files.
*/
/* this file is #included from collective.c */
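
/* Overview (explanatory comment summarizing the code below; not normative
   documentation): both scatters use the shared coll_env obtained from
   tMPI_Get_cev(). The root publishes, for every rank i, a pointer/size pair
   (met[myrank].buf[i], met[myrank].bufsize[i]) in its own slot and signals
   each receiver's recv_ev event; every non-root rank waits for that signal
   (tMPI_Wait_for_data) and copies its slice out (tMPI_Mult_recv), while the
   root waits in tMPI_Wait_for_others until all copies are done. With
   USE_COLLECTIVE_COPY_BUFFER the data may first be copied into a dedicated
   copy buffer so that the root does not have to keep its send buffer exposed
   for as long (see the copy-buffer branches below). */
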
int tMPI_Scatter(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                 int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatter(%p, %d, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcount, sendtype,
                     recvbuf, recvcount, recvtype, root, comm);
#endif
    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        size_t sendsize=sendtype->size*sendcount;
        size_t total_send_size=0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        tmpi_bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag=TMPI_SCATTER_TAG;
        cev->met[myrank].datatype=sendtype;
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        for(i=0;i<comm->grp.N;i++)
        {
            total_send_size += sendtype->size*sendcount;
            cev->met[myrank].bufsize[i]=sendsize;
            cev->met[myrank].buf[i]=(char*)sendbuf+sendsize*i;
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        using_cb=(total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb=using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially */
            for(i=0;i<cev->N;i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

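        /* Note (explanatory comment added here): the copy buffer is only used
           when the whole payload fits within cev->N copy buffers. cpbuf[i]
           stays NULL until the copied data is posted further down, and
           buf_readcount presumably counts receivers still reading the
           original send buffer, so each receiver can tell which of the two
           buffers to copy from. */
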
        /* post availability */
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }

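        /* Note (explanatory comment added here): signalling recv_ev wakes the
           non-root ranks blocked in tMPI_Wait_for_data() in the else branch
           below; from this point on they may copy their slice directly out of
           sendbuf. */
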
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb=tMPI_Copy_buffer_list_get(
                          &(tMPI_Get_thread(comm,myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);

            /* post the new buf */
            for(i=0;i<cev->N;i++)
            {
                tMPI_Atomic_memory_barrier();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf+sendsize*i);
                /*cev->met[myrank].cpbuf[i] = (char*)cev->met[myrank].cb->buf +
                                              sendsize*i;*/
            }
        }
#endif

        /* do root transfer */
        if (recvbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendsize, recvtype->size*recvcount,
                                (char*)sendbuf+sendsize*myrank,
                                recvbuf, &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTER_TAG, recvtype,
                       bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatter);
#endif
    return ret;
}
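
/* Illustrative usage sketch, not part of thread_mpi itself (the buffer names,
   CHUNK and the use of TMPI_INT are assumptions for the example): every
   thread in the communicator makes the same call, and thread i receives the
   i-th sendcount-sized slice of the root's send buffer:

       int slice[CHUNK];
       tMPI_Scatter(all_data, CHUNK, TMPI_INT,
                    slice,    CHUNK, TMPI_INT,
                    0, TMPI_COMM_WORLD);

   Only the root's all_data is read; the root may pass TMPI_IN_PLACE as
   recvbuf to skip the local copy to itself (see the root transfer above). */
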

int tMPI_Scatterv(void* sendbuf, int *sendcounts, int *displs,
                  tMPI_Datatype sendtype, void* recvbuf, int recvcount,
                  tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatterv(%p, %p, %p, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcounts, displs, sendtype, recvbuf,
                     recvcount, recvtype, root, comm);
#endif
#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        size_t total_send_size=0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        tmpi_bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag=TMPI_SCATTERV_TAG;
        cev->met[myrank].datatype=sendtype;
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        for(i=0;i<cev->N;i++)
        {
            total_send_size += sendtype->size*sendcounts[i];
            cev->met[myrank].bufsize[i]=sendtype->size*sendcounts[i];
            cev->met[myrank].buf[i]=(char*)sendbuf+sendtype->size*displs[i];
        }
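        /* Note (explanatory comment added here): unlike tMPI_Scatter, the
           slice for rank i starts at byte offset sendtype->size*displs[i] and
           is sendtype->size*sendcounts[i] bytes long, so the per-rank slices
           follow whatever layout the caller's displacements describe. */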
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        using_cb=(total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb=using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially */
            for(i=0;i<cev->N;i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }

#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb=tMPI_Copy_buffer_list_get(
                          &(tMPI_Get_thread(comm,myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);
            /* post the new buf */
            for(i=0;i<cev->N;i++)
            {
                tMPI_Atomic_memory_barrier();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf +
                                    sendtype->size*displs[i]);
                /*cev->met[myrank].cpbuf[i]=(char*)cev->met[myrank].cb->buf +
                          sendtype->size*displs[i];*/
            }
            tMPI_Atomic_memory_barrier();
        }
#endif

        /* do root transfer */
        if (recvbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendtype->size*sendcounts[myrank],
                                recvtype->size*recvcount,
                                (char*)sendbuf+sendtype->size*displs[myrank],
                                recvbuf,
                                &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTERV_TAG,
                       recvtype, bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatterv);
#endif

    return ret;
}
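
/* Illustrative usage sketch, not part of thread_mpi itself (the arrays and
   the use of TMPI_INT are assumptions for the example): sendcounts[i] and
   displs[i] give, in elements of sendtype, the length and starting offset of
   the slice that goes to rank i:

       int counts[2] = {3, 5};
       int displs[2] = {0, 3};
       tMPI_Scatterv(all_data, counts, displs, TMPI_INT,
                     slice, my_count, TMPI_INT,
                     0, TMPI_COMM_WORLD);

   where each thread's slice must have room for my_count = counts[rank]
   elements. */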