collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2021 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "collatedFileOperation.H"
27 #include "Time.H"
29 #include "decomposedBlockData.H"
30 #include "masterOFstream.H"
31 #include "OFstream.H"
33 
34 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
35 
// Static member data and run-time selection registration for
// collatedFileOperation.
// NOTE(review): the line that opens each parenthesised group below is not
// visible in this listing. The reference index at the end of the file lists
// defineTypeNameAndDebug, addToRunTimeSelectionTable and
// addNamedToRunTimeSelectionTable -- presumably those are the missing
// openers; confirm against the original source.
36 namespace Foam
37 {
38 namespace fileOperations
39 {
42  (
45  word
46  );
47 
    // Looks up the "maxThreadFileBufferSize" optimisation switch, using 1e9
    // as the default value. Presumably this initialises the static
    // maxThreadFileBufferSize member -- the defining line is not visible.
49  (
50  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
51  );
52 
53  // Mark as needing threaded mpi
55  (
58  word,
59  collated
60  );
61 }
62 }
63 
64 
65 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
66 
// Body of the helper that reads the list of IO ranks from the environment
// (called as ioRanks() by the verbose-only constructor; the signature line is
// not visible in this listing).
// Returns the labels parsed from FOAM_IORANKS, or an empty list when the
// variable is unset or empty.
68 {
69  labelList ioRanks;
70 
71  string ioRanksString(getEnv("FOAM_IORANKS"));
72  if (!ioRanksString.empty())
73  {
    // Parse the variable's text with the list stream reader
74  IStringStream is(ioRanksString);
75  is >> ioRanks;
76  }
77 
78  return ioRanks;
79 }
80 
81 
// isMasterRank (name taken from the reference index; the signature line is
// not visible in this listing).
// Decides whether processor proci is to be treated as a master (writing)
// rank:
//  - parallel run: true when this process is the master of comm_; proci is
//    not consulted in this branch
//  - non-parallel run with IO ranks: true when proci is listed in ioRanks_
//  - non-parallel run without IO ranks: true only for proci 0
83 (
84  const label proci
85 )
86 const
87 {
88  if (Pstream::parRun())
89  {
90  return Pstream::master(comm_);
91  }
92  else
93  {
94  // Use any IO ranks
95  if (ioRanks_.size())
96  {
97  // Find myself in IO rank
    // findIndex yields -1 when proci is not present in the list
98  return findIndex(ioRanks_, proci) != -1;
99  }
100  else
101  {
102  // Assume all in single communicator
103  return proci == 0;
104  }
105  }
106 }
107 
108 
// appendObject: append the data of one processor's object to the collated
// file filePath.
// Per the reference index at the end of this file the parameters are
// (io, filePath, fmt, ver, cmp); the fmt/ver/cmp lines of the parameter list
// are not visible in this listing.
// Returns false when writing the header or the data into the in-memory
// buffer fails; otherwise returns os.good() of the output file stream.
// Exits through FatalError when no processor number is detected in
// io.objectPath(), and through FatalIOError when the file stream is not good
// after opening.
110 (
111  const regIOobject& io,
112  const fileName& filePath,
116 ) const
117 {
118  // Append to processors/ file
119 
120  label proci = detectProcessorPath(io.objectPath());
121 
    // NOTE(review): the debug text below names writeObject although this is
    // the appendObject body.
122  if (debug)
123  {
124  Pout<< "collatedFileOperation::writeObject :"
125  << " For local object : " << io.name()
126  << " appending processor " << proci
127  << " data to " << filePath << endl;
128  }
129 
130  if (proci == -1)
131  {
    // The line that opens this error stream is not visible in this listing
133  << "Not a valid processor path " << filePath
134  << exit(FatalError);
135  }
136 
    // Only a master rank writes the headers below
137  const bool isMaster = isMasterRank(proci);
138 
139  // Determine the local rank if the filePath is a per-rank one
140  label localProci = proci;
141  {
142  fileName path, procDir, local;
143  label groupStart, groupSize, nProcs;
144  splitProcessorPath
145  (
146  filePath,
147  path,
148  procDir,
149  local,
150  groupStart,
151  groupSize,
152  nProcs
153  );
    // When a group range was found, number the processor relative to the
    // start of its group
154  if (groupSize > 0 && groupStart != -1)
155  {
156  localProci = proci-groupStart;
157  }
158  }
159 
160 
161  // Create string from all data to write
162  string buf;
163  {
164  OStringStream os(fmt, ver);
165  if (isMaster)
166  {
167  if (!io.writeHeader(os))
168  {
169  return false;
170  }
171  }
172 
173  // Write the data to the Ostream
174  if (!io.writeData(os))
175  {
176  return false;
177  }
178 
179  if (isMaster)
180  {
    // The statement guarded here is not visible in this listing
    // (presumably the end divider is written -- TODO confirm)
182  }
183 
184  buf = os.str();
185  }
186 
187 
188  // Note: cannot do append + compression. This is a limitation
189  // of ogzstream (or rather most compressed formats)
190 
    // The last argument is !isMaster; presumably it is the append flag, so
    // that a master rank starts the file and the other ranks append --
    // TODO confirm against the OFstream constructor
191  OFstream os
192  (
193  filePath,
195  ver,
196  IOstream::UNCOMPRESSED, // no compression
197  !isMaster
198  );
199 
200  if (!os.good())
201  {
    // The line that opens this error stream is not visible in this listing
203  << "Cannot open for appending"
204  << exit(FatalIOError);
205  }
206 
    // A master rank first writes the banner and a FoamFile header that
    // describes the container file (class decomposedBlockData)
207  if (isMaster)
208  {
209  IOobject::writeBanner(os) << IOobject::foamFile << "\n{\n";
210 
    // The version entry is written only when it differs from the current one
211  if (os.version() != IOstream::currentVersion)
212  {
213  os << " version " << os.version() << ";\n";
214  }
215 
216  os << " format " << os.format() << ";\n"
217  << " class " << decomposedBlockData::typeName
218  << ";\n"
219  << " location " << filePath << ";\n"
220  << " object " << filePath.name() << ";\n"
221  << "}" << nl;
222 
223  IOobject::writeDivider(os) << nl;
224  }
225 
226  // Write data
    // Present the buffered string as a character list; the list is only
    // streamed out below
227  UList<char> slice
228  (
229  const_cast<char*>(buf.data()),
230  label(buf.size())
231  );
232  os << nl << "// Processor" << localProci << nl << slice << nl;
233 
234  return os.good();
235 }
236 
237 
238 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
239 
// Constructor taking only the verbose flag (listed in the reference index as
// collatedFileOperation(const bool verbose); the signature line is not
// visible in this listing).
// Initialises myComm_ from comm_, the writer from maxThreadFileBufferSize and
// comm_, nProcs_ from Pstream::nProcs() and ioRanks_ from ioRanks().
// When verbose is true it reports the threading configuration and, if IO
// ranks are set, the IO nodes.
// NOTE(review): several lines of the first initialiser (lines 245, 249, 251
// and 254 of the listing) are not visible; presumably it is the base-class
// initialiser choosing a communicator depending on ioRanks().size() --
// TODO confirm.
241 (
242  const bool verbose
243 )
244 :
246  (
247  (
248  ioRanks().size()
250  (
252  subRanks(Pstream::nProcs())
253  )
255  ),
256  false
257  ),
258  myComm_(comm_),
259  writer_(maxThreadFileBufferSize, comm_),
260  nProcs_(Pstream::nProcs()),
261  ioRanks_(ioRanks())
262 {
263  if (verbose)
264  {
265  InfoHeader
266  << "I/O : " << typeName
267  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
268  << ')' << endl;
269 
    // Explain whether threaded writing is in use
270  if (maxThreadFileBufferSize == 0)
271  {
272  InfoHeader
273  << " Threading not activated "
274  "since maxThreadFileBufferSize = 0." << nl
275  << " Writing may run slowly for large file sizes."
276  << endl;
277  }
278  else
279  {
280  InfoHeader
281  << " Threading activated "
282  "since maxThreadFileBufferSize > 0." << nl
283  << " Requires large enough buffer to collect all data"
284  " or thread support " << nl
285  << " enabled in MPI. If thread support cannot be "
286  "enabled, deactivate" << nl
287  << " threading by setting maxThreadFileBufferSize "
288  "to 0 in" << nl
289  << " $FOAM_ETC/controlDict"
290  << endl;
291  }
292 
293  if (ioRanks_.size())
294  {
295  // Print a bit of information
    // Local list with one slot per processor. Only a master of comm_ fills
    // its own slot with "<host>.<pid>", so after the gather the non-empty
    // entries are the ones printed as IO nodes.
296  stringList ioRanks(Pstream::nProcs());
297  if (Pstream::master(comm_))
298  {
299  ioRanks[Pstream::myProcNo()] = hostName()+"."+name(pid());
300  }
301  Pstream::gatherList(ioRanks);
302 
303  InfoHeader << " IO nodes:" << endl;
304  forAll(ioRanks, proci)
305  {
306  if (!ioRanks[proci].empty())
307  {
308  InfoHeader << " " << ioRanks[proci] << endl;
309  }
310  }
311  }
312 
313 
    // NOTE(review): the conditions and the first statement line of the two
    // blocks below are not visible in this listing; from the messages they
    // reset IOobject::fileModificationChecking -- TODO confirm.
314  if
315  (
318  )
319  {
321  << "Resetting fileModificationChecking to inotify" << endl;
322  }
323 
324  if
325  (
328  )
329  {
331  << "Resetting fileModificationChecking to timeStamp" << endl;
332  }
333  }
334 }
335 
336 
// Constructor from an existing communicator and an explicit IO rank list (the
// signature line is not visible in this listing).
//   comm     : communicator passed to the base class and to the writer
//   ioRanks  : copied into ioRanks_
//   typeName : name printed in the verbose banner
//   verbose  : when true, report the threading configuration
// myComm_ is set to -1 here, which the destructor's guard excludes.
338 (
339  const label comm,
340  const labelList& ioRanks,
341  const word& typeName,
342  const bool verbose
343 )
344 :
345  masterUncollatedFileOperation(comm, false),
346  myComm_(-1),
347  writer_(maxThreadFileBufferSize, comm),
348  nProcs_(Pstream::nProcs()),
349  ioRanks_(ioRanks)
350 {
351  if (verbose)
352  {
353  InfoHeader
354  << "I/O : " << typeName
355  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
356  << ')' << endl;
357 
    // Explain whether threaded writing is in use
358  if (maxThreadFileBufferSize == 0)
359  {
360  InfoHeader
361  << " Threading not activated "
362  "since maxThreadFileBufferSize = 0." << nl
363  << " Writing may run slowly for large file sizes."
364  << endl;
365  }
366  else
367  {
368  InfoHeader
369  << " Threading activated "
370  "since maxThreadFileBufferSize > 0." << nl
371  << " Requires large enough buffer to collect all data"
372  " or thread support " << nl
373  << " enabled in MPI. If thread support cannot be "
374  "enabled, deactivate" << nl
375  << " threading by setting maxThreadFileBufferSize "
376  "to 0 in" << nl
377  << " $FOAM_ETC/controlDict"
378  << endl;
379  }
380 
    // NOTE(review): the conditions and the first statement line of the two
    // blocks below are not visible in this listing; from the messages they
    // reset IOobject::fileModificationChecking -- TODO confirm.
381  if
382  (
385  )
386  {
388  << "Resetting fileModificationChecking to inotify" << endl;
389  }
390 
391  if
392  (
395  )
396  {
398  << "Resetting fileModificationChecking to timeStamp" << endl;
399  }
400  }
401 }
402 
403 
404 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
405 
// Destructor body (the signature line is not visible in this listing).
// The guard skips the case where myComm_ is -1 and the case where it is the
// world communicator.
407 {
408  if (myComm_ != -1 && myComm_ != UPstream::worldComm)
409  {
    // The guarded statement is not visible in this listing; the reference
    // index lists UPstream::freeCommunicator, presumably called here on
    // myComm_ -- TODO confirm.
411  }
412 }
413 
414 
415 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
416 
// objectPath: generate the disk file name for io (described in the reference
// index as the opposite of filePath; the signature line is not visible in
// this listing).
// The typeName parameter is not used in the visible lines.
// NOTE(review): the lines that open the two calls and the argument following
// io are not visible. The reference index lists
// localObjectPath(io, searchType, processorsDir, instancePath), which matches
// the visible argument shape -- TODO confirm.
418 (
419  const IOobject& io,
420  const word& typeName
421 ) const
422 {
423  // Replacement for objectPath
424  if (io.time().processorCase())
425  {
427  (
428  io,
430  "dummy", // not used for processorsobject
431  io.instance()
432  );
433  }
434  else
435  {
    // Not a processor case: an empty word is passed in place of the
    // processors directory name
437  (
438  io,
440  word::null,
441  io.instance()
442  );
443  }
444 }
445 
446 
// writeObject: write a regIOobject (header, contents and divider, per the
// reference index).
// Per the reference index the parameters are
// (io, format, version, compression, write); the three middle lines of the
// parameter list are not visible in this listing, and the body refers to them
// as fmt, ver and cmp.
// Four output paths are taken:
//  1. absolute instance or not a processor case: master-only output to
//     io.objectPath()
//  2. global object in a processor case: master-only output into the
//     processors directory
//  3. non-parallel run: appendObject into the processors directory
//  4. parallel run: collated output through writer_, threaded when
//     maxThreadFileBufferSize > 0
// Returns false as soon as the stream is not good or writing the header or
// the data fails. Paths 1, 2 and 4 return true otherwise; path 3 returns
// the result of appendObject.
448 (
449  const regIOobject& io,
453  const bool write
454 ) const
455 {
456  const Time& tm = io.time();
457  const fileName& inst = io.instance();
458 
459  if (inst.isAbsolute() || !tm.processorCase())
460  {
461  mkDir(io.path());
462  fileName filePath(io.objectPath());
463 
464  if (debug)
465  {
466  Pout<< "collatedFileOperation::writeObject :"
467  << " For object : " << io.name()
468  << " falling back to master-only output to " << io.path()
469  << endl;
470  }
471 
472  masterOFstream os
473  (
474  filePath,
475  fmt,
476  ver,
477  cmp,
478  false,
479  write
480  );
481 
482  // If any of these fail, return (leave error handling to Ostream class)
483  if (!os.good())
484  {
485  return false;
486  }
487  if (!io.writeHeader(os))
488  {
489  return false;
490  }
491  // Write the data to the Ostream
492  if (!io.writeData(os))
493  {
494  return false;
495  }
    // Line 496 of the listing is not visible here (presumably the end
    // divider is written -- TODO confirm)
497 
498  return true;
499  }
500  else
501  {
502  // Construct the equivalent processors/ directory
503  fileName path(processorsPath(io, inst, processorsDir(io)));
504 
505  mkDir(path);
506  fileName filePath(path/io.name());
507 
508  if (io.global())
509  {
510  if (debug)
511  {
512  Pout<< "collatedFileOperation::writeObject :"
513  << " For global object : " << io.name()
514  << " falling back to master-only output to " << filePath
515  << endl;
516  }
517 
518  masterOFstream os
519  (
520  filePath,
521  fmt,
522  ver,
523  cmp,
524  false,
525  write
526  );
527 
528  // If any of these fail, return (leave error handling to Ostream
529  // class)
530  if (!os.good())
531  {
532  return false;
533  }
534  if (!io.writeHeader(os))
535  {
536  return false;
537  }
538  // Write the data to the Ostream
539  if (!io.writeData(os))
540  {
541  return false;
542  }
    // Line 543 of the listing is not visible here (presumably the end
    // divider is written -- TODO confirm)
544 
545  return true;
546  }
547  else if (!Pstream::parRun())
548  {
549  // Special path for e.g. decomposePar. Append to
550  // processorsDDD/ file
551  if (debug)
552  {
553  Pout<< "collatedFileOperation::writeObject :"
554  << " For object : " << io.name()
555  << " appending to " << filePath << endl;
556  }
557 
558  return appendObject(io, filePath, fmt, ver, cmp);
559  }
560  else
561  {
562  // Re-check static maxThreadFileBufferSize variable to see
563  // if needs to use threading
564  bool useThread = (maxThreadFileBufferSize > 0);
565 
566  if (debug)
567  {
568  Pout<< "collatedFileOperation::writeObject :"
569  << " For object : " << io.name()
570  << " starting collating output to " << filePath
571  << " useThread:" << useThread << endl;
572  }
573 
    // Without threading, wait for any outstanding writer work first
574  if (!useThread)
575  {
576  writer_.waitAll();
577  }
578 
    // The declaration that opens this argument list (the stream named os
    // used below) is not visible in this listing
580  (
581  writer_,
582  filePath,
583  fmt,
584  ver,
585  cmp,
586  useThread
587  );
588 
589  // If any of these fail, return (leave error handling to Ostream
590  // class)
591  if (!os.good())
592  {
593  return false;
594  }
    // Only the master of comm_ writes the header
595  if (Pstream::master(comm_) && !io.writeHeader(os))
596  {
597  return false;
598  }
599  // Write the data to the Ostream
600  if (!io.writeData(os))
601  {
602  return false;
603  }
604  if (Pstream::master(comm_))
605  {
    // The guarded statement is not visible in this listing (presumably the
    // end divider is written by the master -- TODO confirm)
607  }
608 
609  return true;
610  }
611  }
612 }
613 
// flush: forcibly wait until all output is done (per the reference index; the
// signature line is not visible in this listing).
615 {
616  if (debug)
617  {
618  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
619  << endl;
620  }
    // Line 621 of the listing is not visible here (presumably the base-class
    // flush is called -- TODO confirm)
622  // Wait for thread to finish (note: also removes thread)
623  writer_.waitAll();
624 }
625 
626 
// processorsDir: build the name of the processors directory that belongs to
// fName (the signature line is not visible in this listing).
// The name starts with processorsBaseDir followed by the processor count.
// A "_<first>-<last>" suffix is appended when the file covers only a
// sub-range of processors:
//  - parallel run: when comm_ holds fewer processes than Pstream::nProcs(),
//    using the first and last process of comm_
//  - non-parallel run: when ioRanks_ is set and a processor number is
//    detected in fName, using the IO-rank interval that contains it
628 (
629  const fileName& fName
630 ) const
631 {
632  if (Pstream::parRun())
633  {
634  const List<int>& procs(UPstream::procID(comm_));
635 
636  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
637 
638  if (procs.size() != Pstream::nProcs())
639  {
    // NOTE(review): the '+' directly after '+=' is a unary plus on the
    // string literal; it looks redundant
640  procDir +=
641  + "_"
642  + Foam::name(procs[0])
643  + "-"
644  + Foam::name(procs.last());
645  }
646  return procDir;
647  }
648  else
649  {
    // Non-parallel: use the stored processor count (see setNProcs)
650  word procDir(processorsBaseDir+Foam::name(nProcs_));
651 
652  if (ioRanks_.size())
653  {
654  // Detect current processor number
655  label proci = detectProcessorPath(fName);
656 
657  if (proci != -1)
658  {
659  // Find lowest io rank
    // Defaults cover the whole range 0 .. nProcs_-1. The scan stops at the
    // first IO rank above proci, so it presumably relies on ioRanks_ being
    // in ascending order -- TODO confirm.
660  label minProc = 0;
661  label maxProc = nProcs_-1;
662  forAll(ioRanks_, i)
663  {
664  if (ioRanks_[i] >= nProcs_)
665  {
    // IO rank outside the processor range: stop scanning
666  break;
667  }
668  else if (ioRanks_[i] <= proci)
669  {
670  minProc = ioRanks_[i];
671  }
672  else
673  {
    // The next IO rank starts the following group
674  maxProc = ioRanks_[i]-1;
675  break;
676  }
677  }
678  procDir +=
679  + "_"
680  + Foam::name(minProc)
681  + "-"
682  + Foam::name(maxProc);
683  }
684  }
685 
686  return procDir;
687  }
688 }
689 
690 
// processorsDir overload for an IOobject (described in the reference index as
// the actual name of the processors dir; the signature line is not visible in
// this listing).
// Forwards to the fileName overload using the object's path.
692 (
693  const IOobject& io
694 ) const
695 {
696  return processorsDir(io.objectPath());
697 }
698 
699 
// setNProcs: store the number of processor directories/results (per the
// reference index; the signature line is not visible in this listing).
// The stored nProcs_ is read by the non-parallel branch of processorsDir.
701 {
702  nProcs_ = nProcs;
703 
704  if (debug)
705  {
706  Pout<< "collatedFileOperation::setNProcs :"
707  << " Setting number of processors to " << nProcs_ << endl;
708  }
709 }
710 
711 
712 // ************************************************************************* //
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:97
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
bool processorCase() const
Return true if this is a processor case.
Definition: TimePaths.H:89
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
const word & name() const
Return name.
Definition: IOobject.H:303
A class for handling file names.
Definition: fileName.H:79
static Stream & writeBanner(Stream &os, bool noHint=false)
Write the standard OpenFOAM file/dictionary banner.
Definition: IOobjectI.H:65
float floatOptimisationSwitch(const char *name, const float defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:278
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
error FatalError
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:323
virtual bool writeObject(const regIOobject &, IOstream::streamFormat format=IOstream::ASCII, IOstream::versionNumber version=IOstream::currentVersion, IOstream::compressionType compression=IOstream::UNCOMPRESSED, const bool write=true) const
Writes a regIOobject (so header, contents and divider).
#define InfoHeader
Report write to Foam::Info if the local log switch is true.
Output to file stream.
Definition: OFstream.H:82
static Stream & writeDivider(Stream &os)
Write the standard file section divider.
Definition: IOobjectI.H:113
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
collatedFileOperation(const bool verbose)
Construct null.
Master-only drop-in replacement for OFstream.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
versionNumber version() const
Return the stream version.
Definition: IOstream.H:396
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:68
bool good() const
Return true if next operation might succeed.
Definition: IOstream.H:330
Macros for easy insertion into run-time selection tables.
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
fileName path() const
Return complete path.
Definition: IOobject.C:348
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
string hostName(const bool full=false)
Return the system's host name, as per hostname(1)
Definition: POSIX.C:125
bool isAbsolute() const
Return true if file name is absolute.
Definition: fileName.C:73
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A class for handling words, derived from string.
Definition: word.H:59
Master-only drop-in replacement for OFstream.
word name() const
Return file name (part beyond last /)
Definition: fileName.C:195
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
static const word null
An empty word.
Definition: word.H:77
streamFormat format() const
Return current stream format.
Definition: IOstream.H:374
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or master of.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:60
compressionType
Enumeration for the format of data in the stream.
Definition: IOstream.H:193
pid_t pid()
Return the PID of this process.
Definition: POSIX.C:73
static const char nl
Definition: Ostream.H:260
static constexpr const char * foamFile
Keyword for the FoamFile header sub-dictionary.
Definition: IOobject.H:98
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static fileCheckTypes fileModificationChecking
Type of file modification checking.
Definition: IOobject.H:219
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index,.
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:399
const fileName & instance() const
Definition: IOobject.H:390
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static Stream & writeEndDivider(Stream &os)
Write the standard end file divider.
Definition: IOobjectI.H:123
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in.
#define WarningInFunction
Report a warning using Foam::Warning.
static const versionNumber currentVersion
Current version number.
Definition: IOstream.H:203
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:335
const Time & time() const
Return time.
Definition: IOobject.C:318
Input from memory buffer stream.
Definition: IStringStream.H:49
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processor...
string str() const
Return the string.
virtual bool global() const
Is object same for all processors.
Definition: regIOobject.C:418
Version number type.
Definition: IOstream.H:96
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:52
bool writeHeader(Ostream &) const
Write header.
defineTypeNameAndDebug(collatedFileOperation, 0)
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:248
virtual bool writeData(Ostream &) const =0
Pure virtual writeData function.
T & last()
Return the last element of the list.
Definition: UListI.H:128
IOobject defines the attributes of an object for which implicit objectRegistry management is supporte...
Definition: IOobject.H:92
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
fileName objectPath() const
Return complete path + object name.
Definition: IOobject.H:419
Output to memory buffer stream.
Definition: OStringStream.H:49
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:314
bool appendObject(const regIOobject &io, const fileName &filePath, IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp) const
Append to processors/ file.
Namespace for OpenFOAM.
fileName path(UMean.rootPath()/UMean.caseName()/functionObjects::writeFile::outputPrefix/"graphs"/UMean.instance())
IOerror FatalIOError