/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | foam-extend: Open Source CFD
   \\    /   O peration     | Version:     3.2
    \\  /    A nd           | Web:         http://www.foam-extend.org
     \\/     M anipulation  | For copyright notice see file Copyright
-------------------------------------------------------------------------------
License
    This file is part of foam-extend.

    foam-extend is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by the
    Free Software Foundation, either version 3 of the License, or (at your
    option) any later version.

    foam-extend is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
    General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with foam-extend.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "mpi.h"

#include "Pstream.H"
#include "PstreamReduceOps.H"
#include "dictionary.H"
#include "OSspecific.H"

// Select the MPI data type matching the precision foam-extend was built with
// (WM_SP/WM_DP are the standard flags; WM_LDP assumed for the long double build)
#if defined(WM_SP)
#   define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
#   define MPI_SCALAR MPI_DOUBLE
#elif defined(WM_LDP)
#   define MPI_SCALAR MPI_LONG_DOUBLE
#endif


// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

defineTypeNameAndDebug(Foam::Pstream, 0);

// Names must match the order of the commsTypes enumeration in Pstream.H
template<>
const char* Foam::NamedEnum<Foam::Pstream::commsTypes, 3>::names[] =
    {"blocking", "scheduled", "nonBlocking"};

const Foam::NamedEnum<Foam::Pstream::commsTypes, 3>
    Foam::Pstream::commsTypeNames;


// * * * * * * * * * * * * * Private Member Functions  * * * * * * * * * * * //

void Foam::Pstream::setParRun()
{
    parRun_ = true;

    Pout.prefix() = '[' + name(myProcNo()) + "] ";
    Perr.prefix() = '[' + name(myProcNo()) + "] ";
}


void Foam::Pstream::calcLinearComm(const label nProcs)

    linearCommunication_.setSize(nProcs);

    labelList belowIDs(nProcs - 1);

    linearCommunication_[0] = commsStruct

    // Slaves. Have no below processors, only communicate up to master
    for (label procID = 1; procID < nProcs; procID++)

        linearCommunication_[procID] = commsStruct


// Append my children (and my children's children etc.) to allReceives.
void Foam::Pstream::collectReceives
(
    const label procID,
    const List<dynamicLabelList >& receives,
    dynamicLabelList& allReceives
)
{
    const dynamicLabelList& myChildren = receives[procID];

    forAll(myChildren, childI)
    {
        allReceives.append(myChildren[childI]);
        collectReceives(myChildren[childI], receives, allReceives);
    }
}
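

// Example (illustrative): for the 8-processor tree described below, where
// receives[0] = (1 2 4), receives[2] = (3), receives[4] = (5 6) and
// receives[6] = (7), collectReceives(0, receives, allReceives) walks the
// tree depth-first and yields allReceives = (1 2 3 4 5 6 7).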


// Tree like schedule. For 8 procs:
//
// The sends/receives for all levels are collected per processor (one send per
// processor; multiple receives possible) creating a table:
//
// proc     receives from   sends to
// ----     -------------   --------
void Foam::Pstream::calcTreeComm(label nProcs)
{
    label nLevels = 0;
    while ((1 << nLevels) < nProcs)
    {
        nLevels++;
    }

    List<dynamicLabelList > receives(nProcs);
    labelList sends(nProcs, -1);

    // Info<< "Using " << nLevels << " communication levels" << endl;

    label childOffset = offset/2;

    for (label level = 0; level < nLevels; level++)

        while (receiveID < nProcs)

            // Determine processor that sends and we receive from
            label sendID = receiveID + childOffset;

            receives[receiveID].append(sendID);
            sends[sendID] = receiveID;

    // For all processors find the processors it receives data from
    // (and the processors they receive data from etc.)
    List<dynamicLabelList > allReceives(nProcs);

    for (label procID = 0; procID < nProcs; procID++)
    {
        collectReceives(procID, receives, allReceives[procID]);
    }

    treeCommunication_.setSize(nProcs);

    for (label procID = 0; procID < nProcs; procID++)
    {
        treeCommunication_[procID] = commsStruct
        (
            nProcs,
            procID,
            sends[procID],
            receives[procID].shrink(),
            allReceives[procID].shrink()
        );
    }
}


// Callback from Pstream::init() : initialize linear and tree communication
// schedules now that nProcs is known.
void Foam::Pstream::initCommunicationSchedule()
{
    calcLinearComm(nProcs());
    calcTreeComm(nProcs());
}


// Valid parallel options vary between implementations, but flag common ones.
// If they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong.
void Foam::Pstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("GAMMANP", "number of instances");
    validParOptions.insert("machinefile", "machine file");
}
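

// Example (illustrative): a typical parallel launch is
//
//     mpirun -np 4 <solver> -parallel
//
// where "-parallel" itself is handled by the normal argument processing
// (argList), not by the options registered above.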


bool Foam::Pstream::init(int& argc, char**& argv)
{
    MPI_Init(&argc, &argv);

    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_);

    if (numprocs <= 1)
    {
        FatalErrorIn("Pstream::init(int& argc, char**& argv)")
            << "bool Pstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }

    procIDs_.setSize(numprocs);

    forAll(procIDs_, procNo)
    {
        procIDs_[procNo] = procNo;
    }

    setParRun();

    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());

        if (bufferSize)
        {
            MPI_Buffer_attach(new char[bufferSize], bufferSize);
        }
    }
    else
    {
        FatalErrorIn("Pstream::init(int& argc, char**& argv)")
            << "Pstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
            << Foam::abort(FatalError);
    }
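
    // Note (assumption): MPI_BUFFER_SIZE is normally exported by the
    // foam-extend environment scripts before a parallel run, e.g.
    //
    //     export MPI_BUFFER_SIZE=20000000
    //
    // so reaching the FatalError above usually means the environment was
    // not sourced.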

    int processorNameLen;
    char processorName[MPI_MAX_PROCESSOR_NAME];

    MPI_Get_processor_name(processorName, &processorNameLen);

    //signal(SIGABRT, stop);

    // Now that nProcs is known construct communication tables.
    initCommunicationSchedule();

    return true;
}
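

// Note: solvers do not call Pstream::init() directly; argList invokes it when
// the -parallel option is present, after which nProcs(), myProcNo() and the
// communication schedules above are valid.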


void Foam::Pstream::exit(int errnum)

    MPI_Buffer_detach(&buff, &size);

    MPI_Abort(MPI_COMM_WORLD, errnum);


void Foam::Pstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}


void Foam::reduce(scalar& Value, const sumOp<scalar>& bop)
{
    if (!Pstream::parRun())
    {
        return;
    }

    if (Pstream::nProcs() <= Pstream::nProcsSimpleSum())
        if (Pstream::master())
            int slave=Pstream::firstSlave();
            slave<=Pstream::lastSlave();

                    Pstream::procID(slave),

                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                ) << "MPI_Recv failed"
                    << Foam::abort(FatalError);

            Value = bop(Value, value);

                Pstream::procID(Pstream::masterNo()),

                "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
            ) << "MPI_Send failed"
                << Foam::abort(FatalError);

        if (Pstream::master())
            int slave=Pstream::firstSlave();
            slave<=Pstream::lastSlave();

                    Pstream::procID(slave),

                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                ) << "MPI_Send failed"
                    << Foam::abort(FatalError);

                Pstream::procID(Pstream::masterNo()),

                "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
            ) << "MPI_Recv failed"
                << Foam::abort(FatalError);

        scalar sum;
        MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD);
        Value = sum;

        int myProcNo = Pstream::myProcNo();
        int nProcs = Pstream::nProcs();

        // receive from children
        int thisLevelOffset = 2;
        int childLevelOffset = thisLevelOffset/2;

            (childLevelOffset < nProcs)
         && (myProcNo % thisLevelOffset) == 0

            childProcId = myProcNo + childLevelOffset;

            if (childProcId < nProcs)

                        Pstream::procID(childProcId),

                        "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                    ) << "MPI_Recv failed"
                        << Foam::abort(FatalError);

                Value = bop(Value, value);

            thisLevelOffset <<= 1;
            childLevelOffset = thisLevelOffset/2;

        // send and receive from parent
        if (!Pstream::master())

            int parentId = myProcNo - (myProcNo % thisLevelOffset);

                    Pstream::procID(parentId),

                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                ) << "MPI_Send failed"
                    << Foam::abort(FatalError);

                    Pstream::procID(parentId),

                    "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                ) << "MPI_Recv failed"
                    << Foam::abort(FatalError);

        // distribute to my children
        thisLevelOffset >>= 1;
        childLevelOffset = thisLevelOffset/2;

            childProcId = myProcNo + childLevelOffset;

            if (childProcId < nProcs)

                        Pstream::procID(childProcId),

                        "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
                    ) << "MPI_Send failed"
                        << Foam::abort(FatalError);

            thisLevelOffset >>= 1;
            childLevelOffset = thisLevelOffset/2;
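

// Typical use of reduce() (illustrative): combine a processor-local value
// into a global sum, after which every processor holds the same result.
//
//     scalar localValue = 1.0*Pstream::myProcNo();
//     reduce(localValue, sumOp<scalar>());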


// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //

// Initialise my process number to 0 (the master)
int Foam::Pstream::myProcNo_(0);


// By default this is not a parallel run
bool Foam::Pstream::parRun_(false);


// List of process IDs
Foam::List<int> Foam::Pstream::procIDs_(1, 0);


// Standard transfer message type
const int Foam::Pstream::msgType_(1);


// Linear communication schedule
Foam::List<Foam::Pstream::commsStruct> Foam::Pstream::linearCommunication_(0);


// Multi level communication schedule
Foam::List<Foam::Pstream::commsStruct> Foam::Pstream::treeCommunication_(0);


// Should compact transfer be used in which floats replace doubles,
// reducing the bandwidth requirement at the expense of some loss
// in accuracy
const Foam::debug::optimisationSwitch
Foam::Pstream::floatTransfer


// Number of processors at which the reduce algorithm changes from linear to
// tree communication
const Foam::debug::optimisationSwitch
Foam::Pstream::nProcsSimpleSum


// Default communications type
const Foam::debug::optimisationSwitch
Foam::Pstream::defaultCommsType

    "blocking, nonBlocking, scheduled"


// ************************************************************************* //