2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Periodic maintenance.
25 /* do full maintenance 3x per second */
26 static struct timeval full_maint_period
= {0, USEC
/ 3};
27 static struct event full_maint_ev
;
29 /* close all sockets in server list */
30 static void close_server_list(StatList
*sk_list
, const char *reason
)
35 statlist_for_each_safe(item
, sk_list
, tmp
) {
36 server
= container_of(item
, PgSocket
, head
);
37 disconnect_server(server
, true, "%s", reason
);
41 static void close_client_list(StatList
*sk_list
, const char *reason
)
46 statlist_for_each_safe(item
, sk_list
, tmp
) {
47 client
= container_of(item
, PgSocket
, head
);
48 disconnect_client(client
, true, "%s", reason
);
52 bool suspend_socket(PgSocket
*sk
, bool force_suspend
)
57 if (sbuf_is_empty(&sk
->sbuf
)) {
58 if (sbuf_pause(&sk
->sbuf
))
62 if (sk
->suspended
|| !force_suspend
)
65 if (is_server_socket(sk
))
66 disconnect_server(sk
, true, "suspend_timeout");
68 disconnect_client(sk
, true, "suspend_timeout");
72 /* suspend all sockets in socket list */
73 static int suspend_socket_list(StatList
*list
, bool force_suspend
)
79 statlist_for_each_safe(item
, list
, tmp
) {
80 sk
= container_of(item
, PgSocket
, head
);
81 if (!suspend_socket(sk
, force_suspend
))
87 /* resume all suspended sockets in socket list */
88 static void resume_socket_list(StatList
*list
)
93 statlist_for_each_safe(item
, list
, tmp
) {
94 sk
= container_of(item
, PgSocket
, head
);
97 sbuf_continue(&sk
->sbuf
);
102 /* resume all suspended sockets in all pools */
103 static void resume_sockets(void)
108 statlist_for_each(item
, &pool_list
) {
109 pool
= container_of(item
, PgPool
, head
);
112 resume_socket_list(&pool
->active_client_list
);
113 resume_socket_list(&pool
->active_server_list
);
114 resume_socket_list(&pool
->idle_server_list
);
115 resume_socket_list(&pool
->used_server_list
);
119 /* resume pools and listen sockets */
120 void resume_all(void)
127 * send test/reset query to server if needed
129 static void launch_recheck(PgPool
*pool
)
131 const char *q
= cf_server_check_query
;
132 bool need_check
= true;
136 /* find clean server */
138 server
= first_socket(&pool
->used_server_list
);
143 disconnect_server(server
, true, "idle server got dirty");
146 /* is the check needed? */
147 if (q
== NULL
|| q
[0] == 0)
149 else if (cf_server_check_delay
> 0) {
150 usec_t now
= get_cached_time();
151 if (now
- server
->request_time
< cf_server_check_delay
)
156 /* send test query, wait for result */
157 slog_debug(server
, "P: Checking: %s", q
);
158 change_server_state(server
, SV_TESTED
);
159 SEND_generic(res
, server
, 'Q', "s", q
);
161 disconnect_server(server
, false, "test query failed");
163 /* make immediately available */
164 release_server(server
);
168 * make servers available
170 static void per_loop_activate(PgPool
*pool
)
175 /* see if any server have been freed */
176 statlist_for_each_safe(item
, &pool
->waiting_client_list
, tmp
) {
177 client
= container_of(item
, PgSocket
, head
);
178 if (!statlist_empty(&pool
->idle_server_list
)) {
180 /* db not fully initialized after reboot */
181 if (client
->wait_for_welcome
&& !pool
->welcome_msg_ready
) {
182 launch_new_connection(pool
);
186 /* there is a ready server already */
187 activate_client(client
);
188 } else if (!statlist_empty(&pool
->tested_server_list
)) {
189 /* some connections are in testing process */
191 } else if (!statlist_empty(&pool
->used_server_list
)) {
192 /* ask for more connections to be tested */
193 launch_recheck(pool
);
196 /* not enough connections */
197 launch_new_connection(pool
);
204 * pause active clients
206 static int per_loop_pause(PgPool
*pool
)
213 close_server_list(&pool
->idle_server_list
, "pause mode");
214 close_server_list(&pool
->used_server_list
, "pause mode");
215 close_server_list(&pool
->new_server_list
, "pause mode");
217 active
+= statlist_count(&pool
->active_server_list
);
218 active
+= statlist_count(&pool
->tested_server_list
);
224 * suspend active clients and servers
226 static int per_loop_suspend(PgPool
*pool
, bool force_suspend
)
233 active
+= suspend_socket_list(&pool
->active_client_list
, force_suspend
);
235 /* this list is unsuspendable, but still need force_suspend and counting */
236 active
+= suspend_socket_list(&pool
->waiting_client_list
, force_suspend
);
238 per_loop_activate(pool
);
241 active
+= suspend_socket_list(&pool
->active_server_list
, force_suspend
);
242 active
+= suspend_socket_list(&pool
->idle_server_list
, force_suspend
);
244 /* as all clients are done, no need for them */
245 close_server_list(&pool
->tested_server_list
, "close unsafe file descriptors on suspend");
246 close_server_list(&pool
->used_server_list
, "close unsafe file descriptors on suspend");
253 * this function is called for each event loop.
255 void per_loop_maint(void)
260 int partial_pause
= 0;
261 bool force_suspend
= false;
263 if (cf_pause_mode
== P_SUSPEND
&& cf_suspend_timeout
> 0) {
264 usec_t stime
= get_cached_time() - g_suspend_start
;
265 if (stime
>= cf_suspend_timeout
)
266 force_suspend
= true;
269 statlist_for_each(item
, &pool_list
) {
270 pool
= container_of(item
, PgPool
, head
);
273 switch (cf_pause_mode
) {
275 if (pool
->db
->db_paused
) {
277 active
+= per_loop_pause(pool
);
279 per_loop_activate(pool
);
282 active
+= per_loop_pause(pool
);
285 active
+= per_loop_suspend(pool
, force_suspend
);
290 switch (cf_pause_mode
) {
293 close_client_list(&login_client_list
, "suspend_timeout");
295 active
+= statlist_count(&login_client_list
);
301 if (partial_pause
&& !active
)
307 /* maintaining clients in pool */
308 static void pool_client_maint(PgPool
*pool
)
311 usec_t now
= get_cached_time();
315 /* force client_idle_timeout */
316 if (cf_client_idle_timeout
> 0) {
317 statlist_for_each_safe(item
, &pool
->active_client_list
, tmp
) {
318 client
= container_of(item
, PgSocket
, head
);
319 Assert(client
->state
== CL_ACTIVE
);
322 if (now
- client
->request_time
> cf_client_idle_timeout
)
323 disconnect_client(client
, true, "client_idle_timeout");
327 /* force timeouts for waiting queries */
328 if (cf_query_timeout
> 0 || cf_query_wait_timeout
> 0) {
329 statlist_for_each_safe(item
, &pool
->waiting_client_list
, tmp
) {
330 client
= container_of(item
, PgSocket
, head
);
331 Assert(client
->state
== CL_WAITING
);
332 if (client
->query_start
== 0) {
333 age
= now
- client
->request_time
;
334 //log_warning("query_start==0");
336 age
= now
- client
->query_start
;
338 if (cf_query_timeout
> 0 && age
> cf_query_timeout
)
339 disconnect_client(client
, true, "query_timeout");
340 else if (cf_query_wait_timeout
> 0 && age
> cf_query_wait_timeout
)
341 disconnect_client(client
, true, "query_wait_timeout");
345 /* apply client_login_timeout to clients waiting for welcome pkt */
346 if (cf_client_login_timeout
> 0 && !pool
->welcome_msg_ready
) {
347 statlist_for_each_safe(item
, &pool
->waiting_client_list
, tmp
) {
348 client
= container_of(item
, PgSocket
, head
);
349 age
= now
- client
->connect_time
;
350 if (!client
->wait_for_welcome
)
352 if (age
> cf_client_login_timeout
)
353 disconnect_client(client
, true, "client_login_timeout (server down)");
358 static void check_unused_servers(PgPool
*pool
, StatList
*slist
, bool idle_test
)
360 usec_t now
= get_cached_time();
364 usec_t lifetime_kill_gap
= 0;
367 * Calculate the time that disconnects because of server_lifetime
368 * must be separated. This avoids the need to re-launch lot
369 * of connections together.
371 if (pool
->db
->pool_size
> 0)
372 lifetime_kill_gap
= cf_server_lifetime
/ pool
->db
->pool_size
;
374 /* disconnect idle servers if needed */
375 statlist_for_each_safe(item
, slist
, tmp
) {
376 server
= container_of(item
, PgSocket
, head
);
378 age
= now
- server
->connect_time
;
379 idle
= now
- server
->request_time
;
381 if (server
->close_needed
) {
382 disconnect_server(server
, true, "database configuration changed");
383 } else if (server
->state
== SV_IDLE
&& !server
->ready
) {
384 disconnect_server(server
, true, "SV_IDLE server got dirty");
385 } else if (server
->state
== SV_USED
&& !server
->ready
) {
386 disconnect_server(server
, true, "SV_USED server got dirty");
387 } else if (cf_server_idle_timeout
> 0 && idle
> cf_server_idle_timeout
) {
388 disconnect_server(server
, true, "server idle timeout");
389 } else if (age
>= cf_server_lifetime
) {
390 if (pool
->last_lifetime_disconnect
+ lifetime_kill_gap
<= now
) {
391 disconnect_server(server
, true, "server lifetime over");
392 pool
->last_lifetime_disconnect
= now
;
394 } else if (cf_pause_mode
== P_PAUSE
) {
395 disconnect_server(server
, true, "pause mode");
396 } else if (idle_test
&& *cf_server_check_query
) {
397 if (idle
> cf_server_check_delay
)
398 change_server_state(server
, SV_USED
);
404 * Check pool size, close conns if too many. Makes pooler
405 * react faster to the case when admin decreased pool size.
407 static void check_pool_size(PgPool
*pool
)
410 int cur
= statlist_count(&pool
->active_server_list
)
411 + statlist_count(&pool
->idle_server_list
)
412 + statlist_count(&pool
->used_server_list
)
413 + statlist_count(&pool
->tested_server_list
);
415 /* cancel pkt may create new srv conn without
416 * taking pool_size into account
418 * statlist_count(&pool->new_server_list)
421 int many
= cur
- (pool
->db
->pool_size
+ pool
->db
->res_pool_size
);
423 Assert(pool
->db
->pool_size
>= 0);
426 server
= first_socket(&pool
->used_server_list
);
428 server
= first_socket(&pool
->idle_server_list
);
431 disconnect_server(server
, true, "too many servers in the pool");
436 /* maintain servers in a pool */
437 static void pool_server_maint(PgPool
*pool
)
440 usec_t age
, now
= get_cached_time();
443 /* find and disconnect idle servers */
444 check_unused_servers(pool
, &pool
->used_server_list
, 0);
445 check_unused_servers(pool
, &pool
->tested_server_list
, 0);
446 check_unused_servers(pool
, &pool
->idle_server_list
, 1);
448 /* where query got did not get answer in query_timeout */
449 if (cf_query_timeout
> 0) {
450 statlist_for_each_safe(item
, &pool
->active_server_list
, tmp
) {
451 server
= container_of(item
, PgSocket
, head
);
452 Assert(server
->state
== SV_ACTIVE
);
455 age
= now
- server
->link
->request_time
;
456 if (age
> cf_query_timeout
)
457 disconnect_server(server
, true, "query timeout");
461 /* find connections that got connect, but could not log in */
462 if (cf_server_connect_timeout
> 0) {
463 statlist_for_each_safe(item
, &pool
->new_server_list
, tmp
) {
464 server
= container_of(item
, PgSocket
, head
);
465 Assert(server
->state
== SV_LOGIN
);
467 age
= now
- server
->connect_time
;
468 if (age
> cf_server_connect_timeout
)
469 disconnect_server(server
, true, "connect timeout");
473 check_pool_size(pool
);
476 static void cleanup_client_logins(void)
481 usec_t now
= get_cached_time();
483 if (cf_client_login_timeout
<= 0)
486 statlist_for_each_safe(item
, &login_client_list
, tmp
) {
487 client
= container_of(item
, PgSocket
, head
);
488 age
= now
- client
->connect_time
;
489 if (age
> cf_client_login_timeout
)
490 disconnect_client(client
, true, "client_login_timeout");
494 static void kill_database(PgDatabase
*db
);
495 static void cleanup_inactive_autodatabases(void)
500 usec_t now
= get_cached_time();
502 if (cf_autodb_idle_timeout
<= 0)
505 statlist_for_each_safe(item
, &autodatabase_idle_list
, tmp
) {
506 db
= container_of(item
, PgDatabase
, head
);
507 age
= now
- db
->inactive_time
;
508 if (age
> cf_autodb_idle_timeout
)
515 /* full-scale maintenance, done only occasionally */
516 static void do_full_maint(int sock
, short flags
, void *arg
)
522 * Avoid doing anything that may surprise other pgbouncer.
524 if (cf_pause_mode
== P_SUSPEND
)
527 statlist_for_each_safe(item
, &pool_list
, tmp
) {
528 pool
= container_of(item
, PgPool
, head
);
531 pool_server_maint(pool
);
532 pool_client_maint(pool
);
533 if (pool
->db
->db_auto
&& pool
->db
->inactive_time
== 0 &&
534 pool_client_count(pool
) == 0 && pool_server_count(pool
) == 0 ) {
535 pool
->db
->inactive_time
= get_cached_time();
536 statlist_remove(&pool
->db
->head
, &database_list
);
537 statlist_append(&pool
->db
->head
, &autodatabase_idle_list
);
541 cleanup_inactive_autodatabases();
543 cleanup_client_logins();
545 if (cf_shutdown
== 1 && get_active_server_count() == 0) {
546 log_info("server connections dropped, exiting");
552 if (cf_auth_type
>= AUTH_TRUST
)
553 loader_users_check();
556 safe_evtimer_add(&full_maint_ev
, &full_maint_period
);
559 /* first-time initializtion */
560 void janitor_setup(void)
562 /* launch maintenance */
563 evtimer_set(&full_maint_ev
, do_full_maint
, NULL
);
564 safe_evtimer_add(&full_maint_ev
, &full_maint_period
);
567 static void kill_pool(PgPool
*pool
)
569 const char *reason
= "database removed";
571 close_client_list(&pool
->active_client_list
, reason
);
572 close_client_list(&pool
->waiting_client_list
, reason
);
573 close_client_list(&pool
->cancel_req_list
, reason
);
575 close_server_list(&pool
->active_server_list
, reason
);
576 close_server_list(&pool
->idle_server_list
, reason
);
577 close_server_list(&pool
->used_server_list
, reason
);
578 close_server_list(&pool
->tested_server_list
, reason
);
579 close_server_list(&pool
->new_server_list
, reason
);
581 list_del(&pool
->map_head
);
582 statlist_remove(&pool
->head
, &pool_list
);
583 obj_free(pool_cache
, pool
);
586 static void kill_database(PgDatabase
*db
)
591 log_warning("dropping database '%s' as it does not exist anymore or inactive auto-database", db
->name
);
593 statlist_for_each_safe(item
, &pool_list
, tmp
) {
594 pool
= container_of(item
, PgPool
, head
);
599 obj_free(user_cache
, db
->forced_user
);
600 if (db
->connect_query
)
601 free((void *)db
->connect_query
);
602 if (db
->inactive_time
)
603 statlist_remove(&db
->head
, &autodatabase_idle_list
);
605 statlist_remove(&db
->head
, &database_list
);
606 obj_free(db_cache
, db
);
609 /* as [pgbouncer] section can be loaded after databases,
610 there's need for review */
611 void config_postprocess(void)
616 statlist_for_each_safe(item
, &database_list
, tmp
) {
617 db
= container_of(item
, PgDatabase
, head
);
622 if (db
->pool_size
< 0)
623 db
->pool_size
= cf_default_pool_size
;
624 if (db
->res_pool_size
< 0)
625 db
->res_pool_size
= cf_res_pool_size
;