Fix header inclusion order in c.h.
[pgsql.git] / src / pl / plpython / plpy_cursorobject.c
blob6108384c9a50c1b584020a19a55b2943e3323b37
/*
 * the PLyCursor class
 *
 * src/pl/plpython/plpy_cursorobject.c
 */
7 #include "postgres.h"
9 #include <limits.h>
11 #include "catalog/pg_type.h"
12 #include "mb/pg_wchar.h"
13 #include "plpy_cursorobject.h"
14 #include "plpy_elog.h"
15 #include "plpy_main.h"
16 #include "plpy_planobject.h"
17 #include "plpy_resultobject.h"
18 #include "plpy_spi.h"
19 #include "plpython.h"
20 #include "utils/memutils.h"
22 static PyObject *PLy_cursor_query(const char *query);
23 static void PLy_cursor_dealloc(PyObject *arg);
24 static PyObject *PLy_cursor_iternext(PyObject *self);
25 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
26 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
28 static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
30 static PyMethodDef PLy_cursor_methods[] = {
31 {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
32 {"close", PLy_cursor_close, METH_NOARGS, NULL},
33 {NULL, NULL, 0, NULL}
36 static PyTypeObject PLy_CursorType = {
37 PyVarObject_HEAD_INIT(NULL, 0)
38 .tp_name = "PLyCursor",
39 .tp_basicsize = sizeof(PLyCursorObject),
40 .tp_dealloc = PLy_cursor_dealloc,
41 .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
42 .tp_doc = PLy_cursor_doc,
43 .tp_iter = PyObject_SelfIter,
44 .tp_iternext = PLy_cursor_iternext,
45 .tp_methods = PLy_cursor_methods,
48 void
49 PLy_cursor_init_type(void)
51 if (PyType_Ready(&PLy_CursorType) < 0)
52 elog(ERROR, "could not initialize PLy_CursorType");
55 PyObject *
56 PLy_cursor(PyObject *self, PyObject *args)
58 char *query;
59 PyObject *plan;
60 PyObject *planargs = NULL;
62 if (PyArg_ParseTuple(args, "s", &query))
63 return PLy_cursor_query(query);
65 PyErr_Clear();
67 if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
68 return PLy_cursor_plan(plan, planargs);
70 PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
71 return NULL;
75 static PyObject *
76 PLy_cursor_query(const char *query)
78 PLyCursorObject *cursor;
79 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
80 volatile MemoryContext oldcontext;
81 volatile ResourceOwner oldowner;
83 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
84 return NULL;
85 cursor->portalname = NULL;
86 cursor->closed = false;
87 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
88 "PL/Python cursor context",
89 ALLOCSET_DEFAULT_SIZES);
91 /* Initialize for converting result tuples to Python */
92 PLy_input_setup_func(&cursor->result, cursor->mcxt,
93 RECORDOID, -1,
94 exec_ctx->curr_proc);
96 oldcontext = CurrentMemoryContext;
97 oldowner = CurrentResourceOwner;
99 PLy_spi_subtransaction_begin(oldcontext, oldowner);
101 PG_TRY();
103 SPIPlanPtr plan;
104 Portal portal;
106 pg_verifymbstr(query, strlen(query), false);
108 plan = SPI_prepare(query, 0, NULL);
109 if (plan == NULL)
110 elog(ERROR, "SPI_prepare failed: %s",
111 SPI_result_code_string(SPI_result));
113 portal = SPI_cursor_open(NULL, plan, NULL, NULL,
114 exec_ctx->curr_proc->fn_readonly);
115 SPI_freeplan(plan);
117 if (portal == NULL)
118 elog(ERROR, "SPI_cursor_open() failed: %s",
119 SPI_result_code_string(SPI_result));
121 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
123 PinPortal(portal);
125 PLy_spi_subtransaction_commit(oldcontext, oldowner);
127 PG_CATCH();
129 PLy_spi_subtransaction_abort(oldcontext, oldowner);
130 return NULL;
132 PG_END_TRY();
134 Assert(cursor->portalname != NULL);
135 return (PyObject *) cursor;
138 PyObject *
139 PLy_cursor_plan(PyObject *ob, PyObject *args)
141 PLyCursorObject *cursor;
142 volatile int nargs;
143 int i;
144 PLyPlanObject *plan;
145 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
146 volatile MemoryContext oldcontext;
147 volatile ResourceOwner oldowner;
149 if (args)
151 if (!PySequence_Check(args) || PyUnicode_Check(args))
153 PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
154 return NULL;
156 nargs = PySequence_Length(args);
158 else
159 nargs = 0;
161 plan = (PLyPlanObject *) ob;
163 if (nargs != plan->nargs)
165 char *sv;
166 PyObject *so = PyObject_Str(args);
168 if (!so)
169 PLy_elog(ERROR, "could not execute plan");
170 sv = PLyUnicode_AsString(so);
171 PLy_exception_set_plural(PyExc_TypeError,
172 "Expected sequence of %d argument, got %d: %s",
173 "Expected sequence of %d arguments, got %d: %s",
174 plan->nargs,
175 plan->nargs, nargs, sv);
176 Py_DECREF(so);
178 return NULL;
181 if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
182 return NULL;
183 cursor->portalname = NULL;
184 cursor->closed = false;
185 cursor->mcxt = AllocSetContextCreate(TopMemoryContext,
186 "PL/Python cursor context",
187 ALLOCSET_DEFAULT_SIZES);
189 /* Initialize for converting result tuples to Python */
190 PLy_input_setup_func(&cursor->result, cursor->mcxt,
191 RECORDOID, -1,
192 exec_ctx->curr_proc);
194 oldcontext = CurrentMemoryContext;
195 oldowner = CurrentResourceOwner;
197 PLy_spi_subtransaction_begin(oldcontext, oldowner);
199 PG_TRY();
201 Portal portal;
202 char *volatile nulls;
203 volatile int j;
205 if (nargs > 0)
206 nulls = palloc(nargs * sizeof(char));
207 else
208 nulls = NULL;
210 for (j = 0; j < nargs; j++)
212 PLyObToDatum *arg = &plan->args[j];
213 PyObject *elem;
215 elem = PySequence_GetItem(args, j);
216 PG_TRY(2);
218 bool isnull;
220 plan->values[j] = PLy_output_convert(arg, elem, &isnull);
221 nulls[j] = isnull ? 'n' : ' ';
223 PG_FINALLY(2);
225 Py_DECREF(elem);
227 PG_END_TRY(2);
230 portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
231 exec_ctx->curr_proc->fn_readonly);
232 if (portal == NULL)
233 elog(ERROR, "SPI_cursor_open() failed: %s",
234 SPI_result_code_string(SPI_result));
236 cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
238 PinPortal(portal);
240 PLy_spi_subtransaction_commit(oldcontext, oldowner);
242 PG_CATCH();
244 int k;
246 /* cleanup plan->values array */
247 for (k = 0; k < nargs; k++)
249 if (!plan->args[k].typbyval &&
250 (plan->values[k] != PointerGetDatum(NULL)))
252 pfree(DatumGetPointer(plan->values[k]));
253 plan->values[k] = PointerGetDatum(NULL);
257 Py_DECREF(cursor);
259 PLy_spi_subtransaction_abort(oldcontext, oldowner);
260 return NULL;
262 PG_END_TRY();
264 for (i = 0; i < nargs; i++)
266 if (!plan->args[i].typbyval &&
267 (plan->values[i] != PointerGetDatum(NULL)))
269 pfree(DatumGetPointer(plan->values[i]));
270 plan->values[i] = PointerGetDatum(NULL);
274 Assert(cursor->portalname != NULL);
275 return (PyObject *) cursor;
278 static void
279 PLy_cursor_dealloc(PyObject *arg)
281 PLyCursorObject *cursor;
282 Portal portal;
284 cursor = (PLyCursorObject *) arg;
286 if (!cursor->closed)
288 portal = GetPortalByName(cursor->portalname);
290 if (PortalIsValid(portal))
292 UnpinPortal(portal);
293 SPI_cursor_close(portal);
295 cursor->closed = true;
297 if (cursor->mcxt)
299 MemoryContextDelete(cursor->mcxt);
300 cursor->mcxt = NULL;
302 arg->ob_type->tp_free(arg);
305 static PyObject *
306 PLy_cursor_iternext(PyObject *self)
308 PLyCursorObject *cursor;
309 PyObject *ret;
310 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
311 volatile MemoryContext oldcontext;
312 volatile ResourceOwner oldowner;
313 Portal portal;
315 cursor = (PLyCursorObject *) self;
317 if (cursor->closed)
319 PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
320 return NULL;
323 portal = GetPortalByName(cursor->portalname);
324 if (!PortalIsValid(portal))
326 PLy_exception_set(PyExc_ValueError,
327 "iterating a cursor in an aborted subtransaction");
328 return NULL;
331 oldcontext = CurrentMemoryContext;
332 oldowner = CurrentResourceOwner;
334 PLy_spi_subtransaction_begin(oldcontext, oldowner);
336 PG_TRY();
338 SPI_cursor_fetch(portal, true, 1);
339 if (SPI_processed == 0)
341 PyErr_SetNone(PyExc_StopIteration);
342 ret = NULL;
344 else
346 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
347 exec_ctx->curr_proc);
349 ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
350 SPI_tuptable->tupdesc, true);
353 SPI_freetuptable(SPI_tuptable);
355 PLy_spi_subtransaction_commit(oldcontext, oldowner);
357 PG_CATCH();
359 PLy_spi_subtransaction_abort(oldcontext, oldowner);
360 return NULL;
362 PG_END_TRY();
364 return ret;
367 static PyObject *
368 PLy_cursor_fetch(PyObject *self, PyObject *args)
370 PLyCursorObject *cursor;
371 int count;
372 PLyResultObject *ret;
373 PLyExecutionContext *exec_ctx = PLy_current_execution_context();
374 volatile MemoryContext oldcontext;
375 volatile ResourceOwner oldowner;
376 Portal portal;
378 if (!PyArg_ParseTuple(args, "i:fetch", &count))
379 return NULL;
381 cursor = (PLyCursorObject *) self;
383 if (cursor->closed)
385 PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
386 return NULL;
389 portal = GetPortalByName(cursor->portalname);
390 if (!PortalIsValid(portal))
392 PLy_exception_set(PyExc_ValueError,
393 "iterating a cursor in an aborted subtransaction");
394 return NULL;
397 ret = (PLyResultObject *) PLy_result_new();
398 if (ret == NULL)
399 return NULL;
401 oldcontext = CurrentMemoryContext;
402 oldowner = CurrentResourceOwner;
404 PLy_spi_subtransaction_begin(oldcontext, oldowner);
406 PG_TRY();
408 SPI_cursor_fetch(portal, true, count);
410 Py_DECREF(ret->status);
411 ret->status = PyLong_FromLong(SPI_OK_FETCH);
413 Py_DECREF(ret->nrows);
414 ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
416 if (SPI_processed != 0)
418 uint64 i;
421 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
422 * and list indices; so we cannot support a result larger than
423 * PY_SSIZE_T_MAX.
425 if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
426 ereport(ERROR,
427 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
428 errmsg("query result has too many rows to fit in a Python list")));
430 Py_DECREF(ret->rows);
431 ret->rows = PyList_New(SPI_processed);
432 if (!ret->rows)
434 Py_DECREF(ret);
435 ret = NULL;
437 else
439 PLy_input_setup_tuple(&cursor->result, SPI_tuptable->tupdesc,
440 exec_ctx->curr_proc);
442 for (i = 0; i < SPI_processed; i++)
444 PyObject *row = PLy_input_from_tuple(&cursor->result,
445 SPI_tuptable->vals[i],
446 SPI_tuptable->tupdesc,
447 true);
449 PyList_SetItem(ret->rows, i, row);
454 SPI_freetuptable(SPI_tuptable);
456 PLy_spi_subtransaction_commit(oldcontext, oldowner);
458 PG_CATCH();
460 PLy_spi_subtransaction_abort(oldcontext, oldowner);
461 return NULL;
463 PG_END_TRY();
465 return (PyObject *) ret;
468 static PyObject *
469 PLy_cursor_close(PyObject *self, PyObject *unused)
471 PLyCursorObject *cursor = (PLyCursorObject *) self;
473 if (!cursor->closed)
475 Portal portal = GetPortalByName(cursor->portalname);
477 if (!PortalIsValid(portal))
479 PLy_exception_set(PyExc_ValueError,
480 "closing a cursor in an aborted subtransaction");
481 return NULL;
484 UnpinPortal(portal);
485 SPI_cursor_close(portal);
486 cursor->closed = true;
489 Py_RETURN_NONE;