*** empty log message ***
[arla.git] / rx / rx_user.c
blob80a8034ea751aa0bf77df0ffb6c5e48ce64db185
1 /*
2 ****************************************************************************
3 * Copyright IBM Corporation 1988, 1989 - All Rights Reserved *
4 * *
5 * Permission to use, copy, modify, and distribute this software and its *
6 * documentation for any purpose and without fee is hereby granted, *
7 * provided that the above copyright notice appear in all copies and *
8 * that both that copyright notice and this permission notice appear in *
9 * supporting documentation, and that the name of IBM not be used in *
10 * advertising or publicity pertaining to distribution of the software *
11 * without specific, written prior permission. *
12 * *
13 * IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
14 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
15 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY *
16 * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER *
17 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING *
18 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *
19 ****************************************************************************
23 * rx_user.c contains routines specific to the user space UNIX
24 * implementation of rx
27 #include "rx_locl.h"
29 RCSID("$Id$");
31 #ifndef IPPORT_USERRESERVED
33 * If in.h doesn't define this, define it anyway. Unfortunately, defining
34 * this doesn't put the code into the kernel to restrict kernel assigned
35 * port numbers to numbers below IPPORT_USERRESERVED...
37 #define IPPORT_USERRESERVED 5000
38 #endif
40 static osi_socket *rx_sockets = NULL;
41 static int num_rx_sockets = 0;
43 static fd_set rx_selectMask;
44 static int rx_maxSocketNumber = -1; /* Maximum socket number represented
45 * in the select mask */
46 static PROCESS rx_listenerPid; /* LWP process id of socket listener
47 * process */
48 void rxi_Listener(void);
51 * This routine will get called by the event package whenever a new,
52 * earlier than others, event is posted. If the Listener process
53 * is blocked in selects, this will unblock it. It also can be called
54 * to force a new trip through the rxi_Listener select loop when the set
55 * of file descriptors it should be listening to changes...
57 void
58 rxi_ReScheduleEvents(void)
60 if (rx_listenerPid)
61 IOMGR_Cancel(rx_listenerPid);
64 void
65 rxi_StartListener(void)
67 /* Initialize LWP & IOMGR in case no one else has */
68 PROCESS junk;
70 LWP_InitializeProcessSupport(LWP_NORMAL_PRIORITY, &junk);
71 IOMGR_Initialize();
73 /* Priority of listener should be high, so it can keep conns alive */
74 #define RX_LIST_STACK 24000
75 LWP_CreateProcess(rxi_Listener, RX_LIST_STACK, LWP_MAX_PRIORITY, 0,
76 "rx_Listener", &rx_listenerPid);
80 * Called by rx_StartServer to start up lwp's to service calls.
81 * NExistingProcs gives the number of procs already existing, and which
82 * therefore needn't be created.
84 void
85 rxi_StartServerProcs(int nExistingProcs)
87 struct rx_service *service;
88 int i;
89 int maxdiff = 0;
90 int nProcs = 0;
91 PROCESS scratchPid;
94 * For each service, reserve N processes, where N is the "minimum" number
95 * of processes that MUST be able to execute a request in parallel, at
96 * any time, for that process. Also compute the maximum difference
97 * between any service's maximum number of processes that can run (i.e.
98 * the maximum number that ever will be run, and a guarantee that this
99 * number will run if other services aren't running), and its minimum
100 * number. The result is the extra number of processes that we need in
101 * order to provide the latter guarantee
103 for (i = 0; i < RX_MAX_SERVICES; i++) {
104 int diff;
106 service = rx_services[i];
107 if (service == (struct rx_service *) 0)
108 break;
109 nProcs += service->minProcs;
110 diff = service->maxProcs - service->minProcs;
111 if (diff > maxdiff)
112 maxdiff = diff;
114 nProcs += maxdiff; /* Extra processes needed to allow max
115 * number requested to run in any
116 * given service, under good
117 * conditions */
118 nProcs -= nExistingProcs; /* Subtract the number of procs that
119 * were previously created for use as
120 * server procs */
121 for (i = 0; i < nProcs; i++) {
122 LWP_CreateProcess(rx_ServerProc, rx_stackSize, RX_PROCESS_PRIORITY, 0,
123 "rx_ServerProc", &scratchPid);
128 * Add the socket to the listing queue and add it to the select mask.
131 static int
132 addSocket(const char *name, osi_socket socketFd)
134 osi_socket *sockets;
136 if (socketFd >= FD_SETSIZE) {
137 osi_Msg(("%s: socket fd too large\n", name));
138 return OSI_NULLSOCKET;
141 sockets = realloc(rx_sockets, (num_rx_sockets + 1) * sizeof(*rx_sockets));
142 if (sockets == NULL) {
143 perror("socket");
144 osi_Msg(("%sunable to allocated memory for socket\n", name));
145 return 1;
147 rx_sockets = sockets;
149 rx_sockets[num_rx_sockets] = socketFd;
150 num_rx_sockets++;
152 if (rx_maxSocketNumber < 0)
153 FD_ZERO(&rx_selectMask);
155 FD_SET(socketFd, &rx_selectMask);
156 if (socketFd > rx_maxSocketNumber)
157 rx_maxSocketNumber = socketFd;
159 return 0;
164 * Make a socket for receiving/sending IP packets. Set it into non-blocking
165 * and large buffering modes. If port isn't specified, the kernel will pick
166 * one. Returns the socket (>= 0) on success. Returns OSI_NULLSOCKET on
167 * failure.
169 * Port must be in network byte order.
172 osi_socket
173 rxi_GetUDPSocket(uint16_t port, uint16_t *retport)
175 int code;
176 osi_socket socketFd = OSI_NULLSOCKET;
177 struct sockaddr_in taddr;
178 char *name = "rxi_GetUDPSocket: ";
179 socklen_t sa_size;
181 socketFd = socket(AF_INET, SOCK_DGRAM, 0);
182 if (socketFd < 0) {
183 perror("socket");
184 osi_Msg(("%sunable to create UDP socket\n", name));
185 return OSI_NULLSOCKET;
188 if (socketFd >= FD_SETSIZE) {
189 osi_Msg(("socket fd too large\n"));
190 close(socketFd);
191 return OSI_NULLSOCKET;
194 memset (&taddr, 0, sizeof(taddr));
195 taddr.sin_family = AF_INET;
196 taddr.sin_port = port;
198 code = bind(socketFd, (struct sockaddr *) &taddr, sizeof(taddr));
199 if (code < 0) {
200 perror("bind");
201 osi_Msg(("%sunable to bind UDP socket\n", name));
202 goto error;
205 sa_size = sizeof(taddr);
206 code = getsockname(socketFd, (struct sockaddr *) &taddr, &sa_size);
207 if (code < 0) {
208 perror("getsockname");
209 osi_Msg(("%sunable to bind UDP socket\n", name));
210 goto error;
212 if (retport)
213 *retport = taddr.sin_port;
216 * Use one of three different ways of getting a socket buffer expanded to
217 * a reasonable size
220 int len1, len2;
222 len1 = len2 = 32766;
224 rx_stats.socketGreedy =
225 (setsockopt(socketFd, SOL_SOCKET, SO_SNDBUF,
226 &len1, sizeof(len1)) >= 0) &&
227 (setsockopt(socketFd, SOL_SOCKET, SO_RCVBUF,
228 &len2, sizeof(len2)) >= 0);
231 if (!rx_stats.socketGreedy)
232 osi_Msg(("%s*WARNING* Unable to increase buffering on socket\n",name));
235 * Put it into non-blocking mode so that rx_Listener can do a polling
236 * read before entering select
238 if (fcntl(socketFd, F_SETFL, FNDELAY) == -1) {
239 perror("fcntl");
240 osi_Msg(("%sunable to set non-blocking mode on socket\n", name));
241 goto error;
244 if (addSocket(name, socketFd))
245 goto error;
247 return socketFd;
249 error:
250 close(socketFd);
252 return OSI_NULLSOCKET;
256 * Open icmp socket
259 osi_socket
260 rxi_GetICMPSocket(void)
262 int socketFd = -1;
263 char *name = "rxi_GetICMPSocket: ";
265 socketFd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP);
266 if (socketFd < 0) {
267 if (getuid() == 0) {
268 perror("socket");
269 osi_Msg(("%sunable to create ICMP socket\n", name));
271 goto error;
275 * Put it into non-blocking mode so that rx_Listener can do a polling
276 * read before entering select
278 if (fcntl(socketFd, F_SETFL, FNDELAY) == -1) {
279 perror("fcntl");
280 osi_Msg(("%sunable to set non-blocking mode on socket\n", name));
281 goto error;
283 if (addSocket(name, socketFd))
284 goto error;
286 return socketFd;
288 error:
289 if (socketFd >= 0)
290 close(socketFd);
291 return OSI_NULLSOCKET;
295 * The main loop which listens to the net for datagrams, and handles timeouts
296 * and retransmissions, etc. It also is responsible for scheduling the
297 * execution of pending events (in conjunction with event.c).
299 * Note interaction of nextPollTime and lastPollWorked. The idea is
300 * that if rx is not keeping up with the incoming stream of packets
301 * (because there are threads that are interfering with its running
302 * sufficiently often), rx does a polling select using IOMGR_Select
303 * (setting tv_sec = tv_usec = 0). Old code is a system select, but
304 * this was bad since we didn't know what calling conversion the
305 * system select() was using (on win32 hosts it was PASCAL, and you
306 * lost your $sp)
308 * So, our algorithm is that if the last poll on the file descriptor found
309 * useful data, or we're at the time nextPollTime (which is advanced so that
310 * it occurs every 3 or 4 seconds),
311 * then we try the polling select. If we eventually
312 * catch up (which we can tell by the polling select returning no input
313 * packets ready), then we don't do a polling select again until several
314 * seconds later (via nextPollTime mechanism).
317 #ifndef FD_COPY
318 #define FD_COPY(f, t) memcpy((t), (f), sizeof(*(f)))
319 #endif
321 void
322 rxi_Listener(void)
324 uint32_t host;
325 uint16_t port;
326 struct rx_packet *p = NULL;
327 fd_set rfds;
328 int i, fds;
329 struct clock cv;
330 long nextPollTime; /* time to next poll FD before
331 * sleeping */
332 int lastPollWorked, doingPoll; /* true iff last poll was useful */
333 struct timeval tv, *tvp;
335 clock_NewTime();
336 lastPollWorked = 0;
337 nextPollTime = 0;
338 for (;;) {
341 * Grab a new packet only if necessary (otherwise re-use the old one)
343 if (p == NULL) {
344 if ((p = rxi_AllocPacket(RX_PACKET_CLASS_RECEIVE)) == NULL)
345 osi_Panic("rxi_Listener: no packets!"); /* Shouldn't happen */
347 /* Wait for the next event time or a packet to arrive. */
349 * event_RaiseEvents schedules any events whose time has come and
350 * then atomically computes the time to the next event, guaranteeing
351 * that this is positive. If there is no next event, it returns 0
353 if (!rxevent_RaiseEvents(&cv))
354 tvp = NULL;
355 else {
358 * It's important to copy cv to tv, because the 4.3 documentation
359 * for select threatens that *tv may be updated after a select,
360 * in future editions of the system, to indicate how much of the
361 * time period has elapsed. So we shouldn't rely on tv not being
362 * altered.
364 tv.tv_sec = cv.sec; /* Time to next event */
365 tv.tv_usec = cv.usec;
366 tvp = &tv;
368 rx_stats.selects++;
369 FD_COPY(&rx_selectMask, &rfds);
370 if (lastPollWorked || nextPollTime < clock_Sec()) {
371 /* we're catching up, or haven't tried to for a few seconds */
372 doingPoll = 1;
373 nextPollTime = clock_Sec() + 4; /* try again in 4 seconds no
374 * matter what */
375 tv.tv_sec = tv.tv_usec = 0;/* make sure we poll */
376 tvp = &tv;
377 } else {
378 doingPoll = 0;
380 lastPollWorked = 0; /* default is that it didn't find
381 * anything */
383 fds = IOMGR_Select (rx_maxSocketNumber + 1, &rfds, 0, 0, tvp);
384 clock_NewTime();
385 if (fds > 0) {
386 if (doingPoll)
387 lastPollWorked = 1;
389 for (i = 0; i < num_rx_sockets; i++) {
390 if (p == NULL)
391 break;
392 else if (FD_ISSET(rx_sockets[i], &rfds)) {
393 if (rx_sockets[i] == rx_socket_icmp) {
394 rxi_ReadIcmp(rx_sockets[i]);
395 } else if (rxi_ReadPacket(rx_sockets[i],p, &host, &port)) {
396 p = rxi_ReceivePacket(p, rx_sockets[i], host, port);
402 /* NOTREACHED */
406 void
407 osi_Panic(const char *fmt, ...)
409 va_list ap;
410 va_start(ap, fmt);
411 fprintf(stderr, "Fatal Rx error: ");
412 vfprintf(stderr, fmt, ap);
413 va_end(ap);
414 fflush(stderr);
415 fflush(stdout);
416 abort();
419 void
420 osi_vMsg(const char *fmt, ...)
422 va_list ap;
423 va_start(ap, fmt);
424 vfprintf(stderr, fmt, ap);
425 va_end(ap);
426 fflush(stderr);
430 #define ADDRSPERSITE 256
432 #ifdef ADAPT_MTU
434 static u_long myNetAddrs[ADDRSPERSITE];
435 static int myNetMTUs[ADDRSPERSITE];
436 static int myNetFlags[ADDRSPERSITE];
437 static int numMyNetAddrs;
439 static void
440 GetIFInfo(void)
442 int s;
443 int len, res;
444 struct ifconf ifc;
445 struct ifreq ifs[ADDRSPERSITE];
446 struct sockaddr_in *a;
447 char *p;
448 struct ifreq ifreq;
449 size_t sz = 0;
451 numMyNetAddrs = 0;
452 memset(myNetAddrs, 0, sizeof(myNetAddrs));
453 memset(myNetMTUs, 0, sizeof(myNetMTUs));
454 memset(myNetFlags, 0, sizeof(myNetFlags));
456 s = socket(AF_INET, SOCK_DGRAM, 0);
457 if (s < 0)
458 return;
460 ifc.ifc_len = sizeof(ifs);
461 ifc.ifc_buf = (caddr_t) & ifs[0];
462 memset(&ifs[0], 0, sizeof(ifs));
464 res = ioctl(s, SIOCGIFCONF, &ifc);
465 if (res < 0) {
466 close(s);
467 return;
469 len = ifc.ifc_len / sizeof(struct ifreq);
470 if (len > ADDRSPERSITE)
471 len = ADDRSPERSITE;
473 ifreq.ifr_name[0] = '\0';
474 for (p = ifc.ifc_buf; p < ifc.ifc_buf + ifc.ifc_len; p += sz) {
475 struct ifreq *ifr = (struct ifreq *)p;
477 sz = sizeof(*ifr);
478 #ifdef SOCKADDR_HAS_SA_LEN
479 sz = max(sz, sizeof(ifr->ifr_name) + ifr->ifr_addr.sa_len);
480 #endif
481 if (strncmp (ifreq.ifr_name,
482 ifr->ifr_name,
483 sizeof(ifr->ifr_name))) {
484 res = ioctl(s, SIOCGIFFLAGS, ifr);
485 if (res < 0)
486 continue;
487 if (!(ifr->ifr_flags & IFF_UP))
488 continue;
489 if (ifr->ifr_flags & IFF_LOOPBACK)
490 continue;
491 myNetFlags[numMyNetAddrs] = ifr->ifr_flags;
493 res = ioctl(s, SIOCGIFADDR, ifr);
494 if (res < 0)
495 continue;
496 a = (struct sockaddr_in *)&ifr->ifr_addr;
497 if (a->sin_family != AF_INET)
498 continue;
499 myNetAddrs[numMyNetAddrs] = ntohl(a->sin_addr.s_addr);
501 res = -1;
502 #ifdef SIOCGIFMTU
503 res = ioctl(s, SIOCGIFMTU, ifr);
504 #elif SIOCRIFMTU
505 res = ioctl(s, SIOCRIFMTU, ifr);
506 #else
507 res = -1;
508 #endif
509 if (res == 0) {
510 myNetMTUs[numMyNetAddrs] = ifr->ifr_metric;
511 if (rx_maxReceiveSize < (myNetMTUs[numMyNetAddrs]
512 - RX_IPUDP_SIZE))
513 rx_maxReceiveSize = MIN(RX_MAX_PACKET_SIZE,
514 (myNetMTUs[numMyNetAddrs]
515 - RX_IPUDP_SIZE));
517 if (rx_MyMaxSendSize < myNetMTUs[numMyNetAddrs]
518 - RX_IPUDP_SIZE)
519 rx_MyMaxSendSize = myNetMTUs[numMyNetAddrs]
520 - RX_IPUDP_SIZE;
522 } else {
523 myNetMTUs[numMyNetAddrs] = OLD_MAX_PACKET_SIZE;
524 res = 0;
526 ++numMyNetAddrs;
527 ifreq = *ifr;
532 #if 0
533 RETSIGTYPE (*old)(int);
535 old = signal(SIGSYS, SIG_IGN);
536 if (syscall(31 /* AFS_SYSCALL */ , 28 /* AFSCALL_CALL */ ,
537 20 /* AFSOP_GETMTU */ , myNetAddrs[numMyNetAddrs],
538 &(myNetMTUs[numMyNetAddrs])));
539 myNetMTUs[numMyNetAddrs] = OLD_MAX_PACKET_SIZE;
540 signal(SIGSYS, old);
541 #endif
544 close(s);
547 * have to allocate at least enough to allow a single packet to reach its
548 * maximum size, so ReadPacket will work. Allocate enough for a couple
549 * of packets to do so, for good measure
551 /* MTUXXX before shipping, change this 8 to a 4 */
553 int npackets, ncbufs;
555 ncbufs = (rx_maxReceiveSize - RX_FIRSTBUFFERSIZE);
556 if (ncbufs > 0) {
557 ncbufs = ncbufs / RX_CBUFFERSIZE;
558 npackets = (rx_Window / 8);
559 npackets = (npackets > 2 ? npackets : 2);
560 rxi_MoreCbufs(npackets * (ncbufs + 1));
565 #endif /* ADAPT_MTU */
568 * Called from rxi_FindPeer, when initializing a clear rx_peer structure,
569 * to get interesting information.
572 void
573 rxi_InitPeerParams(struct rx_peer * pp)
575 uint32_t ppaddr, msk, net;
576 int rxmtu;
577 int ix, nlix = 0, nlcount;
578 static int Inited = 0;
580 #ifdef ADAPT_MTU
582 if (!Inited) {
583 GetIFInfo();
584 Inited = 1;
587 * try to second-guess IP, and identify which link is most likely to
588 * be used for traffic to/from this host.
590 ppaddr = ntohl(pp->host);
591 if (IN_CLASSA(ppaddr))
592 msk = IN_CLASSA_NET;
593 else if (IN_CLASSB(ppaddr))
594 msk = IN_CLASSB_NET;
595 else if (IN_CLASSC(ppaddr))
596 msk = IN_CLASSC_NET;
597 else
598 msk = 0;
599 net = ppaddr & msk;
601 for (nlcount = 0, ix = 0; ix < numMyNetAddrs; ++ix) {
602 #ifdef IFF_LOOPBACK
603 if (!(myNetFlags[ix] & IFF_LOOPBACK)) {
604 nlix = ix;
605 ++nlcount;
607 #endif /* IFF_LOOPBACK */
608 if ((myNetAddrs[ix] & msk) == net)
609 break;
612 pp->rateFlag = 2; /* start timing after two full packets */
614 * I don't initialize these, because I presume they are bzero'd...
615 * pp->burstSize pp->burst pp->burstWait.sec pp->burstWait.usec
616 * pp->timeout.usec
619 pp->maxWindow = rx_Window;
620 if (ix >= numMyNetAddrs) { /* not local */
621 pp->timeout.sec = 3;
622 pp->packetSize = RX_REMOTE_PACKET_SIZE;
623 } else {
624 pp->timeout.sec = 2;
625 pp->packetSize = MIN(RX_MAX_PACKET_SIZE,
626 (rx_MyMaxSendSize + RX_HEADER_SIZE));
629 /* Now, maybe get routing interface and override parameters. */
630 if (ix >= numMyNetAddrs && nlcount == 1)
631 ix = nlix;
633 if (ix < numMyNetAddrs) {
634 #ifdef IFF_POINTOPOINT
635 if (myNetFlags[ix] & IFF_POINTOPOINT) {
636 /* wish we knew the bit rate and the chunk size, sigh. */
637 pp->maxWindow = 10;
638 pp->timeout.sec = 4;
639 /* pp->timeout.usec = 0; */
640 pp->packetSize = RX_PP_PACKET_SIZE;
642 #endif /* IFF_POINTOPOINT */
645 * Reduce the packet size to one based on the MTU given by the
646 * interface.
648 if (myNetMTUs[ix] > (RX_IPUDP_SIZE + RX_HEADER_SIZE)) {
649 rxmtu = myNetMTUs[ix] - RX_IPUDP_SIZE;
650 if (rxmtu < pp->packetSize)
651 pp->packetSize = rxmtu;
654 #else /* ADAPT_MTU */
655 pp->rateFlag = 2; /* start timing after two full packets */
656 pp->maxWindow = rx_Window;
657 pp->timeout.sec = 2;
658 pp->packetSize = OLD_MAX_PACKET_SIZE;
659 #endif /* ADAPT_MTU */