ParSortableList.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 2011-2013 OpenFOAM Foundation
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 \*---------------------------------------------------------------------------*/
25 
26 #include "ParSortableList.H"
27 #include "SortableList.H"
28 #include "Pstream.H"
29 #include "ListListOps.H"
30 #include "PstreamReduceOps.H"
31 
32 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
33 
34 template<class Type>
36 (
37  const List<Type>& elems,
38  Ostream& os
39 ) const
40 {
41  os << '(';
42 
43  forAll(elems, elemI)
44  {
45  os << ' ' << elems[elemI];
46  }
47  os << ')';
48 }
49 
50 
51 // Copy src, starting at destI into dest.
52 template<class Type>
54 (
55  const List<Type>& values,
56  const labelList& indices,
57  const label fromProcNo,
58  label& destI,
59  List<taggedValue>& dest
60 ) const
61 {
62  forAll(values, elemI)
63  {
64  taggedValue& tagVal = dest[destI];
65 
66  tagVal.value() = values[elemI];
67  tagVal.index() = indices[elemI];
68  tagVal.procID() = fromProcNo;
69 
70  destI++;
71  }
72 }
73 
74 
75 template<class Type>
77 (
78  const List<Type>& elems,
79  List<Type>& pivots
80 ) const
81 {
82  pivots.setSize(Pstream::nProcs());
83 
84  label pivotPos = 0;
85 
86  forAll(pivots, pivotI)
87  {
88  pivots[pivotI] = elems[pivotPos];
89 
90  pivotPos += elems.size()/Pstream::nProcs();
91  }
92 }
93 
94 
95 template<class Type>
97 (
98  List<Type>& values,
99  labelList& indices,
100  const label bufSize,
101  const label destProcI
102 ) const
103 {
104  if (destProcI != Pstream::myProcNo())
105  {
106  values.setSize(bufSize);
107  indices.setSize(bufSize);
108 
109  if (debug)
110  {
111  Pout<< "Sending to " << destProcI << " elements:" << values
112  << endl;
113  }
114 
115  {
116  OPstream toSlave(Pstream::blocking, destProcI);
117  toSlave << values << indices;
118  }
119  }
120 }
121 
122 
123 // * * * * * * * * * * * * * * * * Constructors * * * * * * * * * * * * * * //
124 
125 // Construct from List, sorting the elements
126 template<class Type>
128 :
129  List<Type>(values),
130  indices_(0),
131  procs_(0)
132 {
133  sort();
134 }
135 
136 
137 // Construct given size. Sort later on.
138 template<class Type>
140 :
141  List<Type>(size),
142  indices_(0),
143  procs_(0)
144 {}
145 
146 
147 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
148 
149 // Sort
150 template<class Type>
152 {
153  //
154  // 0. Get total size of dataset.
155  //
156 
157  label n = this->size();
158 
159  reduce(n, sumOp<label>());
160 
161 
162  // 1. Sort list locally
163  SortableList<Type> sorted(*this);
164 
165  // Collect elements at pivot points
166  labelListList sortedGatherList(Pstream::nProcs());
167 
168  labelList& pivots = sortedGatherList[Pstream::myProcNo()];
169 
170  getPivots(sorted, pivots);
171 
172  if (debug)
173  {
174  Pout<< "pivots:";
175  write(pivots, Pout);
176  Pout<< endl;
177  }
178 
179 
180  //
181  // 2. Combine pivotlist per processor onto master, sort, get pivots.
182  //
183 
184  Pstream::gatherList(sortedGatherList);
185 
186  if (Pstream::master())
187  {
188  labelList allPivots =
189  ListListOps::combine<labelList>
190  (
191  sortedGatherList,
193  );
194 
195  SortableList<Type> sortedPivots(allPivots);
196 
197  if (debug)
198  {
199  Pout<< "allPivots:";
200  write(allPivots, Pout);
201  Pout<< endl;
202  }
203 
204  getPivots(sortedPivots, pivots);
205  }
206  Pstream::scatter(pivots);
207 
208  if (debug)
209  {
210  Pout<< "new pivots:";
211  write(pivots, Pout);
212  Pout<< endl;
213  }
214 
215 
216  //
217  // 3. Distribute pivots & distribute.
218  //
219 
220  label pivotI = 1;
221  label destProcI = 0;
222 
223  // Buffer for my own data. Keep original index together with value.
224  labelList ownValues(sorted.size());
225  labelList ownIndices(sorted.size());
226  label ownI = 0;
227 
228  // Buffer for sending data
229  labelList sendValues(sorted.size());
230  labelList sendIndices(sorted.size());
231  label sendI = 0;
232 
233  forAll(sorted, sortedI)
234  {
235  if ((pivotI < Pstream::nProcs()) && (sorted[sortedI] > pivots[pivotI]))
236  {
237  checkAndSend(sendValues, sendIndices, sendI, destProcI);
238 
239  // Reset buffer.
240  sendValues.setSize(sorted.size());
241  sendIndices.setSize(sorted.size());
242  sendI = 0;
243 
244  pivotI++;
245  destProcI++;
246  }
247 
248  if (destProcI != Pstream::myProcNo())
249  {
250  sendValues[sendI] = sorted[sortedI];
251  sendIndices[sendI] = sorted.indices()[sortedI];
252  sendI++;
253  }
254  else
255  {
256  ownValues[ownI] = sorted[sortedI];
257  ownIndices[ownI] = sorted.indices()[sortedI];
258  ownI++;
259  }
260  }
261 
262 
263  // Handle trailing send buffer
264  if (sendI != 0)
265  {
266  checkAndSend(sendValues, sendIndices, sendI, destProcI);
267  }
268 
269  // Print ownValues
270  ownValues.setSize(ownI);
271  ownIndices.setSize(ownI);
272 
273  if (debug & 2)
274  {
275  Pout<< "Not sending (to myself) elements "
276  << ownValues << endl;
277  }
278 
279  //
280  // 4. Combine pieces from all processors & sort. Use indices() from
281  // SortableList to remember source processor number.
282  //
283 
284  // Allocate receive buffer. Acc. to paper upper bound is 2*n/p
285  // (n=total size, p=nProcs). Resize later on.
286  List<taggedValue> combinedValues(2 * n/Pstream::nProcs());
287 
288  label combinedI = 0;
289 
290  for (label procI = 0; procI < Pstream::nProcs(); procI++)
291  {
292  if (procI == Pstream::myProcNo())
293  {
294  if (debug & 2)
295  {
296  Pout<< "Copying from own:" << ownValues << endl;
297  }
298 
299  // Copy ownValues,ownIndices into combined buffer
300  copyInto(ownValues, ownIndices, procI, combinedI, combinedValues);
301  }
302  else
303  {
304  labelList recValues;
305  labelList recIndices;
306 
307  {
308  if (debug)
309  {
310  Pout<< "Receiving from " << procI << endl;
311  }
312 
313  IPstream fromSlave(Pstream::blocking, procI);
314 
315  fromSlave >> recValues >> recIndices;
316 
317  if (debug & 2)
318  {
319  Pout<< "Received from " << procI
320  << " elements:" << recValues << endl;
321  }
322  }
323 
324  if (debug)
325  {
326  Pout<< "Copying starting at:" << combinedI << endl;
327  }
328  copyInto(recValues, recIndices, procI, combinedI, combinedValues);
329  }
330  }
331  combinedValues.setSize(combinedI);
332 
333  // Sort according to values
334  Foam::sort(combinedValues);
335 
336  // Copy into *this
337  this->setSize(combinedI);
338  indices_.setSize(combinedI);
339  procs_.setSize(combinedI);
340 
341  forAll(combinedValues, elemI)
342  {
343  this->operator[](elemI) = combinedValues[elemI].value();
344  indices_[elemI] = combinedValues[elemI].index();
345  procs_[elemI] = combinedValues[elemI].procID();
346  }
347 }
348 
349 
350 // ************************************************************************* //
void reduce(const List< UPstream::commsStruct > &comms, T &Value, const BinaryOp &bop, const int tag, const label comm)
const labelList & indices() const
Return the list of sorted indices. Updated every sort.
Definition: SortableList.H:90
void sort(UList< T > &)
Definition: UList.C:107
static void gatherList(const List< commsStruct > &comms, List< T > &Values, const int tag, const label comm)
Gather data but keep individual values separate.
intWM_LABEL_SIZE_t label
A label is an int32_t or int64_t as specified by the pre-processor macro WM_LABEL_SIZE.
Definition: label.H:59
void size(const label)
Override size to be inconsistent with allocated storage.
Definition: ListI.H:76
Implementation of PSRS parallel sorting routine.
label n
A 1D array of objects of type <T>, where the size of the vector is known and used for subscript bound...
Definition: HashTable.H:59
Type & operator[](const label)
Return element of UList.
void sort()
(stable) sort the list (if changed after construction time)
static label nProcs(const label communicator=0)
Number of processes in parallel run.
Definition: UPstream.H:386
void setSize(const label)
Reset size of List.
Definition: List.C:318
Ostream & endl(Ostream &os)
Add newline and flush stream.
Definition: Ostream.H:251
A list that is sorted upon construction or when explicitly requested with the sort() method...
Definition: List.H:65
#define forAll(list, i)
Definition: UList.H:421
static void scatter(const List< commsStruct > &comms, T &Value, const int tag, const label comm)
Scatter data. Distribute without modification. Reverse of gather.
static int myProcNo(const label communicator=0)
Number of this process (starting from masterNo() = 0)
Definition: UPstream.H:404
Input inter-processor communications stream.
Definition: IPstream.H:50
List< label > labelList
A List of labels.
Definition: labelList.H:56
label size() const
Return the number of elements in the UList.
static bool master(const label communicator=0)
Am I the master process.
Definition: UPstream.H:398
ParSortableList(const UList< Type > &)
Construct from List, sorting the elements.
prefixOSstream Pout(cout,"Pout")
Definition: IOstreams.H:53