UIPread.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2011-2015 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Description
25  Read from UIPstream
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UIPstream.H"
30 #include "PstreamGlobals.H"
31 #include "IOstreams.H"
32 
33 #include <mpi.h>
34 
35 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
36 
38 (
39  const commsTypes commsType,
40  const int fromProcNo,
41  DynamicList<char>& externalBuf,
42  label& externalBufPosition,
43  const int tag,
44  const label comm,
45  const bool clearAtEnd,
46  streamFormat format,
47  versionNumber version
48 )
49 :
50  UPstream(commsType),
51  Istream(format, version),
52  fromProcNo_(fromProcNo),
53  externalBuf_(externalBuf),
54  externalBufPosition_(externalBufPosition),
55  tag_(tag),
56  comm_(comm),
57  clearAtEnd_(clearAtEnd),
58  messageSize_(0)
59 {
60  setOpened();
61  setGood();
62 
63  if (commsType == UPstream::nonBlocking)
64  {
65  // Message is already received into externalBuf
66  }
67  else
68  {
69  MPI_Status status;
70 
71  label wantedSize = externalBuf_.capacity();
72 
73  if (debug)
74  {
75  Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
76  << " tag:" << tag << " comm:" << comm_
77  << " wanted size:" << wantedSize
78  << Foam::endl;
79  }
80 
81 
82  // If the buffer size is not specified, probe the incomming message
83  // and set it
84  if (!wantedSize)
85  {
86  MPI_Probe
87  (
88  fromProcNo_,
89  tag_,
91  &status
92  );
93  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
94 
95  externalBuf_.setCapacity(messageSize_);
96  wantedSize = messageSize_;
97 
98  if (debug)
99  {
100  Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
101  << Foam::endl;
102  }
103  }
104 
105  messageSize_ = UIPstream::read
106  (
107  commsType,
108  fromProcNo_,
109  externalBuf_.begin(),
110  wantedSize,
111  tag_,
112  comm_
113  );
114 
115  // Set addressed size. Leave actual allocated memory intact.
116  externalBuf_.setSize(messageSize_);
117 
118  if (!messageSize_)
119  {
120  setEof();
121  }
122  }
123 }
124 
125 
126 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
127 :
128  UPstream(buffers.commsType_),
129  Istream(buffers.format_, buffers.version_),
130  fromProcNo_(fromProcNo),
131  externalBuf_(buffers.recvBuf_[fromProcNo]),
132  externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
133  tag_(buffers.tag_),
134  comm_(buffers.comm_),
135  clearAtEnd_(true),
136  messageSize_(0)
137 {
138  if (commsType() != UPstream::scheduled && !buffers.finishedSendsCalled_)
139  {
141  << "PstreamBuffers::finishedSends() never called." << endl
142  << "Please call PstreamBuffers::finishedSends() after doing"
143  << " all your sends (using UOPstream) and before doing any"
144  << " receives (using UIPstream)" << Foam::exit(FatalError);
145  }
146 
147  setOpened();
148  setGood();
149 
151  {
152  // Message is already received into externalBuf
153  messageSize_ = buffers.recvBuf_[fromProcNo].size();
154 
155  if (debug)
156  {
157  Pout<< "UIPstream::UIPstream PstreamBuffers :"
158  << " fromProcNo:" << fromProcNo
159  << " tag:" << tag_ << " comm:" << comm_
160  << " receive buffer size:" << messageSize_
161  << Foam::endl;
162  }
163  }
164  else
165  {
166  MPI_Status status;
167 
168  label wantedSize = externalBuf_.capacity();
169 
170  if (debug)
171  {
172  Pout<< "UIPstream::UIPstream PstreamBuffers :"
173  << " read from:" << fromProcNo
174  << " tag:" << tag_ << " comm:" << comm_
175  << " wanted size:" << wantedSize
176  << Foam::endl;
177  }
178 
179  // If the buffer size is not specified, probe the incomming message
180  // and set it
181  if (!wantedSize)
182  {
183  MPI_Probe
184  (
185  fromProcNo_,
186  tag_,
188  &status
189  );
190  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
191 
192  externalBuf_.setCapacity(messageSize_);
193  wantedSize = messageSize_;
194 
195  if (debug)
196  {
197  Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
198  << wantedSize << Foam::endl;
199  }
200  }
201 
202  messageSize_ = UIPstream::read
203  (
204  commsType(),
205  fromProcNo_,
206  externalBuf_.begin(),
207  wantedSize,
208  tag_,
209  comm_
210  );
211 
212  // Set addressed size. Leave actual allocated memory intact.
213  externalBuf_.setSize(messageSize_);
214 
215  if (!messageSize_)
216  {
217  setEof();
218  }
219  }
220 }
221 
222 
223 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
224 
226 (
227  const commsTypes commsType,
228  const int fromProcNo,
229  char* buf,
230  const std::streamsize bufSize,
231  const int tag,
232  const label communicator
233 )
234 {
235  if (debug)
236  {
237  Pout<< "UIPstream::read : starting read from:" << fromProcNo
238  << " tag:" << tag << " comm:" << communicator
239  << " wanted size:" << label(bufSize)
240  << " commsType:" << UPstream::commsTypeNames[commsType]
241  << Foam::endl;
242  }
243  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
244  {
245  Pout<< "UIPstream::read : starting read from:" << fromProcNo
246  << " tag:" << tag << " comm:" << communicator
247  << " wanted size:" << label(bufSize)
248  << " commsType:" << UPstream::commsTypeNames[commsType]
249  << " warnComm:" << UPstream::warnComm
250  << Foam::endl;
252  }
253 
254  if (commsType == blocking || commsType == scheduled)
255  {
256  MPI_Status status;
257 
258  if
259  (
260  MPI_Recv
261  (
262  buf,
263  bufSize,
264  MPI_BYTE,
265  fromProcNo,
266  tag,
267  PstreamGlobals::MPICommunicators_[communicator],
268  &status
269  )
270  )
271  {
273  << "MPI_Recv cannot receive incomming message"
275 
276  return 0;
277  }
278 
279 
280  // Check size of message read
281 
282  int messageSize;
283  MPI_Get_count(&status, MPI_BYTE, &messageSize);
284 
285  if (debug)
286  {
287  Pout<< "UIPstream::read : finished read from:" << fromProcNo
288  << " tag:" << tag << " read size:" << label(bufSize)
289  << " commsType:" << UPstream::commsTypeNames[commsType]
290  << Foam::endl;
291  }
292 
293  if (messageSize > bufSize)
294  {
296  << "buffer (" << label(bufSize)
297  << ") not large enough for incomming message ("
298  << messageSize << ')'
300  }
301 
302  return messageSize;
303  }
304  else if (commsType == nonBlocking)
305  {
306  MPI_Request request;
307 
308  if
309  (
310  MPI_Irecv
311  (
312  buf,
313  bufSize,
314  MPI_BYTE,
315  fromProcNo,
316  tag,
317  PstreamGlobals::MPICommunicators_[communicator],
318  &request
319  )
320  )
321  {
323  << "MPI_Recv cannot start non-blocking receive"
325 
326  return 0;
327  }
328 
329  if (debug)
330  {
331  Pout<< "UIPstream::read : started read from:" << fromProcNo
332  << " tag:" << tag << " read size:" << label(bufSize)
333  << " commsType:" << UPstream::commsTypeNames[commsType]
334  << " request:" << PstreamGlobals::outstandingRequests_.size()
335  << Foam::endl;
336  }
337 
338  PstreamGlobals::outstandingRequests_.append(request);
339 
340  // Assume the message is completely received.
341  return bufSize;
342  }
343  else
344  {
346  << "Unsupported communications type "
347  << commsType
349 
350  return 0;
351  }
352 }
353 
354 
355 // ************************************************************************* //
UIPstream(const commsTypes commsType, const int fromProcNo, DynamicList< char > &externalBuf, label &externalBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, streamFormat format=BINARY, versionNumber version=currentVersion)
Construct given process index to read from and optional buffer size, read format and IO version.
Definition: UIPread.C:34
static void printStack(Ostream &)
Helper function to print a stack.
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
error FatalError
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:319
commsTypes
Types of communications.
Definition: UPstream.H:64
void setGood()
Set stream to be good.
Definition: IOstream.H:257
commsTypes commsType_
Communications type of this stream.
Definition: UPstream.H:248
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:253
DynamicList< MPI_Request > outstandingRequests_
Istream(streamFormat format=ASCII, versionNumber version=currentVersion, compressionType compression=UNCOMPRESSED)
Set stream status.
Definition: Istream.H:76
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
void setSize(const label)
Alter the addressed list size.
Definition: DynamicListI.H:163
void setCapacity(const label)
Alter the size of the underlying storage.
Definition: DynamicListI.H:118
iterator begin()
Return an iterator to begin traversing the UList.
Definition: UListI.H:216
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:277
errorManip< error > abort(error &err)
Definition: errorManip.H:131
prefixOSstream Pout(cout,"Pout")
Definition: IOstreams.H:53
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the message size.
Definition: UIPread.C:79
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
void setEof()
Set stream to have reached eof.
Definition: IOstream.H:475
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:471
void setOpened()
Set stream opened.
Definition: IOstream.H:239
UPstream(const commsTypes commsType)
Construct given optional buffer size.
Definition: UPstream.H:283
label capacity() const
Size of the underlying storage.
Definition: DynamicListI.H:109
DynamicList< MPI_Comm > MPICommunicators_