4 * src/pl/plpython/plpy_cursorobject.c
11 #include "catalog/pg_type.h"
12 #include "mb/pg_wchar.h"
13 #include "plpy_cursorobject.h"
14 #include "plpy_elog.h"
15 #include "plpy_main.h"
16 #include "plpy_planobject.h"
17 #include "plpy_resultobject.h"
20 #include "utils/memutils.h"
22 static PyObject
*PLy_cursor_query(const char *query
);
23 static void PLy_cursor_dealloc(PyObject
*arg
);
24 static PyObject
*PLy_cursor_iternext(PyObject
*self
);
25 static PyObject
*PLy_cursor_fetch(PyObject
*self
, PyObject
*args
);
26 static PyObject
*PLy_cursor_close(PyObject
*self
, PyObject
*unused
);
28 static const char PLy_cursor_doc
[] = "Wrapper around a PostgreSQL cursor";
30 static PyMethodDef PLy_cursor_methods
[] = {
31 {"fetch", PLy_cursor_fetch
, METH_VARARGS
, NULL
},
32 {"close", PLy_cursor_close
, METH_NOARGS
, NULL
},
36 static PyTypeObject PLy_CursorType
= {
37 PyVarObject_HEAD_INIT(NULL
, 0)
38 .tp_name
= "PLyCursor",
39 .tp_basicsize
= sizeof(PLyCursorObject
),
40 .tp_dealloc
= PLy_cursor_dealloc
,
41 .tp_flags
= Py_TPFLAGS_DEFAULT
| Py_TPFLAGS_BASETYPE
,
42 .tp_doc
= PLy_cursor_doc
,
43 .tp_iter
= PyObject_SelfIter
,
44 .tp_iternext
= PLy_cursor_iternext
,
45 .tp_methods
= PLy_cursor_methods
,
49 PLy_cursor_init_type(void)
51 if (PyType_Ready(&PLy_CursorType
) < 0)
52 elog(ERROR
, "could not initialize PLy_CursorType");
56 PLy_cursor(PyObject
*self
, PyObject
*args
)
60 PyObject
*planargs
= NULL
;
62 if (PyArg_ParseTuple(args
, "s", &query
))
63 return PLy_cursor_query(query
);
67 if (PyArg_ParseTuple(args
, "O|O", &plan
, &planargs
))
68 return PLy_cursor_plan(plan
, planargs
);
70 PLy_exception_set(PLy_exc_error
, "plpy.cursor expected a query or a plan");
76 PLy_cursor_query(const char *query
)
78 PLyCursorObject
*cursor
;
79 PLyExecutionContext
*exec_ctx
= PLy_current_execution_context();
80 volatile MemoryContext oldcontext
;
81 volatile ResourceOwner oldowner
;
83 if ((cursor
= PyObject_New(PLyCursorObject
, &PLy_CursorType
)) == NULL
)
85 cursor
->portalname
= NULL
;
86 cursor
->closed
= false;
87 cursor
->mcxt
= AllocSetContextCreate(TopMemoryContext
,
88 "PL/Python cursor context",
89 ALLOCSET_DEFAULT_SIZES
);
91 /* Initialize for converting result tuples to Python */
92 PLy_input_setup_func(&cursor
->result
, cursor
->mcxt
,
96 oldcontext
= CurrentMemoryContext
;
97 oldowner
= CurrentResourceOwner
;
99 PLy_spi_subtransaction_begin(oldcontext
, oldowner
);
106 pg_verifymbstr(query
, strlen(query
), false);
108 plan
= SPI_prepare(query
, 0, NULL
);
110 elog(ERROR
, "SPI_prepare failed: %s",
111 SPI_result_code_string(SPI_result
));
113 portal
= SPI_cursor_open(NULL
, plan
, NULL
, NULL
,
114 exec_ctx
->curr_proc
->fn_readonly
);
118 elog(ERROR
, "SPI_cursor_open() failed: %s",
119 SPI_result_code_string(SPI_result
));
121 cursor
->portalname
= MemoryContextStrdup(cursor
->mcxt
, portal
->name
);
125 PLy_spi_subtransaction_commit(oldcontext
, oldowner
);
129 PLy_spi_subtransaction_abort(oldcontext
, oldowner
);
134 Assert(cursor
->portalname
!= NULL
);
135 return (PyObject
*) cursor
;
139 PLy_cursor_plan(PyObject
*ob
, PyObject
*args
)
141 PLyCursorObject
*cursor
;
145 PLyExecutionContext
*exec_ctx
= PLy_current_execution_context();
146 volatile MemoryContext oldcontext
;
147 volatile ResourceOwner oldowner
;
151 if (!PySequence_Check(args
) || PyUnicode_Check(args
))
153 PLy_exception_set(PyExc_TypeError
, "plpy.cursor takes a sequence as its second argument");
156 nargs
= PySequence_Length(args
);
161 plan
= (PLyPlanObject
*) ob
;
163 if (nargs
!= plan
->nargs
)
166 PyObject
*so
= PyObject_Str(args
);
169 PLy_elog(ERROR
, "could not execute plan");
170 sv
= PLyUnicode_AsString(so
);
171 PLy_exception_set_plural(PyExc_TypeError
,
172 "Expected sequence of %d argument, got %d: %s",
173 "Expected sequence of %d arguments, got %d: %s",
175 plan
->nargs
, nargs
, sv
);
181 if ((cursor
= PyObject_New(PLyCursorObject
, &PLy_CursorType
)) == NULL
)
183 cursor
->portalname
= NULL
;
184 cursor
->closed
= false;
185 cursor
->mcxt
= AllocSetContextCreate(TopMemoryContext
,
186 "PL/Python cursor context",
187 ALLOCSET_DEFAULT_SIZES
);
189 /* Initialize for converting result tuples to Python */
190 PLy_input_setup_func(&cursor
->result
, cursor
->mcxt
,
192 exec_ctx
->curr_proc
);
194 oldcontext
= CurrentMemoryContext
;
195 oldowner
= CurrentResourceOwner
;
197 PLy_spi_subtransaction_begin(oldcontext
, oldowner
);
202 char *volatile nulls
;
206 nulls
= palloc(nargs
* sizeof(char));
210 for (j
= 0; j
< nargs
; j
++)
212 PLyObToDatum
*arg
= &plan
->args
[j
];
215 elem
= PySequence_GetItem(args
, j
);
220 plan
->values
[j
] = PLy_output_convert(arg
, elem
, &isnull
);
221 nulls
[j
] = isnull
? 'n' : ' ';
230 portal
= SPI_cursor_open(NULL
, plan
->plan
, plan
->values
, nulls
,
231 exec_ctx
->curr_proc
->fn_readonly
);
233 elog(ERROR
, "SPI_cursor_open() failed: %s",
234 SPI_result_code_string(SPI_result
));
236 cursor
->portalname
= MemoryContextStrdup(cursor
->mcxt
, portal
->name
);
240 PLy_spi_subtransaction_commit(oldcontext
, oldowner
);
246 /* cleanup plan->values array */
247 for (k
= 0; k
< nargs
; k
++)
249 if (!plan
->args
[k
].typbyval
&&
250 (plan
->values
[k
] != PointerGetDatum(NULL
)))
252 pfree(DatumGetPointer(plan
->values
[k
]));
253 plan
->values
[k
] = PointerGetDatum(NULL
);
259 PLy_spi_subtransaction_abort(oldcontext
, oldowner
);
264 for (i
= 0; i
< nargs
; i
++)
266 if (!plan
->args
[i
].typbyval
&&
267 (plan
->values
[i
] != PointerGetDatum(NULL
)))
269 pfree(DatumGetPointer(plan
->values
[i
]));
270 plan
->values
[i
] = PointerGetDatum(NULL
);
274 Assert(cursor
->portalname
!= NULL
);
275 return (PyObject
*) cursor
;
279 PLy_cursor_dealloc(PyObject
*arg
)
281 PLyCursorObject
*cursor
;
284 cursor
= (PLyCursorObject
*) arg
;
288 portal
= GetPortalByName(cursor
->portalname
);
290 if (PortalIsValid(portal
))
293 SPI_cursor_close(portal
);
295 cursor
->closed
= true;
299 MemoryContextDelete(cursor
->mcxt
);
302 arg
->ob_type
->tp_free(arg
);
306 PLy_cursor_iternext(PyObject
*self
)
308 PLyCursorObject
*cursor
;
310 PLyExecutionContext
*exec_ctx
= PLy_current_execution_context();
311 volatile MemoryContext oldcontext
;
312 volatile ResourceOwner oldowner
;
315 cursor
= (PLyCursorObject
*) self
;
319 PLy_exception_set(PyExc_ValueError
, "iterating a closed cursor");
323 portal
= GetPortalByName(cursor
->portalname
);
324 if (!PortalIsValid(portal
))
326 PLy_exception_set(PyExc_ValueError
,
327 "iterating a cursor in an aborted subtransaction");
331 oldcontext
= CurrentMemoryContext
;
332 oldowner
= CurrentResourceOwner
;
334 PLy_spi_subtransaction_begin(oldcontext
, oldowner
);
338 SPI_cursor_fetch(portal
, true, 1);
339 if (SPI_processed
== 0)
341 PyErr_SetNone(PyExc_StopIteration
);
346 PLy_input_setup_tuple(&cursor
->result
, SPI_tuptable
->tupdesc
,
347 exec_ctx
->curr_proc
);
349 ret
= PLy_input_from_tuple(&cursor
->result
, SPI_tuptable
->vals
[0],
350 SPI_tuptable
->tupdesc
, true);
353 SPI_freetuptable(SPI_tuptable
);
355 PLy_spi_subtransaction_commit(oldcontext
, oldowner
);
359 PLy_spi_subtransaction_abort(oldcontext
, oldowner
);
368 PLy_cursor_fetch(PyObject
*self
, PyObject
*args
)
370 PLyCursorObject
*cursor
;
372 PLyResultObject
*ret
;
373 PLyExecutionContext
*exec_ctx
= PLy_current_execution_context();
374 volatile MemoryContext oldcontext
;
375 volatile ResourceOwner oldowner
;
378 if (!PyArg_ParseTuple(args
, "i:fetch", &count
))
381 cursor
= (PLyCursorObject
*) self
;
385 PLy_exception_set(PyExc_ValueError
, "fetch from a closed cursor");
389 portal
= GetPortalByName(cursor
->portalname
);
390 if (!PortalIsValid(portal
))
392 PLy_exception_set(PyExc_ValueError
,
393 "iterating a cursor in an aborted subtransaction");
397 ret
= (PLyResultObject
*) PLy_result_new();
401 oldcontext
= CurrentMemoryContext
;
402 oldowner
= CurrentResourceOwner
;
404 PLy_spi_subtransaction_begin(oldcontext
, oldowner
);
408 SPI_cursor_fetch(portal
, true, count
);
410 Py_DECREF(ret
->status
);
411 ret
->status
= PyLong_FromLong(SPI_OK_FETCH
);
413 Py_DECREF(ret
->nrows
);
414 ret
->nrows
= PyLong_FromUnsignedLongLong(SPI_processed
);
416 if (SPI_processed
!= 0)
421 * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
422 * and list indices; so we cannot support a result larger than
425 if (SPI_processed
> (uint64
) PY_SSIZE_T_MAX
)
427 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED
),
428 errmsg("query result has too many rows to fit in a Python list")));
430 Py_DECREF(ret
->rows
);
431 ret
->rows
= PyList_New(SPI_processed
);
439 PLy_input_setup_tuple(&cursor
->result
, SPI_tuptable
->tupdesc
,
440 exec_ctx
->curr_proc
);
442 for (i
= 0; i
< SPI_processed
; i
++)
444 PyObject
*row
= PLy_input_from_tuple(&cursor
->result
,
445 SPI_tuptable
->vals
[i
],
446 SPI_tuptable
->tupdesc
,
449 PyList_SetItem(ret
->rows
, i
, row
);
454 SPI_freetuptable(SPI_tuptable
);
456 PLy_spi_subtransaction_commit(oldcontext
, oldowner
);
460 PLy_spi_subtransaction_abort(oldcontext
, oldowner
);
465 return (PyObject
*) ret
;
469 PLy_cursor_close(PyObject
*self
, PyObject
*unused
)
471 PLyCursorObject
*cursor
= (PLyCursorObject
*) self
;
475 Portal portal
= GetPortalByName(cursor
->portalname
);
477 if (!PortalIsValid(portal
))
479 PLy_exception_set(PyExc_ValueError
,
480 "closing a cursor in an aborted subtransaction");
485 SPI_cursor_close(portal
);
486 cursor
->closed
= true;