Header And Logo

PostgreSQL
| The world's most advanced open source database.

plpy_spi.c

Go to the documentation of this file.
00001 /*
00002  * interface to SPI functions
00003  *
00004  * src/pl/plpython/plpy_spi.c
00005  */
00006 
00007 #include "postgres.h"
00008 
00009 #include "access/htup_details.h"
00010 #include "access/xact.h"
00011 #include "catalog/pg_type.h"
00012 #include "executor/spi.h"
00013 #include "mb/pg_wchar.h"
00014 #include "parser/parse_type.h"
00015 #include "utils/memutils.h"
00016 #include "utils/syscache.h"
00017 
00018 #include "plpython.h"
00019 
00020 #include "plpy_spi.h"
00021 
00022 #include "plpy_elog.h"
00023 #include "plpy_main.h"
00024 #include "plpy_planobject.h"
00025 #include "plpy_plpymodule.h"
00026 #include "plpy_procedure.h"
00027 #include "plpy_resultobject.h"
00028 
00029 
00030 static PyObject *PLy_spi_execute_query(char *query, long limit);
00031 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
00032 static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status);
00033 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
00034 
00035 
00036 /* prepare(query="select * from foo")
00037  * prepare(query="select * from foo where bar = $1", params=["text"])
00038  * prepare(query="select * from foo where bar = $1", params=["text"], limit=5)
00039  */
00040 PyObject *
00041 PLy_spi_prepare(PyObject *self, PyObject *args)
00042 {
00043     PLyPlanObject *plan;
00044     PyObject   *list = NULL;
00045     PyObject   *volatile optr = NULL;
00046     char       *query;
00047     volatile MemoryContext oldcontext;
00048     volatile ResourceOwner oldowner;
00049     volatile int nargs;
00050 
00051     if (!PyArg_ParseTuple(args, "s|O", &query, &list))
00052         return NULL;
00053 
00054     if (list && (!PySequence_Check(list)))
00055     {
00056         PLy_exception_set(PyExc_TypeError,
00057                        "second argument of plpy.prepare must be a sequence");
00058         return NULL;
00059     }
00060 
00061     if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL)
00062         return NULL;
00063 
00064     nargs = list ? PySequence_Length(list) : 0;
00065 
00066     plan->nargs = nargs;
00067     plan->types = nargs ? PLy_malloc(sizeof(Oid) * nargs) : NULL;
00068     plan->values = nargs ? PLy_malloc(sizeof(Datum) * nargs) : NULL;
00069     plan->args = nargs ? PLy_malloc(sizeof(PLyTypeInfo) * nargs) : NULL;
00070 
00071     oldcontext = CurrentMemoryContext;
00072     oldowner = CurrentResourceOwner;
00073 
00074     PLy_spi_subtransaction_begin(oldcontext, oldowner);
00075 
00076     PG_TRY();
00077     {
00078         int         i;
00079 
00080         /*
00081          * the other loop might throw an exception, if PLyTypeInfo member
00082          * isn't properly initialized the Py_DECREF(plan) will go boom
00083          */
00084         for (i = 0; i < nargs; i++)
00085         {
00086             PLy_typeinfo_init(&plan->args[i]);
00087             plan->values[i] = PointerGetDatum(NULL);
00088         }
00089 
00090         for (i = 0; i < nargs; i++)
00091         {
00092             char       *sptr;
00093             HeapTuple   typeTup;
00094             Oid         typeId;
00095             int32       typmod;
00096             Form_pg_type typeStruct;
00097 
00098             optr = PySequence_GetItem(list, i);
00099             if (PyString_Check(optr))
00100                 sptr = PyString_AsString(optr);
00101             else if (PyUnicode_Check(optr))
00102                 sptr = PLyUnicode_AsString(optr);
00103             else
00104             {
00105                 ereport(ERROR,
00106                         (errmsg("plpy.prepare: type name at ordinal position %d is not a string", i)));
00107                 sptr = NULL;    /* keep compiler quiet */
00108             }
00109 
00110             /********************************************************
00111              * Resolve argument type names and then look them up by
00112              * oid in the system cache, and remember the required
00113              *information for input conversion.
00114              ********************************************************/
00115 
00116             parseTypeString(sptr, &typeId, &typmod);
00117 
00118             typeTup = SearchSysCache1(TYPEOID,
00119                                       ObjectIdGetDatum(typeId));
00120             if (!HeapTupleIsValid(typeTup))
00121                 elog(ERROR, "cache lookup failed for type %u", typeId);
00122 
00123             Py_DECREF(optr);
00124 
00125             /*
00126              * set optr to NULL, so we won't try to unref it again in case of
00127              * an error
00128              */
00129             optr = NULL;
00130 
00131             plan->types[i] = typeId;
00132             typeStruct = (Form_pg_type) GETSTRUCT(typeTup);
00133             if (typeStruct->typtype != TYPTYPE_COMPOSITE)
00134                 PLy_output_datum_func(&plan->args[i], typeTup);
00135             else
00136                 ereport(ERROR,
00137                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
00138                    errmsg("plpy.prepare does not support composite types")));
00139             ReleaseSysCache(typeTup);
00140         }
00141 
00142         pg_verifymbstr(query, strlen(query), false);
00143         plan->plan = SPI_prepare(query, plan->nargs, plan->types);
00144         if (plan->plan == NULL)
00145             elog(ERROR, "SPI_prepare failed: %s",
00146                  SPI_result_code_string(SPI_result));
00147 
00148         /* transfer plan from procCxt to topCxt */
00149         if (SPI_keepplan(plan->plan))
00150             elog(ERROR, "SPI_keepplan failed");
00151 
00152         PLy_spi_subtransaction_commit(oldcontext, oldowner);
00153     }
00154     PG_CATCH();
00155     {
00156         Py_DECREF(plan);
00157         Py_XDECREF(optr);
00158 
00159         PLy_spi_subtransaction_abort(oldcontext, oldowner);
00160         return NULL;
00161     }
00162     PG_END_TRY();
00163 
00164     Assert(plan->plan != NULL);
00165     return (PyObject *) plan;
00166 }
00167 
00168 /* execute(query="select * from foo", limit=5)
00169  * execute(plan=plan, values=(foo, bar), limit=5)
00170  */
00171 PyObject *
00172 PLy_spi_execute(PyObject *self, PyObject *args)
00173 {
00174     char       *query;
00175     PyObject   *plan;
00176     PyObject   *list = NULL;
00177     long        limit = 0;
00178 
00179     if (PyArg_ParseTuple(args, "s|l", &query, &limit))
00180         return PLy_spi_execute_query(query, limit);
00181 
00182     PyErr_Clear();
00183 
00184     if (PyArg_ParseTuple(args, "O|Ol", &plan, &list, &limit) &&
00185         is_PLyPlanObject(plan))
00186         return PLy_spi_execute_plan(plan, list, limit);
00187 
00188     PLy_exception_set(PLy_exc_error, "plpy.execute expected a query or a plan");
00189     return NULL;
00190 }
00191 
00192 static PyObject *
00193 PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
00194 {
00195     volatile int nargs;
00196     int         i,
00197                 rv;
00198     PLyPlanObject *plan;
00199     volatile MemoryContext oldcontext;
00200     volatile ResourceOwner oldowner;
00201     PyObject   *ret;
00202 
00203     if (list != NULL)
00204     {
00205         if (!PySequence_Check(list) || PyString_Check(list) || PyUnicode_Check(list))
00206         {
00207             PLy_exception_set(PyExc_TypeError, "plpy.execute takes a sequence as its second argument");
00208             return NULL;
00209         }
00210         nargs = PySequence_Length(list);
00211     }
00212     else
00213         nargs = 0;
00214 
00215     plan = (PLyPlanObject *) ob;
00216 
00217     if (nargs != plan->nargs)
00218     {
00219         char       *sv;
00220         PyObject   *so = PyObject_Str(list);
00221 
00222         if (!so)
00223             PLy_elog(ERROR, "could not execute plan");
00224         sv = PyString_AsString(so);
00225         PLy_exception_set_plural(PyExc_TypeError,
00226                               "Expected sequence of %d argument, got %d: %s",
00227                              "Expected sequence of %d arguments, got %d: %s",
00228                                  plan->nargs,
00229                                  plan->nargs, nargs, sv);
00230         Py_DECREF(so);
00231 
00232         return NULL;
00233     }
00234 
00235     oldcontext = CurrentMemoryContext;
00236     oldowner = CurrentResourceOwner;
00237 
00238     PLy_spi_subtransaction_begin(oldcontext, oldowner);
00239 
00240     PG_TRY();
00241     {
00242         PLyExecutionContext *exec_ctx = PLy_current_execution_context();
00243         char       *volatile nulls;
00244         volatile int j;
00245 
00246         if (nargs > 0)
00247             nulls = palloc(nargs * sizeof(char));
00248         else
00249             nulls = NULL;
00250 
00251         for (j = 0; j < nargs; j++)
00252         {
00253             PyObject   *elem;
00254 
00255             elem = PySequence_GetItem(list, j);
00256             if (elem != Py_None)
00257             {
00258                 PG_TRY();
00259                 {
00260                     plan->values[j] =
00261                         plan->args[j].out.d.func(&(plan->args[j].out.d),
00262                                                  -1,
00263                                                  elem);
00264                 }
00265                 PG_CATCH();
00266                 {
00267                     Py_DECREF(elem);
00268                     PG_RE_THROW();
00269                 }
00270                 PG_END_TRY();
00271 
00272                 Py_DECREF(elem);
00273                 nulls[j] = ' ';
00274             }
00275             else
00276             {
00277                 Py_DECREF(elem);
00278                 plan->values[j] =
00279                     InputFunctionCall(&(plan->args[j].out.d.typfunc),
00280                                       NULL,
00281                                       plan->args[j].out.d.typioparam,
00282                                       -1);
00283                 nulls[j] = 'n';
00284             }
00285         }
00286 
00287         rv = SPI_execute_plan(plan->plan, plan->values, nulls,
00288                               exec_ctx->curr_proc->fn_readonly, limit);
00289         ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
00290 
00291         if (nargs > 0)
00292             pfree(nulls);
00293 
00294         PLy_spi_subtransaction_commit(oldcontext, oldowner);
00295     }
00296     PG_CATCH();
00297     {
00298         int         k;
00299 
00300         /*
00301          * cleanup plan->values array
00302          */
00303         for (k = 0; k < nargs; k++)
00304         {
00305             if (!plan->args[k].out.d.typbyval &&
00306                 (plan->values[k] != PointerGetDatum(NULL)))
00307             {
00308                 pfree(DatumGetPointer(plan->values[k]));
00309                 plan->values[k] = PointerGetDatum(NULL);
00310             }
00311         }
00312 
00313         PLy_spi_subtransaction_abort(oldcontext, oldowner);
00314         return NULL;
00315     }
00316     PG_END_TRY();
00317 
00318     for (i = 0; i < nargs; i++)
00319     {
00320         if (!plan->args[i].out.d.typbyval &&
00321             (plan->values[i] != PointerGetDatum(NULL)))
00322         {
00323             pfree(DatumGetPointer(plan->values[i]));
00324             plan->values[i] = PointerGetDatum(NULL);
00325         }
00326     }
00327 
00328     if (rv < 0)
00329     {
00330         PLy_exception_set(PLy_exc_spi_error,
00331                           "SPI_execute_plan failed: %s",
00332                           SPI_result_code_string(rv));
00333         return NULL;
00334     }
00335 
00336     return ret;
00337 }
00338 
00339 static PyObject *
00340 PLy_spi_execute_query(char *query, long limit)
00341 {
00342     int         rv;
00343     volatile MemoryContext oldcontext;
00344     volatile ResourceOwner oldowner;
00345     PyObject   *ret = NULL;
00346 
00347     oldcontext = CurrentMemoryContext;
00348     oldowner = CurrentResourceOwner;
00349 
00350     PLy_spi_subtransaction_begin(oldcontext, oldowner);
00351 
00352     PG_TRY();
00353     {
00354         PLyExecutionContext *exec_ctx = PLy_current_execution_context();
00355 
00356         pg_verifymbstr(query, strlen(query), false);
00357         rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
00358         ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
00359 
00360         PLy_spi_subtransaction_commit(oldcontext, oldowner);
00361     }
00362     PG_CATCH();
00363     {
00364         PLy_spi_subtransaction_abort(oldcontext, oldowner);
00365         return NULL;
00366     }
00367     PG_END_TRY();
00368 
00369     if (rv < 0)
00370     {
00371         Py_XDECREF(ret);
00372         PLy_exception_set(PLy_exc_spi_error,
00373                           "SPI_execute failed: %s",
00374                           SPI_result_code_string(rv));
00375         return NULL;
00376     }
00377 
00378     return ret;
00379 }
00380 
00381 static PyObject *
00382 PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
00383 {
00384     PLyResultObject *result;
00385     volatile MemoryContext oldcontext;
00386 
00387     result = (PLyResultObject *) PLy_result_new();
00388     Py_DECREF(result->status);
00389     result->status = PyInt_FromLong(status);
00390 
00391     if (status > 0 && tuptable == NULL)
00392     {
00393         Py_DECREF(result->nrows);
00394         result->nrows = PyInt_FromLong(rows);
00395     }
00396     else if (status > 0 && tuptable != NULL)
00397     {
00398         PLyTypeInfo args;
00399         int         i;
00400 
00401         Py_DECREF(result->nrows);
00402         result->nrows = PyInt_FromLong(rows);
00403         PLy_typeinfo_init(&args);
00404 
00405         oldcontext = CurrentMemoryContext;
00406         PG_TRY();
00407         {
00408             MemoryContext oldcontext2;
00409 
00410             /*
00411              * Save tuple descriptor for later use by result set metadata
00412              * functions.  Save it in TopMemoryContext so that it survives
00413              * outside of an SPI context.  We trust that PLy_result_dealloc()
00414              * will clean it up when the time is right.
00415              */
00416             oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
00417             result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
00418             MemoryContextSwitchTo(oldcontext2);
00419 
00420             if (rows)
00421             {
00422                 Py_DECREF(result->rows);
00423                 result->rows = PyList_New(rows);
00424 
00425                 PLy_input_tuple_funcs(&args, tuptable->tupdesc);
00426                 for (i = 0; i < rows; i++)
00427                 {
00428                     PyObject   *row = PLyDict_FromTuple(&args, tuptable->vals[i],
00429                                                         tuptable->tupdesc);
00430 
00431                     PyList_SetItem(result->rows, i, row);
00432                 }
00433             }
00434         }
00435         PG_CATCH();
00436         {
00437             MemoryContextSwitchTo(oldcontext);
00438             if (!PyErr_Occurred())
00439                 PLy_exception_set(PLy_exc_error,
00440                        "unrecognized error in PLy_spi_execute_fetch_result");
00441             PLy_typeinfo_dealloc(&args);
00442             SPI_freetuptable(tuptable);
00443             Py_DECREF(result);
00444             return NULL;
00445         }
00446         PG_END_TRY();
00447 
00448         PLy_typeinfo_dealloc(&args);
00449         SPI_freetuptable(tuptable);
00450     }
00451 
00452     return (PyObject *) result;
00453 }
00454 
00455 /*
00456  * Utilities for running SPI functions in subtransactions.
00457  *
00458  * Usage:
00459  *
00460  *  MemoryContext oldcontext = CurrentMemoryContext;
00461  *  ResourceOwner oldowner = CurrentResourceOwner;
00462  *
00463  *  PLy_spi_subtransaction_begin(oldcontext, oldowner);
00464  *  PG_TRY();
00465  *  {
00466  *      <call SPI functions>
00467  *      PLy_spi_subtransaction_commit(oldcontext, oldowner);
00468  *  }
00469  *  PG_CATCH();
00470  *  {
00471  *      <do cleanup>
00472  *      PLy_spi_subtransaction_abort(oldcontext, oldowner);
00473  *      return NULL;
00474  *  }
00475  *  PG_END_TRY();
00476  *
00477  * These utilities take care of restoring connection to the SPI manager and
00478  * setting a Python exception in case of an abort.
00479  */
00480 void
00481 PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
00482 {
00483     BeginInternalSubTransaction(NULL);
00484     /* Want to run inside function's memory context */
00485     MemoryContextSwitchTo(oldcontext);
00486 }
00487 
00488 void
00489 PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
00490 {
00491     /* Commit the inner transaction, return to outer xact context */
00492     ReleaseCurrentSubTransaction();
00493     MemoryContextSwitchTo(oldcontext);
00494     CurrentResourceOwner = oldowner;
00495 
00496     /*
00497      * AtEOSubXact_SPI() should not have popped any SPI context, but just in
00498      * case it did, make sure we remain connected.
00499      */
00500     SPI_restore_connection();
00501 }
00502 
00503 void
00504 PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
00505 {
00506     ErrorData  *edata;
00507     PLyExceptionEntry *entry;
00508     PyObject   *exc;
00509 
00510     /* Save error info */
00511     MemoryContextSwitchTo(oldcontext);
00512     edata = CopyErrorData();
00513     FlushErrorState();
00514 
00515     /* Abort the inner transaction */
00516     RollbackAndReleaseCurrentSubTransaction();
00517     MemoryContextSwitchTo(oldcontext);
00518     CurrentResourceOwner = oldowner;
00519 
00520     /*
00521      * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
00522      * have left us in a disconnected state.  We need this hack to return to
00523      * connected state.
00524      */
00525     SPI_restore_connection();
00526 
00527     /* Look up the correct exception */
00528     entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
00529                         HASH_FIND, NULL);
00530     /* We really should find it, but just in case have a fallback */
00531     Assert(entry != NULL);
00532     exc = entry ? entry->exc : PLy_exc_spi_error;
00533     /* Make Python raise the exception */
00534     PLy_spi_exception_set(exc, edata);
00535     FreeErrorData(edata);
00536 }
00537 
00538 /*
00539  * Raise a SPIError, passing in it more error details, like the
00540  * internal query and error position.
00541  */
00542 static void
00543 PLy_spi_exception_set(PyObject *excclass, ErrorData *edata)
00544 {
00545     PyObject   *args = NULL;
00546     PyObject   *spierror = NULL;
00547     PyObject   *spidata = NULL;
00548 
00549     args = Py_BuildValue("(s)", edata->message);
00550     if (!args)
00551         goto failure;
00552 
00553     /* create a new SPI exception with the error message as the parameter */
00554     spierror = PyObject_CallObject(excclass, args);
00555     if (!spierror)
00556         goto failure;
00557 
00558     spidata = Py_BuildValue("(izzzi)", edata->sqlerrcode, edata->detail, edata->hint,
00559                             edata->internalquery, edata->internalpos);
00560     if (!spidata)
00561         goto failure;
00562 
00563     if (PyObject_SetAttrString(spierror, "spidata", spidata) == -1)
00564         goto failure;
00565 
00566     PyErr_SetObject(excclass, spierror);
00567 
00568     Py_DECREF(args);
00569     Py_DECREF(spierror);
00570     Py_DECREF(spidata);
00571     return;
00572 
00573 failure:
00574     Py_XDECREF(args);
00575     Py_XDECREF(spierror);
00576     Py_XDECREF(spidata);
00577     elog(ERROR, "could not convert SPI error to Python exception");
00578 }