UPstream.C
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
4    \\    /   O peration     |
5     \\  /    A nd           | Copyright (C) 2011-2015 OpenFOAM Foundation
6      \\/     M anipulation  |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "UPstream.H"
27 #include "PstreamReduceOps.H"
28 #include "OSspecific.H"
29 #include "PstreamGlobals.H"
30 #include "SubList.H"
31 #include "allReduce.H"
32 
33 #include <mpi.h>
34 
35 #include <cstring>
36 #include <cstdlib>
37 #include <csignal>
38 
39 #if defined(WM_SP)
40 # define MPI_SCALAR MPI_FLOAT
41 #elif defined(WM_DP)
42 # define MPI_SCALAR MPI_DOUBLE
43 #endif
44 
45 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
46 
47 // NOTE:
48 // valid parallel options vary between implementations, but flag common ones.
49 // if they are not removed by MPI_Init(), the subsequent argument processing
50 // will notice that they are wrong
51 void Foam::UPstream::addValidParOptions(HashTable<string>& validParOptions)
52 {
53  validParOptions.insert("np", "");
54  validParOptions.insert("p4pg", "PI file");
55  validParOptions.insert("p4wd", "directory");
56  validParOptions.insert("p4amslave", "");
57  validParOptions.insert("p4yourname", "hostname");
58  validParOptions.insert("machinefile", "machine file");
59 }
60 
61 
62 bool Foam::UPstream::init(int& argc, char**& argv)
63 {
64  MPI_Init(&argc, &argv);
65 
66  int numprocs;
67  MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
68  int myRank;
69  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
70 
71  if (debug)
72  {
73  Pout<< "UPstream::init : initialised with numProcs:" << numprocs
74  << " myRank:" << myRank << endl;
75  }
76 
77  if (numprocs <= 1)
78  {
79  FatalErrorIn("UPstream::init(int& argc, char**& argv)")
80  << "bool IPstream::init(int& argc, char**& argv) : "
81  "attempt to run parallel on 1 processor"
82  << Foam::abort(FatalError);
83  }
84 
85 
86  // Initialise parallel structure
87  setParRun(numprocs);
88 
89 # ifndef SGIMPI
90  string bufferSizeName = getEnv("MPI_BUFFER_SIZE");
91 
92  if (bufferSizeName.size())
93  {
94  int bufferSize = atoi(bufferSizeName.c_str());
95 
96  if (bufferSize)
97  {
98  MPI_Buffer_attach(new char[bufferSize], bufferSize);
99  }
100  }
101  else
102  {
103  FatalErrorIn("UPstream::init(int& argc, char**& argv)")
104  << "UPstream::init(int& argc, char**& argv) : "
105  << "environment variable MPI_BUFFER_SIZE not defined"
106  << Foam::abort(FatalError);
107  }
108 # endif
109 
110  //int processorNameLen;
111  //char processorName[MPI_MAX_PROCESSOR_NAME];
112  //
113  //MPI_Get_processor_name(processorName, &processorNameLen);
114  //processorName[processorNameLen] = '\0';
115  //Pout<< "Processor name:" << processorName << endl;
116 
117  return true;
118 }
119 
120 
121 void Foam::UPstream::exit(int errnum)
122 {
123  if (debug)
124  {
125  Pout<< "UPstream::exit." << endl;
126  }
127 
128 # ifndef SGIMPI
129  int size;
130  char* buff;
131  MPI_Buffer_detach(&buff, &size);
132  delete[] buff;
133 # endif
134 
135  if (PstreamGlobals::outstandingRequests_.size())
136  {
137  label n = PstreamGlobals::outstandingRequests_.size();
138  PstreamGlobals::outstandingRequests_.clear();
139 
140  WarningIn("UPstream::exit(int)")
141  << "There are still " << n << " outstanding MPI_Requests." << endl
142  << "This means that your code exited before doing a"
143  << " UPstream::waitRequests()." << endl
144  << "This should not happen for a normal code exit."
145  << endl;
146  }
147 
148  // Clean mpi communicators
149  forAll(myProcNo_, communicator)
150  {
151  if (myProcNo_[communicator] != -1)
152  {
153  freePstreamCommunicator(communicator);
154  }
155  }
156 
157  if (errnum == 0)
158  {
159  MPI_Finalize();
160  ::exit(errnum);
161  }
162  else
163  {
164  MPI_Abort(MPI_COMM_WORLD, errnum);
165  }
166 }
167 
168 
169 void Foam::UPstream::abort()
170 {
171  MPI_Abort(MPI_COMM_WORLD, 1);
172 }
173 
174 
175 void Foam::reduce
176 (
177  scalar& Value,
178  const sumOp<scalar>& bop,
179  const int tag,
180  const label communicator
181 )
182 {
183  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
184  {
185  Pout<< "** reducing:" << Value << " with comm:" << communicator
186  << " warnComm:" << UPstream::warnComm
187  << endl;
188  error::printStack(Pout);
189  }
190  allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
191 }
192 
193 
194 void Foam::reduce
195 (
196  scalar& Value,
197  const minOp<scalar>& bop,
198  const int tag,
199  const label communicator
200 )
201 {
202  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
203  {
204  Pout<< "** reducing:" << Value << " with comm:" << communicator
205  << " warnComm:" << UPstream::warnComm
206  << endl;
207  error::printStack(Pout);
208  }
209  allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
210 }
211 
212 
213 void Foam::reduce
214 (
215  vector2D& Value,
216  const sumOp<vector2D>& bop,
217  const int tag,
218  const label communicator
219 )
220 {
221  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
222  {
223  Pout<< "** reducing:" << Value << " with comm:" << communicator
224  << " warnComm:" << UPstream::warnComm
225  << endl;
226  error::printStack(Pout);
227  }
228  allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
229 }
230 
231 
232 void Foam::sumReduce
233 (
234  scalar& Value,
235  label& Count,
236  const int tag,
237  const label communicator
238 )
239 {
240  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
241  {
242  Pout<< "** reducing:" << Value << " with comm:" << communicator
243  << " warnComm:" << UPstream::warnComm
244  << endl;
245  error::printStack(Pout);
246  }
247  vector2D twoScalars(Value, scalar(Count));
248  reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
249 
250  Value = twoScalars.x();
251  Count = twoScalars.y();
252 }
253 
254 
255 void Foam::reduce
256 (
257  scalar& Value,
258  const sumOp<scalar>& bop,
259  const int tag,
260  const label communicator,
261  label& requestID
262 )
263 {
264 #ifdef MPIX_COMM_TYPE_SHARED
265  // Assume mpich2 with non-blocking collectives extensions. Once mpi3
266  // is available this will change.
267  MPI_Request request;
268  scalar v = Value;
269  MPIX_Ireduce
270  (
271  &v,
272  &Value,
273  1,
274  MPI_SCALAR,
275  MPI_SUM,
276  0, //root
277  PstreamGlobals::MPICommunicators_[communicator],
278  &request
279  );
280 
281  requestID = PstreamGlobals::outstandingRequests_.size();
282  PstreamGlobals::outstandingRequests_.append(request);
283 
284  if (UPstream::debug)
285  {
286  Pout<< "UPstream::allocateRequest for non-blocking reduce"
287  << " : request:" << requestID
288  << endl;
289  }
290 #else
291  // Non-blocking not yet implemented in mpi
292  reduce(Value, bop, tag, communicator);
293  requestID = -1;
294 #endif
295 }
296 
297 
298 void Foam::UPstream::allocatePstreamCommunicator
299 (
300  const label parentIndex,
301  const label index
302 )
303 {
304  if (index == PstreamGlobals::MPIGroups_.size())
305  {
306  // Extend storage with dummy values
307  MPI_Group newGroup = MPI_GROUP_NULL;
308  PstreamGlobals::MPIGroups_.append(newGroup);
309  MPI_Comm newComm = MPI_COMM_NULL;
310  PstreamGlobals::MPICommunicators_.append(newComm);
311  }
312  else if (index > PstreamGlobals::MPIGroups_.size())
313  {
314  FatalErrorIn
315  (
316  "UPstream::allocatePstreamCommunicator\n"
317  "(\n"
318  " const label parentIndex,\n"
319  " const labelList& subRanks\n"
320  ")\n"
321  ) << "PstreamGlobals out of sync with UPstream data. Problem."
322  << Foam::exit(FatalError);
323  }
324 
325 
326  if (parentIndex == -1)
327  {
328  // Allocate world communicator
329 
330  if (index != UPstream::worldComm)
331  {
332  FatalErrorIn
333  (
334  "UPstream::allocateCommunicator\n"
335  "(\n"
336  " const label parentIndex,\n"
337  " const labelList& subRanks\n"
338  ")\n"
339  ) << "world communicator should always be index "
340  << UPstream::worldComm << Foam::exit(FatalError);
341  }
342 
343  PstreamGlobals::MPICommunicators_[index] = MPI_COMM_WORLD;
344  MPI_Comm_group(MPI_COMM_WORLD, &PstreamGlobals::MPIGroups_[index]);
345  MPI_Comm_rank
346  (
347  PstreamGlobals::MPICommunicators_[index],
348  &myProcNo_[index]
349  );
350 
351  // Set the number of processes to the actual number
352  int numProcs;
353  MPI_Comm_size(PstreamGlobals::MPICommunicators_[index], &numProcs);
354 
355  //procIDs_[index] = identity(numProcs);
356  procIDs_[index].setSize(numProcs);
357  forAll(procIDs_[index], i)
358  {
359  procIDs_[index][i] = i;
360  }
361  }
362  else
363  {
364  // Create new group
365  MPI_Group_incl
366  (
367  PstreamGlobals::MPIGroups_[parentIndex],
368  procIDs_[index].size(),
369  procIDs_[index].begin(),
370  &PstreamGlobals::MPIGroups_[index]
371  );
372 
373  // Create new communicator
374  MPI_Comm_create
375  (
376  PstreamGlobals::MPICommunicators_[parentIndex],
377  PstreamGlobals::MPIGroups_[index],
378  &PstreamGlobals::MPICommunicators_[index]
379  );
380 
381  if (PstreamGlobals::MPICommunicators_[index] == MPI_COMM_NULL)
382  {
383  myProcNo_[index] = -1;
384  }
385  else
386  {
387  if
388  (
389  MPI_Comm_rank
390  (
391  PstreamGlobals::MPICommunicators_[index],
392  &myProcNo_[index]
393  )
394  )
395  {
396  FatalErrorIn
397  (
398  "UPstream::allocatePstreamCommunicator\n"
399  "(\n"
400  " const label,\n"
401  " const labelList&\n"
402  ")\n"
403  ) << "Problem :"
404  << " when allocating communicator at " << index
405  << " from ranks " << procIDs_[index]
406  << " of parent " << parentIndex
407  << " cannot find my own rank"
408  << Foam::exit(FatalError);
409  }
410  }
411  }
412 }
413 
414 
415 void Foam::UPstream::freePstreamCommunicator(const label communicator)
416 {
417  if (communicator != UPstream::worldComm)
418  {
419  if (PstreamGlobals::MPICommunicators_[communicator] != MPI_COMM_NULL)
420  {
421  // Free communicator. Sets communicator to MPI_COMM_NULL
422  MPI_Comm_free(&PstreamGlobals::MPICommunicators_[communicator]);
423  }
424  if (PstreamGlobals::MPIGroups_[communicator] != MPI_GROUP_NULL)
425  {
426  // Free group. Sets group to MPI_GROUP_NULL
427  MPI_Group_free(&PstreamGlobals::MPIGroups_[communicator]);
428  }
429  }
430 }
431 
432 
433 Foam::label Foam::UPstream::nRequests()
434 {
435  return PstreamGlobals::outstandingRequests_.size();
436 }
437 
438 
439 void Foam::UPstream::resetRequests(const label i)
440 {
441  if (i < PstreamGlobals::outstandingRequests_.size())
442  {
443  PstreamGlobals::outstandingRequests_.setSize(i);
444  }
445 }
446 
447 
448 void Foam::UPstream::waitRequests(const label start)
449 {
450  if (debug)
451  {
452  Pout<< "UPstream::waitRequests : starting wait for "
453  << PstreamGlobals::outstandingRequests_.size()-start
454  << " outstanding requests starting at " << start << endl;
455  }
456 
457  if (PstreamGlobals::outstandingRequests_.size())
458  {
459  SubList<MPI_Request> waitRequests
460  (
461  PstreamGlobals::outstandingRequests_,
462  PstreamGlobals::outstandingRequests_.size() - start,
463  start
464  );
465 
466  if
467  (
468  MPI_Waitall
469  (
470  waitRequests.size(),
471  waitRequests.begin(),
472  MPI_STATUSES_IGNORE
473  )
474  )
475  {
476  FatalErrorIn
477  (
478  "UPstream::waitRequests()"
479  ) << "MPI_Waitall returned with error" << Foam::endl;
480  }
481 
482  resetRequests(start);
483  }
484 
485  if (debug)
486  {
487  Pout<< "UPstream::waitRequests : finished wait." << endl;
488  }
489 }
490 
491 
492 void Foam::UPstream::waitRequest(const label i)
493 {
494  if (debug)
495  {
496  Pout<< "UPstream::waitRequest : starting wait for request:" << i
497  << endl;
498  }
499 
500  if (i >= PstreamGlobals::outstandingRequests_.size())
501  {
502  FatalErrorIn
503  (
504  "UPstream::waitRequest(const label)"
505  ) << "There are " << PstreamGlobals::outstandingRequests_.size()
506  << " outstanding send requests and you are asking for i=" << i
507  << nl
508  << "Maybe you are mixing blocking/non-blocking comms?"
509  << Foam::abort(FatalError);
510  }
511 
512  if
513  (
514  MPI_Wait
515  (
516  &PstreamGlobals::outstandingRequests_[i],
517  MPI_STATUS_IGNORE
518  )
519  )
520  {
521  FatalErrorIn
522  (
523  "UPstream::waitRequest()"
524  ) << "MPI_Wait returned with error" << Foam::endl;
525  }
526 
527  if (debug)
528  {
529  Pout<< "UPstream::waitRequest : finished wait for request:" << i
530  << endl;
531  }
532 }
533 
534 
535 bool Foam::UPstream::finishedRequest(const label i)
536 {
537  if (debug)
538  {
539  Pout<< "UPstream::finishedRequest : checking request:" << i
540  << endl;
541  }
542 
543  if (i >= PstreamGlobals::outstandingRequests_.size())
544  {
545  FatalErrorIn
546  (
547  "UPstream::finishedRequest(const label)"
548  ) << "There are " << PstreamGlobals::outstandingRequests_.size()
549  << " outstanding send requests and you are asking for i=" << i
550  << nl
551  << "Maybe you are mixing blocking/non-blocking comms?"
552  << Foam::abort(FatalError);
553  }
554 
555  int flag;
556  MPI_Test
557  (
558  &PstreamGlobals::outstandingRequests_[i],
559  &flag,
560  MPI_STATUS_IGNORE
561  );
562 
563  if (debug)
564  {
565  Pout<< "UPstream::finishedRequest : finished request:" << i
566  << endl;
567  }
568 
569  return flag != 0;
570 }
571 
572 
573 int Foam::UPstream::allocateTag(const char* s)
574 {
575  int tag;
576  if (PstreamGlobals::freedTags_.size())
577  {
578  tag = PstreamGlobals::freedTags_.remove();
579  }
580  else
581  {
582  tag = PstreamGlobals::nTags_++;
583  }
584 
585  if (debug)
586  {
587  //if (UPstream::lateBlocking > 0)
588  //{
589  // string& poutp = Pout.prefix();
590  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
591  // Perr.prefix() = Pout.prefix();
592  //}
593  Pout<< "UPstream::allocateTag " << s
594  << " : tag:" << tag
595  << endl;
596  }
597 
598  return tag;
599 }
600 
601 
602 int Foam::UPstream::allocateTag(const word& s)
603 {
604  int tag;
605  if (PstreamGlobals::freedTags_.size())
606  {
607  tag = PstreamGlobals::freedTags_.remove();
608  }
609  else
610  {
611  tag = PstreamGlobals::nTags_++;
612  }
613 
614  if (debug)
615  {
616  //if (UPstream::lateBlocking > 0)
617  //{
618  // string& poutp = Pout.prefix();
619  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = 'X';
620  // Perr.prefix() = Pout.prefix();
621  //}
622  Pout<< "UPstream::allocateTag " << s
623  << " : tag:" << tag
624  << endl;
625  }
626 
627  return tag;
628 }
629 
630 
631 void Foam::UPstream::freeTag(const char* s, const int tag)
632 {
633  if (debug)
634  {
635  //if (UPstream::lateBlocking > 0)
636  //{
637  // string& poutp = Pout.prefix();
638  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
639  // Perr.prefix() = Pout.prefix();
640  //}
641  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
642  }
643  PstreamGlobals::freedTags_.append(tag);
644 }
645 
646 
647 void Foam::UPstream::freeTag(const word& s, const int tag)
648 {
649  if (debug)
650  {
651  //if (UPstream::lateBlocking > 0)
652  //{
653  // string& poutp = Pout.prefix();
654  // poutp[poutp.size()-2*(UPstream::lateBlocking+2)+tag] = ' ';
655  // Perr.prefix() = Pout.prefix();
656  //}
657  Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
658  }
659  PstreamGlobals::freedTags_.append(tag);
660 }
661 
662 
663 // ************************************************************************* //
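A minimal usage sketch follows. It is not part of UPstream.C: it only illustrates how calling code typically drives the non-blocking reduce overload and the request helpers defined above. The function name exampleNonBlockingSum and the variable residual are hypothetical; the free reduce functions are assumed to be declared in PstreamReduceOps.H as included at the top of this file.

#include "UPstream.H"
#include "PstreamReduceOps.H"

// Sketch only: sum a local scalar across all processors with the
// non-blocking reduce defined in this file, then wait on the request.
void exampleNonBlockingSum(Foam::scalar& residual)
{
    using namespace Foam;

    // Requests already outstanding before this point, so that
    // waitRequests() below only waits on requests issued here.
    const label nOldRequests = UPstream::nRequests();

    // Non-blocking overload; on plain MPI (no MPIX extensions) it falls
    // back to a blocking reduce and sets requestID to -1.
    label requestID = -1;
    reduce
    (
        residual,
        sumOp<scalar>(),
        UPstream::msgType(),
        UPstream::worldComm,
        requestID
    );

    // ... overlap independent work here ...

    if (requestID != -1)
    {
        UPstream::waitRequest(requestID);      // wait for this reduce only
    }
    else
    {
        UPstream::waitRequests(nOldRequests);  // wait for anything newer
    }
}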