Saturday, June 20, 2009

Multithreading Quicksorts - The code

This post provides the code for some example multithreading Quicksorts. I'm covering Quicksort first because it's slightly easier to write efficient multithreading Quicksorts than Mergesorts. Later on I'll be showing off multithreaded Combsorts and Shellsorts too, and those will also use a lot of the infrastructure that appears here.

This post contains nearly 1500 lines of code, because it defines a complete program.

Imagine that each line of the form //Module: x ... indicates the start of the code in a module with that filename (there would have to be a bunch more #include statements, though). All the code here is for MS Windows and the Microsoft Visual C++ compiler (I built with VS 2008, but it may work with some earlier versions too).

I've been lazy and I've hard-coded the number of cores to 4 (in the call to IDispatcher::startThreads() in main).
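The core count could be queried at startup instead. What follows is a minimal sketch, assuming a C++11 compiler (newer than the VS 2008 used for this post); defaultWorkerCount is a hypothetical helper and is not part of the listing below.

```cpp
#include <thread>

// Hypothetical helper: how many worker threads to start.
// std::thread::hardware_concurrency() may return 0 when the value
// cannot be determined, so fall back to a caller-supplied guess.
inline int defaultWorkerCount(int fallback = 4)
{
  unsigned int n = std::thread::hardware_concurrency();
  return (n == 0) ? fallback : static_cast<int>(n);
}
```

The result would take the place of the hard-coded 4.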

The MUTEX_HANDLE, EVENT_HANDLE and THREAD_HANDLE types exist because on many operating systems (Linux springs to mind) these aren't all defined as HANDLE the way they are on MS Windows.
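One way to keep those three names portable is to choose the underlying type per platform. This is only a sketch: the pthread types on the non-Windows side are an assumption (the listing below just uses #define to map each name to HANDLE), and typedefs are used here instead of macros.

```cpp
#ifdef _WIN32
  #include <windows.h>
  typedef HANDLE MUTEX_HANDLE;
  typedef HANDLE EVENT_HANDLE;
  typedef HANDLE THREAD_HANDLE;
#else
  #include <pthread.h>
  // Assumption: a mutex handle is a pointer to a mutex owned elsewhere.
  typedef pthread_mutex_t* MUTEX_HANDLE;
  // Assumption: a condition variable stands in for a Windows event.
  typedef pthread_cond_t*  EVENT_HANDLE;
  typedef pthread_t        THREAD_HANDLE;
#endif
```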

There are some #define statements that you can vary if you want.

  • Setting MUTEX_IS_CRITICAL_SECTION to 1 makes JMutex use Windows critical sections. With 0 (the value in the listing below) it uses MS Windows mutex handles, which are considerably slower

  • Setting COUNTDOWN_USES_FANCY_MEMORY_SEMANTICS to 0 will switch the code in the CountDown class - and in the DumbThread class - over to using InterlockedIncrement and InterlockedDecrement, rather than InterlockedIncrementAcquire and InterlockedDecrementRelease. On a 64-bit Vista machine using the older InterlockedIncrement and InterlockedDecrement functions tends to make the program run slightly slower.
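For readers without the Win32 headers, the difference between the two families of functions can be sketched with C++11 std::atomic. This is an analogue under assumptions, not the code in this post: std::memory_order_acquire and std::memory_order_release stand in for the Acquire and Release variants, while the plain InterlockedIncrement and InterlockedDecrement (which act as full barriers) would correspond roughly to the default sequentially consistent ordering. The class name is hypothetical.

```cpp
#include <atomic>

// Hypothetical portable analogue of a counter that counts up with
// acquire ordering and counts down with release ordering.
class AcquireUpReleaseDownCounter
{
  std::atomic<long> m_ctr;
public:
  explicit AcquireUpReleaseDownCounter(long initialValue = 0) : m_ctr(initialValue) {}
  // Like InterlockedIncrementAcquire, returns the new value.
  long operator++() { return m_ctr.fetch_add(1, std::memory_order_acquire) + 1; }
  // Like InterlockedDecrementRelease, returns the new value.
  long operator--() { return m_ctr.fetch_sub(1, std::memory_order_release) - 1; }
  long value() const { return m_ctr.load(std::memory_order_relaxed); }
};
```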



Examples 1 and 2 divide work up by creating new threads. That is usually a pretty bad idea, because starting new threads takes a long time, uses up memory and other system resources, and may fail. For examples 1 and 2 the modules are:

  • InsertionSort.h - defines a template insertion sort routine

  • QuicksortClassic.h - defines a single-threaded quicksort routine

  • CTR.h - defines a counter type (why? because different compilers ship with headers that declare the prototypes of functions like InterlockedIncrement slightly differently).

  • DumbThread.h and DumbThread.cpp - declares a class for starting a new thread, doing something in it, and waiting for the thread to complete. You would need to rewrite both these modules for a Linux version.

  • quickSortExample1.h - The first example of a multi-threaded quicksort, creating new threads, and waiting on them, via a subclass of DumbThread

  • quickSortExample2.h - Another example, which creates fewer threads (but still creates too many).
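The thread-per-partition idea behind example 1 can be sketched portably as follows. This is not the code from the listing: it assumes C++11 and uses std::thread, std::partition and std::sort in place of DumbThread, partitionFromCentre and quicksortClassic, and threadPerPartitionSort is a hypothetical name.

```cpp
#include <algorithm>
#include <thread>
#include <utility>

// After partitioning, hand each half to a brand-new thread and wait for
// both. Below cutOff the sub-array is sorted in the current thread.
template <class T> void threadPerPartitionSort(T *base, int count, int cutOff)
{
  if (count < cutOff || count < 2)
  {
    std::sort(base, base + count);
    return;
  }
  // Use the middle element as the pivot: park it at the front first.
  std::swap(base[0], base[count / 2]);
  const T pivot = base[0];
  T *mid = std::partition(base + 1, base + count,
                          [&pivot](const T &x) { return x < pivot; });
  int n = static_cast<int>(mid - base) - 1;   // final index of the pivot
  std::swap(base[0], base[n]);
  // One new thread per half; this is the expensive part.
  std::thread left ([=] { threadPerPartitionSort(base, n, cutOff); });
  std::thread right([=] { threadPerPartitionSort(base + n + 1, count - n - 1, cutOff); });
  left.join();
  right.join();
}
```

With a small cutOff this starts a very large number of threads, which is exactly the problem described above.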



Example 3 reduces the amount of waiting. Threads sorting sub-arrays no longer wait on other threads. Only one thread waits, on a single count-down latch. But this doesn't really help at all (not by itself, anyway), because now too many threads are likely to run at once. For example 3 the modules are:

  • INotifiable.h - Declares an interface for objects that can be told that a synchronization event has occurred

  • INotifiable.cpp - Dummy do-nothing implementations for same

  • ThreadSafeCounter.h - Self-explanatory

  • JMutex.h - Wraps Operating System mutex objects (or Windows critical sections)

  • JEvent.h - Wraps a Windows event object, guarded by a JMutex

  • CountDown.h - Declares a count-down latch

  • ThreadSafeCounter.cpp, JMutex.cpp, JEvent.cpp, CountDown.cpp - Windows implementations

  • quickSortExample3.h



You would need to replace ThreadSafeCounter.h, JMutex.h and JEvent.h (and their .cpp files) for a Linux build.

Example 4 does away with (most) creating of child threads. There is one worker thread per core, and all the worker threads share a queue of actions to perform. The modules for example 4 are:

  • IAction.h, IAction.cpp, NotifyingAction.h, NotifyingAction.cpp - classes for hiding the details of actions, and helper functions for producing actions for making specific function calls

  • IDispatcher.h - interface (and base implementation) for a dispatcher; a dispatcher is an object that is used for coordinating a group of co-worker threads

  • DivideAndConquer.h and DivideAndConquer.cpp - a class for CountDown objects that are counting down tasks that are being sent to the action queue of an IDispatcher implementation. This class merely saves some typing in client routines; it isn't really necessary

  • ThreadSafeBase.h and ThreadSafeBase.cpp - a base class for objects that can be locked and unlocked / for implementing behaviour sort of like java's synchronized...

  • NakedBuffer.h, LIFOBuffer.h - stripped down buffering classes. Though they're not stripped down enough, and I guess you could use std::vector instead

  • ThreadSafeQueue.h - template class for thread-safe queues and stacks

  • DispatcherBase.h and .cpp - A base class for IDispatcher implementations

  • StackDispatcher.h and StackDispatcher.cpp - A stack-based dispatcher

  • quickSortExample4.h - after all that, the actual parallel quicksort isn't very interesting!



#include <windows.h>
#include <math.h>     //for floor()
#include <stdio.h>    //for fopen_s(), etc.
#include <iostream>    //for std::cout etc

//Module: insertionSort.h
template <class T>void insertionSort(T *src, int count)
{
  //
  //Notes:  1.insertionSort is a "workhorse" sorting routine.  Many
  //      other sorting routines use it.  quickSort and Combsort
  //      may use it to "finish off the job".  Mergesorts may use it
  //      to create initial runs.
  //      2.insertionSort has a very low "set-up" cost, and for
  //      small enough N, is more efficient than quickSort.
  //      3.The following assumes that ((void*)src-sizeof(src[0]) >= 0),
  //          which may not be true if sizeof(src[0]) is large enough!
  //
  T v;
  T *stopAt  = src+count;  //pointer to location after last element to sort
  T *sorting;          //pointer to location after last element known
                //to be in sort order w.r.t. elements to its left
  T *scanning;        //used for finding where the element at *sorting
                //should be placed.

  for (sorting=src+1 ; sorting<stopAt; sorting++)
  {
    v=*sorting;
    for (scanning=sorting-1; src<=scanning && v<*scanning; scanning--)
    {
        scanning[1]=scanning[0];
    }
    scanning[1]=v;
  }
}

//Module: QuicksortClassic.h
template <class T> inline int setSentinels(T *arr,int count)
{
  //
  //  Description:
  //      ensures that arr[0] <= arr[i], for all 0<i<count
  //      and   arr[count-1] > arr[i], for all 0<=i<count-1
  //  Notes:  Mainly used to provide "sentinel" records to 
  //      simplify boundary checks in partitioning sorts.
  //      
  //
  T *minAt;  T min;
  T *maxAt;  T max;
  T *scan;
  minAt=arr+count-1; maxAt=minAt; min=max=*maxAt; 
  for (scan=arr+count-2;scan>=arr;scan--)
    if (*scan<=min)
      { minAt=scan; min=*scan; }
    else if (max<*scan)
      { maxAt=scan; max=*scan; }
  *minAt=arr[0];      arr[0]=min;

  if (maxAt==arr) maxAt=minAt;          
    //the above line handles a particularly nasty special case:
    //If the first element *was* the maximum then we've 
    //swapped it with minAt, so maxAt has to point to the 
    //place we moved it to.

  *maxAt=arr[count-1];  arr[count-1]=max;
  return (min==max);
}
  
template <class T> inline int partitionFromLeft(T *arr, int lowerIndex, int upperIndex)
{
  int lhs=lowerIndex;
  int rhs=upperIndex+1;
  T pivot=arr[lowerIndex];
  T tmp;
  for (;;) 
  {
    do { lhs++; } 
      while (lhs<=rhs && arr[lhs]<pivot);
    do { rhs--;  } 
      while (pivot<arr[rhs]);
    if (rhs<=lhs) break;
    { tmp=arr[lhs]; arr[lhs]=arr[rhs]; arr[rhs]=tmp; }
  }
  if (rhs!=lowerIndex)
  {
    { tmp=arr[lowerIndex]; arr[lowerIndex]= arr[rhs];
    arr[rhs]=tmp; }
  }
  return rhs;  
}


template <class T> inline int partitionFromRight(T *arr, int lowerIndex, int upperIndex)
{
  int lhs=lowerIndex-1;
  int rhs=upperIndex;
  T pivot=arr[upperIndex];
  T tmp;
  for (;;) 
  {
    do { rhs--;  } 
      while (lhs<=rhs && pivot<arr[rhs]);
    do { lhs++;  } 
      while (arr[lhs]<pivot);
    if (rhs<=lhs) break;
    tmp = arr[lhs]; arr[lhs] = arr[rhs]; arr[rhs]=tmp;
  }
  if (lhs!=upperIndex)
  {
    tmp =arr[lhs]; arr[lhs] = arr[upperIndex]; arr[upperIndex]=tmp;
  }
  return lhs;  
}

template <class T> inline int partitionFromCentre(T *arr, int lowerIndex, int upperIndex, int centre)
{
  int lhs=lowerIndex-1;
  int rhs=upperIndex+1;
  T pivot=arr[centre];
  T tmp;
  for (;;) 
  {
    do { lhs++; } 
      while (lhs<centre && arr[lhs]<pivot);  
    do { rhs--; } 
      while (centre<rhs && pivot<arr[rhs]);  
    if (lhs==centre || rhs==centre) break;
    tmp = arr[lhs]; arr[lhs] = arr[rhs]; arr[rhs]=tmp;
  }
  if (lhs!=rhs)
  {
    if (lhs==centre) 
      return partitionFromLeft(arr, centre, rhs);
    else
      return partitionFromRight(arr, lhs, centre);
  }
  return rhs;  
}

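template <class T> void quicksortClassic2(T *arr, int count, int cutOff);  //forward declaration; defined below

template <class T> void quicksortClassic(T *arr, int count, int cutOff)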
{
  setSentinels(arr, count);
  arr++; count--;
  return quicksortClassic2(arr,count,cutOff);
}

template <class T> void quicksortClassic(T *arr, int count)
{
  setSentinels(arr, count);
  arr++; count--;
  return quicksortClassic2(arr,count,64);
}

template <class T> void quicksortClassic2(T *arr, int count,int cutOff)
{
  while (count>cutOff)
  {
    T pivot;
    T tmp;
    pivot = *arr;  //<-- this is replaced in BestOf3 version.  See below
    T *lhs=arr; 
    T *rhs=arr+count;
    for (;;) {
      do { lhs++; } while (*lhs<pivot);
      do { rhs--; } while (pivot<*rhs);
      if (rhs<=lhs) break;
      tmp = *lhs; *lhs=*rhs; *rhs=tmp;
    }
    *arr = *rhs; *rhs=pivot;
    if (rhs-arr < count/2)
    {
      quicksortClassic2(arr, rhs-arr, cutOff);
      count = arr + count - rhs - 1;
      arr = rhs+1;
    }
    else
    {
      quicksortClassic2(rhs+1, arr + count - rhs - 1, cutOff);
      count = rhs-arr;
    }    
  }
  insertionSort(arr,count);
}

//Module: CTR.h
#ifndef IS_CTR_DEFINED
  #ifdef _MSC_VER
    typedef volatile long CTR;
  #else
    typedef long int CTR;
  #endif
  #define IS_CTR_DEFINED
#endif
#define MUTEX_HANDLE HANDLE
#define EVENT_HANDLE HANDLE

//Module:  DumbThread.h
//Purpose: Declares the DumbThread class.
//Usage:   1. Declare your class, say XThread, inheriting from DumbThread, and
//            (a) Mandatory: Implement doWork() method
//            (b) Optional:  Override  cleanup() method
//         2. In client code:
//            (a) Declare/initialize an instance of XThread
//            (b) call XThread::start()
//            (c) call XThread::wait()
//

#pragma once
#ifndef DUMBTHREAD_H
#define DUMBTHREAD_H

#ifndef THREAD_HANDLE
  #define THREAD_HANDLE   HANDLE
#endif

#define THREADEX_FAILED ((HANDLE)(-1))
#define NULL_THREAD     ((HANDLE)(0))


#include <process.h>    //for _beginthreadex

#ifndef PTHREAD_START_DEFINED
  #define PTHREAD_START_DEFINED
  typedef unsigned (__stdcall *PTHREAD_START)(void *);
#endif

#ifndef chBEGINTHREADEX
  #define chBEGINTHREADEX(psa, cbStack, pfnStartAddr, pvParam, fdwCreate, pdwThreadID)  \
  ((THREAD_HANDLE) _beginthreadex( (void*)(psa), (unsigned) cbStack, \
    (PTHREAD_START) (pfnStartAddr), (void*) pvParam, (unsigned) (fdwCreate), \
      (unsigned *) (pdwThreadID)))
#endif

class DumbThread
{
protected:
  int             threadID;
  THREAD_HANDLE   threadHandle;
  unsigned long   returnCode;
public:
  DumbThread();
  ~DumbThread();
  bool start();
  void wait();
  virtual void doWork() = 0;
  virtual void cleanup();
  void setReturnCode(unsigned long returnCode);
  unsigned long getReturnCode() const;
};

class NotSoDumbThread: public DumbThread
{
public:
  bool start();
};

#endif //DUMBTHREAD_H

//Module:  DumbThread.cpp
//Purpose: Implementation of the DumbThread class.
namespace
{
  CTR g_dumbThreadsRunning = 0;

  unsigned long (__stdcall handToDumbThread)(void *param)
  {
    DumbThread* dumbThread = (DumbThread*)param;
    dumbThread->doWork();
    unsigned long retCode = dumbThread->getReturnCode();
    dumbThread->cleanup();
    return retCode;
  }

  unsigned long (__stdcall handToNotSoDumbThread)(void *param)
  {
    //has to decrement g_dumbThreadsRunning
    CTR threadCount = InterlockedIncrementAcquire(&g_dumbThreadsRunning);
    NotSoDumbThread* notSoDumbThread = (NotSoDumbThread*)param;
    notSoDumbThread->doWork();
    unsigned long retCode = notSoDumbThread->getReturnCode();
    notSoDumbThread->cleanup();
    threadCount = InterlockedDecrementRelease(&g_dumbThreadsRunning);
    return retCode;
  }

}

DumbThread::DumbThread()  
: threadID(0)
, threadHandle(NULL_THREAD)
, returnCode(0)
{
}
DumbThread::~DumbThread()   
{
  if (threadHandle!=NULL_THREAD)
  {
    CloseHandle(threadHandle);    
  }
}
bool DumbThread::start()
{
  threadHandle = chBEGINTHREADEX(NULL, 0, handToDumbThread, this, 0, &threadID);
  if (threadHandle==(THREADEX_FAILED)) threadHandle=NULL_THREAD;  //treat -1 condition same as 0...
  return (threadHandle!=NULL_THREAD);
}
void DumbThread::wait()
{  
  if (threadHandle!=NULL_THREAD)
  {
    WaitForSingleObject(threadHandle, INFINITE);
    CloseHandle(threadHandle);
    threadHandle=NULL_THREAD;
  }
  else
  {
    doWork();
    cleanup();
  }
}

void DumbThread::setReturnCode(unsigned long returnCodeToUse)
{
  returnCode = returnCodeToUse;
}

unsigned long DumbThread::getReturnCode() const
{
  return returnCode;
}

void DumbThread::cleanup()
{
}

bool NotSoDumbThread::start() //false indicates the work and the cleanup had to be done in the calling thread
{
  if (g_dumbThreadsRunning<16)
  {
    THREAD_HANDLE h = chBEGINTHREADEX(NULL, 0, handToNotSoDumbThread, this, 0, &threadID);
    if (h!=(THREADEX_FAILED) && h!=NULL_THREAD)  //treat -1 condition same as 0...
    {
      threadHandle = h;
      return true;
    }
    //thread creation failed: fall through and do the work in the calling thread
  }
  doWork();
  cleanup();
  return false;
}

//Module: quicksortExample1.h
template <class T> class QuicksortCall: public DumbThread 
{
protected:
  T* m_base;
  int m_count;
  int m_cutOff;
public:
  QuicksortCall(T *base, int count, int cutOff) : m_base(base), m_count(count), m_cutOff(cutOff) { start(); }
  virtual void doWork()
  {
    if (m_count < m_cutOff) 
    {
      if (1<m_count) quicksortClassic(m_base, m_count);
      return;
    }
    int n = partitionFromCentre(m_base, 0, m_count-1, m_count/2);
    QuicksortCall c1(m_base, n, m_cutOff);
    QuicksortCall c2(m_base+n+1, m_count-n-1, m_cutOff);
    c1.wait();
    c2.wait();
  }
  static void sort(T *base, int count, int cutOff)
  {
    QuicksortCall c(base, count, cutOff);
    c.wait();
  }
};

//Module: quicksortExample2.h
template <class T> class QuicksortCall2: public DumbThread
{
protected:
  T* m_base;
  int m_count;
  int m_cutOff;
public:
  QuicksortCall2(T *base, int count, int cutOff) : m_base(base), m_count(count), m_cutOff(cutOff) { start(); }
  virtual void doWork()
  {
    while (m_cutOff < m_count)
    {
      int n = partitionFromCentre(m_base, 0, m_count-1, m_count/2);
      if (n+n<m_count)
      {
        if (1<n) 
        {
          QuicksortCall2 c1(m_base, n, m_cutOff);
          m_base  += n+1;
          m_count -= n+1;
          doWork();
          c1.wait();
          return;
        }
        m_base  += n+1;
        m_count -= n+1;
      }
      else
      {
        if (1<m_count-n-1) 
        {
          QuicksortCall2 c2(m_base+n+1, m_count-n-1, m_cutOff);
          m_count = n;
          doWork();
          c2.wait();
          return;
        }
        m_count = n;
      }
    }
    if (1<m_count) quicksortClassic(m_base, m_count);
  }
  static void sort(T *base, int count, int cutOff)
  {
    QuicksortCall2 c(base, count, cutOff);
    c.wait();
  }
};

//Module: INotifiable.h
class INotifiable
{
  public:
    INotifiable();
    virtual ~INotifiable();
    virtual void notify() = 0;
    virtual bool notified() = 0;
    virtual void wait() = 0;
};

//Module: ThreadSafeCounter.h
class ThreadSafeCounter
{
  protected:
    CTR    m_Ctr;
  public:
    ThreadSafeCounter();
    ThreadSafeCounter(CTR initialValue);
    ~ThreadSafeCounter();
    CTR operator++();            
    CTR operator--();
    CTR operator+=(CTR v);
    CTR operator-=(CTR v);
    const CTR value() const;
    const bool operator!=(const CTR & v) const;        
    const bool operator==(const CTR & v) const;        
    const bool operator<=(const CTR & v) const;
};

class ThreadSafeCounterAcquireUpReleaseDown: public ThreadSafeCounter
{
  public:
    ThreadSafeCounterAcquireUpReleaseDown();
    ThreadSafeCounterAcquireUpReleaseDown(CTR initialValue);
    ~ThreadSafeCounterAcquireUpReleaseDown();
    CTR operator++();            
    CTR operator--();
    CTR operator+=(CTR v);
    CTR operator-=(CTR v);
    const CTR value() const;
    const bool operator!=(const CTR & v) const;
    const bool operator==(const CTR & v) const;
    const bool operator<=(const CTR & v) const;
};

//Module: JMutex.h
#define MUTEX_IS_CRITICAL_SECTION (0)
#define COUNTDOWN_USES_FANCY_MEMORY_SEMANTICS (1)

class JMutex
{
private:
  #if (MUTEX_IS_CRITICAL_SECTION)
    CRITICAL_SECTION m_CriticalSection;
  #else
    MUTEX_HANDLE m_hMutex;
  #endif
public:
  JMutex();
  ~JMutex();
  bool acquire(int tryLimit=-1);
  void release();
};

class JMutexLock
{
  private:  
    JMutex &m_lockOn;
  public:
    JMutexLock(JMutex &lockMe);
    ~JMutexLock();
};

//Module: JEvent.h
class JEvent
{
private:  
  EVENT_HANDLE    m_hEvent;
  JMutex              m_mutex;
  int          _pendingSets;
  
public:
  JEvent();
  void wait();  
  void set();  
  ~JEvent();
};

//Module: CountDown.h
class CountDown: public INotifiable
{
  private:
    #if (COUNTDOWN_USES_FANCY_MEMORY_SEMANTICS)
      ThreadSafeCounterAcquireUpReleaseDown m_Count;
    #else
      ThreadSafeCounter m_Count;
    #endif
    JEvent        m_zeroedEvent;
    CountDown(const CountDown &rhs); //Don't want these to get copied! Ever!
  public:
    CountDown();
    CountDown(CTR initialValue);
    virtual ~CountDown();
    virtual void notify() ;
    virtual bool notified() ;
    virtual void wait() ;
    CountDown& operator+=(CTR inc);
    CountDown& operator++();
    CTR increment();
};

//Module: ThreadSafeCounter.cpp
ThreadSafeCounter::ThreadSafeCounter()                 
{
  m_Ctr = 0; 
  InterlockedExchange(&m_Ctr, 0); 
}
ThreadSafeCounter::ThreadSafeCounter(CTR initialValue) 
{
  m_Ctr = initialValue;  
}
ThreadSafeCounter::~ThreadSafeCounter()        
{
}
CTR ThreadSafeCounter::operator++()            
{
  return InterlockedIncrement(&m_Ctr); 
}
CTR ThreadSafeCounter::operator--()          
{
  return InterlockedDecrement(&m_Ctr); 
}
CTR ThreadSafeCounter::operator+=(CTR v)        
{
  CTR result = m_Ctr;
  while (v>0) { result = InterlockedIncrement(&m_Ctr); v--; }
  while (v<0) { result = InterlockedDecrement(&m_Ctr); v++; }
  return result; 
}
CTR ThreadSafeCounter::operator-=(CTR v)        
{
  CTR result = m_Ctr;
  while (v<0) { result = InterlockedIncrement(&m_Ctr); v++; }  //subtracting a negative: count up
  while (v>0) { result = InterlockedDecrement(&m_Ctr); v--; }  //subtracting a positive: count down
  return result; 
}

const bool ThreadSafeCounter::operator!=(const CTR & v)  const
{
  return m_Ctr!=v; 
}

const bool ThreadSafeCounter::operator==(const CTR & v)  const
{
  return m_Ctr==v; 
}
const bool ThreadSafeCounter::operator<=(const CTR & v)  const
{
  return m_Ctr<=v; 
}

const CTR ThreadSafeCounter::value() const
{
  return m_Ctr;
}

ThreadSafeCounterAcquireUpReleaseDown::ThreadSafeCounterAcquireUpReleaseDown() {  } 
ThreadSafeCounterAcquireUpReleaseDown::ThreadSafeCounterAcquireUpReleaseDown(CTR initialValue)  : ThreadSafeCounter(initialValue) { }
ThreadSafeCounterAcquireUpReleaseDown::~ThreadSafeCounterAcquireUpReleaseDown()  { }

CTR ThreadSafeCounterAcquireUpReleaseDown::operator++()            
{
  return InterlockedIncrementAcquire(&m_Ctr); 
}
CTR ThreadSafeCounterAcquireUpReleaseDown::operator--()          
{
  return InterlockedDecrementRelease(&m_Ctr); 
}
CTR ThreadSafeCounterAcquireUpReleaseDown::operator+=(CTR v)        
{
  CTR result = m_Ctr;
  while (v>0) { result = InterlockedIncrementAcquire(&m_Ctr); v--; }
  while (v<0) { result = InterlockedDecrementRelease(&m_Ctr); v++; }
  return result; 
}
CTR ThreadSafeCounterAcquireUpReleaseDown::operator-=(CTR v)        
{
  CTR result = m_Ctr;
  while (v<0) { result = InterlockedIncrementAcquire(&m_Ctr); v++; }  //subtracting a negative: count up
  while (v>0) { result = InterlockedDecrementRelease(&m_Ctr); v--; }  //subtracting a positive: count down
  return result; 
}

const bool ThreadSafeCounterAcquireUpReleaseDown::operator!=(const CTR & v)  const
{
  return m_Ctr!=v; 
}

const bool ThreadSafeCounterAcquireUpReleaseDown::operator==(const CTR & v)  const
{
  return m_Ctr==v; 
}
const bool ThreadSafeCounterAcquireUpReleaseDown::operator<=(const CTR & v)  const
{
  return m_Ctr<=v; 
}

const CTR ThreadSafeCounterAcquireUpReleaseDown::value() const
{
  return m_Ctr;
}

//Module: JMutex.cpp
JMutex::JMutex()  
{
  #if (MUTEX_IS_CRITICAL_SECTION)
    InitializeCriticalSectionAndSpinCount(&m_CriticalSection,100);
  #else
    m_hMutex=CreateMutex(NULL,false,NULL);
  #endif
}
JMutex::~JMutex()  
{
  #if (MUTEX_IS_CRITICAL_SECTION)
    DeleteCriticalSection(&m_CriticalSection);
  #else
    CloseHandle(m_hMutex);
  #endif
}
bool JMutex::acquire(int tryLimit) 
{
  #ifdef __linux__
    pthread_mutex_lock(_hMutex);
  #elif (MUTEX_IS_CRITICAL_SECTION)
    EnterCriticalSection(&m_CriticalSection);
  #else
    WaitForSingleObject(m_hMutex, INFINITE);
  #endif
  return true;
}
void JMutex::release() 
{     
  #ifdef __linux__
    pthread_mutex_unlock(_hMutex);
  #elif (MUTEX_IS_CRITICAL_SECTION)
    LeaveCriticalSection(&m_CriticalSection);
  #else
    ReleaseMutex(m_hMutex);
  #endif
}

JMutexLock::JMutexLock(JMutex &lockMe) 
: m_lockOn(lockMe) 
{
  m_lockOn.acquire() ; 
}
JMutexLock::~JMutexLock() 
{
  m_lockOn.release(); 
}

//Module: JEvent.cpp
JEvent::JEvent()  
  : _pendingSets(0)
{
  m_hEvent = CreateEvent(NULL, true, false, NULL); 
  ResetEvent(m_hEvent);
}

void JEvent::wait()  
{  
  do
  {
    {
      JMutexLock lock(m_mutex);
      if (_pendingSets>0)
      {
        _pendingSets--;
        if (_pendingSets==0) 
        {
          ResetEvent(m_hEvent);
        }
        return;
      }
    }
    WaitForSingleObject(m_hEvent, INFINITE);
  }
  while (true);
}
void JEvent::set()  
{
  JMutexLock lock(m_mutex);
  ++_pendingSets;
  SetEvent(m_hEvent);
}
JEvent::~JEvent()  
{
  CloseHandle(m_hEvent); 
}

//Module: CountDown.cpp
CountDown::CountDown()                {}
CountDown::CountDown(const CountDown &rhs)      {}
CountDown::CountDown(CTR initialValue): m_Count(initialValue)  
{
}
CountDown::~CountDown()                {}

void CountDown::notify() 
{
  if ((--m_Count)==0)
  {
    m_zeroedEvent.set();
  }
}
bool CountDown::notified() 
{
  return m_Count <= 0;
}
void CountDown::wait() 
{
  if (m_Count.value()>0)
    m_zeroedEvent.wait();
}

CountDown& CountDown::operator+=(CTR inc) 
{
  m_Count+=inc; 
  return *this; 
}

CountDown& CountDown::operator++() 
{
  ++m_Count;
  return *this;
}

CTR CountDown::increment()
{
  return ++m_Count;
}

//Module: quickSortExample3.h
template <class T> class QuicksortCall3: public NotSoDumbThread
{
protected:
  T*         m_base;
  int        m_count;
  int         m_cutOff;
  CountDown* m_latch;
public:
  QuicksortCall3(T *base, int count, int cutOff, CountDown* latch)
    : m_base(base)
    , m_count(count)
    , m_cutOff(cutOff)
    , m_latch(latch)
  {
    m_latch->increment();
    start();
  }
  virtual void doWork()
  {
    while (m_cutOff < m_count)
    {
      QuicksortCall3* pc;
      int n = partitionFromCentre(m_base, 0, m_count-1, m_count/2);
      if (n+n<m_count)
      {
        pc = new QuicksortCall3(m_base, n, m_cutOff, m_latch);
        m_count -= n+1;
        m_base  += n+1;
      }
      else
      {
        pc = new QuicksortCall3(m_base+n+1, m_count-n-1, m_cutOff, m_latch);
        m_count = n;
      }
    }
    if (1<m_count) 
    {
      quicksortClassic(m_base, m_count);
    }
  }
  virtual void cleanup()
  {
    m_latch->notify();
    delete this;
  }
  static void sort(T *base, int count, int cutOff)
  {
    CountDown latch;
    QuicksortCall3* c = new QuicksortCall3(base, count, cutOff, &latch);
    latch.wait();
  }
};

//Module: IAction.h
class IAction
{
  //
  //Pure virtual
  //
  public:
    IAction();
    virtual ~IAction();
    virtual void run() = 0;
};

template <class P1, class P2, class P3, class P4>
class Action4:public IAction
{
  public:
    P1 mp1; P2 mp2; P3 mp3; P4 mp4; 
    void (*mf)(P1,P2,P3,P4);
    Action4(void (*f)(P1,P2,P3,P4), P1 p1, P2 p2, P3 p3, P4 p4)
      : mf(f), mp1(p1), mp2(p2), mp3(p3), mp4(p4) {}
    ~Action4() {} 
    void run() 
    { 
      if (mf!=NULL) 
        mf(mp1,mp2,mp3,mp4) ; 
    }
};
//I've left out a bunch of other similar classes for 1-parameter, 2-parameter, 
// ... 8-parameter actions.  They're obvious.

IAction* NewSelfDeletingAction(IAction *deleteMe);

template <class P1, class P2, class P3, class P4> IAction*
  CallAction4 ( void (*f)(P1,P2,P3,P4), P1 p1, P2 p2, P3 p3, P4 p4)
{
  return NewSelfDeletingAction(new Action4<P1,P2,P3,P4>(f, p1, p2, p3, p4) );  
}

//Module: IAction.cpp
class SelfDeletingAction:public IAction
{
  private:
    IAction *m_deleteMe;
  public:
    SelfDeletingAction(IAction *deleteMe); 
    virtual void run();
};

SelfDeletingAction::SelfDeletingAction(IAction *deleteMe): m_deleteMe(deleteMe) {}

void SelfDeletingAction::run() { m_deleteMe->run(); delete m_deleteMe; delete this; }

IAction* NewSelfDeletingAction(IAction *deleteMe)
{
  return new SelfDeletingAction(deleteMe);
}

//Module: NotifyingAction.h
class NotifyingAction:public IAction
{
  //
  //
  //
  private:
    IAction*  mInnerAction;
    INotifiable*  mTarget;
  public:
    NotifyingAction(IAction* innerAction, INotifiable *target);
    ~NotifyingAction();
    void run();
};

IAction *NewNotifyingAction(IAction *innerAction, INotifiable *target);
IAction* NewPassiveWaitAction(INotifiable *waitOnMe);

//Module: NotifyingAction.cpp
NotifyingAction::NotifyingAction(IAction* innerAction, INotifiable *target)
      : mInnerAction(innerAction)
      , mTarget(target) 
{
}
NotifyingAction::~NotifyingAction()
{
}
void NotifyingAction::run()
{
  mInnerAction->run();
  mTarget->notify();
}

IAction *NewNotifyingAction(IAction *innerAction, INotifiable *target)
{
  return NewSelfDeletingAction(new NotifyingAction(innerAction, target));  //SelfDeletingAction itself is private to IAction.cpp
}

//Module: IDispatcher.h
class IDispatcher
{
  //
  //
  //
  public:
    IDispatcher();
    virtual ~IDispatcher();
    virtual void optIn() = 0;
    virtual bool optInUntil(INotifiable *optOutWhenNotified) = 0;  //returns true if optOutWhenNotified->notified(), false if not
    virtual void queueOrDo(IAction *action) = 0;
    virtual int startThreads(int additionalThreadCount) = 0;    
};

extern IDispatcher* gpDispatcher;

//Module: IDispatcher.cpp
IDispatcher::IDispatcher() {}
IDispatcher::~IDispatcher() {}

//Module: DivideAndConquer.h
class DivideAndConquer: public CountDown
{
  private:
    IDispatcher* m_dispatcher;
  public:
    DivideAndConquer();
    DivideAndConquer(IDispatcher *dispatcherToUse);
    ~DivideAndConquer();
    void queueOrDo(IAction *a);
    void doAndWait(IAction *a);
};

//Module: DivideAndConquer.cpp
DivideAndConquer::DivideAndConquer()
: m_dispatcher(gpDispatcher)
{
}

DivideAndConquer::DivideAndConquer(IDispatcher* useThisDispatcher)
: m_dispatcher(useThisDispatcher)
{
}

DivideAndConquer::~DivideAndConquer()
{
}

void DivideAndConquer::queueOrDo(IAction *a)
{
  increment();
  m_dispatcher->queueOrDo(NewNotifyingAction(a, this));  //use the dispatcher chosen at construction
}

void DivideAndConquer::doAndWait(IAction *a)
{
  queueOrDo(a);
  wait();
}

//Module: IAction.cpp
IAction::IAction() {}
IAction::~IAction() {}

//Module: INotifiable.cpp
INotifiable::INotifiable() {}
INotifiable::~INotifiable() {}

//Module: ThreadSafeBase.h
class ThreadSafeBase
{
protected:
    mutable JMutex    m_QueueLock;
    JEvent        m_EventAllThreadsDead;
    ThreadSafeCounter m_Dying;
    ThreadSafeCounter m_WaitingThreadCount;
    JEvent        m_EventWorkToDo;
protected:
  void waitStarted();
  void waitFinished();
  bool isDying();
public:
  ThreadSafeBase();
  ~ThreadSafeBase();
  JMutex & mutex();
};

//Module: ThreadSafeBase.cpp
ThreadSafeBase::ThreadSafeBase() 
{
}

ThreadSafeBase::~ThreadSafeBase()
{
  {
    JMutexLock lock(m_QueueLock);
    ++m_Dying;
    m_EventWorkToDo.set();
  }
  if (m_WaitingThreadCount!=0)
    m_EventAllThreadsDead.wait();
}
void ThreadSafeBase::waitStarted() { ++m_WaitingThreadCount; }

void ThreadSafeBase::waitFinished()
{
  bool lastThread = (--m_WaitingThreadCount==0);
  if  (m_Dying!=0)
  {
    if (lastThread)
      m_EventAllThreadsDead.set();
    else
      m_EventWorkToDo.set();
  }
}

bool ThreadSafeBase::isDying()
{
  return (m_Dying!=0) ;
}

JMutex & ThreadSafeBase::mutex()
{
  return m_QueueLock;
}

//Module: NakedBuffer.h
template<class T> class NakedBuffer    //note that this is a *LIFO* buffer
{
public:
  T *data;
  T *position;
  T *top;
  bool allocated;
public:
  NakedBuffer(): allocated(true)        { position=data=new T[10]; top=data+10; }
  NakedBuffer(int size):allocated(true) { position=data=new T[size]; top=data+size; }
  NakedBuffer(T *fromData, int size)    { allocated=false; data=fromData; top=position=fromData+size; }
  ~NakedBuffer()                  { if (allocated) delete [] data; data=position=top=NULL; allocated=false; }
  T pop()                      { return *--position; }
  int push(const T& v)                { *position++=v; return 1; }
  bool isEmpty() const              { return position==data; }
  int  findValue(const T& v) const       { for (int i=position-data-1;i>=0;i--)
                        if (data[i]==v) return i; return -1; 
                      }
  bool valueExists(const T& v) const    { return findValue(v)>=0; }
  bool isSorted() const          { return false; }  
  bool isStable() const          { return false; }  
  T& operator[](const int index)      { return data[index]; }
  int count() const            { return position-data; }
};

template<class T> void reallocate(T*& base, int oldSize, int newSize)
{
  if (oldSize<newSize)
  {
    T *newBase = new T [newSize];
    for (int i=0; i<oldSize; i++)
      newBase[i] = base[i];
    T *oldBase = base;
    base = newBase;
    delete [] oldBase;
  }  
}

//Module: LIFOBuffer.h
template<class T> class LIFOBuffer:public NakedBuffer<T>  //a bit safer than NakedBuffer
{
  using NakedBuffer<T>::position;
  using NakedBuffer<T>::data;
  using NakedBuffer<T>::top;
  using NakedBuffer<T>::allocated;
public:
  LIFOBuffer():NakedBuffer<T>()          { }
  LIFOBuffer(int size):NakedBuffer<T>(size) { }
  LIFOBuffer(T *fromData, int size):NakedBuffer<T>(size)
  {
    for (int i=0;i<size;++i)
      data[i]=fromData[i];
    position=data+size;
  }
  void grow(int newSize)            
  { 
    int count=position-data;
    if (newSize>count)
    {
      if (data==NULL) 
      {
        if (newSize<2) newSize=2;
        data=new T[newSize];
        position=data;
        top=data+newSize;
        return;
      }
      if (count>newSize) 
      {
        count=newSize;      //ouch! may leave things hanging!
      }
      reallocate(data, count, newSize);
      position=data+count;
      top=data+newSize;
    }
  }    
  int push(const T& v)       { if (position==top) grow(count()<<1); *position++=v; return 1; }    
  T& operator[](const int index) { return data[index]; }
  int count() const         { return position-data; }
  bool isEmpty() const       { return position==data; }
  inline int findValue(const T& v) const
  {  
    for (T *scan=data;scan<position;scan++)
    {
      if (*scan==v) 
        return scan-data;
    }
    return -1; 
  }
  inline bool valueExists(const T& v) const
  { 
    return findValue(v)>=0; 
  }
  T pop() 
  { 
    if (position==data) throw __FUNCTION__ ": cannot pop() when isEmpty()";
    return *--position; 
  }
  int nativePush(const T& v) 
  { 
    if (position==top) 
    {
      if (position==NULL)
        grow(2);
      else
        grow(nativeCount()<<1); 
    }

    *position++=v; 
    return 1; 
  }
  T nativePop()      { return *--position; }
  int nativeCount()    { return position-data; }
  void freeUp()
  {
    this->clear();
    if (data!=NULL) delete [] data; 
    data=NULL;
    position=NULL;
    allocated=false;
  }
};

//Module: ThreadSafeQueue.h
template <class T, class Buffer=FIFOBuffer<T> > class ThreadSafeQueue:public ThreadSafeBase
{
  protected:
    Buffer            m_InnerQueue;

  public:
    ThreadSafeQueue()
      : ThreadSafeBase() //note: passive waiting allowed... for now.
    {
    }
    bool tryPush(T pushMe)
    {      
      {
        JMutexLock lock(m_QueueLock);
        if (isDying()) return false;
        m_InnerQueue.push(pushMe);
      }
      m_EventWorkToDo.set();
      return true;
    }
    bool waitPush(T pushMe)
    {
      {
        JMutexLock lock(m_QueueLock);
        if (isDying()) return false;  //check first, as tryPush does: returning false after pushing could get the action run twice
        m_InnerQueue.push(pushMe);
      }
      m_EventWorkToDo.set();
      return true;
    }
    bool tryPop(T &popHere)
    {
      JMutexLock lock(m_QueueLock);
      if (m_InnerQueue.isEmpty())
      {
        return false;
      }
      popHere = m_InnerQueue.pop();
      return true;
    }
    bool waitPop(T &popHere)
    {
      waitStarted();
      bool rc = false;
      while (!rc && m_Dying==0)
      {
        m_EventWorkToDo.wait();
        rc = tryPop(popHere);
      }
      waitFinished();
      return rc;
    }
    int count() const
    {
      JMutexLock lock(m_QueueLock);
      return m_InnerQueue.count();
    }
    const T operator [] (int index) const  //note: cannot be T&, because reference would not be threadsafe
    {
      JMutexLock lock(m_QueueLock);
      if (index<0 || m_InnerQueue.count()<=index)
        throw "index out of range";
      return m_InnerQueue[index];
    }
};

template <class T> class ThreadSafeStack: public ThreadSafeQueue<T, LIFOBuffer<T> > 
{
};
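
ThreadSafeQueue and ThreadSafeStack lean on MS Windows events and on ThreadSafeBase. For readers on other platforms, here is a minimal sketch of the same push/pop contract using only the C++11 standard library. It is an illustration, not the code that was timed: the names PortableStack and shutdown() are made up for this example, and a std::condition_variable stands in for m_EventWorkToDo.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <vector>

// Hypothetical portable stand-in for ThreadSafeStack<T> (not part of the original program).
template <class T> class PortableStack
{
  std::mutex              m_Lock;
  std::condition_variable m_WorkToDo;   // plays the role of m_EventWorkToDo
  std::vector<T>          m_Items;
  bool                    m_Dying;
public:
  PortableStack() : m_Dying(false) {}

  // Refuses new work once shutdown() has been called.
  bool tryPush(const T& pushMe)
  {
    {
      std::lock_guard<std::mutex> lock(m_Lock);
      if (m_Dying) return false;
      m_Items.push_back(pushMe);
    }
    m_WorkToDo.notify_one();            // wake one waiting worker, outside the lock
    return true;
  }

  // Never blocks; returns false if there is nothing to pop.
  bool tryPop(T& popHere)
  {
    std::lock_guard<std::mutex> lock(m_Lock);
    if (m_Items.empty()) return false;
    popHere = m_Items.back();
    m_Items.pop_back();
    return true;
  }

  // Blocks until an item arrives or shutdown() is called; false means "stop working".
  bool waitPop(T& popHere)
  {
    std::unique_lock<std::mutex> lock(m_Lock);
    m_WorkToDo.wait(lock, [this] { return m_Dying || !m_Items.empty(); });
    if (m_Dying) return false;
    assert(!m_Items.empty());           // guaranteed by the wait predicate
    popHere = m_Items.back();
    m_Items.pop_back();
    return true;
  }

  // Wakes every waiter so that worker loops like optIn() can exit.
  void shutdown()
  {
    {
      std::lock_guard<std::mutex> lock(m_Lock);
      m_Dying = true;
    }
    m_WorkToDo.notify_all();
  }
};
```

A worker loop shaped like StackDispatcher::optIn() would simply call waitPop() until it returns false.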

//Module: DispatcherBase.h
class DispatcherBase: public IDispatcher
{
  protected:
    ThreadSafeCounter      m_ThreadCount;
    virtual int threadCount();

  public:
    virtual void optIn() = 0;
    virtual bool optInUntil(INotifiable *optOutWhenNotified) = 0;
    virtual void queueOrDo(IAction *action) = 0;
    virtual int startThreads(int additionalThreadCount);
};

//Module: DispatcherBase.cpp
unsigned long (__stdcall handToDispatcher)(void *param)
{
  IDispatcher *dispatcher = (IDispatcher*)param;
  dispatcher->optIn();
  return 0;
}

int DispatcherBase::startThreads(int threadCount)
{
  int threadsStarted=0;
  for (;threadCount>0;threadCount--)
  {
    int threadID;
    THREAD_HANDLE hThread = chBEGINTHREADEX(NULL, 0, handToDispatcher, this, 0, &threadID);
    if (hThread==(THREAD_HANDLE)(-1)) hThread=0;  //treat -1 condition same as 0...
    if (hThread  !=0)
      threadsStarted++;
  }
  return threadsStarted;
}

int DispatcherBase::threadCount()
{
  return m_ThreadCount.value();
}

//Module: StackDispatcher.h
class StackDispatcher: public DispatcherBase
{
  protected:
    ThreadSafeStack<IAction*>  m_Stack;

  public:
    StackDispatcher();
    ~StackDispatcher();
    virtual void optIn();
    virtual bool optInUntil(INotifiable *optOutWhenNotified);
    virtual void queueOrDo(IAction *action);
};

//Module: StackDispatcher.cpp
StackDispatcher::StackDispatcher()  {}
StackDispatcher::~StackDispatcher() {}
void StackDispatcher::queueOrDo(IAction *action)
{
  if (m_ThreadCount!=0)
    if (m_Stack.tryPush(action))
      return;
  action->run();
}

void StackDispatcher::optIn()
{
  ++m_ThreadCount;
  IAction *nextAction=NULL;
  while (m_Stack.waitPop(nextAction))
  {
    nextAction->run();
    nextAction=NULL;
  }
  --m_ThreadCount;
}  

bool StackDispatcher::optInUntil(INotifiable* optOutWhenNotified)
{
  IAction *nextAction=NULL;
  while (m_Stack.waitPop(nextAction))
  {
    nextAction->run();
    nextAction=NULL;
    if (optOutWhenNotified->notified())
      return true;
  }
  return false;
}

//Module: quicksortExample4.h
template <class T> void parallelQuicksortSubarray(T *base, int elementCount, int threadingElementCount,
                          DivideAndConquer* task)
{
  while (elementCount>64)
  {
    int pivot = partitionFromCentre(base, 0, elementCount-1, elementCount/2);
    if (pivot<elementCount/2)
    {
      if (threadingElementCount<pivot)
      {
        task->queueOrDo( CallAction4(parallelQuicksortSubarray<T>, base, pivot, threadingElementCount, task) );
      }
      else
      {
        quicksortClassic(base, pivot, 32);
      }
      elementCount -= (pivot+1);
      base += pivot + 1;
    }
    else
    {
      if (pivot+threadingElementCount<elementCount)
      {
        task->queueOrDo(  CallAction4(parallelQuicksortSubarray<T>, base+pivot+1, elementCount-pivot-1, threadingElementCount, task) ) ;
      }
      else
      {
        quicksortClassic(base+pivot+1, elementCount-pivot-1, 32);
      }
      elementCount = pivot;
    }
  }
  if (elementCount>1)
    insertionSort(base, elementCount);
}

template <class T> void parallelQuicksort(T *base, int elementCount, int /*processorCount*/, int threadingThreshold=50000)
{
  DivideAndConquer t;
  t.doAndWait ( CallAction4(parallelQuicksortSubarray<T>, base, elementCount,threadingThreshold, &t));
}

//Module: randomIntegerInputs.h
void sdrand(double seed);
double drand();
int *newRandomPermutationArray(int count);

//Module: randomIntegerInputs.cpp
void sdrand(double seed)
{
  srand((int)(seed*32767));
}

double drand()
{
  return ((double)(rand()*32768 + rand()))/32768.0/32768.0;
}

int *newRandomPermutationArray(int count)
{
  int *testData = new int[count];
  if (count>0) testData[0]=0;  //new int[0] has no element 0 to write to
  for (int i=1;i<count;++i)
  {
    int j = (int)floor(drand() * (i+1));
    testData[i] = testData[j];
    testData[j] = i;
  }
  return testData;
}

template <class T> void randomlyPermuteArray(T *data, int count)
{
  T v;
  int j;
  for (int i=1;i<count;++i)
  {
    v = data[i];
    j = (int)floor(drand() * (i+1));    
    data[i] = data[j];
    data[j] = v;    
  }
}
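
newRandomPermutationArray builds its permutation in a single pass: for each i it picks j somewhere in 0..i, moves whatever is at j up to i, and writes i into slot j. drand() depends on rand() returning 15 bits, which holds for Visual C++ but not for every compiler, so here is a sketch of the same one-pass idea on top of the C++11 random header. The names newRandomPermutation and isPermutation are hypothetical, and returning a std::vector rather than a new[] array is a choice made for this example only.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical portable variant of newRandomPermutationArray().
std::vector<int> newRandomPermutation(int count, unsigned seed)
{
  assert(count >= 0);
  std::vector<int> data(count);
  std::mt19937 generator(seed);
  for (int i = 0; i < count; ++i)
  {
    std::uniform_int_distribution<int> pick(0, i);  // j is uniform over 0..i inclusive
    int j = pick(generator);
    data[i] = data[j];   // when j==i this copies a slot onto itself
    data[j] = i;
  }
  return data;
}

// Returns true if data holds each of 0..size-1 exactly once.
bool isPermutation(const std::vector<int>& data)
{
  std::vector<bool> seen(data.size(), false);
  for (std::size_t i = 0; i < data.size(); ++i)
  {
    int v = data[i];
    if (v < 0 || (std::size_t)v >= data.size() || seen[v]) return false;
    seen[v] = true;
  }
  return true;
}
```

Checking isPermutation() on the array before and after a sort is a cheap way to confirm that a multithreaded sort has not lost or duplicated elements.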

//Module: MTQuicksort.cpp
StackDispatcher myDispatcher;
IDispatcher* gpDispatcher;

#define output std::cout

int main(int argc, char * argv[])
{
  gpDispatcher = &myDispatcher;
  gpDispatcher->startThreads(4);

  int n=10000000;
  int *base = newRandomPermutationArray(n); 
  LARGE_INTEGER start;
  LARGE_INTEGER finish;
  LARGE_INTEGER freq;
  QueryPerformanceFrequency(&freq);
  double ref=0;
  for (int m=65536*8*8;m>=16;m/=2)
  {    
    output << m;

    if (ref==0)
    {
      randomlyPermuteArray(base, n);
      QueryPerformanceCounter(&start);
      quicksortClassic(base, n);
      QueryPerformanceCounter(&finish);
      ref = (double) ( finish.QuadPart - start.QuadPart ) / (double) freq.QuadPart;
    }
    output << "\t" << ref;

    if (1024<m)
    {
      randomlyPermuteArray(base, n);
      QueryPerformanceCounter(&start);
      QuicksortCall<int>::sort(base, n, m);
      QueryPerformanceCounter(&finish);
      output << "\t" << (double) ( finish.QuadPart - start.QuadPart ) / (double) freq.QuadPart;

      randomlyPermuteArray(base, n);
      QueryPerformanceCounter(&start);
      QuicksortCall2<int>::sort(base, n, m);
      QueryPerformanceCounter(&finish);
      output << "\t" << (double) ( finish.QuadPart - start.QuadPart ) / (double) freq.QuadPart;
    }
    else
      output << "\t\t";

    randomlyPermuteArray(base, n);
    QueryPerformanceCounter(&start);
    QuicksortCall3<int>::sort(base, n, m);
    QueryPerformanceCounter(&finish);
    output << "\t" << (double) ( finish.QuadPart - start.QuadPart ) / (double) freq.QuadPart;

    randomlyPermuteArray(base, n);
    QueryPerformanceCounter(&start);
    parallelQuicksort(base, n, 4, m);
    QueryPerformanceCounter(&finish);
    output << "\t" << (double) ( finish.QuadPart - start.QuadPart ) / (double) freq.QuadPart;
    output << "\n";
  }
  /*
  FILE *phial;
  fopen_s(&phial, ".\\output.txt","w");
  fwrite(output.value(), 1, strlen(output.value()), phial);
  fclose(phial);
  debug << output;
  */
  delete [] base;
  return 0;
}
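
main passes a hard-coded 4 to startThreads(). A small sketch of asking the runtime instead, assuming a compiler with the C++11 thread header (VS 2008 does not have it; on MS Windows the dwNumberOfProcessors field filled in by GetSystemInfo() gives a comparable figure). The helper name workerThreadCount is hypothetical.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: how many worker threads to ask startThreads() for.
// hardware_concurrency() is only a hint and may return 0 when it cannot tell,
// in which case the fallback (4, as in the original main) is used.
int workerThreadCount(int fallback = 4)
{
  assert(fallback > 0);
  unsigned detected = std::thread::hardware_concurrency();
  return detected > 0 ? (int)detected : fallback;
}
```

In main the call would then read gpDispatcher->startThreads(workerThreadCount());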
