1 /*---------------------------------------------------------------------------*\
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
5 \\ / A nd | Copyright (C) 2011 OpenFOAM Foundation
7 -------------------------------------------------------------------------------
9 This file is part of OpenFOAM.
11 OpenFOAM is free software: you can redistribute it and/or modify it
12 under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 You should have received a copy of the GNU General Public License
22 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
27 \*---------------------------------------------------------------------------*/
31 #include "UIPstream.H"
32 #include "PstreamGlobals.H"
33 #include "IOstreams.H"
35 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
// Construct an input stream for the message sent by processor fromProcNo,
// bound to the caller-supplied externalBuf and externalBufPosition.
// NOTE(review): this view of the definition is incomplete (braces, some
// parameters, initialisers and conditions are not visible); the comments
// below describe only the statements that can be seen.
37 Foam::UIPstream::UIPstream
39 const commsTypes commsType,
41 DynamicList<char>& externalBuf,
42 label& externalBufPosition,
44 const bool clearAtEnd,
// Member initialisers: store the constructor arguments in the
// corresponding members.
50 Istream(format, version),
51 fromProcNo_(fromProcNo),
52 externalBuf_(externalBuf),
53 externalBufPosition_(externalBufPosition),
55 clearAtEnd_(clearAtEnd),
// Non-blocking transfers need no receive here: the data is expected to be
// in externalBuf already.
61 if (commsType == UPstream::nonBlocking)
63 // Message is already received into externalBuf
// The statements below appear to belong to the other branch (the else is
// not visible in this view). Start from the capacity the caller reserved.
69 label wantedSize = externalBuf_.capacity();
73 Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
74 << " tag:" << tag << " wanted size:" << wantedSize
79 // If the buffer size is not specified, probe the incoming message
// MPI_Probe fills status for the pending message; MPI_Get_count converts
// that into a length in bytes, which becomes both the buffer capacity and
// wantedSize.
// NOTE(review): MPI_Get_count writes through an int*; confirm that
// messageSize_ is declared as int.
83 MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status);
84 MPI_Get_count(&status, MPI_BYTE, &messageSize_);
86 externalBuf_.setCapacity(messageSize_);
87 wantedSize = messageSize_;
91 Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
// Receive into the start of the buffer; the value returned by read() is
// stored as the message size.
96 messageSize_ = UIPstream::read
100 externalBuf_.begin(),
105 // Set addressed size. Leave actual allocated memory intact.
106 externalBuf_.setSize(messageSize_);
// Fatal abort path; the condition that triggers it is not visible here.
// NOTE(review): the signature quoted in the message below does not list
// externalBufPosition or clearAtEnd, which the visible parameter list
// contains - the string looks stale.
112 "UIPstream::UIPstream(const commsTypes, const int, "
113 "DynamicList<char>&, streamFormat, versionNumber)"
115 << Foam::abort(FatalError);
// Construct an input stream for the message from processor fromProcNo using
// the per-processor receive buffer and buffer position held by a
// PstreamBuffers object; comms type, format and version are taken from
// buffers as well.
// NOTE(review): this view of the definition is incomplete (braces, some
// initialisers and conditions are not visible); the comments below describe
// only the statements that can be seen.
121 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
123 UPstream(buffers.commsType_),
124 Istream(buffers.format_, buffers.version_),
125 fromProcNo_(fromProcNo),
126 externalBuf_(buffers.recvBuf_[fromProcNo]),
127 externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
// For every comms type except scheduled, receiving is only legal once
// PstreamBuffers::finishedSends() has been called; otherwise exit with a
// fatal error explaining the required call order.
132 if (commsType() != UPstream::scheduled && !buffers.finishedSendsCalled_)
134 FatalErrorIn("UIPstream::UIPstream(const int, PstreamBuffers&)")
135 << "PstreamBuffers::finishedSends() never called." << endl
136 << "Please call PstreamBuffers::finishedSends() after doing"
137 << " all your sends (using UOPstream) and before doing any"
138 << " receives (using UIPstream)" << Foam::exit(FatalError);
// Non-blocking: no receive is issued here; the message size is simply the
// current size of this processor's receive buffer.
144 if (commsType() == UPstream::nonBlocking)
146 // Message is already received into externalBuf
147 messageSize_ = buffers.recvBuf_[fromProcNo].size();
// The statements below appear to belong to the other branch (the else is
// not visible in this view). Start from the capacity already reserved.
153 label wantedSize = externalBuf_.capacity();
157 Pout<< "UIPstream::UIPstream PstreamBuffers :"
158 << " read from:" << fromProcNo
159 << " tag:" << tag_ << " wanted size:" << wantedSize
163 // If the buffer size is not specified, probe the incoming message
// MPI_Probe fills status for the pending message; MPI_Get_count converts
// that into a length in bytes, which becomes both the buffer capacity and
// wantedSize.
// NOTE(review): MPI_Get_count writes through an int*; confirm that
// messageSize_ is declared as int.
167 MPI_Probe(procID(fromProcNo_), tag_, MPI_COMM_WORLD, &status);
168 MPI_Get_count(&status, MPI_BYTE, &messageSize_);
170 externalBuf_.setCapacity(messageSize_);
171 wantedSize = messageSize_;
175 Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
176 << wantedSize << Foam::endl;
// Receive into the start of the buffer; the value returned by read() is
// stored as the message size.
180 messageSize_ = UIPstream::read
184 externalBuf_.begin(),
189 // Set addressed size. Leave actual allocated memory intact.
190 externalBuf_.setSize(messageSize_);
// Fatal abort path; the condition that triggers it is not visible here.
196 "UIPstream::UIPstream(const int, PstreamBuffers&)"
198 << Foam::abort(FatalError);
204 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
// Receive a message from processor fromProcNo into a caller-supplied buffer
// of bufSize bytes, dispatching on commsType:
//   - blocking / scheduled : receive, then check the received size against
//                            bufSize
//   - nonBlocking          : start the receive and record its request
//   - anything else        : fatal error
// Returns a Foam::label.
// NOTE(review): the receive calls and return statements are not visible in
// this view; presumably the return value is the received size - confirm.
206 Foam::label Foam::UIPstream::read
208 const commsTypes commsType,
209 const int fromProcNo,
211 const std::streamsize bufSize,
217 Pout<< "UIPstream::read : starting read from:" << fromProcNo
218 << " tag:" << tag << " wanted size:" << label(bufSize)
219 << " commsType:" << UPstream::commsTypeNames[commsType]
223 if (commsType == blocking || commsType == scheduled)
// NOTE(review): the signature quoted in the error locations of this
// function omits the leading commsType parameter (and any others not
// visible here) - the strings look stale.
244 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
245 ) << "MPI_Recv cannot receive incomming message"
246 << Foam::abort(FatalError);
252 // Check size of message read
// Query the number of bytes described by status.
255 MPI_Get_count(&status, MPI_BYTE, &messageSize);
// NOTE(review): the "read size" printed below is bufSize, not the
// messageSize just obtained from MPI_Get_count - confirm which was intended.
259 Pout<< "UIPstream::read : finished read from:" << fromProcNo
260 << " tag:" << tag << " read size:" << label(bufSize)
261 << " commsType:" << UPstream::commsTypeNames[commsType]
// A message larger than the supplied buffer is a fatal error.
265 if (messageSize > bufSize)
270 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
271 ) << "buffer (" << label(bufSize)
272 << ") not large enough for incomming message ("
273 << messageSize << ')'
274 << Foam::abort(FatalError);
279 else if (commsType == nonBlocking)
// NOTE(review): the message below names MPI_Recv although this is the
// non-blocking branch; the actual receive call is not visible here -
// confirm and correct the text if the call is a different MPI routine.
300 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
301 ) << "MPI_Recv cannot start non-blocking receive"
302 << Foam::abort(FatalError);
// The request number printed below is the size of the outstanding-request
// list before the append that follows.
309 Pout<< "UIPstream::read : started read from:" << fromProcNo
310 << " tag:" << tag << " read size:" << label(bufSize)
311 << " commsType:" << UPstream::commsTypeNames[commsType]
312 << " request:" << PstreamGlobals::outstandingRequests_.size()
// Record the request in the global list of outstanding requests.
316 PstreamGlobals::outstandingRequests_.append(request);
318 // Assume the message is completely received.
// Any other communications type is rejected with a fatal error.
326 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
327 ) << "Unsupported communications type "
329 << Foam::abort(FatalError);
336 // ************************************************************************* //