Monday, January 17, 2011

C++ mergesortAsymmetric source code

Here's my current draft of mergesortAsymmetric(). But I'm suspicious there's something wrong with it somewhere, because when I tuned it for space it became suspiciously fast. So far, it seems a little faster, on average, than quickSortClassic(), for sorting integers. I don't see how that can be right! (It does more element moves and more comparisons than quicksort or standard binary mergesort - and I haven't fixed up either of the low-level sort routines to use pre-increment instead of post-increment). There must be a mistake buried in there somewhere, but I can't see what it is.

template <class T>void insertionSort(T *src, int count)
{
  //
  //Notes:  1.insertionSort is a "workhorse" sorting routine.  Many
  //      other sorting routines use it.  quickSort and Combsort
  //      may use it to "finish off the job".  Mergesorts may use it
  //      to create initial runs.
  //      2.insertionSort has a very low "set-up" cost, and for
  //      small enough N, is more efficient than quickSort.
  //      3.The following assumes that ((void*)src-sizeof(src[0]) >= 0),
  //          which may not be true if sizeof(src[0]) is large enough!
  //
  T v;
  T *stopAt  = src+count;  //pointer to location after last element to sort
  T *sorting;          //pointer to location after last element known
                //to be in sort order w.r.t. elements to its left
  T *scanning;        //used for finding where the element at *sorting
                //should be placed.

  for (sorting=src+1 ; sorting<stopAt; sorting++)
  {
    v=*sorting;
    for (scanning=sorting-1; src<=scanning && v<*scanning; scanning--)
    {
        scanning[1]=scanning[0];
    }
    scanning[1]=v;
  }
}

template <class T> int insertionSortExternal(T *input, int count, T *output)
{
  //note: it is assumed that there is no overlap between output and input
  //    (or in C notation, that this...
  //      !(input<=output && output<input+count) &&
  //      !(output<=input && input<output+count)
  //    ...holds true).
  //
  T *endOutput = output+count;
  T *sortedBoundary;
  T *scanBack;
  *output = *input++;        //copy over first element
  for (sortedBoundary=output+1;sortedBoundary<endOutput;sortedBoundary++)
  {
    for (scanBack=sortedBoundary-1;scanBack>=output;scanBack--)
      if (*scanBack<=*input) breakelse scanBack[1]=*scanBack;
    scanBack[1]=*input++;
  }
  return 0;
}

template <class T> inline int indexOfFirstElementInBlockGreaterThan(T *block, int count, T &v)
{
  int loIndex = 0;
  int hiIndex = count;
  while (loIndex<hiIndex)
  {
    int tryIndex = ( loIndex + hiIndex ) / 2;
    if ( v < block[tryIndex]  )
      hiIndex  = tryIndex;
    else
      loIndex = tryIndex + 1;
  }
  return hiIndex;
}

//Date        By Change
//=========== == ======
//04-Jan-2010 JB Draft
//11-Jan-2010 JB Revisions to AsymmetricMergesorter, to reduce the space requirement 
//               from NR  (N=record count, R=record size) 
//               to   NPR (where P=ratio of smaller
//                         sublist to list size during each merge).
//
//               There are two sort routines, sortRight and sortLeft.
//               sortRight: 1. calls itself to sort the right-hand (n-np) records,
//                             back to the same location
//                          2. calls sortLeft to sort the left-hand np records into
//                             the np-record workarea
//                          3. merges the left-hand data from the workarea, with
//                             the right-hand data.
//               sortLeft:  1. sorts left hand of destination to source
//                          2. sorts right hand of destination to source
//                          3. merges left and right source to destination                          
//
//               There's an unexpected benefit: performance is considerably *better*
//               

template<class T> void mergeAsymmetricRadix2External(T* a, T* aStop, T* b, T* bStop, T* dest)
{
  //Assumes:  aStop-a is considerably less than bStop-b
  //Note:     We want to merge until we run out of elements in a.
  //          But, if elements in b will run out first, we can't do that safely.
  //

  if (aStop[-1]<=bStop[-1])//the easy case.  a runs out first
  {
    for (;a<aStop;++dest,++a)
    {
      for (;*b<*a;++dest,++b)
      {
        *dest=*b;
      }
      *dest=*a;
    }
    for (;b<bStop;++dest,++b)
    {
      *dest=*b;
    }
  }
  else //the harder case: b's will run out first.  but we to check for a's running out *first*
  {
    //
    //find a portion of the a's, left(a) that *will* run out first,
    //and merge left(a) and b until left(a) runs out.
    //
    T* aChop = a + indexOfFirstElementInBlockGreaterThan(a, aStop-a, bStop[-1]);
    for (;a<aChop;++dest,++a)
    {
      for (;*b<*a;++dest,++b)
      {
        *dest=*b;
      }
      *dest=*a;
    }
    for (;b<bStop;++dest,++b)
    {
      for (;*a<=*b;++dest,++a)
      {
        *dest=*a;
      }
      *dest = *b;
    }
    for (;a<aStop;++dest,++a)
    {
      *dest=*a;
    }
  }
}

template <class T> class AsymmetricMergesorter
{
private:
  T*  m_base;
  T*  m_workArea;
  int m_workCount;
  int m_count;
  int m_cutOff;
  int m_numerator;
  int m_denominator;
public:
  AsymmetricMergesorter(T* base, int count, int cutOff, int numerator, int denominator)
    : m_base(base)
    , m_count(count)
    , m_cutOff(cutOff)
    , m_numerator(numerator)
    , m_denominator(denominator)
  {
    m_workCount = (long)(long)m_count * m_numerator / m_denominator ;
    m_workArea  = new T [ m_workCount ]; //NPR space
  }
  ~AsymmetricMergesorter()
  {
    delete [] m_workArea;
  }
  void sortLeft(T *source, int count, T* dest, bool bSourceIsInput)
  {
    if (count<m_cutOff)
    {
      if (bSourceIsInput)
        insertionSortExternal(source, count, dest);
      else
        insertionSort(dest, count);
    }
    else
    {
      int leftCount = ( (long)(long)count * m_numerator / m_denominator );
      sortLeft( dest,             leftCount,       source,         !bSourceIsInput );
      sortLeft( source+leftCount, count-leftCount, dest+leftCount,  bSourceIsInput);
      mergeAsymmetricRadix2External( source, source+leftCount, dest+leftCount, dest+count, dest);
    }
  }
  void sortRight(T* source, int count)
  {
    if (count<m_cutOff)
    {  
      insertionSort(source, count);
    }
    else
    {
      int leftCount = ( (long)(long)count * m_numerator / m_denominator );    
      sortRight(source+leftCount, count-leftCount);
      sortLeft (source,           leftCount,        m_workArea,  true);
      mergeAsymmetricRadix2External(m_workArea, m_workArea+leftCount, source+leftCount, source+count, source);
    }
  }
  void sort()
  {
    sortRight(m_base, m_count);
  }
};

template<class T> void mergesortAsymmetricRadix2(T *a, int count,int cutOff=32,int numerator=1, int denominator=4)
{
  AsymmetricMergesorter<T> s(a, count, cutOff, numerator, denominator);
  s.sort();
}

No comments:

Post a Comment