OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "OFstreamCollator.H"
27 #include "OFstream.H"
28 #include "decomposedBlockData.H"
30 
31 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
32 
33 namespace Foam
34 {
35  defineTypeNameAndDebug(OFstreamCollator, 0);
36 }
37 
38 
39 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
40 
// Write one collated file from the master's data plus (optionally) the
// already-gathered slave data.
//
//  comm       : communicator used for the master test and passed to the
//               block-writing call
//  typeName   : class name written into the file header
//  fName      : file to write; its parent directory is created on the master
//  masterData : master processor's own data
//  recvSizes  : per-processor sizes (summed for the debug output)
//  slaveData  : optional slave data
//  ver        : stream version number
//  append     : if true the file is opened in append mode and no header is
//               written
//
// Returns true; a stream that is not good() after writing raises
// FatalIOError instead of returning false.
//
// NOTE(review): 'fmt' and 'cmp' are used below but their parameter
// declarations (listing lines 49 and 51) are not visible in this listing -
// presumably IOstream::streamFormat and IOstream::compressionType; confirm
// against the header.
41 bool Foam::OFstreamCollator::writeFile
42 (
43  const label comm,
44  const word& typeName,
45  const fileName& fName,
46  const string& masterData,
47  const labelUList& recvSizes,
48  const PtrList<SubList<char>>& slaveData, // optional slave data
50  IOstream::versionNumber ver,
52  const bool append
53 )
54 {
    // Debug: report the master data size and the size of every slave entry
    // that is set
55  if (debug)
56  {
57  Pout<< "OFstreamCollator : Writing master " << masterData.size()
58  << " bytes to " << fName
59  << " using comm " << comm << endl;
60  if (slaveData.size())
61  {
62  Pout<< "OFstreamCollator : Slave data" << endl;
63  forAll(slaveData, proci)
64  {
65  if (slaveData.set(proci))
66  {
67  Pout<< " " << proci
68  << " size:" << slaveData[proci].size()
69  << endl;
70  }
71  }
72  }
73  }
74 
    // The output stream is only created on the master of comm; on all other
    // processors osPtr stays unset
75  autoPtr<OSstream> osPtr;
76  if (UPstream::master(comm))
77  {
78  Foam::mkDir(fName.path());
79  osPtr.reset
80  (
81  new OFstream
82  (
83  fName,
84  fmt,
85  ver,
86  cmp,
87  append
88  )
89  );
90 
91  // We don't have IOobject so cannot use IOobject::writeHeader
92  if (!append)
93  {
94  OSstream& os = osPtr();
    // NOTE(review): the name of the function called here (listing line 95)
    // is not visible; the argument list matches
    // decomposedBlockData::writeHeader(os, version, format, type, note,
    // location, name) - confirm.
96  (
97  os,
98  ver,
99  fmt,
100  typeName,
101  "",
102  fName,
103  fName.name()
104  );
105  }
106  }
107 
108 
    // Non-owning view onto the master string's characters. The const_cast
    // is only there because UList takes a non-const pointer.
109  UList<char> slice
110  (
111  const_cast<char*>(masterData.data()),
112  label(masterData.size())
113  );
114 
115  // Assuming threaded writing hides any slowness so we
116  // can use scheduled communication to send the data to
117  // the master processor in order. However can be unstable
118  // for some mpi so default is non-blocking.
119 
120  List<std::streamoff> start;
    // NOTE(review): the name of the function called here (listing line 121)
    // is not visible; the argument list matches
    // decomposedBlockData::writeBlocks(comm, osPtr, start, masterData,
    // recvSizes, slaveData, commsType, syncReturnState) - confirm.
    // The two branches of the communication-type expression (listing lines
    // 132-133) are also not visible.
122  (
123  comm,
124  osPtr,
125  start,
126  slice,
127  recvSizes,
128  slaveData,
129  (
130  fileOperations::masterUncollatedFileOperation::
131  maxMasterFileBufferSize == 0
134  ),
135  false // do not reduce return state
136  );
137 
    // Only processors that actually own a stream check its state
138  if (osPtr.valid() && !osPtr().good())
139  {
140  FatalIOErrorInFunction(osPtr())
141  << "Failed writing to " << fName << exit(FatalIOError);
142  }
143 
144  if (debug)
145  {
146  Pout<< "OFstreamCollator : Finished writing " << masterData.size()
147  << " bytes";
148  if (UPstream::master(comm))
149  {
    // Overall size = sum of all entries of recvSizes, accumulated as off_t
150  off_t sum = 0;
151  forAll(recvSizes, i)
152  {
153  sum += recvSizes[i];
154  }
155  // Use ostringstream to display long int (until writing these is
156  // supported)
157  std::ostringstream os;
158  os << sum;
159  Pout<< " (overall " << os.str() << ")";
160  }
161  Pout<< " to " << fName
162  << " using comm " << comm << endl;
163  }
164 
165  return true;
166 }
167 
168 
169 void* Foam::OFstreamCollator::writeAll(void *threadarg)
170 {
171  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
172 
173  // Consume stack
174  while (true)
175  {
176  writeData* ptr = nullptr;
177 
178  {
179  std::lock_guard<std::mutex> guard(handler.mutex_);
180  if (handler.objects_.size())
181  {
182  ptr = handler.objects_.pop();
183  }
184  }
185 
186  if (!ptr)
187  {
188  break;
189  }
190  else
191  {
192  // Convert storage to pointers
193  PtrList<SubList<char>> slaveData;
194  if (ptr->slaveData_.size())
195  {
196  slaveData.setSize(ptr->slaveData_.size());
197  forAll(slaveData, proci)
198  {
199  if (ptr->slaveData_.set(proci))
200  {
201  slaveData.set
202  (
203  proci,
204  new SubList<char>
205  (
206  ptr->slaveData_[proci],
207  ptr->sizes_[proci]
208  )
209  );
210  }
211  }
212  }
213 
214  bool ok = writeFile
215  (
216  ptr->comm_,
217  ptr->typeName_,
218  ptr->pathName_,
219  ptr->data_,
220  ptr->sizes_,
221  slaveData,
222  ptr->format_,
223  ptr->version_,
224  ptr->compression_,
225  ptr->append_
226  );
227  if (!ok)
228  {
229  FatalIOErrorInFunction(ptr->pathName_)
230  << "Failed writing " << ptr->pathName_
231  << exit(FatalIOError);
232  }
233 
234  delete ptr;
235  }
236  // sleep(1);
237  }
238 
239  if (debug)
240  {
241  Pout<< "OFstreamCollator : Exiting write thread " << endl;
242  }
243 
244  {
245  std::lock_guard<std::mutex> guard(handler.mutex_);
246  handler.threadRunning_ = false;
247  }
248 
249  return nullptr;
250 }
251 
252 
253 void Foam::OFstreamCollator::waitForBufferSpace(const off_t wantedSize) const
254 {
255  while (true)
256  {
257  // Count files to be written
258  off_t totalSize = 0;
259 
260  {
261  std::lock_guard<std::mutex> guard(mutex_);
262  forAllConstIter(FIFOStack<writeData*>, objects_, iter)
263  {
264  totalSize += iter()->size();
265  }
266  }
267 
268  if
269  (
270  totalSize == 0
271  || (wantedSize >= 0 && (totalSize+wantedSize) <= maxBufferSize_)
272  )
273  {
274  break;
275  }
276 
277  if (debug)
278  {
279  std::lock_guard<std::mutex> guard(mutex_);
280  Pout<< "OFstreamCollator : Waiting for buffer space."
281  << " Currently in use:" << totalSize
282  << " limit:" << maxBufferSize_
283  << " files:" << objects_.size()
284  << endl;
285  }
286 
287  sleep(5);
288  }
289 }
290 
291 
292 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
293 
// Constructor taking only the buffer size (the signature on listing line 294
// is not visible here; the documentation index lists it as
// OFstreamCollator(const off_t maxBufferSize)).
// Uses UPstream::worldComm as the local communicator and allocates a
// separate communicator containing all of its ranks for use by the write
// thread. No thread is started here (threadRunning_ is false).
295 :
296  maxBufferSize_(maxBufferSize),
297  threadRunning_(false),
298  localComm_(UPstream::worldComm),
299  threadComm_
300  (
301  UPstream::allocateCommunicator
302  (
303  localComm_,
        // identity map: all ranks of localComm_ in their original order
304  identity(UPstream::nProcs(localComm_))
305  )
306  )
307 {}
308 
309 
// Constructor taking the buffer size and the communicator to use as local
// communicator (the line naming the constructor, listing line 310, is not
// visible here).
// No thread is started here (threadRunning_ is false).
311 (
312  const off_t maxBufferSize,
313  const label comm
314 )
315 :
316  maxBufferSize_(maxBufferSize),
317  threadRunning_(false),
318  localComm_(comm),
319  threadComm_
320  (
    // NOTE(review): the name of the function called here (listing line 321)
    // is not visible - presumably UPstream::allocateCommunicator as in the
    // single-argument constructor; confirm.
322  (
323  localComm_,
324  identity(UPstream::nProcs(localComm_))
325  )
326  )
327 {}
328 
329 
330 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
331 
// Destructor body (the signature on listing line 332 is not visible here).
// Waits for the write thread, if one was ever created, and then releases the
// communicator that was allocated for it.
333 {
334  if (thread_.valid())
335  {
336  if (debug)
337  {
338  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
339  }
    // Block until the write thread has returned before deleting it
340  thread_().join();
341  thread_.clear();
342  }
343 
    // -1 is treated as "no communicator allocated"
344  if (threadComm_ != -1)
345  {
346  UPstream::freeCommunicator(threadComm_);
347  }
348 }
349 
350 
351 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
352 
// Write a file from the per-processor contents 'data'. Depending on the
// sizes involved one of three strategies is used:
//  1. no thread: gather and write directly through writeFile
//     (useThread false, maxBufferSize_ zero, or the largest single
//     processor contribution exceeds maxBufferSize_)
//  2. gather all data onto the master now, write it in the write thread
//     (total size fits in maxBufferSize_)
//  3. queue only the local data; gathering and writing both happen in the
//     write thread (requires UPstream::haveThreads())
//
// Returns the writeFile result for strategy 1 and true for 2 and 3.
//
// NOTE(review): the line naming the function (listing line 353) and the
// declarations of 'fmt', 'ver' and 'cmp' (listing lines 358-360) are not
// visible here. The documentation index lists the signature as
// write(typeName, fileName, data, IOstream::streamFormat,
// IOstream::versionNumber, IOstream::compressionType, append,
// useThread=true).
354 (
355  const word& typeName,
356  const fileName& fName,
357  const string& data,
361  const bool append,
362  const bool useThread
363 )
364 {
365  // Determine (on master) sizes to receive. Note: do NOT use thread
366  // communicator
367  labelList recvSizes;
368  decomposedBlockData::gather(localComm_, label(data.size()), recvSizes);
369 
    // Total and largest per-processor size, computed from recvSizes and
    // then scattered over localComm_ so every processor uses the same
    // values in the branch selection below
370  off_t totalSize = 0;
371  label maxLocalSize = 0;
372  {
373  for (label proci = 0; proci < recvSizes.size(); proci++)
374  {
375  totalSize += recvSizes[proci];
376  maxLocalSize = max(maxLocalSize, recvSizes[proci]);
377  }
378  Pstream::scatter(totalSize, Pstream::msgType(), localComm_);
379  Pstream::scatter(maxLocalSize, Pstream::msgType(), localComm_);
380  }
381 
    // Strategy 1: no thread at all
382  if (!useThread || maxBufferSize_ == 0 || maxLocalSize > maxBufferSize_)
383  {
384  if (debug)
385  {
386  Pout<< "OFstreamCollator : non-thread gather and write of " << fName
387  << " using local comm " << localComm_ << endl;
388  }
389  // Direct collating and writing (so master blocks until all written!)
    // The slave data list is passed empty
390  const PtrList<SubList<char>> dummySlaveData;
391  return writeFile
392  (
393  localComm_,
394  typeName,
395  fName,
396  data,
397  recvSizes,
398  dummySlaveData,
399  fmt,
400  ver,
401  cmp,
402  append
403  );
404  }
    // Strategy 2: gather now, write in thread
405  else if (totalSize <= maxBufferSize_)
406  {
407  // Total size can be stored locally so receive all data now and only
408  // do the writing in the thread
409 
410  if (debug)
411  {
412  Pout<< "OFstreamCollator : non-thread gather; thread write of "
413  << fName << endl;
414  }
415 
    // Only the master holds the queue so only it has to wait for space
416  if (Pstream::master(localComm_))
417  {
418  waitForBufferSpace(totalSize);
419  }
420 
421 
422  // Receive in chunks of labelMax (2^31-1) since this is the maximum
423  // size that a List can be
424 
    // Held in an autoPtr until ownership is handed to the queue below
425  autoPtr<writeData> fileAndDataPtr
426  (
427  new writeData
428  (
429  threadComm_, // Note: comm not actually used anymore
430  typeName,
431  fName,
432  (
433  Pstream::master(localComm_)
434  ? data // Only used on master
435  : string::null
436  ),
437  recvSizes,
438  fmt,
439  ver,
440  cmp,
441  append
442  )
443  );
444  writeData& fileAndData = fileAndDataPtr();
445 
446  PtrList<List<char>>& slaveData = fileAndData.slaveData_;
447 
    // Non-owning view onto the local data, used as the send buffer on
    // non-master processors
448  UList<char> slice(const_cast<char*>(data.data()), label(data.size()));
449 
450  slaveData.setSize(recvSizes.size());
451 
452  // Gather all data onto master. Is done in local communicator since
453  // not in write thread. Note that we do not store in contiguous
454  // buffer since that would limit to 2G chars.
455  label startOfRequests = Pstream::nRequests();
456  if (Pstream::master(localComm_))
457  {
    // Entry 0 (the master itself) is left unset
458  for (label proci = 1; proci < slaveData.size(); proci++)
459  {
460  slaveData.set(proci, new List<char>(recvSizes[proci]));
    // NOTE(review): the call name, communication type and message tag
    // (listing lines 461, 463, 467) are not visible - presumably a
    // receive from proci into the buffer just allocated; confirm.
462  (
464  proci,
465  reinterpret_cast<char*>(slaveData[proci].begin()),
466  slaveData[proci].byteSize(),
468  localComm_
469  );
470  }
471  }
472  else
473  {
    // NOTE(review): the condition, call name, communication type, message
    // tag and the start/end of the error statement (listing lines 476, 478,
    // 482, 487, 491) are not visible - presumably a send of 'slice' to
    // processor 0 with a fatal error on failure; confirm.
474  if
475  (
477  (
479  0,
480  reinterpret_cast<const char*>(slice.begin()),
481  slice.byteSize(),
483  localComm_
484  )
485  )
486  {
488  << "Cannot send outgoing message. "
489  << "to:" << 0 << " nBytes:"
490  << label(slice.byteSize())
492  }
493  }
    // Wait for all requests issued since startOfRequests
494  Pstream::waitRequests(startOfRequests);
495 
496  {
497  std::lock_guard<std::mutex> guard(mutex_);
498 
499  // Append to thread buffer
    // ptr() releases ownership from the autoPtr to the queue
500  objects_.push(fileAndDataPtr.ptr());
501 
502  // Start thread if not running
503  if (!threadRunning_)
504  {
    // A previous thread object may still exist; join it before it is
    // replaced
505  if (thread_.valid())
506  {
507  if (debug)
508  {
509  Pout<< "OFstreamCollator : Waiting for write thread"
510  << endl;
511  }
512  thread_().join();
513  }
514 
515  if (debug)
516  {
517  Pout<< "OFstreamCollator : Starting write thread"
518  << endl;
519  }
520  thread_.reset(new std::thread(writeAll, this));
521  threadRunning_ = true;
522  }
523  }
524 
525  return true;
526  }
    // Strategy 3: gather and write in thread
527  else
528  {
529  if (debug)
530  {
531  Pout<< "OFstreamCollator : thread gather and write of " << fName
532  << " using communicator " << threadComm_ << endl;
533  }
534 
535  if (!UPstream::haveThreads())
536  {
    // NOTE(review): the start of this error statement (listing line 537)
    // is not visible - presumably FatalErrorInFunction given the
    // exit(FatalError) below; confirm.
538  << "mpi does not seem to have thread support."
539  << " Make sure to set buffer size 'maxThreadFileBufferSize'"
540  << " to at least " << totalSize
541  << " to be able to do the collating before threading."
542  << exit(FatalError);
543  }
544 
    // Only the local data is buffered here, so wait for that amount
545  if (Pstream::master(localComm_))
546  {
547  waitForBufferSpace(data.size());
548  }
549 
550  {
551  std::lock_guard<std::mutex> guard(mutex_);
552 
553  // Push all file info on buffer. Note that no slave data provided
554  // so it will trigger communication inside the thread
555  objects_.push
556  (
557  new writeData
558  (
559  threadComm_,
560  typeName,
561  fName,
562  data,
563  recvSizes,
564  fmt,
565  ver,
566  cmp,
567  append
568  )
569  );
570 
    // Start thread if not running (same logic as in strategy 2)
571  if (!threadRunning_)
572  {
573  if (thread_.valid())
574  {
575  if (debug)
576  {
577  Pout<< "OFstreamCollator : Waiting for write thread"
578  << endl;
579  }
580  thread_().join();
581  }
582 
583  if (debug)
584  {
585  Pout<< "OFstreamCollator : Starting write thread" << endl;
586  }
587  thread_.reset(new std::thread(writeAll, this));
588  threadRunning_ = true;
589  }
590  }
591 
592  return true;
593  }
594 }
595 
596 
// Body of waitAll (the signature on listing line 597 is not visible here;
// the documentation index lists it as "void waitAll()").
// Only the master of localComm_ waits; other processors return immediately.
598 {
599  // Wait for all buffer space to be available i.e. wait for all jobs
600  // to finish
601  if (Pstream::master(localComm_))
602  {
603  if (debug)
604  {
605  Pout<< "OFstreamCollator : waiting for thread to have consumed all"
606  << endl;
607  }
    // A negative size disables the "fits in buffer" test in
    // waitForBufferSpace, so this only returns once the queue is empty
608  waitForBufferSpace(-1);
609  }
610 }
611 
612 
613 // ************************************************************************* //
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
A class for handling file names.
Definition: fileName.H:79
bool set(const label) const
Is element set.
Definition: PtrListI.H:65
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
void reset(T *=nullptr)
If object pointer already set, delete object and set to given.
Definition: autoPtrI.H:114
error FatalError
dimensioned< Type > max(const dimensioned< Type > &, const dimensioned< Type > &)
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:319
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &masterData, const labelUList &recvSizes, const PtrList< SubList< char >> &slaveData, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of.
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
static label nRequests()
Get number of outstanding requests.
Definition: UPstream.C:137
static int & msgType()
Message tag of standard messages.
Definition: UPstream.H:476
T * ptr()
Return object pointer for reuse.
Definition: autoPtrI.H:90
UList< label > labelUList
Definition: UList.H:65
dimensioned< Type > sum(const DimensionedField< Type, GeoMesh > &df)
void clear()
Delete object (if the pointer is valid) and set pointer to.
Definition: autoPtrI.H:126
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
A class for handling words, derived from string.
Definition: word.H:59
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
void waitAll()
Wait for all thread actions to have finished.
iterator begin()
Return an iterator to begin traversing the UList.
Definition: UListI.H:216
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
bool valid() const
Return true if the autoPtr valid (ie, the pointer is set)
Definition: autoPtrI.H:83
forAllConstIter(PtrDictionary< phaseModel >, mixture.phases(), phase)
Definition: pEqn.H:29
errorManip< error > abort(error &err)
Definition: errorManip.H:131
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:60
void setSize(const label)
Reset size of PtrList. If extending the PtrList, new entries are.
Definition: PtrList.C:131
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
compressionType
Enumeration for the format of data in the stream.
Definition: IOstream.H:193
static const string null
An empty string.
Definition: string.H:88
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the.
Definition: UIPread.C:79
static bool haveThreads()
Have support for threads.
Definition: UPstream.H:405
defineTypeNameAndDebug(combustionModel, 0)
static bool write(const commsTypes commsType, const int toProcNo, const char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Write given buffer to given processor.
Definition: UOPwrite.C:34
const bool writeData(readBool(pdfDictionary.lookup("writeData")))
static void waitRequests(const label start=0)
Wait until all requests (from start onwards) have finished.
Definition: UPstream.C:147
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
label size() const
Return the number of elements in the UPtrList.
Definition: UPtrListI.H:29
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:331
A templated 1D list of pointers to objects of type <T>, where the size of the array is known and used...
Definition: List.H:70
std::streamsize byteSize() const
Return the binary size in number of characters of the UList.
Definition: UList.C:100
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
void push(const T &a)
Push an element onto the stack.
Definition: FIFOStack.H:84
Version number type.
Definition: IOstream.H:96
An auto-pointer similar to the STL auto_ptr but with automatic casting to a reference to the type and...
Definition: PtrList.H:52
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:250
unsigned int sleep(const unsigned int)
Sleep for the specified number of seconds.
Definition: POSIX.C:1134
Inter-processor communications stream.
Definition: UPstream.H:58
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:316
Namespace for OpenFOAM.
static void gather(const label comm, const label data, labelList &datas)
Helper: gather single label. Note: using native Pstream.
IOerror FatalIOError