1 /*---------------------------------------------------------------------------*\
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
5 \\ / A nd | Copyright (C) 2011 OpenFOAM Foundation
7 -------------------------------------------------------------------------------
9 This file is part of OpenFOAM.
11 OpenFOAM is free software: you can redistribute it and/or modify it
12 under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 You should have received a copy of the GNU General Public License
22 along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
27 GAMMA has a (polling) receive handler which gets called every time a
28 received message is complete. Ours stores the length of the currently
29 received message and sets up the next buffer to store the next message
31 Note that the pattern between two processors can be
36 since the first swap might belong to a local exchange and the second to
37 a reduce. Since gamma has to have the receive buffers already set up we
38 have to allocate them big enough. To prevent excessive amounts needed we
39 dynamically resize them (never shrink) by sending special 'resize' messages
40 before sending a largish message.
42 Because of this we actually need four receive buffers:
44 - receive resize message
45 - receive normal message
46 - receive resize message
47 - receive normal message
50 The special resize message is a message with a special header which
51 (hopefully) should never appear in normal exchanges (OPstream::send
52 actually checks for this).
54 \*---------------------------------------------------------------------------*/
57 #include "PstreamReduceOps.H"
58 #include "OSspecific.H"
59 #include "PstreamGlobals.H"
67 # include <linux/gamma/libgamma.h>
71 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
76 // Receive handler to copy out received message length and switch buffers.
// Works on the port GAMMA reports as active (gamma_active_port): checks the
// buffer that just received data, records the outcome in
// PstreamGlobals::recvBufLen (the message length, or -2 after a resize
// message was handled) and then makes the next buffer of the set the
// current one for that port.
77 static void handler(void)
// Index of the buffer currently in use for this port, the buffer itself and
// the length currently recorded for it.
79 label current = PstreamGlobals::recvIndex[gamma_active_port];
81 List<char>& buf = PstreamGlobals::recvBuf[current][gamma_active_port];
82 label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port];
// Fatal error for a buffer whose recorded length was not reset, i.e. whose
// previous contents have not been consumed yet (see message text).
86 FatalErrorIn("Pstream::handler(void)")
87 << "Buffer length not reset : "
89 << " when receiving message of size " << gamma_msglen
90 << " from processor " << gamma_active_port << endl
91 << "This means that the existing data has not been consumed yet"
92 << " (by IPstream::read) and means your communication pattern"
93 << " is probably not balanced (a receive for every send)"
95 << "This can happen if you have e.g. gather without scatter."
97 << "A workaround is to increase the depth of the circular"
98 << " receive buffers in PstreamGlobals.H"
// A message longer than the current buffer cannot be stored: fatal error.
107 || gamma_msglen > buf.size()
110 FatalErrorIn("Pstream::handler(void)")
111 << "Received message of size " << gamma_msglen
112 << " from processor " << gamma_active_port
114 << "but global receive buffer is only of size "
116 << abort(FatalError);
119 // Check for resize message
// resizeLen is used below as the requested new buffer size.
120 label resizeLen = PstreamGlobals::getSizeFromHeader
130 Pout<< "Pstream::handler : Resize message:" << resizeLen
131 << " from proc " << gamma_active_port
133 << PstreamGlobals::getMaxBufSize(gamma_active_port)
137 // Saved current buffer.
// Buffers only ever grow: nothing is resized unless the requested size
// exceeds what getMaxBufSize reports for this port.
140 if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port))
144 Pout<< "Pstream::handler :"
145 << " resizing receive buffer for processor "
148 << PstreamGlobals::getMaxBufSize(gamma_active_port)
149 << " to " << resizeLen << Foam::endl;
152 // Save the pointer (that gamma knows about) so we can safely
153 // gamma_switch_to_buffer with a valid pointer.
154 // Not sure if necessary but do anyway.
155 savedBuf.transfer(buf);
157 // Resize all the buffers
// (every buffer of the set that belongs to this port, not just the current
// one).
158 forAll(PstreamGlobals::recvBuf, i)
161 PstreamGlobals::recvBuf[i][gamma_active_port];
163 // gamma_munlock(chars.begin(), chars.size());
164 chars.setSize(resizeLen);
165 // gamma_mlock(chars.begin(), chars.size());
169 // Update length with special value to denote resize was done.
170 PstreamGlobals::recvBufLen[current][gamma_active_port] = -2;
174 // Update length with actual message length
175 PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen;
178 // Go to next buffer.
// NOTE(review): fcIndex presumably yields the forward circular index
// (wrapping to the first buffer after the last) - confirm.
179 label next = PstreamGlobals::recvBuf.fcIndex(current);
180 PstreamGlobals::recvIndex[gamma_active_port] = next;
182 // gamma_switch_to_buffer
// Start address and size of the next buffer for this port.
186 PstreamGlobals::recvBuf[next][gamma_active_port].begin(),
187 PstreamGlobals::recvBuf[next][gamma_active_port].size()
192 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
// Add the command-line options understood in parallel runs to
// validParOptions. Each entry maps an option name to a short string that
// appears to describe the option's argument (empty for "np" and
// "p4amslave"). Besides the p4*/machinefile entries this registers the
// GAMMA-specific GAMMANP, GAMMAHOME and GAMMA options.
194 void Pstream::addValidParOptions(HashTable<string>& validParOptions)
196 validParOptions.insert("np", "");
197 validParOptions.insert("p4pg", "PI file");
198 validParOptions.insert("p4wd", "directory");
199 validParOptions.insert("p4amslave", "");
200 validParOptions.insert("p4yourname", "hostname");
202 validParOptions.insert("machinefile", "machine file");
203 validParOptions.insert("GAMMANP", "numProcs");
204 validParOptions.insert("GAMMAHOME", "gamma cwd");
205 validParOptions.insert("GAMMA", "1(enable) or 0(disable)");
// Initialise the GAMMA-based parallel layer: take the processor count from
// the command line, start GAMMA, allocate the receive buffers for every
// other processor, attach a receive port per remote processor and finally
// build the communication schedule.
//   argc, argv : command-line arguments; scanned for "-GAMMANP" and also
//                handed to gamma_init.
209 bool Pstream::init(int& argc, char**& argv)
// Look for "-GAMMANP"; the argument following it is parsed with atoi as the
// number of processors.
213 string npString("-GAMMANP");
215 for (label i = 0; i < argc; i++)
217 if (argv[i] == npString)
221 numprocs = atoi(argv[i+1]);
// NOTE(review): conversion to unsigned char - a processor count that does
// not fit (typically anything above 255) would be silently truncated before
// being passed to gamma_init; confirm this limit is acceptable.
228 unsigned char smallNumprocs = numprocs;
230 gamma_init(smallNumprocs, argc, argv);
// Own processor number as reported by GAMMA.
232 myProcNo_ = gamma_my_node();
234 // Make sure printing with prefix.
// Processor IDs are simply 0 .. numprocs-1.
237 procIDs_.setSize(numprocs);
239 forAll(procIDs_, procNo)
241 procIDs_[procNo] = procNo;
245 // Allocate receive buffers.
246 // ~~~~~~~~~~~~~~~~~~~~~~~~~
248 // Make sure each receive buffer is at least large enough to receive
249 // the resize message.
251 // Current active buffer
// Both per-processor index lists start at buffer 0.
252 PstreamGlobals::recvIndex.setSize(numprocs);
253 PstreamGlobals::recvIndex = 0;
254 PstreamGlobals::consumeIndex.setSize(numprocs);
255 PstreamGlobals::consumeIndex = 0;
// For every buffer of the set: mark the recorded length of each slot as -1
// (NOTE(review): presumably the "reset"/free value the receive handler
// checks for - confirm) and allocate the per-processor storage.
257 forAll(PstreamGlobals::recvBuf, i)
259 PstreamGlobals::recvBufLen[i].setSize(numprocs);
260 PstreamGlobals::recvBufLen[i] = -1;
262 List<List<char> >& buffers = PstreamGlobals::recvBuf[i];
264 buffers.setSize(numprocs);
265 forAll(buffers, procNo)
// No receive buffer is allocated for our own processor number.
267 if (procNo != myProcNo_)
269 buffers[procNo].setSize(PstreamGlobals::initialBufferLen);
271 // Acc. to gamma sources all buffers need to be in memory.
272 // Either locked or "write touched".
275 // buffers[procNo].begin(),
276 // buffers[procNo].size()
283 // Lock the special resize message
286 // reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
287 // PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
291 // Attach current receive buffers
292 forAll(procIDs_, procNo)
294 if (procNo != myProcNo_)
296 // Buffer index (always 0 at this point)
297 label current = PstreamGlobals::recvIndex[procNo];
299 // Current buffer for this processor.
300 List<char>& buf = PstreamGlobals::recvBuf[current][procNo];
// One port per remote processor: the port number, destination node and
// semaphore are all the remote processor number, while the destination
// port is our own processor number.
302 gamma_set_active_port
304 procNo, //unsigned short port,
305 procNo, //unsigned short dest_node,
306 gamma_my_par_pid(), //unsigned char dest_par_pid,
307 myProcNo_, //unsigned short dest_port,
309 procNo, //unsigned short semaphore,
310 GO_BACK, //unsigned char buffer_kind,
318 // Make sure all have allocated the ports (so set the receive buffers)
// Report the transfer settings in use.
321 Info<< "GAMMA Pstream initialized with:" << nl
322 << " floatTransfer : " << floatTransfer << nl
323 << " nProcsSimpleSum : " << nProcsSimpleSum << nl
324 << " scheduledTransfer : " << Pstream::scheduledTransfer << nl
327 // Now that nprocs is known construct communication tables.
328 initCommunicationSchedule();
// Exit hook of the GAMMA Pstream; errnum is the caller-supplied error number.
// Unlocking of memory through gamma_munlockall() is disabled (commented
// out), in line with the commented-out gamma_mlock/gamma_munlock calls in
// the receive handler.
// NOTE(review): presumably ends the run using errnum - confirm.
334 void Pstream::exit(int errnum)
336 // gamma_munlockall();
// Abort hook of the GAMMA Pstream: announces the abort on Pout.
// Unlocking of memory through gamma_munlockall() is disabled (commented
// out).
342 void Pstream::abort()
344 Pout<< "**Pstream::abort()**" << endl;
345 // gamma_munlockall();
// Reduce a scalar over all processors with the given sum operator; the
// result is left in Value.
// Judging by the branches below this works in two phases:
//  1. the master reads one value from every slave
//     (firstSlave() .. lastSlave()) and folds it into Value with bop, while
//     the other processors write their Value out (non-buffered);
//  2. the master writes the combined Value to every slave, and the other
//     processors read it back into Value.
// A failed read or write is a fatal error.
350 void reduce(scalar& Value, const sumOp<scalar>& bop)
// Serial-run check.
// NOTE(review): presumably returns straight away when not running in
// parallel - confirm.
352 if (!Pstream::parRun())
359 Pout<< "**entering Pstream::reduce for " << Value << Foam::endl;
// Phase 1: collect the contributions on the master.
363 if (Pstream::master())
367 int slave=Pstream::firstSlave();
368 slave<=Pstream::lastSlave();
// Raw bytes of the slave's contribution are read into 'value'.
379 reinterpret_cast<char*>(&value), // buf
380 sizeof(Value) // bufSize
386 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
387 ) << "IPstream::read failed"
388 << Foam::abort(FatalError);
// Fold the received contribution into the running result.
391 Value = bop(Value, value);
// Send side of phase 1 (presumably the non-master branch): write the raw
// bytes of the local Value.
401 reinterpret_cast<const char*>(&Value), // buf
402 sizeof(Value), // bufSize
403 false // non-buffered
409 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
410 ) << "OPstream::write failed"
411 << Foam::abort(FatalError);
// Phase 2: distribute the combined result from the master.
415 if (Pstream::master())
419 int slave=Pstream::firstSlave();
420 slave<=Pstream::lastSlave();
429 reinterpret_cast<const char*>(&Value), // buf
430 sizeof(Value), // bufSize,
431 false // non-buffered
437 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
438 ) << "OPstream::write failed"
439 << Foam::abort(FatalError);
// Receive side of phase 2: the result is read directly into Value.
450 reinterpret_cast<char*>(&Value), // buf
451 sizeof(Value) // bufSize
457 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
458 ) << "IPstream::read failed"
459 << Foam::abort(FatalError);
465 Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl;
470 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
472 } // End namespace Foam
474 // ************************************************************************* //