2 Unix SMB/CIFS Implementation.
3 DSDB replication service outgoing Pull-Replication
5 Copyright (C) Stefan Metzmacher 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "dsdb/samdb/samdb.h"
24 #include "auth/auth.h"
25 #include "samba/service.h"
26 #include "lib/events/events.h"
27 #include "dsdb/repl/drepl_service.h"
28 #include <ldb_errors.h>
29 #include "../lib/util/dlinklist.h"
30 #include "librpc/gen_ndr/ndr_misc.h"
31 #include "librpc/gen_ndr/ndr_drsuapi.h"
32 #include "librpc/gen_ndr/ndr_drsblobs.h"
33 #include "libcli/composite/composite.h"
34 #include "libcli/security/security.h"
37 #define DBGC_CLASS DBGC_DRS_REPL
40 update repsFrom/repsTo error information
/*
 * Record the outcome of a replication attempt in the stored reps_attr
 * entry that belongs to one replication partner.
 *
 * Visible flow:
 *  - a temporary talloc context is created as a child of s;
 *  - the time value t is converted to an NTTIME with unix_to_nt_time();
 *  - the current entries are loaded with dsdb_loadreps() from s->samdb;
 *  - the array is scanned for the entry whose ctr1.source_dsa_obj_guid
 *    equals source_dsa_obj_guid;
 *  - on that entry last_attempt is set to now and result_last_attempt
 *    is set to status; when status is OK last_success is set to now and
 *    consecutive_sync_failures is reset to 0;
 *  - the array is written back with dsdb_savereps(); a failure to save
 *    is only reported through a level 2 DEBUG message.
 *
 * NOTE(review): this listing is missing lines. The dn argument used
 * below, the declarations of count, i, werr, t and now, the bodies of
 * the early exits, the else that presumably guards the increment of
 * consecutive_sync_failures, and the release of tmp_ctx are not
 * visible here - confirm them against the complete file.
 */
42 void drepl_reps_update(struct dreplsrv_service
*s
, const char *reps_attr
,
44 struct GUID
*source_dsa_obj_guid
, WERROR status
)
46 struct repsFromToBlob
*reps
;
49 TALLOC_CTX
*tmp_ctx
= talloc_new(s
);
54 unix_to_nt_time(&now
, t
);
56 werr
= dsdb_loadreps(s
->samdb
, tmp_ctx
, dn
, reps_attr
, &reps
, &count
);
57 if (!W_ERROR_IS_OK(werr
)) {
62 for (i
=0; i
<count
; i
++) {
63 if (GUID_equal(source_dsa_obj_guid
,
64 &reps
[i
].ctr
.ctr1
.source_dsa_obj_guid
)) {
70 /* no record to update */
75 /* only update the status fields */
76 reps
[i
].ctr
.ctr1
.last_attempt
= now
;
77 reps
[i
].ctr
.ctr1
.result_last_attempt
= status
;
78 if (W_ERROR_IS_OK(status
)) {
79 reps
[i
].ctr
.ctr1
.last_success
= now
;
80 reps
[i
].ctr
.ctr1
.consecutive_sync_failures
= 0;
82 reps
[i
].ctr
.ctr1
.consecutive_sync_failures
++;
85 werr
= dsdb_savereps(s
->samdb
, tmp_ctx
, dn
, reps_attr
, reps
, count
);
86 if (!W_ERROR_IS_OK(werr
)) {
87 DEBUG(2,("drepl_reps_update: Failed to save %s for %s: %s\n",
88 reps_attr
, ldb_dn_get_linearized(dn
), win_errstr(werr
)));
/*
 * Queue one outgoing pull operation for a single source DSA.
 *
 * Visible flow:
 *  - a zeroed struct dreplsrv_out_operation is allocated as a talloc
 *    child of s and checked with W_ERROR_HAVE_NO_MEMORY();
 *  - source is attached to the operation with talloc_reference()
 *    rather than stolen, for the ownership reasons given in the
 *    comment text below; a NULL reference yields
 *    WERR_NOT_ENOUGH_MEMORY;
 *  - options, extended_op, fsmo_info, callback and cb_data are copied
 *    into the operation and schedule_time is set to time(NULL);
 *  - the operation is appended to the tail of s->ops.pending.
 *
 * NOTE(review): this listing is missing lines. The options, fsmo_info
 * and cb_data parameters, the final return value and any assignment of
 * op->service are not visible here - confirm against the complete
 * file. Whether op is released when talloc_reference() fails also
 * cannot be determined from the visible lines.
 */
93 WERROR
dreplsrv_schedule_partition_pull_source(struct dreplsrv_service
*s
,
94 struct dreplsrv_partition_source_dsa
*source
,
96 enum drsuapi_DsExtendedOperation extended_op
,
98 dreplsrv_extended_callback_t callback
,
101 struct dreplsrv_out_operation
*op
;
103 op
= talloc_zero(s
, struct dreplsrv_out_operation
);
104 W_ERROR_HAVE_NO_MEMORY(op
);
108 * source may either be the long-term list of partners, or
109 * from dreplsrv_partition_source_dsa_temporary(). Because it
110 * can be either, we can't talloc_steal() it here, so we
111 * instead we reference it.
113 * We never talloc_free() the p->sources pointers - indeed we
114 * never remove them - and the temp source will otherwise go
115 * away with the msg it is allocated on.
117 * Finally the pointer created in drepl_request_extended_op()
118 * is removed with talloc_unlink().
121 op
->source_dsa
= talloc_reference(op
, source
);
122 if (!op
->source_dsa
) {
123 return WERR_NOT_ENOUGH_MEMORY
;
126 op
->options
= options
;
127 op
->extended_op
= extended_op
;
128 op
->fsmo_info
= fsmo_info
;
129 op
->callback
= callback
;
130 op
->cb_data
= cb_data
;
131 op
->schedule_time
= time(NULL
);
134 DLIST_ADD_END(s
->ops
.pending
, op
);
/*
 * Schedule a pull operation for every source DSA of one partition.
 *
 * Walks the p->sources list through the next pointers and calls
 * dreplsrv_schedule_partition_pull_source() for each entry, passing 0,
 * DRSUAPI_EXOP_NONE and 0 as the first extra arguments. Each status is
 * checked with W_ERROR_NOT_OK_RETURN(), which presumably stops the walk
 * at the first failure - verify against the macro definition.
 *
 * NOTE(review): this listing is missing lines. The remaining
 * parameters of this function, the trailing arguments of the call, the
 * declaration of status and the final return are not visible here.
 */
139 static WERROR
dreplsrv_schedule_partition_pull(struct dreplsrv_service
*s
,
140 struct dreplsrv_partition
*p
,
144 struct dreplsrv_partition_source_dsa
*cur
;
146 for (cur
= p
->sources
; cur
; cur
= cur
->next
) {
147 status
= dreplsrv_schedule_partition_pull_source(s
, cur
,
148 0, DRSUAPI_EXOP_NONE
, 0,
150 W_ERROR_NOT_OK_RETURN(status
);
/*
 * Schedule pull replication for every partition known to the service.
 *
 * Walks the s->partitions list through the next pointers and calls
 * dreplsrv_schedule_partition_pull() with s, the partition and mem_ctx.
 * Each status is checked with W_ERROR_NOT_OK_RETURN(), which presumably
 * returns the first failure to the caller - verify against the macro
 * definition.
 *
 * NOTE(review): the declaration of status and the final return are not
 * visible in this listing.
 */
156 WERROR
dreplsrv_schedule_pull_replication(struct dreplsrv_service
*s
, TALLOC_CTX
*mem_ctx
)
159 struct dreplsrv_partition
*p
;
161 for (p
= s
->partitions
; p
; p
= p
->next
) {
162 status
= dreplsrv_schedule_partition_pull(s
, p
, mem_ctx
);
163 W_ERROR_NOT_OK_RETURN(status
);
/*
 * Completion callback for a pull operation started with
 * dreplsrv_op_pull_source_send().
 *
 * Visible flow:
 *  - the operation is recovered from the callback data of subreq, and
 *    the repsFrom1 record and owning service are read from it;
 *  - the result is collected with dreplsrv_op_pull_source_recv() and
 *    logged at debug level 4 together with the partition DN;
 *  - for operations with extended_op equal to DRSUAPI_EXOP_NONE the
 *    repsFrom status of the source DSA is updated through
 *    drepl_reps_update();
 *  - op->callback is invoked with the service, the result,
 *    op->extended_ret and op->cb_data;
 *  - s->ops.current is cleared and dreplsrv_run_pending_ops() is
 *    called so that queue processing continues.
 *
 * NOTE(review): this listing is missing lines. The declaration of
 * werr, any NULL check around op->callback and the release of subreq
 * and op are not visible here - confirm against the complete file.
 */
170 static void dreplsrv_pending_op_callback(struct tevent_req
*subreq
)
172 struct dreplsrv_out_operation
*op
= tevent_req_callback_data(subreq
,
173 struct dreplsrv_out_operation
);
174 struct repsFromTo1
*rf
= op
->source_dsa
->repsFrom1
;
175 struct dreplsrv_service
*s
= op
->service
;
178 werr
= dreplsrv_op_pull_source_recv(subreq
);
181 DEBUG(4,("dreplsrv_op_pull_source(%s) for %s\n", win_errstr(werr
),
182 ldb_dn_get_linearized(op
->source_dsa
->partition
->dn
)));
184 if (op
->extended_op
== DRSUAPI_EXOP_NONE
) {
185 drepl_reps_update(s
, "repsFrom", op
->source_dsa
->partition
->dn
,
186 &rf
->source_dsa_obj_guid
, werr
);
190 op
->callback(s
, werr
, op
->extended_ret
, op
->cb_data
);
193 s
->ops
.current
= NULL
;
194 dreplsrv_run_pending_ops(s
);
/*
 * Start the next pending pull operation, if the service is idle.
 *
 * Visible flow:
 *  - nothing is started while s->ops.n_current or s->ops.current is
 *    set, or while s->ops.pending is empty;
 *  - the time value t is converted to an NTTIME, the operation is
 *    removed from s->ops.pending and last_attempt of its repsFrom1
 *    record is set to now;
 *  - unless DRSUAPI_DRS_SYNC_FORCED is set in op->options, the NTDS
 *    options are read with samdb_ntds_options(); a read failure maps
 *    to WERR_DS_DRA_INTERNAL_ERROR and the
 *    DS_NTDSDSA_OPT_DISABLE_INBOUND_REPL flag maps to
 *    WERR_DS_DRA_SINK_DISABLED;
 *  - the request is created with dreplsrv_op_pull_source_send() and
 *    dreplsrv_pending_op_callback is registered as its callback;
 *    WERR_NOT_ENOUGH_MEMORY is the error recorded for this step;
 *  - the trailing statements update repsFrom through
 *    drepl_reps_update() for DRSUAPI_EXOP_NONE operations, clear
 *    s->ops.current and invoke op->callback with the error.
 *
 * NOTE(review): this listing is missing lines. The declarations of t,
 * now and werr, the selection of op from the pending list, the
 * assignment of s->ops.current, and the returns, gotos and label that
 * presumably separate the success path from the trailing failure
 * handling are not visible here, and the function may continue past
 * the last visible line - confirm against the complete file.
 */
197 void dreplsrv_run_pull_ops(struct dreplsrv_service
*s
)
199 struct dreplsrv_out_operation
*op
;
202 struct tevent_req
*subreq
;
205 if (s
->ops
.n_current
|| s
->ops
.current
) {
206 /* if there's still one running, we're done */
210 if (!s
->ops
.pending
) {
211 /* if there're no pending operations, we're done */
216 unix_to_nt_time(&now
, t
);
220 DLIST_REMOVE(s
->ops
.pending
, op
);
222 op
->source_dsa
->repsFrom1
->last_attempt
= now
;
224 /* check if inbound replication is enabled */
225 if (!(op
->options
& DRSUAPI_DRS_SYNC_FORCED
)) {
226 uint32_t rep_options
;
227 if (samdb_ntds_options(op
->service
->samdb
, &rep_options
) != LDB_SUCCESS
) {
228 werr
= WERR_DS_DRA_INTERNAL_ERROR
;
232 if ((rep_options
& DS_NTDSDSA_OPT_DISABLE_INBOUND_REPL
)) {
233 werr
= WERR_DS_DRA_SINK_DISABLED
;
238 subreq
= dreplsrv_op_pull_source_send(op
, s
->task
->event_ctx
, op
);
240 werr
= WERR_NOT_ENOUGH_MEMORY
;
244 tevent_req_set_callback(subreq
, dreplsrv_pending_op_callback
, op
);
248 if (op
->extended_op
== DRSUAPI_EXOP_NONE
) {
249 drepl_reps_update(s
, "repsFrom", op
->source_dsa
->partition
->dn
,
250 &op
->source_dsa
->repsFrom1
->source_dsa_obj_guid
, werr
);
252 /* unblock queue processing */
253 s
->ops
.current
= NULL
;
255 * let the callback do its job just like in any other failure situation
258 op
->callback(s
, werr
, op
->extended_ret
, op
->cb_data
);