UPstream.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2011-2019 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Class
25  Foam::UPstream
26 
27 Description
28  Inter-processor communications stream
29 
30 SourceFiles
31  UPstream.C
32  UPstreamCommsStruct.C
33  gatherScatter.C
34  combineGatherScatter.C
35  gatherScatterList.C
36 
37 \*---------------------------------------------------------------------------*/
38 
39 #ifndef UPstream_H
40 #define UPstream_H
41 
42 #include "labelList.H"
43 #include "DynamicList.H"
44 #include "HashTable.H"
45 #include "string.H"
46 #include "NamedEnum.H"
47 #include "ListOps.H"
48 #include "LIFOStack.H"
49 
50 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
51 
52 namespace Foam
53 {
54 
55 /*---------------------------------------------------------------------------*\
56  Class UPstream Declaration
57 \*---------------------------------------------------------------------------*/
58 
59 class UPstream
60 {
61 
62 public:
63 
64  //- Types of communications
65  enum class commsTypes
66  {
67  blocking,
68  scheduled,
69  nonBlocking
70  };
71 
73 
74  // Public classes
75 
76  //- Structure for communicating between processors
77  class commsStruct
78  {
79  // Private Data
80 
81  //- procID of above processor
82  label above_;
83 
84  //- procIDs of processors directly below me
85  labelList below_;
86 
87  //- procIDs of all processors below (so not just directly below)
88  labelList allBelow_;
89 
90  //- procIDs of all processors not below. (inverse set of
91  // allBelow_ and minus myProcNo)
92  labelList allNotBelow_;
93 
94 
95  public:
96 
97  // Constructors
98 
99  //- Construct null
100  commsStruct();
101 
102  //- Construct from components
104  (
105  const label,
106  const labelList&,
107  const labelList&,
108  const labelList&
109  );
110 
111  //- Construct from components; construct allNotBelow_
113  (
114  const label nProcs,
115  const label myProcID,
116  const label,
117  const labelList&,
118  const labelList&
119  );
120 
121 
122  // Member Functions
123 
124  // Access
126  label above() const
127  {
128  return above_;
129  }
131  const labelList& below() const
132  {
133  return below_;
134  }
136  const labelList& allBelow() const
137  {
138  return allBelow_;
139  }
141  const labelList& allNotBelow() const
142  {
143  return allNotBelow_;
144  }
145 
146 
147  // Member Operators
148 
149  bool operator==(const commsStruct&) const;
150 
151  bool operator!=(const commsStruct&) const;
152 
153 
154  // Ostream Operator
155 
156  friend Ostream& operator<<(Ostream&, const commsStruct&);
157  };
158 
159 
160  //- combineReduce operator for lists. Used for counting.
161  class listEq
162  {
163 
164  public:
165 
166  template<class T>
167  void operator()(T& x, const T& y) const
168  {
169  forAll(y, i)
170  {
171  if (y[i].size())
172  {
173  x[i] = y[i];
174  }
175  }
176  }
177  };
178 
179 
180 private:
181 
182  // Private Data
183 
184  //- By default this is not a parallel run
185  static bool parRun_;
186 
187  //- Have support for threads?
188  static bool haveThreads_;
189 
190  //- Standard transfer message type
191  static int msgType_;
192 
193  // Communicator specific data
194 
195  //- Free communicators
196  static LIFOStack<label> freeComms_;
197 
198  //- My processor number
199  static DynamicList<int> myProcNo_;
200 
201  //- List of process IDs
202  static DynamicList<List<int>> procIDs_;
203 
204  //- Parent communicator
205  static DynamicList<label> parentCommunicator_;
206 
207  //- Linear communication schedule
208  static DynamicList<List<commsStruct>> linearCommunication_;
209 
210  //- Multi level communication schedule
211  static DynamicList<List<commsStruct>> treeCommunication_;
212 
213 
214  // Private Member Functions
215 
216  //- Set data for parallel running
217  static void setParRun(const label nProcs, const bool haveThreads);
218 
219  //- Calculate linear communication schedule
220  static List<commsStruct> calcLinearComm(const label nProcs);
221 
222  //- Calculate tree communication schedule
223  static List<commsStruct> calcTreeComm(const label nProcs);
224 
225  //- Helper function for tree communication schedule determination
226  // Collects all processorIDs below a processor
227  static void collectReceives
228  (
229  const label procID,
230  const List<DynamicList<label>>& receives,
231  DynamicList<label>& allReceives
232  );
233 
234  //- Allocate a communicator with index
235  static void allocatePstreamCommunicator
236  (
237  const label parentIndex,
238  const label index
239  );
240 
241  //- Free a communicator
242  static void freePstreamCommunicator
243  (
244  const label index
245  );
246 
247 
248 protected:
249 
250  // Protected data
251 
252  //- Communications type of this stream
254 
255 public:
256 
257  // Declare name of the class and its debug switch
258  ClassName("UPstream");
259 
260 
261  // Static data
262 
263  //- Should compact transfer be used in which floats replace doubles
264  // reducing the bandwidth requirement at the expense of some loss
265  // in accuracy
266  static bool floatTransfer;
267 
268  //- Number of processors at which the sum algorithm changes from linear
269  // to tree
270  static int nProcsSimpleSum;
271 
272  //- Default commsType
274 
275  //- Number of polling cycles in processor updates
276  static int nPollProcInterfaces;
277 
278  //- Default communicator (all processors)
279  static label worldComm;
280 
281  //- Debugging: warn for use of any communicator differing from warnComm
282  static label warnComm;
283 
284 
285  // Constructors
286 
287  //- Construct given optional buffer size
289  :
290  commsType_(commsType)
291  {}
292 
293 
294  // Member Functions
295 
296  //- Allocate a new communicator
298  (
299  const label parent,
300  const labelList& subRanks,
301  const bool doPstream = true
302  );
303 
304  //- Free a previously allocated communicator
305  static void freeCommunicator
306  (
307  const label communicator,
308  const bool doPstream = true
309  );
310 
311  //- Free all communicators
312  static void freeCommunicators(const bool doPstream);
313 
314  //- Helper class for allocating/freeing communicators
315  class communicator
316  {
317  label comm_;
318 
319  public:
320 
321  communicator
322  (
323  const label parent,
324  const labelList& subRanks,
325  const bool doPstream
326  )
327  :
328  comm_(allocateCommunicator(parent, subRanks, doPstream))
329  {}
330 
331  //- Disallow default bitwise copy construction
332  communicator(const communicator&) = delete;
334  ~communicator()
335  {
336  freeCommunicator(comm_);
337  }
339  operator label() const
340  {
341  return comm_;
342  }
343 
344  //- Disallow default bitwise assignment
345  void operator=(const communicator&) = delete;
346  };
347 
348  //- Return physical processor number (i.e. processor number in
349  // worldComm) given communicator and processor
350  static int baseProcNo(const label myComm, const int procID);
351 
352  //- Return processor number in communicator (given physical processor
353  // number) (= reverse of baseProcNo)
354  static label procNo(const label comm, const int baseProcID);
355 
356  //- Return processor number in communicator (given processor number
357  // and communicator)
358  static label procNo
359  (
360  const label myComm,
361  const label currentComm,
362  const int currentProcID
363  );
364 
365  //- Add the valid option this type of communications library
366  // adds/requires on the command line
367  static void addValidParOptions(HashTable<string>& validParOptions);
368 
369  //- Initialisation function called from main
370  // Spawns slave processes and initialises inter-communication
371  static bool init(int& argc, char**& argv, const bool needsThread);
372 
373  // Non-blocking comms
374 
375  //- Get number of outstanding requests
376  static label nRequests();
377 
378  //- Truncate number of outstanding requests
379  static void resetRequests(const label sz);
380 
381  //- Wait until all requests (from start onwards) have finished.
382  static void waitRequests(const label start = 0);
383 
384  //- Wait until request i has finished.
385  static void waitRequest(const label i);
386 
387  //- Non-blocking comms: has request i finished?
388  static bool finishedRequest(const label i);
389 
390  static int allocateTag(const char*);
391 
392  static int allocateTag(const word&);
393 
394  static void freeTag(const char*, const int tag);
395 
396  static void freeTag(const word&, const int tag);
397 
398 
399  //- Is this a parallel run?
400  static bool& parRun()
401  {
402  return parRun_;
403  }
404 
405  //- Have support for threads
406  static bool haveThreads()
407  {
408  return haveThreads_;
409  }
410 
411  //- Number of processes in parallel run
412  static label nProcs(const label communicator = 0)
413  {
414  return procIDs_[communicator].size();
415  }
416 
417  //- Process index of the master
418  static int masterNo()
419  {
420  return 0;
421  }
422 
423  //- Am I the master process
424  static bool master(const label communicator = 0)
425  {
426  return myProcNo_[communicator] == masterNo();
427  }
428 
429  //- Number of this process (starting from masterNo() = 0)
430  static int myProcNo(const label communicator = 0)
431  {
432  return myProcNo_[communicator];
433  }
435  static label parent(const label communicator)
436  {
437  return parentCommunicator_(communicator);
438  }
439 
440  //- Process ID of given process index
441  static List<int>& procID(label communicator)
442  {
443  return procIDs_[communicator];
444  }
445 
446  //- Process index of first slave
447  static int firstSlave()
448  {
449  return 1;
450  }
451 
452  //- Process index of last slave
453  static int lastSlave(const label communicator = 0)
454  {
455  return nProcs(communicator) - 1;
456  }
457 
458  //- Communication schedule for linear all-to-master (proc 0)
460  (
461  const label communicator = 0
462  )
463  {
464  return linearCommunication_[communicator];
465  }
466 
467  //- Communication schedule for tree all-to-master (proc 0)
469  (
470  const label communicator = 0
471  )
472  {
473  return treeCommunication_[communicator];
474  }
475 
476  //- Message tag of standard messages
477  static int& msgType()
478  {
479  return msgType_;
480  }
481 
482 
483  //- Get the communications type of the stream
484  commsTypes commsType() const
485  {
486  return commsType_;
487  }
488 
489  //- Set the communications type of the stream
491  {
492  commsTypes oldCommsType = commsType_;
493  commsType_ = ct;
494  return oldCommsType;
495  }
496 
497 
498  //- Exit program
499  static void exit(int errnum = 1);
500 
501  //- Abort program
502  static void abort();
503 
504  //- Exchange label with all processors (in the communicator).
505  // sendData[proci] is the label to send to proci.
506  // After return recvData contains the data from the other processors.
507  static void allToAll
508  (
509  const labelUList& sendData,
510  labelUList& recvData,
511  const label communicator = 0
512  );
513 
514  //- Exchange data with all processors (in the communicator)
515  // sendSizes, sendOffsets give (per processor) the slice of
516  // sendData to send, similarly recvSizes, recvOffsets give the slice
517  // of recvData to receive
518  static void allToAll
519  (
520  const char* sendData,
521  const UList<int>& sendSizes,
522  const UList<int>& sendOffsets,
523 
524  char* recvData,
525  const UList<int>& recvSizes,
526  const UList<int>& recvOffsets,
527 
528  const label communicator = 0
529  );
530 
531  //- Receive data from all processors on the master
532  static void gather
533  (
534  const char* sendData,
535  int sendSize,
536 
537  char* recvData,
538  const UList<int>& recvSizes,
539  const UList<int>& recvOffsets,
540  const label communicator = 0
541  );
542 
543  //- Send data to all processors from the root of the communicator
544  static void scatter
545  (
546  const char* sendData,
547  const UList<int>& sendSizes,
548  const UList<int>& sendOffsets,
549 
550  char* recvData,
551  int recvSize,
552  const label communicator = 0
553  );
554 };
555 
556 
558 
559 
560 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
561 
562 } // End namespace Foam
563 
564 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
565 
566 #endif
567 
568 // ************************************************************************* //
static bool floatTransfer
Should compact transfer be used in which floats replace doubles.
Definition: UPstream.H:265
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:483
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
static int masterNo()
Process index of the master.
Definition: UPstream.H:417
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=0)
Send data to all processors from the root of the communicator.
Definition: UPstream.C:111
commsTypes
Types of communications.
Definition: UPstream.H:64
static int firstSlave()
Process index of first slave.
Definition: UPstream.H:446
commsTypes commsType_
Communications type of this stream.
Definition: UPstream.H:252
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear.
Definition: UPstream.H:269
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
Definition: UPstream.C:155
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor.
Definition: UPstream.C:369
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:137
static int & msgType()
Message tag of standard messages.
Definition: UPstream.H:476
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static const List< commsStruct > & linearCommunication(const label communicator=0)
Communication schedule for linear all-to-master (proc 0)
Definition: UPstream.H:459
ClassName("UPstream")
Various functions to operate on Lists.
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:341
scalar y
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:275
static label parent(const label communicator)
Definition: UPstream.H:434
static int allocateTag(const char *)
Definition: UPstream.C:826
static void freeTag(const char *, const int tag)
Definition: UPstream.C:884
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
Definition: UPstream.C:35
tmp< fvMatrix< Type > > operator==(const fvMatrix< Type > &, const fvMatrix< Type > &)
A class for handling words, derived from string.
Definition: word.H:59
static void resetRequests(const label sz)
Truncate number of outstanding requests.
Definition: UPstream.C:143
static const List< commsStruct > & treeCommunication(const label communicator=0)
Communication schedule for tree all-to-master (proc 0)
Definition: UPstream.H:468
combineReduce operator for lists. Used for counting.
Definition: UPstream.H:160
An STL-conforming hash table.
Definition: HashTable.H:61
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
Structure for communicating between processors.
Definition: UPstream.H:76
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:60
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:54
static void exit(int errnum=1)
Exit program.
Definition: UPstream.C:46
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:405
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in.
Definition: UPstream.C:353
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:147
static void abort()
Abort program.
Definition: UPstream.C:52
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:272
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:399
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=0)
Receive data from all processors on the master.
Definition: UPstream.C:96
UPstream(const commsTypes commsType)
Construct given optional buffer size.
Definition: UPstream.H:287
Ostream & operator<<(Ostream &, const ensightPart &)
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=0)
Exchange label with all processors (in the communicator).
Definition: UPstream.C:85
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:248
bool operator!=(const particle &, const particle &)
Definition: particle.C:1205
static void waitRequest(const label i)
Wait until request i has finished.
Definition: UPstream.C:151
Inter-processor communications stream.
Definition: UPstream.H:58
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid option this type of communications library.
Definition: UPstream.C:31
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:314
Namespace for OpenFOAM.
static int lastSlave(const label communicator=0)
Process index of last slave.
Definition: UPstream.H:452