Header And Logo

PostgreSQL
| The world's most advanced open source database.

walreceiverfuncs.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * walreceiverfuncs.c
00004  *
00005  * This file contains functions used by the startup process to communicate
00006  * with the walreceiver process. Functions implementing walreceiver itself
00007  * are in walreceiver.c.
00008  *
00009  * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
00010  *
00011  *
00012  * IDENTIFICATION
00013  *    src/backend/replication/walreceiverfuncs.c
00014  *
00015  *-------------------------------------------------------------------------
00016  */
00017 #include "postgres.h"
00018 
00019 #include <sys/types.h>
00020 #include <sys/stat.h>
00021 #include <sys/time.h>
00022 #include <time.h>
00023 #include <unistd.h>
00024 #include <signal.h>
00025 
00026 #include "access/xlog_internal.h"
00027 #include "postmaster/startup.h"
00028 #include "replication/walreceiver.h"
00029 #include "storage/pmsignal.h"
00030 #include "storage/shmem.h"
00031 #include "utils/timestamp.h"
00032 
/*
 * Pointer to the walreceiver control structure.  It starts out NULL and is
 * assigned by WalRcvShmemInit().
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait, in seconds, for walreceiver to start up after requesting
 * postmaster to launch it.  WalRcvRunning() and WalRcvStreaming() compare
 * the time elapsed since the recorded startTime against this limit.
 */
#define WALRCV_STARTUP_TIMEOUT 10
00040 
00041 /* Report shared memory space needed by WalRcvShmemInit */
00042 Size
00043 WalRcvShmemSize(void)
00044 {
00045     Size        size = 0;
00046 
00047     size = add_size(size, sizeof(WalRcvData));
00048 
00049     return size;
00050 }
00051 
00052 /* Allocate and initialize walreceiver-related shared memory */
00053 void
00054 WalRcvShmemInit(void)
00055 {
00056     bool        found;
00057 
00058     WalRcv = (WalRcvData *)
00059         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
00060 
00061     if (!found)
00062     {
00063         /* First time through, so initialize */
00064         MemSet(WalRcv, 0, WalRcvShmemSize());
00065         WalRcv->walRcvState = WALRCV_STOPPED;
00066         SpinLockInit(&WalRcv->mutex);
00067         InitSharedLatch(&WalRcv->latch);
00068     }
00069 }
00070 
00071 /* Is walreceiver running (or starting up)? */
00072 bool
00073 WalRcvRunning(void)
00074 {
00075     /* use volatile pointer to prevent code rearrangement */
00076     volatile WalRcvData *walrcv = WalRcv;
00077     WalRcvState state;
00078     pg_time_t   startTime;
00079 
00080     SpinLockAcquire(&walrcv->mutex);
00081 
00082     state = walrcv->walRcvState;
00083     startTime = walrcv->startTime;
00084 
00085     SpinLockRelease(&walrcv->mutex);
00086 
00087     /*
00088      * If it has taken too long for walreceiver to start up, give up. Setting
00089      * the state to STOPPED ensures that if walreceiver later does start up
00090      * after all, it will see that it's not supposed to be running and die
00091      * without doing anything.
00092      */
00093     if (state == WALRCV_STARTING)
00094     {
00095         pg_time_t   now = (pg_time_t) time(NULL);
00096 
00097         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
00098         {
00099             SpinLockAcquire(&walrcv->mutex);
00100 
00101             if (walrcv->walRcvState == WALRCV_STARTING)
00102                 state = walrcv->walRcvState = WALRCV_STOPPED;
00103 
00104             SpinLockRelease(&walrcv->mutex);
00105         }
00106     }
00107 
00108     if (state != WALRCV_STOPPED)
00109         return true;
00110     else
00111         return false;
00112 }
00113 
00114 /*
00115  * Is walreceiver running and streaming (or at least attempting to connect,
00116  * or starting up)?
00117  */
00118 bool
00119 WalRcvStreaming(void)
00120 {
00121     /* use volatile pointer to prevent code rearrangement */
00122     volatile WalRcvData *walrcv = WalRcv;
00123     WalRcvState state;
00124     pg_time_t   startTime;
00125 
00126     SpinLockAcquire(&walrcv->mutex);
00127 
00128     state = walrcv->walRcvState;
00129     startTime = walrcv->startTime;
00130 
00131     SpinLockRelease(&walrcv->mutex);
00132 
00133     /*
00134      * If it has taken too long for walreceiver to start up, give up. Setting
00135      * the state to STOPPED ensures that if walreceiver later does start up
00136      * after all, it will see that it's not supposed to be running and die
00137      * without doing anything.
00138      */
00139     if (state == WALRCV_STARTING)
00140     {
00141         pg_time_t   now = (pg_time_t) time(NULL);
00142 
00143         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
00144         {
00145             SpinLockAcquire(&walrcv->mutex);
00146 
00147             if (walrcv->walRcvState == WALRCV_STARTING)
00148                 state = walrcv->walRcvState = WALRCV_STOPPED;
00149 
00150             SpinLockRelease(&walrcv->mutex);
00151         }
00152     }
00153 
00154     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
00155         state == WALRCV_RESTARTING)
00156         return true;
00157     else
00158         return false;
00159 }
00160 
00161 /*
00162  * Stop walreceiver (if running) and wait for it to die.
00163  * Executed by the Startup process.
00164  */
00165 void
00166 ShutdownWalRcv(void)
00167 {
00168     /* use volatile pointer to prevent code rearrangement */
00169     volatile WalRcvData *walrcv = WalRcv;
00170     pid_t       walrcvpid = 0;
00171 
00172     /*
00173      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
00174      * mode once it's finished, and will also request postmaster to not
00175      * restart itself.
00176      */
00177     SpinLockAcquire(&walrcv->mutex);
00178     switch (walrcv->walRcvState)
00179     {
00180         case WALRCV_STOPPED:
00181             break;
00182         case WALRCV_STARTING:
00183             walrcv->walRcvState = WALRCV_STOPPED;
00184             break;
00185 
00186         case WALRCV_STREAMING:
00187         case WALRCV_WAITING:
00188         case WALRCV_RESTARTING:
00189             walrcv->walRcvState = WALRCV_STOPPING;
00190             /* fall through */
00191         case WALRCV_STOPPING:
00192             walrcvpid = walrcv->pid;
00193             break;
00194     }
00195     SpinLockRelease(&walrcv->mutex);
00196 
00197     /*
00198      * Signal walreceiver process if it was still running.
00199      */
00200     if (walrcvpid != 0)
00201         kill(walrcvpid, SIGTERM);
00202 
00203     /*
00204      * Wait for walreceiver to acknowledge its death by setting state to
00205      * WALRCV_STOPPED.
00206      */
00207     while (WalRcvRunning())
00208     {
00209         /*
00210          * This possibly-long loop needs to handle interrupts of startup
00211          * process.
00212          */
00213         HandleStartupProcInterrupts();
00214 
00215         pg_usleep(100000);      /* 100ms */
00216     }
00217 }
00218 
00219 /*
00220  * Request postmaster to start walreceiver.
00221  *
00222  * recptr indicates the position where streaming should begin, and conninfo
00223  * is a libpq connection string to use.
00224  */
00225 void
00226 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
00227 {
00228     /* use volatile pointer to prevent code rearrangement */
00229     volatile WalRcvData *walrcv = WalRcv;
00230     bool        launch = false;
00231     pg_time_t   now = (pg_time_t) time(NULL);
00232 
00233     /*
00234      * We always start at the beginning of the segment. That prevents a broken
00235      * segment (i.e., with no records in the first half of a segment) from
00236      * being created by XLOG streaming, which might cause trouble later on if
00237      * the segment is e.g archived.
00238      */
00239     if (recptr % XLogSegSize != 0)
00240         recptr -= recptr % XLogSegSize;
00241 
00242     SpinLockAcquire(&walrcv->mutex);
00243 
00244     /* It better be stopped if we try to restart it */
00245     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
00246            walrcv->walRcvState == WALRCV_WAITING);
00247 
00248     if (conninfo != NULL)
00249         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
00250     else
00251         walrcv->conninfo[0] = '\0';
00252 
00253     if (walrcv->walRcvState == WALRCV_STOPPED)
00254     {
00255         launch = true;
00256         walrcv->walRcvState = WALRCV_STARTING;
00257     }
00258     else
00259         walrcv->walRcvState = WALRCV_RESTARTING;
00260     walrcv->startTime = now;
00261 
00262     /*
00263      * If this is the first startup of walreceiver, we initialize receivedUpto
00264      * and latestChunkStart to receiveStart.
00265      */
00266     if (walrcv->receiveStart == 0)
00267     {
00268         walrcv->receivedUpto = recptr;
00269         walrcv->latestChunkStart = recptr;
00270     }
00271     walrcv->receiveStart = recptr;
00272     walrcv->receiveStartTLI = tli;
00273 
00274     SpinLockRelease(&walrcv->mutex);
00275 
00276     if (launch)
00277         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
00278     else
00279         SetLatch(&walrcv->latch);
00280 }
00281 
00282 /*
00283  * Returns the last+1 byte position that walreceiver has written.
00284  *
00285  * Optionally, returns the previous chunk start, that is the first byte
00286  * written in the most recent walreceiver flush cycle.  Callers not
00287  * interested in that value may pass NULL for latestChunkStart. Same for
00288  * receiveTLI.
00289  */
00290 XLogRecPtr
00291 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
00292 {
00293     /* use volatile pointer to prevent code rearrangement */
00294     volatile WalRcvData *walrcv = WalRcv;
00295     XLogRecPtr  recptr;
00296 
00297     SpinLockAcquire(&walrcv->mutex);
00298     recptr = walrcv->receivedUpto;
00299     if (latestChunkStart)
00300         *latestChunkStart = walrcv->latestChunkStart;
00301     if (receiveTLI)
00302         *receiveTLI = walrcv->receivedTLI;
00303     SpinLockRelease(&walrcv->mutex);
00304 
00305     return recptr;
00306 }
00307 
00308 /*
00309  * Returns the replication apply delay in ms
00310  */
00311 int
00312 GetReplicationApplyDelay(void)
00313 {
00314     /* use volatile pointer to prevent code rearrangement */
00315     volatile WalRcvData *walrcv = WalRcv;
00316 
00317     XLogRecPtr  receivePtr;
00318     XLogRecPtr  replayPtr;
00319 
00320     long        secs;
00321     int         usecs;
00322 
00323     SpinLockAcquire(&walrcv->mutex);
00324     receivePtr = walrcv->receivedUpto;
00325     SpinLockRelease(&walrcv->mutex);
00326 
00327     replayPtr = GetXLogReplayRecPtr(NULL);
00328 
00329     if (receivePtr == replayPtr)
00330         return 0;
00331 
00332     TimestampDifference(GetCurrentChunkReplayStartTime(),
00333                         GetCurrentTimestamp(),
00334                         &secs, &usecs);
00335 
00336     return (((int) secs * 1000) + (usecs / 1000));
00337 }
00338 
00339 /*
00340  * Returns the network latency in ms, note that this includes any
00341  * difference in clock settings between the servers, as well as timezone.
00342  */
00343 int
00344 GetReplicationTransferLatency(void)
00345 {
00346     /* use volatile pointer to prevent code rearrangement */
00347     volatile WalRcvData *walrcv = WalRcv;
00348 
00349     TimestampTz lastMsgSendTime;
00350     TimestampTz lastMsgReceiptTime;
00351 
00352     long        secs = 0;
00353     int         usecs = 0;
00354     int         ms;
00355 
00356     SpinLockAcquire(&walrcv->mutex);
00357     lastMsgSendTime = walrcv->lastMsgSendTime;
00358     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
00359     SpinLockRelease(&walrcv->mutex);
00360 
00361     TimestampDifference(lastMsgSendTime,
00362                         lastMsgReceiptTime,
00363                         &secs, &usecs);
00364 
00365     ms = ((int) secs * 1000) + (usecs / 1000);
00366 
00367     return ms;
00368 }