#include "git-compat-util.h"
#include "simple-ipc.h"
#include "thread-utils.h"
#include "unix-socket.h"
#include "unix-stream-server.h"

#ifndef SUPPORTS_SIMPLE_IPC
/*
 * This source file should only be compiled when Simple IPC is supported.
 * See the top-level Makefile.
 */
#error SUPPORTS_SIMPLE_IPC not defined
#endif

enum ipc_active_state ipc_get_active_state(const char *path)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	struct ipc_client_connect_options options
		= IPC_CLIENT_CONNECT_OPTIONS_INIT;
	struct stat st;
	struct ipc_client_connection *connection_test = NULL;

	options.wait_if_busy = 0;
	options.wait_if_not_found = 0;

	if (lstat(path, &st) == -1) {
		switch (errno) {
		case ENOENT:
		case ENOTDIR:
			return IPC_STATE__NOT_LISTENING;
		default:
			return IPC_STATE__INVALID_PATH;
		}
	}

#ifdef __CYGWIN__
	/*
	 * Cygwin emulates Unix sockets by writing specially crafted files
	 * whose `system` bit is set.
	 *
	 * If we are too fast, Cygwin might still be in the process of marking
	 * the underlying file as a system file.  Until then, we will not see a
	 * Unix socket here, but a plain file instead.  Just in case that this
	 * is happening, wait a little and try again.
	 */
	{
		static const int delay[] = { 1, 10, 20, 40, -1 };
		int i;

		for (i = 0; S_ISREG(st.st_mode) && delay[i] > 0; i++) {
			sleep_millisec(delay[i]);
			if (lstat(path, &st) == -1)
				return IPC_STATE__INVALID_PATH;
		}
	}
#endif

	/* also complain if a plain file is in the way */
	if ((st.st_mode & S_IFMT) != S_IFSOCK)
		return IPC_STATE__INVALID_PATH;

	/*
	 * Just because the filesystem has a S_IFSOCK type inode
	 * at `path`, doesn't mean that there is a server listening.
	 * Ping it to be sure.
	 */
	state = ipc_client_try_connect(path, &options, &connection_test);
	ipc_client_close_connection(connection_test);

	return state;
}

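/*
 * For illustration: a caller can use ipc_get_active_state() to decide
 * whether a server needs to be started before sending a command.  The
 * helper `start_my_server()` below is hypothetical.
 *
 *	if (ipc_get_active_state(path) != IPC_STATE__LISTENING)
 *		start_my_server(path);
 */
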
/*
 * Retry frequency when trying to connect to a server.
 *
 * This value should be short enough that we don't seriously delay our
 * caller, but not so fast that our spinning puts pressure on the
 * system.
 */
#define WAIT_STEP_MS (50)

/*
 * Try to connect to the server.  If the server is just starting up or
 * is very busy, we may not get a connection the first time.
 */
static enum ipc_active_state connect_to_server(
	const char *path,
	int timeout_ms,
	const struct ipc_client_connect_options *options,
	int *pfd)
{
	int k;

	*pfd = -1;

	for (k = 0; k < timeout_ms; k += WAIT_STEP_MS) {
		int fd = unix_stream_connect(path, options->uds_disallow_chdir);

		if (fd != -1) {
			*pfd = fd;
			return IPC_STATE__LISTENING;
		}

		if (errno == ENOENT) {
			if (!options->wait_if_not_found)
				return IPC_STATE__PATH_NOT_FOUND;

			goto sleep_and_try_again;
		}

		if (errno == ETIMEDOUT) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		if (errno == ECONNREFUSED) {
			if (!options->wait_if_busy)
				return IPC_STATE__NOT_LISTENING;

			goto sleep_and_try_again;
		}

		return IPC_STATE__OTHER_ERROR;

	sleep_and_try_again:
		sleep_millisec(WAIT_STEP_MS);
	}

	return IPC_STATE__NOT_LISTENING;
}

/*
 * The total amount of time that we are willing to wait when trying to
 * connect to a server.
 *
 * When the server is first started, it might take a little while for
 * it to become ready to service requests.  Likewise, the server may
 * be very (temporarily) busy and not respond to our connections.
 *
 * We should gracefully and silently handle those conditions and try
 * again for a reasonable time period.
 *
 * The value chosen here should be long enough for the server
 * to reliably heal from the above conditions.
 */
#define MY_CONNECTION_TIMEOUT_MS (1000)

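/*
 * Note: with WAIT_STEP_MS at 50ms, a 1000ms timeout gives
 * connect_to_server() about 20 connection attempts before giving up.
 */
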
enum ipc_active_state ipc_client_try_connect(
	const char *path,
	const struct ipc_client_connect_options *options,
	struct ipc_client_connection **p_connection)
{
	enum ipc_active_state state = IPC_STATE__OTHER_ERROR;
	int fd = -1;

	*p_connection = NULL;

	trace2_region_enter("ipc-client", "try-connect", NULL);
	trace2_data_string("ipc-client", NULL, "try-connect/path", path);

	state = connect_to_server(path, MY_CONNECTION_TIMEOUT_MS,
				  options, &fd);

	trace2_data_intmax("ipc-client", NULL, "try-connect/state",
			   (intmax_t)state);
	trace2_region_leave("ipc-client", "try-connect", NULL);

	if (state == IPC_STATE__LISTENING) {
		(*p_connection) = xcalloc(1, sizeof(struct ipc_client_connection));
		(*p_connection)->fd = fd;
	}

	return state;
}

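/*
 * Close the connection (if open) and free the handle returned by
 * ipc_client_try_connect().  A NULL connection is silently ignored.
 */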
void ipc_client_close_connection(struct ipc_client_connection *connection)
{
	if (!connection)
		return;

	if (connection->fd != -1)
		close(connection->fd);

	free(connection);
}

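/*
 * Send one request over an established connection and collect the
 * response into `answer`.  Both directions use pkt-line framing: the
 * request is written as packetized chunks followed by a flush packet,
 * and the response is read with the "gentle" flags so that wire errors
 * are reported to the caller rather than being fatal.
 */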
int ipc_client_send_command_to_connection(
	struct ipc_client_connection *connection,
	const char *message, size_t message_len,
	struct strbuf *answer)
{
	int ret = 0;

	strbuf_setlen(answer, 0);

	trace2_region_enter("ipc-client", "send-command", NULL);

	if (write_packetized_from_buf_no_flush(message, message_len,
					       connection->fd) < 0 ||
	    packet_flush_gently(connection->fd) < 0) {
		ret = error(_("could not send IPC command"));
		goto done;
	}

	if (read_packetized_to_strbuf(
		    connection->fd, answer,
		    PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR) < 0) {
		ret = error(_("could not read IPC response"));
		goto done;
	}

done:
	trace2_region_leave("ipc-client", "send-command", NULL);
	return ret;
}

int ipc_client_send_command(const char *path,
			    const struct ipc_client_connect_options *options,
			    const char *message, size_t message_len,
			    struct strbuf *answer)
{
	int ret = -1;
	enum ipc_active_state state;
	struct ipc_client_connection *connection = NULL;

	state = ipc_client_try_connect(path, options, &connection);

	if (state != IPC_STATE__LISTENING)
		return ret;

	ret = ipc_client_send_command_to_connection(connection,
						    message, message_len,
						    answer);

	ipc_client_close_connection(connection);

	return ret;
}

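/*
 * For illustration: a simple one-shot client sends a request and prints
 * the reply.  The "status" command string is hypothetical and error
 * handling is abbreviated.
 *
 *	struct ipc_client_connect_options options =
 *		IPC_CLIENT_CONNECT_OPTIONS_INIT;
 *	struct strbuf answer = STRBUF_INIT;
 *
 *	options.wait_if_busy = 1;
 *	options.wait_if_not_found = 0;
 *
 *	if (!ipc_client_send_command(path, &options,
 *				     "status", strlen("status"), &answer))
 *		printf("%s\n", answer.buf);
 *	strbuf_release(&answer);
 */
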
/*
 * Set or clear the O_NONBLOCK flag on the given fd, preserving the
 * other file status flags.
 */
static int set_socket_blocking_flag(int fd, int make_nonblocking)
{
	int flags;

	flags = fcntl(fd, F_GETFL, NULL);

	if (flags < 0)
		return -1;

	if (make_nonblocking)
		flags |= O_NONBLOCK;
	else
		flags &= ~O_NONBLOCK;

	return fcntl(fd, F_SETFL, flags);
}

/*
 * Magic numbers used to annotate callback instance data.
 * These are used to help guard against accidentally passing the
 * wrong instance data across multiple levels of callbacks (which
 * is easy to do if there are `void*` arguments).
 */
enum magic {
	MAGIC_SERVER_REPLY_DATA,
	MAGIC_WORKER_THREAD_DATA,
	MAGIC_ACCEPT_THREAD_DATA,
	MAGIC_SERVER_DATA,
};

struct ipc_server_reply_data {
	enum magic magic;
	int fd;
	struct ipc_worker_thread_data *worker_thread_data;
};

struct ipc_worker_thread_data {
	enum magic magic;
	struct ipc_worker_thread_data *next_thread;
	struct ipc_server_data *server_data;
	pthread_t pthread_id;
};

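/*
 * Instance data for the single accept-thread.  The fd_send_shutdown /
 * fd_wait_shutdown pair are the two ends of a socketpair(): writing a
 * byte to the former wakes the accept-thread, which polls the latter
 * alongside the listening socket, so a shutdown request does not have
 * to wait for the poll timeout to expire.
 */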
struct ipc_accept_thread_data {
	enum magic magic;
	struct ipc_server_data *server_data;

	struct unix_ss_socket *server_socket;

	int fd_send_shutdown;
	int fd_wait_shutdown;
	pthread_t pthread_id;
};

/*
 * With unix-sockets, the conceptual "ipc-server" is implemented as a single
 * controller "accept-thread" thread and a pool of "worker-thread" threads.
 * The former does the usual `accept()` loop and dispatches connections
 * to an idle worker thread.  The worker threads wait in an idle loop for
 * a new connection, communicate with the client, relay data to/from
 * the `application_cb`, and then wait for another connection from the
 * server thread.  This avoids the overhead of constantly creating and
 * destroying threads.
 */
struct ipc_server_data {
	enum magic magic;
	ipc_server_application_cb *application_cb;
	void *application_data;
	struct strbuf buf_path;

	struct ipc_accept_thread_data *accept_thread;
	struct ipc_worker_thread_data *worker_thread_list;

	pthread_mutex_t work_available_mutex;
	pthread_cond_t work_available_cond;

	/*
	 * Accepted but not yet processed client connections are kept
	 * in a circular buffer FIFO.  The queue is empty when the
	 * positions are equal.
	 */
	int *fifo_fds;
	int queue_size;
	int back_pos;
	int front_pos;

	int shutdown_requested;
	int is_stopped;
};

/*
 * Remove and return the oldest queued connection.
 *
 * Returns -1 if empty.
 */
static int fifo_dequeue(struct ipc_server_data *server_data)
{
	/* ASSERT holding mutex */

	int fd;

	if (server_data->back_pos == server_data->front_pos)
		return -1;

	fd = server_data->fifo_fds[server_data->front_pos];
	server_data->fifo_fds[server_data->front_pos] = -1;

	server_data->front_pos++;
	if (server_data->front_pos == server_data->queue_size)
		server_data->front_pos = 0;

	return fd;
}

/*
 * Push a new fd onto the back of the queue.
 *
 * Drop it and return -1 if queue is already full.
 */
static int fifo_enqueue(struct ipc_server_data *server_data, int fd)
{
	/* ASSERT holding mutex */

	int next_back_pos;

	next_back_pos = server_data->back_pos + 1;
	if (next_back_pos == server_data->queue_size)
		next_back_pos = 0;

	if (next_back_pos == server_data->front_pos) {
		/* Queue is full.  Just drop it. */
		close(fd);
		return -1;
	}

	server_data->fifo_fds[server_data->back_pos] = fd;
	server_data->back_pos = next_back_pos;

	return fd;
}

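/*
 * Note: because a full queue is detected by "the next back position
 * would collide with the front position", one slot is always left
 * unused.  For example, with queue_size = 4 the FIFO holds at most 3
 * pending connections.
 */
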
/*
 * Wait for a connection to be queued to the FIFO and return it.
 *
 * Returns -1 if someone has already requested a shutdown.
 */
static int worker_thread__wait_for_connection(
	struct ipc_worker_thread_data *worker_thread_data)
{
	/* ASSERT NOT holding mutex */

	struct ipc_server_data *server_data = worker_thread_data->server_data;
	int fd = -1;

	pthread_mutex_lock(&server_data->work_available_mutex);
	for (;;) {
		if (server_data->shutdown_requested)
			break;

		fd = fifo_dequeue(server_data);
		if (fd >= 0)
			break;

		pthread_cond_wait(&server_data->work_available_cond,
				  &server_data->work_available_mutex);
	}
	pthread_mutex_unlock(&server_data->work_available_mutex);

	return fd;
}

/*
 * Forward declare our reply callback function so that any compiler
 * errors are reported when we actually define the function (in addition
 * to any errors reported when we try to pass this callback function as
 * a parameter in a function call).  The former are easier to understand.
 */
static ipc_server_reply_cb do_io_reply_callback;

/*
 * Relay application's response message to the client process.
 * (We do not flush at this point because we allow the caller
 * to chunk data to the client thru us.)
 */
static int do_io_reply_callback(struct ipc_server_reply_data *reply_data,
				const char *response, size_t response_len)
{
	if (reply_data->magic != MAGIC_SERVER_REPLY_DATA)
		BUG("reply_cb called with wrong instance data");

	return write_packetized_from_buf_no_flush(response, response_len,
						  reply_data->fd);
}

/* A randomly chosen value. */
#define MY_WAIT_POLL_TIMEOUT_MS (10)

/*
 * If the client hangs up without sending any data on the wire, just
 * quietly close the socket and ignore this client.
 *
 * This worker thread is committed to reading the IPC request data
 * from the client at the other end of this fd.  Wait here for the
 * client to actually put something on the wire -- because if the
 * client just does a ping (connect and hangup without sending any
 * data), our use of the pkt-line read routines will spew an error
 * message.
 *
 * Return -1 if the client hung up.
 * Return 0 if data (possibly incomplete) is ready.
 */
static int worker_thread__wait_for_io_start(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	struct pollfd pollfd[1];
	int result;

	for (;;) {
		pollfd[0].fd = fd;
		pollfd[0].events = POLLIN;

		result = poll(pollfd, 1, MY_WAIT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			goto cleanup;
		}

		if (result == 0) {
			/* a timeout */

			int in_shutdown;

			pthread_mutex_lock(&server_data->work_available_mutex);
			in_shutdown = server_data->shutdown_requested;
			pthread_mutex_unlock(&server_data->work_available_mutex);

			/*
			 * If a shutdown is already in progress and this
			 * client has not started talking yet, just drop it.
			 */
			if (in_shutdown)
				goto cleanup;
			continue;
		}

		if (pollfd[0].revents & POLLHUP)
			goto cleanup;

		if (pollfd[0].revents & POLLIN)
			return 0;

		goto cleanup;
	}

cleanup:
	close(fd);
	return -1;
}

/*
 * Receive the request/command from the client and pass it to the
 * registered request-callback.  The request-callback will compose
 * a response and call our reply-callback to send it to the client.
 */
static int worker_thread__do_io(
	struct ipc_worker_thread_data *worker_thread_data,
	int fd)
{
	/* ASSERT NOT holding lock */

	struct strbuf buf = STRBUF_INIT;
	struct ipc_server_reply_data reply_data;
	int ret = 0;

	reply_data.magic = MAGIC_SERVER_REPLY_DATA;
	reply_data.worker_thread_data = worker_thread_data;
	reply_data.fd = fd;

	ret = read_packetized_to_strbuf(
		reply_data.fd, &buf,
		PACKET_READ_GENTLE_ON_EOF | PACKET_READ_GENTLE_ON_READ_ERROR);
	if (ret >= 0) {
		ret = worker_thread_data->server_data->application_cb(
			worker_thread_data->server_data->application_data,
			buf.buf, buf.len, do_io_reply_callback, &reply_data);

		packet_flush_gently(reply_data.fd);
	} else {
		/*
		 * The client probably disconnected/shutdown before it
		 * could send a well-formed message.  Ignore it.
		 */
	}

	strbuf_release(&buf);
	close(reply_data.fd);

	return ret;
}

/*
 * Block SIGPIPE on the current thread (so that we get EPIPE from
 * write() rather than an actual signal).
 *
 * Note that using sigchain_push() and _pop() to control SIGPIPE
 * around our IO calls is not thread safe:
 * [] It uses a global stack of handler frames.
 * [] It uses ALLOC_GROW() to resize it.
 * [] Finally, according to the `signal(2)` man-page:
 *    "The effects of `signal()` in a multithreaded process are unspecified."
 */
static void thread_block_sigpipe(sigset_t *old_set)
{
	sigset_t new_set;

	sigemptyset(&new_set);
	sigaddset(&new_set, SIGPIPE);

	sigemptyset(old_set);
	pthread_sigmask(SIG_BLOCK, &new_set, old_set);
}

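/*
 * Note: both thread procs below call this once at startup, e.g.:
 *
 *	sigset_t old_set;
 *	thread_block_sigpipe(&old_set);
 *
 * The saved `old_set` is not restored in this file because the mask is
 * per-thread and each thread runs with SIGPIPE blocked for its entire
 * lifetime.
 */
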
/*
 * Thread proc for an IPC worker thread.  It handles a series of
 * connections from clients.  It pulls the next fd from the queue,
 * processes it, and then waits for the next client.
 *
 * Block SIGPIPE in this worker thread for the life of the thread.
 * This avoids stray (and sometimes delayed) SIGPIPE signals caused
 * by client errors and/or when we are under extremely heavy IO load.
 *
 * This means that the application callback will have SIGPIPE blocked.
 * The callback should not change it.
 */
static void *worker_thread_proc(void *_worker_thread_data)
{
	struct ipc_worker_thread_data *worker_thread_data = _worker_thread_data;
	struct ipc_server_data *server_data = worker_thread_data->server_data;
	sigset_t old_set;
	int fd, io;
	int ret;

	trace2_thread_start("ipc-worker");

	thread_block_sigpipe(&old_set);

	for (;;) {
		fd = worker_thread__wait_for_connection(worker_thread_data);
		if (fd == -1)
			break; /* in shutdown */

		io = worker_thread__wait_for_io_start(worker_thread_data, fd);
		if (io == -1)
			continue; /* client hung up without sending anything */

		ret = worker_thread__do_io(worker_thread_data, fd);

		if (ret == SIMPLE_IPC_QUIT) {
			trace2_data_string("ipc-worker", NULL, "queue_stop_async",
					   "application_quit");
			/*
			 * The application layer is telling the ipc-server
			 * layer to shutdown.
			 *
			 * We DO NOT have a response to send to the client.
			 *
			 * Queue an async stop (to stop the other threads) and
			 * allow this worker thread to exit now (no sense waiting
			 * for the thread-pool shutdown signal).
			 *
			 * Other non-idle worker threads are allowed to finish
			 * responding to their current clients.
			 */
			ipc_server_stop_async(server_data);
			break;
		}
	}

	trace2_thread_exit();
	return NULL;
}

/* A randomly chosen value. */
#define MY_ACCEPT_POLL_TIMEOUT_MS (60 * 1000)

/*
 * Accept a new client connection on our socket.  This uses non-blocking
 * IO so that we can also wait for shutdown requests on our socket-pair
 * without actually spinning on a fast timeout.
 */
static int accept_thread__wait_for_connection(
	struct ipc_accept_thread_data *accept_thread_data)
{
	int result;

	for (;;) {
		struct pollfd pollfd[2];

		pollfd[0].fd = accept_thread_data->fd_wait_shutdown;
		pollfd[0].events = POLLIN;

		pollfd[1].fd = accept_thread_data->server_socket->fd_socket;
		pollfd[1].events = POLLIN;

		result = poll(pollfd, 2, MY_ACCEPT_POLL_TIMEOUT_MS);
		if (result < 0) {
			if (errno == EINTR)
				continue;
			return result;
		}

		if (result == 0) {
			/* a timeout */

			/*
			 * If someone deletes or force-creates a new unix
			 * domain socket at our path, all future clients
			 * will be routed elsewhere and we silently starve.
			 * If that happens, just queue a shutdown.
			 */
			if (unix_ss_was_stolen(
				    accept_thread_data->server_socket)) {
				trace2_data_string("ipc-accept", NULL,
						   "queue_stop_async",
						   "socket_stolen");
				ipc_server_stop_async(
					accept_thread_data->server_data);
			}
			continue;
		}

		if (pollfd[0].revents & POLLIN) {
			/* shutdown message queued to socketpair */
			return -1;
		}

		if (pollfd[1].revents & POLLIN) {
			/* a connection is available on server_socket */

			int client_fd =
				accept(accept_thread_data->server_socket->fd_socket,
				       NULL, NULL);
			if (client_fd >= 0)
				return client_fd;

			/*
			 * An error here is unlikely -- it probably
			 * indicates that the connecting process has
			 * already dropped the connection.
			 */
			continue;
		}

		BUG("unhandled poll result errno=%d r[0]=%d r[1]=%d",
		    errno, pollfd[0].revents, pollfd[1].revents);
	}
}

/*
 * Thread proc for the IPC server "accept thread".  This waits for
 * an incoming socket connection, appends it to the queue of available
 * connections, and notifies a worker thread to process it.
 *
 * Block SIGPIPE in this thread for the life of the thread.  This
 * avoids any stray SIGPIPE signals when closing pipe fds under
 * extremely heavy loads (such as when the fifo queue is full and we
 * drop incoming connections).
 */
static void *accept_thread_proc(void *_accept_thread_data)
{
	struct ipc_accept_thread_data *accept_thread_data = _accept_thread_data;
	struct ipc_server_data *server_data = accept_thread_data->server_data;
	sigset_t old_set;

	trace2_thread_start("ipc-accept");

	thread_block_sigpipe(&old_set);

	for (;;) {
		int client_fd = accept_thread__wait_for_connection(
			accept_thread_data);

		pthread_mutex_lock(&server_data->work_available_mutex);
		if (server_data->shutdown_requested) {
			pthread_mutex_unlock(&server_data->work_available_mutex);
			if (client_fd >= 0)
				close(client_fd);
			break;
		}

		if (client_fd < 0) {
			/* ignore transient accept() errors */
		} else {
			fifo_enqueue(server_data, client_fd);
			pthread_cond_broadcast(&server_data->work_available_cond);
		}
		pthread_mutex_unlock(&server_data->work_available_mutex);
	}

	trace2_thread_exit();
	return NULL;
}

/*
 * We can't predict the connection arrival rate relative to the worker
 * processing rate, therefore we allow the "accept-thread" to queue up
 * a generous number of connections, since we'd rather have the client
 * not unnecessarily time out if we can avoid it.  (The assumption is
 * that this will be used for FSMonitor and a few-second wait on a
 * connection is better than having the client time out and do the full
 * computation itself.)
 *
 * The FIFO queue size is set to a multiple of the worker pool size.
 * This value was chosen at random.
 */
#define FIFO_SCALE (100)

/*
 * The backlog value for `listen(2)`.  This doesn't need to be huge,
 * rather just large enough for our "accept-thread" to wake up and
 * queue incoming connections onto the FIFO without the kernel
 * dropping any.
 *
 * This value was chosen at random.
 */
#define LISTEN_BACKLOG (50)

static int create_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	struct unix_ss_socket *server_socket = NULL;
	struct unix_stream_listen_opts uslg_opts = UNIX_STREAM_LISTEN_OPTS_INIT;
	int ret;

	uslg_opts.listen_backlog_size = LISTEN_BACKLOG;
	uslg_opts.disallow_chdir = ipc_opts->uds_disallow_chdir;

	ret = unix_ss_create(path, &uslg_opts, -1, &server_socket);
	if (ret)
		return ret;

	if (set_socket_blocking_flag(server_socket->fd_socket, 1)) {
		int saved_errno = errno;
		unix_ss_free(server_socket);
		errno = saved_errno;
		return -1;
	}

	*new_server_socket = server_socket;

	trace2_data_string("ipc-server", NULL, "listen-with-lock", path);
	return 0;
}

static int setup_listener_socket(
	const char *path,
	const struct ipc_server_opts *ipc_opts,
	struct unix_ss_socket **new_server_socket)
{
	int ret, saved_errno;

	trace2_region_enter("ipc-server", "create-listener_socket", NULL);

	ret = create_listener_socket(path, ipc_opts, new_server_socket);

	saved_errno = errno;
	trace2_region_leave("ipc-server", "create-listener_socket", NULL);
	errno = saved_errno;

	return ret;
}

/*
 * Start the IPC server in a pool of background threads.
 */
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
			 const char *path, const struct ipc_server_opts *opts,
			 ipc_server_application_cb *application_cb,
			 void *application_data)
{
	struct unix_ss_socket *server_socket = NULL;
	struct ipc_server_data *server_data;
	int sv[2];
	int k;
	int ret;
	int nr_threads = opts->nr_threads;

	*returned_server_data = NULL;

	/*
	 * Create a socketpair and set sv[1] to non-blocking.  This
	 * will be used to send a shutdown message to the accept-thread
	 * and allows the accept-thread to wait on EITHER a client
	 * connection or a shutdown request without spinning.
	 */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	if (set_socket_blocking_flag(sv[1], 1)) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return -1;
	}

	ret = setup_listener_socket(path, opts, &server_socket);
	if (ret) {
		int saved_errno = errno;
		close(sv[0]);
		close(sv[1]);
		errno = saved_errno;
		return ret;
	}

	server_data = xcalloc(1, sizeof(*server_data));
	server_data->magic = MAGIC_SERVER_DATA;
	server_data->application_cb = application_cb;
	server_data->application_data = application_data;
	strbuf_init(&server_data->buf_path, 0);
	strbuf_addstr(&server_data->buf_path, path);

	if (nr_threads < 1)
		nr_threads = 1;

	pthread_mutex_init(&server_data->work_available_mutex, NULL);
	pthread_cond_init(&server_data->work_available_cond, NULL);

	server_data->queue_size = nr_threads * FIFO_SCALE;
	CALLOC_ARRAY(server_data->fifo_fds, server_data->queue_size);

	server_data->accept_thread =
		xcalloc(1, sizeof(*server_data->accept_thread));
	server_data->accept_thread->magic = MAGIC_ACCEPT_THREAD_DATA;
	server_data->accept_thread->server_data = server_data;
	server_data->accept_thread->server_socket = server_socket;
	server_data->accept_thread->fd_send_shutdown = sv[0];
	server_data->accept_thread->fd_wait_shutdown = sv[1];

	if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
			   accept_thread_proc, server_data->accept_thread))
		die_errno(_("could not start accept_thread '%s'"), path);

	for (k = 0; k < nr_threads; k++) {
		struct ipc_worker_thread_data *wtd;

		wtd = xcalloc(1, sizeof(*wtd));
		wtd->magic = MAGIC_WORKER_THREAD_DATA;
		wtd->server_data = server_data;

		if (pthread_create(&wtd->pthread_id, NULL, worker_thread_proc,
				   wtd)) {
			if (k == 0)
				die(_("could not start worker[0] for '%s'"),
				    path);
			/*
			 * Limp along with the thread pool that we have.
			 */
			break;
		}

		wtd->next_thread = server_data->worker_thread_list;
		server_data->worker_thread_list = wtd;
	}

	*returned_server_data = server_data;
	return 0;
}

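/*
 * For illustration: a typical caller starts the thread pool, runs its
 * own event loop, and then shuts the server down.  `my_application_cb`
 * and `my_state` are hypothetical; see simple-ipc.h for the
 * authoritative callback signature.
 *
 *	struct ipc_server_data *server_data = NULL;
 *	struct ipc_server_opts opts = { .nr_threads = 4 };
 *
 *	if (ipc_server_run_async(&server_data, path, &opts,
 *				 my_application_cb, my_state))
 *		die_errno("could not start ipc server");
 *	...
 *	ipc_server_stop_async(server_data);
 *	ipc_server_await(server_data);
 *	ipc_server_free(server_data);
 */
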
/*
 * Gently tell the IPC server threads to shut down.
 * Can be run on any thread.
 */
int ipc_server_stop_async(struct ipc_server_data *server_data)
{
	/* ASSERT NOT holding mutex */

	int fd;

	if (!server_data)
		return 0;

	trace2_region_enter("ipc-server", "server-stop-async", NULL);

	pthread_mutex_lock(&server_data->work_available_mutex);

	server_data->shutdown_requested = 1;

	/*
	 * Write a byte to the shutdown socket pair to wake up the
	 * accept-thread.
	 */
	if (write(server_data->accept_thread->fd_send_shutdown, "Q", 1) < 0)
		error_errno("could not write to fd_send_shutdown");

	/*
	 * Drain the queue of existing connections.
	 */
	while ((fd = fifo_dequeue(server_data)) != -1)
		close(fd);

	/*
	 * Gently tell worker threads to stop processing new connections
	 * and exit.  (This does not abort in-process conversations.)
	 */
	pthread_cond_broadcast(&server_data->work_available_cond);

	pthread_mutex_unlock(&server_data->work_available_mutex);

	trace2_region_leave("ipc-server", "server-stop-async", NULL);

	return 0;
}

/*
 * Wait for all IPC server threads to stop.
 */
int ipc_server_await(struct ipc_server_data *server_data)
{
	pthread_join(server_data->accept_thread->pthread_id, NULL);

	if (!server_data->shutdown_requested)
		BUG("ipc-server: accept-thread stopped for '%s'",
		    server_data->buf_path.buf);

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		pthread_join(wtd->pthread_id, NULL);

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	server_data->is_stopped = 1;

	return 0;
}

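/*
 * Free all resources associated with the (stopped) server.  The caller
 * must have shut the threads down (ipc_server_await()) first; trying to
 * free a running server is a BUG.
 */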
void ipc_server_free(struct ipc_server_data *server_data)
{
	struct ipc_accept_thread_data *accept_thread_data;

	if (!server_data)
		return;

	if (!server_data->is_stopped)
		BUG("cannot free ipc-server while running for '%s'",
		    server_data->buf_path.buf);

	accept_thread_data = server_data->accept_thread;
	if (accept_thread_data) {
		unix_ss_free(accept_thread_data->server_socket);

		if (accept_thread_data->fd_send_shutdown != -1)
			close(accept_thread_data->fd_send_shutdown);
		if (accept_thread_data->fd_wait_shutdown != -1)
			close(accept_thread_data->fd_wait_shutdown);

		free(server_data->accept_thread);
	}

	while (server_data->worker_thread_list) {
		struct ipc_worker_thread_data *wtd =
			server_data->worker_thread_list;

		server_data->worker_thread_list = wtd->next_thread;
		free(wtd);
	}

	pthread_cond_destroy(&server_data->work_available_cond);
	pthread_mutex_destroy(&server_data->work_available_mutex);

	strbuf_release(&server_data->buf_path);

	free(server_data->fifo_fds);
	free(server_data);
}