ctdb-scripts: Move connection tracking to 10.interface
[samba4-gss.git] / source3 / smbd / notifyd / notifyd.c
blob0b07ab3e4354efb5a8a2817f891f97970e9e36ff
1 /*
2 * Unix SMB/CIFS implementation.
4 * Copyright (C) Volker Lendecke 2014
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "replace.h"
21 #include <tevent.h>
22 #include "notifyd_private.h"
23 #include "lib/util/server_id.h"
24 #include "lib/util/data_blob.h"
25 #include "librpc/gen_ndr/notify.h"
26 #include "librpc/gen_ndr/messaging.h"
27 #include "librpc/gen_ndr/server_id.h"
28 #include "lib/dbwrap/dbwrap.h"
29 #include "lib/dbwrap/dbwrap_rbt.h"
30 #include "messages.h"
31 #include "tdb.h"
32 #include "util_tdb.h"
33 #include "notifyd.h"
34 #include "lib/util/server_id_db.h"
35 #include "lib/util/tevent_unix.h"
36 #include "lib/util/tevent_ntstatus.h"
37 #include "ctdbd_conn.h"
38 #include "ctdb_srvids.h"
39 #include "server_id_db_util.h"
40 #include "lib/util/iov_buf.h"
41 #include "messages_util.h"
43 #ifdef CLUSTER_SUPPORT
44 #include "ctdb_protocol.h"
45 #endif
struct notifyd_peer;

/*
 * All of notifyd's state
 */

struct notifyd_state {
	/* Event context this daemon runs on */
	struct tevent_context *ev;

	/* Messaging context used to receive and send notify messages */
	struct messaging_context *msg_ctx;

	/* NULL when not running in a cluster */
	struct ctdbd_connection *ctdbd_conn;

	/*
	 * Database of everything clients show interest in. Indexed by
	 * absolute path. The database keys are not 0-terminated
	 * to allow the critical operation, notifyd_trigger, to walk
	 * the structure from the top without adding intermediate 0s.
	 * The database records contain an array of
	 *
	 * struct notifyd_instance
	 *
	 * to be maintained and parsed by notifyd_parse_entry()
	 */
	struct db_context *entries;

	/*
	 * In the cluster case, this is the place where we store a log
	 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
	 * forward them to our peer notifyd's in the cluster once a
	 * second or when the log grows too large.
	 */
	struct messaging_reclog *log;

	/*
	 * Array of companion notifyd's in a cluster. Every notifyd
	 * broadcasts its messaging_reclog to every other notifyd in
	 * the cluster. This is done by making ctdb send a message to
	 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
	 * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
	 * had called register_with_ctdbd this srvid will receive the
	 * broadcasts.
	 *
	 * Database replication happens via these broadcasts. Also,
	 * they serve as liveness indication. If a notifyd receives a
	 * broadcast from an unknown peer, it will create one for this
	 * srvid. Also when we don't hear anything from a peer for a
	 * while, we will discard it.
	 */
	struct notifyd_peer **peers;
	size_t num_peers;

	/* Kernel change-notify backend (or the dummy fallback) */
	sys_notify_watch_fn sys_notify_watch;
	struct sys_notify_context *sys_notify_ctx;
};
/*
 * One companion notifyd in the cluster, created/refreshed from the
 * CTDB_SRVID_SAMBA_NOTIFY_PROXY broadcasts.
 */
struct notifyd_peer {
	/* Backlink to the owning notifyd */
	struct notifyd_state *state;

	/* Server id of the remote notifyd */
	struct server_id pid;

	/* Last reclog index we have applied from this peer */
	uint64_t rec_index;

	/* Replicated copy of the peer's entries database; NULL until
	 * we received a full db from it */
	struct db_context *db;

	/* Liveness: last time we heard a broadcast from this peer */
	time_t last_broadcast;
};
/*
 * Message handlers and async engines, defined below.
 */
static void notifyd_rec_change(struct messaging_context *msg_ctx,
			       void *private_data, uint32_t msg_type,
			       struct server_id src, DATA_BLOB *data);
static void notifyd_trigger(struct messaging_context *msg_ctx,
			    void *private_data, uint32_t msg_type,
			    struct server_id src, DATA_BLOB *data);
static void notifyd_get_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data);

#ifdef CLUSTER_SUPPORT
static void notifyd_got_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data);
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
				     struct server_id src,
				     struct messaging_reclog *log);
#endif
static void notifyd_sys_callback(struct sys_notify_context *ctx,
				 void *private_data, struct notify_event *ev,
				 uint32_t filter);

#ifdef CLUSTER_SUPPORT
static struct tevent_req *notifyd_broadcast_reclog_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct ctdbd_connection *ctdbd_conn, struct server_id src,
	struct messaging_reclog *log);
static int notifyd_broadcast_reclog_recv(struct tevent_req *req);

static struct tevent_req *notifyd_clean_peers_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct notifyd_state *notifyd);
static int notifyd_clean_peers_recv(struct tevent_req *req);
#endif
146 static int sys_notify_watch_dummy(
147 TALLOC_CTX *mem_ctx,
148 struct sys_notify_context *ctx,
149 const char *path,
150 uint32_t *filter,
151 uint32_t *subdir_filter,
152 void (*callback)(struct sys_notify_context *ctx,
153 void *private_data,
154 struct notify_event *ev,
155 uint32_t filter),
156 void *private_data,
157 void *handle_p)
159 void **handle = handle_p;
160 *handle = NULL;
161 return 0;
#ifdef CLUSTER_SUPPORT
/* Cluster-only helpers, defined below */
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
static int notifyd_snoop_broadcast(struct tevent_context *ev,
				   uint32_t src_vnn, uint32_t dst_vnn,
				   uint64_t dst_srvid,
				   const uint8_t *msg, size_t msglen,
				   void *private_data);
#endif
/*
 * Start the notify daemon: create its state, register the
 * MSG_SMB_NOTIFY_{REC_CHANGE,TRIGGER,GET_DB} handlers, claim the
 * "notify-daemon" name exclusively and, when running in a cluster,
 * start the reclog broadcast and peer cleanup engines and register
 * for CTDB_SRVID_SAMBA_NOTIFY_PROXY broadcasts.
 *
 * Returns a tevent request; its error path is triggered when one of
 * the long-running cluster engines fails (see the *_finished
 * callbacks below).  On setup failure all handlers registered so far
 * are deregistered again via the goto-cleanup chain at the bottom.
 */
struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
				struct messaging_context *msg_ctx,
				struct ctdbd_connection *ctdbd_conn,
				sys_notify_watch_fn sys_notify_watch,
				struct sys_notify_context *sys_notify_ctx)
{
	struct tevent_req *req;
#ifdef CLUSTER_SUPPORT
	struct tevent_req *subreq;
#endif
	struct notifyd_state *state;
	struct server_id_db *names_db;
	NTSTATUS status;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->msg_ctx = msg_ctx;
	state->ctdbd_conn = ctdbd_conn;

	if (sys_notify_watch == NULL) {
		/* No kernel backend available, fall back to the no-op */
		sys_notify_watch = sys_notify_watch_dummy;
	}
	state->sys_notify_watch = sys_notify_watch;
	state->sys_notify_ctx = sys_notify_ctx;

	state->entries = db_open_rbt(state);
	if (tevent_req_nomem(state->entries, req)) {
		return tevent_req_post(req, ev);
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
				    notifyd_rec_change);
	if (tevent_req_nterror(req, status)) {
		return tevent_req_post(req, ev);
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
				    notifyd_trigger);
	if (tevent_req_nterror(req, status)) {
		goto deregister_rec_change;
	}

	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
				    notifyd_get_db);
	if (tevent_req_nterror(req, status)) {
		goto deregister_trigger;
	}

	names_db = messaging_names_db(msg_ctx);

	/* Make sure we are the only notify daemon around */
	ret = server_id_db_set_exclusive(names_db, "notify-daemon");
	if (ret != 0) {
		DBG_DEBUG("server_id_db_set_exclusive() failed: %s\n",
			  strerror(ret));
		tevent_req_error(req, ret);
		goto deregister_get_db;
	}

	if (ctdbd_conn == NULL) {
		/*
		 * No cluster around, skip the database replication
		 * engine
		 */
		return req;
	}

#ifdef CLUSTER_SUPPORT
	status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
				    notifyd_got_db);
	if (tevent_req_nterror(req, status)) {
		goto deregister_get_db;
	}

	state->log = talloc_zero(state, struct messaging_reclog);
	if (tevent_req_nomem(state->log, req)) {
		goto deregister_db;
	}

	/* Periodically push our reclog to the other cluster nodes */
	subreq = notifyd_broadcast_reclog_send(
		state->log, ev, ctdbd_conn,
		messaging_server_id(msg_ctx),
		state->log);
	if (tevent_req_nomem(subreq, req)) {
		goto deregister_db;
	}
	tevent_req_set_callback(subreq,
				notifyd_broadcast_reclog_finished,
				req);

	/* Periodically drop peers we have not heard from */
	subreq = notifyd_clean_peers_send(state, ev, state);
	if (tevent_req_nomem(subreq, req)) {
		goto deregister_db;
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
				req);

	ret = register_with_ctdbd(ctdbd_conn,
				  CTDB_SRVID_SAMBA_NOTIFY_PROXY,
				  notifyd_snoop_broadcast, state);
	if (ret != 0) {
		tevent_req_error(req, ret);
		goto deregister_db;
	}
#endif

	return req;

#ifdef CLUSTER_SUPPORT
deregister_db:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
#endif
deregister_get_db:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
deregister_trigger:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
deregister_rec_change:
	messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
	return tevent_req_post(req, ev);
}
299 #ifdef CLUSTER_SUPPORT
/*
 * The reclog broadcast engine only ever returns on failure; propagate
 * that failure to the main notifyd request.
 */
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_broadcast_reclog_recv(subreq);
	TALLOC_FREE(subreq);
	tevent_req_error(req, ret);
}
/*
 * The peer cleanup engine only ever returns on failure; propagate
 * that failure to the main notifyd request.
 */
static void notifyd_clean_peers_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_clean_peers_recv(subreq);
	TALLOC_FREE(subreq);
	tevent_req_error(req, ret);
}
323 #endif
/*
 * Receive the result of notifyd_send(): 0 on success, a Unix errno
 * otherwise.
 */
int notifyd_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/*
 * Apply one record-change request to the entries database: add or
 * update the (client, private_data) instance for "path", or remove it
 * when both filter and subdir_filter are 0 (a delete request).  When
 * the union of all instance filters for the path changed, tear down
 * and (if any filter remains) re-register the kernel watcher.
 *
 * "path" must be 0-terminated and "pathlen" includes the terminator;
 * the database key is stored without the trailing 0.  Returns true on
 * success, false on parse/allocation/store failures.
 */
static bool notifyd_apply_rec_change(
	const struct server_id *client,
	const char *path, size_t pathlen,
	const struct notify_instance *chg,
	struct db_context *entries,
	sys_notify_watch_fn sys_notify_watch,
	struct sys_notify_context *sys_notify_ctx,
	struct messaging_context *msg_ctx)
{
	struct db_record *rec = NULL;
	struct notifyd_watcher watcher = {};
	struct notifyd_instance *instances = NULL;
	size_t num_instances;
	size_t i;
	struct notifyd_instance *instance = NULL;
	TDB_DATA value;
	NTSTATUS status;
	bool ok = false;
	bool new_watcher = false;

	if (pathlen == 0) {
		DBG_WARNING("pathlen==0\n");
		return false;
	}
	if (path[pathlen-1] != '\0') {
		DBG_WARNING("path not 0-terminated\n");
		return false;
	}

	DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", "
		  "private_data=%p\n",
		  path,
		  chg->filter,
		  chg->subdir_filter,
		  chg->private_data);

	/* Key excludes the trailing 0, see the comment on "entries" */
	rec = dbwrap_fetch_locked(
		entries, entries,
		make_tdb_data((const uint8_t *)path, pathlen-1));

	if (rec == NULL) {
		DBG_WARNING("dbwrap_fetch_locked failed\n");
		goto fail;
	}

	num_instances = 0;
	value = dbwrap_record_get_value(rec);

	if (value.dsize != 0) {
		/* First pass: existing watcher and instance count only */
		ok = notifyd_parse_entry(value.dptr,
					 value.dsize,
					 &watcher,
					 NULL,
					 &num_instances);
		if (!ok) {
			goto fail;
		}
	}

	/*
	 * Overallocate by one instance to avoid a realloc when adding
	 */
	instances = talloc_array(rec, struct notifyd_instance,
				 num_instances + 1);
	if (instances == NULL) {
		DBG_WARNING("talloc failed\n");
		goto fail;
	}

	if (num_instances > 0) {
		struct notifyd_instance *tmp = NULL;
		size_t num_tmp = 0;

		/* Second pass: copy the existing instances out of the
		 * record into our (overallocated) working array */
		ok = notifyd_parse_entry(value.dptr,
					 value.dsize,
					 NULL,
					 &tmp,
					 &num_tmp);
		if (!ok) {
			goto fail;
		}

		memcpy(instances,
		       tmp,
		       sizeof(struct notifyd_instance) * num_tmp);
	}

	/* Look for an existing instance of this client/private_data */
	for (i=0; i<num_instances; i++) {
		instance = &instances[i];

		if (server_id_equal(&instance->client, client) &&
		    (instance->instance.private_data == chg->private_data)) {
			break;
		}
	}

	if (i < num_instances) {
		instance->instance = *chg;
	} else {
		/*
		 * We've overallocated for one instance
		 */
		instance = &instances[num_instances];

		*instance = (struct notifyd_instance) {
			.client = *client,
			.instance = *chg,
		};

		num_instances += 1;
	}

	/*
	 * Calculate an intersection of the instances filters for the watcher.
	 */
	if (instance->instance.filter > 0) {
		uint32_t filter = instance->instance.filter;

		if ((watcher.filter & filter) != filter) {
			watcher.filter |= filter;

			new_watcher = true;
		}
	}

	/*
	 * Calculate an intersection of the instances subdir_filters for the
	 * watcher.
	 */
	if (instance->instance.subdir_filter > 0) {
		uint32_t subdir_filter = instance->instance.subdir_filter;

		if ((watcher.subdir_filter & subdir_filter) != subdir_filter) {
			watcher.subdir_filter |= subdir_filter;

			new_watcher = true;
		}
	}

	if ((instance->instance.filter == 0) &&
	    (instance->instance.subdir_filter == 0)) {
		uint32_t tmp_filter = 0;
		uint32_t tmp_subdir_filter = 0;

		/* This is a delete request */
		*instance = instances[num_instances-1];
		num_instances -= 1;

		/* Recompute the union of the remaining filters */
		for (i = 0; i < num_instances; i++) {
			struct notifyd_instance *tmp = &instances[i];

			tmp_filter |= tmp->instance.filter;
			tmp_subdir_filter |= tmp->instance.subdir_filter;
		}

		/*
		 * If the filter has changed, register a new watcher with the
		 * changed filter.
		 */
		if (watcher.filter != tmp_filter ||
		    watcher.subdir_filter != tmp_subdir_filter)
		{
			watcher.filter = tmp_filter;
			watcher.subdir_filter = tmp_subdir_filter;

			new_watcher = true;
		}
	}

	if (new_watcher) {
		/*
		 * In case we removed all notify instances, we want to remove
		 * the watcher. We won't register a new one, if no filters are
		 * set anymore.
		 */
		TALLOC_FREE(watcher.sys_watch);
		watcher.sys_filter = watcher.filter;
		watcher.sys_subdir_filter = watcher.subdir_filter;

		/*
		 * Only register a watcher if we have filter.
		 */
		if (watcher.filter != 0 || watcher.subdir_filter != 0) {
			int ret = sys_notify_watch(entries,
						   sys_notify_ctx,
						   path,
						   &watcher.sys_filter,
						   &watcher.sys_subdir_filter,
						   notifyd_sys_callback,
						   msg_ctx,
						   &watcher.sys_watch);
			if (ret != 0) {
				/* Not fatal: we keep the record, only the
				 * kernel-side watch is missing */
				DBG_WARNING("sys_notify_watch for [%s] "
					    "returned %s\n",
					    path,
					    strerror(errno));
			}
		}
	}

	DBG_DEBUG("%s has %zu instances\n", path, num_instances);

	if (num_instances == 0) {
		TALLOC_FREE(watcher.sys_watch);

		status = dbwrap_record_delete(rec);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("dbwrap_record_delete returned %s\n",
				    nt_errstr(status));
			goto fail;
		}
	} else {
		/* Record layout: watcher struct followed by the
		 * instance array, see notifyd_parse_entry() */
		struct TDB_DATA iov[2] = {
			{
				.dptr = (uint8_t *)&watcher,
				.dsize = sizeof(struct notifyd_watcher),
			},
			{
				.dptr = (uint8_t *)instances,
				.dsize = sizeof(struct notifyd_instance) *
					 num_instances,
			},
		};

		status = dbwrap_record_storev(rec, iov, ARRAY_SIZE(iov), 0);
		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("dbwrap_record_storev returned %s\n",
				    nt_errstr(status));
			goto fail;
		}
	}

	ok = true;
fail:
	TALLOC_FREE(rec);
	return ok;
}
/*
 * Callback fired by the kernel notify backend: convert the event into
 * a MSG_SMB_NOTIFY_TRIGGER message sent to ourselves, built as
 * header + dir + '/' + path (0-terminated).
 */
static void notifyd_sys_callback(struct sys_notify_context *ctx,
				 void *private_data, struct notify_event *ev,
				 uint32_t filter)
{
	struct messaging_context *msg_ctx = talloc_get_type_abort(
		private_data, struct messaging_context);
	struct notify_trigger_msg msg;
	struct iovec iov[4];
	char slash = '/';

	msg = (struct notify_trigger_msg) {
		.when = timespec_current(),
		.action = ev->action,
		.filter = filter,
	};

	iov[0].iov_base = &msg;
	iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
	iov[1].iov_base = discard_const_p(char, ev->dir);
	iov[1].iov_len = strlen(ev->dir);
	iov[2].iov_base = &slash;
	iov[2].iov_len = 1;
	iov[3].iov_base = discard_const_p(char, ev->path);
	iov[3].iov_len = strlen(ev->path)+1;

	/* Best effort, the send result is deliberately ignored */
	messaging_send_iov(
		msg_ctx, messaging_server_id(msg_ctx),
		MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
}
/*
 * Validate and unpack an incoming MSG_SMB_NOTIFY_REC_CHANGE buffer.
 * On success *pmsg points into "buf" (no copy is made) and *pathlen
 * is the number of bytes following the fixed header.  Returns false
 * for messages too short to contain a header plus one path byte.
 */
static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
				     struct notify_rec_change_msg **pmsg,
				     size_t *pathlen)
{
	struct notify_rec_change_msg *msg;

	if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
		DBG_WARNING("message too short, ignoring: %zu\n", bufsize);
		return false;
	}

	*pmsg = msg = (struct notify_rec_change_msg *)buf;
	*pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);

	DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", "
		  "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n",
		  msg->instance.filter,
		  msg->instance.subdir_filter,
		  msg->instance.private_data,
		  (int)(*pathlen),
		  msg->path);

	return true;
}
/*
 * Handler for MSG_SMB_NOTIFY_REC_CHANGE: apply the change to our
 * entries database and, in the cluster case, append the raw message
 * to the reclog so it gets replicated to the peer notifyds.  The log
 * is flushed early once it reaches 100 records.
 */
static void notifyd_rec_change(struct messaging_context *msg_ctx,
			       void *private_data, uint32_t msg_type,
			       struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id_buf idbuf;
	struct notify_rec_change_msg *msg;
	size_t pathlen;
	bool ok;
	struct notify_instance instance;

	DBG_DEBUG("Got %zu bytes from %s\n", data->length,
		  server_id_str_buf(src, &idbuf));

	ok = notifyd_parse_rec_change(data->data, data->length,
				      &msg, &pathlen);
	if (!ok) {
		return;
	}

	memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */

	ok = notifyd_apply_rec_change(
		&src, msg->path, pathlen, &instance,
		state->entries, state->sys_notify_watch, state->sys_notify_ctx,
		state->msg_ctx);
	if (!ok) {
		DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
		return;
	}

	if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
		/* Not clustered, nothing to replicate */
		return;
	}

#ifdef CLUSTER_SUPPORT
	{
	struct messaging_rec **tmp;
	struct messaging_reclog *log;
	struct iovec iov = { .iov_base = data->data, .iov_len = data->length };

	log = state->log;

	tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
			     log->num_recs+1);
	if (tmp == NULL) {
		DBG_WARNING("talloc_realloc failed, ignoring\n");
		return;
	}
	log->recs = tmp;

	log->recs[log->num_recs] = messaging_rec_create(
		log->recs, src, messaging_server_id(msg_ctx),
		msg_type, &iov, 1, NULL, 0);

	if (log->recs[log->num_recs] == NULL) {
		DBG_WARNING("messaging_rec_create failed, ignoring\n");
		return;
	}

	log->num_recs += 1;

	if (log->num_recs >= 100) {
		/*
		 * Don't let the log grow too large
		 */
		notifyd_broadcast_reclog(state->ctdbd_conn,
					 messaging_server_id(msg_ctx), log);
	}
	}
#endif
}
/*
 * Per-trigger context handed to notifyd_trigger_parser for every
 * database record matched along the triggered path.
 */
struct notifyd_trigger_state {
	struct messaging_context *msg_ctx;
	/* The trigger message being dispatched */
	struct notify_trigger_msg *msg;
	/* True while deeper path components remain (subdir match) */
	bool recursive;
	/* True if a local sys-notify watch also covers this event */
	bool covered_by_sys_notify;
};

static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
				   void *private_data);
/*
 * Handler for MSG_SMB_NOTIFY_TRIGGER: walk every parent directory of
 * the triggered absolute path and run notifyd_trigger_parser on the
 * matching records of our own entries database and (for locally
 * generated triggers) of every active peer database.
 */
static void notifyd_trigger(struct messaging_context *msg_ctx,
			    void *private_data, uint32_t msg_type,
			    struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id my_id = messaging_server_id(msg_ctx);
	struct notifyd_trigger_state tstate;
	const char *path;
	const char *p, *next_p;

	if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
		DBG_WARNING("message too short, ignoring: %zu\n",
			    data->length);
		return;
	}
	if (data->data[data->length-1] != 0) {
		DBG_WARNING("path not 0-terminated, ignoring\n");;
		return;
	}

	tstate.msg_ctx = msg_ctx;

	/* Same node but not sent by ourselves: the local kernel
	 * backend already saw this event */
	tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
	tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);

	tstate.msg = (struct notify_trigger_msg *)data->data;
	path = tstate.msg->path;

	DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", "
		  "path=%s\n",
		  tstate.msg->action,
		  tstate.msg->filter,
		  path);

	if (path[0] != '/') {
		DBG_WARNING("path %s does not start with /, ignoring\n",
			    path);
		return;
	}

	/* Visit each '/'-separated prefix of the path in turn */
	for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
		ptrdiff_t path_len = p - path;
		TDB_DATA key;
		uint32_t i;

		next_p = strchr(p+1, '/');
		tstate.recursive = (next_p != NULL);

		DBG_DEBUG("Trying path %.*s\n", (int)path_len, path);

		key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
				   .dsize = path_len };

		dbwrap_parse_record(state->entries, key,
				    notifyd_trigger_parser, &tstate);

		if (state->peers == NULL) {
			continue;
		}

		if (src.vnn != my_id.vnn) {
			/* Remote trigger: the remote node handles its
			 * own clients */
			continue;
		}

		for (i=0; i<state->num_peers; i++) {
			if (state->peers[i]->db == NULL) {
				/*
				 * Inactive peer, did not get a db yet
				 */
				continue;
			}
			dbwrap_parse_record(state->peers[i]->db, key,
					    notifyd_trigger_parser, &tstate);
		}
	}
}
static void notifyd_send_delete(struct messaging_context *msg_ctx,
				TDB_DATA key,
				struct notifyd_instance *instance);

/*
 * dbwrap parser callback for one matched directory record: send a
 * MSG_PVFS_NOTIFY event to every instance whose (sub)dir filter
 * matches the trigger.  Local clients that turned out to be dead get
 * a delete request queued via notifyd_send_delete().
 */
static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
				   void *private_data)

{
	struct notifyd_trigger_state *tstate = private_data;
	struct notify_event_msg msg = { .action = tstate->msg->action,
					.when = tstate->msg->when };
	struct iovec iov[2];
	size_t path_len = key.dsize;
	struct notifyd_watcher watcher = {};
	struct notifyd_instance *instances = NULL;
	size_t num_instances = 0;
	size_t i;
	bool ok;

	ok = notifyd_parse_entry(data.dptr,
				 data.dsize,
				 &watcher,
				 &instances,
				 &num_instances);
	if (!ok) {
		DBG_DEBUG("Could not parse notifyd_entry\n");
		return;
	}

	DBG_DEBUG("Found %zu instances for %.*s\n",
		  num_instances,
		  (int)key.dsize,
		  (char *)key.dptr);

	/* Event payload: header + the path relative to this record's
	 * directory (skip the matched prefix and its '/') */
	iov[0].iov_base = &msg;
	iov[0].iov_len = offsetof(struct notify_event_msg, path);
	iov[1].iov_base = tstate->msg->path + path_len + 1;
	iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;

	for (i=0; i<num_instances; i++) {
		struct notifyd_instance *instance = &instances[i];
		struct server_id_buf idbuf;
		uint32_t i_filter;
		NTSTATUS status;

		if (tstate->covered_by_sys_notify) {
			/* Only forward what the kernel watch could
			 * not have delivered itself */
			if (tstate->recursive) {
				i_filter = watcher.sys_subdir_filter &
					instance->instance.subdir_filter;
			} else {
				i_filter = watcher.sys_filter &
					instance->instance.filter;
			}
		} else {
			if (tstate->recursive) {
				i_filter = instance->instance.subdir_filter;
			} else {
				i_filter = instance->instance.filter;
			}
		}

		if ((i_filter & tstate->msg->filter) == 0) {
			continue;
		}

		msg.private_data = instance->instance.private_data;

		status = messaging_send_iov(
			tstate->msg_ctx, instance->client,
			MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);

		DBG_DEBUG("messaging_send_iov to %s returned %s\n",
			  server_id_str_buf(instance->client, &idbuf),
			  nt_errstr(status));

		if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
		    procid_is_local(&instance->client)) {
			/*
			 * That process has died
			 */
			notifyd_send_delete(tstate->msg_ctx, key, instance);
			continue;
		}
		if (!NT_STATUS_IS_OK(status)) {
			DBG_WARNING("messaging_send_iov returned %s\n",
				    nt_errstr(status));
		}
	}
}
/*
 * Send a delete request to ourselves to properly discard a notify
 * record for an smbd that has died.
 */

static void notifyd_send_delete(struct messaging_context *msg_ctx,
				TDB_DATA key,
				struct notifyd_instance *instance)
{
	/* Zero filters mark this as a delete for
	 * notifyd_apply_rec_change() */
	struct notify_rec_change_msg msg = {
		.instance.private_data = instance->instance.private_data
	};
	uint8_t nul = 0;
	struct iovec iov[3];
	NTSTATUS status;

	/*
	 * Send a rec_change to ourselves to delete a dead entry
	 */

	iov[0] = (struct iovec) {
		.iov_base = &msg,
		.iov_len = offsetof(struct notify_rec_change_msg, path) };
	/* The db key lacks the trailing 0, append one explicitly */
	iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
	iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };

	status = messaging_send_iov(msg_ctx,
				    instance->client,
				    MSG_SMB_NOTIFY_REC_CHANGE,
				    iov,
				    ARRAY_SIZE(iov),
				    NULL,
				    0);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("messaging_send_iov failed: %s\n",
			    nt_errstr(status));
	}
}
/*
 * Handler for MSG_SMB_NOTIFY_GET_DB: marshall our entries database and
 * send it back to the requester as MSG_SMB_NOTIFY_DB, preceded by our
 * current reclog index (UINT64_MAX when no log exists).
 */
static void notifyd_get_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct server_id_buf id1, id2;
	NTSTATUS status;
	uint64_t rec_index = UINT64_MAX;
	uint8_t index_buf[sizeof(uint64_t)];
	size_t dbsize;
	uint8_t *buf;
	struct iovec iov[2];

	/* First call sizes the buffer, second call fills it */
	dbsize = dbwrap_marshall(state->entries, NULL, 0);

	buf = talloc_array(talloc_tos(), uint8_t, dbsize);
	if (buf == NULL) {
		DBG_WARNING("talloc_array(%zu) failed\n", dbsize);
		return;
	}

	dbsize = dbwrap_marshall(state->entries, buf, dbsize);

	if (dbsize != talloc_get_size(buf)) {
		/* Database changed between the two calls, give up */
		DBG_DEBUG("dbsize changed: %zu->%zu\n",
			  talloc_get_size(buf),
			  dbsize);
		TALLOC_FREE(buf);
		return;
	}

	if (state->log != NULL) {
		rec_index = state->log->rec_index;
	}
	SBVAL(index_buf, 0, rec_index);

	iov[0] = (struct iovec) { .iov_base = index_buf,
				  .iov_len = sizeof(index_buf) };
	iov[1] = (struct iovec) { .iov_base = buf,
				  .iov_len = dbsize };

	DBG_DEBUG("Sending %zu bytes to %s->%s\n",
		  iov_buflen(iov, ARRAY_SIZE(iov)),
		  server_id_str_buf(messaging_server_id(msg_ctx), &id1),
		  server_id_str_buf(src, &id2));

	status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
				    iov, ARRAY_SIZE(iov), NULL, 0);
	TALLOC_FREE(buf);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_WARNING("messaging_send_iov failed: %s\n",
			    nt_errstr(status));
	}
}
976 #ifdef CLUSTER_SUPPORT
static int notifyd_add_proxy_syswatches(struct db_record *rec,
					void *private_data);

/*
 * Handler for MSG_SMB_NOTIFY_DB: install the database snapshot a peer
 * sent in response to our MSG_SMB_NOTIFY_GET_DB.  The payload is an
 * 8-byte reclog index followed by the marshalled database.  On any
 * malformed payload the peer object is discarded (TALLOC_FREE(p)).
 */
static void notifyd_got_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data)
{
	struct notifyd_state *state = talloc_get_type_abort(
		private_data, struct notifyd_state);
	struct notifyd_peer *p = NULL;
	struct server_id_buf idbuf;
	NTSTATUS status;
	int count;
	size_t i;

	for (i=0; i<state->num_peers; i++) {
		if (server_id_equal(&src, &state->peers[i]->pid)) {
			p = state->peers[i];
			break;
		}
	}

	if (p == NULL) {
		DBG_DEBUG("Did not find peer for db from %s\n",
			  server_id_str_buf(src, &idbuf));
		return;
	}

	if (data->length < 8) {
		DBG_DEBUG("Got short db length %zu from %s\n", data->length,
			  server_id_str_buf(src, &idbuf));
		TALLOC_FREE(p);
		return;
	}

	p->rec_index = BVAL(data->data, 0);

	p->db = db_open_rbt(p);
	if (p->db == NULL) {
		DBG_DEBUG("db_open_rbt failed\n");
		TALLOC_FREE(p);
		return;
	}

	status = dbwrap_unmarshall(p->db, data->data + 8,
				   data->length - 8);
	if (!NT_STATUS_IS_OK(status)) {
		DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
			  nt_errstr(status),
			  server_id_str_buf(src, &idbuf));
		TALLOC_FREE(p);
		return;
	}

	/* Set up local kernel watches for the remote entries */
	dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
			     &count);

	DBG_DEBUG("Database from %s contained %d records\n",
		  server_id_str_buf(src, &idbuf),
		  count);
}
/*
 * Push the accumulated reclog to all connected cluster nodes via a
 * CTDB_SRVID_SAMBA_NOTIFY_PROXY broadcast.  The log is emptied in all
 * cases; rec_index is only advanced when the broadcast was actually
 * sent.
 */
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
				     struct server_id src,
				     struct messaging_reclog *log)
{
	enum ndr_err_code ndr_err;
	uint8_t msghdr[MESSAGE_HDR_LENGTH];
	DATA_BLOB blob;
	struct iovec iov[2];
	int ret;

	if (log == NULL) {
		return;
	}

	DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n",
		  log->rec_index,
		  log->num_recs);

	/* Destination id 0: consumed by whoever snoops the srvid */
	message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
			(struct server_id) {0 });
	iov[0] = (struct iovec) { .iov_base = msghdr,
				  .iov_len = sizeof(msghdr) };

	ndr_err = ndr_push_struct_blob(
		&blob, log, log,
		(ndr_push_flags_fn_t)ndr_push_messaging_reclog);
	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
		DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
			    ndr_errstr(ndr_err));
		goto done;
	}
	iov[1] = (struct iovec) { .iov_base = blob.data,
				  .iov_len = blob.length };

	ret = ctdbd_messaging_send_iov(
		ctdbd_conn, CTDB_BROADCAST_CONNECTED,
		CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
	TALLOC_FREE(blob.data);
	if (ret != 0) {
		DBG_WARNING("ctdbd_messaging_send failed: %s\n",
			    strerror(ret));
		goto done;
	}

	log->rec_index += 1;

done:
	log->num_recs = 0;
	TALLOC_FREE(log->recs);
}
/*
 * State for the periodic reclog broadcast engine.
 */
struct notifyd_broadcast_reclog_state {
	struct tevent_context *ev;
	struct ctdbd_connection *ctdbd_conn;
	/* Our own server id, used as the broadcast source */
	struct server_id src;
	struct messaging_reclog *log;
};

static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
/*
 * Start the endless loop broadcasting "log" to the cluster once per
 * second (see notifyd_broadcast_reclog_next).  Only completes on
 * error.
 */
static struct tevent_req *notifyd_broadcast_reclog_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct ctdbd_connection *ctdbd_conn, struct server_id src,
	struct messaging_reclog *log)
{
	struct tevent_req *req, *subreq;
	struct notifyd_broadcast_reclog_state *state;

	req = tevent_req_create(mem_ctx, &state,
				struct notifyd_broadcast_reclog_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->ctdbd_conn = ctdbd_conn;
	state->src = src;
	state->log = log;

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(1000));
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
	return req;
}
/*
 * One tick of the broadcast loop: flush the reclog, then re-arm the
 * one-second wakeup timer.
 */
static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct notifyd_broadcast_reclog_state *state = tevent_req_data(
		req, struct notifyd_broadcast_reclog_state);
	bool ok;

	ok = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!ok) {
		tevent_req_oom(req);
		return;
	}

	notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(1000));
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
}
/*
 * Returns the Unix error that ended the broadcast loop.
 */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/*
 * State for the periodic dead-peer cleanup engine.
 */
struct notifyd_clean_peers_state {
	struct tevent_context *ev;
	struct notifyd_state *notifyd;
};

static void notifyd_clean_peers_next(struct tevent_req *subreq);
/*
 * Start the endless loop that every 30 seconds discards peers we have
 * not heard from (see notifyd_clean_peers_next).  Only completes on
 * error.
 */
static struct tevent_req *notifyd_clean_peers_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct notifyd_state *notifyd)
{
	struct tevent_req *req, *subreq;
	struct notifyd_clean_peers_state *state;

	req = tevent_req_create(mem_ctx, &state,
				struct notifyd_clean_peers_state);
	if (req == NULL) {
		return NULL;
	}
	state->ev = ev;
	state->notifyd = notifyd;

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(30000));
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
	return req;
}
/*
 * One tick of the cleanup loop: free every peer that has been silent
 * for more than 60 seconds, then re-arm the 30-second wakeup timer.
 */
static void notifyd_clean_peers_next(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct notifyd_clean_peers_state *state = tevent_req_data(
		req, struct notifyd_clean_peers_state);
	struct notifyd_state *notifyd = state->notifyd;
	size_t i;
	bool ok;
	time_t now = time(NULL);

	ok = tevent_wakeup_recv(subreq);
	TALLOC_FREE(subreq);
	if (!ok) {
		tevent_req_oom(req);
		return;
	}

	i = 0;
	while (i < notifyd->num_peers) {
		struct notifyd_peer *p = notifyd->peers[i];

		if ((now - p->last_broadcast) > 60) {
			struct server_id_buf idbuf;

			/*
			 * Haven't heard for more than 60 seconds. Call this
			 * peer dead
			 */

			DBG_DEBUG("peer %s died\n",
				  server_id_str_buf(p->pid, &idbuf));
			/*
			 * This implicitly decrements notifyd->num_peers
			 */
			TALLOC_FREE(p);
		} else {
			i += 1;
		}
	}

	subreq = tevent_wakeup_send(state, state->ev,
				    timeval_current_ofs_msec(30000));
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
}
/*
 * Report the result of the peer-cleanup request: 0 on success, an
 * errno value otherwise.
 */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
1242 static int notifyd_add_proxy_syswatches(struct db_record *rec,
1243 void *private_data)
1245 struct notifyd_state *state = talloc_get_type_abort(
1246 private_data, struct notifyd_state);
1247 struct db_context *db = dbwrap_record_get_db(rec);
1248 TDB_DATA key = dbwrap_record_get_key(rec);
1249 TDB_DATA value = dbwrap_record_get_value(rec);
1250 struct notifyd_watcher watcher = {};
1251 char path[key.dsize+1];
1252 bool ok;
1253 int ret;
1255 memcpy(path, key.dptr, key.dsize);
1256 path[key.dsize] = '\0';
1258 /* This is a remote database, we just need the watcher. */
1259 ok = notifyd_parse_entry(value.dptr, value.dsize, &watcher, NULL, NULL);
1260 if (!ok) {
1261 DBG_WARNING("Could not parse notifyd entry for %s\n", path);
1262 return 0;
1265 watcher.sys_watch = NULL;
1266 watcher.sys_filter = watcher.filter;
1267 watcher.sys_subdir_filter = watcher.subdir_filter;
1269 ret = state->sys_notify_watch(db,
1270 state->sys_notify_ctx,
1271 path,
1272 &watcher.filter,
1273 &watcher.subdir_filter,
1274 notifyd_sys_callback,
1275 state->msg_ctx,
1276 &watcher.sys_watch);
1277 if (ret != 0) {
1278 DBG_WARNING("inotify_watch returned %s\n", strerror(errno));
1281 memcpy(value.dptr, &watcher, sizeof(struct notifyd_watcher));
1283 return 0;
1286 static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1288 TDB_DATA key = dbwrap_record_get_key(rec);
1289 TDB_DATA value = dbwrap_record_get_value(rec);
1290 struct notifyd_watcher watcher = {};
1291 bool ok;
1293 ok = notifyd_parse_entry(value.dptr, value.dsize, &watcher, NULL, NULL);
1294 if (!ok) {
1295 DBG_WARNING("Could not parse notifyd entry for %.*s\n",
1296 (int)key.dsize, (char *)key.dptr);
1297 return 0;
1299 TALLOC_FREE(watcher.sys_watch);
1301 return 0;
1304 static int notifyd_peer_destructor(struct notifyd_peer *p)
1306 struct notifyd_state *state = p->state;
1307 size_t i;
1309 if (p->db != NULL) {
1310 dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1311 NULL, NULL);
1314 for (i = 0; i<state->num_peers; i++) {
1315 if (p == state->peers[i]) {
1316 state->peers[i] = state->peers[state->num_peers-1];
1317 state->num_peers -= 1;
1318 break;
1321 return 0;
1324 static struct notifyd_peer *notifyd_peer_new(
1325 struct notifyd_state *state, struct server_id pid)
1327 struct notifyd_peer *p, **tmp;
1329 tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1330 state->num_peers+1);
1331 if (tmp == NULL) {
1332 return NULL;
1334 state->peers = tmp;
1336 p = talloc_zero(state->peers, struct notifyd_peer);
1337 if (p == NULL) {
1338 return NULL;
1340 p->state = state;
1341 p->pid = pid;
1343 state->peers[state->num_peers] = p;
1344 state->num_peers += 1;
1346 talloc_set_destructor(p, notifyd_peer_destructor);
1348 return p;
1351 static void notifyd_apply_reclog(struct notifyd_peer *peer,
1352 const uint8_t *msg, size_t msglen)
1354 struct notifyd_state *state = peer->state;
1355 DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1356 .length = msglen };
1357 struct server_id_buf idbuf;
1358 struct messaging_reclog *log;
1359 enum ndr_err_code ndr_err;
1360 uint32_t i;
1362 if (peer->db == NULL) {
1364 * No db yet
1366 return;
1369 log = talloc(peer, struct messaging_reclog);
1370 if (log == NULL) {
1371 DBG_DEBUG("talloc failed\n");
1372 return;
1375 ndr_err = ndr_pull_struct_blob_all(
1376 &blob, log, log,
1377 (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1378 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1379 DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
1380 ndr_errstr(ndr_err));
1381 goto fail;
1384 DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n",
1385 log->num_recs,
1386 log->rec_index,
1387 server_id_str_buf(peer->pid, &idbuf));
1389 if (log->rec_index != peer->rec_index) {
1390 DBG_INFO("Got rec index %"PRIu64" from %s, "
1391 "expected %"PRIu64"\n",
1392 log->rec_index,
1393 server_id_str_buf(peer->pid, &idbuf),
1394 peer->rec_index);
1395 goto fail;
1398 for (i=0; i<log->num_recs; i++) {
1399 struct messaging_rec *r = log->recs[i];
1400 struct notify_rec_change_msg *chg;
1401 size_t pathlen;
1402 bool ok;
1403 struct notify_instance instance;
1405 ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1406 &chg, &pathlen);
1407 if (!ok) {
1408 DBG_INFO("notifyd_parse_rec_change failed\n");
1409 goto fail;
1412 /* avoid SIGBUS */
1413 memcpy(&instance, &chg->instance, sizeof(instance));
1415 ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1416 &instance, peer->db,
1417 state->sys_notify_watch,
1418 state->sys_notify_ctx,
1419 state->msg_ctx);
1420 if (!ok) {
1421 DBG_INFO("notifyd_apply_rec_change failed\n");
1422 goto fail;
1426 peer->rec_index += 1;
1427 peer->last_broadcast = time(NULL);
1429 TALLOC_FREE(log);
1430 return;
1432 fail:
1433 DBG_DEBUG("Dropping peer %s\n",
1434 server_id_str_buf(peer->pid, &idbuf));
1435 TALLOC_FREE(peer);
/*
 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
 * messages) broadcasts by other notifyds. Several cases:
 *
 * We don't know the source. This creates a new peer. Creating a peer
 * involves asking the peer for its full database. We assume ordered
 * messages, so the new database will arrive before the next broadcast
 * will.
 *
 * We know the source and the log index matches. We will apply the log
 * locally to our peer's db as if we had received it from a local
 * client.
 *
 * We know the source but the log index does not match. This means we
 * lost a message. We just drop the whole peer and wait for the next
 * broadcast, which will then trigger a fresh database pull.
 */
1456 static int notifyd_snoop_broadcast(struct tevent_context *ev,
1457 uint32_t src_vnn, uint32_t dst_vnn,
1458 uint64_t dst_srvid,
1459 const uint8_t *msg, size_t msglen,
1460 void *private_data)
1462 struct notifyd_state *state = talloc_get_type_abort(
1463 private_data, struct notifyd_state);
1464 struct server_id my_id = messaging_server_id(state->msg_ctx);
1465 struct notifyd_peer *p;
1466 uint32_t i;
1467 uint32_t msg_type;
1468 struct server_id src, dst;
1469 struct server_id_buf idbuf;
1470 NTSTATUS status;
1472 if (msglen < MESSAGE_HDR_LENGTH) {
1473 DBG_DEBUG("Got short broadcast\n");
1474 return 0;
1476 message_hdr_get(&msg_type, &src, &dst, msg);
1478 if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1479 DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type);
1480 return 0;
1482 if (server_id_equal(&src, &my_id)) {
1483 DBG_DEBUG("Ignoring my own broadcast\n");
1484 return 0;
1487 DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1488 server_id_str_buf(src, &idbuf));
1490 for (i=0; i<state->num_peers; i++) {
1491 if (server_id_equal(&state->peers[i]->pid, &src)) {
1493 DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i);
1495 notifyd_apply_reclog(state->peers[i],
1496 msg + MESSAGE_HDR_LENGTH,
1497 msglen - MESSAGE_HDR_LENGTH);
1498 return 0;
1502 DBG_DEBUG("Creating new peer for %s\n",
1503 server_id_str_buf(src, &idbuf));
1505 p = notifyd_peer_new(state, src);
1506 if (p == NULL) {
1507 DBG_DEBUG("notifyd_peer_new failed\n");
1508 return 0;
1511 status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1512 NULL, 0);
1513 if (!NT_STATUS_IS_OK(status)) {
1514 DBG_DEBUG("messaging_send_buf failed: %s\n",
1515 nt_errstr(status));
1516 TALLOC_FREE(p);
1517 return 0;
1520 return 0;
1522 #endif