Header And Logo

PostgreSQL
| The world's most advanced open source database.

parallel.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * parallel.c
00004  *
00005  *  Parallel support for the pg_dump archiver
00006  *
00007  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
00008  * Portions Copyright (c) 1994, Regents of the University of California
00009  *
00010  *  The author is not responsible for loss or damages that may
00011  *  result from its use.
00012  *
00013  * IDENTIFICATION
00014  *      src/bin/pg_dump/parallel.c
00015  *
00016  *-------------------------------------------------------------------------
00017  */
00018 
00019 #include "postgres_fe.h"
00020 
00021 #include "pg_backup_utils.h"
00022 #include "parallel.h"
00023 
00024 #ifndef WIN32
00025 #include <sys/types.h>
00026 #include <sys/wait.h>
00027 #include "signal.h"
00028 #include <unistd.h>
00029 #include <fcntl.h>
00030 #endif
00031 
/* Indexes of the read end and the write end within a two-element pipe descriptor array. */
#define PIPE_READ                           0
#define PIPE_WRITE                          1

/* file-scope variables */
#ifdef WIN32
/* Id of the master thread; recorded by ParallelBackupStart(). */
static unsigned int tMasterThreadId = 0;
/*
 * Event that ShutdownWorkersHard() sets to ask the worker threads to stop;
 * workers test it in checkAborting().
 */
static HANDLE termEvent = INVALID_HANDLE_VALUE;
/*
 * Windows stand-ins for pipe() and read(); on other platforms these names are
 * macros (see the #else branch below).  NOTE(review): presumably
 * socket-based, since writes go through send() and the descriptors are
 * released with closesocket() -- confirm against the definitions.
 */
static int  pgpipe(int handles[2]);
static int  piperead(int s, char *buf, int len);

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.
 */
typedef struct
{
    ArchiveHandle *AH;          /* master's handle; the thread works on a CloneArchive() copy */
    RestoreOptions *ropt;       /* passed through to SetupWorker() */
    int         worker;         /* index of this worker's slot */
    int         pipeRead;       /* worker's read end of the master -> worker channel */
    int         pipeWrite;      /* worker's write end of the worker -> master channel */
} WorkerInfo;

#define pipewrite(a,b,c)    send(a,b,c,0)
#else
/*
 * aborting is only ever used in the master, the workers are fine with just
 * wantAbort.  archive_close_connection() sets it to switch the master's
 * communication with its workers into best-effort mode.
 */
static bool aborting = false;
/* Set to 1 by sigTermHandler(); polled by checkAborting() in the workers. */
static volatile sig_atomic_t wantAbort = 0;

/* On Unix the communication channels are plain pipes. */
#define pgpipe(a)           pipe(a)
#define piperead(a,b,c)     read(a,b,c)
#define pipewrite(a,b,c)    write(a,b,c)
#endif
00068 
/*
 * What the exit handler archive_close_connection() needs in order to clean
 * up: the parallel state (only while workers exist) and the archive whose
 * database connection should be closed.
 */
typedef struct ShutdownInformation
{
    ParallelState *pstate;      /* set by ParallelBackupStart() when numWorkers > 1,
                                 * reset to NULL by ParallelBackupEnd() */
    Archive    *AHX;            /* registered via on_exit_close_archive() */
} ShutdownInformation;

static ShutdownInformation shutdown_info;

/* Module name passed to exit_horribly() for errors raised in this file. */
static const char *modulename = gettext_noop("parallel archiver");
00078 
/* Forward declarations of this file's static routines. */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
/* printf-style checking: fmt is argument 3; 0 because the values come as a va_list */
static void
parallel_msg_master(ParallelSlot *slot, const char *modulename,
                    const char *fmt, va_list ap)
__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);

#ifndef WIN32
static void sigTermHandler(int signum);
#endif
static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
            RestoreOptions *ropt);
static bool HasEveryWorkerTerminated(ParallelState *pstate);

/* Worker side of the master <-> worker command protocol. */
static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
/* Master side of the protocol. */
static int  select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
                     bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
                    int worker, const char *str);
static char *readMessageFromPipe(int fd);

/*
 * Helpers for the textual protocol: prefix match and exact match.  Note that
 * messageStartsWith evaluates "prefix" twice.
 */
#define messageStartsWith(msg, prefix) \
    (strncmp(msg, prefix, strlen(prefix)) == 0)
#define messageEquals(msg, pattern) \
    (strcmp(msg, pattern) == 0)

#ifdef WIN32
static void shutdown_parallel_dump_utils(int code, void *unused);
/* true once init_parallel_dump_utils() has run; enables TLS in getThreadLocalPQExpBuffer() */
bool parallel_init_done = false;
/* TLS slot allocated by init_parallel_dump_utils() */
static DWORD tls_index;
/* thread that ran init_parallel_dump_utils(); only it calls WSACleanup() */
DWORD mainThreadId;
#endif
00117 
00118 
#ifdef WIN32
/*
 * on_exit_nicely() callback registered by init_parallel_dump_utils().
 *
 * Winsock is torn down only when running in the thread recorded in
 * mainThreadId; in any other thread this is a no-op.
 */
static void
shutdown_parallel_dump_utils(int code, void *unused)
{
    DWORD       self = GetCurrentThreadId();

    if (self != mainThreadId)
        return;

    WSACleanup();
}
#endif
00128 
/*
 * One-time setup for parallel dump/restore.
 *
 * On Windows this allocates the TLS slot, remembers the calling thread as the
 * main thread, starts Winsock (exiting on failure), and registers the exit
 * callback that shuts Winsock down again.  Repeated calls do nothing.  On
 * other platforms there is nothing to set up.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
    WSADATA     wsaData;
    int         err;

    /* already initialized by an earlier call */
    if (parallel_init_done)
        return;

    tls_index = TlsAlloc();
    mainThreadId = GetCurrentThreadId();

    err = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (err != 0)
    {
        fprintf(stderr, _("WSAStartup failed: %d\n"), err);
        exit_nicely(1);
    }

    on_exit_nicely(shutdown_parallel_dump_utils, NULL);
    parallel_init_done = true;
#endif
}
00151 
00152 static ParallelSlot *
00153 GetMyPSlot(ParallelState *pstate)
00154 {
00155     int         i;
00156 
00157     for (i = 0; i < pstate->numWorkers; i++)
00158 #ifdef WIN32
00159         if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
00160 #else
00161         if (pstate->parallelSlot[i].pid == getpid())
00162 #endif
00163             return &(pstate->parallelSlot[i]);
00164 
00165     return NULL;
00166 }
00167 
00168 /*
00169  * Fail and die, with a message to stderr.  Parameters as for write_msg.
00170  *
00171  * This is defined in parallel.c, because in parallel mode, things are more
00172  * complicated. If the worker process does exit_horribly(), we forward its
00173  * last words to the master process. The master process then does
00174  * exit_horribly() with this error message itself and prints it normally.
00175  * After printing the message, exit_horribly() on the master will shut down
00176  * the remaining worker processes.
00177  */
00178 void
00179 exit_horribly(const char *modulename, const char *fmt,...)
00180 {
00181     va_list     ap;
00182     ParallelState *pstate = shutdown_info.pstate;
00183     ParallelSlot *slot;
00184 
00185     va_start(ap, fmt);
00186 
00187     if (pstate == NULL)
00188     {
00189         /* Not in parallel mode, just write to stderr */
00190         vwrite_msg(modulename, fmt, ap);
00191     }
00192     else
00193     {
00194         slot = GetMyPSlot(pstate);
00195 
00196         if (!slot)
00197             /* We're the parent, just write the message out */
00198             vwrite_msg(modulename, fmt, ap);
00199         else
00200             /* If we're a worker process, send the msg to the master process */
00201             parallel_msg_master(slot, modulename, fmt, ap);
00202     }
00203 
00204     va_end(ap);
00205 
00206     exit_nicely(1);
00207 }
00208 
00209 /* Sends the error message from the worker to the master process */
00210 static void
00211 parallel_msg_master(ParallelSlot *slot, const char *modulename,
00212                     const char *fmt, va_list ap)
00213 {
00214     char        buf[512];
00215     int         pipefd[2];
00216 
00217     pipefd[PIPE_READ] = slot->pipeRevRead;
00218     pipefd[PIPE_WRITE] = slot->pipeRevWrite;
00219 
00220     strcpy(buf, "ERROR ");
00221     vsnprintf(buf + strlen("ERROR "),
00222               sizeof(buf) - strlen("ERROR "), fmt, ap);
00223 
00224     sendMessageToMaster(pipefd, buf);
00225 }
00226 
00227 /*
00228  * A thread-local version of getLocalPQExpBuffer().
00229  *
00230  * Non-reentrant but reduces memory leakage. (On Windows the memory leakage
00231  * will be one buffer per thread, which is at least better than one per call).
00232  */
00233 static PQExpBuffer
00234 getThreadLocalPQExpBuffer(void)
00235 {
00236     /*
00237      * The Tls code goes awry if we use a static var, so we provide for both
00238      * static and auto, and omit any use of the static var when using Tls.
00239      */
00240     static PQExpBuffer s_id_return = NULL;
00241     PQExpBuffer id_return;
00242 
00243 #ifdef WIN32
00244     if (parallel_init_done)
00245         id_return = (PQExpBuffer) TlsGetValue(tls_index);       /* 0 when not set */
00246     else
00247         id_return = s_id_return;
00248 #else
00249     id_return = s_id_return;
00250 #endif
00251 
00252     if (id_return)              /* first time through? */
00253     {
00254         /* same buffer, just wipe contents */
00255         resetPQExpBuffer(id_return);
00256     }
00257     else
00258     {
00259         /* new buffer */
00260         id_return = createPQExpBuffer();
00261 #ifdef WIN32
00262         if (parallel_init_done)
00263             TlsSetValue(tls_index, id_return);
00264         else
00265             s_id_return = id_return;
00266 #else
00267         s_id_return = id_return;
00268 #endif
00269 
00270     }
00271 
00272     return id_return;
00273 }
00274 
00275 /*
00276  * pg_dump and pg_restore register the Archive pointer for the exit handler
00277  * (called from exit_horribly). This function mainly exists so that we can
00278  * keep shutdown_info in file scope only.
00279  */
00280 void
00281 on_exit_close_archive(Archive *AHX)
00282 {
00283     shutdown_info.AHX = AHX;
00284     on_exit_nicely(archive_close_connection, &shutdown_info);
00285 }
00286 
00287 /*
00288  * This function can close archives in both the parallel and non-parallel
00289  * case.
00290  */
00291 static void
00292 archive_close_connection(int code, void *arg)
00293 {
00294     ShutdownInformation *si = (ShutdownInformation *) arg;
00295 
00296     if (si->pstate)
00297     {
00298         ParallelSlot *slot = GetMyPSlot(si->pstate);
00299 
00300         if (!slot)
00301         {
00302             /*
00303              * We're the master: We have already printed out the message
00304              * passed to exit_horribly() either from the master itself or from
00305              * a worker process. Now we need to close our own database
00306              * connection (only open during parallel dump but not restore) and
00307              * shut down the remaining workers.
00308              */
00309             DisconnectDatabase(si->AHX);
00310 #ifndef WIN32
00311 
00312             /*
00313              * Setting aborting to true switches to best-effort-mode
00314              * (send/receive but ignore errors) in communicating with our
00315              * workers.
00316              */
00317             aborting = true;
00318 #endif
00319             ShutdownWorkersHard(si->pstate);
00320         }
00321         else if (slot->args->AH)
00322             DisconnectDatabase(&(slot->args->AH->public));
00323     }
00324     else if (si->AHX)
00325         DisconnectDatabase(si->AHX);
00326 }
00327 
00328 /*
00329  * If we have one worker that terminates for some reason, we'd like the other
00330  * threads to terminate as well (and not finish with their 70 GB table dump
00331  * first...). Now in UNIX we can just kill these processes, and let the signal
00332  * handler set wantAbort to 1. In Windows we set a termEvent and this serves
00333  * as the signal for everyone to terminate.
00334  */
00335 void
00336 checkAborting(ArchiveHandle *AH)
00337 {
00338 #ifdef WIN32
00339     if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
00340 #else
00341     if (wantAbort)
00342 #endif
00343         exit_horribly(modulename, "worker is terminating\n");
00344 }
00345 
00346 /*
00347  * Shut down any remaining workers, this has an implicit do_wait == true.
00348  *
00349  * The fastest way we can make the workers terminate gracefully is when
00350  * they are listening for new commands and we just tell them to terminate.
00351  */
00352 static void
00353 ShutdownWorkersHard(ParallelState *pstate)
00354 {
00355 #ifndef WIN32
00356     int         i;
00357 
00358     signal(SIGPIPE, SIG_IGN);
00359 
00360     /*
00361      * Close our write end of the sockets so that the workers know they can
00362      * exit.
00363      */
00364     for (i = 0; i < pstate->numWorkers; i++)
00365         closesocket(pstate->parallelSlot[i].pipeWrite);
00366 
00367     for (i = 0; i < pstate->numWorkers; i++)
00368         kill(pstate->parallelSlot[i].pid, SIGTERM);
00369 #else
00370     /* The workers monitor this event via checkAborting(). */
00371     SetEvent(termEvent);
00372 #endif
00373 
00374     WaitForTerminatingWorkers(pstate);
00375 }
00376 
00377 /*
00378  * Wait for the termination of the processes using the OS-specific method.
00379  */
00380 static void
00381 WaitForTerminatingWorkers(ParallelState *pstate)
00382 {
00383     while (!HasEveryWorkerTerminated(pstate))
00384     {
00385         ParallelSlot *slot = NULL;
00386         int         j;
00387 
00388 #ifndef WIN32
00389         int         status;
00390         pid_t       pid = wait(&status);
00391 
00392         for (j = 0; j < pstate->numWorkers; j++)
00393             if (pstate->parallelSlot[j].pid == pid)
00394                 slot = &(pstate->parallelSlot[j]);
00395 #else
00396         uintptr_t   hThread;
00397         DWORD       ret;
00398         uintptr_t  *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
00399         int         nrun = 0;
00400 
00401         for (j = 0; j < pstate->numWorkers; j++)
00402             if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
00403             {
00404                 lpHandles[nrun] = pstate->parallelSlot[j].hThread;
00405                 nrun++;
00406             }
00407         ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
00408         Assert(ret != WAIT_FAILED);
00409         hThread = lpHandles[ret - WAIT_OBJECT_0];
00410 
00411         for (j = 0; j < pstate->numWorkers; j++)
00412             if (pstate->parallelSlot[j].hThread == hThread)
00413                 slot = &(pstate->parallelSlot[j]);
00414 
00415         free(lpHandles);
00416 #endif
00417         Assert(slot);
00418 
00419         slot->workerStatus = WRKR_TERMINATED;
00420     }
00421     Assert(HasEveryWorkerTerminated(pstate));
00422 }
00423 
00424 #ifndef WIN32
00425 /* Signal handling (UNIX only) */
00426 static void
00427 sigTermHandler(int signum)
00428 {
00429     wantAbort = 1;
00430 }
00431 #endif
00432 
00433 /*
00434  * This function is called by both UNIX and Windows variants to set up a
00435  * worker process.
00436  */
00437 static void
00438 SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
00439             RestoreOptions *ropt)
00440 {
00441     /*
00442      * Call the setup worker function that's defined in the ArchiveHandle.
00443      *
00444      * We get the raw connection only for the reason that we can close it
00445      * properly when we shut down. This happens only that way when it is
00446      * brought down because of an error.
00447      */
00448     (AH->SetupWorkerPtr) ((Archive *) AH, ropt);
00449 
00450     Assert(AH->connection != NULL);
00451 
00452     WaitForCommands(AH, pipefd);
00453 
00454     closesocket(pipefd[PIPE_READ]);
00455     closesocket(pipefd[PIPE_WRITE]);
00456 }
00457 
#ifdef WIN32
/*
 * Thread entry point for a Windows worker, started by ParallelBackupStart()
 * through _beginthreadex().
 *
 * Takes ownership of wi and frees it, works on a CloneArchive() copy of the
 * master's archive handle, and ends the thread once SetupWorker() returns.
 */
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
    ArchiveHandle *AH = CloneArchive(wi->AH);
    RestoreOptions *ropt = wi->ropt;
    int         worker = wi->worker;
    int         pipefd[2];

    pipefd[PIPE_READ] = wi->pipeRead;
    pipefd[PIPE_WRITE] = wi->pipeWrite;
    free(wi);

    SetupWorker(AH, pipefd, worker, ropt);

    DeCloneArchive(AH);
    _endthreadex(0);
    return 0;                   /* not reached; _endthreadex() does not return */
}
#endif
00477 
00478 /*
00479  * This function starts the parallel dump or restore by spawning off the
00480  * worker processes in both Unix and Windows. For Windows, it creates a number
00481  * of threads while it does a fork() on Unix.
00482  */
00483 ParallelState *
00484 ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
00485 {
00486     ParallelState *pstate;
00487     int         i;
00488     const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
00489 
00490     Assert(AH->public.numWorkers > 0);
00491 
00492     /* Ensure stdio state is quiesced before forking */
00493     fflush(NULL);
00494 
00495     pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
00496 
00497     pstate->numWorkers = AH->public.numWorkers;
00498     pstate->parallelSlot = NULL;
00499 
00500     if (AH->public.numWorkers == 1)
00501         return pstate;
00502 
00503     pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
00504     memset((void *) pstate->parallelSlot, 0, slotSize);
00505 
00506     /*
00507      * Set the pstate in the shutdown_info. The exit handler uses pstate if
00508      * set and falls back to AHX otherwise.
00509      */
00510     shutdown_info.pstate = pstate;
00511     getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
00512 
00513 #ifdef WIN32
00514     tMasterThreadId = GetCurrentThreadId();
00515     termEvent = CreateEvent(NULL, true, false, "Terminate");
00516 #else
00517     signal(SIGTERM, sigTermHandler);
00518     signal(SIGINT, sigTermHandler);
00519     signal(SIGQUIT, sigTermHandler);
00520 #endif
00521 
00522     for (i = 0; i < pstate->numWorkers; i++)
00523     {
00524 #ifdef WIN32
00525         WorkerInfo *wi;
00526         uintptr_t   handle;
00527 #else
00528         pid_t       pid;
00529 #endif
00530         int         pipeMW[2],
00531                     pipeWM[2];
00532 
00533         if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
00534             exit_horribly(modulename,
00535                           "Cannot create communication channels: %s\n",
00536                           strerror(errno));
00537 
00538         pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
00539         pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
00540         pstate->parallelSlot[i].args->AH = NULL;
00541         pstate->parallelSlot[i].args->te = NULL;
00542 #ifdef WIN32
00543         /* Allocate a new structure for every worker */
00544         wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
00545 
00546         wi->ropt = ropt;
00547         wi->worker = i;
00548         wi->AH = AH;
00549         wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
00550         wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
00551 
00552         handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
00553                                 wi, 0, &(pstate->parallelSlot[i].threadId));
00554         pstate->parallelSlot[i].hThread = handle;
00555 #else
00556         pid = fork();
00557         if (pid == 0)
00558         {
00559             /* we are the worker */
00560             int         j;
00561             int         pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};
00562 
00563             /*
00564              * Store the fds for the reverse communication in pstate. Actually
00565              * we only use this in case of an error and don't use pstate
00566              * otherwise in the worker process. On Windows we write to the
00567              * global pstate, in Unix we write to our process-local copy but
00568              * that's also where we'd retrieve this information back from.
00569              */
00570             pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
00571             pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
00572             pstate->parallelSlot[i].pid = getpid();
00573 
00574             /*
00575              * Call CloneArchive on Unix as well even though technically we
00576              * don't need to because fork() gives us a copy in our own address
00577              * space already. But CloneArchive resets the state information
00578              * and also clones the database connection (for parallel dump)
00579              * which both seem kinda helpful.
00580              */
00581             pstate->parallelSlot[i].args->AH = CloneArchive(AH);
00582 
00583             /* close read end of Worker -> Master */
00584             closesocket(pipeWM[PIPE_READ]);
00585             /* close write end of Master -> Worker */
00586             closesocket(pipeMW[PIPE_WRITE]);
00587 
00588             /*
00589              * Close all inherited fds for communication of the master with
00590              * the other workers.
00591              */
00592             for (j = 0; j < i; j++)
00593             {
00594                 closesocket(pstate->parallelSlot[j].pipeRead);
00595                 closesocket(pstate->parallelSlot[j].pipeWrite);
00596             }
00597 
00598             SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);
00599 
00600             exit(0);
00601         }
00602         else if (pid < 0)
00603             /* fork failed */
00604             exit_horribly(modulename,
00605                           "could not create worker process: %s\n",
00606                           strerror(errno));
00607 
00608         /* we are the Master, pid > 0 here */
00609         Assert(pid > 0);
00610 
00611         /* close read end of Master -> Worker */
00612         closesocket(pipeMW[PIPE_READ]);
00613         /* close write end of Worker -> Master */
00614         closesocket(pipeWM[PIPE_WRITE]);
00615 
00616         pstate->parallelSlot[i].pid = pid;
00617 #endif
00618 
00619         pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
00620         pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
00621     }
00622 
00623     return pstate;
00624 }
00625 
00626 /*
00627  * Tell all of our workers to terminate.
00628  *
00629  * Pretty straightforward routine, first we tell everyone to terminate, then
00630  * we listen to the workers' replies and finally close the sockets that we
00631  * have used for communication.
00632  */
00633 void
00634 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
00635 {
00636     int         i;
00637 
00638     if (pstate->numWorkers == 1)
00639         return;
00640 
00641     Assert(IsEveryWorkerIdle(pstate));
00642 
00643     /* close the sockets so that the workers know they can exit */
00644     for (i = 0; i < pstate->numWorkers; i++)
00645     {
00646         closesocket(pstate->parallelSlot[i].pipeRead);
00647         closesocket(pstate->parallelSlot[i].pipeWrite);
00648     }
00649     WaitForTerminatingWorkers(pstate);
00650 
00651     /*
00652      * Remove the pstate again, so the exit handler in the parent will now
00653      * again fall back to closing AH->connection (if connected).
00654      */
00655     shutdown_info.pstate = NULL;
00656 
00657     free(pstate->parallelSlot);
00658     free(pstate);
00659 }
00660 
00661 
00662 /*
00663  * The sequence is the following (for dump, similar for restore):
00664  *
 * The master process starts the parallel backup in ParallelBackupStart(), which
 * forks the worker processes; each of them then enters WaitForCommands().
00667  *
00668  * The master process dispatches an individual work item to one of the worker
00669  * processes in DispatchJobForTocEntry(). It calls
00670  * AH->MasterStartParallelItemPtr, a routine of the output format. This
 * function's arguments are the parent's archive handle AH (containing the full
00672  * catalog information), the TocEntry that the worker should work on and a
00673  * T_Action act indicating whether this is a backup or a restore item.  The
00674  * function then converts the TocEntry assignment into a string that is then
00675  * sent over to the worker process. In the simplest case that would be
00676  * something like "DUMP 1234", with 1234 being the TocEntry id.
00677  *
00678  * The worker receives the message in the routine pointed to by
00679  * WorkerJobDumpPtr or WorkerJobRestorePtr. These are also pointers to
00680  * corresponding routines of the respective output format, e.g.
00681  * _WorkerJobDumpDirectory().
00682  *
00683  * Remember that we have forked off the workers only after we have read in the
00684  * catalog. That's why our worker processes can also access the catalog
00685  * information. Now they re-translate the textual representation to a TocEntry
00686  * on their side and do the required action (restore or dump).
00687  *
00688  * The result is again a textual string that is sent back to the master and is
00689  * interpreted by AH->MasterEndParallelItemPtr. This function can update state
00690  * or catalog information on the master's side, depending on the reply from
00691  * the worker process. In the end it returns status which is 0 for successful
00692  * execution.
00693  *
00694  * ---------------------------------------------------------------------
00695  * Master                                   Worker
00696  *
00697  *                                          enters WaitForCommands()
00698  * DispatchJobForTocEntry(...te...)
00699  *
00700  * [ Worker is IDLE ]
00701  *
00702  * arg = (MasterStartParallelItemPtr)()
00703  * send: DUMP arg
00704  *                                          receive: DUMP arg
00705  *                                          str = (WorkerJobDumpPtr)(arg)
00706  * [ Worker is WORKING ]                    ... gets te from arg ...
00707  *                                          ... dump te ...
00708  *                                          send: OK DUMP info
00709  *
00710  * In ListenToWorkers():
00711  *
00712  * [ Worker is FINISHED ]
00713  * receive: OK DUMP info
00714  * status = (MasterEndParallelItemPtr)(info)
00715  *
00716  * In ReapWorkerStatus(&ptr):
00717  * *ptr = status;
00718  * [ Worker is IDLE ]
00719  * ---------------------------------------------------------------------
00720  */
00721 void
00722 DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
00723                        T_Action act)
00724 {
00725     int         worker;
00726     char       *arg;
00727 
00728     /* our caller makes sure that at least one worker is idle */
00729     Assert(GetIdleWorker(pstate) != NO_SLOT);
00730     worker = GetIdleWorker(pstate);
00731     Assert(worker != NO_SLOT);
00732 
00733     arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
00734 
00735     sendMessageToWorker(pstate, worker, arg);
00736 
00737     pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
00738     pstate->parallelSlot[worker].args->te = te;
00739 }
00740 
00741 /*
00742  * Find the first free parallel slot (if any).
00743  */
00744 int
00745 GetIdleWorker(ParallelState *pstate)
00746 {
00747     int         i;
00748 
00749     for (i = 0; i < pstate->numWorkers; i++)
00750         if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
00751             return i;
00752     return NO_SLOT;
00753 }
00754 
00755 /*
00756  * Return true iff every worker process is in the WRKR_TERMINATED state.
00757  */
00758 static bool
00759 HasEveryWorkerTerminated(ParallelState *pstate)
00760 {
00761     int         i;
00762 
00763     for (i = 0; i < pstate->numWorkers; i++)
00764         if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
00765             return false;
00766     return true;
00767 }
00768 
00769 /*
00770  * Return true iff every worker is in the WRKR_IDLE state.
00771  */
00772 bool
00773 IsEveryWorkerIdle(ParallelState *pstate)
00774 {
00775     int         i;
00776 
00777     for (i = 0; i < pstate->numWorkers; i++)
00778         if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
00779             return false;
00780     return true;
00781 }
00782 
00783 /*
00784  * ---------------------------------------------------------------------
00785  * One danger of the parallel backup is a possible deadlock:
00786  *
00787  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
00788  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
00789  *    because the master holds a conflicting ACCESS SHARE lock).
00790  * 3) The worker process also requests an ACCESS SHARE lock to read the table.
00791  *    The worker's not granted that lock but is enqueued behind the ACCESS
00792  *    EXCLUSIVE lock request.
00793  * ---------------------------------------------------------------------
00794  *
00795  * Now what we do here is to just request a lock in ACCESS SHARE but with
00796  * NOWAIT in the worker prior to touching the table. If we don't get the lock,
00797  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
00798  * are good to just fail the whole backup because we have detected a deadlock.
00799  */
00800 static void
00801 lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
00802 {
00803     Archive    *AHX = (Archive *) AH;
00804     const char *qualId;
00805     PQExpBuffer query = createPQExpBuffer();
00806     PGresult   *res;
00807 
00808     Assert(AH->format == archDirectory);
00809     Assert(strcmp(te->desc, "BLOBS") != 0);
00810 
00811     appendPQExpBuffer(query,
00812                       "SELECT pg_namespace.nspname,"
00813                       "       pg_class.relname "
00814                       "  FROM pg_class "
00815                     "  JOIN pg_namespace on pg_namespace.oid = relnamespace "
00816                       " WHERE pg_class.oid = %d", te->catalogId.oid);
00817 
00818     res = PQexec(AH->connection, query->data);
00819 
00820     if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
00821         exit_horribly(modulename,
00822                       "could not get relation name for oid %d: %s\n",
00823                       te->catalogId.oid, PQerrorMessage(AH->connection));
00824 
00825     resetPQExpBuffer(query);
00826 
00827     qualId = fmtQualifiedId(AHX->remoteVersion,
00828                             PQgetvalue(res, 0, 0),
00829                             PQgetvalue(res, 0, 1));
00830 
00831     appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
00832                       qualId);
00833     PQclear(res);
00834 
00835     res = PQexec(AH->connection, query->data);
00836 
00837     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
00838         exit_horribly(modulename,
00839                       "could not obtain lock on relation \"%s\". This "
00840              "usually means that someone requested an ACCESS EXCLUSIVE lock "
00841               "on the table after the pg_dump parent process has gotten the "
00842                       "initial ACCESS SHARE lock on the table.\n", qualId);
00843 
00844     PQclear(res);
00845     destroyPQExpBuffer(query);
00846 }
00847 
00848 /*
00849  * That's the main routine for the worker.
00850  * When it starts up it enters this routine and waits for commands from the
00851  * master process. After having processed a command it comes back to here to
00852  * wait for the next command. Finally it will receive a TERMINATE command and
00853  * exit.
00854  */
00855 static void
00856 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
00857 {
00858     char       *command;
00859     DumpId      dumpId;
00860     int         nBytes;
00861     char       *str = NULL;
00862     TocEntry   *te;
00863 
00864     for (;;)
00865     {
00866         if (!(command = getMessageFromMaster(pipefd)))
00867         {
00868             PQfinish(AH->connection);
00869             AH->connection = NULL;
00870             return;
00871         }
00872 
00873         if (messageStartsWith(command, "DUMP "))
00874         {
00875             Assert(AH->format == archDirectory);
00876             sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
00877             Assert(nBytes == strlen(command) - strlen("DUMP "));
00878 
00879             te = getTocEntryByDumpId(AH, dumpId);
00880             Assert(te != NULL);
00881 
00882             /*
00883              * Lock the table but with NOWAIT. Note that the parent is already
00884              * holding a lock. If we cannot acquire another ACCESS SHARE MODE
00885              * lock, then somebody else has requested an exclusive lock in the
00886              * meantime.  lockTableNoWait dies in this case to prevent a
00887              * deadlock.
00888              */
00889             if (strcmp(te->desc, "BLOBS") != 0)
00890                 lockTableNoWait(AH, te);
00891 
00892             /*
00893              * The message we return here has been pg_malloc()ed and we are
00894              * responsible for free()ing it.
00895              */
00896             str = (AH->WorkerJobDumpPtr) (AH, te);
00897             Assert(AH->connection != NULL);
00898             sendMessageToMaster(pipefd, str);
00899             free(str);
00900         }
00901         else if (messageStartsWith(command, "RESTORE "))
00902         {
00903             Assert(AH->format == archDirectory || AH->format == archCustom);
00904             Assert(AH->connection != NULL);
00905 
00906             sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
00907             Assert(nBytes == strlen(command) - strlen("RESTORE "));
00908 
00909             te = getTocEntryByDumpId(AH, dumpId);
00910             Assert(te != NULL);
00911 
00912             /*
00913              * The message we return here has been pg_malloc()ed and we are
00914              * responsible for free()ing it.
00915              */
00916             str = (AH->WorkerJobRestorePtr) (AH, te);
00917             Assert(AH->connection != NULL);
00918             sendMessageToMaster(pipefd, str);
00919             free(str);
00920         }
00921         else
00922             exit_horribly(modulename,
00923                           "Unknown command on communication channel: %s\n",
00924                           command);
00925     }
00926 }
00927 
00928 /*
00929  * ---------------------------------------------------------------------
00930  * Note the status change:
00931  *
00932  * DispatchJobForTocEntry       WRKR_IDLE -> WRKR_WORKING
00933  * ListenToWorkers              WRKR_WORKING -> WRKR_FINISHED / WRKR_TERMINATED
00934  * ReapWorkerStatus             WRKR_FINISHED -> WRKR_IDLE
00935  * ---------------------------------------------------------------------
00936  *
00937  * Just calling ReapWorkerStatus() when all workers are working might or might
00938  * not give you an idle worker because you need to call ListenToWorkers() in
00939  * between and only thereafter ReapWorkerStatus(). This is necessary in order
00940  * to get and deal with the status (=result) of the worker's execution.
00941  */
00942 void
00943 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
00944 {
00945     int         worker;
00946     char       *msg;
00947 
00948     msg = getMessageFromWorker(pstate, do_wait, &worker);
00949 
00950     if (!msg)
00951     {
00952         if (do_wait)
00953             exit_horribly(modulename, "A worker process died unexpectedly\n");
00954         return;
00955     }
00956 
00957     if (messageStartsWith(msg, "OK "))
00958     {
00959         char       *statusString;
00960         TocEntry   *te;
00961 
00962         pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
00963         te = pstate->parallelSlot[worker].args->te;
00964         if (messageStartsWith(msg, "OK RESTORE "))
00965         {
00966             statusString = msg + strlen("OK RESTORE ");
00967             pstate->parallelSlot[worker].status =
00968                 (AH->MasterEndParallelItemPtr)
00969                 (AH, te, statusString, ACT_RESTORE);
00970         }
00971         else if (messageStartsWith(msg, "OK DUMP "))
00972         {
00973             statusString = msg + strlen("OK DUMP ");
00974             pstate->parallelSlot[worker].status =
00975                 (AH->MasterEndParallelItemPtr)
00976                 (AH, te, statusString, ACT_DUMP);
00977         }
00978         else
00979             exit_horribly(modulename,
00980                           "Invalid message received from worker: %s\n", msg);
00981     }
00982     else if (messageStartsWith(msg, "ERROR "))
00983     {
00984         Assert(AH->format == archDirectory || AH->format == archCustom);
00985         pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
00986         exit_horribly(modulename, "%s", msg + strlen("ERROR "));
00987     }
00988     else
00989         exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
00990 
00991     /* both Unix and Win32 return pg_malloc()ed space, so we free it */
00992     free(msg);
00993 }
00994 
00995 /*
00996  * This function is executed in the master process.
00997  *
00998  * This function is used to get the return value of a terminated worker
00999  * process. If a process has terminated, its status is stored in *status and
01000  * the id of the worker is returned.
01001  */
01002 int
01003 ReapWorkerStatus(ParallelState *pstate, int *status)
01004 {
01005     int         i;
01006 
01007     for (i = 0; i < pstate->numWorkers; i++)
01008     {
01009         if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
01010         {
01011             *status = pstate->parallelSlot[i].status;
01012             pstate->parallelSlot[i].status = 0;
01013             pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
01014             return i;
01015         }
01016     }
01017     return NO_SLOT;
01018 }
01019 
01020 /*
01021  * This function is executed in the master process.
01022  *
01023  * It looks for an idle worker process and only returns if there is one.
01024  */
01025 void
01026 EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
01027 {
01028     int         ret_worker;
01029     int         work_status;
01030 
01031     for (;;)
01032     {
01033         int         nTerm = 0;
01034 
01035         while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
01036         {
01037             if (work_status != 0)
01038                 exit_horribly(modulename, "Error processing a parallel work item.\n");
01039 
01040             nTerm++;
01041         }
01042 
01043         /*
01044          * We need to make sure that we have an idle worker before dispatching
01045          * the next item. If nTerm > 0 we already have that (quick check).
01046          */
01047         if (nTerm > 0)
01048             return;
01049 
01050         /* explicit check for an idle worker */
01051         if (GetIdleWorker(pstate) != NO_SLOT)
01052             return;
01053 
01054         /*
01055          * If we have no idle worker, read the result of one or more workers
01056          * and loop the loop to call ReapWorkerStatus() on them
01057          */
01058         ListenToWorkers(AH, pstate, true);
01059     }
01060 }
01061 
01062 /*
01063  * This function is executed in the master process.
01064  *
01065  * It waits for all workers to terminate.
01066  */
01067 void
01068 EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
01069 {
01070     int         work_status;
01071 
01072     if (!pstate || pstate->numWorkers == 1)
01073         return;
01074 
01075     /* Waiting for the remaining worker processes to finish */
01076     while (!IsEveryWorkerIdle(pstate))
01077     {
01078         if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
01079             ListenToWorkers(AH, pstate, true);
01080         else if (work_status != 0)
01081             exit_horribly(modulename,
01082                           "Error processing a parallel work item\n");
01083     }
01084 }
01085 
01086 /*
01087  * This function is executed in the worker process.
01088  *
01089  * It returns the next message on the communication channel, blocking until it
01090  * becomes available.
01091  */
01092 static char *
01093 getMessageFromMaster(int pipefd[2])
01094 {
01095     return readMessageFromPipe(pipefd[PIPE_READ]);
01096 }
01097 
01098 /*
01099  * This function is executed in the worker process.
01100  *
01101  * It sends a message to the master on the communication channel.
01102  */
01103 static void
01104 sendMessageToMaster(int pipefd[2], const char *str)
01105 {
01106     int         len = strlen(str) + 1;
01107 
01108     if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
01109         exit_horribly(modulename,
01110                       "Error writing to the communication channel: %s\n",
01111                       strerror(errno));
01112 }
01113 
/*
 * Executed in the master: wait until select() reports something on the
 * descriptors in *workerset, then return select()'s result.
 *
 * *workerset is in/out.  On entry it holds the descriptors to watch (the
 * highest being maxFd).  On return it holds whatever select() reported.
 * The caller's set is saved and restored before every retry, because
 * select() overwrites it.
 *
 * Windows: select() is called with a 250 ms timeout and re-issued after
 * every timeout or WSAEINTR.  Any other result, including a non-WSAEINTR
 * SOCKET_ERROR, is returned.
 * NOTE(review): despite waking periodically, this loop does not itself test
 * any termination condition.
 *
 * Unix: select() blocks with no timeout.  After each return, a pending user
 * abort (wantAbort set while aborting is not yet set) is fatal.  EINTR is
 * then retried, and any other result, including other errors (-1), is
 * returned.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
    int         i;
    fd_set      saveSet = *workerset;

#ifdef WIN32
    /* should always be the master */
    Assert(tMasterThreadId == GetCurrentThreadId());

    for (;;)
    {
        /*
         * Wake up every quarter of a second (250000 microseconds) rather
         * than blocking indefinitely.
         */
        struct timeval tv = {0, 250000};

        /* select() modified the set on the previous pass; start afresh. */
        *workerset = saveSet;
        i = select(maxFd + 1, workerset, NULL, NULL, &tv);

        if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
            continue;
        /* 0 means timeout: keep waiting; anything else goes to the caller */
        if (i)
            break;
    }

#else                           /* UNIX */

    for (;;)
    {
        /* select() modified the set on the previous pass; start afresh. */
        *workerset = saveSet;
        i = select(maxFd + 1, workerset, NULL, NULL, NULL);

        /*
         * If we Ctrl-C the master process, it's likely that we interrupt
         * select() here. The signal handler will set wantAbort == true and
         * the shutdown journey starts from here. Note that we'll come back
         * here later when we tell all workers to terminate and read their
         * responses. But then we have aborting set to true.
         */
        if (wantAbort && !aborting)
            exit_horribly(modulename, "terminated by user\n");

        if (i < 0 && errno == EINTR)
            continue;
        break;
    }

#endif

    return i;
}
01171 
01172 
01173 /*
01174  * This function is executed in the master process.
01175  *
01176  * It returns the next message from the worker on the communication channel,
01177  * optionally blocking (do_wait) until it becomes available.
01178  *
01179  * The id of the worker is returned in *worker.
01180  */
01181 static char *
01182 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
01183 {
01184     int         i;
01185     fd_set      workerset;
01186     int         maxFd = -1;
01187     struct timeval nowait = {0, 0};
01188 
01189     FD_ZERO(&workerset);
01190 
01191     for (i = 0; i < pstate->numWorkers; i++)
01192     {
01193         if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
01194             continue;
01195         FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
01196         /* actually WIN32 ignores the first parameter to select()... */
01197         if (pstate->parallelSlot[i].pipeRead > maxFd)
01198             maxFd = pstate->parallelSlot[i].pipeRead;
01199     }
01200 
01201     if (do_wait)
01202     {
01203         i = select_loop(maxFd, &workerset);
01204         Assert(i != 0);
01205     }
01206     else
01207     {
01208         if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
01209             return NULL;
01210     }
01211 
01212     if (i < 0)
01213         exit_horribly(modulename, "Error in ListenToWorkers(): %s", strerror(errno));
01214 
01215     for (i = 0; i < pstate->numWorkers; i++)
01216     {
01217         char       *msg;
01218 
01219         if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
01220             continue;
01221 
01222         msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
01223         *worker = i;
01224         return msg;
01225     }
01226     Assert(false);
01227     return NULL;
01228 }
01229 
01230 /*
01231  * This function is executed in the master process.
01232  *
01233  * It sends a message to a certain worker on the communication channel.
01234  */
01235 static void
01236 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
01237 {
01238     int         len = strlen(str) + 1;
01239 
01240     if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
01241     {
01242         /*
01243          * If we're already aborting anyway, don't care if we succeed or not.
01244          * The child might have gone already.
01245          */
01246 #ifndef WIN32
01247         if (!aborting)
01248 #endif
01249             exit_horribly(modulename,
01250                           "Error writing to the communication channel: %s\n",
01251                           strerror(errno));
01252     }
01253 }
01254 
/*
 * Read one NUL-terminated message from the communication channel fd.
 *
 * Returns a pg_malloc()ed buffer holding the message, including its
 * terminating NUL; the caller must free() it.  Returns NULL if piperead()
 * reports EOF or an error before the terminating NUL is seen.  Any partially
 * read data is discarded and its buffer released.
 */
static char *
readMessageFromPipe(int fd)
{
    char       *msg;
    int         msgsize,
                bufsize;
    int         ret;

    /*
     * The problem here is that we need to deal with several possibilities: we
     * could receive only a partial message or several messages at once. The
     * caller expects us to return exactly one message however.
     *
     * We could either read in as much as we can and keep track of what we
     * delivered back to the caller or we just read byte by byte. Once we see
     * (char) 0, we know that it's the message's end. This would be quite
     * inefficient for more data but since we are reading only on the command
     * channel, the performance loss does not seem worth the trouble of
     * keeping internal states for different file descriptors.
     */
    bufsize = 64;               /* could be any number */
    msg = (char *) pg_malloc(bufsize);

    msgsize = 0;
    for (;;)
    {
        /* there must always be room for the byte we are about to read */
        Assert(msgsize < bufsize);
        ret = piperead(fd, msg + msgsize, 1);

        /* other end has closed the connection or another error happened */
        if (ret <= 0)
        {
            free(msg);
            return NULL;
        }

        Assert(ret == 1);

        if (msg[msgsize] == '\0')
            return msg;

        msgsize++;
        if (msgsize == bufsize)
        {
            /* could be any number */
            bufsize += 16;
            /* pg_realloc() exits on out-of-memory instead of returning NULL */
            msg = (char *) pg_realloc(msg, bufsize);
        }
    }
}
01306 
01307 #ifdef WIN32
01308 /*
01309  * This is a replacement version of pipe for Win32 which allows returned
01310  * handles to be used in select(). Note that read/write calls must be replaced
01311  * with recv/send.
01312  */
01313 static int
01314 pgpipe(int handles[2])
01315 {
01316     SOCKET      s;
01317     struct sockaddr_in serv_addr;
01318     int         len = sizeof(serv_addr);
01319 
01320     handles[0] = handles[1] = INVALID_SOCKET;
01321 
01322     if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
01323     {
01324         write_msg(modulename, "pgpipe could not create socket: %ui",
01325                   WSAGetLastError());
01326         return -1;
01327     }
01328 
01329     memset((void *) &serv_addr, 0, sizeof(serv_addr));
01330     serv_addr.sin_family = AF_INET;
01331     serv_addr.sin_port = htons(0);
01332     serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
01333     if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
01334     {
01335         write_msg(modulename, "pgpipe could not bind: %ui",
01336                   WSAGetLastError());
01337         closesocket(s);
01338         return -1;
01339     }
01340     if (listen(s, 1) == SOCKET_ERROR)
01341     {
01342         write_msg(modulename, "pgpipe could not listen: %ui",
01343                   WSAGetLastError());
01344         closesocket(s);
01345         return -1;
01346     }
01347     if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
01348     {
01349         write_msg(modulename, "pgpipe could not getsockname: %ui",
01350                   WSAGetLastError());
01351         closesocket(s);
01352         return -1;
01353     }
01354     if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
01355     {
01356         write_msg(modulename, "pgpipe could not create socket 2: %ui",
01357                   WSAGetLastError());
01358         closesocket(s);
01359         return -1;
01360     }
01361 
01362     if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
01363     {
01364         write_msg(modulename, "pgpipe could not connect socket: %ui",
01365                   WSAGetLastError());
01366         closesocket(s);
01367         return -1;
01368     }
01369     if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
01370     {
01371         write_msg(modulename, "pgpipe could not accept socket: %ui",
01372                   WSAGetLastError());
01373         closesocket(handles[1]);
01374         handles[1] = INVALID_SOCKET;
01375         closesocket(s);
01376         return -1;
01377     }
01378     closesocket(s);
01379     return 0;
01380 }
01381 
01382 static int
01383 piperead(int s, char *buf, int len)
01384 {
01385     int         ret = recv(s, buf, len, 0);
01386 
01387     if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
01388         /* EOF on the pipe! (win32 socket based implementation) */
01389         ret = 0;
01390     return ret;
01391 }
01392 
01393 #endif