00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include "postgres.h"
00014
00015 #include "access/sysattr.h"
00016 #include "catalog/pg_type.h"
00017 #include "executor/executor.h"
00018 #include "utils/builtins.h"
00019 #include "utils/lsyscache.h"
00020 #include "utils/portal.h"
00021 #include "utils/rel.h"
00022
00023
00024 static char *fetch_cursor_param_value(ExprContext *econtext, int paramId);
00025 static ScanState *search_plan_tree(PlanState *node, Oid table_oid);
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 bool
00041 execCurrentOf(CurrentOfExpr *cexpr,
00042 ExprContext *econtext,
00043 Oid table_oid,
00044 ItemPointer current_tid)
00045 {
00046 char *cursor_name;
00047 char *table_name;
00048 Portal portal;
00049 QueryDesc *queryDesc;
00050
00051
00052 if (cexpr->cursor_name)
00053 cursor_name = cexpr->cursor_name;
00054 else
00055 cursor_name = fetch_cursor_param_value(econtext, cexpr->cursor_param);
00056
00057
00058 table_name = get_rel_name(table_oid);
00059 if (table_name == NULL)
00060 elog(ERROR, "cache lookup failed for relation %u", table_oid);
00061
00062
00063 portal = GetPortalByName(cursor_name);
00064 if (!PortalIsValid(portal))
00065 ereport(ERROR,
00066 (errcode(ERRCODE_UNDEFINED_CURSOR),
00067 errmsg("cursor \"%s\" does not exist", cursor_name)));
00068
00069
00070
00071
00072
00073 if (portal->strategy != PORTAL_ONE_SELECT)
00074 ereport(ERROR,
00075 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00076 errmsg("cursor \"%s\" is not a SELECT query",
00077 cursor_name)));
00078 queryDesc = PortalGetQueryDesc(portal);
00079 if (queryDesc == NULL || queryDesc->estate == NULL)
00080 ereport(ERROR,
00081 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00082 errmsg("cursor \"%s\" is held from a previous transaction",
00083 cursor_name)));
00084
00085
00086
00087
00088
00089
00090
00091
00092 if (queryDesc->estate->es_rowMarks)
00093 {
00094 ExecRowMark *erm;
00095 ListCell *lc;
00096
00097
00098
00099
00100
00101 erm = NULL;
00102 foreach(lc, queryDesc->estate->es_rowMarks)
00103 {
00104 ExecRowMark *thiserm = (ExecRowMark *) lfirst(lc);
00105
00106 if (!RowMarkRequiresRowShareLock(thiserm->markType))
00107 continue;
00108
00109 if (RelationGetRelid(thiserm->relation) == table_oid)
00110 {
00111 if (erm)
00112 ereport(ERROR,
00113 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00114 errmsg("cursor \"%s\" has multiple FOR UPDATE/SHARE references to table \"%s\"",
00115 cursor_name, table_name)));
00116 erm = thiserm;
00117 }
00118 }
00119
00120 if (erm == NULL)
00121 ereport(ERROR,
00122 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00123 errmsg("cursor \"%s\" does not have a FOR UPDATE/SHARE reference to table \"%s\"",
00124 cursor_name, table_name)));
00125
00126
00127
00128
00129
00130 if (portal->atStart || portal->atEnd)
00131 ereport(ERROR,
00132 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00133 errmsg("cursor \"%s\" is not positioned on a row",
00134 cursor_name)));
00135
00136
00137 if (ItemPointerIsValid(&(erm->curCtid)))
00138 {
00139 *current_tid = erm->curCtid;
00140 return true;
00141 }
00142
00143
00144
00145
00146
00147
00148 return false;
00149 }
00150 else
00151 {
00152 ScanState *scanstate;
00153 bool lisnull;
00154 Oid tuple_tableoid PG_USED_FOR_ASSERTS_ONLY;
00155 ItemPointer tuple_tid;
00156
00157
00158
00159
00160
00161
00162 scanstate = search_plan_tree(queryDesc->planstate, table_oid);
00163 if (!scanstate)
00164 ereport(ERROR,
00165 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00166 errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
00167 cursor_name, table_name)));
00168
00169
00170
00171
00172
00173
00174
00175
00176 if (portal->atStart || portal->atEnd)
00177 ereport(ERROR,
00178 (errcode(ERRCODE_INVALID_CURSOR_STATE),
00179 errmsg("cursor \"%s\" is not positioned on a row",
00180 cursor_name)));
00181
00182
00183 if (TupIsNull(scanstate->ss_ScanTupleSlot))
00184 return false;
00185
00186
00187 tuple_tableoid =
00188 DatumGetObjectId(slot_getattr(scanstate->ss_ScanTupleSlot,
00189 TableOidAttributeNumber,
00190 &lisnull));
00191 Assert(!lisnull);
00192 tuple_tid = (ItemPointer)
00193 DatumGetPointer(slot_getattr(scanstate->ss_ScanTupleSlot,
00194 SelfItemPointerAttributeNumber,
00195 &lisnull));
00196 Assert(!lisnull);
00197
00198 Assert(tuple_tableoid == table_oid);
00199
00200 *current_tid = *tuple_tid;
00201
00202 return true;
00203 }
00204 }
00205
00206
00207
00208
00209
00210
00211 static char *
00212 fetch_cursor_param_value(ExprContext *econtext, int paramId)
00213 {
00214 ParamListInfo paramInfo = econtext->ecxt_param_list_info;
00215
00216 if (paramInfo &&
00217 paramId > 0 && paramId <= paramInfo->numParams)
00218 {
00219 ParamExternData *prm = ¶mInfo->params[paramId - 1];
00220
00221
00222 if (!OidIsValid(prm->ptype) && paramInfo->paramFetch != NULL)
00223 (*paramInfo->paramFetch) (paramInfo, paramId);
00224
00225 if (OidIsValid(prm->ptype) && !prm->isnull)
00226 {
00227
00228 if (prm->ptype != REFCURSOROID)
00229 ereport(ERROR,
00230 (errcode(ERRCODE_DATATYPE_MISMATCH),
00231 errmsg("type of parameter %d (%s) does not match that when preparing the plan (%s)",
00232 paramId,
00233 format_type_be(prm->ptype),
00234 format_type_be(REFCURSOROID))));
00235
00236
00237 return TextDatumGetCString(prm->value);
00238 }
00239 }
00240
00241 ereport(ERROR,
00242 (errcode(ERRCODE_UNDEFINED_OBJECT),
00243 errmsg("no value found for parameter %d", paramId)));
00244 return NULL;
00245 }
00246
00247
00248
00249
00250
00251
00252
00253 static ScanState *
00254 search_plan_tree(PlanState *node, Oid table_oid)
00255 {
00256 if (node == NULL)
00257 return NULL;
00258 switch (nodeTag(node))
00259 {
00260
00261
00262
00263 case T_SeqScanState:
00264 case T_IndexScanState:
00265 case T_IndexOnlyScanState:
00266 case T_BitmapHeapScanState:
00267 case T_TidScanState:
00268 {
00269 ScanState *sstate = (ScanState *) node;
00270
00271 if (RelationGetRelid(sstate->ss_currentRelation) == table_oid)
00272 return sstate;
00273 break;
00274 }
00275
00276
00277
00278
00279
00280 case T_AppendState:
00281 {
00282 AppendState *astate = (AppendState *) node;
00283 ScanState *result = NULL;
00284 int i;
00285
00286 for (i = 0; i < astate->as_nplans; i++)
00287 {
00288 ScanState *elem = search_plan_tree(astate->appendplans[i],
00289 table_oid);
00290
00291 if (!elem)
00292 continue;
00293 if (result)
00294 return NULL;
00295 result = elem;
00296 }
00297 return result;
00298 }
00299
00300
00301
00302
00303 case T_MergeAppendState:
00304 {
00305 MergeAppendState *mstate = (MergeAppendState *) node;
00306 ScanState *result = NULL;
00307 int i;
00308
00309 for (i = 0; i < mstate->ms_nplans; i++)
00310 {
00311 ScanState *elem = search_plan_tree(mstate->mergeplans[i],
00312 table_oid);
00313
00314 if (!elem)
00315 continue;
00316 if (result)
00317 return NULL;
00318 result = elem;
00319 }
00320 return result;
00321 }
00322
00323
00324
00325
00326
00327 case T_ResultState:
00328 case T_LimitState:
00329 return search_plan_tree(node->lefttree, table_oid);
00330
00331
00332
00333
00334 case T_SubqueryScanState:
00335 return search_plan_tree(((SubqueryScanState *) node)->subplan,
00336 table_oid);
00337
00338 default:
00339
00340 break;
00341 }
00342 return NULL;
00343 }