41 bool Foam::OFstreamCollator::writeFile
45 const fileName& fName,
46 const string& masterData,
48 const PtrList<SubList<char>>& slaveData,
50 IOstream::versionNumber ver,
57 Pout<<
"OFstreamCollator : Writing master " << masterData.size()
58 <<
" bytes to " << fName
59 <<
" using comm " << comm <<
endl;
62 Pout<<
"OFstreamCollator : Slave data" <<
endl;
65 if (slaveData.set(proci))
68 <<
" size:" << slaveData[proci].size()
75 autoPtr<OSstream> osPtr;
94 OSstream& os = osPtr();
111 const_cast<char*>(masterData.data()),
112 label(masterData.size())
120 List<std::streamoff> start;
130 fileOperations::masterUncollatedFileOperation::
131 maxMasterFileBufferSize == 0
138 if (osPtr.valid() && !osPtr().good())
146 Pout<<
"OFstreamCollator : Finished writing " << masterData.size()
157 std::ostringstream os;
159 Pout<<
" (overall " << os.str() <<
")";
161 Pout<<
" to " << fName
162 <<
" using comm " << comm <<
endl;
169 void* Foam::OFstreamCollator::writeAll(
void *threadarg)
171 OFstreamCollator& handler = *
static_cast<OFstreamCollator*
>(threadarg);
179 std::lock_guard<std::mutex> guard(handler.mutex_);
180 if (handler.objects_.size())
182 ptr = handler.objects_.pop();
193 PtrList<SubList<char>> slaveData;
194 if (ptr->slaveData_.size())
196 slaveData.setSize(ptr->slaveData_.size());
199 if (ptr->slaveData_.set(proci))
206 ptr->slaveData_[proci],
230 <<
"Failed writing " << ptr->pathName_
241 Pout<<
"OFstreamCollator : Exiting write thread " <<
endl;
245 std::lock_guard<std::mutex> guard(handler.mutex_);
246 handler.threadRunning_ =
false;
253 void Foam::OFstreamCollator::waitForBufferSpace(
const off_t wantedSize)
const 261 std::lock_guard<std::mutex> guard(mutex_);
264 totalSize += iter()->size();
271 || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
279 std::lock_guard<std::mutex> guard(mutex_);
280 Pout<<
"OFstreamCollator : Waiting for buffer space." 281 <<
" Currently in use:" << totalSize
282 <<
" limit:" << maxBufferSize_
283 <<
" files:" << objects_.size()
296 maxBufferSize_(maxBufferSize),
297 threadRunning_(false),
312 const off_t maxBufferSize,
316 maxBufferSize_(maxBufferSize),
317 threadRunning_(
false),
338 Pout<<
"~OFstreamCollator : Waiting for write thread" <<
endl;
344 if (threadComm_ != -1)
355 const word& typeName,
371 label maxLocalSize = 0;
373 for (
label proci = 0; proci < recvSizes.
size(); proci++)
375 totalSize += recvSizes[proci];
376 maxLocalSize =
max(maxLocalSize, recvSizes[proci]);
382 if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
386 Pout<<
"OFstreamCollator : non-thread gather and write of " << fName
387 <<
" using local comm " << localComm_ <<
endl;
405 else if (totalSize <= maxBufferSize_)
412 Pout<<
"OFstreamCollator : non-thread gather; thread write of " 418 waitForBufferSpace(totalSize);
444 writeData& fileAndData = fileAndDataPtr();
458 for (
label proci = 1; proci < slaveData.
size(); proci++)
465 reinterpret_cast<char*>(slaveData[proci].begin()),
466 slaveData[proci].byteSize(),
480 reinterpret_cast<const char*>(slice.
begin()),
488 <<
"Cannot send outgoing message. " 489 <<
"to:" << 0 <<
" nBytes:" 497 std::lock_guard<std::mutex> guard(mutex_);
500 objects_.
push(fileAndDataPtr.
ptr());
509 Pout<<
"OFstreamCollator : Waiting for write thread" 517 Pout<<
"OFstreamCollator : Starting write thread" 520 thread_.
reset(
new std::thread(writeAll,
this));
521 threadRunning_ =
true;
531 Pout<<
"OFstreamCollator : thread gather and write of " << fName
532 <<
" using communicator " << threadComm_ <<
endl;
538 <<
"mpi does not seem to have thread support." 539 <<
" Make sure to set buffer size 'maxThreadFileBufferSize'" 540 <<
" to at least " << totalSize
541 <<
" to be able to do the collating before threading." 547 waitForBufferSpace(data.size());
551 std::lock_guard<std::mutex> guard(mutex_);
577 Pout<<
"OFstreamCollator : Waiting for write thread" 585 Pout<<
"OFstreamCollator : Starting write thread" <<
endl;
587 thread_.
reset(
new std::thread(writeAll,
this));
588 threadRunning_ =
true;
605 Pout<<
"OFstreamCollator : waiting for thread to have consumed all" 608 waitForBufferSpace(-1);
#define forAll(list, i)
Loop across all elements in list.
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
A class for handling file names.
bool set(const label) const
Is element set.
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
void reset(T *=nullptr)
If object pointer already set, delete object and set to given.
dimensioned< Type > max(const dimensioned< Type > &, const dimensioned< Type > &)
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns the starts of the processor blocks (in start).
void size(const label)
Override size to be inconsistent with allocated storage.
Ostream & endl(Ostream &os)
Add newline and flush stream.
static bool master(const label communicator=0)
Am I the master process.
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
static label nRequests()
Get number of outstanding requests.
static int & msgType()
Message tag of standard messages.
T * ptr()
Return object pointer for reuse.
UList< label > labelUList
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
void clear()
Delete object (if the pointer is valid) and set pointer to nullptr.
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
A class for handling words, derived from string.
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
void waitAll()
Wait for all thread actions to have finished.
iterator begin()
Return an iterator to begin traversing the UList.
streamFormat
Enumeration for the format of data in the stream.
bool valid() const
Return true if the autoPtr valid (ie, the pointer is set)
forAllConstIter(PtrDictionary< phaseModel >, mixture.phases(), phase)
errorManip< error > abort(error &err)
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
void setSize(const label)
Reset size of PtrList. If extending the PtrList, new entries are set to nullptr.
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
compressionType
Enumeration for the compression of data in the stream.
static const string null
An empty string.
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the message size.
static bool haveThreads()
Have support for threads.
defineTypeNameAndDebug(combustionModel, 0)
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
const bool writeData(readBool(pdfDictionary.lookup("writeData")))
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
label size() const
Return the number of elements in the UPtrList.
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
static label nProcs(const label communicator=0)
Number of processes in parallel run.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
A templated 1D list of pointers to objects of type <T>, where the size of the array is known and used...
std::streamsize byteSize() const
Return the binary size in number of characters of the UList.
prefixOSstream Pout(cout, "Pout")
void push(const T &a)
Push an element onto the stack.
An auto-pointer similar to the STL auto_ptr but with automatic casting to a reference to the type and...
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
unsigned int sleep(const unsigned int)
Sleep for the specified number of seconds.
Inter-processor communications stream.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.