ctdb-server: Remove duplicate logic
[samba4-gss.git] / source4 / lib / messaging / messaging.c
blob6e9c644b9bb8ca8704bc7237404eb95443726c68
1 /*
2 Unix SMB/CIFS implementation.
4 Samba internal messaging functions
6 Copyright (C) Andrew Tridgell 2004
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "includes.h"
23 #include "lib/events/events.h"
24 #include "lib/util/server_id.h"
25 #include "system/filesys.h"
26 #include "messaging/messaging.h"
27 #include "messaging/messaging_internal.h"
28 #include "../lib/util/dlinklist.h"
29 #include "lib/socket/socket.h"
30 #include "librpc/gen_ndr/ndr_irpc.h"
31 #include "lib/messaging/irpc.h"
32 #include "../lib/util/unix_privs.h"
33 #include "librpc/rpc/dcerpc.h"
34 #include "cluster/cluster.h"
35 #include "../lib/util/tevent_ntstatus.h"
36 #include "lib/param/param.h"
37 #include "lib/util/server_id_db.h"
38 #include "lib/util/talloc_report_printf.h"
39 #include "lib/messaging/messages_dgm.h"
40 #include "lib/messaging/messages_dgm_ref.h"
41 #include "../source3/lib/messages_util.h"
42 #include <tdb.h>
43 #include "lib/util/idtree.h"
45 /* change the message version with any incompatible changes in the protocol */
46 #define IMESSAGING_VERSION 1
49 a pending irpc call
51 struct irpc_request {
52 struct irpc_request *prev, *next;
53 struct imessaging_context *msg_ctx;
54 int callid;
55 struct {
56 void (*handler)(struct irpc_request *irpc, struct irpc_message *m);
57 void *private_data;
58 } incoming;
61 /* we have a linked list of dispatch handlers for each msg_type that
62 this messaging server can deal with */
63 struct dispatch_fn {
64 struct dispatch_fn *next, *prev;
65 uint32_t msg_type;
66 void *private_data;
67 msg_callback_t fn;
70 /* an individual message */
72 static void irpc_handler(struct imessaging_context *,
73 void *,
74 uint32_t,
75 struct server_id,
76 size_t,
77 int *,
78 DATA_BLOB *);
82 A useful function for testing the message system.
84 static void ping_message(struct imessaging_context *msg,
85 void *private_data,
86 uint32_t msg_type,
87 struct server_id src,
88 size_t num_fds,
89 int *fds,
90 DATA_BLOB *data)
92 struct server_id_buf idbuf;
94 if (num_fds != 0) {
95 DBG_WARNING("Received %zu fds, ignoring message\n", num_fds);
96 return;
99 DEBUG(1,("INFO: Received PING message from server %s [%.*s]\n",
100 server_id_str_buf(src, &idbuf), (int)data->length,
101 data->data?(const char *)data->data:""));
102 imessaging_send(msg, src, MSG_PONG, data);
105 static void pool_message(struct imessaging_context *msg,
106 void *private_data,
107 uint32_t msg_type,
108 struct server_id src,
109 size_t num_fds,
110 int *fds,
111 DATA_BLOB *data)
113 FILE *f = NULL;
115 if (num_fds != 1) {
116 DBG_WARNING("Received %zu fds, ignoring message\n", num_fds);
117 return;
120 f = fdopen(fds[0], "w");
121 if (f == NULL) {
122 DBG_DEBUG("fopen failed: %s\n", strerror(errno));
123 return;
126 talloc_full_report_printf(NULL, f);
127 fclose(f);
130 static void ringbuf_log_msg(struct imessaging_context *msg,
131 void *private_data,
132 uint32_t msg_type,
133 struct server_id src,
134 size_t num_fds,
135 int *fds,
136 DATA_BLOB *data)
138 char *log = debug_get_ringbuf();
139 size_t logsize = debug_get_ringbuf_size();
140 DATA_BLOB blob;
142 if (num_fds != 0) {
143 DBG_WARNING("Received %zu fds, ignoring message\n", num_fds);
144 return;
147 if (log == NULL) {
148 log = discard_const_p(char, "*disabled*\n");
149 logsize = strlen(log) + 1;
152 blob.data = (uint8_t *)log;
153 blob.length = logsize;
155 imessaging_send(msg, src, MSG_RINGBUF_LOG, &blob);
158 /****************************************************************************
159 Receive a "set debug level" message.
160 ****************************************************************************/
162 static void debug_imessage(struct imessaging_context *msg_ctx,
163 void *private_data,
164 uint32_t msg_type,
165 struct server_id src,
166 size_t num_fds,
167 int *fds,
168 DATA_BLOB *data)
170 const char *params_str = (const char *)data->data;
171 struct server_id_buf src_buf;
172 struct server_id dst = imessaging_get_server_id(msg_ctx);
173 struct server_id_buf dst_buf;
175 if (num_fds != 0) {
176 DBG_WARNING("Received %zu fds, ignoring message\n", num_fds);
177 return;
180 /* Check, it's a proper string! */
181 if (params_str[(data->length)-1] != '\0') {
182 DBG_ERR("Invalid debug message from pid %s to pid %s\n",
183 server_id_str_buf(src, &src_buf),
184 server_id_str_buf(dst, &dst_buf));
185 return;
188 DBG_ERR("INFO: Remote set of debug to `%s' (pid %s from pid %s)\n",
189 params_str,
190 server_id_str_buf(dst, &dst_buf),
191 server_id_str_buf(src, &src_buf));
193 debug_parse_levels(params_str);
196 /****************************************************************************
197 Return current debug level.
198 ****************************************************************************/
200 static void debuglevel_imessage(struct imessaging_context *msg_ctx,
201 void *private_data,
202 uint32_t msg_type,
203 struct server_id src,
204 size_t num_fds,
205 int *fds,
206 DATA_BLOB *data)
208 char *message = debug_list_class_names_and_levels();
209 DATA_BLOB blob = data_blob_null;
210 struct server_id_buf src_buf;
211 struct server_id dst = imessaging_get_server_id(msg_ctx);
212 struct server_id_buf dst_buf;
214 if (num_fds != 0) {
215 DBG_WARNING("Received %zu fds, ignoring message\n", num_fds);
216 return;
219 DBG_DEBUG("Received REQ_DEBUGLEVEL message (pid %s from pid %s)\n",
220 server_id_str_buf(dst, &dst_buf),
221 server_id_str_buf(src, &src_buf));
223 if (message == NULL) {
224 DBG_ERR("debug_list_class_names_and_levels returned NULL\n");
225 return;
228 blob = data_blob_string_const_null(message);
229 imessaging_send(msg_ctx, src, MSG_DEBUGLEVEL, &blob);
231 TALLOC_FREE(message);
235 return uptime of messaging server via irpc
237 static NTSTATUS irpc_uptime(struct irpc_message *msg,
238 struct irpc_uptime *r)
240 struct imessaging_context *ctx = talloc_get_type(msg->private_data, struct imessaging_context);
241 *r->out.start_time = timeval_to_nttime(&ctx->start_time);
242 return NT_STATUS_OK;
245 static struct dispatch_fn *imessaging_find_dispatch(
246 struct imessaging_context *msg, uint32_t msg_type)
248 /* temporary IDs use an idtree, the rest use a array of pointers */
249 if (msg_type >= MSG_TMP_BASE) {
250 return (struct dispatch_fn *)idr_find(msg->dispatch_tree,
251 msg_type);
253 if (msg_type < msg->num_types) {
254 return msg->dispatch[msg_type];
256 return NULL;
260 Register a dispatch function for a particular message type.
262 NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
263 uint32_t msg_type, msg_callback_t fn)
265 struct dispatch_fn *d;
267 /* possibly expand dispatch array */
268 if (msg_type >= msg->num_types) {
269 struct dispatch_fn **dp;
270 uint32_t i;
271 dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
272 NT_STATUS_HAVE_NO_MEMORY(dp);
273 msg->dispatch = dp;
274 for (i=msg->num_types;i<=msg_type;i++) {
275 msg->dispatch[i] = NULL;
277 msg->num_types = msg_type+1;
280 d = talloc_zero(msg->dispatch, struct dispatch_fn);
281 NT_STATUS_HAVE_NO_MEMORY(d);
282 d->msg_type = msg_type;
283 d->private_data = private_data;
284 d->fn = fn;
286 DLIST_ADD(msg->dispatch[msg_type], d);
288 return NT_STATUS_OK;
292 register a temporary message handler. The msg_type is allocated
293 above MSG_TMP_BASE
295 NTSTATUS imessaging_register_tmp(struct imessaging_context *msg, void *private_data,
296 msg_callback_t fn, uint32_t *msg_type)
298 struct dispatch_fn *d;
299 int id;
301 d = talloc_zero(msg->dispatch, struct dispatch_fn);
302 NT_STATUS_HAVE_NO_MEMORY(d);
303 d->private_data = private_data;
304 d->fn = fn;
306 id = idr_get_new_above(msg->dispatch_tree, d, MSG_TMP_BASE, UINT16_MAX);
307 if (id == -1) {
308 talloc_free(d);
309 return NT_STATUS_TOO_MANY_CONTEXT_IDS;
312 d->msg_type = (uint32_t)id;
313 (*msg_type) = d->msg_type;
315 return NT_STATUS_OK;
319 De-register the function for a particular message type. Return the number of
320 functions deregistered.
322 size_t imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, void *private_data)
324 struct dispatch_fn *d, *next;
325 size_t removed = 0;
327 if (msg_type >= msg->num_types) {
328 d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
329 msg_type);
330 if (!d) return 0;
331 idr_remove(msg->dispatch_tree, msg_type);
332 talloc_free(d);
333 return 1;
336 for (d = msg->dispatch[msg_type]; d; d = next) {
337 next = d->next;
338 if (d->private_data == private_data) {
339 DLIST_REMOVE(msg->dispatch[msg_type], d);
340 talloc_free(d);
341 ++removed;
345 return removed;
350 int imessaging_cleanup(struct imessaging_context *msg)
352 return 0;
355 static void imessaging_dgm_recv(struct tevent_context *ev,
356 const uint8_t *buf, size_t buf_len,
357 int *fds, size_t num_fds,
358 void *private_data);
360 /* Keep a list of imessaging contexts */
361 static struct imessaging_context *msg_ctxs;
364 * A process has terminated, clean-up any names it has registered.
366 NTSTATUS imessaging_process_cleanup(
367 struct imessaging_context *msg_ctx,
368 pid_t pid)
370 struct irpc_name_records *names = NULL;
371 uint32_t i = 0;
372 uint32_t j = 0;
373 TALLOC_CTX *mem_ctx = talloc_new(NULL);
375 if (mem_ctx == NULL) {
376 DBG_ERR("OOM unable to clean up messaging for process (%d)\n",
377 pid);
378 return NT_STATUS_NO_MEMORY;
381 names = irpc_all_servers(msg_ctx, mem_ctx);
382 if (names == NULL) {
383 TALLOC_FREE(mem_ctx);
384 return NT_STATUS_OK;
386 for (i = 0; i < names->num_records; i++) {
387 for (j = 0; j < names->names[i]->count; j++) {
388 if (names->names[i]->ids[j].pid == pid) {
389 int ret = server_id_db_prune_name(
390 msg_ctx->names,
391 names->names[i]->name,
392 names->names[i]->ids[j]);
393 if (ret != 0 && ret != ENOENT) {
394 TALLOC_FREE(mem_ctx);
395 return map_nt_error_from_unix_common(
396 ret);
401 TALLOC_FREE(mem_ctx);
402 return NT_STATUS_OK;
405 static int imessaging_context_destructor(struct imessaging_context *msg)
407 struct irpc_request *irpc = NULL;
408 struct irpc_request *next = NULL;
410 for (irpc = msg->requests; irpc != NULL; irpc = next) {
411 next = irpc->next;
413 DLIST_REMOVE(msg->requests, irpc);
414 irpc->callid = -1;
417 DLIST_REMOVE(msg_ctxs, msg);
418 TALLOC_FREE(msg->msg_dgm_ref);
419 return 0;
423 * Cleanup messaging dgm contexts on a specific event context.
425 * We must make sure to unref all messaging_dgm_ref's *before* the
426 * tevent context goes away. Only when the last ref is freed, the
427 * refcounted messaging dgm context will be freed.
429 void imessaging_dgm_unref_ev(struct tevent_context *ev)
431 struct imessaging_context *msg = NULL;
433 for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
434 if (msg->ev == ev) {
435 TALLOC_FREE(msg->msg_dgm_ref);
440 static NTSTATUS imessaging_reinit(struct imessaging_context *msg)
442 int ret = -1;
443 struct irpc_request *irpc = NULL;
444 struct irpc_request *next = NULL;
446 for (irpc = msg->requests; irpc != NULL; irpc = next) {
447 next = irpc->next;
449 DLIST_REMOVE(msg->requests, irpc);
450 irpc->callid = -1;
453 TALLOC_FREE(msg->msg_dgm_ref);
455 if (msg->discard_incoming) {
456 msg->num_incoming_listeners = 0;
457 } else {
458 msg->num_incoming_listeners = 1;
461 msg->server_id.pid = getpid();
463 msg->msg_dgm_ref = messaging_dgm_ref(msg,
464 msg->ev,
465 &msg->server_id.unique_id,
466 msg->sock_dir,
467 msg->lock_dir,
468 imessaging_dgm_recv,
469 msg,
470 &ret);
472 if (msg->msg_dgm_ref == NULL) {
473 DEBUG(2, ("messaging_dgm_ref failed: %s\n",
474 strerror(ret)));
475 return map_nt_error_from_unix_common(ret);
478 server_id_db_reinit(msg->names, msg->server_id);
479 return NT_STATUS_OK;
483 * Must be called after a fork.
485 NTSTATUS imessaging_reinit_all(void)
487 struct imessaging_context *msg = NULL;
489 for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
490 NTSTATUS status = imessaging_reinit(msg);
491 if (!NT_STATUS_IS_OK(status)) {
492 return status;
495 return NT_STATUS_OK;
499 create the listening socket and setup the dispatcher
501 static struct imessaging_context *imessaging_init_internal(
502 TALLOC_CTX *mem_ctx,
503 bool discard_incoming,
504 struct loadparm_context *lp_ctx,
505 struct server_id server_id,
506 struct tevent_context *ev)
508 NTSTATUS status;
509 struct imessaging_context *msg;
510 bool ok;
511 int ret;
512 const char *lock_dir = NULL;
513 int tdb_flags = TDB_INCOMPATIBLE_HASH | TDB_CLEAR_IF_FIRST;
515 if (ev == NULL) {
516 return NULL;
519 msg = talloc_zero(mem_ctx, struct imessaging_context);
520 if (msg == NULL) {
521 return NULL;
523 msg->ev = ev;
524 msg->discard_incoming = discard_incoming;
525 if (msg->discard_incoming) {
526 msg->num_incoming_listeners = 0;
527 } else {
528 msg->num_incoming_listeners = 1;
531 talloc_set_destructor(msg, imessaging_context_destructor);
533 /* create the messaging directory if needed */
535 lock_dir = lpcfg_lock_directory(lp_ctx);
536 if (lock_dir == NULL) {
537 goto fail;
540 msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "msg.sock");
541 if (msg->sock_dir == NULL) {
542 goto fail;
544 ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700);
545 if (!ok) {
546 goto fail;
549 msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg.lock");
550 if (msg->lock_dir == NULL) {
551 goto fail;
553 ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755);
554 if (!ok) {
555 goto fail;
558 msg->msg_dgm_ref = messaging_dgm_ref(
559 msg, ev, &server_id.unique_id, msg->sock_dir, msg->lock_dir,
560 imessaging_dgm_recv, msg, &ret);
562 if (msg->msg_dgm_ref == NULL) {
563 goto fail;
566 msg->server_id = server_id;
567 msg->idr = idr_init(msg);
568 if (msg->idr == NULL) {
569 goto fail;
572 msg->dispatch_tree = idr_init(msg);
573 if (msg->dispatch_tree == NULL) {
574 goto fail;
577 msg->start_time = timeval_current();
579 tdb_flags |= lpcfg_tdb_flags(lp_ctx, 0);
582 * This context holds a destructor that cleans up any names
583 * registered on this context on talloc_free()
585 msg->names = server_id_db_init(msg, server_id, lock_dir, 0, tdb_flags);
586 if (msg->names == NULL) {
587 goto fail;
590 status = imessaging_register(msg, NULL, MSG_PING, ping_message);
591 if (!NT_STATUS_IS_OK(status)) {
592 goto fail;
594 status = imessaging_register(msg, NULL, MSG_REQ_POOL_USAGE,
595 pool_message);
596 if (!NT_STATUS_IS_OK(status)) {
597 goto fail;
599 status = imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
600 if (!NT_STATUS_IS_OK(status)) {
601 goto fail;
603 status = imessaging_register(msg, NULL, MSG_REQ_RINGBUF_LOG,
604 ringbuf_log_msg);
605 if (!NT_STATUS_IS_OK(status)) {
606 goto fail;
608 status = imessaging_register(msg, NULL, MSG_DEBUG,
609 debug_imessage);
610 if (!NT_STATUS_IS_OK(status)) {
611 goto fail;
613 status = imessaging_register(msg, NULL, MSG_REQ_DEBUGLEVEL,
614 debuglevel_imessage);
615 if (!NT_STATUS_IS_OK(status)) {
616 goto fail;
618 status = IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
619 if (!NT_STATUS_IS_OK(status)) {
620 goto fail;
622 #if defined(DEVELOPER) || defined(ENABLE_SELFTEST)
624 * Register handlers for messages specific to developer and
625 * self test builds
627 status = imessaging_register_extra_handlers(msg);
628 if (!NT_STATUS_IS_OK(status)) {
629 goto fail;
631 #endif /* defined(DEVELOPER) || defined(ENABLE_SELFTEST) */
633 DLIST_ADD(msg_ctxs, msg);
635 return msg;
636 fail:
637 talloc_free(msg);
638 return NULL;
642 create the listening socket and setup the dispatcher
644 struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
645 struct loadparm_context *lp_ctx,
646 struct server_id server_id,
647 struct tevent_context *ev)
649 bool discard_incoming = false;
650 return imessaging_init_internal(mem_ctx,
651 discard_incoming,
652 lp_ctx,
653 server_id,
654 ev);
657 struct imessaging_context *imessaging_init_discard_incoming(
658 TALLOC_CTX *mem_ctx,
659 struct loadparm_context *lp_ctx,
660 struct server_id server_id,
661 struct tevent_context *ev)
663 bool discard_incoming = true;
664 return imessaging_init_internal(mem_ctx,
665 discard_incoming,
666 lp_ctx,
667 server_id,
668 ev);
671 struct imessaging_post_state {
672 struct imessaging_context *msg_ctx;
673 struct imessaging_post_state **busy_ref;
674 size_t buf_len;
675 uint8_t buf[];
678 static int imessaging_post_state_destructor(struct imessaging_post_state *state)
680 if (state->busy_ref != NULL) {
681 *state->busy_ref = NULL;
682 state->busy_ref = NULL;
684 return 0;
687 static void imessaging_post_handler(struct tevent_context *ev,
688 struct tevent_immediate *ti,
689 void *private_data)
691 struct imessaging_post_state *state = talloc_get_type_abort(
692 private_data, struct imessaging_post_state);
694 if (state == NULL) {
695 return;
699 * In usecases like using messaging_client_init() with irpc processing
700 * we may free the imessaging_context during the messaging handler.
701 * imessaging_post_state is a child of imessaging_context and
702 * might be implicitly free'ed before the explicit TALLOC_FREE(state).
704 * The busy_ref pointer makes sure the destructor clears
705 * the local 'state' variable.
708 SMB_ASSERT(state->busy_ref == NULL);
709 state->busy_ref = &state;
711 imessaging_dgm_recv(ev, state->buf, state->buf_len, NULL, 0,
712 state->msg_ctx);
714 state->busy_ref = NULL;
715 TALLOC_FREE(state);
718 static int imessaging_post_self(struct imessaging_context *msg,
719 const uint8_t *buf, size_t buf_len)
721 struct tevent_immediate *ti;
722 struct imessaging_post_state *state;
724 state = talloc_size(
725 msg, offsetof(struct imessaging_post_state, buf) + buf_len);
726 if (state == NULL) {
727 return ENOMEM;
729 talloc_set_name_const(state, "struct imessaging_post_state");
731 talloc_set_destructor(state, imessaging_post_state_destructor);
733 ti = tevent_create_immediate(state);
734 if (ti == NULL) {
735 TALLOC_FREE(state);
736 return ENOMEM;
739 state->msg_ctx = msg;
740 state->busy_ref = NULL;
741 state->buf_len = buf_len;
742 memcpy(state->buf, buf, buf_len);
744 tevent_schedule_immediate(ti, msg->ev, imessaging_post_handler,
745 state);
747 return 0;
750 static void imessaging_dgm_recv(struct tevent_context *ev,
751 const uint8_t *buf, size_t buf_len,
752 int *fds, size_t num_fds,
753 void *private_data)
755 struct imessaging_context *msg = talloc_get_type_abort(
756 private_data, struct imessaging_context);
757 uint32_t msg_type;
758 struct server_id src, dst;
759 struct server_id_buf srcbuf, dstbuf;
760 DATA_BLOB data;
762 if (buf_len < MESSAGE_HDR_LENGTH) {
763 /* Invalid message, ignore */
764 return;
767 if (msg->num_incoming_listeners == 0) {
768 struct server_id_buf selfbuf;
770 message_hdr_get(&msg_type, &src, &dst, buf);
772 DBG_DEBUG("not listening - discarding message from "
773 "src[%s] to dst[%s] (self[%s]) type=0x%x "
774 "on %s event context\n",
775 server_id_str_buf(src, &srcbuf),
776 server_id_str_buf(dst, &dstbuf),
777 server_id_str_buf(msg->server_id, &selfbuf),
778 (unsigned)msg_type,
779 (ev != msg->ev) ? "different" : "main");
780 return;
783 if (ev != msg->ev) {
784 int ret;
785 ret = imessaging_post_self(msg, buf, buf_len);
786 if (ret != 0) {
787 DBG_WARNING("imessaging_post_self failed: %s\n",
788 strerror(ret));
790 return;
793 message_hdr_get(&msg_type, &src, &dst, buf);
795 data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
796 data.length = buf_len - MESSAGE_HDR_LENGTH;
798 if ((cluster_id_equal(&dst, &msg->server_id)) ||
799 ((dst.task_id == 0) && (msg->server_id.pid == 0))) {
800 struct dispatch_fn *d, *next;
802 DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n",
803 __func__,
804 server_id_str_buf(dst, &dstbuf),
805 server_id_str_buf(msg->server_id, &srcbuf),
806 (unsigned)msg_type));
808 d = imessaging_find_dispatch(msg, msg_type);
810 for (; d; d = next) {
811 next = d->next;
812 d->fn(msg,
813 d->private_data,
814 d->msg_type,
815 src,
816 num_fds,
817 fds,
818 &data);
820 } else {
821 DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n",
822 __func__, (unsigned)msg_type,
823 server_id_str_buf(dst, &dstbuf),
824 server_id_str_buf(msg->server_id, &srcbuf)));
829 A hack, for the short term until we get 'client only' messaging in place
831 struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
832 struct loadparm_context *lp_ctx,
833 struct tevent_context *ev)
835 struct server_id id = {
836 .pid = getpid(),
837 .task_id = generate_random(),
838 .vnn = NONCLUSTER_VNN,
840 /* This is because we are not in the s3 serverid database */
841 .unique_id = SERVERID_UNIQUE_ID_NOT_TO_VERIFY,
844 return imessaging_init_discard_incoming(mem_ctx, lp_ctx, id, ev);
848 a list of registered irpc server functions
850 struct irpc_list {
851 struct irpc_list *next, *prev;
852 struct GUID uuid;
853 const struct ndr_interface_table *table;
854 int callnum;
855 irpc_function_t fn;
856 void *private_data;
861 register a irpc server function
863 NTSTATUS irpc_register(struct imessaging_context *msg_ctx,
864 const struct ndr_interface_table *table,
865 int callnum, irpc_function_t fn, void *private_data)
867 struct irpc_list *irpc;
869 /* override an existing handler, if any */
870 for (irpc=msg_ctx->irpc; irpc; irpc=irpc->next) {
871 if (irpc->table == table && irpc->callnum == callnum) {
872 break;
875 if (irpc == NULL) {
876 irpc = talloc(msg_ctx, struct irpc_list);
877 NT_STATUS_HAVE_NO_MEMORY(irpc);
878 DLIST_ADD(msg_ctx->irpc, irpc);
881 irpc->table = table;
882 irpc->callnum = callnum;
883 irpc->fn = fn;
884 irpc->private_data = private_data;
885 irpc->uuid = irpc->table->syntax_id.uuid;
887 return NT_STATUS_OK;
892 handle an incoming irpc reply message
894 static void irpc_handler_reply(struct imessaging_context *msg_ctx, struct irpc_message *m)
896 struct irpc_request *irpc;
898 irpc = (struct irpc_request *)idr_find(msg_ctx->idr, m->header.callid);
899 if (irpc == NULL) return;
901 irpc->incoming.handler(irpc, m);
905 send a irpc reply
907 NTSTATUS irpc_send_reply(struct irpc_message *m, NTSTATUS status)
909 struct ndr_push *push;
910 DATA_BLOB packet;
911 enum ndr_err_code ndr_err;
913 m->header.status = status;
915 /* setup the reply */
916 push = ndr_push_init_ctx(m->ndr);
917 if (push == NULL) {
918 status = NT_STATUS_NO_MEMORY;
919 goto failed;
922 m->header.flags |= IRPC_FLAG_REPLY;
923 m->header.creds.token= NULL;
925 /* construct the packet */
926 ndr_err = ndr_push_irpc_header(push, NDR_SCALARS|NDR_BUFFERS, &m->header);
927 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
928 status = ndr_map_error2ntstatus(ndr_err);
929 goto failed;
932 ndr_err = m->irpc->table->calls[m->irpc->callnum].ndr_push(push, NDR_OUT, m->data);
933 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
934 status = ndr_map_error2ntstatus(ndr_err);
935 goto failed;
938 /* send the reply message */
939 packet = ndr_push_blob(push);
940 status = imessaging_send(m->msg_ctx, m->from, MSG_IRPC, &packet);
941 if (!NT_STATUS_IS_OK(status)) goto failed;
943 failed:
944 talloc_free(m);
945 return status;
949 handle an incoming irpc request message
951 static void irpc_handler_request(struct imessaging_context *msg_ctx,
952 struct irpc_message *m)
954 struct irpc_list *i;
955 void *r;
956 enum ndr_err_code ndr_err;
958 for (i=msg_ctx->irpc; i; i=i->next) {
959 if (GUID_equal(&i->uuid, &m->header.uuid) &&
960 i->table->syntax_id.if_version == m->header.if_version &&
961 i->callnum == m->header.callnum) {
962 break;
966 if (i == NULL) {
967 /* no registered handler for this message */
968 talloc_free(m);
969 return;
972 /* allocate space for the structure */
973 r = talloc_zero_size(m->ndr, i->table->calls[m->header.callnum].struct_size);
974 if (r == NULL) goto failed;
976 m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
978 /* parse the request data */
979 ndr_err = i->table->calls[i->callnum].ndr_pull(m->ndr, NDR_IN, r);
980 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
982 /* make the call */
983 m->private_data= i->private_data;
984 m->defer_reply = false;
985 m->no_reply = false;
986 m->msg_ctx = msg_ctx;
987 m->irpc = i;
988 m->data = r;
990 m->header.status = i->fn(m, r);
992 if (m->no_reply) {
993 /* the server function won't ever be replying to this request */
994 talloc_free(m);
995 return;
998 if (m->defer_reply) {
999 /* the server function has asked to defer the reply to later */
1000 talloc_steal(msg_ctx, m);
1001 return;
1004 irpc_send_reply(m, m->header.status);
1005 return;
1007 failed:
1008 talloc_free(m);
1012 handle an incoming irpc message
1014 static void irpc_handler(struct imessaging_context *msg_ctx,
1015 void *private_data,
1016 uint32_t msg_type,
1017 struct server_id src,
1018 size_t num_fds,
1019 int *fds,
1020 DATA_BLOB *packet)
1022 struct irpc_message *m;
1023 enum ndr_err_code ndr_err;
1025 if (num_fds != 0) {
1026 DBG_WARNING("Received %zu fds, ignoring message\n", num_fds);
1027 return;
1030 m = talloc(msg_ctx, struct irpc_message);
1031 if (m == NULL) goto failed;
1033 m->from = src;
1035 m->ndr = ndr_pull_init_blob(packet, m);
1036 if (m->ndr == NULL) goto failed;
1038 m->ndr->flags |= LIBNDR_FLAG_REF_ALLOC;
1040 ndr_err = ndr_pull_irpc_header(m->ndr, NDR_BUFFERS|NDR_SCALARS, &m->header);
1041 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) goto failed;
1043 if (m->header.flags & IRPC_FLAG_REPLY) {
1044 irpc_handler_reply(msg_ctx, m);
1045 } else {
1046 irpc_handler_request(msg_ctx, m);
1048 return;
1050 failed:
1051 talloc_free(m);
1056 destroy a irpc request
1058 static int irpc_destructor(struct irpc_request *irpc)
1060 if (irpc->callid != -1) {
1061 DLIST_REMOVE(irpc->msg_ctx->requests, irpc);
1062 idr_remove(irpc->msg_ctx->idr, irpc->callid);
1063 if (irpc->msg_ctx->discard_incoming) {
1064 SMB_ASSERT(irpc->msg_ctx->num_incoming_listeners > 0);
1065 } else {
1066 SMB_ASSERT(irpc->msg_ctx->num_incoming_listeners > 1);
1068 irpc->msg_ctx->num_incoming_listeners -= 1;
1069 irpc->callid = -1;
1072 return 0;
1076 add a string name that this irpc server can be called on
1078 It will be removed from the DB either via irpc_remove_name or on
1079 talloc_free(msg_ctx->names).
1081 NTSTATUS irpc_add_name(struct imessaging_context *msg_ctx, const char *name)
1083 int ret;
1085 ret = server_id_db_add(msg_ctx->names, name);
1086 if (ret != 0) {
1087 return map_nt_error_from_unix_common(ret);
1089 return NT_STATUS_OK;
1092 static int all_servers_func(const char *name, unsigned num_servers,
1093 const struct server_id *servers,
1094 void *private_data)
1096 struct irpc_name_records *name_records = talloc_get_type(
1097 private_data, struct irpc_name_records);
1098 struct irpc_name_record *name_record;
1099 uint32_t i;
1101 name_records->names
1102 = talloc_realloc(name_records, name_records->names,
1103 struct irpc_name_record *, name_records->num_records+1);
1104 if (!name_records->names) {
1105 return -1;
1108 name_records->names[name_records->num_records] = name_record
1109 = talloc(name_records->names,
1110 struct irpc_name_record);
1111 if (!name_record) {
1112 return -1;
1115 name_records->num_records++;
1117 name_record->name = talloc_strdup(name_record, name);
1118 if (!name_record->name) {
1119 return -1;
1122 name_record->count = num_servers;
1123 name_record->ids = talloc_array(name_record, struct server_id,
1124 num_servers);
1125 if (name_record->ids == NULL) {
1126 return -1;
1128 for (i=0;i<name_record->count;i++) {
1129 name_record->ids[i] = servers[i];
1131 return 0;
1135 return a list of server ids for a server name
1137 struct irpc_name_records *irpc_all_servers(struct imessaging_context *msg_ctx,
1138 TALLOC_CTX *mem_ctx)
1140 int ret;
1141 struct irpc_name_records *name_records = talloc_zero(mem_ctx, struct irpc_name_records);
1142 if (name_records == NULL) {
1143 return NULL;
1146 ret = server_id_db_traverse_read(msg_ctx->names, all_servers_func,
1147 name_records);
1148 if (ret == -1) {
1149 TALLOC_FREE(name_records);
1150 return NULL;
1153 return name_records;
1157 remove a name from a messaging context
1159 void irpc_remove_name(struct imessaging_context *msg_ctx, const char *name)
1161 server_id_db_remove(msg_ctx->names, name);
1164 struct server_id imessaging_get_server_id(struct imessaging_context *msg_ctx)
1166 return msg_ctx->server_id;
1169 struct irpc_bh_state {
1170 struct imessaging_context *msg_ctx;
1171 const struct dcerpc_binding *binding;
1172 struct server_id server_id;
1173 const struct ndr_interface_table *table;
1174 uint32_t timeout;
1175 struct security_token *token;
1178 static const struct dcerpc_binding *irpc_bh_get_binding(struct dcerpc_binding_handle *h)
1180 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1181 struct irpc_bh_state);
1183 return hs->binding;
1186 static bool irpc_bh_is_connected(struct dcerpc_binding_handle *h)
1188 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1189 struct irpc_bh_state);
1191 if (!hs->msg_ctx) {
1192 return false;
1195 return true;
1198 static uint32_t irpc_bh_set_timeout(struct dcerpc_binding_handle *h,
1199 uint32_t timeout)
1201 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1202 struct irpc_bh_state);
1203 uint32_t old = hs->timeout;
1205 hs->timeout = timeout;
1207 return old;
1210 struct irpc_bh_raw_call_state {
1211 struct irpc_request *irpc;
1212 uint32_t opnum;
1213 DATA_BLOB in_data;
1214 DATA_BLOB in_packet;
1215 DATA_BLOB out_data;
1218 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1219 struct irpc_message *m);
1221 static struct tevent_req *irpc_bh_raw_call_send(TALLOC_CTX *mem_ctx,
1222 struct tevent_context *ev,
1223 struct dcerpc_binding_handle *h,
1224 const struct GUID *object,
1225 uint32_t opnum,
1226 uint32_t in_flags,
1227 const uint8_t *in_data,
1228 size_t in_length)
1230 struct irpc_bh_state *hs =
1231 dcerpc_binding_handle_data(h,
1232 struct irpc_bh_state);
1233 struct tevent_req *req;
1234 struct irpc_bh_raw_call_state *state;
1235 bool ok;
1236 struct irpc_header header;
1237 struct ndr_push *ndr;
1238 NTSTATUS status;
1239 enum ndr_err_code ndr_err;
1241 req = tevent_req_create(mem_ctx, &state,
1242 struct irpc_bh_raw_call_state);
1243 if (req == NULL) {
1244 return NULL;
1246 state->opnum = opnum;
1247 state->in_data.data = discard_const_p(uint8_t, in_data);
1248 state->in_data.length = in_length;
1250 ok = irpc_bh_is_connected(h);
1251 if (!ok) {
1252 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
1253 return tevent_req_post(req, ev);
1256 state->irpc = talloc_zero(state, struct irpc_request);
1257 if (tevent_req_nomem(state->irpc, req)) {
1258 return tevent_req_post(req, ev);
1261 state->irpc->msg_ctx = hs->msg_ctx;
1262 state->irpc->callid = idr_get_new(hs->msg_ctx->idr,
1263 state->irpc, UINT16_MAX);
1264 if (state->irpc->callid == -1) {
1265 tevent_req_nterror(req, NT_STATUS_INSUFFICIENT_RESOURCES);
1266 return tevent_req_post(req, ev);
1268 state->irpc->incoming.handler = irpc_bh_raw_call_incoming_handler;
1269 state->irpc->incoming.private_data = req;
1271 /* make sure we accept incoming messages */
1272 SMB_ASSERT(state->irpc->msg_ctx->num_incoming_listeners < UINT64_MAX);
1273 state->irpc->msg_ctx->num_incoming_listeners += 1;
1274 DLIST_ADD_END(state->irpc->msg_ctx->requests, state->irpc);
1275 talloc_set_destructor(state->irpc, irpc_destructor);
1277 /* setup the header */
1278 header.uuid = hs->table->syntax_id.uuid;
1280 header.if_version = hs->table->syntax_id.if_version;
1281 header.callid = state->irpc->callid;
1282 header.callnum = state->opnum;
1283 header.flags = 0;
1284 header.status = NT_STATUS_OK;
1285 header.creds.token= hs->token;
1287 /* construct the irpc packet */
1288 ndr = ndr_push_init_ctx(state->irpc);
1289 if (tevent_req_nomem(ndr, req)) {
1290 return tevent_req_post(req, ev);
1293 ndr_err = ndr_push_irpc_header(ndr, NDR_SCALARS|NDR_BUFFERS, &header);
1294 status = ndr_map_error2ntstatus(ndr_err);
1295 if (!NT_STATUS_IS_OK(status)) {
1296 tevent_req_nterror(req, status);
1297 return tevent_req_post(req, ev);
1300 ndr_err = ndr_push_bytes(ndr, in_data, in_length);
1301 status = ndr_map_error2ntstatus(ndr_err);
1302 if (!NT_STATUS_IS_OK(status)) {
1303 tevent_req_nterror(req, status);
1304 return tevent_req_post(req, ev);
1307 /* and send it */
1308 state->in_packet = ndr_push_blob(ndr);
1309 status = imessaging_send(hs->msg_ctx, hs->server_id,
1310 MSG_IRPC, &state->in_packet);
1311 if (!NT_STATUS_IS_OK(status)) {
1312 tevent_req_nterror(req, status);
1313 return tevent_req_post(req, ev);
1316 if (hs->timeout != IRPC_CALL_TIMEOUT_INF) {
1317 /* set timeout-callback in case caller wants that */
1318 ok = tevent_req_set_endtime(req, ev, timeval_current_ofs(hs->timeout, 0));
1319 if (!ok) {
1320 return tevent_req_post(req, ev);
1324 return req;
1327 static void irpc_bh_raw_call_incoming_handler(struct irpc_request *irpc,
1328 struct irpc_message *m)
1330 struct tevent_req *req =
1331 talloc_get_type_abort(irpc->incoming.private_data,
1332 struct tevent_req);
1333 struct irpc_bh_raw_call_state *state =
1334 tevent_req_data(req,
1335 struct irpc_bh_raw_call_state);
1337 talloc_steal(state, m);
1339 if (!NT_STATUS_IS_OK(m->header.status)) {
1340 tevent_req_nterror(req, m->header.status);
1341 return;
1344 state->out_data = data_blob_talloc(state,
1345 m->ndr->data + m->ndr->offset,
1346 m->ndr->data_size - m->ndr->offset);
1347 if ((m->ndr->data_size - m->ndr->offset) > 0 && !state->out_data.data) {
1348 tevent_req_oom(req);
1349 return;
1352 tevent_req_done(req);
1355 static NTSTATUS irpc_bh_raw_call_recv(struct tevent_req *req,
1356 TALLOC_CTX *mem_ctx,
1357 uint8_t **out_data,
1358 size_t *out_length,
1359 uint32_t *out_flags)
1361 struct irpc_bh_raw_call_state *state =
1362 tevent_req_data(req,
1363 struct irpc_bh_raw_call_state);
1364 NTSTATUS status;
1366 if (tevent_req_is_nterror(req, &status)) {
1367 tevent_req_received(req);
1368 return status;
1371 *out_data = talloc_move(mem_ctx, &state->out_data.data);
1372 *out_length = state->out_data.length;
1373 *out_flags = 0;
1374 tevent_req_received(req);
1375 return NT_STATUS_OK;
1378 struct irpc_bh_disconnect_state {
1379 uint8_t _dummy;
1382 static struct tevent_req *irpc_bh_disconnect_send(TALLOC_CTX *mem_ctx,
1383 struct tevent_context *ev,
1384 struct dcerpc_binding_handle *h)
1386 struct irpc_bh_state *hs = dcerpc_binding_handle_data(h,
1387 struct irpc_bh_state);
1388 struct tevent_req *req;
1389 struct irpc_bh_disconnect_state *state;
1390 bool ok;
1392 req = tevent_req_create(mem_ctx, &state,
1393 struct irpc_bh_disconnect_state);
1394 if (req == NULL) {
1395 return NULL;
1398 ok = irpc_bh_is_connected(h);
1399 if (!ok) {
1400 tevent_req_nterror(req, NT_STATUS_CONNECTION_DISCONNECTED);
1401 return tevent_req_post(req, ev);
1404 hs->msg_ctx = NULL;
1406 tevent_req_done(req);
1407 return tevent_req_post(req, ev);
1410 static NTSTATUS irpc_bh_disconnect_recv(struct tevent_req *req)
1412 NTSTATUS status;
1414 if (tevent_req_is_nterror(req, &status)) {
1415 tevent_req_received(req);
1416 return status;
1419 tevent_req_received(req);
1420 return NT_STATUS_OK;
1423 static bool irpc_bh_ref_alloc(struct dcerpc_binding_handle *h)
1425 return true;
1428 static void irpc_bh_do_ndr_print(struct dcerpc_binding_handle *h,
1429 ndr_flags_type ndr_flags,
1430 const void *_struct_ptr,
1431 const struct ndr_interface_call *call)
1433 void *struct_ptr = discard_const(_struct_ptr);
1434 bool print_in = false;
1435 bool print_out = false;
1437 if (DEBUGLEVEL >= 11) {
1438 print_in = true;
1439 print_out = true;
1442 if (ndr_flags & NDR_IN) {
1443 if (print_in) {
1444 ndr_print_function_debug(call->ndr_print,
1445 call->name,
1446 ndr_flags,
1447 struct_ptr);
1450 if (ndr_flags & NDR_OUT) {
1451 if (print_out) {
1452 ndr_print_function_debug(call->ndr_print,
1453 call->name,
1454 ndr_flags,
1455 struct_ptr);
1460 static const struct dcerpc_binding_handle_ops irpc_bh_ops = {
1461 .name = "wbint",
1462 .get_binding = irpc_bh_get_binding,
1463 .is_connected = irpc_bh_is_connected,
1464 .set_timeout = irpc_bh_set_timeout,
1465 .raw_call_send = irpc_bh_raw_call_send,
1466 .raw_call_recv = irpc_bh_raw_call_recv,
1467 .disconnect_send = irpc_bh_disconnect_send,
1468 .disconnect_recv = irpc_bh_disconnect_recv,
1470 .ref_alloc = irpc_bh_ref_alloc,
1471 .do_ndr_print = irpc_bh_do_ndr_print,
1474 /* initialise a irpc binding handle */
1475 struct dcerpc_binding_handle *irpc_binding_handle(TALLOC_CTX *mem_ctx,
1476 struct imessaging_context *msg_ctx,
1477 struct server_id server_id,
1478 const struct ndr_interface_table *table)
1480 struct dcerpc_binding_handle *h = NULL;
1481 struct irpc_bh_state *hs = NULL;
1482 struct dcerpc_binding *b = NULL;
1483 NTSTATUS status;
1485 h = dcerpc_binding_handle_create(mem_ctx,
1486 &irpc_bh_ops,
1487 NULL,
1488 table,
1489 &hs,
1490 struct irpc_bh_state,
1491 __location__);
1492 if (h == NULL) {
1493 return NULL;
1495 hs->msg_ctx = msg_ctx;
1496 hs->server_id = server_id;
1497 hs->table = table;
1498 hs->timeout = IRPC_CALL_TIMEOUT;
1500 status = dcerpc_parse_binding(hs, "", &b);
1501 if (!NT_STATUS_IS_OK(status)) {
1502 TALLOC_FREE(h);
1503 return NULL;
1505 status = dcerpc_binding_set_transport(b, NCACN_INTERNAL);
1506 if (!NT_STATUS_IS_OK(status)) {
1507 TALLOC_FREE(h);
1508 return NULL;
1510 status = dcerpc_binding_set_string_option(b, "host", "localhost");
1511 if (!NT_STATUS_IS_OK(status)) {
1512 TALLOC_FREE(h);
1513 return NULL;
1515 status = dcerpc_binding_set_string_option(b, "endpoint", "irpc");
1516 if (!NT_STATUS_IS_OK(status)) {
1517 TALLOC_FREE(h);
1518 return NULL;
1520 status = dcerpc_binding_set_abstract_syntax(b, &table->syntax_id);
1521 if (!NT_STATUS_IS_OK(status)) {
1522 TALLOC_FREE(h);
1523 return NULL;
1526 hs->binding = b;
1528 return h;
1531 struct dcerpc_binding_handle *irpc_binding_handle_by_name(TALLOC_CTX *mem_ctx,
1532 struct imessaging_context *msg_ctx,
1533 const char *dest_task,
1534 const struct ndr_interface_table *table)
1536 struct dcerpc_binding_handle *h;
1537 unsigned num_sids;
1538 struct server_id *sids;
1539 struct server_id sid;
1540 NTSTATUS status;
1542 /* find the server task */
1544 status = irpc_servers_byname(msg_ctx, mem_ctx, dest_task,
1545 &num_sids, &sids);
1546 if (!NT_STATUS_IS_OK(status)) {
1547 errno = EADDRNOTAVAIL;
1548 return NULL;
1550 sid = sids[0];
1551 talloc_free(sids);
1553 h = irpc_binding_handle(mem_ctx, msg_ctx,
1554 sid, table);
1555 if (h == NULL) {
1556 return NULL;
1559 return h;
1562 void irpc_binding_handle_add_security_token(struct dcerpc_binding_handle *h,
1563 struct security_token *token)
1565 struct irpc_bh_state *hs =
1566 dcerpc_binding_handle_data(h,
1567 struct irpc_bh_state);
1569 hs->token = token;