Header And Logo

PostgreSQL
| The world's most advanced open source database.

tcn.c

Go to the documentation of this file.
00001 /*-------------------------------------------------------------------------
00002  *
00003  * tcn.c
00004  *    triggered change notification support for PostgreSQL
00005  *
00006  * Portions Copyright (c) 2011-2013, PostgreSQL Global Development Group
00007  * Portions Copyright (c) 1994, Regents of the University of California
00008  *
00009  *
00010  * IDENTIFICATION
00011  *    contrib/tcn/tcn.c
00012  *
00013  *-------------------------------------------------------------------------
00014  */
00015 
00016 #include "postgres.h"
00017 
00018 #include "access/htup_details.h"
00019 #include "executor/spi.h"
00020 #include "commands/async.h"
00021 #include "commands/trigger.h"
00022 #include "lib/stringinfo.h"
00023 #include "utils/rel.h"
00024 #include "utils/syscache.h"
00025 
00026 
00027 PG_MODULE_MAGIC;
00028 
00029 
00030 /* forward declarations */
00031 Datum       triggered_change_notification(PG_FUNCTION_ARGS);
00032 
00033 
00034 /*
00035  * Copy from s (for source) to r (for result), wrapping with q (quote)
00036  * characters and doubling any quote characters found.
00037  */
00038 static void
00039 strcpy_quoted(StringInfo r, const char *s, const char q)
00040 {
00041     appendStringInfoCharMacro(r, q);
00042     while (*s)
00043     {
00044         if (*s == q)
00045             appendStringInfoCharMacro(r, q);
00046         appendStringInfoCharMacro(r, *s);
00047         s++;
00048     }
00049     appendStringInfoCharMacro(r, q);
00050 }
00051 
00052 /*
00053  * triggered_change_notification
00054  *
00055  * This trigger function will send a notification of data modification with
00056  * primary key values.  The channel will be "tcn" unless the trigger is
00057  * created with a parameter, in which case that parameter will be used.
00058  */
00059 PG_FUNCTION_INFO_V1(triggered_change_notification);
00060 
00061 Datum
00062 triggered_change_notification(PG_FUNCTION_ARGS)
00063 {
00064     TriggerData *trigdata = (TriggerData *) fcinfo->context;
00065     Trigger    *trigger;
00066     int         nargs;
00067     HeapTuple   trigtuple;
00068     Relation    rel;
00069     TupleDesc   tupdesc;
00070     char       *channel;
00071     char        operation;
00072     StringInfo  payload = makeStringInfo();
00073     bool        foundPK;
00074 
00075     List       *indexoidlist;
00076     ListCell   *indexoidscan;
00077 
00078     /* make sure it's called as a trigger */
00079     if (!CALLED_AS_TRIGGER(fcinfo))
00080         ereport(ERROR,
00081                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00082         errmsg("triggered_change_notification: must be called as trigger")));
00083 
00084     /* and that it's called after the change */
00085     if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
00086         ereport(ERROR,
00087                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00088                  errmsg("triggered_change_notification: must be called after the change")));
00089 
00090     /* and that it's called for each row */
00091     if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
00092         ereport(ERROR,
00093                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00094                  errmsg("triggered_change_notification: must be called for each row")));
00095 
00096     if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
00097         operation = 'I';
00098     else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
00099         operation = 'U';
00100     else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
00101         operation = 'D';
00102     else
00103     {
00104         elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
00105         operation = 'X';        /* silence compiler warning */
00106     }
00107 
00108     trigger = trigdata->tg_trigger;
00109     nargs = trigger->tgnargs;
00110     if (nargs > 1)
00111         ereport(ERROR,
00112                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00113                  errmsg("triggered_change_notification: must not be called with more than one parameter")));
00114 
00115     if (nargs == 0)
00116         channel = "tcn";
00117     else
00118         channel = trigger->tgargs[0];
00119 
00120     /* get tuple data */
00121     trigtuple = trigdata->tg_trigtuple;
00122     rel = trigdata->tg_relation;
00123     tupdesc = rel->rd_att;
00124 
00125     foundPK = false;
00126 
00127     /*
00128      * Get the list of index OIDs for the table from the relcache, and look up
00129      * each one in the pg_index syscache until we find one marked primary key
00130      * (hopefully there isn't more than one such).
00131      */
00132     indexoidlist = RelationGetIndexList(rel);
00133 
00134     foreach(indexoidscan, indexoidlist)
00135     {
00136         Oid         indexoid = lfirst_oid(indexoidscan);
00137         HeapTuple   indexTuple;
00138         Form_pg_index index;
00139 
00140         indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
00141         if (!HeapTupleIsValid(indexTuple))      /* should not happen */
00142             elog(ERROR, "cache lookup failed for index %u", indexoid);
00143         index = (Form_pg_index) GETSTRUCT(indexTuple);
00144         /* we're only interested if it is the primary key and valid */
00145         if (index->indisprimary && IndexIsValid(index))
00146         {
00147             int         numatts = index->indnatts;
00148 
00149             if (numatts > 0)
00150             {
00151                 int         i;
00152 
00153                 foundPK = true;
00154 
00155                 strcpy_quoted(payload, RelationGetRelationName(rel), '"');
00156                 appendStringInfoCharMacro(payload, ',');
00157                 appendStringInfoCharMacro(payload, operation);
00158 
00159                 for (i = 0; i < numatts; i++)
00160                 {
00161                     int         colno = index->indkey.values[i];
00162 
00163                     appendStringInfoCharMacro(payload, ',');
00164                     strcpy_quoted(payload, NameStr((tupdesc->attrs[colno - 1])->attname), '"');
00165                     appendStringInfoCharMacro(payload, '=');
00166                     strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
00167                 }
00168 
00169                 Async_Notify(channel, payload->data);
00170             }
00171             ReleaseSysCache(indexTuple);
00172             break;
00173         }
00174         ReleaseSysCache(indexTuple);
00175     }
00176 
00177     list_free(indexoidlist);
00178 
00179     if (!foundPK)
00180         ereport(ERROR,
00181                 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00182                  errmsg("triggered_change_notification: must be called on a table with a primary key")));
00183 
00184     return PointerGetDatum(NULL);       /* after trigger; value doesn't matter */
00185 }