Merge branch 'master' into experimental
[pkg-k5-afs_openafs.git] / src / rx / rx_pthread.c
blobb4272bbce6f0c78125173b8feab431372d3c0cb6
1 /*
2 * Copyright 2000, International Business Machines Corporation and others.
3 * All Rights Reserved.
5 * This software has been released under the terms of the IBM Public
6 * License. For details, see the LICENSE file in the top-level source
7 * directory or online at http://www.openafs.org/dl/license10.html
8 */
11 * An implementation of the rx socket listener for pthreads (not using select).
12 * This assumes that multiple read system calls may be extant at any given
13 * time. Also implements the pthread-specific event handling for rx.
15 * rx_pthread.c is used for the thread safe RX package.
18 #include <afsconfig.h>
19 #include <afs/param.h>
21 #include <roken.h>
22 #include <afs/opr.h>
24 #include <assert.h>
26 #ifdef AFS_PTHREAD_ENV
28 #include "rx.h"
29 #include "rx_globals.h"
30 #include "rx_pthread.h"
31 #include "rx_clock.h"
32 #include "rx_atomic.h"
33 #include "rx_internal.h"
34 #include "rx_pthread.h"
35 #ifdef AFS_NT40_ENV
36 #include "rx_xmit_nt.h"
37 #endif
39 static void rxi_SetThreadNum(int threadID);
41 /* Set rx_pthread_event_rescheduled if event_handler should just try
42 * again instead of sleeping.
44 * Protected by event_handler_mutex
46 static int rx_pthread_event_rescheduled = 0;
48 static void *rx_ListenerProc(void *);
51 * We supply an event handling thread for Rx's event processing.
52 * The condition variable is used to wakeup the thread whenever a new
53 * event is scheduled earlier than the previous earliest event.
54 * This thread is also responsible for keeping time.
56 static pthread_t event_handler_thread;
57 afs_kcondvar_t rx_event_handler_cond;
58 afs_kmutex_t event_handler_mutex;
59 afs_kcondvar_t rx_listener_cond;
60 afs_kmutex_t listener_mutex;
61 static int listeners_started = 0;
62 afs_kmutex_t rx_clock_mutex;
63 struct clock rxi_clockNow;
65 static rx_atomic_t threadHiNum;
67 int
68 rx_NewThreadId(void) {
69 return rx_atomic_inc_and_read(&threadHiNum);
73 * Delay the current thread the specified number of seconds.
75 void
76 rxi_Delay(int sec)
78 sleep(sec);
82 * Called from rx_Init()
84 void
85 rxi_InitializeThreadSupport(void)
87 /* listeners_started must only be reset if
88 * the listener thread terminates */
89 /* listeners_started = 0; */
90 clock_GetTime(&rxi_clockNow);
93 static void *
94 server_entry(void *argp)
96 void (*server_proc) (void *) = (void (*)(void *))argp;
97 server_proc(NULL);
98 dpf(("rx_pthread.c: server_entry: Server proc returned unexpectedly\n"));
99 return (void *) -1; /* reused as return value, see pthread(3) */
103 * Start an Rx server process.
105 void
106 rxi_StartServerProc(void *(*proc) (void *), int stacksize)
108 pthread_t thread;
109 pthread_attr_t tattr;
110 AFS_SIGSET_DECL;
112 if (pthread_attr_init(&tattr) != 0) {
113 osi_Panic("Unable to Create Rx server thread (pthread_attr_init)\n");
116 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
117 osi_Panic("Unable to Create Rx server thread (pthread_attr_setdetachstate)\n");
121 * NOTE: We are ignoring the stack size parameter, for now.
123 AFS_SIGSET_CLEAR();
124 if (pthread_create(&thread, &tattr, server_entry, (void *)proc) != 0) {
125 osi_Panic("Unable to Create Rx server thread\n");
127 AFS_SIGSET_RESTORE();
131 * The event handling process.
133 static void *
134 event_handler(void *argp)
136 unsigned long rx_pthread_n_event_expired = 0;
137 unsigned long rx_pthread_n_event_waits = 0;
138 long rx_pthread_n_event_woken = 0;
139 unsigned long rx_pthread_n_event_error = 0;
140 struct timespec rx_pthread_next_event_time = { 0, 0 };
141 int error;
143 MUTEX_ENTER(&event_handler_mutex);
145 for (;;) {
146 struct clock cv;
147 struct clock next;
149 MUTEX_EXIT(&event_handler_mutex);
151 next.sec = 30; /* Time to sleep if there are no events scheduled */
152 next.usec = 0;
153 clock_GetTime(&cv);
154 rxevent_RaiseEvents(&next);
156 MUTEX_ENTER(&event_handler_mutex);
157 if (rx_pthread_event_rescheduled) {
158 rx_pthread_event_rescheduled = 0;
159 continue;
162 clock_Add(&cv, &next);
163 rx_pthread_next_event_time.tv_sec = cv.sec;
164 rx_pthread_next_event_time.tv_nsec = cv.usec * 1000;
165 rx_pthread_n_event_waits++;
166 error = CV_TIMEDWAIT(&rx_event_handler_cond, &event_handler_mutex, &rx_pthread_next_event_time);
167 if (error == 0) {
168 rx_pthread_n_event_woken++;
170 #ifdef AFS_NT40_ENV
171 else if (error == ETIMEDOUT) {
172 rx_pthread_n_event_expired++;
173 } else {
174 rx_pthread_n_event_error++;
176 #else
177 else if (errno == ETIMEDOUT) {
178 rx_pthread_n_event_expired++;
179 } else {
180 rx_pthread_n_event_error++;
182 #endif
183 rx_pthread_event_rescheduled = 0;
185 return NULL;
190 * This routine will get called by the event package whenever a new,
191 * earlier than others, event is posted. */
192 void
193 rxi_ReScheduleEvents(void)
195 MUTEX_ENTER(&event_handler_mutex);
196 CV_SIGNAL(&rx_event_handler_cond);
197 rx_pthread_event_rescheduled = 1;
198 MUTEX_EXIT(&event_handler_mutex);
202 /* Loop to listen on a socket. Return setting *newcallp if this
203 * thread should become a server thread. */
204 static void
205 rxi_ListenerProc(osi_socket sock, int *tnop, struct rx_call **newcallp)
207 unsigned int host;
208 u_short port;
209 struct rx_packet *p = (struct rx_packet *)0;
211 MUTEX_ENTER(&listener_mutex);
212 while (!listeners_started) {
213 CV_WAIT(&rx_listener_cond, &listener_mutex);
215 MUTEX_EXIT(&listener_mutex);
217 for (;;) {
218 /* See if a check for additional packets was issued */
219 rx_CheckPackets();
222 * Grab a new packet only if necessary (otherwise re-use the old one)
224 if (p) {
225 rxi_RestoreDataBufs(p);
226 } else {
227 if (!(p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE))) {
228 /* Could this happen with multiple socket listeners? */
229 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
233 if (rxi_ReadPacket(sock, p, &host, &port)) {
234 clock_NewTime();
235 p = rxi_ReceivePacket(p, sock, host, port, tnop, newcallp);
236 if (newcallp && *newcallp) {
237 if (p)
238 rxi_FreePacket(p);
239 return;
243 /* NOTREACHED */
246 /* This is the listener process request loop. The listener process loop
247 * becomes a server thread when rxi_ListenerProc returns, and stays
248 * server thread until rxi_ServerProc returns. */
249 static void *
250 rx_ListenerProc(void *argp)
252 int threadID;
253 osi_socket sock = (osi_socket)(intptr_t)argp;
254 struct rx_call *newcall;
256 while (1) {
257 newcall = NULL;
258 threadID = -1;
259 rxi_ListenerProc(sock, &threadID, &newcall);
260 /* osi_Assert(threadID != -1); */
261 /* osi_Assert(newcall != NULL); */
262 sock = OSI_NULLSOCKET;
263 rxi_SetThreadNum(threadID);
264 rxi_ServerProc(threadID, newcall, &sock);
265 /* osi_Assert(sock != OSI_NULLSOCKET); */
267 /* not reached */
268 return NULL;
271 /* This is the server process request loop. The server process loop
272 * becomes a listener thread when rxi_ServerProc returns, and stays
273 * listener thread until rxi_ListenerProc returns. */
274 void *
275 rx_ServerProc(void * dummy)
277 osi_socket sock;
278 int threadID;
279 struct rx_call *newcall = NULL;
281 rxi_MorePackets(rx_maxReceiveWindow + 2); /* alloc more packets */
282 MUTEX_ENTER(&rx_quota_mutex);
283 rxi_dataQuota += rx_initSendWindow; /* Reserve some pkts for hard times */
284 /* threadID is used for making decisions in GetCall. Get it by bumping
285 * number of threads handling incoming calls */
286 /* Unique thread ID: used for scheduling purposes *and* as index into
287 * the host hold table (fileserver).
288 * The previously used rxi_availProcs is unsuitable as it
289 * will already go up and down as packets arrive while the server
290 * threads are still initialising! The recently introduced
291 * rxi_pthread_hinum does not necessarily lead to a server
292 * thread with id 0, which is not allowed to hop through the
293 * incoming call queue.
294 * So either introduce yet another counter or flag the FCFS
295 * thread... chose the latter.
297 MUTEX_ENTER(&rx_pthread_mutex);
298 threadID = rx_NewThreadId();
299 if (rxi_fcfs_thread_num == 0 && rxi_fcfs_thread_num != threadID)
300 rxi_fcfs_thread_num = threadID;
301 MUTEX_EXIT(&rx_pthread_mutex);
302 ++rxi_availProcs;
303 MUTEX_EXIT(&rx_quota_mutex);
305 while (1) {
306 sock = OSI_NULLSOCKET;
307 rxi_SetThreadNum(threadID);
308 rxi_ServerProc(threadID, newcall, &sock);
309 /* osi_Assert(sock != OSI_NULLSOCKET); */
310 newcall = NULL;
311 rxi_ListenerProc(sock, &threadID, &newcall);
312 /* osi_Assert(threadID != -1); */
313 /* osi_Assert(newcall != NULL); */
315 /* not reached */
316 return NULL;
320 * Historically used to start the listener process. We now have multiple
321 * listener processes (one for each socket); these are started by GetUdpSocket.
323 * The event handling process *is* started here (the old listener used
324 * to also handle events). The listener threads can't actually start
325 * listening until rxi_StartListener is called because most of R may not
326 * be initialized when rxi_Listen is called.
328 void
329 rxi_StartListener(void)
331 pthread_attr_t tattr;
332 AFS_SIGSET_DECL;
334 if (listeners_started)
335 return;
337 if (pthread_attr_init(&tattr) != 0) {
338 osi_Panic("Unable to create Rx event handling thread (pthread_attr_init)\n");
341 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
342 osi_Panic("Unable to create Rx event handling thread (pthread_attr_setdetachstate)\n");
345 AFS_SIGSET_CLEAR();
346 if (pthread_create(&event_handler_thread, &tattr, event_handler, NULL) !=
347 0) {
348 osi_Panic("Unable to create Rx event handling thread\n");
350 rx_NewThreadId();
351 AFS_SIGSET_RESTORE();
353 MUTEX_ENTER(&listener_mutex);
354 CV_BROADCAST(&rx_listener_cond);
355 listeners_started = 1;
356 MUTEX_EXIT(&listener_mutex);
361 * Listen on the specified socket.
364 rxi_Listen(osi_socket sock)
366 pthread_t thread;
367 pthread_attr_t tattr;
368 AFS_SIGSET_DECL;
370 if (pthread_attr_init(&tattr) != 0) {
371 osi_Panic("Unable to create socket listener thread (pthread_attr_init)\n");
374 if (pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED) != 0) {
375 osi_Panic("Unable to create socket listener thread (pthread_attr_setdetachstate)\n");
378 AFS_SIGSET_CLEAR();
379 if (pthread_create(&thread, &tattr, rx_ListenerProc, (void *)(intptr_t)sock) != 0) {
380 osi_Panic("Unable to create socket listener thread\n");
382 rx_NewThreadId();
383 AFS_SIGSET_RESTORE();
384 return 0;
389 * Recvmsg.
393 rxi_Recvmsg(osi_socket socket, struct msghdr *msg_p, int flags)
395 int ret;
396 ret = recvmsg(socket, msg_p, flags);
398 #ifdef AFS_RXERRQ_ENV
399 if (ret < 0) {
400 while (rxi_HandleSocketError(socket) > 0)
403 #endif
405 return ret;
409 * Sendmsg.
412 rxi_Sendmsg(osi_socket socket, struct msghdr *msg_p, int flags)
414 int ret;
415 ret = sendmsg(socket, msg_p, flags);
417 #ifdef AFS_RXERRQ_ENV
418 if (ret < 0) {
419 while (rxi_HandleSocketError(socket) > 0)
421 return ret;
423 #else
424 # ifdef AFS_LINUX22_ENV
425 /* linux unfortunately returns ECONNREFUSED if the target port
426 * is no longer in use */
427 /* and EAGAIN if a UDP checksum is incorrect */
428 if (ret == -1 && errno != ECONNREFUSED && errno != EAGAIN) {
429 # else
430 if (ret == -1) {
431 # endif
432 dpf(("rxi_sendmsg failed, error %d\n", errno));
433 fflush(stdout);
434 # ifndef AFS_NT40_ENV
435 if (errno > 0)
436 return -errno;
437 # else
438 if (WSAGetLastError() > 0)
439 return -WSAGetLastError();
440 # endif
441 return -1;
443 #endif /* !AFS_RXERRQ_ENV */
444 return 0;
447 struct rx_ts_info_t * rx_ts_info_init(void) {
448 struct rx_ts_info_t * rx_ts_info;
449 rx_ts_info = calloc(1, sizeof(rx_ts_info_t));
450 osi_Assert(rx_ts_info != NULL && pthread_setspecific(rx_ts_info_key, rx_ts_info) == 0);
451 #ifdef RX_ENABLE_TSFPQ
452 opr_queue_Init(&rx_ts_info->_FPQ.queue);
454 MUTEX_ENTER(&rx_packets_mutex);
455 rx_TSFPQMaxProcs++;
456 RX_TS_FPQ_COMPUTE_LIMITS;
457 MUTEX_EXIT(&rx_packets_mutex);
458 #endif /* RX_ENABLE_TSFPQ */
459 return rx_ts_info;
463 rx_GetThreadNum(void) {
464 return (intptr_t)pthread_getspecific(rx_thread_id_key);
467 static void
468 rxi_SetThreadNum(int threadID) {
469 osi_Assert(pthread_setspecific(rx_thread_id_key,
470 (void *)(intptr_t)threadID) == 0);
474 rx_SetThreadNum(void) {
475 int threadId;
477 threadId = rx_NewThreadId();
478 rxi_SetThreadNum(threadId);
479 return threadId;
482 #endif