#include "postgres_fe.h"
#include "pg_backup_utils.h"
#include "parallel.h"
#include <sys/types.h>
#include <sys/wait.h>
#include "signal.h"
#include <unistd.h>
#include <fcntl.h>
Go to the source code of this file.
Data Structures | |
struct | ShutdownInformation |
Defines | |
#define | PIPE_READ 0 |
#define | PIPE_WRITE 1 |
#define | pgpipe(a) pipe(a) |
#define | piperead(a, b, c) read(a,b,c) |
#define | pipewrite(a, b, c) write(a,b,c) |
#define | messageStartsWith(msg, prefix) (strncmp(msg, prefix, strlen(prefix)) == 0) |
#define | messageEquals(msg, pattern) (strcmp(msg, pattern) == 0) |
Typedefs | |
typedef struct ShutdownInformation | ShutdownInformation |
Functions | |
static ParallelSlot * | GetMyPSlot (ParallelState *pstate) |
static void | parallel_msg_master (ParallelSlot *slot, const char *modulename, const char *fmt, va_list ap) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0))) |
static void | archive_close_connection (int code, void *arg) |
static void | ShutdownWorkersHard (ParallelState *pstate) |
static void | WaitForTerminatingWorkers (ParallelState *pstate) |
static void | sigTermHandler (int signum) |
static void | SetupWorker (ArchiveHandle *AH, int pipefd[2], int worker, RestoreOptions *ropt) |
static bool | HasEveryWorkerTerminated (ParallelState *pstate) |
static void | lockTableNoWait (ArchiveHandle *AH, TocEntry *te) |
static void | WaitForCommands (ArchiveHandle *AH, int pipefd[2]) |
static char * | getMessageFromMaster (int pipefd[2]) |
static void | sendMessageToMaster (int pipefd[2], const char *str) |
static int | select_loop (int maxFd, fd_set *workerset) |
static char * | getMessageFromWorker (ParallelState *pstate, bool do_wait, int *worker) |
static void | sendMessageToWorker (ParallelState *pstate, int worker, const char *str) |
static char * | readMessageFromPipe (int fd) |
void | init_parallel_dump_utils (void) |
void | exit_horribly (const char *modulename, const char *fmt,...) |
static PQExpBuffer | getThreadLocalPQExpBuffer (void) |
void | on_exit_close_archive (Archive *AHX) |
void | checkAborting (ArchiveHandle *AH) |
ParallelState * | ParallelBackupStart (ArchiveHandle *AH, RestoreOptions *ropt) |
void | ParallelBackupEnd (ArchiveHandle *AH, ParallelState *pstate) |
void | DispatchJobForTocEntry (ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act) |
int | GetIdleWorker (ParallelState *pstate) |
bool | IsEveryWorkerIdle (ParallelState *pstate) |
void | ListenToWorkers (ArchiveHandle *AH, ParallelState *pstate, bool do_wait) |
int | ReapWorkerStatus (ParallelState *pstate, int *status) |
void | EnsureIdleWorker (ArchiveHandle *AH, ParallelState *pstate) |
void | EnsureWorkersFinished (ArchiveHandle *AH, ParallelState *pstate) |
Variables | |
static bool | aborting = false |
static volatile sig_atomic_t | wantAbort = 0 |
static ShutdownInformation | shutdown_info |
static const char * | modulename = gettext_noop("parallel archiver") |
#define messageEquals | ( | msg, | ||
pattern | ||||
) | (strcmp(msg, pattern) == 0) |
Definition at line 108 of file parallel.c.
#define messageStartsWith | ( | msg, | ||
prefix | ||||
) | (strncmp(msg, prefix, strlen(prefix)) == 0) |
Definition at line 106 of file parallel.c.
Referenced by ListenToWorkers(), and WaitForCommands().
#define pgpipe | ( | a | ) | pipe(a) |
Definition at line 64 of file parallel.c.
Referenced by ParallelBackupStart().
#define PIPE_READ 0 |
Definition at line 32 of file parallel.c.
Referenced by getMessageFromMaster(), ParallelBackupStart(), and SetupWorker().
#define PIPE_WRITE 1 |
Definition at line 33 of file parallel.c.
Referenced by ParallelBackupStart(), sendMessageToMaster(), and SetupWorker().
#define piperead | ( | a, | ||
b, | ||||
c | ||||
) | read(a,b,c) |
Definition at line 65 of file parallel.c.
Referenced by readMessageFromPipe().
#define pipewrite | ( | a, | ||
b, | ||||
c | ||||
) | write(a,b,c) |
Definition at line 66 of file parallel.c.
Referenced by sendMessageToMaster(), and sendMessageToWorker().
typedef struct ShutdownInformation ShutdownInformation |
static void archive_close_connection | ( | int | code, | |
void * | arg | |||
) | [static] |
Definition at line 292 of file parallel.c.
References aborting, ParallelArgs::AH, ShutdownInformation::AHX, ParallelSlot::args, DisconnectDatabase(), GetMyPSlot(), ShutdownInformation::pstate, _archiveHandle::public, and ShutdownWorkersHard().
Referenced by on_exit_close_archive().
{
	/*
	 * Exit callback: arg is the ShutdownInformation that was registered
	 * together with this handler.  Depending on who we are, close the right
	 * database connection and, in the master, take the workers down too.
	 */
	ShutdownInformation *info = arg;
	ParallelSlot *mySlot;

	if (!info->pstate)
	{
		/* No parallel state: only the plain archive connection may exist. */
		if (info->AHX)
			DisconnectDatabase(info->AHX);
		return;
	}

	mySlot = GetMyPSlot(info->pstate);
	if (mySlot)
	{
		/* We own a slot, so we are a worker: close our own connection. */
		if (mySlot->args->AH)
			DisconnectDatabase(&(mySlot->args->AH->public));
		return;
	}

	/*
	 * No slot belongs to us, so we are the master.  The message handed to
	 * exit_horribly() has already been printed (by us or on behalf of a
	 * worker).  Close our own connection, which is only open during parallel
	 * dump and not during restore, then shut down the remaining workers.
	 */
	DisconnectDatabase(info->AHX);

#ifndef WIN32
	/*
	 * From here on, talk to the workers in best-effort mode: keep
	 * sending/receiving but ignore errors.
	 */
	aborting = true;
#endif

	ShutdownWorkersHard(info->pstate);
}
void checkAborting | ( | ArchiveHandle * | AH | ) |
Definition at line 336 of file parallel.c.
References exit_horribly(), modulename, and wantAbort.
Referenced by _WriteBuf(), _WriteData(), ReadDataFromArchiveNone(), and WriteDataToArchive().
{
	/*
	 * Poll the platform-specific termination indicator and bail out through
	 * exit_horribly() once a shutdown has been requested.
	 */
	bool		terminate;

#ifdef WIN32
	/* a zero timeout only tests the event's state without blocking */
	terminate = (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0);
#else
	/* set asynchronously by sigTermHandler() */
	terminate = (wantAbort != 0);
#endif

	if (terminate)
		exit_horribly(modulename, "worker is terminating\n");
}
void DispatchJobForTocEntry | ( | ArchiveHandle * | AH, | |
ParallelState * | pstate, | |||
TocEntry * | te, | |||
T_Action | act | |||
) |
Definition at line 722 of file parallel.c.
References arg, ParallelSlot::args, Assert, GetIdleWorker(), _archiveHandle::MasterStartParallelItemPtr, NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelArgs::te, and ParallelSlot::workerStatus.
Referenced by restore_toc_entries_parallel(), and WriteDataChunks().
{
	/*
	 * Hand the TOC entry te to an idle worker.  The caller guarantees that
	 * at least one worker is idle, so failing to find one is a programming
	 * error.
	 */
	int			idleSlot = GetIdleWorker(pstate);
	char	   *command;

	Assert(idleSlot != NO_SLOT);

	/* let the archive format build the command string for this item */
	command = (AH->MasterStartParallelItemPtr) (AH, te, act);
	sendMessageToWorker(pstate, idleSlot, command);

	/* book-keeping: the worker is busy now, and remember what it works on */
	pstate->parallelSlot[idleSlot].workerStatus = WRKR_WORKING;
	pstate->parallelSlot[idleSlot].args->te = te;
}
void EnsureIdleWorker | ( | ArchiveHandle * | AH, | |
ParallelState * | pstate | |||
) |
Definition at line 1026 of file parallel.c.
References exit_horribly(), GetIdleWorker(), ListenToWorkers(), modulename, NO_SLOT, and ReapWorkerStatus().
Referenced by WriteDataChunks().
{
	/*
	 * Return only once at least one worker is idle.  Finished workers are
	 * reaped on the way; a non-zero work status from any of them is fatal.
	 */
	for (;;)
	{
		int			status;
		bool		reapedAny = false;

		/* collect every worker that has reported completion */
		while (ReapWorkerStatus(pstate, &status) != NO_SLOT)
		{
			if (status != 0)
				exit_horribly(modulename, "Error processing a parallel work item.\n");
			reapedAny = true;
		}

		/*
		 * A reaped worker has just been switched back to idle, so in that
		 * case no explicit search is needed; otherwise look for an idle one.
		 */
		if (reapedAny || GetIdleWorker(pstate) != NO_SLOT)
			return;

		/*
		 * Nobody is idle: block until some worker reports back, then go
		 * around again so that ReapWorkerStatus() can pick it up.
		 */
		ListenToWorkers(AH, pstate, true);
	}
}
void EnsureWorkersFinished | ( | ArchiveHandle * | AH, | |
ParallelState * | pstate | |||
) |
Definition at line 1068 of file parallel.c.
References exit_horribly(), IsEveryWorkerIdle(), ListenToWorkers(), modulename, NO_SLOT, ParallelState::numWorkers, and ReapWorkerStatus().
Referenced by WriteDataChunks().
{
	/*
	 * Block until every worker is idle again, reaping results as they come
	 * in.  A non-zero work status from any worker is fatal.
	 */
	int			status;

	/* nothing to wait for without parallel state or with a single worker */
	if (pstate == NULL || pstate->numWorkers == 1)
		return;

	while (!IsEveryWorkerIdle(pstate))
	{
		if (ReapWorkerStatus(pstate, &status) == NO_SLOT)
		{
			/* nothing finished yet: wait for the next worker message */
			ListenToWorkers(AH, pstate, true);
			continue;
		}

		if (status != 0)
			exit_horribly(modulename, "Error processing a parallel work item\n");
	}
}
void exit_horribly | ( | const char * | modulename, | |
const char * | fmt, | |||
... | ||||
) |
Definition at line 179 of file parallel.c.
References exit_nicely, GetMyPSlot(), NULL, parallel_msg_master(), ShutdownInformation::pstate, and vwrite_msg().
Referenced by _allocAH(), _check_database_version(), _Clone(), _CloseArchive(), _connectDB(), _CustomReadFunc(), _discoverArchiveFormat(), _doSetSessionAuth(), _EndBlob(), _LoadBlobs(), _PrintFileData(), _PrintTocData(), _ReadByte(), _ReopenArchive(), _skipData(), _StartBlob(), _StartBlobs(), _StartData(), _tarAddFile(), _tarGetHeader(), _tarPositionTo(), _tarReadRaw(), _tarWriteHeader(), _WorkerJobDumpDirectory(), _WriteBuf(), _WriteByte(), ahwrite(), AllocateCompressor(), binary_upgrade_extension_member(), buildTocEntryArrays(), cfopen(), cfopen_write(), checkAborting(), CloseArchive(), ConnectDatabase(), createViewAsClause(), die_on_query_failure(), dump_lo_buf(), dumpACL(), dumpBlobs(), dumpConstraint(), dumpDefaultACL(), dumpFunc(), EndDBCopyMode(), EnsureIdleWorker(), EnsureWorkersFinished(), ExecuteSqlCommandBuf(), ExecuteSqlQueryForSingleRow(), expand_schema_name_patterns(), findDependencyLoops(), findNamespace(), getAttrName(), getMessageFromWorker(), getRules(), getTableAttrs(), getTriggers(), InitArchiveFmt_Custom(), InitArchiveFmt_Directory(), InitArchiveFmt_Null(), InitArchiveFmt_Tar(), ListenToWorkers(), lockTableNoWait(), main(), mark_work_done(), on_exit_nicely(), ParallelBackupStart(), parseArchiveFormat(), ParseCompressionOption(), processEncodingEntry(), processStdStringsEntry(), ReadDataFromArchive(), ReadHead(), ReadOffset(), ReadStr(), ReadToc(), RestoreArchive(), RestoreOutput(), select_loop(), sendMessageToMaster(), sendMessageToWorker(), SetArchiveRestoreOptions(), setFilePath(), SetOutput(), setup_connection(), SortTocFromFile(), StartBlob(), StartRestoreBlob(), tarClose(), tarOpen(), tarWrite(), TopoSort(), WaitForCommands(), WriteData(), WriteDataToArchive(), and WriteDataToArchiveNone().
{
	/*
	 * Report a fatal error and exit with status 1.  A worker process forwards
	 * the message to the master; everybody else (non-parallel mode, or the
	 * master itself) writes it out directly.
	 */
	va_list		args;
	ParallelState *pstate = shutdown_info.pstate;
	ParallelSlot *mySlot = NULL;

	/* we can only be a worker when parallel state has been registered */
	if (pstate != NULL)
		mySlot = GetMyPSlot(pstate);

	va_start(args, fmt);
	if (mySlot != NULL)
		parallel_msg_master(mySlot, modulename, fmt, args);
	else
		vwrite_msg(modulename, fmt, args);
	va_end(args);

	exit_nicely(1);
}
int GetIdleWorker | ( | ParallelState * | pstate | ) |
Definition at line 745 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.
Referenced by DispatchJobForTocEntry(), EnsureIdleWorker(), restore_toc_entries_parallel(), and WriteDataChunks().
{
	/* Return the index of the first idle worker slot, or NO_SLOT if none. */
	int			idx = 0;

	while (idx < pstate->numWorkers)
	{
		if (pstate->parallelSlot[idx].workerStatus == WRKR_IDLE)
			return idx;
		idx++;
	}

	return NO_SLOT;
}
static char * getMessageFromMaster | ( | int | pipefd[2] | ) | [static] |
Definition at line 1093 of file parallel.c.
References PIPE_READ, and readMessageFromPipe().
Referenced by WaitForCommands().
{
	/* Worker side: read one message from the read end of our pipe pair. */
	const int	readFd = pipefd[PIPE_READ];

	return readMessageFromPipe(readFd);
}
static char * getMessageFromWorker | ( | ParallelState * | pstate, | |
bool | do_wait, | |||
int * | worker | |||
) | [static] |
Definition at line 1182 of file parallel.c.
References Assert, exit_horribly(), i, modulename, NULL, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, readMessageFromPipe(), select, select_loop(), strerror(), ParallelSlot::workerStatus, and WRKR_TERMINATED.
Referenced by ListenToWorkers().
{
	/*
	 * Fetch one message from any worker that has something to say.
	 *
	 * With do_wait the call blocks (via select_loop()) until a worker pipe
	 * becomes readable; without it, NULL is returned right away when nothing
	 * is pending.  On success *worker receives the index of the sending
	 * worker and the result of readMessageFromPipe() is returned, which is
	 * NULL if that read failed or the worker closed the pipe.
	 */
	int			i;
	int			nready;
	fd_set		workerset;
	int			maxFd = -1;
	struct timeval nowait = {0, 0};

	/* watch the read end of every worker that has not terminated yet */
	FD_ZERO(&workerset);
	for (i = 0; i < pstate->numWorkers; i++)
	{
		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
			continue;
		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
		/* actually WIN32 ignores the first parameter to select()... */
		if (pstate->parallelSlot[i].pipeRead > maxFd)
			maxFd = pstate->parallelSlot[i].pipeRead;
	}

	if (do_wait)
	{
		nready = select_loop(maxFd, &workerset);
		Assert(nready != 0);
	}
	else
	{
		/* zero timeout: just poll */
		nready = select(maxFd + 1, &workerset, NULL, NULL, &nowait);
		if (nready == 0)
			return NULL;
	}

	/*
	 * The message now ends with a newline, like every other exit_horribly()
	 * format in this file, so the error text is properly terminated.
	 */
	if (nready < 0)
		exit_horribly(modulename, "Error in ListenToWorkers(): %s\n", strerror(errno));

	/* deliver the message of the first worker whose pipe is readable */
	for (i = 0; i < pstate->numWorkers; i++)
	{
		char	   *msg;

		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
			continue;

		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
		*worker = i;
		return msg;
	}

	/* select() reported readiness, so some descriptor must have been set */
	Assert(false);
	return NULL;
}
static ParallelSlot * GetMyPSlot | ( | ParallelState * | pstate | ) | [static] |
Definition at line 153 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, and ParallelSlot::pid.
Referenced by archive_close_connection(), and exit_horribly().
{
	/*
	 * Find the slot that belongs to the calling worker (matched by thread id
	 * on Windows and by process id elsewhere).  Returns NULL when no slot
	 * matches, which callers take to mean "we are the master".
	 */
	ParallelSlot *candidate = pstate->parallelSlot;
	int			remaining;

	for (remaining = pstate->numWorkers; remaining > 0; remaining--, candidate++)
	{
#ifdef WIN32
		if (candidate->threadId == GetCurrentThreadId())
			return candidate;
#else
		if (candidate->pid == getpid())
			return candidate;
#endif
	}

	return NULL;
}
static PQExpBuffer getThreadLocalPQExpBuffer | ( | void | ) | [static] |
Definition at line 234 of file parallel.c.
References createPQExpBuffer(), and resetPQExpBuffer().
{
	/*
	 * Return a cleared, reusable PQExpBuffer.  Once the parallel
	 * infrastructure is initialized on Windows, the buffer is kept in the
	 * Tls slot; in every other case a function-level static is used.  The
	 * static must not be touched while Tls is in use, hence the two separate
	 * variables.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer result;

#ifdef WIN32
	/* TlsGetValue() yields 0 when nothing has been stored yet */
	if (parallel_init_done)
		result = (PQExpBuffer) TlsGetValue(tls_index);
	else
		result = s_id_return;
#else
	result = s_id_return;
#endif

	if (result != NULL)
	{
		/* a buffer already exists: reuse it after wiping its contents */
		resetPQExpBuffer(result);
		return result;
	}

	/* first call: create the buffer and remember it for later calls */
	result = createPQExpBuffer();

#ifdef WIN32
	if (parallel_init_done)
		TlsSetValue(tls_index, result);
	else
		s_id_return = result;
#else
	s_id_return = result;
#endif

	return result;
}
static bool HasEveryWorkerTerminated | ( | ParallelState * | pstate | ) | [static] |
Definition at line 759 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_TERMINATED.
Referenced by WaitForTerminatingWorkers().
{
	/* True iff no slot is in a state other than WRKR_TERMINATED. */
	int			idx = 0;

	while (idx < pstate->numWorkers)
	{
		if (pstate->parallelSlot[idx].workerStatus != WRKR_TERMINATED)
			return false;
		idx++;
	}

	return true;
}
void init_parallel_dump_utils | ( | void | ) |
Definition at line 130 of file parallel.c.
References _, exit_nicely, NULL, and on_exit_nicely().
Referenced by main().
{
	/*
	 * One-time setup of the parallel infrastructure.  All of the work is
	 * Windows-specific; on other platforms this function does nothing.
	 */
#ifdef WIN32
	WSADATA		wsaData;
	int			rc;

	/* already initialized by an earlier call */
	if (parallel_init_done)
		return;

	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (rc != 0)
	{
		fprintf(stderr, _("WSAStartup failed: %d\n"), rc);
		exit_nicely(1);
	}

	/* register the matching cleanup routine to run at exit */
	on_exit_nicely(shutdown_parallel_dump_utils, NULL);
	parallel_init_done = true;
#endif
}
bool IsEveryWorkerIdle | ( | ParallelState * | pstate | ) |
Definition at line 773 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.
Referenced by EnsureWorkersFinished(), ParallelBackupEnd(), and restore_toc_entries_parallel().
{
	/* True iff no slot is in a state other than WRKR_IDLE. */
	int			idx = 0;

	while (idx < pstate->numWorkers)
	{
		if (pstate->parallelSlot[idx].workerStatus != WRKR_IDLE)
			return false;
		idx++;
	}

	return true;
}
void ListenToWorkers | ( | ArchiveHandle * | AH, | |
ParallelState * | pstate, | |||
bool | do_wait | |||
) |
Definition at line 943 of file parallel.c.
References ACT_DUMP, ACT_RESTORE, archCustom, archDirectory, ParallelSlot::args, Assert, exit_horribly(), _archiveHandle::format, free, getMessageFromWorker(), _archiveHandle::MasterEndParallelItemPtr, messageStartsWith, modulename, ParallelState::parallelSlot, ParallelSlot::status, ParallelArgs::te, and ParallelSlot::workerStatus.
Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().
{
	/*
	 * Receive at most one message from the workers and act on it.
	 *
	 * "OK RESTORE ..." and "OK DUMP ..." mark the sending worker as finished
	 * and pass the remainder of the message to the format's
	 * MasterEndParallelItemPtr callback, whose result is stored as the slot's
	 * status.  "ERROR ..." and anything unrecognized are fatal.
	 */
	int			worker;
	char	   *msg = getMessageFromWorker(pstate, do_wait, &worker);

	if (msg == NULL)
	{
		/* getting nothing is only acceptable when we were merely polling */
		if (do_wait)
			exit_horribly(modulename, "A worker process died unexpectedly\n");
		return;
	}

	if (messageStartsWith(msg, "ERROR "))
	{
		Assert(AH->format == archDirectory || AH->format == archCustom);
		pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
		/* relay the worker's own error text */
		exit_horribly(modulename, "%s", msg + strlen("ERROR "));
	}
	else if (!messageStartsWith(msg, "OK "))
	{
		exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
	}
	else
	{
		ParallelSlot *slot = &pstate->parallelSlot[worker];
		TocEntry   *te;
		char	   *payload;

		slot->workerStatus = WRKR_FINISHED;
		te = slot->args->te;

		if (messageStartsWith(msg, "OK RESTORE "))
		{
			payload = msg + strlen("OK RESTORE ");
			slot->status = (AH->MasterEndParallelItemPtr) (AH, te, payload, ACT_RESTORE);
		}
		else if (messageStartsWith(msg, "OK DUMP "))
		{
			payload = msg + strlen("OK DUMP ");
			slot->status = (AH->MasterEndParallelItemPtr) (AH, te, payload, ACT_DUMP);
		}
		else
			exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
	}

	/* both Unix and Win32 return pg_malloc()ed space, so we free it */
	free(msg);
}
static void lockTableNoWait | ( | ArchiveHandle * | AH, | |
TocEntry * | te | |||
) | [static] |
Definition at line 801 of file parallel.c.
References appendPQExpBuffer(), archDirectory, Assert, _tocEntry::catalogId, _archiveHandle::connection, createPQExpBuffer(), PQExpBufferData::data, _tocEntry::desc, destroyPQExpBuffer(), exit_horribly(), fmtQualifiedId(), _archiveHandle::format, modulename, CatalogId::oid, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetvalue(), PQresultStatus(), Archive::remoteVersion, and resetPQExpBuffer().
Referenced by WaitForCommands().
{
	/*
	 * Worker side: take an ACCESS SHARE lock with NOWAIT on the relation
	 * behind TOC entry te.  The relation's qualified name is looked up by
	 * OID first.  Any failure, including not getting the lock immediately,
	 * is fatal.
	 */
	Archive    *AHX = (Archive *) AH;
	const char *qualId;
	PQExpBuffer query = createPQExpBuffer();
	PGresult   *res;

	Assert(AH->format == archDirectory);
	Assert(strcmp(te->desc, "BLOBS") != 0);

	/* OIDs are unsigned: %u keeps values above INT_MAX from going negative */
	appendPQExpBuffer(query, "SELECT pg_namespace.nspname," " pg_class.relname " " FROM pg_class " " JOIN pg_namespace on pg_namespace.oid = relnamespace " " WHERE pg_class.oid = %u", te->catalogId.oid);

	res = PQexec(AH->connection, query->data);

	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
		exit_horribly(modulename, "could not get relation name for oid %u: %s\n", te->catalogId.oid, PQerrorMessage(AH->connection));

	/* PQgetvalue() below is only meaningful if exactly one row came back */
	if (PQntuples(res) != 1)
		exit_horribly(modulename, "could not get relation name for oid %u: query returned %d rows\n", te->catalogId.oid, PQntuples(res));

	resetPQExpBuffer(query);

	qualId = fmtQualifiedId(AHX->remoteVersion, PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 1));

	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT", qualId);
	PQclear(res);

	res = PQexec(AH->connection, query->data);

	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
		exit_horribly(modulename, "could not obtain lock on relation \"%s\". This " "usually means that someone requested an ACCESS EXCLUSIVE lock " "on the table after the pg_dump parent process has gotten the " "initial ACCESS SHARE lock on the table.\n", qualId);

	PQclear(res);
	destroyPQExpBuffer(query);
}
void on_exit_close_archive | ( | Archive * | AHX | ) |
Definition at line 281 of file parallel.c.
References ShutdownInformation::AHX, archive_close_connection(), and on_exit_nicely().
Referenced by main().
{
	/*
	 * Remember the archive in the file-level shutdown record, then register
	 * archive_close_connection() with that record as its argument.
	 */
	ShutdownInformation *info = &shutdown_info;

	info->AHX = AHX;
	on_exit_nicely(archive_close_connection, info);
}
static void parallel_msg_master | ( | ParallelSlot * | slot, | |
const char * | modulename, | |||
const char * | fmt, | |||
va_list | ap | |||
) | [static] |
Definition at line 211 of file parallel.c.
References buf, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, sendMessageToMaster(), and vsnprintf().
Referenced by exit_horribly().
{
	/*
	 * Worker side of error reporting: format the message into a fixed-size
	 * buffer, prefixed with "ERROR ", and send it to the master over the
	 * slot's reverse pipe.  vsnprintf() truncates text that does not fit.
	 */
	static const char prefix[] = "ERROR ";
	const size_t prefixLen = sizeof(prefix) - 1;
	char		buf[512];
	int			pipefd[2];

	pipefd[PIPE_READ] = slot->pipeRevRead;
	pipefd[PIPE_WRITE] = slot->pipeRevWrite;

	/* copy the prefix including its terminating NUL */
	memcpy(buf, prefix, prefixLen + 1);
	vsnprintf(buf + prefixLen, sizeof(buf) - prefixLen, fmt, ap);

	sendMessageToMaster(pipefd, buf);
}
void ParallelBackupEnd | ( | ArchiveHandle * | AH, | |
ParallelState * | pstate | |||
) |
Definition at line 634 of file parallel.c.
References Assert, closesocket, free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, and WaitForTerminatingWorkers().
Referenced by _CloseArchive(), and RestoreArchive().
{
	/*
	 * Tear down the worker pool created by ParallelBackupStart(): close the
	 * master's pipe ends, wait for all workers to terminate, unregister the
	 * parallel state from the exit handler and release its memory.
	 *
	 * With a single worker there is no slot array and nothing is done here.
	 */
	int			i;

	if (pstate->numWorkers == 1)
		return;

	Assert(IsEveryWorkerIdle(pstate));

	/* close the sockets so that the workers know they can exit */
	for (i = 0; i < pstate->numWorkers; i++)
	{
		closesocket(pstate->parallelSlot[i].pipeRead);
		closesocket(pstate->parallelSlot[i].pipeWrite);
	}
	WaitForTerminatingWorkers(pstate);

	/*
	 * Remove the pstate again, so the exit handler in the parent will now
	 * again fall back to closing AH->connection (if connected).
	 */
	shutdown_info.pstate = NULL;

	/*
	 * Each slot's args structure was allocated separately in
	 * ParallelBackupStart(); release those before the slot array itself.
	 * All workers have terminated by now, so nobody reads them any more.
	 */
	for (i = 0; i < pstate->numWorkers; i++)
		free(pstate->parallelSlot[i].args);

	free(pstate->parallelSlot);
	free(pstate);
}
ParallelState* ParallelBackupStart | ( | ArchiveHandle * | AH, | |
RestoreOptions * | ropt | |||
) |
Definition at line 484 of file parallel.c.
References ParallelArgs::AH, ParallelSlot::args, Assert, CloneArchive(), closesocket, exit_horribly(), getLocalPQExpBuffer, i, modulename, NULL, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_malloc(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, ShutdownInformation::pstate, _archiveHandle::public, SetupWorker(), SIGQUIT, sigTermHandler(), strerror(), ParallelArgs::te, and ParallelSlot::workerStatus.
Referenced by _CloseArchive(), and RestoreArchive().
{
	/*
	 * Create the parallel state and start AH->public.numWorkers workers
	 * (threads on Windows, forked processes elsewhere), each connected to the
	 * master through two pipes.  Returns the new ParallelState.  With exactly
	 * one worker, a state without any slots is returned and nothing is
	 * spawned.
	 */
	ParallelState *pstate;
	int			i;
	const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);

	Assert(AH->public.numWorkers > 0);

	/* Ensure stdio state is quiesced before forking */
	fflush(NULL);

	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));

	pstate->numWorkers = AH->public.numWorkers;
	pstate->parallelSlot = NULL;

	/* single worker: no slots, no pipes, no signal handlers */
	if (AH->public.numWorkers == 1)
		return pstate;

	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
	memset((void *) pstate->parallelSlot, 0, slotSize);

	/*
	 * Set the pstate in the shutdown_info. The exit handler uses pstate if
	 * set and falls back to AHX otherwise.
	 */
	shutdown_info.pstate = pstate;
	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;

#ifdef WIN32
	tMasterThreadId = GetCurrentThreadId();
	termEvent = CreateEvent(NULL, true, false, "Terminate");
#else
	/* the handler only sets wantAbort; the flag is polled elsewhere */
	signal(SIGTERM, sigTermHandler);
	signal(SIGINT, sigTermHandler);
	signal(SIGQUIT, sigTermHandler);
#endif

	for (i = 0; i < pstate->numWorkers; i++)
	{
#ifdef WIN32
		WorkerInfo *wi;
		uintptr_t	handle;
#else
		pid_t		pid;
#endif
		/* pipeMW carries Master -> Worker traffic, pipeWM Worker -> Master */
		int			pipeMW[2],
					pipeWM[2];

		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
			exit_horribly(modulename, "Cannot create communication channels: %s\n", strerror(errno));

		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
		pstate->parallelSlot[i].args->AH = NULL;
		pstate->parallelSlot[i].args->te = NULL;
#ifdef WIN32
		/* Allocate a new structure for every worker */
		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

		wi->ropt = ropt;
		wi->worker = i;
		wi->AH = AH;
		wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
		wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];

		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, wi, 0, &(pstate->parallelSlot[i].threadId));
		pstate->parallelSlot[i].hThread = handle;
#else
		pid = fork();
		if (pid == 0)
		{
			/* we are the worker */
			int			j;
			int			pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};

			/*
			 * Store the fds for the reverse communication in pstate. Actually
			 * we only use this in case of an error and don't use pstate
			 * otherwise in the worker process. On Windows we write to the
			 * global pstate, in Unix we write to our process-local copy but
			 * that's also where we'd retrieve this information back from.
			 */
			pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
			pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
			pstate->parallelSlot[i].pid = getpid();

			/*
			 * Call CloneArchive on Unix as well even though technically we
			 * don't need to because fork() gives us a copy in our own address
			 * space already. But CloneArchive resets the state information
			 * and also clones the database connection (for parallel dump)
			 * which both seem kinda helpful.
			 */
			pstate->parallelSlot[i].args->AH = CloneArchive(AH);

			/* close read end of Worker -> Master */
			closesocket(pipeWM[PIPE_READ]);
			/* close write end of Master -> Worker */
			closesocket(pipeMW[PIPE_WRITE]);

			/*
			 * Close all inherited fds for communication of the master with
			 * the other workers.
			 */
			for (j = 0; j < i; j++)
			{
				closesocket(pstate->parallelSlot[j].pipeRead);
				closesocket(pstate->parallelSlot[j].pipeWrite);
			}

			/* the worker exits once SetupWorker() returns */
			SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);

			exit(0);
		}
		else if (pid < 0)
			/* fork failed */
			exit_horribly(modulename, "could not create worker process: %s\n", strerror(errno));

		/* we are the Master, pid > 0 here */
		Assert(pid > 0);

		/* close read end of Master -> Worker */
		closesocket(pipeMW[PIPE_READ]);
		/* close write end of Worker -> Master */
		closesocket(pipeWM[PIPE_WRITE]);

		pstate->parallelSlot[i].pid = pid;
#endif

		/* the master reads from the worker on pipeWM and writes to it on pipeMW */
		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
		pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
	}

	return pstate;
}
static char * readMessageFromPipe | ( | int | fd | ) | [static] |
Definition at line 1260 of file parallel.c.
References Assert, pg_malloc(), piperead, and realloc.
Referenced by getMessageFromMaster(), and getMessageFromWorker().
{
	/*
	 * Read exactly one NUL-terminated message from fd.
	 *
	 * Returns a heap-allocated string that the caller must free(), or NULL if
	 * the read failed or the other side closed the connection.  Running out
	 * of memory while growing the buffer is fatal.
	 */
	char	   *msg;
	int			msgsize,
				bufsize;
	int			ret;

	/*
	 * The problem here is that we need to deal with several possibilities: we
	 * could receive only a partial message or several messages at once. The
	 * caller expects us to return exactly one message however.
	 *
	 * We could either read in as much as we can and keep track of what we
	 * delivered back to the caller or we just read byte by byte. Once we see
	 * (char) 0, we know that it's the message's end. This would be quite
	 * inefficient for more data but since we are reading only on the command
	 * channel, the performance loss does not seem worth the trouble of
	 * keeping internal states for different file descriptors.
	 */
	bufsize = 64;				/* could be any number */
	msg = (char *) pg_malloc(bufsize);

	msgsize = 0;
	for (;;)
	{
		/* there must always be room for the byte we are about to read */
		Assert(msgsize < bufsize);
		ret = piperead(fd, msg + msgsize, 1);

		/* worker has closed the connection or another error happened */
		if (ret <= 0)
		{
			/* don't leak the partial message; keep errno from the read */
			int			save_errno = errno;

			free(msg);
			errno = save_errno;
			return NULL;
		}

		Assert(ret == 1);

		if (msg[msgsize] == '\0')
			return msg;

		msgsize++;
		if (msgsize == bufsize)
		{
			char	   *grown;

			/* could be any number */
			bufsize += 16;

			/* keep the old pointer until we know realloc() succeeded */
			grown = (char *) realloc(msg, bufsize);
			if (grown == NULL)
			{
				free(msg);
				exit_horribly(modulename, "out of memory\n");
			}
			msg = grown;
		}
	}
}
int ReapWorkerStatus | ( | ParallelState * | pstate, | |
int * | status | |||
) |
Definition at line 1003 of file parallel.c.
References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::status, ParallelSlot::workerStatus, and WRKR_FINISHED.
Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().
{
	/*
	 * Find the first worker in state WRKR_FINISHED, hand its result back
	 * through *status, reset the slot to idle and return its index.  Returns
	 * NO_SLOT (leaving *status untouched) when no worker has finished.
	 */
	int			idx;

	for (idx = 0; idx < pstate->numWorkers; idx++)
	{
		ParallelSlot *slot = &pstate->parallelSlot[idx];

		if (slot->workerStatus != WRKR_FINISHED)
			continue;

		*status = slot->status;
		slot->status = 0;
		slot->workerStatus = WRKR_IDLE;
		return idx;
	}

	return NO_SLOT;
}
static int select_loop | ( | int | maxFd, | |
fd_set * | workerset | |||
) | [static] |
Definition at line 1120 of file parallel.c.
References aborting, Assert, EINTR, exit_horribly(), i, modulename, NULL, select, and wantAbort.
Referenced by getMessageFromWorker().
{
	/*
	 * Wait until at least one descriptor in *workerset is readable, retrying
	 * when select() is interrupted.  Returns select()'s result; *workerset
	 * holds the ready descriptors on return.
	 */
	int			i;
	/* select() overwrites its set, so keep a pristine copy for retries */
	fd_set		saveSet = *workerset;

#ifdef WIN32
	/* should always be the master */
	Assert(tMasterThreadId == GetCurrentThreadId());

	for (;;)
	{
		/*
		 * sleep a quarter of a second before checking if we should terminate.
		 */
		struct timeval tv = {0, 250000};

		*workerset = saveSet;
		i = select(maxFd + 1, workerset, NULL, NULL, &tv);

		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
			continue;
		/* zero means the timeout expired: go around again */
		if (i)
			break;
	}
#else							/* UNIX */

	for (;;)
	{
		*workerset = saveSet;
		/* no timeout: block until a descriptor is ready or a signal arrives */
		i = select(maxFd + 1, workerset, NULL, NULL, NULL);

		/*
		 * If we Ctrl-C the master process , it's likely that we interrupt
		 * select() here. The signal handler will set wantAbort == true and
		 * the shutdown journey starts from here. Note that we'll come back
		 * here later when we tell all workers to terminate and read their
		 * responses. But then we have aborting set to true.
		 */
		if (wantAbort && !aborting)
			exit_horribly(modulename, "terminated by user\n");

		if (i < 0 && errno == EINTR)
			continue;
		break;
	}
#endif

	return i;
}
static void sendMessageToMaster | ( | int | pipefd[2], | |
const char * | str | |||
) | [static] |
Definition at line 1104 of file parallel.c.
References exit_horribly(), modulename, PIPE_WRITE, pipewrite, and strerror().
Referenced by parallel_msg_master(), and WaitForCommands().
{
	/*
	 * Worker side: write str, including its terminating NUL, to the write
	 * end of the pipe pair.  Anything but a complete write is fatal.
	 */
	const int	nbytes = strlen(str) + 1;

	if (pipewrite(pipefd[PIPE_WRITE], str, nbytes) == nbytes)
		return;

	exit_horribly(modulename, "Error writing to the communication channel: %s\n", strerror(errno));
}
static void sendMessageToWorker | ( | ParallelState * | pstate, | |
int | worker, | |||
const char * | str | |||
) | [static] |
Definition at line 1236 of file parallel.c.
References aborting, exit_horribly(), modulename, ParallelState::parallelSlot, ParallelSlot::pipeWrite, pipewrite, and strerror().
Referenced by DispatchJobForTocEntry().
{
	/*
	 * Master side: write str, including its terminating NUL, to the given
	 * worker's pipe.  An incomplete write is fatal, except on non-Windows
	 * platforms while a shutdown is already in progress.
	 */
	const int	nbytes = strlen(str) + 1;

	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, nbytes) == nbytes)
		return;

#ifndef WIN32
	/*
	 * If we're already aborting anyway, don't care if we succeed or not.
	 * The child might have gone already.
	 */
	if (aborting)
		return;
#endif

	exit_horribly(modulename, "Error writing to the communication channel: %s\n", strerror(errno));
}
static void SetupWorker | ( | ArchiveHandle * | AH, | |
int | pipefd[2], | |||
int | worker, | |||
RestoreOptions * | ropt | |||
) | [static] |
Definition at line 438 of file parallel.c.
References Assert, closesocket, _archiveHandle::connection, NULL, PIPE_READ, PIPE_WRITE, _archiveHandle::SetupWorkerPtr, and WaitForCommands().
Referenced by ParallelBackupStart().
{
	/*
	 * Body of a worker: run the format-specific setup hook, serve commands
	 * from the master until WaitForCommands() returns, then close both ends
	 * of our pipe pair.
	 *
	 * The raw connection is only kept so that it can be closed properly when
	 * the worker is brought down because of an error.
	 */
	Archive    *archive = (Archive *) AH;

	(AH->SetupWorkerPtr) (archive, ropt);

	/* the setup hook must have left us with a live connection */
	Assert(AH->connection != NULL);

	WaitForCommands(AH, pipefd);

	closesocket(pipefd[PIPE_READ]);
	closesocket(pipefd[PIPE_WRITE]);
}
static void ShutdownWorkersHard | ( | ParallelState * | pstate | ) | [static] |
Definition at line 353 of file parallel.c.
References closesocket, i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, ParallelSlot::pipeWrite, SIG_IGN, SIGPIPE, and WaitForTerminatingWorkers().
Referenced by archive_close_connection().
{
	/*
	 * Forcibly stop all workers and wait until every one of them has
	 * terminated.
	 */
#ifndef WIN32
	ParallelSlot *slot;
	ParallelSlot *const end = pstate->parallelSlot + pstate->numWorkers;

	/* ignore SIGPIPE from here on */
	signal(SIGPIPE, SIG_IGN);

	/*
	 * First close our write end of every socket so that the workers know
	 * they can exit ...
	 */
	for (slot = pstate->parallelSlot; slot < end; slot++)
		closesocket(slot->pipeWrite);

	/* ... then send each of them SIGTERM. */
	for (slot = pstate->parallelSlot; slot < end; slot++)
		kill(slot->pid, SIGTERM);
#else
	/* The workers monitor this event via checkAborting(). */
	SetEvent(termEvent);
#endif

	WaitForTerminatingWorkers(pstate);
}
static void sigTermHandler | ( | int | signum | ) | [static] |
Definition at line 427 of file parallel.c.
References wantAbort.
Referenced by ParallelBackupStart().
{
	/*
	 * Signal handler: just record that termination was requested.  The flag
	 * is inspected later by checkAborting() and select_loop().
	 */
	(void) signum;				/* the same action is taken for every signal */
	wantAbort = 1;
}
static void WaitForCommands | ( | ArchiveHandle * | AH, | |
int | pipefd[2] | |||
) | [static] |
Definition at line 856 of file parallel.c.
References archCustom, archDirectory, Assert, _archiveHandle::connection, _tocEntry::desc, exit_horribly(), _archiveHandle::format, free, getMessageFromMaster(), getTocEntryByDumpId(), lockTableNoWait(), messageStartsWith, modulename, NULL, PQfinish(), sendMessageToMaster(), _archiveHandle::WorkerJobDumpPtr, and _archiveHandle::WorkerJobRestorePtr.
Referenced by SetupWorker().
{
	/*
	 * Worker main loop: read commands from the master and execute them until
	 * no further message can be read, at which point the database connection
	 * is closed and the function returns.
	 *
	 * Understood commands are "DUMP <dumpId>" and "RESTORE <dumpId>"; the
	 * result string of the job is sent back to the master.  Anything else is
	 * fatal.
	 */
	char	   *command;
	DumpId		dumpId;
	int			nBytes;
	char	   *str = NULL;
	TocEntry   *te;

	for (;;)
	{
		if (!(command = getMessageFromMaster(pipefd)))
		{
			/* no more commands: shut down our connection and leave */
			PQfinish(AH->connection);
			AH->connection = NULL;
			return;
		}

		if (messageStartsWith(command, "DUMP "))
		{
			Assert(AH->format == archDirectory);
			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
			Assert(nBytes == strlen(command) - strlen("DUMP "));

			te = getTocEntryByDumpId(AH, dumpId);
			Assert(te != NULL);

			/*
			 * Lock the table but with NOWAIT. Note that the parent is already
			 * holding a lock. If we cannot acquire another ACCESS SHARE MODE
			 * lock, then somebody else has requested an exclusive lock in the
			 * meantime.  lockTableNoWait dies in this case to prevent a
			 * deadlock.
			 */
			if (strcmp(te->desc, "BLOBS") != 0)
				lockTableNoWait(AH, te);

			/*
			 * The message we return here has been pg_malloc()ed and we are
			 * responsible for free()ing it.
			 */
			str = (AH->WorkerJobDumpPtr) (AH, te);
			Assert(AH->connection != NULL);
			sendMessageToMaster(pipefd, str);
			free(str);
		}
		else if (messageStartsWith(command, "RESTORE "))
		{
			Assert(AH->format == archDirectory || AH->format == archCustom);
			Assert(AH->connection != NULL);

			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
			Assert(nBytes == strlen(command) - strlen("RESTORE "));

			te = getTocEntryByDumpId(AH, dumpId);
			Assert(te != NULL);

			/*
			 * The message we return here has been pg_malloc()ed and we are
			 * responsible for free()ing it.
			 */
			str = (AH->WorkerJobRestorePtr) (AH, te);
			Assert(AH->connection != NULL);
			sendMessageToMaster(pipefd, str);
			free(str);
		}
		else
			exit_horribly(modulename, "Unknown command on communication channel: %s\n", command);

		/*
		 * The command buffer was allocated by readMessageFromPipe(); release
		 * it here so it is not leaked once per processed command.
		 */
		free(command);
	}
}
static void WaitForTerminatingWorkers | ( | ParallelState * | pstate | ) | [static] |
Definition at line 381 of file parallel.c.
References Assert, free, HasEveryWorkerTerminated(), ParallelState::numWorkers, ParallelState::parallelSlot, pg_malloc(), ParallelSlot::pid, ParallelSlot::workerStatus, and WRKR_TERMINATED.
Referenced by ParallelBackupEnd(), and ShutdownWorkersHard().
{
	/*
	 * Block until every worker slot is marked WRKR_TERMINATED, marking slots
	 * one by one as the corresponding process (Unix) or thread (Windows)
	 * ends.
	 */
	while (!HasEveryWorkerTerminated(pstate))
	{
		ParallelSlot *slot = NULL;
		int			j;

#ifndef WIN32
		int			status;
		pid_t		pid = wait(&status);

		if (pid == (pid_t) -1)
		{
			/* interrupted by a signal: simply wait again */
			if (errno == EINTR)
				continue;

			/*
			 * Any other failure (in practice ECHILD) means there is no child
			 * left to wait for.  Mark every slot terminated rather than
			 * looping forever or dereferencing a NULL slot below.
			 */
			for (j = 0; j < pstate->numWorkers; j++)
				pstate->parallelSlot[j].workerStatus = WRKR_TERMINATED;
			break;
		}

		for (j = 0; j < pstate->numWorkers; j++)
			if (pstate->parallelSlot[j].pid == pid)
				slot = &(pstate->parallelSlot[j]);

		/* a child that is not one of our workers: nothing to record */
		if (slot == NULL)
			continue;
#else
		uintptr_t	hThread;
		DWORD		ret;
		uintptr_t  *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
		int			nrun = 0;

		/* collect the handles of all threads that are still running */
		for (j = 0; j < pstate->numWorkers; j++)
			if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
			{
				lpHandles[nrun] = pstate->parallelSlot[j].hThread;
				nrun++;
			}
		ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
		Assert(ret != WAIT_FAILED);
		hThread = lpHandles[ret - WAIT_OBJECT_0];

		for (j = 0; j < pstate->numWorkers; j++)
			if (pstate->parallelSlot[j].hThread == hThread)
				slot = &(pstate->parallelSlot[j]);

		free(lpHandles);
#endif
		Assert(slot);

		slot->workerStatus = WRKR_TERMINATED;
	}
	Assert(HasEveryWorkerTerminated(pstate));
}
bool aborting = false [static] |
Definition at line 61 of file parallel.c.
Referenced by archive_close_connection(), select_loop(), and sendMessageToWorker().
const char* modulename = gettext_noop("parallel archiver") [static] |
Definition at line 77 of file parallel.c.
Referenced by checkAborting(), EnsureIdleWorker(), EnsureWorkersFinished(), getMessageFromWorker(), ListenToWorkers(), lockTableNoWait(), ParallelBackupStart(), select_loop(), sendMessageToMaster(), sendMessageToWorker(), and WaitForCommands().
ShutdownInformation shutdown_info [static] |
Definition at line 75 of file parallel.c.
volatile sig_atomic_t wantAbort = 0 [static] |
Definition at line 62 of file parallel.c.
Referenced by checkAborting(), select_loop(), and sigTermHandler().