Merge branch 'master' of github.com:OpenCFD/OpenFOAM-2.0.x
[OpenFOAM-2.0.x.git] / src / Pstream / mpi / UIPread.C
blobdac3c35ccafa3bf0786ae535180b27059be773f6
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
4    \\    /   O peration     |
5     \\  /    A nd           | Copyright (C) 2004-2010 OpenCFD Ltd.
6      \\/     M anipulation  |
7 -------------------------------------------------------------------------------
8 License
9     This file is part of OpenFOAM.
11     OpenFOAM is free software: you can redistribute it and/or modify it
12     under the terms of the GNU General Public License as published by
13     the Free Software Foundation, either version 3 of the License, or
14     (at your option) any later version.
16     OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
19     for more details.
21     You should have received a copy of the GNU General Public License
22     along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
24 Description
25     Read from UIPstream
27 \*---------------------------------------------------------------------------*/
29 #include "mpi.h"
31 #include "UIPstream.H"
32 #include "PstreamGlobals.H"
33 #include "IOstreams.H"
35 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
37 Foam::UIPstream::UIPstream
39     const commsTypes commsType,
40     const int fromProcNo,
41     DynamicList<char>& externalBuf,
42     label& externalBufPosition,
43     const int tag,
44     const bool clearAtEnd,
45     streamFormat format,
46     versionNumber version
49     UPstream(commsType),
50     Istream(format, version),
51     fromProcNo_(fromProcNo),
52     externalBuf_(externalBuf),
53     externalBufPosition_(externalBufPosition),
54     tag_(tag),
55     clearAtEnd_(clearAtEnd),
56     messageSize_(0)
58     setOpened();
59     setGood();
61     if (commsType == UPstream::nonBlocking)
62     {
63         // Message is already received into externalBuf
64     }
65     else
66     {
67         MPI_Status status;
69         label wantedSize = externalBuf_.capacity();
71         if (debug)
72         {
73             Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
74                 << " tag:" << tag << " wanted size:" << wantedSize
75                 << Foam::endl;
76         }
79         // If the buffer size is not specified, probe the incomming message
80         // and set it
81         if (!wantedSize)
82         {
83             MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status);
84             MPI_Get_count(&status, MPI_BYTE, &messageSize_);
86             externalBuf_.setCapacity(messageSize_);
87             wantedSize = messageSize_;
89             if (debug)
90             {
91                 Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
92                     << Foam::endl;
93             }
94         }
96         messageSize_ = UIPstream::read
97         (
98             commsType,
99             fromProcNo_,
100             externalBuf_.begin(),
101             wantedSize,
102             tag_
103         );
105         // Set addressed size. Leave actual allocated memory intact.
106         externalBuf_.setSize(messageSize_);
108         if (!messageSize_)
109         {
110             FatalErrorIn
111             (
112                 "UIPstream::UIPstream(const commsTypes, const int, "
113                 "DynamicList<char>&, streamFormat, versionNumber)"
114             )   << "read failed"
115                 << Foam::abort(FatalError);
116         }
117     }
121 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
123     UPstream(buffers.commsType_),
124     Istream(buffers.format_, buffers.version_),
125     fromProcNo_(fromProcNo),
126     externalBuf_(buffers.recvBuf_[fromProcNo]),
127     externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
128     tag_(buffers.tag_),
129     clearAtEnd_(true),
130     messageSize_(0)
132     if (commsType() != UPstream::scheduled && !buffers.finishedSendsCalled_)
133     {
134         FatalErrorIn("UIPstream::UIPstream(const int, PstreamBuffers&)")
135             << "PstreamBuffers::finishedSends() never called." << endl
136             << "Please call PstreamBuffers::finishedSends() after doing"
137             << " all your sends (using UOPstream) and before doing any"
138             << " receives (using UIPstream)" << Foam::exit(FatalError);
139     }
141     setOpened();
142     setGood();
144     if (commsType() == UPstream::nonBlocking)
145     {
146         // Message is already received into externalBuf
147         messageSize_ = buffers.recvBuf_[fromProcNo].size();
148     }
149     else
150     {
151         MPI_Status status;
153         label wantedSize = externalBuf_.capacity();
155         if (debug)
156         {
157             Pout<< "UIPstream::UIPstream PstreamBuffers :"
158                 << " read from:" << fromProcNo
159                 << " tag:" << tag_ << " wanted size:" << wantedSize
160                 << Foam::endl;
161         }
163         // If the buffer size is not specified, probe the incomming message
164         // and set it
165         if (!wantedSize)
166         {
167             MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status);
168             MPI_Get_count(&status, MPI_BYTE, &messageSize_);
170             externalBuf_.setCapacity(messageSize_);
171             wantedSize = messageSize_;
173             if (debug)
174             {
175                 Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
176                     << wantedSize << Foam::endl;
177             }
178         }
180         messageSize_ = UIPstream::read
181         (
182             commsType(),
183             fromProcNo_,
184             externalBuf_.begin(),
185             wantedSize,
186             tag_
187         );
189         // Set addressed size. Leave actual allocated memory intact.
190         externalBuf_.setSize(messageSize_);
192         if (!messageSize_)
193         {
194             FatalErrorIn
195             (
196                 "UIPstream::UIPstream(const int, PstreamBuffers&)"
197             )   << "read failed"
198                 << Foam::abort(FatalError);
199         }
200     }
204 // * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //
206 Foam::label Foam::UIPstream::read
208     const commsTypes commsType,
209     const int fromProcNo,
210     char* buf,
211     const std::streamsize bufSize,
212     const int tag
215     if (debug)
216     {
217         Pout<< "UIPstream::read : starting read from:" << fromProcNo
218             << " tag:" << tag << " wanted size:" << label(bufSize)
219             << " commsType:" << UPstream::commsTypeNames[commsType]
220             << Foam::endl;
221     }
223     if (commsType == blocking || commsType == scheduled)
224     {
225         MPI_Status status;
227         if
228         (
229             MPI_Recv
230             (
231                 buf,
232                 bufSize,
233                 MPI_PACKED,
234                 procID(fromProcNo),
235                 tag,
236                 MPI_COMM_WORLD,
237                 &status
238             )
239         )
240         {
241             FatalErrorIn
242             (
243                 "UIPstream::read"
244                 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
245             )   << "MPI_Recv cannot receive incomming message"
246                 << Foam::abort(FatalError);
248             return 0;
249         }
252         // Check size of message read
254         label messageSize;
255         MPI_Get_count(&status, MPI_BYTE, &messageSize);
257         if (debug)
258         {
259             Pout<< "UIPstream::read : finished read from:" << fromProcNo
260                 << " tag:" << tag << " read size:" << label(bufSize)
261                 << " commsType:" << UPstream::commsTypeNames[commsType]
262                 << Foam::endl;
263         }
265         if (messageSize > bufSize)
266         {
267             FatalErrorIn
268             (
269                 "UIPstream::read"
270                 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
271             )   << "buffer (" << label(bufSize)
272                 << ") not large enough for incomming message ("
273                 << messageSize << ')'
274                 << Foam::abort(FatalError);
275         }
277         return messageSize;
278     }
279     else if (commsType == nonBlocking)
280     {
281         MPI_Request request;
283         if
284         (
285             MPI_Irecv
286             (
287                 buf,
288                 bufSize,
289                 MPI_PACKED,
290                 procID(fromProcNo),
291                 tag,
292                 MPI_COMM_WORLD,
293                 &request
294             )
295         )
296         {
297             FatalErrorIn
298             (
299                 "UIPstream::read"
300                 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
301             )   << "MPI_Recv cannot start non-blocking receive"
302                 << Foam::abort(FatalError);
304             return 0;
305         }
307         if (debug)
308         {
309             Pout<< "UIPstream::read : started read from:" << fromProcNo
310                 << " tag:" << tag << " read size:" << label(bufSize)
311                 << " commsType:" << UPstream::commsTypeNames[commsType]
312                 << " request:" << PstreamGlobals::outstandingRequests_.size()
313                 << Foam::endl;
314         }
316         PstreamGlobals::outstandingRequests_.append(request);
318         // Assume the message is completely received.
319         return bufSize;
320     }
321     else
322     {
323         FatalErrorIn
324         (
325             "UIPstream::read"
326             "(const int fromProcNo, char* buf, std::streamsize bufSize)"
327         )   << "Unsupported communications type "
328             << commsType
329             << Foam::abort(FatalError);
331         return 0;
332     }
336 // ************************************************************************* //