00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include "postgres_fe.h"
00020
00021 #include "pg_backup_utils.h"
00022 #include "parallel.h"
00023
00024 #ifndef WIN32
00025 #include <sys/types.h>
00026 #include <sys/wait.h>
00027 #include "signal.h"
00028 #include <unistd.h>
00029 #include <fcntl.h>
00030 #endif
00031
/* Indexes into a two-element {read end, write end} descriptor array. */
#define PIPE_READ 0
#define PIPE_WRITE 1


#ifdef WIN32
/*
 * Windows: pipe() and read() are emulated with sockets (defined at the end of
 * this file); workers run as threads of this process.
 */
static int	pgpipe(int handles[2]);
static int	piperead(int s, char *buf, int len);

/* Thread id of the master, and the event used to ask workers to stop. */
static unsigned int tMasterThreadId = 0;
static HANDLE termEvent = INVALID_HANDLE_VALUE;

/*
 * Everything a newly started worker thread needs.  The thread copies what it
 * needs and then frees this struct itself.
 */
typedef struct
{
	ArchiveHandle *AH;
	RestoreOptions *ropt;
	int			worker;
	int			pipeRead;
	int			pipeWrite;
} WorkerInfo;

#define pipewrite(a,b,c)	send(a,b,c,0)
#else
/*
 * Elsewhere: workers are forked processes and the real pipe primitives are
 * used directly.
 */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

/*
 * aborting: set in the master once it starts tearing the workers down, after
 * which communication errors with workers are tolerated.
 * wantAbort: set asynchronously by the termination signal handler.
 */
static bool aborting = false;
static volatile sig_atomic_t wantAbort = 0;
#endif
00068
00069 typedef struct ShutdownInformation
00070 {
00071 ParallelState *pstate;
00072 Archive *AHX;
00073 } ShutdownInformation;
00074
00075 static ShutdownInformation shutdown_info;
00076
00077 static const char *modulename = gettext_noop("parallel archiver");
00078
00079 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
00080 static void
00081 parallel_msg_master(ParallelSlot *slot, const char *modulename,
00082 const char *fmt, va_list ap)
00083 __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)));
00084 static void archive_close_connection(int code, void *arg);
00085 static void ShutdownWorkersHard(ParallelState *pstate);
00086 static void WaitForTerminatingWorkers(ParallelState *pstate);
00087
00088 #ifndef WIN32
00089 static void sigTermHandler(int signum);
00090 #endif
00091 static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
00092 RestoreOptions *ropt);
00093 static bool HasEveryWorkerTerminated(ParallelState *pstate);
00094
00095 static void lockTableNoWait(ArchiveHandle *AH, TocEntry *te);
00096 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
00097 static char *getMessageFromMaster(int pipefd[2]);
00098 static void sendMessageToMaster(int pipefd[2], const char *str);
00099 static int select_loop(int maxFd, fd_set *workerset);
00100 static char *getMessageFromWorker(ParallelState *pstate,
00101 bool do_wait, int *worker);
00102 static void sendMessageToWorker(ParallelState *pstate,
00103 int worker, const char *str);
00104 static char *readMessageFromPipe(int fd);
00105
00106 #define messageStartsWith(msg, prefix) \
00107 (strncmp(msg, prefix, strlen(prefix)) == 0)
00108 #define messageEquals(msg, pattern) \
00109 (strcmp(msg, pattern) == 0)
00110
#ifdef WIN32
/* on_exit_nicely() callback that shuts Winsock down again */
static void shutdown_parallel_dump_utils(int code, void *unused);

/* set once init_parallel_dump_utils() has completed */
bool		parallel_init_done = false;

/* TLS slot holding each thread's buffer for getThreadLocalPQExpBuffer() */
static DWORD tls_index;

/* id of the thread that ran init_parallel_dump_utils() */
DWORD		mainThreadId;
#endif
00117
00118
#ifdef WIN32
/*
 * on_exit_nicely() callback registered by init_parallel_dump_utils().
 *
 * Calls WSACleanup(), but only when running in the thread that performed the
 * initialization; exits from any other thread leave Winsock alone.
 */
static void
shutdown_parallel_dump_utils(int code, void *unused)
{
	if (GetCurrentThreadId() != mainThreadId)
		return;

	WSACleanup();
}
#endif
00128
/*
 * One-time setup for the parallel archiver.
 *
 * On Windows: allocate the TLS slot used by getThreadLocalPQExpBuffer(),
 * remember the calling thread as the main thread, start Winsock 2.2 (exiting
 * with status 1 if that fails) and register shutdown_parallel_dump_utils()
 * to undo it at exit.  Repeated calls do nothing.
 *
 * On other platforms there is nothing to initialize.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			err;

	if (parallel_init_done)
		return;

	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	err = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (err != 0)
	{
		fprintf(stderr, _("WSAStartup failed: %d\n"), err);
		exit_nicely(1);
	}

	on_exit_nicely(shutdown_parallel_dump_utils, NULL);
	parallel_init_done = true;
#endif
}
00151
00152 static ParallelSlot *
00153 GetMyPSlot(ParallelState *pstate)
00154 {
00155 int i;
00156
00157 for (i = 0; i < pstate->numWorkers; i++)
00158 #ifdef WIN32
00159 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
00160 #else
00161 if (pstate->parallelSlot[i].pid == getpid())
00162 #endif
00163 return &(pstate->parallelSlot[i]);
00164
00165 return NULL;
00166 }
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178 void
00179 exit_horribly(const char *modulename, const char *fmt,...)
00180 {
00181 va_list ap;
00182 ParallelState *pstate = shutdown_info.pstate;
00183 ParallelSlot *slot;
00184
00185 va_start(ap, fmt);
00186
00187 if (pstate == NULL)
00188 {
00189
00190 vwrite_msg(modulename, fmt, ap);
00191 }
00192 else
00193 {
00194 slot = GetMyPSlot(pstate);
00195
00196 if (!slot)
00197
00198 vwrite_msg(modulename, fmt, ap);
00199 else
00200
00201 parallel_msg_master(slot, modulename, fmt, ap);
00202 }
00203
00204 va_end(ap);
00205
00206 exit_nicely(1);
00207 }
00208
00209
00210 static void
00211 parallel_msg_master(ParallelSlot *slot, const char *modulename,
00212 const char *fmt, va_list ap)
00213 {
00214 char buf[512];
00215 int pipefd[2];
00216
00217 pipefd[PIPE_READ] = slot->pipeRevRead;
00218 pipefd[PIPE_WRITE] = slot->pipeRevWrite;
00219
00220 strcpy(buf, "ERROR ");
00221 vsnprintf(buf + strlen("ERROR "),
00222 sizeof(buf) - strlen("ERROR "), fmt, ap);
00223
00224 sendMessageToMaster(pipefd, buf);
00225 }
00226
00227
00228
00229
00230
00231
00232
00233 static PQExpBuffer
00234 getThreadLocalPQExpBuffer(void)
00235 {
00236
00237
00238
00239
00240 static PQExpBuffer s_id_return = NULL;
00241 PQExpBuffer id_return;
00242
00243 #ifdef WIN32
00244 if (parallel_init_done)
00245 id_return = (PQExpBuffer) TlsGetValue(tls_index);
00246 else
00247 id_return = s_id_return;
00248 #else
00249 id_return = s_id_return;
00250 #endif
00251
00252 if (id_return)
00253 {
00254
00255 resetPQExpBuffer(id_return);
00256 }
00257 else
00258 {
00259
00260 id_return = createPQExpBuffer();
00261 #ifdef WIN32
00262 if (parallel_init_done)
00263 TlsSetValue(tls_index, id_return);
00264 else
00265 s_id_return = id_return;
00266 #else
00267 s_id_return = id_return;
00268 #endif
00269
00270 }
00271
00272 return id_return;
00273 }
00274
00275
00276
00277
00278
00279
00280 void
00281 on_exit_close_archive(Archive *AHX)
00282 {
00283 shutdown_info.AHX = AHX;
00284 on_exit_nicely(archive_close_connection, &shutdown_info);
00285 }
00286
00287
00288
00289
00290
00291 static void
00292 archive_close_connection(int code, void *arg)
00293 {
00294 ShutdownInformation *si = (ShutdownInformation *) arg;
00295
00296 if (si->pstate)
00297 {
00298 ParallelSlot *slot = GetMyPSlot(si->pstate);
00299
00300 if (!slot)
00301 {
00302
00303
00304
00305
00306
00307
00308
00309 DisconnectDatabase(si->AHX);
00310 #ifndef WIN32
00311
00312
00313
00314
00315
00316
00317 aborting = true;
00318 #endif
00319 ShutdownWorkersHard(si->pstate);
00320 }
00321 else if (slot->args->AH)
00322 DisconnectDatabase(&(slot->args->AH->public));
00323 }
00324 else if (si->AHX)
00325 DisconnectDatabase(si->AHX);
00326 }
00327
00328
00329
00330
00331
00332
00333
00334
00335 void
00336 checkAborting(ArchiveHandle *AH)
00337 {
00338 #ifdef WIN32
00339 if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
00340 #else
00341 if (wantAbort)
00342 #endif
00343 exit_horribly(modulename, "worker is terminating\n");
00344 }
00345
00346
00347
00348
00349
00350
00351
00352 static void
00353 ShutdownWorkersHard(ParallelState *pstate)
00354 {
00355 #ifndef WIN32
00356 int i;
00357
00358 signal(SIGPIPE, SIG_IGN);
00359
00360
00361
00362
00363
00364 for (i = 0; i < pstate->numWorkers; i++)
00365 closesocket(pstate->parallelSlot[i].pipeWrite);
00366
00367 for (i = 0; i < pstate->numWorkers; i++)
00368 kill(pstate->parallelSlot[i].pid, SIGTERM);
00369 #else
00370
00371 SetEvent(termEvent);
00372 #endif
00373
00374 WaitForTerminatingWorkers(pstate);
00375 }
00376
00377
00378
00379
00380 static void
00381 WaitForTerminatingWorkers(ParallelState *pstate)
00382 {
00383 while (!HasEveryWorkerTerminated(pstate))
00384 {
00385 ParallelSlot *slot = NULL;
00386 int j;
00387
00388 #ifndef WIN32
00389 int status;
00390 pid_t pid = wait(&status);
00391
00392 for (j = 0; j < pstate->numWorkers; j++)
00393 if (pstate->parallelSlot[j].pid == pid)
00394 slot = &(pstate->parallelSlot[j]);
00395 #else
00396 uintptr_t hThread;
00397 DWORD ret;
00398 uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
00399 int nrun = 0;
00400
00401 for (j = 0; j < pstate->numWorkers; j++)
00402 if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
00403 {
00404 lpHandles[nrun] = pstate->parallelSlot[j].hThread;
00405 nrun++;
00406 }
00407 ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
00408 Assert(ret != WAIT_FAILED);
00409 hThread = lpHandles[ret - WAIT_OBJECT_0];
00410
00411 for (j = 0; j < pstate->numWorkers; j++)
00412 if (pstate->parallelSlot[j].hThread == hThread)
00413 slot = &(pstate->parallelSlot[j]);
00414
00415 free(lpHandles);
00416 #endif
00417 Assert(slot);
00418
00419 slot->workerStatus = WRKR_TERMINATED;
00420 }
00421 Assert(HasEveryWorkerTerminated(pstate));
00422 }
00423
00424 #ifndef WIN32
00425
00426 static void
00427 sigTermHandler(int signum)
00428 {
00429 wantAbort = 1;
00430 }
00431 #endif
00432
00433
00434
00435
00436
00437 static void
00438 SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker,
00439 RestoreOptions *ropt)
00440 {
00441
00442
00443
00444
00445
00446
00447
00448 (AH->SetupWorkerPtr) ((Archive *) AH, ropt);
00449
00450 Assert(AH->connection != NULL);
00451
00452 WaitForCommands(AH, pipefd);
00453
00454 closesocket(pipefd[PIPE_READ]);
00455 closesocket(pipefd[PIPE_WRITE]);
00456 }
00457
#ifdef WIN32
/*
 * Thread entry point of a Windows worker.
 *
 * Takes ownership of wi: copies what it needs and frees it.  The worker then
 * runs on its own clone of the master's archive, and the clone is released
 * before the thread ends.
 */
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *myAH = CloneArchive(wi->AH);
	int			pipefd[2] = {wi->pipeRead, wi->pipeWrite};
	int			workerNo = wi->worker;
	RestoreOptions *workerRopt = wi->ropt;

	/* everything needed has been copied out of wi */
	free(wi);

	SetupWorker(myAH, pipefd, workerNo, workerRopt);

	DeCloneArchive(myAH);
	_endthreadex(0);
	return 0;
}
#endif
00477
00478
00479
00480
00481
00482
00483 ParallelState *
00484 ParallelBackupStart(ArchiveHandle *AH, RestoreOptions *ropt)
00485 {
00486 ParallelState *pstate;
00487 int i;
00488 const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
00489
00490 Assert(AH->public.numWorkers > 0);
00491
00492
00493 fflush(NULL);
00494
00495 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
00496
00497 pstate->numWorkers = AH->public.numWorkers;
00498 pstate->parallelSlot = NULL;
00499
00500 if (AH->public.numWorkers == 1)
00501 return pstate;
00502
00503 pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
00504 memset((void *) pstate->parallelSlot, 0, slotSize);
00505
00506
00507
00508
00509
00510 shutdown_info.pstate = pstate;
00511 getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
00512
00513 #ifdef WIN32
00514 tMasterThreadId = GetCurrentThreadId();
00515 termEvent = CreateEvent(NULL, true, false, "Terminate");
00516 #else
00517 signal(SIGTERM, sigTermHandler);
00518 signal(SIGINT, sigTermHandler);
00519 signal(SIGQUIT, sigTermHandler);
00520 #endif
00521
00522 for (i = 0; i < pstate->numWorkers; i++)
00523 {
00524 #ifdef WIN32
00525 WorkerInfo *wi;
00526 uintptr_t handle;
00527 #else
00528 pid_t pid;
00529 #endif
00530 int pipeMW[2],
00531 pipeWM[2];
00532
00533 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
00534 exit_horribly(modulename,
00535 "Cannot create communication channels: %s\n",
00536 strerror(errno));
00537
00538 pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
00539 pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
00540 pstate->parallelSlot[i].args->AH = NULL;
00541 pstate->parallelSlot[i].args->te = NULL;
00542 #ifdef WIN32
00543
00544 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
00545
00546 wi->ropt = ropt;
00547 wi->worker = i;
00548 wi->AH = AH;
00549 wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
00550 wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
00551
00552 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
00553 wi, 0, &(pstate->parallelSlot[i].threadId));
00554 pstate->parallelSlot[i].hThread = handle;
00555 #else
00556 pid = fork();
00557 if (pid == 0)
00558 {
00559
00560 int j;
00561 int pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};
00562
00563
00564
00565
00566
00567
00568
00569
00570 pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
00571 pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
00572 pstate->parallelSlot[i].pid = getpid();
00573
00574
00575
00576
00577
00578
00579
00580
00581 pstate->parallelSlot[i].args->AH = CloneArchive(AH);
00582
00583
00584 closesocket(pipeWM[PIPE_READ]);
00585
00586 closesocket(pipeMW[PIPE_WRITE]);
00587
00588
00589
00590
00591
00592 for (j = 0; j < i; j++)
00593 {
00594 closesocket(pstate->parallelSlot[j].pipeRead);
00595 closesocket(pstate->parallelSlot[j].pipeWrite);
00596 }
00597
00598 SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);
00599
00600 exit(0);
00601 }
00602 else if (pid < 0)
00603
00604 exit_horribly(modulename,
00605 "could not create worker process: %s\n",
00606 strerror(errno));
00607
00608
00609 Assert(pid > 0);
00610
00611
00612 closesocket(pipeMW[PIPE_READ]);
00613
00614 closesocket(pipeWM[PIPE_WRITE]);
00615
00616 pstate->parallelSlot[i].pid = pid;
00617 #endif
00618
00619 pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
00620 pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
00621 }
00622
00623 return pstate;
00624 }
00625
00626
00627
00628
00629
00630
00631
00632
00633 void
00634 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
00635 {
00636 int i;
00637
00638 if (pstate->numWorkers == 1)
00639 return;
00640
00641 Assert(IsEveryWorkerIdle(pstate));
00642
00643
00644 for (i = 0; i < pstate->numWorkers; i++)
00645 {
00646 closesocket(pstate->parallelSlot[i].pipeRead);
00647 closesocket(pstate->parallelSlot[i].pipeWrite);
00648 }
00649 WaitForTerminatingWorkers(pstate);
00650
00651
00652
00653
00654
00655 shutdown_info.pstate = NULL;
00656
00657 free(pstate->parallelSlot);
00658 free(pstate);
00659 }
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692
00693
00694
00695
00696
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721 void
00722 DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
00723 T_Action act)
00724 {
00725 int worker;
00726 char *arg;
00727
00728
00729 Assert(GetIdleWorker(pstate) != NO_SLOT);
00730 worker = GetIdleWorker(pstate);
00731 Assert(worker != NO_SLOT);
00732
00733 arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
00734
00735 sendMessageToWorker(pstate, worker, arg);
00736
00737 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
00738 pstate->parallelSlot[worker].args->te = te;
00739 }
00740
00741
00742
00743
00744 int
00745 GetIdleWorker(ParallelState *pstate)
00746 {
00747 int i;
00748
00749 for (i = 0; i < pstate->numWorkers; i++)
00750 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
00751 return i;
00752 return NO_SLOT;
00753 }
00754
00755
00756
00757
00758 static bool
00759 HasEveryWorkerTerminated(ParallelState *pstate)
00760 {
00761 int i;
00762
00763 for (i = 0; i < pstate->numWorkers; i++)
00764 if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
00765 return false;
00766 return true;
00767 }
00768
00769
00770
00771
00772 bool
00773 IsEveryWorkerIdle(ParallelState *pstate)
00774 {
00775 int i;
00776
00777 for (i = 0; i < pstate->numWorkers; i++)
00778 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
00779 return false;
00780 return true;
00781 }
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797
00798
00799
00800 static void
00801 lockTableNoWait(ArchiveHandle *AH, TocEntry *te)
00802 {
00803 Archive *AHX = (Archive *) AH;
00804 const char *qualId;
00805 PQExpBuffer query = createPQExpBuffer();
00806 PGresult *res;
00807
00808 Assert(AH->format == archDirectory);
00809 Assert(strcmp(te->desc, "BLOBS") != 0);
00810
00811 appendPQExpBuffer(query,
00812 "SELECT pg_namespace.nspname,"
00813 " pg_class.relname "
00814 " FROM pg_class "
00815 " JOIN pg_namespace on pg_namespace.oid = relnamespace "
00816 " WHERE pg_class.oid = %d", te->catalogId.oid);
00817
00818 res = PQexec(AH->connection, query->data);
00819
00820 if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
00821 exit_horribly(modulename,
00822 "could not get relation name for oid %d: %s\n",
00823 te->catalogId.oid, PQerrorMessage(AH->connection));
00824
00825 resetPQExpBuffer(query);
00826
00827 qualId = fmtQualifiedId(AHX->remoteVersion,
00828 PQgetvalue(res, 0, 0),
00829 PQgetvalue(res, 0, 1));
00830
00831 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
00832 qualId);
00833 PQclear(res);
00834
00835 res = PQexec(AH->connection, query->data);
00836
00837 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
00838 exit_horribly(modulename,
00839 "could not obtain lock on relation \"%s\". This "
00840 "usually means that someone requested an ACCESS EXCLUSIVE lock "
00841 "on the table after the pg_dump parent process has gotten the "
00842 "initial ACCESS SHARE lock on the table.\n", qualId);
00843
00844 PQclear(res);
00845 destroyPQExpBuffer(query);
00846 }
00847
00848
00849
00850
00851
00852
00853
00854
00855 static void
00856 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
00857 {
00858 char *command;
00859 DumpId dumpId;
00860 int nBytes;
00861 char *str = NULL;
00862 TocEntry *te;
00863
00864 for (;;)
00865 {
00866 if (!(command = getMessageFromMaster(pipefd)))
00867 {
00868 PQfinish(AH->connection);
00869 AH->connection = NULL;
00870 return;
00871 }
00872
00873 if (messageStartsWith(command, "DUMP "))
00874 {
00875 Assert(AH->format == archDirectory);
00876 sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
00877 Assert(nBytes == strlen(command) - strlen("DUMP "));
00878
00879 te = getTocEntryByDumpId(AH, dumpId);
00880 Assert(te != NULL);
00881
00882
00883
00884
00885
00886
00887
00888
00889 if (strcmp(te->desc, "BLOBS") != 0)
00890 lockTableNoWait(AH, te);
00891
00892
00893
00894
00895
00896 str = (AH->WorkerJobDumpPtr) (AH, te);
00897 Assert(AH->connection != NULL);
00898 sendMessageToMaster(pipefd, str);
00899 free(str);
00900 }
00901 else if (messageStartsWith(command, "RESTORE "))
00902 {
00903 Assert(AH->format == archDirectory || AH->format == archCustom);
00904 Assert(AH->connection != NULL);
00905
00906 sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
00907 Assert(nBytes == strlen(command) - strlen("RESTORE "));
00908
00909 te = getTocEntryByDumpId(AH, dumpId);
00910 Assert(te != NULL);
00911
00912
00913
00914
00915
00916 str = (AH->WorkerJobRestorePtr) (AH, te);
00917 Assert(AH->connection != NULL);
00918 sendMessageToMaster(pipefd, str);
00919 free(str);
00920 }
00921 else
00922 exit_horribly(modulename,
00923 "Unknown command on communication channel: %s\n",
00924 command);
00925 }
00926 }
00927
00928
00929
00930
00931
00932
00933
00934
00935
00936
00937
00938
00939
00940
00941
00942 void
00943 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
00944 {
00945 int worker;
00946 char *msg;
00947
00948 msg = getMessageFromWorker(pstate, do_wait, &worker);
00949
00950 if (!msg)
00951 {
00952 if (do_wait)
00953 exit_horribly(modulename, "A worker process died unexpectedly\n");
00954 return;
00955 }
00956
00957 if (messageStartsWith(msg, "OK "))
00958 {
00959 char *statusString;
00960 TocEntry *te;
00961
00962 pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
00963 te = pstate->parallelSlot[worker].args->te;
00964 if (messageStartsWith(msg, "OK RESTORE "))
00965 {
00966 statusString = msg + strlen("OK RESTORE ");
00967 pstate->parallelSlot[worker].status =
00968 (AH->MasterEndParallelItemPtr)
00969 (AH, te, statusString, ACT_RESTORE);
00970 }
00971 else if (messageStartsWith(msg, "OK DUMP "))
00972 {
00973 statusString = msg + strlen("OK DUMP ");
00974 pstate->parallelSlot[worker].status =
00975 (AH->MasterEndParallelItemPtr)
00976 (AH, te, statusString, ACT_DUMP);
00977 }
00978 else
00979 exit_horribly(modulename,
00980 "Invalid message received from worker: %s\n", msg);
00981 }
00982 else if (messageStartsWith(msg, "ERROR "))
00983 {
00984 Assert(AH->format == archDirectory || AH->format == archCustom);
00985 pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
00986 exit_horribly(modulename, "%s", msg + strlen("ERROR "));
00987 }
00988 else
00989 exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
00990
00991
00992 free(msg);
00993 }
00994
00995
00996
00997
00998
00999
01000
01001
01002 int
01003 ReapWorkerStatus(ParallelState *pstate, int *status)
01004 {
01005 int i;
01006
01007 for (i = 0; i < pstate->numWorkers; i++)
01008 {
01009 if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
01010 {
01011 *status = pstate->parallelSlot[i].status;
01012 pstate->parallelSlot[i].status = 0;
01013 pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
01014 return i;
01015 }
01016 }
01017 return NO_SLOT;
01018 }
01019
01020
01021
01022
01023
01024
01025 void
01026 EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
01027 {
01028 int ret_worker;
01029 int work_status;
01030
01031 for (;;)
01032 {
01033 int nTerm = 0;
01034
01035 while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
01036 {
01037 if (work_status != 0)
01038 exit_horribly(modulename, "Error processing a parallel work item.\n");
01039
01040 nTerm++;
01041 }
01042
01043
01044
01045
01046
01047 if (nTerm > 0)
01048 return;
01049
01050
01051 if (GetIdleWorker(pstate) != NO_SLOT)
01052 return;
01053
01054
01055
01056
01057
01058 ListenToWorkers(AH, pstate, true);
01059 }
01060 }
01061
01062
01063
01064
01065
01066
01067 void
01068 EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
01069 {
01070 int work_status;
01071
01072 if (!pstate || pstate->numWorkers == 1)
01073 return;
01074
01075
01076 while (!IsEveryWorkerIdle(pstate))
01077 {
01078 if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
01079 ListenToWorkers(AH, pstate, true);
01080 else if (work_status != 0)
01081 exit_horribly(modulename,
01082 "Error processing a parallel work item\n");
01083 }
01084 }
01085
01086
01087
01088
01089
01090
01091
01092 static char *
01093 getMessageFromMaster(int pipefd[2])
01094 {
01095 return readMessageFromPipe(pipefd[PIPE_READ]);
01096 }
01097
01098
01099
01100
01101
01102
01103 static void
01104 sendMessageToMaster(int pipefd[2], const char *str)
01105 {
01106 int len = strlen(str) + 1;
01107
01108 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
01109 exit_horribly(modulename,
01110 "Error writing to the communication channel: %s\n",
01111 strerror(errno));
01112 }
01113
01114
01115
01116
01117
01118
01119 static int
01120 select_loop(int maxFd, fd_set *workerset)
01121 {
01122 int i;
01123 fd_set saveSet = *workerset;
01124
01125 #ifdef WIN32
01126
01127 Assert(tMasterThreadId == GetCurrentThreadId());
01128
01129 for (;;)
01130 {
01131
01132
01133
01134 struct timeval tv = {0, 250000};
01135
01136 *workerset = saveSet;
01137 i = select(maxFd + 1, workerset, NULL, NULL, &tv);
01138
01139 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
01140 continue;
01141 if (i)
01142 break;
01143 }
01144
01145 #else
01146
01147 for (;;)
01148 {
01149 *workerset = saveSet;
01150 i = select(maxFd + 1, workerset, NULL, NULL, NULL);
01151
01152
01153
01154
01155
01156
01157
01158
01159 if (wantAbort && !aborting)
01160 exit_horribly(modulename, "terminated by user\n");
01161
01162 if (i < 0 && errno == EINTR)
01163 continue;
01164 break;
01165 }
01166
01167 #endif
01168
01169 return i;
01170 }
01171
01172
01173
01174
01175
01176
01177
01178
01179
01180
01181 static char *
01182 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
01183 {
01184 int i;
01185 fd_set workerset;
01186 int maxFd = -1;
01187 struct timeval nowait = {0, 0};
01188
01189 FD_ZERO(&workerset);
01190
01191 for (i = 0; i < pstate->numWorkers; i++)
01192 {
01193 if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
01194 continue;
01195 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
01196
01197 if (pstate->parallelSlot[i].pipeRead > maxFd)
01198 maxFd = pstate->parallelSlot[i].pipeRead;
01199 }
01200
01201 if (do_wait)
01202 {
01203 i = select_loop(maxFd, &workerset);
01204 Assert(i != 0);
01205 }
01206 else
01207 {
01208 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
01209 return NULL;
01210 }
01211
01212 if (i < 0)
01213 exit_horribly(modulename, "Error in ListenToWorkers(): %s", strerror(errno));
01214
01215 for (i = 0; i < pstate->numWorkers; i++)
01216 {
01217 char *msg;
01218
01219 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
01220 continue;
01221
01222 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
01223 *worker = i;
01224 return msg;
01225 }
01226 Assert(false);
01227 return NULL;
01228 }
01229
01230
01231
01232
01233
01234
01235 static void
01236 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
01237 {
01238 int len = strlen(str) + 1;
01239
01240 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
01241 {
01242
01243
01244
01245
01246 #ifndef WIN32
01247 if (!aborting)
01248 #endif
01249 exit_horribly(modulename,
01250 "Error writing to the communication channel: %s\n",
01251 strerror(errno));
01252 }
01253 }
01254
01255
01256
01257
01258
/*
 * Read one NUL-terminated message from fd.
 *
 * Returns the message in a pg_malloc'd buffer, which the caller must free.
 * Returns NULL if the other end closed the connection or a read error
 * occurred.  In that case the partially filled buffer is now released; it
 * used to be leaked.
 *
 * The message is read one byte at a time, so no bytes belonging to a
 * following message are ever consumed.  The buffer grows in 16-byte steps
 * via pg_realloc(), the counterpart of the pg_malloc() used here, instead of
 * an unchecked realloc().
 */
static char *
readMessageFromPipe(int fd)
{
	char	   *msg;
	int			msgsize,
				bufsize;
	int			ret;

	bufsize = 64;
	msg = (char *) pg_malloc(bufsize);

	msgsize = 0;
	for (;;)
	{
		Assert(msgsize <= bufsize);
		ret = piperead(fd, msg + msgsize, 1);

		/* the other end has closed the connection, or a read error happened */
		if (ret <= 0)
			break;

		Assert(ret == 1);

		if (msg[msgsize] == '\0')
			return msg;

		msgsize++;
		if (msgsize == bufsize)
		{
			bufsize += 16;
			msg = (char *) pg_realloc(msg, bufsize);
		}
	}

	/* don't leak the partial message */
	free(msg);
	return NULL;
}
01306
01307 #ifdef WIN32
01308
01309
01310
01311
01312
01313 static int
01314 pgpipe(int handles[2])
01315 {
01316 SOCKET s;
01317 struct sockaddr_in serv_addr;
01318 int len = sizeof(serv_addr);
01319
01320 handles[0] = handles[1] = INVALID_SOCKET;
01321
01322 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
01323 {
01324 write_msg(modulename, "pgpipe could not create socket: %ui",
01325 WSAGetLastError());
01326 return -1;
01327 }
01328
01329 memset((void *) &serv_addr, 0, sizeof(serv_addr));
01330 serv_addr.sin_family = AF_INET;
01331 serv_addr.sin_port = htons(0);
01332 serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
01333 if (bind(s, (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
01334 {
01335 write_msg(modulename, "pgpipe could not bind: %ui",
01336 WSAGetLastError());
01337 closesocket(s);
01338 return -1;
01339 }
01340 if (listen(s, 1) == SOCKET_ERROR)
01341 {
01342 write_msg(modulename, "pgpipe could not listen: %ui",
01343 WSAGetLastError());
01344 closesocket(s);
01345 return -1;
01346 }
01347 if (getsockname(s, (SOCKADDR *) & serv_addr, &len) == SOCKET_ERROR)
01348 {
01349 write_msg(modulename, "pgpipe could not getsockname: %ui",
01350 WSAGetLastError());
01351 closesocket(s);
01352 return -1;
01353 }
01354 if ((handles[1] = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
01355 {
01356 write_msg(modulename, "pgpipe could not create socket 2: %ui",
01357 WSAGetLastError());
01358 closesocket(s);
01359 return -1;
01360 }
01361
01362 if (connect(handles[1], (SOCKADDR *) & serv_addr, len) == SOCKET_ERROR)
01363 {
01364 write_msg(modulename, "pgpipe could not connect socket: %ui",
01365 WSAGetLastError());
01366 closesocket(s);
01367 return -1;
01368 }
01369 if ((handles[0] = accept(s, (SOCKADDR *) & serv_addr, &len)) == INVALID_SOCKET)
01370 {
01371 write_msg(modulename, "pgpipe could not accept socket: %ui",
01372 WSAGetLastError());
01373 closesocket(handles[1]);
01374 handles[1] = INVALID_SOCKET;
01375 closesocket(s);
01376 return -1;
01377 }
01378 closesocket(s);
01379 return 0;
01380 }
01381
01382 static int
01383 piperead(int s, char *buf, int len)
01384 {
01385 int ret = recv(s, buf, len, 0);
01386
01387 if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
01388
01389 ret = 0;
01390 return ret;
01391 }
01392
01393 #endif