#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "access/xact.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "utils/snapmgr.h"
#include "tcop/utility.h"
Go to the source code of this file.
Data Structures | |
struct | worktable |
Typedefs | |
typedef struct worktable | worktable |
Functions | |
void | _PG_init (void) |
static void | worker_spi_sigterm (SIGNAL_ARGS) |
static void | worker_spi_sighup (SIGNAL_ARGS) |
static void | initialize_worker_spi (worktable *table) |
static void | worker_spi_main (void *main_arg) |
Variables | |
PG_MODULE_MAGIC | |
static volatile sig_atomic_t | got_sighup = false |
static volatile sig_atomic_t | got_sigterm = false |
static int | worker_spi_naptime = 10 |
static int | worker_spi_total_workers = 2 |
void _PG_init | ( | void | ) |
Definition at line 292 of file worker_spi.c.
References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_main, BackgroundWorker::bgw_main_arg, BackgroundWorker::bgw_name, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_sighup, BackgroundWorker::bgw_sigterm, BackgroundWorker::bgw_start_time, BGWORKER_SHMEM_ACCESS, DefineCustomIntVariable(), i, worktable::name, name, NULL, palloc(), PGC_POSTMASTER, PGC_SIGHUP, pstrdup(), RegisterBackgroundWorker(), worktable::schema, worker_spi_naptime, and worker_spi_total_workers.
{
    /*
     * Module load hook: define the worker_spi configuration variables and
     * register one background worker per configured slot.
     *
     * Every worker receives its own palloc'd worktable naming the schema
     * ("schema<N>") and the table ("counted") it will operate on.
     */
    BackgroundWorker worker;
    worktable  *table;
    int         i;          /* signed: matches worker_spi_total_workers and %d */
    char        name[20];

    /* get the configuration */
    DefineCustomIntVariable("worker_spi.naptime",
                            "Duration between each check (in seconds).",
                            NULL,
                            &worker_spi_naptime,
                            10,
                            1,
                            INT_MAX,
                            PGC_SIGHUP,
                            0,
                            NULL,
                            NULL,
                            NULL);
    DefineCustomIntVariable("worker_spi.total_workers",
                            "Number of workers.",
                            NULL,
                            &worker_spi_total_workers,
                            2,
                            1,
                            100,
                            PGC_POSTMASTER,
                            0,
                            NULL,
                            NULL,
                            NULL);

    /* set up common data for all our workers */
    worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
    worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
    worker.bgw_restart_time = BGW_NEVER_RESTART;
    worker.bgw_main = worker_spi_main;
    worker.bgw_sighup = worker_spi_sighup;
    worker.bgw_sigterm = worker_spi_sigterm;

    /*
     * Now fill in worker-specific data, and do the actual registrations.
     * "name" is scratch space reused for both the worker name and the schema
     * name; each value is pstrdup'd before the buffer is overwritten.
     */
    for (i = 1; i <= worker_spi_total_workers; i++)
    {
        /* bounded formatting: never write past the end of name[] */
        snprintf(name, sizeof(name), "worker %d", i);
        worker.bgw_name = pstrdup(name);

        table = palloc(sizeof(worktable));
        snprintf(name, sizeof(name), "schema%d", i);
        table->schema = pstrdup(name);
        table->name = pstrdup("counted");
        worker.bgw_main_arg = (void *) table;

        RegisterBackgroundWorker(&worker);
    }
}
static void initialize_worker_spi | ( | worktable * | table | ) | [static] |
Definition at line 98 of file worker_spi.c.
References appendStringInfo(), buf, CommitTransactionCommand(), StringInfoData::data, DatumGetInt32, elog, FATAL, GetTransactionSnapshot(), initStringInfo(), worktable::name, NULL, pgstat_report_activity(), PopActiveSnapshot(), PushActiveSnapshot(), resetStringInfo(), worktable::schema, SetCurrentStatementStartTimestamp(), SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_SELECT, SPI_OK_UTILITY, SPI_processed, SPI_tuptable, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, and SPITupleTable::vals.
Referenced by worker_spi_main().
{
    /*
     * Create this worker's schema, table and partial unique index if the
     * schema does not exist yet.
     *
     * table->schema and table->name must be the raw (unquoted) identifiers:
     * the schema name is compared as a string literal against
     * pg_namespace.nspname and both names are wrapped in double quotes in
     * the CREATE statement.
     *
     * Runs in its own transaction; any unexpected SPI result is reported
     * with elog(FATAL).
     */
    int         ret;
    int64       ntup;
    bool        isnull;
    StringInfoData buf;

    SetCurrentStatementStartTimestamp();
    StartTransactionCommand();
    SPI_connect();
    PushActiveSnapshot(GetTransactionSnapshot());
    pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");

    /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
    initStringInfo(&buf);
    appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
                     table->schema);

    ret = SPI_execute(buf.data, true, 0);
    if (ret != SPI_OK_SELECT)
        elog(FATAL, "SPI_execute failed: error code %d", ret);

    if (SPI_processed != 1)
        elog(FATAL, "not a singleton result");

    /*
     * count(*) returns bigint, so the datum has to be fetched as an int64;
     * reading it with DatumGetInt32 is only correct by accident where int8
     * happens to be passed by value.
     */
    ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
                                       SPI_tuptable->tupdesc,
                                       1, &isnull));
    if (isnull)
        elog(FATAL, "null result");

    if (ntup == 0)
    {
        /* schema is missing: create it together with its table and index */
        resetStringInfo(&buf);
        appendStringInfo(&buf,
                         "CREATE SCHEMA \"%s\" "
                         "CREATE TABLE \"%s\" ("
                         " type text CHECK (type IN ('total', 'delta')), "
                         " value integer)"
                         "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
                         "WHERE type = 'total'",
                         table->schema, table->name, table->name, table->name);

        /* set statement start time */
        SetCurrentStatementStartTimestamp();

        ret = SPI_execute(buf.data, false, 0);

        if (ret != SPI_OK_UTILITY)
            elog(FATAL, "failed to create my schema");
    }

    SPI_finish();
    PopActiveSnapshot();
    CommitTransactionCommand();
    pgstat_report_activity(STATE_IDLE, NULL);
}
static void worker_spi_main | ( | void * | main_arg | ) | [static] |
Definition at line 157 of file worker_spi.c.
References appendStringInfo(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, buf, CommitTransactionCommand(), StringInfoData::data, DatumGetInt32, elog, FATAL, GetTransactionSnapshot(), got_sighup, got_sigterm, initialize_worker_spi(), initStringInfo(), LOG, MyBgworkerEntry, MyProc, worktable::name, NULL, PGC_SIGHUP, pgstat_report_activity(), PopActiveSnapshot(), proc_exit(), ProcessConfigFile(), PGPROC::procLatch, PushActiveSnapshot(), quote_identifier(), ResetLatch(), worktable::schema, SetCurrentStatementStartTimestamp(), SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_UPDATE_RETURNING, SPI_processed, SPI_tuptable, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, val, SPITupleTable::vals, WaitLatch(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_TIMEOUT, and worker_spi_naptime.
{
    /*
     * Entry point of one worker process.
     *
     * main_arg is the worktable built at registration time.  After
     * connecting to the "postgres" database and making sure the schema
     * exists, the worker wakes up every worker_spi_naptime seconds (or
     * whenever its latch is set), folds all 'delta' rows into the 'total'
     * row, and logs the resulting value.  It exits once SIGTERM has been
     * received, or immediately if the postmaster dies.
     */
    worktable  *target = (worktable *) main_arg;
    StringInfoData query;

    /* Signals may be delivered from here on. */
    BackgroundWorkerUnblockSignals();

    /* Attach to the database we work in. */
    BackgroundWorkerInitializeConnection("postgres", NULL);

    elog(LOG, "%s initialized with %s.%s",
         MyBgworkerEntry->bgw_name, target->schema, target->name);
    initialize_worker_spi(target);

    /*
     * From now on the names are used as SQL identifiers, so quote them.
     * This has to wait until initialize_worker_spi() is done, since that
     * routine expects the unquoted names.
     *
     * The previous strings are not freed, so a little memory may leak.
     */
    target->schema = quote_identifier(target->schema);
    target->name = quote_identifier(target->name);

    /* The statement never changes, so build it once up front. */
    initStringInfo(&query);
    appendStringInfo(&query,
                     "WITH deleted AS (DELETE "
                     "FROM %s.%s "
                     "WHERE type = 'delta' RETURNING value), "
                     "total AS (SELECT coalesce(sum(value), 0) as sum "
                     "FROM deleted) "
                     "UPDATE %s.%s "
                     "SET value = %s.value + total.sum "
                     "FROM total WHERE type = 'total' "
                     "RETURNING %s.value",
                     target->schema, target->name,
                     target->schema, target->name,
                     target->name,
                     target->name);

    /* Keep working until the SIGTERM handler asks us to stop. */
    for (;;)
    {
        int         spi_status;
        int         wake_events;

        if (got_sigterm)
            break;

        /*
         * Sleep on the process latch rather than calling usleep() or an
         * equivalent: the latch wait naps as needed but also returns when
         * the postmaster dies, so the worker goes away promptly in an
         * emergency.
         */
        wake_events = WaitLatch(&MyProc->procLatch,
                                WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                                worker_spi_naptime * 1000L);
        ResetLatch(&MyProc->procLatch);

        /* Postmaster is gone: bail out right away. */
        if ((wake_events & WL_POSTMASTER_DEATH) != 0)
            proc_exit(1);

        /* A SIGHUP only asks for the configuration to be re-read. */
        if (got_sighup)
        {
            got_sighup = false;
            ProcessConfigFile(PGC_SIGHUP);
        }

        /*
         * Open a transaction to run the query in.
         *
         * SetCurrentStatementStartTimestamp() goes before every
         * StartTransactionCommand(); it sets the start time of the statement
         * about to run and of the transaction.  Any further query sent to
         * SPI should likewise be preceded by it so the statement start time
         * stays current.
         *
         * SPI_connect() gives access to the SPI manager, and
         * PushActiveSnapshot() installs the "active" snapshot that queries
         * need in order to have MVCC data to work on.
         *
         * pgstat_report_activity() publishes what we are doing in the
         * pgstat views.
         */
        SetCurrentStatementStartTimestamp();
        StartTransactionCommand();
        SPI_connect();
        PushActiveSnapshot(GetTransactionSnapshot());
        pgstat_report_activity(STATE_RUNNING, query.data);

        /* Run the aggregation statement through SPI. */
        spi_status = SPI_execute(query.data, false, 0);

        if (spi_status != SPI_OK_UPDATE_RETURNING)
            elog(FATAL, "cannot select from table %s.%s: error code %d",
                 target->schema, target->name, spi_status);

        if (SPI_processed > 0)
        {
            bool        value_is_null;
            Datum       raw_value;
            int32       total;

            /* First column of the first returned row is the new total. */
            raw_value = SPI_getbinval(SPI_tuptable->vals[0],
                                      SPI_tuptable->tupdesc,
                                      1, &value_is_null);
            total = DatumGetInt32(raw_value);

            if (!value_is_null)
                elog(LOG, "%s: count in %s.%s is now %d",
                     MyBgworkerEntry->bgw_name,
                     target->schema, target->name, total);
        }

        /* Wrap up the transaction and report ourselves idle again. */
        SPI_finish();
        PopActiveSnapshot();
        CommitTransactionCommand();
        pgstat_report_activity(STATE_IDLE, NULL);
    }

    proc_exit(0);
}
static void worker_spi_sighup | ( | SIGNAL_ARGS | ) | [static] |
Definition at line 86 of file worker_spi.c.
References got_sighup, MyProc, PGPROC::procLatch, and SetLatch().
{
    /*
     * SIGHUP handler: record that the signal arrived and set our latch so
     * that a worker sleeping in WaitLatch() wakes up and notices the flag.
     *
     * errno is saved and restored around the body, exactly as the SIGTERM
     * handler does, so that the interrupted code never sees an errno value
     * modified from inside the handler.
     */
    int         save_errno = errno;

    got_sighup = true;
    if (MyProc)
        SetLatch(&MyProc->procLatch);

    errno = save_errno;
}
static void worker_spi_sigterm | ( | SIGNAL_ARGS | ) | [static] |
Definition at line 69 of file worker_spi.c.
References got_sigterm, MyProc, PGPROC::procLatch, and SetLatch().
{
    /*
     * SIGTERM handler: raise the termination flag and set our latch so a
     * worker blocked in WaitLatch() wakes up and sees it.
     *
     * The errno value seen on entry is put back before returning, leaving
     * the interrupted code's errno untouched.
     */
    const int   saved_errno = errno;

    got_sigterm = true;

    if (MyProc != NULL)
        SetLatch(&MyProc->procLatch);

    errno = saved_errno;
}
volatile sig_atomic_t got_sighup = false [static] |
Definition at line 49 of file worker_spi.c.
Referenced by worker_spi_main(), and worker_spi_sighup().
volatile sig_atomic_t got_sigterm = false [static] |
Definition at line 50 of file worker_spi.c.
Referenced by worker_spi_main(), and worker_spi_sigterm().
PG_MODULE_MAGIC |
Definition at line 44 of file worker_spi.c.
int worker_spi_naptime = 10 [static] |
Definition at line 53 of file worker_spi.c.
Referenced by _PG_init(), and worker_spi_main().
int worker_spi_total_workers = 2 [static] |
Definition at line 54 of file worker_spi.c.
Referenced by _PG_init().