collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2024 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "collatedFileOperation.H"
27 #include "Time.H"
29 #include "decomposedBlockData.H"
30 #include "masterOFstream.H"
31 #include "OFstream.H"
33 
34 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
35 
// Static data for the collated file operation, defined inside the
// Foam::fileOperations namespace.
// NOTE(review): in this listing the identifier that heads each parenthesised
// group below is not visible, so only the argument lists can be read here.
// The cross-reference text at the end of the file names
// defineTypeNameAndDebug, addToRunTimeSelectionTable and
// addNamedToRunTimeSelectionTable -- presumably those belong to these
// groups; confirm against the original source before editing.
36 namespace Foam
37 {
38 namespace fileOperations
39 {
42  (
45  word
46  );
47 
// Initial value looked up from the "maxThreadFileBufferSize" float
// optimisation switch; 1e9 is passed as the default value.
49  (
50  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
51  );
52 
53  // Mark as needing threaded mpi
55  (
58  word,
59  collated
60  );
61 }
62 }
63 
64 
65 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
66 
// Builds the list of IO ranks from the FOAM_IORANKS environment variable.
// If the variable is empty nothing is parsed and 'ioRanks' is returned
// exactly as it was initialised.
// NOTE(review): the function signature and the declaration of the local
// 'ioRanks' are not visible in this listing.
68 {
70 
71  string ioRanksString(getEnv("FOAM_IORANKS"));
72  if (!ioRanksString.empty())
73  {
// Parse the environment string through an input string stream
74  IStringStream is(ioRanksString);
75  is >> ioRanks;
76  }
77 
78  return ioRanks;
79 }
80 
81 
// Decides whether processor 'proci' acts as a master rank.
//  - parallel run    : result of Pstream::master(comm_); 'proci' is not
//                      consulted in this case
//  - non-parallel run: with IO ranks set, true when 'proci' is listed in
//                      ioRanks_; without IO ranks, true only for 0
// NOTE(review): the line carrying the return type and function name is not
// visible in this listing.
83 (
84  const label proci
85 )
86 const
87 {
88  if (Pstream::parRun())
89  {
90  return Pstream::master(comm_);
91  }
92  else
93  {
94  // Use any IO ranks
95  if (ioRanks_.size())
96  {
97  // Find myself in IO rank
// findIndex result of -1 means 'proci' is not one of the IO ranks
98  return findIndex(ioRanks_, proci) != -1;
99  }
100  else
101  {
102  // Assume all in single communicator
103  return proci == 0;
104  }
105  }
106 }
107 
108 
// Appends the data of 'io' for one processor to the file 'filePath'.
//
// The processor number is detected from io.objectPath(); a fatal error is
// raised when none is found. The content of 'io' is first written into an
// in-memory string, then written to the file as a character list preceded
// by a "// Processor<N>" comment line. The master rank (see isMasterRank)
// additionally writes the header of 'io' into the buffer and the FoamFile
// banner/header of the container file.
//
// Returns false if writing the header or data of 'io' into the buffer
// fails; otherwise returns the state of the output file stream.
//
// NOTE(review): the function-name line and some parameter lines are not
// visible in this listing ('fmt' and 'ver' are used below).
110 (
111  const regIOobject& io,
112  const fileName& filePath,
116 ) const
117 {
118  // Append to processors/ file
119 
120  label proci = detectProcessorPath(io.objectPath());
121 
122  if (debug)
123  {
124  Pout<< "collatedFileOperation::writeObject :"
125  << " For local object : " << io.name()
126  << " appending processor " << proci
127  << " data to " << filePath << endl;
128  }
129 
// No processor number could be detected from the object path
130  if (proci == -1)
131  {
133  << "Not a valid processor path " << filePath
134  << exit(FatalError);
135  }
136 
137  const bool isMaster = isMasterRank(proci);
138 
139  // Determine the local rank if the filePath is a per-rank one
140  label localProci = proci;
141  {
142  fileName path, procDir, local;
143  label groupStart, groupSize, nProcs;
144  splitProcessorPath
145  (
146  filePath,
147  path,
148  procDir,
149  local,
150  groupStart,
151  groupSize,
152  nProcs
153  );
// When the path carries a processor group, number relative to its start
154  if (groupSize > 0 && groupStart != -1)
155  {
156  localProci = proci-groupStart;
157  }
158  }
159 
160 
161  // Create string from all data to write
162  string buf;
163  {
164  OStringStream os(fmt, ver);
// Only the master rank writes the header of 'io' into the buffer
165  if (isMaster)
166  {
167  if (!io.writeHeader(os))
168  {
169  return false;
170  }
171  }
172 
173  // Write the data to the Ostream
174  if (!io.writeData(os))
175  {
176  return false;
177  }
178 
// NOTE(review): the statement inside this branch is not visible in this
// listing.
179  if (isMaster)
180  {
182  }
183 
184  buf = os.str();
185  }
186 
187 
188  // Note: cannot do append + compression. This is a limitation
189  // of ogzstream (or rather most compressed formats)
190 
// NOTE(review): the last argument (!isMaster) is presumably the append
// flag, so that only non-master ranks append -- confirm against the
// OFstream constructor.
191  OFstream os
192  (
193  filePath,
195  ver,
196  IOstream::UNCOMPRESSED, // no compression
197  !isMaster
198  );
199 
200  if (!os.good())
201  {
203  << "Cannot open for appending"
204  << exit(FatalIOError);
205  }
206 
// Master rank writes the banner and FoamFile header of the container
// file, with class set to decomposedBlockData::typeName
207  if (isMaster)
208  {
209  IOobject::writeBanner(os) << IOobject::foamFile << "\n{\n";
210 
// The version entry is written only when it differs from currentVersion
211  if (os.version() != IOstream::currentVersion)
212  {
213  os << " version " << os.version() << ";\n";
214  }
215 
216  os << " format " << os.format() << ";\n"
217  << " class " << decomposedBlockData::typeName
218  << ";\n"
219  << " location " << filePath << ";\n"
220  << " object " << filePath.name() << ";\n"
221  << "}" << nl;
222 
223  IOobject::writeDivider(os) << nl;
224  }
225 
226  // Write data
// Character list built from the buffer's data pointer and size
227  UList<char> slice
228  (
229  const_cast<char*>(buf.data()),
230  label(buf.size())
231  );
232  os << nl << "// Processor" << localProci << nl << slice << nl;
233 
234  return os.good();
235 }
236 
237 
238 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
239 
// Constructor taking only 'verbose'.
//
// The first base-class argument is a communicator: when ioRanks() returns a
// non-empty list it is allocated from UPstream::worldComm using
// subRanks(Pstream::nProcs()); otherwise UPstream::worldComm is used.
// myComm_ is initialised from comm_, the writer from
// maxThreadFileBufferSize and comm_, nProcs_ from Pstream::nProcs() and
// ioRanks_ from ioRanks().
//
// When 'verbose' is true, messages about threading (depending on whether
// maxThreadFileBufferSize is 0) and, if IO ranks are set, the IO nodes are
// printed.
//
// NOTE(review): the constructor-name line, the base-class name line and
// several condition/statement lines are not visible in this listing.
241 (
242  const bool verbose
243 )
244 :
246  (
247  (
248  ioRanks().size()
249  ? UPstream::allocateCommunicator
250  (
251  UPstream::worldComm,
252  subRanks(Pstream::nProcs())
253  )
254  : UPstream::worldComm
255  ),
256  false
257  ),
258  myComm_(comm_),
259  writer_(maxThreadFileBufferSize, comm_),
260  nProcs_(Pstream::nProcs()),
261  ioRanks_(ioRanks())
262 {
263  if (verbose)
264  {
265  InfoHeader
266  << "I/O : " << typeName
267  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
268  << ')' << endl;
269 
270  if (maxThreadFileBufferSize == 0)
271  {
272  InfoHeader
273  << " Threading not activated "
274  "since maxThreadFileBufferSize = 0." << nl
275  << " Writing may run slowly for large file sizes."
276  << endl;
277  }
278  else
279  {
280  InfoHeader
281  << " Threading activated "
282  "since maxThreadFileBufferSize > 0." << nl
283  << " Requires large enough buffer to collect all data"
284  " or thread support " << nl
285  << " enabled in MPI. If thread support cannot be "
286  "enabled, deactivate" << nl
287  << " threading by setting maxThreadFileBufferSize "
288  "to 0 in" << nl
289  << " $FOAM_ETC/controlDict"
290  << endl;
291  }
292 
293  if (ioRanks_.size())
294  {
295  // Print a bit of information
// NOTE(review): the declaration of the local list 'ioRanks' used below
// is not visible in this listing.
// The master of comm_ stores "<hostName>.<pid>" in its own slot
297  if (Pstream::master(comm_))
298  {
299  ioRanks[Pstream::myProcNo()] = hostName()+"."+name(pid());
300  }
302 
// Only non-empty entries are printed
303  InfoHeader << " IO nodes:" << endl;
304  forAll(ioRanks, proci)
305  {
306  if (!ioRanks[proci].empty())
307  {
308  InfoHeader << " " << ioRanks[proci] << endl;
309  }
310  }
311  }
312 
313 
// NOTE(review): the conditions of the next two 'if' statements and the
// first lines of their bodies are not visible in this listing; only the
// message text remains.
314  if
315  (
318  )
319  {
321  << "Resetting fileModificationChecking to inotify" << endl;
322  }
323 
324  if
325  (
328  )
329  {
331  << "Resetting fileModificationChecking to timeStamp" << endl;
332  }
333  }
334 }
335 
336 
// Constructor taking a communicator, the IO ranks, a type name and
// 'verbose'.
//
// The base class masterUncollatedFileOperation is constructed from 'comm';
// myComm_ is set to -1, the writer is built from maxThreadFileBufferSize
// and 'comm', nProcs_ from Pstream::nProcs() and ioRanks_ from 'ioRanks'.
//
// When 'verbose' is true, the type name and the threading messages
// (depending on whether maxThreadFileBufferSize is 0) are printed.
//
// NOTE(review): the constructor-name line and several condition/statement
// lines are not visible in this listing.
338 (
339  const label comm,
340  const labelList& ioRanks,
341  const word& typeName,
342  const bool verbose
343 )
344 :
345  masterUncollatedFileOperation(comm, false),
346  myComm_(-1),
347  writer_(maxThreadFileBufferSize, comm),
348  nProcs_(Pstream::nProcs()),
349  ioRanks_(ioRanks)
350 {
351  if (verbose)
352  {
353  InfoHeader
354  << "I/O : " << typeName
355  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
356  << ')' << endl;
357 
358  if (maxThreadFileBufferSize == 0)
359  {
360  InfoHeader
361  << " Threading not activated "
362  "since maxThreadFileBufferSize = 0." << nl
363  << " Writing may run slowly for large file sizes."
364  << endl;
365  }
366  else
367  {
368  InfoHeader
369  << " Threading activated "
370  "since maxThreadFileBufferSize > 0." << nl
371  << " Requires large enough buffer to collect all data"
372  " or thread support " << nl
373  << " enabled in MPI. If thread support cannot be "
374  "enabled, deactivate" << nl
375  << " threading by setting maxThreadFileBufferSize "
376  "to 0 in" << nl
377  << " $FOAM_ETC/controlDict"
378  << endl;
379  }
380 
// NOTE(review): the conditions of the next two 'if' statements and the
// first lines of their bodies are not visible in this listing; only the
// message text remains.
381  if
382  (
385  )
386  {
388  << "Resetting fileModificationChecking to inotify" << endl;
389  }
390 
391  if
392  (
395  )
396  {
398  << "Resetting fileModificationChecking to timeStamp" << endl;
399  }
400  }
401 }
402 
403 
404 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
405 
// Destructor body. The guarded action runs only when myComm_ is neither -1
// nor UPstream::worldComm.
// NOTE(review): the destructor-name line and the statement inside the 'if'
// are not visible in this listing; presumably the communicator held in
// myComm_ is freed here -- confirm against the original source.
407 {
408  if (myComm_ != -1 && myComm_ != UPstream::worldComm)
409  {
411  }
412 }
413 
414 
415 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
416 
// Generates the path for 'io', used in place of the object's own
// objectPath.
// The two branches differ in the arguments passed on: for a processor case
// (io.time().processorCase()) the word "dummy" is passed, otherwise
// word::null; both pass 'io' and io.instance().
// NOTE(review): the function-name line, the name of the function called in
// each branch and one argument line per call are not visible in this
// listing.
418 (
419  const IOobject& io
420 ) const
421 {
422  // Replacement for objectPath
423  if (io.time().processorCase())
424  {
426  (
427  io,
429  "dummy", // not used for processorsobject
430  io.instance()
431  );
432  }
433  else
434  {
436  (
437  io,
439  word::null,
440  io.instance()
441  );
442  }
443 }
444 
445 
// Writes 'io', choosing one of four paths:
//  1. instance is absolute, or the case is not a processor case:
//     master-only output (masterOFstream) to io.objectPath()
//  2. processor case, io.global(): master-only output (masterOFstream) to
//     the file inside the processors directory
//  3. processor case, not a parallel run: appendObject to the file inside
//     the processors directory
//  4. otherwise: collated output through writer_, using threading when
//     maxThreadFileBufferSize > 0
// In every path false is returned as soon as the stream is not good or
// writing the header/data fails; otherwise paths 1, 2 and 4 return true and
// path 3 returns the result of appendObject.
//
// NOTE(review): the function-name line and some parameter lines are not
// visible in this listing ('fmt', 'ver' and 'cmp' are used below).
447 (
448  const regIOobject& io,
452  const bool write
453 ) const
454 {
455  const Time& tm = io.time();
456  const fileName& inst = io.instance();
457 
458  if (inst.isAbsolute() || !tm.processorCase())
459  {
460  mkDir(io.path());
461  fileName filePath(io.objectPath());
462 
463  if (debug)
464  {
465  Pout<< "collatedFileOperation::writeObject :"
466  << " For object : " << io.name()
467  << " falling back to master-only output to " << io.path()
468  << endl;
469  }
470 
471  masterOFstream os
472  (
473  filePath,
474  fmt,
475  ver,
476  cmp,
477  false,
478  write
479  );
480 
481  // If any of these fail, return (leave error handling to Ostream class)
482  if (!os.good())
483  {
484  return false;
485  }
486  if (!io.writeHeader(os))
487  {
488  return false;
489  }
490  // Write the data to the Ostream
491  if (!io.writeData(os))
492  {
493  return false;
494  }
496 
497  return true;
498  }
499  else
500  {
501  // Construct the equivalent processors/ directory
502  fileName path(processorsPath(io, inst, processorsDir(io)));
503 
504  mkDir(path);
505  fileName filePath(path/io.name());
506 
507  if (io.global())
508  {
509  if (debug)
510  {
511  Pout<< "collatedFileOperation::writeObject :"
512  << " For global object : " << io.name()
513  << " falling back to master-only output to " << filePath
514  << endl;
515  }
516 
517  masterOFstream os
518  (
519  filePath,
520  fmt,
521  ver,
522  cmp,
523  false,
524  write
525  );
526 
527  // If any of these fail, return (leave error handling to Ostream
528  // class)
529  if (!os.good())
530  {
531  return false;
532  }
533  if (!io.writeHeader(os))
534  {
535  return false;
536  }
537  // Write the data to the Ostream
538  if (!io.writeData(os))
539  {
540  return false;
541  }
543 
544  return true;
545  }
546  else if (!Pstream::parRun())
547  {
548  // Special path for e.g. decomposePar. Append to
549  // processorsDDD/ file
550  if (debug)
551  {
552  Pout<< "collatedFileOperation::writeObject :"
553  << " For object : " << io.name()
554  << " appending to " << filePath << endl;
555  }
556 
557  return appendObject(io, filePath, fmt, ver, cmp);
558  }
559  else
560  {
561  // Re-check static maxThreadFileBufferSize variable to see
562  // if needs to use threading
563  bool useThread = (maxThreadFileBufferSize > 0);
564 
565  if (debug)
566  {
567  Pout<< "collatedFileOperation::writeObject :"
568  << " For object : " << io.name()
569  << " starting collating output to " << filePath
570  << " useThread:" << useThread << endl;
571  }
572 
// When threading is off, wait on the writer before creating the stream
573  if (!useThread)
574  {
575  writer_.waitAll();
576  }
577 
// NOTE(review): the line declaring the stream 'os' (its type and name) is
// not visible in this listing; only its constructor arguments remain.
579  (
580  writer_,
581  filePath,
582  fmt,
583  ver,
584  cmp,
585  useThread
586  );
587 
588  // If any of these fail, return (leave error handling to Ostream
589  // class)
590  if (!os.good())
591  {
592  return false;
593  }
// Only the master of comm_ writes the header
594  if (Pstream::master(comm_) && !io.writeHeader(os))
595  {
596  return false;
597  }
598  // Write the data to the Ostream
599  if (!io.writeData(os))
600  {
601  return false;
602  }
// NOTE(review): the statement inside this branch is not visible in this
// listing.
603  if (Pstream::master(comm_))
604  {
606  }
607 
608  return true;
609  }
610  }
611 }
612 
// Waits until the writer has finished all outstanding work
// (writer_.waitAll()), printing a debug message first when debug is set.
// NOTE(review): the function-name line and one statement line between the
// debug block and the wait are not visible in this listing.
614 {
615  if (debug)
616  {
617  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
618  << endl;
619  }
621  // Wait for thread to finish (note: also removes thread)
622  writer_.waitAll();
623 }
624 
625 
// Returns the name of the processors directory for file 'fName'.
//
// Parallel run: processorsBaseDir followed by Pstream::nProcs(); when the
// communicator comm_ holds a different number of processes than
// Pstream::nProcs(), "_<first>-<last>" of its process IDs is appended.
//
// Non-parallel run: processorsBaseDir followed by nProcs_; when IO ranks
// are set and a processor number can be detected in 'fName',
// "_<minProc>-<maxProc>" is appended for the range that contains that
// processor.
//
// NOTE(review): the line carrying the return type and function name is not
// visible in this listing.
627 (
628  const fileName& fName
629 ) const
630 {
631  if (Pstream::parRun())
632  {
633  const List<int>& procs(UPstream::procID(comm_));
634 
635  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
636 
637  if (procs.size() != Pstream::nProcs())
638  {
639  procDir +=
640  + "_"
641  + Foam::name(procs[0])
642  + "-"
643  + Foam::name(procs.last());
644  }
645  return procDir;
646  }
647  else
648  {
649  word procDir(processorsBaseDir+Foam::name(nProcs_));
650 
651  if (ioRanks_.size())
652  {
653  // Detect current processor number
654  label proci = detectProcessorPath(fName);
655 
656  if (proci != -1)
657  {
658  // Find lowest io rank
// Range defaults to all processors: [0, nProcs_-1]
659  label minProc = 0;
660  label maxProc = nProcs_-1;
// Scan the IO ranks in list order: ranks <= proci raise the lower bound,
// the first rank > proci fixes the upper bound, and a rank >= nProcs_
// ends the scan.
// NOTE(review): this presumably relies on ioRanks_ being in ascending
// order -- confirm.
661  forAll(ioRanks_, i)
662  {
663  if (ioRanks_[i] >= nProcs_)
664  {
665  break;
666  }
667  else if (ioRanks_[i] <= proci)
668  {
669  minProc = ioRanks_[i];
670  }
671  else
672  {
673  maxProc = ioRanks_[i]-1;
674  break;
675  }
676  }
677  procDir +=
678  + "_"
679  + Foam::name(minProc)
680  + "-"
681  + Foam::name(maxProc);
682  }
683  }
684 
685  return procDir;
686  }
687 }
688 
689 
// Returns the processors directory name for 'io' by forwarding
// io.objectPath(false) to the fileName overload of processorsDir.
// NOTE(review): the line carrying the return type and function name is not
// visible in this listing.
691 (
692  const IOobject& io
693 ) const
694 {
695  return processorsDir(io.objectPath(false));
696 }
697 
698 
// Stores 'nProcs' in nProcs_ and, when debug is set, reports the new value.
// NOTE(review): the function signature is not visible in this listing.
700 {
701  nProcs_ = nProcs;
702 
703  if (debug)
704  {
705  Pout<< "collatedFileOperation::setNProcs :"
706  << " Setting number of processors to " << nProcs_ << endl;
707  }
708 }
709 
710 
711 // ************************************************************************* //
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:433
Macros for easy insertion into run-time selection tables.
IOobject defines the attributes of an object for which implicit objectRegistry management is supported...
Definition: IOobject.H:99
static Stream & writeBanner(Stream &os, bool noHint=false)
Write the standard OpenFOAM file/dictionary banner.
Definition: IOobjectI.H:45
const Time & time() const
Return time.
Definition: IOobject.C:315
static Stream & writeEndDivider(Stream &os)
Write the standard end file divider.
Definition: IOobjectI.H:103
fileName & instance() const
Return the instance directory, constant, system, <time> etc.
Definition: IOobject.C:352
static constexpr const char * foamFile
Keyword for the FoamFile header sub-dictionary.
Definition: IOobject.H:104
static fileCheckTypes fileModificationChecking
Type of file modification checking.
Definition: IOobject.H:223
static Stream & writeDivider(Stream &os)
Write the standard file section divider.
Definition: IOobjectI.H:93
const word & name() const
Return name.
Definition: IOobject.H:307
bool writeHeader(Ostream &) const
Write header.
fileName objectPath(const bool global) const
Return complete path + object name including the processor.
Definition: IOobject.H:411
Version number type.
Definition: IOstream.H:97
static const versionNumber currentVersion
Current version number.
Definition: IOstream.H:203
streamFormat format() const
Return current stream format.
Definition: IOstream.H:377
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:87
versionNumber version() const
Return the stream version.
Definition: IOstream.H:399
compressionType
Enumeration for the compression of data in the stream.
Definition: IOstream.H:194
bool good() const
Return true if next operation might succeed.
Definition: IOstream.H:333
Input from memory buffer stream.
Definition: IStringStream.H:52
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Output to file stream.
Definition: OFstream.H:87
Output to memory buffer stream.
Definition: OStringStream.H:52
string str() const
Return the string.
Inter-processor communications stream.
Definition: Pstream.H:56
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
bool processorCase() const
Return true if this is a processor case.
Definition: TimePaths.H:88
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:76
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
Definition: UList.H:74
T & last()
Return the last element of the list.
Definition: UListI.H:128
Inter-processor communications stream.
Definition: UPstream.H:59
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:311
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:399
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
A class for handling file names.
Definition: fileName.H:82
bool isAbsolute() const
Return true if file name is absolute.
Definition: fileName.C:73
word name() const
Return file name (part beyond last /)
Definition: fileName.C:195
const label comm_
Communicator to use.
Definition: fileOperation.H:96
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processor...
virtual bool writeObject(const regIOobject &, IOstream::streamFormat format=IOstream::ASCII, IOstream::versionNumber version=IOstream::currentVersion, IOstream::compressionType compression=IOstream::UNCOMPRESSED, const bool write=true) const
Writes a regIOobject (so header, contents and divider).
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or master of the IO ranks (non-parallel).
virtual fileName objectPath(const IOobject &io) const
Generate disk file name for object. Opposite of filePath.
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in.
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
const labelList ioRanks_
Ranks of IO handlers.
collatedFileOperation(const bool verbose)
Construct null.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
bool appendObject(const regIOobject &io, const fileName &filePath, IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp) const
Append to processors/ file.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
fileName relativeObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
Master-only drop-in replacement for OFstream.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with the objectRegistry.
Definition: regIOobject.H:55
fileName objectPath() const
Return complete path + object name.
Definition: regIOobject.H:158
virtual bool writeData(Ostream &) const =0
Pure virtual writeData function.
virtual bool global() const
Return true if object is global, i.e. same for all processors.
Definition: regIOobject.C:181
fileName path() const
Return complete path.
Definition: regIOobject.C:199
Master-only drop-in replacement for OFstream.
A class for handling words, derived from string.
Definition: word.H:62
static const word null
An empty word.
Definition: word.H:77
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:346
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:334
#define WarningInFunction
Report a warning using Foam::Warning.
#define InfoHeader
Report write to Foam::Info if the local log switch is true.
float floatOptimisationSwitch(const char *name, const float defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:269
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
defineTypeNameAndDebug(collatedFileOperation, 0)
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
void write(std::ostream &os, const bool binary, List< floatScalar > &fField)
Write floats ascii or binary.
Namespace for OpenFOAM.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:258
string hostName(const bool full=false)
Return the system's host name, as per hostname(1)
Definition: POSIX.C:125
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:97
IOerror FatalIOError
word name(const LagrangianState state)
Return a string representation of a Lagrangian state enumeration.
pid_t pid()
Return the PID of this process.
Definition: POSIX.C:73
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index, or -1 if not found.
error FatalError
static const char nl
Definition: Ostream.H:267