OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2020 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
#include "OFstreamCollator.H"
#include "OFstream.H"
#include "decomposedBlockData.H"
#include "masterUncollatedFileOperation.H"
#include "OSspecific.H"
31 
32 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
33 
34 namespace Foam
35 {
36  defineTypeNameAndDebug(OFstreamCollator, 0);
37 }
38 
39 
40 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
41 
42 bool Foam::OFstreamCollator::writeFile
43 (
44  const label comm,
45  const word& typeName,
46  const fileName& fName,
47  const string& masterData,
48  const labelUList& recvSizes,
49  const PtrList<SubList<char>>& slaveData, // optional slave data
51  IOstream::versionNumber ver,
53  const bool append
54 )
55 {
56  if (debug)
57  {
58  Pout<< "OFstreamCollator : Writing master " << masterData.size()
59  << " bytes to " << fName
60  << " using comm " << comm << endl;
61  if (slaveData.size())
62  {
63  Pout<< "OFstreamCollator : Slave data" << endl;
64  forAll(slaveData, proci)
65  {
66  if (slaveData.set(proci))
67  {
68  Pout<< " " << proci
69  << " size:" << slaveData[proci].size()
70  << endl;
71  }
72  }
73  }
74  }
75 
76  autoPtr<OSstream> osPtr;
77  if (UPstream::master(comm))
78  {
79  Foam::mkDir(fName.path());
80  osPtr.reset
81  (
82  new OFstream
83  (
84  fName,
85  fmt,
86  ver,
87  cmp,
88  append
89  )
90  );
91 
92  // We don't have IOobject so cannot use IOobject::writeHeader
93  if (!append)
94  {
95  OSstream& os = osPtr();
97  (
98  os,
99  ver,
100  fmt,
101  typeName,
102  "",
103  fName,
104  fName.name()
105  );
106  }
107  }
108 
109 
110  UList<char> slice
111  (
112  const_cast<char*>(masterData.data()),
113  label(masterData.size())
114  );
115 
116  // Assuming threaded writing hides any slowness so we
117  // can use scheduled communication to send the data to
118  // the master processor in order. However can be unstable
119  // for some mpi so default is non-blocking.
120 
121  List<std::streamoff> start;
123  (
124  comm,
125  osPtr,
126  start,
127  slice,
128  recvSizes,
129  slaveData,
130  (
131  fileOperations::masterUncollatedFileOperation::
132  maxMasterFileBufferSize == 0
135  ),
136  false // do not reduce return state
137  );
138 
139  if (osPtr.valid() && !osPtr().good())
140  {
141  FatalIOErrorInFunction(osPtr())
142  << "Failed writing to " << fName << exit(FatalIOError);
143  }
144 
145  if (debug)
146  {
147  Pout<< "OFstreamCollator : Finished writing " << masterData.size()
148  << " bytes";
149  if (UPstream::master(comm))
150  {
151  off_t sum = 0;
152  forAll(recvSizes, i)
153  {
154  sum += recvSizes[i];
155  }
156  // Use ostringstream to display long int (until writing these is
157  // supported)
158  std::ostringstream os;
159  os << sum;
160  Pout<< " (overall " << os.str() << ")";
161  }
162  Pout<< " to " << fName
163  << " using comm " << comm << endl;
164  }
165 
166  return true;
167 }
168 
169 
170 void* Foam::OFstreamCollator::writeAll(void *threadarg)
171 {
172  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
173 
174  // Consume stack
175  while (true)
176  {
177  writeData* ptr = nullptr;
178 
179  {
180  std::lock_guard<std::mutex> guard(handler.mutex_);
181  if (handler.objects_.size())
182  {
183  ptr = handler.objects_.pop();
184  }
185  }
186 
187  if (!ptr)
188  {
189  break;
190  }
191  else
192  {
193  // Convert storage to pointers
194  PtrList<SubList<char>> slaveData;
195  if (ptr->slaveData_.size())
196  {
197  slaveData.setSize(ptr->slaveData_.size());
198  forAll(slaveData, proci)
199  {
200  if (ptr->slaveData_.set(proci))
201  {
202  slaveData.set
203  (
204  proci,
205  new SubList<char>
206  (
207  ptr->slaveData_[proci],
208  ptr->sizes_[proci]
209  )
210  );
211  }
212  }
213  }
214 
215  bool ok = writeFile
216  (
217  ptr->comm_,
218  ptr->typeName_,
219  ptr->filePath_,
220  ptr->data_,
221  ptr->sizes_,
222  slaveData,
223  ptr->format_,
224  ptr->version_,
225  ptr->compression_,
226  ptr->append_
227  );
228  if (!ok)
229  {
230  FatalIOErrorInFunction(ptr->filePath_)
231  << "Failed writing " << ptr->filePath_
232  << exit(FatalIOError);
233  }
234 
235  delete ptr;
236  }
237  // sleep(1);
238  }
239 
240  if (debug)
241  {
242  Pout<< "OFstreamCollator : Exiting write thread " << endl;
243  }
244 
245  {
246  std::lock_guard<std::mutex> guard(handler.mutex_);
247  handler.threadRunning_ = false;
248  }
249 
250  return nullptr;
251 }
252 
253 
254 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
255 {
256  while (true)
257  {
258  // Count files to be written
259  off_t totalSize = 0;
260 
261  {
262  std::lock_guard<std::mutex> guard(mutex_);
263  forAllConstIter(FIFOStack<writeData*>, objects_, iter)
264  {
265  totalSize += iter()->size();
266  }
267  }
268 
269  if
270  (
271  totalSize == 0
272  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
273  )
274  {
275  break;
276  }
277 
278  if (debug)
279  {
280  std::lock_guard<std::mutex> guard(mutex_);
281  Pout<< "OFstreamCollator : Waiting for buffer space."
282  << " Currently in use:" << totalSize
283  << " limit:" << maxBufferSize_
284  << " files:" << objects_.size()
285  << endl;
286  }
287 
288  sleep(5);
289  }
290 }
291 
292 
293 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
294 
296 :
297  maxBufferSize_(maxBufferSize),
298  threadRunning_(false),
299  localComm_(UPstream::worldComm),
300  threadComm_
301  (
302  UPstream::allocateCommunicator
303  (
304  localComm_,
305  identity(UPstream::nProcs(localComm_))
306  )
307  )
308 {}
309 
310 
312 (
313  const off_t maxBufferSize,
314  const label comm
315 )
316 :
317  maxBufferSize_(maxBufferSize),
318  threadRunning_(false),
319  localComm_(comm),
320  threadComm_
321  (
323  (
324  localComm_,
325  identity(UPstream::nProcs(localComm_))
326  )
327  )
328 {}
329 
330 
331 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
332 
334 {
335  if (thread_.valid())
336  {
337  if (debug)
338  {
339  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
340  }
341  thread_().join();
342  thread_.clear();
343  }
344 
345  if (threadComm_ != -1)
346  {
347  UPstream::freeCommunicator(threadComm_);
348  }
349 }
350 
351 
352 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
353 
355 (
356  const word& typeName,
357  const fileName& fName,
358  const string& data,
362  const bool append,
363  const bool useThread
364 )
365 {
366  // Determine (on master) sizes to receive. Note: do NOT use thread
367  // communicator
368  labelList recvSizes;
369  decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
370 
371  off_t totalSize = 0;
372  label maxLocalSize = 0;
373  {
374  for (label proci = 0; proci < recvSizes.size(); proci++)
375  {
376  totalSize += recvSizes[proci];
377  maxLocalSize = max(maxLocalSize, recvSizes[proci]);
378  }
379  Pstream::scatter(totalSize, Pstream::msgType(), localComm_);
380  Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_);
381  }
382 
383  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
384  {
385  if (debug)
386  {
387  Pout<< "OFstreamCollator : non-thread gather and write of " << fName
388  << " using local comm " << localComm_ << endl;
389  }
390  // Direct collating and writing (so master blocks until all written!)
391  const PtrList<SubList<char>> dummySlaveData;
392  return writeFile
393  (
394  localComm_,
395  typeName,
396  fName,
397  data,
398  recvSizes,
399  dummySlaveData,
400  fmt,
401  ver,
402  cmp,
403  append
404  );
405  }
406  else if (totalSize <= maxBufferSize_)
407  {
408  // Total size can be stored locally so receive all data now and only
409  // do the writing in the thread
410 
411  if (debug)
412  {
413  Pout<< "OFstreamCollator : non-thread gather; thread write of "
414  << fName << endl;
415  }
416 
417  if (Pstream::master(localComm_))
418  {
419  waitForBufferSpace(totalSize);
420  }
421 
422 
423  // Receive in chunks of labelMax (2^31-1) since this is the maximum
424  // size that a List can be
425 
426  autoPtr<writeData> fileAndDataPtr
427  (
428  new writeData
429  (
430  threadComm_, // Note: comm not actually used anymore
431  typeName,
432  fName,
433  (
434  Pstream::master(localComm_)
435  ? data // Only used on master
436  : string::null
437  ),
438  recvSizes,
439  fmt,
440  ver,
441  cmp,
442  append
443  )
444  );
445  writeData& fileAndData = fileAndDataPtr();
446 
447  PtrList<List<char>>& slaveData = fileAndData.slaveData_;
448 
449  UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
450 
451  slaveData.setSize(recvSizes.size());
452 
453  // Gather all data onto master. Is done in local communicator since
454  // not in write thread. Note that we do not store in contiguous
455  // buffer since that would limit to 2G chars.
456  label startOfRequests = Pstream::nRequests();
457  if (Pstream::master(localComm_))
458  {
459  for (label proci = 1; proci < slaveData.size(); proci++)
460  {
461  slaveData.set(proci, new List<char>(recvSizes[proci]));
463  (
465  proci,
466  reinterpret_cast<char*>(slaveData[proci].begin()),
467  slaveData[proci].byteSize(),
469  localComm_
470  );
471  }
472  }
473  else
474  {
475  if
476  (
478  (
480  0,
481  reinterpret_cast<const char*>(slice.begin()),
482  slice.byteSize(),
484  localComm_
485  )
486  )
487  {
489  << "Cannot send outgoing message. "
490  << "to:" << 0 << " nBytes:"
491  << label(slice.byteSize())
493  }
494  }
495  Pstream::waitRequests(startOfRequests);
496 
497  {
498  std::lock_guard<std::mutex> guard(mutex_);
499 
500  // Append to thread buffer
501  objects_.push(fileAndDataPtr.ptr());
502 
503  // Start thread if not running
504  if (!threadRunning_)
505  {
506  if (thread_.valid())
507  {
508  if (debug)
509  {
510  Pout<< "OFstreamCollator : Waiting for write thread"
511  << endl;
512  }
513  thread_().join();
514  }
515 
516  if (debug)
517  {
518  Pout<< "OFstreamCollator : Starting write thread"
519  << endl;
520  }
521  thread_.reset(new std::thread(writeAll, this));
522  threadRunning_ = true;
523  }
524  }
525 
526  return true;
527  }
528  else
529  {
530  if (debug)
531  {
532  Pout<< "OFstreamCollator : thread gather and write of " << fName
533  << " using communicator " << threadComm_ << endl;
534  }
535 
536  if (!UPstream::haveThreads())
537  {
539  << "mpi does not seem to have thread support."
540  << " Make sure to set buffer size 'maxThreadFileBufferSize'"
541  << " to at least " << totalSize
542  << " to be able to do the collating before threading."
543  << exit(FatalError);
544  }
545 
546  if (Pstream::master(localComm_))
547  {
548  waitForBufferSpace(data.size());
549  }
550 
551  {
552  std::lock_guard<std::mutex> guard(mutex_);
553 
554  // Push all file info on buffer. Note that no slave data provided
555  // so it will trigger communication inside the thread
556  objects_.push
557  (
558  new writeData
559  (
560  threadComm_,
561  typeName,
562  fName,
563  data,
564  recvSizes,
565  fmt,
566  ver,
567  cmp,
568  append
569  )
570  );
571 
572  if (!threadRunning_)
573  {
574  if (thread_.valid())
575  {
576  if (debug)
577  {
578  Pout<< "OFstreamCollator : Waiting for write thread"
579  << endl;
580  }
581  thread_().join();
582  }
583 
584  if (debug)
585  {
586  Pout<< "OFstreamCollator : Starting write thread" << endl;
587  }
588  thread_.reset(new std::thread(writeAll, this));
589  threadRunning_ = true;
590  }
591  }
592 
593  return true;
594  }
595 }
596 
597 
599 {
600  // Wait for all buffer space to be available i.e. wait for all jobs
601  // to finish
602  if (Pstream::master(localComm_))
603  {
604  if (debug)
605  {
606  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
607  << endl;
608  }
609  waitForBufferSpace(-1);
610  }
611 }
612 
613 
614 // ************************************************************************* //
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
A class for handling file names.
Definition: fileName.H:79
bool set(const label) const
Is element set.
Definition: PtrListI.H:65
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
void reset(T *=nullptr)
If object pointer already set, delete object and set to given.
Definition: autoPtrI.H:114
error FatalError
dimensioned< Type > max(const dimensioned< Type > &, const dimensioned< Type > &)
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:323
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of.
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:137
static int & msgType()
Message tag of standard messages.
Definition: UPstream.H:476
T * ptr()
Return object pointer for reuse.
Definition: autoPtrI.H:90
UList< label > labelUList
Definition: UList.H:65
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
void clear()
Delete object (if the pointer is valid) and set pointer to.
Definition: autoPtrI.H:126
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
A class for handling words, derived from string.
Definition: word.H:59
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
void waitAll()
Wait for all thread actions to have finished.
iterator begin()
Return an iterator to begin traversing the UList.
Definition: UListI.H:216
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
bool valid() const
Return true if the autoPtr valid (ie, the pointer is set)
Definition: autoPtrI.H:83
forAllConstIter(PtrDictionary< phaseModel >, mixture.phases(), phase)
Definition: pEqn.H:29
errorManip< error > abort(error &err)
Definition: errorManip.H:131
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:60
void setSize(const label)
Reset size of PtrList. If extending the PtrList, new entries are.
Definition: PtrList.C:131
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
compressionType
Enumeration for the format of data in the stream.
Definition: IOstream.H:193
static const string null
An empty string.
Definition: string.H:88
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the.
Definition: UIPread.C:79
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:405
defineTypeNameAndDebug(combustionModel, 0)
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
Definition: UOPwrite.C:34
const bool writeData(readBool(pdfDictionary.lookup("writeData")))
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:147
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
label size() const
Return the number of elements in the UPtrList.
Definition: UPtrListI.H:29
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:335
A templated 1D list of pointers to objects of type <T>, where the size of the array is known and used...
Definition: List.H:70
std::streamsize byteSize() const
Return the binary size in number of characters of the UList.
Definition: UList.C:100
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
void push(const T &a)
Push an element onto the stack.
Definition: FIFOStack.H:84
Version number type.
Definition: IOstream.H:96
An auto-pointer similar to the STL auto_ptr but with automatic casting to a reference to the type and...
Definition: PtrList.H:52
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:248
unsigned int sleep(const unsigned int)
Sleep for the specified number of seconds.
Definition: POSIX.C:1130
Inter-processor communications stream.
Definition: UPstream.H:58
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:314
Namespace for OpenFOAM.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
IOerror FatalIOError