#define MPI_SCALAR MPI_FLOAT
#define MPI_SCALAR MPI_DOUBLE
#define MPI_SCALAR MPI_LONG_DOUBLE
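Only one of the three defines above is active in any given build; the selection follows the build's scalar precision. A minimal sketch of such a selection, assuming the usual OpenFOAM precision symbols (WM_SP, WM_DP), which are not shown in this listing:

// Sketch: pick the MPI datatype matching Foam::scalar (guard macros assumed)
#include <mpi.h>
#if defined(WM_SP)
    #define MPI_SCALAR MPI_FLOAT         // scalar is float
#elif defined(WM_DP)
    #define MPI_SCALAR MPI_DOUBLE        // scalar is double
#else
    #define MPI_SCALAR MPI_LONG_DOUBLE   // scalar is long double
#endif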
    validParOptions.insert("np", "");
    validParOptions.insert("p4pg", "PI file");
    validParOptions.insert("p4wd", "directory");
    validParOptions.insert("p4amslave", "");
    validParOptions.insert("p4yourname", "hostname");
    validParOptions.insert("machinefile", "machine file");
    int provided_thread_support;

        &provided_thread_support
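provided_thread_support receives the thread level actually granted by the MPI library at start-up (presumably via MPI_Init_thread, whose result is checked against MPI_THREAD_MULTIPLE further down). A self-contained sketch of that standard call, independent of the elided OpenFOAM code:

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    int provided_thread_support = 0;

    // Request full thread support; MPI reports what it actually provides
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided_thread_support);

    if (provided_thread_support != MPI_THREAD_MULTIPLE)
    {
        std::printf("MPI_THREAD_MULTIPLE not available (got %d)\n",
                    provided_thread_support);
    }

    MPI_Finalize();
    return 0;
}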
    MPI_Comm_rank(MPI_COMM_WORLD, &myGlobalRank);
        Pout<< "UPstream::init : initialised with numProcs:" << numprocs
            << " myRank:" << myRank << endl;
            << "bool IPstream::init(int& argc, char**& argv) : "
               "attempt to run parallel on 1 processor"
    setParRun(numprocs, provided_thread_support == MPI_THREAD_MULTIPLE);
    string bufferSizeName = getEnv("MPI_BUFFER_SIZE");

    if (bufferSizeName.size())
    {
        int bufferSize = atoi(bufferSizeName.c_str());
        MPI_Buffer_attach(new char[bufferSize], bufferSize);
    }
            << "UPstream::init(int& argc, char**& argv) : "
            << "environment variable MPI_BUFFER_SIZE not defined"
    MPI_Buffer_detach(&buff, &size);
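MPI_BUFFER_SIZE controls a user buffer that is attached for buffered sends during init and detached again on exit. A standalone sketch of the same attach/Bsend/detach pattern (the buffer size is picked arbitrarily for illustration):

#include <mpi.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, np;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);

    // Attach a buffer large enough for the buffered sends we plan to do
    const int bufferSize = 1000000 + MPI_BSEND_OVERHEAD;
    MPI_Buffer_attach(new char[bufferSize], bufferSize);

    int msg = rank;
    if (np > 1)
    {
        if (rank == 1)
        {
            // Buffered send: completes locally once copied into the buffer
            MPI_Bsend(&msg, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
        }
        else if (rank == 0)
        {
            MPI_Recv(&msg, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }

    // Detach blocks until pending buffered sends complete, then returns
    // the buffer address and size so it can be released
    char* buff = nullptr;
    int size = 0;
    MPI_Buffer_detach(&buff, &size);
    delete[] buff;

    MPI_Finalize();
    return 0;
}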
            << "There are still " << n << " outstanding MPI_Requests." << endl
            << "This means that your code exited before doing a"
            << " UPstream::waitRequests()." << endl
            << "This should not happen for a normal code exit."
    forAll(myProcNo_, communicator)
    {
        if (myProcNo_[communicator] != -1)
            freePstreamCommunicator(communicator);
    }
    const sumOp<scalar>& bop,
    const label communicator

        Pout<< "** reducing:" << Value << " with comm:" << communicator

    allReduce(Value, 1, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
    const minOp<scalar>& bop,
    const label communicator

        Pout<< "** reducing:" << Value << " with comm:" << communicator

    allReduce(Value, 1, MPI_SCALAR, MPI_MIN, bop, tag, communicator);
    const sumOp<vector2D>& bop,
    const label communicator

        Pout<< "** reducing:" << Value << " with comm:" << communicator

    allReduce(Value, 2, MPI_SCALAR, MPI_SUM, bop, tag, communicator);
    const label communicator

        Pout<< "** reducing:" << Value << " with comm:" << communicator

    vector2D twoScalars(Value, scalar(Count));
    reduce(twoScalars, sumOp<vector2D>(), tag, communicator);

    Value = twoScalars.x();
    Count = twoScalars.y();
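All of the reductions above end up in MPI_Allreduce with MPI_SUM or MPI_MIN on MPI_SCALAR data, and sumReduce packs the value and the count into a single two-component sum so one collective call returns both. A standalone sketch of the same pattern with plain MPI (double stands in for scalar):

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, np;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);

    // Global sum and global minimum of a per-rank value
    double local = rank + 1.0;
    double globalSum = 0, globalMin = 0;
    MPI_Allreduce(&local, &globalSum, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
    MPI_Allreduce(&local, &globalMin, 1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD);

    // sumReduce-style trick: reduce (value, count) as one two-component sum
    double twoScalars[2] = {local, 1.0};
    double reduced[2] = {0.0, 0.0};
    MPI_Allreduce(twoScalars, reduced, 2, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

    if (rank == 0)
    {
        std::printf("sum=%g min=%g mean=%g\n",
                    globalSum, globalMin, reduced[0]/reduced[1]);
    }

    MPI_Finalize();
    return 0;
}

Packing the value and the count into one reduction simply halves the number of collective calls, which is the motivation behind the vector2D packing above.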
    const sumOp<scalar>& bop,
    const label communicator,

#ifdef MPIX_COMM_TYPE_SHARED

        Pout<< "UPstream::allocateRequest for non-blocking reduce"
            << " : request:" << requestID

    // Blocking fallback when non-blocking collectives are not available
    reduce(Value, bop, tag, communicator);
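The guarded branch above appears to issue a non-blocking reduce and record a request ID, falling back to the blocking reduce otherwise. With MPI-3 the same idea is expressed with MPI_Iallreduce; a minimal sketch, independent of the OpenFOAM request bookkeeping:

#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    double local = rank + 1.0;
    double global = 0.0;
    MPI_Request request;

    // Start the reduction and keep a request handle (cf. requestID above)
    MPI_Iallreduce(&local, &global, 1, MPI_DOUBLE, MPI_SUM,
                   MPI_COMM_WORLD, &request);

    // ... other work could overlap with the reduction here ...

    MPI_Wait(&request, MPI_STATUS_IGNORE);

    if (rank == 0)
    {
        std::printf("sum=%g\n", global);
    }

    MPI_Finalize();
    return 0;
}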
    const label communicator

    label np = nProcs(communicator);

    if (sendData.size() != np || recvData.size() != np)

            << "Size of sendData " << sendData.size()
            << " or size of recvData " << recvData.size()
            << " is not equal to the number of processors in the domain "

        recvData.deepCopy(sendData);

            const_cast<label*>(sendData.begin()),

            << "MPI_Alltoall failed for " << sendData
            << " on communicator " << communicator
    const char* sendData,

        sendSizes.size() != np
     || sendOffsets.size() != np
     || recvSizes.size() != np
     || recvOffsets.size() != np

            << "Size of sendSize " << sendSizes.size()
            << ", sendOffsets " << sendOffsets.size()
            << ", recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "

        if (recvSizes[0] != sendSizes[0])

            << "Bytes to send " << sendSizes[0]
            << " does not equal bytes to receive " << recvSizes[0]

        memmove(recvData, &sendData[sendOffsets[0]], recvSizes[0]);

            const_cast<char*>(sendData),
            const_cast<int*>(sendSizes.begin()),
            const_cast<int*>(sendOffsets.begin()),

            const_cast<int*>(recvSizes.begin()),
            const_cast<int*>(recvOffsets.begin()),

            << "MPI_Alltoallv failed for sendSizes " << sendSizes
            << " recvSizes " << recvSizes
    const char* sendData,

    const label communicator

    label np = nProcs(communicator);

     && (recvSizes.size() != np || recvOffsets.size() < np)

            << "Size of recvSizes " << recvSizes.size()
            << " or recvOffsets " << recvOffsets.size()
            << " is not equal to the number of processors in the domain "

        memmove(recvData, sendData, sendSize);

            const_cast<char*>(sendData),

            const_cast<int*>(recvSizes.begin()),
            const_cast<int*>(recvOffsets.begin()),

            << "MPI_Gatherv failed for sendSize " << sendSize
            << " recvSizes " << recvSizes
            << " communicator " << communicator
    const char* sendData,
    const UList<int>& sendSizes,
    const UList<int>& sendOffsets,

    const label communicator

    label np = nProcs(communicator);

     && (sendSizes.size() != np || sendOffsets.size() != np)

            << "Size of sendSizes " << sendSizes.size()
            << " or sendOffsets " << sendOffsets.size()
            << " is not equal to the number of processors in the domain "

        memmove(recvData, sendData, recvSize);

            const_cast<char*>(sendData),
            const_cast<int*>(sendSizes.begin()),
            const_cast<int*>(sendOffsets.begin()),

            << "MPI_Scatterv failed for sendSizes " << sendSizes
            << " sendOffsets " << sendOffsets
            << " communicator " << communicator
void Foam::UPstream::allocatePstreamCommunicator
(
    const label parentIndex,

    MPI_Group newGroup = MPI_GROUP_NULL;
    MPI_Comm newComm = MPI_COMM_NULL;

            << "PstreamGlobals out of sync with UPstream data. Problem."

    if (parentIndex == -1)

            << "world communicator should always be index "

        procIndices_[index].setSize(numProcs);
        forAll(procIndices_[index], i)
        {
            procIndices_[index][i] = i;
        }

            procIndices_[index].size(),
            procIndices_[index].begin(),

        myProcNo_[index] = -1;

            << " when allocating communicator at " << index
            << " from ranks " << procIndices_[index]
            << " of parent " << parentIndex
            << " cannot find my own rank"
void Foam::UPstream::freePstreamCommunicator(const label communicator)
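allocatePstreamCommunicator above fills procIndices_ with the ranks to include and builds the new communicator from an MPI group taken from the parent (a rank outside the group ends up with myProcNo_ of -1), while freePstreamCommunicator releases it again. A standalone sketch of that group-based create/free cycle, here over the even ranks of MPI_COMM_WORLD:

#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, np;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);

    // Ranks of the parent communicator that go into the new one
    std::vector<int> procIndices;
    for (int proci = 0; proci < np; proci += 2)
    {
        procIndices.push_back(proci);
    }

    MPI_Group parentGroup = MPI_GROUP_NULL;
    MPI_Group newGroup = MPI_GROUP_NULL;
    MPI_Comm newComm = MPI_COMM_NULL;

    MPI_Comm_group(MPI_COMM_WORLD, &parentGroup);
    MPI_Group_incl(parentGroup, int(procIndices.size()), procIndices.data(), &newGroup);
    MPI_Comm_create(MPI_COMM_WORLD, newGroup, &newComm);

    // Ranks outside the group get MPI_COMM_NULL (cf. myProcNo_[index] = -1)
    if (newComm != MPI_COMM_NULL)
    {
        int myNewRank = -1;
        MPI_Comm_rank(newComm, &myNewRank);
        std::printf("world rank %d -> new rank %d\n", rank, myNewRank);
        MPI_Comm_free(&newComm);
    }

    MPI_Group_free(&newGroup);
    MPI_Group_free(&parentGroup);

    MPI_Finalize();
    return 0;
}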
        Pout<< "UPstream::waitRequests : starting wait for "
            << " outstanding requests starting at " << start << endl;

        SubList<MPI_Request> waitRequests

            waitRequests.begin(),

            << "MPI_Waitall returned with error" << Foam::endl;

    resetRequests(start);

        Pout<< "UPstream::waitRequests : finished wait." << endl;
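waitRequests above, together with the waitRequest and finishedRequest fragments below, manage the shared list of outstanding MPI_Request handles with MPI_Waitall, MPI_Wait and MPI_Test. A standalone sketch of that bookkeeping around a non-blocking send/receive pair:

#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);

    int rank, np;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &np);

    std::vector<MPI_Request> outstandingRequests;

    int sendBuf = rank, recvBuf = -1;
    const int next = (rank + 1) % np;
    const int prev = (rank + np - 1) % np;

    // Post a non-blocking receive and send; keep the request handles
    MPI_Request req;
    MPI_Irecv(&recvBuf, 1, MPI_INT, prev, 0, MPI_COMM_WORLD, &req);
    outstandingRequests.push_back(req);
    MPI_Isend(&sendBuf, 1, MPI_INT, next, 0, MPI_COMM_WORLD, &req);
    outstandingRequests.push_back(req);

    // finishedRequest-style check on request 0 (does not block)
    int flag = 0;
    MPI_Test(&outstandingRequests[0], &flag, MPI_STATUS_IGNORE);

    // waitRequests-style completion of everything still outstanding
    MPI_Waitall(int(outstandingRequests.size()),
                outstandingRequests.data(), MPI_STATUSES_IGNORE);
    outstandingRequests.clear();

    std::printf("rank %d got %d from rank %d (early-finished: %d)\n",
                rank, recvBuf, prev, flag);

    MPI_Finalize();
    return 0;
}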
        Pout<< "UPstream::waitRequest : starting wait for request:" << i

            << " outstanding send requests and you are asking for i=" << i
            << "Maybe you are mixing blocking/non-blocking comms?"

            << "MPI_Wait returned with error" << Foam::endl;

        Pout<< "UPstream::waitRequest : finished wait for request:" << i
        Pout<< "UPstream::finishedRequest : checking request:" << i

            << " outstanding send requests and you are asking for i=" << i
            << "Maybe you are mixing blocking/non-blocking comms?"

        Pout<< "UPstream::finishedRequest : finished request:" << i
        Pout<< "UPstream::allocateTag " << s

        Pout<< "UPstream::allocateTag " << s

        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;

        Pout<< "UPstream::freeTag " << s << " tag:" << tag << endl;
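allocateTag and freeTag hand out message tags and recycle the ones that have been released (the freedTags_ list in PstreamGlobals). A minimal sketch of that pool pattern; the class and member names here are illustrative, not the PstreamGlobals ones:

#include <vector>

// Simple reusable-tag pool: recycle freed tags before growing the range
class TagPool
{
    int nextTag_ = 100;          // first tag handed out (arbitrary base)
    std::vector<int> freedTags_; // tags returned by freeTag()

public:
    int allocateTag()
    {
        if (!freedTags_.empty())
        {
            int tag = freedTags_.back();
            freedTags_.pop_back();
            return tag;
        }
        return nextTag_++;
    }

    void freeTag(int tag)
    {
        freedTags_.push_back(tag);
    }
};

Recycling keeps the tag values within a bounded range, which matters because valid MPI tags are limited by the communicator's MPI_TAG_UB attribute.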
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be replaced or emulated on other systems.
Inter-processor communication reduction functions.
#define forAll(list, i)
Loop across all elements in list.
Various functions to wrap MPI_Allreduce.
T remove()
Remove and return the top element.
DynamicList< T, SizeInc, SizeMult, SizeDiv > & append(const T &)
Append an element at the end of the list.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
label size() const
Return the number of elements in the UList.
iterator begin()
Return an iterator to begin traversing the UList.
Helper class for allocating/freeing communicators.
static bool init(int &argc, char **&argv, const bool needsThread)
Initialisation function called from main.
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
static void freeTag(const char *, const int tag)
static int allocateTag(const char *)
static bool master(const label communicator=0)
Am I the master process.
static void allToAll(const labelUList &sendData, labelUList &recvData, const label communicator=0)
Exchange label with all processors (in the communicator).
static label worldComm
Default communicator (all processors)
static label nRequests()
Get number of outstanding requests.
static void gather(const char *sendData, int sendSize, char *recvData, const UList< int > &recvSizes, const UList< int > &recvOffsets, const label communicator=0)
Receive data from all processors on the master.
static void addValidParOptions(HashTable< string > &validParOptions)
Add the valid option this type of communications library adds/requires on the command line.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static void resetRequests(const label sz)
Truncate number of outstanding requests.
static void scatter(const char *sendData, const UList< int > &sendSizes, const UList< int > &sendOffsets, char *recvData, int recvSize, const label communicator=0)
Send data to all processors from the root of the communicator.
static void abort()
Abort program.
static bool & parRun()
Is this a parallel run?
static void exit(int errnum=1)
Exit program.
static bool finishedRequest(const label i)
Non-blocking comms: has request i finished?
static void waitRequest(const label i)
Wait until request i has finished.
static void printStack(Ostream &)
Helper function to print a stack.
A class for handling words, derived from string.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
#define WarningInFunction
Report a warning using Foam::Warning.
DynamicList< int > freedTags_
DynamicList< MPI_Group > MPIGroups_
DynamicList< MPI_Comm > MPICommunicators_
DynamicList< MPI_Request > outstandingRequests_
Vector2D< scalar > vector2D
vector2D obtained from generic Vector2D
errorManipArg< error, int > exit(error &err, const int errNo=1)
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Ostream & endl(Ostream &os)
Add newline and flush stream.
errorManip< error > abort(error &err)
void allReduce(Type &Value, int count, MPI_Datatype MPIType, MPI_Op op, const BinaryOp &bop, const int tag, const label communicator)
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
string getEnv(const word &)
Return environment variable of given name.
prefixOSstream Pout(cout, "Pout")
UList< label > labelUList
void sumReduce(T &Value, label &Count, const int tag=Pstream::msgType(), const label comm=UPstream::worldComm)