/*
   Unix SMB/CIFS implementation.
   Watch dbwrap record changes
   Copyright (C) Volker Lendecke 2012

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
21 #include "system/filesys.h"
22 #include "lib/util/server_id.h"
23 #include "dbwrap/dbwrap.h"
24 #include "dbwrap_watch.h"
25 #include "dbwrap_open.h"
26 #include "lib/util/util_tdb.h"
27 #include "lib/util/tevent_ntstatus.h"
29 #include "server_id_watch.h"
30 #include "lib/dbwrap/dbwrap_private.h"
32 struct dbwrap_watcher
{
34 * Process watching this record
38 * Individual instance inside the waiter, incremented each
39 * time a watcher is created
44 #define DBWRAP_WATCHER_BUF_LENGTH (SERVER_ID_BUF_LENGTH + sizeof(uint64_t))
45 #define DBWRAP_MAX_WATCHERS (INT32_MAX/DBWRAP_WATCHER_BUF_LENGTH)
/*
 * Watched records contain a header of:
 *
 * [uint32] num_records
 * 0 [DBWRAP_WATCHER_BUF_LENGTH]              \
 * 1 [DBWRAP_WATCHER_BUF_LENGTH]              |
 * ..                                         |- Array of watchers
 * (num_records-1)[DBWRAP_WATCHER_BUF_LENGTH] /
 *
 * [Remainder of record....]
 *
 * If this header is absent then this is a
 * fresh record of length zero (no watchers).
 */
62 static bool dbwrap_watch_rec_parse(
65 size_t *pnum_watchers
,
70 if (data
.dsize
== 0) {
72 if (pwatchers
!= NULL
) {
75 if (pnum_watchers
!= NULL
) {
79 *pdata
= (TDB_DATA
) { .dptr
= NULL
};
84 if (data
.dsize
< sizeof(uint32_t)) {
89 num_watchers
= IVAL(data
.dptr
, 0);
91 data
.dptr
+= sizeof(uint32_t);
92 data
.dsize
-= sizeof(uint32_t);
94 if (num_watchers
> data
.dsize
/DBWRAP_WATCHER_BUF_LENGTH
) {
99 if (pwatchers
!= NULL
) {
100 *pwatchers
= data
.dptr
;
102 if (pnum_watchers
!= NULL
) {
103 *pnum_watchers
= num_watchers
;
106 size_t watchers_len
= num_watchers
* DBWRAP_WATCHER_BUF_LENGTH
;
107 *pdata
= (TDB_DATA
) {
108 .dptr
= data
.dptr
+ watchers_len
,
109 .dsize
= data
.dsize
- watchers_len
116 static void dbwrap_watcher_get(struct dbwrap_watcher
*w
,
117 const uint8_t buf
[DBWRAP_WATCHER_BUF_LENGTH
])
119 server_id_get(&w
->pid
, buf
);
120 w
->instance
= BVAL(buf
, SERVER_ID_BUF_LENGTH
);
123 static void dbwrap_watcher_put(uint8_t buf
[DBWRAP_WATCHER_BUF_LENGTH
],
124 const struct dbwrap_watcher
*w
)
126 server_id_put(buf
, w
->pid
);
127 SBVAL(buf
, SERVER_ID_BUF_LENGTH
, w
->instance
);
130 static void dbwrap_watch_log_invalid_record(
131 struct db_context
*db
, TDB_DATA key
, TDB_DATA value
)
133 DBG_ERR("Found invalid record in %s\n", dbwrap_name(db
));
134 dump_data(1, key
.dptr
, key
.dsize
);
135 dump_data(1, value
.dptr
, value
.dsize
);
/*
 * Per-database context of the watched wrapper: the wrapped backend db
 * and the messaging context used to alert waiters.
 */
struct db_watched_ctx {
	struct db_context *backend;
	struct messaging_context *msg;
};
143 struct db_watched_record
{
144 struct db_record
*rec
;
145 struct server_id self
;
147 struct db_record
*rec
;
148 TDB_DATA initial_value
;
151 bool force_fini_store
;
152 struct dbwrap_watcher added
;
156 * The is the number of watcher records
157 * parsed from backend.initial_value
161 * This is the pointer to
162 * the optentially first watcher record
163 * parsed from backend.initial_value
165 * The pointer actually points to memory
166 * in backend.initial_value.
168 * Note it might be NULL, if count is 0.
172 * This remembers if we already
173 * notified the watchers.
175 * As we only need to do that once during:
178 * between rec = fetch_locked
185 struct dbwrap_watcher watcher
;
189 static struct db_watched_record
*db_record_get_watched_record(struct db_record
*rec
)
192 * we can't use wrec = talloc_get_type_abort() here!
193 * because wrec is likely a stack variable in
194 * dbwrap_watched_do_locked_fn()
196 * In order to have a least some protection
197 * we verify the cross reference pointers
198 * between rec and wrec
200 struct db_watched_record
*wrec
=
201 (struct db_watched_record
*)rec
->private_data
;
202 SMB_ASSERT(wrec
->rec
== rec
);
206 static NTSTATUS
dbwrap_watched_record_storev(
207 struct db_watched_record
*wrec
,
208 const TDB_DATA
*dbufs
, int num_dbufs
, int flags
);
209 static NTSTATUS
dbwrap_watched_storev(struct db_record
*rec
,
210 const TDB_DATA
*dbufs
, int num_dbufs
,
212 static NTSTATUS
dbwrap_watched_delete(struct db_record
*rec
);
213 static void dbwrap_watched_trigger_wakeup(struct messaging_context
*msg_ctx
,
214 struct dbwrap_watcher
*watcher
);
215 static int db_watched_record_destructor(struct db_watched_record
*wrec
);
217 static void db_watched_record_init(struct db_context
*db
,
218 struct messaging_context
*msg_ctx
,
219 struct db_record
*rec
,
220 struct db_watched_record
*wrec
,
221 struct db_record
*backend_rec
,
222 TDB_DATA backend_value
)
226 *rec
= (struct db_record
) {
228 .key
= dbwrap_record_get_key(backend_rec
),
229 .storev
= dbwrap_watched_storev
,
230 .delete_rec
= dbwrap_watched_delete
,
231 .private_data
= wrec
,
234 *wrec
= (struct db_watched_record
) {
236 .self
= messaging_server_id(msg_ctx
),
239 .initial_value
= backend_value
,
240 .initial_valid
= true,
244 ok
= dbwrap_watch_rec_parse(backend_value
,
245 &wrec
->watchers
.first
,
246 &wrec
->watchers
.count
,
249 dbwrap_watch_log_invalid_record(rec
->db
, rec
->key
, backend_value
);
250 /* wipe invalid data */
251 rec
->value
= (TDB_DATA
) { .dptr
= NULL
, .dsize
= 0 };
255 static struct db_record
*dbwrap_watched_fetch_locked(
256 struct db_context
*db
, TALLOC_CTX
*mem_ctx
, TDB_DATA key
)
258 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
259 db
->private_data
, struct db_watched_ctx
);
260 struct db_record
*rec
= NULL
;
261 struct db_watched_record
*wrec
= NULL
;
262 struct db_record
*backend_rec
= NULL
;
263 TDB_DATA backend_value
= { .dptr
= NULL
, };
265 rec
= talloc_zero(mem_ctx
, struct db_record
);
269 wrec
= talloc_zero(rec
, struct db_watched_record
);
275 backend_rec
= dbwrap_fetch_locked(ctx
->backend
, wrec
, key
);
276 if (backend_rec
== NULL
) {
280 backend_value
= dbwrap_record_get_value(backend_rec
);
282 db_watched_record_init(db
, ctx
->msg
,
284 backend_rec
, backend_value
);
285 rec
->value_valid
= true;
286 talloc_set_destructor(wrec
, db_watched_record_destructor
);
291 struct db_watched_record_fini_state
{
292 struct db_watched_record
*wrec
;
299 static void db_watched_record_fini_fetcher(TDB_DATA key
,
300 TDB_DATA backend_value
,
303 struct db_watched_record_fini_state
*state
=
304 (struct db_watched_record_fini_state
*)private_data
;
305 struct db_watched_record
*wrec
= state
->wrec
;
306 struct db_record
*rec
= wrec
->rec
;
312 * We're within dbwrap_parse_record()
313 * and backend_value directly points into
314 * the mmap'ed tdb, so we need to copy the
318 ok
= dbwrap_watch_rec_parse(backend_value
, NULL
, NULL
, &value
);
320 struct db_context
*db
= dbwrap_record_get_db(rec
);
322 dbwrap_watch_log_invalid_record(db
, key
, backend_value
);
324 /* wipe invalid data */
325 value
= (TDB_DATA
) { .dptr
= NULL
, .dsize
= 0 };
328 copy_size
= MIN(rec
->value
.dsize
, value
.dsize
);
329 if (copy_size
!= 0) {
331 * First reuse the buffer we already had
334 memcpy(rec
->value
.dptr
, value
.dptr
, copy_size
);
335 state
->dbufs
[state
->num_dbufs
++] = rec
->value
;
336 value
.dsize
-= copy_size
;
337 value
.dptr
+= copy_size
;
340 if (value
.dsize
!= 0) {
344 * There's still new data left
345 * allocate it on callers stackframe
347 p
= talloc_memdup(state
->frame
, value
.dptr
, value
.dsize
);
349 DBG_WARNING("failed to allocate %zu bytes\n",
354 state
->dbufs
[state
->num_dbufs
++] = (TDB_DATA
) {
355 .dptr
= p
, .dsize
= value
.dsize
,
362 static void db_watched_record_fini(struct db_watched_record
*wrec
)
364 struct db_watched_record_fini_state state
= { .wrec
= wrec
, };
365 struct db_context
*backend
= dbwrap_record_get_db(wrec
->backend
.rec
);
366 struct db_record
*rec
= wrec
->rec
;
367 TDB_DATA key
= dbwrap_record_get_key(wrec
->backend
.rec
);
370 if (!wrec
->force_fini_store
) {
374 if (wrec
->backend
.initial_valid
) {
375 if (rec
->value
.dsize
!= 0) {
376 state
.dbufs
[state
.num_dbufs
++] = rec
->value
;
380 * We need to fetch the current
381 * value from the backend again,
382 * which may need to allocate memory
383 * on the provided stackframe.
386 state
.frame
= talloc_stackframe();
388 status
= dbwrap_parse_record(backend
, key
,
389 db_watched_record_fini_fetcher
, &state
);
390 if (!NT_STATUS_IS_OK(status
)) {
391 DBG_WARNING("dbwrap_parse_record failed: %s\n",
393 TALLOC_FREE(state
.frame
);
397 TALLOC_FREE(state
.frame
);
403 * We don't want to wake up others just because
404 * we added ourself as new watcher. But if we
405 * removed outself from the first position
406 * we need to alert the next one.
408 if (!wrec
->removed_first
) {
409 dbwrap_watched_watch_skip_alerting(rec
);
412 status
= dbwrap_watched_record_storev(wrec
, state
.dbufs
, state
.num_dbufs
, 0);
413 TALLOC_FREE(state
.frame
);
414 if (!NT_STATUS_IS_OK(status
)) {
415 DBG_WARNING("dbwrap_watched_record_storev failed: %s\n",
423 static int db_watched_record_destructor(struct db_watched_record
*wrec
)
425 struct db_record
*rec
= wrec
->rec
;
426 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
427 rec
->db
->private_data
, struct db_watched_ctx
);
429 db_watched_record_fini(wrec
);
430 TALLOC_FREE(wrec
->backend
.rec
);
431 dbwrap_watched_trigger_wakeup(ctx
->msg
, &wrec
->wakeup
.watcher
);
435 struct dbwrap_watched_do_locked_state
{
436 struct db_context
*db
;
437 struct messaging_context
*msg_ctx
;
438 struct db_watched_record
*wrec
;
439 struct db_record
*rec
;
440 void (*fn
)(struct db_record
*rec
,
446 static void dbwrap_watched_do_locked_fn(
447 struct db_record
*backend_rec
,
448 TDB_DATA backend_value
,
451 struct dbwrap_watched_do_locked_state
*state
=
452 (struct dbwrap_watched_do_locked_state
*)private_data
;
454 db_watched_record_init(state
->db
, state
->msg_ctx
,
455 state
->rec
, state
->wrec
,
456 backend_rec
, backend_value
);
458 state
->fn(state
->rec
, state
->rec
->value
, state
->private_data
);
460 db_watched_record_fini(state
->wrec
);
463 static NTSTATUS
dbwrap_watched_do_locked(struct db_context
*db
, TDB_DATA key
,
464 void (*fn
)(struct db_record
*rec
,
469 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
470 db
->private_data
, struct db_watched_ctx
);
471 struct db_watched_record wrec
;
472 struct db_record rec
;
473 struct dbwrap_watched_do_locked_state state
= {
474 .db
= db
, .msg_ctx
= ctx
->msg
,
475 .rec
= &rec
, .wrec
= &wrec
,
476 .fn
= fn
, .private_data
= private_data
,
480 status
= dbwrap_do_locked(
481 ctx
->backend
, key
, dbwrap_watched_do_locked_fn
, &state
);
482 if (!NT_STATUS_IS_OK(status
)) {
483 DBG_DEBUG("dbwrap_do_locked returned %s\n", nt_errstr(status
));
487 DBG_DEBUG("dbwrap_watched_do_locked_fn returned\n");
489 dbwrap_watched_trigger_wakeup(state
.msg_ctx
, &wrec
.wakeup
.watcher
);
494 static void dbwrap_watched_record_prepare_wakeup(
495 struct db_watched_record
*wrec
)
498 * Wakeup only needs to happen once (if at all)
500 if (wrec
->watchers
.alerted
) {
504 wrec
->watchers
.alerted
= true;
506 if (wrec
->watchers
.count
== 0) {
507 DBG_DEBUG("No watchers\n");
511 while (wrec
->watchers
.count
!= 0) {
512 struct server_id_buf tmp
;
515 dbwrap_watcher_get(&wrec
->wakeup
.watcher
, wrec
->watchers
.first
);
516 exists
= serverid_exists(&wrec
->wakeup
.watcher
.pid
);
518 DBG_DEBUG("Discard non-existing waiter %s:%"PRIu64
"\n",
519 server_id_str_buf(wrec
->wakeup
.watcher
.pid
, &tmp
),
520 wrec
->wakeup
.watcher
.instance
);
521 wrec
->watchers
.first
+= DBWRAP_WATCHER_BUF_LENGTH
;
522 wrec
->watchers
.count
-= 1;
527 * We will only wakeup the first waiter, via
528 * dbwrap_watched_trigger_wakeup(), but keep
529 * all (including the first one) in the list that
530 * will be flushed back to the backend record
531 * again. Waiters are removing their entries
532 * via dbwrap_watched_watch_remove_instance()
533 * when they no longer want to monitor the record.
535 DBG_DEBUG("Will alert first waiter %s:%"PRIu64
"\n",
536 server_id_str_buf(wrec
->wakeup
.watcher
.pid
, &tmp
),
537 wrec
->wakeup
.watcher
.instance
);
542 static void dbwrap_watched_trigger_wakeup(struct messaging_context
*msg_ctx
,
543 struct dbwrap_watcher
*watcher
)
545 struct server_id_buf tmp
;
546 uint8_t instance_buf
[8];
549 if (watcher
->instance
== 0) {
550 DBG_DEBUG("No one to wakeup\n");
554 DBG_DEBUG("Alerting %s:%"PRIu64
"\n",
555 server_id_str_buf(watcher
->pid
, &tmp
),
558 SBVAL(instance_buf
, 0, watcher
->instance
);
560 status
= messaging_send_buf(
565 sizeof(instance_buf
));
566 if (!NT_STATUS_IS_OK(status
)) {
567 DBG_WARNING("messaging_send_buf to %s failed: %s - ignoring...\n",
568 server_id_str_buf(watcher
->pid
, &tmp
),
573 static NTSTATUS
dbwrap_watched_record_storev(
574 struct db_watched_record
*wrec
,
575 const TDB_DATA
*dbufs
, int num_dbufs
, int flags
)
577 uint8_t num_watchers_buf
[4] = { 0 };
578 uint8_t add_buf
[DBWRAP_WATCHER_BUF_LENGTH
];
579 size_t num_store_watchers
;
580 TDB_DATA my_dbufs
[num_dbufs
+3];
581 int num_my_dbufs
= 0;
583 size_t add_count
= 0;
585 dbwrap_watched_record_prepare_wakeup(wrec
);
587 wrec
->backend
.initial_valid
= false;
588 wrec
->force_fini_store
= false;
590 if (wrec
->added
.pid
.pid
!= 0) {
591 dbwrap_watcher_put(add_buf
, &wrec
->added
);
595 num_store_watchers
= wrec
->watchers
.count
+ add_count
;
596 if (num_store_watchers
== 0 && num_dbufs
== 0) {
597 status
= dbwrap_record_delete(wrec
->backend
.rec
);
600 if (num_store_watchers
>= DBWRAP_MAX_WATCHERS
) {
601 DBG_WARNING("Can't handle %zu watchers\n",
603 return NT_STATUS_INSUFFICIENT_RESOURCES
;
606 SIVAL(num_watchers_buf
, 0, num_store_watchers
);
608 my_dbufs
[num_my_dbufs
++] = (TDB_DATA
) {
609 .dptr
= num_watchers_buf
, .dsize
= sizeof(num_watchers_buf
),
611 if (wrec
->watchers
.count
!= 0) {
612 my_dbufs
[num_my_dbufs
++] = (TDB_DATA
) {
613 .dptr
= wrec
->watchers
.first
, .dsize
= wrec
->watchers
.count
* DBWRAP_WATCHER_BUF_LENGTH
,
616 if (add_count
!= 0) {
617 my_dbufs
[num_my_dbufs
++] = (TDB_DATA
) {
619 .dsize
= sizeof(add_buf
),
622 if (num_dbufs
!= 0) {
623 memcpy(my_dbufs
+num_my_dbufs
, dbufs
, num_dbufs
* sizeof(*dbufs
));
624 num_my_dbufs
+= num_dbufs
;
627 SMB_ASSERT(num_my_dbufs
<= ARRAY_SIZE(my_dbufs
));
629 status
= dbwrap_record_storev(
630 wrec
->backend
.rec
, my_dbufs
, num_my_dbufs
, flags
);
634 static NTSTATUS
dbwrap_watched_storev(struct db_record
*rec
,
635 const TDB_DATA
*dbufs
, int num_dbufs
,
638 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
640 return dbwrap_watched_record_storev(wrec
, dbufs
, num_dbufs
, flags
);
643 static NTSTATUS
dbwrap_watched_delete(struct db_record
*rec
)
645 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
648 * dbwrap_watched_record_storev() will figure out
649 * if the record should be deleted or if there are still
650 * watchers to be stored.
652 return dbwrap_watched_record_storev(wrec
, NULL
, 0, 0);
/* State forwarding the user's callback through our traverse shim. */
struct dbwrap_watched_traverse_state {
	int (*fn)(struct db_record *rec, void *private_data);
	void *private_data;
};
660 static int dbwrap_watched_traverse_fn(struct db_record
*rec
,
663 struct dbwrap_watched_traverse_state
*state
= private_data
;
664 struct db_record prec
= *rec
;
667 ok
= dbwrap_watch_rec_parse(rec
->value
, NULL
, NULL
, &prec
.value
);
671 if (prec
.value
.dsize
== 0) {
674 prec
.value_valid
= true;
676 return state
->fn(&prec
, state
->private_data
);
679 static int dbwrap_watched_traverse(struct db_context
*db
,
680 int (*fn
)(struct db_record
*rec
,
684 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
685 db
->private_data
, struct db_watched_ctx
);
686 struct dbwrap_watched_traverse_state state
= {
687 .fn
= fn
, .private_data
= private_data
};
691 status
= dbwrap_traverse(
692 ctx
->backend
, dbwrap_watched_traverse_fn
, &state
, &ret
);
693 if (!NT_STATUS_IS_OK(status
)) {
699 static int dbwrap_watched_traverse_read(struct db_context
*db
,
700 int (*fn
)(struct db_record
*rec
,
704 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
705 db
->private_data
, struct db_watched_ctx
);
706 struct dbwrap_watched_traverse_state state
= {
707 .fn
= fn
, .private_data
= private_data
};
711 status
= dbwrap_traverse_read(
712 ctx
->backend
, dbwrap_watched_traverse_fn
, &state
, &ret
);
713 if (!NT_STATUS_IS_OK(status
)) {
719 static int dbwrap_watched_get_seqnum(struct db_context
*db
)
721 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
722 db
->private_data
, struct db_watched_ctx
);
723 return dbwrap_get_seqnum(ctx
->backend
);
726 static int dbwrap_watched_transaction_start(struct db_context
*db
)
728 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
729 db
->private_data
, struct db_watched_ctx
);
730 return dbwrap_transaction_start(ctx
->backend
);
733 static int dbwrap_watched_transaction_commit(struct db_context
*db
)
735 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
736 db
->private_data
, struct db_watched_ctx
);
737 return dbwrap_transaction_commit(ctx
->backend
);
740 static int dbwrap_watched_transaction_cancel(struct db_context
*db
)
742 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
743 db
->private_data
, struct db_watched_ctx
);
744 return dbwrap_transaction_cancel(ctx
->backend
);
747 struct dbwrap_watched_parse_record_state
{
748 struct db_context
*db
;
749 void (*parser
)(TDB_DATA key
, TDB_DATA data
, void *private_data
);
754 static void dbwrap_watched_parse_record_parser(TDB_DATA key
, TDB_DATA data
,
757 struct dbwrap_watched_parse_record_state
*state
= private_data
;
760 state
->ok
= dbwrap_watch_rec_parse(data
, NULL
, NULL
, &userdata
);
762 dbwrap_watch_log_invalid_record(state
->db
, key
, data
);
766 state
->parser(key
, userdata
, state
->private_data
);
769 static NTSTATUS
dbwrap_watched_parse_record(
770 struct db_context
*db
, TDB_DATA key
,
771 void (*parser
)(TDB_DATA key
, TDB_DATA data
, void *private_data
),
774 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
775 db
->private_data
, struct db_watched_ctx
);
776 struct dbwrap_watched_parse_record_state state
= {
779 .private_data
= private_data
,
783 status
= dbwrap_parse_record(
784 ctx
->backend
, key
, dbwrap_watched_parse_record_parser
, &state
);
785 if (!NT_STATUS_IS_OK(status
)) {
789 return NT_STATUS_NOT_FOUND
;
794 static void dbwrap_watched_parse_record_done(struct tevent_req
*subreq
);
796 static struct tevent_req
*dbwrap_watched_parse_record_send(
798 struct tevent_context
*ev
,
799 struct db_context
*db
,
801 void (*parser
)(TDB_DATA key
, TDB_DATA data
, void *private_data
),
803 enum dbwrap_req_state
*req_state
)
805 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
806 db
->private_data
, struct db_watched_ctx
);
807 struct tevent_req
*req
= NULL
;
808 struct tevent_req
*subreq
= NULL
;
809 struct dbwrap_watched_parse_record_state
*state
= NULL
;
811 req
= tevent_req_create(mem_ctx
, &state
,
812 struct dbwrap_watched_parse_record_state
);
814 *req_state
= DBWRAP_REQ_ERROR
;
818 *state
= (struct dbwrap_watched_parse_record_state
) {
820 .private_data
= private_data
,
824 subreq
= dbwrap_parse_record_send(state
,
828 dbwrap_watched_parse_record_parser
,
831 if (tevent_req_nomem(subreq
, req
)) {
832 *req_state
= DBWRAP_REQ_ERROR
;
833 return tevent_req_post(req
, ev
);
836 tevent_req_set_callback(subreq
, dbwrap_watched_parse_record_done
, req
);
840 static void dbwrap_watched_parse_record_done(struct tevent_req
*subreq
)
842 struct tevent_req
*req
= tevent_req_callback_data(
843 subreq
, struct tevent_req
);
844 struct dbwrap_watched_parse_record_state
*state
= tevent_req_data(
845 req
, struct dbwrap_watched_parse_record_state
);
848 status
= dbwrap_parse_record_recv(subreq
);
850 if (tevent_req_nterror(req
, status
)) {
855 tevent_req_nterror(req
, NT_STATUS_NOT_FOUND
);
859 tevent_req_done(req
);
863 static NTSTATUS
dbwrap_watched_parse_record_recv(struct tevent_req
*req
)
867 if (tevent_req_is_nterror(req
, &status
)) {
868 tevent_req_received(req
);
872 tevent_req_received(req
);
876 static int dbwrap_watched_exists(struct db_context
*db
, TDB_DATA key
)
878 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
879 db
->private_data
, struct db_watched_ctx
);
881 return dbwrap_exists(ctx
->backend
, key
);
884 static size_t dbwrap_watched_id(struct db_context
*db
, uint8_t *id
,
887 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
888 db
->private_data
, struct db_watched_ctx
);
890 return dbwrap_db_id(ctx
->backend
, id
, idlen
);
893 struct db_context
*db_open_watched(TALLOC_CTX
*mem_ctx
,
894 struct db_context
**backend
,
895 struct messaging_context
*msg
)
897 struct db_context
*db
;
898 struct db_watched_ctx
*ctx
;
900 db
= talloc_zero(mem_ctx
, struct db_context
);
904 ctx
= talloc_zero(db
, struct db_watched_ctx
);
909 db
->private_data
= ctx
;
913 ctx
->backend
= talloc_move(ctx
, backend
);
914 db
->lock_order
= ctx
->backend
->lock_order
;
915 ctx
->backend
->lock_order
= DBWRAP_LOCK_ORDER_NONE
;
917 db
->fetch_locked
= dbwrap_watched_fetch_locked
;
918 db
->do_locked
= dbwrap_watched_do_locked
;
919 db
->traverse
= dbwrap_watched_traverse
;
920 db
->traverse_read
= dbwrap_watched_traverse_read
;
921 db
->get_seqnum
= dbwrap_watched_get_seqnum
;
922 db
->transaction_start
= dbwrap_watched_transaction_start
;
923 db
->transaction_commit
= dbwrap_watched_transaction_commit
;
924 db
->transaction_cancel
= dbwrap_watched_transaction_cancel
;
925 db
->parse_record
= dbwrap_watched_parse_record
;
926 db
->parse_record_send
= dbwrap_watched_parse_record_send
;
927 db
->parse_record_recv
= dbwrap_watched_parse_record_recv
;
928 db
->exists
= dbwrap_watched_exists
;
929 db
->id
= dbwrap_watched_id
;
930 db
->name
= dbwrap_name(ctx
->backend
);
935 uint64_t dbwrap_watched_watch_add_instance(struct db_record
*rec
)
937 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
938 static uint64_t global_instance
= 1;
940 SMB_ASSERT(wrec
->added
.instance
== 0);
942 wrec
->added
= (struct dbwrap_watcher
) {
944 .instance
= global_instance
++,
947 wrec
->force_fini_store
= true;
949 return wrec
->added
.instance
;
952 void dbwrap_watched_watch_remove_instance(struct db_record
*rec
, uint64_t instance
)
954 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
955 struct dbwrap_watcher clear_watcher
= {
957 .instance
= instance
,
960 struct server_id_buf buf
;
966 if (wrec
->added
.instance
== instance
) {
967 SMB_ASSERT(server_id_equal(&wrec
->added
.pid
, &wrec
->self
));
968 DBG_DEBUG("Watcher %s:%"PRIu64
" reverted from adding\n",
969 server_id_str_buf(clear_watcher
.pid
, &buf
),
970 clear_watcher
.instance
);
971 ZERO_STRUCT(wrec
->added
);
974 for (i
=0; i
< wrec
->watchers
.count
; i
++) {
975 struct dbwrap_watcher watcher
;
976 size_t off
= i
*DBWRAP_WATCHER_BUF_LENGTH
;
981 dbwrap_watcher_get(&watcher
, wrec
->watchers
.first
+ off
);
983 if (clear_watcher
.instance
!= watcher
.instance
) {
986 if (!server_id_equal(&clear_watcher
.pid
, &watcher
.pid
)) {
990 wrec
->force_fini_store
= true;
993 DBG_DEBUG("Watcher %s:%"PRIu64
" removed from first position of %zu\n",
994 server_id_str_buf(clear_watcher
.pid
, &buf
),
995 clear_watcher
.instance
,
996 wrec
->watchers
.count
);
997 wrec
->watchers
.first
+= DBWRAP_WATCHER_BUF_LENGTH
;
998 wrec
->watchers
.count
-= 1;
999 wrec
->removed_first
= true;
1002 if (i
== (wrec
->watchers
.count
-1)) {
1003 DBG_DEBUG("Watcher %s:%"PRIu64
" removed from last position of %zu\n",
1004 server_id_str_buf(clear_watcher
.pid
, &buf
),
1005 clear_watcher
.instance
,
1006 wrec
->watchers
.count
);
1007 wrec
->watchers
.count
-= 1;
1011 DBG_DEBUG("Watcher %s:%"PRIu64
" cleared at position %zu from %zu\n",
1012 server_id_str_buf(clear_watcher
.pid
, &buf
),
1013 clear_watcher
.instance
, i
+1,
1014 wrec
->watchers
.count
);
1016 next_off
= off
+ DBWRAP_WATCHER_BUF_LENGTH
;
1017 full_len
= wrec
->watchers
.count
* DBWRAP_WATCHER_BUF_LENGTH
;
1018 move_len
= full_len
- next_off
;
1019 memmove(wrec
->watchers
.first
+ off
,
1020 wrec
->watchers
.first
+ next_off
,
1022 wrec
->watchers
.count
-= 1;
1026 DBG_DEBUG("Watcher %s:%"PRIu64
" not found in %zu watchers\n",
1027 server_id_str_buf(clear_watcher
.pid
, &buf
),
1028 clear_watcher
.instance
,
1029 wrec
->watchers
.count
);
1033 void dbwrap_watched_watch_skip_alerting(struct db_record
*rec
)
1035 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
1037 wrec
->wakeup
.watcher
= (struct dbwrap_watcher
) { .instance
= 0, };
1038 wrec
->watchers
.alerted
= true;
1041 void dbwrap_watched_watch_reset_alerting(struct db_record
*rec
)
1043 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
1045 wrec
->wakeup
.watcher
= (struct dbwrap_watcher
) { .instance
= 0, };
1046 wrec
->watchers
.alerted
= false;
/* Force the wakeup decision to be made now. */
void dbwrap_watched_watch_force_alerting(struct db_record *rec)
{
	struct db_watched_record *wrec = db_record_get_watched_record(rec);

	dbwrap_watched_record_prepare_wakeup(wrec);
}
1056 struct dbwrap_watched_watch_state
{
1057 struct db_context
*db
;
1059 struct dbwrap_watcher watcher
;
1060 struct server_id blocker
;
1064 static bool dbwrap_watched_msg_filter(struct messaging_rec
*rec
,
1065 void *private_data
);
1066 static void dbwrap_watched_watch_done(struct tevent_req
*subreq
);
1067 static void dbwrap_watched_watch_blocker_died(struct tevent_req
*subreq
);
1068 static int dbwrap_watched_watch_state_destructor(
1069 struct dbwrap_watched_watch_state
*state
);
1071 struct tevent_req
*dbwrap_watched_watch_send(TALLOC_CTX
*mem_ctx
,
1072 struct tevent_context
*ev
,
1073 struct db_record
*rec
,
1074 uint64_t resumed_instance
,
1075 struct server_id blocker
)
1077 struct db_context
*db
= dbwrap_record_get_db(rec
);
1078 struct db_watched_ctx
*ctx
= talloc_get_type_abort(
1079 db
->private_data
, struct db_watched_ctx
);
1080 struct db_watched_record
*wrec
= db_record_get_watched_record(rec
);
1081 struct tevent_req
*req
, *subreq
;
1082 struct dbwrap_watched_watch_state
*state
;
1085 req
= tevent_req_create(mem_ctx
, &state
,
1086 struct dbwrap_watched_watch_state
);
1091 state
->blocker
= blocker
;
1093 if (ctx
->msg
== NULL
) {
1094 tevent_req_nterror(req
, NT_STATUS_NOT_SUPPORTED
);
1095 return tevent_req_post(req
, ev
);
1098 if (resumed_instance
== 0 && wrec
->added
.instance
== 0) {
1100 * Adding a new instance
1102 instance
= dbwrap_watched_watch_add_instance(rec
);
1103 } else if (resumed_instance
!= 0 && wrec
->added
.instance
== 0) {
1105 * Resuming an existing instance that was
1106 * already present before do_locked started
1108 instance
= resumed_instance
;
1109 } else if (resumed_instance
== wrec
->added
.instance
) {
1111 * The caller used dbwrap_watched_watch_add_instance()
1112 * already during this do_locked() invocation.
1114 instance
= resumed_instance
;
1116 tevent_req_nterror(req
, NT_STATUS_REQUEST_NOT_ACCEPTED
);
1117 return tevent_req_post(req
, ev
);
1120 state
->watcher
= (struct dbwrap_watcher
) {
1121 .pid
= messaging_server_id(ctx
->msg
),
1122 .instance
= instance
,
1125 state
->key
= tdb_data_talloc_copy(state
, rec
->key
);
1126 if (tevent_req_nomem(state
->key
.dptr
, req
)) {
1127 return tevent_req_post(req
, ev
);
1130 subreq
= messaging_filtered_read_send(
1131 state
, ev
, ctx
->msg
, dbwrap_watched_msg_filter
, state
);
1132 if (tevent_req_nomem(subreq
, req
)) {
1133 return tevent_req_post(req
, ev
);
1135 tevent_req_set_callback(subreq
, dbwrap_watched_watch_done
, req
);
1137 talloc_set_destructor(state
, dbwrap_watched_watch_state_destructor
);
1139 if (blocker
.pid
!= 0) {
1140 subreq
= server_id_watch_send(state
, ev
, blocker
);
1141 if (tevent_req_nomem(subreq
, req
)) {
1142 return tevent_req_post(req
, ev
);
1144 tevent_req_set_callback(
1145 subreq
, dbwrap_watched_watch_blocker_died
, req
);
1151 static void dbwrap_watched_watch_blocker_died(struct tevent_req
*subreq
)
1153 struct tevent_req
*req
= tevent_req_callback_data(
1154 subreq
, struct tevent_req
);
1155 struct dbwrap_watched_watch_state
*state
= tevent_req_data(
1156 req
, struct dbwrap_watched_watch_state
);
1159 ret
= server_id_watch_recv(subreq
, NULL
);
1160 TALLOC_FREE(subreq
);
1162 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
1165 state
->blockerdead
= true;
1166 tevent_req_done(req
);
1169 static void dbwrap_watched_watch_state_destructor_fn(
1170 struct db_record
*rec
,
1174 struct dbwrap_watched_watch_state
*state
= talloc_get_type_abort(
1175 private_data
, struct dbwrap_watched_watch_state
);
1178 * Here we just remove ourself from the in memory
1179 * watchers array and let db_watched_record_fini()
1180 * call dbwrap_watched_record_storev() to do the magic
1181 * of writing back the modified in memory copy.
1183 dbwrap_watched_watch_remove_instance(rec
, state
->watcher
.instance
);
1187 static int dbwrap_watched_watch_state_destructor(
1188 struct dbwrap_watched_watch_state
*state
)
1192 status
= dbwrap_do_locked(
1195 dbwrap_watched_watch_state_destructor_fn
,
1197 if (!NT_STATUS_IS_OK(status
)) {
1198 DBG_WARNING("dbwrap_do_locked failed: %s\n",
1204 static bool dbwrap_watched_msg_filter(struct messaging_rec
*rec
,
1207 struct dbwrap_watched_watch_state
*state
= talloc_get_type_abort(
1208 private_data
, struct dbwrap_watched_watch_state
);
1211 if (rec
->msg_type
!= MSG_DBWRAP_MODIFIED
) {
1214 if (rec
->num_fds
!= 0) {
1218 if (rec
->buf
.length
!= sizeof(instance
)) {
1219 DBG_DEBUG("Got size %zu, expected %zu\n",
1225 instance
= BVAL(rec
->buf
.data
, 0);
1227 if (instance
!= state
->watcher
.instance
) {
1228 DBG_DEBUG("Got instance %"PRIu64
", expected %"PRIu64
"\n",
1230 state
->watcher
.instance
);
1237 static void dbwrap_watched_watch_done(struct tevent_req
*subreq
)
1239 struct tevent_req
*req
= tevent_req_callback_data(
1240 subreq
, struct tevent_req
);
1241 struct dbwrap_watched_watch_state
*state
= tevent_req_data(
1242 req
, struct dbwrap_watched_watch_state
);
1243 struct messaging_rec
*rec
;
1246 ret
= messaging_filtered_read_recv(subreq
, state
, &rec
);
1247 TALLOC_FREE(subreq
);
1249 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
1252 tevent_req_done(req
);
1255 NTSTATUS
dbwrap_watched_watch_recv(struct tevent_req
*req
,
1256 uint64_t *pkeep_instance
,
1258 struct server_id
*blocker
)
1260 struct dbwrap_watched_watch_state
*state
= tevent_req_data(
1261 req
, struct dbwrap_watched_watch_state
);
1264 if (tevent_req_is_nterror(req
, &status
)) {
1265 tevent_req_received(req
);
1268 if (pkeep_instance
!= NULL
) {
1269 *pkeep_instance
= state
->watcher
.instance
;
1271 * No need to remove ourselves anymore,
1272 * the caller will take care of removing itself.
1274 talloc_set_destructor(state
, NULL
);
1276 if (blockerdead
!= NULL
) {
1277 *blockerdead
= state
->blockerdead
;
1279 if (blocker
!= NULL
) {
1280 *blocker
= state
->blocker
;
1282 tevent_req_received(req
);
1283 return NT_STATUS_OK
;