*** empty log message ***
[arla.git] / rx / rx_rdwr.c
blob4301b2dd006e304260f0316f0cc7ec3c14488de3
1 /*
2 ****************************************************************************
3 * Copyright IBM Corporation 1988, 1989 - All Rights Reserved *
4 * *
5 * Permission to use, copy, modify, and distribute this software and its *
6 * documentation for any purpose and without fee is hereby granted, *
7 * provided that the above copyright notice appear in all copies and *
8 * that both that copyright notice and this permission notice appear in *
9 * supporting documentation, and that the name of IBM not be used in *
10 * advertising or publicity pertaining to distribution of the software *
11 * without specific, written prior permission. *
12 * *
13 * IBM DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL *
14 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL IBM *
15 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY *
16 * DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER *
17 * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING *
18 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *
19 ****************************************************************************
22 #include "rx_locl.h"
24 RCSID("$Id$");
26 ssize_t
27 rx_Read(struct rx_call *call, void *vbuf, size_t nbytes)
29 struct rx_packet *rp;
30 size_t requestCount;
31 char *buf = (char *)vbuf;
33 SPLVAR;
34 /* XXXX took out clock_NewTime from here. Was it needed? */
35 requestCount = nbytes;
37 NETPRI;
38 GLOBAL_LOCK();
39 RX_MUTEX_ENTER(&call->lock);
41 while (nbytes) {
42 if (call->nLeft == 0) {
43 /* Get next packet */
44 for (;;) {
45 if (call->error || (call->mode != RX_MODE_RECEIVING)) {
46 if (call->error) {
47 RX_MUTEX_EXIT(&call->lock);
48 GLOBAL_UNLOCK();
49 USERPRI;
50 return 0;
52 if (call->mode == RX_MODE_SENDING) {
53 rx_FlushWrite(call);
54 continue;
57 if (queue_IsNotEmpty(&call->rq)) {
58 /* Check that next packet available is next in sequence */
59 rp = queue_First(&call->rq, rx_packet);
60 if (rp->header.seq == call->rnext) {
61 long error;
62 struct rx_connection *conn = call->conn;
64 queue_Remove(rp);
67 * RXS_CheckPacket called to undo RXS_PreparePacket's
68 * work. It may reduce the length of the packet by
69 * up to conn->maxTrailerSize, to reflect the length
70 * of the data + the header.
72 if ((error = RXS_CheckPacket(conn->securityObject,
73 call, rp)) != 0) {
76 * Used to merely shut down the call, but now we
77 * shut down the whole connection since this may
78 * indicate an attempt to hijack it
80 rxi_ConnectionError(conn, error);
81 rp = rxi_SendConnectionAbort(conn, rp);
82 rxi_FreePacket(rp);
84 RX_MUTEX_EXIT(&call->lock);
85 GLOBAL_UNLOCK();
86 USERPRI;
87 return 0;
89 call->rnext++;
90 call->currentPacket = rp;
91 call->curvec = 1; /* 0th vec is always header */
94 * begin at the beginning [ more or less ], continue
95 * on until the end, then stop.
97 call->curpos = call->conn->securityHeaderSize;
100 * Notice that this code works correctly if the data
101 * size is 0 (which it may be--no reply arguments
102 * from server, for example). This relies heavily on
103 * the fact that the code below immediately frees the
104 * packet (no yields, etc.). If it didn't, this
105 * would be a problem because a value of zero for
106 * call->nLeft normally means that there is no read
107 * packet
109 call->nLeft = rp->length;
110 if (rp->header.flags & RX_LAST_PACKET)
111 call->flags |= RX_CALL_RECEIVE_DONE;
114 * now, if we haven't send a hard ack for window/2
115 * packets we spontaneously generate one, to take
116 * care of the case where (most likely in the kernel)
117 * we receive a window-full of packets, and ack all
118 * of them before any are read by the user, thus not
119 * hard-acking any of them. The sender retransmits
120 * in this case only under a timer, which is a real
121 * loser
125 int ack_window;
127 #ifdef ADAPT_WINDOW
128 ack_window = call->conn->peer->maxWindow >> 1;
129 #else /* !ADAPT_WINDOW */
130 ack_window = rx_Window >> 1;
131 #endif/* ADAPT_WINDOW */
133 if (call->rnext > (call->lastAcked + ack_window))
134 rxi_SendAck(call, 0, 0, 0, 0, RX_ACK_DELAY);
136 break;
140 MTUXXX doesn't there need to be an "else" here ???
142 /* Are there ever going to be any more packets? */
143 if (call->flags & RX_CALL_RECEIVE_DONE) {
144 RX_MUTEX_EXIT(&call->lock);
145 GLOBAL_UNLOCK();
146 USERPRI;
147 return requestCount - nbytes;
149 /* Wait for in-sequence packet */
150 call->flags |= RX_CALL_READER_WAIT;
151 clock_NewTime();
152 call->startWait = clock_Sec();
154 RX_MUTEX_EXIT(&call->lock);
155 RX_MUTEX_ENTER(&call->lockq);
156 #ifdef RX_ENABLE_LOCKS
157 while (call->flags & RX_CALL_READER_WAIT)
158 cv_wait(&call->cv_rq, &call->lockq);
159 #else
160 osi_rxSleep(&call->rq);
161 #endif
162 RX_MUTEX_EXIT(&call->lockq);
163 RX_MUTEX_ENTER(&call->lock);
165 call->startWait = 0;
167 } else { /* assert(call->currentPacket); */
169 * MTUXXX this should be replaced by
170 * some error-recovery code before
171 * shipping
174 * It's possible for call->nLeft to be smaller than
175 * any particular iov_len. Usually, recvmsg doesn't change the
176 * iov_len, since it reflects the size of the buffer. We have to
177 * keep track of the number of bytes read in the length field of
178 * the packet struct. On the final portion of a received packet,
179 * it's almost certain that call->nLeft will be smaller than the
180 * final buffer.
182 size_t t, l1;
183 caddr_t p1;
185 while (nbytes && call->currentPacket) {
186 p1 = (char*)call->currentPacket->wirevec[call->curvec].iov_base +
187 call->curpos;
188 l1 = call->currentPacket->wirevec[call->curvec].iov_len -
189 call->curpos;
191 t = MIN(l1, nbytes);
192 t = MIN(t, call->nLeft);
193 memcpy(buf, p1, t);
194 p1 += t;
195 buf += t;
196 l1 -= t;
197 nbytes -= t;
198 call->curpos += t;
199 call->nLeft -= t;
201 if (call->nLeft == 0) {
202 /* out of packet. Get another one. */
203 rxi_FreePacket(call->currentPacket);
204 call->currentPacket = NULL;
205 } else if (l1 == 0) {
206 /* need to get another struct iov */
207 if (++call->curvec > call->currentPacket->niovecs) {
209 * current packet is exhausted, get ready for another
213 * don't worry about curvec and stuff, they get set
214 * somewhere else
216 rxi_FreePacket(call->currentPacket);
217 call->currentPacket = NULL;
218 call->nLeft = 0;
219 } else
220 call->curpos = 0;
223 if (nbytes == 0) {
224 /* user buffer is full, return */
225 RX_MUTEX_EXIT(&call->lock);
226 GLOBAL_UNLOCK();
227 USERPRI;
228 return requestCount;
232 } /* while (nbytes) ... */
234 RX_MUTEX_EXIT(&call->lock);
235 GLOBAL_UNLOCK();
236 USERPRI;
237 return requestCount;
240 ssize_t
241 rx_Write(struct rx_call *call, const void *vbuf, size_t nbytes)
243 struct rx_connection *conn = call->conn;
244 size_t requestCount = nbytes;
245 const char *buf = (const char *)vbuf;
247 SPLVAR;
249 GLOBAL_LOCK();
250 RX_MUTEX_ENTER(&call->lock);
251 NETPRI;
252 if (call->mode != RX_MODE_SENDING) {
253 if ((conn->type == RX_SERVER_CONNECTION)
254 && (call->mode == RX_MODE_RECEIVING)) {
255 call->mode = RX_MODE_SENDING;
256 if (call->currentPacket) {
257 rxi_FreePacket(call->currentPacket);
258 call->currentPacket = NULL;
259 call->nLeft = 0;
260 call->nFree = 0;
262 } else {
263 RX_MUTEX_EXIT(&call->lock);
264 GLOBAL_UNLOCK();
265 USERPRI;
266 return 0;
271 * Loop condition is checked at end, so that a write of 0 bytes will
272 * force a packet to be created--specially for the case where there are 0
273 * bytes on the stream, but we must send a packet anyway.
275 do {
276 if (call->nFree == 0) {
277 struct rx_packet *cp = call->currentPacket;
279 if (!call->error && call->currentPacket) {
280 clock_NewTime(); /* Bogus: need new time package */
283 * The 0, below, specifies that it is not the last packet:
284 * there will be others. PrepareSendPacket may alter the
285 * packet length by up to conn->securityMaxTrailerSize
287 rxi_PrepareSendPacket(call, cp, 0);
288 queue_Append(&call->tq, cp);
289 rxi_Start(0, call);
291 /* Wait for transmit window to open up */
292 while (!call->error &&
293 call->tnext + 1 > call->tfirst + call->twind) {
294 clock_NewTime();
295 call->startWait = clock_Sec();
297 RX_MUTEX_EXIT(&call->lock);
298 RX_MUTEX_ENTER(&call->lockw);
300 #ifdef RX_ENABLE_LOCKS
301 cv_wait(&call->cv_twind, &call->lockw);
302 #else
303 call->flags |= RX_CALL_WAIT_WINDOW_ALLOC;
304 osi_rxSleep(&call->twind);
305 #endif
306 RX_MUTEX_EXIT(&call->lockw);
307 RX_MUTEX_ENTER(&call->lock);
309 call->startWait = 0;
311 if ((call->currentPacket = rxi_AllocSendPacket(call,
312 nbytes)) != 0) {
313 call->nFree = call->currentPacket->length;
314 call->curvec = 1; /* 0th vec is always header */
317 * begin at the beginning [ more or less ], continue on until
318 * the end, then stop.
320 call->curpos = call->conn->securityHeaderSize;
322 if (call->error) {
323 if (call->currentPacket) {
324 rxi_FreePacket(call->currentPacket);
325 call->currentPacket = NULL;
327 RX_MUTEX_EXIT(&call->lock);
328 GLOBAL_UNLOCK();
329 USERPRI;
330 return 0;
335 * If the remaining bytes fit in the buffer, then store them and
336 * return. Don't ship a buffer that's full immediately to the
337 * peer--we don't know if it's the last buffer yet
340 if (!(call->currentPacket)) {
341 call->nFree = 0;
344 struct rx_packet *cp = call->currentPacket;
345 size_t t, l1;
346 caddr_t p1;
348 while (nbytes && call->nFree) {
349 p1 = (char *)cp->wirevec[call->curvec].iov_base + call->curpos;
350 l1 = cp->wirevec[call->curvec].iov_len - call->curpos;
352 t = MIN(call->nFree, MIN(l1, nbytes));
353 memcpy(p1, buf, t);
354 p1 += t;
355 buf += t;
356 l1 -= t;
357 nbytes -= t;
358 call->curpos += t;
359 call->nFree -= t;
361 if (!l1) {
362 call->curpos = 0;
363 /* need to get another struct iov */
364 if (++call->curvec >= cp->niovecs) {
365 /* current packet is full, extend or send it */
366 call->nFree = 0;
369 if (!call->nFree) {
370 size_t len, mud;
372 len = cp->length;
373 mud = rx_MaxUserDataSize(conn);
374 if (mud > len) {
375 size_t want;
377 if (nbytes)
378 want = MIN(nbytes, mud - len);
379 else
380 want = mud - len;
381 rxi_AllocDataBuf(cp, want);
382 if (cp->length > mud)
383 cp->length = mud;
384 call->nFree += (cp->length - len);
387 } /*
388 * while bytes to send and room to
389 * send them
391 /* might be out of space now */
392 if (!nbytes) {
393 RX_MUTEX_EXIT(&call->lock);
394 GLOBAL_UNLOCK();
395 USERPRI;
396 return requestCount;
397 } else; /*
398 * more data to send, so get another
399 * packet and keep going
402 } while (nbytes);
404 RX_MUTEX_EXIT(&call->lock);
405 GLOBAL_UNLOCK();
406 USERPRI;
407 return requestCount - nbytes;
411 * Flush any buffered data to the stream, switch to read mode
412 * (clients) or to EOF mode (servers)
414 void
415 rx_FlushWrite(struct rx_call *call)
417 SPLVAR;
418 NETPRI;
419 if (call->mode == RX_MODE_SENDING) {
420 struct rx_packet *cp;
422 call->mode = (call->conn->type == RX_CLIENT_CONNECTION ?
423 RX_MODE_RECEIVING : RX_MODE_EOF);
425 if (call->currentPacket) {
427 /* cp->length is only supposed to be the user's data */
429 cp = call->currentPacket;
432 * cp->length was already set to (then-current) MaxUserDataSize
433 * or less.
435 cp->length -= call->nFree;
436 call->currentPacket = (struct rx_packet *) 0;
437 call->nFree = 0;
440 } else {
441 cp = rxi_AllocSendPacket(call, 0);
442 if (!cp) {
443 /* Mode can no longer be MODE_SENDING */
444 USERPRI;
445 return;
447 cp->length = 0;
448 cp->niovecs = 2; /* just the header + sec header */
449 call->nFree = 0;
452 /* The 1 specifies that this is the last packet */
453 rxi_PrepareSendPacket(call, cp, 1);
454 queue_Append(&call->tq, cp);
455 rxi_Start(0, call);
457 USERPRI;