2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
28 #include "elliptics.h"
29 #include "elliptics/interface.h"
// Directory that dnet_dump_meta_container() uses as the prefix of the
// "<dir>/<id>.meta" files it writes.
31 static char dnet_check_tmp_dir
[] = "/dev/shm";
// dnet_merge_remove_local: build a delete command for @id and pass it to
// dnet_process_meta() on the state returned by dnet_node_state(n).
// The command is laid out in a zeroed stack buffer sized for exactly one
// struct dnet_cmd; the visible lines copy @id into it and set
// cmd->cmd = DNET_CMD_DEL and cmd->flags = DNET_ATTR_DELETE_HISTORY.
// NOTE(review): this listing has lines missing (declarations of cmd and err,
// any conditionals, the release of base, the return statement), so the way
// full_process influences the command cannot be confirmed from here.
33 static int dnet_merge_remove_local(struct dnet_node
*n
, struct dnet_id
*id
, int full_process
)
35 char buf
[sizeof(struct dnet_cmd
)];
37 struct dnet_net_state
*base
;
40 memset(buf
, 0, sizeof(buf
));
42 cmd
= (struct dnet_cmd
*)buf
;
44 memcpy(&cmd
->id
, id
, sizeof(struct dnet_id
));
47 cmd
->cmd
= DNET_CMD_DEL
;
49 cmd
->flags
= DNET_ATTR_DELETE_HISTORY
;
51 base
= dnet_node_state(n
);
53 err
= dnet_process_meta(base
, cmd
, NULL
);
// dnet_dump_meta_container: write the raw bytes of a metadata container
// (mc->data, mc->size) into "<dnet_check_tmp_dir>/<id>.meta", where <id> is
// the string produced by dnet_dump_id_len_raw() for mc->id.id.
// The file is opened with O_RDWR | O_TRUNC | O_CREAT | O_CLOEXEC, mode 0644.
// The log messages show that an open failure and a write whose result differs
// from (int)mc->size are both reported at DNET_LOG_ERROR with strerror(errno).
// NOTE(review): the listing omits the declarations of file/fd/err, the error
// checks after open(), the close of fd and the return value.
60 static int dnet_dump_meta_container(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
// Room for DNET_ID_SIZE bytes rendered as two characters each plus a NUL.
64 char id_str
[DNET_ID_SIZE
*2+1];
66 snprintf(file
, sizeof(file
), "%s/%s.meta", dnet_check_tmp_dir
, dnet_dump_id_len_raw(mc
->id
.id
, DNET_ID_SIZE
, id_str
));
68 fd
= open(file
, O_RDWR
| O_TRUNC
| O_CREAT
| O_CLOEXEC
, 0644);
71 dnet_log_raw(n
, DNET_LOG_ERROR
, "Failed to open meta container file '%s': %s\n",
72 file
, strerror(errno
));
// Single write() call; a short write is treated the same as a failure.
76 err
= write(fd
, mc
->data
, mc
->size
);
77 if (err
!= (int)mc
->size
) {
79 dnet_log_raw(n
, DNET_LOG_ERROR
, "Failed to write meta container into '%s': %s\n",
80 file
, strerror(errno
));
// dnet_check_find_groups: look up the DNET_META_GROUPS entry of @mc with
// dnet_meta_search(), copy its payload (m->size bytes of m->data) into a
// freshly malloc()ed buffer and log every group id at DNET_LOG_DEBUG.
// The number of groups is computed as m->size / sizeof(int32_t).
// A missing entry is reported as "failed to find groups metadata".
// NOTE(review): the listing omits the lines that store the buffer through
// @groupsp, the return statement and the error labels; the trailing call to
// dnet_dump_meta_container() presumably sits on an error path - confirm
// against the complete file.
91 static int dnet_check_find_groups(struct dnet_node
*n
, struct dnet_meta_container
*mc
, int **groupsp
)
97 m
= dnet_meta_search(n
, mc
, DNET_META_GROUPS
);
99 dnet_log_raw(n
, DNET_LOG_ERROR
, "%s: failed to find groups metadata.\n", dnet_dump_id(&mc
->id
));
104 groups
= malloc(m
->size
);
109 memcpy(groups
, m
->data
, m
->size
);
111 num
= m
->size
/ sizeof(int32_t);
113 for (i
=0; i
<num
; ++i
) {
114 dnet_log_raw(n
, DNET_LOG_DEBUG
, "%s: group: %d\n", dnet_dump_id(&mc
->id
), groups
[i
]);
122 dnet_dump_meta_container(n
, mc
);
126 /*static int dnet_bulk_db_check_update(struct dnet_node *n, struct dnet_meta_container *mc_array, int *rec_num,
127 struct dnet_meta_container *mc, int final)
130 int64_t rec_processed;
131 KCREC recs[DNET_BULK_META_UPD_SIZE];
134 if (!rec_num || *rec_num > DNET_BULK_META_UPD_SIZE) {
141 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: mc should be passed\n");
144 memcpy(&mc_array[*rec_num].id, &mc->id, sizeof(struct dnet_id));
145 mc_array[*rec_num].size = mc->size;
146 // Allocate extra memory for potential META_CHECK_STATUS structure
147 mc_array[*rec_num].data = malloc(mc->size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_check_status));
148 if (!mc_array[*rec_num].data) {
152 memcpy(mc_array[*rec_num].data, mc->data, mc->size);
156 if (*rec_num == DNET_BULK_META_UPD_SIZE || (final && *rec_num > 0)) {
157 err = kcdbbegintran(n->meta, 0);
159 err = -kcdbecode(n->meta);
160 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to start %s transaction, err: %d: %s.\n",
161 "meta", err, kcecodename(-err));
164 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
165 err = dnet_db_check_update(n, &mc_array[rec_iter]);
166 recs[rec_iter].key.size = DNET_ID_SIZE;
167 recs[rec_iter].key.buf = (char *)mc_array[rec_iter].id.id;
168 recs[rec_iter].value.size = mc_array[rec_iter].size;
169 recs[rec_iter].value.buf = mc_array[rec_iter].data;
171 rec_processed = kcdbsetbulk(n->meta, recs, *rec_num, 0);
172 if ((int)rec_processed != *rec_num) {
173 err = -kcdbecode(n->meta);
174 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to set check update stamps, %d records processed, err: %d: %s.\n",
175 (int)rec_processed, err, kcecodename(-err));
176 kcdbendtran(n->meta, 0);
180 kcdbendtran(n->meta, 1);
181 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
182 free(mc_array[rec_iter].data);
// dnet_cmd_bulk_check: handle a bulk check request.
// @data is treated as an array of struct dnet_bulk_id; cmd->size must be a
// multiple of sizeof(struct dnet_bulk_id), otherwise "received corrupted
// data" is logged.
// For every entry the visible code:
//   - sends an empty reply every DNET_BULK_CHECK_PING entries (i > 0) so the
//     requester does not time out,
//   - reads the local metadata through n->cb->meta_read() into mc.data,
//   - extracts the local META_UPDATE with dnet_get_meta_update() and compares
//     it with the byte-converted ids[i].last_update,
//   - depending on that comparison calls dnet_stat_local() and
//     dnet_meta_update_check_status(), or copies the local META_UPDATE back
//     into ids[i].last_update,
//   - when no metadata is present, sets last_update.tm.tsec = 1 and
//     last_update.flags = 0.
// The (possibly modified) array is sent back with dnet_send_reply().
// NOTE(review): branch structure, the assignment of mc.size and the release
// of mc.data are among the lines missing from this listing.
192 int dnet_cmd_bulk_check(struct dnet_net_state
*orig
, struct dnet_cmd
*cmd
, void *data
)
194 struct dnet_bulk_id
*ids
= (struct dnet_bulk_id
*)data
;
195 struct dnet_meta_container mc
;
196 struct dnet_meta_update mu
;
202 if (!(cmd
->size
% sizeof(struct dnet_bulk_id
))) {
203 num
= cmd
->size
/ sizeof(struct dnet_bulk_id
);
205 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: received %d entries\n", num
);
207 for (i
= 0; i
< num
; ++i
) {
209 /* Send empty reply every DNET_BULK_CHECK_PING records to prevent timeout */
210 if (i
% DNET_BULK_CHECK_PING
== 0 && i
> 0) {
211 dnet_send_reply(orig
, cmd
, NULL
, 0, 1);
214 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: processing ID %s\n", dnet_dump_id_str(ids
[i
].id
.id
));
216 dnet_setup_id(&mc
.id
, 0, ids
[i
].id
.id
);
217 err
= orig
->n
->cb
->meta_read(orig
->n
->cb
->command_private
, &ids
[i
].id
, &mc
.data
);
220 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: %d bytes of metadata found, searching for META_UPDATE group_id=%d\n",
221 mc
.size
, orig
->n
->st
->idc
->group
->group_id
);
222 if (dnet_get_meta_update(orig
->n
, &mc
, &mu
))
224 dnet_convert_meta_update(&ids
[i
].last_update
);
225 dnet_log(orig
->n
, DNET_LOG_DEBUG
, "BULK: mu.tsec=%lu, mu.tnsec=%lu, mu.flags=%02lx\n",
226 (unsigned long)mu
.tm
.tsec
, (unsigned long)mu
.tm
.tnsec
, (unsigned long)mu
.flags
);
227 dnet_log(orig
->n
, DNET_LOG_DEBUG
,
228 "BULK: last_update.tsec=%lu, last_update.tnsec=%lu, last_update.flags=%02lx\n",
229 (unsigned long)ids
[i
].last_update
.tm
.tsec
, (unsigned long)ids
[i
].last_update
.tm
.tnsec
,
230 (unsigned long)ids
[i
].last_update
.flags
);
// True when the local copy is flagged REMOVED or its (tsec, tnsec) timestamp
// is strictly older than the one received in ids[i].last_update.
232 if ((mu
.flags
& DNET_IO_FLAGS_REMOVED
) || (mu
.tm
.tsec
< ids
[i
].last_update
.tm
.tsec
) ||
233 ((mu
.tm
.tnsec
< ids
[i
].last_update
.tm
.tnsec
) && (mu
.tm
.tsec
== ids
[i
].last_update
.tm
.tsec
))) {
236 /* File does not need to be updated */
237 dnet_setup_id(&raw
, orig
->n
->id
.group_id
, ids
[i
].id
.id
);
238 err
= dnet_stat_local(orig
, &raw
);
240 /* File was not found in the storage */
244 err
= dnet_meta_update_check_status(orig
->n
, &mc
);
246 dnet_log(orig
->n
, DNET_LOG_ERROR
,
247 "BULK: %s: couldn't update meta CHECK_STATUS err: %d\n",
248 dnet_dump_id_str(ids
[i
].id
.id
), err
);
253 memcpy(&ids
[i
].last_update
, &mu
, sizeof(struct dnet_meta_update
));
254 dnet_convert_meta_update(&ids
[i
].last_update
);
258 /* Meta is not present - set timestamp to very old one */
259 dnet_convert_meta_update(&ids
[i
].last_update
);
260 ids
[i
].last_update
.tm
.tsec
= 1;
261 ids
[i
].last_update
.flags
= 0;
262 dnet_convert_meta_update(&ids
[i
].last_update
);
266 dnet_log(orig
->n
, DNET_LOG_ERROR
, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
267 (unsigned long long)cmd
->size
, sizeof(struct dnet_bulk_id
));
272 return dnet_send_reply(orig
, cmd
, data
, sizeof(struct dnet_bulk_id
) * num
, 0);
// Private data attached to a bulk check request and handed to
// dnet_bulk_check_complete() as its priv argument.
// NOTE(review): only the db member is visible in this listing; other code in
// this file also reads and writes w, refcnt, group_num and groups through
// pointers to this structure, so those members are declared on lines that
// are missing here.
278 struct dnet_bulk_check_priv
{
280 struct dnet_check_temp_db
*db
;
// dnet_bulk_check_complete_single: process one struct dnet_bulk_id received
// from @remote_group during a bulk check.
// Visible steps, in listing order:
//   1. Read the local metadata for ids->id through n->cb->meta_read() and take
//      its META_UPDATE (my_mu) as the initial newest record (lastest_mu,
//      lastest_group = my_group).
//   2. Determine the group list: p->group_num is used, and the local
//      DNET_META_GROUPS entry is also consulted; its size must be a multiple
//      of sizeof(int).
//   3. Allocate a zeroed array of group_num struct dnet_meta_update entries
//      (temp_mc) and load the previously stored array for this id from
//      p->db->b with dnet_db_read_raw(). With nothing stored, the local
//      record goes into slot 0; stored data larger than the array is reported
//      as "too many data stored in temp meta".
//   4. Record the remote reply: the slot whose group_id equals remote_group is
//      updated, a slot with group_id == 0 is treated as free, and the newest
//      slot and the "removed in all groups" state are tracked along the way.
//   5. While some groups are still unreported, the array is written back with
//      dnet_db_write_raw().
//   6. Otherwise the final action is taken: removal through
//      dnet_remove_object_now() or dnet_remove_object(), or re-sending through
//      n->cb->send() followed by dnet_write_metadata(), then local cleanup
//      through dnet_merge_remove_local(), dnet_meta_update_check_status() and
//      dnet_db_remove_raw().
// NOTE(review): many lines (conditions, gotos, labels, frees, the return) are
// missing from this listing, so the exact branch structure above is inferred
// from the order of the surviving statements and must be confirmed against
// the complete file.
286 static int dnet_bulk_check_complete_single(struct dnet_net_state
*state
, struct dnet_bulk_id
*ids
,
287 int remote_group
, struct dnet_bulk_check_priv
*p
)
289 struct dnet_meta_container mc
;
290 struct dnet_meta_container temp_mc
;
291 struct dnet_meta_update
*mu
;
292 struct dnet_meta
*mg
;
294 char *tmpdata
= NULL
;
295 int *groups
, group_num
= 1;
296 int err
= -EINVAL
, error
= 0;
298 int my_group
, lastest_group
= -1;
299 struct dnet_meta_update lastest_mu
, my_mu
;
300 struct timeval current_ts
;
301 int removed_in_all
= 1, updated
= 0;
304 uint64_t ioflags
= 0;
306 my_group
= state
->n
->id
.group_id
;
308 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: checking ID %s\n", dnet_dump_id_str(ids
->id
.id
));
312 dnet_setup_id(&mc
.id
, my_group
, ids
->id
.id
);
314 err
= state
->n
->cb
->meta_read(state
->n
->cb
->command_private
, &ids
->id
, &mc
.data
);
318 goto err_out_continue
;
322 /* Set current group meta_update as lastest_mu */
323 if (!dnet_get_meta_update(state
->n
, &mc
, &my_mu
)) {
324 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: meta_update structure doesn't exist for group %d\n",
325 dnet_dump_id_str(ids
->id
.id
), my_group
);
329 dnet_convert_meta_update(&my_mu
);
330 memcpy(&lastest_mu
, &my_mu
, sizeof(struct dnet_meta_update
));
331 lastest_group
= my_group
;
335 /* groups came from dnet_check utility */
337 group_num
= p
->group_num
;
339 mg
= dnet_meta_search(state
->n
, &mc
, DNET_META_GROUPS
);
341 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DNET_META_GROUPS structure doesn't exist\n", dnet_dump_id_str(ids
->id
.id
));
345 dnet_convert_meta(mg
);
346 if (mg
->size
% sizeof(int)) {
347 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DNET_META_GROUPS structure is corrupted\n", dnet_dump_id_str(ids
->id
.id
));
351 group_num
= mg
->size
/ sizeof(int);
// groups points into the metadata buffer itself; no copy is made here.
352 groups
= (int *)mg
->data
;
353 dnet_convert_meta(mg
);
356 /* Read temporary meta */
357 temp_mc
.data
= malloc(sizeof(struct dnet_meta_update
) * group_num
);
360 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: could not allocate memory for temp UPDATE_META\n", dnet_dump_id_str(ids
->id
.id
));
363 memset(temp_mc
.data
, 0, sizeof(struct dnet_meta_update
) * group_num
);
364 temp_mc
.size
= sizeof(struct dnet_meta_update
) * group_num
;
366 err
= dnet_db_read_raw(p
->db
->b
, &ids
->id
, (void **)&tmpdata
);
// NOTE(review): -2 is presumably -ENOENT (no stored record) - confirm.
368 if (err
< 0 && err
!= -2)
370 /* No data in temp meta was stored. Placing local meta_update at the beginning */
372 mu
[0].group_id
= my_group
;
374 mu
[0].flags
= my_mu
.flags
;
377 if (err
> (int)(sizeof(struct dnet_meta_update
) * group_num
)) {
378 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: too many data stored in temp meta\n", dnet_dump_id_str(ids
->id
.id
));
382 memcpy(temp_mc
.data
, tmpdata
, err
);
385 /* Update temp meta with received group */
389 for (i
= 0; i
< group_num
; ++i
) {
390 if (mu
[i
].group_id
== remote_group
) {
391 mu
[i
].tm
= ids
->last_update
.tm
;
392 mu
[i
].flags
= ids
->last_update
.flags
;
396 if (mu
[i
].group_id
== 0)
399 if (!(mu
[i
].flags
& DNET_IO_FLAGS_REMOVED
))
402 if (((mu
[i
].tm
.tsec
> mu
[lastest
].tm
.tsec
)
403 || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) && (mu
[i
].tm
.tnsec
> mu
[lastest
].tm
.tnsec
)))
406 lastest_group
= groups
[i
];
410 if (!updated
&& i
== group_num
) {
411 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: no space left to save group in temp meta!\n", dnet_dump_id_str(ids
->id
.id
));
416 mu
[i
].group_id
= remote_group
;
417 mu
[i
].tm
= ids
->last_update
.tm
;
418 mu
[i
].flags
= ids
->last_update
.flags
;
420 if (((mu
[i
].tm
.tsec
> mu
[lastest
].tm
.tsec
)
421 || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) && (mu
[i
].tm
.tnsec
> mu
[lastest
].tm
.tnsec
)))
424 lastest_group
= groups
[i
];
430 /* Not all groups processed yet */
433 err
= dnet_db_write_raw(p
->db
->b
, &ids
->id
, temp_mc
.data
, temp_mc
.size
);
435 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: unable to save temp meta, err: %d\n", dnet_dump_id_str(ids
->id
.id
), err
);
440 /* Check whether removal_delay days (removal_delay * 3600 * 24 seconds) have passed since the object was marked as REMOVED */
441 if (removed_in_all
) {
// NOTE(review): the first argument below looks like a mis-decoded
// "&current_ts" (the local struct timeval declared above); the listing
// appears to have turned the "&curren" prefix into a currency sign.
442 gettimeofday(¤t_ts
, NULL
);
443 if (((uint64_t)current_ts
.tv_sec
< mu
[lastest
].tm
.tsec
)
444 || ((uint64_t)current_ts
.tv_sec
- mu
[lastest
].tm
.tsec
) < (uint64_t)(state
->n
->removal_delay
* 3600 * 24))
// NOTE(review): the block comment opened on the next line has no visible
// terminator in this listing (its closing line is missing), so no further
// review notes are inserted below this point.
448 /* TODO: receive newer files from remote groups
450 * Yep, we should read it locally and send it to other groups too
452 if ((lastest_group
!= my_group
) && !(mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
)) {
453 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: %s: File on remote group %d is newer, skipping this file\n",
454 dnet_dump_id_str(ids
->id
.id
), lastest_group
);
459 for (i
= 0; i
< group_num
; ++i
) {
461 if (mu
[i
].group_id
== my_group
)
464 dnet_setup_id(&id
, mu
[i
].group_id
, ids
->id
.id
);
467 if (mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) {
468 if (removed_in_all
) {
469 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: dnet_remove_object_now %s in group %d, err=%d\n",
470 dnet_dump_id(&id
), mu
[i
].group_id
, err
);
471 err
= dnet_remove_object_now(state
->n
, &id
, cflags
, ioflags
);
473 if (!(mu
[i
].flags
& DNET_IO_FLAGS_REMOVED
)) {
474 err
= dnet_remove_object(state
->n
, &id
, NULL
, NULL
, cflags
, ioflags
);
475 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: dnet_remove_object %s in group %d err=%d\n",
476 dnet_dump_id(&id
), mu
[i
].group_id
, err
);
482 if ((mu
[i
].tm
.tsec
< mu
[lastest
].tm
.tsec
) || ((mu
[i
].tm
.tsec
== mu
[lastest
].tm
.tsec
) &&
483 ((mu
[i
].tm
.tnsec
< mu
[lastest
].tm
.tnsec
)))) {
484 err
= state
->n
->cb
->send(state
, state
->n
->cb
->command_private
, &id
);
489 err
= dnet_meta_update_check_status_raw(state
->n
, &mc
);
493 memcpy(&mc
.id
, &id
, sizeof(struct dnet_id
));
494 err
= dnet_write_metadata(state
->n
, &mc
, 1, cflags
);
495 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: dnet_write_metadata %s in group %d, err=%d\n",
496 dnet_dump_id(&id
), my_group
, err
);
504 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: Error during sending transaction to group %d, err=%d\n",
505 dnet_dump_id_str(ids
->id
.id
), groups
[i
], err
);
506 if (!error
&& err
< 0)
510 if (mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) {
511 if (removed_in_all
) {
512 err
= dnet_merge_remove_local(state
->n
, &mc
.id
, 0);
513 } else if (!(my_mu
.flags
& DNET_IO_FLAGS_REMOVED
)) {
514 err
= dnet_merge_remove_local(state
->n
, &mc
.id
, 1);
518 if (!(mu
[lastest
].flags
& DNET_IO_FLAGS_REMOVED
) && !error
) {
519 err
= dnet_meta_update_check_status(state
->n
, &mc
);
521 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: %s: couldn't update meta CHECK_STATUS\n", dnet_dump_id_str(ids
->id
.id
));
526 err
= dnet_db_remove_raw(p
->db
->b
, &ids
->id
, 1);
528 dnet_log_raw(state
->n
, DNET_LOG_ERROR
, "BULK: %s: DB: failed to remove temp_meta object, err: %d.\n",
529 dnet_dump_id(&mc
.id
), err
);
543 dnet_log(state
->n
, DNET_LOG_ERROR
, "Failed to check ID %s to %s, err=%d\n", dnet_dump_id_str(ids
->id
.id
),
544 dnet_state_dump_addr(state
), error
);
546 if (i
== group_num
) {
547 dnet_counter_inc(state
->n
, DNET_CNTR_NODE_CHECK_COPY
, error
);
// dnet_bulk_check_complete: completion callback for the bulk check
// transaction; @priv is the struct dnet_bulk_check_priv set up by
// dnet_request_bulk_check().
// Visible behaviour:
//   - when is_trans_destroyed() reports the end of the transaction, the
//     waiter is woken through dnet_wakeup() (incrementing p->w->cond), the
//     temp db reference is dropped with dnet_check_temp_db_put() and the
//     refcnt is decremented with atomic_dec_and_test();
//   - a reply with cmd->size == 0 is the keep-alive sent by the remote side;
//   - a reply whose size is a multiple of sizeof(struct dnet_bulk_id) is
//     treated as an array located right after the command header (cmd + 1);
//     each entry is passed to dnet_bulk_check_complete_single() with
//     cmd->id.group_id as the remote group, stopping early once
//     state->n->need_exit is set;
//   - any other size is logged as corrupted data;
//   - cmd->status is stored into p->w->status.
// NOTE(review): the listing omits the returns and the cleanup performed when
// the reference count reaches zero.
553 static int dnet_bulk_check_complete(struct dnet_net_state
*state
, struct dnet_cmd
*cmd
, void *priv
)
555 struct dnet_bulk_check_priv
*p
= priv
;
558 if (is_trans_destroyed(state
, cmd
)) {
559 dnet_wakeup(p
->w
, p
->w
->cond
++);
561 dnet_check_temp_db_put(p
->db
);
562 if (atomic_dec_and_test(&p
->refcnt
)) {
569 /* Empty reply that prevents timeout */
570 if (cmd
->size
== 0) {
574 if (!(cmd
->size
% sizeof(struct dnet_bulk_id
))) {
575 struct dnet_bulk_id
*ids
= (struct dnet_bulk_id
*)(cmd
+ 1);
576 int num
= cmd
->size
/ sizeof(struct dnet_bulk_id
);
578 dnet_log(state
->n
, DNET_LOG_DEBUG
, "BULK: received %d entries\n", num
);
580 //dnet_db_ptr_get(&state->n->temp_meta);
581 //ret = kcdbbegintran(state->n->temp_meta.db, 0);
583 // err = -kcdbecode(state->n->temp_meta.db);
584 // dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: DB: failed to start temp_meta transaction, err: %d: %s.\n",
585 // err, kcecodename(-err));
589 for (i
= 0; i
< num
&& !state
->n
->need_exit
; ++i
) {
590 err
= dnet_bulk_check_complete_single(state
, &ids
[i
], cmd
->id
.group_id
, p
);
593 //kcdbendtran(state->n->temp_meta.db, 1);
594 //dnet_db_ptr_put(state->n, &state->n->temp_meta);
597 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: couldn't update meta CHECK_STATUS\n");
600 dnet_log(state
->n
, DNET_LOG_ERROR
, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
601 (unsigned long long)cmd
->size
, sizeof(struct dnet_bulk_id
));
604 p
->w
->status
= cmd
->status
;
// dnet_request_bulk_check: send the ids accumulated in @state to the node at
// state->addr and wait for the bulk check to complete.
// Visible steps:
//   - allocate the private context p, initialise its refcnt to 2
//     (presumably one reference for this function and one for the completion
//     callback - confirm), take a temp db reference and allocate the wait
//     object with dnet_wait_alloc(0);
//   - copy params->groups (params->group_num ints) into a private array;
//   - fill a zeroed struct dnet_trans_control: cmd = DNET_CMD_LIST,
//     complete = dnet_bulk_check_complete,
//     cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | DNET_ATTR_BULK_CHECK,
//     data/size = state->ids, sizeof(struct dnet_bulk_id) * state->num;
//   - find the target state with dnet_state_search_by_addr() and address the
//     command to the first id of that state's group;
//   - send with dnet_trans_alloc_send_state() and wait on p->w with a timeout
//     of n->wait_ts whose tv_sec is multiplied by DNET_BULK_IDS_SIZE;
//   - drop references with atomic_dec_and_test().
// NOTE(review): allocation failure checks, the assignment of p->db and
// ctl.priv, the error labels and the return are missing from this listing.
608 int dnet_request_bulk_check(struct dnet_node
*n
, struct dnet_bulk_state
*state
, struct dnet_check_params
*params
)
610 struct dnet_trans_control ctl
;
611 struct dnet_net_state
*st
;
612 struct dnet_bulk_check_priv
*p
;
613 struct timespec wait_ts
;
616 p
= (struct dnet_bulk_check_priv
*)malloc(sizeof(struct dnet_bulk_check_priv
));
621 atomic_init(&p
->refcnt
, 2);
623 dnet_check_temp_db_get(p
->db
);
625 p
->w
= dnet_wait_alloc(0);
631 p
->group_num
= params
->group_num
;
632 p
->groups
= (int *)malloc(sizeof(int) * params
->group_num
);
637 memcpy(p
->groups
, params
->groups
, sizeof(int) * params
->group_num
);
639 memset(&ctl
, 0, sizeof(struct dnet_trans_control
));
641 ctl
.cmd
= DNET_CMD_LIST
;
642 ctl
.complete
= dnet_bulk_check_complete
;
644 ctl
.cflags
= DNET_FLAGS_NEED_ACK
| DNET_FLAGS_NOLOCK
| DNET_ATTR_BULK_CHECK
;
646 ctl
.data
= state
->ids
;
647 ctl
.size
= sizeof(struct dnet_bulk_id
) * state
->num
;
649 st
= dnet_state_search_by_addr(n
, &state
->addr
);
654 dnet_setup_id(&ctl
.id
, st
->idc
->group
->group_id
, st
->idc
->ids
[0].raw
.id
);
657 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: sending %u bytes of data to %s (%s)\n",
658 ctl
.size
, dnet_dump_id(&ctl
.id
), dnet_server_convert_dnet_addr(&state
->addr
));
659 err
= dnet_trans_alloc_send_state(st
, &ctl
);
// Scale the node-wide wait timeout by the maximum batch size.
662 wait_ts
= n
->wait_ts
;
663 wait_ts
.tv_sec
*= DNET_BULK_IDS_SIZE
;
664 err
= dnet_wait_event(p
->w
, p
->w
->cond
!= 0, &wait_ts
);
675 if (atomic_dec_and_test(&p
->refcnt
)) {
686 if (atomic_dec_and_test(&p
->refcnt
)) {
692 dnet_log(n
, DNET_LOG_ERROR
, "Bulk check exited with status %d\n", err
);
// dnet_bulk_add_id: queue @id in the per-address batch of @bulk_array and
// flush the batch once it is full.
// Visible steps:
//   - resolve the network state for @id with dnet_state_get_first() and copy
//     its address into a temporary struct dnet_bulk_state used as search key;
//   - locate the matching batch with bsearch() over bulk_array->states using
//     dnet_compare_bulk_state;
//   - fetch the META_UPDATE record of @mc with dnet_get_meta_update();
//   - range-check state->num against [0, DNET_BULK_IDS_SIZE);
//   - zero the next struct dnet_bulk_id slot, copy DNET_ID_SIZE bytes of the
//     id into it and store the byte-converted META_UPDATE as last_update;
//   - when state->num equals DNET_BULK_IDS_SIZE, call
//     dnet_request_bulk_check() for that batch.
// NOTE(review): the increment and reset of state->num, the NULL checks, the
// release of st and the return are missing from this listing.
696 static int dnet_bulk_add_id(struct dnet_node
*n
, struct dnet_bulk_array
*bulk_array
, struct dnet_id
*id
,
697 struct dnet_meta_container
*mc
, struct dnet_check_params
*params
)
700 struct dnet_bulk_state tmp
;
701 struct dnet_bulk_state
*state
= NULL
;
702 struct dnet_net_state
*st
= dnet_state_get_first(n
, id
);
703 struct dnet_bulk_id
*bulk_id
;
704 struct dnet_meta_update mu
;
706 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: adding ID %s to array\n", dnet_dump_id(id
));
710 memcpy(&tmp
.addr
, &st
->addr
, sizeof(struct dnet_addr
));
713 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: Searching state in states array\n");
714 state
= bsearch(&tmp
, bulk_array
->states
, bulk_array
->num
, sizeof(struct dnet_bulk_state
), dnet_compare_bulk_state
);
718 if (!dnet_get_meta_update(n
, mc
, &mu
))
721 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state
->addr
), state
->num
);
722 //pthread_mutex_lock(&state->state_lock);
723 if (state
->num
>= DNET_BULK_IDS_SIZE
|| state
->num
< 0)
726 bulk_id
= &state
->ids
[state
->num
];
727 memset(bulk_id
, 0, sizeof(struct dnet_bulk_id
));
729 memcpy(&bulk_id
->id
, &id
->id
, DNET_ID_SIZE
);
731 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: ID: %s, last_update->tsec=%llu, last_update->tnsec=%llu, flags=%02llx\n",
732 dnet_dump_id_str(bulk_id
->id
.id
), (unsigned long long)mu
.tm
.tsec
, (unsigned long long)mu
.tm
.tnsec
,
733 (unsigned long long)mu
.flags
);
735 dnet_convert_meta_update(&mu
);
737 memcpy(&bulk_id
->last_update
, &mu
, sizeof(struct dnet_meta_update
));
741 dnet_log(n
, DNET_LOG_DEBUG
, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state
->addr
), state
->num
);
742 if (state
->num
== DNET_BULK_IDS_SIZE
) {
743 err
= dnet_request_bulk_check(n
, state
, params
);
749 //pthread_mutex_unlock(&state->state_lock);
754 //pthread_mutex_unlock(&state->state_lock);
// dnet_check_number_of_copies: walk the @group_num entries of @groups and,
// for each one, build an id from that group plus mc->id.id and queue it with
// dnet_bulk_add_id(). Each entry is first compared with the container's own
// group (mc->id.group_id).
// NOTE(review): the statement executed when groups[i] equals the local group
// is missing from this listing (presumably a continue - confirm), as are the
// condition guarding the error log, the handling of error and the return.
758 static int dnet_check_number_of_copies(struct dnet_node
*n
, struct dnet_meta_container
*mc
, int *groups
, int group_num
,
759 struct dnet_bulk_array
*bulk_array
, struct dnet_check_params
*params
)
762 int group_id
= mc
->id
.group_id
;
763 int err
= 0, i
, error
= 0;
765 for (i
=0; i
<group_num
; ++i
) {
766 if (groups
[i
] == group_id
)
769 dnet_setup_id(&raw
, groups
[i
], mc
->id
.id
);
771 err
= dnet_bulk_add_id(n
, bulk_array
, &raw
, mc
, params
);
773 dnet_log(n
, DNET_LOG_ERROR
, "BULK: after adding ID %s err = %d\n", dnet_dump_id(&raw
), err
);
// dnet_check_copies: choose the list of groups to check and run
// dnet_check_number_of_copies() over it.
// When params->group_num is non-zero the caller-supplied params->groups is
// used; the visible alternative obtains the list from the metadata through
// dnet_check_find_groups(). In both cases err temporarily holds the number
// of groups and is passed on as the group_num argument.
// NOTE(review): the body of the final "if (!params->group_num)" is missing
// from this listing; presumably it frees the array allocated by
// dnet_check_find_groups() - confirm.
784 static int dnet_check_copies(struct dnet_node
*n
, struct dnet_meta_container
*mc
,
785 struct dnet_bulk_array
*bulk_array
, struct dnet_check_params
*params
)
790 if (params
->group_num
) {
791 groups
= params
->groups
;
792 err
= params
->group_num
;
794 err
= dnet_check_find_groups(n
, mc
, &groups
);
799 err
= dnet_check_number_of_copies(n
, mc
, groups
, err
, bulk_array
, params
);
801 if (!params
->group_num
)
// dnet_merge_direct: upload the object identified by mc->id without any
// comparison. The visible code takes the node's own state with
// dnet_node_state(), sends the object through n->cb->send(), writes the
// metadata container with dnet_write_metadata(n, mc, 0, cflags) and releases
// the state with dnet_state_put().
// NOTE(review): the declarations of err and cflags, the error checks between
// the two calls and the return are missing from this listing.
807 static int dnet_merge_direct(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
809 struct dnet_net_state
*base
;
813 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_direct mc->size = %u\n", mc
->size
);
814 base
= dnet_node_state(n
);
820 err
= n
->cb
->send(base
, n
->cb
->command_private
, &mc
->id
);
821 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_direct after n->cb->send err = %d\n\n", err
);
825 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_direct2 mc->size = %u\n", mc
->size
);
826 err
= dnet_write_metadata(n
, mc
, 0, cflags
);
833 dnet_state_put(base
);
// dnet_merge_upload: same visible sequence as dnet_merge_direct() - take the
// node state, send the object through n->cb->send(), write its metadata and
// release the state.
// NOTE(review): this fragment calls dnet_write_metadata() with three
// arguments while the other visible call sites in this file pass four, so it
// is presumably disabled code (commented out or compiled out) - confirm
// against the complete file.
839 static int dnet_merge_upload(struct dnet_node *n, struct dnet_meta_container *mc)
841 struct dnet_net_state *base;
844 base = dnet_node_state(n);
850 err = n->cb->send(base, n->cb->command_private, &mc->id);
854 err = dnet_write_metadata(n, mc, 0);
859 dnet_state_put(base);
// dnet_merge_common: decide how to merge a local object (@mc) with the copy
// already in the storage (@remote_meta) by comparing their META_UPDATE
// records.
//   - no META_UPDATE in the local metadata: an error is logged;
//   - no META_UPDATE in the remote metadata: dnet_merge_direct() is called;
//   - the local (tsec, tnsec) timestamp is strictly newer than the remote one:
//     the visible code calls dnet_remove_object_now() and dnet_merge_direct(),
//     with a test of DNET_IO_FLAGS_REMOVED on the local record between them.
// NOTE(review): the else keywords, the gotos, the declarations of err and
// cflags and the return are missing from this listing, so which branch each
// call belongs to should be confirmed against the complete file.
865 static int dnet_merge_common(struct dnet_node
*n
, struct dnet_meta_container
*remote_meta
, struct dnet_meta_container
*mc
)
868 struct dnet_meta_update local
, remote
;
870 uint64_t ioflags
= 0;
872 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_merge_common mc->size = %d\n", mc
->size
);
873 if (!dnet_get_meta_update(n
, mc
, &local
)) {
875 dnet_log(n
, DNET_LOG_ERROR
, "%s: META_UPDATE not found in local meta\n", dnet_dump_id(&mc
->id
));
879 if (!dnet_get_meta_update(n
, remote_meta
, &remote
)) {
881 dnet_log(n
, DNET_LOG_ERROR
, "%s: META_UPDATE not found in remote meta, perform direct merge\n", dnet_dump_id(&mc
->id
));
882 err
= dnet_merge_direct(n
, mc
);
886 if ((local
.tm
.tsec
> remote
.tm
.tsec
) || (local
.tm
.tsec
== remote
.tm
.tsec
&& local
.tm
.tnsec
> remote
.tm
.tnsec
)) {
887 if (local
.flags
& DNET_IO_FLAGS_REMOVED
) {
888 err
= dnet_remove_object_now(n
, &mc
->id
, cflags
, ioflags
);
890 err
= dnet_merge_direct(n
, mc
);
// dnet_check_merge: merge the local object described by @mc with whatever the
// storage already holds for the same id.
// The remote metadata is read into a zeroed container with dnet_read_meta().
// A read error other than -ENOENT is logged as a download failure; the
// "there is no meta in the storage" message is followed by a plain upload
// through dnet_merge_direct(); the other visible path hands both containers
// to dnet_merge_common(). remote_mc.data is freed at the end.
// NOTE(review): the outer error check, the gotos and labels and the return
// are missing from this listing.
899 static int dnet_check_merge(struct dnet_node
*n
, struct dnet_meta_container
*mc
)
902 struct dnet_meta_container remote_mc
;
904 dnet_log(n
, DNET_LOG_DEBUG
, "in dnet_check_merge mc->size = %d\n", mc
->size
);
905 memset(&remote_mc
, 0, sizeof(struct dnet_meta_container
));
907 err
= dnet_read_meta(n
, &remote_mc
, NULL
, 0, &mc
->id
);
909 if (err
!= -ENOENT
) {
910 dnet_log_raw(n
, DNET_LOG_ERROR
, "%s: failed to download object to be merged from storage: %d.\n",
911 dnet_dump_id(&mc
->id
), err
);
915 dnet_log_raw(n
, DNET_LOG_INFO
, "%s: there is no meta in the storage to merge with, "
916 "doing direct merge (plain upload).\n", dnet_dump_id(&mc
->id
));
917 err
= dnet_merge_direct(n
, mc
);
920 err
= dnet_merge_common(n
, &remote_mc
, mc
);
928 free(remote_mc
.data
);
// dnet_check: entry point for checking one object described by @mc.
// The visible calls are dnet_check_merge() followed by
// dnet_merge_remove_local(n, &mc->id, 0), and dnet_check_copies() with the
// bulk array and check parameters.
// NOTE(review): the conditions selecting between the merge path and the
// copies path (presumably based on need_merge) and the return are missing
// from this listing - confirm against the complete file.
932 int dnet_check(struct dnet_node
*n
, struct dnet_meta_container
*mc
, struct dnet_bulk_array
*bulk_array
,
933 int need_merge
, struct dnet_check_params
*params
)
937 dnet_log(n
, DNET_LOG_DEBUG
, "need_merge = %d, mc.size = %d\n", need_merge
, mc
->size
);
939 err
= dnet_check_merge(n
, mc
);
940 dnet_log(n
, DNET_LOG_DEBUG
, "err=%d\n", err
);
942 dnet_merge_remove_local(n
, &mc
->id
, 0);
944 err
= dnet_check_copies(n
, mc
, bulk_array
, params
);