2 Unix SMB/CIFS implementation.
3 Samba utility functions
4 Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "lib/replace/system/python.h"
22 #include "python/py3compat.h"
23 #include "libcli/util/pyerrors.h"
24 #include "python/modules.h"
25 #include "../libcli/nbt/libnbt.h"
26 #include "lib/events/events.h"
28 void initnetbios(void);
30 extern PyTypeObject nbt_node_Type
;
35 struct nbt_name_socket
*socket
;
38 static void py_nbt_node_dealloc(nbt_node_Object
*self
)
40 talloc_free(self
->mem_ctx
);
41 Py_TYPE(self
)->tp_free(self
);
44 static PyObject
*py_nbt_node_init(PyTypeObject
*self
, PyObject
*args
, PyObject
*kwargs
)
46 struct tevent_context
*ev
;
47 nbt_node_Object
*ret
= PyObject_New(nbt_node_Object
, &nbt_node_Type
);
49 ret
->mem_ctx
= talloc_new(NULL
);
50 if (ret
->mem_ctx
== NULL
)
53 ev
= s4_event_context_init(ret
->mem_ctx
);
54 ret
->socket
= nbt_name_socket_init(ret
->mem_ctx
, ev
);
55 return (PyObject
*)ret
;
58 static bool PyObject_AsDestinationTuple(PyObject
*obj
, const char **dest_addr
, uint16_t *dest_port
)
60 if (PyUnicode_Check(obj
)) {
61 *dest_addr
= PyUnicode_AsUTF8(obj
);
62 *dest_port
= NBT_NAME_SERVICE_PORT
;
66 if (PyTuple_Check(obj
)) {
67 if (PyTuple_Size(obj
) < 1) {
68 PyErr_SetString(PyExc_TypeError
, "Destination tuple size invalid");
72 if (!PyUnicode_Check(PyTuple_GetItem(obj
, 0))) {
73 PyErr_SetString(PyExc_TypeError
, "Destination tuple first element not string");
77 *dest_addr
= PyUnicode_AsUTF8(obj
);
79 if (PyTuple_Size(obj
) == 1) {
80 *dest_port
= NBT_NAME_SERVICE_PORT
;
82 } else if (PyLong_Check(PyTuple_GetItem(obj
, 1))) {
83 *dest_port
= PyLong_AsLong(PyTuple_GetItem(obj
, 1));
86 PyErr_SetString(PyExc_TypeError
, "Destination tuple second element not a port");
91 PyErr_SetString(PyExc_TypeError
, "Destination tuple second element not a port");
95 static bool PyObject_AsNBTName(PyObject
*obj
, struct nbt_name_socket
*name_socket
, struct nbt_name
*name
)
97 if (PyTuple_Check(obj
)) {
98 if (PyTuple_Size(obj
) == 2) {
99 name
->name
= PyUnicode_AsUTF8(PyTuple_GetItem(obj
, 0));
100 if (name
->name
== NULL
) {
103 name
->type
= PyLong_AsLong(PyTuple_GetItem(obj
, 1));
104 if (name
->type
== -1 && PyErr_Occurred()) {
109 } else if (PyTuple_Size(obj
) == 3) {
110 name
->name
= PyUnicode_AsUTF8(PyTuple_GetItem(obj
, 0));
111 if (name
->name
== NULL
) {
114 name
->scope
= PyUnicode_AsUTF8(PyTuple_GetItem(obj
, 1));
115 if (name
->scope
== NULL
) {
118 name
->type
= PyLong_AsLong(PyTuple_GetItem(obj
, 2));
119 if (name
->type
== -1 && PyErr_Occurred()) {
124 PyErr_SetString(PyExc_TypeError
, "Invalid tuple size");
129 if (PyUnicode_Check(obj
)) {
130 /* FIXME: Parse string to be able to interpret things like RHONWYN<02> ? */
131 name
->name
= PyUnicode_AsUTF8(obj
);
132 if (name
->name
== NULL
) {
140 PyErr_SetString(PyExc_TypeError
, "Invalid type for object");
144 static PyObject
*PyObject_FromNBTName(struct nbt_name_socket
*name_socket
,
145 struct nbt_name
*name
)
148 return Py_BuildValue("(ssi)", name
->name
, name
->scope
, name
->type
);
150 return Py_BuildValue("(si)", name
->name
, name
->type
);
154 static PyObject
*py_nbt_name_query(PyObject
*self
, PyObject
*args
, PyObject
*kwargs
)
156 nbt_node_Object
*node
= (nbt_node_Object
*)self
;
157 PyObject
*ret
, *reply_addrs
, *py_dest
, *py_name
;
158 struct nbt_name_query io
;
162 const char *kwnames
[] = { "name", "dest", "broadcast", "wins", "timeout",
164 io
.in
.broadcast
= true;
165 io
.in
.wins_lookup
= false;
169 if (!PyArg_ParseTupleAndKeywords(args
, kwargs
, "OO|bbii:query_name",
170 discard_const_p(char *, kwnames
),
172 &io
.in
.broadcast
, &io
.in
.wins_lookup
,
173 &io
.in
.timeout
, &io
.in
.retries
)) {
177 if (!PyObject_AsDestinationTuple(py_dest
, &io
.in
.dest_addr
, &io
.in
.dest_port
))
180 if (!PyObject_AsNBTName(py_name
, node
->socket
, &io
.in
.name
))
183 status
= nbt_name_query(node
->socket
, NULL
, &io
);
185 if (NT_STATUS_IS_ERR(status
)) {
186 PyErr_SetNTSTATUS(status
);
190 ret
= PyTuple_New(3);
193 PyTuple_SetItem(ret
, 0, PyUnicode_FromString(io
.out
.reply_from
));
195 py_name
= PyObject_FromNBTName(node
->socket
, &io
.out
.name
);
199 PyTuple_SetItem(ret
, 1, py_name
);
201 reply_addrs
= PyList_New(io
.out
.num_addrs
);
202 if (reply_addrs
== NULL
) {
207 for (i
= 0; i
< io
.out
.num_addrs
; i
++) {
208 PyList_SetItem(reply_addrs
, i
, PyUnicode_FromString(io
.out
.reply_addrs
[i
]));
211 PyTuple_SetItem(ret
, 2, reply_addrs
);
215 static PyObject
*py_nbt_name_status(PyObject
*self
, PyObject
*args
, PyObject
*kwargs
)
217 nbt_node_Object
*node
= (nbt_node_Object
*)self
;
218 PyObject
*ret
, *py_dest
, *py_name
, *py_names
;
219 struct nbt_name_status io
;
223 const char *kwnames
[] = { "name", "dest", "timeout", "retries", NULL
};
228 if (!PyArg_ParseTupleAndKeywords(args
, kwargs
, "OO|ii:name_status",
229 discard_const_p(char *, kwnames
),
231 &io
.in
.timeout
, &io
.in
.retries
)) {
235 if (!PyObject_AsDestinationTuple(py_dest
, &io
.in
.dest_addr
, &io
.in
.dest_port
))
238 if (!PyObject_AsNBTName(py_name
, node
->socket
, &io
.in
.name
))
241 status
= nbt_name_status(node
->socket
, NULL
, &io
);
243 if (NT_STATUS_IS_ERR(status
)) {
244 PyErr_SetNTSTATUS(status
);
248 ret
= PyTuple_New(3);
251 PyTuple_SetItem(ret
, 0, PyUnicode_FromString(io
.out
.reply_from
));
253 py_name
= PyObject_FromNBTName(node
->socket
, &io
.out
.name
);
257 PyTuple_SetItem(ret
, 1, py_name
);
259 py_names
= PyList_New(io
.out
.status
.num_names
);
261 for (i
= 0; i
< io
.out
.status
.num_names
; i
++) {
262 PyList_SetItem(py_names
, i
, Py_BuildValue("(sii)",
263 io
.out
.status
.names
[i
].name
,
264 io
.out
.status
.names
[i
].nb_flags
,
265 io
.out
.status
.names
[i
].type
));
268 PyTuple_SetItem(ret
, 2, py_names
);
273 static PyObject
*py_nbt_name_register(PyObject
*self
, PyObject
*args
, PyObject
*kwargs
)
275 nbt_node_Object
*node
= (nbt_node_Object
*)self
;
276 PyObject
*ret
, *py_dest
, *py_name
;
277 struct nbt_name_register io
= {};
280 const char *kwnames
[] = { "name", "address", "dest", "register_demand", "broadcast",
281 "multi_homed", "ttl", "timeout", "retries", NULL
};
283 io
.in
.broadcast
= true;
284 io
.in
.multi_homed
= true;
285 io
.in
.register_demand
= true;
290 if (!PyArg_ParseTupleAndKeywords(args
, kwargs
, "OsO|bbbiii:query_name",
291 discard_const_p(char *, kwnames
),
292 &py_name
, &io
.in
.address
, &py_dest
,
293 &io
.in
.register_demand
,
294 &io
.in
.broadcast
, &io
.in
.multi_homed
,
295 &io
.in
.ttl
, &io
.in
.timeout
, &io
.in
.retries
)) {
299 if (!PyObject_AsDestinationTuple(py_dest
, &io
.in
.dest_addr
, &io
.in
.dest_port
))
302 if (!PyObject_AsNBTName(py_name
, node
->socket
, &io
.in
.name
))
305 status
= nbt_name_register(node
->socket
, NULL
, &io
);
307 if (NT_STATUS_IS_ERR(status
)) {
308 PyErr_SetNTSTATUS(status
);
312 ret
= PyTuple_New(4);
315 PyTuple_SetItem(ret
, 0, PyUnicode_FromString(io
.out
.reply_from
));
317 py_name
= PyObject_FromNBTName(node
->socket
, &io
.out
.name
);
321 PyTuple_SetItem(ret
, 1, py_name
);
323 PyTuple_SetItem(ret
, 2, PyUnicode_FromString(io
.out
.reply_addr
));
325 PyTuple_SetItem(ret
, 3, PyLong_FromLong(io
.out
.rcode
));
330 static PyObject
*py_nbt_name_refresh(PyObject
*self
, PyObject
*args
, PyObject
*kwargs
)
332 nbt_node_Object
*node
= (nbt_node_Object
*)self
;
333 PyObject
*ret
, *py_dest
, *py_name
;
334 struct nbt_name_refresh io
;
337 const char *kwnames
[] = { "name", "address", "dest", "nb_flags", "broadcast",
338 "ttl", "timeout", "retries", NULL
};
340 io
.in
.broadcast
= true;
346 if (!PyArg_ParseTupleAndKeywords(args
, kwargs
, "OsO|ibiii:query_name",
347 discard_const_p(char *, kwnames
),
348 &py_name
, &io
.in
.address
, &py_dest
,
351 &io
.in
.ttl
, &io
.in
.timeout
, &io
.in
.retries
)) {
355 if (!PyObject_AsDestinationTuple(py_dest
, &io
.in
.dest_addr
, &io
.in
.dest_port
))
358 if (!PyObject_AsNBTName(py_name
, node
->socket
, &io
.in
.name
))
361 status
= nbt_name_refresh(node
->socket
, NULL
, &io
);
363 if (NT_STATUS_IS_ERR(status
)) {
364 PyErr_SetNTSTATUS(status
);
368 ret
= PyTuple_New(3);
371 PyTuple_SetItem(ret
, 0, PyUnicode_FromString(io
.out
.reply_from
));
373 py_name
= PyObject_FromNBTName(node
->socket
, &io
.out
.name
);
377 PyTuple_SetItem(ret
, 1, py_name
);
379 PyTuple_SetItem(ret
, 2, PyUnicode_FromString(io
.out
.reply_addr
));
381 PyTuple_SetItem(ret
, 3, PyLong_FromLong(io
.out
.rcode
));
386 static PyObject
*py_nbt_name_release(PyObject
*self
, PyObject
*args
, PyObject
*kwargs
)
388 Py_RETURN_NONE
; /* FIXME */
391 static PyMethodDef py_nbt_methods
[] = {
392 { "query_name", PY_DISCARD_FUNC_SIG(PyCFunction
, py_nbt_name_query
),
393 METH_VARARGS
|METH_KEYWORDS
,
394 "S.query_name(name, dest, broadcast=True, wins=False, timeout=0, retries=3) -> (reply_from, name, reply_addr)\n"
395 "Query for a NetBIOS name" },
396 { "register_name", PY_DISCARD_FUNC_SIG(PyCFunction
,
397 py_nbt_name_register
),
398 METH_VARARGS
|METH_KEYWORDS
,
399 "S.register_name(name, address, dest, register_demand=True, broadcast=True, multi_homed=True, ttl=0, timeout=0, retries=0) -> (reply_from, name, reply_addr, rcode)\n"
400 "Register a new name" },
401 { "release_name", PY_DISCARD_FUNC_SIG(PyCFunction
, py_nbt_name_release
),
402 METH_VARARGS
|METH_KEYWORDS
, "S.release_name(name, address, dest, nb_flags=0, broadcast=true, timeout=0, retries=3) -> (reply_from, name, reply_addr, rcode)\n"
403 "release a previously registered name" },
404 { "refresh_name", PY_DISCARD_FUNC_SIG(PyCFunction
, py_nbt_name_refresh
),
405 METH_VARARGS
|METH_KEYWORDS
, "S.refresh_name(name, address, dest, nb_flags=0, broadcast=True, ttl=0, timeout=0, retries=0) -> (reply_from, name, reply_addr, rcode)\n"
406 "release a previously registered name" },
407 { "name_status", PY_DISCARD_FUNC_SIG(PyCFunction
, py_nbt_name_status
),
408 METH_VARARGS
|METH_KEYWORDS
,
409 "S.name_status(name, dest, timeout=0, retries=0) -> (reply_from, name, status)\n"
410 "Find the status of a name" },
415 PyTypeObject nbt_node_Type
= {
416 PyVarObject_HEAD_INIT(NULL
, 0)
417 .tp_name
= "netbios.Node",
418 .tp_basicsize
= sizeof(nbt_node_Object
),
419 .tp_flags
= Py_TPFLAGS_DEFAULT
|Py_TPFLAGS_BASETYPE
,
420 .tp_new
= py_nbt_node_init
,
421 .tp_dealloc
= (destructor
)py_nbt_node_dealloc
,
422 .tp_methods
= py_nbt_methods
,
424 "Create a new NetBIOS node\n"
427 static struct PyModuleDef moduledef
= {
428 PyModuleDef_HEAD_INIT
,
430 .m_doc
= "NetBIOS over TCP/IP support",
435 MODULE_INIT_FUNC(netbios
)
437 PyObject
*mod
= NULL
;
438 if (PyType_Ready(&nbt_node_Type
) < 0)
441 mod
= PyModule_Create(&moduledef
);
444 Py_INCREF((PyObject
*)&nbt_node_Type
);
445 PyModule_AddObject(mod
, "Node", (PyObject
*)&nbt_node_Type
);