Following is a custom thread pool class thatI attempted to write. Would you please take a look at the code and see what design issues (or coding ) issues does it have?
namespace Gemini.Threading.ThreadPool
{
public delegate void CallBackHandler(object context);
public sealed class CustomThreadPool : IDisposable
{
#region private memebers...
private readonly int _CurrentPoolCount;
private List<ThreadData> _Threads = new List<ThreadData>();
private ManualResetEvent _ItemsInQueueEvent = new ManualResetEvent(false);
private SynchQueue _QueueWorkList;
private static void OnThreadStart(object o)
{
try
{
ThreadData data = (ThreadData)o;
Interlocked.Exchange(ref data._InUse, 0);
data._startedEvent.Set();
SynchQueue pendingQueue = data._PendingQueue;
Console.WriteLine(" ThreadPool[tid:{0}]thread started", Thread.CurrentThread.ManagedThreadId);
while (true)
{
int eventIndex = WaitHandle.WaitAny(new WaitHandle[] { data._startEvent, data._QueueState });
Interlocked.Exchange(ref data._InUse, 1);
CallBackHandler callbk = null;
object context = null;
if (eventIndex == 1)
{
QueueData qdata = pendingQueue.DeQueue();
if (qdata == null)
{
continue;
}
DebugOutput(qdata);
callbk = qdata._Callback;
context = qdata._Context;
}
else if (eventIndex == 0)
{
Console.WriteLine("ThreadPool[tid{0}]:: processing the non-queued work", Thread.CurrentThread.ManagedThreadId);
callbk = data._callback;
context = data._Context;
}
else throw new Exception("Unknown Error");
if (callbk != null)
{
callbk(context);
}
else throw new ArgumentException("No callback specified");
Interlocked.Exchange(ref data._InUse, 0);
}
}
catch (ThreadAbortException){}
}
private static void DebugOutput(QueueData qdata)
{
TimeSpan tm = DateTime.Now - qdata.dtEnqueuedTime;
string time = "";
if (tm.Days > 0)
time += tm.Days.ToString() + "days ";
if (tm.Hours > 0)
time += tm.Hours.ToString() + "hrs ";
if (tm.Minutes > 0)
time += tm.Minutes.ToString() + "mins ";
if (tm.Seconds > 0)
time += tm.Seconds.ToString() + "secs ";
if (tm.Milliseconds > 0)
time += tm.Milliseconds.ToString() + "ms ";
Console.WriteLine("ThreadPool[tid:{0}]::Processing Queued Worked after : {1}", Thread.CurrentThread.ManagedThreadId, time);
}
#endregion
#region public methods
public CustomThreadPool(int count)
{
_CurrentPoolCount = count;
_QueueWorkList = new SynchQueue(_ItemsInQueueEvent);
//initialize a fixed number of threads....
for (int i = 0; i < _CurrentPoolCount; ++i)
{
ThreadData dt = new ThreadData(_ItemsInQueueEvent, _QueueWorkList);
_Threads.Add(dt);
WaitHandle.WaitAll(new WaitHandle[] { dt._startedEvent });
}
}
public void Dispose()
{
foreach (ThreadData td in _Threads)
{
td.Dispose();
}
}
// processes the data either immediately or queues up the data...
// returns true: if data is queued up, otherwise false
public bool Process(CallBackHandler workcallback, object context)
{
foreach (ThreadData data in _Threads)
{
if (Interlocked.CompareExchange(ref data._InUse, 1, 0) == 0)
{
//Interlocked.Exchange...
data._callback = workcallback;
data._Context = context;
data._startEvent.Set();
return false;
}
}
//else we need to queue the call in the queue and as soon as a thread becomes ready we need to pick next
//item from the queue.
//no idle thread found , queue the data...
_QueueWorkList.Enqueue(new QueueData(workcallback, context));
return true;
}
#endregion
#region internal classes...
internal sealed class ThreadData
{
internal CallBackHandler _callback;
internal Thread _Thread;
internal int _InUse;
internal object _Context;
internal AutoResetEvent _startedEvent = new AutoResetEvent(false);
internal ManualResetEvent _QueueState;
internal SynchQueue _PendingQueue;
internal AutoResetEvent _startEvent = new AutoResetEvent(false);
internal void Dispose()
{
if (_Thread != null)
{
_Thread.Abort();
_Thread.Join();
}
}
internal ThreadData(ManualResetEvent qs, SynchQueue sq)
{
_PendingQueue = sq;
_QueueState = qs;
_InUse = 0;
_Thread = new Thread(new ParameterizedThreadStart(CustomThreadPool.OnThreadStart));
_Thread.Start(this);
}
}
internal sealed class QueueData
{
internal DateTime dtEnqueuedTime;
internal DateTime dtDequeuedTime;
internal CallBackHandler _Callback;
internal object _Context;
internal QueueData(CallBackHandler cb, object o)
{
_Callback = cb;
_Context = o;
}
}
internal class SynchQueue
{
private Queue<QueueData> _QueuedCallBacks = new Queue<QueueData>();
private ManualResetEvent _QueueState;
internal SynchQueue(ManualResetEvent handle)
{
_QueueState = handle;
}
internal void Enqueue(QueueData qd)
{
lock (this)
{
qd.dtEnqueuedTime = DateTime.Now;
_QueuedCallBacks.Enqueue(qd);
_QueueState.Set();
}
}
internal QueueData DeQueue()
{
lock (this)
{
QueueData data = null;
if (_QueuedCallBacks.Count == 0)
_QueueState.Reset();
else
data = _QueuedCallBacks.Dequeue();
return data;
}
}
}
#endregion
}
}
:::usuage:
public static void Main(string [] args){
Gemini.Threading.ThreadPool.CustomThreadPool tp = new Gemini.Threading.ThreadPool.CustomThreadPool(iThreadCount);
for (int i = 0; i < iCallCount; ++i)
{
tp.Process(new CallBackHandler(OnCallback), null);
}
}
static void OnCallback(object o) {
Console.WriteLine("do some work here");
}