
Way back in 2000, when I looked at multithreading the merge operations for two-way mergesort on a Pentium IV (which only had "a core and a half", given the limitations of the Pentium IV's hyperthreading), the performance benefit was marginal. But I should have done the calculations to predict the saving. Assuming all comparisons take equal time and that the comparisons required for the last few merge operations can be spread evenly across all the processors... the speed-up (on 4-cores) for 3- and 4-way mergesorts should be enough to make them both about 8% faster than Quicksort. Well, I don't think that can be right (Quicksort always wins!). But I'll have to code them to find out for sure.
I'm not certain the following code is correct (I haven't tested all the boundary cases yet. I think it will go wrong when hiRight==bCount), nor have I properly checked whether it even preserves stability, and I haven't written the code to split 3- and 4-way merges across multiple cores either.
namespace
{
class PendingNotification: public INotifiable
{
protected:
ThreadSafeCounter m_prerequisiteCount;
INotifiable* m_target;
public:
PendingNotification(int prereqCount, INotifiable *target)
: m_prerequisiteCount(prereqCount)
, m_target(target)
{
}
void notify()
{
if (--m_prerequisiteCount == 0 )
{
m_target->notify();
delete this;
}
}
bool notified()
{
return false; //if it still exists, it hasn't been notified.
}
void wait()
{
return m_target->wait(); //it doesn't really make sense to wait on a PendingNotification
}
};
}
INotifiable *NewPendingNotification(int prerequisiteCount, INotifiable *target)
{
return new PendingNotification(prerequisiteCount, target);
}
template <class T> void paranoidMergeRadix2External(T *a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, INotifiable *t)
{
if (aCount==0)
{
for (;bCount>0;--bCount) *dest++=*b++;
}
else if (bCount==0)
{
for (;aCount>0;--aCount) *dest++=*a++;
}
else if (bLeftToRight)
{
mergeForecastRadix2External(a, a+aCount, b, b+bCount, dest);
}
else
{
mergeForecastBackwardsRadix2External(a+aCount-1, a-1, b+bCount-1, b-1, dest+aCount+bCount-1);
}
if (t!=NULL)
{
t->notify();
}
}
template <class T> void parallelMergeRadix2External(T* a, int aCount, T* b, int bCount, T* dest, bool bLeftToRight, int processorCount, INotifiable *t)
{
while (processorCount>1)
{
//Ick! We need to split the merge into two parts.
//We'll be kinda lazy. We'll figure out where the median of the LHS would go in the RHS, and split that way.
int middleLeft = aCount/processorCount*(processorCount/2);
int lowRight = 0;
int hiRight = bCount;
while (lowRight<hiRight)
{
int tryRight = ( lowRight + hiRight ) / 2;
if ( a[middleLeft] <= b[tryRight] )
hiRight = tryRight;
else
lowRight = tryRight + 1;
}
//At this point, we have either hiRight==bCount, or b[hiRight] < a[middleLeft]
while (middleLeft<aCount && a[middleLeft+1]<=b[hiRight]) middleLeft++;
parallelMergeRadix2External(a, middleLeft+1, b, hiRight, dest, bLeftToRight, processorCount/2, t);
a += middleLeft+1;
b += hiRight;
aCount -= middleLeft+1;
bCount -= hiRight;
dest += middleLeft+hiRight+1;
processorCount -= processorCount/2;
}
gpDispatcher->queueOrDo(CallAction7(paranoidMergeRadix2External<T>, a, aCount, b, bCount, dest, bLeftToRight, t));
}
template <class T> void parallelMergesortSubArrayRadix2(T *src, T *dest, int elementCount
, int processorCount, int multiThreadingThreshold
, bool srcIsInput, bool isLastMerge, INotifiable *t)
{
if (elementCount>multiThreadingThreshold)
{
int halfCount = elementCount >> 1;
IAction* mergeAction; //the merge itself
INotifiable* pendingMerge; //for notifying the count-down latch that prevents the merge running
if (isLastMerge)
{
INotifiable* notificationDemultiplexer = NewPendingNotification(processorCount, t);
mergeAction = CallAction8(parallelMergeRadix2External<T>, src, halfCount, src+halfCount, elementCount-halfCount
, dest, srcIsInput, processorCount, notificationDemultiplexer);
pendingMerge = NewPendingAction(2, mergeAction);
}
else
{
if (srcIsInput)
{
mergeAction = CallAction5(mergeForecastRadix2External<T>, src, src+halfCount, src+halfCount, src+elementCount, dest);
}
else
{
mergeAction = CallAction5(mergeForecastBackwardsRadix2External<T>, src+halfCount-1, src-1, src+elementCount-1, src+halfCount-1, dest+elementCount-1);
}
IAction* notifyingAction = NewNotifyingAction(mergeAction,t);
pendingMerge = NewPendingAction(2, notifyingAction);
}
parallelMergesortSubArrayRadix2(dest, src, halfCount
, processorCount, multiThreadingThreshold, !srcIsInput, false, pendingMerge);
parallelMergesortSubArrayRadix2(dest+halfCount, src+halfCount, elementCount-halfCount
, processorCount, multiThreadingThreshold, !srcIsInput, isLastMerge, pendingMerge);
}
else
{
gpDispatcher->queueOrDo( NewNotifyingAction( CallAction5(mergesortForecastRadix2External<T>, src, elementCount, dest, srcIsInput, 32), t));
}
}
template <class T> void parallelMergesort(T *base, int elementCount, int processorCount, int multiThreadingThreshold)
{
T *workArea = new T[elementCount];
CountDown t(1);
gpDispatcher->queueOrDo(CallAction8(parallelMergesortSubArrayRadix2<T>,workArea, base, elementCount, processorCount, multiThreadingThreshold, false, true, (INotifiable*)&t));
t.wait();
delete [] workArea;
}
No comments:
Post a Comment