#include "postgres.h"#include "miscadmin.h"#include "postmaster/bgworker.h"#include "storage/ipc.h"#include "storage/latch.h"#include "storage/lwlock.h"#include "storage/proc.h"#include "storage/shmem.h"#include "access/xact.h"#include "executor/spi.h"#include "fmgr.h"#include "lib/stringinfo.h"#include "pgstat.h"#include "utils/builtins.h"#include "utils/snapmgr.h"#include "tcop/utility.h"
Go to the source code of this file.
Data Structures | |
| struct | worktable |
Typedefs | |
| typedef struct worktable | worktable |
Functions | |
| void | _PG_init (void) |
| static void | worker_spi_sigterm (SIGNAL_ARGS) |
| static void | worker_spi_sighup (SIGNAL_ARGS) |
| static void | initialize_worker_spi (worktable *table) |
| static void | worker_spi_main (void *main_arg) |
Variables | |
| PG_MODULE_MAGIC | |
| static volatile sig_atomic_t | got_sighup = false |
| static volatile sig_atomic_t | got_sigterm = false |
| static int | worker_spi_naptime = 10 |
| static int | worker_spi_total_workers = 2 |
| void _PG_init | ( | void | ) |
Definition at line 292 of file worker_spi.c.
References BackgroundWorker::bgw_flags, BackgroundWorker::bgw_main, BackgroundWorker::bgw_main_arg, BackgroundWorker::bgw_name, BackgroundWorker::bgw_restart_time, BackgroundWorker::bgw_sighup, BackgroundWorker::bgw_sigterm, BackgroundWorker::bgw_start_time, BGWORKER_SHMEM_ACCESS, DefineCustomIntVariable(), i, worktable::name, name, NULL, palloc(), PGC_POSTMASTER, PGC_SIGHUP, pstrdup(), RegisterBackgroundWorker(), worktable::schema, worker_spi_naptime, and worker_spi_total_workers.
{
BackgroundWorker worker;
worktable *table;
unsigned int i;
char name[20];
/* get the configuration */
DefineCustomIntVariable("worker_spi.naptime",
"Duration between each check (in seconds).",
NULL,
&worker_spi_naptime,
10,
1,
INT_MAX,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
&worker_spi_total_workers,
2,
1,
100,
PGC_POSTMASTER,
0,
NULL,
NULL,
NULL);
/* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_spi_main;
worker.bgw_sighup = worker_spi_sighup;
worker.bgw_sigterm = worker_spi_sigterm;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
for (i = 1; i <= worker_spi_total_workers; i++)
{
sprintf(name, "worker %d", i);
worker.bgw_name = pstrdup(name);
table = palloc(sizeof(worktable));
sprintf(name, "schema%d", i);
table->schema = pstrdup(name);
table->name = pstrdup("counted");
worker.bgw_main_arg = (void *) table;
RegisterBackgroundWorker(&worker);
}
}
| static void initialize_worker_spi | ( | worktable * | table | ) | [static] |
Definition at line 98 of file worker_spi.c.
References appendStringInfo(), buf, CommitTransactionCommand(), StringInfoData::data, DatumGetInt32, elog, FATAL, GetTransactionSnapshot(), initStringInfo(), worktable::name, NULL, pgstat_report_activity(), PopActiveSnapshot(), PushActiveSnapshot(), resetStringInfo(), worktable::schema, SetCurrentStatementStartTimestamp(), SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_SELECT, SPI_OK_UTILITY, SPI_processed, SPI_tuptable, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, and SPITupleTable::vals.
Referenced by worker_spi_main().
{
int ret;
int ntup;
bool isnull;
StringInfoData buf;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
/* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
initStringInfo(&buf);
appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
table->schema);
ret = SPI_execute(buf.data, true, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "SPI_execute failed: error code %d", ret);
if (SPI_processed != 1)
elog(FATAL, "not a singleton result");
ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (isnull)
elog(FATAL, "null result");
if (ntup == 0)
{
resetStringInfo(&buf);
appendStringInfo(&buf,
"CREATE SCHEMA \"%s\" "
"CREATE TABLE \"%s\" ("
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
"WHERE type = 'total'",
table->schema, table->name, table->name, table->name);
/* set statement start time */
SetCurrentStatementStartTimestamp();
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UTILITY)
elog(FATAL, "failed to create my schema");
}
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
| static void worker_spi_main | ( | void * | main_arg | ) | [static] |
Definition at line 157 of file worker_spi.c.
References appendStringInfo(), BackgroundWorkerInitializeConnection(), BackgroundWorkerUnblockSignals(), BackgroundWorker::bgw_name, buf, CommitTransactionCommand(), StringInfoData::data, DatumGetInt32, elog, FATAL, GetTransactionSnapshot(), got_sighup, got_sigterm, initialize_worker_spi(), initStringInfo(), LOG, MyBgworkerEntry, MyProc, worktable::name, NULL, PGC_SIGHUP, pgstat_report_activity(), PopActiveSnapshot(), proc_exit(), ProcessConfigFile(), PGPROC::procLatch, PushActiveSnapshot(), quote_identifier(), ResetLatch(), worktable::schema, SetCurrentStatementStartTimestamp(), SPI_connect(), SPI_execute(), SPI_finish(), SPI_getbinval(), SPI_OK_UPDATE_RETURNING, SPI_processed, SPI_tuptable, StartTransactionCommand(), STATE_IDLE, STATE_RUNNING, SPITupleTable::tupdesc, val, SPITupleTable::vals, WaitLatch(), WL_LATCH_SET, WL_POSTMASTER_DEATH, WL_TIMEOUT, and worker_spi_naptime.
{
worktable *table = (worktable *) main_arg;
StringInfoData buf;
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
/* Connect to our database */
BackgroundWorkerInitializeConnection("postgres", NULL);
elog(LOG, "%s initialized with %s.%s",
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_spi, because that routine assumes the names are not
* quoted.
*
* Note some memory might be leaked here.
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
initStringInfo(&buf);
appendStringInfo(&buf,
"WITH deleted AS (DELETE "
"FROM %s.%s "
"WHERE type = 'delta' RETURNING value), "
"total AS (SELECT coalesce(sum(value), 0) as sum "
"FROM deleted) "
"UPDATE %s.%s "
"SET value = %s.value + total.sum "
"FROM total WHERE type = 'total' "
"RETURNING %s.value",
table->schema, table->name,
table->schema, table->name,
table->name,
table->name);
/*
* Main loop: do this until the SIGTERM handler tells us to terminate
*/
while (!got_sigterm)
{
int ret;
int rc;
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
* necessary, but is awakened if postmaster dies. That way the
* background process goes away immediately in an emergency.
*/
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
worker_spi_naptime * 1000L);
ResetLatch(&MyProc->procLatch);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
/*
* In case of a SIGHUP, just reload the configuration.
*/
if (got_sighup)
{
got_sighup = false;
ProcessConfigFile(PGC_SIGHUP);
}
/*
* Start a transaction on which we can run queries. Note that each
* StartTransactionCommand() call should be preceded by a
* SetCurrentStatementStartTimestamp() call, which sets both the time
* for the statement we're about the run, and also the transaction
* start time. Also, each other query sent to SPI should probably be
* preceded by SetCurrentStatementStartTimestamp(), so that statement
* start time is always up to date.
*
* The SPI_connect() call lets us run queries through the SPI manager,
* and the PushActiveSnapshot() call creates an "active" snapshot which
* is necessary for queries to have MVCC data to work on.
*
* The pgstat_report_activity() call makes our activity visible through
* the pgstat views.
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
pgstat_report_activity(STATE_RUNNING, buf.data);
/* We can now execute queries via SPI */
ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
elog(FATAL, "cannot select from table %s.%s: error code %d",
table->schema, table->name, ret);
if (SPI_processed > 0)
{
bool isnull;
int32 val;
val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
SPI_tuptable->tupdesc,
1, &isnull));
if (!isnull)
elog(LOG, "%s: count in %s.%s is now %d",
MyBgworkerEntry->bgw_name,
table->schema, table->name, val);
}
/*
* And finish our transaction.
*/
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
pgstat_report_activity(STATE_IDLE, NULL);
}
proc_exit(0);
}
| static void worker_spi_sighup | ( | SIGNAL_ARGS | ) | [static] |
Definition at line 86 of file worker_spi.c.
References got_sighup, MyProc, PGPROC::procLatch, and SetLatch().
{
got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
}
| static void worker_spi_sigterm | ( | SIGNAL_ARGS | ) | [static] |
Definition at line 69 of file worker_spi.c.
References got_sigterm, MyProc, PGPROC::procLatch, and SetLatch().
{
int save_errno = errno;
got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}
volatile sig_atomic_t got_sighup = false [static] |
Definition at line 49 of file worker_spi.c.
Referenced by worker_spi_main(), and worker_spi_sighup().
volatile sig_atomic_t got_sigterm = false [static] |
Definition at line 50 of file worker_spi.c.
Referenced by worker_spi_main(), and worker_spi_sigterm().
Definition at line 44 of file worker_spi.c.
int worker_spi_naptime = 10 [static] |
Definition at line 53 of file worker_spi.c.
Referenced by _PG_init(), and worker_spi_main().
int worker_spi_total_workers = 2 [static] |
Definition at line 54 of file worker_spi.c.
Referenced by _PG_init().
1.7.1