Sunday, June 21, 2009

Tuning Array Mergesorts for speed, part 3



Finally... some multithreaded mergesorts. Each of these relies on other classes and routines that I've posted earlier. And, no, they're not as fast as a Quicksort running on two or more cores. The graphs shown at left were obtained by running each sort routine once (on a random permutation) for each value of N. The multithreading threshold was set at 50,000 records for each of the calls. The performance was calculated from the running time, T, like so: N * log(N) / log(2) / T / 1000000.

These results were obtained using a stack dispatcher rather than a FIFO queue dispatcher (in theory a FIFO queue should be friendlier to Mergesorts and less friendly to Quicksort, but in practice it seems to make little difference. Quicksort still wins comfortably running on 4 cores).

A multithreaded mergesort is a little more difficult than a multithreaded quicksort. In a Quicksort, once you've divided a sub-array into two partitions, there is never any further "communication" between the two partitions, so one wait on a count-down latch is sufficient for the whole sort. But in a Mergesort, you divide sub-arrays into two (or more) partitions, sort each of those, and then merge the partitions, once all of the sorts have finished. So latches are needed to make sure that the merges don't run too soon.

It isn't as bad as it sounds, since it isn't necessary to have any threads actually waiting on the latches (except for the "topmost" latch for the whole sort call). The thread that sorts the last partition can take over the job of carrying out the merge operation. An analogy: if the threads are people working in an office, "the last one to leave turns out the lights". The easiest way that I could think of to get that behaviour was to define a pending action class, defined in terms of interfaces and classes that I've mentioned earlier. Like so:

//Module: PendingAction.h
#pragma once
#ifndef PENDING_ACTION_H
#define PENDING_ACTION_H

// A count-down latch with an action attached: once the expected number of
// notify() calls has arrived, the thread that delivered the last one runs the
// action itself ("the last one to leave turns out the lights").
class PendingAction:public INotifiable
{
  private:
    IAction*      m_Action;          // action run by the final notify() call
    ThreadSafeCounter m_counter;     // seeded with prerequisiteCount+1; the action fires when it drops to 1
    JEvent            m_event;       // set after the action has run (non-self-destructing mode only)
    bool        m_selfDestruct;      // when true, notify() deletes this object just before running the action
  public:
    // prerequisiteCount: number of notify() calls to collect before running action.
    // selfDestruct:      if true, the object deletes itself when the last notification arrives.
    PendingAction(int prerequisiteCount, IAction *action, bool selfDestruct=false);
    virtual ~PendingAction();
    // Records one finished prerequisite; the last call runs the action.
    virtual void notify();
    // Blocks on the internal event; only makes sense if not self-destructing.
    virtual void wait();
    // True once the counter has reached zero; only makes sense if not self-destructing.
    virtual bool notified();
};

// Allocates a self-destructing PendingAction that runs `action` after
// `prerequisiteCount` notifications. The caller must not delete the result
// or call wait()/notified() on it.
INotifiable *NewPendingAction(int prerequisiteCount, IAction *action);

#endif //PENDING_ACTION_H

//Module: PendingAction.cpp
#include "PendingAction.h"

// Builds a latch that runs `action` after `prerequisiteCount` calls to notify().
//   prerequisiteCount - number of notifications to collect.
//   action            - action run by whichever thread delivers the last notification.
//   selfDestruct      - if true, the object deletes itself right before running the action.
// The counter starts one above prerequisiteCount, so the action fires when the
// counter drops to 1; zero is only reached (in the non-self-destructing case)
// after the action has finished.
// Initializers are listed in member declaration order (m_Action, m_counter,
// m_selfDestruct), which is the order the compiler applies them in anyway.
PendingAction::PendingAction(int prerequisiteCount, IAction *action, bool selfDestruct)
      : m_Action(action)
      , m_counter(prerequisiteCount+1)
      , m_selfDestruct(selfDestruct)
{
}
// Nothing to release explicitly: the destructor body is empty and, in
// particular, does not delete m_Action.
PendingAction::~PendingAction() {}
// Reports that one prerequisite has finished. The caller that brings the
// counter down to 1 (i.e. the last prerequisite, since the counter was seeded
// with prerequisiteCount+1) runs the pending action on its own thread.
void PendingAction::notify()
{
  if ((--m_counter)==1)
  {
    if (m_selfDestruct)
    {
      // Copy the pointer out, because the members are gone after `delete this`.
      IAction *action = m_Action;
      // NOTE(review): presumably deleted before run() so that nothing the
      // action triggers can still observe this object - confirm.
      delete this;          //first!
      action->run();
    }
    else
    {
      m_Action->run();
      // Drop the counter to 0 before signalling, so notified() is already
      // true by the time a thread blocked in wait() wakes up.
      --m_counter;
      m_event.set();
    }
  }
}
void PendingAction::wait()          //only makes sense if not self-destructing
{
  m_event.wait();
}
bool PendingAction::notified()        //only makes sense if not self-destructing
{
  return m_counter.value() == 0;
}

INotifiable *NewPendingAction(int prerequisiteCount, IAction *action)
{
  return new PendingAction(prerequisiteCount, action, true);
}


// Merges two ascending runs into `dest`, working from the high end downwards.
//   a, aStop - `a` points at the LAST element of the first (left) run; `aStop`
//              points one element BEFORE its first element.
//   b, bStop - same convention for the second (right) run.
//   dest     - points at the LAST slot of the output; slots are filled downwards.
// On ties the element from the b run is written first (i.e. higher up), so
// equal elements keep their a-before-b order.
//
// "Forecast": comparing the first elements of the two runs (aStop[1] and
// bStop[1]) tells us which run will be exhausted first, so the inner loops
// need no bounds check - the other run's first element acts as a sentinel.
template<class T> void mergeForecastBackwardsRadix2External(T *a, T* aStop, T* b, T* bStop, T* dest)
{
  // The forecast reads aStop[1] and bStop[1] and the inner loops dereference
  // both cursors, which is only valid when both runs are non-empty. If either
  // run is empty the merge degenerates to a plain backwards copy.
  if (a<=aStop || b<=bStop)
  {
    for (;a>aStop;*dest--=*a--);
    for (;b>bStop;*dest--=*b--);
    return;
  }
  if (aStop[1]<=bStop[1])
  {
    //the "b"s will run out before the "a"s.
    for (;b>bStop;*dest--=*b--)
    {
      //emit every "a" that is strictly greater than the current "b"
      for (;*b<*a;*dest--=*a--);
    }
    //only "a"s are left
    for (;a>aStop;*dest--=*a--);
  }
  else
  {
    //the "a"s will run out before the "b"s.
    for (;a>aStop;*dest--=*a--)
    {
      //emit every "b" that is greater than or equal to the current "a"
      for (;*a<=*b;*dest--=*b--);
    }
    //only "b"s are left
    for (;b>bStop;*dest--=*b--);
  }
}

template <class T> void parallelMergesortSubArrayRadix2(T *src, T *dest, int elementCount
                            , int processorCount, int multiThreadingThreshold
                            , bool srcIsInput, INotifiable *t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int halfCount = elementCount >> 1;
    IAction *mergeAction       = NewNotifyingAction((srcIsInput) 
      ? CallAction5(mergeForecastRadix2External<T>,  src, src+halfCount, src+halfCount, src+elementCount, dest)
      : CallAction5(mergeForecastBackwardsRadix2External<T>, src+halfCount-1, src-1, src+elementCount-1, src+halfCount-1, dest+elementCount-1)
      , t);
    INotifiable *pendingMerge  = NewPendingAction(2, mergeAction);

    parallelMergesortSubArrayRadix2(dest, src, halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, pendingMerge);
    parallelMergesortSubArrayRadix2(dest+halfCount, src+halfCount, elementCount-halfCount
      , processorCount, multiThreadingThreshold, !srcIsInput, pendingMerge);

  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction5(mergesortForecastRadix2External<T>, src, elementCount, dest, srcIsInput, 32), t));
  }
}

template <class T>   void parallelMergesort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  parallelMergesortSubArrayRadix2(workArea, base, elementCount, processorCount, multiThreadingThreshold, false, &t);
  t.wait();
  delete [] workArea;
}

template <class T> void parallelMergesortSubArrayRadix3(T *src, int elementCount, T *dest, bool isSourceInput,
                          int processorCount, int multiThreadingThreshold,
                          INotifiable* t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int stepSize = elementCount / 3 ;
    IAction *mergeAction       = NewNotifyingAction(
      (isSourceInput) 
        ? CallAction7(voidMergeRadix3FastForward<T>,  src, src+stepSize, src+stepSize, src+stepSize+stepSize, src+stepSize+stepSize,
                      src+elementCount, dest)
        : CallAction7(voidMergeRadix3FastBackward<T>, src+stepSize-1, src-1, src+stepSize+stepSize-1, src+stepSize-1,
                      src+elementCount-1, src+stepSize+stepSize-1, dest+elementCount-1)
        , t);
    INotifiable *pendingMerge  = NewPendingAction(3, mergeAction);

    parallelMergesortSubArrayRadix3(dest, stepSize, src
      , !isSourceInput , processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix3(dest+stepSize, stepSize, src+stepSize
      , !isSourceInput, processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix3(dest+stepSize+stepSize, elementCount-stepSize-stepSize, src+stepSize+stepSize
      , !isSourceInput, processorCount, multiThreadingThreshold, pendingMerge);
  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction4(mergeSortExternalRadix3Fast<T>, src, elementCount, dest, isSourceInput), t));
  }
}


template <class T>   void parallelMergesortRadix3(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  parallelMergesortSubArrayRadix3(workArea, elementCount, base, false, processorCount, multiThreadingThreshold, &t);
  t.wait();
  delete [] workArea;
}

template <class T> void parallelMergesortSubArrayRadix4(T *src, int elementCount, T *dest, bool isSourceInput,
                          int processorCount, int multiThreadingThreshold,
                          INotifiable* t)
{
  if (elementCount>multiThreadingThreshold)
  {  
    int stepSize  = elementCount / 4 ;
    int twoStep   = stepSize + stepSize;
    int threeStep = twoStep  + stepSize;

    IAction *mergeAction       = NewNotifyingAction(
      (isSourceInput) 
        ? CallAction9(voidMergeRadix4FastForward<T>,  src, src+stepSize, src+stepSize, src+twoStep, src+twoStep,
                      src+threeStep, src+threeStep, src+elementCount, dest)
        : CallAction9(voidMergeRadix4FastBackward<T>, src+stepSize-1, src-1, src+twoStep-1, src+stepSize-1,
                      src+threeStep-1, src+twoStep-1, src+elementCount-1, src+threeStep-1, dest+elementCount-1)
        , t);
    INotifiable *pendingMerge  = NewPendingAction(4, mergeAction);

    parallelMergesortSubArrayRadix4(dest, stepSize, src
      , !isSourceInput , processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+stepSize, stepSize, src+stepSize
      , !isSourceInput, processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+twoStep, stepSize, src+twoStep
      , !isSourceInput, processorCount, multiThreadingThreshold, pendingMerge);
    parallelMergesortSubArrayRadix4(dest+threeStep, elementCount-threeStep, src+threeStep
      , !isSourceInput, processorCount, multiThreadingThreshold, pendingMerge);
  }
  else
  {
    gpDispatcher->queueOrDo( NewNotifyingAction( CallAction4(mergeSortExternalRadix4Fast<T>, src, elementCount, dest, isSourceInput), t));
  }
}


template <class T>   void parallelMergesortRadix4(T *base, int elementCount, int processorCount, int multiThreadingThreshold=50000)
{
  T *workArea = new T[elementCount];
  CountDown t(1);
  parallelMergesortSubArrayRadix4(workArea, elementCount, base, false, processorCount, multiThreadingThreshold, &t);
  t.wait();
  delete [] workArea;
}

I was using a trick to get the 2- and 3-core numbers... I had a "hobbling" class, which is used like this (assuming the number of worker threads is hardcoded to 4)...

//p = number of threads, say, 2 or 3
//The hobbler queues 4-p blocking actions on the dispatcher when it is
//constructed and releases them when the enclosing scope ends, so the timed
//section below runs while those actions are being held.
{
  ScopedThreadDisabler hobbler(gpDispatcher, 4-p); 
  //...get start time ...
  //...run the sort   ...
  //...get stop time  ...
}

// An INotifiable that several threads can wait on: each waiter that wakes up
// sets the event again before returning from wait().
class BroadcastEvent: public INotifiable
{
  private:
    JEvent        m_signaledEvent;   // the event that waiters block on
    CTR          m_done;             // number of waiters that have woken up so far; starts at 0
  public:
    BroadcastEvent();
    ~BroadcastEvent();
    // Sets the event.
    virtual void notify();
    // Blocks on the event, then counts itself in m_done and sets the event again.
    virtual void wait();
    // True once at least one waiter has returned from the event wait.
    // NOTE(review): this stays false between notify() and the first waiter
    // waking up - confirm that this is intended.
    virtual bool notified();
};


// Starts with no waiter having been released yet.
BroadcastEvent::BroadcastEvent()
  : m_done(0)
{
}
// Nothing to release explicitly.
BroadcastEvent::~BroadcastEvent()
{
}

void BroadcastEvent::notify()      

  m_signaledEvent.set(); 
}

void BroadcastEvent::wait()        

  m_signaledEvent.wait(); 
  InterlockedIncrement(&m_done);
  m_signaledEvent.set(); 
}

bool BroadcastEvent::notified()   

  return m_done > 0; 
}

class PassiveWaitAction:public IAction
{
  private:
    INotifiable *m_waitOnMe;
  public:
    PassiveWaitAction(INotifiable *waitOnMe): m_waitOnMe(waitOnMe) {}
    virtual void run() { m_waitOnMe->wait(); }
};

// Creates a PassiveWaitAction for `waitOnMe` and wraps it with
// NewSelfDeletingAction.
IAction* NewPassiveWaitAction(INotifiable *waitOnMe)
{
  PassiveWaitAction *waiter = new PassiveWaitAction(waitOnMe);
  return NewSelfDeletingAction(waiter);
}

// Scope guard that queues `count` blocking actions on a dispatcher for as
// long as the guard object exists.
class ScopedThreadDisabler
{
  public:  
    BroadcastEvent m_Dying;   // notified by the destructor; the queued actions wait on it
    CountDown      m_Dead;    // constructed with `count`; each queued action is paired with it via NewNotifyingAction
    // Queues `count` waiting actions on `here`.
    ScopedThreadDisabler(IDispatcher *here, int count);
    // Notifies m_Dying, then waits on m_Dead.
    ~ScopedThreadDisabler();
};

// Queues `count` actions on `here`, each of which waits on m_Dying and is
// paired with m_Dead through NewNotifyingAction. With a count of exactly zero
// nothing is queued and m_Dead is notified once directly instead.
ScopedThreadDisabler::ScopedThreadDisabler(IDispatcher *here, int count)
  : m_Dead(count)
{
  if (count==0)
  {
    m_Dead.notify();
    return;
  }

  while (count>0)
  {
    here->queueOrDo(NewNotifyingAction(NewPassiveWaitAction(&m_Dying), &m_Dead));
    --count;
  }
}
// Releases the queued waiting actions and blocks until m_Dead's wait()
// returns.
ScopedThreadDisabler::~ScopedThreadDisabler()
{
  m_Dying.notify();   // let the queued actions out of their wait
  m_Dead.wait();      // do not return before m_Dead has been signalled
}

No comments:

Post a Comment