Wednesday, June 24, 2009

Extra Horsepower



The graph shows the relative performance (for sorting an array of 10 million integers) for Quicksort, versus 2-way, 3-way, and 4-way mergesorts that parallelize the last (and only the last) merge operation at each level of merging.

Before parallelizing 3- and 4-way merges (yuu-uuk), I wondered... would it be worth parallelizing all the 2-way-merge operations, not just the last one at each level of merging (and using a StackDispatcher too)...? The answer turned out to be... no. It's very slightly slower (possibly because the extra synchronization overhead, plus the binary-search overhead for finding where to split up the inputs when dividing up the processing, outweighs the cache benefit, if any).

I also figured it might be a good idea to *check* that stability is preserved. Luckily that's pretty easy, with template sort routines. I declared a type, Horse, generated arrays of Horse (see below) (by setting the value members, randomly permuting, and then writing the starting index of each element into the satellite members), and checked that stability was preserved after sorting.

template<class V, class S> class ValueAndSatellite
{
public:
  V value;
  S satellite;
  ValueAndSatellite()         { }
  ValueAndSatellite(V v, S s) { value=v; satellite=s; }
  ~ValueAndSatellite()        { }
  bool operator<  (const ValueAndSatellite<V,S> &rhs) { return value<rhs.value; }
  bool operator<= (const ValueAndSatellite<V,S> &rhs) { return value<=rhs.value; }
  bool operator== (const ValueAndSatellite<V,S> &rhs) { return value==rhs.value; }
};
typedef ValueAndSatellite<int, int> Horse;

Stability was preserved, for the 2- and 3-way mergesorts, but not for the 4-way mergesort. I haven't yet figured out how the 4-way mergesort went wrong. I'll have to post a correction when I find out.

Incidentally, with an array of Horse (each Horse is twice the size of an int, but has approximately the same comparison cost), Quicksort beat two-way, three-way, and even four-way merging comfortably. Even if all of the value members of the Horses in the array were distinct, Quicksort was 11% faster than a four-way merge.






For 3- and 4-way merging I've opted not to worry so much about trying to "split evenly in the presence of large numbers of duplicates". It's too hard to think through all the cases. Instead, I... just hope the splits are about right ("on average, if there aren't too many duplicates, they'll be okay"). And, at least for sorting integers on 4 cores,

  • my 3-way mergesort beats Quicksort comfortably. But not by the 8% I had expected. Only by about 5%.

  • my 4-way mergesort beats Quicksort by about 9% (I had expected 8%). But then, it isn't stable (ouch!)



Here's my current code (less classes, functions, and so forth that have appeared in previous posts)...

template<class T> void mergeForecastBackwardsRadix2External(T *a, T* aStop, T* b, T* bStop, T* dest)
{
  if (aStop[1]<=bStop[1])
  {
    //the "b"s will run out before the "a"s.
    for (;b>bStop;*dest--=*b--)
    {
      for (;*b<*a;*dest--=*a--);
    }
    for (;a>aStop;*dest--=*a--);
  }
  else
  {
    for (;a>aStop;*dest--=*a--)
    {
      for (;*a<=*b;*dest--=*b--);
    }
    for (;b>bStop;*dest--=*b--);
  }
}


template <class T> inline int indexOfFirstElementInBlockGreaterOrEqualTo(T *block, int count, T &v)
{
  int loIndex = 0;
  int hiIndex = count;
  while (loIndex<hiIndex)
  {
    int tryIndex = ( loIndex + hiIndex ) / 2;
    if ( v <= block[tryIndex]  )
      hiIndex  = tryIndex;
    else
      loIndex = tryIndex + 1;
  }
  return hiIndex;
}

template <class T> inline int indexOfFirstElementInBlockGreaterThan(T *block, int count, T &v)
{
  int loIndex = 0;
  int hiIndex = count;
  while (loIndex<hiIndex)
  {
    int tryIndex = ( loIndex + hiIndex ) / 2;
    if ( v < block[tryIndex]  )
      hiIndex  = tryIndex;
    else
      loIndex = tryIndex + 1;
  }
  return hiIndex;
}

template <class T> void paranoidMergeRadix2External(T *a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0)
  {
    for (;bCount>0;--bCount) *dest++=*b++;
  }
  else if (bCount==0)
  {
    for (;aCount>0;--aCount) *dest++=*a++;
  }
  else if (bLeftToRight)
  {
    mergeForecastRadix2External(a, a+aCount, b, b+bCount, dest);
  }
  else
  {
    mergeForecastBackwardsRadix2External(a+aCount-1, a-1, b+bCount-1, b-1, dest+aCount+bCount-1);
  }
  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix2External(T* a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount>1)
  {
    //Ick!  We need to split the merge into two parts.
    //We'll be kinda lazy.  We'll figure out where the median of the LHS would go in the RHS, and split that way.
    int middleLeft  = aCount/processorCount*(processorCount/2);
    int hiLeft = middleLeft+1+indexOfFirstElementInBlockGreaterThan(a+middleLeft+1, aCount-middleLeft-1, a[middleLeft]);  
      //index of first element >v, in a, or aCount if there is none
    int loRight   = indexOfFirstElementInBlockGreaterOrEqualTo(b, bCount, a[middleLeft]);
      //index of first element >=v, in b, or bCount if there is none

    //Normally: merge a[0..hiLeft-1] with b[0..loRight-1], so set... takeLeft=hiLeft.
    //But:      We want takeLeft + loRight to be roughly (aCount+bCount)/processorCount*(proessorCount/2), and takeLeft
    //      can be any value between loLeft and hiLeft.  

    int takeLeft     = hiLeft;
    int desiredSplit = (aCount+bCount)/processorCount*(processorCount/2);
    int loLeft       = -1;
    if (takeLeft + loRight > desiredSplit) //too many records will be going to the "left" merge operation
    {
      if (desiredSplit - loRight >= middleLeft)
        takeLeft = desiredSplit - loRight;    //since any value between middleLeft and hiLeft is fine for # records to take from a
      else
      {
        //otherwise: need to know the minimum number of records we could *safely* take from block "a".
        loLeft = indexOfFirstElementInBlockGreaterOrEqualTo(a, middleLeft, a[middleLeft]);
        if (desiredSplit - loRight > loLeft)
        {
          takeLeft = desiredSplit - loRight;
        }
        else
        {
          takeLeft = loLeft;
        }
      }
    }    

    parallelMergeRadix2External(a, takeLeft, b, loRight, dest, bLeftToRight, processorCount/2, t);
    a              += takeLeft;
    b              += loRight;
    aCount         -= takeLeft;
    bCount         -= loRight;
    dest           += takeLeft+loRight;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction7(paranoidMergeRadix2External<T>, a, aCount, b, bCount, dest, bLeftToRight, t));
}

template <class T> void parallelMergesortSubArrayRadix2(T *src, T *dest, int elementCount
                            , int processorCount, int multiThreadingThreshold
                            , bool srcIsInput, bool isLastMerge, INotifiable *t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int halfCount = elementCount >> 1;
    IAction*     mergeAction;       //the merge itself
    INotifiable* pendingMerge;    //for notifying the count-down latch that prevents the merge running                

    if (isLastMerge)
    {
      INotifiable* notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction8(parallelMergeRadix2External<T>, src, halfCount, src+halfCount, elementCount-halfCount
              , dest, srcIsInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(2, mergeAction);      

    }
    else 
    {
      if (srcIsInput)
      {
        mergeAction = CallAction5(mergeForecastRadix2External<T>,  src, src+halfCount, src+halfCount, src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction5(mergeForecastBackwardsRadix2External<T>, src+halfCount-1, src-1, src+elementCount-1, src+halfCount-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(2, notifyingAction);
    }        

    parallelMergesortSubArrayRadix2(dest, src, halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, false, pendingMerge);
    parallelMergesortSubArrayRadix2(dest+halfCount, src+halfCount, elementCount-halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, isLastMerge, pendingMerge);

  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction5(mergesortForecastRadix2External<T>, src, elementCount, dest, srcIsInput, 32), t));
  }
}

template <class T>   void parallelMergesort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix2<T>,workArea, base, elementCount, processorCount, multiThreadingThreshold, false, 
    true /*set to false to turn off parallization of the last merge on each level*//, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

template <class T> void paranoidMergeRadix3External(T *a, int aCount, T* b, int bCount, T* c, int cCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0) 
    paranoidMergeRadix2External(b, bCount, c, cCount, dest, bLeftToRight, NULL);
  else if (bCount==0)
    paranoidMergeRadix2External(a, aCount, c, cCount, dest, bLeftToRight, NULL);
  else if (cCount==0)
    paranoidMergeRadix2External(a, aCount, b, bCount, dest, bLeftToRight, NULL);
  else if (bLeftToRight)
    mergeRadix3FastForward(a, a+aCount, b, b+bCount, c, c+cCount, dest);
  else
    mergeRadix3FastBackward(a+aCount-1, a-1, b+bCount-1, b-1, c+cCount-1, c-1, dest+aCount+bCount+cCount-1);

  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix3External(T* a, int aCount, T* b, int bCount, T* c, int cCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount > 1)
  {
    int aMiddle  = aCount/processorCount*(processorCount/2);
    int bMiddle  = indexOfFirstElementInBlockGreaterThan(b, bCount, a[aMiddle]);
    int cMiddle  = indexOfFirstElementInBlockGreaterThan(c, cCount, a[aMiddle]);
        aMiddle += indexOfFirstElementInBlockGreaterThan(a+aMiddle, aCount-aMiddle, a[aMiddle]);
    parallelMergeRadix3External(a, aMiddle, b, bMiddle, c, cMiddle, dest, bLeftToRight, processorCount/2, t);
    a += aMiddle; aCount -= aMiddle;
    b += bMiddle; bCount -= bMiddle;
    c += cMiddle; cCount -= cMiddle;
    dest += aMiddle + bMiddle + cMiddle;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction9(paranoidMergeRadix3External<T>, a, aCount, b, bCount, c, cCount, dest, bLeftToRight, t));
}


template <class T> void parallelMergesortSubArrayRadix3(T *src, int elementCount, T *dest, bool isSourceInput,
                          bool isLastMerge, int processorCount, int multiThreadingThreshold,
                          INotifiable* t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int stepSize = elementCount / 3 ;
    int twoStep  = stepSize + stepSize;
    IAction *mergeAction;
    INotifiable *pendingMerge;

    if (isLastMerge)
    {
      INotifiable *notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction10(parallelMergeRadix3External<T>, src, stepSize, src+stepSize, stepSize, src+twoStep, elementCount-twoStep
              , dest, isSourceInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(3, mergeAction);      
    }
    else 
    {
      if (isSourceInput)
      {
        mergeAction = CallAction7(voidMergeRadix3FastForward<T>,  src, src+stepSize, src+stepSize, src+stepSize+stepSize, src+stepSize+stepSize,
                  src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction7(voidMergeRadix3FastBackward<T>, src+stepSize-1, src-1, src+stepSize+stepSize-1, src+stepSize-1,
                      src+elementCount-1, src+stepSize+stepSize-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(3, notifyingAction);
    }

    parallelMergesortSubArrayRadix3(dest, stepSize, src
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix3(dest+stepSize, stepSize, src+stepSize
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix3(dest+stepSize+stepSize, elementCount-stepSize-stepSize, src+stepSize+stepSize
      , !isSourceInput , isLastMerge, processorCount, multiThreadingThreshold, pendingMerge);
  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction4(mergeSortExternalRadix3Fast<T>, src, elementCount, dest, isSourceInput), t));
  }
}


template <class T>   void parallelMergesortRadix3(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix3<T>, workArea, elementCount, base, false, true, processorCount, multiThreadingThreshold, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

template <class T> void paranoidMergeRadix4External(T *a, int aCount, T* b, int bCount, T* c, int cCount, T* d, int dCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0) 
    paranoidMergeRadix3External(b, bCount, c, cCount, d, dCount, dest, bLeftToRight, NULL);
  else if (bCount==0)
    paranoidMergeRadix3External(a, aCount, c, cCount, d, dCount, dest, bLeftToRight, NULL);
  else if (cCount==0)
    paranoidMergeRadix3External(a, aCount, b, bCount, d, dCount, dest, bLeftToRight, NULL);
  else if (dCount==0)
    paranoidMergeRadix3External(a, aCount, b, bCount, c, cCount, dest, bLeftToRight, NULL);
  else if (bLeftToRight)
    mergeRadix4FastForward(a, a+aCount, b, b+bCount, c, c+cCount, d, d+dCount, dest);
  else
    mergeRadix4FastBackward(a+aCount-1, a-1, b+bCount-1, b-1, c+cCount-1, c-1, d+dCount-1, d-1, dest+aCount+bCount+cCount+dCount-1);

  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix4External(
  T* a, int aCount, T* b, int bCount, T* c, int cCount, T* d, int dCount,
  T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount > 1)
  {
    int aMiddle  = aCount/processorCount*(processorCount/2);
    int bMiddle  = indexOfFirstElementInBlockGreaterThan(b, bCount, a[aMiddle]);
    int cMiddle  = indexOfFirstElementInBlockGreaterThan(c, cCount, a[aMiddle]);
    int dMiddle  = indexOfFirstElementInBlockGreaterThan(d, dCount, a[aMiddle]);
        aMiddle += indexOfFirstElementInBlockGreaterThan(a+aMiddle, aCount-aMiddle, a[aMiddle]);
    parallelMergeRadix4External(a, aMiddle, b, bMiddle, c, cMiddle, d, dMiddle, dest, bLeftToRight, processorCount/2, t);
    a += aMiddle; aCount -= aMiddle;
    b += bMiddle; bCount -= bMiddle;
    c += cMiddle; cCount -= cMiddle;
    d += dMiddle; dCount -= dMiddle;
    dest += aMiddle + bMiddle + cMiddle + dMiddle;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction11(paranoidMergeRadix4External<T>, a, aCount, b, bCount, c, cCount, d, dCount, dest, bLeftToRight, t));
}

template <class T> void parallelMergesortSubArrayRadix4(T *src, int elementCount, T *dest, bool isSourceInput,
                          bool isLastMerge, int processorCount, int multiThreadingThreshold,
                          INotifiable* t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int stepSize  = elementCount >> 2 ;
    int twoStep   = stepSize + stepSize;
    int threeStep = twoStep  + stepSize;
    IAction *mergeAction;
    INotifiable *pendingMerge;

    if (isLastMerge)
    {
      INotifiable *notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction12(parallelMergeRadix4External<T>, src, stepSize, src+stepSize, stepSize
              , src+twoStep, stepSize, src+threeStep, elementCount-threeStep
              , dest, isSourceInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(4, mergeAction);      
    }
    else 
    {
      if (isSourceInput)
      {
        mergeAction = CallAction9(voidMergeRadix4FastForward<T>,  src, src+stepSize, src+stepSize, src+twoStep, src+twoStep,
                  src+threeStep, src+threeStep, src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction9(voidMergeRadix4FastBackward<T>, src+stepSize-1, src-1, src+twoStep-1, src+stepSize-1,
                  src+threeStep-1, src+twoStep-1, src+elementCount-1, src+threeStep-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(4, notifyingAction);
    }

    parallelMergesortSubArrayRadix4(dest, stepSize, src
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+stepSize, stepSize, src+stepSize
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+twoStep, stepSize, src+twoStep
      , !isSourceInput , false,       processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+threeStep, elementCount-threeStep, src+threeStep
      , !isSourceInput , isLastMerge, processorCount, multiThreadingThreshold, pendingMerge);
  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction4(mergeSortExternalRadix4Fast<T>, src, elementCount, dest, isSourceInput), t));
  }
}

template <class T>   void parallelMergesortRadix4(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix4<T>, workArea, elementCount, base, false, true, 
    processorCount, multiThreadingThreshold, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}

No comments:

Post a Comment