collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2019 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "collatedFileOperation.H"
28 #include "Pstream.H"
29 #include "Time.H"
31 #include "decomposedBlockData.H"
32 #include "registerSwitch.H"
33 #include "masterOFstream.H"
34 #include "OFstream.H"
35 
36 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
37 
38 namespace Foam
39 {
40 namespace fileOperations
41 {
44  (
47  word
48  );
49 
51  (
52  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
53  );
55  (
56  "maxThreadFileBufferSize",
57  float,
58  collatedFileOperation::maxThreadFileBufferSize
59  );
60 
61  // Mark as needing threaded mpi
63  (
66  word,
67  collated
68  );
69 }
70 }
71 
72 
73 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
74 
76 {
77  labelList ioRanks;
78 
79  string ioRanksString(getEnv("FOAM_IORANKS"));
80  if (!ioRanksString.empty())
81  {
82  IStringStream is(ioRanksString);
83  is >> ioRanks;
84  }
85 
86  return ioRanks;
87 }
88 
89 
91 (
92  const label proci
93 )
94 const
95 {
96  if (Pstream::parRun())
97  {
98  return Pstream::master(comm_);
99  }
100  else
101  {
102  // Use any IO ranks
103  if (ioRanks_.size())
104  {
105  // Find myself in IO rank
106  return findIndex(ioRanks_, proci) != -1;
107  }
108  else
109  {
110  // Assume all in single communicator
111  return proci == 0;
112  }
113  }
114 }
115 
116 
118 (
119  const regIOobject& io,
120  const fileName& pathName,
124 ) const
125 {
126  // Append to processors/ file
127 
128  label proci = detectProcessorPath(io.objectPath());
129 
130  if (debug)
131  {
132  Pout<< "collatedFileOperation::writeObject :"
133  << " For local object : " << io.name()
134  << " appending processor " << proci
135  << " data to " << pathName << endl;
136  }
137 
138  if (proci == -1)
139  {
141  << "Not a valid processor path " << pathName
142  << exit(FatalError);
143  }
144 
145  const bool isMaster = isMasterRank(proci);
146 
147  // Determine the local rank if the pathName is a per-rank one
148  label localProci = proci;
149  {
150  fileName path, procDir, local;
151  label groupStart, groupSize, nProcs;
152  splitProcessorPath
153  (
154  pathName,
155  path,
156  procDir,
157  local,
158  groupStart,
159  groupSize,
160  nProcs
161  );
162  if (groupSize > 0 && groupStart != -1)
163  {
164  localProci = proci-groupStart;
165  }
166  }
167 
168 
169  // Create string from all data to write
170  string buf;
171  {
172  OStringStream os(fmt, ver);
173  if (isMaster)
174  {
175  if (!io.writeHeader(os))
176  {
177  return false;
178  }
179  }
180 
181  // Write the data to the Ostream
182  if (!io.writeData(os))
183  {
184  return false;
185  }
186 
187  if (isMaster)
188  {
190  }
191 
192  buf = os.str();
193  }
194 
195 
196  // Note: cannot do append + compression. This is a limitation
197  // of ogzstream (or rather most compressed formats)
198 
199  OFstream os
200  (
201  pathName,
203  ver,
204  IOstream::UNCOMPRESSED, // no compression
205  !isMaster
206  );
207 
208  if (!os.good())
209  {
211  << "Cannot open for appending"
212  << exit(FatalIOError);
213  }
214 
215  if (isMaster)
216  {
218  << IOobject::foamFile << "\n{\n"
219  << " version " << os.version() << ";\n"
220  << " format " << os.format() << ";\n"
221  << " class " << decomposedBlockData::typeName
222  << ";\n"
223  << " location " << pathName << ";\n"
224  << " object " << pathName.name() << ";\n"
225  << "}" << nl;
226  IOobject::writeDivider(os) << nl;
227  }
228 
229  // Write data
230  UList<char> slice
231  (
232  const_cast<char*>(buf.data()),
233  label(buf.size())
234  );
235  os << nl << "// Processor" << localProci << nl << slice << nl;
236 
237  return os.good();
238 }
239 
240 
241 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
242 
244 (
245  const bool verbose
246 )
247 :
249  (
250  (
251  ioRanks().size()
253  (
255  subRanks(Pstream::nProcs())
256  )
258  ),
259  false
260  ),
261  myComm_(comm_),
262  writer_(maxThreadFileBufferSize, comm_),
263  nProcs_(Pstream::nProcs()),
264  ioRanks_(ioRanks())
265 {
266  if (verbose)
267  {
268  InfoHeader
269  << "I/O : " << typeName
270  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
271  << ')' << endl;
272 
273  if (maxThreadFileBufferSize == 0)
274  {
275  InfoHeader
276  << " Threading not activated "
277  "since maxThreadFileBufferSize = 0." << nl
278  << " Writing may run slowly for large file sizes."
279  << endl;
280  }
281  else
282  {
283  InfoHeader
284  << " Threading activated "
285  "since maxThreadFileBufferSize > 0." << nl
286  << " Requires large enough buffer to collect all data"
287  " or thread support " << nl
288  << " enabled in MPI. If thread support cannot be "
289  "enabled, deactivate" << nl
290  << " threading by setting maxThreadFileBufferSize "
291  "to 0 in" << nl
292  << " $FOAM_ETC/controlDict"
293  << endl;
294  }
295 
296  if (ioRanks_.size())
297  {
298  // Print a bit of information
299  stringList ioRanks(Pstream::nProcs());
300  if (Pstream::master(comm_))
301  {
302  ioRanks[Pstream::myProcNo()] = hostName()+"."+name(pid());
303  }
304  Pstream::gatherList(ioRanks);
305 
306  InfoHeader << " IO nodes:" << endl;
307  forAll(ioRanks, proci)
308  {
309  if (!ioRanks[proci].empty())
310  {
311  InfoHeader << " " << ioRanks[proci] << endl;
312  }
313  }
314  }
315 
316 
317  if
318  (
321  )
322  {
324  << "Resetting fileModificationChecking to inotify" << endl;
325  }
326 
327  if
328  (
331  )
332  {
334  << "Resetting fileModificationChecking to timeStamp" << endl;
335  }
336  }
337 }
338 
339 
341 (
342  const label comm,
343  const labelList& ioRanks,
344  const word& typeName,
345  const bool verbose
346 )
347 :
348  masterUncollatedFileOperation(comm, false),
349  myComm_(-1),
350  writer_(maxThreadFileBufferSize, comm),
351  nProcs_(Pstream::nProcs()),
352  ioRanks_(ioRanks)
353 {
354  if (verbose)
355  {
356  InfoHeader
357  << "I/O : " << typeName
358  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
359  << ')' << endl;
360 
361  if (maxThreadFileBufferSize == 0)
362  {
363  InfoHeader
364  << " Threading not activated "
365  "since maxThreadFileBufferSize = 0." << nl
366  << " Writing may run slowly for large file sizes."
367  << endl;
368  }
369  else
370  {
371  InfoHeader
372  << " Threading activated "
373  "since maxThreadFileBufferSize > 0." << nl
374  << " Requires large enough buffer to collect all data"
375  " or thread support " << nl
376  << " enabled in MPI. If thread support cannot be "
377  "enabled, deactivate" << nl
378  << " threading by setting maxThreadFileBufferSize "
379  "to 0 in" << nl
380  << " $FOAM_ETC/controlDict"
381  << endl;
382  }
383 
384  if
385  (
388  )
389  {
391  << "Resetting fileModificationChecking to inotify" << endl;
392  }
393 
394  if
395  (
398  )
399  {
401  << "Resetting fileModificationChecking to timeStamp" << endl;
402  }
403  }
404 }
405 
406 
407 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
408 
410 {
411  if (myComm_ != -1 && myComm_ != UPstream::worldComm)
412  {
414  }
415 }
416 
417 
418 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
419 
421 (
422  const IOobject& io,
423  const word& typeName
424 ) const
425 {
426  // Replacement for objectPath
427  if (io.time().processorCase())
428  {
430  (
431  io,
433  "dummy", // not used for processorsobject
434  io.instance()
435  );
436  }
437  else
438  {
440  (
441  io,
443  word::null,
444  io.instance()
445  );
446  }
447 }
448 
449 
451 (
452  const regIOobject& io,
456  const bool write
457 ) const
458 {
459  const Time& tm = io.time();
460  const fileName& inst = io.instance();
461 
462  if (inst.isAbsolute() || !tm.processorCase())
463  {
464  mkDir(io.path());
465  fileName pathName(io.objectPath());
466 
467  if (debug)
468  {
469  Pout<< "collatedFileOperation::writeObject :"
470  << " For object : " << io.name()
471  << " falling back to master-only output to " << io.path()
472  << endl;
473  }
474 
475  masterOFstream os
476  (
477  pathName,
478  fmt,
479  ver,
480  cmp,
481  false,
482  write
483  );
484 
485  // If any of these fail, return (leave error handling to Ostream class)
486  if (!os.good())
487  {
488  return false;
489  }
490  if (!io.writeHeader(os))
491  {
492  return false;
493  }
494  // Write the data to the Ostream
495  if (!io.writeData(os))
496  {
497  return false;
498  }
500 
501  return true;
502  }
503  else
504  {
505  // Construct the equivalent processors/ directory
506  fileName path(processorsPath(io, inst, processorsDir(io)));
507 
508  mkDir(path);
509  fileName pathName(path/io.name());
510 
511  if (io.global())
512  {
513  if (debug)
514  {
515  Pout<< "collatedFileOperation::writeObject :"
516  << " For global object : " << io.name()
517  << " falling back to master-only output to " << pathName
518  << endl;
519  }
520 
521  masterOFstream os
522  (
523  pathName,
524  fmt,
525  ver,
526  cmp,
527  false,
528  write
529  );
530 
531  // If any of these fail, return (leave error handling to Ostream
532  // class)
533  if (!os.good())
534  {
535  return false;
536  }
537  if (!io.writeHeader(os))
538  {
539  return false;
540  }
541  // Write the data to the Ostream
542  if (!io.writeData(os))
543  {
544  return false;
545  }
547 
548  return true;
549  }
550  else if (!Pstream::parRun())
551  {
552  // Special path for e.g. decomposePar. Append to
553  // processorsDDD/ file
554  if (debug)
555  {
556  Pout<< "collatedFileOperation::writeObject :"
557  << " For object : " << io.name()
558  << " appending to " << pathName << endl;
559  }
560 
561  return appendObject(io, pathName, fmt, ver, cmp);
562  }
563  else
564  {
565  // Re-check static maxThreadFileBufferSize variable to see
566  // if needs to use threading
567  bool useThread = (maxThreadFileBufferSize > 0);
568 
569  if (debug)
570  {
571  Pout<< "collatedFileOperation::writeObject :"
572  << " For object : " << io.name()
573  << " starting collating output to " << pathName
574  << " useThread:" << useThread << endl;
575  }
576 
577  if (!useThread)
578  {
579  writer_.waitAll();
580  }
581 
583  (
584  writer_,
585  pathName,
586  fmt,
587  ver,
588  cmp,
589  useThread
590  );
591 
592  // If any of these fail, return (leave error handling to Ostream
593  // class)
594  if (!os.good())
595  {
596  return false;
597  }
598  if (Pstream::master(comm_) && !io.writeHeader(os))
599  {
600  return false;
601  }
602  // Write the data to the Ostream
603  if (!io.writeData(os))
604  {
605  return false;
606  }
607  if (Pstream::master(comm_))
608  {
610  }
611 
612  return true;
613  }
614  }
615 }
616 
618 {
619  if (debug)
620  {
621  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
622  << endl;
623  }
625  // Wait for thread to finish (note: also removes thread)
626  writer_.waitAll();
627 }
628 
629 
631 (
632  const fileName& fName
633 ) const
634 {
635  if (Pstream::parRun())
636  {
637  const List<int>& procs(UPstream::procID(comm_));
638 
639  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
640 
641  if (procs.size() != Pstream::nProcs())
642  {
643  procDir +=
644  + "_"
645  + Foam::name(procs[0])
646  + "-"
647  + Foam::name(procs.last());
648  }
649  return procDir;
650  }
651  else
652  {
653  word procDir(processorsBaseDir+Foam::name(nProcs_));
654 
655  if (ioRanks_.size())
656  {
657  // Detect current processor number
658  label proci = detectProcessorPath(fName);
659 
660  if (proci != -1)
661  {
662  // Find lowest io rank
663  label minProc = 0;
664  label maxProc = nProcs_-1;
665  forAll(ioRanks_, i)
666  {
667  if (ioRanks_[i] >= nProcs_)
668  {
669  break;
670  }
671  else if (ioRanks_[i] <= proci)
672  {
673  minProc = ioRanks_[i];
674  }
675  else
676  {
677  maxProc = ioRanks_[i]-1;
678  break;
679  }
680  }
681  procDir +=
682  + "_"
683  + Foam::name(minProc)
684  + "-"
685  + Foam::name(maxProc);
686  }
687  }
688 
689  return procDir;
690  }
691 }
692 
693 
695 (
696  const IOobject& io
697 ) const
698 {
699  return processorsDir(io.objectPath());
700 }
701 
702 
704 {
705  nProcs_ = nProcs;
706 
707  if (debug)
708  {
709  Pout<< "collatedFileOperation::setNProcs :"
710  << " Setting number of processors to " << nProcs_ << endl;
711  }
712 }
713 
714 
715 // ************************************************************************* //
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:97
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
bool processorCase() const
Return true if this is a processor case.
Definition: TimePaths.H:90
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
const word & name() const
Return name.
Definition: IOobject.H:303
A class for handling file names.
Definition: fileName.H:79
static Stream & writeBanner(Stream &os, bool noHint=false)
Write the standard OpenFOAM file/dictionary banner.
Definition: IOobjectI.H:65
float floatOptimisationSwitch(const char *name, const float defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:206
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
error FatalError
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:319
virtual bool writeObject(const regIOobject &, IOstream::streamFormat format=IOstream::ASCII, IOstream::versionNumber version=IOstream::currentVersion, IOstream::compressionType compression=IOstream::UNCOMPRESSED, const bool write=true) const
Writes a regIOobject (so header, contents and divider).
#define InfoHeader
Report write to Foam::Info if the local log switch is true.
Output to file stream.
Definition: OFstream.H:82
static Stream & writeDivider(Stream &os)
Write the standard file section divider.
Definition: IOobjectI.H:113
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:164
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:429
collatedFileOperation(const bool verbose)
Construct null.
Master-only drop-in replacement for OFstream.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:423
versionNumber version() const
Return the stream version.
Definition: IOstream.H:399
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:68
bool good() const
Return true if next operation might succeed.
Definition: IOstream.H:333
Macros for easy insertion into run-time selection tables.
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
fileName path() const
Return complete path.
Definition: IOobject.C:358
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
string hostName(const bool full=false)
Return the system&#39;s host name, as per hostname(1)
Definition: POSIX.C:125
bool isAbsolute() const
Return true if file name is absolute.
Definition: fileName.C:61
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A class for handling words, derived from string.
Definition: word.H:59
Master-only drop-in replacement for OFstream.
word name() const
Return file name (part beyond last /)
Definition: fileName.C:183
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
static const word null
An empty word.
Definition: word.H:77
streamFormat format() const
Return current stream format.
Definition: IOstream.H:377
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or master of.
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:60
compressionType
Enumeration for the format of data in the stream.
Definition: IOstream.H:193
pid_t pid()
Return the PID of this process.
Definition: POSIX.C:73
static const char nl
Definition: Ostream.H:260
bool appendObject(const regIOobject &io, const fileName &pathName, IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp) const
Append to processors/ file.
static constexpr const char * foamFile
Keyword for the FoamFile header sub-dictionary.
Definition: IOobject.H:98
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static fileCheckTypes fileModificationChecking
Type of file modification checking.
Definition: IOobject.H:219
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index,.
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:290
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:399
const fileName & instance() const
Definition: IOobject.H:390
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:411
static Stream & writeEndDivider(Stream &os)
Write the standard end file divider.
Definition: IOobjectI.H:123
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in.
#define WarningInFunction
Report a warning using Foam::Warning.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:331
const Time & time() const
Return time.
Definition: IOobject.C:328
Input from memory buffer stream.
Definition: IStringStream.H:49
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
Version of masterUncollatedFileOperation that collates regIOobjects into a container in the processor...
string str() const
Return the string.
registerOptSwitch("maxThreadFileBufferSize", float, collatedFileOperation::maxThreadFileBufferSize)
virtual bool global() const
Is object same for all processors.
Definition: regIOobject.C:442
Version number type.
Definition: IOstream.H:96
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:55
bool writeHeader(Ostream &) const
Write header.
defineTypeNameAndDebug(collatedFileOperation, 0)
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:250
virtual bool writeData(Ostream &) const =0
Pure virtual writaData function.
T & last()
Return the last element of the list.
Definition: UListI.H:128
IOobject defines the attributes of an object for which implicit objectRegistry management is supporte...
Definition: IOobject.H:92
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
fileName objectPath() const
Return complete path + object name.
Definition: IOobject.H:419
Output to memory buffer stream.
Definition: OStringStream.H:49
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:440
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:316
Namespace for OpenFOAM.
fileName path(UMean.rootPath()/UMean.caseName()/functionObjects::writeFile::outputPrefix/"graphs"/UMean.instance())
IOerror FatalIOError