UPstream.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "UPstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include "SubList.H"
#include "allReduce.H"

#include <mpi.h>

#include <cstring>
#include <cstdlib>
#include <csignal>

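// Map OpenFOAM's compile-time scalar precision onto the matching MPI datatype
// (WM_SP: single precision, WM_DP: double precision)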
#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE
#endif

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

// NOTE:
// valid parallel options vary between implementations, but flag common ones.
// if they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("machinefile", "machine file");
}


bool Foam::UPstream::init(int& argc, char**& argv)
{
    //MPI_Init(&argc, &argv);
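    // Request full thread support; the level actually granted by the MPI
    // library is checked against MPI_THREAD_MULTIPLE further below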
    int provided_thread_support;
    MPI_Init_thread
    (
        &argc,
        &argv,
        MPI_THREAD_MULTIPLE,
        &provided_thread_support
    );

    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    int myRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (debug)
    {
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myRank:" << myRank << endl;
    }

    if (numprocs <= 1)
    {
        FatalErrorInFunction
            << "bool IPstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }


    // Initialise parallel structure
    setParRun(numprocs);

    if (Pstream::master() && provided_thread_support != MPI_THREAD_MULTIPLE)
    {
        WarningInFunction
            << "mpi does not seem to have thread support."
            << " There might be issues with e.g. threaded IO"
            << endl;
    }


    #ifndef SGIMPI
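    // Attach a user buffer of MPI_BUFFER_SIZE bytes for MPI buffered-mode
    // sends (MPI_Bsend); it is detached and freed again in exit()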
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());

        if (bufferSize)
        {
            MPI_Buffer_attach(new char[bufferSize], bufferSize);
        }
    }
    else
    {
        FatalErrorInFunction
            << "UPstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
            << Foam::abort(FatalError);
    }
    #endif

    //int processorNameLen;
    //char processorName[MPI_MAX_PROCESSOR_NAME];
    //
    //MPI_Get_processor_name(processorName, &processorNameLen);
    //processorName[processorNameLen] = '\0';
    //Pout<< "Processor name:" << processorName << endl;

    return true;
}


void Foam::UPstream::exit(int errnum)
{
    if (debug)
    {
        Pout<< "UPstream::exit." << endl;
    }

    #ifndef SGIMPI
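    // Detach and free the buffered-send buffer attached in init()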
    int size;
    char* buff;
    MPI_Buffer_detach(&buff, &size);
    delete[] buff;
    #endif

    if (PstreamGlobals::outstandingRequests_.size())
    {
        label n = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.clear();

        WarningInFunction
            << "There are still " << n << " outstanding MPI_Requests." << endl
            << "This means that your code exited before doing a"
            << " UPstream::waitRequests()." << endl
            << "This should not happen for a normal code exit."
            << endl;
    }

    // Clean mpi communicators
    forAll(myProcNo_, communicator)
    {
        if (myProcNo_[communicator] != -1)
        {
            freePstreamCommunicator(communicator);
        }
    }

    if (errnum == 0)
    {
        MPI_Finalize();
        ::exit(errnum);
    }
    else
    {
        MPI_Abort(MPI_COMM_WORLD, errnum);
    }
}


void Foam::UPstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}


void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}


void Foam::reduce
(
    scalar& Value,
    const minOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
}


void Foam::reduce
(
    vector2D& Value,
    const sumOp<vector2D>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}


void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
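    // Pack the value and the count into a single vector2D so that both are
    // combined in one reduction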
    vector2D twoScalars(Value, scalar(Count));
    reduce(twoScalars, sumOp<vector2D>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
}


void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator,
    label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
    // Assume mpich2 with non-blocking collectives extensions. Once mpi3
    // is available this will change.
    MPI_Request request;
    scalar v = Value;
    MPIX_Ireduce
    (
        &v,
        &Value,
        1,
        MPI_SCALAR,
        MPI_SUM,
        0,              //root
        PstreamGlobals::MPICommunicators_[communicator],
        &request
    );

    requestID = PstreamGlobals::outstandingRequests_.size();
    PstreamGlobals::outstandingRequests_.append(request);

    if (UPstream::debug)
    {
        Pout<< "UPstream::allocateRequest for non-blocking reduce"
            << " : request:" << requestID
            << endl;
    }
#else
    // Non-blocking not yet implemented in mpi
    reduce(Value, bop, tag, communicator);
    requestID = -1;
#endif
}


void Foam::UPstream::allToAll
(
    const labelUList& sendData,
    labelUList& recvData,
    const label communicator
)
{
    label np = nProcs(communicator);

    if (sendData.size() != np || recvData.size() != np)
    {
        FatalErrorInFunction
            << "Size of sendData " << sendData.size()
            << " or size of recvData " << recvData.size()
            << " is not equal to the number of processors in the domain "
            << np
            << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        recvData.deepCopy(sendData);
    }
    else
    {
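        // Every rank sends one label to every other rank; the labels are
        // transferred as raw bytes so the call is independent of the
        // label size (32- or 64-bit)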
        if
        (
            MPI_Alltoall
            (
                // NOTE: const_cast is a temporary hack for
                // backward-compatibility with versions of OpenMPI < 1.7.4
                const_cast<label*>(sendData.begin()),
                sizeof(label),
                MPI_BYTE,
                recvData.begin(),
                sizeof(label),
                MPI_BYTE,
                PstreamGlobals::MPICommunicators_[communicator]
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Alltoall failed for " << sendData
                << " on communicator " << communicator
                << Foam::abort(FatalError);
        }
    }
}


void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,
    const label index
)
{
    if (index == PstreamGlobals::MPIGroups_.size())
    {
        // Extend storage with dummy values
        MPI_Group newGroup = MPI_GROUP_NULL;
        PstreamGlobals::MPIGroups_.append(newGroup);
        MPI_Comm newComm = MPI_COMM_NULL;
        PstreamGlobals::MPICommunicators_.append(newComm);
    }
    else if (index > PstreamGlobals::MPIGroups_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
            << Foam::exit(FatalError);
    }


    if (parentIndex == -1)
    {
        // Allocate world communicator

        if (index != UPstream::worldComm)
        {
            FatalErrorInFunction
                << "world communicator should always be index "
                << UPstream::worldComm << Foam::exit(FatalError);
        }

        PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
        MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
        MPI_Comm_rank
        (
            PstreamGlobals::MPICommunicators_[index],
            &myProcNo_[index]
        );

        // Set the number of processes to the actual number
        int numProcs;
        MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);

        //procIDs_[index] = identity(numProcs);
        procIDs_[index].setSize(numProcs);
        forAll(procIDs_[index], i)
        {
            procIDs_[index][i] = i;
        }
    }
    else
    {
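        // Sub-communicator: build a group containing only the requested
        // ranks of the parent and create the new communicator from it;
        // ranks that are not part of the group get MPI_COMM_NULL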
        // Create new group
        MPI_Group_incl
        (
            PstreamGlobals::MPIGroups_[parentIndex],
            procIDs_[index].size(),
            procIDs_[index].begin(),
            &PstreamGlobals::MPIGroups_[index]
        );

        // Create new communicator
        MPI_Comm_create
        (
            PstreamGlobals::MPICommunicators_[parentIndex],
            PstreamGlobals::MPIGroups_[index],
            &PstreamGlobals::MPICommunicators_[index]
        );

        if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
        {
            myProcNo_[index] = -1;
        }
        else
        {
            if
            (
                MPI_Comm_rank
                (
                    PstreamGlobals::MPICommunicators_[index],
                    &myProcNo_[index]
                )
            )
            {
                FatalErrorInFunction
                    << "Problem :"
                    << " when allocating communicator at " << index
                    << " from ranks " << procIDs_[index]
                    << " of parent " << parentIndex
                    << " cannot find my own rank"
                    << Foam::exit(FatalError);
            }
        }
    }
}


void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
    if (communicator != UPstream::worldComm)
    {
        if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
        {
            // Free communicator. Sets communicator to MPI_COMM_NULL
            MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
        }
        if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
        {
            // Free group. Sets group to MPI_GROUP_NULL
            MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
        }
    }
}


Foam::label Foam::UPstream::nRequests()
{
    return PstreamGlobals::outstandingRequests_.size();
}


void Foam::UPstream::resetRequests(const label i)
{
    if (i < PstreamGlobals::outstandingRequests_.size())
    {
        PstreamGlobals::outstandingRequests_.setSize(i);
    }
}

void Foam::UPstream::waitRequests(const label start)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
            << " outstanding requests starting at " << start << endl;
    }

    if (PstreamGlobals::outstandingRequests_.size())
    {
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        if
        (
            MPI_Waitall
            (
                waitRequests.size(),
                waitRequests.begin(),
                MPI_STATUSES_IGNORE
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Waitall returned with error" << Foam::endl;
        }

        resetRequests(start);
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}

void Foam::UPstream::waitRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequest : starting wait for request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    if
    (
        MPI_Wait
        (
            &PstreamGlobals::outstandingRequests_[i],
            MPI_STATUS_IGNORE
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Wait returned with error" << Foam::endl;
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequest : finished wait for request:" << i
            << endl;
    }
}

bool Foam::UPstream::finishedRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::finishedRequest : checking request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    int flag;
    MPI_Test
    (
        &PstreamGlobals::outstandingRequests_[i],
        &flag,
        MPI_STATUS_IGNORE
    );

    if (debug)
    {
        Pout<< "UPstream::finishedRequest : finished request:" << i
            << endl;
    }

    return flag != 0;
}

int Foam::UPstream::allocateTag(const char* s)
{
    int tag;
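    // Reuse a previously freed tag if one is available, otherwise hand out
    // the next unused tag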
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::allocateTag " << s
            << " : tag:" << tag
            << endl;
    }

    return tag;
}


int Foam::UPstream::allocateTag(const word& s)
{
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::allocateTag " << s
            << " : tag:" << tag
            << endl;
    }

    return tag;
}


void Foam::UPstream::freeTag(const char* s, const int tag)
{
    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}


void Foam::UPstream::freeTag(const word& s, const int tag)
{
    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}


// ************************************************************************* //