UPstream.C
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 2011-2016 OpenFOAM Foundation
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/

#include "UPstream.H"
#include "PstreamReduceOps.H"
#include "OSspecific.H"
#include "PstreamGlobals.H"
#include "SubList.H"
#include "allReduce.H"

#include <mpi.h>

#include <cstring>
#include <cstdlib>
#include <csignal>

#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE
#endif

// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

// NOTE:
// Valid parallel options vary between implementations, but flag common ones.
// If they are not removed by MPI_Init(), the subsequent argument processing
// will notice that they are wrong.
void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
{
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("machinefile", "machine file");
}

bool Foam::UPstream::init(int& argc, char**& argv)
{
    MPI_Init(&argc, &argv);

    int numprocs;
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    int myRank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);

    if (debug)
    {
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myRank:" << myRank << endl;
    }

    if (numprocs <= 1)
    {
        FatalErrorInFunction
            << "bool IPstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
            << Foam::abort(FatalError);
    }


    // Initialise parallel structure
    setParRun(numprocs);

    #ifndef SGIMPI
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());

        if (bufferSize)
        {
            MPI_Buffer_attach(new char[bufferSize], bufferSize);
        }
    }
    else
    {
        WarningInFunction
            << "UPstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
            << endl;
    }
    #endif

    //int processorNameLen;
    //char processorName[MPI_MAX_PROCESSOR_NAME];
    //
    //MPI_Get_processor_name(processorName, &processorNameLen);
    //processorName[processorNameLen] = '\0';
    //Pout<< "Processor name:" << processorName << endl;

    return true;
}

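// Editorial note (not part of the original source): init() only warns when
// MPI_BUFFER_SIZE is unset; the buffer attached above via MPI_Buffer_attach
// is what buffered-mode (MPI_Bsend-style) sends draw from, so the variable is
// normally exported before launching a parallel run. Illustrative sketch only;
// the solver name and buffer size are placeholders:
//
//     export MPI_BUFFER_SIZE=20000000      # bytes
//     mpirun -np 4 simpleFoam -parallel
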
void Foam::UPstream::exit(int errnum)
{
    if (debug)
    {
        Pout<< "UPstream::exit." << endl;
    }

    #ifndef SGIMPI
    int size;
    char* buff;
    MPI_Buffer_detach(&buff, &size);
    delete[] buff;
    #endif

    if (PstreamGlobals::outstandingRequests_.size())
    {
        label n = PstreamGlobals::outstandingRequests_.size();
        PstreamGlobals::outstandingRequests_.clear();

        WarningInFunction
            << "There are still " << n << " outstanding MPI_Requests." << endl
            << "This means that your code exited before doing a"
            << " UPstream::waitRequests()." << endl
            << "This should not happen for a normal code exit."
            << endl;
    }

    // Clean mpi communicators
    forAll(myProcNo_, communicator)
    {
        if (myProcNo_[communicator] != -1)
        {
            freePstreamCommunicator(communicator);
        }
    }

    if (errnum == 0)
    {
        MPI_Finalize();
        ::exit(errnum);
    }
    else
    {
        MPI_Abort(MPI_COMM_WORLD, errnum);
    }
}

void Foam::UPstream::abort()
{
    MPI_Abort(MPI_COMM_WORLD, 1);
}

void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}

void Foam::reduce
(
    scalar& Value,
    const minOp<scalar>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
}

void Foam::reduce
(
    vector2D& Value,
    const sumOp<vector2D>& bop,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
}

void Foam::sumReduce
(
    scalar& Value,
    label& Count,
    const int tag,
    const label communicator
)
{
    if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
    {
        Pout<< "** reducing:" << Value << " with comm:" << communicator
            << " warnComm:" << UPstream::warnComm
            << endl;
        error::printStack(Pout);
    }
    vector2D twoScalars(Value, scalar(Count));
    reduce(twoScalars, sumOp<vector2D>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
}

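// Editorial usage sketch (not part of the original source): application code
// normally reaches these overloads through the templated reduce()/sumReduce()
// wrappers declared in PstreamReduceOps.H, for example:
//
//     scalar maxResidual = ...;                // local value on this rank
//     reduce(maxResidual, maxOp<scalar>());    // in-place global reduction
//
//     scalar sum = localSum;
//     label n = localCount;
//     sumReduce(sum, n);                       // reduces value and count together
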
void Foam::reduce
(
    scalar& Value,
    const sumOp<scalar>& bop,
    const int tag,
    const label communicator,
    label& requestID
)
{
#ifdef MPIX_COMM_TYPE_SHARED
    // Assume mpich2 with non-blocking collectives extensions. Once mpi3
    // is available this will change.
    MPI_Request request;
    scalar v = Value;
    MPIX_Ireduce
    (
        &v,
        &Value,
        1,
        MPI_SCALAR,
        MPI_SUM,
        0,              //root
        PstreamGlobals::MPICommunicators_[communicator],
        &request
    );

    requestID = PstreamGlobals::outstandingRequests_.size();
    PstreamGlobals::outstandingRequests_.append(request);

    if (UPstream::debug)
    {
        Pout<< "UPstream::allocateRequest for non-blocking reduce"
            << " : request:" << requestID
            << endl;
    }
#else
    // Non-blocking not yet implemented in mpi
    reduce(Value, bop, tag, communicator);
    requestID = -1;
#endif
}

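// Editorial usage sketch (not part of the original source): when the
// MPIX_Ireduce branch is compiled in, the returned requestID indexes
// PstreamGlobals::outstandingRequests_ and is completed later with
// UPstream::waitRequest(); a requestID of -1 means the blocking fallback has
// already finished the reduction. Variable names below are placeholders:
//
//     label req = -1;
//     reduce(residual, sumOp<scalar>(), UPstream::msgType(), comm, req);
//     // ... overlap independent work here ...
//     if (req != -1) { UPstream::waitRequest(req); }
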
void Foam::UPstream::allToAll
(
    const labelUList& sendData,
    labelUList& recvData,
    const label communicator
)
{
    label np = nProcs(communicator);

    if (sendData.size() != np || recvData.size() != np)
    {
        FatalErrorInFunction
            << "Size of sendData " << sendData.size()
            << " or size of recvData " << recvData.size()
            << " is not equal to the number of processors in the domain "
            << np
            << Foam::abort(FatalError);
    }

    if (!UPstream::parRun())
    {
        recvData.deepCopy(sendData);
    }
    else
    {
        if
        (
            MPI_Alltoall
            (
                // NOTE: const_cast is a temporary hack for
                // backward-compatibility with versions of OpenMPI < 1.7.4
                const_cast<label*>(sendData.begin()),
                sizeof(label),
                MPI_BYTE,
                recvData.begin(),
                sizeof(label),
                MPI_BYTE,
                PstreamGlobals::MPICommunicators_[communicator]
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Alltoall failed for " << sendData
                << " on communicator " << communicator
                << Foam::abort(FatalError);
        }
    }
}

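// Editorial usage sketch (not part of the original source): both lists must
// already be sized to the number of ranks in the communicator, as the check
// above enforces. A typical exchange of per-processor counts looks like:
//
//     labelList sendSizes(Pstream::nProcs(), 0);
//     labelList recvSizes(Pstream::nProcs());
//     // fill sendSizes[proci] with the number of items destined for proci
//     UPstream::allToAll(sendSizes, recvSizes);
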
void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,
    const label index
)
{
    if (index == PstreamGlobals::MPIGroups_.size())
    {
        // Extend storage with dummy values
        MPI_Group newGroup = MPI_GROUP_NULL;
        PstreamGlobals::MPIGroups_.append(newGroup);
        MPI_Comm newComm = MPI_COMM_NULL;
        PstreamGlobals::MPICommunicators_.append(newComm);
    }
    else if (index > PstreamGlobals::MPIGroups_.size())
    {
        FatalErrorInFunction
            << "PstreamGlobals out of sync with UPstream data. Problem."
            << Foam::exit(FatalError);
    }


    if (parentIndex == -1)
    {
        // Allocate world communicator

        if (index != UPstream::worldComm)
        {
            FatalErrorInFunction
                << "world communicator should always be index "
                << UPstream::worldComm << Foam::exit(FatalError);
        }

        PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
        MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
        MPI_Comm_rank
        (
            PstreamGlobals::MPICommunicators_[index],
            &myProcNo_[index]
        );

        // Set the number of processes to the actual number
        int numProcs;
        MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);

        //procIDs_[index] = identity(numProcs);
        procIDs_[index].setSize(numProcs);
        forAll(procIDs_[index], i)
        {
            procIDs_[index][i] = i;
        }
    }
    else
    {
        // Create new group
        MPI_Group_incl
        (
            PstreamGlobals::MPIGroups_[parentIndex],
            procIDs_[index].size(),
            procIDs_[index].begin(),
            &PstreamGlobals::MPIGroups_[index]
        );

        // Create new communicator
        MPI_Comm_create
        (
            PstreamGlobals::MPICommunicators_[parentIndex],
            PstreamGlobals::MPIGroups_[index],
            &PstreamGlobals::MPICommunicators_[index]
        );

        if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
        {
            myProcNo_[index] = -1;
        }
        else
        {
            if
            (
                MPI_Comm_rank
                (
                    PstreamGlobals::MPICommunicators_[index],
                    &myProcNo_[index]
                )
            )
            {
                FatalErrorInFunction
                    << "Problem :"
                    << " when allocating communicator at " << index
                    << " from ranks " << procIDs_[index]
                    << " of parent " << parentIndex
                    << " cannot find my own rank"
                    << Foam::exit(FatalError);
            }
        }
    }
}

void Foam::UPstream::freePstreamCommunicator(const label communicator)
{
    if (communicator != UPstream::worldComm)
    {
        if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
        {
            // Free communicator. Sets communicator to MPI_COMM_NULL
            MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
        }
        if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
        {
            // Free group. Sets group to MPI_GROUP_NULL
            MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
        }
    }
}

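// Editorial note (not part of the original source): allocate/free
// PstreamCommunicator are internal helpers; user-level code is expected to go
// through the public allocateCommunicator()/freeCommunicator() pair, which
// calls them. Sketch under that assumption, with placeholder sub-ranks:
//
//     labelList subRanks(identity(Pstream::nProcs()/2));      // first half
//     label subComm =
//         UPstream::allocateCommunicator(UPstream::worldComm, subRanks);
//     // ... reductions/exchanges on subComm ...
//     UPstream::freeCommunicator(subComm);
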
Foam::label Foam::UPstream::nRequests()
{
    return PstreamGlobals::outstandingRequests_.size();
}


void Foam::UPstream::resetRequests(const label i)
{
    if (i < PstreamGlobals::outstandingRequests_.size())
    {
        PstreamGlobals::outstandingRequests_.setSize(i);
    }
}

void Foam::UPstream::waitRequests(const label start)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequests : starting wait for "
            << PstreamGlobals::outstandingRequests_.size()-start
            << " outstanding requests starting at " << start << endl;
    }

    if (PstreamGlobals::outstandingRequests_.size())
    {
        SubList<MPI_Request> waitRequests
        (
            PstreamGlobals::outstandingRequests_,
            PstreamGlobals::outstandingRequests_.size() - start,
            start
        );

        if
        (
            MPI_Waitall
            (
                waitRequests.size(),
                waitRequests.begin(),
                MPI_STATUSES_IGNORE
            )
        )
        {
            FatalErrorInFunction
                << "MPI_Waitall returned with error" << Foam::endl;
        }

        resetRequests(start);
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequests : finished wait." << endl;
    }
}

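// Editorial usage sketch (not part of the original source): the usual
// non-blocking pattern records the request count before posting transfers and
// then waits only on the requests issued after that point:
//
//     label startRequest = UPstream::nRequests();
//     // ... post non-blocking sends/receives ...
//     UPstream::waitRequests(startRequest);
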
void Foam::UPstream::waitRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::waitRequest : starting wait for request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    if
    (
        MPI_Wait
        (
            &PstreamGlobals::outstandingRequests_[i],
            MPI_STATUS_IGNORE
        )
    )
    {
        FatalErrorInFunction
            << "MPI_Wait returned with error" << Foam::endl;
    }

    if (debug)
    {
        Pout<< "UPstream::waitRequest : finished wait for request:" << i
            << endl;
    }
}

bool Foam::UPstream::finishedRequest(const label i)
{
    if (debug)
    {
        Pout<< "UPstream::finishedRequest : checking request:" << i
            << endl;
    }

    if (i >= PstreamGlobals::outstandingRequests_.size())
    {
        FatalErrorInFunction
            << "There are " << PstreamGlobals::outstandingRequests_.size()
            << " outstanding send requests and you are asking for i=" << i
            << nl
            << "Maybe you are mixing blocking/non-blocking comms?"
            << Foam::abort(FatalError);
    }

    int flag;
    MPI_Test
    (
        &PstreamGlobals::outstandingRequests_[i],
        &flag,
        MPI_STATUS_IGNORE
    );

    if (debug)
    {
        Pout<< "UPstream::finishedRequest : finished request:" << i
            << endl;
    }

    return flag != 0;
}

int Foam::UPstream::allocateTag(const char* s)
{
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::allocateTag " << s
            << " : tag:" << tag
            << endl;
    }

    return tag;
}

int Foam::UPstream::allocateTag(const word& s)
{
    int tag;
    if (PstreamGlobals::freedTags_.size())
    {
        tag = PstreamGlobals::freedTags_.remove();
    }
    else
    {
        tag = PstreamGlobals::nTags_++;
    }

    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::allocateTag " << s
            << " : tag:" << tag
            << endl;
    }

    return tag;
}

void Foam::UPstream::freeTag(const char* s, const int tag)
{
    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}

void Foam::UPstream::freeTag(const word& s, const int tag)
{
    if (debug)
    {
        //if (UPstream::lateBlocking > 0)
        //{
        //    string& poutp = Pout.prefix();
        //    poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
        //    Perr.prefix() = Pout.prefix();
        //}
        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
    }
    PstreamGlobals::freedTags_.append(tag);
}

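// Editorial note (not part of the original source): allocateTag() and
// freeTag() are paired; a tag is either reused from freedTags_ or newly taken
// from nTags_, and returning it keeps the tag space small. Illustrative only,
// with a placeholder name:
//
//     const int tag = UPstream::allocateTag("myExchange");
//     // ... point-to-point communication using 'tag' ...
//     UPstream::freeTag("myExchange", tag);
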
// ************************************************************************* //