2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
18 #include <sys/types.h>
30 #include "elliptics.h"
31 #include "elliptics/interface.h"
33 static char dnet_check_tmp_dir
[] = "/dev/shm";
35 static int dnet_merge_remove_local(struct dnet_node
*n
, struct dnet_id
*id
, int full_process
)
37 char buf
[sizeof(struct dnet_cmd
)];
39 struct dnet_net_state
*base
;
42 memset(buf
, 0, sizeof(buf
));
44 cmd
= (struct dnet_cmd
*)buf
;
46 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
49 cmd
->cmd
= DNET_CMD_DEL
;
51 cmd
->flags
= DNET_ATTR_DELETE_HISTORY
;
53 base
= dnet_node_state(n
);
55 err
= dnet_process_meta(base
, cmd
, NULL
);
62 static int dnet_dump_meta_container(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
66 char id_str
[DNET_ID_SIZE
*2+1];
68 snprintf(file
, sizeof(file
), "%s/%s.meta", dnet_check_tmp_dir
, dnet_dump_id_len_raw(mc
->id
.id
, DNET_ID_SIZE
, id_str
));
70 fd
= open(file
, O_RDWR
| O_TRUNC
| O_CREAT
| O_CLOEXEC
, 0644);
73 dnet_log_raw(n
, DNET_LOG_ERROR
, "Failed to open meta container file '%s': %s\n",
74 file
, strerror(errno
));
78 err
= write(fd
, mc
->data
, mc
->size
);
79 if (err
!= (int)mc
->size
) {
81 dnet_log_raw(n
, DNET_LOG_ERROR
, "Failed to write meta container into '%s': %s\n",
82 file
, strerror(errno
));
93 static int dnet_check_find_groups(struct dnet_node
*n
, struct dnet_meta_container
*mc
, int **groupsp
)
99 m
= dnet_meta_search(n
, mc
, DNET_META_GROUPS
);
101 dnet_log_raw(n
, DNET_LOG_ERROR
, "%s: failed to find groups metadata.\n", dnet_dump_id(&mc
->id
));
106 groups
= malloc(m
->size
);
111 memcpy(groups
, m
->data
, m
->size
);
113 num
= m
->size
/ sizeof(int32_t);
115 for (i
=0; i
<num
; ++i
) {
116 dnet_log_raw(n
, DNET_LOG_DSA
, "%s: group: %d\n", dnet_dump_id(&mc
->id
), groups
[i
]);
124 dnet_dump_meta_container(n
, mc
);
128 /*static int dnet_bulk_db_check_update(struct dnet_node *n, struct dnet_meta_container *mc_array, int *rec_num,
129 struct dnet_meta_container *mc, int final)
132 int64_t rec_processed;
133 KCREC recs[DNET_BULK_META_UPD_SIZE];
136 if (!rec_num || *rec_num > DNET_BULK_META_UPD_SIZE) {
143 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: mc should be passed\n");
146 memcpy(&mc_array[*rec_num].id, &mc->id, sizeof(struct dnet_id));
147 mc_array[*rec_num].size = mc->size;
148 // Allocate extra memory for potential META_CHECK_STATUS structure
149 mc_array[*rec_num].data = malloc(mc->size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_check_status));
150 if (!mc_array[*rec_num].data) {
154 memcpy(mc_array[*rec_num].data, mc->data, mc->size);
158 if (*rec_num == DNET_BULK_META_UPD_SIZE || (final && *rec_num > 0)) {
159 err = kcdbbegintran(n->meta, 0);
161 err = -kcdbecode(n->meta);
162 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to start %s transaction, err: %d: %s.\n",
163 "meta", err, kcecodename(-err));
166 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
167 err = dnet_db_check_update(n, &mc_array[rec_iter]);
168 recs[rec_iter].key.size = DNET_ID_SIZE;
169 recs[rec_iter].key.buf = (char *)mc_array[rec_iter].id.id;
170 recs[rec_iter].value.size = mc_array[rec_iter].size;
171 recs[rec_iter].value.buf = mc_array[rec_iter].data;
173 rec_processed = kcdbsetbulk(n->meta, recs, *rec_num, 0);
174 if ((int)rec_processed != *rec_num) {
175 err = -kcdbecode(n->meta);
176 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to set check update stamps, %d records processed, err: %d: %s.\n",
177 (int)rec_processed, err, kcecodename(-err));
178 kcdbendtran(n->meta, 0);
182 kcdbendtran(n->meta, 1);
183 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
184 free(mc_array[rec_iter].data);
194 int dnet_cmd_bulk_check(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, void *data
)
196 struct dnet_bulk_id
*ids
= (struct dnet_bulk_id
*)data
;
197 struct dnet_meta_container mc
;
198 struct dnet_meta_update mu
;
204 if (!(cmd
->size
% sizeof(struct dnet_bulk_id
))) {
205 num
= cmd
->size
/ sizeof(struct dnet_bulk_id
);
207 dnet_log(orig
->n
, DNET_LOG_DSA
, "BULK: received %d entries\n", num
);
209 for (i
= 0; i
< num
; ++i
) {
211 /* Send empty reply every DNET_BULK_CHECK_PING records to prevent timeout */
212 if (i
% DNET_BULK_CHECK_PING
== 0 && i
> 0) {
213 dnet_send_reply(orig
, cmd
, NULL
, 0, 1);
216 dnet_log(orig
->n
, DNET_LOG_DSA
, "BULK: processing ID %s\n", dnet_dump_id_str(ids
[i
].id
.id
));
218 dnet_setup_id(&mc
.id
, 0, ids
[i
].id
.id
);
219 err
= orig
->n
->cb
->meta_read(orig
->n
->cb
->command_private
, &ids
[i
].id
, &mc
.data
);
222 dnet_log(orig
->n
, DNET_LOG_DSA
, "BULK: %d bytes of metadata found, searching for META_UPDATE group_id=%d\n",
223 mc
.size
, orig
->n
->st
->idc
->group
->group_id
);
224 if (dnet_get_meta_update(orig
->n
, &mc
, &mu
))
226 dnet_convert_meta_update(&ids
[i
].last_update
);
227 dnet_log(orig
->n
, DNET_LOG_DSA
, "BULK: mu.tsec=%lu, mu.tnsec=%lu, mu.flags=%02lx\n",
228 (unsigned long)mu
.tm
.tsec
, (unsigned long)mu
.tm
.tnsec
, (unsigned long)mu
.flags
);
229 dnet_log(orig
->n
, DNET_LOG_DSA
,
230 "BULK: last_update.tsec=%lu, last_update.tnsec=%lu, last_update.flags=%02lx\n",
231 (unsigned long)ids
[i
].last_update
.tm
.tsec
, (unsigned long)ids
[i
].last_update
.tm
.tnsec
,
232 (unsigned long)ids
[i
].last_update
.flags
);
234 if ((mu
.flags
& DNET_IO_FLAGS_REMOVED
) || (mu
.tm
.tsec
< ids
[i
].last_update
.tm
.tsec
) ||
235 ((mu
.tm
.tnsec
< ids
[i
].last_update
.tm
.tnsec
) && (mu
.tm
.tsec
== ids
[i
].last_update
.tm
.tsec
))) {
238 /* File is not needed to be updated */
239 dnet_setup_id(&raw
, orig
->n
->id
.group_id
, ids
[i
].id
.id
);
240 err
= dnet_stat_local(orig
, &raw
);
242 /* File was not found in the storage */
246 err
= dnet_meta_update_check_status(orig
->n
, &mc
);
248 dnet_log(orig
->n
, DNET_LOG_ERROR
,
249 "BULK: %s: couldn't update meta CHECK_STATUS err: %d\n",
250 dnet_dump_id_str(ids
[i
].id
.id
), err
);
255 memcpy(&ids
[i
].last_update
, &mu
, sizeof(struct dnet_meta_update
));
256 dnet_convert_meta_update(&ids
[i
].last_update
);
260 /* Meta is not present - set timestamp to very old one */
261 dnet_convert_meta_update(&ids
[i
].last_update
);
262 ids
[i
].last_update
.tm
.tsec
= 1;
263 ids
[i
].last_update
.flags
= 0;
264 dnet_convert_meta_update(&ids
[i
].last_update
);
268 dnet_log(orig
->n
, DNET_LOG_ERROR
, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
269 (unsigned long long)cmd
->size
, sizeof(struct dnet_bulk_id
));
274 return dnet_send_reply(orig
, cmd
, data
, sizeof(struct dnet_bulk_id
) * num
, 0);
280 struct dnet_bulk_check_priv
{
282 struct dnet_check_temp_db
*db
;
288 static int dnet_bulk_check_complete_single(struct dnet_net_state
*state
, struct dnet_bulk_id
*ids
,
289 int remote_group
, struct dnet_bulk_check_priv
*p
)
291 struct dnet_meta_container mc
;
292 struct dnet_meta_container temp_mc
;
293 struct dnet_meta_update
*mu
;
294 struct dnet_meta
*mg
;
296 char *tmpdata
= NULL
;
297 int *groups
, group_num
= 1;
298 int err
= -EINVAL
, error
= 0;
300 int my_group
, lastest_group
= -1;
301 struct dnet_meta_update lastest_mu
, my_mu
;
302 struct timeval current_ts
;
303 int removed_in_all
= 1, updated
= 0;
307 my_group
= state
->n
->id
.group_id
;
309 dnet_log(state
->n
, DNET_LOG_DSA
, "BULK: checking ID %s\n", dnet_dump_id_str(ids
->id
.id
));
313 dnet_setup_id(&mc
.id
, my_group
, ids
->id
.id
);
315 err
= state
->n
->cb
->meta_read(state
->n
->cb
->command_private
, &ids
->id
, &mc
.data
);
319 goto err_out_continue
;
323 /* Set current group meta_update as lastest_mu */
324 if (!dnet_get_meta_update(state
->n
, &mc
, &my_mu
)) {
325 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: meta_update structure doesn't exist for group %d\n",
326 dnet_dump_id_str(ids
->id
.id
), my_group
);
330 dnet_convert_meta_update(&my_mu
);
331 memcpy(&lastest_mu
, &my_mu
, sizeof(struct dnet_meta_update
));
332 lastest_group
= my_group
;
336 /* groups came from dnet_check utility */
338 group_num
= p
->group_num
;
340 mg
= dnet_meta_search(state
->n
, &mc
, DNET_META_GROUPS
);
342 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DNET_META_GROUPS structure doesn't exist\n", dnet_dump_id_str(ids
->id
.id
));
346 dnet_convert_meta(mg
);
347 if (mg
->size
% sizeof(int)) {
348 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DNET_META_GROUPS structure is corrupted\n", dnet_dump_id_str(ids
->id
.id
));
352 group_num
= mg
->size
/ sizeof(int);
353 groups
= (int *)mg
->data
;
354 dnet_convert_meta(mg
);
357 /* Read temporary meta */
358 temp_mc
.data
= malloc(sizeof(struct dnet_meta_update
) * group_num
);
361 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: could not allocate memory for temp UPDATE_META\n", dnet_dump_id_str(ids
->id
.id
));
364 memset(temp_mc
.data
, 0, sizeof(struct dnet_meta_update
) * group_num
);
365 temp_mc
.size
= sizeof(struct dnet_meta_update
) * group_num
;
367 err
= dnet_db_read_raw(p
->db
->b
, &ids
->id
, (void **)&tmpdata
);
369 if (err
< 0 && err
!= -2)
371 /* No data in temp meta was stored. Placing local meta_update at the beginning */
373 mu
[0].group_id
= my_group
;
375 mu
[0].flags
= my_mu
.flags
;
378 if (err
> (int)(sizeof(struct dnet_meta_update
) * group_num
)) {
379 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: too many data stored in temp meta\n", dnet_dump_id_str(ids
->id
.id
));
383 memcpy(temp_mc
.data
, tmpdata
, err
);
386 /* Update temp meta with received group */
390 for (i
= 0; i
< group_num
; ++i
) {
391 if (mu
[i
].group_id
== remote_group
) {
392 mu
[i
].tm
= ids
->last_update
.tm
;
393 mu
[i
].flags
= ids
->last_update
.flags
;
397 if (mu
[i
].group_id
== 0)
400 if (!(mu
[i
].flags
& DNET_IO_FLAGS_REMOVED
))
403 if (((mu
[i
].tm
.tsec
> mu
[lastest
].tm
.tsec
)
404 || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) && (mu
[i
].tm
.tnsec
> mu
[lastest
].tm
.tnsec
)))
407 lastest_group
= groups
[i
];
411 if (!updated
&& i
== group_num
) {
412 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: no space left to save group in temp meta!\n", dnet_dump_id_str(ids
->id
.id
));
417 mu
[i
].group_id
= remote_group
;
418 mu
[i
].tm
= ids
->last_update
.tm
;
419 mu
[i
].flags
= ids
->last_update
.flags
;
421 if (((mu
[i
].tm
.tsec
> mu
[lastest
].tm
.tsec
)
422 || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) && (mu
[i
].tm
.tnsec
> mu
[lastest
].tm
.tnsec
)))
425 lastest_group
= groups
[i
];
431 /* Not all groups processed yet */
434 err
= dnet_db_write_raw(p
->db
->b
, &ids
->id
, temp_mc
.data
, temp_mc
.size
);
436 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: unable to save temp meta, err: %d\n", dnet_dump_id_str(ids
->id
.id
), err
);
441 /* Check if removal_delay second has gone since object was marked as REMOVED */
442 if (removed_in_all
) {
443 gettimeofday(¤t_ts
, NULL
);
444 if (((uint64_t)current_ts
.tv_sec
< mu
[lastest
].tm
.tsec
)
445 || ((uint64_t)current_ts
.tv_sec
- mu
[lastest
].tm
.tsec
) < (uint64_t)(state
->n
->removal_delay
* 3600 * 24))
449 /* TODO: receive newer files from remote groups
451 * Yep, we should read it locally and send it to other groups too
453 if ((lastest_group
!= my_group
) && !(mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
)) {
454 dnet_log(state
->n
, DNET_LOG_DSA
, "BULK: %s: File on remote group %d is newer, skipping this file\n",
455 dnet_dump_id_str(ids
->id
.id
), lastest_group
);
460 for (i
= 0; i
< group_num
; ++i
) {
462 if (mu
[i
].group_id
== my_group
)
465 dnet_setup_id(&id
, mu
[i
].group_id
, ids
->id
.id
);
468 if (mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) {
469 if (removed_in_all
) {
470 dnet_log(state
->n
, DNET_LOG_DSA
, "BULK: dnet_remove_object_now %s in group %d, err=%d\n",
471 dnet_dump_id(&id
), mu
[i
].group_id
, err
);
472 err
= dnet_remove_object_now(state
->n
, &id
, cflags
);
474 if (!(mu
[i
].flags
& DNET_IO_FLAGS_REMOVED
)) {
475 err
= dnet_remove_object(state
->n
, &id
, NULL
, NULL
, cflags
);
476 dnet_log(state
->n
, DNET_LOG_DSA
, "BULK: dnet_remove_object %s in group %d err=%d\n",
477 dnet_dump_id(&id
), mu
[i
].group_id
, err
);
483 if ((mu
[i
].tm
.tsec
< mu
[lastest
].tm
.tsec
) || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) &&
484 ((mu
[i
].tm
.tnsec
< mu
[lastest
].tm
.tnsec
)))) {
485 err
= state
->n
->cb
->send(state
, state
->n
->cb
->command_private
, &id
);
490 err
= dnet_meta_update_check_status_raw(state
->n
, &mc
);
494 memcpy(&mc
.id
, &id
, sizeof(struct dnet_id
));
495 err
= dnet_write_metadata(state
->n
, &mc
, 1, cflags
);
496 dnet_log(state
->n
, DNET_LOG_DSA
, "BULK: dnet_write_metadata %s in group %d, err=%d\n",
497 dnet_dump_id(&id
), my_group
, err
);
505 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: Error during sending transaction to group %d, err=%d\n",
506 dnet_dump_id_str(ids
->id
.id
), groups
[i
], err
);
507 if (!error
&& err
< 0)
511 if (mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) {
512 if (removed_in_all
) {
513 err
= dnet_merge_remove_local(state
->n
, &mc
.id
, 0);
514 } else if (!(my_mu
.flags
& DNET_IO_FLAGS_REMOVED
)) {
515 err
= dnet_merge_remove_local(state
->n
, &mc
.id
, 1);
519 if (!(mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) && !error
) {
520 err
= dnet_meta_update_check_status(state
->n
, &mc
);
522 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: couldn't update meta CHECK_STATUS\n", dnet_dump_id_str(ids
->id
.id
));
527 err
= dnet_db_remove_raw(p
->db
->b
, &ids
->id
, 1);
529 dnet_log_raw(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DB: failed to remove temp_meta object, err: %d.\n",
530 dnet_dump_id(&mc
.id
), err
);
544 dnet_log(state
->n
, DNET_LOG_ERROR
, "Failed to check ID %s to %s, err=%d\n", dnet_dump_id_str(ids
->id
.id
),
545 dnet_state_dump_addr(state
), error
);
547 if (i
== group_num
) {
548 dnet_counter_inc(state
->n
, DNET_CNTR_NODE_CHECK_COPY
, error
);
554 static int dnet_bulk_check_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
556 struct dnet_bulk_check_priv
*p
= priv
;
559 if (is_trans_destroyed(state
, cmd
)) {
560 dnet_wakeup(p
->w
, p
->w
->cond
++);
562 dnet_check_temp_db_put(p
->db
);
563 if (atomic_dec_and_test(&p
->refcnt
)) {
570 /* Empty reply that prevents timeout */
571 if (cmd
->size
== 0) {
575 if (!(cmd
->size
% sizeof(struct dnet_bulk_id
))) {
576 struct dnet_bulk_id
*ids
= (struct dnet_bulk_id
*)(cmd
+ 1);
577 int num
= cmd
->size
/ sizeof(struct dnet_bulk_id
);
579 dnet_log(state
->n
, DNET_LOG_DSA
, "BULK: received %d entries\n", num
);
581 //dnet_db_ptr_get(&state->n->temp_meta);
582 //ret = kcdbbegintran(state->n->temp_meta.db, 0);
584 // err = -kcdbecode(state->n->temp_meta.db);
585 // dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: DB: failed to start temp_meta transaction, err: %d: %s.\n",
586 // err, kcecodename(-err));
590 for (i
= 0; i
< num
&& !state
->n
->need_exit
; ++i
) {
591 err
= dnet_bulk_check_complete_single(state
, &ids
[i
], cmd
->id
.group_id
, p
);
594 //kcdbendtran(state->n->temp_meta.db, 1);
595 //dnet_db_ptr_put(state->n, &state->n->temp_meta);
598 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: couldn't update meta CHECK_STATUS\n");
601 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
602 (unsigned long long)cmd
->size
, sizeof(struct dnet_bulk_id
));
605 p
->w
->status
= cmd
->status
;
609 int dnet_request_bulk_check(struct dnet_node
*n
, struct dnet_bulk_state
*state
, struct dnet_check_params
*params
)
611 struct dnet_trans_control ctl
;
612 struct dnet_net_state
*st
;
613 struct dnet_bulk_check_priv
*p
;
614 struct timespec wait_ts
;
617 p
= (struct dnet_bulk_check_priv
*)malloc(sizeof(struct dnet_bulk_check_priv
));
622 atomic_init(&p
->refcnt
, 2);
624 dnet_check_temp_db_get(p
->db
);
626 p
->w
= dnet_wait_alloc(0);
632 p
->group_num
= params
->group_num
;
633 p
->groups
= (int *)malloc(sizeof(int) * params
->group_num
);
638 memcpy(p
->groups
, params
->groups
, sizeof(int) * params
->group_num
);
640 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
642 ctl
.cmd
= DNET_CMD_LIST
;
643 ctl
.complete
= dnet_bulk_check_complete
;
645 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| DNET_ATTR_BULK_CHECK
;
647 ctl
.data
= state
->ids
;
648 ctl
.size
= sizeof(struct dnet_bulk_id
) * state
->num
;
650 st
= dnet_state_search_by_addr(n
, &state
->addr
);
655 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
658 dnet_log(n
, DNET_LOG_DSA
, "BULK: sending %u bytes of data to %s (%s)\n",
659 ctl
.size
, dnet_dump_id(&ctl
.id
), dnet_server_convert_dnet_addr(&state
->addr
));
660 err
= dnet_trans_alloc_send_state(st
, &ctl
);
663 wait_ts
= n
->wait_ts
;
664 wait_ts
.tv_sec
*= DNET_BULK_IDS_SIZE
;
665 err
= dnet_wait_event(p
->w
, p
->w
->cond
!= 0, &wait_ts
);
676 if (atomic_dec_and_test(&p
->refcnt
)) {
687 if (atomic_dec_and_test(&p
->refcnt
)) {
693 dnet_log(n
, DNET_LOG_ERROR
, "Bulk check exited with status %d\n", err
);
697 static int dnet_bulk_add_id(struct dnet_node
*n
, struct dnet_bulk_array
*bulk_array
, struct dnet_id
*id
,
698 struct dnet_meta_container
*mc
, struct dnet_check_params
*params
)
701 struct dnet_bulk_state tmp
;
702 struct dnet_bulk_state
*state
= NULL
;
703 struct dnet_net_state
*st
= dnet_state_get_first(n
, id
);
704 struct dnet_bulk_id
*bulk_id
;
705 struct dnet_meta_update mu
;
707 dnet_log(n
, DNET_LOG_DSA
, "BULK: adding ID %s to array\n", dnet_dump_id(id
));
711 memcpy(&tmp
.addr
, &st
->addr
, sizeof(struct dnet_addr
));
714 dnet_log(n
, DNET_LOG_DSA
, "BULK: Searching state in states array\n");
715 state
= bsearch(&tmp
, bulk_array
->states
, bulk_array
->num
, sizeof(struct dnet_bulk_state
), dnet_compare_bulk_state
);
719 if (!dnet_get_meta_update(n
, mc
, &mu
))
722 dnet_log(n
, DNET_LOG_DSA
, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state
->addr
), state
->num
);
723 //pthread_mutex_lock(&state->state_lock);
724 if (state
->num
>= DNET_BULK_IDS_SIZE
|| state
->num
< 0)
727 bulk_id
= &state
->ids
[state
->num
];
728 memset(bulk_id
, 0, sizeof(struct dnet_bulk_id
));
730 memcpy(&bulk_id
->id
, &id
->id
, DNET_ID_SIZE
);
732 dnet_log(n
, DNET_LOG_DSA
, "BULK: ID: %s, last_update->tsec=%llu, last_update->tnsec=%llu, flags=%02llx\n",
733 dnet_dump_id_str(bulk_id
->id
.id
), (unsigned long long)mu
.tm
.tsec
, (unsigned long long)mu
.tm
.tnsec
,
734 (unsigned long long)mu
.flags
);
736 dnet_convert_meta_update(&mu
);
738 memcpy(&bulk_id
->last_update
, &mu
, sizeof(struct dnet_meta_update
));
742 dnet_log(n
, DNET_LOG_DSA
, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state
->addr
), state
->num
);
743 if (state
->num
== DNET_BULK_IDS_SIZE
) {
744 err
= dnet_request_bulk_check(n
, state
, params
);
750 //pthread_mutex_unlock(&state->state_lock);
755 //pthread_mutex_unlock(&state->state_lock);
759 static int dnet_check_number_of_copies(struct dnet_node
*n
, struct dnet_meta_container
*mc
, int *groups
, int group_num
,
760 struct dnet_bulk_array
*bulk_array
, struct dnet_check_params
*params
)
763 int group_id
= mc
->id
.group_id
;
764 int err
= 0, i
, error
= 0;
766 for (i
=0; i
<group_num
; ++i
) {
767 if (groups
[i
] == group_id
)
770 dnet_setup_id(&raw
, groups
[i
], mc
->id
.id
);
772 err
= dnet_bulk_add_id(n
, bulk_array
, &raw
, mc
, params
);
774 dnet_log(n
, DNET_LOG_ERROR
, "BULK: after adding ID %s err = %d\n", dnet_dump_id(&raw
), err
);
785 static int dnet_check_copies(struct dnet_node
*n
, struct dnet_meta_container
*mc
,
786 struct dnet_bulk_array
*bulk_array
, struct dnet_check_params
*params
)
791 if (params
->group_num
) {
792 groups
= params
->groups
;
793 err
= params
->group_num
;
795 err
= dnet_check_find_groups(n
, mc
, &groups
);
800 err
= dnet_check_number_of_copies(n
, mc
, groups
, err
, bulk_array
, params
);
802 if (!params
->group_num
)
808 static int dnet_merge_direct(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
810 struct dnet_net_state
*base
;
814 dnet_log(n
, DNET_LOG_DSA
, "in dnet_merge_direct mc->size = %u\n", mc
->size
);
815 base
= dnet_node_state(n
);
821 err
= n
->cb
->send(base
, n
->cb
->command_private
, &mc
->id
);
822 dnet_log(n
, DNET_LOG_DSA
, "in dnet_merge_direct after n->cb->send err = %d\n\n", err
);
826 dnet_log(n
, DNET_LOG_DSA
, "in dnet_merge_direct2 mc->size = %u\n", mc
->size
);
827 err
= dnet_write_metadata(n
, mc
, 0, cflags
);
834 dnet_state_put(base
);
840 static int dnet_merge_upload(struct dnet_node *n, struct dnet_meta_container *mc)
842 struct dnet_net_state *base;
845 base = dnet_node_state(n);
851 err = n->cb->send(base, n->cb->command_private, &mc->id);
855 err = dnet_write_metadata(n, mc, 0);
860 dnet_state_put(base);
866 static int dnet_merge_common(struct dnet_node
*n
, struct dnet_meta_container
*remote_meta
, struct dnet_meta_container
*mc
)
869 struct dnet_meta_update local
, remote
;
872 dnet_log(n
, DNET_LOG_DSA
, "in dnet_merge_common mc->size = %d\n", mc
->size
);
873 if (!dnet_get_meta_update(n
, mc
, &local
)) {
875 dnet_log(n
, DNET_LOG_ERROR
, "%s: META_UPDATE not found in local meta\n", dnet_dump_id(&mc
->id
));
879 if (!dnet_get_meta_update(n
, remote_meta
, &remote
)) {
881 dnet_log(n
, DNET_LOG_ERROR
, "%s: META_UPDATE not found in remote meta, perform direct merge\n", dnet_dump_id(&mc
->id
));
882 err
= dnet_merge_direct(n
, mc
);
886 if ((local
.tm
.tsec
> remote
.tm
.tsec
) || (local
.tm
.tsec
== remote
.tm
.tsec
&& local
.tm
.tnsec
> remote
.tm
.tnsec
)) {
887 if (local
.flags
& DNET_IO_FLAGS_REMOVED
) {
888 err
= dnet_remove_object_now(n
, &mc
->id
, cflags
);
890 err
= dnet_merge_direct(n
, mc
);
899 static int dnet_check_merge(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
902 struct dnet_meta_container remote_mc
;
904 dnet_log(n
, DNET_LOG_DSA
, "in dnet_check_merge mc->size = %d\n", mc
->size
);
905 memset(&remote_mc
, 0, sizeof(struct dnet_meta_container
));
907 err
= dnet_read_meta(n
, &remote_mc
, NULL
, 0, &mc
->id
);
909 if (err
!= -ENOENT
) {
910 dnet_log_raw(n
, DNET_LOG_ERROR
, "%s: failed to download object to be merged from storage: %d.\n",
911 dnet_dump_id(&mc
->id
), err
);
915 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: there is no meta in the storage to merge with, "
916 "doing direct merge (plain upload).\n", dnet_dump_id(&mc
->id
));
917 err
= dnet_merge_direct(n
, mc
);
920 err
= dnet_merge_common(n
, &remote_mc
, mc
);
928 free(remote_mc
.data
);
932 int dnet_check(struct dnet_node
*n
, struct dnet_meta_container
*mc
, struct dnet_bulk_array
*bulk_array
,
933 int need_merge
, struct dnet_check_params
*params
)
937 dnet_log(n
, DNET_LOG_DSA
, "need_merge = %d, mc.size = %d\n", need_merge
, mc
->size
);
939 err
= dnet_check_merge(n
, mc
);
940 dnet_log(n
, DNET_LOG_DSA
, "err=%d\n", err
);
942 dnet_merge_remove_local(n
, &mc
->id
, 0);
944 err
= dnet_check_copies(n
, mc
, bulk_array
, params
);
949 static int dnet_check_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
951 struct dnet_wait
*w
= priv
;
954 if (is_trans_destroyed(state
, cmd
)) {
955 dnet_wakeup(w
, w
->cond
++);
960 if (cmd
->size
== sizeof(struct dnet_check_reply
)) {
961 struct dnet_check_reply
*r
= (struct dnet_check_reply
*)(cmd
+ 1);
963 dnet_convert_check_reply(r
);
965 dnet_log(state
->n
, DNET_LOG_INFO
, "check: total: %d, completed: %d, errors: %d\n",
966 r
->total
, r
->completed
, r
->errors
);
969 w
->status
= cmd
->status
;
973 static int dnet_send_check_request(struct dnet_net_state
*st
, struct dnet_id
*id
,
974 struct dnet_wait
*w
, struct dnet_check_request
*r
)
976 struct dnet_trans_control ctl
;
980 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
982 memcpy(&ctl
.id
, id
, sizeof(struct dnet_id
));
983 ctl
.cmd
= DNET_CMD_LIST
;
984 ctl
.complete
= dnet_check_complete
;
986 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
;
989 localtime_r((time_t *)&r
->timestamp
, &tm
);
990 strftime(ctl_time
, sizeof(ctl_time
), "%F %R:%S %Z", &tm
);
992 snprintf(ctl_time
, sizeof(ctl_time
), "all records");
995 dnet_log(st
->n
, DNET_LOG_INFO
, "%s: check request: objects: %llu, threads: %llu, timestamp: %s, merge: %d\n",
996 dnet_state_dump_addr(st
), (unsigned long long)r
->obj_num
, (unsigned long long)r
->thread_num
,
997 ctl_time
, !!(r
->flags
& DNET_CHECK_MERGE
));
999 dnet_convert_check_request(r
);
1002 ctl
.size
= sizeof(*r
) + r
->obj_num
* sizeof(struct dnet_id
) + r
->group_num
* sizeof(int);
1004 return dnet_trans_alloc_send_state(st
, &ctl
);
1007 int dnet_request_check(struct dnet_node
*n
, struct dnet_check_request
*r
)
1009 struct dnet_wait
*w
;
1010 struct dnet_net_state
*st
;
1011 struct dnet_group
*g
;
1014 w
= dnet_wait_alloc(0);
1020 pthread_mutex_lock(&n
->state_lock
);
1021 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
1022 list_for_each_entry(st
, &g
->state_list
, state_entry
) {
1030 dnet_setup_id(&raw
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
1031 dnet_send_check_request(st
, &raw
, w
, r
);
1035 pthread_mutex_unlock(&n
->state_lock
);
1037 err
= dnet_wait_event(w
, w
->cond
== num
, &n
->wait_ts
);
1053 dnet_log(n
, DNET_LOG_ERROR
, "Check exited with status %d\n", err
);