UPstream.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
4    \\    /   O peration     | Website:  https://openfoam.org
5     \\  /    A nd           | Copyright (C) 2011-2018 OpenFOAM Foundation
6      \\/     M anipulation  |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Class
25  Foam::UPstream
26 
27 Description
28  Inter-processor communications stream
29 
30 SourceFiles
31  UPstream.C
32  UPstreamCommsStruct.C
33  gatherScatter.C
34  combineGatherScatter.C
35  gatherScatterList.C
36 
37 \*---------------------------------------------------------------------------*/
38 
39 #ifndef UPstream_H
40 #define UPstream_H
41 
42 #include "labelList.H"
43 #include "DynamicList.H"
44 #include "HashTable.H"
45 #include "string.H"
46 #include "NamedEnum.H"
47 #include "ListOps.H"
48 #include "LIFOStack.H"
49 
50 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
51 
52 namespace Foam
53 {
54 
55 /*---------------------------------------------------------------------------*\
56  Class UPstream Declaration
57 \*---------------------------------------------------------------------------*/
58 
59 class UPstream
60 {
61 
62 public:
63 
64  //- Types of communications
65  enum class commsTypes
66  {
67  blocking,
68  scheduled,
69  nonBlocking
70  };
71 
73 
74  // Public classes
75 
76  //- Structure for communicating between processors
77  class commsStruct
78  {
79  // Private data
80 
81  //- procID of above processor
82  label above_;
83 
84  //- procIDs of processors directly below me
85  labelList below_;
86 
87  //- procIDs of all processors below (so not just directly below)
88  labelList allBelow_;
89 
90  //- procIDs of all processors not below. (inverse set of
91  // allBelow_ and minus myProcNo)
92  labelList allNotBelow_;
93 
94 
95  public:
96 
97  // Constructors
98 
99  //- Construct null
100  commsStruct();
101 
102  //- Construct from components
104  (
105  const label,
106  const labelList&,
107  const labelList&,
108  const labelList&
109  );
110 
111  //- Construct from components; construct allNotBelow_
113  (
114  const label nProcs,
115  const label myProcID,
116  const label,
117  const labelList&,
118  const labelList&
119  );
120 
121 
122  // Member Functions
123 
124  // Access
126  label above() const
127  {
128  return above_;
129  }
131  const labelList& below() const
132  {
133  return below_;
134  }
136  const labelList& allBelow() const
137  {
138  return allBelow_;
139  }
141  const labelList& allNotBelow() const
142  {
143  return allNotBelow_;
144  }
145 
146 
147  // Member operators
148 
149  bool operator==(const commsStruct&) const;
150 
151  bool operator!=(const commsStruct&) const;
152 
153 
154  // Ostream Operator
155 
156  friend Ostream& operator<<(Ostream&, const commsStruct&);
157  };
158 
159 
160  //- combineReduce operator for lists. Used for counting.
161  class listEq
162  {
163 
164  public:
165 
166  template<class T>
167  void operator()(T& x, const T& y) const
168  {
169  forAll(y, i)
170  {
171  if (y[i].size())
172  {
173  x[i] = y[i];
174  }
175  }
176  }
177  };
178 
179 
180 private:
181 
182  // Private data
183 
184  //- By default this is not a parallel run
185  static bool parRun_;
186 
187  //- Have support for threads?
188  static bool haveThreads_;
189 
190  //- Standard transfer message type
191  static int msgType_;
192 
193  // Communicator specific data
194 
195  //- Free communicators
196  static LIFOStack<label> freeComms_;
197 
198  //- My processor number
199  static DynamicList<int> myProcNo_;
200 
201  //- List of process IDs
202  static DynamicList<List<int>> procIDs_;
203 
204  //- Parent communicator
205  static DynamicList<label> parentCommunicator_;
206 
207  //- Linear communication schedule
208  static DynamicList<List<commsStruct>> linearCommunication_;
209 
210  //- Multi level communication schedule
211  static DynamicList<List<commsStruct>> treeCommunication_;
212 
213 
214  // Private Member Functions
215 
216  //- Set data for parallel running
217  static void setParRun(const label nProcs, const bool haveThreads);
218 
219  //- Calculate linear communication schedule
220  static List<commsStruct> calcLinearComm(const label nProcs);
221 
222  //- Calculate tree communication schedule
223  static List<commsStruct> calcTreeComm(const label nProcs);
224 
225  //- Helper function for tree communication schedule determination
226  // Collects all processorIDs below a processor
227  static void collectReceives
228  (
229  const label procID,
230  const List<DynamicList<label>>& receives,
231  DynamicList<label>& allReceives
232  );
233 
234  //- Allocate a communicator with index
235  static void allocatePstreamCommunicator
236  (
237  const label parentIndex,
238  const label index
239  );
240 
241  //- Free a communicator
242  static void freePstreamCommunicator
243  (
244  const label index
245  );
246 
247 
248 protected:
249 
250  // Protected data
251 
252  //- Communications type of this stream
254 
255 public:
256 
257  // Declare name of the class and its debug switch
258  ClassName("UPstream");
259 
260 
261  // Static data
262 
263  //- Should compact transfer be used in which floats replace doubles
264  // reducing the bandwidth requirement at the expense of some loss
265  // in accuracy
266  static bool floatTransfer;
267 
268  //- Number of processors at which the sum algorithm changes from linear
269  // to tree
270  static int nProcsSimpleSum;
271 
272  //- Default commsType
274 
275  //- Number of polling cycles in processor updates
276  static int nPollProcInterfaces;
277 
278  //- Default communicator (all processors)
279  static label worldComm;
280 
281  //- Debugging: warn for use of any communicator differing from warnComm
282  static label warnComm;
283 
284 
285  // Constructors
286 
287  //- Construct given optional buffer size
289  :
290  commsType_(commsType)
291  {}
292 
293 
294  // Member functions
295 
296  //- Allocate a new communicator
298  (
299  const label parent,
300  const labelList& subRanks,
301  const bool doPstream = true
302  );
303 
304  //- Free a previously allocated communicator
305  static void freeCommunicator
306  (
307  const label communicator,
308  const bool doPstream = true
309  );
310 
311  //- Free all communicators
312  static void freeCommunicators(const bool doPstream);
313 
314  //- Helper class for allocating/freeing communicators
315  class communicator
316  {
317  label comm_;
318 
319  //- Disallow copy and assignment
320  communicator(const communicator&);
321  void operator=(const communicator&);
322 
323  public:
324 
325  communicator
326  (
327  const label parent,
328  const labelList& subRanks,
329  const bool doPstream
330  )
331  :
332  comm_(allocateCommunicator(parent, subRanks, doPstream))
333  {}
335  ~communicator()
336  {
337  freeCommunicator(comm_);
338  }
340  operator label() const
341  {
342  return comm_;
343  }
344  };
345 
346  //- Return physical processor number (i.e. processor number in
347  // worldComm) given communicator and procssor
348  static int baseProcNo(const label myComm, const int procID);
349 
350  //- Return processor number in communicator (given physical processor
351  // number) (= reverse of baseProcNo)
352  static label procNo(const label comm, const int baseProcID);
353 
354  //- Return processor number in communicator (given processor number
355  // and communicator)
356  static label procNo
357  (
358  const label myComm,
359  const label currentComm,
360  const int currentProcID
361  );
362 
363  //- Add the valid option this type of communications library
364  // adds/requires on the command line
365  static void addValidParOptions(HashTable<string>& validParOptions);
366 
367  //- Initialisation function called from main
368  // Spawns slave processes and initialises inter-communication
369  static bool init(int& argc, char**& argv, const bool needsThread);
370 
371  // Non-blocking comms
372 
373  //- Get number of outstanding requests
374  static label nRequests();
375 
376  //- Truncate number of outstanding requests
377  static void resetRequests(const label sz);
378 
379  //- Wait until all requests (from start onwards) have finished.
380  static void waitRequests(const label start = 0);
381 
382  //- Wait until request i has finished.
383  static void waitRequest(const label i);
384 
385  //- Non-blocking comms: has request i finished?
386  static bool finishedRequest(const label i);
387 
388  static int allocateTag(const char*);
389 
390  static int allocateTag(const word&);
391 
392  static void freeTag(const char*, const int tag);
393 
394  static void freeTag(const word&, const int tag);
395 
396 
397  //- Is this a parallel run?
398  static bool& parRun()
399  {
400  return parRun_;
401  }
402 
403  //- Have support for threads
404  static bool haveThreads()
405  {
406  return haveThreads_;
407  }
408 
409  //- Number of processes in parallel run
410  static label nProcs(const label communicator = 0)
411  {
412  return procIDs_[communicator].size();
413  }
414 
415  //- Process index of the master
416  static int masterNo()
417  {
418  return 0;
419  }
420 
421  //- Am I the master process
422  static bool master(const label communicator = 0)
423  {
424  return myProcNo_[communicator] == masterNo();
425  }
426 
427  //- Number of this process (starting from masterNo() = 0)
428  static int myProcNo(const label communicator = 0)
429  {
430  return myProcNo_[communicator];
431  }
433  static label parent(const label communicator)
434  {
435  return parentCommunicator_(communicator);
436  }
437 
438  //- Process ID of given process index
439  static List<int>& procID(label communicator)
440  {
441  return procIDs_[communicator];
442  }
443 
444  //- Process index of first slave
445  static int firstSlave()
446  {
447  return 1;
448  }
449 
450  //- Process index of last slave
451  static int lastSlave(const label communicator = 0)
452  {
453  return nProcs(communicator) - 1;
454  }
455 
456  //- Communication schedule for linear all-to-master (proc 0)
458  (
459  const label communicator = 0
460  )
461  {
462  return linearCommunication_[communicator];
463  }
464 
465  //- Communication schedule for tree all-to-master (proc 0)
467  (
468  const label communicator = 0
469  )
470  {
471  return treeCommunication_[communicator];
472  }
473 
474  //- Message tag of standard messages
475  static int& msgType()
476  {
477  return msgType_;
478  }
479 
480 
481  //- Get the communications type of the stream
482  commsTypes commsType() const
483  {
484  return commsType_;
485  }
486 
487  //- Set the communications type of the stream
489  {
490  commsTypes oldCommsType = commsType_;
491  commsType_ = ct;
492  return oldCommsType;
493  }
494 
495 
496  //- Exit program
497  static void exit(int errnum = 1);
498 
499  //- Abort program
500  static void abort();
501 
502  //- Exchange label with all processors (in the communicator).
503  // sendData[proci] is the label to send to proci.
504  // After return recvData contains the data from the other processors.
505  static void allToAll
506  (
507  const labelUList& sendData,
508  labelUList& recvData,
509  const label communicator = 0
510  );
511 
512  //- Exchange data with all processors (in the communicator)
513  // sendSizes, sendOffsets give (per processor) the slice of
514  // sendData to send, similarly recvSizes, recvOffsets give the slice
515  // of recvData to receive
516  static void allToAll
517  (
518  const char* sendData,
519  const UList<int>& sendSizes,
520  const UList<int>& sendOffsets,
521 
522  char* recvData,
523  const UList<int>& recvSizes,
524  const UList<int>& recvOffsets,
525 
526  const label communicator = 0
527  );
528 
529  //- Receive data from all processors on the master
530  static void gather
531  (
532  const char* sendData,
533  int sendSize,
534 
535  char* recvData,
536  const UList<int>& recvSizes,
537  const UList<int>& recvOffsets,
538  const label communicator = 0
539  );
540 
541  //- Send data to all processors from the root of the communicator
542  static void scatter
543  (
544  const char* sendData,
545  const UList<int>& sendSizes,
546  const UList<int>& sendOffsets,
547 
548  char* recvData,
549  int recvSize,
550  const label communicator = 0
551  );
552 };
553 
554 
556 
557 
558 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
559 
560 } // End namespace Foam
561 
562 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
563 
564 #endif
565 
566 // ************************************************************************* //
static bool floatTransfer
Should compact transfer be used in which floats replace doubles, reducing the bandwidth requirement at the expense of some loss in accuracy.
Definition: UPstream.H:265
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:481
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:428
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
static int masterNo()
Process index of the master.
Definition: UPstream.H:415
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=0)
Send data to all processors from the root of the communicator.
Definition: UPstream.C:111
commsTypes
Types of communications.
Definition: UPstream.H:64
static int firstSlave()
Process index of first slave.
Definition: UPstream.H:444
commsTypes commsType_
Communications type of this stream.
Definition: UPstream.H:252
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:163
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:427
static int nProcsSimpleSum
Number of processors at which the sum algorithm changes from linear to tree.
Definition: UPstream.H:269
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:421
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
Definition: UPstream.C:155
static label procNo(const label comm, const int baseProcID)
Return processor number in communicator (given physical processor number) (= reverse of baseProcNo).
Definition: UPstream.C:374
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:137
static int & msgType()
Message tag of standard messages.
Definition: UPstream.H:474
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static const List< commsStruct > & linearCommunication(const label communicator=0)
Communication schedule for linear all-to-master (proc 0)
Definition: UPstream.H:457
ClassName("UPstream")
Various functions to operate on Lists.
static void freeCommunicators(const bool doPstream)
Free all communicators.
Definition: UPstream.C:346
scalar y
static int nPollProcInterfaces
Number of polling cycles in processor updates.
Definition: UPstream.H:275
static label parent(const label communicator)
Definition: UPstream.H:432
static int allocateTag(const char *)
Definition: UPstream.C:826
static void freeTag(const char *, const int tag)
Definition: UPstream.C:884
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
Definition: UPstream.C:35
tmp< fvMatrix< Type > > operator==(const fvMatrix< Type > &, const fvMatrix< Type > &)
A class for handling words, derived from string.
Definition: word.H:59
static void resetRequests(const label sz)
Truncate number of outstanding requests.
Definition: UPstream.C:143
static const List< commsStruct > & treeCommunication(const label communicator=0)
Communication schedule for tree all-to-master (proc 0)
Definition: UPstream.H:466
combineReduce operator for lists. Used for counting.
Definition: UPstream.H:160
An STL-conforming hash table.
Definition: HashTable.H:62
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
Structure for communicating between processors.
Definition: UPstream.H:76
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:61
An Ostream is an abstract base class for all output systems (streams, files, token lists...
Definition: Ostream.H:53
static void exit(int errnum=1)
Exit program.
Definition: UPstream.C:46
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:403
static int baseProcNo(const label myComm, const int procID)
Return physical processor number (i.e. processor number in worldComm) given communicator and processor.
Definition: UPstream.C:358
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
void T(FieldField< Field, Type > &f1, const FieldField< Field, Type > &f2)
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:147
static void abort()
Abort program.
Definition: UPstream.C:52
static commsTypes defaultCommsType
Default commsType.
Definition: UPstream.H:272
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:397
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:409
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=0)
Receive data from all processors on the master.
Definition: UPstream.C:96
UPstream(const commsTypes commsType)
Construct given the communications type of the stream.
Definition: UPstream.H:287
Ostream & operator<<(Ostream &, const ensightPart &)
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=0)
Exchange label with all processors (in the communicator).
Definition: UPstream.C:85
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:253
bool operator!=(const particle &, const particle &)
Definition: particle.C:1150
static void waitRequest(const label i)
Wait until request i has finished.
Definition: UPstream.C:151
Inter-processor communications stream.
Definition: UPstream.H:58
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:438
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid option this type of communications library adds/requires on the command line.
Definition: UPstream.C:31
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:319
Namespace for OpenFOAM.
static int lastSlave(const label communicator=0)
Process index of last slave.
Definition: UPstream.H:450