UPstream.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2011-2025 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "debug.H"
28 #include "dictionary.H"
29 #include "IOstreams.H"
30 
31 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
32 
33 namespace Foam
34 {
36 }
37 
38 
41 {
42  "blocking",
43  "scheduled",
44  "nonBlocking"
45 };
46 
47 
48 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
49 
50 void Foam::UPstream::setParRun(const label nProcs, const bool haveThreads)
51 {
52  if (nProcs == 0)
53  {
54  parRun_ = false;
55  haveThreads_ = haveThreads;
56 
58  label comm = allocateCommunicator(-1, labelList(1, label(0)), false);
59  if (comm != UPstream::worldComm)
60  {
61  FatalErrorIn("UPstream::setParRun(const label)")
62  << "problem : comm:" << comm
63  << " UPstream::worldComm:" << UPstream::worldComm
65  }
66 
67  Pout.prefix() = "";
68  Perr.prefix() = "";
69  }
70  else
71  {
72  parRun_ = true;
73  haveThreads_ = haveThreads;
74 
75  // Redo worldComm communicator (this has been created at static
76  // initialisation time)
78  label comm = allocateCommunicator(-1, identityMap(nProcs), true);
79  if (comm != UPstream::worldComm)
80  {
82  << "problem : comm:" << comm
83  << " UPstream::worldComm:" << UPstream::worldComm
85  }
86 
87  Pout.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
88  Perr.prefix() = '[' + name(myProcNo(Pstream::worldComm)) + "] ";
89  }
90 }
91 
92 
93 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcLinearComm
94 (
95  const label nProcs
96 )
97 {
98  List<commsStruct> linearCommunication(nProcs);
99 
100  // Master
101  labelList belowIDs(nProcs - 1);
102  forAll(belowIDs, i)
103  {
104  belowIDs[i] = i + 1;
105  }
106 
107  linearCommunication[0] = commsStruct
108  (
109  nProcs,
110  0,
111  -1,
112  belowIDs,
113  labelList(0)
114  );
115 
116  // Slaves. Have no below processors, only communicate up to master
117  for (label procID = 1; procID < nProcs; procID++)
118  {
119  linearCommunication[procID] = commsStruct
120  (
121  nProcs,
122  procID,
123  0,
124  labelList(0),
125  labelList(0)
126  );
127  }
128  return linearCommunication;
129 }
130 
131 
132 void Foam::UPstream::collectReceives
133 (
134  const label procID,
135  const List<DynamicList<label>>& receives,
136  DynamicList<label>& allReceives
137 )
138 {
139  // Append my children (and my children children etc.) to allReceives.
140 
141  const DynamicList<label>& myChildren = receives[procID];
142 
143  forAll(myChildren, childI)
144  {
145  allReceives.append(myChildren[childI]);
146  collectReceives(myChildren[childI], receives, allReceives);
147  }
148 }
149 
150 
151 Foam::List<Foam::UPstream::commsStruct> Foam::UPstream::calcTreeComm
152 (
153  label nProcs
154 )
155 {
156  // Tree like schedule. For 8 procs:
157  // (level 0)
158  // 0 receives from 1
159  // 2 receives from 3
160  // 4 receives from 5
161  // 6 receives from 7
162  // (level 1)
163  // 0 receives from 2
164  // 4 receives from 6
165  // (level 2)
166  // 0 receives from 4
167  //
168  // The sends/receives for all levels are collected per processor
169  // (one send per processor; multiple receives possible) creating a table:
170  //
171  // So per processor:
172  // proc receives from sends to
173  // ---- ------------- --------
174  // 0 1,2,4 -
175  // 1 - 0
176  // 2 3 0
177  // 3 - 2
178  // 4 5 0
179  // 5 - 4
180  // 6 7 4
181  // 7 - 6
182 
183  label nLevels = 1;
184  while ((1 << nLevels) < nProcs)
185  {
186  nLevels++;
187  }
188 
189  List<DynamicList<label>> receives(nProcs);
190  labelList sends(nProcs, -1);
191 
192  // Info<< "Using " << nLevels << " communication levels" << endl;
193 
194  label offset = 2;
195  label childOffset = offset/2;
196 
197  for (label level = 0; level < nLevels; level++)
198  {
199  label receiveID = 0;
200  while (receiveID < nProcs)
201  {
202  // Determine processor that sends and we receive from
203  label sendID = receiveID + childOffset;
204 
205  if (sendID < nProcs)
206  {
207  receives[receiveID].append(sendID);
208  sends[sendID] = receiveID;
209  }
210 
211  receiveID += offset;
212  }
213 
214  offset <<= 1;
215  childOffset <<= 1;
216  }
217 
218  // For all processors find the processors it receives data from
219  // (and the processors they receive data from etc.)
220  List<DynamicList<label>> allReceives(nProcs);
221  for (label procID = 0; procID < nProcs; procID++)
222  {
223  collectReceives(procID, receives, allReceives[procID]);
224  }
225 
226 
227  List<commsStruct> treeCommunication(nProcs);
228 
229  for (label procID = 0; procID < nProcs; procID++)
230  {
231  treeCommunication[procID] = commsStruct
232  (
233  nProcs,
234  procID,
235  sends[procID],
236  receives[procID].shrink(),
237  allReceives[procID].shrink()
238  );
239  }
240  return treeCommunication;
241 }
242 
243 
245 (
246  const label parentIndex,
247  const labelList& subRanks,
248  const bool doPstream
249 )
250 {
251  label index;
252  if (!freeComms_.empty())
253  {
254  index = freeComms_.pop();
255  }
256  else
257  {
258  // Extend storage
259  index = parentCommunicator_.size();
260 
261  myProcNo_.append(-1);
262  procIndices_.append(List<int>(0));
263  parentCommunicator_.append(-1);
264  linearCommunication_.append(List<commsStruct>(0));
265  treeCommunication_.append(List<commsStruct>(0));
266  }
267 
268  if (debug)
269  {
270  Pout<< "Communicators : Allocating communicator " << index << endl
271  << " parent : " << parentIndex << endl
272  << " procs : " << subRanks << endl
273  << endl;
274  }
275 
276  // Initialise; overwritten by allocatePstreamCommunicator
277  myProcNo_[index] = 0;
278 
279  // Convert from label to int
280  procIndices_[index].setSize(subRanks.size());
281  forAll(procIndices_[index], i)
282  {
283  procIndices_[index][i] = subRanks[i];
284 
285  // Enforce incremental order (so index is rank in next communicator)
286  if (i >= 1 && subRanks[i] <= subRanks[i-1])
287  {
289  << "subranks not sorted : " << subRanks
290  << " when allocating subcommunicator from parent "
291  << parentIndex
293  }
294  }
295  parentCommunicator_[index] = parentIndex;
296 
297  linearCommunication_[index] = calcLinearComm(procIndices_[index].size());
298  treeCommunication_[index] = calcTreeComm(procIndices_[index].size());
299 
300 
301  if (doPstream && parRun())
302  {
303  allocatePstreamCommunicator(parentIndex, index);
304  }
305 
306  return index;
307 }
308 
309 
311 (
312  const label communicator,
313  const bool doPstream
314 )
315 {
316  if (debug)
317  {
318  Pout<< "Communicators : Freeing communicator " << communicator << endl
319  << " parent : " << parentCommunicator_[communicator] << endl
320  << " myProcNo : " << myProcNo_[communicator] << endl
321  << endl;
322  }
323 
324  if (doPstream && parRun())
325  {
326  freePstreamCommunicator(communicator);
327  }
328  myProcNo_[communicator] = -1;
329  // procIndices_[communicator].clear();
330  parentCommunicator_[communicator] = -1;
331  linearCommunication_[communicator].clear();
332  treeCommunication_[communicator].clear();
333 
334  freeComms_.push(communicator);
335 }
336 
337 
338 void Foam::UPstream::freeCommunicators(const bool doPstream)
339 {
340  forAll(myProcNo_, communicator)
341  {
342  if (myProcNo_[communicator] != -1)
343  {
344  freeCommunicator(communicator, doPstream);
345  }
346  }
347 }
348 
349 
350 int Foam::UPstream::baseProcNo(const label myComm, const int myProcID)
351 {
352  int procID = myProcID;
353  label comm = myComm;
354 
355  while (parent(comm) != -1)
356  {
357  const List<int>& parentRanks = UPstream::procID(comm);
358  procID = parentRanks[procID];
359  comm = UPstream::parent(comm);
360  }
361 
362  return procID;
363 }
364 
365 
366 Foam::label Foam::UPstream::procNo(const label myComm, const int baseProcID)
367 {
368  const List<int>& parentRanks = procID(myComm);
369  label parentComm = parent(myComm);
370 
371  if (parentComm == -1)
372  {
373  return findIndex(parentRanks, baseProcID);
374  }
375  else
376  {
377  label parentRank = procNo(parentComm, baseProcID);
378  return findIndex(parentRanks, parentRank);
379  }
380 }
381 
382 
384 (
385  const label myComm,
386  const label currentComm,
387  const int currentProcID
388 )
389 {
390  label physProcID = UPstream::baseProcNo(currentComm, currentProcID);
391  return procNo(myComm, physProcID);
392 }
393 
394 
395 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
396 
397 bool Foam::UPstream::parRun_(false);
398 
399 bool Foam::UPstream::haveThreads_(false);
400 
401 Foam::LIFOStack<Foam::label> Foam::UPstream::freeComms_;
402 
403 Foam::DynamicList<int> Foam::UPstream::myProcNo_(10);
404 
405 Foam::DynamicList<Foam::List<int>> Foam::UPstream::procIndices_(10);
406 
407 Foam::DynamicList<Foam::label> Foam::UPstream::parentCommunicator_(10);
408 
409 int Foam::UPstream::msgType_(1);
410 
411 
413 Foam::UPstream::linearCommunication_(10);
414 
416 Foam::UPstream::treeCommunication_(10);
417 
418 
419 // Allocate a serial communicator. This gets overwritten in parallel mode
420 // (by UPstream::setParRun())
422 (
423  -1,
425  false
426 );
427 
428 
430 (
431  Foam::debug::optimisationSwitch("floatTransfer", 0)
432 );
433 
435 (
436  Foam::debug::optimisationSwitch("nProcsSimpleSum", 16)
437 );
438 
440 (
442  (
443  "commsType",
444  commsTypeNames,
445  defaultCommsType
446  )
447 );
448 
450 
452 
454 (
455  Foam::debug::optimisationSwitch("nPollProcInterfaces", 0)
456 );
457 
458 
459 // ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Foam::UPstream::communicator serialComm(-1, Foam::labelList(1, Foam::label(0)), false)
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:433
A LIFO stack based on a singly-linked list.
Definition: LIFOStack.H:54
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: List.H:91
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Initialise the NamedEnum HashTable from the static list of names.
Definition: NamedEnum.H:55
Helper class for allocating/freeing communicators.
Definition: UPstream.H:315
Inter-processor communications stream.
Definition: UPstream.H:59
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:405
commsTypes
Types of communications.
Definition: UPstream.H:65
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:338
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:311
static bool floatTransfer
Should compact transfer be used in which floats replace doubles.
Definition: UPstream.H:265
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static label parent(const label communicator)
Definition: UPstream.H:434
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear.
Definition: UPstream.H:269
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:275
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:245
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor.
Definition: UPstream.C:366
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:272
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in.
Definition: UPstream.C:350
const string & prefix() const
Return the prefix of the stream.
#define FatalErrorIn(functionName)
Report an error message using Foam::FatalError.
Definition: error.H:329
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:334
int optimisationSwitch(const char *name, const int defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:249
Enum namedEnumOptimisationSwitch(const char *name, const NamedEnum< Enum, nEnum > &enumNames, const Enum defaultValue)
Lookup optimisation switch or add default value.
Definition: debug.H:94
Namespace for OpenFOAM.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
prefixOSstream Perr(cerr, "Perr")
Definition: IOstreams.H:54
List< label > labelList
A List of labels.
Definition: labelList.H:56
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:258
errorManip< error > abort(error &err)
Definition: errorManip.H:131
defineTypeNameAndDebug(combustionModel, 0)
word name(const LagrangianState state)
Return a string representation of a Lagrangian state enumeration.
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index,.
error FatalError
labelList identityMap(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
void offset(label &lst, const label o)