Thursday, August 20, 2009

Zonesort mostly parallelized, but... it's a pyrrhic victory

Well... I have finally parallelized Zonesort (just about, I still need to do some extra work to deal with degenerate - that is, almost sorted or almost reverse-sorted input - cases properly). But there's a problem. It needs better load balancing. And I can't see a good way to do that. A single gantt chart illustrates the symptoms neatly.

This is a graph of CPU utilization (color, red is 100%, white is 0%) by time (across) by worker thread number (down), for a four thread Zonesort using 3-way merging (except for merging the last four zone lists), while it is sorting 9,395,322 distinct integers in about 0.32 seconds.

I've finally parallelized the last few merge operations (though I had to increase the space requirements by a factor of sqrt((M+1)/M) to achieve that), as well as the zone shuffling operation, but there's a load balancing problem earlier on in the sort, making - in this case, which is typical - the overall running time about 6% worse than it should be. If I could fix that it would still be considerably slower (by at least 10%) than a stock-standard 3-way mergesort on the same input. So I don't think I'll bother.

The other sorting routines I've ever parallelized never had significant per thread storage requirements, the way this one seems to. I can't share available zone lists between threads easily (and, no, serializing access to a global available zone list doesn't strike me as a good idea at all; that would mean altogether too much synchronization), so for a P-threaded sort I'm dividing the input into P pieces and sorting each piece independently. But some cores - and some threads - run faster than others, and my heroic (for which, read: stupid) assumption, that those P independent sorts would each run in about the same time, is clearly wrong.

I'm already using "barriers" in the last few merges and for the final zone shuffle (the time wasted by those barriers is visible in the gantt chart, and a barrier always means an idle thread), but at least I'm not doing any explicit thread control in the sort, and I'm not throwing away much extra memory (an important consideration when the whole point of this sorting routine is to save space compared to a standard merge sort) to co-ordinate threads running in parallel (so far I've been using up only about 200*P bytes on per-thread storage).

I'll post the parallel zoneSort once I've ironed out the bugs that stop it working for nearly-in-order or nearly-reverse-order inputs.

Saturday, July 25, 2009

Zonesort - Yet another variant of mergesort

I've been on holiday in Thailand, so I haven't been posting. While I was in Thailand, in between looking at orchids, eating fantastic seafood, and getting short-changed everywhere I went, I worked a little on a variant of mergesort ("Zoned Mergesort") that requires a storage overhead of approximately

2 * sqrt( N.R.I.M.P )

where N is the number of records, R is the size of each record, I is the size of an integer, M is the order of merge, and P is the number of concurrent threads (mergesort overhead is usually something like N.R / 2 which is a lot more, for large enough N). It's an old idea of mine: I first wrote an implementation in 2004 (though only for M=2, P=1) but shelved it because it was considerably (typically 20%) slower than a standard mergesort. But it turned out that some minor rewrites to better exploit forecasting let it compete on even terms with the performance of standard mergesort, with P=1 and M=2. So I extended it to M=3, where it did even better. But I still haven't parallelized it properly (and that will take time, because it's a fairly complicated sorting routine).

Zoned Mergesort was derived, in a round-about way, from an idea of
Alexander Kronrod (see Knuth's Art of Programming, volume 3): merging dividing arrays of N records into approximately sqrt(N) "zones" of approximately sqrt(N) records. It works like ths:
  1. Divide the input array into X zones of Y records each (where X is approximately sqrt(N*I/R/M/P))

  2. Allocate storage for M*P*Y additional records.

  3. Allocate storage for (M*P+X) integers. These will be used to maintain linked lists of sorted zones.

  4. Sort each zone, using the additional zones as a scratch area, and put each zone's index into a one element-list

  5. Start P separate lists, each of M zones, chosen from the M*P additional zones (these are "available zone" lists

  6. Assemble linked lists of sorted zones by merging and relinking (I'll explain this step in more detail later)

  7. When you get to one linked list of zones, permute the order of the zones so that the entire array is in order

The time overhead for steps 6 and 7 turns out not to be a serious problem. The linked list manipulation in step 6 has O(X) overhead, and can be pretty much ignored when N is large. The permutation in step 7 is rather more expensive but is still O(N) on average. In practice, when P=1 at least, for large N, the permutation cost is outweighed by the improved cache hit rate on writes (almost all writes end up being cache hits). The cache hit rate is better than that of a standard mergesort due to how the zone list merging works (which I'll outline for M=2):
  1. Remove a destination zone from the current thread's list of available zones

  2. Merge records from the zone at the head of the left list, and the zone at the head of the right list, into the destination zone, until either the destination zone is exhausted, or the left zone is exhausted, or the right zone is exhausted

  3. If the destination zone is exhausted, add it to the tail of the output list, and remove a zone from the available zone list, to be the current destination zone

  4. If the left hand zone is exhausted, move on the left zone list, so that the next zone in the list becomes the left input zone, and put the just-exhausted zone into the available zone list

  5. As for step 4, but replacing "left" with "right"

  6. If neither the left zone list or the right zone list is exhausted, go back to step 2

  7. Copy records from the remaining zones in the input that hasn't been exhausted, until it runs out (allocating new destination zones and releasing exhausted input zones as above).

The cache benefit comes from the fact that, during most of the zone list merging phase, the current destination zone was almost always in use, recently, as an input zone. It turns out that you may have to read X.M-X+1 records before the first input zone is exhausted, which is why there need to be M "extra" zones in the initial available zone list for each thread.

There's some extra fiddling around to take account of the fact that the last zone will usually be smaller than the others (since N is unlikely to zero modulo Y), but nothing serious.

The big problem, though, is that parallelizing the last few zone list merges is painfully difficult. I've only got part of the way through it and I think it'll take me about six hours solid work to write even a clunky fully-parallel version. No fun at all!

Friday, June 26, 2009

Daily "It Ain't So" - wikipiedia's Mergesort article

For some reason blogger has decided this was published 29-Jun-2009. Which ain't so. It's actually 03-Jan-2011 as I type this!

Today's It-Ain't-So is for wikipedia's mergesort article (as it is today, not as it might be down the track).
  • A "mergesort" using lists is not an in-place sort. Either the pointer is part of the record - in which case the list mergesort isn't sorting elements, it's modifying them, or the pointer is not part of the record, in which case mergesort isn't in-place.
    [that's how the ancient Greeks would have argued it, and they would have been right]

  • It is not quite true that "mergesorting" lists is, in performance terms, no better than heapsort. It is true, if swapping values between nodes is not allowed (for a randomly-ordered input, as the sort proceeds the order of the addresses becomes increasingly jumbled up, leading to cache misses). But, if swapping values between nodes is allowed, and the input was initially of in-address-order nodes adjacent in memory, it's possible (and not very difficult) to rewrite the nodes in the sorted sublists after every, say, 8 levels of merging, avoiding the cache-miss problem, and beating heapsort comfortably, for large n. I've got some code for doing that buried somewhere (it's about ten years old and I haven't used it in about eight years, and I don't remember if it "cheated" and used additional storage for the rewrite. Maybe it did).

  • If you can use "link fields" for mergesort (and it's still a mergesort?) then you can also use them for quicksort (and hence, quicksort can be stable).

  • Mergesorting linked lists does not require an O(1) space overhead: it requires O(logn) space to record (at each of the O(logn) merging levels) which node is the head of the first sorted sublist, while the second sublist at the same level is being sorted, unless swapping values between nodes is allowed, or adjacent sorted sublists are "taped together" and you're happy to spend O(nlogn) time finding the start of the right hand sublist before each merge begins. I've never seen either of those techniques used; all the linked list mergesorts I've ever seen had an plog2(n) space overhead (on top of the np space overhead for the linkage fields), where p is the size of a link.

  • While it's true that heapsort won't work on linked lists, there is a close analogue of heapsort that works on priority trees. As I understand it, heapsort was originally developed from a priority-tree algorithm.

  • While it's technically true to say that a self-balancing binary search tree is better than an on-line mergesort, when "the received pieces are small compared to the sorted list", there's an invalid assumption wired into that statement. Why would there be one sorted list, rather than ~O(logm) sorted sublists, where m is the number of elements that have been "fed" to the on-line sort routine so far, with the (i+1t)h sublist at most half the length of the ith? That's the explicit stack used in a straight mergesort co-routine.
And now for the discussion page that hangs off the article...
  • Where I said that Quicksort's real advantage over Mergesort comes down to the amount of record movement (July 2005), I was, well, wrong. I should have qualified that statement a lot more. I should have said that, a Quicksort with a two-handed partitioning subroutine will beat a (binary) Mergesort (even one tuned to do less boundary checks than Quicksort) if and only if element moves are significantly more expensive than element comparisons.
  • Asturneresquire (August 2007) is wrong: Quicksort does not make significantly better use of the cache than Mergesort. It might at first seem that mergesort will put more of a load on the cache because it is accessing more memory, but it isn't that simple: mergesort is accessing each address fewer times on average. The cache (on, say, an I7) easily keeps up with both Mergesort and Quicksort. It isn't even "cracking a sweat".

Thursday, June 25, 2009

Heapsort - Floyd's Trick - part 2 - Sorting Strings

An aside: I would have posted this one sooner, but I was running into an unpleasant problem due to a bug in a do-it-myself string class. Yes, the world has too many string classes - it probably already had one too many when the second one was written, perhaps even when the first was written - and mine was as buggy, or more so, than most. I know that writing my own string classes is stupid but I loathe std::string. It's soooo bloated, and with methods, not free functions. But still, that doesn't justify my writing my own string classes. Thinking it does, even temporarily, is a good example of the tu qoque phallacy. I have repented now! But the string class is already written. Someday, I'll compound my sin by posting some string classes too.

First, some preliminary comments about sorting strings. In later posts I shall be writing some posts about sort algorithms tailored specifically for sorting strings (e.g. Burstsort, in its various flavours, and B-Tree sorts), where prefix-extraction, tries, and the like, will come into play. But for now, the focus will be on Heapsorts.

Two-way mergesort is generally considered to be the reference record-comparison array sort for sorting strings. That's because, when sorting pointers to strings (which is often what happens in C++ and also, under the hood, in languages like Java), comparisons tend to be relatively expensive compared to record copies or exchanges. And fair enough too. String pointers may also be a good proxy for pointers to some more complex types and classes (when multiple members of those types/classes are being used as the sort key).

Asymptotically, a two-way top-down Mergesort with a randomly ordered input where all the values are distinct, requires N*log(N)/log(2) comparisons, on average (and the best case is closer to half that).

When sorting string pointers, with a single-threaded record comparison sort, the second most important consideration is minimizing the number of comparisons. Minimizing the number of cache misses is actually rather more important, and later on that consideration will lead to some bizarre sorting algorithms. But not yet.

I covered how Floyd's trick is actually implemented in an earlier post. As I remarked there, it reduces the asymptotic average number of comparisons required for a radix M heapsort from M.log(N)/log(M) to (M-1).log(N)/log(M). Using those formulae, and assuming that comparisons are what count, and using the performance, X, of mergesort as the reference, we can try to predict sort routine performance:

  • For Radix-2 Heapsort with Floyd's trick: 1.0*X

  • For classic Quicksort: 1/2/ln(2) ~= 0.72*X

  • For Radix-2 Heapsort without Floyd's trick: 0.5*X

  • For Radix-4 Heapsort with Floyd's trick: 2/3 ~= 0.67*X

  • For Radix-4 Heapsort without Floyd's trick: 0.5*X



But it ain't so!





As you can see, from the two graphs, merely counting comparisons doesn't lead to a good prediction for the running time. Counting comparisons and cache misses works better. Even rough approximations of the cache miss count lead to reasonably good predictions for the running time.

  • For most of the comparisons that take place during a mergesort, one of the records being compared was compared immediately before-hand. The access pattern in the array itself is predictable, and, asymptotically, half of the string look-ups will be cache hits.

  • Quicksort likewise (though Quicksort does even better than I would have
    expected, and I don't have any explanation for that yet)

  • String look-ups near enough to the top of the heap are cache hits. When the element being sifted down into the heap is being compared "on the way down" both string lookups are cache hits. Given that cache hits are what count, these comparisons are much cheaper than the others, low in the heap. That's the reason that Floyd's trick doesn't improve the performance of radix-2 heapsort much. On the other hand, the access pattern in the array itself is not predictable, so array element look-ups also result in cache misses.

Wednesday, June 24, 2009

Extra Horsepower



The graph shows the relative performance (for sorting an array of 10 million integers) for Quicksort, versus 2-way, 3-way, and 4-way mergesorts that parallelize the last (and only the last) merge operation at each level of merging.

Before parallelizing 3- and 4-way merges (yuu-uuk), I wondered... would it be worth parallelizing all the 2-way-merge operations, not just the last one at each level of merging (and using a StackDispatcher too)...? The answer turned out to be... no. It's very slightly slower (possibly because the extra synchronization overhead, plus the binary-search overhead for finding where to split up the inputs when dividing up the processing, outweighs the cache benefit, if any).

I also figured it might be a good idea to *check* that stability is preserved. Luckily that's pretty easy, with template sort routines. I declared a type, Horse, generated arrays of Horse (see below) (by setting the value members, randomly permuting, and then writing the starting index of each element into the satellite members), and checked that stability was preserved after sorting.

template<class V, class S> class ValueAndSatellite
{
public:
  V value;
  S satellite;
  ValueAndSatellite()         { }
  ValueAndSatellite(V v, S s) { value=v; satellite=s; }
  ~ValueAndSatellite()        { }
  bool operator<  (const ValueAndSatellite<V,S> &rhs) { return value<rhs.value; }
  bool operator<= (const ValueAndSatellite<V,S> &rhs) { return value<=rhs.value; }
  bool operator== (const ValueAndSatellite<V,S> &rhs) { return value==rhs.value; }
};
typedef ValueAndSatellite<int, int> Horse;

Stability was preserved, for the 2- and 3-way mergesorts, but not for the 4-way mergesort. I haven't yet figured out how the 4-way mergesort went wrong. I'll have to post a correction when I find out.

Incidentally, with an array of Horse (each Horse is twice the size of an int, but has approximately the same comparison cost), Quicksort beat two-way, three-way, and even four-way merging comfortably. Even if all of the value members of the Horses in the array were distinct, Quicksort was 11% faster than a four-way merge.






For 3- and 4-way merging I've opted not to worry so much about trying to "split evenly in the presence of large numbers of duplicates". It's too hard to think through all the cases. Instead, I... just hope the splits are about right ("on average, if there aren't too many duplicates, they'll be okay"). And, at least for sorting integers on 4 cores,

  • my 3-way mergesort beats Quicksort comfortably. But not by the 8% I had expected. Only by about 5%.

  • my 4-way mergesort beats Quicksort by about 9% (I had expected 8%). But then, it isn't stable (ouch!)



Here's my current code (less classes, functions, and so forth that have appeared in previous posts)...

template<class T> void mergeForecastBackwardsRadix2External(T *a, T* aStop, T* b, T* bStop, T* dest)
{
  if (aStop[1]<=bStop[1])
  {
    //the "b"s will run out before the "a"s.
    for (;b>bStop;*dest--=*b--)
    {
      for (;*b<*a;*dest--=*a--);
    }
    for (;a>aStop;*dest--=*a--);
  }
  else
  {
    for (;a>aStop;*dest--=*a--)
    {
      for (;*a<=*b;*dest--=*b--);
    }
    for (;b>bStop;*dest--=*b--);
  }
}


template <class T> inline int indexOfFirstElementInBlockGreaterOrEqualTo(T *block, int count, T &v)
{
  int loIndex = 0;
  int hiIndex = count;
  while (loIndex<hiIndex)
  {
    int tryIndex = ( loIndex + hiIndex ) / 2;
    if ( v <= block[tryIndex]  )
      hiIndex  = tryIndex;
    else
      loIndex = tryIndex + 1;
  }
  return hiIndex;
}

template <class T> inline int indexOfFirstElementInBlockGreaterThan(T *block, int count, T &v)
{
  int loIndex = 0;
  int hiIndex = count;
  while (loIndex<hiIndex)
  {
    int tryIndex = ( loIndex + hiIndex ) / 2;
    if ( v < block[tryIndex]  )
      hiIndex  = tryIndex;
    else
      loIndex = tryIndex + 1;
  }
  return hiIndex;
}

template <class T> void paranoidMergeRadix2External(T *a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0)
  {
    for (;bCount>0;--bCount) *dest++=*b++;
  }
  else if (bCount==0)
  {
    for (;aCount>0;--aCount) *dest++=*a++;
  }
  else if (bLeftToRight)
  {
    mergeForecastRadix2External(a, a+aCount, b, b+bCount, dest);
  }
  else
  {
    mergeForecastBackwardsRadix2External(a+aCount-1, a-1, b+bCount-1, b-1, dest+aCount+bCount-1);
  }
  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix2External(T* a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount>1)
  {
    //Ick!  We need to split the merge into two parts.
    //We'll be kinda lazy.  We'll figure out where the median of the LHS would go in the RHS, and split that way.
    int middleLeft  = aCount/processorCount*(processorCount/2);
    int hiLeft = middleLeft+1+indexOfFirstElementInBlockGreaterThan(a+middleLeft+1, aCount-middleLeft-1, a[middleLeft]);  
      //index of first element >v, in a, or aCount if there is none
    int loRight   = indexOfFirstElementInBlockGreaterOrEqualTo(b, bCount, a[middleLeft]);
      //index of first element >=v, in b, or bCount if there is none

    //Normally: merge a[0..hiLeft-1] with b[0..loRight-1], so set... takeLeft=hiLeft.
    //But:      We want takeLeft + loRight to be roughly (aCount+bCount)/processorCount*(proessorCount/2), and takeLeft
    //      can be any value between loLeft and hiLeft.  

    int takeLeft     = hiLeft;
    int desiredSplit = (aCount+bCount)/processorCount*(processorCount/2);
    int loLeft       = -1;
    if (takeLeft + loRight > desiredSplit) //too many records will be going to the "left" merge operation
    {
      if (desiredSplit - loRight >= middleLeft)
        takeLeft = desiredSplit - loRight;    //since any value between middleLeft and hiLeft is fine for # records to take from a
      else
      {
        //otherwise: need to know the minimum number of records we could *safely* take from block "a".
        loLeft = indexOfFirstElementInBlockGreaterOrEqualTo(a, middleLeft, a[middleLeft]);
        if (desiredSplit - loRight > loLeft)
        {
          takeLeft = desiredSplit - loRight;
        }
        else
        {
          takeLeft = loLeft;
        }
      }
    }    

    parallelMergeRadix2External(a, takeLeft, b, loRight, dest, bLeftToRight, processorCount/2, t);
    a              += takeLeft;
    b              += loRight;
    aCount         -= takeLeft;
    bCount         -= loRight;
    dest           += takeLeft+loRight;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction7(paranoidMergeRadix2External<T>, a, aCount, b, bCount, dest, bLeftToRight, t));
}

template <class T> void parallelMergesortSubArrayRadix2(T *src, T *dest, int elementCount
                            , int processorCount, int multiThreadingThreshold
                            , bool srcIsInput, bool isLastMerge, INotifiable *t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int halfCount = elementCount >> 1;
    IAction*     mergeAction;       //the merge itself
    INotifiable* pendingMerge;    //for notifying the count-down latch that prevents the merge running                

    if (isLastMerge)
    {
      INotifiable* notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction8(parallelMergeRadix2External<T>, src, halfCount, src+halfCount, elementCount-halfCount
              , dest, srcIsInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(2, mergeAction);      

    }
    else 
    {
      if (srcIsInput)
      {
        mergeAction = CallAction5(mergeForecastRadix2External<T>,  src, src+halfCount, src+halfCount, src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction5(mergeForecastBackwardsRadix2External<T>, src+halfCount-1, src-1, src+elementCount-1, src+halfCount-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(2, notifyingAction);
    }        

    parallelMergesortSubArrayRadix2(dest, src, halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, false, pendingMerge);
    parallelMergesortSubArrayRadix2(dest+halfCount, src+halfCount, elementCount-halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, isLastMerge, pendingMerge);

  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction5(mergesortForecastRadix2External<T>, src, elementCount, dest, srcIsInput, 32), t));
  }
}

template <class T>   void parallelMergesort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix2<T>,workArea, base, elementCount, processorCount, multiThreadingThreshold, false, 
    true /*set to false to turn off parallization of the last merge on each level*//, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

template <class T> void paranoidMergeRadix3External(T *a, int aCount, T* b, int bCount, T* c, int cCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0) 
    paranoidMergeRadix2External(b, bCount, c, cCount, dest, bLeftToRight, NULL);
  else if (bCount==0)
    paranoidMergeRadix2External(a, aCount, c, cCount, dest, bLeftToRight, NULL);
  else if (cCount==0)
    paranoidMergeRadix2External(a, aCount, b, bCount, dest, bLeftToRight, NULL);
  else if (bLeftToRight)
    mergeRadix3FastForward(a, a+aCount, b, b+bCount, c, c+cCount, dest);
  else
    mergeRadix3FastBackward(a+aCount-1, a-1, b+bCount-1, b-1, c+cCount-1, c-1, dest+aCount+bCount+cCount-1);

  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix3External(T* a, int aCount, T* b, int bCount, T* c, int cCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount > 1)
  {
    int aMiddle  = aCount/processorCount*(processorCount/2);
    int bMiddle  = indexOfFirstElementInBlockGreaterThan(b, bCount, a[aMiddle]);
    int cMiddle  = indexOfFirstElementInBlockGreaterThan(c, cCount, a[aMiddle]);
        aMiddle += indexOfFirstElementInBlockGreaterThan(a+aMiddle, aCount-aMiddle, a[aMiddle]);
    parallelMergeRadix3External(a, aMiddle, b, bMiddle, c, cMiddle, dest, bLeftToRight, processorCount/2, t);
    a += aMiddle; aCount -= aMiddle;
    b += bMiddle; bCount -= bMiddle;
    c += cMiddle; cCount -= cMiddle;
    dest += aMiddle + bMiddle + cMiddle;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction9(paranoidMergeRadix3External<T>, a, aCount, b, bCount, c, cCount, dest, bLeftToRight, t));
}


template <class T> void parallelMergesortSubArrayRadix3(T *src, int elementCount, T *dest, bool isSourceInput,
                          bool isLastMerge, int processorCount, int multiThreadingThreshold,
                          INotifiable* t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int stepSize = elementCount / 3 ;
    int twoStep  = stepSize + stepSize;
    IAction *mergeAction;
    INotifiable *pendingMerge;

    if (isLastMerge)
    {
      INotifiable *notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction10(parallelMergeRadix3External<T>, src, stepSize, src+stepSize, stepSize, src+twoStep, elementCount-twoStep
              , dest, isSourceInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(3, mergeAction);      
    }
    else 
    {
      if (isSourceInput)
      {
        mergeAction = CallAction7(voidMergeRadix3FastForward<T>,  src, src+stepSize, src+stepSize, src+stepSize+stepSize, src+stepSize+stepSize,
                  src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction7(voidMergeRadix3FastBackward<T>, src+stepSize-1, src-1, src+stepSize+stepSize-1, src+stepSize-1,
                      src+elementCount-1, src+stepSize+stepSize-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(3, notifyingAction);
    }

    parallelMergesortSubArrayRadix3(dest, stepSize, src
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix3(dest+stepSize, stepSize, src+stepSize
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix3(dest+stepSize+stepSize, elementCount-stepSize-stepSize, src+stepSize+stepSize
      , !isSourceInput , isLastMerge, processorCount, multiThreadingThreshold, pendingMerge);
  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction4(mergeSortExternalRadix3Fast<T>, src, elementCount, dest, isSourceInput), t));
  }
}


template <class T>   void parallelMergesortRadix3(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix3<T>, workArea, elementCount, base, false, true, processorCount, multiThreadingThreshold, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

template <class T> void paranoidMergeRadix4External(T *a, int aCount, T* b, int bCount, T* c, int cCount, T* d, int dCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0) 
    paranoidMergeRadix3External(b, bCount, c, cCount, d, dCount, dest, bLeftToRight, NULL);
  else if (bCount==0)
    paranoidMergeRadix3External(a, aCount, c, cCount, d, dCount, dest, bLeftToRight, NULL);
  else if (cCount==0)
    paranoidMergeRadix3External(a, aCount, b, bCount, d, dCount, dest, bLeftToRight, NULL);
  else if (dCount==0)
    paranoidMergeRadix3External(a, aCount, b, bCount, c, cCount, dest, bLeftToRight, NULL);
  else if (bLeftToRight)
    mergeRadix4FastForward(a, a+aCount, b, b+bCount, c, c+cCount, d, d+dCount, dest);
  else
    mergeRadix4FastBackward(a+aCount-1, a-1, b+bCount-1, b-1, c+cCount-1, c-1, d+dCount-1, d-1, dest+aCount+bCount+cCount+dCount-1);

  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix4External(
  T* a, int aCount, T* b, int bCount, T* c, int cCount, T* d, int dCount,
  T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount > 1)
  {
    int aMiddle  = aCount/processorCount*(processorCount/2);
    int bMiddle  = indexOfFirstElementInBlockGreaterThan(b, bCount, a[aMiddle]);
    int cMiddle  = indexOfFirstElementInBlockGreaterThan(c, cCount, a[aMiddle]);
    int dMiddle  = indexOfFirstElementInBlockGreaterThan(d, dCount, a[aMiddle]);
        aMiddle += indexOfFirstElementInBlockGreaterThan(a+aMiddle, aCount-aMiddle, a[aMiddle]);
    parallelMergeRadix4External(a, aMiddle, b, bMiddle, c, cMiddle, d, dMiddle, dest, bLeftToRight, processorCount/2, t);
    a += aMiddle; aCount -= aMiddle;
    b += bMiddle; bCount -= bMiddle;
    c += cMiddle; cCount -= cMiddle;
    d += dMiddle; dCount -= dMiddle;
    dest += aMiddle + bMiddle + cMiddle + dMiddle;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction11(paranoidMergeRadix4External<T>, a, aCount, b, bCount, c, cCount, d, dCount, dest, bLeftToRight, t));
}

template <class T> void parallelMergesortSubArrayRadix4(T *src, int elementCount, T *dest, bool isSourceInput,
                          bool isLastMerge, int processorCount, int multiThreadingThreshold,
                          INotifiable* t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int stepSize  = elementCount >> 2 ;
    int twoStep   = stepSize + stepSize;
    int threeStep = twoStep  + stepSize;
    IAction *mergeAction;
    INotifiable *pendingMerge;

    if (isLastMerge)
    {
      INotifiable *notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction12(parallelMergeRadix4External<T>, src, stepSize, src+stepSize, stepSize
              , src+twoStep, stepSize, src+threeStep, elementCount-threeStep
              , dest, isSourceInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(4, mergeAction);      
    }
    else 
    {
      if (isSourceInput)
      {
        mergeAction = CallAction9(voidMergeRadix4FastForward<T>,  src, src+stepSize, src+stepSize, src+twoStep, src+twoStep,
                  src+threeStep, src+threeStep, src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction9(voidMergeRadix4FastBackward<T>, src+stepSize-1, src-1, src+twoStep-1, src+stepSize-1,
                  src+threeStep-1, src+twoStep-1, src+elementCount-1, src+threeStep-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(4, notifyingAction);
    }

    parallelMergesortSubArrayRadix4(dest, stepSize, src
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+stepSize, stepSize, src+stepSize
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+twoStep, stepSize, src+twoStep
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+threeStep, elementCount-threeStep, src+threeStep
      , !isSourceInput , isLastMerge, processorCount, multiThreadingThreshold, pendingMerge);
  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction4(mergeSortExternalRadix4Fast<T>, src, elementCount, dest, isSourceInput), t));
  }
}

template <class T>   void parallelMergesortRadix4(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix4<T>, workArea, elementCount, base, false, true, 
    processorCount, multiThreadingThreshold, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

Tuesday, June 23, 2009

Tuning Array Mergesorts for speed - Parallel Merging

The multithreaded mergesort routines that I posted here a few days ago only multithreaded the sorting of partitions. They did not multithread the actual merge operations. I was lazy. How much did that omission cost? Well, I added some code to generate a gantt chart for each of the five threads involved in the four-core two-way mergesort. I'm not going to post the code that generates the gantt chart, or even show the chart, but the important thing that it showed was that the last action queued on the last worker thread to complete took 25% of the elapsed time (to sort 10 million records, with a multithreading threshold of 50,000, using 4 cores), because it was merging on one thread (and it wasn't just merging one level on one thread! It was merging at lots of levels).

Rewriting the algorithm so that the last merge operation at each level of merging (and *only* the last merge) can be split across multiple cores does indeed reduce the running time siginifantly (and if the merging is multithreaded, the speed-up for the overall sort, at least for sorting distinct integers, is much better. For 10 million integers, two, three and four core versions are roughly 1.98, 2.88, and 3.68 times faster, respectively, than the one-core version).

Way back in 2000, when I looked at multithreading the merge operations for two-way mergesort on a Pentium IV (which only had "a core and a half", given the limitations of the Pentium IV's hyperthreading), the performance benefit was marginal. But I should have done the calculations to predict the saving. Assuming all comparisons take equal time and that the comparisons required for the last few merge operations can be spread evenly across all the processors... the speed-up (on 4-cores) for 3- and 4-way mergesorts should be enough to make them both about 8% faster than Quicksort. Well, I don't think that can be right (Quicksort always wins!). But I'll have to code them to find out for sure.

I'm not certain the following code is correct (I haven't tested all the boundary cases yet. I think it will go wrong when hiRight==bCount), nor have I properly checked whether it even preserves stability, and I haven't written the code to split 3- and 4-way merges across multiple cores either.



namespace
{
  class PendingNotification: public INotifiable
  {
    protected:
      ThreadSafeCounter m_prerequisiteCount;
      INotifiable* m_target;
    public:
      PendingNotification(int prereqCount, INotifiable *target)
        : m_prerequisiteCount(prereqCount)
        , m_target(target)
      {
      }
      void notify()
      {
        if (--m_prerequisiteCount == 0 )
        {
          m_target->notify();
          delete this;
        }
      }
      bool notified()
      {
        return false;  //if it still exists, it hasn't been notified.
      }
      void wait()
      {
        return m_target->wait();  //it doesn't really make sense to wait on a PendingNotification
      }
  };
}

INotifiable *NewPendingNotification(int prerequisiteCount, INotifiable *target)
{
  return new PendingNotification(prerequisiteCount, target);
}

template <class T> void paranoidMergeRadix2External(T *a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0)
  {
    for (;bCount>0;--bCount) *dest++=*b++;
  }
  else if (bCount==0)
  {
    for (;aCount>0;--aCount) *dest++=*a++;
  }
  else if (bLeftToRight)
  {
    mergeForecastRadix2External(a, a+aCount, b, b+bCount, dest);
  }
  else
  {
    mergeForecastBackwardsRadix2External(a+aCount-1, a-1, b+bCount-1, b-1, dest+aCount+bCount-1);
  }
  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix2External(T* a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount>1)
  {
    //Ick!  We need to split the merge into two parts.
    //We'll be kinda lazy.  We'll figure out where the median of the LHS would go in the RHS, and split that way.
    int middleLeft  = aCount/processorCount*(processorCount/2);
    int lowRight    = 0;
    int hiRight     = bCount;
    while (lowRight<hiRight)
    {
      int tryRight = ( lowRight + hiRight ) / 2;
      if ( a[middleLeft] <= b[tryRight]  )
        hiRight  = tryRight;
      else
        lowRight = tryRight + 1;
    }
    //At this point, we have either hiRight==bCount, or b[hiRight] < a[middleLeft] 
    while (middleLeft<aCount && a[middleLeft+1]<=b[hiRight]) middleLeft++;
    parallelMergeRadix2External(a, middleLeft+1, b, hiRight, dest, bLeftToRight, processorCount/2, t);
    a              += middleLeft+1;
    b              += hiRight;
    aCount         -= middleLeft+1;
    bCount         -= hiRight;
    dest           += middleLeft+hiRight+1;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction7(paranoidMergeRadix2External<T>, a, aCount, b, bCount, dest, bLeftToRight, t));
}

template <class T> void parallelMergesortSubArrayRadix2(T *src, T *dest, int elementCount
                            , int processorCount, int multiThreadingThreshold
                            , bool srcIsInput, bool isLastMerge, INotifiable *t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int halfCount = elementCount >> 1;
    IAction*     mergeAction;       //the merge itself
    INotifiable* pendingMerge;    //for notifying the count-down latch that prevents the merge running                

    if (isLastMerge)
    {
      INotifiable* notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction8(parallelMergeRadix2External<T>, src, halfCount, src+halfCount, elementCount-halfCount
              , dest, srcIsInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(2, mergeAction);      

    }
    else 
    {
      if (srcIsInput)
      {
        mergeAction = CallAction5(mergeForecastRadix2External<T>,  src, src+halfCount, src+halfCount, src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction5(mergeForecastBackwardsRadix2External<T>, src+halfCount-1, src-1, src+elementCount-1, src+halfCount-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(2, notifyingAction);
    }        

    parallelMergesortSubArrayRadix2(dest, src, halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, false, pendingMerge);
    parallelMergesortSubArrayRadix2(dest+halfCount, src+halfCount, elementCount-halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, isLastMerge, pendingMerge);

  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction5(mergesortForecastRadix2External<T>, src, elementCount, dest, srcIsInput, 32), t));
  }
}

template <class T>   void parallelMergesort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix2<T>,workArea, base, elementCount, processorCount, multiThreadingThreshold, false, true, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

Monday, June 22, 2009

Multithreading other sort routines






With all that infrastructure for writing multithreading sorts available (see my last few posts), it's not difficult to write multithreaded versions of other sorting algorithms. This post will cover multithreaded versions of combsort and shellsort. I can warn in advance that... they suck. I've graphed them running on one, two, three and four cores, against one- (and sometimes two-) core classic Quicksort. They don't do very well at all.

It's easy to see how to parallelize shellsort and combsort. Both algorithms use a series of "gaps" (or "intervals"). For each gap, g shellsort fully sorts each "slice" of elements with indexes separated by g, and combsort partially sorts each "slice". For each gap, g, each of the g separate slices can be processed independently. But they're slow. The examples I provide here could be tuned better.

You can see how they might be tuned better by looking at, for example, the fourth graph, showing a breakdown of the time spent for each combsort pass. There, it is clear that an insertion sort should be used much sooner, say when the gap reaches 17. But that wouldn't really improve things much (my own tests suggest by about 17%, for sorting 9 million integers on 4 cores). Shellsort and Combsort just do not parallelize well in practice. Shellsort in particular parallelizes poorly.

There are two versions of Combsort shown here, because the stock-standard Combsort has truly awful performance on average (and a very bad worst-case). A slightly different version that alternates between left-to-right and right-to-left scans has a markedly better average case (and a way better worst case).

I'll start with a parallel insertion sort (!), because that's how you finish off a parallel combsort or a parallel shell sort. This implementation is only worth calling if the array is "nearly" sorted, as is the case "at the tail end" of a shellSort or a combSort.

template <class T>   void parallelInsertionSort(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000
                         , INotifiable *notifyMe=NULL)
{
  CountDown task;
  if (elementCount<processorCount*64 || processorCount<2)
  {
    insertionSort(base, elementCount);
  }
  else
  {
    task += processorCount;
    for (int p=0;p<processorCount;++p)
    {
      int i = (elementCount * p     ) / processorCount;
      int j = (elementCount * (p+1) ) / processorCount;
      gpDispatcher->queueOrDo( NewNotifyingAction(CallAction2(insertionSort<T>,base+i,j-i), &task ) );
    }
    task.wait();
    for (int p=1;p<processorCount;++p)
    {
      int i = (elementCount * p     ) / processorCount;
      int k = (elementCount * (p+1) ) / processorCount;
      T v;
      //insert the elements in the range [i] through [k-1]
      //into the range [0] through [i-1], given that the
      //two ranges, [0..(i-1)] and [i..(k-1)] are 
      //independently sorted.
      for (int j=i;j<k;++j)
      {
        v = base[j];
        int h;
        for (h=j;h>0;--h)
          if (v<base[h-1])
            base[h] = base[h-1];
          else
            break;
        if (h==j)
        {
          //as soon as the element that *was* at [i-1]
          //is placed correctly, it is not necessary to
          //look at [(i+1)..(k-1)].  They're placed
          //where they should be already.
          break;
        }
        base[h] = v;
      }
    }    
  }
  if (notifyMe!=NULL)
    notifyMe->notify();
}

Now for the parallel shellsort:

template <class T> void shellSortSlices(T *base, int elementCount, int step, int firstChain, int lastChain)
{
  for (int i=firstChain+step;i<elementCount;i+=step)
  {
    int k=i + lastChain - firstChain;
    if (elementCount<=k) 
    {
      k=elementCount-1;
    }
    for (int j=i;j<=k;++j)
    {
      int h;
      T v = base[j];
      for (h=j;h>=step;h-=step)
        if (v<base[h-step]) 
          base[h]=base[h-step];
        else
          break;
      base[h] = v;
    }
  }
}

template <class T>   void parallelShellSort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{      
  int gaps[] = { 5,13,37,97,251,631,1579,3947,9871,24677,61703,154267,385709,964283,2410711,
          6026827,15067067,37667713,94169297,235423249,588558169,1471395493 };
  int f;
  for (f=0; gaps[f]<elementCount; f++);
  f--;  
  CountDown task;
  for (;f>=0;f--)
  {
    int g = gaps[f];
    task += processorCount;
    for (int p=0; p<processorCount; p++)
    {
      int i =  p*g / processorCount;
      int j = (p+1)*g / processorCount - 1;
      gpDispatcher->queueOrDo ( NewNotifyingAction(CallAction5(shellSortSlices<T>, base, elementCount, g, i, j), &task));
    }        
    task.wait();
  }
  parallelInsertionSort(base, elementCount, processorCount, multiThreadingThreshold, &task);  
}

And the parallel combsort:


int alternatingGaps[] = {2,3,5,7,11,17,23,37,53,79,113,163,229,331,463,653,
  919,1289,1811,2539,3557,4987,6983,9781,13693,19181,26861,37607,52667,
  73751,103289,144611,202471,283463,396871,555637,777901,1089091,1524763,
  2134697,2988607,4184087,5857727,8200847,11481199,16073693,22503181,31504453,
  44106241,61748749,86448259,121027561,169438579,237214009,332099617,464939459,
  650915219,911281291,1275793807,1786111267};
  //These are primes near powers of P=1.4
  //Alternating combsort can make do with slightly fewer passes than
  //vanilla combsort, as it does better at moving elements that belong
  //low, low.  Vanilla combsort is nowhere near as good at that.
  //Vanilla combsort moves elements that belong high, high, very efficiently,
  //and "hopes the values that belong low take care of themselves".
  //But they don't.

int combSortIntervals[] = {
  2,3,5,7,11,17,23,29,37,53,71,97,127,167,223,293,383,499,653,853,
  1109,1447,1889,2459,3203,4177,5431,7069,9199,11959,15551,20219,  
  26293,34183,44449,57787,75133,97673,126989,165089,214631,279023,
  362741,471571,613049,796967,1036067,1346899,1750979,2276293,2959183,
  3846943,5001049,6501367,8451791,10987343,14283559,18568637,
  24139231,31381043,40795357,53033969,68944157,89627413,116515667,
  151470353,196911461,255984881,332780323,432614407,562398721,731118341,
  950453827,1235589917,1606266791,2088146713 };
  //These are the primes near powers of P=1.3
  //Combsort runs very slightly faster, on average, if all the 
  //intervals are prime.

template <class T> void combPassSlices(T *base, int elementCount, int step, int firstChain, int lastChain)
{
  T v;
  for (int i=firstChain+step;i<elementCount;i+=step)
  {
    int k=i + lastChain - firstChain;
    if (elementCount<=k) 
      k=elementCount-1;
    for (int j=i;j<=k;++j)
    {
      if (base[j]<base[j-step])
      {
        v = base[j];
        base[j] = base[j-step];
        base[j-step] = v;
      }
    }
  }
}

template <class T> void downwardCombPassSlices(T *base, int elementCount, int step, int firstChain, int lastChain)
{
  T v;
  int h,i,j;
  for (j=elementCount-firstChain-1;j>=step;j-=step)
  {
    h = j + firstChain - lastChain;
    if (h<step) 
    {
      h=step;
    }
    for (i=j;i>=h;--i)
    {
      if (base[i]<base[i-step])
      {
        v = base[i];
        base[i] = base[i-step];
        base[i-step] = v;
      }
    }
  }
}

template <class T>   void parallelCombSort(T *base, int elementCount, int processorCount, int multiThreadingThreshold, bool alternating)
{    
  int *gaps = combSortIntervals;
  if (alternating)
    gaps = alternatingGaps;

  int f;
  for (f=0; gaps[f]<elementCount; f++);
  f--;  
  CountDown task;
  for (;f>=0;f--)
  {
    int g = gaps[f];
    task += processorCount;
    for (int p=0; p<processorCount; p++)
    {
      int i =  p*g / processorCount;
      int j = (p+1)*g / processorCount - 1;
      if (alternating && (f&1)!=0)
        gpDispatcher->queueOrDo (NewNotifyingAction(CallAction5(downwardCombPassSlices<T>, base, elementCount, g, i, j), &task));
      else
        gpDispatcher->queueOrDo (NewNotifyingAction(CallAction5(combPassSlices<T>, base, elementCount, g, i, j), &task));
    }        
    task.wait();
  }
  parallelInsertionSort(base, elementCount, processorCount, multiThreadingThreshold);  
}