source: trunk/experimental/mtgl/stream_counter.hpp @ 3801

Revision 3801, 20.4 KB checked in by elgoodm, 2 years ago (diff)

New method with meta programming.

Line 
1
/****************************************************************************/
/*! \file stream_counter.hpp

    \brief A collection of methods that count how many streams are
           available.  All of the methods use the "for all streams"
           construct and return the number of streams the loop actually
           received.  The methods use #pragma mta no inline in order to
           ensure that the number of streams requested is not contaminated
           by merging with other loops.  These methods are useful for the
           following pattern:

           1) The first loop declares it wants x streams.
           2) The second loop declares it wants x + delta streams, where
              delta is positive.  However, the second loop only uses x
              streams.
           3) The third loop does the same as the second loop.

           This pattern gives some guarantee that the number of streams
           used in the second and third loops is the same, because you
           use delta fewer streams than requested, handling the case where
           you request x streams but get fewer than requested.

    \author Eric Goodman (elgoodm@sandia.gov)

    \date 12/21/2010
*/
/****************************************************************************/
28
29#ifndef MTGL_STREAM_COUNTER_HPP
30#define MTGL_STREAM_COUNTER_HPP
31
32#include <math.h>
33#include <mtgl/util.hpp>
34#include <mtgl/partitioning.hpp>
35#ifdef _OPENMP
36#include <omp.h>
37#endif
38
39namespace mtgl {
40
41#ifdef __MTA__
42
/// Requests a single stream with "use 1 streams", records how many streams
/// the "for all streams" region actually received, and returns that count
/// scaled by the template parameter num.  The "no inline" pragma keeps this
/// loop from being merged with neighboring loops, which would contaminate
/// the stream request.
#pragma mta no inline
template <int num>
static unsigned long stream_count() {
  unsigned long stream_id = 0;
  unsigned long num_streams = 1;
  unsigned long observed = 0;

  #pragma mta use 1 streams
  #pragma mta for all streams stream_id of num_streams
  {
    // Exactly one stream (id 0) publishes the observed stream count.
    if (stream_id == 0) observed = num_streams;
  }

  // Scale the single-stream measurement by the requested multiplier.
  observed = observed * num;

  return observed;
}
62
63
64#endif
65
/// Requests 100 streams and returns the number of streams the runtime
/// actually granted to the "for all streams" region.  Declared static:
/// a non-inline function defined in a header would otherwise produce
/// multiple-definition (ODR) link errors when included in several
/// translation units (the template stream_count above is already static).
#pragma mta no inline
static unsigned long stream_count100() {
  unsigned long stream_id = 0;
  unsigned long num_streams = 1;
  // Initialized so the return value is defined even if the region is not
  // parallelized by the compiler.
  unsigned long rvalue = 1;

  #pragma mta use 100 streams
  #pragma mta for all streams stream_id of num_streams
  {
    // Stream 0 records how many streams the loop actually received.
    if (stream_id == 0) {
      rvalue = num_streams;
    }
  }

  return rvalue;
}
82
/// Requests 90 streams and returns the number of streams the runtime
/// actually granted to the "for all streams" region.  Declared static:
/// a non-inline function defined in a header would otherwise produce
/// multiple-definition (ODR) link errors when included in several
/// translation units (the template stream_count above is already static).
#pragma mta no inline
static unsigned long stream_count90() {
  unsigned long stream_id = 0;
  unsigned long num_streams = 1;
  // Initialized so the return value is defined even if the region is not
  // parallelized by the compiler.
  unsigned long rvalue = 1;

  #pragma mta use 90 streams
  #pragma mta for all streams stream_id of num_streams
  {
    // Stream 0 records how many streams the loop actually received.
    if (stream_id == 0) {
      rvalue = num_streams;
    }
  }

  return rvalue;
}
99
100
101
/// Requests 80 streams and returns the number of streams the runtime
/// actually granted to the "for all streams" region.  Declared static:
/// a non-inline function defined in a header would otherwise produce
/// multiple-definition (ODR) link errors when included in several
/// translation units (the template stream_count above is already static).
#pragma mta no inline
static unsigned long stream_count80() {
  unsigned long stream_id = 0;
  unsigned long num_streams = 1;
  // Initialized so the return value is defined even if the region is not
  // parallelized by the compiler.
  unsigned long rvalue = 1;

  #pragma mta use 80 streams
  #pragma mta for all streams stream_id of num_streams
  {
    // Stream 0 records how many streams the loop actually received.
    if (stream_id == 0) {
      rvalue = num_streams;
    }
  }

  return rvalue;
}
118
/// Requests 70 streams and returns the number of streams the runtime
/// actually granted to the "for all streams" region.  Declared static:
/// a non-inline function defined in a header would otherwise produce
/// multiple-definition (ODR) link errors when included in several
/// translation units (the template stream_count above is already static).
#pragma mta no inline
static unsigned long stream_count70() {
  unsigned long stream_id = 0;
  unsigned long num_streams = 1;
  // Initialized so the return value is defined even if the region is not
  // parallelized by the compiler.
  unsigned long rvalue = 1;

  #pragma mta use 70 streams
  #pragma mta for all streams stream_id of num_streams
  {
    // Stream 0 records how many streams the loop actually received.
    if (stream_id == 0) {
      rvalue = num_streams;
    }
  }

  return rvalue;
}
135
/// Requests 60 streams and returns the number of streams the runtime
/// actually granted to the "for all streams" region.  Declared static:
/// a non-inline function defined in a header would otherwise produce
/// multiple-definition (ODR) link errors when included in several
/// translation units (the template stream_count above is already static).
#pragma mta no inline
static unsigned long stream_count60() {
  unsigned long stream_id = 0;
  unsigned long num_streams = 1;
  // Initialized so the return value is defined even if the region is not
  // parallelized by the compiler.
  unsigned long rvalue = 1;

  #pragma mta use 60 streams
  #pragma mta for all streams stream_id of num_streams
  {
    // Stream 0 records how many streams the loop actually received.
    if (stream_id == 0) {
      rvalue = num_streams;
    }
  }

  return rvalue;
}
152
/*!
  Can't count on linear recurrence working automatically on non-XMT
  systems, so this does it explicitly.  Right now it only does addition
  with a step of 1, i.e. array[i] = array[i] + array[i-1].  It should
  be fairly straightforward to generalize this.

  In tests, performance of the manual linear recurrence is about half
  that of the XMT compiler-created linear recurrence.

  \param array The array transformed in place into its running sums.
  \param num   The number of elements in array.

  \date 5/5/2011
  \author Eric Goodman (elgoodm@sandia.gov)
 */
template<typename array_type, typename size_type>
void linear_recurrence_manual(array_type* array, size_type num)
{
  #ifdef DEBUG
  printf("\t\tDEBUG Entering linear recurrence\n");
  #endif
  #ifdef MTGL_TIMING2
  mt_timer timer;
  timer.start();
  #endif
  size_type num_streams = 1;
  size_type stream_id = 0;
  size_type record_num_streams = 1;

  // Record how many workers will actually participate so the array can be
  // partitioned into exactly that many chunks.
  #ifdef _OPENMP
  #pragma omp parallel
  {
    if (omp_get_thread_num() == 0) {
      record_num_streams = omp_get_num_threads();
    }
  }
  #elif defined(__MTA__)
  record_num_streams = stream_count80();
  #endif

  // Per-chunk bookkeeping (variable-length arrays; a compiler extension
  // supported by the XMT, gcc, and clang).  ends2[s] holds the last index
  // owned by chunk s, or -1 if the chunk is empty.  ends[s] holds the
  // running total at the end of chunk s; it uses array_type so values of
  // non-integral element types are not truncated.
  array_type ends[record_num_streams];
  long ends2[record_num_streams];

  /// This first loop goes through and does a local linear recurrence
  /// on a chunk of the array.
  #ifdef _OPENMP
  #pragma omp parallel
  #endif
  #pragma mta for all streams stream_id of num_streams
  #pragma mta use 95 streams
  {
    #ifdef _OPENMP
    size_type stream_id = omp_get_thread_num();
    size_type num_streams = omp_get_num_threads();
    #endif
    if (num_streams < record_num_streams) {
      // Cast so the %lu format is correct no matter what size_type is.
      printf("ERROR: Not enough streams, %lu < %lu\n",
             static_cast<unsigned long>(num_streams),
             static_cast<unsigned long>(record_num_streams));
    }

    if (stream_id < record_num_streams) {
      size_type beg = begin_block_range(num, stream_id, record_num_streams);
      size_type end = end_block_range(num, stream_id, record_num_streams);

      if (beg < end) {
        ends2[stream_id] = end - 1;

        for (size_type i = beg + 1; i < end; i++) {
          array[i] = array[i] + array[i-1];
        }
      } else {
        // Empty chunk: nothing to sum, flagged with the -1 sentinel.
        ends2[stream_id] = -1;
      }
    }
  }
  #pragma mta fence
  #ifdef MTGL_TIMING2
  record_time(timer, "\t\tlinear_recurrence: Time for local application:\t");
  timer.start();
  #endif

  /// Calculate the offsets for each block sequentially (or in parallel
  /// on the XMT if the compiler decides to do it).
  if (ends2[0] > -1) {
    ends[0] = array[ends2[0]];
  } else {
    ends[0] = 0;
  }
  for (size_type i = 1; i < record_num_streams; i++) {
    if (ends2[i] > -1) {
      ends[i] = array[ends2[i]] + ends[i-1];
    } else {
      // Empty chunk: carry the previous chunk's running total forward.
      ends[i] = ends[i - 1];
    }
  }
  #ifdef MTGL_TIMING2
  record_time(timer, "\t\tlinear_recurrence: Time for calculating offsets:\t");
  timer.start();
  #endif

  /// This loop goes through and adds the offsets to each block.
  #ifdef _OPENMP
  #pragma omp parallel
  #endif
  #pragma mta for all streams stream_id of num_streams
  #pragma mta use 95 streams
  {
    #ifdef _OPENMP
    size_type stream_id = omp_get_thread_num();
    #endif
    if (stream_id < record_num_streams) {
      size_type beg = begin_block_range(num, stream_id, record_num_streams);
      size_type end = end_block_range(num, stream_id, record_num_streams);

      // Chunk 0 already holds final values (offset 0); later chunks add
      // the running total accumulated through the previous chunk.
      array_type offset = 0;
      if (stream_id > 0) {
        offset = ends[stream_id - 1];
      }
      for (size_type i = beg; i < end; i++) {
        array[i] = array[i] + offset;
      }
    }
  }
  #pragma mta fence
  #ifdef MTGL_TIMING2
  record_time(timer, "\t\tlinear_recurrence: Time for local application:\t");
  timer.start();
  #endif
}
279
280/*!
281  Takes a graph and a list of nodes and load balances the total set of
282  in-going edges from the nodes equally across record_num_streams number
283  of threads.  The start_vertex array holds the vertex index where the
284  stream should start.  The start_edge array holds the position of which
285  edge the stream should start at.
286
287  \param g1 The graph that the list of vertices refers to.
288  \param nodes The list of vertices.
289  \param num_nodes The length of the list of vertices.
290  \param record_num_streams The number of streams actually being used.
291  \param start_vertex An array that stores the starting vertex for each stream.
292  \param start_edge An array that stores the starting edge for each stream.
293
294  \date 5/5/2011
295  \author Eric Goodman (elgoodm@sandia.gov)
296 */
297template<typename graph_t>
298void load_balance_in_edges(graph_t& g1,
299          typename graph_traits<graph_t>::vertex_descriptor* nodes,
300          typename graph_traits<graph_t>::size_type num_nodes,
301          unsigned long record_num_streams,
302          unsigned long* start_vertex,
303          unsigned long* start_edge,
304          typename graph_traits<graph_t>::size_type* indegrees)
305{
306
307  #ifdef MTGL_TIMING1
308  mt_timer timer;
309  timer.start();
310  #endif
311  typedef typename graph_traits<graph_t>::size_type size_type;
312
313  size_type num_streams = 1;
314  size_type stream_id = 0;
315
316  #ifdef _OPENMP
317  #pragma omp parallel
318  #endif
319  #pragma mta for all streams stream_id of num_streams
320  #pragma mta use 100 streams
321  {
322    #ifdef _OPENMP
323    size_type stream_id = omp_get_thread_num();
324    #endif
325    if (stream_id < record_num_streams) {
326      size_type beg, end;
327      beg = begin_block_range(num_nodes, stream_id, record_num_streams);
328      end = end_block_range(num_nodes, stream_id, record_num_streams);
329
330      for(size_type i = beg; i < end; i++) {
331        indegrees[i] = in_degree(nodes[i], g1);
332      }
333    }
334  }
335  #pragma mta fence
336  #ifdef MTGL_TIMING1
337  record_time(timer, "\t\tload_balance_in_edges: Time for getting in "
338                     "degree:\t");
339  timer.start();
340  #endif
341
342  linear_recurrence(indegrees, num_nodes);
343  #ifdef MTGL_TIMING1
344  record_time(timer, "\t\tload_balance_in_edges: Time for linear recurrence "
345                     "on in degree:\t");
346  timer.start();
347  #endif
348
349  #ifdef _OPENMP
350  #pragma omp parallel
351  #endif
352  #pragma mta for all streams stream_id of num_streams
353  #pragma mta use 100 streams
354  {
355    #ifdef _OPENMP
356    size_type stream_id = omp_get_thread_num();
357    #endif
358    if (stream_id < record_num_streams) {
359
360      //Calculate the block size for a perfectly loaded balanced partition
361      //of the work.  We have to work along integer boundaries, but
362      //we need float precision so we can divy up the work into blocks of
363      //size floor(block_size) to ceil(block_size), depending on the
364      //stream id.
365      double block_size = static_cast<double>(indegrees[num_nodes - 1]) /
366                            record_num_streams;
367     
368      size_type beg, end;
369      beg = begin_block_range(num_nodes, stream_id, record_num_streams);
370      end = end_block_range(num_nodes, stream_id, record_num_streams);
371
372      if (beg < end) {
373        size_type candidate;
374        size_type last_global_edge;
375        size_type last_vertex;
376        if (end == num_nodes)
377        {
378          last_global_edge = indegrees[end - 1];
379          last_vertex = end;
380        } else {
381          last_global_edge = indegrees[end];
382          last_vertex = end + 1;
383        }
384        size_type current_vertex = beg;
385       
386        if (beg == 0) {
387          candidate = 0;
388        } else {
389          candidate = ceil(static_cast<double>(indegrees[beg-1]) / block_size);
390        }
391
392        size_type current_edge = candidate * block_size;
393        size_type vertex_last_edge = indegrees[current_vertex];
394
395        while (current_edge < last_global_edge &&
396               candidate < record_num_streams)
397        {
398          while (current_edge >= vertex_last_edge &&
399            current_vertex < num_nodes)
400          {
401            current_vertex++;
402            vertex_last_edge = indegrees[current_vertex];
403          }
404
405          if (current_edge < last_global_edge &&
406              current_vertex < num_nodes)
407          {
408            size_type d = 0;
409            if (current_vertex > 0) {
410              d = indegrees[current_vertex - 1];
411            }
412#ifdef __MTA__
413            //Don't know why we need this.  If not here, the XMT freezes.
414            mt_writexf(start_vertex[candidate], current_vertex);
415            mt_writexf(start_edge[candidate], current_edge - d);
416#else
417            start_vertex[candidate] = current_vertex;
418            start_edge[candidate] = current_edge -d;
419#endif
420          }
421
422          candidate = candidate + 1;
423          current_edge = candidate * block_size;
424        }
425      }
426    }
427  }
428  #pragma mta fence
429  #ifdef MTGL_TIMING1
430  record_time(timer, "\t\tload_balance_in_edges: Time for balance:\t");
431  timer.start();
432  #endif
433
434}
435
436template <typename graph_t>
437void load_balance_out_edges_by_blocks(
438      graph_t& g1,
439      typename graph_traits<graph_t>::vertex_descriptor* nodes,
440      typename graph_traits<graph_t>::size_type num_nodes,
441      unsigned long* start_vertex,
442      unsigned long* start_edge,
443      typename graph_traits<graph_t>::size_type* odegrees,
444      unsigned long& num_blocks,
445      unsigned long block_size = 1024)
446{
447
448  #ifdef MTGL_TIMING1
449  mt_timer timer;
450  timer.start();
451  #endif
452  typedef typename graph_traits<graph_t>::size_type size_type;
453
454  size_type num_streams = 1;
455  size_type stream_id = 0;
456
457  #ifdef _OPENMP
458  #pragma omp parallel
459  #endif
460  #pragma mta for all streams stream_id of num_streams
461  #pragma mta use 100 streams
462  {
463    #ifdef _OPENMP
464    size_type stream_id = omp_get_thread_num();
465    size_type num_streams = omp_get_num_threads();
466    #endif
467    size_type beg, end;
468    beg = begin_block_range(num_nodes, stream_id, num_streams);
469    end = end_block_range(num_nodes, stream_id, num_streams);
470
471    for(size_type i = beg; i < end; i++) {
472      odegrees[i] = out_degree(nodes[i], g1);
473    }
474  }
475  #pragma mta fence
476  #ifdef MTGL_TIMING1
477  record_time(timer, "\t\tload_balance_out_edges_by_block: "
478                     "Time for getting out "
479                     "degree:\t");
480  timer.start();
481  #endif
482
483  linear_recurrence(odegrees, num_nodes);
484  #ifdef MTGL_TIMING1
485  record_time(timer, "\t\tload_balance_out_edges_by_block: "
486                     "Time for linear recurrence "
487                     "on out degree:\t");
488  timer.start();
489  #endif
490
491  num_blocks = ceil(static_cast<double>(odegrees[num_nodes - 1]) / block_size);
492
493  #ifdef _OPENMP
494  #pragma omp parallel
495  #endif
496  #pragma mta for all streams stream_id of num_streams
497  #pragma mta use 100 streams
498  {
499    #ifdef _OPENMP
500    size_type stream_id = omp_get_thread_num();
501    size_type num_streams = omp_get_num_threads();
502    #endif
503
504    size_type beg, end;
505    beg = begin_block_range(num_nodes, stream_id, num_streams);
506    end = end_block_range(num_nodes, stream_id, num_streams);
507
508    if (beg < end) {
509      size_type candidate;
510      size_type last_global_edge;
511      size_type last_vertex;
512      if (end == num_nodes)
513      {
514        last_global_edge = odegrees[end - 1];
515        last_vertex = end;
516      } else {
517        last_global_edge = odegrees[end];
518        last_vertex = end + 1;
519      }
520      size_type current_vertex = beg;
521     
522      if (beg == 0) {
523        candidate = 0;
524      } else {
525        candidate = ceil(static_cast<double>(odegrees[beg-1]) / block_size);
526      }
527
528      size_type current_edge = candidate * block_size;
529      size_type vertex_last_edge = odegrees[current_vertex];
530
531      while (current_edge < last_global_edge &&
532             candidate < num_blocks)
533      {
534        while (current_edge >= vertex_last_edge &&
535          current_vertex < num_nodes)
536        {
537          current_vertex++;
538          vertex_last_edge = odegrees[current_vertex];
539        }
540
541        if (current_edge < last_global_edge &&
542            current_vertex < num_nodes)
543        {
544          size_type d = 0;
545          if (current_vertex > 0) {
546            d = odegrees[current_vertex - 1];
547          }
548#ifdef __MTA__
549          //Don't know why we need this.  If not here, the XMT freezes.
550          mt_writexf(start_vertex[candidate], current_vertex);
551          mt_writexf(start_edge[candidate], current_edge - d);
552#else
553          start_vertex[candidate] = current_vertex;
554          start_edge[candidate] = current_edge -d;
555#endif
556        }
557       
558        candidate = candidate + 1;
559        current_edge = candidate * block_size;
560      }
561    }
562  }
563  #pragma mta fence
564  #ifdef MTGL_TIMING1
565  record_time(timer, "\t\tload_balance_out_edges_by_block: "
566                     "Time for balance:\t");
567  timer.start();
568  #endif
569
570}
571
572
573/*!
574  Takes a graph and a list of nodes and load balances the total set of
575  out-going edges from the nodes equally across record_num_streams number
576  of threads.  The start_vertex array holds the vertex index where the
577  stream should start.  The start_edge array holds the position of which
578  edge the stream should start at.
579
580  \param g1 The graph that the list of vertices refers to.
581  \param nodes The list of vertices.
582  \param num_nodes The length of the list of vertices.
583  \param record_num_streams The number of streams actually being used.
584  \param start_vertex An array that stores the starting vertex for each stream.
585  \param start_edge An array that stores the starting edge for each stream.
586
587  \date 5/5/2011
588  \author Eric Goodman (elgoodm@sandia.gov)
589 */
590template<typename graph_t>
591void load_balance_out_edges(graph_t& g1,
592          typename graph_traits<graph_t>::vertex_descriptor* nodes,
593          typename graph_traits<graph_t>::size_type num_nodes,
594          unsigned long record_num_streams,
595          unsigned long* start_vertex,
596          unsigned long* start_edge,
597          typename graph_traits<graph_t>::size_type* odegrees)
598{
599
600  #ifdef MTGL_TIMING1
601  mt_timer timer;
602  timer.start();
603  #endif
604  typedef typename graph_traits<graph_t>::size_type size_type;
605
606  size_type num_streams = 1;
607  size_type stream_id = 0;
608
609  #ifdef _OPENMP
610  #pragma omp parallel
611  #endif
612  #pragma mta for all streams stream_id of num_streams
613  #pragma mta use 100 streams
614  {
615    #ifdef _OPENMP
616    size_type stream_id = omp_get_thread_num();
617    #endif
618    if (stream_id < record_num_streams) {
619      size_type beg, end;
620      beg = begin_block_range(num_nodes, stream_id, record_num_streams);
621      end = end_block_range(num_nodes, stream_id, record_num_streams);
622
623      for(size_type i = beg; i < end; i++) {
624        odegrees[i] = out_degree(nodes[i], g1);
625      }
626    }
627  }
628  #pragma mta fence
629  #ifdef MTGL_TIMING1
630  record_time(timer, "\t\tload_balance_out_edges: Time for getting out "
631                     "degree:\t");
632  timer.start();
633  #endif
634
635  linear_recurrence(odegrees, num_nodes);
636  #ifdef MTGL_TIMING1
637  record_time(timer, "\t\tload_balance_out_edges: Time for linear recurrence "
638                     "on out degree:\t");
639  timer.start();
640  #endif
641  #ifdef DEBUG
642  printf("\t\tDEBUG load_balance_out_edges odegrees[%lu] %lu\n",
643         num_nodes -1, odegrees[num_nodes-1]);
644  #endif
645
646  #ifdef _OPENMP
647  #pragma omp parallel
648  #endif
649  #pragma mta for all streams stream_id of num_streams
650  #pragma mta use 100 streams
651  {
652    #ifdef _OPENMP
653    size_type stream_id = omp_get_thread_num();
654    #endif
655    if (stream_id < record_num_streams) {
656
657      //Calculate the block size for a perfectly loaded balanced partition
658      //of the work.  We have to work along integer boundaries, but
659      //we need float precision so we can divy up the work into blocks of
660      //size floor(block_size) to ceil(block_size), depending on the
661      //stream id.
662      double block_size = static_cast<double>(odegrees[num_nodes - 1]) /
663                            record_num_streams;
664     
665      size_type beg, end;
666      beg = begin_block_range(num_nodes, stream_id, record_num_streams);
667      end = end_block_range(num_nodes, stream_id, record_num_streams);
668
669      if (beg < end) {
670        size_type candidate;
671        size_type last_global_edge;
672        size_type last_vertex;
673        if (end == num_nodes)
674        {
675          last_global_edge = odegrees[end - 1];
676          last_vertex = end;
677        } else {
678          last_global_edge = odegrees[end];
679          last_vertex = end + 1;
680        }
681        size_type current_vertex = beg;
682       
683        if (beg == 0) {
684          candidate = 0;
685        } else {
686          candidate = ceil(static_cast<double>(odegrees[beg-1]) / block_size);
687        }
688
689        size_type current_edge = candidate * block_size;
690        size_type vertex_last_edge = odegrees[current_vertex];
691
692        while (current_edge < last_global_edge &&
693               candidate < record_num_streams)
694        {
695          while (current_edge >= vertex_last_edge &&
696            current_vertex < num_nodes)
697          {
698            current_vertex++;
699            vertex_last_edge = odegrees[current_vertex];
700          }
701
702          if (current_edge < last_global_edge &&
703              current_vertex < num_nodes)
704          {
705            size_type d = 0;
706            if (current_vertex > 0) {
707              d = odegrees[current_vertex - 1];
708            }
709#ifdef __MTA__
710            //Don't know why we need this.  If not here, the XMT freezes.
711            mt_writexf(start_vertex[candidate], current_vertex);
712            mt_writexf(start_edge[candidate], current_edge - d);
713#else
714            start_vertex[candidate] = current_vertex;
715            start_edge[candidate] = current_edge -d;
716#endif
717          }
718         
719          candidate = candidate + 1;
720          current_edge = candidate * block_size;
721        }
722      }
723    }
724  }
725  #pragma mta fence
726  #ifdef MTGL_TIMING1
727  record_time(timer, "\t\tload_balance_out_edges: Time for balance:\t");
728  timer.start();
729  #endif
730
731}
732
733}
734
735#endif
Note: See TracBrowser for help on using the repository browser.