Header And Logo

PostgreSQL
| The world's most advanced open source database.

Defines | Functions | Variables

async.h File Reference

#include "fmgr.h"
Include dependency graph for async.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Defines

#define NUM_ASYNC_BUFFERS   8

Functions

Size AsyncShmemSize (void)
void AsyncShmemInit (void)
void Async_Notify (const char *channel, const char *payload)
void Async_Listen (const char *channel)
void Async_Unlisten (const char *channel)
void Async_UnlistenAll (void)
Datum pg_listening_channels (PG_FUNCTION_ARGS)
Datum pg_notify (PG_FUNCTION_ARGS)
void PreCommit_Notify (void)
void AtCommit_Notify (void)
void AtAbort_Notify (void)
void AtSubStart_Notify (void)
void AtSubCommit_Notify (void)
void AtSubAbort_Notify (void)
void AtPrepare_Notify (void)
void ProcessCompletedNotifies (void)
void HandleNotifyInterrupt (void)
void EnableNotifyInterrupt (void)
bool DisableNotifyInterrupt (void)

Variables

bool Trace_notify

Define Documentation

#define NUM_ASYNC_BUFFERS   8

Definition at line 21 of file async.h.

Referenced by AsyncShmemInit() and AsyncShmemSize().


Function Documentation

void Async_Listen ( const char *  channel  ) 

Definition at line 625 of file async.c.

References DEBUG1, elog, LISTEN_LISTEN, MyProcPid, queue_listen(), and Trace_notify.

Referenced by standard_ProcessUtility().

{
    /* Emit a debug trace of the request when notify tracing is enabled. */
    if (Trace_notify)
    {
        elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
    }

    /* Hand the channel to queue_listen() tagged as a LISTEN action. */
    queue_listen(LISTEN_LISTEN, channel);
}

void Async_Notify ( const char *  channel,
const char *  payload 
)

Definition at line 534 of file async.c.

References AsyncExistsPendingNotify(), Notification::channel, CurTransactionContext, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, lappend(), MemoryContextSwitchTo(), NAMEDATALEN, NOTIFY_PAYLOAD_MAX_LENGTH, palloc(), Notification::payload, pstrdup(), and Trace_notify.

Referenced by pg_notify(), standard_ProcessUtility(), and triggered_change_notification().

{
    Notification *entry;
    MemoryContext savedContext;

    if (Trace_notify)
    {
        elog(DEBUG1, "Async_Notify(%s)", channel);
    }

    /* Reject a missing or zero-length channel name. */
    if (!channel || channel[0] == '\0')
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("channel name cannot be empty")));
    }

    /* A name of NAMEDATALEN or more characters is rejected. */
    if (strlen(channel) >= NAMEDATALEN)
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("channel name too long")));
    }

    /* The payload is optional; when present its length is bounded too. */
    if (payload && strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH)
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("payload string too long")));
    }

    /* An equivalent notification is already pending: nothing to add. */
    if (AsyncExistsPendingNotify(channel, payload))
        return;

    /*
     * Allocate the new entry in CurTransactionContext so that it survives
     * until the end of the transaction.
     */
    savedContext = MemoryContextSwitchTo(CurTransactionContext);

    entry = (Notification *) palloc(sizeof(Notification));
    entry->channel = pstrdup(channel);
    entry->payload = payload ? pstrdup(payload) : "";

    /*
     * Append (rather than prepend) so notifications keep their original
     * order; see the comments at AsyncExistsPendingNotify().
     */
    pendingNotifies = lappend(pendingNotifies, entry);

    MemoryContextSwitchTo(savedContext);
}

void Async_Unlisten ( const char *  channel  ) 

Definition at line 639 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by standard_ProcessUtility().

{
    if (Trace_notify)
    {
        elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
    }

    /*
     * Queue the UNLISTEN only if we might be listening: that is, if there are
     * pending actions or unlistenExitRegistered is set.  Otherwise there is
     * nothing to undo and the request is dropped.
     */
    if (pendingActions != NIL || unlistenExitRegistered)
    {
        queue_listen(LISTEN_UNLISTEN, channel);
    }
}

void Async_UnlistenAll ( void   ) 

Definition at line 657 of file async.c.

References DEBUG1, elog, LISTEN_UNLISTEN_ALL, MyProcPid, NIL, queue_listen(), Trace_notify, and unlistenExitRegistered.

Referenced by DiscardAll() and standard_ProcessUtility().

{
    if (Trace_notify)
    {
        elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
    }

    /*
     * Queue the UNLISTEN-all only if we might be listening: that is, if there
     * are pending actions or unlistenExitRegistered is set.  The channel
     * argument is passed as an empty string for this action.
     */
    if (pendingActions != NIL || unlistenExitRegistered)
    {
        queue_listen(LISTEN_UNLISTEN_ALL, "");
    }
}

void AsyncShmemInit ( void   ) 

Definition at line 434 of file async.c.

References add_size(), AsyncCtl, AsyncCtlLock, i, AsyncQueueControl::lastQueueFillWarn, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MaxBackends, mul_size(), NULL, NUM_ASYNC_BUFFERS, QUEUE_BACKEND_PID, QUEUE_BACKEND_POS, QUEUE_HEAD, QUEUE_POS_PAGE, QUEUE_TAIL, SET_QUEUE_POS, ShmemInitStruct(), SimpleLruInit(), SimpleLruWritePage(), SimpleLruZeroPage(), SlruScanDirCbDeleteAll(), and SlruScanDirectory().

Referenced by CreateSharedMemoryAndSemaphores().

{
    bool        alreadyExists;
    Size        controlSize;

    /*
     * Work out how much space the AsyncQueueControl structure needs, then
     * create it or attach to it.
     *
     * backend[] entries 1..MaxBackends are the ones in use.  The unused
     * zero'th entry is already counted by sizeof(AsyncQueueControl), so only
     * the used entries are added on top.
     */
    controlSize = add_size(mul_size(MaxBackends, sizeof(QueueBackendStatus)),
                           sizeof(AsyncQueueControl));

    asyncQueueControl = (AsyncQueueControl *)
        ShmemInitStruct("Async Queue Control", controlSize, &alreadyExists);

    if (!alreadyExists)
    {
        /* Freshly created: reset queue head/tail and all backend slots */
        int         i;

        SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
        SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
        asyncQueueControl->lastQueueFillWarn = 0;

        /* slot zero is never used, but it is initialized along with the rest */
        for (i = 0; i <= MaxBackends; i++)
        {
            QUEUE_BACKEND_PID(i) = InvalidPid;
            SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
        }
    }

    /*
     * Set up SLRU management of the pg_notify data.
     */
    AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
    SimpleLruInit(AsyncCtl, "Async Ctl", NUM_ASYNC_BUFFERS, 0,
                  AsyncCtlLock, "pg_notify");
    /* These writes do not need to be fsync'd, unlike the SLRU default */
    AsyncCtl->do_fsync = false;

    if (!alreadyExists)
    {
        int         slotno;

        /*
         * During start or reboot, clean out the pg_notify directory.
         */
        (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);

        /*
         * Zero the page at the queue head and write it out at once; the
         * write only serves to verify that pg_notify/ is writable.
         */
        LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
        slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD));
        SimpleLruWritePage(AsyncCtl, slotno);
        LWLockRelease(AsyncCtlLock);
    }
}

Size AsyncShmemSize ( void   ) 

Definition at line 417 of file async.c.

References add_size(), MaxBackends, mul_size(), NUM_ASYNC_BUFFERS, and SimpleLruShmemSize().

Referenced by CreateSharedMemoryAndSemaphores().

{
    Size        controlBytes;
    Size        slruBytes;

    /* Control structure: this must stay in step with AsyncShmemInit */
    controlBytes = add_size(mul_size(MaxBackends, sizeof(QueueBackendStatus)),
                            sizeof(AsyncQueueControl));

    /* Plus whatever the SLRU needs for NUM_ASYNC_BUFFERS buffers */
    slruBytes = SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0);

    return add_size(controlBytes, slruBytes);
}

void AtAbort_Notify ( void   ) 

Definition at line 1517 of file async.c.

References amRegisteredListener, asyncQueueUnregister(), ClearPendingActionsAndNotifies(), and NIL.

Referenced by AbortTransaction().

{
    /*
     * A LISTEN that is rolled back after PreCommit_Notify leaves us
     * registered as a listener without any entry in listenChannels.  Undo
     * that registration here.
     */
    if (listenChannels == NIL && amRegisteredListener)
    {
        asyncQueueUnregister();
    }

    /* Discard whatever actions and notifies were still pending */
    ClearPendingActionsAndNotifies();
}

void AtCommit_Notify ( void   ) 

Definition at line 861 of file async.c.

References ListenAction::action, amRegisteredListener, asyncQueueUnregister(), ListenAction::channel, ClearPendingActionsAndNotifies(), DEBUG1, elog, Exec_ListenCommit(), Exec_UnlistenAllCommit(), Exec_UnlistenCommit(), lfirst, LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, NIL, and Trace_notify.

Referenced by CommitTransaction().

{
    ListCell   *cell;

    /*
     * Fast exit for transactions that never ran LISTEN/UNLISTEN/NOTIFY.
     */
    if (pendingActions == NIL && pendingNotifies == NIL)
        return;

    if (Trace_notify)
    {
        elog(DEBUG1, "AtCommit_Notify");
    }

    /* Apply each queued listen/unlisten action in order */
    foreach(cell, pendingActions)
    {
        ListenAction *act = (ListenAction *) lfirst(cell);

        if (act->action == LISTEN_LISTEN)
            Exec_ListenCommit(act->channel);
        else if (act->action == LISTEN_UNLISTEN)
            Exec_UnlistenCommit(act->channel);
        else if (act->action == LISTEN_UNLISTEN_ALL)
            Exec_UnlistenAllCommit();
    }

    /* Leave the listener array once no channels remain */
    if (listenChannels == NIL && amRegisteredListener)
    {
        asyncQueueUnregister();
    }

    /* Discard the now-processed pending lists */
    ClearPendingActionsAndNotifies();
}

void AtPrepare_Notify ( void   ) 

Definition at line 737 of file async.c.

References ereport, errcode(), errmsg(), and ERROR.

Referenced by PrepareTransaction().

{
    /* With no pending LISTEN/UNLISTEN/NOTIFY work, PREPARE may proceed */
    if (!pendingActions && !pendingNotifies)
        return;

    /* Otherwise the transaction cannot be prepared */
    ereport(ERROR,
            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
             errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
}

void AtSubAbort_Notify ( void   ) 

Definition at line 1599 of file async.c.

References GetCurrentTransactionNestLevel(), linitial, list_delete_first(), and list_length().

Referenced by AbortSubTransaction().

{
    /*
     * Stack depth to keep, derived from the current nesting level.  Pruning
     * by nesting level (rather than popping exactly once) keeps this routine
     * safe if it is called more than once at the same level because of
     * trouble during subxact abort.
     */
    int         keepDepth = GetCurrentTransactionNestLevel() - 2;

    /*
     * Pop the saved parent lists back into place.  The actions/notifies made
     * in this subxact are simply dropped; their space is freed when
     * CurTransactionContext is recycled.
     */
    while (list_length(upperPendingActions) > keepDepth)
    {
        pendingActions = (List *) linitial(upperPendingActions);
        upperPendingActions = list_delete_first(upperPendingActions);
    }

    while (list_length(upperPendingNotifies) > keepDepth)
    {
        pendingNotifies = (List *) linitial(upperPendingNotifies);
        upperPendingNotifies = list_delete_first(upperPendingNotifies);
    }
}

void AtSubCommit_Notify ( void   ) 

Definition at line 1567 of file async.c.

References Assert, GetCurrentTransactionNestLevel(), linitial, list_concat(), list_delete_first(), and list_length().

Referenced by CommitSubTransaction().

{
    List       *outerActions;
    List       *outerNotifies;

    /* Pop the parent's saved action list off the stack */
    outerActions = (List *) linitial(upperPendingActions);
    upperPendingActions = list_delete_first(upperPendingActions);

    Assert(list_length(upperPendingActions) ==
           GetCurrentTransactionNestLevel() - 2);

    /*
     * Parent's actions come first, then ours.  No duplicate elimination is
     * allowed here --- see queue_listen().
     */
    pendingActions = list_concat(outerActions, pendingActions);

    /* Same again for the notify list */
    outerNotifies = (List *) linitial(upperPendingNotifies);
    upperPendingNotifies = list_delete_first(upperPendingNotifies);

    Assert(list_length(upperPendingNotifies) ==
           GetCurrentTransactionNestLevel() - 2);

    /*
     * Duplicates could be removed at this point, but it does not seem
     * worthwhile.
     */
    pendingNotifies = list_concat(outerNotifies, pendingNotifies);
}

void AtSubStart_Notify ( void   ) 

(Definition not shown in this listing.)

bool DisableNotifyInterrupt ( void   ) 

(Definition not shown in this listing.)

void EnableNotifyInterrupt ( void   ) 

Definition at line 1716 of file async.c.

References DEBUG1, elog, IsTransactionOrTransactionBlock(), notifyInterruptEnabled, notifyInterruptOccurred, ProcessIncomingNotify(), and Trace_notify.

Referenced by prepare_for_client_read() and ProcessCatchupEvent().

{
    /*
     * Turn on servicing of notify interrupts, first handling any interrupt
     * that was recorded in notifyInterruptOccurred while servicing was off.
     * Does nothing when called inside a transaction or transaction block.
     */
    if (IsTransactionOrTransactionBlock())
        return;                 /* not really idle */

    /*
     * This code is tricky because we are communicating with a signal handler
     * that could interrupt us at any point.  If we just checked
     * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
     * fail to respond promptly to a signal that happens in between those two
     * steps.  (A very small time window, perhaps, but Murphy's Law says you
     * can hit it...)  Instead, we first set the enable flag, then test the
     * occurred flag.  If we see an unserviced interrupt has occurred, we
     * re-clear the enable flag before going off to do the service work. (That
     * prevents re-entrant invocation of ProcessIncomingNotify() if another
     * interrupt occurs.) If an interrupt comes in between the setting and
     * clearing of notifyInterruptEnabled, then it will have done the service
     * work and left notifyInterruptOccurred zero, so we have to check again
     * after clearing enable.  The whole thing has to be in a loop in case
     * another interrupt occurs while we're servicing the first. Once we get
     * out of the loop, enable is set and we know there is no unserviced
     * interrupt.
     *
     * NB: an overenthusiastic optimizing compiler could easily break this
     * code. Hopefully, they all understand what "volatile" means these days.
     */
    for (;;)
    {
        /* Step 1: enable, then look for an unserviced interrupt */
        notifyInterruptEnabled = 1;
        if (!notifyInterruptOccurred)
            break;
        /* Step 2: disable again before servicing, then re-check */
        notifyInterruptEnabled = 0;
        if (notifyInterruptOccurred)
        {
            if (Trace_notify)
                elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");

            ProcessIncomingNotify();

            if (Trace_notify)
                elog(DEBUG1, "EnableNotifyInterrupt: done");
        }
    }
}

void HandleNotifyInterrupt ( void   ) 

Definition at line 1636 of file async.c.

References CHECK_FOR_INTERRUPTS, DEBUG1, elog, ImmediateInterruptOK, notifyInterruptEnabled, notifyInterruptOccurred, proc_exit_inprogress, ProcessIncomingNotify(), and Trace_notify.

Referenced by procsignal_sigusr1_handler().

{
    /*
     * Signal-time entry point for notify interrupts.  If
     * notifyInterruptEnabled is set, incoming notifies are processed right
     * here; otherwise the only action taken is to set
     * notifyInterruptOccurred so the work can be done later.
     */

    /*
     * Note: this is called by a SIGNAL HANDLER. You must be very wary what
     * you do here. Some helpful soul had this routine sprinkled with
     * TPRINTFs, which would likely lead to corruption of stdio buffers if
     * they were ever turned on.
     */

    /* Don't joggle the elbow of proc_exit */
    if (proc_exit_inprogress)
        return;

    if (notifyInterruptEnabled)
    {
        /* Remember the caller's setting so it can be restored on exit */
        bool        save_ImmediateInterruptOK = ImmediateInterruptOK;

        /*
         * We may be called while ImmediateInterruptOK is true; turn it off
         * while messing with the NOTIFY state.  (We would have to save and
         * restore it anyway, because PGSemaphore operations inside
         * ProcessIncomingNotify() might reset it.)
         */
        ImmediateInterruptOK = false;

        /*
         * I'm not sure whether some flavors of Unix might allow another
         * SIGUSR1 occurrence to recursively interrupt this routine. To cope
         * with the possibility, we do the same sort of dance that
         * EnableNotifyInterrupt must do --- see that routine for comments.
         */
        notifyInterruptEnabled = 0;     /* disable any recursive signal */
        notifyInterruptOccurred = 1;    /* do at least one iteration */
        for (;;)
        {
            /* Enable, then look for an unserviced interrupt */
            notifyInterruptEnabled = 1;
            if (!notifyInterruptOccurred)
                break;
            /* Disable again before servicing, then re-check */
            notifyInterruptEnabled = 0;
            if (notifyInterruptOccurred)
            {
                /* Here, it is finally safe to do stuff. */
                if (Trace_notify)
                    elog(DEBUG1, "HandleNotifyInterrupt: perform async notify");

                ProcessIncomingNotify();

                if (Trace_notify)
                    elog(DEBUG1, "HandleNotifyInterrupt: done");
            }
        }

        /*
         * Restore ImmediateInterruptOK, and check for interrupts if needed.
         */
        ImmediateInterruptOK = save_ImmediateInterruptOK;
        if (save_ImmediateInterruptOK)
            CHECK_FOR_INTERRUPTS();
    }
    else
    {
        /*
         * In this path it is NOT SAFE to do much of anything, except this:
         */
        notifyInterruptOccurred = 1;
    }
}

Datum pg_listening_channels ( PG_FUNCTION_ARGS   ) 

Definition at line 677 of file async.c.

References CStringGetTextDatum, lfirst, list_head(), lnext, MemoryContextSwitchTo(), FuncCallContext::multi_call_memory_ctx, NULL, palloc(), SRF_FIRSTCALL_INIT, SRF_IS_FIRSTCALL, SRF_PERCALL_SETUP, SRF_RETURN_DONE, SRF_RETURN_NEXT, and FuncCallContext::user_fctx.

{
    FuncCallContext *srf;
    ListCell  **cursor;

    /* One-time setup, performed on the first call only */
    if (SRF_IS_FIRSTCALL())
    {
        MemoryContext savedContext;

        /* set up the context that persists across calls */
        srf = SRF_FIRSTCALL_INIT();

        /* allocations below must outlive this single call */
        savedContext = MemoryContextSwitchTo(srf->multi_call_memory_ctx);

        /* per-query state: a pointer to the next cell of listenChannels */
        cursor = (ListCell **) palloc(sizeof(ListCell *));
        *cursor = list_head(listenChannels);
        srf->user_fctx = (void *) cursor;

        MemoryContextSwitchTo(savedContext);
    }

    /* Per-call work: fetch the saved cursor */
    srf = SRF_PERCALL_SETUP();
    cursor = (ListCell **) srf->user_fctx;

    while (*cursor != NULL)
    {
        char       *name = (char *) lfirst(*cursor);

        /* advance the cursor before handing back this channel name */
        *cursor = lnext(*cursor);
        SRF_RETURN_NEXT(srf, CStringGetTextDatum(name));
    }

    SRF_RETURN_DONE(srf);
}

Datum pg_notify ( PG_FUNCTION_ARGS   ) 

Definition at line 500 of file async.c.

References Async_Notify(), PG_ARGISNULL, PG_GETARG_TEXT_PP, PG_RETURN_VOID, PreventCommandDuringRecovery(), and text_to_cstring().

{
    const char *channel;
    const char *payload;

    /* A NULL argument is treated as an empty string */
    channel = PG_ARGISNULL(0) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(0));
    payload = PG_ARGISNULL(1) ? "" : text_to_cstring(PG_GETARG_TEXT_PP(1));

    /*
     * The statement form of NOTIFY gets this check in ProcessUtility; the
     * function form has to do it itself.
     */
    PreventCommandDuringRecovery("NOTIFY");

    Async_Notify(channel, payload);

    PG_RETURN_VOID();
}

void PreCommit_Notify ( void   ) 

Definition at line 762 of file async.c.

References AccessExclusiveLock, ListenAction::action, asyncQueueAddEntries(), asyncQueueFillWarning(), asyncQueueIsFull(), AsyncQueueLock, backendHasSentNotifications, DatabaseRelationId, DEBUG1, elog, ereport, errcode(), errmsg(), ERROR, Exec_ListenPreCommit(), GetCurrentTransactionId(), InvalidOid, lfirst, list_head(), LISTEN_LISTEN, LISTEN_UNLISTEN, LISTEN_UNLISTEN_ALL, LockSharedObject(), LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), NIL, NULL, and Trace_notify.

Referenced by CommitTransaction().

{
    /*
     * Pre-commit processing for LISTEN/UNLISTEN/NOTIFY: runs the pre-commit
     * step for every pending LISTEN action, then, if there are pending
     * notifications, takes the serialization lock and pushes them into the
     * queue.  Returns immediately when nothing is pending.
     */
    ListCell   *p;

    if (pendingActions == NIL && pendingNotifies == NIL)
        return;                 /* no relevant statements in this xact */

    if (Trace_notify)
        elog(DEBUG1, "PreCommit_Notify");

    /* Preflight for any pending listen/unlisten actions */
    foreach(p, pendingActions)
    {
        ListenAction *actrec = (ListenAction *) lfirst(p);

        switch (actrec->action)
        {
            case LISTEN_LISTEN:
                Exec_ListenPreCommit();
                break;
            case LISTEN_UNLISTEN:
                /* there is no Exec_UnlistenPreCommit() */
                break;
            case LISTEN_UNLISTEN_ALL:
                /* there is no Exec_UnlistenAllPreCommit() */
                break;
        }
    }

    /* Queue any pending notifies */
    if (pendingNotifies)
    {
        ListCell   *nextNotify;

        /*
         * Make sure that we have an XID assigned to the current transaction.
         * GetCurrentTransactionId is cheap if we already have an XID, but not
         * so cheap if we don't, and we'd prefer not to do that work while
         * holding AsyncQueueLock.
         */
        (void) GetCurrentTransactionId();

        /*
         * Serialize writers by acquiring a special lock that we hold till
         * after commit.  This ensures that queue entries appear in commit
         * order, and in particular that there are never uncommitted queue
         * entries ahead of committed ones, so an uncommitted transaction
         * can't block delivery of deliverable notifications.
         *
         * We use a heavyweight lock so that it'll automatically be released
         * after either commit or abort.  This also allows deadlocks to be
         * detected, though really a deadlock shouldn't be possible here.
         *
         * The lock is on "database 0", which is pretty ugly but it doesn't
         * seem worth inventing a special locktag category just for this.
         * (Historical note: before PG 9.0, a similar lock on "database 0" was
         * used by the flatfiles mechanism.)
         */
        LockSharedObject(DatabaseRelationId, InvalidOid, 0,
                         AccessExclusiveLock);

        /* Now push the notifications into the queue */
        backendHasSentNotifications = true;

        /*
         * Each pass hands the current list position to
         * asyncQueueAddEntries(), which returns the position to continue
         * from; the loop ends when that comes back NULL.
         */
        nextNotify = list_head(pendingNotifies);
        while (nextNotify != NULL)
        {
            /*
             * Add the pending notifications to the queue.  We acquire and
             * release AsyncQueueLock once per page, which might be overkill
             * but it does allow readers to get in while we're doing this.
             *
             * A full queue is very uncommon and should really not happen,
             * given that we have so much space available in the SLRU pages.
             * Nevertheless we need to deal with this possibility. Note that
             * when we get here we are in the process of committing our
             * transaction, but we have not yet committed to clog, so at this
             * point in time we can still roll the transaction back.
             */
            LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
            asyncQueueFillWarning();
            /*
             * NOTE(review): this ERROR is raised while AsyncQueueLock is
             * still held; presumably error cleanup releases it -- confirm.
             */
            if (asyncQueueIsFull())
                ereport(ERROR,
                        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                      errmsg("too many notifications in the NOTIFY queue")));
            nextNotify = asyncQueueAddEntries(nextNotify);
            LWLockRelease(AsyncQueueLock);
        }
    }
}

void ProcessCompletedNotifies ( void   ) 

Definition at line 1057 of file async.c.

References asyncQueueAdvanceTail(), asyncQueueReadAllNotifications(), backendHasSentNotifications, CommitTransactionCommand(), CurrentMemoryContext, DEBUG1, elog, MemoryContextSwitchTo(), NIL, SignalBackends(), StartTransactionCommand(), and Trace_notify.

Referenced by PostgresMain().

{
    MemoryContext savedContext;
    bool        anySignalled;

    /* This backend queued no notifications, so there is nothing to do */
    if (!backendHasSentNotifications)
        return;

    /*
     * Clear the flag up front.  If an error occurred below with the flag
     * still set, control would return here after error cleanup and we would
     * loop forever.
     */
    backendHasSentNotifications = false;

    /*
     * Remember the caller's memory context (probably MessageContext) so it
     * can be reinstated after the transaction run below.
     */
    savedContext = CurrentMemoryContext;

    if (Trace_notify)
    {
        elog(DEBUG1, "ProcessCompletedNotifies");
    }

    /*
     * asyncQueueReadAllNotifications must run inside a transaction, else bad
     * things happen if it gets an error.
     */
    StartTransactionCommand();

    /* Wake up the other backends */
    anySignalled = SignalBackends();

    if (listenChannels == NIL)
    {
        /*
         * We are not listening ourselves.  If no other backend was signalled
         * either, nobody else will advance the queue tail, so do it here;
         * this prevents queue overflow when notifies go to no one.  (A
         * listener that joined since we looked makes this harmless.)
         */
        if (!anySignalled)
        {
            asyncQueueAdvanceTail();
        }
    }
    else
    {
        /* Read the queue ourselves and send relevant stuff to the frontend */
        asyncQueueReadAllNotifications();
    }

    CommitTransactionCommand();

    MemoryContextSwitchTo(savedContext);

    /* No pq_flush() is needed here since postgres.c will do one shortly */
}


Variable Documentation