OFstreamCollator.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2017 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "OFstreamCollator.H"
27 #include "OFstream.H"
28 #include "OSspecific.H"
29 #include "IOstreams.H"
30 #include "Pstream.H"
31 #include "decomposedBlockData.H"
32 #include "PstreamReduceOps.H"
33 
34 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
35 
36 namespace Foam
37 {
38  defineTypeNameAndDebug(OFstreamCollator, 0);
39 }
40 
41 
42 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
43 
44 bool Foam::OFstreamCollator::writeFile
45 (
46  const label comm,
47  const word& typeName,
48  const fileName& fName,
49  const string& s,
51  IOstream::versionNumber ver,
53  const bool append
54 )
55 {
56  if (debug)
57  {
58  Pout<< "OFstreamCollator : Writing " << s.size()
59  << " bytes to " << fName
60  << " using comm " << comm << endl;
61  }
62 
63  autoPtr<OSstream> osPtr;
64  if (UPstream::master(comm))
65  {
66  Foam::mkDir(fName.path());
67  osPtr.reset
68  (
69  new OFstream
70  (
71  fName,
72  fmt,
73  ver,
74  cmp,
75  append
76  )
77  );
78 
79  // We don't have IOobject so cannot use IOobject::writeHeader
80  OSstream& os = osPtr();
82  (
83  os,
84  ver,
85  fmt,
86  typeName,
87  "",
88  fName,
89  fName.name()
90  );
91  }
92 
93  UList<char> slice(const_cast<char*>(s.data()), label(s.size()));
94 
95  // Assuming threaded writing hides any slowness so we might
96  // as well use scheduled communication to send the data to
97  // the master processor in order.
98 
99  List<std::streamoff> start;
101  (
102  comm,
103  osPtr,
104  start,
105  slice,
107  false // do not reduce return state
108  );
109 
110  if (osPtr.valid() && !osPtr().good())
111  {
112  FatalIOErrorInFunction(osPtr())
113  << "Failed writing to " << fName << exit(FatalIOError);
114  }
115 
116  if (debug)
117  {
118  Pout<< "OFstreamCollator : Finished writing " << s.size()
119  << " bytes to " << fName
120  << " using comm " << comm << endl;
121  }
122 
123  return true;
124 }
125 
126 
127 void* Foam::OFstreamCollator::writeAll(void *threadarg)
128 {
129  OFstreamCollator& handler = *static_cast<OFstreamCollator*>(threadarg);
130 
131  // Consume stack
132  while (true)
133  {
134  writeData* ptr = nullptr;
135 
136  //pthread_mutex_lock(&handler.mutex_);
137  lockMutex(handler.mutex_);
138 
139  if (handler.objects_.size())
140  {
141  ptr = handler.objects_.pop();
142  }
143  //pthread_mutex_unlock(&handler.mutex_);
144  unlockMutex(handler.mutex_);
145 
146  if (!ptr)
147  {
148  break;
149  }
150  else
151  {
152  bool ok = writeFile
153  (
154  handler.comm_,
155  ptr->typeName_,
156  ptr->pathName_,
157  ptr->data_,
158  ptr->format_,
159  ptr->version_,
160  ptr->compression_,
161  ptr->append_
162  );
163  if (!ok)
164  {
165  FatalIOErrorInFunction(ptr->pathName_)
166  << "Failed writing " << ptr->pathName_
167  << exit(FatalIOError);
168  }
169 
170  delete ptr;
171  }
172  //sleep(1);
173  }
174 
175  if (debug)
176  {
177  Pout<< "OFstreamCollator : Exiting write thread " << endl;
178  }
179 
180  //pthread_mutex_lock(&handler.mutex_);
181  lockMutex(handler.mutex_);
182  handler.threadRunning_ = false;
183  //pthread_mutex_unlock(&handler.mutex_);
184  unlockMutex(handler.mutex_);
185 
186  return nullptr;
187 }
188 
189 
190 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
191 
193 :
194  maxBufferSize_(maxBufferSize),
195  //mutex_(PTHREAD_MUTEX_INITIALIZER),
196  mutex_
197  (
198  maxBufferSize_ > 0
199  ? allocateMutex()
200  : -1
201  ),
202  thread_
203  (
204  maxBufferSize_ > 0
205  ? allocateThread()
206  : -1
207  ),
208  threadRunning_(false),
209  comm_
210  (
211  UPstream::allocateCommunicator
212  (
213  UPstream::worldComm,
214  identity(UPstream::nProcs(UPstream::worldComm))
215  )
216  )
217 {}
218 
219 
220 // * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * //
221 
223 {
224  if (threadRunning_)
225  {
226  if (debug)
227  {
228  Pout<< "~OFstreamCollator : Waiting for write thread" << endl;
229  }
230 
231  //pthread_join(thread_, nullptr);
232  joinThread(thread_);
233  }
234  if (thread_ != -1)
235  {
236  freeThread(thread_);
237  }
238  if (mutex_ != -1)
239  {
240  freeMutex(mutex_);
241  }
242  if (comm_ != -1)
243  {
245  }
246 }
247 
248 
249 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
250 
252 (
253  const word& typeName,
254  const fileName& fName,
255  const string& data,
259  const bool append
260 )
261 {
262  if (maxBufferSize_ > 0)
263  {
264  while (true)
265  {
266  // Count files to be written
267  off_t totalSize = 0;
268  //pthread_mutex_lock(&mutex_);
269  lockMutex(mutex_);
270  forAllConstIter(FIFOStack<writeData*>, objects_, iter)
271  {
272  totalSize += iter()->data_.size();
273  }
274  //pthread_mutex_unlock(&mutex_);
275  unlockMutex(mutex_);
276 
277  if
278  (
279  totalSize == 0
280  || (totalSize+off_t(data.size()) < maxBufferSize_)
281  )
282  {
283  break;
284  }
285 
286  if (debug)
287  {
288  Pout<< "OFstreamCollator : Waiting for buffer space."
289  << " Currently in use:" << totalSize
290  << " limit:" << maxBufferSize_
291  << endl;
292  }
293 
294  sleep(5);
295  }
296 
297  if (debug)
298  {
299  Pout<< "OFstreamCollator : relaying write of " << fName
300  << " to thread " << endl;
301  }
302  //pthread_mutex_lock(&mutex_);
303  lockMutex(mutex_);
304  objects_.push
305  (
306  new writeData(typeName, fName, data, fmt, ver, cmp, append)
307  );
308  //pthread_mutex_unlock(&mutex_);
309  unlockMutex(mutex_);
310 
311  //pthread_mutex_lock(&mutex_);
312  lockMutex(mutex_);
313  if (!threadRunning_)
314  {
315  createThread(thread_, writeAll, this);
316  if (debug)
317  {
318  Pout<< "OFstreamCollator : Started write thread " << endl;
319  }
320  threadRunning_ = true;
321  }
322  //pthread_mutex_unlock(&mutex_);
323  unlockMutex(mutex_);
324 
325  return true;
326  }
327  else
328  {
329  // Immediate writing
330  return writeFile(comm_, typeName, fName, data, fmt, ver, cmp, append);
331  }
332 }
333 
334 
335 // ************************************************************************* //
A FIFO stack based on a singly-linked list.
Definition: FIFOStack.H:51
void freeMutex(const label)
Free a mutex variable.
Definition: POSIX.C:1481
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
A class for handling file names.
Definition: fileName.H:69
Inter-processor communication reduction functions.
virtual ~OFstreamCollator()
Destructor.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:253
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:412
static bool writeBlocks(const label comm, autoPtr< OSstream > &osPtr, List< std::streamoff > &start, const UList< char > &, const UPstream::commsTypes, const bool syncReturnState=true)
Write *this. Ostream only valid on master. Returns starts of.
labelList identity(const label len)
Create identity map (map[i] == i) of given length.
Definition: ListOps.C:104
void unlockMutex(const label)
Unlock a mutex variable.
Definition: POSIX.C:1467
void freeThread(const label)
Delete a thread.
Definition: POSIX.C:1416
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
Functions used by OpenFOAM that are specific to POSIX compliant operating systems and need to be repl...
A class for handling words, derived from string.
Definition: word.H:59
static void writeHeader(Ostream &os, const IOstream::versionNumber version, const IOstream::streamFormat format, const word &type, const string &note, const fileName &location, const word &name)
Helper: write FoamFile IOobject header.
streamFormat
Enumeration for the format of data in the stream.
Definition: IOstream.H:86
forAllConstIter(PtrDictionary< phaseModel >, mixture.phases(), phase)
Definition: pEqn.H:29
void joinThread(const label)
Wait for thread.
Definition: POSIX.C:1402
void createThread(const label, void *(*start_routine)(void *), void *arg)
Start a thread.
Definition: POSIX.C:1384
compressionType
Enumeration for the compression of data in the stream.
Definition: IOstream.H:193
defineTypeNameAndDebug(combustionModel, 0)
const bool writeData(readBool(pdfDictionary.lookup("writeData")))
void lockMutex(const label)
Lock a mutex variable.
Definition: POSIX.C:1453
bool mkDir(const fileName &, mode_t=0777)
Make a directory and return an error if it could not be created.
Definition: POSIX.C:297
label allocateThread()
Allocate a thread.
Definition: POSIX.C:1356
OFstreamCollator(const off_t maxBufferSize)
Construct from buffer size. 0 = do not use thread.
#define FatalIOErrorInFunction(ios)
Report an error message using Foam::FatalIOError.
Definition: error.H:331
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
void push(const T &a)
Push an element onto the stack.
Definition: FIFOStack.H:84
Version number type.
Definition: IOstream.H:96
label allocateMutex()
Allocate a mutex variable.
Definition: POSIX.C:1426
unsigned int sleep(const unsigned int)
Sleep for the specified number of seconds.
Definition: POSIX.C:1092
bool write(const word &typeName, const fileName &, const string &data, IOstream::streamFormat, IOstream::versionNumber, IOstream::compressionType, const bool append)
Write file with contents. Blocks until the write thread has buffer space.
Inter-processor communications stream.
Definition: UPstream.H:58
static void freeCommunicator(const label communicator, const bool doPstream=true)
Free a previously allocated communicator.
Definition: UPstream.C:316
Namespace for OpenFOAM.
IOerror FatalIOError