UIPread.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration | Website: https://openfoam.org
5  \\ / A nd | Copyright (C) 2011-2024 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Description
25  Read from UIPstream
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "UIPstream.H"
30 #include "PstreamGlobals.H"
31 #include "IOstreams.H"
32 
33 #include <mpi.h>
34 
35 // * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * //
36 
38 (
39  const commsTypes commsType,
40  const int fromProcNo,
41  DynamicList<char>& externalBuf,
42  label& externalBufPosition,
43  const int tag,
44  const label comm,
45  const bool clearAtEnd,
46  const streamFormat format,
47  const versionNumber version,
48  const bool global
49 )
50 :
51  UPstream(commsType),
52  Istream(format, version, UNCOMPRESSED, global),
53  fromProcNo_(fromProcNo),
54  externalBuf_(externalBuf),
55  externalBufPosition_(externalBufPosition),
56  tag_(tag),
57  comm_(comm),
58  clearAtEnd_(clearAtEnd),
59  messageSize_(0)
60 {
61  setOpened();
62  setGood();
63 
64  if (commsType == commsTypes::nonBlocking)
65  {
66  // Message is already received into externalBuf
67  }
68  else
69  {
70  MPI_Status status;
71 
72  label wantedSize = externalBuf_.capacity();
73 
74  if (debug)
75  {
76  Pout<< "UIPstream::UIPstream : read from:" << fromProcNo
77  << " tag:" << tag << " comm:" << comm_
78  << " wanted size:" << wantedSize
79  << Foam::endl;
80  }
81 
82 
83  // If the buffer size is not specified, probe the incoming message
84  // and set it
85  if (!wantedSize)
86  {
87  MPI_Probe
88  (
89  fromProcNo_,
90  tag_,
92  &status
93  );
94  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
95 
96  externalBuf_.setCapacity(messageSize_);
97  wantedSize = messageSize_;
98 
99  if (debug)
100  {
101  Pout<< "UIPstream::UIPstream : probed size:" << wantedSize
102  << Foam::endl;
103  }
104  }
105 
106  messageSize_ = UIPstream::read
107  (
108  commsType,
109  fromProcNo_,
110  externalBuf_.begin(),
111  wantedSize,
112  tag_,
113  comm_
114  );
115 
116  // Set addressed size. Leave actual allocated memory intact.
117  externalBuf_.setSize(messageSize_);
118 
119  if (!messageSize_)
120  {
121  setEof();
122  }
123  }
124 }
125 
126 
127 Foam::UIPstream::UIPstream(const int fromProcNo, PstreamBuffers& buffers)
128 :
129  UPstream(buffers.commsType_),
130  Istream(buffers.format_, buffers.version_),
131  fromProcNo_(fromProcNo),
132  externalBuf_(buffers.recvBuf_[fromProcNo]),
133  externalBufPosition_(buffers.recvBufPos_[fromProcNo]),
134  tag_(buffers.tag_),
135  comm_(buffers.comm_),
136  clearAtEnd_(true),
137  messageSize_(0)
138 {
139  if
140  (
141  commsType() != UPstream::commsTypes::scheduled
142  && !buffers.finishedSendsCalled_
143  )
144  {
146  << "PstreamBuffers::finishedSends() never called." << endl
147  << "Please call PstreamBuffers::finishedSends() after doing"
148  << " all your sends (using UOPstream) and before doing any"
149  << " receives (using UIPstream)" << Foam::exit(FatalError);
150  }
151 
152  setOpened();
153  setGood();
154 
155  if (commsType() == commsTypes::nonBlocking)
156  {
157  // Message is already received into externalBuf
158  messageSize_ = buffers.recvBuf_[fromProcNo].size();
159 
160  if (debug)
161  {
162  Pout<< "UIPstream::UIPstream PstreamBuffers :"
163  << " fromProcNo:" << fromProcNo
164  << " tag:" << tag_ << " comm:" << comm_
165  << " receive buffer size:" << messageSize_
166  << Foam::endl;
167  }
168  }
169  else
170  {
171  MPI_Status status;
172 
173  label wantedSize = externalBuf_.capacity();
174 
175  if (debug)
176  {
177  Pout<< "UIPstream::UIPstream PstreamBuffers :"
178  << " read from:" << fromProcNo
179  << " tag:" << tag_ << " comm:" << comm_
180  << " wanted size:" << wantedSize
181  << Foam::endl;
182  }
183 
184  // If the buffer size is not specified, probe the incoming message
185  // and set it
186  if (!wantedSize)
187  {
188  MPI_Probe
189  (
190  fromProcNo_,
191  tag_,
193  &status
194  );
195  MPI_Get_count(&status, MPI_BYTE, &messageSize_);
196 
197  externalBuf_.setCapacity(messageSize_);
198  wantedSize = messageSize_;
199 
200  if (debug)
201  {
202  Pout<< "UIPstream::UIPstream PstreamBuffers : probed size:"
203  << wantedSize << Foam::endl;
204  }
205  }
206 
207  messageSize_ = UIPstream::read
208  (
209  commsType(),
210  fromProcNo_,
211  externalBuf_.begin(),
212  wantedSize,
213  tag_,
214  comm_
215  );
216 
217  // Set addressed size. Leave actual allocated memory intact.
218  externalBuf_.setSize(messageSize_);
219 
220  if (!messageSize_)
221  {
222  setEof();
223  }
224  }
225 }
226 
227 
228 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
229 
231 (
232  const commsTypes commsType,
233  const int fromProcNo,
234  char* buf,
235  const std::streamsize bufSize,
236  const int tag,
237  const label communicator
238 )
239 {
240  if (debug)
241  {
242  Pout<< "UIPstream::read : starting read from:" << fromProcNo
243  << " tag:" << tag << " comm:" << communicator
244  << " wanted size:" << label(bufSize)
245  << " commsType:" << UPstream::commsTypeNames[commsType]
246  << Foam::endl;
247  }
248  if (UPstream::warnComm != -1 && communicator != UPstream::warnComm)
249  {
250  Pout<< "UIPstream::read : starting read from:" << fromProcNo
251  << " tag:" << tag << " comm:" << communicator
252  << " wanted size:" << label(bufSize)
253  << " commsType:" << UPstream::commsTypeNames[commsType]
254  << " warnComm:" << UPstream::warnComm
255  << Foam::endl;
257  }
258 
260  {
261  MPI_Status status;
262 
263  if
264  (
265  MPI_Recv
266  (
267  buf,
268  bufSize,
269  MPI_BYTE,
270  fromProcNo,
271  tag,
272  PstreamGlobals::MPICommunicators_[communicator],
273  &status
274  )
275  )
276  {
278  << "MPI_Recv cannot receive incoming message"
280 
281  return 0;
282  }
283 
284 
285  // Check size of message read
286 
287  int messageSize;
288  MPI_Get_count(&status, MPI_BYTE, &messageSize);
289 
290  if (debug)
291  {
292  Pout<< "UIPstream::read : finished read from:" << fromProcNo
293  << " tag:" << tag << " read size:" << label(bufSize)
294  << " commsType:" << UPstream::commsTypeNames[commsType]
295  << Foam::endl;
296  }
297 
298  if (messageSize > bufSize)
299  {
301  << "buffer (" << label(bufSize)
302  << ") not large enough for incoming message ("
303  << messageSize << ')'
305  }
306 
307  return messageSize;
308  }
310  {
311  MPI_Request request;
312 
313  if
314  (
315  MPI_Irecv
316  (
317  buf,
318  bufSize,
319  MPI_BYTE,
320  fromProcNo,
321  tag,
322  PstreamGlobals::MPICommunicators_[communicator],
323  &request
324  )
325  )
326  {
328  << "MPI_Recv cannot start non-blocking receive"
330 
331  return 0;
332  }
333 
334  if (debug)
335  {
336  Pout<< "UIPstream::read : started read from:" << fromProcNo
337  << " tag:" << tag << " read size:" << label(bufSize)
338  << " commsType:" << UPstream::commsTypeNames[commsType]
339  << " request:" << PstreamGlobals::outstandingRequests_.size()
340  << Foam::endl;
341  }
342 
343  PstreamGlobals::outstandingRequests_.append(request);
344 
345  // Assume the message is completely received.
346  return bufSize;
347  }
348  else
349  {
351  << "Unsupported communications type "
352  << int(commsType)
354 
355  return 0;
356  }
357 }
358 
359 
360 // ************************************************************************* //
Useful combination of include files which define Sin, Sout and Serr and the use of IO streams general...
static label read(const commsTypes commsType, const int fromProcNo, char *buf, const std::streamsize bufSize, const int tag=UPstream::msgType(), const label communicator=0)
Read into given buffer from given processor and return the.
Definition: UIPread.C:80
UIPstream(const commsTypes commsType, const int fromProcNo, DynamicList< char > &externalBuf, label &externalBufPosition, const int tag=UPstream::msgType(), const label comm=UPstream::worldComm, const bool clearAtEnd=false, const streamFormat format=BINARY, const versionNumber version=currentVersion, const bool global=false)
Construct given process index to read from and optional buffer size,.
Definition: UIPread.C:34
static label warnComm
Debugging: warn for use of any communicator differing from warnComm.
Definition: UPstream.H:281
static const NamedEnum< commsTypes, 3 > commsTypeNames
Definition: UPstream.H:71
commsTypes commsType() const
Get the communications type of the stream.
Definition: UPstream.H:483
static void printStack(Ostream &)
Helper function to print a stack.
#define FatalErrorInFunction
Report an error message using Foam::FatalError.
Definition: error.H:334
DynamicList< MPI_Comm > MPICommunicators_
DynamicList< MPI_Request > outstandingRequests_
void read(Istream &, label &, const dictionary &)
In-place read with dictionary lookup.
errorManipArg< error, int > exit(error &err, const int errNo=1)
Definition: errorManip.H:124
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:258
errorManip< error > abort(error &err)
Definition: errorManip.H:131
prefixOSstream Pout(cout, "Pout")
Definition: IOstreams.H:53
error FatalError
word format(conversionProperties.lookup("format"))