collatedFileOperation.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2018 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "collatedFileOperation.H"
28 #include "Pstream.H"
29 #include "Time.H"
31 #include "decomposedBlockData.H"
32 #include "registerSwitch.H"
33 #include "masterOFstream.H"
34 #include "OFstream.H"
35 
36 /* * * * * * * * * * * * * * * Static Member Data * * * * * * * * * * * * * */
37 
38 namespace Foam
39 {
40 namespace fileOperations
41 {
42  defineTypeNameAndDebug(collatedFileOperation, 0);
44  (
45  fileOperation,
46  collatedFileOperation,
47  word
48  );
49 
51  (
52  debug::floatOptimisationSwitch("maxThreadFileBufferSize", 1e9)
53  );
55  (
56  "maxThreadFileBufferSize",
57  float,
59  );
60 
61  // Mark as needing threaded mpi
63  (
64  fileOperationInitialise,
65  collatedFileOperationInitialise,
66  word,
67  collated
68  );
69 }
70 }
71 
72 
73 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
74 
76 {
77  labelList ioRanks;
78 
79  string ioRanksString(getEnv("FOAM_IORANKS"));
80  if (!ioRanksString.empty())
81  {
82  IStringStream is(ioRanksString);
83  is >> ioRanks;
84  }
85 
86  return ioRanks;
87 }
88 
89 
91 (
92  const label proci
93 )
94 const
95 {
96  if (Pstream::parRun())
97  {
98  return Pstream::master(comm_);
99  }
100  else
101  {
102  // Use any IO ranks
103  if (ioRanks_.size())
104  {
105  // Find myself in IO rank
106  return findIndex(ioRanks_, proci) != -1;
107  }
108  else
109  {
110  // Assume all in single communicator
111  return proci == 0;
112  }
113  }
114 }
115 
116 
118 (
119  const regIOobject& io,
120  const fileName& pathName,
124 ) const
125 {
126  // Append to processors/ file
127 
128  label proci = detectProcessorPath(io.objectPath());
129 
130  if (debug)
131  {
132  Pout<< "collatedFileOperation::writeObject :"
133  << " For local object : " << io.name()
134  << " appending processor " << proci
135  << " data to " << pathName << endl;
136  }
137 
138  if (proci == -1)
139  {
141  << "Not a valid processor path " << pathName
142  << exit(FatalError);
143  }
144 
145  const bool isMaster = isMasterRank(proci);
146 
147  // Determine the local rank if the pathName is a per-rank one
148  label localProci = proci;
149  {
150  fileName path, procDir, local;
151  label groupStart, groupSize, nProcs;
152  splitProcessorPath
153  (
154  pathName,
155  path,
156  procDir,
157  local,
158  groupStart,
159  groupSize,
160  nProcs
161  );
162  if (groupSize > 0 && groupStart != -1)
163  {
164  localProci = proci-groupStart;
165  }
166  }
167 
168 
169  // Create string from all data to write
170  string buf;
171  {
172  OStringStream os(fmt, ver);
173  if (isMaster)
174  {
175  if (!io.writeHeader(os))
176  {
177  return false;
178  }
179  }
180 
181  // Write the data to the Ostream
182  if (!io.writeData(os))
183  {
184  return false;
185  }
186 
187  if (isMaster)
188  {
190  }
191 
192  buf = os.str();
193  }
194 
195 
196  // Note: cannot do append + compression. This is a limitation
197  // of ogzstream (or rather most compressed formats)
198 
199  OFstream os
200  (
201  pathName,
203  ver,
204  IOstream::UNCOMPRESSED, // no compression
205  !isMaster
206  );
207 
208  if (!os.good())
209  {
211  << "Cannot open for appending"
212  << exit(FatalIOError);
213  }
214 
215  if (isMaster)
216  {
218  << "FoamFile\n{\n"
219  << " version " << os.version() << ";\n"
220  << " format " << os.format() << ";\n"
221  << " class " << decomposedBlockData::typeName
222  << ";\n"
223  << " location " << pathName << ";\n"
224  << " object " << pathName.name() << ";\n"
225  << "}" << nl;
226  IOobject::writeDivider(os) << nl;
227  }
228 
229  // Write data
230  UList<char> slice
231  (
232  const_cast<char*>(buf.data()),
233  label(buf.size())
234  );
235  os << nl << "// Processor" << localProci << nl << slice << nl;
236 
237  return os.good();
238 }
239 
240 
241 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
242 
244 (
245  const bool verbose
246 )
247 :
249  (
250  (
251  ioRanks().size()
253  (
255  subRanks(Pstream::nProcs())
256  )
258  ),
259  false
260  ),
261  myComm_(comm_),
262  writer_(maxThreadFileBufferSize, comm_),
263  nProcs_(Pstream::nProcs()),
264  ioRanks_(ioRanks())
265 {
266  if (verbose)
267  {
268  Info<< "I/O : " << typeName
269  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
270  << ')' << endl;
271 
272  if (maxThreadFileBufferSize == 0)
273  {
274  Info<< " Threading not activated "
275  "since maxThreadFileBufferSize = 0." << nl
276  << " Writing may run slowly for large file sizes."
277  << endl;
278  }
279  else
280  {
281  Info<< " Threading activated "
282  "since maxThreadFileBufferSize > 0." << nl
283  << " Requires large enough buffer to collect all data"
284  " or thread support " << nl
285  << " enabled in MPI. If thread support cannot be "
286  "enabled, deactivate" << nl
287  << " threading by setting maxThreadFileBufferSize "
288  "to 0 in" << nl
289  << " $FOAM_ETC/controlDict"
290  << endl;
291  }
292 
293  if (ioRanks_.size())
294  {
295  // Print a bit of information
296  stringList ioRanks(Pstream::nProcs());
297  if (Pstream::master(comm_))
298  {
299  ioRanks[Pstream::myProcNo()] = hostName()+"."+name(pid());
300  }
301  Pstream::gatherList(ioRanks);
302 
303  Info<< " IO nodes:" << endl;
304  forAll(ioRanks, proci)
305  {
306  if (!ioRanks[proci].empty())
307  {
308  Info<< " " << ioRanks[proci] << endl;
309  }
310  }
311  }
312 
313 
314  if
315  (
318  )
319  {
321  << "Resetting fileModificationChecking to inotify" << endl;
322  }
323 
324  if
325  (
328  )
329  {
331  << "Resetting fileModificationChecking to timeStamp" << endl;
332  }
333  }
334 }
335 
336 
338 (
339  const label comm,
340  const labelList& ioRanks,
341  const word& typeName,
342  const bool verbose
343 )
344 :
345  masterUncollatedFileOperation(comm, false),
346  myComm_(-1),
347  writer_(maxThreadFileBufferSize, comm),
348  nProcs_(Pstream::nProcs()),
349  ioRanks_(ioRanks)
350 {
351  if (verbose)
352  {
353  Info<< "I/O : " << typeName
354  << " (maxThreadFileBufferSize " << maxThreadFileBufferSize
355  << ')' << endl;
356 
357  if (maxThreadFileBufferSize == 0)
358  {
359  Info<< " Threading not activated "
360  "since maxThreadFileBufferSize = 0." << nl
361  << " Writing may run slowly for large file sizes."
362  << endl;
363  }
364  else
365  {
366  Info<< " Threading activated "
367  "since maxThreadFileBufferSize > 0." << nl
368  << " Requires large enough buffer to collect all data"
369  " or thread support " << nl
370  << " enabled in MPI. If thread support cannot be "
371  "enabled, deactivate" << nl
372  << " threading by setting maxThreadFileBufferSize "
373  "to 0 in" << nl
374  << " $FOAM_ETC/controlDict"
375  << endl;
376  }
377 
378  if
379  (
382  )
383  {
385  << "Resetting fileModificationChecking to inotify" << endl;
386  }
387 
388  if
389  (
392  )
393  {
395  << "Resetting fileModificationChecking to timeStamp" << endl;
396  }
397  }
398 }
399 
400 
401 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
402 
404 {
405  if (myComm_ != -1)
406  {
408  }
409 }
410 
411 
412 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
413 
415 (
416  const IOobject& io,
417  const word& typeName
418 ) const
419 {
420  // Replacement for objectPath
421  if (io.time().processorCase())
422  {
424  (
425  io,
427  "dummy", // not used for processorsobject
428  io.instance()
429  );
430  }
431  else
432  {
434  (
435  io,
437  word::null,
438  io.instance()
439  );
440  }
441 }
442 
443 
445 (
446  const regIOobject& io,
450  const bool valid
451 ) const
452 {
453  const Time& tm = io.time();
454  const fileName& inst = io.instance();
455 
456  if (inst.isAbsolute() || !tm.processorCase())
457  {
458  mkDir(io.path());
459  fileName pathName(io.objectPath());
460 
461  if (debug)
462  {
463  Pout<< "collatedFileOperation::writeObject :"
464  << " For object : " << io.name()
465  << " falling back to master-only output to " << io.path()
466  << endl;
467  }
468 
469  masterOFstream os
470  (
471  pathName,
472  fmt,
473  ver,
474  cmp,
475  false,
476  valid
477  );
478 
479  // If any of these fail, return (leave error handling to Ostream class)
480  if (!os.good())
481  {
482  return false;
483  }
484  if (!io.writeHeader(os))
485  {
486  return false;
487  }
488  // Write the data to the Ostream
489  if (!io.writeData(os))
490  {
491  return false;
492  }
494 
495  return true;
496  }
497  else
498  {
499  // Construct the equivalent processors/ directory
500  fileName path(processorsPath(io, inst, processorsDir(io)));
501 
502  mkDir(path);
503  fileName pathName(path/io.name());
504 
505  if (io.global())
506  {
507  if (debug)
508  {
509  Pout<< "collatedFileOperation::writeObject :"
510  << " For global object : " << io.name()
511  << " falling back to master-only output to " << pathName
512  << endl;
513  }
514 
515  masterOFstream os
516  (
517  pathName,
518  fmt,
519  ver,
520  cmp,
521  false,
522  valid
523  );
524 
525  // If any of these fail, return (leave error handling to Ostream
526  // class)
527  if (!os.good())
528  {
529  return false;
530  }
531  if (!io.writeHeader(os))
532  {
533  return false;
534  }
535  // Write the data to the Ostream
536  if (!io.writeData(os))
537  {
538  return false;
539  }
541 
542  return true;
543  }
544  else if (!Pstream::parRun())
545  {
546  // Special path for e.g. decomposePar. Append to
547  // processorsDDD/ file
548  if (debug)
549  {
550  Pout<< "collatedFileOperation::writeObject :"
551  << " For object : " << io.name()
552  << " appending to " << pathName << endl;
553  }
554 
555  return appendObject(io, pathName, fmt, ver, cmp);
556  }
557  else
558  {
559  // Re-check static maxThreadFileBufferSize variable to see
560  // if needs to use threading
561  bool useThread = (maxThreadFileBufferSize > 0);
562 
563  if (debug)
564  {
565  Pout<< "collatedFileOperation::writeObject :"
566  << " For object : " << io.name()
567  << " starting collating output to " << pathName
568  << " useThread:" << useThread << endl;
569  }
570 
571  if (!useThread)
572  {
573  writer_.waitAll();
574  }
575 
577  (
578  writer_,
579  pathName,
580  fmt,
581  ver,
582  cmp,
583  useThread
584  );
585 
586  // If any of these fail, return (leave error handling to Ostream
587  // class)
588  if (!os.good())
589  {
590  return false;
591  }
592  if (Pstream::master(comm_) && !io.writeHeader(os))
593  {
594  return false;
595  }
596  // Write the data to the Ostream
597  if (!io.writeData(os))
598  {
599  return false;
600  }
601  if (Pstream::master(comm_))
602  {
604  }
605 
606  return true;
607  }
608  }
609 }
610 
612 {
613  if (debug)
614  {
615  Pout<< "collatedFileOperation::flush : clearing and waiting for thread"
616  << endl;
617  }
619  // Wait for thread to finish (note: also removes thread)
620  writer_.waitAll();
621 }
622 
623 
625 (
626  const fileName& fName
627 ) const
628 {
629  if (Pstream::parRun())
630  {
631  const List<int>& procs(UPstream::procID(comm_));
632 
633  word procDir(processorsBaseDir+Foam::name(Pstream::nProcs()));
634 
635  if (procs.size() != Pstream::nProcs())
636  {
637  procDir +=
638  + "_"
639  + Foam::name(procs[0])
640  + "-"
641  + Foam::name(procs.last());
642  }
643  return procDir;
644  }
645  else
646  {
647  word procDir(processorsBaseDir+Foam::name(nProcs_));
648 
649  if (ioRanks_.size())
650  {
651  // Detect current processor number
652  label proci = detectProcessorPath(fName);
653 
654  if (proci != -1)
655  {
656  // Find lowest io rank
657  label minProc = 0;
658  label maxProc = nProcs_-1;
659  forAll(ioRanks_, i)
660  {
661  if (ioRanks_[i] >= nProcs_)
662  {
663  break;
664  }
665  else if (ioRanks_[i] <= proci)
666  {
667  minProc = ioRanks_[i];
668  }
669  else
670  {
671  maxProc = ioRanks_[i]-1;
672  break;
673  }
674  }
675  procDir +=
676  + "_"
677  + Foam::name(minProc)
678  + "-"
679  + Foam::name(maxProc);
680  }
681  }
682 
683  return procDir;
684  }
685 }
686 
687 
689 (
690  const IOobject& io
691 ) const
692 {
693  return processorsDir(io.objectPath());
694 }
695 
696 
698 {
699  nProcs_ = nProcs;
700 
701  if (debug)
702  {
703  Pout<< "collatedFileOperation::setNProcs :"
704  << " Setting number of processors to " << nProcs_ << endl;
705  }
706 }
707 
708 
709 // ************************************************************************* //
string getEnv(const word &)
Return environment variable of given name.
Definition: POSIX.C:96
fileName localObjectPath(const IOobject &, const pathType &searchType, const word &processorsDir, const word &instancePath) const
Construct filePath.
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:428
bool processorCase() const
Return true if this is a processor case.
Definition: TimePaths.H:90
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
const word & name() const
Return name.
Definition: IOobject.H:297
A class for handling file names.
Definition: fileName.H:69
static Stream & writeBanner(Stream &os, bool noHint=false)
Write the standard OpenFOAM file/dictionary banner.
Definition: IOobjectI.H:45
float floatOptimisationSwitch(const char *name, const float defaultValue=0)
Lookup optimisation switch or add default value.
Definition: debug.C:206
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
error FatalError
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:319
Output to file stream.
Definition: OFstream.H:82
static Stream & writeDivider(Stream &os)
Write the standard file section divider.
Definition: IOobjectI.H:93
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:163
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:427
collatedFileOperation(const bool verbose)
Construct null.
Master-only drop-in replacement for OFstream.
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:256
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:421
versionNumber version() const
Return the stream version.
Definition: IOstream.H:399
static label worldComm
Default communicator (all processors)
Definition: UPstream.H:278
Class to control time during OpenFOAM simulations that is also the top-level objectRegistry.
Definition: Time.H:68
bool good() const
Return true if next operation might succeed.
Definition: IOstream.H:333
Macros for easy insertion into run-time selection tables.
addNamedToRunTimeSelectionTable(fileOperationInitialise, collatedFileOperationInitialise, word, collated)
fileName path() const
Return complete path.
Definition: IOobject.C:397
addToRunTimeSelectionTable(fileOperation, collatedFileOperation, word)
string hostName(const bool full=false)
Return the system's host name, as per hostname(1)
Definition: POSIX.C:124
bool isAbsolute() const
Return true if file name is absolute.
Definition: fileName.C:57
virtual fileName objectPath(const IOobject &io, const word &typeName) const
Generate disk file name for object. Opposite of filePath.
A class for handling words, derived from string.
Definition: word.H:59
Master-only drop-in replacement for OFstream.
word name() const
Return file name (part beyond last /)
Definition: fileName.C:179
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
static const word null
An empty word.
Definition: word.H:77
streamFormat format() const
Return current stream format.
Definition: IOstream.H:377
bool isMasterRank(const label proci) const
Is proci master of communicator (in parallel) or, in a non-parallel run, listed in the IO ranks (proci 0 if no IO ranks are set).
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscript bounds checking, etc.
Definition: HashTable.H:61
compressionType
Enumeration for the compression of data in the stream.
Definition: IOstream.H:193
pid_t pid()
Return the PID of this process.
Definition: POSIX.C:72
static const char nl
Definition: Ostream.H:265
bool appendObject(const regIOobject &io, const fileName &pathName, IOstream::streamFormat fmt, IOstream::versionNumber ver, IOstream::compressionType cmp) const
Append to processors/ file.
virtual bool writeObject(const regIOobject &, IOstream::streamFormat format=IOstream::ASCII, IOstream::versionNumber version=IOstream::currentVersion, IOstream::compressionType compression=IOstream::UNCOMPRESSED, const bool valid=true) const
Writes a regIOobject (so header, contents and divider).
fileName path(UMean.rootPath()/UMean.caseName()/"graphs"/UMean.instance())
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static fileCheckTypes fileModificationChecking
Type of file modification checking.
Definition: IOobject.H:214
label findIndex(const ListType &, typename ListType::const_reference, const label start=0)
Find first occurrence of given element and return index, or -1 if not found.
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:289
word name(const complex &)
Return a string representation of a complex.
Definition: complex.C:47
static float maxThreadFileBufferSize
Max size of thread buffer size. This is the overall size of.
virtual bool global() const
Is object global.
Definition: regIOobject.H:299
virtual void flush() const
Forcibly wait until all output done. Flush any cached data.
static bool & parRun()
Is this a parallel run?
Definition: UPstream.H:397
const fileName & instance() const
Definition: IOobject.H:392
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:409
static Stream & writeEndDivider(Stream &os)
Write the standard end file divider.
Definition: IOobjectI.H:103
virtual void setNProcs(const label nProcs)
Set number of processor directories/results. Only used in.
#define WarningInFunction
Report a warning using Foam::Warning.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:331
const Time & time() const
Return time.
Definition: IOobject.C:367
Input from memory buffer stream.
Definition: IStringStream.H:49
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
string str() const
Return the string.
registerOptSwitch("maxThreadFileBufferSize", float, collatedFileOperation::maxThreadFileBufferSize)
Version number type.
Definition: IOstream.H:96
regIOobject is an abstract class derived from IOobject to handle automatic object registration with t...
Definition: regIOobject.H:65
bool writeHeader(Ostream &) const
Write header.
messageStream Info
defineTypeNameAndDebug(collatedFileOperation, 0)
static label allocateCommunicator(const label parent, const labelList &subRanks, const bool doPstream=true)
Allocate a new communicator.
Definition: UPstream.C:253
virtual bool writeData(Ostream &) const =0
Pure virtual writeData function.
T & last()
Return the last element of the list.
Definition: UListI.H:128
IOobject defines the attributes of an object for which implicit objectRegistry management is supporte...
Definition: IOobject.H:92
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
fileName objectPath() const
Return complete path + object name.
Definition: IOobject.H:418
Output to memory buffer stream.
Definition: OStringStream.H:49
static List< int > & procID(label communicator)
Process ID of given process index.
Definition: UPstream.H:438
virtual word processorsDir(const IOobject &) const
Actual name of processors dir.
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:319
Namespace for OpenFOAM.
IOerror FatalIOError