2 * Copyright 2000, International Business Machines Corporation and others.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
26 #ifdef AFS_PTHREAD_ENV
29 #include "rx_globals.h"
30 #include "rx_pthread.h"
32 #include "rx_atomic.h"
33 #include "rx_internal.h"
34 #include "rx_pthread.h"
36 #include "rx_xmit_nt.h"
39 static void rxi_SetThreadNum(int threadID
);
41 /* Set rx_pthread_event_rescheduled if event_handler should just try
42 * again instead of sleeping.
44 * Protected by event_handler_mutex
46 static int rx_pthread_event_rescheduled
= 0;
48 static void *rx_ListenerProc(void *);
51 * We supply an event handling thread for Rx's event processing.
52 * The condition variable is used to wake up the thread whenever a new
53 * event is scheduled earlier than the previous earliest event.
54 * This thread is also responsible for keeping time.
56 static pthread_t event_handler_thread
;
57 afs_kcondvar_t rx_event_handler_cond
;
58 afs_kmutex_t event_handler_mutex
;
59 afs_kcondvar_t rx_listener_cond
;
60 afs_kmutex_t listener_mutex
;
61 static int listeners_started
= 0;
62 afs_kmutex_t rx_clock_mutex
;
63 struct clock rxi_clockNow
;
65 static rx_atomic_t threadHiNum
;
68 rx_NewThreadId(void) {
69 return rx_atomic_inc_and_read(&threadHiNum
);
73 * Delay the current thread for the specified number of seconds.
82 * Called from rx_Init()
85 rxi_InitializeThreadSupport(void)
87 /* listeners_started must only be reset if
88 * the listener thread terminates */
89 /* listeners_started = 0; */
90 clock_GetTime(&rxi_clockNow
);
94 server_entry(void *argp
)
96 void (*server_proc
) (void *) = (void (*)(void *))argp
;
98 dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
99 return (void *) -1; /* reused as return value, see pthread(3) */
103 * Start an Rx server process.
106 rxi_StartServerProc(void *(*proc
) (void *), int stacksize
)
109 pthread_attr_t tattr
;
112 if (pthread_attr_init(&tattr
) != 0) {
113 osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
116 if (pthread_attr_setdetachstate(&tattr
, PTHREAD_CREATE_DETACHED
) != 0) {
117 osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
121 * NOTE: We are ignoring the stack size parameter, for now.
124 if (pthread_create(&thread
, &tattr
, server_entry
, (void *)proc
) != 0) {
125 osi_Panic("Unable to Create Rx server thread\n");
127 AFS_SIGSET_RESTORE();
131 * The event handling process.
134 event_handler(void *argp
)
136 unsigned long rx_pthread_n_event_expired
= 0;
137 unsigned long rx_pthread_n_event_waits
= 0;
138 long rx_pthread_n_event_woken
= 0;
139 unsigned long rx_pthread_n_event_error
= 0;
140 struct timespec rx_pthread_next_event_time
= { 0, 0 };
143 MUTEX_ENTER(&event_handler_mutex
);
149 MUTEX_EXIT(&event_handler_mutex
);
151 next
.sec
= 30; /* Time to sleep if there are no events scheduled */
154 rxevent_RaiseEvents(&next
);
156 MUTEX_ENTER(&event_handler_mutex
);
157 if (rx_pthread_event_rescheduled
) {
158 rx_pthread_event_rescheduled
= 0;
162 clock_Add(&cv
, &next
);
163 rx_pthread_next_event_time
.tv_sec
= cv
.sec
;
164 rx_pthread_next_event_time
.tv_nsec
= cv
.usec
* 1000;
165 rx_pthread_n_event_waits
++;
166 error
= CV_TIMEDWAIT(&rx_event_handler_cond
, &event_handler_mutex
, &rx_pthread_next_event_time
);
168 rx_pthread_n_event_woken
++;
171 else if (error
== ETIMEDOUT
) {
172 rx_pthread_n_event_expired
++;
174 rx_pthread_n_event_error
++;
177 else if (errno
== ETIMEDOUT
) {
178 rx_pthread_n_event_expired
++;
180 rx_pthread_n_event_error
++;
183 rx_pthread_event_rescheduled
= 0;
190 * This routine is called by the event package whenever a new event is
191 * posted that is earlier than all other pending events. */
193 rxi_ReScheduleEvents(void)
195 MUTEX_ENTER(&event_handler_mutex
);
196 CV_SIGNAL(&rx_event_handler_cond
);
197 rx_pthread_event_rescheduled
= 1;
198 MUTEX_EXIT(&event_handler_mutex
);
202 /* Loop to listen on a socket. Return setting *newcallp if this
203 * thread should become a server thread. */
205 rxi_ListenerProc(osi_socket sock
, int *tnop
, struct rx_call
**newcallp
)
209 struct rx_packet
*p
= (struct rx_packet
*)0;
211 MUTEX_ENTER(&listener_mutex
);
212 while (!listeners_started
) {
213 CV_WAIT(&rx_listener_cond
, &listener_mutex
);
215 MUTEX_EXIT(&listener_mutex
);
218 /* See if a check for additional packets was issued */
222 * Grab a new packet only if necessary (otherwise re-use the old one)
225 rxi_RestoreDataBufs(p
);
227 if (!(p
= rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE
))) {
228 /* Could this happen with multiple socket listeners? */
229 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
233 if (rxi_ReadPacket(sock
, p
, &host
, &port
)) {
235 p
= rxi_ReceivePacket(p
, sock
, host
, port
, tnop
, newcallp
);
236 if (newcallp
&& *newcallp
) {
246 /* This is the listener process request loop. The listener process loop
247 * becomes a server thread when rxi_ListenerProc returns, and stays
248 * a server thread until rxi_ServerProc returns. */
250 rx_ListenerProc(void *argp
)
253 osi_socket sock
= (osi_socket
)(intptr_t)argp
;
254 struct rx_call
*newcall
;
259 rxi_ListenerProc(sock
, &threadID
, &newcall
);
260 /* osi_Assert(threadID != -1); */
261 /* osi_Assert(newcall != NULL); */
262 sock
= OSI_NULLSOCKET
;
263 rxi_SetThreadNum(threadID
);
264 rxi_ServerProc(threadID
, newcall
, &sock
);
265 /* osi_Assert(sock != OSI_NULLSOCKET); */
271 /* This is the server process request loop. The server process loop
272 * becomes a listener thread when rxi_ServerProc returns, and stays
273 * a listener thread until rxi_ListenerProc returns. */
275 rx_ServerProc(void * dummy
)
279 struct rx_call
*newcall
= NULL
;
281 rxi_MorePackets(rx_maxReceiveWindow
+ 2); /* alloc more packets */
282 MUTEX_ENTER(&rx_quota_mutex
);
283 rxi_dataQuota
+= rx_initSendWindow
; /* Reserve some pkts for hard times */
284 /* threadID is used for making decisions in GetCall. Get it by bumping
285 * number of threads handling incoming calls */
286 /* Unique thread ID: used for scheduling purposes *and* as index into
287 * the host hold table (fileserver).
288 * The previously used rxi_availProcs is unsuitable as it
289 * will already go up and down as packets arrive while the server
290 * threads are still initialising! The recently introduced
291 * rxi_pthread_hinum does not necessarily lead to a server
292 * thread with id 0, which is not allowed to hop through the
293 * incoming call queue.
294 * So either introduce yet another counter or flag the FCFS
295 * thread... chose the latter.
297 MUTEX_ENTER(&rx_pthread_mutex
);
298 threadID
= rx_NewThreadId();
299 if (rxi_fcfs_thread_num
== 0 && rxi_fcfs_thread_num
!= threadID
)
300 rxi_fcfs_thread_num
= threadID
;
301 MUTEX_EXIT(&rx_pthread_mutex
);
303 MUTEX_EXIT(&rx_quota_mutex
);
306 sock
= OSI_NULLSOCKET
;
307 rxi_SetThreadNum(threadID
);
308 rxi_ServerProc(threadID
, newcall
, &sock
);
309 /* osi_Assert(sock != OSI_NULLSOCKET); */
311 rxi_ListenerProc(sock
, &threadID
, &newcall
);
312 /* osi_Assert(threadID != -1); */
313 /* osi_Assert(newcall != NULL); */
320 * Historically used to start the listener process. We now have multiple
321 * listener processes (one for each socket); these are started by GetUdpSocket.
323 * The event handling process *is* started here (the old listener used
324 * to also handle events). The listener threads can't actually start
325 * listening until rxi_StartListener is called because most of Rx may not
326 * be initialized when rxi_Listen is called.
329 rxi_StartListener(void)
331 pthread_attr_t tattr
;
334 if (listeners_started
)
337 if (pthread_attr_init(&tattr
) != 0) {
338 osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
341 if (pthread_attr_setdetachstate(&tattr
, PTHREAD_CREATE_DETACHED
) != 0) {
342 osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
346 if (pthread_create(&event_handler_thread
, &tattr
, event_handler
, NULL
) !=
348 osi_Panic("Unable to create Rx event handling thread\n");
351 AFS_SIGSET_RESTORE();
353 MUTEX_ENTER(&listener_mutex
);
354 CV_BROADCAST(&rx_listener_cond
);
355 listeners_started
= 1;
356 MUTEX_EXIT(&listener_mutex
);
361 * Listen on the specified socket.
364 rxi_Listen(osi_socket sock
)
367 pthread_attr_t tattr
;
370 if (pthread_attr_init(&tattr
) != 0) {
371 osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
374 if (pthread_attr_setdetachstate(&tattr
, PTHREAD_CREATE_DETACHED
) != 0) {
375 osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
379 if (pthread_create(&thread
, &tattr
, rx_ListenerProc
, (void *)(intptr_t)sock
) != 0) {
380 osi_Panic("Unable to create socket listener thread\n");
383 AFS_SIGSET_RESTORE();
393 rxi_Recvmsg(osi_socket socket
, struct msghdr
*msg_p
, int flags
)
396 ret
= recvmsg(socket
, msg_p
, flags
);
398 #ifdef AFS_RXERRQ_ENV
400 while (rxi_HandleSocketError(socket
) > 0)
412 rxi_Sendmsg(osi_socket socket
, struct msghdr
*msg_p
, int flags
)
415 ret
= sendmsg(socket
, msg_p
, flags
);
417 #ifdef AFS_RXERRQ_ENV
419 while (rxi_HandleSocketError(socket
) > 0)
424 # ifdef AFS_LINUX22_ENV
425 /* linux unfortunately returns ECONNREFUSED if the target port
426 * is no longer in use */
427 /* and EAGAIN if a UDP checksum is incorrect */
428 if (ret
== -1 && errno
!= ECONNREFUSED
&& errno
!= EAGAIN
) {
432 dpf(("rxi_sendmsg failed, error %d\n", errno
));
434 # ifndef AFS_NT40_ENV
438 if (WSAGetLastError() > 0)
439 return -WSAGetLastError();
443 #endif /* !AFS_RXERRQ_ENV */
447 struct rx_ts_info_t
* rx_ts_info_init(void) {
448 struct rx_ts_info_t
* rx_ts_info
;
449 rx_ts_info
= calloc(1, sizeof(rx_ts_info_t
));
450 osi_Assert(rx_ts_info
!= NULL
&& pthread_setspecific(rx_ts_info_key
, rx_ts_info
) == 0);
451 #ifdef RX_ENABLE_TSFPQ
452 opr_queue_Init(&rx_ts_info
->_FPQ
.queue
);
454 MUTEX_ENTER(&rx_packets_mutex
);
456 RX_TS_FPQ_COMPUTE_LIMITS
;
457 MUTEX_EXIT(&rx_packets_mutex
);
458 #endif /* RX_ENABLE_TSFPQ */
463 rx_GetThreadNum(void) {
464 return (intptr_t)pthread_getspecific(rx_thread_id_key
);
468 rxi_SetThreadNum(int threadID
) {
469 osi_Assert(pthread_setspecific(rx_thread_id_key
,
470 (void *)(intptr_t)threadID
) == 0);
474 rx_SetThreadNum(void) {
477 threadId
= rx_NewThreadId();
478 rxi_SetThreadNum(threadId
);