00001 /*------------------------------------------------------------------------- 00002 * 00003 * sinvaladt.c 00004 * POSTGRES shared cache invalidation data manager. 00005 * 00006 * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group 00007 * Portions Copyright (c) 1994, Regents of the University of California 00008 * 00009 * 00010 * IDENTIFICATION 00011 * src/backend/storage/ipc/sinvaladt.c 00012 * 00013 *------------------------------------------------------------------------- 00014 */ 00015 #include "postgres.h" 00016 00017 #include <signal.h> 00018 #include <unistd.h> 00019 00020 #include "miscadmin.h" 00021 #include "storage/backendid.h" 00022 #include "storage/ipc.h" 00023 #include "storage/proc.h" 00024 #include "storage/procsignal.h" 00025 #include "storage/shmem.h" 00026 #include "storage/sinvaladt.h" 00027 #include "storage/spin.h" 00028 00029 00030 /* 00031 * Conceptually, the shared cache invalidation messages are stored in an 00032 * infinite array, where maxMsgNum is the next array subscript to store a 00033 * submitted message in, minMsgNum is the smallest array subscript containing 00034 * a message not yet read by all backends, and we always have maxMsgNum >= 00035 * minMsgNum. (They are equal when there are no messages pending.) For each 00036 * active backend, there is a nextMsgNum pointer indicating the next message it 00037 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every 00038 * backend. 00039 * 00040 * (In the current implementation, minMsgNum is a lower bound for the 00041 * per-process nextMsgNum values, but it isn't rigorously kept equal to the 00042 * smallest nextMsgNum --- it may lag behind. We only update it when 00043 * SICleanupQueue is called, and we try not to do that often.) 00044 * 00045 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES 00046 * entries. We translate MsgNum values into circular-buffer indexes by 00047 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as 00048 * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum 00049 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space 00050 * in the buffer. If the buffer does overflow, we recover by setting the 00051 * "reset" flag for each backend that has fallen too far behind. A backend 00052 * that is in "reset" state is ignored while determining minMsgNum. When 00053 * it does finally attempt to receive inval messages, it must discard all 00054 * its invalidatable state, since it won't know what it missed. 00055 * 00056 * To reduce the probability of needing resets, we send a "catchup" interrupt 00057 * to any backend that seems to be falling unreasonably far behind. The 00058 * normal behavior is that at most one such interrupt is in flight at a time; 00059 * when a backend completes processing a catchup interrupt, it executes 00060 * SICleanupQueue, which will signal the next-furthest-behind backend if 00061 * needed. This avoids undue contention from multiple backends all trying 00062 * to catch up at once. However, the furthest-back backend might be stuck 00063 * in a state where it can't catch up. Eventually it will get reset, so it 00064 * won't cause any more problems for anyone but itself. But we don't want 00065 * to find that a bunch of other backends are now too close to the reset 00066 * threshold to be saved. So SICleanupQueue is designed to occasionally 00067 * send extra catchup interrupts as the queue gets fuller, to backends that 00068 * are far behind and haven't gotten one yet. As long as there aren't a lot 00069 * of "stuck" backends, we won't need a lot of extra interrupts, since ones 00070 * that aren't stuck will propagate their interrupts to the next guy. 00071 * 00072 * We would have problems if the MsgNum values overflow an integer, so 00073 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND 00074 * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be 00075 * large so that we don't need to do this often. It must be a multiple of 00076 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need 00077 * to be moved when we do it. 00078 * 00079 * Access to the shared sinval array is protected by two locks, SInvalReadLock 00080 * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this 00081 * authorizes them to modify their own ProcState but not to modify or even 00082 * look at anyone else's. When we need to perform array-wide updates, 00083 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to 00084 * lock out all readers. Writers take SInvalWriteLock (always in exclusive 00085 * mode) to serialize adding messages to the queue. Note that a writer 00086 * can operate in parallel with one or more readers, because the writer 00087 * has no need to touch anyone's ProcState, except in the infrequent cases 00088 * when SICleanupQueue is needed. The only point of overlap is that 00089 * the writer wants to change maxMsgNum while readers need to read it. 00090 * We deal with that by having a spinlock that readers must take for just 00091 * long enough to read maxMsgNum, while writers take it for just long enough 00092 * to write maxMsgNum. (The exact rule is that you need the spinlock to 00093 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the 00094 * spinlock to write maxMsgNum unless you are holding both locks.) 00095 * 00096 * Note: since maxMsgNum is an int and hence presumably atomically readable/ 00097 * writable, the spinlock might seem unnecessary. The reason it is needed 00098 * is to provide a memory barrier: we need to be sure that messages written 00099 * to the array are actually there before maxMsgNum is increased, and that 00100 * readers will see that data after fetching maxMsgNum. Multiprocessors 00101 * that have weak memory-ordering guarantees can fail without the memory 00102 * barrier instructions that are included in the spinlock sequences. 00103 */ 00104 00105 00106 /* 00107 * Configurable parameters. 00108 * 00109 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer. 00110 * Must be a power of 2 for speed. 00111 * 00112 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow. 00113 * Must be a multiple of MAXNUMMESSAGES. Should be large. 00114 * 00115 * CLEANUP_MIN: the minimum number of messages that must be in the buffer 00116 * before we bother to call SICleanupQueue. 00117 * 00118 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once 00119 * we exceed CLEANUP_MIN. Should be a power of 2 for speed. 00120 * 00121 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen 00122 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT. 00123 * 00124 * WRITE_QUANTUM: the max number of messages to push into the buffer per 00125 * iteration of SIInsertDataEntries. Noncritical but should be less than 00126 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once 00127 * per iteration. 00128 */ 00129 00130 #define MAXNUMMESSAGES 4096 00131 #define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144) 00132 #define CLEANUP_MIN (MAXNUMMESSAGES / 2) 00133 #define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16) 00134 #define SIG_THRESHOLD (MAXNUMMESSAGES / 2) 00135 #define WRITE_QUANTUM 64 00136 00137 /* Per-backend state in shared invalidation structure */ 00138 typedef struct ProcState 00139 { 00140 /* procPid is zero in an inactive ProcState array entry. */ 00141 pid_t procPid; /* PID of backend, for signaling */ 00142 PGPROC *proc; /* PGPROC of backend */ 00143 /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ 00144 int nextMsgNum; /* next message number to read */ 00145 bool resetState; /* backend needs to reset its state */ 00146 bool signaled; /* backend has been sent catchup signal */ 00147 bool hasMessages; /* backend has unread messages */ 00148 00149 /* 00150 * Backend only sends invalidations, never receives them. This only makes 00151 * sense for Startup process during recovery because it doesn't maintain a 00152 * relcache, yet it fires inval messages to allow query backends to see 00153 * schema changes. 00154 */ 00155 bool sendOnly; /* backend only sends, never receives */ 00156 00157 /* 00158 * Next LocalTransactionId to use for each idle backend slot. We keep 00159 * this here because it is indexed by BackendId and it is convenient to 00160 * copy the value to and from local memory when MyBackendId is set. It's 00161 * meaningless in an active ProcState entry. 00162 */ 00163 LocalTransactionId nextLXID; 00164 } ProcState; 00165 00166 /* Shared cache invalidation memory segment */ 00167 typedef struct SISeg 00168 { 00169 /* 00170 * General state information 00171 */ 00172 int minMsgNum; /* oldest message still needed */ 00173 int maxMsgNum; /* next message number to be assigned */ 00174 int nextThreshold; /* # of messages to call SICleanupQueue */ 00175 int lastBackend; /* index of last active procState entry, +1 */ 00176 int maxBackends; /* size of procState array */ 00177 00178 slock_t msgnumLock; /* spinlock protecting maxMsgNum */ 00179 00180 /* 00181 * Circular buffer holding shared-inval messages 00182 */ 00183 SharedInvalidationMessage buffer[MAXNUMMESSAGES]; 00184 00185 /* 00186 * Per-backend state info. 00187 * 00188 * We declare procState as 1 entry because C wants a fixed-size array, but 00189 * actually it is maxBackends entries long. 00190 */ 00191 ProcState procState[1]; /* reflects the invalidation state */ 00192 } SISeg; 00193 00194 static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */ 00195 00196 00197 static LocalTransactionId nextLocalTransactionId; 00198 00199 static void CleanupInvalidationState(int status, Datum arg); 00200 00201 00202 /* 00203 * SInvalShmemSize --- return shared-memory space needed 00204 */ 00205 Size 00206 SInvalShmemSize(void) 00207 { 00208 Size size; 00209 00210 size = offsetof(SISeg, procState); 00211 size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); 00212 00213 return size; 00214 } 00215 00216 /* 00217 * SharedInvalBufferInit 00218 * Create and initialize the SI message buffer 00219 */ 00220 void 00221 CreateSharedInvalidationState(void) 00222 { 00223 Size size; 00224 int i; 00225 bool found; 00226 00227 /* Allocate space in shared memory */ 00228 size = offsetof(SISeg, procState); 00229 size = add_size(size, mul_size(sizeof(ProcState), MaxBackends)); 00230 00231 shmInvalBuffer = (SISeg *) 00232 ShmemInitStruct("shmInvalBuffer", size, &found); 00233 if (found) 00234 return; 00235 00236 /* Clear message counters, save size of procState array, init spinlock */ 00237 shmInvalBuffer->minMsgNum = 0; 00238 shmInvalBuffer->maxMsgNum = 0; 00239 shmInvalBuffer->nextThreshold = CLEANUP_MIN; 00240 shmInvalBuffer->lastBackend = 0; 00241 shmInvalBuffer->maxBackends = MaxBackends; 00242 SpinLockInit(&shmInvalBuffer->msgnumLock); 00243 00244 /* The buffer[] array is initially all unused, so we need not fill it */ 00245 00246 /* Mark all backends inactive, and initialize nextLXID */ 00247 for (i = 0; i < shmInvalBuffer->maxBackends; i++) 00248 { 00249 shmInvalBuffer->procState[i].procPid = 0; /* inactive */ 00250 shmInvalBuffer->procState[i].proc = NULL; 00251 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */ 00252 shmInvalBuffer->procState[i].resetState = false; 00253 shmInvalBuffer->procState[i].signaled = false; 00254 shmInvalBuffer->procState[i].hasMessages = false; 00255 shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId; 00256 } 00257 } 00258 00259 /* 00260 * SharedInvalBackendInit 00261 * Initialize a new backend to operate on the sinval buffer 00262 */ 00263 void 00264 SharedInvalBackendInit(bool sendOnly) 00265 { 00266 int index; 00267 ProcState *stateP = NULL; 00268 SISeg *segP = shmInvalBuffer; 00269 00270 /* 00271 * This can run in parallel with read operations, but not with write 00272 * operations, since SIInsertDataEntries relies on lastBackend to set 00273 * hasMessages appropriately. 00274 */ 00275 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 00276 00277 /* Look for a free entry in the procState array */ 00278 for (index = 0; index < segP->lastBackend; index++) 00279 { 00280 if (segP->procState[index].procPid == 0) /* inactive slot? */ 00281 { 00282 stateP = &segP->procState[index]; 00283 break; 00284 } 00285 } 00286 00287 if (stateP == NULL) 00288 { 00289 if (segP->lastBackend < segP->maxBackends) 00290 { 00291 stateP = &segP->procState[segP->lastBackend]; 00292 Assert(stateP->procPid == 0); 00293 segP->lastBackend++; 00294 } 00295 else 00296 { 00297 /* 00298 * out of procState slots: MaxBackends exceeded -- report normally 00299 */ 00300 MyBackendId = InvalidBackendId; 00301 LWLockRelease(SInvalWriteLock); 00302 ereport(FATAL, 00303 (errcode(ERRCODE_TOO_MANY_CONNECTIONS), 00304 errmsg("sorry, too many clients already"))); 00305 } 00306 } 00307 00308 MyBackendId = (stateP - &segP->procState[0]) + 1; 00309 00310 /* Advertise assigned backend ID in MyProc */ 00311 MyProc->backendId = MyBackendId; 00312 00313 /* Fetch next local transaction ID into local memory */ 00314 nextLocalTransactionId = stateP->nextLXID; 00315 00316 /* mark myself active, with all extant messages already read */ 00317 stateP->procPid = MyProcPid; 00318 stateP->proc = MyProc; 00319 stateP->nextMsgNum = segP->maxMsgNum; 00320 stateP->resetState = false; 00321 stateP->signaled = false; 00322 stateP->hasMessages = false; 00323 stateP->sendOnly = sendOnly; 00324 00325 LWLockRelease(SInvalWriteLock); 00326 00327 /* register exit routine to mark my entry inactive at exit */ 00328 on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP)); 00329 00330 elog(DEBUG4, "my backend ID is %d", MyBackendId); 00331 } 00332 00333 /* 00334 * CleanupInvalidationState 00335 * Mark the current backend as no longer active. 00336 * 00337 * This function is called via on_shmem_exit() during backend shutdown. 00338 * 00339 * arg is really of type "SISeg*". 00340 */ 00341 static void 00342 CleanupInvalidationState(int status, Datum arg) 00343 { 00344 SISeg *segP = (SISeg *) DatumGetPointer(arg); 00345 ProcState *stateP; 00346 int i; 00347 00348 Assert(PointerIsValid(segP)); 00349 00350 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 00351 00352 stateP = &segP->procState[MyBackendId - 1]; 00353 00354 /* Update next local transaction ID for next holder of this backendID */ 00355 stateP->nextLXID = nextLocalTransactionId; 00356 00357 /* Mark myself inactive */ 00358 stateP->procPid = 0; 00359 stateP->proc = NULL; 00360 stateP->nextMsgNum = 0; 00361 stateP->resetState = false; 00362 stateP->signaled = false; 00363 00364 /* Recompute index of last active backend */ 00365 for (i = segP->lastBackend; i > 0; i--) 00366 { 00367 if (segP->procState[i - 1].procPid != 0) 00368 break; 00369 } 00370 segP->lastBackend = i; 00371 00372 LWLockRelease(SInvalWriteLock); 00373 } 00374 00375 /* 00376 * BackendIdGetProc 00377 * Get the PGPROC structure for a backend, given the backend ID. 00378 * The result may be out of date arbitrarily quickly, so the caller 00379 * must be careful about how this information is used. NULL is 00380 * returned if the backend is not active. 00381 */ 00382 PGPROC * 00383 BackendIdGetProc(int backendID) 00384 { 00385 PGPROC *result = NULL; 00386 SISeg *segP = shmInvalBuffer; 00387 00388 /* Need to lock out additions/removals of backends */ 00389 LWLockAcquire(SInvalWriteLock, LW_SHARED); 00390 00391 if (backendID > 0 && backendID <= segP->lastBackend) 00392 { 00393 ProcState *stateP = &segP->procState[backendID - 1]; 00394 00395 result = stateP->proc; 00396 } 00397 00398 LWLockRelease(SInvalWriteLock); 00399 00400 return result; 00401 } 00402 00403 /* 00404 * SIInsertDataEntries 00405 * Add new invalidation message(s) to the buffer. 00406 */ 00407 void 00408 SIInsertDataEntries(const SharedInvalidationMessage *data, int n) 00409 { 00410 SISeg *segP = shmInvalBuffer; 00411 00412 /* 00413 * N can be arbitrarily large. We divide the work into groups of no more 00414 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for 00415 * an unreasonably long time. (This is not so much because we care about 00416 * letting in other writers, as that some just-caught-up backend might be 00417 * trying to do SICleanupQueue to pass on its signal, and we don't want it 00418 * to have to wait a long time.) Also, we need to consider calling 00419 * SICleanupQueue every so often. 00420 */ 00421 while (n > 0) 00422 { 00423 int nthistime = Min(n, WRITE_QUANTUM); 00424 int numMsgs; 00425 int max; 00426 int i; 00427 00428 n -= nthistime; 00429 00430 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 00431 00432 /* 00433 * If the buffer is full, we *must* acquire some space. Clean the 00434 * queue and reset anyone who is preventing space from being freed. 00435 * Otherwise, clean the queue only when it's exceeded the next 00436 * fullness threshold. We have to loop and recheck the buffer state 00437 * after any call of SICleanupQueue. 00438 */ 00439 for (;;) 00440 { 00441 numMsgs = segP->maxMsgNum - segP->minMsgNum; 00442 if (numMsgs + nthistime > MAXNUMMESSAGES || 00443 numMsgs >= segP->nextThreshold) 00444 SICleanupQueue(true, nthistime); 00445 else 00446 break; 00447 } 00448 00449 /* 00450 * Insert new message(s) into proper slot of circular buffer 00451 */ 00452 max = segP->maxMsgNum; 00453 while (nthistime-- > 0) 00454 { 00455 segP->buffer[max % MAXNUMMESSAGES] = *data++; 00456 max++; 00457 } 00458 00459 /* Update current value of maxMsgNum using spinlock */ 00460 { 00461 /* use volatile pointer to prevent code rearrangement */ 00462 volatile SISeg *vsegP = segP; 00463 00464 SpinLockAcquire(&vsegP->msgnumLock); 00465 vsegP->maxMsgNum = max; 00466 SpinLockRelease(&vsegP->msgnumLock); 00467 } 00468 00469 /* 00470 * Now that the maxMsgNum change is globally visible, we give everyone 00471 * a swift kick to make sure they read the newly added messages. 00472 * Releasing SInvalWriteLock will enforce a full memory barrier, so 00473 * these (unlocked) changes will be committed to memory before we exit 00474 * the function. 00475 */ 00476 for (i = 0; i < segP->lastBackend; i++) 00477 { 00478 ProcState *stateP = &segP->procState[i]; 00479 00480 stateP->hasMessages = true; 00481 } 00482 00483 LWLockRelease(SInvalWriteLock); 00484 } 00485 } 00486 00487 /* 00488 * SIGetDataEntries 00489 * get next SI message(s) for current backend, if there are any 00490 * 00491 * Possible return values: 00492 * 0: no SI message available 00493 * n>0: next n SI messages have been extracted into data[] 00494 * -1: SI reset message extracted 00495 * 00496 * If the return value is less than the array size "datasize", the caller 00497 * can assume that there are no more SI messages after the one(s) returned. 00498 * Otherwise, another call is needed to collect more messages. 00499 * 00500 * NB: this can run in parallel with other instances of SIGetDataEntries 00501 * executing on behalf of other backends, since each instance will modify only 00502 * fields of its own backend's ProcState, and no instance will look at fields 00503 * of other backends' ProcStates. We express this by grabbing SInvalReadLock 00504 * in shared mode. Note that this is not exactly the normal (read-only) 00505 * interpretation of a shared lock! Look closely at the interactions before 00506 * allowing SInvalReadLock to be grabbed in shared mode for any other reason! 00507 * 00508 * NB: this can also run in parallel with SIInsertDataEntries. It is not 00509 * guaranteed that we will return any messages added after the routine is 00510 * entered. 00511 * 00512 * Note: we assume that "datasize" is not so large that it might be important 00513 * to break our hold on SInvalReadLock into segments. 00514 */ 00515 int 00516 SIGetDataEntries(SharedInvalidationMessage *data, int datasize) 00517 { 00518 SISeg *segP; 00519 ProcState *stateP; 00520 int max; 00521 int n; 00522 00523 segP = shmInvalBuffer; 00524 stateP = &segP->procState[MyBackendId - 1]; 00525 00526 /* 00527 * Before starting to take locks, do a quick, unlocked test to see whether 00528 * there can possibly be anything to read. On a multiprocessor system, 00529 * it's possible that this load could migrate backwards and occur before 00530 * we actually enter this function, so we might miss a sinval message that 00531 * was just added by some other processor. But they can't migrate 00532 * backwards over a preceding lock acquisition, so it should be OK. If we 00533 * haven't acquired a lock preventing against further relevant 00534 * invalidations, any such occurrence is not much different than if the 00535 * invalidation had arrived slightly later in the first place. 00536 */ 00537 if (!stateP->hasMessages) 00538 return 0; 00539 00540 LWLockAcquire(SInvalReadLock, LW_SHARED); 00541 00542 /* 00543 * We must reset hasMessages before determining how many messages we're 00544 * going to read. That way, if new messages arrive after we have 00545 * determined how many we're reading, the flag will get reset and we'll 00546 * notice those messages part-way through. 00547 * 00548 * Note that, if we don't end up reading all of the messages, we had 00549 * better be certain to reset this flag before exiting! 00550 */ 00551 stateP->hasMessages = false; 00552 00553 /* Fetch current value of maxMsgNum using spinlock */ 00554 { 00555 /* use volatile pointer to prevent code rearrangement */ 00556 volatile SISeg *vsegP = segP; 00557 00558 SpinLockAcquire(&vsegP->msgnumLock); 00559 max = vsegP->maxMsgNum; 00560 SpinLockRelease(&vsegP->msgnumLock); 00561 } 00562 00563 if (stateP->resetState) 00564 { 00565 /* 00566 * Force reset. We can say we have dealt with any messages added 00567 * since the reset, as well; and that means we should clear the 00568 * signaled flag, too. 00569 */ 00570 stateP->nextMsgNum = max; 00571 stateP->resetState = false; 00572 stateP->signaled = false; 00573 LWLockRelease(SInvalReadLock); 00574 return -1; 00575 } 00576 00577 /* 00578 * Retrieve messages and advance backend's counter, until data array is 00579 * full or there are no more messages. 00580 * 00581 * There may be other backends that haven't read the message(s), so we 00582 * cannot delete them here. SICleanupQueue() will eventually remove them 00583 * from the queue. 00584 */ 00585 n = 0; 00586 while (n < datasize && stateP->nextMsgNum < max) 00587 { 00588 data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; 00589 stateP->nextMsgNum++; 00590 } 00591 00592 /* 00593 * If we have caught up completely, reset our "signaled" flag so that 00594 * we'll get another signal if we fall behind again. 00595 * 00596 * If we haven't caught up completely, reset the hasMessages flag so that 00597 * we see the remaining messages next time. 00598 */ 00599 if (stateP->nextMsgNum >= max) 00600 stateP->signaled = false; 00601 else 00602 stateP->hasMessages = true; 00603 00604 LWLockRelease(SInvalReadLock); 00605 return n; 00606 } 00607 00608 /* 00609 * SICleanupQueue 00610 * Remove messages that have been consumed by all active backends 00611 * 00612 * callerHasWriteLock is TRUE if caller is holding SInvalWriteLock. 00613 * minFree is the minimum number of message slots to make free. 00614 * 00615 * Possible side effects of this routine include marking one or more 00616 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT 00617 * to some backend that seems to be getting too far behind. We signal at 00618 * most one backend at a time, for reasons explained at the top of the file. 00619 * 00620 * Caution: because we transiently release write lock when we have to signal 00621 * some other backend, it is NOT guaranteed that there are still minFree 00622 * free message slots at exit. Caller must recheck and perhaps retry. 00623 */ 00624 void 00625 SICleanupQueue(bool callerHasWriteLock, int minFree) 00626 { 00627 SISeg *segP = shmInvalBuffer; 00628 int min, 00629 minsig, 00630 lowbound, 00631 numMsgs, 00632 i; 00633 ProcState *needSig = NULL; 00634 00635 /* Lock out all writers and readers */ 00636 if (!callerHasWriteLock) 00637 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 00638 LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE); 00639 00640 /* 00641 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the 00642 * furthest-back backend that needs signaling (if any), and reset any 00643 * backends that are too far back. Note that because we ignore sendOnly 00644 * backends here it is possible for them to keep sending messages without 00645 * a problem even when they are the only active backend. 00646 */ 00647 min = segP->maxMsgNum; 00648 minsig = min - SIG_THRESHOLD; 00649 lowbound = min - MAXNUMMESSAGES + minFree; 00650 00651 for (i = 0; i < segP->lastBackend; i++) 00652 { 00653 ProcState *stateP = &segP->procState[i]; 00654 int n = stateP->nextMsgNum; 00655 00656 /* Ignore if inactive or already in reset state */ 00657 if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly) 00658 continue; 00659 00660 /* 00661 * If we must free some space and this backend is preventing it, force 00662 * him into reset state and then ignore until he catches up. 00663 */ 00664 if (n < lowbound) 00665 { 00666 stateP->resetState = true; 00667 /* no point in signaling him ... */ 00668 continue; 00669 } 00670 00671 /* Track the global minimum nextMsgNum */ 00672 if (n < min) 00673 min = n; 00674 00675 /* Also see who's furthest back of the unsignaled backends */ 00676 if (n < minsig && !stateP->signaled) 00677 { 00678 minsig = n; 00679 needSig = stateP; 00680 } 00681 } 00682 segP->minMsgNum = min; 00683 00684 /* 00685 * When minMsgNum gets really large, decrement all message counters so as 00686 * to forestall overflow of the counters. This happens seldom enough that 00687 * folding it into the previous loop would be a loser. 00688 */ 00689 if (min >= MSGNUMWRAPAROUND) 00690 { 00691 segP->minMsgNum -= MSGNUMWRAPAROUND; 00692 segP->maxMsgNum -= MSGNUMWRAPAROUND; 00693 for (i = 0; i < segP->lastBackend; i++) 00694 { 00695 /* we don't bother skipping inactive entries here */ 00696 segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; 00697 } 00698 } 00699 00700 /* 00701 * Determine how many messages are still in the queue, and set the 00702 * threshold at which we should repeat SICleanupQueue(). 00703 */ 00704 numMsgs = segP->maxMsgNum - segP->minMsgNum; 00705 if (numMsgs < CLEANUP_MIN) 00706 segP->nextThreshold = CLEANUP_MIN; 00707 else 00708 segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; 00709 00710 /* 00711 * Lastly, signal anyone who needs a catchup interrupt. Since 00712 * SendProcSignal() might not be fast, we don't want to hold locks while 00713 * executing it. 00714 */ 00715 if (needSig) 00716 { 00717 pid_t his_pid = needSig->procPid; 00718 BackendId his_backendId = (needSig - &segP->procState[0]) + 1; 00719 00720 needSig->signaled = true; 00721 LWLockRelease(SInvalReadLock); 00722 LWLockRelease(SInvalWriteLock); 00723 elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid); 00724 SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId); 00725 if (callerHasWriteLock) 00726 LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); 00727 } 00728 else 00729 { 00730 LWLockRelease(SInvalReadLock); 00731 if (!callerHasWriteLock) 00732 LWLockRelease(SInvalWriteLock); 00733 } 00734 } 00735 00736 00737 /* 00738 * GetNextLocalTransactionId --- allocate a new LocalTransactionId 00739 * 00740 * We split VirtualTransactionIds into two parts so that it is possible 00741 * to allocate a new one without any contention for shared memory, except 00742 * for a bit of additional overhead during backend startup/shutdown. 00743 * The high-order part of a VirtualTransactionId is a BackendId, and the 00744 * low-order part is a LocalTransactionId, which we assign from a local 00745 * counter. To avoid the risk of a VirtualTransactionId being reused 00746 * within a short interval, successive procs occupying the same backend ID 00747 * slot should use a consecutive sequence of local IDs, which is implemented 00748 * by copying nextLocalTransactionId as seen above. 00749 */ 00750 LocalTransactionId 00751 GetNextLocalTransactionId(void) 00752 { 00753 LocalTransactionId result; 00754 00755 /* loop to avoid returning InvalidLocalTransactionId at wraparound */ 00756 do 00757 { 00758 result = nextLocalTransactionId++; 00759 } while (!LocalTransactionIdIsValid(result)); 00760 00761 return result; 00762 }