00001
00002
00003
00004
00005
00006
00007 #include "postgres.h"
00008
00009 #include "access/xact.h"
00010 #include "mb/pg_wchar.h"
00011
00012 #include "plpython.h"
00013
00014 #include "plpy_cursorobject.h"
00015
00016 #include "plpy_elog.h"
00017 #include "plpy_main.h"
00018 #include "plpy_planobject.h"
00019 #include "plpy_procedure.h"
00020 #include "plpy_resultobject.h"
00021 #include "plpy_spi.h"
00022
00023
00024 static PyObject *PLy_cursor_query(const char *query);
00025 static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
00026 static void PLy_cursor_dealloc(PyObject *arg);
00027 static PyObject *PLy_cursor_iternext(PyObject *self);
00028 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
00029 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
00030
00031 static char PLy_cursor_doc[] = {
00032 "Wrapper around a PostgreSQL cursor"
00033 };
00034
00035 static PyMethodDef PLy_cursor_methods[] = {
00036 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
00037 {"close", PLy_cursor_close, METH_NOARGS, NULL},
00038 {NULL, NULL, 0, NULL}
00039 };
00040
00041 static PyTypeObject PLy_CursorType = {
00042 PyVarObject_HEAD_INIT(NULL, 0)
00043 "PLyCursor",
00044 sizeof(PLyCursorObject),
00045 0,
00046
00047
00048
00049
00050 PLy_cursor_dealloc,
00051 0,
00052 0,
00053 0,
00054 0,
00055 0,
00056 0,
00057 0,
00058 0,
00059 0,
00060 0,
00061 0,
00062 0,
00063 0,
00064 0,
00065 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
00066 PLy_cursor_doc,
00067 0,
00068 0,
00069 0,
00070 0,
00071 PyObject_SelfIter,
00072 PLy_cursor_iternext,
00073 PLy_cursor_methods,
00074 };
00075
00076 void
00077 PLy_cursor_init_type(void)
00078 {
00079 if (PyType_Ready(&PLy_CursorType) < 0)
00080 elog(ERROR, "could not initialize PLy_CursorType");
00081 }
00082
00083 PyObject *
00084 PLy_cursor(PyObject *self, PyObject *args)
00085 {
00086 char *query;
00087 PyObject *plan;
00088 PyObject *planargs = NULL;
00089
00090 if (PyArg_ParseTuple(args, "s", &query))
00091 return PLy_cursor_query(query);
00092
00093 PyErr_Clear();
00094
00095 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
00096 return PLy_cursor_plan(plan, planargs);
00097
00098 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
00099 return NULL;
00100 }
00101
00102
00103 static PyObject *
00104 PLy_cursor_query(const char *query)
00105 {
00106 PLyCursorObject *cursor;
00107 volatile MemoryContext oldcontext;
00108 volatile ResourceOwner oldowner;
00109
00110 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
00111 return NULL;
00112 cursor->portalname = NULL;
00113 cursor->closed = false;
00114 PLy_typeinfo_init(&cursor->result);
00115
00116 oldcontext = CurrentMemoryContext;
00117 oldowner = CurrentResourceOwner;
00118
00119 PLy_spi_subtransaction_begin(oldcontext, oldowner);
00120
00121 PG_TRY();
00122 {
00123 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
00124 SPIPlanPtr plan;
00125 Portal portal;
00126
00127 pg_verifymbstr(query, strlen(query), false);
00128
00129 plan = SPI_prepare(query, 0, NULL);
00130 if (plan == NULL)
00131 elog(ERROR, "SPI_prepare failed: %s",
00132 SPI_result_code_string(SPI_result));
00133
00134 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
00135 exec_ctx->curr_proc->fn_readonly);
00136 SPI_freeplan(plan);
00137
00138 if (portal == NULL)
00139 elog(ERROR, "SPI_cursor_open() failed: %s",
00140 SPI_result_code_string(SPI_result));
00141
00142 cursor->portalname = PLy_strdup(portal->name);
00143
00144 PLy_spi_subtransaction_commit(oldcontext, oldowner);
00145 }
00146 PG_CATCH();
00147 {
00148 PLy_spi_subtransaction_abort(oldcontext, oldowner);
00149 return NULL;
00150 }
00151 PG_END_TRY();
00152
00153 Assert(cursor->portalname != NULL);
00154 return (PyObject *) cursor;
00155 }
00156
00157 static PyObject *
00158 PLy_cursor_plan(PyObject *ob, PyObject *args)
00159 {
00160 PLyCursorObject *cursor;
00161 volatile int nargs;
00162 int i;
00163 PLyPlanObject *plan;
00164 volatile MemoryContext oldcontext;
00165 volatile ResourceOwner oldowner;
00166
00167 if (args)
00168 {
00169 if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
00170 {
00171 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
00172 return NULL;
00173 }
00174 nargs = PySequence_Length(args);
00175 }
00176 else
00177 nargs = 0;
00178
00179 plan = (PLyPlanObject *) ob;
00180
00181 if (nargs != plan->nargs)
00182 {
00183 char *sv;
00184 PyObject *so = PyObject_Str(args);
00185
00186 if (!so)
00187 PLy_elog(ERROR, "could not execute plan");
00188 sv = PyString_AsString(so);
00189 PLy_exception_set_plural(PyExc_TypeError,
00190 "Expected sequence of %d argument, got %d: %s",
00191 "Expected sequence of %d arguments, got %d: %s",
00192 plan->nargs,
00193 plan->nargs, nargs, sv);
00194 Py_DECREF(so);
00195
00196 return NULL;
00197 }
00198
00199 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
00200 return NULL;
00201 cursor->portalname = NULL;
00202 cursor->closed = false;
00203 PLy_typeinfo_init(&cursor->result);
00204
00205 oldcontext = CurrentMemoryContext;
00206 oldowner = CurrentResourceOwner;
00207
00208 PLy_spi_subtransaction_begin(oldcontext, oldowner);
00209
00210 PG_TRY();
00211 {
00212 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
00213 Portal portal;
00214 char *volatile nulls;
00215 volatile int j;
00216
00217 if (nargs > 0)
00218 nulls = palloc(nargs * sizeof(char));
00219 else
00220 nulls = NULL;
00221
00222 for (j = 0; j < nargs; j++)
00223 {
00224 PyObject *elem;
00225
00226 elem = PySequence_GetItem(args, j);
00227 if (elem != Py_None)
00228 {
00229 PG_TRY();
00230 {
00231 plan->values[j] =
00232 plan->args[j].out.d.func(&(plan->args[j].out.d),
00233 -1,
00234 elem);
00235 }
00236 PG_CATCH();
00237 {
00238 Py_DECREF(elem);
00239 PG_RE_THROW();
00240 }
00241 PG_END_TRY();
00242
00243 Py_DECREF(elem);
00244 nulls[j] = ' ';
00245 }
00246 else
00247 {
00248 Py_DECREF(elem);
00249 plan->values[j] =
00250 InputFunctionCall(&(plan->args[j].out.d.typfunc),
00251 NULL,
00252 plan->args[j].out.d.typioparam,
00253 -1);
00254 nulls[j] = 'n';
00255 }
00256 }
00257
00258 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
00259 exec_ctx->curr_proc->fn_readonly);
00260 if (portal == NULL)
00261 elog(ERROR, "SPI_cursor_open() failed: %s",
00262 SPI_result_code_string(SPI_result));
00263
00264 cursor->portalname = PLy_strdup(portal->name);
00265
00266 PLy_spi_subtransaction_commit(oldcontext, oldowner);
00267 }
00268 PG_CATCH();
00269 {
00270 int k;
00271
00272
00273 for (k = 0; k < nargs; k++)
00274 {
00275 if (!plan->args[k].out.d.typbyval &&
00276 (plan->values[k] != PointerGetDatum(NULL)))
00277 {
00278 pfree(DatumGetPointer(plan->values[k]));
00279 plan->values[k] = PointerGetDatum(NULL);
00280 }
00281 }
00282
00283 Py_DECREF(cursor);
00284
00285 PLy_spi_subtransaction_abort(oldcontext, oldowner);
00286 return NULL;
00287 }
00288 PG_END_TRY();
00289
00290 for (i = 0; i < nargs; i++)
00291 {
00292 if (!plan->args[i].out.d.typbyval &&
00293 (plan->values[i] != PointerGetDatum(NULL)))
00294 {
00295 pfree(DatumGetPointer(plan->values[i]));
00296 plan->values[i] = PointerGetDatum(NULL);
00297 }
00298 }
00299
00300 Assert(cursor->portalname != NULL);
00301 return (PyObject *) cursor;
00302 }
00303
00304 static void
00305 PLy_cursor_dealloc(PyObject *arg)
00306 {
00307 PLyCursorObject *cursor;
00308 Portal portal;
00309
00310 cursor = (PLyCursorObject *) arg;
00311
00312 if (!cursor->closed)
00313 {
00314 portal = GetPortalByName(cursor->portalname);
00315
00316 if (PortalIsValid(portal))
00317 SPI_cursor_close(portal);
00318 }
00319
00320 PLy_free(cursor->portalname);
00321 cursor->portalname = NULL;
00322
00323 PLy_typeinfo_dealloc(&cursor->result);
00324 arg->ob_type->tp_free(arg);
00325 }
00326
00327 static PyObject *
00328 PLy_cursor_iternext(PyObject *self)
00329 {
00330 PLyCursorObject *cursor;
00331 PyObject *ret;
00332 volatile MemoryContext oldcontext;
00333 volatile ResourceOwner oldowner;
00334 Portal portal;
00335
00336 cursor = (PLyCursorObject *) self;
00337
00338 if (cursor->closed)
00339 {
00340 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
00341 return NULL;
00342 }
00343
00344 portal = GetPortalByName(cursor->portalname);
00345 if (!PortalIsValid(portal))
00346 {
00347 PLy_exception_set(PyExc_ValueError,
00348 "iterating a cursor in an aborted subtransaction");
00349 return NULL;
00350 }
00351
00352 oldcontext = CurrentMemoryContext;
00353 oldowner = CurrentResourceOwner;
00354
00355 PLy_spi_subtransaction_begin(oldcontext, oldowner);
00356
00357 PG_TRY();
00358 {
00359 SPI_cursor_fetch(portal, true, 1);
00360 if (SPI_processed == 0)
00361 {
00362 PyErr_SetNone(PyExc_StopIteration);
00363 ret = NULL;
00364 }
00365 else
00366 {
00367 if (cursor->result.is_rowtype != 1)
00368 PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
00369
00370 ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
00371 SPI_tuptable->tupdesc);
00372 }
00373
00374 SPI_freetuptable(SPI_tuptable);
00375
00376 PLy_spi_subtransaction_commit(oldcontext, oldowner);
00377 }
00378 PG_CATCH();
00379 {
00380 SPI_freetuptable(SPI_tuptable);
00381
00382 PLy_spi_subtransaction_abort(oldcontext, oldowner);
00383 return NULL;
00384 }
00385 PG_END_TRY();
00386
00387 return ret;
00388 }
00389
00390 static PyObject *
00391 PLy_cursor_fetch(PyObject *self, PyObject *args)
00392 {
00393 PLyCursorObject *cursor;
00394 int count;
00395 PLyResultObject *ret;
00396 volatile MemoryContext oldcontext;
00397 volatile ResourceOwner oldowner;
00398 Portal portal;
00399
00400 if (!PyArg_ParseTuple(args, "i", &count))
00401 return NULL;
00402
00403 cursor = (PLyCursorObject *) self;
00404
00405 if (cursor->closed)
00406 {
00407 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
00408 return NULL;
00409 }
00410
00411 portal = GetPortalByName(cursor->portalname);
00412 if (!PortalIsValid(portal))
00413 {
00414 PLy_exception_set(PyExc_ValueError,
00415 "iterating a cursor in an aborted subtransaction");
00416 return NULL;
00417 }
00418
00419 ret = (PLyResultObject *) PLy_result_new();
00420 if (ret == NULL)
00421 return NULL;
00422
00423 oldcontext = CurrentMemoryContext;
00424 oldowner = CurrentResourceOwner;
00425
00426 PLy_spi_subtransaction_begin(oldcontext, oldowner);
00427
00428 PG_TRY();
00429 {
00430 SPI_cursor_fetch(portal, true, count);
00431
00432 if (cursor->result.is_rowtype != 1)
00433 PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
00434
00435 Py_DECREF(ret->status);
00436 ret->status = PyInt_FromLong(SPI_OK_FETCH);
00437
00438 Py_DECREF(ret->nrows);
00439 ret->nrows = PyInt_FromLong(SPI_processed);
00440
00441 if (SPI_processed != 0)
00442 {
00443 int i;
00444
00445 Py_DECREF(ret->rows);
00446 ret->rows = PyList_New(SPI_processed);
00447
00448 for (i = 0; i < SPI_processed; i++)
00449 {
00450 PyObject *row = PLyDict_FromTuple(&cursor->result,
00451 SPI_tuptable->vals[i],
00452 SPI_tuptable->tupdesc);
00453
00454 PyList_SetItem(ret->rows, i, row);
00455 }
00456 }
00457
00458 SPI_freetuptable(SPI_tuptable);
00459
00460 PLy_spi_subtransaction_commit(oldcontext, oldowner);
00461 }
00462 PG_CATCH();
00463 {
00464 SPI_freetuptable(SPI_tuptable);
00465
00466 PLy_spi_subtransaction_abort(oldcontext, oldowner);
00467 return NULL;
00468 }
00469 PG_END_TRY();
00470
00471 return (PyObject *) ret;
00472 }
00473
00474 static PyObject *
00475 PLy_cursor_close(PyObject *self, PyObject *unused)
00476 {
00477 PLyCursorObject *cursor = (PLyCursorObject *) self;
00478
00479 if (!cursor->closed)
00480 {
00481 Portal portal = GetPortalByName(cursor->portalname);
00482
00483 if (!PortalIsValid(portal))
00484 {
00485 PLy_exception_set(PyExc_ValueError,
00486 "closing a cursor in an aborted subtransaction");
00487 return NULL;
00488 }
00489
00490 SPI_cursor_close(portal);
00491 cursor->closed = true;
00492 }
00493
00494 Py_INCREF(Py_None);
00495 return Py_None;
00496 }