UIPread.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2011-2018 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Description
25  Read from UIPstream
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UIPstream.H"
30 #include "PstreamGlobals.H"
31 #include "IOstreams.H"
32 
33 #include <mpi.h>
34 
35 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
36 
38 (
39  const commsTypes commsType,
40  const int fromProcNo,
41  DynamicList<char>& externalBuf,
42  label& externalBufPosition,
43  const int tag,
44  const label comm,
45  const bool clearAtEnd,
46  streamFormat format,
47  versionNumber version
48 )
49 :
50  UPstream(commsType),
51  Istream(format, version),
52  fromProcNo_(fromProcNo),
53  externalBuf_(externalBuf),
54  externalBufPosition_(externalBufPosition),
55  tag_(tag),
56  comm_(comm),
57  clearAtEnd_(clearAtEnd),
58  messageSize_(0)
59 {
60  setOpened();
61  setGood();
62 
63  if (commsType == commsTypes::nonBlocking)
64  {
65  // Message is already received into externalBuf
66  }
67  else
68  {
69  MPI_Status status;
70 
71  label wantedSize = externalBuf_.capacity();
72 
73  if (debug)
74  {
75  Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
76  << " tag:" << tag << " comm:" << comm_
77  << " wanted size:" << wantedSize
78  << Foam::endl;
79  }
80 
81 
82  // If the buffer size is not specified, probe the incoming message
83  // and set it
84  if (!wantedSize)
85  {
86  MPI_Probe
87  (
88  fromProcNo_,
89  tag_,
91  &status
92  );
93  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
94 
95  externalBuf_.setCapacity(messageSize_);
96  wantedSize = messageSize_;
97 
98  if (debug)
99  {
100  Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
101  << Foam::endl;
102  }
103  }
104 
105  messageSize_ = UIPstream::read
106  (
107  commsType,
108  fromProcNo_,
109  externalBuf_.begin(),
110  wantedSize,
111  tag_,
112  comm_
113  );
114 
115  // Set addressed size. Leave actual allocated memory intact.
116  externalBuf_.setSize(messageSize_);
117 
118  if (!messageSize_)
119  {
120  setEof();
121  }
122  }
123 }
124 
125 
126 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
127 :
128  UPstream(buffers.commsType_),
129  Istream(buffers.format_, buffers.version_),
130  fromProcNo_(fromProcNo),
131  externalBuf_(buffers.recvBuf_[fromProcNo]),
132  externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
133  tag_(buffers.tag_),
134  comm_(buffers.comm_),
135  clearAtEnd_(true),
136  messageSize_(0)
137 {
138  if
139  (
141  && !buffers.finishedSendsCalled_
142  )
143  {
145  << "PstreamBuffers::finishedSends() never called." << endl
146  << "Please call PstreamBuffers::finishedSends() after doing"
147  << " all your sends (using UOPstream) and before doing any"
148  << " receives (using UIPstream)" << Foam::exit(FatalError);
149  }
150 
151  setOpened();
152  setGood();
153 
155  {
156  // Message is already received into externalBuf
157  messageSize_ = buffers.recvBuf_[fromProcNo].size();
158 
159  if (debug)
160  {
161  Pout<< "UIPstream::UIPstream PstreamBuffers :"
162  << " fromProcNo:" << fromProcNo
163  << " tag:" << tag_ << " comm:" << comm_
164  << " receive buffer size:" << messageSize_
165  << Foam::endl;
166  }
167  }
168  else
169  {
170  MPI_Status status;
171 
172  label wantedSize = externalBuf_.capacity();
173 
174  if (debug)
175  {
176  Pout<< "UIPstream::UIPstream PstreamBuffers :"
177  << " read from:" << fromProcNo
178  << " tag:" << tag_ << " comm:" << comm_
179  << " wanted size:" << wantedSize
180  << Foam::endl;
181  }
182 
183  // If the buffer size is not specified, probe the incoming message
184  // and set it
185  if (!wantedSize)
186  {
187  MPI_Probe
188  (
189  fromProcNo_,
190  tag_,
192  &status
193  );
194  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
195 
196  externalBuf_.setCapacity(messageSize_);
197  wantedSize = messageSize_;
198 
199  if (debug)
200  {
201  Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
202  << wantedSize << Foam::endl;
203  }
204  }
205 
206  messageSize_ = UIPstream::read
207  (
208  commsType(),
209  fromProcNo_,
210  externalBuf_.begin(),
211  wantedSize,
212  tag_,
213  comm_
214  );
215 
216  // Set addressed size. Leave actual allocated memory intact.
217  externalBuf_.setSize(messageSize_);
218 
219  if (!messageSize_)
220  {
221  setEof();
222  }
223  }
224 }
225 
226 
227 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
228 
230 (
231  const commsTypes commsType,
232  const int fromProcNo,
233  char* buf,
234  const std::streamsize bufSize,
235  const int tag,
236  const label communicator
237 )
238 {
239  if (debug)
240  {
241  Pout<< "UIPstream::read : starting read from:" << fromProcNo
242  << " tag:" << tag << " comm:" << communicator
243  << " wanted size:" << label(bufSize)
244  << " commsType:" << UPstream::commsTypeNames[commsType]
245  << Foam::endl;
246  }
247  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
248  {
249  Pout<< "UIPstream::read : starting read from:" << fromProcNo
250  << " tag:" << tag << " comm:" << communicator
251  << " wanted size:" << label(bufSize)
252  << " commsType:" << UPstream::commsTypeNames[commsType]
253  << " warnComm:" << UPstream::warnComm
254  << Foam::endl;
256  }
257 
258  if (commsType == commsTypes::blocking || commsType == commsTypes::scheduled)
259  {
260  MPI_Status status;
261 
262  if
263  (
264  MPI_Recv
265  (
266  buf,
267  bufSize,
268  MPI_BYTE,
269  fromProcNo,
270  tag,
271  PstreamGlobals::MPICommunicators_[communicator],
272  &status
273  )
274  )
275  {
277  << "MPI_Recv cannot receive incoming message"
279 
280  return 0;
281  }
282 
283 
284  // Check size of message read
285 
286  int messageSize;
287  MPI_Get_count(&status, MPI_BYTE, &messageSize);
288 
289  if (debug)
290  {
291  Pout<< "UIPstream::read : finished read from:" << fromProcNo
292  << " tag:" << tag << " read size:" << label(bufSize)
293  << " commsType:" << UPstream::commsTypeNames[commsType]
294  << Foam::endl;
295  }
296 
297  if (messageSize > bufSize)
298  {
300  << "buffer (" << label(bufSize)
301  << ") not large enough for incoming message ("
302  << messageSize << ')'
304  }
305 
306  return messageSize;
307  }
308  else if (commsType == commsTypes::nonBlocking)
309  {
310  MPI_Request request;
311 
312  if
313  (
314  MPI_Irecv
315  (
316  buf,
317  bufSize,
318  MPI_BYTE,
319  fromProcNo,
320  tag,
321  PstreamGlobals::MPICommunicators_[communicator],
322  &request
323  )
324  )
325  {
327  << "MPI_Recv cannot start non-blocking receive"
329 
330  return 0;
331  }
332 
333  if (debug)
334  {
335  Pout<< "UIPstream::read : started read from:" << fromProcNo
336  << " tag:" << tag << " read size:" << label(bufSize)
337  << " commsType:" << UPstream::commsTypeNames[commsType]
338  << " request:" << PstreamGlobals::outstandingRequests_.size()
339  << Foam::endl;
340  }
341 
342  PstreamGlobals::outstandingRequests_.append(request);
343 
344  // Assume the message is completely received.
345  return bufSize;
346  }
347  else
348  {
350  << "Unsupported communications type "
351  << int(commsType)
353 
354  return 0;
355  }
356 }
357 
358 
359 // ************************************************************************* //
UIPstream(const commsTypes commsType, const int fromProcNo, DynamicList< char > &externalBuf, label &externalBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, streamFormat format=BINARY, versionNumber version=currentVersion)
Construct given process index to read from and optional buffer size.
Definition: UIPread.C:34
static void printStack(Ostream &)
Helper function to print a stack.
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:481
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
error FatalError
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:319
commsTypes
Types of communications.
Definition: UPstream.H:64
void setGood()
Set stream to be good.
Definition: IOstream.H:257
commsTypes commsType_
Communications type of this stream.
Definition: UPstream.H:252
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:256
DynamicList< MPI_Request > outstandingRequests_
Istream(streamFormat format=ASCII, versionNumber version=currentVersion, compressionType compression=UNCOMPRESSED)
Set stream status.
Definition: Istream.H:76
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
void setSize(const label)
Alter the addressed list size.
Definition: DynamicListI.H:163
void setCapacity(const label)
Alter the size of the underlying storage.
Definition: DynamicListI.H:118
iterator begin()
Return an iterator to begin traversing the UList.
Definition: UListI.H:216
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
errorManip< error > abort(error &err)
Definition: errorManip.H:131
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the message size.
Definition: UIPread.C:79
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
void setEof()
Set stream to have reached eof.
Definition: IOstream.H:475
void setOpened()
Set stream opened.
Definition: IOstream.H:239
UPstream(const commsTypes commsType)
Construct given optional buffer size.
Definition: UPstream.H:287
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
DynamicList< MPI_Comm > MPICommunicators_
label capacity() const
Size of the underlying storage.
Definition: DynamicListI.H:109