Elliptics version update: 2.19.2.6
[elliptics.git] / library / trans.c
blob 11c984942491cc75add75c293ca2f52baed4a8bc
1 /*
2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
3 * All rights reserved.
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
17 #include <sys/stat.h>
19 #include <assert.h>
20 #include <fcntl.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
25 #include "elliptics.h"
26 #include "elliptics/packet.h"
27 #include "elliptics/interface.h"
/*
 * Three-way comparison of two transaction ids.
 *
 * Returns 1 when @old is greater than @new, -1 when it is smaller and 0 when
 * both are equal. This is the ordering used by the per-state transaction
 * rb-tree (see dnet_trans_search() / dnet_trans_insert_nolock()).
 */
static inline int dnet_trans_cmp(uint64_t old, uint64_t new)
{
	return (old > new) - (old < new);
}
38 struct dnet_trans *dnet_trans_search(struct rb_root *root, uint64_t trans)
40 struct rb_node *n = root->rb_node;
41 struct dnet_trans *t = NULL;
42 int cmp = 1;
44 while (n) {
45 t = rb_entry(n, struct dnet_trans, trans_entry);
47 cmp = dnet_trans_cmp(t->trans, trans);
48 if (cmp < 0)
49 n = n->rb_left;
50 else if (cmp > 0)
51 n = n->rb_right;
52 else
53 return dnet_trans_get(t);
56 return NULL;
59 int dnet_trans_insert_nolock(struct rb_root *root, struct dnet_trans *a)
61 struct rb_node **n = &root->rb_node, *parent = NULL;
62 struct dnet_trans *t;
63 int cmp;
65 while (*n) {
66 parent = *n;
68 t = rb_entry(parent, struct dnet_trans, trans_entry);
70 cmp = dnet_trans_cmp(t->trans, a->trans);
71 if (cmp < 0)
72 n = &parent->rb_left;
73 else if (cmp > 0)
74 n = &parent->rb_right;
75 else
76 return -EEXIST;
79 if (a->st && a->st->n)
80 dnet_log(a->st->n, DNET_LOG_NOTICE, "%s: added transaction: %llu -> %s.\n",
81 dnet_dump_id(&a->cmd.id), (unsigned long long)a->trans,
82 dnet_server_convert_dnet_addr(&a->st->addr));
84 rb_link_node(&a->trans_entry, parent, n);
85 rb_insert_color(&a->trans_entry, root);
86 return 0;
89 void dnet_trans_remove_nolock(struct rb_root *root, struct dnet_trans *t)
91 if (!t->trans_entry.rb_parent_color) {
92 if (t->st && t->st->n)
93 dnet_log(t->st->n, DNET_LOG_ERROR, "%s: trying to remove standalone transaction %llu.\n",
94 dnet_dump_id(&t->cmd.id), (unsigned long long)t->trans);
95 return;
98 if (t) {
99 rb_erase(&t->trans_entry, root);
100 t->trans_entry.rb_parent_color = 0;
104 void dnet_trans_remove(struct dnet_trans *t)
106 struct dnet_net_state *st = t->st;
108 pthread_mutex_lock(&st->trans_lock);
109 dnet_trans_remove_nolock(&st->trans_root, t);
110 list_del_init(&t->trans_list_entry);
111 pthread_mutex_unlock(&st->trans_lock);
114 struct dnet_trans *dnet_trans_alloc(struct dnet_node *n __unused, uint64_t size)
116 struct dnet_trans *t;
118 t = malloc(sizeof(struct dnet_trans) + size);
119 if (!t)
120 goto err_out_exit;
122 memset(t, 0, sizeof(struct dnet_trans) + size);
124 atomic_init(&t->refcnt, 1);
125 INIT_LIST_HEAD(&t->trans_list_entry);
127 gettimeofday(&t->start, NULL);
129 return t;
131 err_out_exit:
132 return NULL;
135 void dnet_trans_destroy(struct dnet_trans *t)
137 struct dnet_net_state *st = NULL;
138 struct timeval tv;
139 long diff;
141 if (!t)
142 return;
144 gettimeofday(&tv, NULL);
145 diff = 1000000 * (tv.tv_sec - t->start.tv_sec) + (tv.tv_usec - t->start.tv_usec);
147 if (t->st && t->st->n) {
148 st = t->st;
150 pthread_mutex_lock(&st->trans_lock);
151 list_del_init(&t->trans_list_entry);
152 pthread_mutex_unlock(&st->trans_lock);
154 if (t->trans_entry.rb_parent_color)
155 dnet_trans_remove(t);
156 } else if (!list_empty(&t->trans_list_entry)) {
157 assert(0);
160 if (t->complete) {
161 t->cmd.flags |= DNET_FLAGS_DESTROY;
162 t->complete(t->st, &t->cmd, t->priv);
165 if (st && (t->cmd.status == 0) &&
166 ((t->command == DNET_CMD_READ) || (t->command == DNET_CMD_LOOKUP))) {
168 if (diff < st->median_read_time && st->weight < DNET_STATE_MAX_WEIGHT)
169 st->weight *= 1.1;
170 else if (diff > st->median_read_time && st->weight > 1)
171 st->weight *= 0.8;
173 st->median_read_time = (st->median_read_time + diff) / 2;
176 if (st && st->n && t->command != 0) {
177 char str[64];
178 struct tm tm;
180 localtime_r((time_t *)&t->start.tv_sec, &tm);
181 strftime(str, sizeof(str), "%F %R:%S", &tm);
183 dnet_log(st->n, DNET_LOG_INFO, "%s: destruction %s trans: %llu, reply: %d, st: %s, weight: %f, mrt: %ld, time: %ld, started: %s.%06lu, cached status: %d.\n",
184 dnet_dump_id(&t->cmd.id),
185 dnet_cmd_string(t->command),
186 (unsigned long long)(t->trans & ~DNET_TRANS_REPLY),
187 !!(t->trans & ~DNET_TRANS_REPLY),
188 dnet_state_dump_addr(t->st),
189 st->weight, st->median_read_time, diff,
190 str, t->start.tv_usec,
191 t->cmd.status);
195 dnet_state_put(t->st);
196 dnet_state_put(t->orig);
198 free(t);
201 int dnet_trans_alloc_send_state(struct dnet_net_state *st, struct dnet_trans_control *ctl)
203 struct dnet_io_req req;
204 struct dnet_node *n = st->n;
205 struct dnet_cmd *cmd;
206 struct dnet_trans *t;
207 int err;
209 t = dnet_trans_alloc(n, sizeof(struct dnet_cmd) + ctl->size);
210 if (!t) {
211 err = -ENOMEM;
212 if (ctl->complete)
213 ctl->complete(NULL, NULL, ctl->priv);
214 goto err_out_exit;
217 t->complete = ctl->complete;
218 t->priv = ctl->priv;
220 cmd = (struct dnet_cmd *)(t + 1);
222 memcpy(&cmd->id, &ctl->id, sizeof(struct dnet_id));
223 cmd->flags = ctl->cflags;
224 cmd->size = ctl->size;
226 memcpy(&t->cmd, cmd, sizeof(struct dnet_cmd));
228 cmd->cmd = t->command = ctl->cmd;
230 if (ctl->size && ctl->data)
231 memcpy(cmd + 1, ctl->data, ctl->size);
233 cmd->trans = t->rcv_trans = t->trans = atomic_inc(&n->trans);
235 dnet_convert_cmd(cmd);
237 t->st = dnet_state_get(st);
239 memset(&req, 0, sizeof(req));
240 req.st = st;
241 req.header = cmd;
242 req.hsize = sizeof(struct dnet_cmd) + ctl->size;
244 dnet_log(n, DNET_LOG_INFO, "%s: alloc/send %s trans: %llu -> %s %f.\n",
245 dnet_dump_id(&cmd->id),
246 dnet_cmd_string(ctl->cmd),
247 (unsigned long long)t->trans,
248 dnet_server_convert_dnet_addr(&t->st->addr), t->st->weight);
250 err = dnet_trans_send(t, &req);
251 if (err)
252 goto err_out_put;
254 return 0;
256 err_out_put:
257 dnet_trans_put(t);
258 err_out_exit:
259 return err;
262 int dnet_trans_alloc_send(struct dnet_session *s, struct dnet_trans_control *ctl)
264 struct dnet_node *n = s->node;
265 struct dnet_net_state *st;
266 int err;
268 st = dnet_state_get_first(n, &ctl->id);
269 if (!st) {
270 err = -ENOENT;
271 if (ctl->complete)
272 ctl->complete(NULL, NULL, ctl->priv);
273 goto err_out_exit;
276 err = dnet_trans_alloc_send_state(st, ctl);
277 dnet_state_put(st);
279 err_out_exit:
280 return err;
283 static void dnet_trans_check_stall(struct dnet_net_state *st)
285 struct dnet_trans *t;
286 struct timeval tv;
287 int trans_timeout = 0;
289 gettimeofday(&tv, NULL);
291 pthread_mutex_lock(&st->trans_lock);
292 list_for_each_entry(t, &st->trans_list, trans_list_entry) {
293 if (t->time.tv_sec >= tv.tv_sec)
294 break;
296 dnet_log(st->n, DNET_LOG_ERROR, "%s: trans: %llu TIMEOUT\n", dnet_state_dump_addr(st), (unsigned long long)t->trans);
297 trans_timeout++;
299 pthread_mutex_unlock(&st->trans_lock);
301 if (trans_timeout) {
302 st->stall++;
304 if (st->weight >= 2)
305 st->weight /= 2;
307 dnet_log(st->n, DNET_LOG_ERROR, "%s: TIMEOUT: transactions: %d, stall counter: %d, weight: %f\n",
308 dnet_state_dump_addr(st), trans_timeout, st->stall, st->weight);
309 if (st->stall >= st->n->stall_count) {
310 shutdown(st->read_s, 2);
311 shutdown(st->write_s, 2);
313 dnet_state_remove_nolock(st);
314 } else {
315 dnet_schedule_recv(st);
316 dnet_schedule_send(st);
318 } else {
319 st->stall = 0;
321 if (st->weight < DNET_STATE_MAX_WEIGHT)
322 st->weight *= 1.2;
324 if (st->stall) {
325 dnet_log(st->n, DNET_LOG_INFO, "%s: reseting state stall counter: weight: %f\n",
326 dnet_state_dump_addr(st), st->weight);
331 static void dnet_check_all_states(struct dnet_node *n)
333 struct dnet_net_state *st, *tmp;
334 struct dnet_group *g, *gtmp;
336 pthread_mutex_lock(&n->state_lock);
337 list_for_each_entry_safe(g, gtmp, &n->group_list, group_entry) {
338 list_for_each_entry_safe(st, tmp, &g->state_list, state_entry) {
339 dnet_trans_check_stall(st);
342 pthread_mutex_unlock(&n->state_lock);
345 static int dnet_check_route_table(struct dnet_node *n)
347 int rnd = rand();
348 struct dnet_id id;
349 int groups[128];
350 int group_num = 0, i;
351 struct dnet_net_state *st;
352 struct dnet_group *g;
354 pthread_mutex_lock(&n->state_lock);
355 list_for_each_entry(g, &n->group_list, group_entry) {
356 groups[group_num++] = g->group_id;
358 if (group_num > (int)ARRAY_SIZE(groups))
359 break;
361 pthread_mutex_unlock(&n->state_lock);
363 for (i = 0; i < group_num; ++i) {
364 id.group_id = groups[i];
365 memcpy(id.id, &rnd, sizeof(rnd));
367 st = dnet_state_get_first(n, &id);
368 if (st) {
369 dnet_recv_route_list(st);
370 dnet_state_put(st);
374 return 0;
377 static void *dnet_check_process(void *data)
379 struct dnet_node *n = data;
380 long i, timeout, wait_for_stall;
381 struct timeval tv1, tv2;
382 int checks = 0, route_table_checks = 3;
384 dnet_set_name("check");
386 if (!n->check_timeout)
387 n->check_timeout = 10;
389 dnet_log(n, DNET_LOG_INFO, "Started checking thread. Timeout: %lu seconds.\n",
390 n->check_timeout);
392 while (!n->need_exit) {
393 gettimeofday(&tv1, NULL);
394 dnet_try_reconnect(n);
395 if (++checks == route_table_checks) {
396 checks = 0;
397 dnet_check_route_table(n);
400 dnet_discovery(n);
401 gettimeofday(&tv2, NULL);
403 timeout = n->check_timeout - (tv2.tv_sec - tv1.tv_sec);
404 wait_for_stall = n->wait_ts.tv_sec;
406 for (i=0; i<timeout; ++i) {
407 if (n->need_exit)
408 break;
410 if (--wait_for_stall == 0) {
411 wait_for_stall = n->wait_ts.tv_sec;
412 dnet_check_all_states(n);
414 sleep(1);
418 return NULL;
421 int dnet_check_thread_start(struct dnet_node *n)
423 int err;
425 err = pthread_create(&n->check_tid, NULL, dnet_check_process, n);
426 if (err) {
427 dnet_log(n, DNET_LOG_ERROR, "Failed to start tree checking thread: err: %d.\n",
428 err);
429 return -err;
432 return 0;
435 void dnet_check_thread_stop(struct dnet_node *n)
437 pthread_join(n->check_tid, NULL);
438 dnet_log(n, DNET_LOG_NOTICE, "Checking thread stopped.\n");