collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2025 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "collatedFileOperation.H"
27 #include "Time.H"
29 #include "decomposedBlockData.H"
30 #include "masterOFstream.H"
31 #include "OFstream.H"
33 
34 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
35 
36 namespace Foam
37 {
38 namespace fileOperations
39 {
42  (
45  word
46  );
47 
49  (
50  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
51  );
52 
53  // Mark as needing threaded mpi
55  (
58  word,
59  collated
60  );
61 }
62 }
63 
64 
65 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
66 
68 {
70 
71  string ioRanksString(getEnv("FOAM_IORANKS"));
72  if (!ioRanksString.empty())
73  {
74  IStringStream is(ioRanksString);
75  is >> ioRanks;
76  }
77 
78  return ioRanks;
79 }
80 
81 
83 (
84  const label proci
85 )
86 const
87 {
88  if (Pstream::parRun())
89  {
90  return Pstream::master(comm_);
91  }
92  else
93  {
94  // Use any IO ranks
95  if (ioRanks_.size())
96  {
97  // Find myself in IO rank
98  return findIndex(ioRanks_, proci) != -1;
99  }
100  else
101  {
102  // Assume all in single communicator
103  return proci == 0;
104  }
105  }
106 }
107 
108 
110 (
111  const regIOobject& io,
112  const fileName& filePath,
116 ) const
117 {
118  // Append to processors/ file
119 
120  label proci = detectProcessorPath(io.objectPath());
121 
122  if (debug)
123  {
124  Pout<< "collatedFileOperation::writeObject :"
125  << " For local object : " << io.name()
126  << " appending processor " << proci
127  << " data to " << filePath << endl;
128  }
129 
130  if (proci == -1)
131  {
133  << "Not a valid processor path " << filePath
134  << exit(FatalError);
135  }
136 
137  const bool isMaster = isMasterRank(proci);
138 
139  // Determine the local rank if the filePath is a per-rank one
140  label localProci = proci;
141  {
142  fileName path, procDir, local;
143  label groupStart, groupSize, nProcs;
144  splitProcessorPath
145  (
146  filePath,
147  path,
148  procDir,
149  local,
150  groupStart,
151  groupSize,
152  nProcs
153  );
154  if (groupSize > 0 && groupStart != -1)
155  {
156  localProci = proci-groupStart;
157  }
158  }
159 
160 
161  // Create string from all data to write
162  string buf;
163  {
164  OStringStream os(fmt, ver);
165  if (isMaster)
166  {
167  if (!io.writeHeader(os))
168  {
169  return false;
170  }
171  }
172 
173  // Write the data to the Ostream
174  if (!io.writeData(os))
175  {
176  return false;
177  }
178 
179  if (isMaster)
180  {
182  }
183 
184  buf = os.str();
185  }
186 
187 
188  // Note: cannot do append + compression. This is a limitation
189  // of ogzstream (or rather most compressed formats)
190 
191  OFstream os
192  (
193  filePath,
195  ver,
196  IOstream::UNCOMPRESSED, // no compression
197  !isMaster
198  );
199 
200  if (!os.good())
201  {
203  << "Cannot open for appending"
204  << exit(FatalIOError);
205  }
206 
207  if (isMaster)
208  {
210  (
211  os,
212  os.version(),
213  os.format(),
215  string(),
216  filePath,
217  filePath.name()
218  );
219  }
220 
221  // Write data
222  UList<char> slice
223  (
224  const_cast<char*>(buf.data()),
225  label(buf.size())
226  );
227  os << nl << "// Processor" << localProci << nl << slice << nl;
228 
229  return os.good();
230 }
231 
232 
233 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
234 
236 (
237  const bool verbose
238 )
239 :
241  (
242  (
243  ioRanks().size()
244  ? UPstream::allocateCommunicator
245  (
246  UPstream::worldComm,
247  subRanks(Pstream::nProcs())
248  )
249  : UPstream::worldComm
250  ),
251  false
252  ),
253  myComm_(comm_),
254  writer_(maxThreadFileBufferSize, comm_),
255  nProcs_(Pstream::nProcs()),
256  ioRanks_(ioRanks())
257 {
258  if (verbose)
259  {
260  InfoHeader
261  << "I/O : " << typeName
262  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
263  << ')' << endl;
264 
265  if (maxThreadFileBufferSize == 0)
266  {
267  InfoHeader
268  << " Threading not activated "
269  "since maxThreadFileBufferSize = 0." << nl
270  << " Writing may run slowly for large file sizes."
271  << endl;
272  }
273  else
274  {
275  InfoHeader
276  << " Threading activated "
277  "since maxThreadFileBufferSize > 0." << nl
278  << " Requires large enough buffer to collect all data"
279  " or thread support " << nl
280  << " enabled in MPI. If thread support cannot be "
281  "enabled, deactivate" << nl
282  << " threading by setting maxThreadFileBufferSize "
283  "to 0 in" << nl
284  << " $FOAM_ETC/controlDict"
285  << endl;
286  }
287 
288  if (ioRanks_.size())
289  {
290  // Print a bit of information
292  if (Pstream::master(comm_))
293  {
294  ioRanks[Pstream::myProcNo()] = hostName()+"."+name(pid());
295  }
297 
298  InfoHeader << " IO nodes:" << endl;
299  forAll(ioRanks, proci)
300  {
301  if (!ioRanks[proci].empty())
302  {
303  InfoHeader << " " << ioRanks[proci] << endl;
304  }
305  }
306  }
307 
308 
309  if
310  (
313  )
314  {
316  << "Resetting fileModificationChecking to inotify" << endl;
317  }
318 
319  if
320  (
323  )
324  {
326  << "Resetting fileModificationChecking to timeStamp" << endl;
327  }
328  }
329 }
330 
331 
333 (
334  const label comm,
335  const labelList& ioRanks,
336  const word& typeName,
337  const bool verbose
338 )
339 :
340  masterUncollatedFileOperation(comm, false),
341  myComm_(-1),
342  writer_(maxThreadFileBufferSize, comm),
343  nProcs_(Pstream::nProcs()),
344  ioRanks_(ioRanks)
345 {
346  if (verbose)
347  {
348  InfoHeader
349  << "I/O : " << typeName
350  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
351  << ')' << endl;
352 
353  if (maxThreadFileBufferSize == 0)
354  {
355  InfoHeader
356  << " Threading not activated "
357  "since maxThreadFileBufferSize = 0." << nl
358  << " Writing may run slowly for large file sizes."
359  << endl;
360  }
361  else
362  {
363  InfoHeader
364  << " Threading activated "
365  "since maxThreadFileBufferSize > 0." << nl
366  << " Requires large enough buffer to collect all data"
367  " or thread support " << nl
368  << " enabled in MPI. If thread support cannot be "
369  "enabled, deactivate" << nl
370  << " threading by setting maxThreadFileBufferSize "
371  "to 0 in" << nl
372  << " $FOAM_ETC/controlDict"
373  << endl;
374  }
375 
376  if
377  (
380  )
381  {
383  << "Resetting fileModificationChecking to inotify" << endl;
384  }
385 
386  if
387  (
390  )
391  {
393  << "Resetting fileModificationChecking to timeStamp" << endl;
394  }
395  }
396 }
397 
398 
399 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
400 
402 {
403  if (myComm_ != -1 && myComm_ != UPstream::worldComm)
404  {
406  }
407 }
408 
409 
410 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
411 
413 (
414  const IOobject& io
415 ) const
416 {
417  // Replacement for objectPath
418  if (io.time().processorCase())
419  {
421  (
422  io,
424  "dummy", // not used for processorsobject
425  io.instance()
426  );
427  }
428  else
429  {
431  (
432  io,
434  word::null,
435  io.instance()
436  );
437  }
438 }
439 
440 
442 (
443  const regIOobject& io,
447  const bool write
448 ) const
449 {
450  const Time& tm = io.time();
451  const fileName& inst = io.instance();
452 
453  if (inst.isAbsolute() || !tm.processorCase())
454  {
455  mkDir(io.path());
456  fileName filePath(io.objectPath());
457 
458  if (debug)
459  {
460  Pout<< "collatedFileOperation::writeObject :"
461  << " For object : " << io.name()
462  << " falling back to master-only output to " << io.path()
463  << endl;
464  }
465 
466  masterOFstream os
467  (
468  filePath,
469  fmt,
470  ver,
471  cmp,
472  false,
473  write
474  );
475 
476  // If any of these fail, return (leave error handling to Ostream class)
477  if (!os.good())
478  {
479  return false;
480  }
481  if (!io.writeHeader(os))
482  {
483  return false;
484  }
485  // Write the data to the Ostream
486  if (!io.writeData(os))
487  {
488  return false;
489  }
491 
492  return true;
493  }
494  else
495  {
496  // Construct the equivalent processors/ directory
497  fileName path(processorsPath(io, inst, processorsDir(io)));
498 
499  mkDir(path);
500  fileName filePath(path/io.name());
501 
502  if (io.global())
503  {
504  if (debug)
505  {
506  Pout<< "collatedFileOperation::writeObject :"
507  << " For global object : " << io.name()
508  << " falling back to master-only output to " << filePath
509  << endl;
510  }
511 
512  masterOFstream os
513  (
514  filePath,
515  fmt,
516  ver,
517  cmp,
518  false,
519  write
520  );
521 
522  // If any of these fail, return (leave error handling to Ostream
523  // class)
524  if (!os.good())
525  {
526  return false;
527  }
528  if (!io.writeHeader(os))
529  {
530  return false;
531  }
532  // Write the data to the Ostream
533  if (!io.writeData(os))
534  {
535  return false;
536  }
538 
539  return true;
540  }
541  else if (!Pstream::parRun())
542  {
543  // Special path for e.g. decomposePar. Append to
544  // processorsDDD/ file
545  if (debug)
546  {
547  Pout<< "collatedFileOperation::writeObject :"
548  << " For object : " << io.name()
549  << " appending to " << filePath << endl;
550  }
551 
552  return appendObject(io, filePath, fmt, ver, cmp);
553  }
554  else
555  {
556  // Re-check static maxThreadFileBufferSize variable to see
557  // if needs to use threading
558  bool useThread = (maxThreadFileBufferSize > 0);
559 
560  if (debug)
561  {
562  Pout<< "collatedFileOperation::writeObject :"
563  << " For object : " << io.name()
564  << " starting collating output to " << filePath
565  << " useThread:" << useThread << endl;
566  }
567 
568  if (!useThread)
569  {
570  writer_.waitAll();
571  }
572 
574  (
575  writer_,
576  filePath,
577  fmt,
578  ver,
579  cmp,
580  useThread
581  );
582 
583  // If any of these fail, return (leave error handling to Ostream
584  // class)
585  if (!os.good())
586  {
587  return false;
588  }
589  if (Pstream::master(comm_) && !io.writeHeader(os))
590  {
591  return false;
592  }
593  // Write the data to the Ostream
594  if (!io.writeData(os))
595  {
596  return false;
597  }
598  if (Pstream::master(comm_))
599  {
601  }
602 
603  return true;
604  }
605  }
606 }
607 
609 {
610  if (debug)
611  {
612  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
613  << endl;
614  }
616  // Wait for thread to finish (note: also removes thread)
617  writer_.waitAll();
618 }
619 
620 
622 (
623  const fileName& fName
624 ) const
625 {
626  if (Pstream::parRun())
627  {
628  const List<int>& procs(UPstream::procID(comm_));
629 
630  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
631 
632  if (procs.size() != Pstream::nProcs())
633  {
634  procDir +=
635  + "_"
636  + Foam::name(procs[0])
637  + "-"
638  + Foam::name(procs.last());
639  }
640  return procDir;
641  }
642  else
643  {
644  word procDir(processorsBaseDir+Foam::name(nProcs_));
645 
646  if (ioRanks_.size())
647  {
648  // Detect current processor number
649  label proci = detectProcessorPath(fName);
650 
651  if (proci != -1)
652  {
653  // Find lowest io rank
654  label minProc = 0;
655  label maxProc = nProcs_-1;
656  forAll(ioRanks_, i)
657  {
658  if (ioRanks_[i] >= nProcs_)
659  {
660  break;
661  }
662  else if (ioRanks_[i] <= proci)
663  {
664  minProc = ioRanks_[i];
665  }
666  else
667  {
668  maxProc = ioRanks_[i]-1;
669  break;
670  }
671  }
672  procDir +=
673  + "_"
674  + Foam::name(minProc)
675  + "-"
676  + Foam::name(maxProc);
677  }
678  }
679 
680  return procDir;
681  }
682 }
683 
684 
686 (
687  const IOobject& io
688 ) const
689 {
690  return processorsDir(io.objectPath(false));
691 }
692 
693 
695 {
696  nProcs_ = nProcs;
697 
698  if (debug)
699  {
700  Pout<< "collatedFileOperation::setNProcs :"
701  << " Setting number of processors to " << nProcs_ << endl;
702  }
703 }
704 
705 
706 // ************************************************************************* //
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:449
Macros for easy insertion into run-time selection tables.
IOobject defines the attributes of an object for which implicit objectRegistry management is supporte...
Definition: IOobject.H:99
const Time & time() const
Return time.
Definition: IOobject.C:315
static Stream & writeEndDivider(Stream &os)
Write the standard end file divider.
Definition: IOobjectI.H:103
fileName & instance() const
Return the instance directory, constant, system, <time> etc.
Definition: IOobject.C:352
static fileCheckTypes fileModificationChecking
Type of file modification checking.
Definition: IOobject.H:223
const word & name() const
Return name.
Definition: IOobject.H:307
static bool writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Write header.
fileName objectPath(const bool global) const
Return complete path + object name including the processor.
Definition: IOobject.H:411
Version number type.
Definition: IOstream.H:97
streamFormat format() const
Return current stream format.
Definition: IOstream.H:377
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:87
versionNumber version() const
Return the stream version.
Definition: IOstream.H:399
compressionType
Enumeration for the format of data in the stream.
Definition: IOstream.H:194
bool good() const
Return true if next operation might succeed.
Definition: IOstream.H:333
Input from memory buffer stream.
Definition: IStringStream.H:52
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
Output to file stream.
Definition: OFstream.H:87
Output to memory buffer stream.
Definition: OStringStream.H:52
string str() const
Return the string.
Inter-processor communications stream.
Definition: Pstream.H:56
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
bool processorCase() const
Return true if this is a processor case.
Definition: TimePaths.H:88
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:76
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: UList.H:74
T & last()
Return the last element of the list.
Definition: UListI.H:128
Inter-processor communications stream.
Definition: UPstream.H:59
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:311
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:399
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
A class for handling file names.
Definition: fileName.H:82
bool isAbsolute() const
Return true if file name is absolute.
Definition: fileName.C:73
word name() const
Return file name (part beyond last /)
Definition: fileName.C:195
const label comm_
Communicator to use.
Definition: fileOperation.H:96
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processor...
virtual bool writeObject(const regIOobject &, IOstream::streamFormat format=IOstream::ASCII, IOstream::versionNumber version=IOstream::currentVersion, IOstream::compressionType compression=IOstream::UNCOMPRESSED, const bool write=true) const
Writes a regIOobject (so header, contents and divider).
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or master of.
virtual fileName objectPath(const IOobject &io) const
Generate disk file name for object. Opposite of filePath.
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in.
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
const labelList ioRanks_
Ranks of IO handlers.
collatedFileOperation(const bool verbose)
Construct null.
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
bool appendObject(const regIOobject &io, const fileName &filePath, IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp) const
Append to processors/ file.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
fileName relativeObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
Master-only drop-in replacement for OFstream.
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:55
fileName objectPath() const
Return complete path + object name.
Definition: regIOobject.H:155
virtual bool writeData(Ostream &) const =0
Pure virtual writaData function.
virtual bool global() const
Return true if object is global, i.e. same for all processors.
Definition: regIOobject.C:157
fileName path() const
Return complete path.
Definition: regIOobject.C:175
Master-only drop-in replacement for OFstream.
Template function which returns the un-mangled name of a given type. Useful for types which do not ha...
A class for handling words, derived from string.
Definition: word.H:63
static const word null
An empty word.
Definition: word.H:78
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:346
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:334
#define WarningInFunction
Report a warning using Foam::Warning.
#define InfoHeader
Report write to Foam::Info if the local log switch is true.
float floatOptimisationSwitch(const char *name, const float defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:301
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
defineTypeNameAndDebug(collatedFileOperation, 0)
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
void write(std::ostream &os, const bool binary, List< floatScalar > &fField)
Write floats ascii or binary.
Namespace for OpenFOAM.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:288
String typeName(const std::type_info &info)
Return the un-mangled name given the standard type info.
string hostName(const bool full=false)
Return the system's host name, as per hostname(1)
Definition: POSIX.C:125
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:97
IOerror FatalIOError
word name(const LagrangianState state)
Return a string representation of a Lagrangian state enumeration.
pid_t pid()
Return the PID of this process.
Definition: POSIX.C:73
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index,.
error FatalError
static const char nl
Definition: Ostream.H:297