2 * 2008+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <sys/types.h>
25 #include "elliptics.h"
26 #include "elliptics/packet.h"
27 #include "elliptics/interface.h"
/*
 * dnet_trans_cmp - compare two 64-bit transaction numbers.
 *
 * Takes the number already stored in the tree (old) and the number being
 * looked up or inserted (new) and returns an int; the callers in this file
 * store the result in `cmp`.
 *
 * NOTE(review): only the signature is present in this listing, the body is
 * missing. The sign convention of the result must be confirmed against the
 * complete source before relying on it.
 */
29 static inline int dnet_trans_cmp(uint64_t old
, uint64_t new)
/*
 * dnet_trans_search - look up a transaction by number in an rb-tree.
 *
 * @root:  tree of struct dnet_trans linked through their trans_entry member
 * @trans: transaction number to find
 *
 * The walk starts at root->rb_node. Each visited node is converted to its
 * enclosing struct dnet_trans with rb_entry() and its ->trans is compared
 * against @trans via dnet_trans_cmp(). The value handed back to the caller
 * is whatever dnet_trans_get(t) returns.
 *
 * NOTE(review): the loop header and the left/right descent branches are not
 * part of this listing, so the not-found path (t is initialised to NULL)
 * and the behaviour of dnet_trans_get() on it cannot be confirmed here.
 * No locking is visible in this function - presumably the caller
 * serialises access to the tree; verify.
 */
38 struct dnet_trans
*dnet_trans_search(struct rb_root
*root
, uint64_t trans
)
40 struct rb_node
*n
= root
->rb_node
;
41 struct dnet_trans
*t
= NULL
;
45 t
= rb_entry(n
, struct dnet_trans
, trans_entry
);
47 cmp
= dnet_trans_cmp(t
->trans
, trans
);
53 return dnet_trans_get(t
);
/*
 * dnet_trans_insert_nolock - link transaction @a into the rb-tree @root.
 *
 * @root: tree keyed by dnet_trans->trans
 * @a:    transaction to insert; a->trans is the key
 *
 * The function keeps a pointer to the link being examined (n) and the node
 * owning that link (parent). Existing nodes are compared with a->trans via
 * dnet_trans_cmp() to choose a side. Once the slot is found the new node is
 * attached with rb_link_node() and the tree is rebalanced with
 * rb_insert_color().
 *
 * NOTE(review): the loop header, the left-hand branch, the handling of an
 * equal key and the return statements are missing from this listing, so
 * the returned values cannot be documented from here. The _nolock suffix
 * suggests the caller must already hold the lock protecting @root -
 * confirm against callers.
 */
59 int dnet_trans_insert_nolock(struct rb_root
*root
, struct dnet_trans
*a
)
61 struct rb_node
**n
= &root
->rb_node
, *parent
= NULL
;
68 t
= rb_entry(parent
, struct dnet_trans
, trans_entry
);
70 cmp
= dnet_trans_cmp(t
->trans
, a
->trans
);
74 n
= &parent
->rb_right
;
/* Logging needs a node to log through, so it is skipped for a detached transaction. */
79 if (a
->st
&& a
->st
->n
)
80 dnet_log(a
->st
->n
, DNET_LOG_NOTICE
, "%s: added transaction: %llu -> %s.\n",
81 dnet_dump_id(&a
->cmd
.id
), (unsigned long long)a
->trans
,
82 dnet_server_convert_dnet_addr(&a
->st
->addr
));
84 rb_link_node(&a
->trans_entry
, parent
, n
);
85 rb_insert_color(&a
->trans_entry
, root
);
/*
 * dnet_trans_remove_nolock - unlink transaction @t from the rb-tree @root.
 *
 * @root: tree the transaction is expected to be linked into
 * @t:    transaction to remove
 *
 * A zero t->trans_entry.rb_parent_color is used as the marker for "not
 * linked into any tree" ("standalone" in the log message). In that case an
 * error is logged, provided t->st and t->st->n are available to log through.
 * For a linked transaction the node is erased and rb_parent_color is reset
 * to 0, so that a later removal attempt sees it as standalone.
 *
 * NOTE(review): the end of the standalone branch is not in this listing;
 * presumably it returns before rb_erase() - confirm. The _nolock suffix
 * suggests the caller holds the lock protecting @root (dnet_trans_remove()
 * in this file takes st->trans_lock around the call).
 */
89 void dnet_trans_remove_nolock(struct rb_root
*root
, struct dnet_trans
*t
)
91 if (!t
->trans_entry
.rb_parent_color
) {
92 if (t
->st
&& t
->st
->n
)
93 dnet_log(t
->st
->n
, DNET_LOG_ERROR
, "%s: trying to remove standalone transaction %llu.\n",
94 dnet_dump_id(&t
->cmd
.id
), (unsigned long long)t
->trans
);
99 rb_erase(&t
->trans_entry
, root
);
/* Mark the node as unlinked for the standalone check above. */
100 t
->trans_entry
.rb_parent_color
= 0;
/*
 * dnet_trans_remove - detach transaction @t from its network state.
 *
 * With t->st->trans_lock held, the transaction is removed from the state's
 * rb-tree (st->trans_root) through dnet_trans_remove_nolock() and unlinked
 * from the list it sits on via list_del_init() on trans_list_entry.
 *
 * t->st is dereferenced without a check, so the caller must guarantee that
 * the transaction is attached to a state.
 */
104 void dnet_trans_remove(struct dnet_trans
*t
)
106 struct dnet_net_state
*st
= t
->st
;
108 pthread_mutex_lock(&st
->trans_lock
);
109 dnet_trans_remove_nolock(&st
->trans_root
, t
);
110 list_del_init(&t
->trans_list_entry
);
111 pthread_mutex_unlock(&st
->trans_lock
);
/*
 * dnet_trans_alloc - allocate and initialise a transaction.
 *
 * @n:    node; marked __unused, not referenced by the visible code
 * @size: number of extra bytes to allocate directly after the structure
 *
 * The structure and its trailing @size bytes come from one malloc() and are
 * zero-filled. The reference counter starts at 1, trans_list_entry is
 * initialised as an empty list head and the creation time is stored in
 * t->start.
 *
 * NOTE(review): the allocation-failure check and the return statements are
 * not part of this listing (something sits between the malloc() and the
 * memset(), presumably the NULL check) - confirm. The sum
 * sizeof(struct dnet_trans) + size is not visibly guarded against
 * wrap-around for very large @size.
 */
114 struct dnet_trans
*dnet_trans_alloc(struct dnet_node
*n __unused
, uint64_t size
)
116 struct dnet_trans
*t
;
118 t
= malloc(sizeof(struct dnet_trans
) + size
);
122 memset(t
, 0, sizeof(struct dnet_trans
) + size
);
124 atomic_init(&t
->refcnt
, 1);
125 INIT_LIST_HEAD(&t
->trans_list_entry
);
127 gettimeofday(&t
->start
, NULL
);
/*
 * dnet_trans_destroy - tear down transaction @t.
 *
 * Steps visible in this listing, in order:
 *  - compute the transaction lifetime in microseconds (diff) from t->start;
 *  - when the transaction has a state with a node: unlink it from the
 *    state's list under st->trans_lock and, if it is still linked into the
 *    rb-tree (non-zero rb_parent_color), remove it via dnet_trans_remove();
 *  - set DNET_FLAGS_DESTROY in t->cmd.flags and invoke the completion
 *    callback t->complete(t->st, &t->cmd, t->priv);
 *  - for READ and LOOKUP commands with zero status, compare diff with
 *    st->median_read_time (the weight adjustments themselves are not
 *    shown) and fold diff into the running median_read_time;
 *  - log the destruction at INFO level;
 *  - drop the references held on t->st and t->orig.
 *
 * NOTE(review): many lines of this function are missing from the listing
 * (local declarations, the assignment of st, the body of the else-if
 * branch, the guard around t->complete, the weight updates, the final
 * free). The summary above covers only what is visible.
 */
135 void dnet_trans_destroy(struct dnet_trans
*t
)
137 struct dnet_net_state
*st
= NULL
;
144 gettimeofday(&tv
, NULL
);
/* Lifetime of the transaction in microseconds. */
145 diff
= 1000000 * (tv
.tv_sec
- t
->start
.tv_sec
) + (tv
.tv_usec
- t
->start
.tv_usec
);
147 if (t
->st
&& t
->st
->n
) {
/*
 * NOTE(review): st is initialised to NULL above and dereferenced here, so
 * it must be assigned from t->st on a line missing from this listing.
 */
150 pthread_mutex_lock(&st
->trans_lock
);
151 list_del_init(&t
->trans_list_entry
);
152 pthread_mutex_unlock(&st
->trans_lock
);
/*
 * dnet_trans_remove() takes st->trans_lock itself, which is why it is
 * called only after the unlock above.
 */
154 if (t
->trans_entry
.rb_parent_color
)
155 dnet_trans_remove(t
);
156 } else if (!list_empty(&t
->trans_list_entry
)) {
/* Tell the completion callback that this is the final invocation. */
161 t
->cmd
.flags
|= DNET_FLAGS_DESTROY
;
162 t
->complete(t
->st
, &t
->cmd
, t
->priv
);
/* Only successful READ/LOOKUP transactions feed the state's timing statistics. */
165 if (st
&& (t
->cmd
.status
== 0) &&
166 ((t
->command
== DNET_CMD_READ
) || (t
->command
== DNET_CMD_LOOKUP
))) {
168 if (diff
< st
->median_read_time
&& st
->weight
< DNET_STATE_MAX_WEIGHT
)
170 else if (diff
> st
->median_read_time
&& st
->weight
> 1)
/* Running average of the previous value and the latest sample. */
173 st
->median_read_time
= (st
->median_read_time
+ diff
) / 2;
176 if (st
&& st
->n
&& t
->command
!= 0) {
/*
 * NOTE(review): the (time_t *) cast hides any size mismatch between
 * tv_sec and time_t; copying tv_sec into a local time_t would be safer.
 */
180 localtime_r((time_t *)&t
->start
.tv_sec
, &tm
);
181 strftime(str
, sizeof(str
), "%F %R:%S", &tm
);
183 dnet_log(st
->n
, DNET_LOG_INFO
, "%s: destruction %s trans: %llu, reply: %d, st: %s, weight: %f, mrt: %ld, time: %ld, started: %s.%06lu, cached status: %d.\n",
184 dnet_dump_id(&t
->cmd
.id
),
185 dnet_cmd_string(t
->command
),
186 (unsigned long long)(t
->trans
& ~DNET_TRANS_REPLY
),
/*
 * NOTE(review): this argument feeds "reply: %d" but masks with
 * ~DNET_TRANS_REPLY, the same mask used just above to print the
 * transaction number. Assuming DNET_TRANS_REPLY is the reply-flag bit,
 * the value is 1 for any non-zero transaction number regardless of the
 * flag; `t->trans & DNET_TRANS_REPLY` looks like the intent - confirm.
 */
187 !!(t
->trans
& ~DNET_TRANS_REPLY
),
188 dnet_state_dump_addr(t
->st
),
189 st
->weight
, st
->median_read_time
, diff
,
190 str
, t
->start
.tv_usec
,
/* Release the state references held by the transaction. */
195 dnet_state_put(t
->st
);
196 dnet_state_put(t
->orig
);
/*
 * dnet_trans_alloc_send_state - build a transaction from @ctl and send it
 * to the given state.
 *
 * @st:  network state the command is sent to; a reference is taken with
 *       dnet_state_get() and stored in t->st
 * @ctl: description of the command: id, command code, cflags, payload
 *       (data/size) and completion callback with its private pointer
 *
 * The transaction is allocated with room for a struct dnet_cmd followed by
 * ctl->size payload bytes; the wire command lives directly behind the
 * transaction structure (t + 1). A fresh transaction number comes from
 * atomic_inc(&n->trans). The request header size is set to the command
 * plus payload and the whole thing is handed to dnet_trans_send().
 *
 * ctl->complete(NULL, NULL, ctl->priv) is invoked on a path whose condition
 * is not shown - presumably allocation failure; confirm.
 *
 * NOTE(review): error handling after dnet_trans_send(), the remaining
 * request fields and the return statements are missing from this listing.
 */
201 int dnet_trans_alloc_send_state(struct dnet_net_state
*st
, struct dnet_trans_control
*ctl
)
203 struct dnet_io_req req
;
204 struct dnet_node
*n
= st
->n
;
205 struct dnet_cmd
*cmd
;
206 struct dnet_trans
*t
;
209 t
= dnet_trans_alloc(n
, sizeof(struct dnet_cmd
) + ctl
->size
);
213 ctl
->complete(NULL
, NULL
, ctl
->priv
);
217 t
->complete
= ctl
->complete
;
/* The wire command is stored in the extra space right after the transaction. */
220 cmd
= (struct dnet_cmd
*)(t
+ 1);
222 memcpy(&cmd
->id
, &ctl
->id
, sizeof(struct dnet_id
));
223 cmd
->flags
= ctl
->cflags
;
224 cmd
->size
= ctl
->size
;
/*
 * NOTE(review): t->cmd is snapshotted here, before cmd->cmd and cmd->trans
 * are assigned below and before dnet_convert_cmd(). The copy therefore
 * carries id/flags/size only. Presumably intentional - confirm that
 * nothing reads t->cmd.cmd or t->cmd.trans.
 */
226 memcpy(&t
->cmd
, cmd
, sizeof(struct dnet_cmd
));
228 cmd
->cmd
= t
->command
= ctl
->cmd
;
230 if (ctl
->size
&& ctl
->data
)
231 memcpy(cmd
+ 1, ctl
->data
, ctl
->size
);
233 cmd
->trans
= t
->rcv_trans
= t
->trans
= atomic_inc(&n
->trans
);
235 dnet_convert_cmd(cmd
);
237 t
->st
= dnet_state_get(st
);
239 memset(&req
, 0, sizeof(req
));
242 req
.hsize
= sizeof(struct dnet_cmd
) + ctl
->size
;
244 dnet_log(n
, DNET_LOG_INFO
, "%s: alloc/send %s trans: %llu -> %s %f.\n",
245 dnet_dump_id(&cmd
->id
),
246 dnet_cmd_string(ctl
->cmd
),
247 (unsigned long long)t
->trans
,
248 dnet_server_convert_dnet_addr(&t
->st
->addr
), t
->st
->weight
);
250 err
= dnet_trans_send(t
, &req
);
/*
 * dnet_trans_alloc_send - pick a state for ctl->id and send the command.
 *
 * @s:   session; only s->node is used by the visible code
 * @ctl: command description, forwarded to dnet_trans_alloc_send_state()
 *
 * The target state is obtained with dnet_state_get_first(n, &ctl->id) and
 * the actual work is delegated to dnet_trans_alloc_send_state().
 * ctl->complete(NULL, NULL, ctl->priv) is invoked on a path whose condition
 * is not shown - presumably when no state was found; confirm.
 *
 * NOTE(review): the state lookup failure check, any release of the state
 * reference after sending, and the return statements are missing from this
 * listing.
 */
262 int dnet_trans_alloc_send(struct dnet_session
*s
, struct dnet_trans_control
*ctl
)
264 struct dnet_node
*n
= s
->node
;
265 struct dnet_net_state
*st
;
268 st
= dnet_state_get_first(n
, &ctl
->id
);
272 ctl
->complete(NULL
, NULL
, ctl
->priv
);
276 err
= dnet_trans_alloc_send_state(st
, ctl
);
/*
 * dnet_trans_check_stall - look for timed-out transactions on state @st.
 *
 * Under st->trans_lock the state's trans_list is walked and each
 * transaction's t->time.tv_sec is compared with the current time; timed-out
 * transactions are reported with a TIMEOUT error message. After the lock is
 * released a summary is logged, and once st->stall has reached
 * st->n->stall_count both sockets are shut down, the state is removed with
 * dnet_state_remove_nolock() and receive/send processing is scheduled. A
 * separate path logs that the stall counter is being reset.
 *
 * dnet_state_remove_nolock() is called without taking a lock here; the
 * caller in this file (dnet_check_all_states) holds n->state_lock.
 *
 * NOTE(review): the statements controlled by several conditions (what
 * happens for non-expired entries, how trans_timeout and st->stall are
 * updated, the weight adjustment) are missing from this listing.
 */
283 static void dnet_trans_check_stall(struct dnet_net_state
*st
)
285 struct dnet_trans
*t
;
287 int trans_timeout
= 0;
289 gettimeofday(&tv
, NULL
);
291 pthread_mutex_lock(&st
->trans_lock
);
292 list_for_each_entry(t
, &st
->trans_list
, trans_list_entry
) {
/* Deadline at or after the current second: not expired yet. */
293 if (t
->time
.tv_sec
>= tv
.tv_sec
)
296 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: trans: %llu TIMEOUT\n", dnet_state_dump_addr(st
), (unsigned long long)t
->trans
);
299 pthread_mutex_unlock(&st
->trans_lock
);
307 dnet_log(st
->n
, DNET_LOG_ERROR
, "%s: TIMEOUT: transactions: %d, stall counter: %d, weight: %f\n",
308 dnet_state_dump_addr(st
), trans_timeout
, st
->stall
, st
->weight
);
309 if (st
->stall
>= st
->n
->stall_count
) {
/* The literal 2 is the value of SHUT_RDWR on Linux: close both directions. */
310 shutdown(st
->read_s
, 2);
311 shutdown(st
->write_s
, 2);
313 dnet_state_remove_nolock(st
);
315 dnet_schedule_recv(st
);
316 dnet_schedule_send(st
);
321 if (st
->weight
< DNET_STATE_MAX_WEIGHT
)
325 dnet_log(st
->n
, DNET_LOG_INFO
, "%s: reseting state stall counter: weight: %f\n",
326 dnet_state_dump_addr(st
), st
->weight
);
/*
 * dnet_check_all_states - run the stall check on every known state.
 *
 * With n->state_lock held, every group in n->group_list and every state in
 * each group's state_list is passed to dnet_trans_check_stall().
 *
 * The _safe list iterators are used because dnet_trans_check_stall() may
 * call dnet_state_remove_nolock() on the state being visited.
 */
331 static void dnet_check_all_states(struct dnet_node
*n
)
333 struct dnet_net_state
*st
, *tmp
;
334 struct dnet_group
*g
, *gtmp
;
336 pthread_mutex_lock(&n
->state_lock
);
337 list_for_each_entry_safe(g
, gtmp
, &n
->group_list
, group_entry
) {
338 list_for_each_entry_safe(st
, tmp
, &g
->state_list
, state_entry
) {
339 dnet_trans_check_stall(st
);
342 pthread_mutex_unlock(&n
->state_lock
);
/*
 * dnet_check_route_table - request a route list from one state per group.
 *
 * Phase 1, under n->state_lock: the ids of all groups in n->group_list are
 * copied into the local groups[] array.
 * Phase 2, lock released: for every collected group an id is built from
 * that group_id and the bytes of `rnd`, the matching state is looked up
 * with dnet_state_get_first() and asked for its route list via
 * dnet_recv_route_list().
 *
 * NOTE(review): the declarations of groups, id and rnd, the handling of a
 * failed state lookup, any release of the state reference and the return
 * value are missing from this listing.
 */
345 static int dnet_check_route_table(struct dnet_node
*n
)
350 int group_num
= 0, i
;
351 struct dnet_net_state
*st
;
352 struct dnet_group
*g
;
354 pthread_mutex_lock(&n
->state_lock
);
355 list_for_each_entry(g
, &n
->group_list
, group_entry
) {
356 groups
[group_num
++] = g
->group_id
;
/*
 * NOTE(review): this test runs after the store above and uses `>`.
 * Assuming the missing statement is a loop exit, group_num ==
 * ARRAY_SIZE(groups) does not stop the loop, so the next iteration stores
 * to groups[ARRAY_SIZE(groups)], one element past the end. `>=` looks like
 * the intent - confirm against the full source.
 */
358 if (group_num
> (int)ARRAY_SIZE(groups
))
361 pthread_mutex_unlock(&n
->state_lock
);
363 for (i
= 0; i
< group_num
; ++i
) {
364 id
.group_id
= groups
[i
];
365 memcpy(id
.id
, &rnd
, sizeof(rnd
));
367 st
= dnet_state_get_first(n
, &id
);
369 dnet_recv_route_list(st
);
/*
 * dnet_check_process - body of the periodic checking thread.
 *
 * @data: the struct dnet_node the thread works for
 *
 * The thread names itself "check", defaults n->check_timeout to 10 when it
 * is zero and then loops until n->need_exit is set. Each pass:
 *  - calls dnet_try_reconnect();
 *  - runs dnet_check_route_table() when the `checks` counter reaches
 *    route_table_checks (3);
 *  - computes how much of check_timeout is left after that work and runs
 *    an inner loop that many times, counting wait_for_stall down and
 *    calling dnet_check_all_states() whenever it reaches zero.
 *
 * NOTE(review): the reset of `checks`, the per-iteration wait inside the
 * inner loop (presumably a one-second sleep) and the thread's return value
 * are missing from this listing.
 */
377 static void *dnet_check_process(void *data
)
379 struct dnet_node
*n
= data
;
380 long i
, timeout
, wait_for_stall
;
381 struct timeval tv1
, tv2
;
382 int checks
= 0, route_table_checks
= 3;
384 dnet_set_name("check");
/* A zero timeout means "not configured": fall back to 10. */
386 if (!n
->check_timeout
)
387 n
->check_timeout
= 10;
389 dnet_log(n
, DNET_LOG_INFO
, "Started checking thread. Timeout: %lu seconds.\n",
392 while (!n
->need_exit
) {
393 gettimeofday(&tv1
, NULL
);
394 dnet_try_reconnect(n
);
395 if (++checks
== route_table_checks
) {
397 dnet_check_route_table(n
);
401 gettimeofday(&tv2
, NULL
);
/* Remaining part of the check period after the work above. */
403 timeout
= n
->check_timeout
- (tv2
.tv_sec
- tv1
.tv_sec
);
404 wait_for_stall
= n
->wait_ts
.tv_sec
;
406 for (i
=0; i
<timeout
; ++i
) {
/*
 * NOTE(review): the counter is pre-decremented and compared with 0. If
 * n->wait_ts.tv_sec is 0 or negative it never equals 0, and if it is
 * larger than `timeout` it is re-armed by the outer loop before reaching
 * 0. As far as the visible lines show, dnet_check_all_states() is never
 * called in either case - confirm that wait_ts is validated elsewhere.
 */
410 if (--wait_for_stall
== 0) {
411 wait_for_stall
= n
->wait_ts
.tv_sec
;
412 dnet_check_all_states(n
);
/*
 * dnet_check_thread_start - start the checking thread for node @n.
 *
 * Creates a thread with default attributes running dnet_check_process(n)
 * and stores its id in n->check_tid. A failure is reported through
 * dnet_log() at error level.
 *
 * NOTE(review): the failure test and the return statements are missing
 * from this listing. pthread_create() returns a positive error number, so
 * whether the value is negated before being returned must be confirmed
 * against the full source.
 */
421 int dnet_check_thread_start(struct dnet_node
*n
)
425 err
= pthread_create(&n
->check_tid
, NULL
, dnet_check_process
, n
);
427 dnet_log(n
, DNET_LOG_ERROR
, "Failed to start tree checking thread: err: %d.\n",
/*
 * dnet_check_thread_stop - wait for the checking thread of node @n to end.
 *
 * Joins n->check_tid and logs a notice once the thread has finished.
 *
 * The visible code does not set n->need_exit, which is the flag
 * dnet_check_process() polls; presumably the caller sets it before calling
 * this function, otherwise the join would block - confirm.
 */
435 void dnet_check_thread_stop(struct dnet_node
*n
)
437 pthread_join(n
->check_tid
, NULL
);
438 dnet_log(n
, DNET_LOG_NOTICE
, "Checking thread stopped.\n");