40 #define MPI_SCALAR MPI_FLOAT 42 #define MPI_SCALAR MPI_DOUBLE 44 #define MPI_SCALAR MPI_LONG_DOUBLE 55 validParOptions.insert(
"np",
"");
56 validParOptions.insert(
"p4pg",
"PI file");
57 validParOptions.insert(
"p4wd",
"directory");
58 validParOptions.insert(
"p4amslave",
"");
59 validParOptions.insert(
"p4yourname",
"hostname");
60 validParOptions.insert(
"machinefile",
"machine file");
67 int provided_thread_support;
77 &provided_thread_support
86 MPI_Comm_rank(MPI_COMM_WORLD, &myGlobalRank);
102 Pout<<
"UPstream::init : initialised with numProcs:" << numprocs
103 <<
" myRank:" << myRank <<
endl;
109 <<
"bool IPstream::init(int& argc, char**& argv) : " 110 "attempt to run parallel on 1 processor" 116 setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
119 string bufferSizeName =
getEnv(
"MPI_BUFFER_SIZE");
121 if (bufferSizeName.size())
123 int bufferSize = atoi(bufferSizeName.c_str());
127 MPI_Buffer_attach(
new char[bufferSize], bufferSize);
133 <<
"UPstream::init(int& argc, char**& argv) : " 134 <<
"environment variable MPI_BUFFER_SIZE not defined" 160 MPI_Buffer_detach(&buff, &size);
170 <<
"There are still " << n <<
" outstanding MPI_Requests." <<
endl 171 <<
"This means that your code exited before doing a" 172 <<
" UPstream::waitRequests()." <<
endl 173 <<
"This should not happen for a normal code exit." 178 forAll(myProcNo_, communicator)
180 if (myProcNo_[communicator] != -1)
182 freePstreamCommunicator(communicator);
207 const sumOp<scalar>& bop,
209 const label communicator
214 Pout<<
"** reducing:" << Value <<
" with comm:" << communicator
219 allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
226 const minOp<scalar>& bop,
228 const label communicator
233 Pout<<
"** reducing:" << Value <<
" with comm:" << communicator
238 allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
245 const sumOp<vector2D>& bop,
247 const label communicator
252 Pout<<
"** reducing:" << Value <<
" with comm:" << communicator
257 allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
266 const label communicator
271 Pout<<
"** reducing:" << Value <<
" with comm:" << communicator
276 vector2D twoScalars(Value, scalar(Count));
277 reduce(twoScalars, sumOp<vector2D>(), tag, communicator);
279 Value = twoScalars.x();
280 Count = twoScalars.y();
287 const sumOp<scalar>& bop,
289 const label communicator,
293 #ifdef MPIX_COMM_TYPE_SHARED 315 Pout<<
"UPstream::allocateRequest for non-blocking reduce" 316 <<
" : request:" << requestID
321 reduce(Value, bop, tag, communicator);
331 const label communicator
336 if (sendData.size() != np || recvData.size() != np)
339 <<
"Size of sendData " << sendData.size()
340 <<
" or size of recvData " << recvData.size()
341 <<
" is not equal to the number of processors in the domain " 348 recvData.deepCopy(sendData);
356 const_cast<label*>(sendData.begin()),
367 <<
"MPI_Alltoall failed for " << sendData
368 <<
" on communicator " << communicator
377 const char* sendData,
385 const label communicator
392 sendSizes.
size() != np
393 || sendOffsets.
size() != np
394 || recvSizes.
size() != np
395 || recvOffsets.
size() != np
399 <<
"Size of sendSize " << sendSizes.
size()
400 <<
", sendOffsets " << sendOffsets.
size()
401 <<
", recvSizes " << recvSizes.
size()
402 <<
" or recvOffsets " << recvOffsets.
size()
403 <<
" is not equal to the number of processors in the domain " 410 if (recvSizes[0] != sendSizes[0])
413 <<
"Bytes to send " << sendSizes[0]
414 <<
" does not equal bytes to receive " << recvSizes[0]
417 memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);
425 const_cast<char*>(sendData),
426 const_cast<int*>(sendSizes.
begin()),
427 const_cast<int*>(sendOffsets.
begin()),
430 const_cast<int*>(recvSizes.
begin()),
431 const_cast<int*>(recvOffsets.
begin()),
438 <<
"MPI_Alltoallv failed for sendSizes " << sendSizes
439 <<
" recvSizes " << recvSizes
440 <<
" communicator " << communicator
449 const char* sendData,
455 const label communicator
463 && (recvSizes.
size() != np || recvOffsets.
size() < np)
470 <<
"Size of recvSizes " << recvSizes.
size()
471 <<
" or recvOffsets " << recvOffsets.
size()
472 <<
" is not equal to the number of processors in the domain " 479 memmove(recvData, sendData, sendSize);
487 const_cast<char*>(sendData),
491 const_cast<int*>(recvSizes.
begin()),
492 const_cast<int*>(recvOffsets.
begin()),
500 <<
"MPI_Gatherv failed for sendSize " << sendSize
501 <<
" recvSizes " << recvSizes
502 <<
" communicator " << communicator
511 const char* sendData,
517 const label communicator
525 && (sendSizes.
size() != np || sendOffsets.
size() != np)
529 <<
"Size of sendSizes " << sendSizes.
size()
530 <<
" or sendOffsets " << sendOffsets.
size()
531 <<
" is not equal to the number of processors in the domain " 538 memmove(recvData, sendData, recvSize);
546 const_cast<char*>(sendData),
547 const_cast<int*>(sendSizes.
begin()),
548 const_cast<int*>(sendOffsets.
begin()),
559 <<
"MPI_Scatterv failed for sendSizes " << sendSizes
560 <<
" sendOffsets " << sendOffsets
561 <<
" communicator " << communicator
568 void Foam::UPstream::allocatePstreamCommunicator
570 const label parentIndex,
577 MPI_Group newGroup = MPI_GROUP_NULL;
579 MPI_Comm newComm = MPI_COMM_NULL;
585 <<
"PstreamGlobals out of sync with UPstream data. Problem." 590 if (parentIndex == -1)
597 <<
"world communicator should always be index " 619 procIDs_[index].setSize(numProcs);
620 forAll(procIDs_[index], i)
622 procIDs_[index][i] = i;
631 procIDs_[index].size(),
632 procIDs_[index].begin(),
646 myProcNo_[index] = -1;
661 <<
" when allocating communicator at " << index
662 <<
" from ranks " << procIDs_[index]
663 <<
" of parent " << parentIndex
664 <<
" cannot find my own rank" 672 void Foam::UPstream::freePstreamCommunicator(
const label communicator)
709 Pout<<
"UPstream::waitRequests : starting wait for " 711 <<
" outstanding requests starting at " << start <<
endl;
728 waitRequests.
begin(),
734 <<
"MPI_Waitall returned with error" <<
Foam::endl;
742 Pout<<
"UPstream::waitRequests : finished wait." <<
endl;
751 Pout<<
"UPstream::waitRequest : starting wait for request:" << i
759 <<
" outstanding send requests and you are asking for i=" << i
761 <<
"Maybe you are mixing blocking/non-blocking comms?" 775 <<
"MPI_Wait returned with error" <<
Foam::endl;
780 Pout<<
"UPstream::waitRequest : finished wait for request:" << i
790 Pout<<
"UPstream::finishedRequest : checking request:" << i
798 <<
" outstanding send requests and you are asking for i=" << i
800 <<
"Maybe you are mixing blocking/non-blocking comms?" 814 Pout<<
"UPstream::finishedRequest : finished request:" << i
842 Pout<<
"UPstream::allocateTag " << s
871 Pout<<
"UPstream::allocateTag " << s
890 Pout<<
"UPstream::freeTag " << s <<
" tag:" << tag <<
endl;
906 Pout<<
"UPstream::freeTag " << s <<
" tag:" << tag <<
endl;
DynamicList< int > freedTags_
string getEnv(const word &)
Return environment variable of given name.
static void printStack(Ostream &)
Helper function to print a stack.
#define forAll(list, i)
Loop across all elements in list.
FvWallInfoData< WallInfo, label > label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Inter-processor communication reduction functions.
errorManipArg< error, int > exit(error &err, const int errNo=1)
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=0)
Send data to all processors from the root of the communicator.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Various functions to wrap MPI_Allreduce.
Vector2D< scalar > vector2D
vector2D obtained from generic Vector2D
Ostream & endl(Ostream &os)
Add newline and flush stream.
static bool master(const label communicator=0)
Am I the master process.
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
DynamicList< MPI_Request > outstandingRequests_
static label nRequests()
Get number of outstanding requests.
static label worldComm
Default communicator (all processors)
DynamicList< MPI_Group > MPIGroups_
UList< label > labelUList
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
A List obtained as a section of another List.
gmvFile<< "tracers "<< particles.size()<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().x()<< " ";}gmvFile<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().y()<< " ";}gmvFile<< nl;forAllConstIter(Cloud< passiveParticle >, particles, iter){ gmvFile<< iter().position().z()<< " ";}gmvFile<< nl;forAll(lagrangianScalarNames, i){ word name=lagrangianScalarNames[i];IOField< scalar > s(IOobject(name, runTime.timeName(), cloud::prefix, mesh, IOobject::MUST_READ, IOobject::NO_WRITE))
static int allocateTag(const char *)
static void freeTag(const char *, const int tag)
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
A class for handling words, derived from string.
void sumReduce(T &Value, label &Count, const int tag=Pstream::msgType(), const label comm=UPstream::worldComm)
static void resetRequests(const label sz)
Truncate number of outstanding requests.
iterator begin()
Return an iterator to begin traversing the UList.
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
static void exit(int errnum=1)
Exit program.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
T remove()
Remove and return the top element.
static void abort()
Abort program.
static bool & parRun()
Is this a parallel run?
static label nProcs(const label communicator=0)
Number of processes in parallel run.
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=0)
Receive data from all processors on the master.
#define WarningInFunction
Report a warning using Foam::Warning.
prefixOSstream Pout(cout, "Pout")
DynamicList< MPI_Comm > MPICommunicators_
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=0)
Exchange label with all processors (in the communicator).
label size() const
Return the number of elements in the UList.
static void waitRequest(const label i)
Wait until request i has finished.
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid option this type of communications library.
void allReduce(Type &Value, int count, MPI_Datatype MPIType, MPI_Op op, const BinaryOp &bop, const int tag, const label communicator)