
unix_latch.c File Reference

#include "postgres.h"
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include "miscadmin.h"
#include "portability/instr_time.h"
#include "postmaster/postmaster.h"
#include "storage/latch.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
[Include dependency graph for unix_latch.c not shown]

Go to the source code of this file.

Functions

static void sendSelfPipeByte (void)
static void drainSelfPipe (void)
void InitializeLatchSupport (void)
void InitLatch (volatile Latch *latch)
void InitSharedLatch (volatile Latch *latch)
void OwnLatch (volatile Latch *latch)
void DisownLatch (volatile Latch *latch)
int WaitLatch (volatile Latch *latch, int wakeEvents, long timeout)
int WaitLatchOrSocket (volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout)
void SetLatch (volatile Latch *latch)
void ResetLatch (volatile Latch *latch)
void latch_sigusr1_handler (void)

Variables

static volatile sig_atomic_t waiting = false
static int selfpipe_readfd = -1
static int selfpipe_writefd = -1

Function Documentation

void DisownLatch(volatile Latch *latch)

Definition at line 166 of file unix_latch.c.

Referenced by AuxiliaryProcKill(), ProcKill(), StartupXLOG(), WalRcvDie(), and WalSndKill().

{
    Assert(latch->is_shared);
    Assert(latch->owner_pid == MyProcPid);

    latch->owner_pid = 0;
}

static void drainSelfPipe(void) [static]

Definition at line 626 of file unix_latch.c.

References buf, EAGAIN, EINTR, elog, ERROR, EWOULDBLOCK, read, selfpipe_readfd, and waiting.

Referenced by WaitLatchOrSocket().

{
    /*
     * There shouldn't normally be more than one byte in the pipe, or maybe a
     * few bytes if multiple processes run SetLatch at the same instant.
     */
    char        buf[16];
    int         rc;

    for (;;)
    {
        rc = read(selfpipe_readfd, buf, sizeof(buf));
        if (rc < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;          /* the pipe is empty */
            else if (errno == EINTR)
                continue;       /* retry */
            else
            {
                waiting = false;
                elog(ERROR, "read() on self-pipe failed: %m");
            }
        }
        else if (rc == 0)
        {
            waiting = false;
            elog(ERROR, "unexpected EOF on self-pipe");
        }
        else if (rc < sizeof(buf))
        {
            /* we successfully drained the pipe; no need to read() again */
            break;
        }
        /* else buffer wasn't big enough, so read again */
    }
}

void InitializeLatchSupport(void)

Definition at line 77 of file unix_latch.c.

Referenced by InitAuxiliaryProcess(), InitProcess(), PgArchiverMain(), PgstatCollectorMain(), and SysLoggerMain().

{
    int         pipefd[2];

    Assert(selfpipe_readfd == -1);

    /*
     * Set up the self-pipe that allows a signal handler to wake up the
     * select() in WaitLatch. Make the write-end non-blocking, so that
     * SetLatch won't block if the event has already been set many times
     * filling the kernel buffer. Make the read-end non-blocking too, so that
     * we can easily clear the pipe by reading until EAGAIN or EWOULDBLOCK.
     */
    if (pipe(pipefd) < 0)
        elog(FATAL, "pipe() failed: %m");
    if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0)
        elog(FATAL, "fcntl() failed on read-end of self-pipe: %m");
    if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0)
        elog(FATAL, "fcntl() failed on write-end of self-pipe: %m");

    selfpipe_readfd = pipefd[0];
    selfpipe_writefd = pipefd[1];
}
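
The self-pipe mechanism set up here can be illustrated outside PostgreSQL. The following is a minimal standalone sketch in plain POSIX C (not code from unix_latch.c): a signal handler writes one byte into a non-blocking pipe, which makes a select() on the read end return, and the waiter then drains the pipe much as drainSelfPipe() does.

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

static int  pipe_rd = -1;
static int  pipe_wr = -1;

static void
wakeup_handler(int signo)
{
    char        dummy = 0;

    /* async-signal-safe: write one byte; a full pipe is already "set" */
    (void) write(pipe_wr, &dummy, 1);
}

int
main(void)
{
    int         fds[2];
    fd_set      rset;
    char        buf[16];

    /* make both ends non-blocking, as InitializeLatchSupport() does */
    if (pipe(fds) < 0 ||
        fcntl(fds[0], F_SETFL, O_NONBLOCK) < 0 ||
        fcntl(fds[1], F_SETFL, O_NONBLOCK) < 0)
    {
        perror("pipe setup");
        return 1;
    }
    pipe_rd = fds[0];
    pipe_wr = fds[1];

    signal(SIGUSR1, wakeup_handler);
    printf("send SIGUSR1 to pid %d to wake this process\n", (int) getpid());

    for (;;)
    {
        FD_ZERO(&rset);
        FD_SET(pipe_rd, &rset);
        if (select(pipe_rd + 1, &rset, NULL, NULL, NULL) < 0)
        {
            if (errno == EINTR)
                continue;       /* signal arrived; loop and see the byte */
            perror("select");
            return 1;
        }
        /* drain the pipe so the next wait starts from a clean state */
        while (read(pipe_rd, buf, sizeof(buf)) > 0)
            ;
        printf("woken up by signal\n");
        return 0;
    }
}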

void InitLatch(volatile Latch *latch)

Definition at line 105 of file unix_latch.c.

Referenced by PgArchiverMain(), PgstatCollectorMain(), and SysLoggerMain().

{
    /* Assert InitializeLatchSupport has been called in this process */
    Assert(selfpipe_readfd >= 0);

    latch->is_set = false;
    latch->owner_pid = MyProcPid;
    latch->is_shared = false;
}

void InitSharedLatch(volatile Latch *latch)

Definition at line 127 of file unix_latch.c.

Referenced by InitProcGlobal(), WalRcvShmemInit(), WalSndShmemInit(), and XLOGShmemInit().

{
    latch->is_set = false;
    latch->owner_pid = 0;
    latch->is_shared = true;
}

void latch_sigusr1_handler(void)

void OwnLatch(volatile Latch *latch)

Definition at line 148 of file unix_latch.c.

Referenced by InitAuxiliaryProcess(), InitProcess(), InitWalSenderSlot(), StartupXLOG(), and WalReceiverMain().

{
    /* Assert InitializeLatchSupport has been called in this process */
    Assert(selfpipe_readfd >= 0);

    Assert(latch->is_shared);

    /* sanity check */
    if (latch->owner_pid != 0)
        elog(ERROR, "latch already owned");

    latch->owner_pid = MyProcPid;
}
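
Taken together with InitSharedLatch() and DisownLatch(), the callers listed above follow a simple lifecycle for a latch kept in shared memory. The sketch below is a hedged illustration of that pattern, not code from unix_latch.c; ExampleShmemStruct, example_shmem, the shared-memory chunk name, and the example_* functions are hypothetical.

#include "postgres.h"
#include "storage/latch.h"
#include "storage/shmem.h"

typedef struct ExampleShmemStruct
{
    Latch       latch;          /* shared latch living in shared memory */
} ExampleShmemStruct;

static ExampleShmemStruct *example_shmem;

/* run once while shared memory is being set up */
static void
example_shmem_init(void)
{
    bool        found;

    example_shmem = (ExampleShmemStruct *)
        ShmemInitStruct("Example Data", sizeof(ExampleShmemStruct), &found);
    if (!found)
        InitSharedLatch(&example_shmem->latch);
}

/* run by the one process that will wait on the latch */
static void
example_process_startup(void)
{
    OwnLatch(&example_shmem->latch);    /* elog(ERROR) if already owned */
}

/* run before the owning process exits, e.g. from an on_shmem_exit callback */
static void
example_process_exit(void)
{
    DisownLatch(&example_shmem->latch);
}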

void ResetLatch(volatile Latch *latch)

Definition at line 552 of file unix_latch.c.

Referenced by AutoVacLauncherMain(), BackgroundWriterMain(), CheckpointerMain(), pgarch_MainLoop(), PgstatCollectorMain(), SyncRepWaitForLSN(), SysLoggerMain(), WaitForWALToBecomeAvailable(), WalRcvWaitForStartPosition(), WalSndLoop(), WalWriterMain(), and worker_spi_main().

{
    /* Only the owner should reset the latch */
    Assert(latch->owner_pid == MyProcPid);

    latch->is_set = false;

    /*
     * XXX there really ought to be a memory barrier operation right here, to
     * ensure that the write to is_set gets flushed to main memory before we
     * examine any flag variables.  Otherwise a concurrent SetLatch might
     * falsely conclude that it needn't signal us, even though we have missed
     * seeing some flag updates that SetLatch was supposed to inform us of.
     * For the moment, callers must supply their own synchronization of flag
     * variables (see latch.h).
     */
}

static void sendSelfPipeByte(void) [static]

Definition at line 589 of file unix_latch.c.

References EAGAIN, EINTR, EWOULDBLOCK, selfpipe_writefd, and write.

Referenced by latch_sigusr1_handler(), and SetLatch().

{
    int         rc;
    char        dummy = 0;

retry:
    rc = write(selfpipe_writefd, &dummy, 1);
    if (rc < 0)
    {
        /* If interrupted by signal, just retry */
        if (errno == EINTR)
            goto retry;

        /*
         * If the pipe is full, we don't need to retry, the data that's there
         * already is enough to wake up WaitLatch.
         */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return;

        /*
         * Oops, the write() failed for some other reason. We might be in a
         * signal handler, so it's not safe to elog(). We have no choice but
         * to silently ignore the error.
         */
        return;
    }
}

void SetLatch(volatile Latch *latch)

Definition at line 496 of file unix_latch.c.

Referenced by ArchSigHupHandler(), ArchSigTermHandler(), avl_sighup_handler(), avl_sigterm_handler(), avl_sigusr2_handler(), BgSigHupHandler(), ChkptSigHupHandler(), die(), ForwardFsyncRequest(), handle_sig_alarm(), pgarch_waken(), pgarch_waken_stop(), pgstat_exit(), pgstat_sighup_handler(), ReqCheckpointHandler(), ReqShutdownHandler(), RequestXLogStreaming(), SigHupHandler(), sigHupHandler(), sigUsr1Handler(), StatementCancelHandler(), StrategyGetBuffer(), SyncRepWakeQueue(), WakeupRecovery(), WalRcvShutdownHandler(), WalShutdownHandler(), WalSigHupHandler(), WalSndLastCycleHandler(), WalSndSigHupHandler(), WalSndWakeup(), worker_spi_sighup(), worker_spi_sigterm(), and XLogSetAsyncXactLSN().

{
    pid_t       owner_pid;

    /*
     * XXX there really ought to be a memory barrier operation right here, to
     * ensure that any flag variables we might have changed get flushed to
     * main memory before we check/set is_set.  Without that, we have to
     * require that callers provide their own synchronization for machines
     * with weak memory ordering (see latch.h).
     */

    /* Quick exit if already set */
    if (latch->is_set)
        return;

    latch->is_set = true;

    /*
     * See if anyone's waiting for the latch. It can be the current process if
     * we're in a signal handler. We use the self-pipe to wake up the select()
     * in that case. If it's another process, send a signal.
     *
     * Fetch owner_pid only once, in case the latch is concurrently getting
     * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
     * guaranteed to be true! In practice, the effective range of pid_t fits
     * in a 32 bit integer, and so should be atomic. In the worst case, we
     * might end up signaling the wrong process. Even then, you're very
     * unlucky if a process with that bogus pid exists and belongs to
     * Postgres; and PG database processes should handle excess SIGUSR1
     * interrupts without a problem anyhow.
     *
     * Another sort of race condition that's possible here is for a new
     * process to own the latch immediately after we look, so we don't signal
     * it. This is okay so long as all callers of ResetLatch/WaitLatch follow
     * the standard coding convention of waiting at the bottom of their loops,
     * not the top, so that they'll correctly process latch-setting events
     * that happen before they enter the loop.
     */
    owner_pid = latch->owner_pid;
    if (owner_pid == 0)
        return;
    else if (owner_pid == MyProcPid)
    {
        if (waiting)
            sendSelfPipeByte();
    }
    else
        kill(owner_pid, SIGUSR1);
}
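
The loop convention this comment refers to is the one documented in latch.h: a waiting process resets the latch first, then handles whatever work its flag variables indicate, and only then waits, so a SetLatch() that fires in between is never lost. Below is a minimal hedged sketch of that caller-side pattern (not code from unix_latch.c); my_latch, got_work and example_work_handler are hypothetical names.

#include "postgres.h"
#include "storage/latch.h"

static volatile sig_atomic_t got_work = false;
static volatile Latch *my_latch;    /* assumed: latch owned by this process */

/* signal handler: set the flag variable first, then set the latch */
static void
example_work_handler(SIGNAL_ARGS)
{
    got_work = true;
    SetLatch(my_latch);
}

static void
example_main_loop(void)
{
    for (;;)
    {
        int         rc;

        ResetLatch(my_latch);

        if (got_work)
        {
            got_work = false;
            /* ... do the work here ... */
        }

        /* sleep until the latch is set or 10 seconds elapse */
        rc = WaitLatch(my_latch, WL_LATCH_SET | WL_TIMEOUT, 10 * 1000L);

        if (rc & WL_TIMEOUT)
        {
            /* ... periodic housekeeping could go here ... */
        }
    }
}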

int WaitLatch(volatile Latch *latch, int wakeEvents, long timeout)

int WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, long timeout)

Definition at line 208 of file unix_latch.c.

Referenced by PgstatCollectorMain(), SysLoggerMain(), WaitLatch(), and WalSndLoop().

{
    int         result = 0;
    int         rc;
    instr_time  start_time,
                cur_time;
    long        cur_timeout;

#ifdef HAVE_POLL
    struct pollfd pfds[3];
    int         nfds;
#else
    struct timeval tv,
               *tvp;
    fd_set      input_mask;
    fd_set      output_mask;
    int         hifd;
#endif

    /* Ignore WL_SOCKET_* events if no valid socket is given */
    if (sock == PGINVALID_SOCKET)
        wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE);

    Assert(wakeEvents != 0);    /* must have at least one wake event */
    /* Cannot specify WL_SOCKET_WRITEABLE without WL_SOCKET_READABLE */
    Assert((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != WL_SOCKET_WRITEABLE);

    if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid)
        elog(ERROR, "cannot wait on a latch owned by another process");

    /*
     * Initialize timeout if requested.  We must record the current time so
     * that we can determine the remaining timeout if the poll() or select()
     * is interrupted.  (On some platforms, select() will update the contents
     * of "tv" for us, but unfortunately we can't rely on that.)
     */
    if (wakeEvents & WL_TIMEOUT)
    {
        INSTR_TIME_SET_CURRENT(start_time);
        Assert(timeout >= 0 && timeout <= INT_MAX);
        cur_timeout = timeout;

#ifndef HAVE_POLL
        tv.tv_sec = cur_timeout / 1000L;
        tv.tv_usec = (cur_timeout % 1000L) * 1000L;
        tvp = &tv;
#endif
    }
    else
    {
        cur_timeout = -1;

#ifndef HAVE_POLL
        tvp = NULL;
#endif
    }

    waiting = true;
    do
    {
        /*
         * Clear the pipe, then check if the latch is set already. If someone
         * sets the latch between this and the poll()/select() below, the
         * setter will write a byte to the pipe (or signal us and the signal
         * handler will do that), and the poll()/select() will return
         * immediately.
         *
         * Note: we assume that the kernel calls involved in drainSelfPipe()
         * and SetLatch() will provide adequate synchronization on machines
         * with weak memory ordering, so that we cannot miss seeing is_set if
         * the signal byte is already in the pipe when we drain it.
         */
        drainSelfPipe();

        if ((wakeEvents & WL_LATCH_SET) && latch->is_set)
        {
            result |= WL_LATCH_SET;

            /*
             * Leave loop immediately, avoid blocking again. We don't attempt
             * to report any other events that might also be satisfied.
             */
            break;
        }

        /* Must wait ... we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
        nfds = 0;
        if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
        {
            /* socket, if used, is always in pfds[0] */
            pfds[0].fd = sock;
            pfds[0].events = 0;
            if (wakeEvents & WL_SOCKET_READABLE)
                pfds[0].events |= POLLIN;
            if (wakeEvents & WL_SOCKET_WRITEABLE)
                pfds[0].events |= POLLOUT;
            pfds[0].revents = 0;
            nfds++;
        }

        pfds[nfds].fd = selfpipe_readfd;
        pfds[nfds].events = POLLIN;
        pfds[nfds].revents = 0;
        nfds++;

        if (wakeEvents & WL_POSTMASTER_DEATH)
        {
            /* postmaster fd, if used, is always in pfds[nfds - 1] */
            pfds[nfds].fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
            pfds[nfds].events = POLLIN;
            pfds[nfds].revents = 0;
            nfds++;
        }

        /* Sleep */
        rc = poll(pfds, nfds, (int) cur_timeout);

        /* Check return code */
        if (rc < 0)
        {
            /* EINTR is okay, otherwise complain */
            if (errno != EINTR)
            {
                waiting = false;
                ereport(ERROR,
                        (errcode_for_socket_access(),
                         errmsg("poll() failed: %m")));
            }
        }
        else if (rc == 0)
        {
            /* timeout exceeded */
            if (wakeEvents & WL_TIMEOUT)
                result |= WL_TIMEOUT;
        }
        else
        {
            /* at least one event occurred, so check revents values */
            if ((wakeEvents & WL_SOCKET_READABLE) &&
                (pfds[0].revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
            {
                /* data available in socket, or EOF/error condition */
                result |= WL_SOCKET_READABLE;
            }
            if ((wakeEvents & WL_SOCKET_WRITEABLE) &&
                (pfds[0].revents & POLLOUT))
            {
                result |= WL_SOCKET_WRITEABLE;
            }

            /*
             * We expect a POLLHUP when the remote end is closed, but because
             * we don't expect the pipe to become readable or to have any
             * errors either, treat those cases as postmaster death, too.
             */
            if ((wakeEvents & WL_POSTMASTER_DEATH) &&
                (pfds[nfds - 1].revents & (POLLHUP | POLLIN | POLLERR | POLLNVAL)))
            {
                /*
                 * According to the select(2) man page on Linux, select(2) may
                 * spuriously return and report a file descriptor as readable,
                 * when it's not; and presumably so can poll(2).  It's not
                 * clear that the relevant cases would ever apply to the
                 * postmaster pipe, but since the consequences of falsely
                 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
                 * we take the trouble to positively verify EOF with
                 * PostmasterIsAlive().
                 */
                if (!PostmasterIsAlive())
                    result |= WL_POSTMASTER_DEATH;
            }
        }
#else                           /* !HAVE_POLL */

        FD_ZERO(&input_mask);
        FD_ZERO(&output_mask);

        FD_SET(selfpipe_readfd, &input_mask);
        hifd = selfpipe_readfd;

        if (wakeEvents & WL_POSTMASTER_DEATH)
        {
            FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask);
            if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd)
                hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
        }

        if (wakeEvents & WL_SOCKET_READABLE)
        {
            FD_SET(sock, &input_mask);
            if (sock > hifd)
                hifd = sock;
        }

        if (wakeEvents & WL_SOCKET_WRITEABLE)
        {
            FD_SET(sock, &output_mask);
            if (sock > hifd)
                hifd = sock;
        }

        /* Sleep */
        rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);

        /* Check return code */
        if (rc < 0)
        {
            /* EINTR is okay, otherwise complain */
            if (errno != EINTR)
            {
                waiting = false;
                ereport(ERROR,
                        (errcode_for_socket_access(),
                         errmsg("select() failed: %m")));
            }
        }
        else if (rc == 0)
        {
            /* timeout exceeded */
            if (wakeEvents & WL_TIMEOUT)
                result |= WL_TIMEOUT;
        }
        else
        {
            /* at least one event occurred, so check masks */
            if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask))
            {
                /* data available in socket, or EOF */
                result |= WL_SOCKET_READABLE;
            }
            if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask))
            {
                result |= WL_SOCKET_WRITEABLE;
            }
            if ((wakeEvents & WL_POSTMASTER_DEATH) &&
            FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask))
            {
                /*
                 * According to the select(2) man page on Linux, select(2) may
                 * spuriously return and report a file descriptor as readable,
                 * when it's not; and presumably so can poll(2).  It's not
                 * clear that the relevant cases would ever apply to the
                 * postmaster pipe, but since the consequences of falsely
                 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
                 * we take the trouble to positively verify EOF with
                 * PostmasterIsAlive().
                 */
                if (!PostmasterIsAlive())
                    result |= WL_POSTMASTER_DEATH;
            }
        }
#endif   /* HAVE_POLL */

        /* If we're not done, update cur_timeout for next iteration */
        if (result == 0 && cur_timeout >= 0)
        {
            INSTR_TIME_SET_CURRENT(cur_time);
            INSTR_TIME_SUBTRACT(cur_time, start_time);
            cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
            if (cur_timeout < 0)
                cur_timeout = 0;

#ifndef HAVE_POLL
            tv.tv_sec = cur_timeout / 1000L;
            tv.tv_usec = (cur_timeout % 1000L) * 1000L;
#endif
        }
    } while (result == 0);
    waiting = false;

    return result;
}
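
For callers such as WalSndLoop() that also watch a network socket, a call typically combines several wake events and then inspects the returned bit mask. The fragment below is a hedged sketch of such a call, not code from unix_latch.c; client_sock and example_wait_on_socket() are hypothetical, and error handling is reduced to the essentials.

#include "postgres.h"
#include "storage/ipc.h"
#include "storage/latch.h"

static void
example_wait_on_socket(volatile Latch *latch, pgsocket client_sock)
{
    int         rc;

    /* wait up to one second for the latch, socket input, or postmaster death */
    rc = WaitLatchOrSocket(latch,
                           WL_LATCH_SET | WL_SOCKET_READABLE |
                           WL_POSTMASTER_DEATH | WL_TIMEOUT,
                           client_sock,
                           1000L);      /* timeout in milliseconds */

    if (rc & WL_POSTMASTER_DEATH)
        proc_exit(1);           /* postmaster is gone; shut down */

    if (rc & WL_LATCH_SET)
        ResetLatch(latch);      /* then re-check whatever flag variables we use */

    if (rc & WL_SOCKET_READABLE)
    {
        /* ... data (or EOF) is available on client_sock ... */
    }

    if (rc & WL_TIMEOUT)
    {
        /* ... periodic work ... */
    }
}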


Variable Documentation

int selfpipe_readfd = -1 [static]

int selfpipe_writefd = -1 [static]

Definition at line 63 of file unix_latch.c.

Referenced by InitializeLatchSupport(), and sendSelfPipeByte().

volatile sig_atomic_t waiting = false [static]