collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2021 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "collatedFileOperation.H"
27 #include "Time.H"
29 #include "decomposedBlockData.H"
30 #include "masterOFstream.H"
31 #include "OFstream.H"
33 
34 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
35 
36 namespace Foam
37 {
38 namespace fileOperations
39 {
    // Static data and run-time selection registration for this file handler.
    // NOTE(review): the identifiers that open each parenthesised argument
    // list below are absent from this listing; they look like registration
    // macro invocations taking the arguments shown - confirm against the
    // original file.
42  (
45  word
46  );
47 
    // The value below is looked up as the float optimisation switch
    // "maxThreadFileBufferSize", with 1e9 used when no value is found.
    // NOTE(review): presumably this initialises the static member
    // maxThreadFileBufferSize - the declaration line is not visible.
49  (
50  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
51  );
52 
53  // Mark as needing threaded mpi
55  (
58  word,
59  collated
60  );
61 }
62 }
63 
64 
65 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
66 
68 {
    // Obtain the IO ranks from the environment: FOAM_IORANKS is read as a
    // string and, only when it is non-empty, parsed through an IStringStream
    // into ioRanks. ioRanks is returned in either case.
    // NOTE(review): the function signature and the declaration of ioRanks
    // are absent from this listing, so its type and initial contents
    // cannot be confirmed here.
70 
71  string ioRanksString(getEnv("FOAM_IORANKS"));
72  if (!ioRanksString.empty())
73  {
74  IStringStream is(ioRanksString);
75  is >> ioRanks;
76  }
77 
78  return ioRanks;
79 }
80 
81 
83 (
84  const label proci
85 )
86 const
87 {
    // Decide whether proci should be treated as a master (header-writing)
    // rank.
    //  - Parallel run: the answer is whether this process is master of
    //    comm_; the proci argument is not consulted.
    //  - Otherwise: true when proci appears in ioRanks_, or, when no IO
    //    ranks are set, when proci is 0.
88  if (Pstream::parRun())
89  {
90  return Pstream::master(comm_);
91  }
92  else
93  {
94  // Use any IO ranks
95  if (ioRanks_.size())
96  {
97  // Find myself in IO rank
98  return findIndex(ioRanks_, proci) != -1;
99  }
100  else
101  {
102  // Assume all in single communicator
103  return proci == 0;
104  }
105  }
106 }
107 
108 
110 (
111  const regIOobject& io,
112  const fileName& filePath,
    // NOTE(review): further parameters (fmt, ver and possibly others used
    // in the body) are absent from this listing.
116 ) const
117 {
118  // Append to processors/ file
    //
    // Serialises io into an in-memory string and writes it to filePath as
    // one "// Processor<N>" section. The master rank (see isMaster below)
    // additionally emits the file banner and FoamFile header first.
    // Returns false if io.writeHeader or io.writeData fails, otherwise the
    // final state of the output stream.
119 
    // Processor number encoded in the object's own path; -1 is treated as
    // "not a processor path" further down.
120  label proci = detectProcessorPath(io.objectPath());
121 
122  if (debug)
123  {
    // NOTE(review): the message is labelled "writeObject" although this
    // block appears to be the append routine - confirm whether intended.
124  Pout<< "collatedFileOperation::writeObject :"
125  << " For local object : " << io.name()
126  << " appending processor " << proci
127  << " data to " << filePath << endl;
128  }
129 
130  if (proci == -1)
131  {
    // NOTE(review): the statement that starts this error message is absent
    // from the listing; it ends in exit(FatalError).
133  << "Not a valid processor path " << filePath
134  << exit(FatalError);
135  }
136 
137  const bool isMaster = isMasterRank(proci);
138 
139  // Determine the local rank if the filePath is a per-rank one
    // localProci defaults to proci and is made relative to groupStart only
    // when splitProcessorPath reports a group (groupSize > 0 and
    // groupStart != -1).
140  label localProci = proci;
141  {
142  fileName path, procDir, local;
143  label groupStart, groupSize, nProcs;
144  splitProcessorPath
145  (
146  filePath,
147  path,
148  procDir,
149  local,
150  groupStart,
151  groupSize,
152  nProcs
153  );
154  if (groupSize > 0 && groupStart != -1)
155  {
156  localProci = proci-groupStart;
157  }
158  }
159 
160 
161  // Create string from all data to write
162  string buf;
163  {
164  OStringStream os(fmt, ver);
    // Only the master rank's block carries the object header.
165  if (isMaster)
166  {
167  if (!io.writeHeader(os))
168  {
169  return false;
170  }
171  }
172 
173  // Write the data to the Ostream
174  if (!io.writeData(os))
175  {
176  return false;
177  }
178 
179  if (isMaster)
180  {
    // NOTE(review): the master-only statement here is absent from the
    // listing.
182  }
183 
184  buf = os.str();
185  }
186 
187 
188  // Note: cannot do append + compression. This is a limitation
189  // of ogzstream (or rather most compressed formats)
190 
    // NOTE(review): one constructor argument (after filePath) is absent
    // from the listing. The last argument is true for non-master ranks;
    // presumably it is the append flag - confirm against OFstream.
191  OFstream os
192  (
193  filePath,
195  ver,
196  IOstream::UNCOMPRESSED, // no compression
197  !isMaster
198  );
199 
200  if (!os.good())
201  {
    // NOTE(review): the statement that starts this error message is absent
    // from the listing; it ends in exit(FatalIOError).
203  << "Cannot open for appending"
204  << exit(FatalIOError);
205  }
206 
    // Master rank writes the banner plus a FoamFile header describing the
    // container file (class decomposedBlockData); the version entry is
    // only written when it differs from the current version.
207  if (isMaster)
208  {
209  IOobject::writeBanner(os) << IOobject::foamFile << "\n{\n";
210 
211  if (os.version() != IOstream::currentVersion)
212  {
213  os << " version " << os.version() << ";\n";
214  }
215 
216  os << " format " << os.format() << ";\n"
217  << " class " << decomposedBlockData::typeName
218  << ";\n"
219  << " location " << filePath << ";\n"
220  << " object " << filePath.name() << ";\n"
221  << "}" << nl;
222 
223  IOobject::writeDivider(os) << nl;
224  }
225 
226  // Write data
    // The buffered text is wrapped (without copying) in a UList<char> and
    // streamed after a "// Processor<localProci>" marker line.
227  UList<char> slice
228  (
229  const_cast<char*>(buf.data()),
230  label(buf.size())
231  );
232  os << nl << "// Processor" << localProci << nl << slice << nl;
233 
234  return os.good();
235 }
236 
237 
238 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
239 
241 (
242  const bool verbose
243 )
244 :
    // NOTE(review): the name of the first initialiser (presumably the base
    // class) is absent from the listing. Its first argument is a
    // communicator: a newly allocated one built from worldComm and
    // subRanks(nProcs) when ioRanks() is non-empty, otherwise worldComm.
246  (
247  (
248  ioRanks().size()
249  ? UPstream::allocateCommunicator
250  (
251  UPstream::worldComm,
252  subRanks(Pstream::nProcs())
253  )
254  : UPstream::worldComm
255  ),
256  false
257  ),
258  myComm_(comm_),
259  writer_(maxThreadFileBufferSize, comm_),
260  nProcs_(Pstream::nProcs()),
261  ioRanks_(ioRanks())
262 {
    // The body only produces diagnostics, and only when verbose is set.
263  if (verbose)
264  {
265  InfoHeader
266  << "I/O : " << typeName
267  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
268  << ')' << endl;
269 
    // Report whether threaded writing is active, keyed on
    // maxThreadFileBufferSize being zero or not.
270  if (maxThreadFileBufferSize == 0)
271  {
272  InfoHeader
273  << " Threading not activated "
274  "since maxThreadFileBufferSize = 0." << nl
275  << " Writing may run slowly for large file sizes."
276  << endl;
277  }
278  else
279  {
280  InfoHeader
281  << " Threading activated "
282  "since maxThreadFileBufferSize > 0." << nl
283  << " Requires large enough buffer to collect all data"
284  " or thread support " << nl
285  << " enabled in MPI. If thread support cannot be "
286  "enabled, deactivate" << nl
287  << " threading by setting maxThreadFileBufferSize "
288  "to 0 in" << nl
289  << " $FOAM_ETC/controlDict"
290  << endl;
291  }
292 
293  if (ioRanks_.size())
294  {
295  // Print a bit of information
    // NOTE(review): the declaration of the local list named ioRanks and
    // the statement following the if-block are absent from the listing
    // (presumably a per-processor string list and a gather - confirm).
    // Each master of comm_ stores "<host>.<pid>" at its own processor
    // index; non-empty entries are printed below.
297  if (Pstream::master(comm_))
298  {
299  ioRanks[Pstream::myProcNo()] = hostName()+"."+name(pid());
300  }
302 
303  InfoHeader << " IO nodes:" << endl;
304  forAll(ioRanks, proci)
305  {
306  if (!ioRanks[proci].empty())
307  {
308  InfoHeader << " " << ioRanks[proci] << endl;
309  }
310  }
311  }
312 
313 
    // NOTE(review): the conditions and the opening of the two messages
    // below are absent from the listing; only the message text about
    // resetting fileModificationChecking is visible.
314  if
315  (
318  )
319  {
321  << "Resetting fileModificationChecking to inotify" << endl;
322  }
323 
324  if
325  (
328  )
329  {
331  << "Resetting fileModificationChecking to timeStamp" << endl;
332  }
333  }
334 }
335 
336 
338 (
339  const label comm,
340  const labelList& ioRanks,
341  const word& typeName,
342  const bool verbose
343 )
344 :
    // Construct on a caller-supplied communicator and IO-rank list.
    // myComm_ is set to -1 here, unlike the verbose-only constructor
    // which stores comm_.
345  masterUncollatedFileOperation(comm, false),
346  myComm_(-1),
347  writer_(maxThreadFileBufferSize, comm),
348  nProcs_(Pstream::nProcs()),
349  ioRanks_(ioRanks)
350 {
    // The body only produces diagnostics, and only when verbose is set.
    // The typeName printed is the constructor argument.
351  if (verbose)
352  {
353  InfoHeader
354  << "I/O : " << typeName
355  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
356  << ')' << endl;
357 
    // Report whether threaded writing is active, keyed on
    // maxThreadFileBufferSize being zero or not.
358  if (maxThreadFileBufferSize == 0)
359  {
360  InfoHeader
361  << " Threading not activated "
362  "since maxThreadFileBufferSize = 0." << nl
363  << " Writing may run slowly for large file sizes."
364  << endl;
365  }
366  else
367  {
368  InfoHeader
369  << " Threading activated "
370  "since maxThreadFileBufferSize > 0." << nl
371  << " Requires large enough buffer to collect all data"
372  " or thread support " << nl
373  << " enabled in MPI. If thread support cannot be "
374  "enabled, deactivate" << nl
375  << " threading by setting maxThreadFileBufferSize "
376  "to 0 in" << nl
377  << " $FOAM_ETC/controlDict"
378  << endl;
379  }
380 
    // NOTE(review): the conditions and the opening of the two messages
    // below are absent from the listing; only the message text about
    // resetting fileModificationChecking is visible.
381  if
382  (
385  )
386  {
388  << "Resetting fileModificationChecking to inotify" << endl;
389  }
390 
391  if
392  (
395  )
396  {
398  << "Resetting fileModificationChecking to timeStamp" << endl;
399  }
400  }
401 }
402 
403 
404 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
405 
407 {
    // Acts only when myComm_ holds a communicator other than -1 and
    // other than UPstream::worldComm.
    // NOTE(review): the statement inside the branch is absent from the
    // listing; presumably it frees myComm_ - confirm.
408  if (myComm_ != -1 && myComm_ != UPstream::worldComm)
409  {
411  }
412 }
413 
414 
415 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
416 
418 (
419  const IOobject& io,
420  const word& typeName
421 ) const
422 {
423  // Replacement for objectPath
    // Two argument lists are built, chosen by whether io.time() is a
    // processor case: the processor-case list passes the placeholder
    // "dummy" where the other passes word::null. typeName is not used in
    // the visible lines.
    // NOTE(review): the call each argument list belongs to and one
    // argument (after io) are absent from the listing in both branches.
424  if (io.time().processorCase())
425  {
427  (
428  io,
430  "dummy", // not used for processorsobject
431  io.instance()
432  );
433  }
434  else
435  {
437  (
438  io,
440  word::null,
441  io.instance()
442  );
443  }
444 }
445 
446 
448 (
449  const regIOobject& io,
    // NOTE(review): the parameters fmt, ver and cmp used in the body are
    // absent from this listing.
453  const bool write
454 ) const
455 {
    // Write io through one of four routes, returning false as soon as the
    // stream, header or data write reports failure and true otherwise:
    //  1. absolute instance or non-processor case: master-only output to
    //     io.objectPath();
    //  2. processor case, global object: master-only output into the
    //     processors directory;
    //  3. processor case, not a parallel run: appendObject();
    //  4. processor case, parallel run: collated output through writer_.
456  const Time& tm = io.time();
457  const fileName& inst = io.instance();
458 
459  if (inst.isAbsolute() || !tm.processorCase())
460  {
461  mkDir(io.path());
462  fileName filePath(io.objectPath());
463 
464  if (debug)
465  {
466  Pout<< "collatedFileOperation::writeObject :"
467  << " For object : " << io.name()
468  << " falling back to master-only output to " << io.path()
469  << endl;
470  }
471 
472  masterOFstream os
473  (
474  filePath,
475  fmt,
476  ver,
477  cmp,
478  false,
479  write
480  );
481 
482  // If any of these fail, return (leave error handling to Ostream class)
483  if (!os.good())
484  {
485  return false;
486  }
487  if (!io.writeHeader(os))
488  {
489  return false;
490  }
491  // Write the data to the Ostream
492  if (!io.writeData(os))
493  {
494  return false;
495  }
    // NOTE(review): a statement between the data write and the return is
    // absent from the listing.
497 
498  return true;
499  }
500  else
501  {
502  // Construct the equivalent processors/ directory
503  fileName path(processorsPath(io, inst, processorsDir(io)));
504 
505  mkDir(path);
506  fileName filePath(path/io.name());
507 
508  if (io.global())
509  {
510  if (debug)
511  {
512  Pout<< "collatedFileOperation::writeObject :"
513  << " For global object : " << io.name()
514  << " falling back to master-only output to " << filePath
515  << endl;
516  }
517 
518  masterOFstream os
519  (
520  filePath,
521  fmt,
522  ver,
523  cmp,
524  false,
525  write
526  );
527 
528  // If any of these fail, return (leave error handling to Ostream
529  // class)
530  if (!os.good())
531  {
532  return false;
533  }
534  if (!io.writeHeader(os))
535  {
536  return false;
537  }
538  // Write the data to the Ostream
539  if (!io.writeData(os))
540  {
541  return false;
542  }
    // NOTE(review): a statement between the data write and the return is
    // absent from the listing.
544 
545  return true;
546  }
547  else if (!Pstream::parRun())
548  {
549  // Special path for e.g. decomposePar. Append to
550  // processorsDDD/ file
551  if (debug)
552  {
553  Pout<< "collatedFileOperation::writeObject :"
554  << " For object : " << io.name()
555  << " appending to " << filePath << endl;
556  }
557 
558  return appendObject(io, filePath, fmt, ver, cmp);
559  }
560  else
561  {
562  // Re-check static maxThreadFileBufferSize variable to see
563  // if needs to use threading
564  bool useThread = (maxThreadFileBufferSize > 0);
565 
566  if (debug)
567  {
568  Pout<< "collatedFileOperation::writeObject :"
569  << " For object : " << io.name()
570  << " starting collating output to " << filePath
571  << " useThread:" << useThread << endl;
572  }
573 
    // Without threading, outstanding writer_ work is waited for before
    // the new stream is created.
574  if (!useThread)
575  {
576  writer_.waitAll();
577  }
578 
    // NOTE(review): the type and name of the stream constructed from
    // these arguments are absent from the listing; the code below refers
    // to it as os.
580  (
581  writer_,
582  filePath,
583  fmt,
584  ver,
585  cmp,
586  useThread
587  );
588 
589  // If any of these fail, return (leave error handling to Ostream
590  // class)
591  if (!os.good())
592  {
593  return false;
594  }
    // Only the master of comm_ writes the object header.
595  if (Pstream::master(comm_) && !io.writeHeader(os))
596  {
597  return false;
598  }
599  // Write the data to the Ostream
600  if (!io.writeData(os))
601  {
602  return false;
603  }
604  if (Pstream::master(comm_))
605  {
    // NOTE(review): the master-only statement here is absent from the
    // listing.
607  }
608 
609  return true;
610  }
611  }
612 }
613 
615 {
    // Block until all pending work held by writer_ has completed.
616  if (debug)
617  {
618  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
619  << endl;
620  }
    // NOTE(review): a statement between the debug block and the wait is
    // absent from the listing.
622  // Wait for thread to finish (note: also removes thread)
623  writer_.waitAll();
624 }
625 
626 
628 (
629  const fileName& fName
630 ) const
631 {
    // Build the name of the collated processors directory:
    // processorsBaseDir followed by the processor count and, when the data
    // is split over several IO groups, a "_<first>-<last>" rank suffix.
632  if (Pstream::parRun())
633  {
    // Parallel run: the suffix is added only when comm_ covers fewer
    // processes than the whole run, using the first and last process ids
    // of comm_. fName is not consulted in this branch.
634  const List<int>& procs(UPstream::procID(comm_));
635 
636  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
637 
638  if (procs.size() != Pstream::nProcs())
639  {
640  procDir +=
641  + "_"
642  + Foam::name(procs[0])
643  + "-"
644  + Foam::name(procs.last());
645  }
646  return procDir;
647  }
648  else
649  {
    // Non-parallel run: the stored nProcs_ is used for the count, and the
    // suffix is derived from ioRanks_ and the processor number found in
    // fName.
650  word procDir(processorsBaseDir+Foam::name(nProcs_));
651 
652  if (ioRanks_.size())
653  {
654  // Detect current processor number
655  label proci = detectProcessorPath(fName);
656 
657  if (proci != -1)
658  {
659  // Find lowest io rank
    // minProc ends as the last IO rank <= proci and maxProc as one
    // below the next IO rank, defaulting to 0 and nProcs_-1.
    // NOTE(review): the early breaks assume ioRanks_ is in
    // ascending order - confirm.
660  label minProc = 0;
661  label maxProc = nProcs_-1;
662  forAll(ioRanks_, i)
663  {
664  if (ioRanks_[i] >= nProcs_)
665  {
666  break;
667  }
668  else if (ioRanks_[i] <= proci)
669  {
670  minProc = ioRanks_[i];
671  }
672  else
673  {
674  maxProc = ioRanks_[i]-1;
675  break;
676  }
677  }
678  procDir +=
679  + "_"
680  + Foam::name(minProc)
681  + "-"
682  + Foam::name(maxProc);
683  }
684  }
685 
686  return procDir;
687  }
688 }
689 
690 
692 (
693  const IOobject& io
694 ) const
695 {
    // Convenience overload: forwards the path returned by
    // io.objectPath(false) to the fileName overload of processorsDir.
696  return processorsDir(io.objectPath(false));
697 }
698 
699 
701 {
    // Store the supplied processor count in nProcs_ and, when debug is
    // enabled, report the new value on Pout.
702  nProcs_ = nProcs;
703 
704  if (debug)
705  {
706  Pout<< "collatedFileOperation::setNProcs :"
707  << " Setting number of processors to " << nProcs_ << endl;
708  }
709 }
710 
711 
712 // ************************************************************************* //
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
Macros for easy insertion into run-time selection tables.
IOobject defines the attributes of an object for which implicit objectRegistry management is supporte...
Definition: IOobject.H:99
static Stream & writeBanner(Stream &os, bool noHint=false)
Write the standard OpenFOAM file/dictionary banner.
Definition: IOobjectI.H:45
const Time & time() const
Return time.
Definition: IOobject.C:318
static Stream & writeEndDivider(Stream &os)
Write the standard end file divider.
Definition: IOobjectI.H:103
fileName & instance() const
Return the instance directory, constant, system, <time> etc.
Definition: IOobject.C:355
static constexpr const char * foamFile
Keyword for the FoamFile header sub-dictionary.
Definition: IOobject.H:104
static fileCheckTypes fileModificationChecking
Type of file modification checking.
Definition: IOobject.H:226
static Stream & writeDivider(Stream &os)
Write the standard file section divider.
Definition: IOobjectI.H:93
const word & name() const
Return name.
Definition: IOobject.H:310
bool writeHeader(Ostream &) const
Write header.
fileName objectPath(const bool global) const
Return complete path + object name including the processor.
Definition: IOobject.H:411
Version number type.
Definition: IOstream.H:97
static const versionNumber currentVersion
Current version number.
Definition: IOstream.H:203
streamFormat format() const
Return current stream format.
Definition: IOstream.H:374
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:87
versionNumber version() const
Return the stream version.
Definition: IOstream.H:396
compressionType
Enumeration for the compression of data in the stream.
Definition: IOstream.H:194
bool good() const
Return true if next operation might succeed.
Definition: IOstream.H:330
Input from memory buffer stream.
Definition: IStringStream.H:52
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Output to file stream.
Definition: OFstream.H:86
Output to memory buffer stream.
Definition: OStringStream.H:52
string str() const
Return the string.
Inter-processor communications stream.
Definition: Pstream.H:56
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
bool processorCase() const
Return true if this is a processor case.
Definition: TimePaths.H:88
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:76
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:74
T & last()
Return the last element of the list.
Definition: UListI.H:128
Inter-processor communications stream.
Definition: UPstream.H:59
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:314
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:399
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
A class for handling file names.
Definition: fileName.H:82
bool isAbsolute() const
Return true if file name is absolute.
Definition: fileName.C:73
word name() const
Return file name (part beyond last /)
Definition: fileName.C:195
const label comm_
Communicator to use.
Definition: fileOperation.H:96
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processor...
virtual bool writeObject(const regIOobject &, IOstream::streamFormat format=IOstream::ASCII, IOstream::versionNumber version=IOstream::currentVersion, IOstream::compressionType compression=IOstream::UNCOMPRESSED, const bool write=true) const
Writes a regIOobject (so header, contents and divider).
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or, when not running in parallel, one of the IO ranks (processor 0 if no IO ranks are set).
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in.
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
const labelList ioRanks_
Ranks of IO handlers.
collatedFileOperation(const bool verbose)
Construct null.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
bool appendObject(const regIOobject &io, const fileName &filePath, IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp) const
Append to processors/ file.
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
fileName relativeObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
Master-only drop-in replacement for OFstream.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:55
fileName objectPath() const
Return complete path + object name.
Definition: regIOobject.H:159
virtual bool writeData(Ostream &) const =0
Pure virtual writeData function.
virtual bool global() const
Return true if object is global, i.e. same for all processors.
Definition: regIOobject.C:181
fileName path() const
Return complete path.
Definition: regIOobject.C:199
Master-only drop-in replacement for OFstream.
A class for handling words, derived from string.
Definition: word.H:62
static const word null
An empty word.
Definition: word.H:77
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:318
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:306
#define WarningInFunction
Report a warning using Foam::Warning.
#define InfoHeader
Report write to Foam::Info if the local log switch is true.
float floatOptimisationSwitch(const char *name, const float defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:278
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
defineTypeNameAndDebug(collatedFileOperation, 0)
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
void write(std::ostream &os, const bool binary, List< floatScalar > &fField)
Write floats ascii or binary.
Namespace for OpenFOAM.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
string hostName(const bool full=false)
Return the system's host name, as per hostname(1)
Definition: POSIX.C:125
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:97
IOerror FatalIOError
pid_t pid()
Return the PID of this process.
Definition: POSIX.C:73
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index, otherwise return -1.
error FatalError
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
static const char nl
Definition: Ostream.H:260