Planeshift

genrefqueue.h

Go to the documentation of this file.
00001 /*
00002 * genqueue.h by Matze Braun <[email protected]>
00003 *
00004 * Copyright (C) 2001 Atomic Blue ([email protected], http://www.atomicblue.org) 
00005 *
00006 *
00007 * This program is free software; you can redistribute it and/or
00008 * modify it under the terms of the GNU General Public License
00009 * as published by the Free Software Foundation (version 2 of the License)
00010 * This program is distributed in the hope that it will be useful,
00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013 * GNU General Public License for more details.
00014 * You should have received a copy of the GNU General Public License
00015 * along with this program; if not, write to the Free Software
00016 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
00017 *
00018 */
00019 
00020 #ifndef __GENQUEUE_H__
00021 #define __GENQUEUE_H__
00022 
00023 #include <csutil/ref.h>
00024 #include <csutil/threading/mutex.h>
00025 #include <csutil/threading/condition.h>
00026 
00027 #include "util/pserror.h"
00028 
00038 template <class queuetype, template <class T> class refType = csRef >
00039 class GenericRefQueue
00040 {
00041 public:
00042     GenericRefQueue(unsigned int maxsize = 500)
00043     {
00044         /* we make the buffer 1 typ bigger, so we can avoid one check and one
00045          variable when testing if buffer is full */
00046         qsize = maxsize;
00047         qbuffer = new refType<queuetype>[qsize + 1]();
00048         qstart = qend = 0;
00049     }
00050 
00051     ~GenericRefQueue()
00052     {
00053         delete[] qbuffer;
00054     }
00055     
00058     bool AddWait(queuetype* msg, csTicks timeout = 0)
00059     {
00060         // is there's a space in the queue left just add it
00061         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00062         while(true)
00063         {
00064             bool added = Add(msg);
00065             if (added)
00066             {
00067                 return true;
00068             }
00069             Error1("Queue full! Waiting.\n");
00070             
00071             // Wait release mutex before waiting so that it is possible to
00072             // add new messages.
00073             if (!datacondition.Wait(mutex, timeout))
00074             {
00075                 // Timed out waiting for new message
00076                 return false;
00077             }
00078         }
00079     }
00080         
00082     bool Add(queuetype* msg)
00083     {
00084         unsigned int tqend;
00085 
00086         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00087 
00088         if (!msg->GetPending())
00089         {
00090             tqend = (qend + 1) % qsize;
00091             // check if queue is full
00092             if (tqend == qstart)
00093             {
00094                 Interrupt();
00095                 return false;
00096             }
00097             // check are we having a refcount race (in which msg would already be destroyed)
00098                         CS_ASSERT(msg->GetRefCount() > 0);
00099             // add Message to queue
00100             qbuffer[qend]=msg;
00101             qend=tqend;
00102 
00103             msg->SetPending(true);
00104 
00105             Interrupt();
00106         }
00107         return true;
00108     }
00109     
00110     // Peeks at the next message from the queue but does not remove it.
00111     csPtr<queuetype> Peek()
00112         {
00113         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00114 
00115         csRef<queuetype> ptr;
00116         
00117         unsigned int qpointer = qstart;
00118         
00119         // if this is a weakref queue we should skip over null entries
00120         while(!ptr.IsValid())
00121         {
00122             // check if queue is empty
00123             if (qpointer == qend)
00124             {
00125                 return 0;
00126             }
00127 
00128             // removes Message from queue
00129             ptr = qbuffer[qpointer];
00130             
00131             qpointer = (qpointer + 1) % qsize;
00132         }
00133 
00134         return csPtr<queuetype>(ptr);
00135         }
00136 
00142     csPtr<queuetype> Get()
00143     {
00144         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00145 
00146         csRef<queuetype> ptr;
00147         
00148         // if this is a weakref queue we should skip over null entries
00149         while(!ptr.IsValid())
00150         {
00151             // check if queue is empty
00152             if (qstart == qend)
00153             {
00154                 Interrupt();
00155                 return 0;
00156             }
00157 
00158             // removes Message from queue
00159             ptr = qbuffer[qstart];
00160             qbuffer[qstart] = 0;
00161             
00162             qstart = (qstart + 1) % qsize;
00163         }
00164 
00165         ptr->SetPending(false);
00166         Interrupt();
00167 
00168         return csPtr<queuetype>(ptr);
00169     }
00170 
00172     csPtr<queuetype> GetWait(csTicks timeout)
00173     {
00174         // is there's a message in the queue left just return it
00175         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00176         while(true)
00177         {
00178             csRef<queuetype> temp = Get();
00179             if (temp)
00180             {
00181                 return csPtr<queuetype> (temp);
00182             }
00183 
00184             // Wait release mutex before waiting so that it is possible to
00185             // add new messages.
00186             if (!datacondition.Wait(mutex, timeout))
00187             {
00188                 // Timed out waiting for new message
00189                 return 0;
00190             }
00191         }
00192         CS_ASSERT(false);
00193         return 0;
00194     }
00195 
00199     void Interrupt()
00200     {
00201         datacondition.NotifyOne();
00202     }
00203 
00207     unsigned int Count()
00208     {
00209         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00210         if(qend < qstart)
00211             return qend + qsize - qstart;
00212         else
00213             return qend - qstart;
00214     }
00215     
00216     bool IsFull()
00217     {
00218         CS::Threading::RecursiveMutexScopedLock lock(mutex);
00219         return ((qend + 1) % qsize == qstart);
00220     }
00221 protected:
00222 
00223     refType<queuetype>* qbuffer;
00224     unsigned int qstart, qend, qsize;
00225     CS::Threading::RecursiveMutex mutex;
00226     CS::Threading::Condition datacondition;
00227 };
00228 
00231 #endif