ParSortableList.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2011-2016 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "ParSortableList.H"
27 #include "SortableList.H"
28 #include "Pstream.H"
29 #include "ListListOps.H"
30 #include "PstreamReduceOps.H"
31 
32 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
33 
34 template<class Type>
36 (
37  const List<Type>& elems,
38  Ostream& os
39 ) const
40 {
41  os << '(';
42 
43  forAll(elems, elemI)
44  {
45  os << ' ' << elems[elemI];
46  }
47  os << ')';
48 }
49 
50 
51 template<class Type>
53 (
54  const List<Type>& values,
55  const labelList& indices,
56  const label fromProcNo,
57  label& destI,
58  List<taggedValue>& dest
59 ) const
60 {
61  forAll(values, elemI)
62  {
63  taggedValue& tagVal = dest[destI];
64 
65  tagVal.value() = values[elemI];
66  tagVal.index() = indices[elemI];
67  tagVal.procID() = fromProcNo;
68 
69  destI++;
70  }
71 }
72 
73 
74 template<class Type>
76 (
77  const List<Type>& elems,
78  List<Type>& pivots
79 ) const
80 {
81  pivots.setSize(Pstream::nProcs());
82 
83  label pivotPos = 0;
84 
85  forAll(pivots, pivotI)
86  {
87  pivots[pivotI] = elems[pivotPos];
88 
89  pivotPos += elems.size()/Pstream::nProcs();
90  }
91 }
92 
93 
94 template<class Type>
96 (
97  List<Type>& values,
98  labelList& indices,
99  const label bufSize,
100  const label destProci
101 ) const
102 {
103  if (destProci != Pstream::myProcNo())
104  {
105  values.setSize(bufSize);
106  indices.setSize(bufSize);
107 
108  if (debug)
109  {
110  Pout<< "Sending to " << destProci << " elements:" << values
111  << endl;
112  }
113 
114  {
115  OPstream toSlave(Pstream::blocking, destProci);
116  toSlave << values << indices;
117  }
118  }
119 }
120 
121 
122 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
123 
124 template<class Type>
126 :
127  List<Type>(values),
128  indices_(0),
129  procs_(0)
130 {
131  sort();
132 }
133 
134 
135 template<class Type>
137 :
138  List<Type>(size),
139  indices_(0),
140  procs_(0)
141 {}
142 
143 
144 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
145 
146 template<class Type>
148 {
149  //
150  // 0. Get total size of dataset.
151  //
152 
153  label n = this->size();
154 
155  reduce(n, sumOp<label>());
156 
157 
158  // 1. Sort list locally
159  SortableList<Type> sorted(*this);
160 
161  // Collect elements at pivot points
162  labelListList sortedGatherList(Pstream::nProcs());
163 
164  labelList& pivots = sortedGatherList[Pstream::myProcNo()];
165 
166  getPivots(sorted, pivots);
167 
168  if (debug)
169  {
170  Pout<< "pivots:";
171  write(pivots, Pout);
172  Pout<< endl;
173  }
174 
175 
176  //
177  // 2. Combine pivotlist per processor onto master, sort, get pivots.
178  //
179 
180  Pstream::gatherList(sortedGatherList);
181 
182  if (Pstream::master())
183  {
184  labelList allPivots =
185  ListListOps::combine<labelList>
186  (
187  sortedGatherList,
189  );
190 
191  SortableList<Type> sortedPivots(allPivots);
192 
193  if (debug)
194  {
195  Pout<< "allPivots:";
196  write(allPivots, Pout);
197  Pout<< endl;
198  }
199 
200  getPivots(sortedPivots, pivots);
201  }
202  Pstream::scatter(pivots);
203 
204  if (debug)
205  {
206  Pout<< "new pivots:";
207  write(pivots, Pout);
208  Pout<< endl;
209  }
210 
211 
212  //
213  // 3. Distribute pivots & distribute.
214  //
215 
216  label pivotI = 1;
217  label destProci = 0;
218 
219  // Buffer for my own data. Keep original index together with value.
220  labelList ownValues(sorted.size());
221  labelList ownIndices(sorted.size());
222  label ownI = 0;
223 
224  // Buffer for sending data
225  labelList sendValues(sorted.size());
226  labelList sendIndices(sorted.size());
227  label sendI = 0;
228 
229  forAll(sorted, sortedI)
230  {
231  if ((pivotI < Pstream::nProcs()) && (sorted[sortedI] > pivots[pivotI]))
232  {
233  checkAndSend(sendValues, sendIndices, sendI, destProci);
234 
235  // Reset buffer.
236  sendValues.setSize(sorted.size());
237  sendIndices.setSize(sorted.size());
238  sendI = 0;
239 
240  pivotI++;
241  destProci++;
242  }
243 
244  if (destProci != Pstream::myProcNo())
245  {
246  sendValues[sendI] = sorted[sortedI];
247  sendIndices[sendI] = sorted.indices()[sortedI];
248  sendI++;
249  }
250  else
251  {
252  ownValues[ownI] = sorted[sortedI];
253  ownIndices[ownI] = sorted.indices()[sortedI];
254  ownI++;
255  }
256  }
257 
258 
259  // Handle trailing send buffer
260  if (sendI != 0)
261  {
262  checkAndSend(sendValues, sendIndices, sendI, destProci);
263  }
264 
265  // Print ownValues
266  ownValues.setSize(ownI);
267  ownIndices.setSize(ownI);
268 
269  if (debug & 2)
270  {
271  Pout<< "Not sending (to myself) elements "
272  << ownValues << endl;
273  }
274 
275  //
276  // 4. Combine pieces from all processors & sort. Use indices() from
277  // SortableList to remember source processor number.
278  //
279 
280  // Allocate receive buffer. Acc. to paper upper bound is 2*n/p
281  // (n=total size, p=nProcs). Resize later on.
282  List<taggedValue> combinedValues(2 * n/Pstream::nProcs());
283 
284  label combinedI = 0;
285 
286  for (label proci = 0; proci < Pstream::nProcs(); proci++)
287  {
288  if (proci == Pstream::myProcNo())
289  {
290  if (debug & 2)
291  {
292  Pout<< "Copying from own:" << ownValues << endl;
293  }
294 
295  // Copy ownValues,ownIndices into combined buffer
296  copyInto(ownValues, ownIndices, proci, combinedI, combinedValues);
297  }
298  else
299  {
300  labelList recValues;
301  labelList recIndices;
302 
303  {
304  if (debug)
305  {
306  Pout<< "Receiving from " << proci << endl;
307  }
308 
309  IPstream fromSlave(Pstream::blocking, proci);
310 
311  fromSlave >> recValues >> recIndices;
312 
313  if (debug & 2)
314  {
315  Pout<< "Received from " << proci
316  << " elements:" << recValues << endl;
317  }
318  }
319 
320  if (debug)
321  {
322  Pout<< "Copying starting at:" << combinedI << endl;
323  }
324  copyInto(recValues, recIndices, proci, combinedI, combinedValues);
325  }
326  }
327  combinedValues.setSize(combinedI);
328 
329  // Sort according to values
330  Foam::sort(combinedValues);
331 
332  // Copy into *this
333  this->setSize(combinedI);
334  indices_.setSize(combinedI);
335  procs_.setSize(combinedI);
336 
337  forAll(combinedValues, elemI)
338  {
339  this->operator[](elemI) = combinedValues[elemI].value();
340  indices_[elemI] = combinedValues[elemI].index();
341  procs_[elemI] = combinedValues[elemI].procID();
342  }
343 }
344 
345 
346 // ************************************************************************* //
#define forAll(list, i)
Loop across all elements in list.
Definition: UList.H:428
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
Inter-processor communication reduction functions.
Type & operator[](const label)
Return element of UList.
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bounds checking, etc.
Definition: HashTable.H:59
A list that is sorted upon construction or when explicitly requested with the sort() method...
Definition: List.H:68
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:76
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:417
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:253
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:411
Input inter-processor communications stream.
Definition: IPstream.H:50
void sort(UList< T > &)
Definition: UList.C:115
List< label > labelList
A List of labels.
Definition: labelList.H:56
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
prefixOSstream Pout(cout,"Pout")
Definition: IOstreams.H:53
label size() const
Return the number of elements in the UList.
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
void setSize(const label)
Reset size of List.
Definition: List.C:295
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:399
ParSortableList(const UList< Type > &)
Construct from List, sorting the elements.
Implementation of PSRS parallel sorting routine.
label n
const labelList & indices() const
Return the list of sorted indices. Updated every sort.
Definition: SortableList.H:90
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
void sort()
(stable) sort the list (if changed after construction time)