2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007-2009
5 Copyright (C) Michael Adam 2009
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "system/filesys.h"
23 #include "lib/tdb_wrap/tdb_wrap.h"
25 #include "dbwrap/dbwrap.h"
26 #include "dbwrap/dbwrap_ctdb.h"
27 #include "dbwrap/dbwrap_rbt.h"
28 #include "lib/param/param.h"
30 #include "ctdb/include/ctdb_protocol.h"
31 #include "ctdbd_conn.h"
32 #include "dbwrap/dbwrap.h"
33 #include "dbwrap/dbwrap_private.h"
34 #include "dbwrap/dbwrap_ctdb.h"
37 #include "messages_ctdb.h"
38 #include "lib/cluster_support.h"
39 #include "lib/util/tevent_ntstatus.h"
41 struct db_ctdb_transaction_handle
{
42 struct db_ctdb_ctx
*ctx
;
44 * we store the writes done under a transaction:
46 struct ctdb_marshall_buffer
*m_write
;
53 struct db_context
*db
;
54 struct tdb_wrap
*wtdb
;
56 struct db_ctdb_transaction_handle
*transaction
;
57 struct g_lock_ctx
*lock_ctx
;
59 /* thresholds for warning messages */
60 int warn_unlock_msecs
;
61 int warn_migrate_msecs
;
62 int warn_migrate_attempts
;
63 int warn_locktime_msecs
;
67 struct db_ctdb_ctx
*ctdb_ctx
;
68 struct ctdb_ltdb_header header
;
69 struct timeval lock_time
;
72 struct ctdb_async_ctx
{
74 struct ctdbd_connection
*async_conn
;
77 static struct ctdb_async_ctx ctdb_async_ctx
;
79 static int ctdb_async_ctx_init_internal(TALLOC_CTX
*mem_ctx
,
80 struct tevent_context
*ev
,
86 TALLOC_FREE(ctdb_async_ctx
.async_conn
);
87 ctdb_async_ctx
.initialized
= false;
90 if (ctdb_async_ctx
.initialized
) {
95 ret
= ctdbd_init_async_connection(
99 &ctdb_async_ctx
.async_conn
);
103 DBG_ERR("ctdbd_init_async_connection(%s, timeout=%d) "
104 "failed: ret=%d %s\n",
111 SMB_ASSERT(ctdb_async_ctx
.async_conn
!= NULL
);
113 ctdb_async_ctx
.initialized
= true;
117 static int ctdb_async_ctx_init(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
)
119 return ctdb_async_ctx_init_internal(mem_ctx
, ev
, false);
122 int ctdb_async_ctx_reinit(TALLOC_CTX
*mem_ctx
, struct tevent_context
*ev
)
124 return ctdb_async_ctx_init_internal(mem_ctx
, ev
, true);
127 static NTSTATUS
tdb_error_to_ntstatus(struct tdb_context
*tdb
)
129 enum TDB_ERROR tret
= tdb_error(tdb
);
131 return map_nt_error_from_tdb(tret
);
134 struct db_ctdb_ltdb_parse_state
{
135 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
136 TDB_DATA data
, void *private_data
);
140 static int db_ctdb_ltdb_parser(TDB_DATA key
, TDB_DATA data
,
143 struct db_ctdb_ltdb_parse_state
*state
=
144 (struct db_ctdb_ltdb_parse_state
*)private_data
;
146 if (data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
151 key
, (struct ctdb_ltdb_header
*)data
.dptr
,
152 make_tdb_data(data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
153 data
.dsize
- sizeof(struct ctdb_ltdb_header
)),
154 state
->private_data
);
158 static NTSTATUS
db_ctdb_ltdb_parse(
159 struct db_ctdb_ctx
*db
, TDB_DATA key
,
160 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
161 TDB_DATA data
, void *private_data
),
164 struct db_ctdb_ltdb_parse_state state
;
167 state
.parser
= parser
;
168 state
.private_data
= private_data
;
170 ret
= tdb_parse_record(db
->wtdb
->tdb
, key
, db_ctdb_ltdb_parser
,
173 return NT_STATUS_NOT_FOUND
;
179 * Store a record together with the ctdb record header
180 * in the local copy of the database.
182 static NTSTATUS
db_ctdb_ltdb_store(struct db_ctdb_ctx
*db
,
184 struct ctdb_ltdb_header
*header
,
185 const TDB_DATA
*dbufs
, int num_dbufs
)
187 TDB_DATA recs
[num_dbufs
+1];
190 recs
[0] = (TDB_DATA
) { .dptr
= (uint8_t *)header
,
191 .dsize
= sizeof(struct ctdb_ltdb_header
) };
192 memcpy(&recs
[1], dbufs
, sizeof(TDB_DATA
) * num_dbufs
);
194 ret
= tdb_storev(db
->wtdb
->tdb
, key
, recs
, num_dbufs
+ 1, TDB_REPLACE
);
196 return (ret
== 0) ? NT_STATUS_OK
197 : tdb_error_to_ntstatus(db
->wtdb
->tdb
);
202 form a ctdb_rec_data record from a key/data pair
204 static struct ctdb_rec_data_old
*db_ctdb_marshall_record(TALLOC_CTX
*mem_ctx
, uint32_t reqid
,
206 struct ctdb_ltdb_header
*header
,
210 struct ctdb_rec_data_old
*d
;
212 length
= offsetof(struct ctdb_rec_data_old
, data
) + key
.dsize
+
213 data
.dsize
+ sizeof(*header
);
214 d
= (struct ctdb_rec_data_old
*)talloc_size(mem_ctx
, length
);
220 d
->keylen
= key
.dsize
;
221 memcpy(&d
->data
[0], key
.dptr
, key
.dsize
);
223 d
->datalen
= data
.dsize
+ sizeof(*header
);
224 memcpy(&d
->data
[key
.dsize
], header
, sizeof(*header
));
225 memcpy(&d
->data
[key
.dsize
+sizeof(*header
)], data
.dptr
, data
.dsize
);
230 /* helper function for marshalling multiple records */
231 static struct ctdb_marshall_buffer
*db_ctdb_marshall_add(TALLOC_CTX
*mem_ctx
,
232 struct ctdb_marshall_buffer
*m
,
236 struct ctdb_ltdb_header
*header
,
239 struct ctdb_rec_data_old
*r
;
240 size_t m_size
, r_size
;
241 struct ctdb_marshall_buffer
*m2
= NULL
;
243 r
= db_ctdb_marshall_record(talloc_tos(), reqid
, key
, header
, data
);
250 m
= (struct ctdb_marshall_buffer
*)talloc_zero_size(
251 mem_ctx
, offsetof(struct ctdb_marshall_buffer
, data
));
258 m_size
= talloc_get_size(m
);
259 r_size
= talloc_get_size(r
);
261 m2
= (struct ctdb_marshall_buffer
*)talloc_realloc_size(
262 mem_ctx
, m
, m_size
+ r_size
);
268 memcpy(m_size
+ (uint8_t *)m2
, r
, r_size
);
277 /* we've finished marshalling, return a data blob with the marshalled records */
278 static TDB_DATA
db_ctdb_marshall_finish(struct ctdb_marshall_buffer
*m
)
281 data
.dptr
= (uint8_t *)m
;
282 data
.dsize
= talloc_get_size(m
);
287 loop over a marshalling buffer
289 - pass r==NULL to start
290 - loop the number of times indicated by m->count
292 static struct ctdb_rec_data_old
*db_ctdb_marshall_loop_next_key(
293 struct ctdb_marshall_buffer
*m
, struct ctdb_rec_data_old
*r
, TDB_DATA
*key
)
296 r
= (struct ctdb_rec_data_old
*)&m
->data
[0];
298 r
= (struct ctdb_rec_data_old
*)(r
->length
+ (uint8_t *)r
);
301 key
->dptr
= &r
->data
[0];
302 key
->dsize
= r
->keylen
;
306 static bool db_ctdb_marshall_buf_parse(
307 struct ctdb_rec_data_old
*r
, uint32_t *reqid
,
308 struct ctdb_ltdb_header
**header
, TDB_DATA
*data
)
310 if (r
->datalen
< sizeof(struct ctdb_ltdb_header
)) {
316 data
->dptr
= &r
->data
[r
->keylen
] + sizeof(struct ctdb_ltdb_header
);
317 data
->dsize
= r
->datalen
- sizeof(struct ctdb_ltdb_header
);
319 *header
= (struct ctdb_ltdb_header
*)&r
->data
[r
->keylen
];
325 * CTDB transaction destructor
327 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle
*h
)
331 status
= g_lock_unlock(h
->ctx
->lock_ctx
,
332 string_term_tdb_data(h
->lock_name
));
333 if (!NT_STATUS_IS_OK(status
)) {
334 DEBUG(0, ("g_lock_unlock failed for %s: %s\n", h
->lock_name
,
342 * CTDB dbwrap API: transaction_start function
343 * starts a transaction on a persistent database
345 static int db_ctdb_transaction_start(struct db_context
*db
)
347 struct db_ctdb_transaction_handle
*h
;
349 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
352 if (!db
->persistent
) {
353 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
358 if (ctx
->transaction
) {
359 ctx
->transaction
->nesting
++;
360 DEBUG(5, (__location__
" transaction start on db 0x%08x: nesting %d -> %d\n",
361 ctx
->db_id
, ctx
->transaction
->nesting
- 1, ctx
->transaction
->nesting
));
365 h
= talloc_zero(db
, struct db_ctdb_transaction_handle
);
367 DEBUG(0,(__location__
" oom for transaction handle\n"));
373 h
->lock_name
= talloc_asprintf(h
, "transaction_db_0x%08x",
374 (unsigned int)ctx
->db_id
);
375 if (h
->lock_name
== NULL
) {
376 DEBUG(0, ("talloc_asprintf failed\n"));
382 * Wait a day, i.e. forever...
384 status
= g_lock_lock(ctx
->lock_ctx
,
385 string_term_tdb_data(h
->lock_name
),
387 tevent_timeval_set(86400, 0),
390 if (!NT_STATUS_IS_OK(status
)) {
391 DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status
)));
396 talloc_set_destructor(h
, db_ctdb_transaction_destructor
);
398 ctx
->transaction
= h
;
400 DEBUG(5,(__location__
" transaction started on db 0x%08x\n", ctx
->db_id
));
405 static bool parse_newest_in_marshall_buffer(
406 struct ctdb_marshall_buffer
*buf
, TDB_DATA key
,
407 void (*parser
)(TDB_DATA key
, struct ctdb_ltdb_header
*header
,
408 TDB_DATA data
, void *private_data
),
411 struct ctdb_rec_data_old
*rec
= NULL
;
412 struct ctdb_ltdb_header
*h
= NULL
;
421 * Walk the list of records written during this
422 * transaction. If we want to read one we have already
423 * written, return the last written sample. Thus we do not do
424 * a "break;" for the first hit, this record might have been
428 for (i
=0; i
<buf
->count
; i
++) {
432 rec
= db_ctdb_marshall_loop_next_key(buf
, rec
, &tkey
);
437 if (!tdb_data_equal(key
, tkey
)) {
441 if (!db_ctdb_marshall_buf_parse(rec
, &reqid
, &h
, &data
)) {
450 parser(key
, h
, data
, private_data
);
455 struct pull_newest_from_marshall_buffer_state
{
456 struct ctdb_ltdb_header
*pheader
;
461 static void pull_newest_from_marshall_buffer_parser(
462 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
463 TDB_DATA data
, void *private_data
)
465 struct pull_newest_from_marshall_buffer_state
*state
=
466 (struct pull_newest_from_marshall_buffer_state
*)private_data
;
468 if (state
->pheader
!= NULL
) {
469 memcpy(state
->pheader
, header
, sizeof(*state
->pheader
));
471 if (state
->pdata
!= NULL
) {
472 state
->pdata
->dsize
= data
.dsize
;
473 state
->pdata
->dptr
= (uint8_t *)talloc_memdup(
474 state
->mem_ctx
, data
.dptr
, data
.dsize
);
478 static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer
*buf
,
480 struct ctdb_ltdb_header
*pheader
,
484 struct pull_newest_from_marshall_buffer_state state
;
486 state
.pheader
= pheader
;
487 state
.mem_ctx
= mem_ctx
;
490 if (!parse_newest_in_marshall_buffer(
491 buf
, key
, pull_newest_from_marshall_buffer_parser
,
495 if ((pdata
!= NULL
) && (pdata
->dsize
!= 0) && (pdata
->dptr
== NULL
)) {
502 static NTSTATUS
db_ctdb_storev_transaction(struct db_record
*rec
,
503 const TDB_DATA
*dbufs
, int num_dbufs
,
505 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
);
507 static struct db_record
*db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx
*ctx
,
511 struct db_record
*result
;
514 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
515 DEBUG(0, ("talloc failed\n"));
519 result
->db
= ctx
->db
;
520 result
->private_data
= ctx
->transaction
;
522 result
->key
.dsize
= key
.dsize
;
523 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
525 if (result
->key
.dptr
== NULL
) {
526 DEBUG(0, ("talloc failed\n"));
531 result
->storev
= db_ctdb_storev_transaction
;
532 result
->delete_rec
= db_ctdb_delete_transaction
;
534 if (ctx
->transaction
== NULL
) {
535 DEBUG(0, ("no transaction available\n"));
539 if (pull_newest_from_marshall_buffer(ctx
->transaction
->m_write
, key
,
540 NULL
, result
, &result
->value
)) {
541 result
->value_valid
= true;
545 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
546 if (ctdb_data
.dptr
== NULL
) {
547 /* create the record */
548 result
->value
= tdb_null
;
549 result
->value_valid
= true;
553 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
554 result
->value
.dptr
= NULL
;
556 if ((result
->value
.dsize
!= 0)
557 && !(result
->value
.dptr
= (uint8_t *)talloc_memdup(
558 result
, ctdb_data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
559 result
->value
.dsize
))) {
560 DEBUG(0, ("talloc failed\n"));
564 result
->value_valid
= true;
566 SAFE_FREE(ctdb_data
.dptr
);
571 static int db_ctdb_record_destructor(struct db_record
**recp
)
573 struct db_record
*rec
= talloc_get_type_abort(*recp
, struct db_record
);
574 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
575 rec
->private_data
, struct db_ctdb_transaction_handle
);
576 int ret
= h
->ctx
->db
->transaction_commit(h
->ctx
->db
);
578 DEBUG(0,(__location__
" transaction_commit failed\n"));
584 auto-create a transaction for persistent databases
586 static struct db_record
*db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx
*ctx
,
591 struct db_record
*rec
, **recp
;
593 res
= db_ctdb_transaction_start(ctx
->db
);
598 rec
= db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
600 ctx
->db
->transaction_cancel(ctx
->db
);
604 /* destroy this transaction when we release the lock */
605 recp
= talloc(rec
, struct db_record
*);
607 ctx
->db
->transaction_cancel(ctx
->db
);
612 talloc_set_destructor(recp
, db_ctdb_record_destructor
);
618 stores a record inside a transaction
620 static NTSTATUS
db_ctdb_transaction_store(struct db_ctdb_transaction_handle
*h
,
621 TDB_DATA key
, TDB_DATA data
)
623 TALLOC_CTX
*tmp_ctx
= talloc_new(h
);
625 struct ctdb_ltdb_header header
;
629 /* we need the header so we can update the RSN */
631 if (!pull_newest_from_marshall_buffer(h
->m_write
, key
, &header
,
634 rec
= tdb_fetch(h
->ctx
->wtdb
->tdb
, key
);
636 if (rec
.dptr
!= NULL
) {
637 memcpy(&header
, rec
.dptr
,
638 sizeof(struct ctdb_ltdb_header
));
639 rec
.dsize
-= sizeof(struct ctdb_ltdb_header
);
642 * a special case, we are writing the same
643 * data that is there now
645 if (data
.dsize
== rec
.dsize
&&
647 rec
.dptr
+ sizeof(struct ctdb_ltdb_header
),
650 talloc_free(tmp_ctx
);
657 header
.dmaster
= get_my_vnn();
660 h
->m_write
= db_ctdb_marshall_add(h
, h
->m_write
, h
->ctx
->db_id
, 0, key
, &header
, data
);
661 if (h
->m_write
== NULL
) {
662 DEBUG(0,(__location__
" Failed to add to marshalling record\n"));
663 talloc_free(tmp_ctx
);
664 return NT_STATUS_NO_MEMORY
;
667 talloc_free(tmp_ctx
);
673 a record store inside a transaction
675 static NTSTATUS
db_ctdb_storev_transaction(
676 struct db_record
*rec
, const TDB_DATA
*dbufs
, int num_dbufs
, int flag
)
678 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
679 rec
->private_data
, struct db_ctdb_transaction_handle
);
683 status
= dbwrap_merge_dbufs(&data
, rec
, dbufs
, num_dbufs
);
684 if (!NT_STATUS_IS_OK(status
)) {
688 status
= db_ctdb_transaction_store(h
, rec
->key
, data
);
690 TALLOC_FREE(data
.dptr
);
696 a record delete inside a transaction
698 static NTSTATUS
db_ctdb_delete_transaction(struct db_record
*rec
)
700 struct db_ctdb_transaction_handle
*h
= talloc_get_type_abort(
701 rec
->private_data
, struct db_ctdb_transaction_handle
);
704 status
= db_ctdb_transaction_store(h
, rec
->key
, tdb_null
);
708 static void db_ctdb_fetch_db_seqnum_parser(
709 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
710 TDB_DATA data
, void *private_data
)
712 uint64_t *seqnum
= (uint64_t *)private_data
;
714 if (data
.dsize
!= sizeof(uint64_t)) {
718 memcpy(seqnum
, data
.dptr
, sizeof(*seqnum
));
722 * Fetch the db sequence number of a persistent db directly from the db.
724 static NTSTATUS
db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx
*db
,
730 if (seqnum
== NULL
) {
731 return NT_STATUS_INVALID_PARAMETER
;
734 key
= string_term_tdb_data(CTDB_DB_SEQNUM_KEY
);
736 status
= db_ctdb_ltdb_parse(
737 db
, key
, db_ctdb_fetch_db_seqnum_parser
, seqnum
);
739 if (NT_STATUS_IS_OK(status
)) {
742 if (NT_STATUS_EQUAL(status
, NT_STATUS_NOT_FOUND
)) {
750 * Store the database sequence number inside a transaction.
752 static NTSTATUS
db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle
*h
,
756 TDB_DATA key
= string_term_tdb_data(CTDB_DB_SEQNUM_KEY
);
757 TDB_DATA data
= { .dptr
=(uint8_t *)&seqnum
, .dsize
=sizeof(seqnum
) };
759 status
= db_ctdb_transaction_store(h
, key
, data
);
767 static int db_ctdb_transaction_commit(struct db_context
*db
)
769 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
773 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
774 uint64_t old_seqnum
, new_seqnum
;
778 DEBUG(0,(__location__
" transaction commit with no open transaction on db 0x%08x\n", ctx
->db_id
));
782 if (h
->nested_cancel
) {
783 db
->transaction_cancel(db
);
784 DEBUG(5,(__location__
" Failed transaction commit after nested cancel\n"));
788 if (h
->nesting
!= 0) {
790 DEBUG(5, (__location__
" transaction commit on db 0x%08x: nesting %d -> %d\n",
791 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
795 if (h
->m_write
== NULL
) {
797 * No changes were made, so don't change the seqnum,
798 * don't push to other node, just exit with success.
804 DEBUG(5,(__location__
" transaction commit on db 0x%08x\n", ctx
->db_id
));
807 * As the last db action before committing, bump the database sequence
808 * number. Note that this undoes all changes to the seqnum records
809 * performed under the transaction. This record is not meant to be
810 * modified by user interaction. It is for internal use only...
812 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &old_seqnum
);
813 if (!NT_STATUS_IS_OK(rets
)) {
814 DEBUG(1, (__location__
" failed to fetch the db sequence number "
815 "in transaction commit on db 0x%08x\n", ctx
->db_id
));
820 new_seqnum
= old_seqnum
+ 1;
822 rets
= db_ctdb_store_db_seqnum(h
, new_seqnum
);
823 if (!NT_STATUS_IS_OK(rets
)) {
824 DEBUG(1, (__location__
"failed to store the db sequence number "
825 " in transaction commit on db 0x%08x\n", ctx
->db_id
));
831 /* tell ctdbd to commit to the other nodes */
832 ret
= ctdbd_control_local(messaging_ctdb_connection(),
833 CTDB_CONTROL_TRANS3_COMMIT
,
835 db_ctdb_marshall_finish(h
->m_write
),
836 NULL
, NULL
, &status
);
837 if ((ret
!= 0) || status
!= 0) {
839 * The TRANS3_COMMIT control should only possibly fail when a
840 * recovery has been running concurrently. In any case, the db
841 * will be the same on all nodes, either the new copy or the
842 * old copy. This can be detected by comparing the old and new
843 * local sequence numbers.
845 rets
= db_ctdb_fetch_db_seqnum_from_db(ctx
, &new_seqnum
);
846 if (!NT_STATUS_IS_OK(rets
)) {
847 DEBUG(1, (__location__
" failed to refetch db sequence "
848 "number after failed TRANS3_COMMIT\n"));
853 if (new_seqnum
== old_seqnum
) {
854 /* Recovery prevented all our changes: retry. */
857 if (new_seqnum
!= (old_seqnum
+ 1)) {
858 DEBUG(0, (__location__
" ERROR: new_seqnum[%lu] != "
859 "old_seqnum[%lu] + (0 or 1) after failed "
860 "TRANS3_COMMIT - this should not happen!\n",
861 (unsigned long)new_seqnum
,
862 (unsigned long)old_seqnum
));
867 * Recovery propagated our changes to all nodes, completing
868 * our commit for us - succeed.
875 h
->ctx
->transaction
= NULL
;
884 static int db_ctdb_transaction_cancel(struct db_context
*db
)
886 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
888 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
891 DEBUG(0,(__location__
" transaction cancel with no open transaction on db 0x%08x\n", ctx
->db_id
));
895 if (h
->nesting
!= 0) {
897 h
->nested_cancel
= true;
898 DEBUG(5, (__location__
" transaction cancel on db 0x%08x: nesting %d -> %d\n",
899 ctx
->db_id
, ctx
->transaction
->nesting
+ 1, ctx
->transaction
->nesting
));
903 DEBUG(5,(__location__
" Cancel transaction on db 0x%08x\n", ctx
->db_id
));
905 ctx
->transaction
= NULL
;
911 static NTSTATUS
db_ctdb_storev(struct db_record
*rec
,
912 const TDB_DATA
*dbufs
, int num_dbufs
, int flag
)
914 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
915 rec
->private_data
, struct db_ctdb_rec
);
918 status
= db_ctdb_ltdb_store(crec
->ctdb_ctx
, rec
->key
, &(crec
->header
),
925 static NTSTATUS
db_ctdb_send_schedule_for_deletion(struct db_record
*rec
)
927 NTSTATUS status
= NT_STATUS_OK
;
929 struct ctdb_control_schedule_for_deletion
*dd
;
932 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
933 rec
->private_data
, struct db_ctdb_rec
);
934 struct db_ctdb_ctx
*ctx
= crec
->ctdb_ctx
;
936 indata
.dsize
= offsetof(struct ctdb_control_schedule_for_deletion
, key
) + rec
->key
.dsize
;
937 indata
.dptr
= talloc_zero_array(crec
, uint8_t, indata
.dsize
);
938 if (indata
.dptr
== NULL
) {
939 DEBUG(0, (__location__
" talloc failed!\n"));
940 return NT_STATUS_NO_MEMORY
;
943 dd
= (struct ctdb_control_schedule_for_deletion
*)(void *)indata
.dptr
;
944 dd
->db_id
= ctx
->db_id
;
945 dd
->hdr
= crec
->header
;
946 dd
->keylen
= rec
->key
.dsize
;
947 memcpy(dd
->key
, rec
->key
.dptr
, rec
->key
.dsize
);
949 ret
= ctdbd_control_local(messaging_ctdb_connection(),
950 CTDB_CONTROL_SCHEDULE_FOR_DELETION
,
951 crec
->ctdb_ctx
->db_id
,
952 CTDB_CTRL_FLAG_NOREPLY
, /* flags */
957 talloc_free(indata
.dptr
);
959 if ((ret
!= 0) || cstatus
!= 0) {
960 DEBUG(1, (__location__
" Error sending local control "
961 "SCHEDULE_FOR_DELETION: %s, cstatus = %"PRIi32
"\n",
962 strerror(ret
), cstatus
));
964 status
= map_nt_error_from_unix(ret
);
966 status
= NT_STATUS_UNSUCCESSFUL
;
973 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
978 * We have to store the header with empty data. TODO: Fix the
982 status
= db_ctdb_storev(rec
, &tdb_null
, 1, 0);
983 if (!NT_STATUS_IS_OK(status
)) {
987 status
= db_ctdb_send_schedule_for_deletion(rec
);
991 static int db_ctdb_record_destr(struct db_record
* data
)
993 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
994 data
->private_data
, struct db_ctdb_rec
);
997 struct timeval before
;
1000 DEBUG(10, (DEBUGLEVEL
> 10
1001 ? "Unlocking db %u key %s\n"
1002 : "Unlocking db %u key %.20s\n",
1003 (int)crec
->ctdb_ctx
->db_id
,
1004 hex_encode_talloc(data
, (unsigned char *)data
->key
.dptr
,
1007 before
= timeval_current();
1009 ret
= tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
);
1011 timediff
= timeval_elapsed(&before
);
1012 timediff
*= 1000; /* get us milliseconds */
1014 if (timediff
> crec
->ctdb_ctx
->warn_unlock_msecs
) {
1016 key
= hex_encode_talloc(talloc_tos(),
1017 (unsigned char *)data
->key
.dptr
,
1019 DEBUG(0, ("tdb_chainunlock on db %s, key %s took %f milliseconds\n",
1020 tdb_name(crec
->ctdb_ctx
->wtdb
->tdb
), key
,
1026 DEBUG(0, ("tdb_chainunlock failed\n"));
1030 threshold
= crec
->ctdb_ctx
->warn_locktime_msecs
;
1031 if (threshold
!= 0) {
1032 timediff
= timeval_elapsed(&crec
->lock_time
) * 1000;
1033 if (timediff
> threshold
) {
1036 key
= hex_encode_talloc(data
,
1037 (unsigned char *)data
->key
.dptr
,
1039 DEBUG(0, ("Held tdb lock on db %s, key %s "
1040 "%f milliseconds\n",
1041 tdb_name(crec
->ctdb_ctx
->wtdb
->tdb
),
1050 * Check whether we have a valid local copy of the given record,
1051 * either for reading or for writing.
1053 static bool db_ctdb_can_use_local_hdr(const struct ctdb_ltdb_header
*hdr
,
1054 uint32_t my_vnn
, bool read_only
)
1056 if (hdr
->dmaster
!= my_vnn
) {
1057 /* If we're not dmaster, it must be r/o copy. */
1058 return read_only
&& (hdr
->flags
& CTDB_REC_RO_HAVE_READONLY
);
1062 * If we want write access, no one may have r/o copies.
1064 return read_only
|| !(hdr
->flags
& CTDB_REC_RO_HAVE_DELEGATIONS
);
1067 static bool db_ctdb_can_use_local_copy(TDB_DATA ctdb_data
, uint32_t my_vnn
,
1070 if (ctdb_data
.dptr
== NULL
) {
1074 if (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) {
1078 return db_ctdb_can_use_local_hdr(
1079 (struct ctdb_ltdb_header
*)ctdb_data
.dptr
, my_vnn
, read_only
);
1082 static struct db_record
*fetch_locked_internal(struct db_ctdb_ctx
*ctx
,
1083 TALLOC_CTX
*mem_ctx
,
1086 struct db_record
*result
;
1087 struct db_ctdb_rec
*crec
;
1089 int migrate_attempts
;
1090 struct timeval migrate_start
;
1091 struct timeval chainlock_start
;
1092 struct timeval ctdb_start_time
;
1093 double chainlock_time
= 0;
1094 double ctdb_time
= 0;
1099 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
1100 DEBUG(0, ("talloc failed\n"));
1104 if (!(crec
= talloc_zero(result
, struct db_ctdb_rec
))) {
1105 DEBUG(0, ("talloc failed\n"));
1106 TALLOC_FREE(result
);
1110 result
->db
= ctx
->db
;
1111 result
->private_data
= (void *)crec
;
1112 crec
->ctdb_ctx
= ctx
;
1114 result
->key
.dsize
= key
.dsize
;
1115 result
->key
.dptr
= (uint8_t *)talloc_memdup(result
, key
.dptr
,
1117 if (result
->key
.dptr
== NULL
) {
1118 DEBUG(0, ("talloc failed\n"));
1119 TALLOC_FREE(result
);
1123 migrate_attempts
= 0;
1124 GetTimeOfDay(&migrate_start
);
1127 * Do a blocking lock on the record
1131 if (DEBUGLEVEL
>= 10) {
1132 char *keystr
= hex_encode_talloc(result
, key
.dptr
, key
.dsize
);
1133 DEBUG(10, (DEBUGLEVEL
> 10
1134 ? "Locking db %u key %s\n"
1135 : "Locking db %u key %.20s\n",
1136 (int)crec
->ctdb_ctx
->db_id
, keystr
));
1137 TALLOC_FREE(keystr
);
1140 GetTimeOfDay(&chainlock_start
);
1141 lockret
= tdb_chainlock(ctx
->wtdb
->tdb
, key
);
1142 chainlock_time
+= timeval_elapsed(&chainlock_start
);
1145 DEBUG(3, ("tdb_chainlock failed\n"));
1146 TALLOC_FREE(result
);
1150 result
->storev
= db_ctdb_storev
;
1151 result
->delete_rec
= db_ctdb_delete
;
1152 talloc_set_destructor(result
, db_ctdb_record_destr
);
1154 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
1157 * See if we have a valid record and we are the dmaster. If so, we can
1158 * take the shortcut and just return it.
1161 if (!db_ctdb_can_use_local_copy(ctdb_data
, get_my_vnn(), false)) {
1162 SAFE_FREE(ctdb_data
.dptr
);
1163 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
1164 talloc_set_destructor(result
, NULL
);
1166 migrate_attempts
+= 1;
1168 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %"PRIu32
" "
1169 "(%"PRIu32
") %"PRIu32
"\n",
1170 ctdb_data
.dptr
, ctdb_data
.dptr
?
1171 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
:
1175 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->flags
: 0));
1177 GetTimeOfDay(&ctdb_start_time
);
1178 ret
= ctdbd_migrate(messaging_ctdb_connection(), ctx
->db_id
,
1180 ctdb_time
+= timeval_elapsed(&ctdb_start_time
);
1183 DEBUG(5, ("ctdbd_migrate failed: %s\n",
1185 TALLOC_FREE(result
);
1188 /* now its migrated, try again */
1194 duration
= timeval_elapsed(&migrate_start
);
1197 * Convert the duration to milliseconds to avoid a
1198 * floating-point division of
1199 * lp_parm_int("migrate_duration") by 1000.
1201 duration_msecs
= duration
* 1000;
1204 if ((migrate_attempts
> ctx
->warn_migrate_attempts
) ||
1205 (duration_msecs
> ctx
->warn_migrate_msecs
)) {
1208 if (tdb_get_flags(ctx
->wtdb
->tdb
) & TDB_INCOMPATIBLE_HASH
) {
1209 chain
= tdb_jenkins_hash(&key
) %
1210 tdb_hash_size(ctx
->wtdb
->tdb
);
1213 DEBUG(0, ("db_ctdb_fetch_locked for %s key %s, chain %d "
1214 "needed %d attempts, %d milliseconds, "
1215 "chainlock: %f ms, CTDB %f ms\n",
1216 tdb_name(ctx
->wtdb
->tdb
),
1217 hex_encode_talloc(talloc_tos(),
1218 (unsigned char *)key
.dptr
,
1221 migrate_attempts
, duration_msecs
,
1222 chainlock_time
* 1000.0,
1223 ctdb_time
* 1000.0));
1226 GetTimeOfDay(&crec
->lock_time
);
1228 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
1230 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
1231 result
->value
.dptr
= NULL
;
1233 if (result
->value
.dsize
!= 0) {
1234 result
->value
.dptr
= talloc_memdup(
1235 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
1236 result
->value
.dsize
);
1237 if (result
->value
.dptr
== NULL
) {
1238 DBG_ERR("talloc failed\n");
1239 TALLOC_FREE(result
);
1243 result
->value_valid
= true;
1245 SAFE_FREE(ctdb_data
.dptr
);
1250 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
1251 TALLOC_CTX
*mem_ctx
,
1254 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1255 struct db_ctdb_ctx
);
1257 if (ctx
->transaction
!= NULL
) {
1258 return db_ctdb_fetch_locked_transaction(ctx
, mem_ctx
, key
);
1261 if (db
->persistent
) {
1262 return db_ctdb_fetch_locked_persistent(ctx
, mem_ctx
, key
);
1265 return fetch_locked_internal(ctx
, mem_ctx
, key
);
1268 struct db_ctdb_parse_record_state
{
1269 void (*parser
)(TDB_DATA key
, TDB_DATA data
, void *private_data
);
1272 bool ask_for_readonly_copy
;
1277 static void db_ctdb_parse_record_parser(
1278 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
1279 TDB_DATA data
, void *private_data
)
1281 struct db_ctdb_parse_record_state
*state
=
1282 (struct db_ctdb_parse_record_state
*)private_data
;
1283 state
->parser(key
, data
, state
->private_data
);
1286 static void db_ctdb_parse_record_parser_nonpersistent(
1287 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
1288 TDB_DATA data
, void *private_data
)
1290 struct db_ctdb_parse_record_state
*state
=
1291 (struct db_ctdb_parse_record_state
*)private_data
;
1293 if (db_ctdb_can_use_local_hdr(header
, state
->my_vnn
, true)) {
1295 * A record consisting only of the ctdb header can be
1296 * a validly created empty record or a tombstone
1297 * record of a deleted record (not vacuumed yet). Mark
1300 state
->empty_record
= (data
.dsize
== 0);
1301 if (!state
->empty_record
) {
1302 state
->parser(key
, data
, state
->private_data
);
1307 * We found something in the db, so it seems that this record,
1308 * while not usable locally right now, is popular. Ask for a
1311 state
->ask_for_readonly_copy
= true;
1315 static NTSTATUS
db_ctdb_try_parse_local_record(struct db_ctdb_ctx
*ctx
,
1317 struct db_ctdb_parse_record_state
*state
)
1321 if (ctx
->transaction
!= NULL
) {
1322 struct db_ctdb_transaction_handle
*h
= ctx
->transaction
;
1326 * Transactions only happen for persistent db's.
1329 found
= parse_newest_in_marshall_buffer(
1330 h
->m_write
, key
, db_ctdb_parse_record_parser
, state
);
1333 return NT_STATUS_OK
;
1337 if (ctx
->db
->persistent
) {
1339 * Persistent db, but not found in the transaction buffer
1341 return db_ctdb_ltdb_parse(
1342 ctx
, key
, db_ctdb_parse_record_parser
, state
);
1345 state
->done
= false;
1346 state
->ask_for_readonly_copy
= false;
1348 status
= db_ctdb_ltdb_parse(
1349 ctx
, key
, db_ctdb_parse_record_parser_nonpersistent
, state
);
1350 if (NT_STATUS_IS_OK(status
) && state
->done
) {
1351 if (state
->empty_record
) {
1353 * We know authoritatively, that this is an empty
1354 * record. Since ctdb does not distinguish between empty
1355 * and deleted records, this can be a record stored as
1356 * empty or a not-yet-vacuumed tombstone record of a
1357 * deleted record. Now Samba right now can live without
1358 * empty records, so we can safely report this record
1361 * See bugs 10008 and 12005.
1363 return NT_STATUS_NOT_FOUND
;
1365 return NT_STATUS_OK
;
1368 return NT_STATUS_MORE_PROCESSING_REQUIRED
;
1371 static NTSTATUS
db_ctdb_parse_record(struct db_context
*db
, TDB_DATA key
,
1372 void (*parser
)(TDB_DATA key
,
1374 void *private_data
),
1377 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1378 db
->private_data
, struct db_ctdb_ctx
);
1379 struct db_ctdb_parse_record_state state
;
1383 state
.parser
= parser
;
1384 state
.private_data
= private_data
;
1385 state
.my_vnn
= get_my_vnn();
1386 state
.empty_record
= false;
1388 status
= db_ctdb_try_parse_local_record(ctx
, key
, &state
);
1389 if (!NT_STATUS_EQUAL(status
, NT_STATUS_MORE_PROCESSING_REQUIRED
)) {
1393 ret
= ctdbd_parse(messaging_ctdb_connection(), ctx
->db_id
, key
,
1394 state
.ask_for_readonly_copy
, parser
, private_data
);
1396 if (ret
== ENOENT
) {
1399 * NT_STATUS_OBJECT_NAME_NOT_FOUND. Our upper
1400 * layers expect NT_STATUS_NOT_FOUND for "no
1401 * record around". We need to convert dbwrap
1402 * to 0/errno away from NTSTATUS ... :-)
1404 return NT_STATUS_NOT_FOUND
;
1406 return map_nt_error_from_unix(ret
);
1408 return NT_STATUS_OK
;
1411 static void db_ctdb_parse_record_done(struct tevent_req
*subreq
);
1413 static struct tevent_req
*db_ctdb_parse_record_send(
1414 TALLOC_CTX
*mem_ctx
,
1415 struct tevent_context
*ev
,
1416 struct db_context
*db
,
1418 void (*parser
)(TDB_DATA key
,
1420 void *private_data
),
1422 enum dbwrap_req_state
*req_state
)
1424 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1425 db
->private_data
, struct db_ctdb_ctx
);
1426 struct tevent_req
*req
= NULL
;
1427 struct tevent_req
*subreq
= NULL
;
1428 struct db_ctdb_parse_record_state
*state
= NULL
;
1431 req
= tevent_req_create(mem_ctx
, &state
,
1432 struct db_ctdb_parse_record_state
);
1434 *req_state
= DBWRAP_REQ_ERROR
;
1439 *state
= (struct db_ctdb_parse_record_state
) {
1441 .private_data
= private_data
,
1442 .my_vnn
= get_my_vnn(),
1443 .empty_record
= false,
1446 status
= db_ctdb_try_parse_local_record(ctx
, key
, state
);
1447 if (!NT_STATUS_EQUAL(status
, NT_STATUS_MORE_PROCESSING_REQUIRED
)) {
1448 if (tevent_req_nterror(req
, status
)) {
1449 *req_state
= DBWRAP_REQ_ERROR
;
1450 return tevent_req_post(req
, ev
);
1452 *req_state
= DBWRAP_REQ_DONE
;
1453 tevent_req_done(req
);
1454 return tevent_req_post(req
, ev
);
1457 subreq
= ctdbd_parse_send(state
,
1459 ctdb_async_ctx
.async_conn
,
1462 state
->ask_for_readonly_copy
,
1466 if (tevent_req_nomem(subreq
, req
)) {
1467 *req_state
= DBWRAP_REQ_ERROR
;
1468 return tevent_req_post(req
, ev
);
1470 tevent_req_set_callback(subreq
, db_ctdb_parse_record_done
, req
);
1475 static void db_ctdb_parse_record_done(struct tevent_req
*subreq
)
1477 struct tevent_req
*req
= tevent_req_callback_data(
1478 subreq
, struct tevent_req
);
1481 ret
= ctdbd_parse_recv(subreq
);
1482 TALLOC_FREE(subreq
);
1484 if (ret
== ENOENT
) {
1486 * This maps to NT_STATUS_OBJECT_NAME_NOT_FOUND. Our
1487 * upper layers expect NT_STATUS_NOT_FOUND for "no
1488 * record around". We need to convert dbwrap to 0/errno
1489 * away from NTSTATUS ... :-)
1491 tevent_req_nterror(req
, NT_STATUS_NOT_FOUND
);
1494 tevent_req_nterror(req
, map_nt_error_from_unix(ret
));
1498 tevent_req_done(req
);
1502 static NTSTATUS
db_ctdb_parse_record_recv(struct tevent_req
*req
)
1504 return tevent_req_simple_recv_ntstatus(req
);
/*
 * Context handed through the tdb/ctdbd traverse callbacks in this
 * file.
 */
struct traverse_state {
	/* database that is being traversed */
	struct db_context *db;

	/* caller supplied function, invoked once per visited record */
	int (*fn)(struct db_record *rec, void *private_data);

	/* opaque pointer passed through to fn */
	void *private_data;

	/* number of records counted by the callbacks */
	int count;
};
1514 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1516 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1517 struct db_record
*rec
= NULL
;
1518 TALLOC_CTX
*tmp_ctx
= NULL
;
1520 tmp_ctx
= talloc_new(state
->db
);
1521 if (tmp_ctx
== NULL
) {
1522 DBG_ERR("talloc_new failed\n");
1526 /* we have to give them a locked record to prevent races */
1527 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
1528 if (rec
!= NULL
&& rec
->value
.dsize
> 0) {
1529 state
->fn(rec
, state
->private_data
);
1532 talloc_free(tmp_ctx
);
1535 static int traverse_persistent_callback(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1538 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1539 struct db_record
*rec
;
1540 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
1544 * Skip the __db_sequence_number__ key:
1545 * This is used for persistent transactions internally.
1547 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1548 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1553 /* we have to give them a locked record to prevent races */
1554 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, kbuf
);
1555 if (rec
&& rec
->value
.dsize
> 0) {
1556 ret
= state
->fn(rec
, state
->private_data
);
1560 talloc_free(tmp_ctx
);
1564 /* wrapper to use traverse_persistent_callback with dbwrap */
1565 static int traverse_persistent_callback_dbwrap(struct db_record
*rec
, void* data
)
1567 return traverse_persistent_callback(NULL
, rec
->key
, rec
->value
, data
);
1570 static int db_ctdbd_traverse(uint32_t db_id
,
1571 void (*fn
)(TDB_DATA key
, TDB_DATA data
,
1572 void *private_data
),
1575 struct ctdbd_connection
*conn
;
1579 ret
= ctdbd_init_connection(talloc_tos(), lp_ctdbd_socket(),
1580 lp_ctdb_timeout(), &conn
);
1583 DBG_WARNING("ctdbd_init_connection failed: %s\n",
1588 ret
= ctdbd_traverse(conn
, db_id
, fn
, private_data
);
1592 DBG_WARNING("ctdbd_traverse failed: %s\n",
1601 static int db_ctdb_traverse(struct db_context
*db
,
1602 int (*fn
)(struct db_record
*rec
,
1603 void *private_data
),
1607 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1608 struct db_ctdb_ctx
);
1609 struct traverse_state state
;
1611 state
= (struct traverse_state
) {
1614 .private_data
= private_data
,
1617 if (db
->persistent
) {
1618 struct tdb_context
*ltdb
= ctx
->wtdb
->tdb
;
1620 /* for persistent databases we don't need to do a ctdb traverse,
1621 we can do a faster local traverse */
1622 ret
= tdb_traverse(ltdb
, traverse_persistent_callback
, &state
);
1626 if (ctx
->transaction
&& ctx
->transaction
->m_write
) {
1628 * we now have to handle keys not yet
1629 * present at transaction start
1631 struct db_context
*newkeys
= db_open_rbt(talloc_tos());
1632 struct ctdb_marshall_buffer
*mbuf
= ctx
->transaction
->m_write
;
1633 struct ctdb_rec_data_old
*rec
=NULL
;
1638 if (newkeys
== NULL
) {
1642 for (i
=0; i
<mbuf
->count
; i
++) {
1644 rec
= db_ctdb_marshall_loop_next_key(
1646 SMB_ASSERT(rec
!= NULL
);
1648 if (!tdb_exists(ltdb
, key
)) {
1649 dbwrap_store(newkeys
, key
, tdb_null
, 0);
1652 status
= dbwrap_traverse(newkeys
,
1653 traverse_persistent_callback_dbwrap
,
1656 talloc_free(newkeys
);
1657 if (!NT_STATUS_IS_OK(status
)) {
1665 ret
= db_ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
1672 static NTSTATUS
db_ctdb_storev_deny(struct db_record
*rec
,
1673 const TDB_DATA
*dbufs
, int num_dbufs
, int flag
)
1675 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1678 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
1680 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
1683 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
1685 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1686 struct db_record rec
;
1692 rec
.storev
= db_ctdb_storev_deny
;
1693 rec
.delete_rec
= db_ctdb_delete_deny
;
1694 rec
.private_data
= NULL
;
1695 rec
.value_valid
= true;
1696 state
->fn(&rec
, state
->private_data
);
1700 static int traverse_persistent_callback_read(TDB_CONTEXT
*tdb
, TDB_DATA kbuf
, TDB_DATA dbuf
,
1703 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
1704 struct db_record rec
;
1707 * Skip the __db_sequence_number__ key:
1708 * This is used for persistent transactions internally.
1710 if (kbuf
.dsize
== strlen(CTDB_DB_SEQNUM_KEY
) + 1 &&
1711 strcmp((const char*)kbuf
.dptr
, CTDB_DB_SEQNUM_KEY
) == 0)
1720 rec
.value_valid
= true;
1721 rec
.storev
= db_ctdb_storev_deny
;
1722 rec
.delete_rec
= db_ctdb_delete_deny
;
1723 rec
.private_data
= NULL
;
1725 if (rec
.value
.dsize
<= sizeof(struct ctdb_ltdb_header
)) {
1726 /* a deleted record */
1729 rec
.value
.dsize
-= sizeof(struct ctdb_ltdb_header
);
1730 rec
.value
.dptr
+= sizeof(struct ctdb_ltdb_header
);
1733 return state
->fn(&rec
, state
->private_data
);
1736 static int db_ctdb_traverse_read(struct db_context
*db
,
1737 int (*fn
)(struct db_record
*rec
,
1738 void *private_data
),
1742 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1743 struct db_ctdb_ctx
);
1744 struct traverse_state state
;
1746 state
= (struct traverse_state
) {
1749 .private_data
= private_data
,
1752 if (db
->persistent
) {
1753 /* for persistent databases we don't need to do a ctdb traverse,
1754 we can do a faster local traverse */
1757 nrecs
= tdb_traverse_read(ctx
->wtdb
->tdb
,
1758 traverse_persistent_callback_read
,
1766 ret
= db_ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
1773 static int db_ctdb_get_seqnum(struct db_context
*db
)
1775 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
1776 struct db_ctdb_ctx
);
1777 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
1780 static size_t db_ctdb_id(struct db_context
*db
, uint8_t *id
, size_t idlen
)
1782 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(
1783 db
->private_data
, struct db_ctdb_ctx
);
1785 if (idlen
>= sizeof(ctx
->db_id
)) {
1786 memcpy(id
, &ctx
->db_id
, sizeof(ctx
->db_id
));
1789 return sizeof(ctx
->db_id
);
1792 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
1793 struct messaging_context
*msg_ctx
,
1795 int hash_size
, int tdb_flags
,
1796 int open_flags
, mode_t mode
,
1797 enum dbwrap_lock_order lock_order
,
1798 uint64_t dbwrap_flags
)
1800 struct db_context
*result
;
1801 struct db_ctdb_ctx
*db_ctdb
;
1803 struct loadparm_context
*lp_ctx
;
1805 TDB_DATA outdata
= {0};
1806 bool persistent
= (tdb_flags
& TDB_CLEAR_IF_FIRST
) == 0;
1810 if (!lp_clustering()) {
1811 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1815 if (!(result
= talloc_zero(mem_ctx
, struct db_context
))) {
1816 DEBUG(0, ("talloc failed\n"));
1817 TALLOC_FREE(result
);
1821 if (!(db_ctdb
= talloc(result
, struct db_ctdb_ctx
))) {
1822 DEBUG(0, ("talloc failed\n"));
1823 TALLOC_FREE(result
);
1827 result
->name
= talloc_strdup(result
, name
);
1828 if (result
->name
== NULL
) {
1829 DEBUG(0, ("talloc failed\n"));
1830 TALLOC_FREE(result
);
1834 db_ctdb
->transaction
= NULL
;
1835 db_ctdb
->db
= result
;
1837 ret
= ctdbd_db_attach(messaging_ctdb_connection(), name
,
1838 &db_ctdb
->db_id
, persistent
);
1840 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name
,
1842 TALLOC_FREE(result
);
1846 if (tdb_flags
& TDB_SEQNUM
) {
1847 data
.dptr
= (uint8_t *)&db_ctdb
->db_id
;
1848 data
.dsize
= sizeof(db_ctdb
->db_id
);
1850 ret
= ctdbd_control_local(messaging_ctdb_connection(),
1851 CTDB_CONTROL_ENABLE_SEQNUM
,
1853 NULL
, NULL
, &cstatus
);
1854 if ((ret
!= 0) || cstatus
!= 0) {
1855 DBG_ERR("ctdb_control for enable seqnum "
1856 "failed: %s\n", strerror(ret
));
1857 TALLOC_FREE(result
);
1862 db_path
= ctdbd_dbpath(messaging_ctdb_connection(), db_ctdb
,
1864 if (db_path
== NULL
) {
1865 DBG_ERR("ctdbd_dbpath failed\n");
1866 TALLOC_FREE(result
);
1870 result
->persistent
= persistent
;
1871 result
->lock_order
= lock_order
;
1873 data
.dptr
= (uint8_t *)&db_ctdb
->db_id
;
1874 data
.dsize
= sizeof(db_ctdb
->db_id
);
1876 ret
= ctdbd_control_local(messaging_ctdb_connection(),
1877 CTDB_CONTROL_DB_OPEN_FLAGS
,
1878 0, 0, data
, NULL
, &outdata
, &cstatus
);
1880 DBG_ERR(" ctdb control for db_open_flags "
1881 "failed: %s\n", strerror(ret
));
1882 TALLOC_FREE(result
);
1886 if (cstatus
!= 0 || outdata
.dsize
!= sizeof(int)) {
1887 DBG_ERR("ctdb_control for db_open_flags failed\n");
1888 TALLOC_FREE(outdata
.dptr
);
1889 TALLOC_FREE(result
);
1893 tdb_flags
= *(int *)outdata
.dptr
;
1894 TALLOC_FREE(outdata
.dptr
);
1896 if (!result
->persistent
) {
1897 ret
= ctdb_async_ctx_init(NULL
, messaging_tevent_context(msg_ctx
));
1899 DBG_ERR("ctdb_async_ctx_init failed: %s\n", strerror(ret
));
1900 TALLOC_FREE(result
);
1905 if (!result
->persistent
&&
1906 (dbwrap_flags
& DBWRAP_FLAG_OPTIMIZE_READONLY_ACCESS
))
1910 indata
= make_tdb_data((uint8_t *)&db_ctdb
->db_id
,
1911 sizeof(db_ctdb
->db_id
));
1913 ret
= ctdbd_control_local(
1914 messaging_ctdb_connection(),
1915 CTDB_CONTROL_SET_DB_READONLY
, 0, 0,
1916 indata
, NULL
, NULL
, &cstatus
);
1917 if ((ret
!= 0) || (cstatus
!= 0)) {
1918 DEBUG(1, ("CTDB_CONTROL_SET_DB_READONLY failed: "
1919 "%s, %"PRIi32
"\n", strerror(ret
), cstatus
));
1920 TALLOC_FREE(result
);
1925 lp_ctx
= loadparm_init_s3(db_path
, loadparm_s3_helpers());
1927 if (hash_size
== 0) {
1928 hash_size
= lpcfg_tdb_hash_size(lp_ctx
, db_path
);
1931 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
,
1932 lpcfg_tdb_flags(lp_ctx
, tdb_flags
),
1934 talloc_unlink(db_path
, lp_ctx
);
1935 if (db_ctdb
->wtdb
== NULL
) {
1936 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
1937 TALLOC_FREE(result
);
1940 talloc_free(db_path
);
1942 /* honor permissions if user has specified O_CREAT */
1943 if (open_flags
& O_CREAT
) {
1945 fd
= tdb_fd(db_ctdb
->wtdb
->tdb
);
1946 ret
= fchmod(fd
, mode
);
1948 DBG_WARNING("fchmod failed: %s\n",
1950 TALLOC_FREE(result
);
1955 if (result
->persistent
) {
1956 db_ctdb
->lock_ctx
= g_lock_ctx_init(db_ctdb
, msg_ctx
);
1957 if (db_ctdb
->lock_ctx
== NULL
) {
1958 DEBUG(0, ("g_lock_ctx_init failed\n"));
1959 TALLOC_FREE(result
);
1964 db_ctdb
->warn_unlock_msecs
= lp_parm_int(-1, "ctdb",
1965 "unlock_warn_threshold", 5);
1966 db_ctdb
->warn_migrate_attempts
= lp_parm_int(-1, "ctdb",
1967 "migrate_attempts", 10);
1968 db_ctdb
->warn_migrate_msecs
= lp_parm_int(-1, "ctdb",
1969 "migrate_duration", 5000);
1970 db_ctdb
->warn_locktime_msecs
= lp_ctdb_locktime_warn_threshold();
1972 result
->private_data
= (void *)db_ctdb
;
1973 result
->fetch_locked
= db_ctdb_fetch_locked
;
1974 result
->parse_record
= db_ctdb_parse_record
;
1975 result
->parse_record_send
= db_ctdb_parse_record_send
;
1976 result
->parse_record_recv
= db_ctdb_parse_record_recv
;
1977 result
->traverse
= db_ctdb_traverse
;
1978 result
->traverse_read
= db_ctdb_traverse_read
;
1979 result
->get_seqnum
= db_ctdb_get_seqnum
;
1980 result
->transaction_start
= db_ctdb_transaction_start
;
1981 result
->transaction_commit
= db_ctdb_transaction_commit
;
1982 result
->transaction_cancel
= db_ctdb_transaction_cancel
;
1983 result
->id
= db_ctdb_id
;
1985 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1986 name
, db_ctdb
->db_id
));