#include "pg_backup_db.h"
Go to the source code of this file.
#define NO_SLOT (-1)
Definition at line 60 of file parallel.h.
Referenced by DispatchJobForTocEntry(), EnsureIdleWorker(), EnsureWorkersFinished(), restore_toc_entries_parallel(), and WriteDataChunks().
typedef struct ParallelArgs ParallelArgs
typedef struct ParallelSlot ParallelSlot
typedef struct ParallelState ParallelState
enum T_WorkerStatus
Definition at line 27 of file parallel.h.
/*
 * Life-cycle states of one worker slot, as used by the functions in this file:
 *
 * WRKR_TERMINATED	explicitly 0, so a zero-filled slot starts out in this
 *					state; also set by ListenToWorkers() when a worker
 *					reports an "ERROR " message.
 * WRKR_IDLE		slot can accept a job; this is what GetIdleWorker() and
 *					IsEveryWorkerIdle() test for.
 * WRKR_WORKING		set by DispatchJobForTocEntry() after a job was sent.
 * WRKR_FINISHED	set by ListenToWorkers() when an "OK " message arrives;
 *					ReapWorkerStatus() collects the result and moves the
 *					slot back to WRKR_IDLE.
 */
{ WRKR_TERMINATED = 0, WRKR_IDLE, WRKR_WORKING, WRKR_FINISHED } T_WorkerStatus;
void checkAborting(struct _archiveHandle *AH)
Definition at line 336 of file parallel.c.
References exit_horribly(), modulename, and wantAbort.
Referenced by _WriteBuf(), _WriteData(), ReadDataFromArchiveNone(), and WriteDataToArchive().
/*
 * Bail out through exit_horribly() if an abort has been requested.
 *
 * The abort request is detected differently per platform: on WIN32 the
 * termEvent object is polled without blocking (zero timeout), elsewhere the
 * wantAbort flag is inspected.  AH is not used by this check.
 */
{
	bool		aborting;

#ifdef WIN32
	aborting = (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0);
#else
	aborting = (wantAbort != 0);
#endif

	if (aborting)
		exit_horribly(modulename, "worker is terminating\n");
}
void DispatchJobForTocEntry(struct _archiveHandle *AH, ParallelState *pstate, struct _tocEntry *te, T_Action act)
Definition at line 722 of file parallel.c.
References arg, ParallelSlot::args, Assert, GetIdleWorker(), _archiveHandle::MasterStartParallelItemPtr, NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelArgs::te, and ParallelSlot::workerStatus.
Referenced by restore_toc_entries_parallel(), and WriteDataChunks().
/*
 * Hand the TOC entry "te" to an idle worker.
 *
 * The command string is produced by the archive format's
 * MasterStartParallelItemPtr callback for the requested action "act" and
 * sent to the chosen worker.  Afterwards the worker's slot is marked
 * WRKR_WORKING and remembers "te", so that ListenToWorkers() can associate
 * the worker's reply with the entry.
 *
 * The caller must make sure that at least one worker is idle; this is only
 * verified by an assertion.
 */
{
	int			worker;
	char	   *arg;

	/* One lookup is enough: the slot table does not change in between. */
	worker = GetIdleWorker(pstate);
	Assert(worker != NO_SLOT);

	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);

	sendMessageToWorker(pstate, worker, arg);

	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
	pstate->parallelSlot[worker].args->te = te;
}
void EnsureIdleWorker(struct _archiveHandle *AH, ParallelState *pstate)
Definition at line 1026 of file parallel.c.
References exit_horribly(), GetIdleWorker(), ListenToWorkers(), modulename, NO_SLOT, and ReapWorkerStatus().
Referenced by WriteDataChunks().
/*
 * Block until at least one worker is idle.
 *
 * Every pass first reaps all workers that have reported completion; a
 * non-zero work status from any of them is fatal.  If something was reaped,
 * or an idle worker exists anyway, we are done.  Otherwise wait for worker
 * messages and try again.
 */
{
	int			work_status;

	while (true)
	{
		int			reaped = 0;

		/* Drain every finished worker, turning each back into an idle one. */
		while (ReapWorkerStatus(pstate, &work_status) != NO_SLOT)
		{
			if (work_status != 0)
				exit_horribly(modulename, "Error processing a parallel work item.\n");
			reaped++;
		}

		/*
		 * A reaped worker is idle by definition, so the explicit search is
		 * only needed when nothing was reaped.
		 */
		if (reaped > 0 || GetIdleWorker(pstate) != NO_SLOT)
			return;

		/* Nobody is free: wait for results, then loop to reap them. */
		ListenToWorkers(AH, pstate, true);
	}
}
void EnsureWorkersFinished(struct _archiveHandle *AH, ParallelState *pstate)
Definition at line 1068 of file parallel.c.
References exit_horribly(), IsEveryWorkerIdle(), ListenToWorkers(), modulename, NO_SLOT, ParallelState::numWorkers, and ReapWorkerStatus().
Referenced by WriteDataChunks().
/*
 * Wait until every worker is idle again.
 *
 * Nothing to do without a parallel state or with a single worker.  Otherwise
 * finished workers are reaped one at a time (a non-zero work status is
 * fatal); whenever there is nothing to reap we wait for worker messages.
 */
{
	int			work_status;

	if (pstate == NULL || pstate->numWorkers == 1)
		return;

	while (!IsEveryWorkerIdle(pstate))
	{
		if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
		{
			/* nothing finished yet: wait for the next worker message */
			ListenToWorkers(AH, pstate, true);
			continue;
		}

		if (work_status != 0)
			exit_horribly(modulename, "Error processing a parallel work item\n");
	}
}
void exit_horribly(const char *modulename, const char *fmt, ...)
int GetIdleWorker(ParallelState *pstate)
Definition at line 745 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.
Referenced by DispatchJobForTocEntry(), EnsureIdleWorker(), restore_toc_entries_parallel(), and WriteDataChunks().
/*
 * Return the index of the lowest-numbered slot whose status is WRKR_IDLE,
 * or NO_SLOT if no slot is idle.
 */
{
	int			slot = 0;

	while (slot < pstate->numWorkers)
	{
		if (pstate->parallelSlot[slot].workerStatus == WRKR_IDLE)
			return slot;
		slot++;
	}

	return NO_SLOT;
}
void init_parallel_dump_utils(void)
Definition at line 130 of file parallel.c.
References _, exit_nicely, NULL, and on_exit_nicely().
Referenced by main().
/*
 * One-time initialization for the parallel dump machinery.
 *
 * Only WIN32 builds do anything here: allocate the TLS index, remember the
 * main thread's id, start Winsock 2.2 and register the matching shutdown
 * callback.  Repeated calls are no-ops thanks to parallel_init_done.
 */
{
#ifdef WIN32
	WSADATA		wsaData;
	int			err;

	if (parallel_init_done)
		return;

	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	err = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (err != 0)
	{
		fprintf(stderr, _("WSAStartup failed: %d\n"), err);
		exit_nicely(1);
	}

	on_exit_nicely(shutdown_parallel_dump_utils, NULL);
	parallel_init_done = true;
#endif
}
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition at line 773 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.
Referenced by EnsureWorkersFinished(), ParallelBackupEnd(), and restore_toc_entries_parallel().
/*
 * Return true if every slot has status WRKR_IDLE, false as soon as one slot
 * in any other state is found.
 */
{
	int			n;
	bool		allIdle = true;

	for (n = 0; allIdle && n < pstate->numWorkers; n++)
		allIdle = (pstate->parallelSlot[n].workerStatus == WRKR_IDLE);

	return allIdle;
}
void ListenToWorkers(struct _archiveHandle *AH, ParallelState *pstate, bool do_wait)
Definition at line 943 of file parallel.c.
References ACT_DUMP, ACT_RESTORE, archCustom, archDirectory, ParallelSlot::args, Assert, exit_horribly(), _archiveHandle::format, free, getMessageFromWorker(), _archiveHandle::MasterEndParallelItemPtr, messageStartsWith, modulename, ParallelState::parallelSlot, ParallelSlot::status, ParallelArgs::te, and ParallelSlot::workerStatus.
Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().
/*
 * Fetch one message from the workers and act on it.
 *
 * If no message is available: fatal when do_wait is set (a waiting read that
 * yields nothing is reported as a dead worker), otherwise simply return.
 *
 * "ERROR <text>"		mark the slot WRKR_TERMINATED and exit with <text>.
 * "OK RESTORE <str>"	\ mark the slot WRKR_FINISHED and store the result of
 * "OK DUMP <str>"		/ the format's MasterEndParallelItemPtr callback in
 *						  the slot's status.
 * anything else		fatal "Invalid message" error.
 *
 * Slot state is always updated before exit_horribly() is called.
 */
{
	int			worker;
	char	   *msg = getMessageFromWorker(pstate, do_wait, &worker);

	if (msg == NULL)
	{
		if (do_wait)
			exit_horribly(modulename, "A worker process died unexpectedly\n");
		return;
	}

	if (messageStartsWith(msg, "ERROR "))
	{
		Assert(AH->format == archDirectory || AH->format == archCustom);
		pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
		exit_horribly(modulename, "%s", msg + strlen("ERROR "));
	}
	else if (messageStartsWith(msg, "OK "))
	{
		ParallelSlot *slot = &pstate->parallelSlot[worker];
		TocEntry   *te;

		slot->workerStatus = WRKR_FINISHED;
		te = slot->args->te;

		if (messageStartsWith(msg, "OK RESTORE "))
			slot->status = (AH->MasterEndParallelItemPtr)
				(AH, te, msg + strlen("OK RESTORE "), ACT_RESTORE);
		else if (messageStartsWith(msg, "OK DUMP "))
			slot->status = (AH->MasterEndParallelItemPtr)
				(AH, te, msg + strlen("OK DUMP "), ACT_DUMP);
		else
			exit_horribly(modulename,
						  "Invalid message received from worker: %s\n", msg);
	}
	else
		exit_horribly(modulename,
					  "Invalid message received from worker: %s\n", msg);

	/* the message buffer is heap-allocated on every platform */
	free(msg);
}
void ParallelBackupEnd(struct _archiveHandle *AH, ParallelState *pstate)
Definition at line 634 of file parallel.c.
References Assert, closesocket, free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, and WaitForTerminatingWorkers().
Referenced by _CloseArchive(), and RestoreArchive().
/*
 * Shut down the workers and release the parallel state created by
 * ParallelBackupStart().
 *
 * With a single worker no slots, pipes or processes exist; only the state
 * structure itself has to be released.  Otherwise all workers must be idle
 * (asserted).  Their communication sockets are closed, which tells them to
 * exit, and we wait until they are gone before freeing anything they might
 * still reference.
 *
 * pstate is invalid after this call in every case.
 */
{
	int			i;

	if (pstate->numWorkers == 1)
	{
		/* parallelSlot is NULL here, so the struct is all there is to free */
		free(pstate);
		return;
	}

	Assert(IsEveryWorkerIdle(pstate));

	/* close the sockets so that the workers know they can exit */
	for (i = 0; i < pstate->numWorkers; i++)
	{
		closesocket(pstate->parallelSlot[i].pipeRead);
		closesocket(pstate->parallelSlot[i].pipeWrite);
	}

	WaitForTerminatingWorkers(pstate);

	/*
	 * Unhook pstate from the exit handler before freeing it, so the handler
	 * falls back to closing AH->connection (if connected) instead of
	 * touching freed memory.
	 */
	shutdown_info.pstate = NULL;

	/* each slot owns the ParallelArgs allocated in ParallelBackupStart() */
	for (i = 0; i < pstate->numWorkers; i++)
		free(pstate->parallelSlot[i].args);

	free(pstate->parallelSlot);
	free(pstate);
}
ParallelState *ParallelBackupStart(struct _archiveHandle *AH, RestoreOptions *ropt)
Definition at line 484 of file parallel.c.
References ParallelArgs::AH, ParallelSlot::args, Assert, CloneArchive(), closesocket, exit_horribly(), getLocalPQExpBuffer, i, modulename, NULL, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_malloc(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, ShutdownInformation::pstate, _archiveHandle::public, SetupWorker(), SIGQUIT, sigTermHandler(), strerror(), ParallelArgs::te, and ParallelSlot::workerStatus.
Referenced by _CloseArchive(), and RestoreArchive().
/*
 * Create the parallel state and launch AH->public.numWorkers workers.
 *
 * With exactly one worker only the bare state structure is returned: no
 * slots, pipes, signal handlers or processes are set up.  Otherwise each
 * worker gets a slot and two communication channels (master->worker and
 * worker->master), and is started as a thread on WIN32 or as a forked
 * process elsewhere.  "ropt" is passed through to the worker setup.
 *
 * Any failure to create a channel or a process is fatal.
 */
{
	ParallelState *pstate;
	int			i;
	const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);

	Assert(AH->public.numWorkers > 0);

	/* Flush all stdio buffers so forked children don't inherit pending output */
	fflush(NULL);

	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
	pstate->numWorkers = AH->public.numWorkers;
	pstate->parallelSlot = NULL;

	/* Non-parallel operation: nothing more to set up */
	if (AH->public.numWorkers == 1)
		return pstate;

	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
	memset((void *) pstate->parallelSlot, 0, slotSize);

	/* From here on the exit handler gets to see the parallel state. */
	shutdown_info.pstate = pstate;
	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;

#ifdef WIN32
	tMasterThreadId = GetCurrentThreadId();
	termEvent = CreateEvent(NULL, true, false, "Terminate");
#else
	signal(SIGTERM, sigTermHandler);
	signal(SIGINT, sigTermHandler);
	signal(SIGQUIT, sigTermHandler);
#endif

	for (i = 0; i < pstate->numWorkers; i++)
	{
		ParallelSlot *slot = &pstate->parallelSlot[i];
#ifdef WIN32
		WorkerInfo *wi;
		uintptr_t	handle;
#else
		pid_t		pid;
#endif
		int			pipeMW[2],
					pipeWM[2];

		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
			exit_horribly(modulename,
						  "Cannot create communication channels: %s\n",
						  strerror(errno));

		slot->workerStatus = WRKR_IDLE;
		slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
		slot->args->AH = NULL;
		slot->args->te = NULL;

#ifdef WIN32
		/* Every worker thread gets its own start-up descriptor */
		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

		wi->ropt = ropt;
		wi->worker = i;
		wi->AH = AH;
		wi->pipeRead = slot->pipeRevRead = pipeMW[PIPE_READ];
		wi->pipeWrite = slot->pipeRevWrite = pipeWM[PIPE_WRITE];

		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
								wi, 0, &(slot->threadId));
		slot->hThread = handle;
#else
		pid = fork();

		if (pid < 0)
		{
			/* fork failed */
			exit_horribly(modulename,
						  "could not create worker process: %s\n",
						  strerror(errno));
		}

		if (pid == 0)
		{
			/* This is the worker process. */
			int			j;
			int			pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};

			/*
			 * Remember the worker-side descriptors in this process's own
			 * copy of the slot.
			 */
			slot->pipeRevRead = pipefd[PIPE_READ];
			slot->pipeRevWrite = pipefd[PIPE_WRITE];
			slot->pid = getpid();

			/* Give the worker its own archive handle to operate on. */
			slot->args->AH = CloneArchive(AH);

			/* The worker neither reads Worker -> Master ... */
			closesocket(pipeWM[PIPE_READ]);
			/* ... nor writes Master -> Worker. */
			closesocket(pipeMW[PIPE_WRITE]);

			/*
			 * Drop the inherited master-side descriptors that belong to the
			 * workers started before this one.
			 */
			for (j = 0; j < i; j++)
			{
				closesocket(pstate->parallelSlot[j].pipeRead);
				closesocket(pstate->parallelSlot[j].pipeWrite);
			}

			SetupWorker(slot->args->AH, pipefd, i, ropt);

			exit(0);
		}

		/* This is the master process; pid identifies the new worker. */
		Assert(pid > 0);

		/* The master does not read Master -> Worker ... */
		closesocket(pipeMW[PIPE_READ]);
		/* ... and does not write Worker -> Master. */
		closesocket(pipeWM[PIPE_WRITE]);

		slot->pid = pid;
#endif

		/* Master-side ends of the two channels */
		slot->pipeRead = pipeWM[PIPE_READ];
		slot->pipeWrite = pipeMW[PIPE_WRITE];
	}

	return pstate;
}
int ReapWorkerStatus(ParallelState *pstate, int *status)
Definition at line 1003 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::status, ParallelSlot::workerStatus, and WRKR_FINISHED.
Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().
/*
 * Collect the result of one finished worker.
 *
 * The first slot found in state WRKR_FINISHED has its stored status copied
 * to *status, its status reset to 0 and its state set back to WRKR_IDLE; the
 * slot index is returned.  If no slot is finished, NO_SLOT is returned and
 * *status is left untouched.
 */
{
	int			n;

	for (n = 0; n < pstate->numWorkers; n++)
	{
		ParallelSlot *slot = &pstate->parallelSlot[n];

		if (slot->workerStatus != WRKR_FINISHED)
			continue;

		*status = slot->status;
		slot->status = 0;
		slot->workerStatus = WRKR_IDLE;
		return n;
	}

	return NO_SLOT;
}
void noreturn |
Definition at line 93 of file parallel.h.