Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Functions | Variables

parallel.c File Reference

#include "postgres_fe.h"
#include "pg_backup_utils.h"
#include "parallel.h"
#include <sys/types.h>
#include <sys/wait.h>
#include "signal.h"
#include <unistd.h>
#include <fcntl.h>
Include dependency graph for parallel.c:

Go to the source code of this file.

Data Structures

struct  ShutdownInformation

Defines

#define PIPE_READ   0
#define PIPE_WRITE   1
#define pgpipe(a)   pipe(a)
#define piperead(a, b, c)   read(a,b,c)
#define pipewrite(a, b, c)   write(a,b,c)
#define messageStartsWith(msg, prefix)   (strncmp(msg, prefix, strlen(prefix)) == 0)
#define messageEquals(msg, pattern)   (strcmp(msg, pattern) == 0)

Typedefs

typedef struct ShutdownInformation ShutdownInformation

Functions

static ParallelSlot * GetMyPSlot (ParallelState *pstate)
static void parallel_msg_master (ParallelSlot *slot, const char *modulename, const char *fmt, va_list ap) __attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 0)))
static void archive_close_connection (int code, void *arg)
static void ShutdownWorkersHard (ParallelState *pstate)
static void WaitForTerminatingWorkers (ParallelState *pstate)
static void sigTermHandler (int signum)
static void SetupWorker (ArchiveHandle *AH, int pipefd[2], int worker, RestoreOptions *ropt)
static bool HasEveryWorkerTerminated (ParallelState *pstate)
static void lockTableNoWait (ArchiveHandle *AH, TocEntry *te)
static void WaitForCommands (ArchiveHandle *AH, int pipefd[2])
static char * getMessageFromMaster (int pipefd[2])
static void sendMessageToMaster (int pipefd[2], const char *str)
static int select_loop (int maxFd, fd_set *workerset)
static char * getMessageFromWorker (ParallelState *pstate, bool do_wait, int *worker)
static void sendMessageToWorker (ParallelState *pstate, int worker, const char *str)
static char * readMessageFromPipe (int fd)
void init_parallel_dump_utils (void)
void exit_horribly (const char *modulename, const char *fmt,...)
static PQExpBuffer getThreadLocalPQExpBuffer (void)
void on_exit_close_archive (Archive *AHX)
void checkAborting (ArchiveHandle *AH)
ParallelState * ParallelBackupStart (ArchiveHandle *AH, RestoreOptions *ropt)
void ParallelBackupEnd (ArchiveHandle *AH, ParallelState *pstate)
void DispatchJobForTocEntry (ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act)
int GetIdleWorker (ParallelState *pstate)
bool IsEveryWorkerIdle (ParallelState *pstate)
void ListenToWorkers (ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
int ReapWorkerStatus (ParallelState *pstate, int *status)
void EnsureIdleWorker (ArchiveHandle *AH, ParallelState *pstate)
void EnsureWorkersFinished (ArchiveHandle *AH, ParallelState *pstate)

Variables

static bool aborting = false
static volatile sig_atomic_t wantAbort = 0
static ShutdownInformation shutdown_info
static const char * modulename = gettext_noop("parallel archiver")

Define Documentation

#define messageEquals (   msg,
  pattern 
)    (strcmp(msg, pattern) == 0)

Definition at line 108 of file parallel.c.

#define messageStartsWith (   msg,
  prefix 
)    (strncmp(msg, prefix, strlen(prefix)) == 0)

Definition at line 106 of file parallel.c.

Referenced by ListenToWorkers(), and WaitForCommands().

#define pgpipe (   a  )     pipe(a)

Definition at line 64 of file parallel.c.

Referenced by ParallelBackupStart().

#define PIPE_READ   0

Definition at line 32 of file parallel.c.

Referenced by getMessageFromMaster(), ParallelBackupStart(), and SetupWorker().

#define PIPE_WRITE   1

Definition at line 33 of file parallel.c.

Referenced by ParallelBackupStart(), sendMessageToMaster(), and SetupWorker().

#define piperead (   a,
  b,
  c 
)    read(a,b,c)

Definition at line 65 of file parallel.c.

Referenced by readMessageFromPipe().

#define pipewrite (   a,
  b,
  c 
)    write(a,b,c)

Definition at line 66 of file parallel.c.

Referenced by sendMessageToMaster(), and sendMessageToWorker().


Typedef Documentation


Function Documentation

static void archive_close_connection ( int  code,
void *  arg 
) [static]

Definition at line 292 of file parallel.c.

References aborting, ParallelArgs::AH, ShutdownInformation::AHX, ParallelSlot::args, DisconnectDatabase(), GetMyPSlot(), ShutdownInformation::pstate, _archiveHandle::public, and ShutdownWorkersHard().

Referenced by on_exit_close_archive().

{
    ShutdownInformation *si = (ShutdownInformation *) arg;
    ParallelSlot *slot;

    /* Not in parallel mode: just drop the archive's own connection, if any. */
    if (si->pstate == NULL)
    {
        if (si->AHX)
            DisconnectDatabase(si->AHX);
        return;
    }

    /* A worker owns a slot; it only needs to close its cloned connection. */
    slot = GetMyPSlot(si->pstate);
    if (slot != NULL)
    {
        if (slot->args->AH)
            DisconnectDatabase(&(slot->args->AH->public));
        return;
    }

    /*
     * We're the master: the message passed to exit_horribly() has already
     * been printed, whether it originated here or in a worker process.  Close
     * our own database connection (only open during parallel dump, not
     * restore) and shut down the remaining workers.
     */
    DisconnectDatabase(si->AHX);
#ifndef WIN32

    /*
     * Switch to best-effort mode: from now on we still send to and receive
     * from the workers, but ignore any errors while doing so.
     */
    aborting = true;
#endif
    ShutdownWorkersHard(si->pstate);
}

void checkAborting ( ArchiveHandle *  AH  ) 

Definition at line 336 of file parallel.c.

References exit_horribly(), modulename, and wantAbort.

Referenced by _WriteBuf(), _WriteData(), ReadDataFromArchiveNone(), and WriteDataToArchive().

{
    bool        stop_requested;

#ifdef WIN32
    /* Poll, without blocking, whether the termination event is signalled. */
    stop_requested = (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0);
#else
    /* wantAbort is set by the signal handler (sigTermHandler). */
    stop_requested = (wantAbort != 0);
#endif

    if (stop_requested)
        exit_horribly(modulename, "worker is terminating\n");
}

void DispatchJobForTocEntry ( ArchiveHandle *  AH,
ParallelState *  pstate,
TocEntry *  te,
T_Action  act 
)

Definition at line 722 of file parallel.c.

References arg, ParallelSlot::args, Assert, GetIdleWorker(), _archiveHandle::MasterStartParallelItemPtr, NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelArgs::te, and ParallelSlot::workerStatus.

Referenced by restore_toc_entries_parallel(), and WriteDataChunks().

{
    ParallelSlot *slot;
    char       *arg;
    int         worker;

    /* The caller guarantees that at least one worker is idle. */
    Assert(GetIdleWorker(pstate) != NO_SLOT);
    worker = GetIdleWorker(pstate);
    Assert(worker != NO_SLOT);

    /* Let the archive format build the command string, then ship it. */
    arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
    sendMessageToWorker(pstate, worker, arg);

    /* Remember that this worker is busy and which TOC entry it works on. */
    slot = &pstate->parallelSlot[worker];
    slot->workerStatus = WRKR_WORKING;
    slot->args->te = te;
}

void EnsureIdleWorker ( ArchiveHandle *  AH,
ParallelState *  pstate 
)

Definition at line 1026 of file parallel.c.

References exit_horribly(), GetIdleWorker(), ListenToWorkers(), modulename, NO_SLOT, and ReapWorkerStatus().

Referenced by WriteDataChunks().

{
    int         work_status;

    for (;;)
    {
        bool        reaped_any = false;

        /* Collect every finished worker, bailing out if any of them failed. */
        while (ReapWorkerStatus(pstate, &work_status) != NO_SLOT)
        {
            if (work_status != 0)
                exit_horribly(modulename, "Error processing a parallel work item.\n");
            reaped_any = true;
        }

        /*
         * A worker we just reaped has been put back to idle, so that is
         * enough (quick check); otherwise look for an idle worker explicitly
         * before the next item is dispatched.
         */
        if (reaped_any || GetIdleWorker(pstate) != NO_SLOT)
            return;

        /*
         * Nobody is idle: block until one or more workers report back, then
         * go round again so ReapWorkerStatus() picks them up.
         */
        ListenToWorkers(AH, pstate, true);
    }
}

void EnsureWorkersFinished ( ArchiveHandle *  AH,
ParallelState *  pstate 
)

Definition at line 1068 of file parallel.c.

References exit_horribly(), IsEveryWorkerIdle(), ListenToWorkers(), modulename, NO_SLOT, ParallelState::numWorkers, and ReapWorkerStatus().

Referenced by WriteDataChunks().

{
    int         work_status;

    /* Nothing to wait for without a parallel state or with a single worker. */
    if (pstate == NULL || pstate->numWorkers == 1)
        return;

    /* Drain worker results until every worker is back in the idle state. */
    while (!IsEveryWorkerIdle(pstate))
    {
        if (ReapWorkerStatus(pstate, &work_status) != NO_SLOT)
        {
            if (work_status != 0)
                exit_horribly(modulename,
                              "Error processing a parallel work item\n");
        }
        else
        {
            ListenToWorkers(AH, pstate, true);
        }
    }
}

void exit_horribly ( const char *  modulename,
const char *  fmt,
  ... 
)

Definition at line 179 of file parallel.c.

References exit_nicely, GetMyPSlot(), NULL, parallel_msg_master(), ShutdownInformation::pstate, and vwrite_msg().

Referenced by _allocAH(), _check_database_version(), _Clone(), _CloseArchive(), _connectDB(), _CustomReadFunc(), _discoverArchiveFormat(), _doSetSessionAuth(), _EndBlob(), _LoadBlobs(), _PrintFileData(), _PrintTocData(), _ReadByte(), _ReopenArchive(), _skipData(), _StartBlob(), _StartBlobs(), _StartData(), _tarAddFile(), _tarGetHeader(), _tarPositionTo(), _tarReadRaw(), _tarWriteHeader(), _WorkerJobDumpDirectory(), _WriteBuf(), _WriteByte(), ahwrite(), AllocateCompressor(), binary_upgrade_extension_member(), buildTocEntryArrays(), cfopen(), cfopen_write(), checkAborting(), CloseArchive(), ConnectDatabase(), createViewAsClause(), die_on_query_failure(), dump_lo_buf(), dumpACL(), dumpBlobs(), dumpConstraint(), dumpDefaultACL(), dumpFunc(), EndDBCopyMode(), EnsureIdleWorker(), EnsureWorkersFinished(), ExecuteSqlCommandBuf(), ExecuteSqlQueryForSingleRow(), expand_schema_name_patterns(), findDependencyLoops(), findNamespace(), getAttrName(), getMessageFromWorker(), getRules(), getTableAttrs(), getTriggers(), InitArchiveFmt_Custom(), InitArchiveFmt_Directory(), InitArchiveFmt_Null(), InitArchiveFmt_Tar(), ListenToWorkers(), lockTableNoWait(), main(), mark_work_done(), on_exit_nicely(), ParallelBackupStart(), parseArchiveFormat(), ParseCompressionOption(), processEncodingEntry(), processStdStringsEntry(), ReadDataFromArchive(), ReadHead(), ReadOffset(), ReadStr(), ReadToc(), RestoreArchive(), RestoreOutput(), select_loop(), sendMessageToMaster(), sendMessageToWorker(), SetArchiveRestoreOptions(), setFilePath(), SetOutput(), setup_connection(), SortTocFromFile(), StartBlob(), StartRestoreBlob(), tarClose(), tarOpen(), tarWrite(), TopoSort(), WaitForCommands(), WriteData(), WriteDataToArchive(), and WriteDataToArchiveNone().

{
    va_list     ap;
    ParallelState *pstate = shutdown_info.pstate;
    ParallelSlot *slot = NULL;

    /* Only a worker owns a slot; outside parallel mode there is no pstate. */
    if (pstate != NULL)
        slot = GetMyPSlot(pstate);

    va_start(ap, fmt);
    if (slot != NULL)
    {
        /* Worker: forward the message to the master process. */
        parallel_msg_master(slot, modulename, fmt, ap);
    }
    else
    {
        /* Non-parallel mode, or the master itself: write it out directly. */
        vwrite_msg(modulename, fmt, ap);
    }
    va_end(ap);

    exit_nicely(1);
}

int GetIdleWorker ( ParallelState *  pstate  ) 

Definition at line 745 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by DispatchJobForTocEntry(), EnsureIdleWorker(), restore_toc_entries_parallel(), and WriteDataChunks().

{
    int         idx = 0;

    /* Linear scan: the first slot in state WRKR_IDLE wins. */
    while (idx < pstate->numWorkers)
    {
        if (pstate->parallelSlot[idx].workerStatus == WRKR_IDLE)
            return idx;
        idx++;
    }

    return NO_SLOT;
}

static char * getMessageFromMaster ( int  pipefd[2]  )  [static]

Definition at line 1093 of file parallel.c.

References PIPE_READ, and readMessageFromPipe().

Referenced by WaitForCommands().

{
    /* A worker reads its commands from the read end of its pipe pair. */
    int         readfd = pipefd[PIPE_READ];

    return readMessageFromPipe(readfd);
}

static char * getMessageFromWorker ( ParallelState *  pstate,
bool  do_wait,
int *  worker 
) [static]

Definition at line 1182 of file parallel.c.

References Assert, exit_horribly(), i, modulename, NULL, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, readMessageFromPipe(), select, select_loop(), strerror(), ParallelSlot::workerStatus, and WRKR_TERMINATED.

Referenced by ListenToWorkers().

{
    /*
     * Read one message from any worker that has data for us.  With do_wait
     * we block until some worker is readable; otherwise we only poll and
     * return NULL if nobody is ready.  *worker receives the index of the
     * worker the message came from.  A NULL return after a successful
     * select means that worker's pipe hit EOF or a read error (see
     * readMessageFromPipe()).
     */
    int         i;
    fd_set      workerset;
    int         maxFd = -1;
    struct timeval nowait = {0, 0};

    FD_ZERO(&workerset);

    /* Watch the read end of every worker that has not terminated yet. */
    for (i = 0; i < pstate->numWorkers; i++)
    {
        if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
            continue;
        FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
        /* actually WIN32 ignores the first parameter to select()... */
        if (pstate->parallelSlot[i].pipeRead > maxFd)
            maxFd = pstate->parallelSlot[i].pipeRead;
    }

    if (do_wait)
    {
        i = select_loop(maxFd, &workerset);
        Assert(i != 0);
    }
    else
    {
        /* Zero-timeout poll: nothing ready means nothing to report. */
        if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
            return NULL;
    }

    /*
     * exit_horribly() messages carry their own trailing newline, like every
     * other message in this file.
     */
    if (i < 0)
        exit_horribly(modulename, "Error in ListenToWorkers(): %s\n", strerror(errno));

    /* Serve the first readable worker; others are picked up on later calls. */
    for (i = 0; i < pstate->numWorkers; i++)
    {
        char       *msg;

        if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
            continue;

        msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
        *worker = i;
        return msg;
    }
    Assert(false);
    return NULL;
}

static ParallelSlot * GetMyPSlot ( ParallelState *  pstate  )  [static]

Definition at line 153 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, and ParallelSlot::pid.

Referenced by archive_close_connection(), and exit_horribly().

{
    int         idx;

    /*
     * The master has no slot of its own; a worker finds its slot by matching
     * its thread id (Windows) or its process id (Unix).
     */
    for (idx = 0; idx < pstate->numWorkers; idx++)
    {
        ParallelSlot *candidate = &(pstate->parallelSlot[idx]);

#ifdef WIN32
        if (candidate->threadId == GetCurrentThreadId())
            return candidate;
#else
        if (candidate->pid == getpid())
            return candidate;
#endif
    }

    return NULL;
}

static PQExpBuffer getThreadLocalPQExpBuffer ( void   )  [static]

Definition at line 234 of file parallel.c.

References createPQExpBuffer(), and resetPQExpBuffer().

{
    /*
     * Return a PQExpBuffer for formatting results, emptied before it is
     * handed out.  On WIN32, once parallel_init_done is set, the buffer is
     * kept per thread in the TLS slot tls_index; otherwise a single static
     * buffer is used.  The buffer is created on first use and reused by
     * later calls, so a result built in it is only valid until the next call
     * (in the same thread, when TLS is in use).
     *
     * The Tls code goes awry if we use a static var, so we provide for both
     * static and auto, and omit any use of the static var when using Tls.
     */
    static PQExpBuffer s_id_return = NULL;
    PQExpBuffer id_return;

#ifdef WIN32
    if (parallel_init_done)
        id_return = (PQExpBuffer) TlsGetValue(tls_index);       /* 0 when not set */
    else
        id_return = s_id_return;
#else
    id_return = s_id_return;
#endif

    if (id_return)              /* not the first time: reuse earlier buffer */
    {
        /* same buffer, just wipe contents */
        resetPQExpBuffer(id_return);
    }
    else
    {
        /* first time through: new buffer, remembered for later calls */
        id_return = createPQExpBuffer();
#ifdef WIN32
        if (parallel_init_done)
            TlsSetValue(tls_index, id_return);
        else
            s_id_return = id_return;
#else
        s_id_return = id_return;
#endif

    }

    return id_return;
}

static bool HasEveryWorkerTerminated ( ParallelState *  pstate  )  [static]

Definition at line 759 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_TERMINATED.

Referenced by WaitForTerminatingWorkers().

{
    int         idx = 0;

    /* Skip over the leading run of terminated workers... */
    while (idx < pstate->numWorkers &&
           pstate->parallelSlot[idx].workerStatus == WRKR_TERMINATED)
        idx++;

    /* ...and we are done only if that run covered every slot. */
    return idx >= pstate->numWorkers;
}

void init_parallel_dump_utils ( void   ) 

Definition at line 130 of file parallel.c.

References _, exit_nicely, NULL, and on_exit_nicely().

Referenced by main().

{
#ifdef WIN32
    WSADATA     wsaData;
    int         err;

    /* The Windows-only setup below is guarded to run only once. */
    if (parallel_init_done)
        return;

    tls_index = TlsAlloc();
    mainThreadId = GetCurrentThreadId();

    err = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (err != 0)
    {
        fprintf(stderr, _("WSAStartup failed: %d\n"), err);
        exit_nicely(1);
    }

    on_exit_nicely(shutdown_parallel_dump_utils, NULL);
    parallel_init_done = true;
#endif
}

bool IsEveryWorkerIdle ( ParallelState *  pstate  ) 

Definition at line 773 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by EnsureWorkersFinished(), ParallelBackupEnd(), and restore_toc_entries_parallel().

{
    int         idx = 0;

    /* Skip over the leading run of idle workers... */
    while (idx < pstate->numWorkers &&
           pstate->parallelSlot[idx].workerStatus == WRKR_IDLE)
        idx++;

    /* ...and every worker is idle only if that run covered every slot. */
    return idx >= pstate->numWorkers;
}

void ListenToWorkers ( ArchiveHandle *  AH,
ParallelState *  pstate,
bool  do_wait 
)

Definition at line 943 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, archCustom, archDirectory, ParallelSlot::args, Assert, exit_horribly(), _archiveHandle::format, free, getMessageFromWorker(), _archiveHandle::MasterEndParallelItemPtr, messageStartsWith, modulename, ParallelState::parallelSlot, ParallelSlot::status, ParallelArgs::te, and ParallelSlot::workerStatus.

Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().

{
    int         worker;
    char       *msg;
    ParallelSlot *slot;

    msg = getMessageFromWorker(pstate, do_wait, &worker);

    /* No message: nothing ready (polling), or a worker went away (waiting). */
    if (msg == NULL)
    {
        if (do_wait)
            exit_horribly(modulename, "A worker process died unexpectedly\n");
        return;
    }

    slot = &pstate->parallelSlot[worker];

    if (messageStartsWith(msg, "ERROR "))
    {
        /* The worker reported a fatal error; relay its text and exit. */
        Assert(AH->format == archDirectory || AH->format == archCustom);
        slot->workerStatus = WRKR_TERMINATED;
        exit_horribly(modulename, "%s", msg + strlen("ERROR "));
    }
    else if (!messageStartsWith(msg, "OK "))
    {
        exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);
    }
    else
    {
        /* "OK RESTORE <status>" or "OK DUMP <status>": hand to the format. */
        TocEntry   *te;
        char       *statusString;

        slot->workerStatus = WRKR_FINISHED;
        te = slot->args->te;

        if (messageStartsWith(msg, "OK RESTORE "))
        {
            statusString = msg + strlen("OK RESTORE ");
            slot->status = (AH->MasterEndParallelItemPtr)
                (AH, te, statusString, ACT_RESTORE);
        }
        else if (messageStartsWith(msg, "OK DUMP "))
        {
            statusString = msg + strlen("OK DUMP ");
            slot->status = (AH->MasterEndParallelItemPtr)
                (AH, te, statusString, ACT_DUMP);
        }
        else
            exit_horribly(modulename,
                          "Invalid message received from worker: %s\n", msg);
    }

    /* both Unix and Win32 return pg_malloc()ed space, so we free it */
    free(msg);
}

static void lockTableNoWait ( ArchiveHandle *  AH,
TocEntry *  te 
) [static]

Definition at line 801 of file parallel.c.

References appendPQExpBuffer(), archDirectory, Assert, _tocEntry::catalogId, _archiveHandle::connection, createPQExpBuffer(), PQExpBufferData::data, _tocEntry::desc, destroyPQExpBuffer(), exit_horribly(), fmtQualifiedId(), _archiveHandle::format, modulename, CatalogId::oid, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear(), PQerrorMessage(), PQexec(), PQgetvalue(), PQresultStatus(), Archive::remoteVersion, and resetPQExpBuffer().

Referenced by WaitForCommands().

{
    /*
     * Try to take an ACCESS SHARE lock on te's table without waiting, and
     * exit via exit_horribly() if the lock cannot be obtained right away.
     * Only used for directory-format dumps and never for the BLOBS entry.
     */
    Archive    *AHX = (Archive *) AH;
    const char *qualId;
    PQExpBuffer query = createPQExpBuffer();
    PGresult   *res;

    Assert(AH->format == archDirectory);
    Assert(strcmp(te->desc, "BLOBS") != 0);

    /*
     * Look up the schema-qualified name by OID.  Oid is an unsigned type, so
     * it must be formatted with %u; %d would print OIDs above INT_MAX as
     * negative numbers.
     */
    appendPQExpBuffer(query,
                      "SELECT pg_namespace.nspname,"
                      "       pg_class.relname "
                      "  FROM pg_class "
                      "  JOIN pg_namespace on pg_namespace.oid = relnamespace "
                      " WHERE pg_class.oid = %u", te->catalogId.oid);

    res = PQexec(AH->connection, query->data);

    /*
     * Require exactly one row: if the relation is gone, PQgetvalue() below
     * would otherwise be asked for a row that does not exist.
     */
    if (!res || PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1)
        exit_horribly(modulename,
                      "could not get relation name for oid %u: %s\n",
                      te->catalogId.oid, PQerrorMessage(AH->connection));

    resetPQExpBuffer(query);

    qualId = fmtQualifiedId(AHX->remoteVersion,
                            PQgetvalue(res, 0, 0),
                            PQgetvalue(res, 0, 1));

    appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
                      qualId);
    PQclear(res);

    res = PQexec(AH->connection, query->data);

    if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
        exit_horribly(modulename,
                      "could not obtain lock on relation \"%s\". This "
                      "usually means that someone requested an ACCESS EXCLUSIVE lock "
                      "on the table after the pg_dump parent process has gotten the "
                      "initial ACCESS SHARE lock on the table.\n", qualId);

    PQclear(res);
    destroyPQExpBuffer(query);
}

void on_exit_close_archive ( Archive *  AHX  ) 
static void parallel_msg_master ( ParallelSlot *  slot,
const char *  modulename,
const char *  fmt,
va_list  ap 
) [static]

Definition at line 211 of file parallel.c.

References buf, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, sendMessageToMaster(), and vsnprintf().

Referenced by exit_horribly().

{
    char        buf[512];
    int         pipefd[2];
    size_t      hdrlen;

    /* Prefix the formatted text with "ERROR " so the master recognizes it. */
    strcpy(buf, "ERROR ");
    hdrlen = strlen(buf);
    vsnprintf(buf + hdrlen, sizeof(buf) - hdrlen, fmt, ap);

    /* Send it back over this worker's reverse channel. */
    pipefd[PIPE_READ] = slot->pipeRevRead;
    pipefd[PIPE_WRITE] = slot->pipeRevWrite;
    sendMessageToMaster(pipefd, buf);
}

void ParallelBackupEnd ( ArchiveHandle *  AH,
ParallelState *  pstate 
)

Definition at line 634 of file parallel.c.

References Assert, closesocket, free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, and WaitForTerminatingWorkers().

Referenced by _CloseArchive(), and RestoreArchive().

{
    /*
     * Tear down the state created by ParallelBackupStart(): close the
     * master's pipe ends so the workers exit, wait for them, and release all
     * memory.  pstate must not be used by the caller afterwards.
     */
    int         i;

    /*
     * With a single worker ParallelBackupStart() created no slots, pipes or
     * workers (parallelSlot is NULL); only the state struct itself needs
     * freeing.
     */
    if (pstate->numWorkers == 1)
    {
        free(pstate);
        return;
    }

    Assert(IsEveryWorkerIdle(pstate));

    /* close the sockets so that the workers know they can exit */
    for (i = 0; i < pstate->numWorkers; i++)
    {
        closesocket(pstate->parallelSlot[i].pipeRead);
        closesocket(pstate->parallelSlot[i].pipeWrite);
    }
    WaitForTerminatingWorkers(pstate);

    /*
     * Remove the pstate again, so the exit handler in the parent will now
     * again fall back to closing AH->connection (if connected).
     */
    shutdown_info.pstate = NULL;

    /*
     * Release the per-slot ParallelArgs that ParallelBackupStart() allocated
     * with pg_malloc().  This is done only after all workers have terminated,
     * so no worker can still be looking at them.
     */
    for (i = 0; i < pstate->numWorkers; i++)
        free(pstate->parallelSlot[i].args);

    free(pstate->parallelSlot);
    free(pstate);
}

ParallelState* ParallelBackupStart ( ArchiveHandle *  AH,
RestoreOptions *  ropt 
)

Definition at line 484 of file parallel.c.

References ParallelArgs::AH, ParallelSlot::args, Assert, CloneArchive(), closesocket, exit_horribly(), getLocalPQExpBuffer, i, modulename, NULL, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_malloc(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, ShutdownInformation::pstate, _archiveHandle::public, SetupWorker(), SIGQUIT, sigTermHandler(), strerror(), ParallelArgs::te, and ParallelSlot::workerStatus.

Referenced by _CloseArchive(), and RestoreArchive().

{
    /*
     * Create AH->public.numWorkers workers (forked processes on Unix, threads
     * on Windows) and return the ParallelState describing them.  Each worker
     * gets two pipes: master -> worker for commands and worker -> master for
     * results.  With a single worker nothing is started and the returned
     * state has parallelSlot == NULL.
     */
    ParallelState *pstate;
    int         i;
    const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);

    Assert(AH->public.numWorkers > 0);

    /* Ensure stdio state is quiesced before forking */
    fflush(NULL);

    pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));

    pstate->numWorkers = AH->public.numWorkers;
    pstate->parallelSlot = NULL;

    /* One worker means no parallelism: no slots, pipes or workers. */
    if (AH->public.numWorkers == 1)
        return pstate;

    pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
    memset((void *) pstate->parallelSlot, 0, slotSize);

    /*
     * Set the pstate in the shutdown_info. The exit handler uses pstate if
     * set and falls back to AHX otherwise.
     */
    shutdown_info.pstate = pstate;
    /* switch getLocalPQExpBuffer to this file's thread-aware variant */
    getLocalPQExpBuffer = getThreadLocalPQExpBuffer;

    /*
     * Set up termination handling: an event object on Windows, signal
     * handlers for SIGTERM/SIGINT/SIGQUIT on Unix.
     */
#ifdef WIN32
    tMasterThreadId = GetCurrentThreadId();
    termEvent = CreateEvent(NULL, true, false, "Terminate");
#else
    signal(SIGTERM, sigTermHandler);
    signal(SIGINT, sigTermHandler);
    signal(SIGQUIT, sigTermHandler);
#endif

    for (i = 0; i < pstate->numWorkers; i++)
    {
#ifdef WIN32
        WorkerInfo *wi;
        uintptr_t   handle;
#else
        pid_t       pid;
#endif
        /* pipeMW: master -> worker commands; pipeWM: worker -> master results */
        int         pipeMW[2],
                    pipeWM[2];

        if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
            exit_horribly(modulename,
                          "Cannot create communication channels: %s\n",
                          strerror(errno));

        pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
        pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
        pstate->parallelSlot[i].args->AH = NULL;
        pstate->parallelSlot[i].args->te = NULL;
#ifdef WIN32
        /* Allocate a new structure for every worker */
        wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

        wi->ropt = ropt;
        wi->worker = i;
        wi->AH = AH;
        wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
        wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];

        handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
                                wi, 0, &(pstate->parallelSlot[i].threadId));
        pstate->parallelSlot[i].hThread = handle;
#else
        pid = fork();
        if (pid == 0)
        {
            /* we are the worker */
            int         j;
            int         pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};

            /*
             * Store the fds for the reverse communication in pstate. Actually
             * we only use this in case of an error and don't use pstate
             * otherwise in the worker process. On Windows we write to the
             * global pstate, in Unix we write to our process-local copy but
             * that's also where we'd retrieve this information back from.
             */
            pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
            pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
            pstate->parallelSlot[i].pid = getpid();

            /*
             * Call CloneArchive on Unix as well even though technically we
             * don't need to because fork() gives us a copy in our own address
             * space already. But CloneArchive resets the state information
             * and also clones the database connection (for parallel dump)
             * which both seem kinda helpful.
             */
            pstate->parallelSlot[i].args->AH = CloneArchive(AH);

            /* close read end of Worker -> Master */
            closesocket(pipeWM[PIPE_READ]);
            /* close write end of Master -> Worker */
            closesocket(pipeMW[PIPE_WRITE]);

            /*
             * Close all inherited fds for communication of the master with
             * the other workers.
             */
            for (j = 0; j < i; j++)
            {
                closesocket(pstate->parallelSlot[j].pipeRead);
                closesocket(pstate->parallelSlot[j].pipeWrite);
            }

            SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);

            /* the worker process never returns into the master's loop */
            exit(0);
        }
        else if (pid < 0)
            /* fork failed */
            exit_horribly(modulename,
                          "could not create worker process: %s\n",
                          strerror(errno));

        /* we are the Master, pid > 0 here */
        Assert(pid > 0);

        /* close read end of Master -> Worker */
        closesocket(pipeMW[PIPE_READ]);
        /* close write end of Worker -> Master */
        closesocket(pipeWM[PIPE_WRITE]);

        pstate->parallelSlot[i].pid = pid;
#endif

        /* the master's ends: read worker results, write worker commands */
        pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
        pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
    }

    return pstate;
}

static char * readMessageFromPipe ( int  fd  )  [static]

Definition at line 1260 of file parallel.c.

References Assert, pg_malloc(), piperead, and realloc.

Referenced by getMessageFromMaster(), and getMessageFromWorker().

{
    /*
     * Read one NUL-terminated message from fd and return it in a freshly
     * allocated buffer that the caller must free().  Returns NULL if the
     * other side closed the pipe or the read failed.
     */
    char       *msg;
    int         msgsize,
                bufsize;
    int         ret;

    /*
     * The problem here is that we need to deal with several possibilities: we
     * could receive only a partial message or several messages at once. The
     * caller expects us to return exactly one message however.
     *
     * We could either read in as much as we can and keep track of what we
     * delivered back to the caller or we just read byte by byte. Once we see
     * (char) 0, we know that it's the message's end. This would be quite
     * inefficient for more data but since we are reading only on the command
     * channel, the performance loss does not seem worth the trouble of
     * keeping internal states for different file descriptors.
     */
    bufsize = 64;               /* could be any number */
    msg = (char *) pg_malloc(bufsize);

    msgsize = 0;
    for (;;)
    {
        /* the buffer is grown eagerly, so there is always room for a byte */
        Assert(msgsize < bufsize);
        ret = piperead(fd, msg + msgsize, 1);

        /*
         * The other side has closed the connection or another error happened.
         * The caller only sees NULL and cannot free the partial buffer, so
         * release it here.
         */
        if (ret <= 0)
        {
            free(msg);
            return NULL;
        }

        Assert(ret == 1);

        if (msg[msgsize] == '\0')
            return msg;

        msgsize++;
        if (msgsize == bufsize)
        {
            char       *newmsg;

            /* could be any number */
            bufsize += 16;

            /*
             * Don't overwrite msg until realloc() has succeeded, or the old
             * buffer would be leaked and then dereferenced as NULL.
             */
            newmsg = (char *) realloc(msg, bufsize);
            if (newmsg == NULL)
            {
                free(msg);
                exit_horribly(modulename, "out of memory\n");
            }
            msg = newmsg;
        }
    }
}

int ReapWorkerStatus ( ParallelState *  pstate,
int *  status 
)

Definition at line 1003 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::status, ParallelSlot::workerStatus, and WRKR_FINISHED.

Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().

{
    int         idx;

    for (idx = 0; idx < pstate->numWorkers; idx++)
    {
        ParallelSlot *slot = &pstate->parallelSlot[idx];

        if (slot->workerStatus != WRKR_FINISHED)
            continue;

        /* Hand the result to the caller and recycle the slot as idle. */
        *status = slot->status;
        slot->status = 0;
        slot->workerStatus = WRKR_IDLE;
        return idx;
    }

    return NO_SLOT;
}

static int select_loop ( int  maxFd,
fd_set *  workerset 
) [static]

Definition at line 1120 of file parallel.c.

References aborting, Assert, EINTR, exit_horribly(), i, modulename, NULL, select, and wantAbort.

Referenced by getMessageFromWorker().

{
    /*
     * Call select() on *workerset until it yields a result the caller should
     * act on, restoring the caller's fd set before every attempt because
     * select() overwrites it.  Returns select()'s return value, which may be
     * negative on error.
     *
     * Unix: blocks without a timeout; retries on EINTR, but first exits via
     * exit_horribly() if a termination signal arrived and we are not already
     * aborting.
     * Windows: uses a 250 ms timeout and retries on timeout or WSAEINTR.
     */
    int         i;
    fd_set      saveSet = *workerset;

#ifdef WIN32
    /* should always be the master */
    Assert(tMasterThreadId == GetCurrentThreadId());

    for (;;)
    {
        /*
         * Wake up every quarter of a second; a timeout (i == 0) just loops.
         * NOTE(review): unlike the Unix branch, nothing in this loop checks
         * for a termination request.
         */
        struct timeval tv = {0, 250000};

        *workerset = saveSet;
        i = select(maxFd + 1, workerset, NULL, NULL, &tv);

        if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
            continue;
        if (i)
            break;
    }

#else                           /* UNIX */

    for (;;)
    {
        *workerset = saveSet;
        i = select(maxFd + 1, workerset, NULL, NULL, NULL);

        /*
         * If we Ctrl-C the master process, it's likely that we interrupt
         * select() here. The signal handler will set wantAbort == true and
         * the shutdown journey starts from here. Note that we'll come back
         * here later when we tell all workers to terminate and read their
         * responses. But then we have aborting set to true.
         */
        if (wantAbort && !aborting)
            exit_horribly(modulename, "terminated by user\n");

        if (i < 0 && errno == EINTR)
            continue;
        break;
    }

#endif

    return i;
}

static void sendMessageToMaster ( int  pipefd[2],
const char *  str 
) [static]

Definition at line 1104 of file parallel.c.

References exit_horribly(), modulename, PIPE_WRITE, pipewrite, and strerror().

Referenced by parallel_msg_master(), and WaitForCommands().

{
    /*
     * Send str, including its terminating NUL, to the master in a single
     * pipewrite() call.  Anything other than a complete write is fatal.
     */
    int         msgLen = strlen(str) + 1;

    if (pipewrite(pipefd[PIPE_WRITE], str, msgLen) == msgLen)
        return;

    exit_horribly(modulename,
                  "Error writing to the communication channel: %s\n",
                  strerror(errno));
}

static void sendMessageToWorker ( ParallelState *pstate,
int  worker,
const char *  str 
) [static]

Definition at line 1236 of file parallel.c.

References aborting, exit_horribly(), modulename, ParallelState::parallelSlot, ParallelSlot::pipeWrite, pipewrite, and strerror().

Referenced by DispatchJobForTocEntry().

{
    /*
     * Send str, including its terminating NUL, to the given worker's pipe.
     * A short or failed write is fatal, except (on non-Windows builds) when
     * we are already aborting.
     */
    int         msgLen = strlen(str) + 1;

    if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, msgLen) == msgLen)
        return;

#ifndef WIN32
    /*
     * While aborting, the worker may already have gone away, so a failed
     * write is expected and ignored.
     */
    if (aborting)
        return;
#endif

    exit_horribly(modulename,
                  "Error writing to the communication channel: %s\n",
                  strerror(errno));
}

static void SetupWorker ( ArchiveHandle *AH,
int  pipefd[2],
int  worker,
RestoreOptions *ropt 
) [static]

Definition at line 438 of file parallel.c.

References Assert, closesocket, _archiveHandle::connection, NULL, PIPE_READ, PIPE_WRITE, _archiveHandle::SetupWorkerPtr, and WaitForCommands().

Referenced by ParallelBackupStart().

{
    /*
     * Let the archive format prepare this worker through the setup hook
     * stored in the ArchiveHandle.  The hook must leave AH->connection set;
     * the raw connection is kept so it can be closed properly on an
     * error-driven shutdown.
     */
    AH->SetupWorkerPtr((Archive *) AH, ropt);
    Assert(AH->connection != NULL);

    /* Serve commands from the master until no further command arrives. */
    WaitForCommands(AH, pipefd);

    /* Release both ends of this worker's communication channel. */
    closesocket(pipefd[PIPE_READ]);
    closesocket(pipefd[PIPE_WRITE]);
}

static void ShutdownWorkersHard ( ParallelState *pstate  )  [static]

Definition at line 353 of file parallel.c.

References closesocket, i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, ParallelSlot::pipeWrite, SIG_IGN, SIGPIPE, and WaitForTerminatingWorkers().

Referenced by archive_close_connection().

{
#ifdef WIN32
    /* The workers monitor this event via checkAborting(). */
    SetEvent(termEvent);
#else
    int         w;

    /*
     * Ignore SIGPIPE so that writing to a pipe whose reader is gone fails
     * with an error instead of terminating this process.
     */
    signal(SIGPIPE, SIG_IGN);

    /*
     * First close our write end of every worker's pipe so the workers know
     * they can exit, then send each of them SIGTERM.
     */
    for (w = 0; w < pstate->numWorkers; w++)
        closesocket(pstate->parallelSlot[w].pipeWrite);

    for (w = 0; w < pstate->numWorkers; w++)
        kill(pstate->parallelSlot[w].pid, SIGTERM);
#endif

    /* Wait until every worker is recorded as terminated. */
    WaitForTerminatingWorkers(pstate);
}

static void sigTermHandler ( int  signum  )  [static]

Definition at line 427 of file parallel.c.

References wantAbort.

Referenced by ParallelBackupStart().

{
    /*
     * Only record the request here; code outside the handler (for example
     * select_loop()) polls wantAbort and performs the actual shutdown.
     */
    (void) signum;
    wantAbort = 1;
}

static void WaitForCommands ( ArchiveHandle *AH,
int  pipefd[2] 
) [static]

Definition at line 856 of file parallel.c.

References archCustom, archDirectory, Assert, _archiveHandle::connection, _tocEntry::desc, exit_horribly(), _archiveHandle::format, free, getMessageFromMaster(), getTocEntryByDumpId(), lockTableNoWait(), messageStartsWith, modulename, NULL, PQfinish(), sendMessageToMaster(), _archiveHandle::WorkerJobDumpPtr, and _archiveHandle::WorkerJobRestorePtr.

Referenced by SetupWorker().

{
    char       *command;
    DumpId      dumpId;
    int         nBytes;
    char       *str = NULL;
    TocEntry   *te;

    /*
     * Worker main loop.  Read the next command from the master, execute it,
     * send the resulting string back, and repeat.
     *
     * Recognized commands:
     *   "DUMP <dumpId>"    (directory format only)
     *   "RESTORE <dumpId>" (directory or custom format)
     *
     * When no further command can be read, close the database connection
     * and return.  Any other command is fatal.
     */
    for (;;)
    {
        if (!(command = getMessageFromMaster(pipefd)))
        {
            PQfinish(AH->connection);
            AH->connection = NULL;
            return;
        }

        if (messageStartsWith(command, "DUMP "))
        {
            Assert(AH->format == archDirectory);
            sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
            Assert(nBytes == strlen(command) - strlen("DUMP "));

            te = getTocEntryByDumpId(AH, dumpId);
            Assert(te != NULL);

            /*
             * Lock the table but with NOWAIT. Note that the parent is already
             * holding a lock. If we cannot acquire another ACCESS SHARE MODE
             * lock, then somebody else has requested an exclusive lock in the
             * meantime.  lockTableNoWait dies in this case to prevent a
             * deadlock.
             */
            if (strcmp(te->desc, "BLOBS") != 0)
                lockTableNoWait(AH, te);

            /*
             * The message we return here has been pg_malloc()ed and we are
             * responsible for free()ing it.
             */
            str = (AH->WorkerJobDumpPtr) (AH, te);
            Assert(AH->connection != NULL);
            sendMessageToMaster(pipefd, str);
            free(str);
        }
        else if (messageStartsWith(command, "RESTORE "))
        {
            Assert(AH->format == archDirectory || AH->format == archCustom);
            Assert(AH->connection != NULL);

            sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
            Assert(nBytes == strlen(command) - strlen("RESTORE "));

            te = getTocEntryByDumpId(AH, dumpId);
            Assert(te != NULL);

            /*
             * The message we return here has been pg_malloc()ed and we are
             * responsible for free()ing it.
             */
            str = (AH->WorkerJobRestorePtr) (AH, te);
            Assert(AH->connection != NULL);
            sendMessageToMaster(pipefd, str);
            free(str);
        }
        else
            exit_horribly(modulename,
                          "Unknown command on communication channel: %s\n",
                          command);

        /*
         * The command string is a heap buffer built by the pipe reader, and
         * we own it.  Release it before reading the next command; otherwise
         * every job processed by this worker leaks one buffer.
         */
        free(command);
    }
}

static void WaitForTerminatingWorkers ( ParallelState *pstate  )  [static]

Definition at line 381 of file parallel.c.

References Assert, free, HasEveryWorkerTerminated(), ParallelState::numWorkers, ParallelState::parallelSlot, pg_malloc(), ParallelSlot::pid, ParallelSlot::workerStatus, and WRKR_TERMINATED.

Referenced by ParallelBackupEnd(), and ShutdownWorkersHard().

{
    /*
     * Reap workers until every slot is marked WRKR_TERMINATED.
     *
     * On Unix each iteration waits for one child process with wait() and
     * marks the slot whose pid matches.  On Windows it waits for any
     * still-running worker thread handle and marks that slot.
     */
    while (!HasEveryWorkerTerminated(pstate))
    {
        ParallelSlot *slot = NULL;
        int         j;

#ifndef WIN32
        int         status;
        pid_t       pid = wait(&status);

        if (pid < 0)
        {
            /* Interrupted by a signal before any child exited: try again. */
            if (errno == EINTR)
                continue;

            /*
             * Any other failure (ECHILD in practice) means there is no
             * unwaited-for child left, so none of the remaining workers can
             * still be running.  Record that instead of dereferencing a NULL
             * slot below or waiting forever.
             */
            for (j = 0; j < pstate->numWorkers; j++)
                pstate->parallelSlot[j].workerStatus = WRKR_TERMINATED;
            break;
        }

        for (j = 0; j < pstate->numWorkers; j++)
            if (pstate->parallelSlot[j].pid == pid)
                slot = &(pstate->parallelSlot[j]);

        /* A child that is not one of our workers: keep waiting. */
        if (slot == NULL)
            continue;
#else
        uintptr_t   hThread;
        DWORD       ret;
        uintptr_t  *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
        int         nrun = 0;

        for (j = 0; j < pstate->numWorkers; j++)
            if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
            {
                lpHandles[nrun] = pstate->parallelSlot[j].hThread;
                nrun++;
            }
        ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE);
        Assert(ret != WAIT_FAILED);
        hThread = lpHandles[ret - WAIT_OBJECT_0];

        for (j = 0; j < pstate->numWorkers; j++)
            if (pstate->parallelSlot[j].hThread == hThread)
                slot = &(pstate->parallelSlot[j]);

        free(lpHandles);
#endif
        Assert(slot);

        slot->workerStatus = WRKR_TERMINATED;
    }
    Assert(HasEveryWorkerTerminated(pstate));
}


Variable Documentation

bool aborting = false [static]

Definition at line 61 of file parallel.c.

Referenced by archive_close_connection(), select_loop(), and sendMessageToWorker().

const char* modulename = gettext_noop("parallel archiver") [static]

Definition at line 75 of file parallel.c.

volatile sig_atomic_t wantAbort = 0 [static]

Definition at line 62 of file parallel.c.

Referenced by checkAborting(), select_loop(), and sigTermHandler().