UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "debug.H"
28 #include "registerSwitch.H"
29 #include "dictionary.H"
30 #include "IOstreams.H"
31 
32 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
33 
34 namespace Foam
35 {
36  defineTypeNameAndDebug(UPstream, 0);
37 
38  template<>
39  const char* Foam::NamedEnum
40  <
42  3
43  >::names[] =
44  {
45  "blocking",
46  "scheduled",
47  "nonBlocking"
48  };
49 }
50 
51 
54 
55 
56 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
57 
58 void Foam::UPstream::setParRun(const label nProcs)
59 {
60  parRun_ = true;
61 
62  // Redo worldComm communicator (this has been created at static
63  // initialisation time)
64  freeCommunicator(UPstream::worldComm);
65  label comm = allocateCommunicator(-1, identity(nProcs), true);
66  if (comm != UPstream::worldComm)
67  {
68  FatalErrorIn("UPstream::setParRun(const label)")
69  << "problem : comm:" << comm
70  << " UPstream::worldComm:" << UPstream::worldComm
72  }
73 
74  Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
75  Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
76 }
77 
78 
79 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcLinearComm
80 (
81  const label nProcs
82 )
83 {
84  List<commsStruct> linearCommunication(nProcs);
85 
86  // Master
87  labelList belowIDs(nProcs - 1);
88  forAll(belowIDs, i)
89  {
90  belowIDs[i] = i + 1;
91  }
92 
93  linearCommunication[0] = commsStruct
94  (
95  nProcs,
96  0,
97  -1,
98  belowIDs,
99  labelList(0)
100  );
101 
102  // Slaves. Have no below processors, only communicate up to master
103  for (label procID = 1; procID < nProcs; procID++)
104  {
105  linearCommunication[procID] = commsStruct
106  (
107  nProcs,
108  procID,
109  0,
110  labelList(0),
111  labelList(0)
112  );
113  }
114  return linearCommunication;
115 }
116 
117 
118 // Append my children (and my children children etc.) to allReceives.
119 void Foam::UPstream::collectReceives
120 (
121  const label procID,
122  const List<DynamicList<label> >& receives,
123  DynamicList<label>& allReceives
124 )
125 {
126  const DynamicList<label>& myChildren = receives[procID];
127 
128  forAll(myChildren, childI)
129  {
130  allReceives.append(myChildren[childI]);
131  collectReceives(myChildren[childI], receives, allReceives);
132  }
133 }
134 
135 
136 // Tree like schedule. For 8 procs:
137 // (level 0)
138 // 0 receives from 1
139 // 2 receives from 3
140 // 4 receives from 5
141 // 6 receives from 7
142 // (level 1)
143 // 0 receives from 2
144 // 4 receives from 6
145 // (level 2)
146 // 0 receives from 4
147 //
148 // The sends/receives for all levels are collected per processor (one send per
149 // processor; multiple receives possible) creating a table:
150 //
151 // So per processor:
152 // proc receives from sends to
153 // ---- ------------- --------
154 // 0 1,2,4 -
155 // 1 - 0
156 // 2 3 0
157 // 3 - 2
158 // 4 5 0
159 // 5 - 4
160 // 6 7 4
161 // 7 - 6
162 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcTreeComm
163 (
164  label nProcs
165 )
166 {
167  label nLevels = 1;
168  while ((1 << nLevels) < nProcs)
169  {
170  nLevels++;
171  }
172 
173  List<DynamicList<label> > receives(nProcs);
174  labelList sends(nProcs, -1);
175 
176  // Info<< "Using " << nLevels << " communication levels" << endl;
177 
178  label offset = 2;
179  label childOffset = offset/2;
180 
181  for (label level = 0; level < nLevels; level++)
182  {
183  label receiveID = 0;
184  while (receiveID < nProcs)
185  {
186  // Determine processor that sends and we receive from
187  label sendID = receiveID + childOffset;
188 
189  if (sendID < nProcs)
190  {
191  receives[receiveID].append(sendID);
192  sends[sendID] = receiveID;
193  }
194 
195  receiveID += offset;
196  }
197 
198  offset <<= 1;
199  childOffset <<= 1;
200  }
201 
202  // For all processors find the processors it receives data from
203  // (and the processors they receive data from etc.)
204  List<DynamicList<label> > allReceives(nProcs);
205  for (label procID = 0; procID < nProcs; procID++)
206  {
207  collectReceives(procID, receives, allReceives[procID]);
208  }
209 
210 
211  List<commsStruct> treeCommunication(nProcs);
212 
213  for (label procID = 0; procID < nProcs; procID++)
214  {
215  treeCommunication[procID] = commsStruct
216  (
217  nProcs,
218  procID,
219  sends[procID],
220  receives[procID].shrink(),
221  allReceives[procID].shrink()
222  );
223  }
224  return treeCommunication;
225 }
226 
227 
229 (
230  const label parentIndex,
231  const labelList& subRanks,
232  const bool doPstream
233 )
234 {
235  label index;
236  if (!freeComms_.empty())
237  {
238  index = freeComms_.pop();
239  }
240  else
241  {
242  // Extend storage
243  index = parentCommunicator_.size();
244 
245  myProcNo_.append(-1);
246  procIDs_.append(List<int>(0));
247  parentCommunicator_.append(-1);
248  linearCommunication_.append(List<commsStruct>(0));
249  treeCommunication_.append(List<commsStruct>(0));
250  }
251 
252  if (debug)
253  {
254  Pout<< "Communicators : Allocating communicator " << index << endl
255  << " parent : " << parentIndex << endl
256  << " procs : " << subRanks << endl
257  << endl;
258  }
259 
260  // Initialise; overwritten by allocatePstreamCommunicator
261  myProcNo_[index] = 0;
262 
263  // Convert from label to int
264  procIDs_[index].setSize(subRanks.size());
265  forAll(procIDs_[index], i)
266  {
267  procIDs_[index][i] = subRanks[i];
268 
269  // Enforce incremental order (so index is rank in next communicator)
270  if (i >= 1 && subRanks[i] <= subRanks[i-1])
271  {
273  (
274  "UPstream::allocateCommunicator"
275  "(const label, const labelList&, const bool)"
276  ) << "subranks not sorted : " << subRanks
277  << " when allocating subcommunicator from parent "
278  << parentIndex
280  }
281  }
282  parentCommunicator_[index] = parentIndex;
283 
284  linearCommunication_[index] = calcLinearComm(procIDs_[index].size());
285  treeCommunication_[index] = calcTreeComm(procIDs_[index].size());
286 
287 
288  if (doPstream && parRun())
289  {
290  allocatePstreamCommunicator(parentIndex, index);
291  }
292 
293  return index;
294 }
295 
296 
298 (
299  const label communicator,
300  const bool doPstream
301 )
302 {
303  if (debug)
304  {
305  Pout<< "Communicators : Freeing communicator " << communicator << endl
306  << " parent : " << parentCommunicator_[communicator] << endl
307  << " myProcNo : " << myProcNo_[communicator] << endl
308  << endl;
309  }
310 
311  if (doPstream && parRun())
312  {
313  freePstreamCommunicator(communicator);
314  }
315  myProcNo_[communicator] = -1;
316  //procIDs_[communicator].clear();
317  parentCommunicator_[communicator] = -1;
318  linearCommunication_[communicator].clear();
319  treeCommunication_[communicator].clear();
320 
321  freeComms_.push(communicator);
322 }
323 
324 
325 void Foam::UPstream::freeCommunicators(const bool doPstream)
326 {
327  forAll(myProcNo_, communicator)
328  {
329  if (myProcNo_[communicator] != -1)
330  {
331  freeCommunicator(communicator, doPstream);
332  }
333  }
334 }
335 
336 
337 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
338 {
339  int procID = myProcID;
340  label comm = myComm;
341 
342  while (parent(comm) != -1)
343  {
344  const List<int>& parentRanks = UPstream::procID(comm);
345  procID = parentRanks[procID];
346  comm = UPstream::parent(comm);
347  }
348 
349  return procID;
350 }
351 
352 
353 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
354 {
355  const List<int>& parentRanks = procID(myComm);
356  label parentComm = parent(myComm);
357 
358  if (parentComm == -1)
359  {
360  return findIndex(parentRanks, baseProcID);
361  }
362  else
363  {
364  label parentRank = procNo(parentComm, baseProcID);
365  return findIndex(parentRanks, parentRank);
366  }
367 }
368 
369 
371 (
372  const label myComm,
373  const label currentComm,
374  const int currentProcID
375 )
376 {
377  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
378  return procNo(myComm, physProcID);
379 }
380 
381 
382 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
383 
384 // By default this is not a parallel run
385 bool Foam::UPstream::parRun_(false);
386 
387 // Free communicators
388 Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
389 
390 // My processor number
391 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
392 
393 // List of process IDs
394 Foam::DynamicList<Foam::List<int> > Foam::UPstream::procIDs_(10);
395 
396 // Parent communicator
397 Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
398 
399 // Standard transfer message type
400 int Foam::UPstream::msgType_(1);
401 
402 // Linear communication schedule
404 Foam::UPstream::linearCommunication_(10);
405 
406 // Multi level communication schedule
408 Foam::UPstream::treeCommunication_(10);
409 
410 
411 // Allocate a serial communicator. This gets overwritten in parallel mode
412 // (by UPstream::setParRun())
414 (
415  -1,
417  false
418 );
419 
420 
421 
422 // Should compact transfer be used in which floats replace doubles
423 // reducing the bandwidth requirement at the expense of some loss
424 // in accuracy
426 (
427  Foam::debug::optimisationSwitch("floatTransfer", 0)
428 );
430 (
431  "floatTransfer",
432  bool,
433  Foam::UPstream::floatTransfer
434 );
435 
436 // Number of processors at which the reduce algorithm changes from linear to
437 // tree
439 (
440  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
441 );
443 (
444  "nProcsSimpleSum",
445  int,
446  Foam::UPstream::nProcsSimpleSum
447 );
448 
449 // Default commsType
451 (
452  commsTypeNames.read(Foam::debug::optimisationSwitches().lookup("commsType"))
453 );
454 
455 namespace Foam
456 {
457  // Register re-reader
459  :
461  {
462  public:
463 
464  addcommsTypeToOpt(const char* name)
465  :
467  {}
468 
470  {}
471 
472  virtual void readData(Foam::Istream& is)
473  {
475  (
476  is
477  );
478  }
479 
480  virtual void writeData(Foam::Ostream& os) const
481  {
483  }
484  };
485 
487 }
488 
489 // Default communicator
491 
492 
493 // Warn for use of any communicator
495 
496 
497 // Number of polling cycles in processor updates
499 (
500  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
501 );
503 (
504  "nPollProcInterfaces",
505  int,
506  Foam::UPstream::nPollProcInterfaces
507 );
508 
509 
510 // ************************************************************************* //
const string & prefix() const
Return the prefix of the stream.
Helper class for allocating/freeing communicators.
Definition: UPstream.H:297
A LIFO stack based on a singly-linked list.
Definition: LIFOStack.H:51
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurence of given element and return index,.
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
virtual ~addcommsTypeToOpt()
Definition: UPstream.C:469
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:229
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in.
Definition: UPstream.C:337
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:298
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:76
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
Definition: DynamicListI.H:310
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
prefixOSstream Perr(cerr,"Perr")
Definition: IOstreams.H:54
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:472
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:464
static bool floatTransfer
Should compact transfer be used in which floats replace doubles.
Definition: UPstream.H:248
static label parent(const label communicator)
Definition: UPstream.H:409
Namespace for OpenFOAM.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: HashTable.H:59
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:415
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:264
commsTypes
Types of communications.
Definition: UPstream.H:64
#define forAll(list, i)
Definition: UList.H:421
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:480
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict.
Definition: debug.C:159
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
void addOptimisationObject(const char *name, simpleRegIOobject *obj)
Register optimisation switch read/write object.
Definition: debug.C:235
errorManip< error > abort(error &err)
Definition: errorManip.H:131
registerOptSwitch("floatTransfer", bool, Foam::UPstream::floatTransfer)
Initialise the NamedEnum HashTable from the static list of names.
Definition: NamedEnum.H:52
Abstract base class for registered object with I/O. Used in debug symbol registration.
#define FatalErrorIn(functionName)
Report an error message using Foam::FatalError.
Definition: error.H:314
ITstream & lookup(const word &, bool recursive=false, bool patternMatch=true) const
Find and return an entry data stream.
Definition: dictionary.C:452
error FatalError
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:258
List< label > labelList
A List of labels.
Definition: labelList.H:56
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, Foam::label(0)), false)
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:53
void append(const T &)
Append an element at the end of the list.
Definition: ListI.H:97
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor.
Definition: UPstream.C:353
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:255
int optimisationSwitch(const char *name, const int defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:183
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:261
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:325
Enum read(Istream &) const
Read a word from Istream and return the corresponding.
Definition: NamedEnum.C:61
defineTypeNameAndDebug(combustionModel, 0)
prefixOSstream Pout(cout,"Pout")
Definition: IOstreams.H:53
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear.
Definition: UPstream.H:252