Merge pull request #4 from oktocat/master
[elliptics.git] / library / check.c
blob12c29449800dc962d5ea89d8f7c54b2219b51e05
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
18 #include <sys/mman.h>
20 #include <dirent.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <string.h>
26 #include <unistd.h>
28 #include "elliptics.h"
29 #include "elliptics/interface.h"
/*
 * Directory that receives dumped meta container files
 * (see dnet_dump_meta_container()). The string is only ever read,
 * so it is declared const.
 */
static const char dnet_check_tmp_dir[] = "/dev/shm";
33 static int dnet_merge_remove_local(struct dnet_node *n, struct dnet_id *id, int full_process)
35 char buf[sizeof(struct dnet_cmd)];
36 struct dnet_cmd *cmd;
37 struct dnet_net_state *base;
38 int err = -ENOENT;
40 memset(buf, 0, sizeof(buf));
42 cmd = (struct dnet_cmd *)buf;
44 memcpy(&cmd->id, id, sizeof(struct dnet_id));
45 cmd->size = 0;
47 cmd->cmd = DNET_CMD_DEL;
48 if (!full_process)
49 cmd->flags = DNET_ATTR_DELETE_HISTORY;
51 base = dnet_node_state(n);
52 if (base) {
53 err = dnet_process_meta(base, cmd, NULL);
54 dnet_state_put(base);
57 return err;
60 static int dnet_dump_meta_container(struct dnet_node *n, struct dnet_meta_container *mc)
62 int fd, err;
63 char file[256];
64 char id_str[DNET_ID_SIZE*2+1];
66 snprintf(file, sizeof(file), "%s/%s.meta", dnet_check_tmp_dir, dnet_dump_id_len_raw(mc->id.id, DNET_ID_SIZE, id_str));
68 fd = open(file, O_RDWR | O_TRUNC | O_CREAT | O_CLOEXEC, 0644);
69 if (fd < 0) {
70 err = -errno;
71 dnet_log_raw(n, DNET_LOG_ERROR, "Failed to open meta container file '%s': %s\n",
72 file, strerror(errno));
73 goto err_out_exit;
76 err = write(fd, mc->data, mc->size);
77 if (err != (int)mc->size) {
78 err = -errno;
79 dnet_log_raw(n, DNET_LOG_ERROR, "Failed to write meta container into '%s': %s\n",
80 file, strerror(errno));
81 goto err_out_close;
83 err = 0;
85 err_out_close:
86 close(fd);
87 err_out_exit:
88 return err;
91 static int dnet_check_find_groups(struct dnet_node *n, struct dnet_meta_container *mc, int **groupsp)
93 int err, i, num;
94 struct dnet_meta *m;
95 int *groups;
97 m = dnet_meta_search(n, mc, DNET_META_GROUPS);
98 if (!m) {
99 dnet_log_raw(n, DNET_LOG_ERROR, "%s: failed to find groups metadata.\n", dnet_dump_id(&mc->id));
100 err = -ENOENT;
101 goto err_out_exit;
104 groups = malloc(m->size);
105 if (!groups) {
106 err = -ENOMEM;
107 goto err_out_exit;
109 memcpy(groups, m->data, m->size);
111 num = m->size / sizeof(int32_t);
113 for (i=0; i<num; ++i) {
114 dnet_log_raw(n, DNET_LOG_DEBUG, "%s: group: %d\n", dnet_dump_id(&mc->id), groups[i]);
117 *groupsp = groups;
119 return num;
121 err_out_exit:
122 dnet_dump_meta_container(n, mc);
123 return err;
126 /*static int dnet_bulk_db_check_update(struct dnet_node *n, struct dnet_meta_container *mc_array, int *rec_num,
127 struct dnet_meta_container *mc, int final)
129 int err = 0;
130 int64_t rec_processed;
131 KCREC recs[DNET_BULK_META_UPD_SIZE];
132 int rec_iter = 0;
134 if (!rec_num || *rec_num > DNET_BULK_META_UPD_SIZE) {
135 err = -EINVAL;
136 goto err_out_exit;
139 if (!final) {
140 if (!mc) {
141 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: mc should be passed\n");
142 return -1;
144 memcpy(&mc_array[*rec_num].id, &mc->id, sizeof(struct dnet_id));
145 mc_array[*rec_num].size = mc->size;
146 // Allocate extra memory for potential META_CHECK_STATUS structure
147 mc_array[*rec_num].data = malloc(mc->size + sizeof(struct dnet_meta) + sizeof(struct dnet_meta_check_status));
148 if (!mc_array[*rec_num].data) {
149 err = -ENOMEM;
150 goto err_out_exit;
152 memcpy(mc_array[*rec_num].data, mc->data, mc->size);
153 (*rec_num)++;
156 if (*rec_num == DNET_BULK_META_UPD_SIZE || (final && *rec_num > 0)) {
157 err = kcdbbegintran(n->meta, 0);
158 if (!err) {
159 err = -kcdbecode(n->meta);
160 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to start %s transaction, err: %d: %s.\n",
161 "meta", err, kcecodename(-err));
162 goto err_out_exit;
164 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
165 err = dnet_db_check_update(n, &mc_array[rec_iter]);
166 recs[rec_iter].key.size = DNET_ID_SIZE;
167 recs[rec_iter].key.buf = (char *)mc_array[rec_iter].id.id;
168 recs[rec_iter].value.size = mc_array[rec_iter].size;
169 recs[rec_iter].value.buf = mc_array[rec_iter].data;
171 rec_processed = kcdbsetbulk(n->meta, recs, *rec_num, 0);
172 if ((int)rec_processed != *rec_num) {
173 err = -kcdbecode(n->meta);
174 dnet_log_raw(n, DNET_LOG_ERROR, "CHECK: DB: failed to set check update stamps, %d records processed, err: %d: %s.\n",
175 (int)rec_processed, err, kcecodename(-err));
176 kcdbendtran(n->meta, 0);
177 goto err_out_exit;
180 kcdbendtran(n->meta, 1);
181 for (rec_iter = 0; rec_iter < *rec_num; ++rec_iter) {
182 free(mc_array[rec_iter].data);
184 *rec_num = 0;
187 err_out_exit:
188 return err;
192 int dnet_cmd_bulk_check(struct dnet_net_state *orig, struct dnet_cmd *cmd, void *data)
194 struct dnet_bulk_id *ids = (struct dnet_bulk_id *)data;
195 struct dnet_meta_container mc;
196 struct dnet_meta_update mu;
197 struct dnet_id raw;
198 int i;
199 int err = 0;
200 int num;
202 if (!(cmd->size % sizeof(struct dnet_bulk_id))) {
203 num = cmd->size / sizeof(struct dnet_bulk_id);
205 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: received %d entries\n", num);
207 for (i = 0; i < num; ++i) {
209 /* Send empty reply every DNET_BULK_CHECK_PING records to prevent timeout */
210 if (i % DNET_BULK_CHECK_PING == 0 && i > 0) {
211 dnet_send_reply(orig, cmd, NULL, 0, 1);
214 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: processing ID %s\n", dnet_dump_id_str(ids[i].id.id));
215 mc.data = NULL;
216 dnet_setup_id(&mc.id, 0, ids[i].id.id);
217 err = orig->n->cb->meta_read(orig->n->cb->command_private, &ids[i].id, &mc.data);
218 if (mc.data) {
219 mc.size = err;
220 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: %d bytes of metadata found, searching for META_UPDATE group_id=%d\n",
221 mc.size, orig->n->st->idc->group->group_id);
222 if (dnet_get_meta_update(orig->n, &mc, &mu))
224 dnet_convert_meta_update(&ids[i].last_update);
225 dnet_log(orig->n, DNET_LOG_DEBUG, "BULK: mu.tsec=%lu, mu.tnsec=%lu, mu.flags=%02lx\n",
226 (unsigned long)mu.tm.tsec, (unsigned long)mu.tm.tnsec, (unsigned long)mu.flags);
227 dnet_log(orig->n, DNET_LOG_DEBUG,
228 "BULK: last_update.tsec=%lu, last_update.tnsec=%lu, last_update.flags=%02lx\n",
229 (unsigned long)ids[i].last_update.tm.tsec, (unsigned long)ids[i].last_update.tm.tnsec,
230 (unsigned long)ids[i].last_update.flags);
232 if ((mu.flags & DNET_IO_FLAGS_REMOVED) || (mu.tm.tsec < ids[i].last_update.tm.tsec) ||
233 ((mu.tm.tnsec < ids[i].last_update.tm.tnsec) && (mu.tm.tsec == ids[i].last_update.tm.tsec))) {
234 err = 0;
235 } else {
236 /* File is not needed to be updated */
237 dnet_setup_id(&raw, orig->n->id.group_id, ids[i].id.id);
238 err = dnet_stat_local(orig, &raw);
239 if (err) {
240 /* File was not found in the storage */
241 mu.tm.tsec = 1;
242 mu.flags = 0;
243 } else {
244 err = dnet_meta_update_check_status(orig->n, &mc);
245 if (err) {
246 dnet_log(orig->n, DNET_LOG_ERROR,
247 "BULK: %s: couldn't update meta CHECK_STATUS err: %d\n",
248 dnet_dump_id_str(ids[i].id.id), err);
253 memcpy(&ids[i].last_update, &mu, sizeof(struct dnet_meta_update));
254 dnet_convert_meta_update(&ids[i].last_update);
256 free(mc.data);
257 } else {
258 /* Meta is not present - set timestamp to very old one */
259 dnet_convert_meta_update(&ids[i].last_update);
260 ids[i].last_update.tm.tsec = 1;
261 ids[i].last_update.flags = 0;
262 dnet_convert_meta_update(&ids[i].last_update);
265 } else {
266 dnet_log(orig->n, DNET_LOG_ERROR, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
267 (unsigned long long)cmd->size, sizeof(struct dnet_bulk_id));
268 err = -1;
269 goto err_out_exit;
272 return dnet_send_reply(orig, cmd, data, sizeof(struct dnet_bulk_id) * num, 0);
274 err_out_exit:
275 return err;
278 struct dnet_bulk_check_priv {
279 struct dnet_wait *w;
280 struct dnet_check_temp_db *db;
281 atomic_t refcnt;
282 int group_num;
283 int *groups;
286 static int dnet_bulk_check_complete_single(struct dnet_net_state *state, struct dnet_bulk_id *ids,
287 int remote_group, struct dnet_bulk_check_priv *p)
289 struct dnet_meta_container mc;
290 struct dnet_meta_container temp_mc;
291 struct dnet_meta_update *mu;
292 struct dnet_meta *mg;
293 struct dnet_id id;
294 char *tmpdata = NULL;
295 int *groups, group_num = 1;
296 int err = -EINVAL, error = 0;
297 int i = 0;
298 int my_group, lastest_group = -1;
299 struct dnet_meta_update lastest_mu, my_mu;
300 struct timeval current_ts;
301 int removed_in_all = 1, updated = 0;
302 int lastest = 0;
303 uint64_t cflags = 0;
304 uint64_t ioflags = 0;
306 my_group = state->n->id.group_id;
308 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: checking ID %s\n", dnet_dump_id_str(ids->id.id));
309 err = -ENOENT;
310 error = 0;
312 dnet_setup_id(&mc.id, my_group, ids->id.id);
314 err = state->n->cb->meta_read(state->n->cb->command_private, &ids->id, &mc.data);
315 if (err <= 0) {
316 if (err == 0)
317 err = -ENOENT;
318 goto err_out_continue;
320 mc.size = err;
322 /* Set current group meta_update as lastest_mu */
323 if (!dnet_get_meta_update(state->n, &mc, &my_mu)) {
324 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: meta_update structure doesn't exist for group %d\n",
325 dnet_dump_id_str(ids->id.id), my_group);
326 err = -ENOENT;
327 goto err_out_kcfree;
329 dnet_convert_meta_update(&my_mu);
330 memcpy(&lastest_mu, &my_mu, sizeof(struct dnet_meta_update));
331 lastest_group = my_group;
333 /* Get group list */
334 if (p->group_num) {
335 /* groups came from dnet_check utility */
336 groups = p->groups;
337 group_num = p->group_num;
338 } else {
339 mg = dnet_meta_search(state->n, &mc, DNET_META_GROUPS);
340 if (!mg) {
341 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: DNET_META_GROUPS structure doesn't exist\n", dnet_dump_id_str(ids->id.id));
342 err = -ENOENT;
343 goto err_out_kcfree;
345 dnet_convert_meta(mg);
346 if (mg->size % sizeof(int)) {
347 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: DNET_META_GROUPS structure is corrupted\n", dnet_dump_id_str(ids->id.id));
348 err = -1;
349 goto err_out_kcfree;
351 group_num = mg->size / sizeof(int);
352 groups = (int *)mg->data;
353 dnet_convert_meta(mg);
356 /* Read temporary meta */
357 temp_mc.data = malloc(sizeof(struct dnet_meta_update) * group_num);
358 if (!temp_mc.data) {
359 err = -ENOMEM;
360 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: could not allocate memory for temp UPDATE_META\n", dnet_dump_id_str(ids->id.id));
361 goto err_out_kcfree;
363 memset(temp_mc.data, 0, sizeof(struct dnet_meta_update) * group_num);
364 temp_mc.size = sizeof(struct dnet_meta_update) * group_num;
366 err = dnet_db_read_raw(p->db->b, &ids->id, (void **)&tmpdata);
367 if (err <= 0) {
368 if (err < 0 && err != -2)
369 goto err_out_free;
370 /* No data in temp meta was stored. Placing local meta_update at the beginning */
371 mu = temp_mc.data;
372 mu[0].group_id = my_group;
373 mu[0].tm = my_mu.tm;
374 mu[0].flags = my_mu.flags;
376 } else {
377 if (err > (int)(sizeof(struct dnet_meta_update) * group_num)) {
378 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: too many data stored in temp meta\n", dnet_dump_id_str(ids->id.id));
379 err = -ENOMEM;
380 goto err_out_free;
382 memcpy(temp_mc.data, tmpdata, err);
385 /* Update temp meta with received group */
386 mu = temp_mc.data;
387 updated = 0;
388 lastest = 0;
389 for (i = 0; i < group_num; ++i) {
390 if (mu[i].group_id == remote_group) {
391 mu[i].tm = ids->last_update.tm;
392 mu[i].flags = ids->last_update.flags;
393 updated = 1;
396 if (mu[i].group_id == 0)
397 break;
399 if (!(mu[i].flags & DNET_IO_FLAGS_REMOVED))
400 removed_in_all = 0;
402 if (((mu[i].tm.tsec > mu[lastest].tm.tsec)
403 || ((mu[i].tm.tsec == mu[lastest].tm.tsec) && (mu[i].tm.tnsec > mu[lastest].tm.tnsec)))
404 && i != lastest) {
405 lastest = i;
406 lastest_group = groups[i];
410 if (!updated && i == group_num) {
411 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: no space left to save group in temp meta!\n", dnet_dump_id_str(ids->id.id));
412 err = -ENOMEM;
413 goto err_out_free;
415 if (!updated) {
416 mu[i].group_id = remote_group;
417 mu[i].tm = ids->last_update.tm;
418 mu[i].flags = ids->last_update.flags;
420 if (((mu[i].tm.tsec > mu[lastest].tm.tsec)
421 || ((mu[i].tm.tsec == mu[lastest].tm.tsec) && (mu[i].tm.tnsec > mu[lastest].tm.tnsec)))
422 && i != lastest) {
423 lastest = i;
424 lastest_group = groups[i];
427 ++i;
430 /* Not all groups processed yet */
431 if (i < group_num) {
432 err = 0;
433 err = dnet_db_write_raw(p->db->b, &ids->id, temp_mc.data, temp_mc.size);
434 if (err) {
435 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: unable to save temp meta, err: %d\n", dnet_dump_id_str(ids->id.id), err);
437 goto err_out_free;
440 /* Check if removal_delay second has gone since object was marked as REMOVED */
441 if (removed_in_all) {
442 gettimeofday(&current_ts, NULL);
443 if (((uint64_t)current_ts.tv_sec < mu[lastest].tm.tsec)
444 || ((uint64_t)current_ts.tv_sec - mu[lastest].tm.tsec) < (uint64_t)(state->n->removal_delay * 3600 * 24))
445 removed_in_all = 0;
448 /* TODO: receive newer files from remote groups
450 * Yep, we should read it locally and send it to other groups too
452 if ((lastest_group != my_group) && !(mu[lastest].flags & DNET_IO_FLAGS_REMOVED)) {
453 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: %s: File on remote group %d is newer, skipping this file\n",
454 dnet_dump_id_str(ids->id.id), lastest_group);
455 err = 0;
456 goto err_out_free;
459 for (i = 0; i < group_num; ++i) {
460 err = 0;
461 if (mu[i].group_id == my_group)
462 continue;
464 dnet_setup_id(&id, mu[i].group_id, ids->id.id);
465 id.type = -1;
467 if (mu[lastest].flags & DNET_IO_FLAGS_REMOVED) {
468 if (removed_in_all) {
469 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: dnet_remove_object_now %s in group %d, err=%d\n",
470 dnet_dump_id(&id), mu[i].group_id, err);
471 err = dnet_remove_object_now(state->n, &id, cflags, ioflags);
472 } else {
473 if (!(mu[i].flags & DNET_IO_FLAGS_REMOVED)) {
474 err = dnet_remove_object(state->n, &id, NULL, NULL, cflags, ioflags);
475 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: dnet_remove_object %s in group %d err=%d\n",
476 dnet_dump_id(&id), mu[i].group_id, err);
479 if (err < 0)
480 goto err_out_cont2;
481 } else {
482 if ((mu[i].tm.tsec < mu[lastest].tm.tsec) || ((mu[i].tm.tsec == mu[lastest].tm.tsec) &&
483 ((mu[i].tm.tnsec < mu[lastest].tm.tnsec)))) {
484 err = state->n->cb->send(state, state->n->cb->command_private, &id);
486 if (err)
487 goto err_out_cont2;
489 err = dnet_meta_update_check_status_raw(state->n, &mc);
490 if (err)
491 goto err_out_cont2;
493 memcpy(&mc.id, &id, sizeof(struct dnet_id));
494 err = dnet_write_metadata(state->n, &mc, 1, cflags);
495 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: dnet_write_metadata %s in group %d, err=%d\n",
496 dnet_dump_id(&id), my_group, err);
498 if (err < 0)
499 goto err_out_cont2;
502 err_out_cont2:
503 if (err < 0)
504 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: Error during sending transaction to group %d, err=%d\n",
505 dnet_dump_id_str(ids->id.id), groups[i], err);
506 if (!error && err < 0)
507 error = err;
510 if (mu[lastest].flags & DNET_IO_FLAGS_REMOVED) {
511 if (removed_in_all) {
512 err = dnet_merge_remove_local(state->n, &mc.id, 0);
513 } else if (!(my_mu.flags & DNET_IO_FLAGS_REMOVED)) {
514 err = dnet_merge_remove_local(state->n, &mc.id, 1);
518 if (!(mu[lastest].flags & DNET_IO_FLAGS_REMOVED) && !error) {
519 err = dnet_meta_update_check_status(state->n, &mc);
520 if (err) {
521 dnet_log(state->n, DNET_LOG_ERROR, "BULK: %s: couldn't update meta CHECK_STATUS\n", dnet_dump_id_str(ids->id.id));
525 if (group_num > 2) {
526 err = dnet_db_remove_raw(p->db->b, &ids->id, 1);
527 if (!err) {
528 dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: %s: DB: failed to remove temp_meta object, err: %d.\n",
529 dnet_dump_id(&mc.id), err);
533 if (error > 0)
534 error = 0;
535 err_out_free:
536 free(temp_mc.data);
537 err_out_kcfree:
538 free(mc.data);
539 if (tmpdata)
540 free(tmpdata);
541 err_out_continue:
542 if (error < 0) {
543 dnet_log(state->n, DNET_LOG_ERROR, "Failed to check ID %s to %s, err=%d\n", dnet_dump_id_str(ids->id.id),
544 dnet_state_dump_addr(state), error);
546 if (i == group_num) {
547 dnet_counter_inc(state->n, DNET_CNTR_NODE_CHECK_COPY, error);
550 return error;
553 static int dnet_bulk_check_complete(struct dnet_net_state *state, struct dnet_cmd *cmd, void *priv)
555 struct dnet_bulk_check_priv *p = priv;
556 int err = 0, i;
558 if (is_trans_destroyed(state, cmd)) {
559 dnet_wakeup(p->w, p->w->cond++);
560 dnet_wait_put(p->w);
561 dnet_check_temp_db_put(p->db);
562 if (atomic_dec_and_test(&p->refcnt)) {
563 free(p->groups);
564 free(p);
566 return 0;
569 /* Empty reply that prevents timeout */
570 if (cmd->size == 0) {
571 return 0;
574 if (!(cmd->size % sizeof(struct dnet_bulk_id))) {
575 struct dnet_bulk_id *ids = (struct dnet_bulk_id *)(cmd + 1);
576 int num = cmd->size / sizeof(struct dnet_bulk_id);
578 dnet_log(state->n, DNET_LOG_DEBUG, "BULK: received %d entries\n", num);
580 //dnet_db_ptr_get(&state->n->temp_meta);
581 //ret = kcdbbegintran(state->n->temp_meta.db, 0);
582 //if (!ret) {
583 // err = -kcdbecode(state->n->temp_meta.db);
584 // dnet_log_raw(state->n, DNET_LOG_ERROR, "BULK: DB: failed to start temp_meta transaction, err: %d: %s.\n",
585 // err, kcecodename(-err));
586 // return err;
589 for (i = 0; i < num && !state->n->need_exit; ++i) {
590 err = dnet_bulk_check_complete_single(state, &ids[i], cmd->id.group_id, p);
593 //kcdbendtran(state->n->temp_meta.db, 1);
594 //dnet_db_ptr_put(state->n, &state->n->temp_meta);
596 if (err) {
597 dnet_log(state->n, DNET_LOG_ERROR, "BULK: couldn't update meta CHECK_STATUS\n");
599 } else {
600 dnet_log(state->n, DNET_LOG_ERROR, "BULK: received corrupted data, size = %llu, sizeof(dnet_bulk_id) = %zu\n",
601 (unsigned long long)cmd->size, sizeof(struct dnet_bulk_id));
604 p->w->status = cmd->status;
605 return err;
608 int dnet_request_bulk_check(struct dnet_node *n, struct dnet_bulk_state *state, struct dnet_check_params *params)
610 struct dnet_trans_control ctl;
611 struct dnet_net_state *st;
612 struct dnet_bulk_check_priv *p;
613 struct timespec wait_ts;
614 int err;
616 p = (struct dnet_bulk_check_priv *)malloc(sizeof(struct dnet_bulk_check_priv));
617 if (!p) {
618 err = -ENOMEM;
619 goto err_out_exit;
621 atomic_init(&p->refcnt, 2);
622 p->db = params->db;
623 dnet_check_temp_db_get(p->db);
625 p->w = dnet_wait_alloc(0);
626 if (!p->w) {
627 err = -ENOMEM;
628 goto err_out_free;
631 p->group_num = params->group_num;
632 p->groups = (int *)malloc(sizeof(int) * params->group_num);
633 if (!p->groups) {
634 err = -ENOMEM;
635 goto err_out_put;
637 memcpy(p->groups, params->groups, sizeof(int) * params->group_num);
639 memset(&ctl, 0, sizeof(struct dnet_trans_control));
641 ctl.cmd = DNET_CMD_LIST;
642 ctl.complete = dnet_bulk_check_complete;
643 ctl.priv = p;
644 ctl.cflags = DNET_FLAGS_NEED_ACK | DNET_FLAGS_NOLOCK | DNET_ATTR_BULK_CHECK;
646 ctl.data = state->ids;
647 ctl.size = sizeof(struct dnet_bulk_id) * state->num;
649 st = dnet_state_search_by_addr(n, &state->addr);
650 if (!st) {
651 err = -ENOENT;
652 goto err_out_put;
654 dnet_setup_id(&ctl.id, st->idc->group->group_id, st->idc->ids[0].raw.id);
655 dnet_wait_get(p->w);
657 dnet_log(n, DNET_LOG_DEBUG, "BULK: sending %u bytes of data to %s (%s)\n",
658 ctl.size, dnet_dump_id(&ctl.id), dnet_server_convert_dnet_addr(&state->addr));
659 err = dnet_trans_alloc_send_state(st, &ctl);
660 dnet_state_put(st);
662 wait_ts = n->wait_ts;
663 wait_ts.tv_sec *= DNET_BULK_IDS_SIZE;
664 err = dnet_wait_event(p->w, p->w->cond != 0, &wait_ts);
665 if (err)
666 goto err_out_put;
668 if (p->w->status) {
669 err = p->w->status;
670 goto err_out_put;
673 dnet_wait_put(p->w);
675 if (atomic_dec_and_test(&p->refcnt)) {
676 free(p->groups);
677 free(p);
680 return 0;
682 err_out_put:
683 dnet_wait_put(p->w);
685 err_out_free:
686 if (atomic_dec_and_test(&p->refcnt)) {
687 free(p->groups);
688 free(p);
691 err_out_exit:
692 dnet_log(n, DNET_LOG_ERROR, "Bulk check exited with status %d\n", err);
693 return err;
696 static int dnet_bulk_add_id(struct dnet_node *n, struct dnet_bulk_array *bulk_array, struct dnet_id *id,
697 struct dnet_meta_container *mc, struct dnet_check_params *params)
699 int err = 0;
700 struct dnet_bulk_state tmp;
701 struct dnet_bulk_state *state = NULL;
702 struct dnet_net_state *st = dnet_state_get_first(n, id);
703 struct dnet_bulk_id *bulk_id;
704 struct dnet_meta_update mu;
706 dnet_log(n, DNET_LOG_DEBUG, "BULK: adding ID %s to array\n", dnet_dump_id(id));
707 if (!st)
708 return -1;
710 memcpy(&tmp.addr, &st->addr, sizeof(struct dnet_addr));
711 dnet_state_put(st);
713 dnet_log(n, DNET_LOG_DEBUG, "BULK: Searching state in states array\n");
714 state = bsearch(&tmp, bulk_array->states, bulk_array->num, sizeof(struct dnet_bulk_state), dnet_compare_bulk_state);
715 if (!state)
716 return -1;
718 if (!dnet_get_meta_update(n, mc, &mu))
719 return -ENOENT;
721 dnet_log(n, DNET_LOG_DEBUG, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state->addr), state->num);
722 //pthread_mutex_lock(&state->state_lock);
723 if (state->num >= DNET_BULK_IDS_SIZE || state->num < 0)
724 goto err_out_unlock;
726 bulk_id = &state->ids[state->num];
727 memset(bulk_id, 0, sizeof(struct dnet_bulk_id));
729 memcpy(&bulk_id->id, &id->id, DNET_ID_SIZE);
731 dnet_log(n, DNET_LOG_DEBUG, "BULK: ID: %s, last_update->tsec=%llu, last_update->tnsec=%llu, flags=%02llx\n",
732 dnet_dump_id_str(bulk_id->id.id), (unsigned long long)mu.tm.tsec, (unsigned long long)mu.tm.tnsec,
733 (unsigned long long)mu.flags);
735 dnet_convert_meta_update(&mu);
737 memcpy(&bulk_id->last_update, &mu, sizeof(struct dnet_meta_update));
739 state->num++;
741 dnet_log(n, DNET_LOG_DEBUG, "BULK: addr = %s state->num = %d\n", dnet_server_convert_dnet_addr(&state->addr), state->num);
742 if (state->num == DNET_BULK_IDS_SIZE) {
743 err = dnet_request_bulk_check(n, state, params);
744 state->num = 0;
745 if (err)
746 goto err_out_unlock;
749 //pthread_mutex_unlock(&state->state_lock);
751 return 0;
753 err_out_unlock:
754 //pthread_mutex_unlock(&state->state_lock);
755 return -2;
758 static int dnet_check_number_of_copies(struct dnet_node *n, struct dnet_meta_container *mc, int *groups, int group_num,
759 struct dnet_bulk_array *bulk_array, struct dnet_check_params *params)
761 struct dnet_id raw;
762 int group_id = mc->id.group_id;
763 int err = 0, i, error = 0;
765 for (i=0; i<group_num; ++i) {
766 if (groups[i] == group_id)
767 continue;
769 dnet_setup_id(&raw, groups[i], mc->id.id);
771 err = dnet_bulk_add_id(n, bulk_array, &raw, mc, params);
772 if (err)
773 dnet_log(n, DNET_LOG_ERROR, "BULK: after adding ID %s err = %d\n", dnet_dump_id(&raw), err);
775 if (!err)
776 error = 0;
777 else if (!error)
778 error = err;
781 return error;
784 static int dnet_check_copies(struct dnet_node *n, struct dnet_meta_container *mc,
785 struct dnet_bulk_array *bulk_array, struct dnet_check_params *params)
787 int err;
788 int *groups = NULL;
790 if (params->group_num) {
791 groups = params->groups;
792 err = params->group_num;
793 } else {
794 err = dnet_check_find_groups(n, mc, &groups);
795 if (err <= 0)
796 return -ENOENT;
799 err = dnet_check_number_of_copies(n, mc, groups, err, bulk_array, params);
801 if (!params->group_num)
802 free(groups);
804 return err;
807 static int dnet_merge_direct(struct dnet_node *n, struct dnet_meta_container *mc)
809 struct dnet_net_state *base;
810 int cflags = 0;
811 int err;
813 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_direct mc->size = %u\n", mc->size);
814 base = dnet_node_state(n);
815 if (!base) {
816 err = -ENOENT;
817 goto err_out_exit;
820 err = n->cb->send(base, n->cb->command_private, &mc->id);
821 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_direct after n->cb->send err = %d\n\n", err);
822 if (err < 0)
823 goto err_out_put;
825 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_direct2 mc->size = %u\n", mc->size);
826 err = dnet_write_metadata(n, mc, 0, cflags);
827 if (err <= 0)
828 goto err_out_put;
830 err = 0;
832 err_out_put:
833 dnet_state_put(base);
834 err_out_exit:
835 return err;
839 static int dnet_merge_upload(struct dnet_node *n, struct dnet_meta_container *mc)
841 struct dnet_net_state *base;
842 int err = 0;
844 base = dnet_node_state(n);
845 if (!base) {
846 err = -ENOENT;
847 goto err_out_exit;
850 err = n->cb->send(base, n->cb->command_private, &mc->id);
851 if (err)
852 goto err_out_put;
854 err = dnet_write_metadata(n, mc, 0);
855 if (err <= 0)
856 goto err_out_put;
858 err_out_put:
859 dnet_state_put(base);
860 err_out_exit:
861 return err;
865 static int dnet_merge_common(struct dnet_node *n, struct dnet_meta_container *remote_meta, struct dnet_meta_container *mc)
867 int err = 0;
868 struct dnet_meta_update local, remote;
869 uint64_t cflags = 0;
870 uint64_t ioflags = 0;
872 dnet_log(n, DNET_LOG_DEBUG, "in dnet_merge_common mc->size = %d\n", mc->size);
873 if (!dnet_get_meta_update(n, mc, &local)) {
874 err = -ENOENT;
875 dnet_log(n, DNET_LOG_ERROR, "%s: META_UPDATE not found in local meta\n", dnet_dump_id(&mc->id));
876 goto err_out_exit;
879 if (!dnet_get_meta_update(n, remote_meta, &remote)) {
880 err = -ENOENT;
881 dnet_log(n, DNET_LOG_ERROR, "%s: META_UPDATE not found in remote meta, perform direct merge\n", dnet_dump_id(&mc->id));
882 err = dnet_merge_direct(n, mc);
883 goto err_out_exit;
886 if ((local.tm.tsec > remote.tm.tsec) || (local.tm.tsec == remote.tm.tsec && local.tm.tnsec > remote.tm.tnsec)) {
887 if (local.flags & DNET_IO_FLAGS_REMOVED) {
888 err = dnet_remove_object_now(n, &mc->id, cflags, ioflags);
889 } else {
890 err = dnet_merge_direct(n, mc);
894 err_out_exit:
895 return err;
899 static int dnet_check_merge(struct dnet_node *n, struct dnet_meta_container *mc)
901 int err;
902 struct dnet_meta_container remote_mc;
904 dnet_log(n, DNET_LOG_DEBUG, "in dnet_check_merge mc->size = %d\n", mc->size);
905 memset(&remote_mc, 0, sizeof(struct dnet_meta_container));
907 err = dnet_read_meta(n, &remote_mc, NULL, 0, &mc->id);
908 if (err) {
909 if (err != -ENOENT) {
910 dnet_log_raw(n, DNET_LOG_ERROR, "%s: failed to download object to be merged from storage: %d.\n",
911 dnet_dump_id(&mc->id), err);
912 goto err_out_exit;
915 dnet_log_raw(n, DNET_LOG_INFO, "%s: there is no meta in the storage to merge with, "
916 "doing direct merge (plain upload).\n", dnet_dump_id(&mc->id));
917 err = dnet_merge_direct(n, mc);
918 } else {
920 err = dnet_merge_common(n, &remote_mc, mc);
923 if (err)
924 goto err_out_exit;
926 err_out_exit:
927 if (remote_mc.data)
928 free(remote_mc.data);
929 return err;
932 int dnet_check(struct dnet_node *n, struct dnet_meta_container *mc, struct dnet_bulk_array *bulk_array,
933 int need_merge, struct dnet_check_params *params)
935 int err = 0;
937 dnet_log(n, DNET_LOG_DEBUG, "need_merge = %d, mc.size = %d\n", need_merge, mc->size);
938 if (need_merge) {
939 err = dnet_check_merge(n, mc);
940 dnet_log(n, DNET_LOG_DEBUG, "err=%d\n", err);
941 if (!err)
942 dnet_merge_remove_local(n, &mc->id, 0);
943 } else {
944 err = dnet_check_copies(n, mc, bulk_array, params);
946 return err;