42 bool Foam::OFstreamCollator::writeFile
46 const fileName& fName,
47 const string& masterData,
49 const PtrList<SubList<char>>& slaveData,
50 IOstream::streamFormat fmt,
51 IOstream::versionNumber ver,
52 IOstream::compressionType cmp,
58 Pout<<
"OFstreamCollator : Writing master " << masterData.size()
59 <<
" bytes to " << fName
60 <<
" using comm " << comm <<
endl;
63 Pout<<
"OFstreamCollator : Slave data" <<
endl;
66 if (slaveData.set(proci))
69 <<
" size:" << slaveData[proci].size()
76 autoPtr<OSstream> osPtr;
95 OSstream& os = osPtr();
112 const_cast<char*
>(masterData.data()),
113 label(masterData.size())
121 List<std::streamoff> start;
131 fileOperations::masterUncollatedFileOperation::
132 maxMasterFileBufferSize == 0
139 if (osPtr.valid() && !osPtr().good())
147 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
158 std::ostringstream os;
160 Pout<<
" (overall " << os.str() <<
")";
162 Pout<<
" to " << fName
163 <<
" using comm " << comm <<
endl;
170 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
172 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
180 std::lock_guard<std::mutex> guard(handler.mutex_);
181 if (handler.objects_.size())
183 ptr = handler.objects_.pop();
194 PtrList<SubList<char>> slaveData;
195 if (ptr->slaveData_.size())
197 slaveData.setSize(ptr->slaveData_.size());
200 if (ptr->slaveData_.set(proci))
207 ptr->slaveData_[proci],
231 <<
"Failed writing " << ptr->filePath_
242 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
246 std::lock_guard<std::mutex> guard(handler.mutex_);
247 handler.threadRunning_ =
false;
254 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const
262 std::lock_guard<std::mutex> guard(mutex_);
265 totalSize += iter()->size();
272 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
280 std::lock_guard<std::mutex> guard(mutex_);
281 Pout<<
"OFstreamCollator : Waiting for buffer space."
282 <<
" Currently in use:" << totalSize
283 <<
" limit:" << maxBufferSize_
284 <<
" files:" << objects_.size()
297 maxBufferSize_(maxBufferSize),
298 threadRunning_(false),
313 const off_t maxBufferSize,
317 maxBufferSize_(maxBufferSize),
318 threadRunning_(false),
339 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
345 if (threadComm_ != -1)
356 const word& typeName,
372 label maxLocalSize = 0;
374 for (
label proci = 0; proci < recvSizes.
size(); proci++)
376 totalSize += recvSizes[proci];
377 maxLocalSize =
max(maxLocalSize, recvSizes[proci]);
383 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
387 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
388 <<
" using local comm " << localComm_ <<
endl;
406 else if (totalSize <= maxBufferSize_)
413 Pout<<
"OFstreamCollator : non-thread gather; thread write of "
419 waitForBufferSpace(totalSize);
445 writeData& fileAndData = fileAndDataPtr();
459 for (
label proci = 1; proci < slaveData.
size(); proci++)
466 reinterpret_cast<char*
>(slaveData[proci].begin()),
467 slaveData[proci].byteSize(),
481 reinterpret_cast<const char*
>(slice.
begin()),
489 <<
"Cannot send outgoing message. "
490 <<
"to:" << 0 <<
" nBytes:"
498 std::lock_guard<std::mutex> guard(mutex_);
501 objects_.push(fileAndDataPtr.
ptr());
510 Pout<<
"OFstreamCollator : Waiting for write thread"
518 Pout<<
"OFstreamCollator : Starting write thread"
521 thread_.reset(
new std::thread(writeAll,
this));
522 threadRunning_ =
true;
532 Pout<<
"OFstreamCollator : thread gather and write of " << fName
533 <<
" using communicator " << threadComm_ <<
endl;
539 <<
"mpi does not seem to have thread support."
540 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'"
541 <<
" to at least " << totalSize
542 <<
" to be able to do the collating before threading."
548 waitForBufferSpace(data.size());
552 std::lock_guard<std::mutex> guard(mutex_);
578 Pout<<
"OFstreamCollator : Waiting for write thread"
586 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
588 thread_.reset(
new std::thread(writeAll,
this));
589 threadRunning_ =
true;
606 Pout<<
"OFstreamCollator : waiting for thread to have consumed all"
609 waitForBufferSpace(-1);
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be replaced or emulated on other systems.
#define forAll(list, i)
Loop across all elements in list.
#define forAllConstIter(Container, container, iter)
Iterate across all elements in the container object of type Container.
streamFormat
Enumeration for the format of data in the stream.
compressionType
Enumeration for the compression of data in the stream.
void size(const label)
Override size to be inconsistent with allocated storage.
virtual ~OFstreamCollator()
Destructor.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
void waitAll()
Wait for all thread actions to have finished.
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
A templated 1D list of pointers to objects of type <T>, where the size of the array is known and used...
bool set(const label) const
Is element set.
void setSize(const label)
Reset size of PtrList. If extending the PtrList, new entries are.
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the message size.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
iterator begin()
Return an iterator to begin traversing the UList.
std::streamsize byteSize() const
Return the binary size in number of characters of the UList.
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
Inter-processor communications stream.
static bool haveThreads()
Have support for threads.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
static bool master(const label communicator=0)
Am I the master process.
static label nRequests()
Get number of outstanding requests.
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
static int & msgType()
Message tag of standard messages.
label size() const
Return the number of elements in the UPtrList.
An auto-pointer similar to the STL auto_ptr but with automatic casting to a reference to the type and...
T * ptr()
Return object pointer for reuse.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of processor blocks.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
A class for handling file names.
static const string null
An empty string.
A class for handling words, derived from string.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
errorManipArg< error, int > exit(error &err, const int errNo=1)
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Ostream & endl(Ostream &os)
Add newline and flush stream.
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
errorManip< error > abort(error &err)
defineTypeNameAndDebug(combustionModel, 0)
layerAndWeight max(const layerAndWeight &a, const layerAndWeight &b)
prefixOSstream Pout(cout, "Pout")
labelList identityMap(const label len)
Create identity map (map[i] == i) of given length.
UList< label > labelUList
unsigned int sleep(const unsigned int)
Sleep for the specified number of seconds.
const bool writeData(readBool(pdfDictionary.lookup("writeData")))