2 * Unix SMB/CIFS implementation.
4 * Copyright (C) Volker Lendecke 2014
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "notifyd_private.h"
23 #include "lib/util/server_id.h"
24 #include "lib/util/data_blob.h"
25 #include "librpc/gen_ndr/notify.h"
26 #include "librpc/gen_ndr/messaging.h"
27 #include "librpc/gen_ndr/server_id.h"
28 #include "lib/dbwrap/dbwrap.h"
29 #include "lib/dbwrap/dbwrap_rbt.h"
34 #include "lib/util/server_id_db.h"
35 #include "lib/util/tevent_unix.h"
36 #include "lib/util/tevent_ntstatus.h"
37 #include "ctdbd_conn.h"
38 #include "ctdb_srvids.h"
39 #include "server_id_db_util.h"
40 #include "lib/util/iov_buf.h"
41 #include "messages_util.h"
43 #ifdef CLUSTER_SUPPORT
44 #include "ctdb_protocol.h"
50 * All of notifyd's state
53 struct notifyd_state
{
54 struct tevent_context
*ev
;
55 struct messaging_context
*msg_ctx
;
56 struct ctdbd_connection
*ctdbd_conn
;
59 * Database of everything clients show interest in. Indexed by
60 * absolute path. The database keys are not 0-terminated
61 * to allow the critical operation, notifyd_trigger, to walk
62 * the structure from the top without adding intermediate 0s.
63 * The database records contain an array of
65 * struct notifyd_instance
67 * to be maintained and parsed by notifyd_parse_entry()
69 struct db_context
*entries
;
72 * In the cluster case, this is the place where we store a log
73 * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
74 * forward them to our peer notifyd's in the cluster once a
75 * second or when the log grows too large.
78 struct messaging_reclog
*log
;
81 * Array of companion notifyd's in a cluster. Every notifyd
82 * broadcasts its messaging_reclog to every other notifyd in
83 * the cluster. This is done by making ctdb send a message to
84 * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
85 * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
86 * had called register_with_ctdbd this srvid will receive the
89 * Database replication happens via these broadcasts. Also,
90 * they serve as liveness indication. If a notifyd receives a
91 * broadcast from an unknown peer, it will create one for this
92 * srvid. Also when we don't hear anything from a peer for a
93 * while, we will discard it.
96 struct notifyd_peer
**peers
;
99 sys_notify_watch_fn sys_notify_watch
;
100 struct sys_notify_context
*sys_notify_ctx
;
103 struct notifyd_peer
{
104 struct notifyd_state
*state
;
105 struct server_id pid
;
107 struct db_context
*db
;
108 time_t last_broadcast
;
111 static void notifyd_rec_change(struct messaging_context
*msg_ctx
,
112 void *private_data
, uint32_t msg_type
,
113 struct server_id src
, DATA_BLOB
*data
);
114 static void notifyd_trigger(struct messaging_context
*msg_ctx
,
115 void *private_data
, uint32_t msg_type
,
116 struct server_id src
, DATA_BLOB
*data
);
117 static void notifyd_get_db(struct messaging_context
*msg_ctx
,
118 void *private_data
, uint32_t msg_type
,
119 struct server_id src
, DATA_BLOB
*data
);
#ifdef CLUSTER_SUPPORT
/* Cluster-only handlers: database pull reply and reclog broadcast */
static void notifyd_got_db(struct messaging_context *msg_ctx,
			   void *private_data, uint32_t msg_type,
			   struct server_id src, DATA_BLOB *data);
static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
				     struct server_id src,
				     struct messaging_reclog *log);
#endif

/* Callback invoked by the sys_notify backend on a filesystem event */
static void notifyd_sys_callback(struct sys_notify_context *ctx,
				 void *private_data, struct notify_event *ev,
				 uint32_t filter);
#ifdef CLUSTER_SUPPORT
/* Periodic reclog broadcast loop (runs once a second) */
static struct tevent_req *notifyd_broadcast_reclog_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct ctdbd_connection *ctdbd_conn, struct server_id src,
	struct messaging_reclog *log);
static int notifyd_broadcast_reclog_recv(struct tevent_req *req);

/* Periodic peer garbage collection loop (runs every 30 seconds) */
static struct tevent_req *notifyd_clean_peers_send(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
	struct notifyd_state *notifyd);
static int notifyd_clean_peers_recv(struct tevent_req *req);
#endif
146 static int sys_notify_watch_dummy(
148 struct sys_notify_context
*ctx
,
151 uint32_t *subdir_filter
,
152 void (*callback
)(struct sys_notify_context
*ctx
,
154 struct notify_event
*ev
,
159 void **handle
= handle_p
;
#ifdef CLUSTER_SUPPORT
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
static void notifyd_clean_peers_finished(struct tevent_req *subreq);
/* ctdb srvid callback snooping CTDB_SRVID_SAMBA_NOTIFY_PROXY broadcasts */
static int notifyd_snoop_broadcast(struct tevent_context *ev,
				   uint32_t src_vnn, uint32_t dst_vnn,
				   uint64_t dst_srvid,
				   const uint8_t *msg, size_t msglen,
				   void *private_data);
#endif
174 struct tevent_req
*notifyd_send(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
175 struct messaging_context
*msg_ctx
,
176 struct ctdbd_connection
*ctdbd_conn
,
177 sys_notify_watch_fn sys_notify_watch
,
178 struct sys_notify_context
*sys_notify_ctx
)
180 struct tevent_req
*req
;
181 #ifdef CLUSTER_SUPPORT
182 struct tevent_req
*subreq
;
184 struct notifyd_state
*state
;
185 struct server_id_db
*names_db
;
189 req
= tevent_req_create(mem_ctx
, &state
, struct notifyd_state
);
194 state
->msg_ctx
= msg_ctx
;
195 state
->ctdbd_conn
= ctdbd_conn
;
197 if (sys_notify_watch
== NULL
) {
198 sys_notify_watch
= sys_notify_watch_dummy
;
201 state
->sys_notify_watch
= sys_notify_watch
;
202 state
->sys_notify_ctx
= sys_notify_ctx
;
204 state
->entries
= db_open_rbt(state
);
205 if (tevent_req_nomem(state
->entries
, req
)) {
206 return tevent_req_post(req
, ev
);
209 status
= messaging_register(msg_ctx
, state
, MSG_SMB_NOTIFY_REC_CHANGE
,
211 if (tevent_req_nterror(req
, status
)) {
212 return tevent_req_post(req
, ev
);
215 status
= messaging_register(msg_ctx
, state
, MSG_SMB_NOTIFY_TRIGGER
,
217 if (tevent_req_nterror(req
, status
)) {
218 goto deregister_rec_change
;
221 status
= messaging_register(msg_ctx
, state
, MSG_SMB_NOTIFY_GET_DB
,
223 if (tevent_req_nterror(req
, status
)) {
224 goto deregister_trigger
;
227 names_db
= messaging_names_db(msg_ctx
);
229 ret
= server_id_db_set_exclusive(names_db
, "notify-daemon");
231 DBG_DEBUG("server_id_db_set_exclusive() failed: %s\n",
233 tevent_req_error(req
, ret
);
234 goto deregister_get_db
;
237 if (ctdbd_conn
== NULL
) {
239 * No cluster around, skip the database replication
245 #ifdef CLUSTER_SUPPORT
246 status
= messaging_register(msg_ctx
, state
, MSG_SMB_NOTIFY_DB
,
248 if (tevent_req_nterror(req
, status
)) {
249 goto deregister_get_db
;
252 state
->log
= talloc_zero(state
, struct messaging_reclog
);
253 if (tevent_req_nomem(state
->log
, req
)) {
257 subreq
= notifyd_broadcast_reclog_send(
258 state
->log
, ev
, ctdbd_conn
,
259 messaging_server_id(msg_ctx
),
261 if (tevent_req_nomem(subreq
, req
)) {
264 tevent_req_set_callback(subreq
,
265 notifyd_broadcast_reclog_finished
,
268 subreq
= notifyd_clean_peers_send(state
, ev
, state
);
269 if (tevent_req_nomem(subreq
, req
)) {
272 tevent_req_set_callback(subreq
, notifyd_clean_peers_finished
,
275 ret
= register_with_ctdbd(ctdbd_conn
,
276 CTDB_SRVID_SAMBA_NOTIFY_PROXY
,
277 notifyd_snoop_broadcast
, state
);
279 tevent_req_error(req
, ret
);
286 #ifdef CLUSTER_SUPPORT
288 messaging_deregister(msg_ctx
, MSG_SMB_NOTIFY_DB
, state
);
291 messaging_deregister(msg_ctx
, MSG_SMB_NOTIFY_GET_DB
, state
);
293 messaging_deregister(msg_ctx
, MSG_SMB_NOTIFY_TRIGGER
, state
);
294 deregister_rec_change
:
295 messaging_deregister(msg_ctx
, MSG_SMB_NOTIFY_REC_CHANGE
, state
);
296 return tevent_req_post(req
, ev
);
#ifdef CLUSTER_SUPPORT

/* The reclog broadcast loop ended: propagate its error to the main req */
static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_broadcast_reclog_recv(subreq);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		tevent_req_error(req, ret);
	}
}

/* The peer cleanup loop ended: propagate its error to the main req */
static void notifyd_clean_peers_finished(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	int ret;

	ret = notifyd_clean_peers_recv(subreq);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		tevent_req_error(req, ret);
	}
}

#endif
/* Receive the result of notifyd_send(); returns 0 or a Unix errno */
int notifyd_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
330 static bool notifyd_apply_rec_change(
331 const struct server_id
*client
,
332 const char *path
, size_t pathlen
,
333 const struct notify_instance
*chg
,
334 struct db_context
*entries
,
335 sys_notify_watch_fn sys_notify_watch
,
336 struct sys_notify_context
*sys_notify_ctx
,
337 struct messaging_context
*msg_ctx
)
339 struct db_record
*rec
= NULL
;
340 struct notifyd_watcher watcher
= {};
341 struct notifyd_instance
*instances
= NULL
;
342 size_t num_instances
;
344 struct notifyd_instance
*instance
= NULL
;
348 bool new_watcher
= false;
351 DBG_WARNING("pathlen==0\n");
354 if (path
[pathlen
-1] != '\0') {
355 DBG_WARNING("path not 0-terminated\n");
359 DBG_DEBUG("path=%s, filter=%"PRIu32
", subdir_filter=%"PRIu32
", "
366 rec
= dbwrap_fetch_locked(
368 make_tdb_data((const uint8_t *)path
, pathlen
-1));
371 DBG_WARNING("dbwrap_fetch_locked failed\n");
376 value
= dbwrap_record_get_value(rec
);
378 if (value
.dsize
!= 0) {
379 ok
= notifyd_parse_entry(value
.dptr
,
390 * Overallocate by one instance to avoid a realloc when adding
392 instances
= talloc_array(rec
, struct notifyd_instance
,
394 if (instances
== NULL
) {
395 DBG_WARNING("talloc failed\n");
399 if (num_instances
> 0) {
400 struct notifyd_instance
*tmp
= NULL
;
403 ok
= notifyd_parse_entry(value
.dptr
,
414 sizeof(struct notifyd_instance
) * num_tmp
);
417 for (i
=0; i
<num_instances
; i
++) {
418 instance
= &instances
[i
];
420 if (server_id_equal(&instance
->client
, client
) &&
421 (instance
->instance
.private_data
== chg
->private_data
)) {
426 if (i
< num_instances
) {
427 instance
->instance
= *chg
;
430 * We've overallocated for one instance
432 instance
= &instances
[num_instances
];
434 *instance
= (struct notifyd_instance
) {
443 * Calculate an intersection of the instances filters for the watcher.
445 if (instance
->instance
.filter
> 0) {
446 uint32_t filter
= instance
->instance
.filter
;
448 if ((watcher
.filter
& filter
) != filter
) {
449 watcher
.filter
|= filter
;
456 * Calculate an intersection of the instances subdir_filters for the
459 if (instance
->instance
.subdir_filter
> 0) {
460 uint32_t subdir_filter
= instance
->instance
.subdir_filter
;
462 if ((watcher
.subdir_filter
& subdir_filter
) != subdir_filter
) {
463 watcher
.subdir_filter
|= subdir_filter
;
469 if ((instance
->instance
.filter
== 0) &&
470 (instance
->instance
.subdir_filter
== 0)) {
471 uint32_t tmp_filter
= 0;
472 uint32_t tmp_subdir_filter
= 0;
474 /* This is a delete request */
475 *instance
= instances
[num_instances
-1];
478 for (i
= 0; i
< num_instances
; i
++) {
479 struct notifyd_instance
*tmp
= &instances
[i
];
481 tmp_filter
|= tmp
->instance
.filter
;
482 tmp_subdir_filter
|= tmp
->instance
.subdir_filter
;
486 * If the filter has changed, register a new watcher with the
489 if (watcher
.filter
!= tmp_filter
||
490 watcher
.subdir_filter
!= tmp_subdir_filter
)
492 watcher
.filter
= tmp_filter
;
493 watcher
.subdir_filter
= tmp_subdir_filter
;
501 * In case we removed all notify instances, we want to remove
502 * the watcher. We won't register a new one, if no filters are
506 TALLOC_FREE(watcher
.sys_watch
);
508 watcher
.sys_filter
= watcher
.filter
;
509 watcher
.sys_subdir_filter
= watcher
.subdir_filter
;
512 * Only register a watcher if we have filter.
514 if (watcher
.filter
!= 0 || watcher
.subdir_filter
!= 0) {
515 int ret
= sys_notify_watch(entries
,
519 &watcher
.sys_subdir_filter
,
520 notifyd_sys_callback
,
524 DBG_WARNING("sys_notify_watch for [%s] "
532 DBG_DEBUG("%s has %zu instances\n", path
, num_instances
);
534 if (num_instances
== 0) {
535 TALLOC_FREE(watcher
.sys_watch
);
537 status
= dbwrap_record_delete(rec
);
538 if (!NT_STATUS_IS_OK(status
)) {
539 DBG_WARNING("dbwrap_record_delete returned %s\n",
544 struct TDB_DATA iov
[2] = {
546 .dptr
= (uint8_t *)&watcher
,
547 .dsize
= sizeof(struct notifyd_watcher
),
550 .dptr
= (uint8_t *)instances
,
551 .dsize
= sizeof(struct notifyd_instance
) *
556 status
= dbwrap_record_storev(rec
, iov
, ARRAY_SIZE(iov
), 0);
557 if (!NT_STATUS_IS_OK(status
)) {
558 DBG_WARNING("dbwrap_record_storev returned %s\n",
570 static void notifyd_sys_callback(struct sys_notify_context
*ctx
,
571 void *private_data
, struct notify_event
*ev
,
574 struct messaging_context
*msg_ctx
= talloc_get_type_abort(
575 private_data
, struct messaging_context
);
576 struct notify_trigger_msg msg
;
580 msg
= (struct notify_trigger_msg
) {
581 .when
= timespec_current(),
582 .action
= ev
->action
,
586 iov
[0].iov_base
= &msg
;
587 iov
[0].iov_len
= offsetof(struct notify_trigger_msg
, path
);
588 iov
[1].iov_base
= discard_const_p(char, ev
->dir
);
589 iov
[1].iov_len
= strlen(ev
->dir
);
590 iov
[2].iov_base
= &slash
;
592 iov
[3].iov_base
= discard_const_p(char, ev
->path
);
593 iov
[3].iov_len
= strlen(ev
->path
)+1;
596 msg_ctx
, messaging_server_id(msg_ctx
),
597 MSG_SMB_NOTIFY_TRIGGER
, iov
, ARRAY_SIZE(iov
), NULL
, 0);
600 static bool notifyd_parse_rec_change(uint8_t *buf
, size_t bufsize
,
601 struct notify_rec_change_msg
**pmsg
,
604 struct notify_rec_change_msg
*msg
;
606 if (bufsize
< offsetof(struct notify_rec_change_msg
, path
) + 1) {
607 DBG_WARNING("message too short, ignoring: %zu\n", bufsize
);
611 *pmsg
= msg
= (struct notify_rec_change_msg
*)buf
;
612 *pathlen
= bufsize
- offsetof(struct notify_rec_change_msg
, path
);
614 DBG_DEBUG("Got rec_change_msg filter=%"PRIu32
", "
615 "subdir_filter=%"PRIu32
", private_data=%p, path=%.*s\n",
616 msg
->instance
.filter
,
617 msg
->instance
.subdir_filter
,
618 msg
->instance
.private_data
,
625 static void notifyd_rec_change(struct messaging_context
*msg_ctx
,
626 void *private_data
, uint32_t msg_type
,
627 struct server_id src
, DATA_BLOB
*data
)
629 struct notifyd_state
*state
= talloc_get_type_abort(
630 private_data
, struct notifyd_state
);
631 struct server_id_buf idbuf
;
632 struct notify_rec_change_msg
*msg
;
635 struct notify_instance instance
;
637 DBG_DEBUG("Got %zu bytes from %s\n", data
->length
,
638 server_id_str_buf(src
, &idbuf
));
640 ok
= notifyd_parse_rec_change(data
->data
, data
->length
,
646 memcpy(&instance
, &msg
->instance
, sizeof(instance
)); /* avoid SIGBUS */
648 ok
= notifyd_apply_rec_change(
649 &src
, msg
->path
, pathlen
, &instance
,
650 state
->entries
, state
->sys_notify_watch
, state
->sys_notify_ctx
,
653 DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
657 if ((state
->log
== NULL
) || (state
->ctdbd_conn
== NULL
)) {
661 #ifdef CLUSTER_SUPPORT
664 struct messaging_rec
**tmp
;
665 struct messaging_reclog
*log
;
666 struct iovec iov
= { .iov_base
= data
->data
, .iov_len
= data
->length
};
670 tmp
= talloc_realloc(log
, log
->recs
, struct messaging_rec
*,
673 DBG_WARNING("talloc_realloc failed, ignoring\n");
678 log
->recs
[log
->num_recs
] = messaging_rec_create(
679 log
->recs
, src
, messaging_server_id(msg_ctx
),
680 msg_type
, &iov
, 1, NULL
, 0);
682 if (log
->recs
[log
->num_recs
] == NULL
) {
683 DBG_WARNING("messaging_rec_create failed, ignoring\n");
689 if (log
->num_recs
>= 100) {
691 * Don't let the log grow too large
693 notifyd_broadcast_reclog(state
->ctdbd_conn
,
694 messaging_server_id(msg_ctx
), log
);
701 struct notifyd_trigger_state
{
702 struct messaging_context
*msg_ctx
;
703 struct notify_trigger_msg
*msg
;
705 bool covered_by_sys_notify
;
708 static void notifyd_trigger_parser(TDB_DATA key
, TDB_DATA data
,
711 static void notifyd_trigger(struct messaging_context
*msg_ctx
,
712 void *private_data
, uint32_t msg_type
,
713 struct server_id src
, DATA_BLOB
*data
)
715 struct notifyd_state
*state
= talloc_get_type_abort(
716 private_data
, struct notifyd_state
);
717 struct server_id my_id
= messaging_server_id(msg_ctx
);
718 struct notifyd_trigger_state tstate
;
720 const char *p
, *next_p
;
722 if (data
->length
< offsetof(struct notify_trigger_msg
, path
) + 1) {
723 DBG_WARNING("message too short, ignoring: %zu\n",
727 if (data
->data
[data
->length
-1] != 0) {
728 DBG_WARNING("path not 0-terminated, ignoring\n");;
732 tstate
.msg_ctx
= msg_ctx
;
734 tstate
.covered_by_sys_notify
= (src
.vnn
== my_id
.vnn
);
735 tstate
.covered_by_sys_notify
&= !server_id_equal(&src
, &my_id
);
737 tstate
.msg
= (struct notify_trigger_msg
*)data
->data
;
738 path
= tstate
.msg
->path
;
740 DBG_DEBUG("Got trigger_msg action=%"PRIu32
", filter=%"PRIu32
", "
746 if (path
[0] != '/') {
747 DBG_WARNING("path %s does not start with /, ignoring\n",
752 for (p
= strchr(path
+1, '/'); p
!= NULL
; p
= next_p
) {
753 ptrdiff_t path_len
= p
- path
;
757 next_p
= strchr(p
+1, '/');
758 tstate
.recursive
= (next_p
!= NULL
);
760 DBG_DEBUG("Trying path %.*s\n", (int)path_len
, path
);
762 key
= (TDB_DATA
) { .dptr
= discard_const_p(uint8_t, path
),
765 dbwrap_parse_record(state
->entries
, key
,
766 notifyd_trigger_parser
, &tstate
);
768 if (state
->peers
== NULL
) {
772 if (src
.vnn
!= my_id
.vnn
) {
776 for (i
=0; i
<state
->num_peers
; i
++) {
777 if (state
->peers
[i
]->db
== NULL
) {
779 * Inactive peer, did not get a db yet
783 dbwrap_parse_record(state
->peers
[i
]->db
, key
,
784 notifyd_trigger_parser
, &tstate
);
789 static void notifyd_send_delete(struct messaging_context
*msg_ctx
,
791 struct notifyd_instance
*instance
);
793 static void notifyd_trigger_parser(TDB_DATA key
, TDB_DATA data
,
797 struct notifyd_trigger_state
*tstate
= private_data
;
798 struct notify_event_msg msg
= { .action
= tstate
->msg
->action
,
799 .when
= tstate
->msg
->when
};
801 size_t path_len
= key
.dsize
;
802 struct notifyd_watcher watcher
= {};
803 struct notifyd_instance
*instances
= NULL
;
804 size_t num_instances
= 0;
808 ok
= notifyd_parse_entry(data
.dptr
,
814 DBG_DEBUG("Could not parse notifyd_entry\n");
818 DBG_DEBUG("Found %zu instances for %.*s\n",
823 iov
[0].iov_base
= &msg
;
824 iov
[0].iov_len
= offsetof(struct notify_event_msg
, path
);
825 iov
[1].iov_base
= tstate
->msg
->path
+ path_len
+ 1;
826 iov
[1].iov_len
= strlen((char *)(iov
[1].iov_base
)) + 1;
828 for (i
=0; i
<num_instances
; i
++) {
829 struct notifyd_instance
*instance
= &instances
[i
];
830 struct server_id_buf idbuf
;
834 if (tstate
->covered_by_sys_notify
) {
835 if (tstate
->recursive
) {
836 i_filter
= watcher
.sys_subdir_filter
&
837 instance
->instance
.subdir_filter
;
839 i_filter
= watcher
.sys_filter
&
840 instance
->instance
.filter
;
843 if (tstate
->recursive
) {
844 i_filter
= instance
->instance
.subdir_filter
;
846 i_filter
= instance
->instance
.filter
;
850 if ((i_filter
& tstate
->msg
->filter
) == 0) {
854 msg
.private_data
= instance
->instance
.private_data
;
856 status
= messaging_send_iov(
857 tstate
->msg_ctx
, instance
->client
,
858 MSG_PVFS_NOTIFY
, iov
, ARRAY_SIZE(iov
), NULL
, 0);
860 DBG_DEBUG("messaging_send_iov to %s returned %s\n",
861 server_id_str_buf(instance
->client
, &idbuf
),
864 if (NT_STATUS_EQUAL(status
, NT_STATUS_OBJECT_NAME_NOT_FOUND
) &&
865 procid_is_local(&instance
->client
)) {
867 * That process has died
869 notifyd_send_delete(tstate
->msg_ctx
, key
, instance
);
873 if (!NT_STATUS_IS_OK(status
)) {
874 DBG_WARNING("messaging_send_iov returned %s\n",
881 * Send a delete request to ourselves to properly discard a notify
882 * record for an smbd that has died.
885 static void notifyd_send_delete(struct messaging_context
*msg_ctx
,
887 struct notifyd_instance
*instance
)
889 struct notify_rec_change_msg msg
= {
890 .instance
.private_data
= instance
->instance
.private_data
897 * Send a rec_change to ourselves to delete a dead entry
900 iov
[0] = (struct iovec
) {
902 .iov_len
= offsetof(struct notify_rec_change_msg
, path
) };
903 iov
[1] = (struct iovec
) { .iov_base
= key
.dptr
, .iov_len
= key
.dsize
};
904 iov
[2] = (struct iovec
) { .iov_base
= &nul
, .iov_len
= sizeof(nul
) };
906 status
= messaging_send_iov(msg_ctx
,
908 MSG_SMB_NOTIFY_REC_CHANGE
,
914 if (!NT_STATUS_IS_OK(status
)) {
915 DBG_WARNING("messaging_send_iov failed: %s\n",
920 static void notifyd_get_db(struct messaging_context
*msg_ctx
,
921 void *private_data
, uint32_t msg_type
,
922 struct server_id src
, DATA_BLOB
*data
)
924 struct notifyd_state
*state
= talloc_get_type_abort(
925 private_data
, struct notifyd_state
);
926 struct server_id_buf id1
, id2
;
928 uint64_t rec_index
= UINT64_MAX
;
929 uint8_t index_buf
[sizeof(uint64_t)];
934 dbsize
= dbwrap_marshall(state
->entries
, NULL
, 0);
936 buf
= talloc_array(talloc_tos(), uint8_t, dbsize
);
938 DBG_WARNING("talloc_array(%zu) failed\n", dbsize
);
942 dbsize
= dbwrap_marshall(state
->entries
, buf
, dbsize
);
944 if (dbsize
!= talloc_get_size(buf
)) {
945 DBG_DEBUG("dbsize changed: %zu->%zu\n",
946 talloc_get_size(buf
),
952 if (state
->log
!= NULL
) {
953 rec_index
= state
->log
->rec_index
;
955 SBVAL(index_buf
, 0, rec_index
);
957 iov
[0] = (struct iovec
) { .iov_base
= index_buf
,
958 .iov_len
= sizeof(index_buf
) };
959 iov
[1] = (struct iovec
) { .iov_base
= buf
,
962 DBG_DEBUG("Sending %zu bytes to %s->%s\n",
963 iov_buflen(iov
, ARRAY_SIZE(iov
)),
964 server_id_str_buf(messaging_server_id(msg_ctx
), &id1
),
965 server_id_str_buf(src
, &id2
));
967 status
= messaging_send_iov(msg_ctx
, src
, MSG_SMB_NOTIFY_DB
,
968 iov
, ARRAY_SIZE(iov
), NULL
, 0);
970 if (!NT_STATUS_IS_OK(status
)) {
971 DBG_WARNING("messaging_send_iov failed: %s\n",
976 #ifdef CLUSTER_SUPPORT
978 static int notifyd_add_proxy_syswatches(struct db_record
*rec
,
981 static void notifyd_got_db(struct messaging_context
*msg_ctx
,
982 void *private_data
, uint32_t msg_type
,
983 struct server_id src
, DATA_BLOB
*data
)
985 struct notifyd_state
*state
= talloc_get_type_abort(
986 private_data
, struct notifyd_state
);
987 struct notifyd_peer
*p
= NULL
;
988 struct server_id_buf idbuf
;
993 for (i
=0; i
<state
->num_peers
; i
++) {
994 if (server_id_equal(&src
, &state
->peers
[i
]->pid
)) {
1001 DBG_DEBUG("Did not find peer for db from %s\n",
1002 server_id_str_buf(src
, &idbuf
));
1006 if (data
->length
< 8) {
1007 DBG_DEBUG("Got short db length %zu from %s\n", data
->length
,
1008 server_id_str_buf(src
, &idbuf
));
1013 p
->rec_index
= BVAL(data
->data
, 0);
1015 p
->db
= db_open_rbt(p
);
1016 if (p
->db
== NULL
) {
1017 DBG_DEBUG("db_open_rbt failed\n");
1022 status
= dbwrap_unmarshall(p
->db
, data
->data
+ 8,
1024 if (!NT_STATUS_IS_OK(status
)) {
1025 DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
1027 server_id_str_buf(src
, &idbuf
));
1032 dbwrap_traverse_read(p
->db
, notifyd_add_proxy_syswatches
, state
,
1035 DBG_DEBUG("Database from %s contained %d records\n",
1036 server_id_str_buf(src
, &idbuf
),
1040 static void notifyd_broadcast_reclog(struct ctdbd_connection
*ctdbd_conn
,
1041 struct server_id src
,
1042 struct messaging_reclog
*log
)
1044 enum ndr_err_code ndr_err
;
1045 uint8_t msghdr
[MESSAGE_HDR_LENGTH
];
1047 struct iovec iov
[2];
1054 DBG_DEBUG("rec_index=%"PRIu64
", num_recs=%"PRIu32
"\n",
1058 message_hdr_put(msghdr
, MSG_SMB_NOTIFY_REC_CHANGES
, src
,
1059 (struct server_id
) {0 });
1060 iov
[0] = (struct iovec
) { .iov_base
= msghdr
,
1061 .iov_len
= sizeof(msghdr
) };
1063 ndr_err
= ndr_push_struct_blob(
1065 (ndr_push_flags_fn_t
)ndr_push_messaging_reclog
);
1066 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
1067 DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
1068 ndr_errstr(ndr_err
));
1071 iov
[1] = (struct iovec
) { .iov_base
= blob
.data
,
1072 .iov_len
= blob
.length
};
1074 ret
= ctdbd_messaging_send_iov(
1075 ctdbd_conn
, CTDB_BROADCAST_CONNECTED
,
1076 CTDB_SRVID_SAMBA_NOTIFY_PROXY
, iov
, ARRAY_SIZE(iov
));
1077 TALLOC_FREE(blob
.data
);
1079 DBG_WARNING("ctdbd_messaging_send failed: %s\n",
1084 log
->rec_index
+= 1;
1088 TALLOC_FREE(log
->recs
);
1091 struct notifyd_broadcast_reclog_state
{
1092 struct tevent_context
*ev
;
1093 struct ctdbd_connection
*ctdbd_conn
;
1094 struct server_id src
;
1095 struct messaging_reclog
*log
;
1098 static void notifyd_broadcast_reclog_next(struct tevent_req
*subreq
);
1100 static struct tevent_req
*notifyd_broadcast_reclog_send(
1101 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1102 struct ctdbd_connection
*ctdbd_conn
, struct server_id src
,
1103 struct messaging_reclog
*log
)
1105 struct tevent_req
*req
, *subreq
;
1106 struct notifyd_broadcast_reclog_state
*state
;
1108 req
= tevent_req_create(mem_ctx
, &state
,
1109 struct notifyd_broadcast_reclog_state
);
1114 state
->ctdbd_conn
= ctdbd_conn
;
1118 subreq
= tevent_wakeup_send(state
, state
->ev
,
1119 timeval_current_ofs_msec(1000));
1120 if (tevent_req_nomem(subreq
, req
)) {
1121 return tevent_req_post(req
, ev
);
1123 tevent_req_set_callback(subreq
, notifyd_broadcast_reclog_next
, req
);
1127 static void notifyd_broadcast_reclog_next(struct tevent_req
*subreq
)
1129 struct tevent_req
*req
= tevent_req_callback_data(
1130 subreq
, struct tevent_req
);
1131 struct notifyd_broadcast_reclog_state
*state
= tevent_req_data(
1132 req
, struct notifyd_broadcast_reclog_state
);
1135 ok
= tevent_wakeup_recv(subreq
);
1136 TALLOC_FREE(subreq
);
1138 tevent_req_oom(req
);
1142 notifyd_broadcast_reclog(state
->ctdbd_conn
, state
->src
, state
->log
);
1144 subreq
= tevent_wakeup_send(state
, state
->ev
,
1145 timeval_current_ofs_msec(1000));
1146 if (tevent_req_nomem(subreq
, req
)) {
1149 tevent_req_set_callback(subreq
, notifyd_broadcast_reclog_next
, req
);
/* Receive the broadcast loop's result; returns 0 or a Unix errno */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
/* State for the 30-second peer liveness sweep */
struct notifyd_clean_peers_state {
	struct tevent_context *ev;
	struct notifyd_state *notifyd;
};

static void notifyd_clean_peers_next(struct tevent_req *subreq);
1164 static struct tevent_req
*notifyd_clean_peers_send(
1165 TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
,
1166 struct notifyd_state
*notifyd
)
1168 struct tevent_req
*req
, *subreq
;
1169 struct notifyd_clean_peers_state
*state
;
1171 req
= tevent_req_create(mem_ctx
, &state
,
1172 struct notifyd_clean_peers_state
);
1177 state
->notifyd
= notifyd
;
1179 subreq
= tevent_wakeup_send(state
, state
->ev
,
1180 timeval_current_ofs_msec(30000));
1181 if (tevent_req_nomem(subreq
, req
)) {
1182 return tevent_req_post(req
, ev
);
1184 tevent_req_set_callback(subreq
, notifyd_clean_peers_next
, req
);
1188 static void notifyd_clean_peers_next(struct tevent_req
*subreq
)
1190 struct tevent_req
*req
= tevent_req_callback_data(
1191 subreq
, struct tevent_req
);
1192 struct notifyd_clean_peers_state
*state
= tevent_req_data(
1193 req
, struct notifyd_clean_peers_state
);
1194 struct notifyd_state
*notifyd
= state
->notifyd
;
1197 time_t now
= time(NULL
);
1199 ok
= tevent_wakeup_recv(subreq
);
1200 TALLOC_FREE(subreq
);
1202 tevent_req_oom(req
);
1207 while (i
< notifyd
->num_peers
) {
1208 struct notifyd_peer
*p
= notifyd
->peers
[i
];
1210 if ((now
- p
->last_broadcast
) > 60) {
1211 struct server_id_buf idbuf
;
1214 * Haven't heard for more than 60 seconds. Call this
1218 DBG_DEBUG("peer %s died\n",
1219 server_id_str_buf(p
->pid
, &idbuf
));
1221 * This implicitly decrements notifyd->num_peers
1229 subreq
= tevent_wakeup_send(state
, state
->ev
,
1230 timeval_current_ofs_msec(30000));
1231 if (tevent_req_nomem(subreq
, req
)) {
1234 tevent_req_set_callback(subreq
, notifyd_clean_peers_next
, req
);
/* Receive the peer cleanup loop's result; returns 0 or a Unix errno */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	return tevent_req_simple_recv_unix(req);
}
1242 static int notifyd_add_proxy_syswatches(struct db_record
*rec
,
1245 struct notifyd_state
*state
= talloc_get_type_abort(
1246 private_data
, struct notifyd_state
);
1247 struct db_context
*db
= dbwrap_record_get_db(rec
);
1248 TDB_DATA key
= dbwrap_record_get_key(rec
);
1249 TDB_DATA value
= dbwrap_record_get_value(rec
);
1250 struct notifyd_watcher watcher
= {};
1251 char path
[key
.dsize
+1];
1255 memcpy(path
, key
.dptr
, key
.dsize
);
1256 path
[key
.dsize
] = '\0';
1258 /* This is a remote database, we just need the watcher. */
1259 ok
= notifyd_parse_entry(value
.dptr
, value
.dsize
, &watcher
, NULL
, NULL
);
1261 DBG_WARNING("Could not parse notifyd entry for %s\n", path
);
1265 watcher
.sys_watch
= NULL
;
1266 watcher
.sys_filter
= watcher
.filter
;
1267 watcher
.sys_subdir_filter
= watcher
.subdir_filter
;
1269 ret
= state
->sys_notify_watch(db
,
1270 state
->sys_notify_ctx
,
1273 &watcher
.subdir_filter
,
1274 notifyd_sys_callback
,
1276 &watcher
.sys_watch
);
1278 DBG_WARNING("inotify_watch returned %s\n", strerror(errno
));
1281 memcpy(value
.dptr
, &watcher
, sizeof(struct notifyd_watcher
));
1286 static int notifyd_db_del_syswatches(struct db_record
*rec
, void *private_data
)
1288 TDB_DATA key
= dbwrap_record_get_key(rec
);
1289 TDB_DATA value
= dbwrap_record_get_value(rec
);
1290 struct notifyd_watcher watcher
= {};
1293 ok
= notifyd_parse_entry(value
.dptr
, value
.dsize
, &watcher
, NULL
, NULL
);
1295 DBG_WARNING("Could not parse notifyd entry for %.*s\n",
1296 (int)key
.dsize
, (char *)key
.dptr
);
1299 TALLOC_FREE(watcher
.sys_watch
);
1304 static int notifyd_peer_destructor(struct notifyd_peer
*p
)
1306 struct notifyd_state
*state
= p
->state
;
1309 if (p
->db
!= NULL
) {
1310 dbwrap_traverse_read(p
->db
, notifyd_db_del_syswatches
,
1314 for (i
= 0; i
<state
->num_peers
; i
++) {
1315 if (p
== state
->peers
[i
]) {
1316 state
->peers
[i
] = state
->peers
[state
->num_peers
-1];
1317 state
->num_peers
-= 1;
1324 static struct notifyd_peer
*notifyd_peer_new(
1325 struct notifyd_state
*state
, struct server_id pid
)
1327 struct notifyd_peer
*p
, **tmp
;
1329 tmp
= talloc_realloc(state
, state
->peers
, struct notifyd_peer
*,
1330 state
->num_peers
+1);
1336 p
= talloc_zero(state
->peers
, struct notifyd_peer
);
1343 state
->peers
[state
->num_peers
] = p
;
1344 state
->num_peers
+= 1;
1346 talloc_set_destructor(p
, notifyd_peer_destructor
);
1351 static void notifyd_apply_reclog(struct notifyd_peer
*peer
,
1352 const uint8_t *msg
, size_t msglen
)
1354 struct notifyd_state
*state
= peer
->state
;
1355 DATA_BLOB blob
= { .data
= discard_const_p(uint8_t, msg
),
1357 struct server_id_buf idbuf
;
1358 struct messaging_reclog
*log
;
1359 enum ndr_err_code ndr_err
;
1362 if (peer
->db
== NULL
) {
1369 log
= talloc(peer
, struct messaging_reclog
);
1371 DBG_DEBUG("talloc failed\n");
1375 ndr_err
= ndr_pull_struct_blob_all(
1377 (ndr_pull_flags_fn_t
)ndr_pull_messaging_reclog
);
1378 if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err
)) {
1379 DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
1380 ndr_errstr(ndr_err
));
1384 DBG_DEBUG("Got %"PRIu32
" recs index %"PRIu64
" from %s\n",
1387 server_id_str_buf(peer
->pid
, &idbuf
));
1389 if (log
->rec_index
!= peer
->rec_index
) {
1390 DBG_INFO("Got rec index %"PRIu64
" from %s, "
1391 "expected %"PRIu64
"\n",
1393 server_id_str_buf(peer
->pid
, &idbuf
),
1398 for (i
=0; i
<log
->num_recs
; i
++) {
1399 struct messaging_rec
*r
= log
->recs
[i
];
1400 struct notify_rec_change_msg
*chg
;
1403 struct notify_instance instance
;
1405 ok
= notifyd_parse_rec_change(r
->buf
.data
, r
->buf
.length
,
1408 DBG_INFO("notifyd_parse_rec_change failed\n");
1413 memcpy(&instance
, &chg
->instance
, sizeof(instance
));
1415 ok
= notifyd_apply_rec_change(&r
->src
, chg
->path
, pathlen
,
1416 &instance
, peer
->db
,
1417 state
->sys_notify_watch
,
1418 state
->sys_notify_ctx
,
1421 DBG_INFO("notifyd_apply_rec_change failed\n");
1426 peer
->rec_index
+= 1;
1427 peer
->last_broadcast
= time(NULL
);
1433 DBG_DEBUG("Dropping peer %s\n",
1434 server_id_str_buf(peer
->pid
, &idbuf
));
/*
 * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
 * messages) broadcasts by other notifyds. Several cases:
 *
 * We don't know the source. This creates a new peer. Creating a peer
 * involves asking the peer for its full database. We assume ordered
 * messages, so the new database will arrive before the next broadcast
 * from that peer.
 *
 * We know the source and the log index matches. We will apply the log
 * locally to our peer's db as if we had received it from a local
 * client.
 *
 * We know the source but the log index does not match. This means we
 * lost a message. We just drop the whole peer and wait for the next
 * broadcast, which will then trigger a fresh database pull.
 */
1456 static int notifyd_snoop_broadcast(struct tevent_context
*ev
,
1457 uint32_t src_vnn
, uint32_t dst_vnn
,
1459 const uint8_t *msg
, size_t msglen
,
1462 struct notifyd_state
*state
= talloc_get_type_abort(
1463 private_data
, struct notifyd_state
);
1464 struct server_id my_id
= messaging_server_id(state
->msg_ctx
);
1465 struct notifyd_peer
*p
;
1468 struct server_id src
, dst
;
1469 struct server_id_buf idbuf
;
1472 if (msglen
< MESSAGE_HDR_LENGTH
) {
1473 DBG_DEBUG("Got short broadcast\n");
1476 message_hdr_get(&msg_type
, &src
, &dst
, msg
);
1478 if (msg_type
!= MSG_SMB_NOTIFY_REC_CHANGES
) {
1479 DBG_DEBUG("Got message %"PRIu32
", ignoring\n", msg_type
);
1482 if (server_id_equal(&src
, &my_id
)) {
1483 DBG_DEBUG("Ignoring my own broadcast\n");
1487 DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1488 server_id_str_buf(src
, &idbuf
));
1490 for (i
=0; i
<state
->num_peers
; i
++) {
1491 if (server_id_equal(&state
->peers
[i
]->pid
, &src
)) {
1493 DBG_DEBUG("Applying changes to peer %"PRIu32
"\n", i
);
1495 notifyd_apply_reclog(state
->peers
[i
],
1496 msg
+ MESSAGE_HDR_LENGTH
,
1497 msglen
- MESSAGE_HDR_LENGTH
);
1502 DBG_DEBUG("Creating new peer for %s\n",
1503 server_id_str_buf(src
, &idbuf
));
1505 p
= notifyd_peer_new(state
, src
);
1507 DBG_DEBUG("notifyd_peer_new failed\n");
1511 status
= messaging_send_buf(state
->msg_ctx
, src
, MSG_SMB_NOTIFY_GET_DB
,
1513 if (!NT_STATUS_IS_OK(status
)) {
1514 DBG_DEBUG("messaging_send_buf failed: %s\n",