Merge branch 'master' of ssh://git.code.sf.net/p/foam-extend/foam-extend-3.2
[foam-extend-3.2.git] / src / foam / db / IOstreams / Pstreams / IPread.C
blob78668c39af942bf4e0248f31a11a832688595aaf
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | foam-extend: Open Source CFD
4    \\    /   O peration     | Version:     3.2
5     \\  /    A nd           | Web:         http://www.foam-extend.org
6      \\/     M anipulation  | For copyright notice see file Copyright
7 -------------------------------------------------------------------------------
8 License
9     This file is part of foam-extend.
11     foam-extend is free software: you can redistribute it and/or modify it
12     under the terms of the GNU General Public License as published by the
13     Free Software Foundation, either version 3 of the License, or (at your
14     option) any later version.
16     foam-extend is distributed in the hope that it will be useful, but
17     WITHOUT ANY WARRANTY; without even the implied warranty of
18     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19     General Public License for more details.
21     You should have received a copy of the GNU General Public License
22     along with foam-extend.  If not, see <http://www.gnu.org/licenses/>.
24 Description
25     Read token and binary block from IPstream
27 \*---------------------------------------------------------------------------*/
29 #include "mpi.h"
31 #include "IPstream.H"
32 #include "PstreamGlobals.H"
34 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
36 Foam::IPstream::IPstream
38     const commsTypes commsType,
39     const int fromProcNo,
40     const label bufSize,
41     streamFormat format,
42     versionNumber version
45     Pstream(commsType, bufSize),
46     Istream(format, version),
47     fromProcNo_(fromProcNo),
48     messageSize_(0)
50     setOpened();
51     setGood();
53     MPI_Status status;
55     // If the buffer size is not specified, probe the incomming message
56     // and set it
57     if (!bufSize)
58     {
59         MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status);
60         MPI_Get_count(&status, MPI_BYTE, &messageSize_);
62         buf_.setSize(messageSize_);
63     }
65     messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size());
67     if (!messageSize_)
68     {
69         FatalErrorIn
70         (
71             "IPstream::IPstream(const int fromProcNo, "
72             "const label bufSize, streamFormat format, versionNumber version)"
73         )   << "read failed"
74             << Foam::abort(FatalError);
75     }
79 // * * * * * * * * * * * * * * * Member Functions  * * * * * * * * * * * * * //
81 Foam::label Foam::IPstream::read
83     const commsTypes commsType,
84     const int fromProcNo,
85     char* buf,
86     const std::streamsize bufSize
89     if (commsType == blocking || commsType == scheduled)
90     {
91         MPI_Status status;
93         if
94         (
95             MPI_Recv
96             (
97                 buf,
98                 bufSize,
99                 MPI_PACKED,
100                 procID(fromProcNo),
101                 msgType(),
102                 MPI_COMM_WORLD,
103                 &status
104             )
105         )
106         {
107             FatalErrorIn
108             (
109                 "IPstream::read"
110                 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
111             )   << "MPI_Recv cannot receive incomming message"
112                 << Foam::abort(FatalError);
114             return 0;
115         }
118         // Check size of message read
120         label messageSize;
121         MPI_Get_count(&status, MPI_BYTE, &messageSize);
123         if (messageSize > bufSize)
124         {
125             FatalErrorIn
126             (
127                 "IPstream::read"
128                 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
129             )   << "buffer (" << label(bufSize)
130                 << ") not large enough for incomming message ("
131                 << messageSize << ')'
132                 << Foam::abort(FatalError);
133         }
135         return messageSize;
136     }
137     else if (commsType == nonBlocking)
138     {
139         MPI_Request request;
141         if
142         (
143             MPI_Irecv
144             (
145                 buf,
146                 bufSize,
147                 MPI_PACKED,
148                 procID(fromProcNo),
149                 msgType(),
150                 MPI_COMM_WORLD,
151                 &request
152             )
153         )
154         {
155             FatalErrorIn
156             (
157                 "IPstream::read"
158                 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
159             )   << "MPI_Recv cannot start non-blocking receive"
160                 << Foam::abort(FatalError);
162             return 0;
163         }
165         PstreamGlobals::IPstream_outstandingRequests_.append(request);
167         return 1;
168     }
169     else
170     {
171         FatalErrorIn
172         (
173             "IPstream::read"
174             "(const int fromProcNo, char* buf, std::streamsize bufSize)"
175         )   << "Unsupported communications type " << commsType
176             << Foam::abort(FatalError);
178         return 0;
179     }
183 void Foam::IPstream::waitRequests()
185     if (PstreamGlobals::IPstream_outstandingRequests_.size())
186     {
187         if
188         (
189             MPI_Waitall
190             (
191                 PstreamGlobals::IPstream_outstandingRequests_.size(),
192                 PstreamGlobals::IPstream_outstandingRequests_.begin(),
193                 MPI_STATUSES_IGNORE
194             )
195         )
196         {
197             FatalErrorIn
198             (
199                 "IPstream::waitRequests()"
200             )   << "MPI_Waitall returned with error" << endl;
201         }
203         PstreamGlobals::IPstream_outstandingRequests_.clear();
204     }
208 bool Foam::IPstream::finishedRequest(const label i)
210     if (i >= PstreamGlobals::IPstream_outstandingRequests_.size())
211     {
212         FatalErrorIn
213         (
214             "IPstream::finishedRequest(const label)"
215         )   << "There are "
216             << PstreamGlobals::IPstream_outstandingRequests_.size()
217             << " outstanding send requests and you are asking for i=" << i
218             << nl
219             << "Maybe you are mixing blocking/non-blocking comms?"
220             << Foam::abort(FatalError);
221     }
223     int flag;
224     MPI_Test
225     (
226         &PstreamGlobals::IPstream_outstandingRequests_[i],
227         &flag,
228         MPI_STATUS_IGNORE
229     );
231     return flag != 0;
235 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
237 // ************************************************************************* //