Header And Logo

PostgreSQL
| The world's most advanced open source database.

Data Structures | Defines | Typedefs | Enumerations | Functions | Variables

parallel.h File Reference

#include "pg_backup_db.h"
Include dependency graph for parallel.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ParallelArgs
struct  ParallelSlot
struct  ParallelState

Defines

#define NO_SLOT   (-1)

Typedefs

typedef struct ParallelArgs ParallelArgs
typedef struct ParallelSlot ParallelSlot
typedef struct ParallelState ParallelState

Enumerations

enum  T_WorkerStatus { WRKR_TERMINATED = 0, WRKR_IDLE, WRKR_WORKING, WRKR_FINISHED }

Functions

void init_parallel_dump_utils (void)
int GetIdleWorker (ParallelState *pstate)
bool IsEveryWorkerIdle (ParallelState *pstate)
void ListenToWorkers (struct _archiveHandle *AH, ParallelState *pstate, bool do_wait)
int ReapWorkerStatus (ParallelState *pstate, int *status)
void EnsureIdleWorker (struct _archiveHandle *AH, ParallelState *pstate)
void EnsureWorkersFinished (struct _archiveHandle *AH, ParallelState *pstate)
ParallelStateParallelBackupStart (struct _archiveHandle *AH, RestoreOptions *ropt)
void DispatchJobForTocEntry (struct _archiveHandle *AH, ParallelState *pstate, struct _tocEntry *te, T_Action act)
void ParallelBackupEnd (struct _archiveHandle *AH, ParallelState *pstate)
void checkAborting (struct _archiveHandle *AH)
void exit_horribly (const char *modulename, const char *fmt,...) __attribute__((format(PG_PRINTF_ATTRIBUTE

Variables

void noreturn

Define Documentation

#define NO_SLOT   (-1)

Typedef Documentation

typedef struct ParallelArgs ParallelArgs
typedef struct ParallelSlot ParallelSlot
typedef struct ParallelState ParallelState

Enumeration Type Documentation

Enumerator:
WRKR_TERMINATED 
WRKR_IDLE 
WRKR_WORKING 
WRKR_FINISHED 

Definition at line 27 of file parallel.h.

{
    WRKR_TERMINATED = 0,
    WRKR_IDLE,
    WRKR_WORKING,
    WRKR_FINISHED
}   T_WorkerStatus;


Function Documentation

void checkAborting ( struct _archiveHandle AH  ) 

Definition at line 336 of file parallel.c.

References exit_horribly(), modulename, and wantAbort.

Referenced by _WriteBuf(), _WriteData(), ReadDataFromArchiveNone(), and WriteDataToArchive().

{
#ifdef WIN32
    if (WaitForSingleObject(termEvent, 0) == WAIT_OBJECT_0)
#else
    if (wantAbort)
#endif
        exit_horribly(modulename, "worker is terminating\n");
}

void DispatchJobForTocEntry ( struct _archiveHandle AH,
ParallelState pstate,
struct _tocEntry te,
T_Action  act 
)

Definition at line 722 of file parallel.c.

References arg, ParallelSlot::args, Assert, GetIdleWorker(), _archiveHandle::MasterStartParallelItemPtr, NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelArgs::te, and ParallelSlot::workerStatus.

Referenced by restore_toc_entries_parallel(), and WriteDataChunks().

{
    int         worker;
    char       *arg;

    /* our caller makes sure that at least one worker is idle */
    Assert(GetIdleWorker(pstate) != NO_SLOT);
    worker = GetIdleWorker(pstate);
    Assert(worker != NO_SLOT);

    arg = (AH->MasterStartParallelItemPtr) (AH, te, act);

    sendMessageToWorker(pstate, worker, arg);

    pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
    pstate->parallelSlot[worker].args->te = te;
}

void EnsureIdleWorker ( struct _archiveHandle AH,
ParallelState pstate 
)

Definition at line 1026 of file parallel.c.

References exit_horribly(), GetIdleWorker(), ListenToWorkers(), modulename, NO_SLOT, and ReapWorkerStatus().

Referenced by WriteDataChunks().

{
    int         ret_worker;
    int         work_status;

    for (;;)
    {
        int         nTerm = 0;

        while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
        {
            if (work_status != 0)
                exit_horribly(modulename, "Error processing a parallel work item.\n");

            nTerm++;
        }

        /*
         * We need to make sure that we have an idle worker before dispatching
         * the next item. If nTerm > 0 we already have that (quick check).
         */
        if (nTerm > 0)
            return;

        /* explicit check for an idle worker */
        if (GetIdleWorker(pstate) != NO_SLOT)
            return;

        /*
         * If we have no idle worker, read the result of one or more workers
         * and loop the loop to call ReapWorkerStatus() on them
         */
        ListenToWorkers(AH, pstate, true);
    }
}

void EnsureWorkersFinished ( struct _archiveHandle AH,
ParallelState pstate 
)

Definition at line 1068 of file parallel.c.

References exit_horribly(), IsEveryWorkerIdle(), ListenToWorkers(), modulename, NO_SLOT, ParallelState::numWorkers, and ReapWorkerStatus().

Referenced by WriteDataChunks().

{
    int         work_status;

    if (!pstate || pstate->numWorkers == 1)
        return;

    /* Waiting for the remaining worker processes to finish */
    while (!IsEveryWorkerIdle(pstate))
    {
        if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
            ListenToWorkers(AH, pstate, true);
        else if (work_status != 0)
            exit_horribly(modulename,
                          "Error processing a parallel work item\n");
    }
}

void exit_horribly ( const char *  modulename,
const char *  fmt,
  ... 
)
int GetIdleWorker ( ParallelState pstate  ) 

Definition at line 745 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by DispatchJobForTocEntry(), EnsureIdleWorker(), restore_toc_entries_parallel(), and WriteDataChunks().

{
    int         i;

    for (i = 0; i < pstate->numWorkers; i++)
        if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
            return i;
    return NO_SLOT;
}

void init_parallel_dump_utils ( void   ) 

Definition at line 130 of file parallel.c.

References _, exit_nicely, NULL, and on_exit_nicely().

Referenced by main().

{
#ifdef WIN32
    if (!parallel_init_done)
    {
        WSADATA     wsaData;
        int         err;

        tls_index = TlsAlloc();
        mainThreadId = GetCurrentThreadId();
        err = WSAStartup(MAKEWORD(2, 2), &wsaData);
        if (err != 0)
        {
            fprintf(stderr, _("WSAStartup failed: %d\n"), err);
            exit_nicely(1);
        }
        on_exit_nicely(shutdown_parallel_dump_utils, NULL);
        parallel_init_done = true;
    }
#endif
}

bool IsEveryWorkerIdle ( ParallelState pstate  ) 

Definition at line 773 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by EnsureWorkersFinished(), ParallelBackupEnd(), and restore_toc_entries_parallel().

{
    int         i;

    for (i = 0; i < pstate->numWorkers; i++)
        if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
            return false;
    return true;
}

void ListenToWorkers ( struct _archiveHandle AH,
ParallelState pstate,
bool  do_wait 
)

Definition at line 943 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, archCustom, archDirectory, ParallelSlot::args, Assert, exit_horribly(), _archiveHandle::format, free, getMessageFromWorker(), _archiveHandle::MasterEndParallelItemPtr, messageStartsWith, modulename, ParallelState::parallelSlot, ParallelSlot::status, ParallelArgs::te, and ParallelSlot::workerStatus.

Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().

{
    int         worker;
    char       *msg;

    msg = getMessageFromWorker(pstate, do_wait, &worker);

    if (!msg)
    {
        if (do_wait)
            exit_horribly(modulename, "A worker process died unexpectedly\n");
        return;
    }

    if (messageStartsWith(msg, "OK "))
    {
        char       *statusString;
        TocEntry   *te;

        pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
        te = pstate->parallelSlot[worker].args->te;
        if (messageStartsWith(msg, "OK RESTORE "))
        {
            statusString = msg + strlen("OK RESTORE ");
            pstate->parallelSlot[worker].status =
                (AH->MasterEndParallelItemPtr)
                (AH, te, statusString, ACT_RESTORE);
        }
        else if (messageStartsWith(msg, "OK DUMP "))
        {
            statusString = msg + strlen("OK DUMP ");
            pstate->parallelSlot[worker].status =
                (AH->MasterEndParallelItemPtr)
                (AH, te, statusString, ACT_DUMP);
        }
        else
            exit_horribly(modulename,
                          "Invalid message received from worker: %s\n", msg);
    }
    else if (messageStartsWith(msg, "ERROR "))
    {
        Assert(AH->format == archDirectory || AH->format == archCustom);
        pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
        exit_horribly(modulename, "%s", msg + strlen("ERROR "));
    }
    else
        exit_horribly(modulename, "Invalid message received from worker: %s\n", msg);

    /* both Unix and Win32 return pg_malloc()ed space, so we free it */
    free(msg);
}

void ParallelBackupEnd ( struct _archiveHandle AH,
ParallelState pstate 
)

Definition at line 634 of file parallel.c.

References Assert, closesocket, free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, and WaitForTerminatingWorkers().

Referenced by _CloseArchive(), and RestoreArchive().

{
    int         i;

    if (pstate->numWorkers == 1)
        return;

    Assert(IsEveryWorkerIdle(pstate));

    /* close the sockets so that the workers know they can exit */
    for (i = 0; i < pstate->numWorkers; i++)
    {
        closesocket(pstate->parallelSlot[i].pipeRead);
        closesocket(pstate->parallelSlot[i].pipeWrite);
    }
    WaitForTerminatingWorkers(pstate);

    /*
     * Remove the pstate again, so the exit handler in the parent will now
     * again fall back to closing AH->connection (if connected).
     */
    shutdown_info.pstate = NULL;

    free(pstate->parallelSlot);
    free(pstate);
}

ParallelState* ParallelBackupStart ( struct _archiveHandle AH,
RestoreOptions ropt 
)

Definition at line 484 of file parallel.c.

References ParallelArgs::AH, ParallelSlot::args, Assert, CloneArchive(), closesocket, exit_horribly(), getLocalPQExpBuffer, i, modulename, NULL, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_malloc(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, ShutdownInformation::pstate, _archiveHandle::public, SetupWorker(), SIGQUIT, sigTermHandler(), strerror(), ParallelArgs::te, and ParallelSlot::workerStatus.

Referenced by _CloseArchive(), and RestoreArchive().

{
    ParallelState *pstate;
    int         i;
    const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);

    Assert(AH->public.numWorkers > 0);

    /* Ensure stdio state is quiesced before forking */
    fflush(NULL);

    pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));

    pstate->numWorkers = AH->public.numWorkers;
    pstate->parallelSlot = NULL;

    if (AH->public.numWorkers == 1)
        return pstate;

    pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
    memset((void *) pstate->parallelSlot, 0, slotSize);

    /*
     * Set the pstate in the shutdown_info. The exit handler uses pstate if
     * set and falls back to AHX otherwise.
     */
    shutdown_info.pstate = pstate;
    getLocalPQExpBuffer = getThreadLocalPQExpBuffer;

#ifdef WIN32
    tMasterThreadId = GetCurrentThreadId();
    termEvent = CreateEvent(NULL, true, false, "Terminate");
#else
    signal(SIGTERM, sigTermHandler);
    signal(SIGINT, sigTermHandler);
    signal(SIGQUIT, sigTermHandler);
#endif

    for (i = 0; i < pstate->numWorkers; i++)
    {
#ifdef WIN32
        WorkerInfo *wi;
        uintptr_t   handle;
#else
        pid_t       pid;
#endif
        int         pipeMW[2],
                    pipeWM[2];

        if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
            exit_horribly(modulename,
                          "Cannot create communication channels: %s\n",
                          strerror(errno));

        pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
        pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
        pstate->parallelSlot[i].args->AH = NULL;
        pstate->parallelSlot[i].args->te = NULL;
#ifdef WIN32
        /* Allocate a new structure for every worker */
        wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));

        wi->ropt = ropt;
        wi->worker = i;
        wi->AH = AH;
        wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
        wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];

        handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
                                wi, 0, &(pstate->parallelSlot[i].threadId));
        pstate->parallelSlot[i].hThread = handle;
#else
        pid = fork();
        if (pid == 0)
        {
            /* we are the worker */
            int         j;
            int         pipefd[2] = {pipeMW[PIPE_READ], pipeWM[PIPE_WRITE]};

            /*
             * Store the fds for the reverse communication in pstate. Actually
             * we only use this in case of an error and don't use pstate
             * otherwise in the worker process. On Windows we write to the
             * global pstate, in Unix we write to our process-local copy but
             * that's also where we'd retrieve this information back from.
             */
            pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
            pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
            pstate->parallelSlot[i].pid = getpid();

            /*
             * Call CloneArchive on Unix as well even though technically we
             * don't need to because fork() gives us a copy in our own address
             * space already. But CloneArchive resets the state information
             * and also clones the database connection (for parallel dump)
             * which both seem kinda helpful.
             */
            pstate->parallelSlot[i].args->AH = CloneArchive(AH);

            /* close read end of Worker -> Master */
            closesocket(pipeWM[PIPE_READ]);
            /* close write end of Master -> Worker */
            closesocket(pipeMW[PIPE_WRITE]);

            /*
             * Close all inherited fds for communication of the master with
             * the other workers.
             */
            for (j = 0; j < i; j++)
            {
                closesocket(pstate->parallelSlot[j].pipeRead);
                closesocket(pstate->parallelSlot[j].pipeWrite);
            }

            SetupWorker(pstate->parallelSlot[i].args->AH, pipefd, i, ropt);

            exit(0);
        }
        else if (pid < 0)
            /* fork failed */
            exit_horribly(modulename,
                          "could not create worker process: %s\n",
                          strerror(errno));

        /* we are the Master, pid > 0 here */
        Assert(pid > 0);

        /* close read end of Master -> Worker */
        closesocket(pipeMW[PIPE_READ]);
        /* close write end of Worker -> Master */
        closesocket(pipeWM[PIPE_WRITE]);

        pstate->parallelSlot[i].pid = pid;
#endif

        pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
        pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
    }

    return pstate;
}

int ReapWorkerStatus ( ParallelState pstate,
int *  status 
)

Definition at line 1003 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::status, ParallelSlot::workerStatus, and WRKR_FINISHED.

Referenced by EnsureIdleWorker(), EnsureWorkersFinished(), and restore_toc_entries_parallel().

{
    int         i;

    for (i = 0; i < pstate->numWorkers; i++)
    {
        if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
        {
            *status = pstate->parallelSlot[i].status;
            pstate->parallelSlot[i].status = 0;
            pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
            return i;
        }
    }
    return NO_SLOT;
}


Variable Documentation

void noreturn

Definition at line 93 of file parallel.h.