Header And Logo

PostgreSQL
| The world's most advanced open source database.

worker_spi.c

Go to the documentation of this file.
00001 /* -------------------------------------------------------------------------
00002  *
00003  * worker_spi.c
00004  *      Sample background worker code that demonstrates various coding
00005  *      patterns: establishing a database connection; starting and committing
00006  *      transactions; using GUC variables, and heeding SIGHUP to reread
00007  *      the configuration file; reporting to pg_stat_activity; using the
00008  *      process latch to sleep and exit in case of postmaster death.
00009  *
00010  * This code connects to a database, creates a schema and table, and summarizes
00011  * the numbers contained therein.  To see it working, insert one row with
00012  * "total" type and some initial value; then insert some other rows with
00013  * "delta" type.  Delta rows will be deleted by this worker and their values
00014  * aggregated into the total.
00015  *
00016  * Copyright (C) 2013, PostgreSQL Global Development Group
00017  *
00018  * IDENTIFICATION
00019  *      contrib/worker_spi/worker_spi.c
00020  *
00021  * -------------------------------------------------------------------------
00022  */
00023 #include "postgres.h"
00024 
00025 /* These are always necessary for a bgworker */
00026 #include "miscadmin.h"
00027 #include "postmaster/bgworker.h"
00028 #include "storage/ipc.h"
00029 #include "storage/latch.h"
00030 #include "storage/lwlock.h"
00031 #include "storage/proc.h"
00032 #include "storage/shmem.h"
00033 
00034 /* these headers are used by this particular worker's code */
00035 #include "access/xact.h"
00036 #include "executor/spi.h"
00037 #include "fmgr.h"
00038 #include "lib/stringinfo.h"
00039 #include "pgstat.h"
00040 #include "utils/builtins.h"
00041 #include "utils/snapmgr.h"
00042 #include "tcop/utility.h"
00043 
00044 PG_MODULE_MAGIC;
00045 
00046 void    _PG_init(void);
00047 
/*
 * Flags raised by the signal handlers below and examined by the worker's
 * main loop.  They are only ever assigned true by a handler.
 */
static volatile sig_atomic_t got_sigterm = false;
static volatile sig_atomic_t got_sighup = false;
00051 
/*
 * GUC variables.  The initializers here match the boot values handed to
 * DefineCustomIntVariable() in _PG_init().
 */
static int  worker_spi_total_workers = 2;   /* worker_spi.total_workers */
static int  worker_spi_naptime = 10;        /* worker_spi.naptime */
00055 
00056 
/*
 * Identifies the table a single worker operates on.  One of these is
 * allocated per worker in _PG_init() and handed to the worker as its
 * main argument.
 */
typedef struct worktable
{
    const char *schema;         /* schema name */
    const char *name;           /* table name within that schema */
} worktable;
00062 
00063 /*
00064  * Signal handler for SIGTERM
00065  *      Set a flag to let the main loop to terminate, and set our latch to wake
00066  *      it up.
00067  */
00068 static void
00069 worker_spi_sigterm(SIGNAL_ARGS)
00070 {
00071     int         save_errno = errno;
00072 
00073     got_sigterm = true;
00074     if (MyProc)
00075         SetLatch(&MyProc->procLatch);
00076 
00077     errno = save_errno;
00078 }
00079 
00080 /*
00081  * Signal handler for SIGHUP
00082  *      Set a flag to let the main loop to reread the config file, and set
00083  *      our latch to wake it up.
00084  */
00085 static void
00086 worker_spi_sighup(SIGNAL_ARGS)
00087 {
00088     got_sighup = true;
00089     if (MyProc)
00090         SetLatch(&MyProc->procLatch);
00091 }
00092 
00093 /*
00094  * Initialize workspace for a worker process: create the schema if it doesn't
00095  * already exist.
00096  */
00097 static void
00098 initialize_worker_spi(worktable *table)
00099 {
00100     int     ret;
00101     int     ntup;
00102     bool    isnull;
00103     StringInfoData  buf;
00104 
00105     SetCurrentStatementStartTimestamp();
00106     StartTransactionCommand();
00107     SPI_connect();
00108     PushActiveSnapshot(GetTransactionSnapshot());
00109     pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
00110 
00111     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
00112     initStringInfo(&buf);
00113     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
00114                      table->schema);
00115 
00116     ret = SPI_execute(buf.data, true, 0);
00117     if (ret != SPI_OK_SELECT)
00118         elog(FATAL, "SPI_execute failed: error code %d", ret);
00119 
00120     if (SPI_processed != 1)
00121         elog(FATAL, "not a singleton result");
00122 
00123     ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
00124                                        SPI_tuptable->tupdesc,
00125                                        1, &isnull));
00126     if (isnull)
00127         elog(FATAL, "null result");
00128 
00129     if (ntup == 0)
00130     {
00131         resetStringInfo(&buf);
00132         appendStringInfo(&buf,
00133                          "CREATE SCHEMA \"%s\" "
00134                          "CREATE TABLE \"%s\" ("
00135                          "      type text CHECK (type IN ('total', 'delta')), "
00136                          "      value   integer)"
00137                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
00138                          "WHERE type = 'total'",
00139                          table->schema, table->name, table->name, table->name);
00140 
00141         /* set statement start time */
00142         SetCurrentStatementStartTimestamp();
00143 
00144         ret = SPI_execute(buf.data, false, 0);
00145 
00146         if (ret != SPI_OK_UTILITY)
00147             elog(FATAL, "failed to create my schema");
00148     }
00149 
00150     SPI_finish();
00151     PopActiveSnapshot();
00152     CommitTransactionCommand();
00153     pgstat_report_activity(STATE_IDLE, NULL);
00154 }
00155 
00156 static void
00157 worker_spi_main(void *main_arg)
00158 {
00159     worktable      *table = (worktable *) main_arg;
00160     StringInfoData  buf;
00161 
00162     /* We're now ready to receive signals */
00163     BackgroundWorkerUnblockSignals();
00164 
00165     /* Connect to our database */
00166     BackgroundWorkerInitializeConnection("postgres", NULL);
00167 
00168     elog(LOG, "%s initialized with %s.%s",
00169          MyBgworkerEntry->bgw_name, table->schema, table->name);
00170     initialize_worker_spi(table);
00171 
00172     /*
00173      * Quote identifiers passed to us.  Note that this must be done after
00174      * initialize_worker_spi, because that routine assumes the names are not
00175      * quoted.
00176      *
00177      * Note some memory might be leaked here.
00178      */
00179     table->schema = quote_identifier(table->schema);
00180     table->name = quote_identifier(table->name);
00181 
00182     initStringInfo(&buf);
00183     appendStringInfo(&buf,
00184                      "WITH deleted AS (DELETE "
00185                      "FROM %s.%s "
00186                      "WHERE type = 'delta' RETURNING value), "
00187                      "total AS (SELECT coalesce(sum(value), 0) as sum "
00188                      "FROM deleted) "
00189                      "UPDATE %s.%s "
00190                      "SET value = %s.value + total.sum "
00191                      "FROM total WHERE type = 'total' "
00192                      "RETURNING %s.value",
00193                      table->schema, table->name,
00194                      table->schema, table->name,
00195                      table->name,
00196                      table->name);
00197 
00198     /*
00199      * Main loop: do this until the SIGTERM handler tells us to terminate
00200      */
00201     while (!got_sigterm)
00202     {
00203         int     ret;
00204         int     rc;
00205 
00206         /*
00207          * Background workers mustn't call usleep() or any direct equivalent:
00208          * instead, they may wait on their process latch, which sleeps as
00209          * necessary, but is awakened if postmaster dies.  That way the
00210          * background process goes away immediately in an emergency.
00211          */
00212         rc = WaitLatch(&MyProc->procLatch,
00213                        WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
00214                        worker_spi_naptime * 1000L);
00215         ResetLatch(&MyProc->procLatch);
00216 
00217         /* emergency bailout if postmaster has died */
00218         if (rc & WL_POSTMASTER_DEATH)
00219             proc_exit(1);
00220 
00221         /*
00222          * In case of a SIGHUP, just reload the configuration.
00223          */
00224         if (got_sighup)
00225         {
00226             got_sighup = false;
00227             ProcessConfigFile(PGC_SIGHUP);
00228         }
00229 
00230         /*
00231          * Start a transaction on which we can run queries.  Note that each
00232          * StartTransactionCommand() call should be preceded by a
00233          * SetCurrentStatementStartTimestamp() call, which sets both the time
00234          * for the statement we're about the run, and also the transaction
00235          * start time.  Also, each other query sent to SPI should probably be
00236          * preceded by SetCurrentStatementStartTimestamp(), so that statement
00237          * start time is always up to date.
00238          *
00239          * The SPI_connect() call lets us run queries through the SPI manager,
00240          * and the PushActiveSnapshot() call creates an "active" snapshot which
00241          * is necessary for queries to have MVCC data to work on.
00242          *
00243          * The pgstat_report_activity() call makes our activity visible through
00244          * the pgstat views.
00245          */
00246         SetCurrentStatementStartTimestamp();
00247         StartTransactionCommand();
00248         SPI_connect();
00249         PushActiveSnapshot(GetTransactionSnapshot());
00250         pgstat_report_activity(STATE_RUNNING, buf.data);
00251 
00252         /* We can now execute queries via SPI */
00253         ret = SPI_execute(buf.data, false, 0);
00254 
00255         if (ret != SPI_OK_UPDATE_RETURNING)
00256             elog(FATAL, "cannot select from table %s.%s: error code %d",
00257                  table->schema, table->name, ret);
00258 
00259         if (SPI_processed > 0)
00260         {
00261             bool    isnull;
00262             int32   val;
00263 
00264             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
00265                                                SPI_tuptable->tupdesc,
00266                                                1, &isnull));
00267             if (!isnull)
00268                 elog(LOG, "%s: count in %s.%s is now %d",
00269                      MyBgworkerEntry->bgw_name,
00270                      table->schema, table->name, val);
00271         }
00272 
00273         /*
00274          * And finish our transaction.
00275          */
00276         SPI_finish();
00277         PopActiveSnapshot();
00278         CommitTransactionCommand();
00279         pgstat_report_activity(STATE_IDLE, NULL);
00280     }
00281 
00282     proc_exit(0);
00283 }
00284 
00285 /*
00286  * Entrypoint of this module.
00287  *
00288  * We register more than one worker process here, to demonstrate how that can
00289  * be done.
00290  */
00291 void
00292 _PG_init(void)
00293 {
00294     BackgroundWorker    worker;
00295     worktable          *table;
00296     unsigned int        i;
00297     char                name[20];
00298 
00299     /* get the configuration */
00300     DefineCustomIntVariable("worker_spi.naptime",
00301                 "Duration between each check (in seconds).",
00302                 NULL,
00303                 &worker_spi_naptime,
00304                 10,
00305                 1,
00306                 INT_MAX,
00307                 PGC_SIGHUP,
00308                 0,
00309                 NULL,
00310                 NULL,
00311                 NULL);
00312     DefineCustomIntVariable("worker_spi.total_workers",
00313                 "Number of workers.",
00314                 NULL,
00315                 &worker_spi_total_workers,
00316                 2,
00317                 1,
00318                 100,
00319                 PGC_POSTMASTER,
00320                 0,
00321                 NULL,
00322                 NULL,
00323                 NULL);
00324 
00325     /* set up common data for all our workers */
00326     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
00327         BGWORKER_BACKEND_DATABASE_CONNECTION;
00328     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
00329     worker.bgw_restart_time = BGW_NEVER_RESTART;
00330     worker.bgw_main = worker_spi_main;
00331     worker.bgw_sighup = worker_spi_sighup;
00332     worker.bgw_sigterm = worker_spi_sigterm;
00333 
00334     /*
00335      * Now fill in worker-specific data, and do the actual registrations.
00336      */
00337     for (i = 1; i <= worker_spi_total_workers; i++)
00338     {
00339         sprintf(name, "worker %d", i);
00340         worker.bgw_name = pstrdup(name);
00341 
00342         table = palloc(sizeof(worktable));
00343         sprintf(name, "schema%d", i);
00344         table->schema = pstrdup(name);
00345         table->name = pstrdup("counted");
00346         worker.bgw_main_arg = (void *) table;
00347 
00348         RegisterBackgroundWorker(&worker);
00349     }
00350 }