Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #include "postgres.h"
00017
00018 #include "access/htup_details.h"
00019 #include "executor/spi.h"
00020 #include "commands/async.h"
00021 #include "commands/trigger.h"
00022 #include "lib/stringinfo.h"
00023 #include "utils/rel.h"
00024 #include "utils/syscache.h"
00025
00026
00027 PG_MODULE_MAGIC;
00028
00029
00030
00031 Datum triggered_change_notification(PG_FUNCTION_ARGS);
00032
00033
00034
00035
00036
00037
00038 static void
00039 strcpy_quoted(StringInfo r, const char *s, const char q)
00040 {
00041 appendStringInfoCharMacro(r, q);
00042 while (*s)
00043 {
00044 if (*s == q)
00045 appendStringInfoCharMacro(r, q);
00046 appendStringInfoCharMacro(r, *s);
00047 s++;
00048 }
00049 appendStringInfoCharMacro(r, q);
00050 }
00051
00052
00053
00054
00055
00056
00057
00058
00059 PG_FUNCTION_INFO_V1(triggered_change_notification);
00060
00061 Datum
00062 triggered_change_notification(PG_FUNCTION_ARGS)
00063 {
00064 TriggerData *trigdata = (TriggerData *) fcinfo->context;
00065 Trigger *trigger;
00066 int nargs;
00067 HeapTuple trigtuple;
00068 Relation rel;
00069 TupleDesc tupdesc;
00070 char *channel;
00071 char operation;
00072 StringInfo payload = makeStringInfo();
00073 bool foundPK;
00074
00075 List *indexoidlist;
00076 ListCell *indexoidscan;
00077
00078
00079 if (!CALLED_AS_TRIGGER(fcinfo))
00080 ereport(ERROR,
00081 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00082 errmsg("triggered_change_notification: must be called as trigger")));
00083
00084
00085 if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
00086 ereport(ERROR,
00087 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00088 errmsg("triggered_change_notification: must be called after the change")));
00089
00090
00091 if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
00092 ereport(ERROR,
00093 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00094 errmsg("triggered_change_notification: must be called for each row")));
00095
00096 if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
00097 operation = 'I';
00098 else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
00099 operation = 'U';
00100 else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
00101 operation = 'D';
00102 else
00103 {
00104 elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
00105 operation = 'X';
00106 }
00107
00108 trigger = trigdata->tg_trigger;
00109 nargs = trigger->tgnargs;
00110 if (nargs > 1)
00111 ereport(ERROR,
00112 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00113 errmsg("triggered_change_notification: must not be called with more than one parameter")));
00114
00115 if (nargs == 0)
00116 channel = "tcn";
00117 else
00118 channel = trigger->tgargs[0];
00119
00120
00121 trigtuple = trigdata->tg_trigtuple;
00122 rel = trigdata->tg_relation;
00123 tupdesc = rel->rd_att;
00124
00125 foundPK = false;
00126
00127
00128
00129
00130
00131
00132 indexoidlist = RelationGetIndexList(rel);
00133
00134 foreach(indexoidscan, indexoidlist)
00135 {
00136 Oid indexoid = lfirst_oid(indexoidscan);
00137 HeapTuple indexTuple;
00138 Form_pg_index index;
00139
00140 indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
00141 if (!HeapTupleIsValid(indexTuple))
00142 elog(ERROR, "cache lookup failed for index %u", indexoid);
00143 index = (Form_pg_index) GETSTRUCT(indexTuple);
00144
00145 if (index->indisprimary && IndexIsValid(index))
00146 {
00147 int numatts = index->indnatts;
00148
00149 if (numatts > 0)
00150 {
00151 int i;
00152
00153 foundPK = true;
00154
00155 strcpy_quoted(payload, RelationGetRelationName(rel), '"');
00156 appendStringInfoCharMacro(payload, ',');
00157 appendStringInfoCharMacro(payload, operation);
00158
00159 for (i = 0; i < numatts; i++)
00160 {
00161 int colno = index->indkey.values[i];
00162
00163 appendStringInfoCharMacro(payload, ',');
00164 strcpy_quoted(payload, NameStr((tupdesc->attrs[colno - 1])->attname), '"');
00165 appendStringInfoCharMacro(payload, '=');
00166 strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
00167 }
00168
00169 Async_Notify(channel, payload->data);
00170 }
00171 ReleaseSysCache(indexTuple);
00172 break;
00173 }
00174 ReleaseSysCache(indexTuple);
00175 }
00176
00177 list_free(indexoidlist);
00178
00179 if (!foundPK)
00180 ereport(ERROR,
00181 (errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
00182 errmsg("triggered_change_notification: must be called on a table with a primary key")));
00183
00184 return PointerGetDatum(NULL);
00185 }