00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include "postgres.h"
00024
00025
00026 #include "miscadmin.h"
00027 #include "postmaster/bgworker.h"
00028 #include "storage/ipc.h"
00029 #include "storage/latch.h"
00030 #include "storage/lwlock.h"
00031 #include "storage/proc.h"
00032 #include "storage/shmem.h"
00033
00034
00035 #include "access/xact.h"
00036 #include "executor/spi.h"
00037 #include "fmgr.h"
00038 #include "lib/stringinfo.h"
00039 #include "pgstat.h"
00040 #include "utils/builtins.h"
00041 #include "utils/snapmgr.h"
00042 #include "tcop/utility.h"
00043
00044 PG_MODULE_MAGIC;
00045
00046 void _PG_init(void);
00047
00048
00049 static volatile sig_atomic_t got_sighup = false;
00050 static volatile sig_atomic_t got_sigterm = false;
00051
00052
00053 static int worker_spi_naptime = 10;
00054 static int worker_spi_total_workers = 2;
00055
00056
00057 typedef struct worktable
00058 {
00059 const char *schema;
00060 const char *name;
00061 } worktable;
00062
00063
00064
00065
00066
00067
00068 static void
00069 worker_spi_sigterm(SIGNAL_ARGS)
00070 {
00071 int save_errno = errno;
00072
00073 got_sigterm = true;
00074 if (MyProc)
00075 SetLatch(&MyProc->procLatch);
00076
00077 errno = save_errno;
00078 }
00079
00080
00081
00082
00083
00084
00085 static void
00086 worker_spi_sighup(SIGNAL_ARGS)
00087 {
00088 got_sighup = true;
00089 if (MyProc)
00090 SetLatch(&MyProc->procLatch);
00091 }
00092
00093
00094
00095
00096
00097 static void
00098 initialize_worker_spi(worktable *table)
00099 {
00100 int ret;
00101 int ntup;
00102 bool isnull;
00103 StringInfoData buf;
00104
00105 SetCurrentStatementStartTimestamp();
00106 StartTransactionCommand();
00107 SPI_connect();
00108 PushActiveSnapshot(GetTransactionSnapshot());
00109 pgstat_report_activity(STATE_RUNNING, "initializing spi_worker schema");
00110
00111
00112 initStringInfo(&buf);
00113 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
00114 table->schema);
00115
00116 ret = SPI_execute(buf.data, true, 0);
00117 if (ret != SPI_OK_SELECT)
00118 elog(FATAL, "SPI_execute failed: error code %d", ret);
00119
00120 if (SPI_processed != 1)
00121 elog(FATAL, "not a singleton result");
00122
00123 ntup = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
00124 SPI_tuptable->tupdesc,
00125 1, &isnull));
00126 if (isnull)
00127 elog(FATAL, "null result");
00128
00129 if (ntup == 0)
00130 {
00131 resetStringInfo(&buf);
00132 appendStringInfo(&buf,
00133 "CREATE SCHEMA \"%s\" "
00134 "CREATE TABLE \"%s\" ("
00135 " type text CHECK (type IN ('total', 'delta')), "
00136 " value integer)"
00137 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
00138 "WHERE type = 'total'",
00139 table->schema, table->name, table->name, table->name);
00140
00141
00142 SetCurrentStatementStartTimestamp();
00143
00144 ret = SPI_execute(buf.data, false, 0);
00145
00146 if (ret != SPI_OK_UTILITY)
00147 elog(FATAL, "failed to create my schema");
00148 }
00149
00150 SPI_finish();
00151 PopActiveSnapshot();
00152 CommitTransactionCommand();
00153 pgstat_report_activity(STATE_IDLE, NULL);
00154 }
00155
00156 static void
00157 worker_spi_main(void *main_arg)
00158 {
00159 worktable *table = (worktable *) main_arg;
00160 StringInfoData buf;
00161
00162
00163 BackgroundWorkerUnblockSignals();
00164
00165
00166 BackgroundWorkerInitializeConnection("postgres", NULL);
00167
00168 elog(LOG, "%s initialized with %s.%s",
00169 MyBgworkerEntry->bgw_name, table->schema, table->name);
00170 initialize_worker_spi(table);
00171
00172
00173
00174
00175
00176
00177
00178
00179 table->schema = quote_identifier(table->schema);
00180 table->name = quote_identifier(table->name);
00181
00182 initStringInfo(&buf);
00183 appendStringInfo(&buf,
00184 "WITH deleted AS (DELETE "
00185 "FROM %s.%s "
00186 "WHERE type = 'delta' RETURNING value), "
00187 "total AS (SELECT coalesce(sum(value), 0) as sum "
00188 "FROM deleted) "
00189 "UPDATE %s.%s "
00190 "SET value = %s.value + total.sum "
00191 "FROM total WHERE type = 'total' "
00192 "RETURNING %s.value",
00193 table->schema, table->name,
00194 table->schema, table->name,
00195 table->name,
00196 table->name);
00197
00198
00199
00200
00201 while (!got_sigterm)
00202 {
00203 int ret;
00204 int rc;
00205
00206
00207
00208
00209
00210
00211
00212 rc = WaitLatch(&MyProc->procLatch,
00213 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
00214 worker_spi_naptime * 1000L);
00215 ResetLatch(&MyProc->procLatch);
00216
00217
00218 if (rc & WL_POSTMASTER_DEATH)
00219 proc_exit(1);
00220
00221
00222
00223
00224 if (got_sighup)
00225 {
00226 got_sighup = false;
00227 ProcessConfigFile(PGC_SIGHUP);
00228 }
00229
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246 SetCurrentStatementStartTimestamp();
00247 StartTransactionCommand();
00248 SPI_connect();
00249 PushActiveSnapshot(GetTransactionSnapshot());
00250 pgstat_report_activity(STATE_RUNNING, buf.data);
00251
00252
00253 ret = SPI_execute(buf.data, false, 0);
00254
00255 if (ret != SPI_OK_UPDATE_RETURNING)
00256 elog(FATAL, "cannot select from table %s.%s: error code %d",
00257 table->schema, table->name, ret);
00258
00259 if (SPI_processed > 0)
00260 {
00261 bool isnull;
00262 int32 val;
00263
00264 val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
00265 SPI_tuptable->tupdesc,
00266 1, &isnull));
00267 if (!isnull)
00268 elog(LOG, "%s: count in %s.%s is now %d",
00269 MyBgworkerEntry->bgw_name,
00270 table->schema, table->name, val);
00271 }
00272
00273
00274
00275
00276 SPI_finish();
00277 PopActiveSnapshot();
00278 CommitTransactionCommand();
00279 pgstat_report_activity(STATE_IDLE, NULL);
00280 }
00281
00282 proc_exit(0);
00283 }
00284
00285
00286
00287
00288
00289
00290
00291 void
00292 _PG_init(void)
00293 {
00294 BackgroundWorker worker;
00295 worktable *table;
00296 unsigned int i;
00297 char name[20];
00298
00299
00300 DefineCustomIntVariable("worker_spi.naptime",
00301 "Duration between each check (in seconds).",
00302 NULL,
00303 &worker_spi_naptime,
00304 10,
00305 1,
00306 INT_MAX,
00307 PGC_SIGHUP,
00308 0,
00309 NULL,
00310 NULL,
00311 NULL);
00312 DefineCustomIntVariable("worker_spi.total_workers",
00313 "Number of workers.",
00314 NULL,
00315 &worker_spi_total_workers,
00316 2,
00317 1,
00318 100,
00319 PGC_POSTMASTER,
00320 0,
00321 NULL,
00322 NULL,
00323 NULL);
00324
00325
00326 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
00327 BGWORKER_BACKEND_DATABASE_CONNECTION;
00328 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
00329 worker.bgw_restart_time = BGW_NEVER_RESTART;
00330 worker.bgw_main = worker_spi_main;
00331 worker.bgw_sighup = worker_spi_sighup;
00332 worker.bgw_sigterm = worker_spi_sigterm;
00333
00334
00335
00336
00337 for (i = 1; i <= worker_spi_total_workers; i++)
00338 {
00339 sprintf(name, "worker %d", i);
00340 worker.bgw_name = pstrdup(name);
00341
00342 table = palloc(sizeof(worktable));
00343 sprintf(name, "schema%d", i);
00344 table->schema = pstrdup(name);
00345 table->name = pstrdup("counted");
00346 worker.bgw_main_arg = (void *) table;
00347
00348 RegisterBackgroundWorker(&worker);
00349 }
00350 }