Tuesday, June 23, 2009

Tuning Array Mergesorts for speed - Parallel Merging

The multithreaded mergesort routines that I posted here a few days ago only multithreaded the sorting of partitions; they did not multithread the actual merge operations. I was lazy. How much did that omission cost? Well, I added some code to generate a Gantt chart for each of the five threads involved in the four-core two-way mergesort. I'm not going to post the code that generates the Gantt chart, or even show the chart, but the important thing it showed was that the last action queued on the last worker thread to complete took 25% of the elapsed time (sorting 10 million records, with a multithreading threshold of 50,000, using 4 cores), because that thread was doing its merging alone, and not just at one level: it was merging alone at several levels.

Rewriting the algorithm so that the last merge operation at each level of merging (and *only* the last merge) can be split across multiple cores does indeed reduce the running time significantly. With the merging multithreaded, the speed-up for the overall sort, at least when sorting distinct integers, is much better: for 10 million integers, the two-, three- and four-core versions are roughly 1.98, 2.88 and 3.68 times faster, respectively, than the one-core version.

Way back in 2000, when I looked at multithreading the merge operations for a two-way mergesort on a Pentium IV (which only had "a core and a half", given the limitations of the Pentium IV's hyperthreading), the performance benefit was marginal. But I should have done the calculations to predict the saving. Assuming all comparisons take equal time, and that the comparisons required for the last few merge operations can be spread evenly across all the processors, the predicted speed-up (on four cores) for 3- and 4-way mergesorts should be enough to make them both about 8% faster than Quicksort. Well, I don't think that can be right (Quicksort always wins!), but I'll have to code them to find out for sure.
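Here, belatedly, is that back-of-envelope calculation (my own rough model, not a measurement): treat each of the log2(n) merge levels as n units of work; when only the partition sorting is multithreaded, a level with m < p independent merges can only run in n/m time, while fully parallel merging brings every level down to n/p.

#include <cstdio>
#include <cmath>

int main()
{
  const double n = 1e7;
  const double levels = std::log(n)/std::log(2.0);   //log2(n) merge levels, each n units of work
  for (int p = 1; p <= 4; p *= 2)
  {
    double topLevels = 0.0;                          //cost of levels with fewer merges than cores
    for (int m = 1; m < p; m *= 2)
      topLevels += 1.0/m;                            //m single-threaded merges run in n/m time
    double partitionsOnly = (levels - std::log((double)p)/std::log(2.0))/p + topLevels;
    std::printf("p=%d: partitions-only %.2fx, fully parallel merging %.2fx\n",
                p, levels/partitionsOnly, (double)p);
  }
  return 0;
}

For n = 10 million that predicts about 1.9x and 3.4x for the two- and four-core partitions-only versions, with the single-threaded top-level merges accounting for roughly a quarter of the four-core elapsed time, which squares with what the Gantt chart showed.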

I'm not certain the following code is correct. I haven't tested all the boundary cases yet; hiRight==bCount and an exhausted left-hand array both looked likely to go wrong, so the split code below guards against them explicitly. Nor have I properly checked whether it even preserves stability, and I haven't written the code to split 3- and 4-way merges across multiple cores either.



namespace
{
  //Counts down prerequisite notifications; forwards a single notification
  //to the target when the last one arrives, then deletes itself.
  class PendingNotification: public INotifiable
  {
    protected:
      ThreadSafeCounter m_prerequisiteCount;
      INotifiable* m_target;
    public:
      PendingNotification(int prereqCount, INotifiable *target)
        : m_prerequisiteCount(prereqCount)
        , m_target(target)
      {
      }
      void notify()
      {
        if (--m_prerequisiteCount == 0 )
        {
          m_target->notify();
          delete this;
        }
      }
      bool notified()
      {
        return false;  //if it still exists, it hasn't been notified.
      }
      void wait()
      {
        return m_target->wait();  //it doesn't really make sense to wait on a PendingNotification
      }
  };
}

INotifiable *NewPendingNotification(int prerequisiteCount, INotifiable *target)
{
  return new PendingNotification(prerequisiteCount, target);
}
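In case the INotifiable plumbing from the earlier posts isn't to hand, here's a minimal standalone analogue of the fan-in pattern (entirely my own sketch, using std::thread rather than the dispatcher framework): the last of N workers to call notify() releases the waiter, which is the job PendingNotification and the CountDown used further down both perform.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

//The last of N workers to call notify() releases the waiter.
struct Latch
{
  std::mutex              m;
  std::condition_variable cv;
  int                     count;
  explicit Latch(int n) : count(n) {}
  void notify()
  {
    std::lock_guard<std::mutex> lock(m);
    if (--count == 0) cv.notify_all();
  }
  void wait()
  {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [this]{ return count == 0; });
  }
};

int main()
{
  Latch done(4);                        //four split-merge workers
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i)
    workers.push_back(std::thread([&]{ /* merge one slice here */ done.notify(); }));
  done.wait();                          //the CountDown t(1); t.wait() equivalent
  for (size_t i = 0; i < workers.size(); ++i) workers[i].join();
  std::printf("all slices merged\n");
  return 0;
}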

//Merges a[0..aCount) and b[0..bCount) into dest, tolerating an empty input
//on either side (hence "paranoid"), then notifies t.
template <class T> void paranoidMergeRadix2External(T *a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, INotifiable *t)
{
  if (aCount==0)
  {
    for (;bCount>0;--bCount) *dest++=*b++;
  }
  else if (bCount==0)
  {
    for (;aCount>0;--aCount) *dest++=*a++;
  }
  else if (bLeftToRight)
  {
    mergeForecastRadix2External(a, a+aCount, b, b+bCount, dest);
  }
  else
  {
    mergeForecastBackwardsRadix2External(a+aCount-1, a-1, b+bCount-1, b-1, dest+aCount+bCount-1);
  }
  if (t!=NULL)
  {
    t->notify();
  }
}

template <class T> void parallelMergeRadix2External(T* a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
  while (processorCount>1)
  {
    //Ick!  We need to split the merge into two parts.
    //We'll be kinda lazy.  We'll figure out where the median of the LHS would go in the RHS, and split that way.
    int middleLeft  = aCount/processorCount*(processorCount/2);
    int lowRight    = 0;
    int hiRight     = bCount;
    if (aCount==0)
    {
      //Boundary case: a has been exhausted by earlier splits, so just carve b up evenly instead.
      middleLeft = -1;
      hiRight    = bCount/processorCount*(processorCount/2);
    }
    else
    {
      while (lowRight<hiRight)
      {
        int tryRight = ( lowRight + hiRight ) / 2;
        if ( a[middleLeft] <= b[tryRight]  )
          hiRight  = tryRight;
        else
          lowRight = tryRight + 1;
      }
      //At this point, either hiRight==bCount, or b[0..hiRight) < a[middleLeft] <= b[hiRight]
      //Pull any further elements of a that are <= b[hiRight] into the left part too,
      //so that equal elements keep their a-before-b order (stability), guarding
      //against running off the end of either array.
      while (middleLeft+1<aCount && hiRight<bCount && a[middleLeft+1]<=b[hiRight]) middleLeft++;
    }
    parallelMergeRadix2External(a, middleLeft+1, b, hiRight, dest, bLeftToRight, processorCount/2, t);
    a              += middleLeft+1;
    b              += hiRight;
    aCount         -= middleLeft+1;
    bCount         -= hiRight;
    dest           += middleLeft+hiRight+1;
    processorCount -= processorCount/2;
  }
  gpDispatcher->queueOrDo(CallAction7(paranoidMergeRadix2External<T>, a, aCount, b, bCount, dest, bLeftToRight, t));
}
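If you want to see the split in isolation, without the dispatcher, the forecasting merges or the notification plumbing, here's a minimal self-contained sketch of the same idea (all the names are mine; std::merge and std::thread stand in for the framework):

#include <algorithm>
#include <cassert>
#include <thread>
#include <vector>

//Cut a at its midpoint, binary-search for the matching cut in b, and run
//the two half-merges on two threads.
template <class T>
void twoThreadMerge(const T* a, int aCount, const T* b, int bCount, T* dest)
{
  if (aCount == 0) { std::copy(b, b + bCount, dest); return; }
  int middleLeft = aCount / 2;
  //First position in b holding a value >= a[middleLeft]; everything in b
  //before it merges with a[0..middleLeft).
  int cut = (int)(std::lower_bound(b, b + bCount, a[middleLeft]) - b);
  std::thread left([=] { std::merge(a, a + middleLeft, b, b + cut, dest); });
  std::merge(a + middleLeft, a + aCount, b + cut, b + bCount, dest + middleLeft + cut);
  left.join();
}

int main()
{
  std::vector<int> a{1, 3, 5, 7, 9}, b{2, 4, 6, 8}, out(9);
  twoThreadMerge(a.data(), 5, b.data(), 4, out.data());
  assert(std::is_sorted(out.begin(), out.end()));
  return 0;
}

The lower_bound cut plays the same role as the binary search above: everything in b before the cut is less than a[middleLeft], so the two half-merges can't interleave and ties keep their a-before-b order.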

template <class T> void parallelMergesortSubArrayRadix2(T *src, T *dest, int elementCount
                            , int processorCount, int multiThreadingThreshold
                            , bool srcIsInput, bool isLastMerge, INotifiable *t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int halfCount = elementCount >> 1;
    IAction*     mergeAction;   //the merge itself
    INotifiable* pendingMerge;  //count-down latch that fires the merge once both half-sorts have notified it

    if (isLastMerge)
    {
      //The last merge is itself split across processorCount cores, so fold
      //those processorCount completion notifications back into a single one for t.
      INotifiable* notificationDemultiplexer = NewPendingNotification(processorCount, t);
      mergeAction = CallAction8(parallelMergeRadix2External<T>, src, halfCount, src+halfCount, elementCount-halfCount
              , dest, srcIsInput, processorCount, notificationDemultiplexer);
      pendingMerge = NewPendingAction(2, mergeAction);      

    }
    else 
    {
      if (srcIsInput)
      {
        mergeAction = CallAction5(mergeForecastRadix2External<T>,  src, src+halfCount, src+halfCount, src+elementCount, dest);
      }
      else
      {
        mergeAction = CallAction5(mergeForecastBackwardsRadix2External<T>, src+halfCount-1, src-1, src+elementCount-1, src+halfCount-1, dest+elementCount-1);
      }
      IAction* notifyingAction =  NewNotifyingAction(mergeAction,t);
      pendingMerge = NewPendingAction(2, notifyingAction);
    }        

    parallelMergesortSubArrayRadix2(dest, src, halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, false, pendingMerge);
    parallelMergesortSubArrayRadix2(dest+halfCount, src+halfCount, elementCount-halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, isLastMerge, pendingMerge);

  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction5(mergesortForecastRadix2External<T>, src, elementCount, dest, srcIsInput, 32), t));
  }
}

template <class T>   void parallelMergesort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix2<T>,workArea, base, elementCount, processorCount, multiThreadingThreshold, false, true, (INotifiable*)&t));
  t.wait();
  delete [] workArea;
}
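A call would look something like this (my sketch, assuming gpDispatcher has already been started with one worker thread per core, as in the earlier posts):

#include <algorithm>

int main()
{
  const int n = 10000000;
  int *data = new int[n];
  for (int i = 0; i < n; ++i) data[i] = i;
  std::random_shuffle(data, data + n);   //10 million distinct integers
  parallelMergesort(data, n, 4, 50000);  //4 cores, multithreading threshold 50,000
  delete [] data;
  return 0;
}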
