Drop unused config options
[elliptics.git] / library / check.c
blob78699fa37bbb9171ab7fc20c7ea0ac06c5c4ecbc
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include "config.h"
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <sys/mman.h>
22 #include <dirent.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
30 #include "elliptics.h"
31 #include "elliptics/interface.h"
/* Directory that receives dumps of meta containers which failed the check. */
static const char dnet_check_tmp_dir[] = "/dev/shm";
35 static int dnet_merge_remove_local(struct dnet_node *n, struct dnet_id *id, int full_process)
37 char buf[sizeof(struct dnet_cmd)];
38 struct dnet_cmd *cmd;
39 struct dnet_net_state *base;
40 int err = -ENOENT;
42 memset(buf, 0, sizeof(buf));
44 cmd = (struct dnet_cmd *)buf;
46 memcpy(&cmd->id, id, sizeof(struct dnet_id));
47 cmd->size = 0;
49 cmd->cmd = DNET_CMD_DEL;
50 if (!full_process)
51 cmd->flags = DNET_ATTR_DELETE_HISTORY;
53 base = dnet_node_state(n);
54 if (base) {
55 err = dnet_process_meta(base, cmd, NULL);
56 dnet_state_put(base);
59 return err;
62 static int dnet_dump_meta_container(struct dnet_node *n, struct dnet_meta_container *mc)
64 int fd, err;
65 char file[256];
66 char id_str[DNET_ID_SIZE*2+1];
68 snprintf(file, sizeof(file), "%s/%s.meta", dnet_check_tmp_dir, dnet_dump_id_len_raw(mc->id.id, DNET_ID_SIZE, id_str));
70 fd = open(file, O_RDWR | O_TRUNC | O_CREAT | O_CLOEXEC, 0644);
71 if (fd < 0) {
72 err = -errno;
73 dnet_log_raw(n, DNET_LOG_ERROR, "Failed to open meta container file '%s': %s\n",
74 file, strerror(errno));
75 goto err_out_exit;
78 err = write(fd, mc->data, mc->size);
79 if (err != (int)mc->size) {
80 err = -errno;
81 dnet_log_raw(n, DNET_LOG_ERROR, "Failed to write meta container into '%s': %s\n",
82 file, strerror(errno));
83 goto err_out_close;
85 err = 0;
87 err_out_close:
88 close(fd);
89 err_out_exit:
90 return err;
93 static int dnet_check_find_groups(struct dnet_node *n, struct dnet_meta_container *mc, int **groupsp)
95 int err, i, num;
96 struct dnet_meta *m;
97 int *groups;
99 m = dnet_meta_search(n, mc, DNET_META_GROUPS);
100 if (!m) {
101 dnet_log_raw(n, DNET_LOG_ERROR, "%s: failed to find groups metadata.\n", dnet_dump_id(&mc->id));
102 err = -ENOENT;
103 goto err_out_exit;
106 groups = malloc(m->size);
107 if (!groups) {
108 err = -ENOMEM;
109 goto err_out_exit;
111 memcpy(groups, m->data, m->size);
113 num = m->size / sizeof(int32_t);
115 for (i=0; i<num; ++i) {
116 dnet_log_raw(n, DNET_LOG_DSA, "%s: group: %d\n", dnet_dump_id(&mc->id), groups[i]);
119 *groupsp = groups;
121 return num;
123 err_out_exit:
124 dnet_dump_meta_container(n, mc);
125 return err;
128 /*static int dnet_bulk_db_check_update(struct dnet_node *n, struct dnet_meta_container *mc_array, int *rec_num,
129 struct dnet_meta_container *mc, int final)
131 int err = 0;
132 int64_t rec_processed;
133 KCREC recs[DNET_BULK_META_UPD_SIZE];
134 int rec_iter = 0;
136 if (!rec_num || *rec_num > DNET_BULK_META_UPD_SIZE) {
137 err = -EINVAL;
138 goto err_out_exit;
141 if (!final) {
142 if (!mc) {
143 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: mc should be passed\n");
144 return -1;
146 memcpy(&mc_array[*rec_num].id, &mc->id, sizeof(struct dnet_id));
147 mc_array[*rec_num].size = mc->size;
148 // Allocate extra memory for potential META_CHECK_STATUS structure
149 mc_array[*rec_num].data = malloc(mc->size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_check_status));
150 if (!mc_array[*rec_num].data) {
151 err = -ENOMEM;
152 goto err_out_exit;
154 memcpy(mc_array[*rec_num].data, mc->data, mc->size);
155 (*rec_num)++;
158 if (*rec_num == DNET_BULK_META_UPD_SIZE || (final && *rec_num > 0)) {
159 err = kcdbbegintran(n->meta, 0);
160 if (!err) {
161 err = -kcdbecode(n->meta);
162 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to start %s transaction, err: %d: %s.\n",
163 "meta", err, kcecodename(-err));
164 goto err_out_exit;
166 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
167 err = dnet_db_check_update(n, &mc_array[rec_iter]);
168 recs[rec_iter].key.size = DNET_ID_SIZE;
169 recs[rec_iter].key.buf = (char *)mc_array[rec_iter].id.id;
170 recs[rec_iter].value.size = mc_array[rec_iter].size;
171 recs[rec_iter].value.buf = mc_array[rec_iter].data;
173 rec_processed = kcdbsetbulk(n->meta, recs, *rec_num, 0);
174 if ((int)rec_processed != *rec_num) {
175 err = -kcdbecode(n->meta);
176 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to set check update stamps, %d records processed, err: %d: %s.\n",
177 (int)rec_processed, err, kcecodename(-err));
178 kcdbendtran(n->meta, 0);
179 goto err_out_exit;
182 kcdbendtran(n->meta, 1);
183 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
184 free(mc_array[rec_iter].data);
186 *rec_num = 0;
189 err_out_exit:
190 return err;
194 int dnet_cmd_bulk_check(struct dnet_net_state *orig, struct dnet_cmd *cmd, void *data)
196 struct dnet_bulk_id *ids = (struct dnet_bulk_id *)data;
197 struct dnet_meta_container mc;
198 struct dnet_meta_update mu;
199 struct dnet_id raw;
200 int i;
201 int err = 0;
202 int num;
204 if (!(cmd->size % sizeof(struct dnet_bulk_id))) {
205 num = cmd->size / sizeof(struct dnet_bulk_id);
207 dnet_log(orig->n, DNET_LOG_DSA, "BULK: received %d entries\n", num);
209 for (i = 0; i < num; ++i) {
211 /* Send empty reply every DNET_BULK_CHECK_PING records to prevent timeout */
212 if (i % DNET_BULK_CHECK_PING == 0 && i > 0) {
213 dnet_send_reply(orig, cmd, NULL, 0, 1);
216 dnet_log(orig->n, DNET_LOG_DSA, "BULK: processing ID %s\n", dnet_dump_id_str(ids[i].id.id));
217 mc.data = NULL;
218 dnet_setup_id(&mc.id, 0, ids[i].id.id);
219 err = orig->n->cb->meta_read(orig->n->cb->command_private, &ids[i].id, &mc.data);
220 if (mc.data) {
221 mc.size = err;
222 dnet_log(orig->n, DNET_LOG_DSA, "BULK: %d bytes of metadata found, searching for META_UPDATE group_id=%d\n",
223 mc.size, orig->n->st->idc->group->group_id);
224 if (dnet_get_meta_update(orig->n, &mc, &mu))
226 dnet_convert_meta_update(&ids[i].last_update);
227 dnet_log(orig->n, DNET_LOG_DSA, "BULK: mu.tsec=%lu, mu.tnsec=%lu, mu.flags=%02lx\n",
228 (unsigned long)mu.tm.tsec, (unsigned long)mu.tm.tnsec, (unsigned long)mu.flags);
229 dnet_log(orig->n, DNET_LOG_DSA,
230 "BULK: last_update.tsec=%lu, last_update.tnsec=%lu, last_update.flags=%02lx\n",
231 (unsigned long)ids[i].last_update.tm.tsec, (unsigned long)ids[i].last_update.tm.tnsec,
232 (unsigned long)ids[i].last_update.flags);
234 if ((mu.flags & DNET_IO_FLAGS_REMOVED) || (mu.tm.tsec < ids[i].last_update.tm.tsec) ||
235 ((mu.tm.tnsec < ids[i].last_update.tm.tnsec) && (mu.tm.tsec == ids[i].last_update.tm.tsec))) {
236 err = 0;
237 } else {
238 /* File is not needed to be updated */
239 dnet_setup_id(&raw, orig->n->id.group_id, ids[i].id.id);
240 err = dnet_stat_local(orig, &raw);
241 if (err) {
242 /* File was not found in the storage */
243 mu.tm.tsec = 1;
244 mu.flags = 0;
245 } else {
246 err = dnet_meta_update_check_status(orig->n, &mc);
247 if (err) {
248 dnet_log(orig->n, DNET_LOG_ERROR,
249 "BULK: %s: couldn't update meta CHECK_STATUS err: %d\n",
250 dnet_dump_id_str(ids[i].id.id), err);
255 memcpy(&ids[i].last_update, &mu, sizeof(struct dnet_meta_update));
256 dnet_convert_meta_update(&ids[i].last_update);
258 free(mc.data);
259 } else {
260 /* Meta is not present - set timestamp to very old one */
261 dnet_convert_meta_update(&ids[i].last_update);
262 ids[i].last_update.tm.tsec = 1;
263 ids[i].last_update.flags = 0;
264 dnet_convert_meta_update(&ids[i].last_update);
267 } else {
268 dnet_log(orig->n, DNET_LOG_ERROR, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
269 (unsigned long long)cmd->size, sizeof(struct dnet_bulk_id));
270 err = -1;
271 goto err_out_exit;
274 return dnet_send_reply(orig, cmd, data, sizeof(struct dnet_bulk_id) * num, 0);
276 err_out_exit:
277 return err;
280 struct dnet_bulk_check_priv {
281 struct dnet_wait *w;
282 struct dnet_check_temp_db *db;
283 atomic_t refcnt;
284 int group_num;
285 int *groups;
288 static int dnet_bulk_check_complete_single(struct dnet_net_state *state, struct dnet_bulk_id *ids,
289 int remote_group, struct dnet_bulk_check_priv *p)
291 struct dnet_meta_container mc;
292 struct dnet_meta_container temp_mc;
293 struct dnet_meta_update *mu;
294 struct dnet_meta *mg;
295 struct dnet_id id;
296 char *tmpdata = NULL;
297 int *groups, group_num = 1;
298 int err = -EINVAL, error = 0;
299 int i = 0;
300 int my_group, lastest_group = -1;
301 struct dnet_meta_update lastest_mu, my_mu;
302 struct timeval current_ts;
303 int removed_in_all = 1, updated = 0;
304 int lastest = 0;
305 uint64_t cflags = 0;
307 my_group = state->n->id.group_id;
309 dnet_log(state->n, DNET_LOG_DSA, "BULK: checking ID %s\n", dnet_dump_id_str(ids->id.id));
310 err = -ENOENT;
311 error = 0;
313 dnet_setup_id(&mc.id, my_group, ids->id.id);
315 err = state->n->cb->meta_read(state->n->cb->command_private, &ids->id, &mc.data);
316 if (err <= 0) {
317 if (err == 0)
318 err = -ENOENT;
319 goto err_out_continue;
321 mc.size = err;
323 /* Set current group meta_update as lastest_mu */
324 if (!dnet_get_meta_update(state->n, &mc, &my_mu)) {
325 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: meta_update structure doesn't exist for group %d\n",
326 dnet_dump_id_str(ids->id.id), my_group);
327 err = -ENOENT;
328 goto err_out_kcfree;
330 dnet_convert_meta_update(&my_mu);
331 memcpy(&lastest_mu, &my_mu, sizeof(struct dnet_meta_update));
332 lastest_group = my_group;
334 /* Get group list */
335 if (p->group_num) {
336 /* groups came from dnet_check utility */
337 groups = p->groups;
338 group_num = p->group_num;
339 } else {
340 mg = dnet_meta_search(state->n, &mc, DNET_META_GROUPS);
341 if (!mg) {
342 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: DNET_META_GROUPS structure doesn't exist\n", dnet_dump_id_str(ids->id.id));
343 err = -ENOENT;
344 goto err_out_kcfree;
346 dnet_convert_meta(mg);
347 if (mg->size % sizeof(int)) {
348 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: DNET_META_GROUPS structure is corrupted\n", dnet_dump_id_str(ids->id.id));
349 err = -1;
350 goto err_out_kcfree;
352 group_num = mg->size / sizeof(int);
353 groups = (int *)mg->data;
354 dnet_convert_meta(mg);
357 /* Read temporary meta */
358 temp_mc.data = malloc(sizeof(struct dnet_meta_update) * group_num);
359 if (!temp_mc.data) {
360 err = -ENOMEM;
361 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: could not allocate memory for temp UPDATE_META\n", dnet_dump_id_str(ids->id.id));
362 goto err_out_kcfree;
364 memset(temp_mc.data, 0, sizeof(struct dnet_meta_update) * group_num);
365 temp_mc.size = sizeof(struct dnet_meta_update) * group_num;
367 err = dnet_db_read_raw(p->db->b, &ids->id, (void **)&tmpdata);
368 if (err <= 0) {
369 if (err < 0 && err != -2)
370 goto err_out_free;
371 /* No data in temp meta was stored. Placing local meta_update at the beginning */
372 mu = temp_mc.data;
373 mu[0].group_id = my_group;
374 mu[0].tm = my_mu.tm;
375 mu[0].flags = my_mu.flags;
377 } else {
378 if (err > (int)(sizeof(struct dnet_meta_update) * group_num)) {
379 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: too many data stored in temp meta\n", dnet_dump_id_str(ids->id.id));
380 err = -ENOMEM;
381 goto err_out_free;
383 memcpy(temp_mc.data, tmpdata, err);
386 /* Update temp meta with received group */
387 mu = temp_mc.data;
388 updated = 0;
389 lastest = 0;
390 for (i = 0; i < group_num; ++i) {
391 if (mu[i].group_id == remote_group) {
392 mu[i].tm = ids->last_update.tm;
393 mu[i].flags = ids->last_update.flags;
394 updated = 1;
397 if (mu[i].group_id == 0)
398 break;
400 if (!(mu[i].flags & DNET_IO_FLAGS_REMOVED))
401 removed_in_all = 0;
403 if (((mu[i].tm.tsec > mu[lastest].tm.tsec)
404 || ((mu[i].tm.tsec == mu[lastest].tm.tsec) && (mu[i].tm.tnsec > mu[lastest].tm.tnsec)))
405 && i != lastest) {
406 lastest = i;
407 lastest_group = groups[i];
411 if (!updated && i == group_num) {
412 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: no space left to save group in temp meta!\n", dnet_dump_id_str(ids->id.id));
413 err = -ENOMEM;
414 goto err_out_free;
416 if (!updated) {
417 mu[i].group_id = remote_group;
418 mu[i].tm = ids->last_update.tm;
419 mu[i].flags = ids->last_update.flags;
421 if (((mu[i].tm.tsec > mu[lastest].tm.tsec)
422 || ((mu[i].tm.tsec == mu[lastest].tm.tsec) && (mu[i].tm.tnsec > mu[lastest].tm.tnsec)))
423 && i != lastest) {
424 lastest = i;
425 lastest_group = groups[i];
428 ++i;
431 /* Not all groups processed yet */
432 if (i < group_num) {
433 err = 0;
434 err = dnet_db_write_raw(p->db->b, &ids->id, temp_mc.data, temp_mc.size);
435 if (err) {
436 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: unable to save temp meta, err: %d\n", dnet_dump_id_str(ids->id.id), err);
438 goto err_out_free;
441 /* Check if removal_delay second has gone since object was marked as REMOVED */
442 if (removed_in_all) {
443 gettimeofday(&current_ts, NULL);
444 if (((uint64_t)current_ts.tv_sec < mu[lastest].tm.tsec)
445 || ((uint64_t)current_ts.tv_sec - mu[lastest].tm.tsec) < (uint64_t)(state->n->removal_delay * 3600 * 24))
446 removed_in_all = 0;
449 /* TODO: receive newer files from remote groups
451 * Yep, we should read it locally and send it to other groups too
453 if ((lastest_group != my_group) && !(mu[lastest].flags & DNET_IO_FLAGS_REMOVED)) {
454 dnet_log(state->n, DNET_LOG_DSA, "BULK: %s: File on remote group %d is newer, skipping this file\n",
455 dnet_dump_id_str(ids->id.id), lastest_group);
456 err = 0;
457 goto err_out_free;
460 for (i = 0; i < group_num; ++i) {
461 err = 0;
462 if (mu[i].group_id == my_group)
463 continue;
465 dnet_setup_id(&id, mu[i].group_id, ids->id.id);
466 id.type = -1;
468 if (mu[lastest].flags & DNET_IO_FLAGS_REMOVED) {
469 if (removed_in_all) {
470 dnet_log(state->n, DNET_LOG_DSA, "BULK: dnet_remove_object_now %s in group %d, err=%d\n",
471 dnet_dump_id(&id), mu[i].group_id, err);
472 err = dnet_remove_object_now(state->n, &id, cflags);
473 } else {
474 if (!(mu[i].flags & DNET_IO_FLAGS_REMOVED)) {
475 err = dnet_remove_object(state->n, &id, NULL, NULL, cflags);
476 dnet_log(state->n, DNET_LOG_DSA, "BULK: dnet_remove_object %s in group %d err=%d\n",
477 dnet_dump_id(&id), mu[i].group_id, err);
480 if (err < 0)
481 goto err_out_cont2;
482 } else {
483 if ((mu[i].tm.tsec < mu[lastest].tm.tsec) || ((mu[i].tm.tsec == mu[lastest].tm.tsec) &&
484 ((mu[i].tm.tnsec < mu[lastest].tm.tnsec)))) {
485 err = state->n->cb->send(state, state->n->cb->command_private, &id);
487 if (err)
488 goto err_out_cont2;
490 err = dnet_meta_update_check_status_raw(state->n, &mc);
491 if (err)
492 goto err_out_cont2;
494 memcpy(&mc.id, &id, sizeof(struct dnet_id));
495 err = dnet_write_metadata(state->n, &mc, 1, cflags);
496 dnet_log(state->n, DNET_LOG_DSA, "BULK: dnet_write_metadata %s in group %d, err=%d\n",
497 dnet_dump_id(&id), my_group, err);
499 if (err < 0)
500 goto err_out_cont2;
503 err_out_cont2:
504 if (err < 0)
505 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: Error during sending transaction to group %d, err=%d\n",
506 dnet_dump_id_str(ids->id.id), groups[i], err);
507 if (!error && err < 0)
508 error = err;
511 if (mu[lastest].flags & DNET_IO_FLAGS_REMOVED) {
512 if (removed_in_all) {
513 err = dnet_merge_remove_local(state->n, &mc.id, 0);
514 } else if (!(my_mu.flags & DNET_IO_FLAGS_REMOVED)) {
515 err = dnet_merge_remove_local(state->n, &mc.id, 1);
519 if (!(mu[lastest].flags & DNET_IO_FLAGS_REMOVED) && !error) {
520 err = dnet_meta_update_check_status(state->n, &mc);
521 if (err) {
522 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: couldn't update meta CHECK_STATUS\n", dnet_dump_id_str(ids->id.id));
526 if (group_num > 2) {
527 err = dnet_db_remove_raw(p->db->b, &ids->id, 1);
528 if (!err) {
529 dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: %s: DB: failed to remove temp_meta object, err: %d.\n",
530 dnet_dump_id(&mc.id), err);
534 if (error > 0)
535 error = 0;
536 err_out_free:
537 free(temp_mc.data);
538 err_out_kcfree:
539 free(mc.data);
540 if (tmpdata)
541 free(tmpdata);
542 err_out_continue:
543 if (error < 0) {
544 dnet_log(state->n, DNET_LOG_ERROR, "Failed to check ID %s to %s, err=%d\n", dnet_dump_id_str(ids->id.id),
545 dnet_state_dump_addr(state), error);
547 if (i == group_num) {
548 dnet_counter_inc(state->n, DNET_CNTR_NODE_CHECK_COPY, error);
551 return error;
554 static int dnet_bulk_check_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
556 struct dnet_bulk_check_priv *p = priv;
557 int err = 0, i;
559 if (is_trans_destroyed(state, cmd)) {
560 dnet_wakeup(p->w, p->w->cond++);
561 dnet_wait_put(p->w);
562 dnet_check_temp_db_put(p->db);
563 if (atomic_dec_and_test(&p->refcnt)) {
564 free(p->groups);
565 free(p);
567 return 0;
570 /* Empty reply that prevents timeout */
571 if (cmd->size == 0) {
572 return 0;
575 if (!(cmd->size % sizeof(struct dnet_bulk_id))) {
576 struct dnet_bulk_id *ids = (struct dnet_bulk_id *)(cmd + 1);
577 int num = cmd->size / sizeof(struct dnet_bulk_id);
579 dnet_log(state->n, DNET_LOG_DSA, "BULK: received %d entries\n", num);
581 //dnet_db_ptr_get(&state->n->temp_meta);
582 //ret = kcdbbegintran(state->n->temp_meta.db, 0);
583 //if (!ret) {
584 // err = -kcdbecode(state->n->temp_meta.db);
585 // dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: DB: failed to start temp_meta transaction, err: %d: %s.\n",
586 // err, kcecodename(-err));
587 // return err;
590 for (i = 0; i < num && !state->n->need_exit; ++i) {
591 err = dnet_bulk_check_complete_single(state, &ids[i], cmd->id.group_id, p);
594 //kcdbendtran(state->n->temp_meta.db, 1);
595 //dnet_db_ptr_put(state->n, &state->n->temp_meta);
597 if (err) {
598 dnet_log(state->n, DNET_LOG_ERROR, "BULK: couldn't update meta CHECK_STATUS\n");
600 } else {
601 dnet_log(state->n, DNET_LOG_ERROR, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
602 (unsigned long long)cmd->size, sizeof(struct dnet_bulk_id));
605 p->w->status = cmd->status;
606 return err;
609 int dnet_request_bulk_check(struct dnet_node *n, struct dnet_bulk_state *state, struct dnet_check_params *params)
611 struct dnet_trans_control ctl;
612 struct dnet_net_state *st;
613 struct dnet_bulk_check_priv *p;
614 struct timespec wait_ts;
615 int err;
617 p = (struct dnet_bulk_check_priv *)malloc(sizeof(struct dnet_bulk_check_priv));
618 if (!p) {
619 err = -ENOMEM;
620 goto err_out_exit;
622 atomic_init(&p->refcnt, 2);
623 p->db = params->db;
624 dnet_check_temp_db_get(p->db);
626 p->w = dnet_wait_alloc(0);
627 if (!p->w) {
628 err = -ENOMEM;
629 goto err_out_free;
632 p->group_num = params->group_num;
633 p->groups = (int *)malloc(sizeof(int) * params->group_num);
634 if (!p->groups) {
635 err = -ENOMEM;
636 goto err_out_put;
638 memcpy(p->groups, params->groups, sizeof(int) * params->group_num);
640 memset(&ctl, 0, sizeof(struct dnet_trans_control));
642 ctl.cmd = DNET_CMD_LIST;
643 ctl.complete = dnet_bulk_check_complete;
644 ctl.priv = p;
645 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | DNET_ATTR_BULK_CHECK;
647 ctl.data = state->ids;
648 ctl.size = sizeof(struct dnet_bulk_id) * state->num;
650 st = dnet_state_search_by_addr(n, &state->addr);
651 if (!st) {
652 err = -ENOENT;
653 goto err_out_put;
655 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
656 dnet_wait_get(p->w);
658 dnet_log(n, DNET_LOG_DSA, "BULK: sending %u bytes of data to %s (%s)\n",
659 ctl.size, dnet_dump_id(&ctl.id), dnet_server_convert_dnet_addr(&state->addr));
660 err = dnet_trans_alloc_send_state(st, &ctl);
661 dnet_state_put(st);
663 wait_ts = n->wait_ts;
664 wait_ts.tv_sec *= DNET_BULK_IDS_SIZE;
665 err = dnet_wait_event(p->w, p->w->cond != 0, &wait_ts);
666 if (err)
667 goto err_out_put;
669 if (p->w->status) {
670 err = p->w->status;
671 goto err_out_put;
674 dnet_wait_put(p->w);
676 if (atomic_dec_and_test(&p->refcnt)) {
677 free(p->groups);
678 free(p);
681 return 0;
683 err_out_put:
684 dnet_wait_put(p->w);
686 err_out_free:
687 if (atomic_dec_and_test(&p->refcnt)) {
688 free(p->groups);
689 free(p);
692 err_out_exit:
693 dnet_log(n, DNET_LOG_ERROR, "Bulk check exited with status %d\n", err);
694 return err;
697 static int dnet_bulk_add_id(struct dnet_node *n, struct dnet_bulk_array *bulk_array, struct dnet_id *id,
698 struct dnet_meta_container *mc, struct dnet_check_params *params)
700 int err = 0;
701 struct dnet_bulk_state tmp;
702 struct dnet_bulk_state *state = NULL;
703 struct dnet_net_state *st = dnet_state_get_first(n, id);
704 struct dnet_bulk_id *bulk_id;
705 struct dnet_meta_update mu;
707 dnet_log(n, DNET_LOG_DSA, "BULK: adding ID %s to array\n", dnet_dump_id(id));
708 if (!st)
709 return -1;
711 memcpy(&tmp.addr, &st->addr, sizeof(struct dnet_addr));
712 dnet_state_put(st);
714 dnet_log(n, DNET_LOG_DSA, "BULK: Searching state in states array\n");
715 state = bsearch(&tmp, bulk_array->states, bulk_array->num, sizeof(struct dnet_bulk_state), dnet_compare_bulk_state);
716 if (!state)
717 return -1;
719 if (!dnet_get_meta_update(n, mc, &mu))
720 return -ENOENT;
722 dnet_log(n, DNET_LOG_DSA, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state->addr), state->num);
723 //pthread_mutex_lock(&state->state_lock);
724 if (state->num >= DNET_BULK_IDS_SIZE || state->num < 0)
725 goto err_out_unlock;
727 bulk_id = &state->ids[state->num];
728 memset(bulk_id, 0, sizeof(struct dnet_bulk_id));
730 memcpy(&bulk_id->id, &id->id, DNET_ID_SIZE);
732 dnet_log(n, DNET_LOG_DSA, "BULK: ID: %s, last_update->tsec=%llu, last_update->tnsec=%llu, flags=%02llx\n",
733 dnet_dump_id_str(bulk_id->id.id), (unsigned long long)mu.tm.tsec, (unsigned long long)mu.tm.tnsec,
734 (unsigned long long)mu.flags);
736 dnet_convert_meta_update(&mu);
738 memcpy(&bulk_id->last_update, &mu, sizeof(struct dnet_meta_update));
740 state->num++;
742 dnet_log(n, DNET_LOG_DSA, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state->addr), state->num);
743 if (state->num == DNET_BULK_IDS_SIZE) {
744 err = dnet_request_bulk_check(n, state, params);
745 state->num = 0;
746 if (err)
747 goto err_out_unlock;
750 //pthread_mutex_unlock(&state->state_lock);
752 return 0;
754 err_out_unlock:
755 //pthread_mutex_unlock(&state->state_lock);
756 return -2;
759 static int dnet_check_number_of_copies(struct dnet_node *n, struct dnet_meta_container *mc, int *groups, int group_num,
760 struct dnet_bulk_array *bulk_array, struct dnet_check_params *params)
762 struct dnet_id raw;
763 int group_id = mc->id.group_id;
764 int err = 0, i, error = 0;
766 for (i=0; i<group_num; ++i) {
767 if (groups[i] == group_id)
768 continue;
770 dnet_setup_id(&raw, groups[i], mc->id.id);
772 err = dnet_bulk_add_id(n, bulk_array, &raw, mc, params);
773 if (err)
774 dnet_log(n, DNET_LOG_ERROR, "BULK: after adding ID %s err = %d\n", dnet_dump_id(&raw), err);
776 if (!err)
777 error = 0;
778 else if (!error)
779 error = err;
782 return error;
785 static int dnet_check_copies(struct dnet_node *n, struct dnet_meta_container *mc,
786 struct dnet_bulk_array *bulk_array, struct dnet_check_params *params)
788 int err;
789 int *groups = NULL;
791 if (params->group_num) {
792 groups = params->groups;
793 err = params->group_num;
794 } else {
795 err = dnet_check_find_groups(n, mc, &groups);
796 if (err <= 0)
797 return -ENOENT;
800 err = dnet_check_number_of_copies(n, mc, groups, err, bulk_array, params);
802 if (!params->group_num)
803 free(groups);
805 return err;
808 static int dnet_merge_direct(struct dnet_node *n, struct dnet_meta_container *mc)
810 struct dnet_net_state *base;
811 int cflags = 0;
812 int err;
814 dnet_log(n, DNET_LOG_DSA, "in dnet_merge_direct mc->size = %u\n", mc->size);
815 base = dnet_node_state(n);
816 if (!base) {
817 err = -ENOENT;
818 goto err_out_exit;
821 err = n->cb->send(base, n->cb->command_private, &mc->id);
822 dnet_log(n, DNET_LOG_DSA, "in dnet_merge_direct after n->cb->send err = %d\n\n", err);
823 if (err < 0)
824 goto err_out_put;
826 dnet_log(n, DNET_LOG_DSA, "in dnet_merge_direct2 mc->size = %u\n", mc->size);
827 err = dnet_write_metadata(n, mc, 0, cflags);
828 if (err <= 0)
829 goto err_out_put;
831 err = 0;
833 err_out_put:
834 dnet_state_put(base);
835 err_out_exit:
836 return err;
840 static int dnet_merge_upload(struct dnet_node *n, struct dnet_meta_container *mc)
842 struct dnet_net_state *base;
843 int err = 0;
845 base = dnet_node_state(n);
846 if (!base) {
847 err = -ENOENT;
848 goto err_out_exit;
851 err = n->cb->send(base, n->cb->command_private, &mc->id);
852 if (err)
853 goto err_out_put;
855 err = dnet_write_metadata(n, mc, 0);
856 if (err <= 0)
857 goto err_out_put;
859 err_out_put:
860 dnet_state_put(base);
861 err_out_exit:
862 return err;
866 static int dnet_merge_common(struct dnet_node *n, struct dnet_meta_container *remote_meta, struct dnet_meta_container *mc)
868 int err = 0;
869 struct dnet_meta_update local, remote;
870 uint64_t cflags = 0;
872 dnet_log(n, DNET_LOG_DSA, "in dnet_merge_common mc->size = %d\n", mc->size);
873 if (!dnet_get_meta_update(n, mc, &local)) {
874 err = -ENOENT;
875 dnet_log(n, DNET_LOG_ERROR, "%s: META_UPDATE not found in local meta\n", dnet_dump_id(&mc->id));
876 goto err_out_exit;
879 if (!dnet_get_meta_update(n, remote_meta, &remote)) {
880 err = -ENOENT;
881 dnet_log(n, DNET_LOG_ERROR, "%s: META_UPDATE not found in remote meta, perform direct merge\n", dnet_dump_id(&mc->id));
882 err = dnet_merge_direct(n, mc);
883 goto err_out_exit;
886 if ((local.tm.tsec > remote.tm.tsec) || (local.tm.tsec == remote.tm.tsec && local.tm.tnsec > remote.tm.tnsec)) {
887 if (local.flags & DNET_IO_FLAGS_REMOVED) {
888 err = dnet_remove_object_now(n, &mc->id, cflags);
889 } else {
890 err = dnet_merge_direct(n, mc);
894 err_out_exit:
895 return err;
899 static int dnet_check_merge(struct dnet_node *n, struct dnet_meta_container *mc)
901 int err;
902 struct dnet_meta_container remote_mc;
904 dnet_log(n, DNET_LOG_DSA, "in dnet_check_merge mc->size = %d\n", mc->size);
905 memset(&remote_mc, 0, sizeof(struct dnet_meta_container));
907 err = dnet_read_meta(n, &remote_mc, NULL, 0, &mc->id);
908 if (err) {
909 if (err != -ENOENT) {
910 dnet_log_raw(n, DNET_LOG_ERROR, "%s: failed to download object to be merged from storage: %d.\n",
911 dnet_dump_id(&mc->id), err);
912 goto err_out_exit;
915 dnet_log_raw(n, DNET_LOG_INFO, "%s: there is no meta in the storage to merge with, "
916 "doing direct merge (plain upload).\n", dnet_dump_id(&mc->id));
917 err = dnet_merge_direct(n, mc);
918 } else {
920 err = dnet_merge_common(n, &remote_mc, mc);
923 if (err)
924 goto err_out_exit;
926 err_out_exit:
927 if (remote_mc.data)
928 free(remote_mc.data);
929 return err;
932 int dnet_check(struct dnet_node *n, struct dnet_meta_container *mc, struct dnet_bulk_array *bulk_array,
933 int need_merge, struct dnet_check_params *params)
935 int err = 0;
937 dnet_log(n, DNET_LOG_DSA, "need_merge = %d, mc.size = %d\n", need_merge, mc->size);
938 if (need_merge) {
939 err = dnet_check_merge(n, mc);
940 dnet_log(n, DNET_LOG_DSA, "err=%d\n", err);
941 if (!err)
942 dnet_merge_remove_local(n, &mc->id, 0);
943 } else {
944 err = dnet_check_copies(n, mc, bulk_array, params);
946 return err;
949 static int dnet_check_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
951 struct dnet_wait *w = priv;
952 int err = -EINVAL;
954 if (is_trans_destroyed(state, cmd)) {
955 dnet_wakeup(w, w->cond++);
956 dnet_wait_put(w);
957 return 0;
960 if (cmd->size == sizeof(struct dnet_check_reply)) {
961 struct dnet_check_reply *r = (struct dnet_check_reply *)(cmd + 1);
963 dnet_convert_check_reply(r);
965 dnet_log(state->n, DNET_LOG_INFO, "check: total: %d, completed: %d, errors: %d\n",
966 r->total, r->completed, r->errors);
969 w->status = cmd->status;
970 return err;
973 static int dnet_send_check_request(struct dnet_net_state *st, struct dnet_id *id,
974 struct dnet_wait *w, struct dnet_check_request *r)
976 struct dnet_trans_control ctl;
977 char ctl_time[64];
978 struct tm tm;
980 memset(&ctl, 0, sizeof(struct dnet_trans_control));
982 memcpy(&ctl.id, id, sizeof(struct dnet_id));
983 ctl.cmd = DNET_CMD_LIST;
984 ctl.complete = dnet_check_complete;
985 ctl.priv = w;
986 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK;
988 if (r->timestamp) {
989 localtime_r((time_t *)&r->timestamp, &tm);
990 strftime(ctl_time, sizeof(ctl_time), "%F %R:%S %Z", &tm);
991 } else {
992 snprintf(ctl_time, sizeof(ctl_time), "all records");
995 dnet_log(st->n, DNET_LOG_INFO, "%s: check request: objects: %llu, threads: %llu, timestamp: %s, merge: %d\n",
996 dnet_state_dump_addr(st), (unsigned long long)r->obj_num, (unsigned long long)r->thread_num,
997 ctl_time, !!(r->flags & DNET_CHECK_MERGE));
999 dnet_convert_check_request(r);
1001 ctl.data = r;
1002 ctl.size = sizeof(*r) + r->obj_num * sizeof(struct dnet_id) + r->group_num * sizeof(int);
1004 return dnet_trans_alloc_send_state(st, &ctl);
1007 int dnet_request_check(struct dnet_node *n, struct dnet_check_request *r)
1009 struct dnet_wait *w;
1010 struct dnet_net_state *st;
1011 struct dnet_group *g;
1012 int err, num = 0;
1014 w = dnet_wait_alloc(0);
1015 if (!w) {
1016 err = -ENOMEM;
1017 goto err_out_exit;
1020 pthread_mutex_lock(&n->state_lock);
1021 list_for_each_entry(g, &n->group_list, group_entry) {
1022 list_for_each_entry(st, &g->state_list, state_entry) {
1023 struct dnet_id raw;
1025 if (st == n->st)
1026 continue;
1028 dnet_wait_get(w);
1030 dnet_setup_id(&raw, st->idc->group->group_id, st->idc->ids[0].raw.id);
1031 dnet_send_check_request(st, &raw, w, r);
1032 num++;
1035 pthread_mutex_unlock(&n->state_lock);
1037 err = dnet_wait_event(w, w->cond == num, &n->wait_ts);
1038 if (err)
1039 goto err_out_put;
1041 if (w->status) {
1042 err = w->status;
1043 goto err_out_put;
1046 dnet_wait_put(w);
1048 return num;
1050 err_out_put:
1051 dnet_wait_put(w);
1052 err_out_exit:
1053 dnet_log(n, DNET_LOG_ERROR, "Check exited with status %d\n", err);
1054 return err;