UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2011-2018 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "debug.H"
28 #include "registerSwitch.H"
29 #include "dictionary.H"
30 #include "IOstreams.H"
31 
32 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
33 
34 namespace Foam
35 {
36  defineTypeNameAndDebug(UPstream, 0);
37 
38  template<>
39  const char* Foam::NamedEnum
40  <
42  3
43  >::names[] =
44  {
45  "blocking",
46  "scheduled",
47  "nonBlocking"
48  };
49 }
50 
51 
54 
55 
56 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
57 
58 void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
59 {
60  if (nProcs == 0)
61  {
62  parRun_ = false;
63  haveThreads_ = haveThreads;
64 
65  freeCommunicator(UPstream::worldComm);
66  label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
67  if (comm != UPstream::worldComm)
68  {
69  FatalErrorIn("UPstream::setParRun(const label)")
70  << "problem : comm:" << comm
71  << " UPstream::worldComm:" << UPstream::worldComm
73  }
74 
75  Pout.prefix() = "";
76  Perr.prefix() = "";
77  }
78  else
79  {
80  parRun_ = true;
81  haveThreads_ = haveThreads;
82 
83  // Redo worldComm communicator (this has been created at static
84  // initialisation time)
85  freeCommunicator(UPstream::worldComm);
86  label comm = allocateCommunicator(-1, identity(nProcs), true);
87  if (comm != UPstream::worldComm)
88  {
90  << "problem : comm:" << comm
91  << " UPstream::worldComm:" << UPstream::worldComm
93  }
94 
95  Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
96  Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
97  }
98 }
99 
100 
101 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcLinearComm
102 (
103  const label nProcs
104 )
105 {
106  List<commsStruct> linearCommunication(nProcs);
107 
108  // Master
109  labelList belowIDs(nProcs - 1);
110  forAll(belowIDs, i)
111  {
112  belowIDs[i] = i + 1;
113  }
114 
115  linearCommunication[0] = commsStruct
116  (
117  nProcs,
118  0,
119  -1,
120  belowIDs,
121  labelList(0)
122  );
123 
124  // Slaves. Have no below processors, only communicate up to master
125  for (label procID = 1; procID < nProcs; procID++)
126  {
127  linearCommunication[procID] = commsStruct
128  (
129  nProcs,
130  procID,
131  0,
132  labelList(0),
133  labelList(0)
134  );
135  }
136  return linearCommunication;
137 }
138 
139 
140 void Foam::UPstream::collectReceives
141 (
142  const label procID,
143  const List<DynamicList<label>>& receives,
144  DynamicList<label>& allReceives
145 )
146 {
147  // Append my children (and my children children etc.) to allReceives.
148 
149  const DynamicList<label>& myChildren = receives[procID];
150 
151  forAll(myChildren, childI)
152  {
153  allReceives.append(myChildren[childI]);
154  collectReceives(myChildren[childI], receives, allReceives);
155  }
156 }
157 
158 
159 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcTreeComm
160 (
161  label nProcs
162 )
163 {
164  // Tree like schedule. For 8 procs:
165  // (level 0)
166  // 0 receives from 1
167  // 2 receives from 3
168  // 4 receives from 5
169  // 6 receives from 7
170  // (level 1)
171  // 0 receives from 2
172  // 4 receives from 6
173  // (level 2)
174  // 0 receives from 4
175  //
176  // The sends/receives for all levels are collected per processor
177  // (one send per processor; multiple receives possible) creating a table:
178  //
179  // So per processor:
180  // proc receives from sends to
181  // ---- ------------- --------
182  // 0 1,2,4 -
183  // 1 - 0
184  // 2 3 0
185  // 3 - 2
186  // 4 5 0
187  // 5 - 4
188  // 6 7 4
189  // 7 - 6
190 
191  label nLevels = 1;
192  while ((1 << nLevels) < nProcs)
193  {
194  nLevels++;
195  }
196 
197  List<DynamicList<label>> receives(nProcs);
198  labelList sends(nProcs, -1);
199 
200  // Info<< "Using " << nLevels << " communication levels" << endl;
201 
202  label offset = 2;
203  label childOffset = offset/2;
204 
205  for (label level = 0; level < nLevels; level++)
206  {
207  label receiveID = 0;
208  while (receiveID < nProcs)
209  {
210  // Determine processor that sends and we receive from
211  label sendID = receiveID + childOffset;
212 
213  if (sendID < nProcs)
214  {
215  receives[receiveID].append(sendID);
216  sends[sendID] = receiveID;
217  }
218 
219  receiveID += offset;
220  }
221 
222  offset <<= 1;
223  childOffset <<= 1;
224  }
225 
226  // For all processors find the processors it receives data from
227  // (and the processors they receive data from etc.)
228  List<DynamicList<label>> allReceives(nProcs);
229  for (label procID = 0; procID < nProcs; procID++)
230  {
231  collectReceives(procID, receives, allReceives[procID]);
232  }
233 
234 
235  List<commsStruct> treeCommunication(nProcs);
236 
237  for (label procID = 0; procID < nProcs; procID++)
238  {
239  treeCommunication[procID] = commsStruct
240  (
241  nProcs,
242  procID,
243  sends[procID],
244  receives[procID].shrink(),
245  allReceives[procID].shrink()
246  );
247  }
248  return treeCommunication;
249 }
250 
251 
253 (
254  const label parentIndex,
255  const labelList& subRanks,
256  const bool doPstream
257 )
258 {
259  label index;
260  if (!freeComms_.empty())
261  {
262  index = freeComms_.pop();
263  }
264  else
265  {
266  // Extend storage
267  index = parentCommunicator_.size();
268 
269  myProcNo_.append(-1);
270  procIDs_.append(List<int>(0));
271  parentCommunicator_.append(-1);
272  linearCommunication_.append(List<commsStruct>(0));
273  treeCommunication_.append(List<commsStruct>(0));
274  }
275 
276  if (debug)
277  {
278  Pout<< "Communicators : Allocating communicator " << index << endl
279  << " parent : " << parentIndex << endl
280  << " procs : " << subRanks << endl
281  << endl;
282  }
283 
284  // Initialise; overwritten by allocatePstreamCommunicator
285  myProcNo_[index] = 0;
286 
287  // Convert from label to int
288  procIDs_[index].setSize(subRanks.size());
289  forAll(procIDs_[index], i)
290  {
291  procIDs_[index][i] = subRanks[i];
292 
293  // Enforce incremental order (so index is rank in next communicator)
294  if (i >= 1 && subRanks[i] <= subRanks[i-1])
295  {
297  << "subranks not sorted : " << subRanks
298  << " when allocating subcommunicator from parent "
299  << parentIndex
301  }
302  }
303  parentCommunicator_[index] = parentIndex;
304 
305  linearCommunication_[index] = calcLinearComm(procIDs_[index].size());
306  treeCommunication_[index] = calcTreeComm(procIDs_[index].size());
307 
308 
309  if (doPstream && parRun())
310  {
311  allocatePstreamCommunicator(parentIndex, index);
312  }
313 
314  return index;
315 }
316 
317 
319 (
320  const label communicator,
321  const bool doPstream
322 )
323 {
324  if (debug)
325  {
326  Pout<< "Communicators : Freeing communicator " << communicator << endl
327  << " parent : " << parentCommunicator_[communicator] << endl
328  << " myProcNo : " << myProcNo_[communicator] << endl
329  << endl;
330  }
331 
332  if (doPstream && parRun())
333  {
334  freePstreamCommunicator(communicator);
335  }
336  myProcNo_[communicator] = -1;
337  // procIDs_[communicator].clear();
338  parentCommunicator_[communicator] = -1;
339  linearCommunication_[communicator].clear();
340  treeCommunication_[communicator].clear();
341 
342  freeComms_.push(communicator);
343 }
344 
345 
346 void Foam::UPstream::freeCommunicators(const bool doPstream)
347 {
348  forAll(myProcNo_, communicator)
349  {
350  if (myProcNo_[communicator] != -1)
351  {
352  freeCommunicator(communicator, doPstream);
353  }
354  }
355 }
356 
357 
358 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
359 {
360  int procID = myProcID;
361  label comm = myComm;
362 
363  while (parent(comm) != -1)
364  {
365  const List<int>& parentRanks = UPstream::procID(comm);
366  procID = parentRanks[procID];
367  comm = UPstream::parent(comm);
368  }
369 
370  return procID;
371 }
372 
373 
374 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
375 {
376  const List<int>& parentRanks = procID(myComm);
377  label parentComm = parent(myComm);
378 
379  if (parentComm == -1)
380  {
381  return findIndex(parentRanks, baseProcID);
382  }
383  else
384  {
385  label parentRank = procNo(parentComm, baseProcID);
386  return findIndex(parentRanks, parentRank);
387  }
388 }
389 
390 
392 (
393  const label myComm,
394  const label currentComm,
395  const int currentProcID
396 )
397 {
398  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
399  return procNo(myComm, physProcID);
400 }
401 
402 
403 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
404 
405 bool Foam::UPstream::parRun_(false);
406 
407 bool Foam::UPstream::haveThreads_(false);
408 
409 Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
410 
411 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
412 
413 Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIDs_(10);
414 
415 Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
416 
417 int Foam::UPstream::msgType_(1);
418 
419 
421 Foam::UPstream::linearCommunication_(10);
422 
424 Foam::UPstream::treeCommunication_(10);
425 
426 
427 // Allocate a serial communicator. This gets overwritten in parallel mode
428 // (by UPstream::setParRun())
430 (
431  -1,
433  false
434 );
435 
436 
438 (
439  Foam::debug::optimisationSwitch("floatTransfer", 0)
440 );
442 (
443  "floatTransfer",
444  bool,
445  Foam::UPstream::floatTransfer
446 );
447 
449 (
450  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
451 );
453 (
454  "nProcsSimpleSum",
455  int,
456  Foam::UPstream::nProcsSimpleSum
457 );
458 
460 (
461  commsTypeNames.read(Foam::debug::optimisationSwitches().lookup("commsType"))
462 );
463 
464 namespace Foam
465 {
466  // Register re-reader
468  :
470  {
471  public:
472 
473  addcommsTypeToOpt(const char* name)
474  :
476  {}
477 
479  {}
480 
481  virtual void readData(Foam::Istream& is)
482  {
484  (
485  is
486  );
487  }
488 
489  virtual void writeData(Foam::Ostream& os) const
490  {
492  }
493  };
494 
496 }
497 
499 
501 
503 (
504  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
505 );
507 (
508  "nPollProcInterfaces",
509  int,
510  Foam::UPstream::nPollProcInterfaces
511 );
512 
513 
514 // ************************************************************************* //
A LIFO stack based on a singly-linked list.
Definition: LIFOStack.H:51
static bool floatTransfer
Should compact transfer be used in which floats replace doubles.
Definition: UPstream.H:265
Abstract base class for registered object with I/O. Used in debug symbol registration.
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
addcommsTypeToOpt addcommsTypeToOpt_("commsType")
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
error FatalError
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:319
Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, Foam::label(0)), false)
commsTypes
Types of communications.
Definition: UPstream.H:64
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: HashTable.H:59
An Istream is an abstract base class for all input systems (streams, files, token lists etc)...
Definition: Istream.H:57
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:163
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear.
Definition: UPstream.H:269
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:256
int optimisationSwitch(const char *name, const int defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:196
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor number).
Definition: UPstream.C:374
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
Initialise the NamedEnum HashTable from the static list of names.
Definition: NamedEnum.H:51
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:346
const string & prefix() const
Return the prefix of the stream.
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:275
static label parent(const label communicator)
Definition: UPstream.H:434
virtual void writeData(Foam::Ostream &os) const
Write.
Definition: UPstream.C:489
virtual void readData(Foam::Istream &is)
Read.
Definition: UPstream.C:481
void append(const T &)
Append an element at the end of the list.
Definition: ListI.H:177
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
Definition: DynamicListI.H:296
List< label > labelList
A List of labels.
Definition: labelList.H:56
registerOptSwitch("floatTransfer", bool, Foam::UPstream::floatTransfer)
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
errorManip< error > abort(error &err)
Definition: errorManip.H:131
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:53
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in the communicator at the top of the parent chain).
Definition: UPstream.C:358
defineTypeNameAndDebug(combustionModel, 0)
prefixOSstream Perr(cerr, "Perr")
Definition: IOstreams.H:54
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index.
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
Helper class for allocating/freeing communicators.
Definition: UPstream.H:314
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:272
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
Enum read(Istream &) const
Read a word from Istream and return the corresponding.
Definition: NamedEnum.C:61
dictionary & optimisationSwitches()
The OptimisationSwitches sub-dictionary in the central controlDict.
Definition: debug.C:172
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:253
void addOptimisationObject(const char *name, simpleRegIOobject *obj)
Register optimisation switch read/write object.
Definition: debug.C:261
addcommsTypeToOpt(const char *name)
Definition: UPstream.C:473
virtual ~addcommsTypeToOpt()
Definition: UPstream.C:478
#define FatalErrorIn(functionName)
Report an error message using Foam::FatalError.
Definition: error.H:314
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:319
Namespace for OpenFOAM.
ITstream & lookup(const word &, bool recursive=false, bool patternMatch=true) const
Find and return an entry data stream.
Definition: dictionary.C:583