OFstreamCollator.H
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2017-2019 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Class
25  Foam::OFstreamCollator
26 
27 Description
28  Threaded file writer.
29 
30  Collects all data from all processors and writes as single
31  'decomposedBlockData' file. The operation is determined by the
32  buffer size (maxThreadFileBufferSize setting):
33  - local size of data is larger than buffer: receive and write processor
34  by processor (i.e. 'scheduled'). Does not use a thread, no file size
35  limit.
36  - total size of data is larger than buffer (but local is not):
37  thread does all the collecting and writing of the processors. No file
38  size limit.
39  - total size of data is less than buffer:
40  collecting is done locally; the thread only does the writing
41  (since the data has already been collected)
42 
43 
44 Operation is determined by the buffer size (maxThreadFileBufferSize setting), as listed above.
45 
46 SourceFiles
47  OFstreamCollator.C
48 
49 \*---------------------------------------------------------------------------*/
50 
51 #ifndef OFstreamCollator_H
52 #define OFstreamCollator_H
53 
54 #include <thread>
55 #include <mutex>
56 #include "IOstream.H"
57 #include "labelList.H"
58 #include "FIFOStack.H"
59 #include "SubList.H"
60 
61 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
62 
63 namespace Foam
64 {
65 
66 /*---------------------------------------------------------------------------*\
67  Class OFstreamCollator Declaration
68 \*---------------------------------------------------------------------------*/
69 
70 class OFstreamCollator
71 {
72  // Private class
73 
74  class writeData
75  {
76  public:
77 
78  const label comm_;
79  const word typeName_;
80  const fileName pathName_;
81  const string data_;
82  const labelList sizes_;
83  PtrList<List<char>> slaveData_;
84  const IOstream::streamFormat format_;
85  const IOstream::versionNumber version_;
86  const IOstream::compressionType compression_;
87  const bool append_;
88 
89  writeData
90  (
91  const label comm,
92  const word& typeName,
93  const fileName& pathName,
94  const string& data,
95  const labelList& sizes,
98  IOstream::compressionType compression,
99  const bool append
100  )
101  :
102  comm_(comm),
103  typeName_(typeName),
104  pathName_(pathName),
105  data_(data),
106  sizes_(sizes),
107  slaveData_(0),
108  format_(format),
109  version_(version),
110  compression_(compression),
111  append_(append)
112  {}
113 
114  //- (approximate) size of master + any optional slave data
115  off_t size() const
116  {
117  off_t sz = data_.size();
118  forAll(slaveData_, i)
119  {
120  if (slaveData_.set(i))
121  {
122  sz += slaveData_[i].size();
123  }
124  }
125  return sz;
126  }
127  };
128 
129 
130  // Private Data
131 
132  //- Total amount of storage to use for object stack below
133  const off_t maxBufferSize_;
134 
135  mutable std::mutex mutex_;
136 
137  autoPtr<std::thread> thread_;
138 
139  //- Stack of files to write + contents
140  FIFOStack<writeData*> objects_;
141 
142  //- Whether thread is running (and not exited)
143  bool threadRunning_;
144 
145  //- Communicator to use for all parallel ops (in simulation thread)
146  label localComm_;
147 
148  //- Communicator to use for all parallel ops (in write thread)
149  label threadComm_;
150 
151 
152  // Private Member Functions
153 
154  //- Write actual file
155  static bool writeFile
156  (
157  const label comm,
158  const word& typeName,
159  const fileName& fName,
160  const string& masterData,
161  const labelUList& recvSizes,
162  const PtrList<SubList<char>>& slaveData,
166  const bool append
167  );
168 
169  //- Write all files in stack
170  static void* writeAll(void *threadarg);
171 
172  //- Wait for total size of objects_ (master + optional slave data)
173  // to be wantedSize less than overall maxBufferSize.
174  void waitForBufferSpace(const off_t wantedSize) const;
175 
176 
177 public:
178 
179  // Declare name of the class and its debug switch
180  TypeName("OFstreamCollator");
181 
182 
183  // Constructors
184 
185  //- Construct from buffer size. 0 = do not use thread
186  OFstreamCollator(const off_t maxBufferSize);
187 
188  //- Construct from buffer size (0 = do not use thread) and local
189  // thread
190  OFstreamCollator(const off_t maxBufferSize, const label comm);
191 
192 
193  //- Destructor
194  virtual ~OFstreamCollator();
195 
196 
197  // Member Functions
198 
199  //- Write file with contents. Blocks until writethread has space
200  // available (total file sizes < maxBufferSize)
201  bool write
202  (
203  const word& typeName,
204  const fileName&,
205  const string& data,
209  const bool append,
210  const bool useThread = true
211  );
212 
213  //- Wait for all thread actions to have finished
214  void waitAll();
215 };
216 
217 
218 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
219 
220 } // End namespace Foam
221 
222 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
223 
224 #endif
225 
226 // ************************************************************************* //
A FIFO stack based on a singly-linked list.
Definition: FIFOStack.H:51
Threaded file writer.
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:434
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
A class for handling file names.
Definition: fileName.H:79
bool set(const label) const
Is element set.
Definition: PtrListI.H:65
virtual ~OFstreamCollator()
Destructor.
word format(conversionProperties.lookup("format"))
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append, const bool useThread=true)
Write file with contents. Blocks until writethread has space.
A List obtained as a section of another List.
Definition: SubList.H:53
A class for handling words, derived from string.
Definition: word.H:59
void waitAll()
Wait for all thread actions to have finished.
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
rAUs append(new volScalarField(IOobject::groupName("rAU", phase1.name()), 1.0/(U1Eqn.A()+byDt(max(phase1.residualAlpha() - alpha1, scalar(0)) *rho1))))
A 1D vector of objects of type <T>, where the size of the vector is known and can be used for subscri...
Definition: HashTable.H:60
compressionType
Enumeration for the format of data in the stream.
Definition: IOstream.H:193
Database for solution and other reduced data.
Definition: data.H:51
const bool writeData(readBool(pdfDictionary.lookup("writeData")))
label size() const
Return the number of elements in the UPtrList.
Definition: UPtrListI.H:29
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
A templated 1D list of pointers to objects of type <T>, where the size of the array is known and used...
Definition: List.H:70
Version number type.
Definition: IOstream.H:96
TypeName("OFstreamCollator")
Namespace for OpenFOAM.