42 bool Foam::OFstreamCollator::writeFile
46 const fileName& fName,
47 const string& masterData,
49 const PtrList<SubList<char>>& slaveData,
51 IOstream::versionNumber ver,
58 Pout<<
"OFstreamCollator : Writing master " << masterData.size()
59 <<
" bytes to " << fName
60 <<
" using comm " << comm <<
endl;
63 Pout<<
"OFstreamCollator : Slave data" <<
endl;
66 if (slaveData.set(proci))
69 <<
" size:" << slaveData[proci].size()
76 autoPtr<OSstream> osPtr;
95 OSstream& os = osPtr();
112 const_cast<char*>(masterData.data()),
113 label(masterData.size())
121 List<std::streamoff> start;
131 fileOperations::masterUncollatedFileOperation::
132 maxMasterFileBufferSize == 0
139 if (osPtr.valid() && !osPtr().good())
147 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
158 std::ostringstream os;
160 Pout<<
" (overall " << os.str() <<
")";
162 Pout<<
" to " << fName
163 <<
" using comm " << comm <<
endl;
170 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
172 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
180 std::lock_guard<std::mutex> guard(handler.mutex_);
181 if (handler.objects_.size())
183 ptr = handler.objects_.pop();
194 PtrList<SubList<char>> slaveData;
195 if (ptr->slaveData_.size())
197 slaveData.setSize(ptr->slaveData_.size());
200 if (ptr->slaveData_.set(proci))
207 ptr->slaveData_[proci],
231 <<
"Failed writing " << ptr->filePath_
242 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
246 std::lock_guard<std::mutex> guard(handler.mutex_);
247 handler.threadRunning_ =
false;
254 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const 262 std::lock_guard<std::mutex> guard(mutex_);
265 totalSize += iter()->size();
272 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
280 std::lock_guard<std::mutex> guard(mutex_);
281 Pout<<
"OFstreamCollator : Waiting for buffer space." 282 <<
" Currently in use:" << totalSize
283 <<
" limit:" << maxBufferSize_
284 <<
" files:" << objects_.size()
297 maxBufferSize_(maxBufferSize),
298 threadRunning_(false),
313 const off_t maxBufferSize,
317 maxBufferSize_(maxBufferSize),
318 threadRunning_(
false),
339 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
345 if (threadComm_ != -1)
356 const word& typeName,
372 label maxLocalSize = 0;
374 for (
label proci = 0; proci < recvSizes.
size(); proci++)
376 totalSize += recvSizes[proci];
377 maxLocalSize =
max(maxLocalSize, recvSizes[proci]);
383 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
387 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
388 <<
" using local comm " << localComm_ <<
endl;
406 else if (totalSize <= maxBufferSize_)
413 Pout<<
"OFstreamCollator : non-thread gather; thread write of " 419 waitForBufferSpace(totalSize);
445 writeData& fileAndData = fileAndDataPtr();
459 for (
label proci = 1; proci < slaveData.
size(); proci++)
466 reinterpret_cast<char*>(slaveData[proci].begin()),
467 slaveData[proci].byteSize(),
481 reinterpret_cast<const char*>(slice.
begin()),
489 <<
"Cannot send outgoing message. " 490 <<
"to:" << 0 <<
" nBytes:" 498 std::lock_guard<std::mutex> guard(mutex_);
501 objects_.
push(fileAndDataPtr.
ptr());
510 Pout<<
"OFstreamCollator : Waiting for write thread" 518 Pout<<
"OFstreamCollator : Starting write thread" 521 thread_.
reset(
new std::thread(writeAll,
this));
522 threadRunning_ =
true;
532 Pout<<
"OFstreamCollator : thread gather and write of " << fName
533 <<
" using communicator " << threadComm_ <<
endl;
539 <<
"mpi does not seem to have thread support." 540 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'" 541 <<
" to at least " << totalSize
542 <<
" to be able to do the collating before threading." 548 waitForBufferSpace(data.size());
552 std::lock_guard<std::mutex> guard(mutex_);
578 Pout<<
"OFstreamCollator : Waiting for write thread" 586 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
588 thread_.
reset(
new std::thread(writeAll,
this));
589 threadRunning_ =
true;
606 Pout<<
"OFstreamCollator : waiting for thread to have consumed all" 609 waitForBufferSpace(-1);
#define forAll(list, i)
Loop across all elements in list.
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
A class for handling file names.
bool set(const label) const
Is element set.
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
void reset(T *=nullptr)
If object pointer already set, delete object and set to given.
dimensioned< Type > max(const dimensioned< Type > &, const dimensioned< Type > &)
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of processor blocks (in start).
void size(const label)
Override size to be inconsistent with allocated storage.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static bool master(const label communicator=0)
Am I the master process.
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
static label nRequests()
Get number of outstanding requests.
static int & msgType()
Message tag of standard messages.
T * ptr()
Return object pointer for reuse.
UList< label > labelUList
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
void clear()
Delete object (if the pointer is valid) and set pointer to nullptr.
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until the write thread has space.
A class for handling words, derived from string.
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
void waitAll()
Wait for all thread actions to have finished.
iterator begin()
Return an iterator to begin traversing the UList.
streamFormat
Enumeration for the format of data in the stream.
bool valid() const
Return true if the autoPtr valid (ie, the pointer is set)
forAllConstIter(PtrDictionary< phaseModel >, mixture.phases(), phase)
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
void setSize(const label)
Reset size of PtrList. If extending the PtrList, new entries are set to nullptr.
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
compressionType
Enumeration for the compression of data in the stream.
static const string null
An empty string.
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the message size.
static bool haveThreads()
Have support for threads.
defineTypeNameAndDebug(combustionModel, 0)
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
const bool writeData(readBool(pdfDictionary.lookup("writeData")))
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
label size() const
Return the number of elements in the UPtrList.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
static label nProcs(const label communicator=0)
Number of processes in parallel run.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
A templated 1D list of pointers to objects of type <T>, where the size of the array is known and used...
std::streamsize byteSize() const
Return the binary size in number of characters of the UList.
prefixOSstream Pout(cout, "Pout")
void push(const T &a)
Push an element onto the stack.
An auto-pointer similar to the STL auto_ptr but with automatic casting to a reference to the type and...
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
unsigned int sleep(const unsigned int)
Sleep for the specified number of seconds.
Inter-processor communications stream.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.