2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
28 #include "elliptics.h"
29 #include "elliptics/interface.h"
/* Directory that receives the "<id>.meta" dump files written by dnet_dump_meta_container(). */
31 static char dnet_check_tmp_dir
[] = "/dev/shm";
/*
 * Issue a local DNET_CMD_DEL for @id through dnet_process_meta().
 *
 * A zero-filled struct dnet_cmd is built in a stack buffer, @id is copied into
 * it, and the command is handed to the state returned by dnet_node_state(n).
 *
 * NOTE(review): @full_process is not referenced on any line visible here;
 * presumably it gates the DNET_ATTR_DELETE_HISTORY flag - confirm against the
 * complete source.
 */
33 static int dnet_merge_remove_local(struct dnet_node
*n
, struct dnet_id
*id
, int full_process
)
/* Backing storage for the command; holds exactly one header and no payload. */
35 char buf
[sizeof(struct dnet_cmd
)];
37 struct dnet_net_state
*base
;
40 memset(buf
, 0, sizeof(buf
));
42 cmd
= (struct dnet_cmd
*)buf
;
44 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
47 cmd
->cmd
= DNET_CMD_DEL
;
49 cmd
->flags
= DNET_ATTR_DELETE_HISTORY
;
/* The command is processed against the node's own state, not sent to a peer. */
51 base
= dnet_node_state(n
);
53 err
= dnet_process_meta(base
, cmd
, NULL
);
/*
 * Dump the raw contents of a meta container into a file for inspection.
 *
 * The file is named "<dnet_check_tmp_dir>/<id>.meta", where <id> is the
 * string produced by dnet_dump_id_len_raw() for mc->id.id. It is opened with
 * O_RDWR | O_TRUNC | O_CREAT | O_CLOEXEC and mode 0644, and mc->size bytes
 * from mc->data are written with a single write() call.
 *
 * Both an open failure and a write() result different from mc->size are
 * reported through dnet_log_raw() at DNET_LOG_ERROR level.
 */
60 static int dnet_dump_meta_container(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
/* Two characters per ID byte plus the terminating NUL. */
64 char id_str
[DNET_ID_SIZE
*2+1];
66 snprintf(file
, sizeof(file
), "%s/%s.meta", dnet_check_tmp_dir
, dnet_dump_id_len_raw(mc
->id
.id
, DNET_ID_SIZE
, id_str
));
68 fd
= open(file
, O_RDWR
| O_TRUNC
| O_CREAT
| O_CLOEXEC
, 0644);
71 dnet_log_raw(n
, DNET_LOG_ERROR
, "Failed to open meta container file '%s': %s\n",
72 file
, strerror(errno
));
76 err
= write(fd
, mc
->data
, mc
->size
);
/*
 * NOTE(review): a short write is treated as a failure here, but write() only
 * sets errno when it returns -1, so strerror(errno) below may be stale for
 * the short-write case.
 */
77 if (err
!= (int)mc
->size
) {
79 dnet_log_raw(n
, DNET_LOG_ERROR
, "Failed to write meta container into '%s': %s\n",
80 file
, strerror(errno
));
/*
 * Extract the list of group IDs stored in the DNET_META_GROUPS record of @mc.
 *
 * The record is located with dnet_meta_search(); its payload (m->size bytes)
 * is copied into a freshly malloc()'ed buffer and the number of entries is
 * computed as m->size / sizeof(int32_t). Every group is logged at
 * DNET_LOG_DEBUG level.
 *
 * The caller in this file (dnet_check_copies()) passes the return value on as
 * the number of groups and reads the array through @groupsp.
 * NOTE(review): the store into *groupsp, the return statements and the error
 * paths are not visible here - presumably the caller owns and frees the
 * buffer; confirm against the complete source.
 */
91 static int dnet_check_find_groups(struct dnet_node
*n
, struct dnet_meta_container
*mc
, int **groupsp
)
97 m
= dnet_meta_search(n
, mc
, DNET_META_GROUPS
);
99 dnet_log_raw(n
, DNET_LOG_ERROR
, "%s: failed to find groups metadata.\n", dnet_dump_id(&mc
->id
));
104 groups
= malloc(m
->size
);
109 memcpy(groups
, m
->data
, m
->size
);
111 num
= m
->size
/ sizeof(int32_t);
113 for (i
=0; i
<num
; ++i
) {
114 dnet_log_raw(n
, DNET_LOG_DEBUG
, "%s: group: %d\n", dnet_dump_id(&mc
->id
), groups
[i
]);
/* Writes the whole container to a file; presumably reached on the failure path only - TODO confirm. */
122 dnet_dump_meta_container(n
, mc
);
126 /*static int dnet_bulk_db_check_update(struct dnet_node *n, struct dnet_meta_container *mc_array, int *rec_num,
127 struct dnet_meta_container *mc, int final)
130 int64_t rec_processed;
131 KCREC recs[DNET_BULK_META_UPD_SIZE];
134 if (!rec_num || *rec_num > DNET_BULK_META_UPD_SIZE) {
141 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: mc should be passed\n");
144 memcpy(&mc_array[*rec_num].id, &mc->id, sizeof(struct dnet_id));
145 mc_array[*rec_num].size = mc->size;
146 // Allocate extra memory for potential META_CHECK_STATUS structure
147 mc_array[*rec_num].data = malloc(mc->size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_check_status));
148 if (!mc_array[*rec_num].data) {
152 memcpy(mc_array[*rec_num].data, mc->data, mc->size);
156 if (*rec_num == DNET_BULK_META_UPD_SIZE || (final && *rec_num > 0)) {
157 err = kcdbbegintran(n->meta, 0);
159 err = -kcdbecode(n->meta);
160 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to start %s transaction, err: %d: %s.\n",
161 "meta", err, kcecodename(-err));
164 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
165 err = dnet_db_check_update(n, &mc_array[rec_iter]);
166 recs[rec_iter].key.size = DNET_ID_SIZE;
167 recs[rec_iter].key.buf = (char *)mc_array[rec_iter].id.id;
168 recs[rec_iter].value.size = mc_array[rec_iter].size;
169 recs[rec_iter].value.buf = mc_array[rec_iter].data;
171 rec_processed = kcdbsetbulk(n->meta, recs, *rec_num, 0);
172 if ((int)rec_processed != *rec_num) {
173 err = -kcdbecode(n->meta);
174 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to set check update stamps, %d records processed, err: %d: %s.\n",
175 (int)rec_processed, err, kcecodename(-err));
176 kcdbendtran(n->meta, 0);
180 kcdbendtran(n->meta, 1);
181 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
182 free(mc_array[rec_iter].data);
/*
 * Handle a bulk-check request: @data is an array of struct dnet_bulk_id
 * entries, cmd->size bytes long.
 *
 * When cmd->size is a whole multiple of sizeof(struct dnet_bulk_id), every
 * entry is processed in turn:
 *  - local metadata is read through n->cb->meta_read();
 *  - its META_UPDATE record (dnet_get_meta_update()) is compared with the
 *    entry's last_update timestamp and flags;
 *  - depending on the comparison, dnet_stat_local() and
 *    dnet_meta_update_check_status() are called, and the local META_UPDATE is
 *    copied back into ids[i].last_update;
 *  - when no metadata is present, last_update is set to tsec = 1, flags = 0.
 * Otherwise the payload is reported as corrupted.
 *
 * The (updated) @data array is returned to the sender with dnet_send_reply().
 *
 * NOTE(review): several branch heads and bodies are not visible in this view,
 * so the exact condition guarding each step must be confirmed against the
 * complete source.
 */
192 int dnet_cmd_bulk_check(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, void *data
)
194 struct dnet_bulk_id
*ids
= (struct dnet_bulk_id
*)data
;
195 struct dnet_meta_container mc
;
196 struct dnet_meta_update mu
;
202 if (!(cmd
->size
% sizeof(struct dnet_bulk_id
))) {
203 num
= cmd
->size
/ sizeof(struct dnet_bulk_id
);
205 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: received %d entries\n", num
);
207 for (i
= 0; i
< num
; ++i
) {
209 /* Send empty reply every DNET_BULK_CHECK_PING records to prevent timeout */
210 if (i
% DNET_BULK_CHECK_PING
== 0 && i
> 0) {
211 dnet_send_reply(orig
, cmd
, NULL
, 0, 1);
214 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: processing ID %s\n", dnet_dump_id_str(ids
[i
].id
.id
));
216 dnet_setup_id(&mc
.id
, 0, ids
[i
].id
.id
);
217 err
= orig
->n
->cb
->meta_read(orig
->n
->cb
->command_private
, &ids
[i
].id
, &mc
.data
);
220 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: %d bytes of metadata found, searching for META_UPDATE group_id=%d\n",
221 mc
.size
, orig
->n
->st
->idc
->group
->group_id
);
222 if (dnet_get_meta_update(orig
->n
, &mc
, &mu
))
224 dnet_convert_meta_update(&ids
[i
].last_update
);
225 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: mu.tsec=%lu, mu.tnsec=%lu, mu.flags=%02lx\n",
226 (unsigned long)mu
.tm
.tsec
, (unsigned long)mu
.tm
.tnsec
, (unsigned long)mu
.flags
);
227 dnet_log(orig
->n
, DNET_LOG_DEBUG
,
228 "BULK: last_update.tsec=%lu, last_update.tnsec=%lu, last_update.flags=%02lx\n",
229 (unsigned long)ids
[i
].last_update
.tm
.tsec
, (unsigned long)ids
[i
].last_update
.tm
.tnsec
,
230 (unsigned long)ids
[i
].last_update
.flags
);
/* True when the local copy is flagged REMOVED or is strictly older than the sender's last_update. */
232 if ((mu
.flags
& DNET_IO_FLAGS_REMOVED
) || (mu
.tm
.tsec
< ids
[i
].last_update
.tm
.tsec
) ||
233 ((mu
.tm
.tnsec
< ids
[i
].last_update
.tm
.tnsec
) && (mu
.tm
.tsec
== ids
[i
].last_update
.tm
.tsec
))) {
236 /* File is not needed to be updated */
237 dnet_setup_id(&raw
, orig
->n
->id
.group_id
, ids
[i
].id
.id
);
238 err
= dnet_stat_local(orig
, &raw
);
240 /* File was not found in the storage */
244 err
= dnet_meta_update_check_status(orig
->n
, &mc
);
246 dnet_log(orig
->n
, DNET_LOG_ERROR
,
247 "BULK: %s: couldn't update meta CHECK_STATUS err: %d\n",
248 dnet_dump_id_str(ids
[i
].id
.id
), err
);
/* Report the local META_UPDATE back to the sender in place of the value it sent. */
253 memcpy(&ids
[i
].last_update
, &mu
, sizeof(struct dnet_meta_update
));
254 dnet_convert_meta_update(&ids
[i
].last_update
);
258 /* Meta is not present - set timestamp to very old one */
259 dnet_convert_meta_update(&ids
[i
].last_update
);
260 ids
[i
].last_update
.tm
.tsec
= 1;
261 ids
[i
].last_update
.flags
= 0;
262 dnet_convert_meta_update(&ids
[i
].last_update
);
266 dnet_log(orig
->n
, DNET_LOG_ERROR
, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
267 (unsigned long long)cmd
->size
, sizeof(struct dnet_bulk_id
));
272 return dnet_send_reply(orig
, cmd
, data
, sizeof(struct dnet_bulk_id
) * num
, 0);
/*
 * Private data shared between dnet_request_bulk_check() and its completion
 * callback dnet_bulk_check_complete().
 *
 * NOTE(review): only the @db member is visible in this view; other code in
 * this file also accesses ->w, ->refcnt, ->group_num and ->groups through
 * pointers to this structure.
 */
278 struct dnet_bulk_check_priv
{
/* Temporary database holding per-ID META_UPDATE records collected from the groups. */
280 struct dnet_check_temp_db
*db
;
/*
 * Process one dnet_bulk_id entry (@ids) received from @remote_group during a
 * bulk check, using the temporary per-ID record kept in p->db.
 *
 * Steps visible in this block:
 *  - read local metadata through n->cb->meta_read() and extract its
 *    META_UPDATE record (my_mu), which starts out as the newest known one;
 *  - take the number of groups either from @p (p->group_num) or from the
 *    DNET_META_GROUPS record found in local metadata;
 *  - allocate a zeroed temp record of group_num struct dnet_meta_update
 *    slots, fill it from dnet_db_read_raw() when a stored record exists, and
 *    merge the timestamp and flags reported by @remote_group into it while
 *    tracking the newest entry and whether all entries are flagged REMOVED;
 *  - either store the temp record back with dnet_db_write_raw(), or act on
 *    it: remove objects (dnet_remove_object_now() / dnet_remove_object()),
 *    push data (n->cb->send()) and metadata (dnet_write_metadata()) to groups
 *    whose timestamp is older than the newest one, update CHECK_STATUS, and
 *    drop the temp record with dnet_db_remove_raw().
 *
 * NOTE(review): many lines (declarations, branch heads, labels, cleanup and
 * the return) are not visible in this view, so the exact conditions guarding
 * each step must be confirmed against the complete source.
 */
286 static int dnet_bulk_check_complete_single(struct dnet_net_state
*state
, struct dnet_bulk_id
*ids
,
287 int remote_group
, struct dnet_bulk_check_priv
*p
)
289 struct dnet_session
*s
= dnet_session_create(state
->n
);
290 struct dnet_meta_container mc
;
291 struct dnet_meta_container temp_mc
;
292 struct dnet_meta_update
*mu
;
293 struct dnet_meta
*mg
;
295 char *tmpdata
= NULL
;
296 int *groups
, group_num
= 1;
297 int err
= -EINVAL
, error
= 0;
299 int my_group
, lastest_group
= -1;
300 struct dnet_meta_update lastest_mu
, my_mu
;
301 struct timeval current_ts
;
302 int removed_in_all
= 1, updated
= 0;
305 uint64_t ioflags
= 0;
308 my_group
= state
->n
->id
.group_id
;
310 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: checking ID %s\n", dnet_dump_id_str(ids
->id
.id
));
314 dnet_setup_id(&mc
.id
, my_group
, ids
->id
.id
);
316 err
= state
->n
->cb
->meta_read(state
->n
->cb
->command_private
, &ids
->id
, &mc
.data
);
320 goto err_out_continue
;
324 /* Set current group meta_update as lastest_mu */
325 if (!dnet_get_meta_update(state
->n
, &mc
, &my_mu
)) {
326 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: meta_update structure doesn't exist for group %d\n",
327 dnet_dump_id_str(ids
->id
.id
), my_group
);
331 dnet_convert_meta_update(&my_mu
);
332 memcpy(&lastest_mu
, &my_mu
, sizeof(struct dnet_meta_update
));
333 lastest_group
= my_group
;
337 /* groups came from dnet_check utility */
339 group_num
= p
->group_num
;
341 mg
= dnet_meta_search(state
->n
, &mc
, DNET_META_GROUPS
);
343 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DNET_META_GROUPS structure doesn't exist\n", dnet_dump_id_str(ids
->id
.id
));
347 dnet_convert_meta(mg
);
/* The payload must be a whole number of int-sized group IDs. */
348 if (mg
->size
% sizeof(int)) {
349 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DNET_META_GROUPS structure is corrupted\n", dnet_dump_id_str(ids
->id
.id
));
353 group_num
= mg
->size
/ sizeof(int);
354 groups
= (int *)mg
->data
;
355 dnet_convert_meta(mg
);
358 /* Read temporary meta */
359 temp_mc
.data
= malloc(sizeof(struct dnet_meta_update
) * group_num
);
362 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: could not allocate memory for temp UPDATE_META\n", dnet_dump_id_str(ids
->id
.id
));
365 memset(temp_mc
.data
, 0, sizeof(struct dnet_meta_update
) * group_num
);
366 temp_mc
.size
= sizeof(struct dnet_meta_update
) * group_num
;
368 err
= dnet_db_read_raw(p
->db
->b
, &ids
->id
, (void **)&tmpdata
);
/* -2 is tolerated here; presumably it is -ENOENT, i.e. no temp record stored yet - TODO confirm. */
370 if (err
< 0 && err
!= -2)
372 /* No data in temp meta was stored. Placing local meta_update at the beginning */
374 mu
[0].group_id
= my_group
;
376 mu
[0].flags
= my_mu
.flags
;
/* A stored record larger than the freshly allocated buffer is rejected before it is copied. */
379 if (err
> (int)(sizeof(struct dnet_meta_update
) * group_num
)) {
380 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: too many data stored in temp meta\n", dnet_dump_id_str(ids
->id
.id
));
384 memcpy(temp_mc
.data
, tmpdata
, err
);
387 /* Update temp meta with received group */
391 for (i
= 0; i
< group_num
; ++i
) {
392 if (mu
[i
].group_id
== remote_group
) {
393 mu
[i
].tm
= ids
->last_update
.tm
;
394 mu
[i
].flags
= ids
->last_update
.flags
;
398 if (mu
[i
].group_id
== 0)
401 if (!(mu
[i
].flags
& DNET_IO_FLAGS_REMOVED
))
404 if (((mu
[i
].tm
.tsec
> mu
[lastest
].tm
.tsec
)
405 || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) && (mu
[i
].tm
.tnsec
> mu
[lastest
].tm
.tnsec
)))
408 lastest_group
= groups
[i
];
412 if (!updated
&& i
== group_num
) {
413 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: no space left to save group in temp meta!\n", dnet_dump_id_str(ids
->id
.id
));
418 mu
[i
].group_id
= remote_group
;
419 mu
[i
].tm
= ids
->last_update
.tm
;
420 mu
[i
].flags
= ids
->last_update
.flags
;
422 if (((mu
[i
].tm
.tsec
> mu
[lastest
].tm
.tsec
)
423 || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) && (mu
[i
].tm
.tnsec
> mu
[lastest
].tm
.tnsec
)))
426 lastest_group
= groups
[i
];
432 /* Not all groups processed yet */
435 err
= dnet_db_write_raw(p
->db
->b
, &ids
->id
, temp_mc
.data
, temp_mc
.size
);
437 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: unable to save temp meta, err: %d\n", dnet_dump_id_str(ids
->id
.id
), err
);
442 /* Check if removal_delay second has gone since object was marked as REMOVED */
443 if (removed_in_all
) {
444 gettimeofday(&current_ts
, NULL
);
445 if (((uint64_t)current_ts
.tv_sec
< mu
[lastest
].tm
.tsec
)
446 || ((uint64_t)current_ts
.tv_sec
- mu
[lastest
].tm
.tsec
) < (uint64_t)(state
->n
->removal_delay
* 3600 * 24))
450 /* TODO: receive newer files from remote groups
452 * Yep, we should read it locally and send it to other groups too
454 if ((lastest_group
!= my_group
) && !(mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
)) {
455 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: %s: File on remote group %d is newer, skipping this file\n",
456 dnet_dump_id_str(ids
->id
.id
), lastest_group
);
461 for (i
= 0; i
< group_num
; ++i
) {
463 if (mu
[i
].group_id
== my_group
)
466 dnet_session_set_groups(s
, (int *)&mu
[i
].group_id
, 1);
467 dnet_setup_id(&id
, mu
[i
].group_id
, ids
->id
.id
);
470 if (mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) {
471 if (removed_in_all
) {
472 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: dnet_remove_object_now %s in group %d, err=%d\n",
473 dnet_dump_id(&id
), mu
[i
].group_id
, err
);
474 err
= dnet_remove_object_now(s
, &id
, cflags
, ioflags
);
476 if (!(mu
[i
].flags
& DNET_IO_FLAGS_REMOVED
)) {
477 err
= dnet_remove_object(s
, &id
, NULL
, NULL
, cflags
, ioflags
);
478 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: dnet_remove_object %s in group %d err=%d\n",
479 dnet_dump_id(&id
), mu
[i
].group_id
, err
);
485 if ((mu
[i
].tm
.tsec
< mu
[lastest
].tm
.tsec
) || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) &&
486 ((mu
[i
].tm
.tnsec
< mu
[lastest
].tm
.tnsec
)))) {
487 err
= state
->n
->cb
->send(state
, state
->n
->cb
->command_private
, &id
);
492 err
= dnet_meta_update_check_status_raw(state
->n
, &mc
);
496 memcpy(&mc
.id
, &id
, sizeof(struct dnet_id
));
497 err
= dnet_write_metadata(s
, &mc
, 1, cflags
);
498 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: dnet_write_metadata %s in group %d, err=%d\n",
499 dnet_dump_id(&id
), my_group
, err
);
507 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: Error during sending transaction to group %d, err=%d\n",
508 dnet_dump_id_str(ids
->id
.id
), groups
[i
], err
);
509 if (!error
&& err
< 0)
513 if (mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) {
514 if (removed_in_all
) {
515 err
= dnet_merge_remove_local(state
->n
, &mc
.id
, 0);
516 } else if (!(my_mu
.flags
& DNET_IO_FLAGS_REMOVED
)) {
517 err
= dnet_merge_remove_local(state
->n
, &mc
.id
, 1);
521 if (!(mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) && !error
) {
522 err
= dnet_meta_update_check_status(state
->n
, &mc
);
524 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: couldn't update meta CHECK_STATUS\n", dnet_dump_id_str(ids
->id
.id
));
529 err
= dnet_db_remove_raw(p
->db
->b
, &ids
->id
, 1);
531 dnet_log_raw(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DB: failed to remove temp_meta object, err: %d.\n",
532 dnet_dump_id(&mc
.id
), err
);
546 dnet_log(state
->n
, DNET_LOG_ERROR
, "Failed to check ID %s to %s, err=%d\n", dnet_dump_id_str(ids
->id
.id
),
547 dnet_state_dump_addr(state
), error
);
549 if (i
== group_num
) {
550 dnet_counter_inc(state
->n
, DNET_CNTR_NODE_CHECK_COPY
, error
);
/*
 * Completion callback installed by dnet_request_bulk_check() (ctl.complete);
 * @priv is the struct dnet_bulk_check_priv allocated there.
 *
 * Visible behaviour:
 *  - when is_trans_destroyed() reports the end of the transaction, the waiter
 *    is woken (p->w->cond is incremented), the temp DB reference is released
 *    with dnet_check_temp_db_put() and one reference on @p is dropped;
 *  - a reply with cmd->size == 0 is a keep-alive and carries no entries;
 *  - a reply whose size is a whole multiple of sizeof(struct dnet_bulk_id) is
 *    processed entry by entry with dnet_bulk_check_complete_single(), using
 *    cmd->id.group_id as the remote group, until all entries are done or the
 *    node's need_exit flag is set;
 *  - any other size is logged as corrupted data;
 *  - cmd->status is stored into p->w->status.
 *
 * NOTE(review): returns and some branch bodies are not visible in this view.
 */
556 static int dnet_bulk_check_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
558 struct dnet_bulk_check_priv
*p
= priv
;
561 if (is_trans_destroyed(state
, cmd
)) {
562 dnet_wakeup(p
->w
, p
->w
->cond
++);
564 dnet_check_temp_db_put(p
->db
);
565 if (atomic_dec_and_test(&p
->refcnt
)) {
572 /* Empty reply that prevents timeout */
573 if (cmd
->size
== 0) {
577 if (!(cmd
->size
% sizeof(struct dnet_bulk_id
))) {
/* The entries follow the command header directly in memory. */
578 struct dnet_bulk_id
*ids
= (struct dnet_bulk_id
*)(cmd
+ 1);
579 int num
= cmd
->size
/ sizeof(struct dnet_bulk_id
);
581 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: received %d entries\n", num
);
583 //dnet_db_ptr_get(&state->n->temp_meta);
584 //ret = kcdbbegintran(state->n->temp_meta.db, 0);
586 // err = -kcdbecode(state->n->temp_meta.db);
587 // dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: DB: failed to start temp_meta transaction, err: %d: %s.\n",
588 // err, kcecodename(-err));
592 for (i
= 0; i
< num
&& !state
->n
->need_exit
; ++i
) {
593 err
= dnet_bulk_check_complete_single(state
, &ids
[i
], cmd
->id
.group_id
, p
);
596 //kcdbendtran(state->n->temp_meta.db, 1);
597 //dnet_db_ptr_put(state->n, &state->n->temp_meta);
600 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: couldn't update meta CHECK_STATUS\n");
603 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
604 (unsigned long long)cmd
->size
, sizeof(struct dnet_bulk_id
));
607 p
->w
->status
= cmd
->status
;
/*
 * Send the IDs accumulated in @state to the node at state->addr and wait for
 * the bulk check to complete.
 *
 * Visible behaviour:
 *  - a struct dnet_bulk_check_priv is allocated with its reference counter
 *    initialised to 2 (the completion callback and this function each drop a
 *    reference), a temp DB reference is taken, a wait object is allocated and
 *    params->groups is copied into a private array;
 *  - a DNET_CMD_LIST transaction is prepared with dnet_bulk_check_complete()
 *    as completion callback, the flags DNET_FLAGS_NEED_ACK |
 *    DNET_FLAGS_NOLOCK | DNET_ATTR_BULK_CHECK, and state->ids as payload
 *    (state->num entries);
 *  - the destination state is looked up with dnet_state_search_by_addr() and
 *    the transaction is sent with dnet_trans_alloc_send_state();
 *  - the function waits on p->w->cond with a timeout equal to n->wait_ts
 *    whose seconds are multiplied by DNET_BULK_IDS_SIZE.
 *
 * NOTE(review): error paths, the release code behind the two
 * atomic_dec_and_test() calls and the return are not visible in this view.
 */
611 int dnet_request_bulk_check(struct dnet_node
*n
, struct dnet_bulk_state
*state
, struct dnet_check_params
*params
)
613 struct dnet_trans_control ctl
;
614 struct dnet_net_state
*st
;
615 struct dnet_bulk_check_priv
*p
;
616 struct timespec wait_ts
;
619 p
= (struct dnet_bulk_check_priv
*)malloc(sizeof(struct dnet_bulk_check_priv
));
624 atomic_init(&p
->refcnt
, 2);
626 dnet_check_temp_db_get(p
->db
);
628 p
->w
= dnet_wait_alloc(0);
634 p
->group_num
= params
->group_num
;
635 p
->groups
= (int *)malloc(sizeof(int) * params
->group_num
);
640 memcpy(p
->groups
, params
->groups
, sizeof(int) * params
->group_num
);
642 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
644 ctl
.cmd
= DNET_CMD_LIST
;
645 ctl
.complete
= dnet_bulk_check_complete
;
647 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| DNET_ATTR_BULK_CHECK
;
649 ctl
.data
= state
->ids
;
650 ctl
.size
= sizeof(struct dnet_bulk_id
) * state
->num
;
652 st
= dnet_state_search_by_addr(n
, &state
->addr
);
/* The transaction is addressed with the destination's group and the first ID of its ID container. */
657 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
660 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: sending %u bytes of data to %s (%s)\n",
661 ctl
.size
, dnet_dump_id(&ctl
.id
), dnet_server_convert_dnet_addr(&state
->addr
));
662 err
= dnet_trans_alloc_send_state(st
, &ctl
);
/* Scale the node-wide timeout, since one request may carry up to DNET_BULK_IDS_SIZE IDs. */
665 wait_ts
= n
->wait_ts
;
666 wait_ts
.tv_sec
*= DNET_BULK_IDS_SIZE
;
667 err
= dnet_wait_event(p
->w
, p
->w
->cond
!= 0, &wait_ts
);
678 if (atomic_dec_and_test(&p
->refcnt
)) {
689 if (atomic_dec_and_test(&p
->refcnt
)) {
695 dnet_log(n
, DNET_LOG_ERROR
, "Bulk check exited with status %d\n", err
);
/*
 * Queue @id for bulk checking on the node responsible for it.
 *
 * Visible behaviour:
 *  - the network state serving @id is obtained with dnet_state_get_first()
 *    and its address is used as the key of a bsearch() over
 *    bulk_array->states (compared with dnet_compare_bulk_state());
 *  - the META_UPDATE record of @mc is read with dnet_get_meta_update();
 *  - state->num is range-checked against [0, DNET_BULK_IDS_SIZE);
 *  - the slot state->ids[state->num] is zeroed and filled with the raw ID and
 *    the META_UPDATE record, which is passed through
 *    dnet_convert_meta_update() before being copied;
 *  - once state->num equals DNET_BULK_IDS_SIZE the batch is flushed with
 *    dnet_request_bulk_check().
 *
 * NOTE(review): the increment of state->num, the handling of a failed lookup
 * and the return are not visible in this view.
 */
699 static int dnet_bulk_add_id(struct dnet_node
*n
, struct dnet_bulk_array
*bulk_array
, struct dnet_id
*id
,
700 struct dnet_meta_container
*mc
, struct dnet_check_params
*params
)
703 struct dnet_bulk_state tmp
;
704 struct dnet_bulk_state
*state
= NULL
;
705 struct dnet_net_state
*st
= dnet_state_get_first(n
, id
);
706 struct dnet_bulk_id
*bulk_id
;
707 struct dnet_meta_update mu
;
709 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: adding ID %s to array\n", dnet_dump_id(id
));
/* Only the address of the temporary is filled in; it serves as the bsearch() key. */
713 memcpy(&tmp
.addr
, &st
->addr
, sizeof(struct dnet_addr
));
716 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: Searching state in states array\n");
717 state
= bsearch(&tmp
, bulk_array
->states
, bulk_array
->num
, sizeof(struct dnet_bulk_state
), dnet_compare_bulk_state
);
721 if (!dnet_get_meta_update(n
, mc
, &mu
))
724 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state
->addr
), state
->num
);
725 //pthread_mutex_lock(&state->state_lock);
726 if (state
->num
>= DNET_BULK_IDS_SIZE
|| state
->num
< 0)
729 bulk_id
= &state
->ids
[state
->num
];
730 memset(bulk_id
, 0, sizeof(struct dnet_bulk_id
));
732 memcpy(&bulk_id
->id
, &id
->id
, DNET_ID_SIZE
);
734 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: ID: %s, last_update->tsec=%llu, last_update->tnsec=%llu, flags=%02llx\n",
735 dnet_dump_id_str(bulk_id
->id
.id
), (unsigned long long)mu
.tm
.tsec
, (unsigned long long)mu
.tm
.tnsec
,
736 (unsigned long long)mu
.flags
);
738 dnet_convert_meta_update(&mu
);
740 memcpy(&bulk_id
->last_update
, &mu
, sizeof(struct dnet_meta_update
));
744 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state
->addr
), state
->num
);
/* A full batch is sent right away. */
745 if (state
->num
== DNET_BULK_IDS_SIZE
) {
746 err
= dnet_request_bulk_check(n
, state
, params
);
752 //pthread_mutex_unlock(&state->state_lock);
757 //pthread_mutex_unlock(&state->state_lock);
/*
 * Queue the object described by @mc for bulk checking in each of the
 * @group_num groups listed in @groups.
 *
 * For every group an ID with that group and the object's raw ID is built with
 * dnet_setup_id() and handed to dnet_bulk_add_id().
 *
 * NOTE(review): the statement following the "groups[i] == group_id" test is
 * not visible here - presumably the object's own group is skipped; the
 * condition guarding the error log and the return are not visible either.
 */
761 static int dnet_check_number_of_copies(struct dnet_node
*n
, struct dnet_meta_container
*mc
, int *groups
, int group_num
,
762 struct dnet_bulk_array
*bulk_array
, struct dnet_check_params
*params
)
/* Group the local copy belongs to. */
765 int group_id
= mc
->id
.group_id
;
766 int err
= 0, i
, error
= 0;
768 for (i
=0; i
<group_num
; ++i
) {
769 if (groups
[i
] == group_id
)
772 dnet_setup_id(&raw
, groups
[i
], mc
->id
.id
);
774 err
= dnet_bulk_add_id(n
, bulk_array
, &raw
, mc
, params
);
776 dnet_log(n
, DNET_LOG_ERROR
, "BULK: after adding ID %s err = %d\n", dnet_dump_id(&raw
), err
);
/*
 * Determine which groups should hold the object described by @mc and queue it
 * for checking in each of them via dnet_check_number_of_copies().
 *
 * When params->group_num is non-zero, the list in params->groups is used as
 * is; otherwise the list is taken from the object's own metadata with
 * dnet_check_find_groups(). In both cases 'err' temporarily carries the
 * number of groups that is passed on.
 *
 * NOTE(review): the statement guarded by the final "!params->group_num" test
 * is not visible here - presumably it frees the array obtained from
 * dnet_check_find_groups(); confirm against the complete source.
 */
787 static int dnet_check_copies(struct dnet_node
*n
, struct dnet_meta_container
*mc
,
788 struct dnet_bulk_array
*bulk_array
, struct dnet_check_params
*params
)
793 if (params
->group_num
) {
794 groups
= params
->groups
;
795 err
= params
->group_num
;
797 err
= dnet_check_find_groups(n
, mc
, &groups
);
802 err
= dnet_check_number_of_copies(n
, mc
, groups
, err
, bulk_array
, params
);
804 if (!params
->group_num
)
/*
 * Upload the local object identified by mc->id and then its metadata.
 *
 * The node's own state is obtained with dnet_node_state(), the object is sent
 * through the n->cb->send() callback, the metadata container is written with
 * dnet_write_metadata() on session @s, and the state reference is released
 * with dnet_state_put().
 *
 * NOTE(review): error checks between the steps, the definition of 'cflags'
 * and the return are not visible in this view.
 */
810 static int dnet_merge_direct(struct dnet_session
*s
, struct dnet_meta_container
*mc
)
812 struct dnet_node
*n
= s
->node
;
813 struct dnet_net_state
*base
;
817 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_direct mc->size = %u\n", mc
->size
);
818 base
= dnet_node_state(n
);
824 err
= n
->cb
->send(base
, n
->cb
->command_private
, &mc
->id
);
825 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_direct after n->cb->send err = %d\n\n", err
);
829 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_direct2 mc->size = %u\n", mc
->size
);
830 err
= dnet_write_metadata(s
, mc
, 0, cflags
);
/* Drops the reference on the state obtained from dnet_node_state() above. */
837 dnet_state_put(base
);
/*
 * Node-based variant of the upload sequence: send the object identified by
 * mc->id through n->cb->send(), write its metadata with
 * dnet_write_metadata(), and release the node state.
 *
 * NOTE(review): dnet_write_metadata() is called here with three arguments
 * (node, container, 0), while dnet_merge_direct() in this file calls it with
 * a session and a trailing cflags argument - this function looks like it
 * predates the session-based API; confirm whether it is still compiled.
 */
843 static int dnet_merge_upload(struct dnet_node *n, struct dnet_meta_container *mc)
845 struct dnet_net_state *base;
848 base = dnet_node_state(n);
854 err = n->cb->send(base, n->cb->command_private, &mc->id);
858 err = dnet_write_metadata(n, mc, 0);
863 dnet_state_put(base);
/*
 * Merge the local copy (@mc) with the copy already present in the storage
 * (@remote_meta) based on their META_UPDATE records.
 *
 * Visible behaviour:
 *  - a missing local META_UPDATE record is logged as an error;
 *  - a missing remote META_UPDATE record leads to a direct merge
 *    (dnet_merge_direct());
 *  - when the local timestamp is strictly newer than the remote one
 *    (seconds first, then nanoseconds), a local copy flagged
 *    DNET_IO_FLAGS_REMOVED triggers dnet_remove_object_now(); a later
 *    dnet_merge_direct() call is also visible.
 *
 * NOTE(review): the else branches, the definition of 'cflags' and the return
 * are not visible in this view; what happens when the remote copy is newer or
 * equal cannot be determined from here.
 */
869 static int dnet_merge_common(struct dnet_session
*s
, struct dnet_meta_container
*remote_meta
, struct dnet_meta_container
*mc
)
871 struct dnet_node
*n
= s
->node
;
873 struct dnet_meta_update local
, remote
;
875 uint64_t ioflags
= 0;
877 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_common mc->size = %d\n", mc
->size
);
878 if (!dnet_get_meta_update(n
, mc
, &local
)) {
880 dnet_log(n
, DNET_LOG_ERROR
, "%s: META_UPDATE not found in local meta\n", dnet_dump_id(&mc
->id
));
884 if (!dnet_get_meta_update(n
, remote_meta
, &remote
)) {
886 dnet_log(n
, DNET_LOG_ERROR
, "%s: META_UPDATE not found in remote meta, perform direct merge\n", dnet_dump_id(&mc
->id
));
887 err
= dnet_merge_direct(s
, mc
);
/* Local copy is strictly newer than the remote one. */
891 if ((local
.tm
.tsec
> remote
.tm
.tsec
) || (local
.tm
.tsec
== remote
.tm
.tsec
&& local
.tm
.tnsec
> remote
.tm
.tnsec
)) {
892 if (local
.flags
& DNET_IO_FLAGS_REMOVED
) {
893 err
= dnet_remove_object_now(s
, &mc
->id
, cflags
, ioflags
);
895 err
= dnet_merge_direct(s
, mc
);
/*
 * Merge the local object described by @mc into the storage reachable through
 * session @s.
 *
 * The metadata currently stored for mc->id is fetched with dnet_read_meta()
 * into a zero-initialised container. A failure other than -ENOENT is logged
 * as an error; the "no meta in the storage" case is logged at info level and
 * handled with a plain upload (dnet_merge_direct()); a dnet_merge_common()
 * call on the fetched and local containers is also visible. The buffer of the
 * fetched container is released with free() at the end.
 *
 * NOTE(review): the outer error test, the else branches and the return are
 * not visible in this view.
 */
904 static int dnet_check_merge(struct dnet_session
*s
, struct dnet_meta_container
*mc
)
906 struct dnet_node
*n
= s
->node
;
908 struct dnet_meta_container remote_mc
;
910 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_check_merge mc->size = %d\n", mc
->size
);
/* Zeroing keeps remote_mc.data NULL, so the final free() is safe even if nothing was read. */
911 memset(&remote_mc
, 0, sizeof(struct dnet_meta_container
));
913 err
= dnet_read_meta(s
, &remote_mc
, NULL
, 0, &mc
->id
);
915 if (err
!= -ENOENT
) {
916 dnet_log_raw(n
, DNET_LOG_ERROR
, "%s: failed to download object to be merged from storage: %d.\n",
917 dnet_dump_id(&mc
->id
), err
);
921 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: there is no meta in the storage to merge with, "
922 "doing direct merge (plain upload).\n", dnet_dump_id(&mc
->id
));
923 err
= dnet_merge_direct(s
, mc
);
926 err
= dnet_merge_common(s
, &remote_mc
, mc
);
934 free(remote_mc
.data
);
/*
 * Entry point for checking one object: either merges it into the storage or
 * queues it for a bulk check of its copies.
 *
 * A session restricted to the node's own group (n->id.group_id) is created.
 * The visible calls are dnet_check_merge() followed by
 * dnet_merge_remove_local() with full_process = 0, and dnet_check_copies()
 * with @bulk_array and @params.
 *
 * NOTE(review): the branch selecting between the two paths is not visible
 * here - presumably it depends on @need_merge; the session teardown and the
 * return are not visible either.
 */
938 int dnet_check(struct dnet_node
*n
, struct dnet_meta_container
*mc
, struct dnet_bulk_array
*bulk_array
,
939 int need_merge
, struct dnet_check_params
*params
)
942 struct dnet_session
*s
= dnet_session_create(n
);
943 dnet_session_set_groups(s
, (int *)&n
->id
.group_id
, 1);
945 dnet_log(n
, DNET_LOG_DEBUG
, "need_merge = %d, mc.size = %d\n", need_merge
, mc
->size
);
947 err
= dnet_check_merge(s
, mc
);
948 dnet_log(n
, DNET_LOG_DEBUG
, "err=%d\n", err
);
950 dnet_merge_remove_local(n
, &mc
->id
, 0);
952 err
= dnet_check_copies(n
, mc
, bulk_array
, params
);