Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "postgres.h"
00018
00019 #include <sys/types.h>
00020 #include <sys/stat.h>
00021 #include <sys/time.h>
00022 #include <time.h>
00023 #include <unistd.h>
00024 #include <signal.h>
00025
00026 #include "access/xlog_internal.h"
00027 #include "postmaster/startup.h"
00028 #include "replication/walreceiver.h"
00029 #include "storage/pmsignal.h"
00030 #include "storage/shmem.h"
00031 #include "utils/timestamp.h"
00032
00033 WalRcvData *WalRcv = NULL;
00034
00035
00036
00037
00038
00039 #define WALRCV_STARTUP_TIMEOUT 10
00040
00041
00042 Size
00043 WalRcvShmemSize(void)
00044 {
00045 Size size = 0;
00046
00047 size = add_size(size, sizeof(WalRcvData));
00048
00049 return size;
00050 }
00051
00052
00053 void
00054 WalRcvShmemInit(void)
00055 {
00056 bool found;
00057
00058 WalRcv = (WalRcvData *)
00059 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
00060
00061 if (!found)
00062 {
00063
00064 MemSet(WalRcv, 0, WalRcvShmemSize());
00065 WalRcv->walRcvState = WALRCV_STOPPED;
00066 SpinLockInit(&WalRcv->mutex);
00067 InitSharedLatch(&WalRcv->latch);
00068 }
00069 }
00070
00071
00072 bool
00073 WalRcvRunning(void)
00074 {
00075
00076 volatile WalRcvData *walrcv = WalRcv;
00077 WalRcvState state;
00078 pg_time_t startTime;
00079
00080 SpinLockAcquire(&walrcv->mutex);
00081
00082 state = walrcv->walRcvState;
00083 startTime = walrcv->startTime;
00084
00085 SpinLockRelease(&walrcv->mutex);
00086
00087
00088
00089
00090
00091
00092
00093 if (state == WALRCV_STARTING)
00094 {
00095 pg_time_t now = (pg_time_t) time(NULL);
00096
00097 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
00098 {
00099 SpinLockAcquire(&walrcv->mutex);
00100
00101 if (walrcv->walRcvState == WALRCV_STARTING)
00102 state = walrcv->walRcvState = WALRCV_STOPPED;
00103
00104 SpinLockRelease(&walrcv->mutex);
00105 }
00106 }
00107
00108 if (state != WALRCV_STOPPED)
00109 return true;
00110 else
00111 return false;
00112 }
00113
00114
00115
00116
00117
00118 bool
00119 WalRcvStreaming(void)
00120 {
00121
00122 volatile WalRcvData *walrcv = WalRcv;
00123 WalRcvState state;
00124 pg_time_t startTime;
00125
00126 SpinLockAcquire(&walrcv->mutex);
00127
00128 state = walrcv->walRcvState;
00129 startTime = walrcv->startTime;
00130
00131 SpinLockRelease(&walrcv->mutex);
00132
00133
00134
00135
00136
00137
00138
00139 if (state == WALRCV_STARTING)
00140 {
00141 pg_time_t now = (pg_time_t) time(NULL);
00142
00143 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
00144 {
00145 SpinLockAcquire(&walrcv->mutex);
00146
00147 if (walrcv->walRcvState == WALRCV_STARTING)
00148 state = walrcv->walRcvState = WALRCV_STOPPED;
00149
00150 SpinLockRelease(&walrcv->mutex);
00151 }
00152 }
00153
00154 if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
00155 state == WALRCV_RESTARTING)
00156 return true;
00157 else
00158 return false;
00159 }
00160
00161
00162
00163
00164
00165 void
00166 ShutdownWalRcv(void)
00167 {
00168
00169 volatile WalRcvData *walrcv = WalRcv;
00170 pid_t walrcvpid = 0;
00171
00172
00173
00174
00175
00176
00177 SpinLockAcquire(&walrcv->mutex);
00178 switch (walrcv->walRcvState)
00179 {
00180 case WALRCV_STOPPED:
00181 break;
00182 case WALRCV_STARTING:
00183 walrcv->walRcvState = WALRCV_STOPPED;
00184 break;
00185
00186 case WALRCV_STREAMING:
00187 case WALRCV_WAITING:
00188 case WALRCV_RESTARTING:
00189 walrcv->walRcvState = WALRCV_STOPPING;
00190
00191 case WALRCV_STOPPING:
00192 walrcvpid = walrcv->pid;
00193 break;
00194 }
00195 SpinLockRelease(&walrcv->mutex);
00196
00197
00198
00199
00200 if (walrcvpid != 0)
00201 kill(walrcvpid, SIGTERM);
00202
00203
00204
00205
00206
00207 while (WalRcvRunning())
00208 {
00209
00210
00211
00212
00213 HandleStartupProcInterrupts();
00214
00215 pg_usleep(100000);
00216 }
00217 }
00218
00219
00220
00221
00222
00223
00224
00225 void
00226 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
00227 {
00228
00229 volatile WalRcvData *walrcv = WalRcv;
00230 bool launch = false;
00231 pg_time_t now = (pg_time_t) time(NULL);
00232
00233
00234
00235
00236
00237
00238
00239 if (recptr % XLogSegSize != 0)
00240 recptr -= recptr % XLogSegSize;
00241
00242 SpinLockAcquire(&walrcv->mutex);
00243
00244
00245 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
00246 walrcv->walRcvState == WALRCV_WAITING);
00247
00248 if (conninfo != NULL)
00249 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
00250 else
00251 walrcv->conninfo[0] = '\0';
00252
00253 if (walrcv->walRcvState == WALRCV_STOPPED)
00254 {
00255 launch = true;
00256 walrcv->walRcvState = WALRCV_STARTING;
00257 }
00258 else
00259 walrcv->walRcvState = WALRCV_RESTARTING;
00260 walrcv->startTime = now;
00261
00262
00263
00264
00265
00266 if (walrcv->receiveStart == 0)
00267 {
00268 walrcv->receivedUpto = recptr;
00269 walrcv->latestChunkStart = recptr;
00270 }
00271 walrcv->receiveStart = recptr;
00272 walrcv->receiveStartTLI = tli;
00273
00274 SpinLockRelease(&walrcv->mutex);
00275
00276 if (launch)
00277 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
00278 else
00279 SetLatch(&walrcv->latch);
00280 }
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290 XLogRecPtr
00291 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
00292 {
00293
00294 volatile WalRcvData *walrcv = WalRcv;
00295 XLogRecPtr recptr;
00296
00297 SpinLockAcquire(&walrcv->mutex);
00298 recptr = walrcv->receivedUpto;
00299 if (latestChunkStart)
00300 *latestChunkStart = walrcv->latestChunkStart;
00301 if (receiveTLI)
00302 *receiveTLI = walrcv->receivedTLI;
00303 SpinLockRelease(&walrcv->mutex);
00304
00305 return recptr;
00306 }
00307
00308
00309
00310
00311 int
00312 GetReplicationApplyDelay(void)
00313 {
00314
00315 volatile WalRcvData *walrcv = WalRcv;
00316
00317 XLogRecPtr receivePtr;
00318 XLogRecPtr replayPtr;
00319
00320 long secs;
00321 int usecs;
00322
00323 SpinLockAcquire(&walrcv->mutex);
00324 receivePtr = walrcv->receivedUpto;
00325 SpinLockRelease(&walrcv->mutex);
00326
00327 replayPtr = GetXLogReplayRecPtr(NULL);
00328
00329 if (receivePtr == replayPtr)
00330 return 0;
00331
00332 TimestampDifference(GetCurrentChunkReplayStartTime(),
00333 GetCurrentTimestamp(),
00334 &secs, &usecs);
00335
00336 return (((int) secs * 1000) + (usecs / 1000));
00337 }
00338
00339
00340
00341
00342
00343 int
00344 GetReplicationTransferLatency(void)
00345 {
00346
00347 volatile WalRcvData *walrcv = WalRcv;
00348
00349 TimestampTz lastMsgSendTime;
00350 TimestampTz lastMsgReceiptTime;
00351
00352 long secs = 0;
00353 int usecs = 0;
00354 int ms;
00355
00356 SpinLockAcquire(&walrcv->mutex);
00357 lastMsgSendTime = walrcv->lastMsgSendTime;
00358 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
00359 SpinLockRelease(&walrcv->mutex);
00360
00361 TimestampDifference(lastMsgSendTime,
00362 lastMsgReceiptTime,
00363 &secs, &usecs);
00364
00365 ms = ((int) secs * 1000) + (usecs / 1000);
00366
00367 return ms;
00368 }