Planeshift
|
00001 /* 00002 * genqueue.h by Matze Braun <[email protected]> 00003 * 00004 * Copyright (C) 2001 Atomic Blue ([email protected], http://www.atomicblue.org) 00005 * 00006 * 00007 * This program is free software; you can redistribute it and/or 00008 * modify it under the terms of the GNU General Public License 00009 * as published by the Free Software Foundation (version 2 of the License) 00010 * This program is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00013 * GNU General Public License for more details. 00014 * You should have received a copy of the GNU General Public License 00015 * along with this program; if not, write to the Free Software 00016 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 00017 * 00018 */ 00019 00020 #ifndef __GENQUEUE_H__ 00021 #define __GENQUEUE_H__ 00022 00023 #include <csutil/ref.h> 00024 #include <csutil/threading/mutex.h> 00025 #include <csutil/threading/condition.h> 00026 00027 #include "util/pserror.h" 00028 00038 template <class queuetype, template <class T> class refType = csRef > 00039 class GenericRefQueue 00040 { 00041 public: 00042 GenericRefQueue(unsigned int maxsize = 500) 00043 { 00044 /* we make the buffer 1 typ bigger, so we can avoid one check and one 00045 variable when testing if buffer is full */ 00046 qsize = maxsize; 00047 qbuffer = new refType<queuetype>[qsize + 1](); 00048 qstart = qend = 0; 00049 } 00050 00051 ~GenericRefQueue() 00052 { 00053 delete[] qbuffer; 00054 } 00055 00058 bool AddWait(queuetype* msg, csTicks timeout = 0) 00059 { 00060 // is there's a space in the queue left just add it 00061 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00062 while(true) 00063 { 00064 bool added = Add(msg); 00065 if (added) 00066 { 00067 return true; 00068 } 00069 Error1("Queue full! Waiting.\n"); 00070 00071 // Wait release mutex before waiting so that it is possible to 00072 // add new messages. 00073 if (!datacondition.Wait(mutex, timeout)) 00074 { 00075 // Timed out waiting for new message 00076 return false; 00077 } 00078 } 00079 } 00080 00082 bool Add(queuetype* msg) 00083 { 00084 unsigned int tqend; 00085 00086 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00087 00088 if (!msg->GetPending()) 00089 { 00090 tqend = (qend + 1) % qsize; 00091 // check if queue is full 00092 if (tqend == qstart) 00093 { 00094 Interrupt(); 00095 return false; 00096 } 00097 // check are we having a refcount race (in which msg would already be destroyed) 00098 CS_ASSERT(msg->GetRefCount() > 0); 00099 // add Message to queue 00100 qbuffer[qend]=msg; 00101 qend=tqend; 00102 00103 msg->SetPending(true); 00104 00105 Interrupt(); 00106 } 00107 return true; 00108 } 00109 00110 // Peeks at the next message from the queue but does not remove it. 00111 csPtr<queuetype> Peek() 00112 { 00113 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00114 00115 csRef<queuetype> ptr; 00116 00117 unsigned int qpointer = qstart; 00118 00119 // if this is a weakref queue we should skip over null entries 00120 while(!ptr.IsValid()) 00121 { 00122 // check if queue is empty 00123 if (qpointer == qend) 00124 { 00125 return 0; 00126 } 00127 00128 // removes Message from queue 00129 ptr = qbuffer[qpointer]; 00130 00131 qpointer = (qpointer + 1) % qsize; 00132 } 00133 00134 return csPtr<queuetype>(ptr); 00135 } 00136 00142 csPtr<queuetype> Get() 00143 { 00144 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00145 00146 csRef<queuetype> ptr; 00147 00148 // if this is a weakref queue we should skip over null entries 00149 while(!ptr.IsValid()) 00150 { 00151 // check if queue is empty 00152 if (qstart == qend) 00153 { 00154 Interrupt(); 00155 return 0; 00156 } 00157 00158 // removes Message from queue 00159 ptr = qbuffer[qstart]; 00160 qbuffer[qstart] = 0; 00161 00162 qstart = (qstart + 1) % qsize; 00163 } 00164 00165 ptr->SetPending(false); 00166 Interrupt(); 00167 00168 return csPtr<queuetype>(ptr); 00169 } 00170 00172 csPtr<queuetype> GetWait(csTicks timeout) 00173 { 00174 // is there's a message in the queue left just return it 00175 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00176 while(true) 00177 { 00178 csRef<queuetype> temp = Get(); 00179 if (temp) 00180 { 00181 return csPtr<queuetype> (temp); 00182 } 00183 00184 // Wait release mutex before waiting so that it is possible to 00185 // add new messages. 00186 if (!datacondition.Wait(mutex, timeout)) 00187 { 00188 // Timed out waiting for new message 00189 return 0; 00190 } 00191 } 00192 CS_ASSERT(false); 00193 return 0; 00194 } 00195 00199 void Interrupt() 00200 { 00201 datacondition.NotifyOne(); 00202 } 00203 00207 unsigned int Count() 00208 { 00209 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00210 if(qend < qstart) 00211 return qend + qsize - qstart; 00212 else 00213 return qend - qstart; 00214 } 00215 00216 bool IsFull() 00217 { 00218 CS::Threading::RecursiveMutexScopedLock lock(mutex); 00219 return ((qend + 1) % qsize == qstart); 00220 } 00221 protected: 00222 00223 refType<queuetype>* qbuffer; 00224 unsigned int qstart, qend, qsize; 00225 CS::Threading::RecursiveMutex mutex; 00226 CS::Threading::Condition datacondition; 00227 }; 00228 00231 #endif