UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2011-2023 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "debug.H"
28 #include "dictionary.H"
29 #include "IOstreams.H"
30 
31 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
32 
33 namespace Foam
34 {
36 
37  template<>
39  {
40  "blocking",
41  "scheduled",
42  "nonBlocking"
43  };
44 }
45 
46 
49 
50 
51 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
52 
53 void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
54 {
55  if (nProcs == 0)
56  {
57  parRun_ = false;
58  haveThreads_ = haveThreads;
59 
61  label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
62  if (comm != UPstream::worldComm)
63  {
64  FatalErrorIn("UPstream::setParRun(const label)")
65  << "problem : comm:" << comm
66  << " UPstream::worldComm:" << UPstream::worldComm
68  }
69 
70  Pout.prefix() = "";
71  Perr.prefix() = "";
72  }
73  else
74  {
75  parRun_ = true;
76  haveThreads_ = haveThreads;
77 
78  // Redo worldComm communicator (this has been created at static
79  // initialisation time)
81  label comm = allocateCommunicator(-1, identityMap(nProcs), true);
82  if (comm != UPstream::worldComm)
83  {
85  << "problem : comm:" << comm
86  << " UPstream::worldComm:" << UPstream::worldComm
88  }
89 
90  Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
91  Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
92  }
93 }
94 
95 
96 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcLinearComm
97 (
98  const label nProcs
99 )
100 {
101  List<commsStruct> linearCommunication(nProcs);
102 
103  // Master
104  labelList belowIDs(nProcs - 1);
105  forAll(belowIDs, i)
106  {
107  belowIDs[i] = i + 1;
108  }
109 
110  linearCommunication[0] = commsStruct
111  (
112  nProcs,
113  0,
114  -1,
115  belowIDs,
116  labelList(0)
117  );
118 
119  // Slaves. Have no below processors, only communicate up to master
120  for (label procID = 1; procID < nProcs; procID++)
121  {
122  linearCommunication[procID] = commsStruct
123  (
124  nProcs,
125  procID,
126  0,
127  labelList(0),
128  labelList(0)
129  );
130  }
131  return linearCommunication;
132 }
133 
134 
135 void Foam::UPstream::collectReceives
136 (
137  const label procID,
138  const List<DynamicList<label>>& receives,
139  DynamicList<label>& allReceives
140 )
141 {
142  // Append my children (and my children children etc.) to allReceives.
143 
144  const DynamicList<label>& myChildren = receives[procID];
145 
146  forAll(myChildren, childI)
147  {
148  allReceives.append(myChildren[childI]);
149  collectReceives(myChildren[childI], receives, allReceives);
150  }
151 }
152 
153 
154 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcTreeComm
155 (
156  label nProcs
157 )
158 {
159  // Tree like schedule. For 8 procs:
160  // (level 0)
161  // 0 receives from 1
162  // 2 receives from 3
163  // 4 receives from 5
164  // 6 receives from 7
165  // (level 1)
166  // 0 receives from 2
167  // 4 receives from 6
168  // (level 2)
169  // 0 receives from 4
170  //
171  // The sends/receives for all levels are collected per processor
172  // (one send per processor; multiple receives possible) creating a table:
173  //
174  // So per processor:
175  // proc receives from sends to
176  // ---- ------------- --------
177  // 0 1,2,4 -
178  // 1 - 0
179  // 2 3 0
180  // 3 - 2
181  // 4 5 0
182  // 5 - 4
183  // 6 7 4
184  // 7 - 6
185 
186  label nLevels = 1;
187  while ((1 << nLevels) < nProcs)
188  {
189  nLevels++;
190  }
191 
192  List<DynamicList<label>> receives(nProcs);
193  labelList sends(nProcs, -1);
194 
195  // Info<< "Using " << nLevels << " communication levels" << endl;
196 
197  label offset = 2;
198  label childOffset = offset/2;
199 
200  for (label level = 0; level < nLevels; level++)
201  {
202  label receiveID = 0;
203  while (receiveID < nProcs)
204  {
205  // Determine processor that sends and we receive from
206  label sendID = receiveID + childOffset;
207 
208  if (sendID < nProcs)
209  {
210  receives[receiveID].append(sendID);
211  sends[sendID] = receiveID;
212  }
213 
214  receiveID += offset;
215  }
216 
217  offset <<= 1;
218  childOffset <<= 1;
219  }
220 
221  // For all processors find the processors it receives data from
222  // (and the processors they receive data from etc.)
223  List<DynamicList<label>> allReceives(nProcs);
224  for (label procID = 0; procID < nProcs; procID++)
225  {
226  collectReceives(procID, receives, allReceives[procID]);
227  }
228 
229 
230  List<commsStruct> treeCommunication(nProcs);
231 
232  for (label procID = 0; procID < nProcs; procID++)
233  {
234  treeCommunication[procID] = commsStruct
235  (
236  nProcs,
237  procID,
238  sends[procID],
239  receives[procID].shrink(),
240  allReceives[procID].shrink()
241  );
242  }
243  return treeCommunication;
244 }
245 
246 
248 (
249  const label parentIndex,
250  const labelList& subRanks,
251  const bool doPstream
252 )
253 {
254  label index;
255  if (!freeComms_.empty())
256  {
257  index = freeComms_.pop();
258  }
259  else
260  {
261  // Extend storage
262  index = parentCommunicator_.size();
263 
264  myProcNo_.append(-1);
265  procIndices_.append(List<int>(0));
266  parentCommunicator_.append(-1);
267  linearCommunication_.append(List<commsStruct>(0));
268  treeCommunication_.append(List<commsStruct>(0));
269  }
270 
271  if (debug)
272  {
273  Pout<< "Communicators : Allocating communicator " << index << endl
274  << " parent : " << parentIndex << endl
275  << " procs : " << subRanks << endl
276  << endl;
277  }
278 
279  // Initialise; overwritten by allocatePstreamCommunicator
280  myProcNo_[index] = 0;
281 
282  // Convert from label to int
283  procIndices_[index].setSize(subRanks.size());
284  forAll(procIndices_[index], i)
285  {
286  procIndices_[index][i] = subRanks[i];
287 
288  // Enforce incremental order (so index is rank in next communicator)
289  if (i >= 1 && subRanks[i] <= subRanks[i-1])
290  {
292  << "subranks not sorted : " << subRanks
293  << " when allocating subcommunicator from parent "
294  << parentIndex
296  }
297  }
298  parentCommunicator_[index] = parentIndex;
299 
300  linearCommunication_[index] = calcLinearComm(procIndices_[index].size());
301  treeCommunication_[index] = calcTreeComm(procIndices_[index].size());
302 
303 
304  if (doPstream && parRun())
305  {
306  allocatePstreamCommunicator(parentIndex, index);
307  }
308 
309  return index;
310 }
311 
312 
314 (
315  const label communicator,
316  const bool doPstream
317 )
318 {
319  if (debug)
320  {
321  Pout<< "Communicators : Freeing communicator " << communicator << endl
322  << " parent : " << parentCommunicator_[communicator] << endl
323  << " myProcNo : " << myProcNo_[communicator] << endl
324  << endl;
325  }
326 
327  if (doPstream && parRun())
328  {
329  freePstreamCommunicator(communicator);
330  }
331  myProcNo_[communicator] = -1;
332  // procIndices_[communicator].clear();
333  parentCommunicator_[communicator] = -1;
334  linearCommunication_[communicator].clear();
335  treeCommunication_[communicator].clear();
336 
337  freeComms_.push(communicator);
338 }
339 
340 
341 void Foam::UPstream::freeCommunicators(const bool doPstream)
342 {
343  forAll(myProcNo_, communicator)
344  {
345  if (myProcNo_[communicator] != -1)
346  {
347  freeCommunicator(communicator, doPstream);
348  }
349  }
350 }
351 
352 
353 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
354 {
355  int procID = myProcID;
356  label comm = myComm;
357 
358  while (parent(comm) != -1)
359  {
360  const List<int>& parentRanks = UPstream::procID(comm);
361  procID = parentRanks[procID];
362  comm = UPstream::parent(comm);
363  }
364 
365  return procID;
366 }
367 
368 
369 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
370 {
371  const List<int>& parentRanks = procID(myComm);
372  label parentComm = parent(myComm);
373 
374  if (parentComm == -1)
375  {
376  return findIndex(parentRanks, baseProcID);
377  }
378  else
379  {
380  label parentRank = procNo(parentComm, baseProcID);
381  return findIndex(parentRanks, parentRank);
382  }
383 }
384 
385 
387 (
388  const label myComm,
389  const label currentComm,
390  const int currentProcID
391 )
392 {
393  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
394  return procNo(myComm, physProcID);
395 }
396 
397 
398 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
399 
400 bool Foam::UPstream::parRun_(false);
401 
402 bool Foam::UPstream::haveThreads_(false);
403 
404 Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
405 
406 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
407 
408 Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIndices_(10);
409 
410 Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
411 
412 int Foam::UPstream::msgType_(1);
413 
414 
416 Foam::UPstream::linearCommunication_(10);
417 
419 Foam::UPstream::treeCommunication_(10);
420 
421 
422 // Allocate a serial communicator. This gets overwritten in parallel mode
423 // (by UPstream::setParRun())
425 (
426  -1,
428  false
429 );
430 
431 
433 (
434  Foam::debug::optimisationSwitch("floatTransfer", 0)
435 );
436 
438 (
439  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
440 );
441 
443 (
445  (
446  "commsType",
447  commsTypeNames,
448  defaultCommsType
449  )
450 );
451 
453 
455 
457 (
458  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
459 );
460 
461 
462 // ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, Foam::label(0)), false)
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
A LIFO stack based on a singly-linked list.
Definition: LIFOStack.H:54
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: List.H:91
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Initialise the NamedEnum HashTable from the static list of names.
Definition: NamedEnum.H:54
Helper class for allocating/freeing communicators.
Definition: UPstream.H:315
Inter-processor communications stream.
Definition: UPstream.H:59
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:405
commsTypes
Types of communications.
Definition: UPstream.H:65
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:341
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:314
static bool floatTransfer
Should compact transfer be used in which floats replace doubles.
Definition: UPstream.H:265
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static label parent(const label communicator)
Definition: UPstream.H:434
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear.
Definition: UPstream.H:269
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:275
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:248
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor.
Definition: UPstream.C:369
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:272
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in.
Definition: UPstream.C:353
const string & prefix() const
Return the prefix of the stream.
#define FatalErrorIn(functionName)
Report an error message using Foam::FatalError.
Definition: error.H:329
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:334
int optimisationSwitch(const char *name, const int defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:255
Enum namedEnumOptimisationSwitch(const char *name, const NamedEnum< Enum, nEnum > &enumNames, const Enum defaultValue)
Lookup optimisation switch or add default value.
Definition: debug.H:94
Namespace for OpenFOAM.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
prefixOSstream Perr(cerr, "Perr")
Definition: IOstreams.H:54
List< label > labelList
A List of labels.
Definition: labelList.H:56
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:257
word name(const bool)
Return a word representation of a bool.
Definition: boolIO.C:39
errorManip< error > abort(error &err)
Definition: errorManip.H:131
defineTypeNameAndDebug(combustionModel, 0)
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index,.
error FatalError
labelList identityMap(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
void offset(label &lst, const label o)