Header And Logo

PostgreSQL — The world's most advanced open source database.

walreceiver.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * walreceiver.c
00004  *
00005  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
00006  * is the process in the standby server that takes charge of receiving
00007  * XLOG records from a primary server during streaming replication.
00008  *
00009  * When the startup process determines that it's time to start streaming,
00010  * it instructs postmaster to start walreceiver. Walreceiver first connects
00011  * to the primary server (it will be served by a walsender process
00012  * in the primary server), and then keeps receiving XLOG records and
00013  * writing them to the disk as long as the connection is alive. As XLOG
00014  * records are received and flushed to disk, it updates the
00015  * WalRcv->receivedUpto variable in shared memory, to inform the startup
00016  * process of how far it can proceed with XLOG replay.
00017  *
00018  * If the primary server ends streaming, but doesn't disconnect, walreceiver
00019  * goes into "waiting" mode, and waits for the startup process to give new
00020  * instructions. The startup process will treat that the same as
00021  * disconnection, and will rescan the archive/pg_xlog directory. But when the
00022  * startup process wants to try streaming replication again, it will just
00023  * nudge the existing walreceiver process that's waiting, instead of launching
00024  * a new one.
00025  *
00026  * Normal termination is by SIGTERM, which instructs the walreceiver to
00027  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
00028  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
00029  * of the connection and a FATAL error are treated not as a crash but as
00030  * normal operation.
00031  *
00032  * This file contains the server-facing parts of walreceiver. The libpq-
00033  * specific parts are in the libpqwalreceiver module. It's loaded
00034  * dynamically to avoid linking the server with libpq.
00035  *
00036  * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
00037  *
00038  *
00039  * IDENTIFICATION
00040  *    src/backend/replication/walreceiver.c
00041  *
00042  *-------------------------------------------------------------------------
00043  */
00044 #include "postgres.h"
00045 
00046 #include <signal.h>
00047 #include <unistd.h>
00048 
00049 #include "access/timeline.h"
00050 #include "access/transam.h"
00051 #include "access/xlog_internal.h"
00052 #include "libpq/pqformat.h"
00053 #include "libpq/pqsignal.h"
00054 #include "miscadmin.h"
00055 #include "replication/walreceiver.h"
00056 #include "replication/walsender.h"
00057 #include "storage/ipc.h"
00058 #include "storage/pmsignal.h"
00059 #include "storage/procarray.h"
00060 #include "utils/guc.h"
00061 #include "utils/ps_status.h"
00062 #include "utils/resowner.h"
00063 #include "utils/timestamp.h"
00064 
00065 
00066 /* GUC variables */
00067 int         wal_receiver_status_interval;
00068 int         wal_receiver_timeout;
00069 bool        hot_standby_feedback;
00070 
00071 /* libpqreceiver hooks to these when loaded */
00072 walrcv_connect_type walrcv_connect = NULL;
00073 walrcv_identify_system_type walrcv_identify_system = NULL;
00074 walrcv_startstreaming_type walrcv_startstreaming = NULL;
00075 walrcv_endstreaming_type walrcv_endstreaming = NULL;
00076 walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
00077 walrcv_receive_type walrcv_receive = NULL;
00078 walrcv_send_type walrcv_send = NULL;
00079 walrcv_disconnect_type walrcv_disconnect = NULL;
00080 
00081 #define NAPTIME_PER_CYCLE 100   /* max sleep time between cycles (100ms) */
00082 
00083 /*
00084  * These variables are used similarly to openLogFile/SegNo/Off,
00085  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
00086  * corresponding the filename of recvFile.
00087  */
00088 static int  recvFile = -1;
00089 static TimeLineID   recvFileTLI = 0;
00090 static XLogSegNo recvSegNo = 0;
00091 static uint32 recvOff = 0;
00092 
00093 /*
00094  * Flags set by interrupt handlers of walreceiver for later service in the
00095  * main loop.
00096  */
00097 static volatile sig_atomic_t got_SIGHUP = false;
00098 static volatile sig_atomic_t got_SIGTERM = false;
00099 
00100 /*
00101  * LogstreamResult indicates the byte positions that we have already
00102  * written/fsynced.
00103  */
00104 static struct
00105 {
00106     XLogRecPtr  Write;          /* last byte + 1 written out in the standby */
00107     XLogRecPtr  Flush;          /* last byte + 1 flushed in the standby */
00108 }   LogstreamResult;
00109 
00110 static StringInfoData   reply_message;
00111 static StringInfoData   incoming_message;
00112 
00113 /*
00114  * About SIGTERM handling:
00115  *
00116  * We can't just exit(1) within SIGTERM signal handler, because the signal
00117  * might arrive in the middle of some critical operation, like while we're
00118  * holding a spinlock. We also can't just set a flag in signal handler and
00119  * check it in the main loop, because we perform some blocking operations
00120  * like libpqrcv_PQexec(), which can take a long time to finish.
00121  *
00122  * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
00123  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
00124  * sets got_SIGTERM flag, which is checked in the main loop when convenient.
00125  *
00126  * This is very much like what regular backends do with ImmediateInterruptOK,
00127  * ProcessInterrupts() etc.
00128  */
00129 static volatile bool WalRcvImmediateInterruptOK = false;
00130 
00131 /* Prototypes for private functions */
00132 static void ProcessWalRcvInterrupts(void);
00133 static void EnableWalRcvImmediateExit(void);
00134 static void DisableWalRcvImmediateExit(void);
00135 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
00136 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
00137 static void WalRcvDie(int code, Datum arg);
00138 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
00139 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
00140 static void XLogWalRcvFlush(bool dying);
00141 static void XLogWalRcvSendReply(bool force, bool requestReply);
00142 static void XLogWalRcvSendHSFeedback(bool immed);
00143 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
00144 
00145 /* Signal handlers */
00146 static void WalRcvSigHupHandler(SIGNAL_ARGS);
00147 static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
00148 static void WalRcvShutdownHandler(SIGNAL_ARGS);
00149 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
00150 
00151 
00152 static void
00153 ProcessWalRcvInterrupts(void)
00154 {
00155     /*
00156      * Although walreceiver interrupt handling doesn't use the same scheme as
00157      * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
00158      * any incoming signals on Win32.
00159      */
00160     CHECK_FOR_INTERRUPTS();
00161 
00162     if (got_SIGTERM)
00163     {
00164         WalRcvImmediateInterruptOK = false;
00165         ereport(FATAL,
00166                 (errcode(ERRCODE_ADMIN_SHUTDOWN),
00167                  errmsg("terminating walreceiver process due to administrator command")));
00168     }
00169 }
00170 
00171 static void
00172 EnableWalRcvImmediateExit(void)
00173 {
00174     WalRcvImmediateInterruptOK = true;
00175     ProcessWalRcvInterrupts();
00176 }
00177 
00178 static void
00179 DisableWalRcvImmediateExit(void)
00180 {
00181     WalRcvImmediateInterruptOK = false;
00182     ProcessWalRcvInterrupts();
00183 }
00184 
00185 /* Main entry point for walreceiver process */
00186 void
00187 WalReceiverMain(void)
00188 {
00189     char        conninfo[MAXCONNINFO];
00190     XLogRecPtr  startpoint;
00191     TimeLineID  startpointTLI;
00192     TimeLineID  primaryTLI;
00193     bool        first_stream;
00194 
00195     /* use volatile pointer to prevent code rearrangement */
00196     volatile WalRcvData *walrcv = WalRcv;
00197     TimestampTz last_recv_timestamp;
00198     bool        ping_sent;
00199 
00200     /*
00201      * WalRcv should be set up already (if we are a backend, we inherit this
00202      * by fork() or EXEC_BACKEND mechanism from the postmaster).
00203      */
00204     Assert(walrcv != NULL);
00205 
00206     /*
00207      * Mark walreceiver as running in shared memory.
00208      *
00209      * Do this as early as possible, so that if we fail later on, we'll set
00210      * state to STOPPED. If we die before this, the startup process will keep
00211      * waiting for us to start up, until it times out.
00212      */
00213     SpinLockAcquire(&walrcv->mutex);
00214     Assert(walrcv->pid == 0);
00215     switch (walrcv->walRcvState)
00216     {
00217         case WALRCV_STOPPING:
00218             /* If we've already been requested to stop, don't start up. */
00219             walrcv->walRcvState = WALRCV_STOPPED;
00220             /* fall through */
00221 
00222         case WALRCV_STOPPED:
00223             SpinLockRelease(&walrcv->mutex);
00224             proc_exit(1);
00225             break;
00226 
00227         case WALRCV_STARTING:
00228             /* The usual case */
00229             break;
00230 
00231         case WALRCV_WAITING:
00232         case WALRCV_STREAMING:
00233         case WALRCV_RESTARTING:
00234         default:
00235             /* Shouldn't happen */
00236             elog(PANIC, "walreceiver still running according to shared memory state");
00237     }
00238     /* Advertise our PID so that the startup process can kill us */
00239     walrcv->pid = MyProcPid;
00240     walrcv->walRcvState = WALRCV_STREAMING;
00241 
00242     /* Fetch information required to start streaming */
00243     strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
00244     startpoint = walrcv->receiveStart;
00245     startpointTLI = walrcv->receiveStartTLI;
00246 
00247     /* Initialise to a sanish value */
00248     walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
00249 
00250     SpinLockRelease(&walrcv->mutex);
00251 
00252     /* Arrange to clean up at walreceiver exit */
00253     on_shmem_exit(WalRcvDie, 0);
00254 
00255     OwnLatch(&walrcv->latch);
00256 
00257     /*
00258      * If possible, make this process a group leader, so that the postmaster
00259      * can signal any child processes too.  (walreceiver probably never has
00260      * any child processes, but for consistency we make all postmaster child
00261      * processes do this.)
00262      */
00263 #ifdef HAVE_SETSID
00264     if (setsid() < 0)
00265         elog(FATAL, "setsid() failed: %m");
00266 #endif
00267 
00268     /* Properly accept or ignore signals the postmaster might send us */
00269     pqsignal(SIGHUP, WalRcvSigHupHandler);      /* set flag to read config
00270                                                  * file */
00271     pqsignal(SIGINT, SIG_IGN);
00272     pqsignal(SIGTERM, WalRcvShutdownHandler);   /* request shutdown */
00273     pqsignal(SIGQUIT, WalRcvQuickDieHandler);   /* hard crash time */
00274     pqsignal(SIGALRM, SIG_IGN);
00275     pqsignal(SIGPIPE, SIG_IGN);
00276     pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
00277     pqsignal(SIGUSR2, SIG_IGN);
00278 
00279     /* Reset some signals that are accepted by postmaster but not here */
00280     pqsignal(SIGCHLD, SIG_DFL);
00281     pqsignal(SIGTTIN, SIG_DFL);
00282     pqsignal(SIGTTOU, SIG_DFL);
00283     pqsignal(SIGCONT, SIG_DFL);
00284     pqsignal(SIGWINCH, SIG_DFL);
00285 
00286     /* We allow SIGQUIT (quickdie) at all times */
00287     sigdelset(&BlockSig, SIGQUIT);
00288 
00289     /* Load the libpq-specific functions */
00290     load_file("libpqwalreceiver", false);
00291     if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
00292         walrcv_endstreaming == NULL ||
00293         walrcv_identify_system == NULL ||
00294         walrcv_readtimelinehistoryfile == NULL ||
00295         walrcv_receive == NULL || walrcv_send == NULL ||
00296         walrcv_disconnect == NULL)
00297         elog(ERROR, "libpqwalreceiver didn't initialize correctly");
00298 
00299     /*
00300      * Create a resource owner to keep track of our resources (not clear that
00301      * we need this, but may as well have one).
00302      */
00303     CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
00304 
00305     /* Unblock signals (they were blocked when the postmaster forked us) */
00306     PG_SETMASK(&UnBlockSig);
00307 
00308     /* Establish the connection to the primary for XLOG streaming */
00309     EnableWalRcvImmediateExit();
00310     walrcv_connect(conninfo);
00311     DisableWalRcvImmediateExit();
00312 
00313     first_stream = true;
00314     for (;;)
00315     {
00316         /*
00317          * Check that we're connected to a valid server using the
00318          * IDENTIFY_SYSTEM replication command,
00319          */
00320         EnableWalRcvImmediateExit();
00321         walrcv_identify_system(&primaryTLI);
00322         DisableWalRcvImmediateExit();
00323 
00324         /*
00325          * Confirm that the current timeline of the primary is the same or
00326          * ahead of ours.
00327          */
00328         if (primaryTLI < startpointTLI)
00329             ereport(ERROR,
00330                     (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
00331                             primaryTLI, startpointTLI)));
00332 
00333         /*
00334          * Get any missing history files. We do this always, even when we're
00335          * not interested in that timeline, so that if we're promoted to become
00336          * the master later on, we don't select the same timeline that was
00337          * already used in the current master. This isn't bullet-proof - you'll
00338          * need some external software to manage your cluster if you need to
00339          * ensure that a unique timeline id is chosen in every case, but let's
00340          * avoid the confusion of timeline id collisions where we can.
00341          */
00342         WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
00343 
00344         /*
00345          * Start streaming.
00346          *
00347          * We'll try to start at the requested starting point and timeline,
00348          * even if it's different from the server's latest timeline. In case
00349          * we've already reached the end of the old timeline, the server will
00350          * finish the streaming immediately, and we will go back to await
00351          * orders from the startup process. If recovery_target_timeline is
00352          * 'latest', the startup process will scan pg_xlog and find the new
00353          * history file, bump recovery target timeline, and ask us to restart
00354          * on the new timeline.
00355          */
00356         ThisTimeLineID = startpointTLI;
00357         if (walrcv_startstreaming(startpointTLI, startpoint))
00358         {
00359             bool endofwal = false;
00360 
00361             if (first_stream)
00362                 ereport(LOG,
00363                         (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
00364                                 (uint32) (startpoint >> 32), (uint32) startpoint,
00365                                 startpointTLI)));
00366             else
00367                 ereport(LOG,
00368                         (errmsg("restarted WAL streaming at %X/%X on timeline %u",
00369                                 (uint32) (startpoint >> 32), (uint32) startpoint,
00370                                 startpointTLI)));
00371             first_stream = false;
00372 
00373             /* Initialize LogstreamResult and buffers for processing messages */
00374             LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
00375             initStringInfo(&reply_message);
00376             initStringInfo(&incoming_message);
00377 
00378             /* Initialize the last recv timestamp */
00379             last_recv_timestamp = GetCurrentTimestamp();
00380             ping_sent = false;
00381 
00382             /* Loop until end-of-streaming or error */
00383             while (!endofwal)
00384             {
00385                 char       *buf;
00386                 int         len;
00387 
00388                 /*
00389                  * Emergency bailout if postmaster has died.  This is to avoid
00390                  * the necessity for manual cleanup of all postmaster children.
00391                  */
00392                 if (!PostmasterIsAlive())
00393                     exit(1);
00394 
00395                 /*
00396                  * Exit walreceiver if we're not in recovery. This should not
00397                  * happen, but cross-check the status here.
00398                  */
00399                 if (!RecoveryInProgress())
00400                     ereport(FATAL,
00401                             (errmsg("cannot continue WAL streaming, recovery has already ended")));
00402 
00403                 /* Process any requests or signals received recently */
00404                 ProcessWalRcvInterrupts();
00405 
00406                 if (got_SIGHUP)
00407                 {
00408                     got_SIGHUP = false;
00409                     ProcessConfigFile(PGC_SIGHUP);
00410                     XLogWalRcvSendHSFeedback(true);
00411                 }
00412 
00413                 /* Wait a while for data to arrive */
00414                 len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
00415                 if (len != 0)
00416                 {
00417                     /*
00418                      * Process the received data, and any subsequent data we
00419                      * can read without blocking.
00420                      */
00421                     for (;;)
00422                     {
00423                         if (len > 0)
00424                         {
00425                             /* Something was received from master, so reset timeout */
00426                             last_recv_timestamp = GetCurrentTimestamp();
00427                             ping_sent = false;
00428                             XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
00429                         }
00430                         else if (len == 0)
00431                             break;
00432                         else if (len < 0)
00433                         {
00434                             ereport(LOG,
00435                                     (errmsg("replication terminated by primary server"),
00436                                      errdetail("End of WAL reached on timeline %u at %X/%X",
00437                                                startpointTLI,
00438                                                (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
00439                             endofwal = true;
00440                             break;
00441                         }
00442                         len = walrcv_receive(0, &buf);
00443                     }
00444 
00445                     /* Let the master know that we received some data. */
00446                     XLogWalRcvSendReply(false, false);
00447 
00448                     /*
00449                      * If we've written some records, flush them to disk and
00450                      * let the startup process and primary server know about
00451                      * them.
00452                      */
00453                     XLogWalRcvFlush(false);
00454                 }
00455                 else
00456                 {
00457                     /*
00458                      * We didn't receive anything new. If we haven't heard
00459                      * anything from the server for more than
00460                      * wal_receiver_timeout / 2, ping the server. Also, if it's
00461                      * been longer than wal_receiver_status_interval since the
00462                      * last update we sent, send a status update to the master
00463                      * anyway, to report any progress in applying WAL.
00464                      */
00465                     bool requestReply = false;
00466 
00467                     /*
00468                      * Check if time since last receive from standby has
00469                      * reached the configured limit.
00470                      */
00471                     if (wal_receiver_timeout > 0)
00472                     {
00473                         TimestampTz now = GetCurrentTimestamp();
00474                         TimestampTz timeout;
00475 
00476                         timeout =
00477                             TimestampTzPlusMilliseconds(last_recv_timestamp,
00478                                                         wal_receiver_timeout);
00479 
00480                         if (now >= timeout)
00481                             ereport(ERROR,
00482                                     (errmsg("terminating walreceiver due to timeout")));
00483 
00484                         /*
00485                          * We didn't receive anything new, for half of receiver
00486                          * replication timeout. Ping the server.
00487                          */
00488                         if (!ping_sent)
00489                         {
00490                             timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
00491                                                                   (wal_receiver_timeout/2));
00492                             if (now >= timeout)
00493                             {
00494                                 requestReply = true;
00495                                 ping_sent = true;
00496                             }
00497                         }
00498                     }
00499 
00500                     XLogWalRcvSendReply(requestReply, requestReply);
00501                     XLogWalRcvSendHSFeedback(false);
00502                 }
00503             }
00504 
00505             /*
00506              * The backend finished streaming. Exit streaming COPY-mode from
00507              * our side, too.
00508              */
00509             EnableWalRcvImmediateExit();
00510             walrcv_endstreaming(&primaryTLI);
00511             DisableWalRcvImmediateExit();
00512 
00513             /*
00514              * If the server had switched to a new timeline that we didn't know
00515              * about when we began streaming, fetch its timeline history file
00516              * now.
00517              */
00518             WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
00519         }
00520         else
00521             ereport(LOG,
00522                     (errmsg("primary server contains no more WAL on requested timeline %u",
00523                             startpointTLI)));
00524 
00525         /*
00526          * End of WAL reached on the requested timeline. Close the last
00527          * segment, and await for new orders from the startup process.
00528          */
00529         if (recvFile >= 0)
00530         {
00531             char        xlogfname[MAXFNAMELEN];
00532 
00533             XLogWalRcvFlush(false);
00534             if (close(recvFile) != 0)
00535                 ereport(PANIC,
00536                         (errcode_for_file_access(),
00537                          errmsg("could not close log segment %s: %m",
00538                                 XLogFileNameP(recvFileTLI, recvSegNo))));
00539 
00540             /*
00541              * Create .done file forcibly to prevent the streamed segment from
00542              * being archived later.
00543              */
00544             XLogFileName(xlogfname, recvFileTLI, recvSegNo);
00545             XLogArchiveForceDone(xlogfname);
00546         }
00547         recvFile = -1;
00548 
00549         elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
00550         WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
00551     }
00552     /* not reached */
00553 }
00554 
00555 /*
00556  * Wait for startup process to set receiveStart and receiveStartTLI.
00557  */
00558 static void
00559 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
00560 {
00561     /* use volatile pointer to prevent code rearrangement */
00562     volatile WalRcvData *walrcv = WalRcv;
00563     int         state;
00564 
00565     SpinLockAcquire(&walrcv->mutex);
00566     state = walrcv->walRcvState;
00567     if (state != WALRCV_STREAMING)
00568     {
00569         SpinLockRelease(&walrcv->mutex);
00570         if (state == WALRCV_STOPPING)
00571             proc_exit(0);
00572         else
00573             elog(FATAL, "unexpected walreceiver state");
00574     }
00575     walrcv->walRcvState = WALRCV_WAITING;
00576     walrcv->receiveStart = InvalidXLogRecPtr;
00577     walrcv->receiveStartTLI = 0;
00578     SpinLockRelease(&walrcv->mutex);
00579 
00580     if (update_process_title)
00581         set_ps_display("idle", false);
00582 
00583     /*
00584      * nudge startup process to notice that we've stopped streaming and are
00585      * now waiting for instructions.
00586      */
00587     WakeupRecovery();
00588     for (;;)
00589     {
00590         ResetLatch(&walrcv->latch);
00591 
00592         /*
00593          * Emergency bailout if postmaster has died.  This is to avoid the
00594          * necessity for manual cleanup of all postmaster children.
00595          */
00596         if (!PostmasterIsAlive())
00597             exit(1);
00598 
00599         ProcessWalRcvInterrupts();
00600 
00601         SpinLockAcquire(&walrcv->mutex);
00602         Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
00603                walrcv->walRcvState == WALRCV_WAITING ||
00604                walrcv->walRcvState == WALRCV_STOPPING);
00605         if (walrcv->walRcvState == WALRCV_RESTARTING)
00606         {
00607             /* we don't expect primary_conninfo to change */
00608             *startpoint = walrcv->receiveStart;
00609             *startpointTLI = walrcv->receiveStartTLI;
00610             walrcv->walRcvState = WALRCV_STREAMING;
00611             SpinLockRelease(&walrcv->mutex);
00612             break;
00613         }
00614         if (walrcv->walRcvState == WALRCV_STOPPING)
00615         {
00616             /*
00617              * We should've received SIGTERM if the startup process wants
00618              * us to die, but might as well check it here too.
00619              */
00620             SpinLockRelease(&walrcv->mutex);
00621             exit(1);
00622         }
00623         SpinLockRelease(&walrcv->mutex);
00624 
00625         WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
00626     }
00627 
00628     if (update_process_title)
00629     {
00630         char        activitymsg[50];
00631 
00632         snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
00633                  (uint32) (*startpoint >> 32),
00634                  (uint32) *startpoint);
00635         set_ps_display(activitymsg, false);
00636     }
00637 }
00638 
00639 /*
00640  * Fetch any missing timeline history files between 'first' and 'last'
00641  * (inclusive) from the server.
00642  */
00643 static void
00644 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
00645 {
00646     TimeLineID tli;
00647 
00648     for (tli = first; tli <= last; tli++)
00649     {
00650         /* there's no history file for timeline 1 */
00651         if (tli != 1 && !existsTimeLineHistory(tli))
00652         {
00653             char       *fname;
00654             char       *content;
00655             int         len;
00656             char        expectedfname[MAXFNAMELEN];
00657 
00658             ereport(LOG,
00659                     (errmsg("fetching timeline history file for timeline %u from primary server",
00660                             tli)));
00661 
00662             EnableWalRcvImmediateExit();
00663             walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
00664             DisableWalRcvImmediateExit();
00665 
00666             /*
00667              * Check that the filename on the master matches what we calculated
00668              * ourselves. This is just a sanity check, it should always match.
00669              */
00670             TLHistoryFileName(expectedfname, tli);
00671             if (strcmp(fname, expectedfname) != 0)
00672                 ereport(ERROR,
00673                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
00674                          errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u",
00675                                          tli)));
00676 
00677             /*
00678              * Write the file to pg_xlog.
00679              */
00680             writeTimeLineHistoryFile(tli, content, len);
00681 
00682             pfree(fname);
00683             pfree(content);
00684         }
00685     }
00686 }
00687 
00688 /*
00689  * Mark us as STOPPED in shared memory at exit.
00690  */
00691 static void
00692 WalRcvDie(int code, Datum arg)
00693 {
00694     /* use volatile pointer to prevent code rearrangement */
00695     volatile WalRcvData *walrcv = WalRcv;
00696 
00697     /* Ensure that all WAL records received are flushed to disk */
00698     XLogWalRcvFlush(true);
00699 
00700     DisownLatch(&walrcv->latch);
00701 
00702     SpinLockAcquire(&walrcv->mutex);
00703     Assert(walrcv->walRcvState == WALRCV_STREAMING ||
00704            walrcv->walRcvState == WALRCV_RESTARTING ||
00705            walrcv->walRcvState == WALRCV_STARTING ||
00706            walrcv->walRcvState == WALRCV_WAITING ||
00707            walrcv->walRcvState == WALRCV_STOPPING);
00708     Assert(walrcv->pid == MyProcPid);
00709     walrcv->walRcvState = WALRCV_STOPPED;
00710     walrcv->pid = 0;
00711     SpinLockRelease(&walrcv->mutex);
00712 
00713     /* Terminate the connection gracefully. */
00714     if (walrcv_disconnect != NULL)
00715         walrcv_disconnect();
00716 
00717     /* Wake up the startup process to notice promptly that we're gone */
00718     WakeupRecovery();
00719 }
00720 
/*
 * SIGHUP: set flag to re-read config file at next convenient time.
 *
 * Only the flag is set here.  The streaming loop in WalReceiverMain()
 * checks got_SIGHUP, clears it, and calls ProcessConfigFile().
 */
static void
WalRcvSigHupHandler(SIGNAL_ARGS)
{
    got_SIGHUP = true;
}


/*
 * SIGUSR1: used by latch mechanism.
 *
 * The signal is simply forwarded to latch_sigusr1_handler(); this handler
 * does nothing else.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
    latch_sigusr1_handler();
}
00735 
00736 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
00737 static void
00738 WalRcvShutdownHandler(SIGNAL_ARGS)
00739 {
00740     int         save_errno = errno;
00741 
00742     got_SIGTERM = true;
00743 
00744     SetLatch(&WalRcv->latch);
00745 
00746     /* Don't joggle the elbow of proc_exit */
00747     if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
00748         ProcessWalRcvInterrupts();
00749 
00750     errno = save_errno;
00751 }
00752 
/*
 * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
 *
 * Some backend has bought the farm, so we need to stop what we're doing and
 * exit.
 *
 * WalReceiverMain() installs this handler for SIGQUIT and removes SIGQUIT
 * from BlockSig, so this handler can run at any time.  It blocks further
 * signals, discards all exit callbacks, and exits with status 2.
 */
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
    /* Block other signals while we tear down. */
    PG_SETMASK(&BlockSig);

    /*
     * We DO NOT want to run proc_exit() callbacks -- we're here because
     * shared memory may be corrupted, so we don't want to try to clean up our
     * transaction.  Just nail the windows shut and get out of town.  Now that
     * there's an atexit callback to prevent third-party code from breaking
     * things by calling exit() directly, we have to reset the callbacks
     * explicitly to make this work as intended.
     */
    on_exit_reset();

    /*
     * Note we do exit(2) not exit(0).  This is to force the postmaster into a
     * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
     * backend.  This is necessary precisely because we don't clean up our
     * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
     * should ensure the postmaster sees this as a crash, too, but no harm in
     * being doubly sure.)
     */
    exit(2);
}
00784 
00785 /*
00786  * Accept the message from XLOG stream, and process it.
00787  */
00788 static void
00789 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
00790 {
00791     int         hdrlen;
00792     XLogRecPtr  dataStart;
00793     XLogRecPtr  walEnd;
00794     TimestampTz sendTime;
00795     bool        replyRequested;
00796 
00797     resetStringInfo(&incoming_message);
00798 
00799     switch (type)
00800     {
00801         case 'w':               /* WAL records */
00802             {
00803                 /* copy message to StringInfo */
00804                 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
00805                 if (len < hdrlen)
00806                     ereport(ERROR,
00807                             (errcode(ERRCODE_PROTOCOL_VIOLATION),
00808                              errmsg_internal("invalid WAL message received from primary")));
00809                 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
00810 
00811                 /* read the fields */
00812                 dataStart = pq_getmsgint64(&incoming_message);
00813                 walEnd = pq_getmsgint64(&incoming_message);
00814                 sendTime = IntegerTimestampToTimestampTz(
00815                     pq_getmsgint64(&incoming_message));
00816                 ProcessWalSndrMessage(walEnd, sendTime);
00817 
00818                 buf += hdrlen;
00819                 len -= hdrlen;
00820                 XLogWalRcvWrite(buf, len, dataStart);
00821                 break;
00822             }
00823         case 'k':               /* Keepalive */
00824             {
00825                 /* copy message to StringInfo */
00826                 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
00827                 if (len != hdrlen)
00828                     ereport(ERROR,
00829                             (errcode(ERRCODE_PROTOCOL_VIOLATION),
00830                              errmsg_internal("invalid keepalive message received from primary")));
00831                 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
00832 
00833                 /* read the fields */
00834                 walEnd = pq_getmsgint64(&incoming_message);
00835                 sendTime = IntegerTimestampToTimestampTz(
00836                     pq_getmsgint64(&incoming_message));
00837                 replyRequested = pq_getmsgbyte(&incoming_message);
00838 
00839                 ProcessWalSndrMessage(walEnd, sendTime);
00840 
00841                 /* If the primary requested a reply, send one immediately */
00842                 if (replyRequested)
00843                     XLogWalRcvSendReply(true, false);
00844                 break;
00845             }
00846         default:
00847             ereport(ERROR,
00848                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
00849                      errmsg_internal("invalid replication message type %d",
00850                                      type)));
00851     }
00852 }
00853 
/*
 * Write XLOG data to disk.
 *
 * 'buf' holds 'nbytes' bytes of WAL whose first byte belongs at WAL position
 * 'recptr'.  The data may cross one or more segment boundaries; each loop
 * iteration writes at most up to the end of the current segment.
 *
 * Whenever the target segment is not the one currently open (or no segment
 * is open), the following happens in order:
 *   - the old segment is flushed with XLogWalRcvFlush(false);
 *   - it is closed and marked with a .done archive-status file;
 *   - the new segment is opened via XLogFileInit().
 *
 * Side effects: updates recvFile, recvSegNo, recvFileTLI and recvOff (state
 * declared outside this function) and advances LogstreamResult.Write to the
 * end of the data written.  Every I/O failure is reported at PANIC level.
 */
static void
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
    int         startoff;
    int         byteswritten;

    while (nbytes > 0)
    {
        int         segbytes;

        /* Switch files if no segment is open or recptr lies outside it. */
        if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
        {
            bool        use_existent;

            /*
             * fsync() and close current file before we switch to next one. We
             * would otherwise have to reopen this file to fsync it later
             */
            if (recvFile >= 0)
            {
                char        xlogfname[MAXFNAMELEN];

                XLogWalRcvFlush(false);

                /*
                 * XLOG segment files will be re-read by recovery in startup
                 * process soon, so we don't advise the OS to release cache
                 * pages associated with the file like XLogFileClose() does.
                 */
                if (close(recvFile) != 0)
                    ereport(PANIC,
                            (errcode_for_file_access(),
                             errmsg("could not close log segment %s: %m",
                                    XLogFileNameP(recvFileTLI, recvSegNo))));

                /*
                 * Create .done file forcibly to prevent the streamed segment from
                 * being archived later.
                 */
                XLogFileName(xlogfname, recvFileTLI, recvSegNo);
                XLogArchiveForceDone(xlogfname);
            }
            recvFile = -1;

            /* Create/use new log file */
            XLByteToSeg(recptr, recvSegNo);
            /*
             * NOTE(review): use_existent = true presumably lets XLogFileInit()
             * reuse an already-existing segment file rather than creating a
             * fresh one -- confirm against its definition.
             */
            use_existent = true;
            recvFile = XLogFileInit(recvSegNo, &use_existent, true);
            recvFileTLI = ThisTimeLineID;
            recvOff = 0;
        }

        /* Calculate the start offset of the received logs */
        startoff = recptr % XLogSegSize;

        /* Clamp this write so it does not run past the end of the segment. */
        if (startoff + nbytes > XLogSegSize)
            segbytes = XLogSegSize - startoff;
        else
            segbytes = nbytes;

        /* Need to seek in the file? */
        if (recvOff != startoff)
        {
            if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
                ereport(PANIC,
                        (errcode_for_file_access(),
                         errmsg("could not seek in log segment %s, to offset %u: %m",
                                XLogFileNameP(recvFileTLI, recvSegNo),
                                startoff)));
            recvOff = startoff;
        }

        /* OK to write the logs */
        errno = 0;

        byteswritten = write(recvFile, buf, segbytes);
        if (byteswritten <= 0)
        {
            /* if write didn't set errno, assume no disk space */
            if (errno == 0)
                errno = ENOSPC;
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not write to log segment %s "
                            "at offset %u, length %lu: %m",
                            XLogFileNameP(recvFileTLI, recvSegNo),
                            recvOff, (unsigned long) segbytes)));
        }

        /*
         * Update state for write.  write() may have written fewer than
         * segbytes bytes; advance only by what it actually wrote and let the
         * loop issue another write for the remainder.
         */
        recptr += byteswritten;

        recvOff += byteswritten;
        nbytes -= byteswritten;
        buf += byteswritten;

        LogstreamResult.Write = recptr;
    }
}
00956 
00957 /*
00958  * Flush the log to disk.
00959  *
00960  * If we're in the midst of dying, it's unwise to do anything that might throw
00961  * an error, so we skip sending a reply in that case.
00962  */
00963 static void
00964 XLogWalRcvFlush(bool dying)
00965 {
00966     if (LogstreamResult.Flush < LogstreamResult.Write)
00967     {
00968         /* use volatile pointer to prevent code rearrangement */
00969         volatile WalRcvData *walrcv = WalRcv;
00970 
00971         issue_xlog_fsync(recvFile, recvSegNo);
00972 
00973         LogstreamResult.Flush = LogstreamResult.Write;
00974 
00975         /* Update shared-memory status */
00976         SpinLockAcquire(&walrcv->mutex);
00977         if (walrcv->receivedUpto < LogstreamResult.Flush)
00978         {
00979             walrcv->latestChunkStart = walrcv->receivedUpto;
00980             walrcv->receivedUpto = LogstreamResult.Flush;
00981             walrcv->receivedTLI = ThisTimeLineID;
00982         }
00983         SpinLockRelease(&walrcv->mutex);
00984 
00985         /* Signal the startup process and walsender that new WAL has arrived */
00986         WakeupRecovery();
00987         if (AllowCascadeReplication())
00988             WalSndWakeup();
00989 
00990         /* Report XLOG streaming progress in PS display */
00991         if (update_process_title)
00992         {
00993             char        activitymsg[50];
00994 
00995             snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
00996                      (uint32) (LogstreamResult.Write >> 32),
00997                      (uint32) LogstreamResult.Write);
00998             set_ps_display(activitymsg, false);
00999         }
01000 
01001         /* Also let the master know that we made some progress */
01002         if (!dying)
01003             XLogWalRcvSendReply(false, false);
01004     }
01005 }
01006 
01007 /*
01008  * Send reply message to primary, indicating our current XLOG positions, oldest
01009  * xmin and the current time.
01010  *
01011  * If 'force' is not set, the message is only sent if enough time has
01012  * passed since last status update to reach wal_receiver_status_interval.
01013  * If wal_receiver_status_interval is disabled altogether and 'force' is
01014  * false, this is a no-op.
01015  *
01016  * If 'requestReply' is true, requests the server to reply immediately upon
01017  * receiving this message. This is used for heartbearts, when approaching
01018  * wal_receiver_timeout.
01019  */
01020 static void
01021 XLogWalRcvSendReply(bool force, bool requestReply)
01022 {
01023     static XLogRecPtr writePtr = 0;
01024     static XLogRecPtr flushPtr = 0;
01025     XLogRecPtr  applyPtr;
01026     static TimestampTz sendTime = 0;
01027     TimestampTz now;
01028 
01029     /*
01030      * If the user doesn't want status to be reported to the master, be sure
01031      * to exit before doing anything at all.
01032      */
01033     if (!force && wal_receiver_status_interval <= 0)
01034         return;
01035 
01036     /* Get current timestamp. */
01037     now = GetCurrentTimestamp();
01038 
01039     /*
01040      * We can compare the write and flush positions to the last message we
01041      * sent without taking any lock, but the apply position requires a spin
01042      * lock, so we don't check that unless something else has changed or 10
01043      * seconds have passed.  This means that the apply log position will
01044      * appear, from the master's point of view, to lag slightly, but since
01045      * this is only for reporting purposes and only on idle systems, that's
01046      * probably OK.
01047      */
01048     if (!force
01049         && writePtr == LogstreamResult.Write
01050         && flushPtr == LogstreamResult.Flush
01051         && !TimestampDifferenceExceeds(sendTime, now,
01052                                        wal_receiver_status_interval * 1000))
01053         return;
01054     sendTime = now;
01055 
01056     /* Construct a new message */
01057     writePtr = LogstreamResult.Write;
01058     flushPtr = LogstreamResult.Flush;
01059     applyPtr = GetXLogReplayRecPtr(NULL);
01060 
01061     resetStringInfo(&reply_message);
01062     pq_sendbyte(&reply_message, 'r');
01063     pq_sendint64(&reply_message, writePtr);
01064     pq_sendint64(&reply_message, flushPtr);
01065     pq_sendint64(&reply_message, applyPtr);
01066     pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
01067     pq_sendbyte(&reply_message, requestReply ? 1 : 0);
01068 
01069     /* Send it */
01070     elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
01071          (uint32) (writePtr >> 32), (uint32) writePtr,
01072          (uint32) (flushPtr >> 32), (uint32) flushPtr,
01073          (uint32) (applyPtr >> 32), (uint32) applyPtr,
01074          requestReply ? " (reply requested)" : "");
01075 
01076     walrcv_send(reply_message.data, reply_message.len);
01077 }
01078 
01079 /*
01080  * Send hot standby feedback message to primary, plus the current time,
01081  * in case they don't have a watch.
01082  *
01083  * If the user disables feedback, send one final message to tell sender
01084  * to forget about the xmin on this standby.
01085  */
01086 static void
01087 XLogWalRcvSendHSFeedback(bool immed)
01088 {
01089     TimestampTz now;
01090     TransactionId nextXid;
01091     uint32      nextEpoch;
01092     TransactionId xmin;
01093     static TimestampTz sendTime = 0;
01094     static bool master_has_standby_xmin = false;
01095 
01096     /*
01097      * If the user doesn't want status to be reported to the master, be sure
01098      * to exit before doing anything at all.
01099      */
01100     if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
01101         !master_has_standby_xmin)
01102         return;
01103 
01104     /* Get current timestamp. */
01105     now = GetCurrentTimestamp();
01106 
01107     if (!immed)
01108     {
01109         /*
01110          * Send feedback at most once per wal_receiver_status_interval.
01111          */
01112         if (!TimestampDifferenceExceeds(sendTime, now,
01113                                     wal_receiver_status_interval * 1000))
01114             return;
01115         sendTime = now;
01116     }
01117 
01118     /*
01119      * If Hot Standby is not yet active there is nothing to send. Check this
01120      * after the interval has expired to reduce number of calls.
01121      */
01122     if (!HotStandbyActive())
01123     {
01124         Assert(!master_has_standby_xmin);
01125         return;
01126     }
01127 
01128     /*
01129      * Make the expensive call to get the oldest xmin once we are certain
01130      * everything else has been checked.
01131      */
01132     if (hot_standby_feedback)
01133         xmin = GetOldestXmin(true, false);
01134     else
01135         xmin = InvalidTransactionId;
01136 
01137     /*
01138      * Get epoch and adjust if nextXid and oldestXmin are different sides of
01139      * the epoch boundary.
01140      */
01141     GetNextXidAndEpoch(&nextXid, &nextEpoch);
01142     if (nextXid < xmin)
01143         nextEpoch--;
01144 
01145     elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
01146          xmin, nextEpoch);
01147 
01148     /* Construct the the message and send it. */
01149     resetStringInfo(&reply_message);
01150     pq_sendbyte(&reply_message, 'h');
01151     pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
01152     pq_sendint(&reply_message, xmin, 4);
01153     pq_sendint(&reply_message, nextEpoch, 4);
01154     walrcv_send(reply_message.data, reply_message.len);
01155     if (TransactionIdIsValid(xmin))
01156         master_has_standby_xmin = true;
01157     else
01158         master_has_standby_xmin = false;
01159 }
01160 
01161 /*
01162  * Update shared memory status upon receiving a message from primary.
01163  *
01164  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
01165  * message, reported by primary.
01166  */
01167 static void
01168 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
01169 {
01170     /* use volatile pointer to prevent code rearrangement */
01171     volatile WalRcvData *walrcv = WalRcv;
01172 
01173     TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
01174 
01175     /* Update shared-memory status */
01176     SpinLockAcquire(&walrcv->mutex);
01177     if (walrcv->latestWalEnd < walEnd)
01178         walrcv->latestWalEndTime = sendTime;
01179     walrcv->latestWalEnd = walEnd;
01180     walrcv->lastMsgSendTime = sendTime;
01181     walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
01182     SpinLockRelease(&walrcv->mutex);
01183 
01184     if (log_min_messages <= DEBUG2)
01185         elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
01186              timestamptz_to_str(sendTime),
01187              timestamptz_to_str(lastMsgReceiptTime),
01188              GetReplicationApplyDelay(),
01189              GetReplicationTransferLatency());
01190 }