ENH: autoLayerDriver: better layering information message
[OpenFOAM-2.0.x.git] / src / Pstream / gamma / Pstream.C
blob4994fe7884c5ea8f8db5a23d770149ba931deb87
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
4    \\    /   O peration     |
5     \\  /    A nd           | Copyright (C) 2011 OpenFOAM Foundation
6      \\/     M anipulation  |
7 -------------------------------------------------------------------------------
8 License
9     This file is part of OpenFOAM.
11     OpenFOAM is free software: you can redistribute it and/or modify it
12     under the terms of the GNU General Public License as published by
13     the Free Software Foundation, either version 3 of the License, or
14     (at your option) any later version.
16     OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
19     for more details.
21     You should have received a copy of the GNU General Public License
22     along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.
24 Description
25     Pstream for GAMMA
27     GAMMA has a (polling) receive handler which gets called every time a
28     received message is complete. Ours stores the length of the currently
29     received message and sets up the next buffer to store the next message
30     in.
31     Note that the pattern between two processors can be
32     - send
33     - receive
34     - receive
35     - send
36     since the first swap might belong to a local exchange and the second to
37     a reduce. Since gamma has to have the receive buffers already set up we
38     have to allocate them big enough. To prevent excessive amounts needed we
39     dynamically resize them (never shrink) by sending special 'resize' messages
40     before sending a largish message.
42     Because of this we actually need four receive buffers:
43     - send
44     - receive resize message
45     - receive normal message
46     - receive resize message
47     - receive normal message
48     - send
50     The special resize message is a message with a special header which
51     (hopefully) should never appear in normal exchanges (it actually checks
52     for this in the OPstream::send)
54 \*---------------------------------------------------------------------------*/
56 #include "Pstream.H"
57 #include "PstreamReduceOps.H"
58 #include "OSspecific.H"
59 #include "PstreamGlobals.H"
61 #include <cstring>
62 #include <cstdlib>
63 #include <csignal>
65 extern "C"
67 #   include <linux/gamma/libgamma.h>
71 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
73 namespace Foam
76 // Receive handler to copy out received message length and switch buffers.
77 static void handler(void)
79     label current = PstreamGlobals::recvIndex[gamma_active_port];
81     List<char>& buf = PstreamGlobals::recvBuf[current][gamma_active_port];
82     label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port];
84     if (bufLen != -1)
85     {
86         FatalErrorIn("Pstream::handler(void)")
87             << "Buffer length not reset : "
88             << bufLen
89             << " when receiving message of size " << gamma_msglen
90             << " from processor " << gamma_active_port << endl
91             << "This means that the existing data has not been consumed yet"
92             << " (by IPstream::read) and means your communication pattern"
93             << " is probably not balanced (a receive for every send)"
94             << endl
95             << "This can happen if you have e.g. gather without scatter."
96             << endl
97             << "A workaround is to increase the depth of the circular"
98             << " receive buffers in PstreamGlobals.H"
99             << abort(FatalError);
100     }
103     // Some checks
104     if
105     (
106         gamma_msglen < 0
107      || gamma_msglen > buf.size()
108     )
109     {
110         FatalErrorIn("Pstream::handler(void)")
111             << "Received message of size " << gamma_msglen
112             << " from processor " << gamma_active_port
113             << Foam::endl
114             << "but global receive buffer is only of size "
115             << buf.size()
116             << abort(FatalError);
117     }
119     // Check for resize message
120     label resizeLen = PstreamGlobals::getSizeFromHeader
121     (
122         buf.begin(),
123         gamma_msglen
124     );
126     if (resizeLen != -1)
127     {
128         if (Pstream::debug)
129         {
130             Pout<< "Pstream::handler : Resize message:" << resizeLen
131                 << " from proc " << gamma_active_port
132                 << " current size:"
133                 << PstreamGlobals::getMaxBufSize(gamma_active_port)
134                 << Foam::endl;
135         }
137         // Saved current buffer.
138         List<char> savedBuf;
140         if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port))
141         {
142             if (Pstream::debug)
143             {
144                 Pout<< "Pstream::handler :"
145                     << " resizing receive buffer for processor "
146                     << gamma_active_port
147                     << " from "
148                     << PstreamGlobals::getMaxBufSize(gamma_active_port)
149                     << " to " << resizeLen << Foam::endl;
150             }
152             // Save the pointer (that gamma knows about) so we can safely
153             // gamma_switch_to_buffer with a valid pointer.
154             // Not sure if nessecary but do anyway.
155             savedBuf.transfer(buf);
157             // Resize all the buffers
158             forAll(PstreamGlobals::recvBuf, i)
159             {
160                 List<char>& chars =
161                     PstreamGlobals::recvBuf[i][gamma_active_port];
163 //                gamma_munlock(chars.begin(), chars.size());
164                 chars.setSize(resizeLen);
165 //                gamma_mlock(chars.begin(), chars.size());
166             }
167         }
169         // Update length with special value to denote resize was done.
170         PstreamGlobals::recvBufLen[current][gamma_active_port] = -2;
171     }
172     else
173     {
174         // Update length with actual message length
175         PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen;
176     }
178     // Go to next buffer.
179     label next = PstreamGlobals::recvBuf.fcIndex(current);
180     PstreamGlobals::recvIndex[gamma_active_port] = next;
182 //    gamma_switch_to_buffer
183     gamma_post_recv
184     (
185         gamma_active_port,
186         PstreamGlobals::recvBuf[next][gamma_active_port].begin(),
187         PstreamGlobals::recvBuf[next][gamma_active_port].size()
188     );
192 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
194 void Pstream::addValidParOptions(HashTable<string>& validParOptions)
196     validParOptions.insert("np", "");
197     validParOptions.insert("p4pg", "PI file");
198     validParOptions.insert("p4wd", "directory");
199     validParOptions.insert("p4amslave", "");
200     validParOptions.insert("p4yourname", "hostname");
202     validParOptions.insert("machinefile", "machine file");
203     validParOptions.insert("GAMMANP", "numProcs");
204     validParOptions.insert("GAMMAHOME", "gamma cwd");
205     validParOptions.insert("GAMMA", "1(enable) or 0(disable)");
209 bool Pstream::init(int& argc, char**& argv)
211     int numprocs = 0;
213     string npString("-GAMMANP");
215     for (label i = 0; i < argc; i++)
216     {
217         if (argv[i] == npString)
218         {
219             if (i+1 < argc)
220             {
221                 numprocs = atoi(argv[i+1]);
222                 break;
223             }
224         }
225     }
227     // Initialize GAMMA
228     unsigned char smallNumprocs = numprocs;
230     gamma_init(smallNumprocs, argc, argv);
232     myProcNo_ = gamma_my_node();
234     // Make sure printing with prefix.
235     setParRun();
237     procIDs_.setSize(numprocs);
239     forAll(procIDs_, procNo)
240     {
241         procIDs_[procNo] = procNo;
242     }
245     // Allocate receive buffers.
246     // ~~~~~~~~~~~~~~~~~~~~~~~~~
248     // Make sure each receive buffer is at least large enough to receive
249     // the resize message.
251     // Current active buffer
252     PstreamGlobals::recvIndex.setSize(numprocs);
253     PstreamGlobals::recvIndex = 0;
254     PstreamGlobals::consumeIndex.setSize(numprocs);
255     PstreamGlobals::consumeIndex = 0;
257     forAll(PstreamGlobals::recvBuf, i)
258     {
259         PstreamGlobals::recvBufLen[i].setSize(numprocs);
260         PstreamGlobals::recvBufLen[i] = -1;
262         List<List<char> >& buffers = PstreamGlobals::recvBuf[i];
264         buffers.setSize(numprocs);
265         forAll(buffers, procNo)
266         {
267             if (procNo != myProcNo_)
268             {
269                 buffers[procNo].setSize(PstreamGlobals::initialBufferLen);
271                 // Acc. to gamma sources all buffers need to be in memory.
272                 // Either locked or "write touched".
273 //                gamma_mlock
274  //               (
275   //                  buffers[procNo].begin(),
276    //                 buffers[procNo].size()
277     //            );
278             }
279         }
280     }
283     // Lock the special resize message
284     //    gamma_mlock
285     //    (
286     //       reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
287     //      PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
288     // );
291     // Attach current receive buffers
292     forAll(procIDs_, procNo)
293     {
294         if (procNo != myProcNo_)
295         {
296             // Buffer index (always 0 at this point)
297             label current = PstreamGlobals::recvIndex[procNo];
299             // Current buffer for this processor.
300             List<char>& buf = PstreamGlobals::recvBuf[current][procNo];
302             gamma_set_active_port
303             (
304                 procNo,             //unsigned short port,
305                 procNo,             //unsigned short dest_node,
306                 gamma_my_par_pid(), //unsigned char dest_par_pid,
307                 myProcNo_,          //unsigned short dest_port,
308                 handler,            //callback
309                 procNo,             //unsigned short semaphore,
310                 GO_BACK,            //unsigned char buffer_kind,
311                 buf.begin(),
312                 buf.size()
313             );
314         }
315     }
318     // Make sure all have allocated the ports (so set the receive buffers)
319     gamma_sync();
321     Info<< "GAMMA Pstream initialized with:" << nl
322         << "    floatTransfer         : " << floatTransfer << nl
323         << "    nProcsSimpleSum       : " << nProcsSimpleSum << nl
324         << "    scheduledTransfer     : " << Pstream::scheduledTransfer << nl
325         << Foam::endl;
327     // Now that nprocs is known construct communication tables.
328     initCommunicationSchedule();
330     return true;
334 void Pstream::exit(int errnum)
336     //    gamma_munlockall();
337     gamma_exit();
338     //gamma_abort();
342 void Pstream::abort()
344     Pout<< "**Pstream::abort()**" << endl;
345     // gamma_munlockall();
346     gamma_abort();
350 void reduce(scalar& Value, const sumOp<scalar>& bop)
352     if (!Pstream::parRun())
353     {
354         return;
355     }
357     if (Pstream::debug)
358     {
359         Pout<< "**entering Pstream::reduce for " << Value << Foam::endl;
360     }
363     if (Pstream::master())
364     {
365         for
366         (
367             int slave=Pstream::firstSlave();
368             slave<=Pstream::lastSlave();
369             slave++
370         )
371         {
372             scalar value;
374             if
375             (
376                !IPstream::read
377                 (
378                     slave,
379                     reinterpret_cast<char*>(&value),    // buf
380                     sizeof(Value)                       // bufSize
381                 )
382             )
383             {
384                 FatalErrorIn
385                 (
386                     "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
387                 )   << "IPstream::read failed"
388                     << Foam::abort(FatalError);
389             }
391             Value = bop(Value, value);
392         }
393     }
394     else
395     {
396         if
397         (
398            !OPstream::write
399             (
400                 Pstream::masterNo(),
401                 reinterpret_cast<const char*>(&Value),  // buf
402                 sizeof(Value),                          // bufSize
403                 false                                   // non-buffered
404             )
405         )
406         {
407             FatalErrorIn
408             (
409                 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
410             )   << "OPstream::write failed"
411                 << Foam::abort(FatalError);
412         }
413     }
415     if (Pstream::master())
416     {
417         for
418         (
419             int slave=Pstream::firstSlave();
420             slave<=Pstream::lastSlave();
421             slave++
422         )
423         {
424             if
425             (
426                !OPstream::write
427                 (
428                     slave,
429                     reinterpret_cast<const char*>(&Value),  // buf
430                     sizeof(Value),                          // bufSize,
431                     false                                   // non-buffered
432                 )
433             )
434             {
435                 FatalErrorIn
436                 (
437                     "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
438                 )   << "OPstream::write failed"
439                     << Foam::abort(FatalError);
440             }
441         }
442     }
443     else
444     {
445         if
446         (
447            !IPstream::read
448             (
449                 Pstream::masterNo(),
450                 reinterpret_cast<char*>(&Value),    // buf
451                 sizeof(Value)                       // bufSize
452             )
453         )
454         {
455             FatalErrorIn
456             (
457                 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
458             )   << "IPstream::read failed"
459                 << Foam::abort(FatalError);
460         }
461     }
463     if (Pstream::debug)
464     {
465         Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl;
466     }
470 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
472 } // End namespace Foam
474 // ************************************************************************* //