client_login_timeout: check wait_for_welcome
[pgbouncer.git] / src / janitor.c
blobae0a6a888774afc7347a3400a530c4c773b0a9a9
1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Periodic maintenance.
23 #include "bouncer.h"
25 /* do full maintenance 3x per second */
26 static struct timeval full_maint_period = {0, USEC / 3};
27 static struct event full_maint_ev;
29 /* close all sockets in server list */
30 static void close_server_list(StatList *sk_list, const char *reason)
32 List *item, *tmp;
33 PgSocket *server;
35 statlist_for_each_safe(item, sk_list, tmp) {
36 server = container_of(item, PgSocket, head);
37 disconnect_server(server, true, "%s", reason);
41 static void close_client_list(StatList *sk_list, const char *reason)
43 List *item, *tmp;
44 PgSocket *client;
46 statlist_for_each_safe(item, sk_list, tmp) {
47 client = container_of(item, PgSocket, head);
48 disconnect_client(client, true, "%s", reason);
52 bool suspend_socket(PgSocket *sk, bool force_suspend)
54 if (sk->suspended)
55 return true;
57 if (sbuf_is_empty(&sk->sbuf)) {
58 if (sbuf_pause(&sk->sbuf))
59 sk->suspended = 1;
62 if (sk->suspended || !force_suspend)
63 return sk->suspended;
65 if (is_server_socket(sk))
66 disconnect_server(sk, true, "suspend_timeout");
67 else
68 disconnect_client(sk, true, "suspend_timeout");
69 return true;
72 /* suspend all sockets in socket list */
73 static int suspend_socket_list(StatList *list, bool force_suspend)
75 List *item, *tmp;
76 PgSocket *sk;
77 int active = 0;
79 statlist_for_each_safe(item, list, tmp) {
80 sk = container_of(item, PgSocket, head);
81 if (!suspend_socket(sk, force_suspend))
82 active++;
84 return active;
87 /* resume all suspended sockets in socket list */
88 static void resume_socket_list(StatList *list)
90 List *item, *tmp;
91 PgSocket *sk;
93 statlist_for_each_safe(item, list, tmp) {
94 sk = container_of(item, PgSocket, head);
95 if (sk->suspended) {
96 sk->suspended = 0;
97 sbuf_continue(&sk->sbuf);
102 /* resume all suspended sockets in all pools */
103 static void resume_sockets(void)
105 List *item;
106 PgPool *pool;
108 statlist_for_each(item, &pool_list) {
109 pool = container_of(item, PgPool, head);
110 if (pool->db->admin)
111 continue;
112 resume_socket_list(&pool->active_client_list);
113 resume_socket_list(&pool->active_server_list);
114 resume_socket_list(&pool->idle_server_list);
115 resume_socket_list(&pool->used_server_list);
/*
 * Undo a suspend: first resume the pooled sockets, then call
 * resume_pooler() (per the original comment, the listen sockets).
 */
void resume_all(void)
{
	resume_sockets();
	resume_pooler();
}
127 * send test/reset query to server if needed
129 static void launch_recheck(PgPool *pool)
131 const char *q = cf_server_check_query;
132 bool need_check = true;
133 PgSocket *server;
134 bool res = true;
136 /* find clean server */
137 while (1) {
138 server = first_socket(&pool->used_server_list);
139 if (!server)
140 return;
141 if (server->ready)
142 break;
143 disconnect_server(server, true, "idle server got dirty");
146 /* is the check needed? */
147 if (q == NULL || q[0] == 0)
148 need_check = false;
149 else if (cf_server_check_delay > 0) {
150 usec_t now = get_cached_time();
151 if (now - server->request_time < cf_server_check_delay)
152 need_check = false;
155 if (need_check) {
156 /* send test query, wait for result */
157 slog_debug(server, "P: Checking: %s", q);
158 change_server_state(server, SV_TESTED);
159 SEND_generic(res, server, 'Q', "s", q);
160 if (!res)
161 disconnect_server(server, false, "test query failed");
162 } else
163 /* make immediately available */
164 release_server(server);
168 * make servers available
170 static void per_loop_activate(PgPool *pool)
172 List *item, *tmp;
173 PgSocket *client;
175 /* see if any server have been freed */
176 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
177 client = container_of(item, PgSocket, head);
178 if (!statlist_empty(&pool->idle_server_list)) {
180 /* db not fully initialized after reboot */
181 if (client->wait_for_welcome && !pool->welcome_msg_ready) {
182 launch_new_connection(pool);
183 continue;
186 /* there is a ready server already */
187 activate_client(client);
188 } else if (!statlist_empty(&pool->tested_server_list)) {
189 /* some connections are in testing process */
190 break;
191 } else if (!statlist_empty(&pool->used_server_list)) {
192 /* ask for more connections to be tested */
193 launch_recheck(pool);
194 break;
195 } else {
196 /* not enough connections */
197 launch_new_connection(pool);
198 break;
204 * pause active clients
206 static int per_loop_pause(PgPool *pool)
208 int active = 0;
210 if (pool->db->admin)
211 return 0;
213 close_server_list(&pool->idle_server_list, "pause mode");
214 close_server_list(&pool->used_server_list, "pause mode");
215 close_server_list(&pool->new_server_list, "pause mode");
217 active += statlist_count(&pool->active_server_list);
218 active += statlist_count(&pool->tested_server_list);
220 return active;
224 * suspend active clients and servers
226 static int per_loop_suspend(PgPool *pool, bool force_suspend)
228 int active = 0;
230 if (pool->db->admin)
231 return 0;
233 active += suspend_socket_list(&pool->active_client_list, force_suspend);
235 /* this list is unsuspendable, but still need force_suspend and counting */
236 active += suspend_socket_list(&pool->waiting_client_list, force_suspend);
237 if (active)
238 per_loop_activate(pool);
240 if (!active) {
241 active += suspend_socket_list(&pool->active_server_list, force_suspend);
242 active += suspend_socket_list(&pool->idle_server_list, force_suspend);
244 /* as all clients are done, no need for them */
245 close_server_list(&pool->tested_server_list, "close unsafe file descriptors on suspend");
246 close_server_list(&pool->used_server_list, "close unsafe file descriptors on suspend");
249 return active;
253 * this function is called for each event loop.
255 void per_loop_maint(void)
257 List *item;
258 PgPool *pool;
259 int active = 0;
260 int partial_pause = 0;
261 bool force_suspend = false;
263 if (cf_pause_mode == P_SUSPEND && cf_suspend_timeout > 0) {
264 usec_t stime = get_cached_time() - g_suspend_start;
265 if (stime >= cf_suspend_timeout)
266 force_suspend = true;
269 statlist_for_each(item, &pool_list) {
270 pool = container_of(item, PgPool, head);
271 if (pool->db->admin)
272 continue;
273 switch (cf_pause_mode) {
274 case P_NONE:
275 if (pool->db->db_paused) {
276 partial_pause = 1;
277 active += per_loop_pause(pool);
278 } else
279 per_loop_activate(pool);
280 break;
281 case P_PAUSE:
282 active += per_loop_pause(pool);
283 break;
284 case P_SUSPEND:
285 active += per_loop_suspend(pool, force_suspend);
286 break;
290 switch (cf_pause_mode) {
291 case P_SUSPEND:
292 if (force_suspend) {
293 close_client_list(&login_client_list, "suspend_timeout");
294 } else
295 active += statlist_count(&login_client_list);
296 case P_PAUSE:
297 if (!active)
298 admin_pause_done();
299 break;
300 case P_NONE:
301 if (partial_pause && !active)
302 admin_pause_done();
303 break;
307 /* maintaining clients in pool */
308 static void pool_client_maint(PgPool *pool)
310 List *item, *tmp;
311 usec_t now = get_cached_time();
312 PgSocket *client;
313 usec_t age;
315 /* force client_idle_timeout */
316 if (cf_client_idle_timeout > 0) {
317 statlist_for_each_safe(item, &pool->active_client_list, tmp) {
318 client = container_of(item, PgSocket, head);
319 Assert(client->state == CL_ACTIVE);
320 if (client->link)
321 continue;
322 if (now - client->request_time > cf_client_idle_timeout)
323 disconnect_client(client, true, "client_idle_timeout");
327 /* force timeouts for waiting queries */
328 if (cf_query_timeout > 0 || cf_query_wait_timeout > 0) {
329 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
330 client = container_of(item, PgSocket, head);
331 Assert(client->state == CL_WAITING);
332 if (client->query_start == 0) {
333 age = now - client->request_time;
334 //log_warning("query_start==0");
335 } else
336 age = now - client->query_start;
338 if (cf_query_timeout > 0 && age > cf_query_timeout)
339 disconnect_client(client, true, "query_timeout");
340 else if (cf_query_wait_timeout > 0 && age > cf_query_wait_timeout)
341 disconnect_client(client, true, "query_wait_timeout");
345 /* apply client_login_timeout to clients waiting for welcome pkt */
346 if (cf_client_login_timeout > 0 && !pool->welcome_msg_ready) {
347 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
348 client = container_of(item, PgSocket, head);
349 age = now - client->connect_time;
350 if (!client->wait_for_welcome)
351 continue;
352 if (age > cf_client_login_timeout)
353 disconnect_client(client, true, "client_login_timeout (server down)");
358 static void check_unused_servers(PgPool *pool, StatList *slist, bool idle_test)
360 usec_t now = get_cached_time();
361 List *item, *tmp;
362 usec_t idle, age;
363 PgSocket *server;
364 usec_t lifetime_kill_gap = 0;
367 * Calculate the time that disconnects because of server_lifetime
368 * must be separated. This avoids the need to re-launch lot
369 * of connections together.
371 if (pool->db->pool_size > 0)
372 lifetime_kill_gap = cf_server_lifetime / pool->db->pool_size;
374 /* disconnect idle servers if needed */
375 statlist_for_each_safe(item, slist, tmp) {
376 server = container_of(item, PgSocket, head);
378 age = now - server->connect_time;
379 idle = now - server->request_time;
381 if (server->close_needed) {
382 disconnect_server(server, true, "database configuration changed");
383 } else if (server->state == SV_IDLE && !server->ready) {
384 disconnect_server(server, true, "SV_IDLE server got dirty");
385 } else if (server->state == SV_USED && !server->ready) {
386 disconnect_server(server, true, "SV_USED server got dirty");
387 } else if (cf_server_idle_timeout > 0 && idle > cf_server_idle_timeout) {
388 disconnect_server(server, true, "server idle timeout");
389 } else if (age >= cf_server_lifetime) {
390 if (pool->last_lifetime_disconnect + lifetime_kill_gap <= now) {
391 disconnect_server(server, true, "server lifetime over");
392 pool->last_lifetime_disconnect = now;
394 } else if (cf_pause_mode == P_PAUSE) {
395 disconnect_server(server, true, "pause mode");
396 } else if (idle_test && *cf_server_check_query) {
397 if (idle > cf_server_check_delay)
398 change_server_state(server, SV_USED);
404 * Check pool size, close conns if too many. Makes pooler
405 * react faster to the case when admin decreased pool size.
407 static void check_pool_size(PgPool *pool)
409 PgSocket *server;
410 int cur = statlist_count(&pool->active_server_list)
411 + statlist_count(&pool->idle_server_list)
412 + statlist_count(&pool->used_server_list)
413 + statlist_count(&pool->tested_server_list);
415 /* cancel pkt may create new srv conn without
416 * taking pool_size into account
418 * statlist_count(&pool->new_server_list)
421 int many = cur - (pool->db->pool_size + pool->db->res_pool_size);
423 Assert(pool->db->pool_size >= 0);
425 while (many > 0) {
426 server = first_socket(&pool->used_server_list);
427 if (!server)
428 server = first_socket(&pool->idle_server_list);
429 if (!server)
430 break;
431 disconnect_server(server, true, "too many servers in the pool");
432 many--;
436 /* maintain servers in a pool */
437 static void pool_server_maint(PgPool *pool)
439 List *item, *tmp;
440 usec_t age, now = get_cached_time();
441 PgSocket *server;
443 /* find and disconnect idle servers */
444 check_unused_servers(pool, &pool->used_server_list, 0);
445 check_unused_servers(pool, &pool->tested_server_list, 0);
446 check_unused_servers(pool, &pool->idle_server_list, 1);
448 /* where query got did not get answer in query_timeout */
449 if (cf_query_timeout > 0) {
450 statlist_for_each_safe(item, &pool->active_server_list, tmp) {
451 server = container_of(item, PgSocket, head);
452 Assert(server->state == SV_ACTIVE);
453 if (server->ready)
454 continue;
455 age = now - server->link->request_time;
456 if (age > cf_query_timeout)
457 disconnect_server(server, true, "query timeout");
461 /* find connections that got connect, but could not log in */
462 if (cf_server_connect_timeout > 0) {
463 statlist_for_each_safe(item, &pool->new_server_list, tmp) {
464 server = container_of(item, PgSocket, head);
465 Assert(server->state == SV_LOGIN);
467 age = now - server->connect_time;
468 if (age > cf_server_connect_timeout)
469 disconnect_server(server, true, "connect timeout");
473 check_pool_size(pool);
476 static void cleanup_client_logins(void)
478 List *item, *tmp;
479 PgSocket *client;
480 usec_t age;
481 usec_t now = get_cached_time();
483 if (cf_client_login_timeout <= 0)
484 return;
486 statlist_for_each_safe(item, &login_client_list, tmp) {
487 client = container_of(item, PgSocket, head);
488 age = now - client->connect_time;
489 if (age > cf_client_login_timeout)
490 disconnect_client(client, true, "client_login_timeout");
494 static void kill_database(PgDatabase *db);
495 static void cleanup_inactive_autodatabases(void)
497 List *item, *tmp;
498 PgDatabase *db;
499 usec_t age;
500 usec_t now = get_cached_time();
502 if (cf_autodb_idle_timeout <= 0)
503 return;
505 statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
506 db = container_of(item, PgDatabase, head);
507 age = now - db->inactive_time;
508 if (age > cf_autodb_idle_timeout)
509 kill_database(db);
510 else
511 break;
515 /* full-scale maintenance, done only occasionally */
516 static void do_full_maint(int sock, short flags, void *arg)
518 List *item, *tmp;
519 PgPool *pool;
522 * Avoid doing anything that may surprise other pgbouncer.
524 if (cf_pause_mode == P_SUSPEND)
525 goto skip_maint;
527 statlist_for_each_safe(item, &pool_list, tmp) {
528 pool = container_of(item, PgPool, head);
529 if (pool->db->admin)
530 continue;
531 pool_server_maint(pool);
532 pool_client_maint(pool);
533 if (pool->db->db_auto && pool->db->inactive_time == 0 &&
534 pool_client_count(pool) == 0 && pool_server_count(pool) == 0 ) {
535 pool->db->inactive_time = get_cached_time();
536 statlist_remove(&pool->db->head, &database_list);
537 statlist_append(&pool->db->head, &autodatabase_idle_list);
541 cleanup_inactive_autodatabases();
543 cleanup_client_logins();
545 if (cf_shutdown == 1 && get_active_server_count() == 0) {
546 log_info("server connections dropped, exiting");
547 cf_shutdown = 2;
548 event_loopbreak();
549 return;
552 if (cf_auth_type >= AUTH_TRUST)
553 loader_users_check();
555 skip_maint:
556 safe_evtimer_add(&full_maint_ev, &full_maint_period);
559 /* first-time initializtion */
560 void janitor_setup(void)
562 /* launch maintenance */
563 evtimer_set(&full_maint_ev, do_full_maint, NULL);
564 safe_evtimer_add(&full_maint_ev, &full_maint_period);
567 static void kill_pool(PgPool *pool)
569 const char *reason = "database removed";
571 close_client_list(&pool->active_client_list, reason);
572 close_client_list(&pool->waiting_client_list, reason);
573 close_client_list(&pool->cancel_req_list, reason);
575 close_server_list(&pool->active_server_list, reason);
576 close_server_list(&pool->idle_server_list, reason);
577 close_server_list(&pool->used_server_list, reason);
578 close_server_list(&pool->tested_server_list, reason);
579 close_server_list(&pool->new_server_list, reason);
581 list_del(&pool->map_head);
582 statlist_remove(&pool->head, &pool_list);
583 obj_free(pool_cache, pool);
586 static void kill_database(PgDatabase *db)
588 PgPool *pool;
589 List *item, *tmp;
591 log_warning("dropping database '%s' as it does not exist anymore or inactive auto-database", db->name);
593 statlist_for_each_safe(item, &pool_list, tmp) {
594 pool = container_of(item, PgPool, head);
595 if (pool->db == db)
596 kill_pool(pool);
598 if (db->forced_user)
599 obj_free(user_cache, db->forced_user);
600 if (db->connect_query)
601 free((void *)db->connect_query);
602 if (db->inactive_time)
603 statlist_remove(&db->head, &autodatabase_idle_list);
604 else
605 statlist_remove(&db->head, &database_list);
606 obj_free(db_cache, db);
609 /* as [pgbouncer] section can be loaded after databases,
610 there's need for review */
611 void config_postprocess(void)
613 List *item, *tmp;
614 PgDatabase *db;
616 statlist_for_each_safe(item, &database_list, tmp) {
617 db = container_of(item, PgDatabase, head);
618 if (db->db_dead) {
619 kill_database(db);
620 continue;
622 if (db->pool_size < 0)
623 db->pool_size = cf_default_pool_size;
624 if (db->res_pool_size < 0)
625 db->res_pool_size = cf_res_pool_size;