Merge remote-tracking branch 'origin/master'
[unleashed/lotheac.git] / usr / src / uts / common / io / bufmod.c
blobf553c58affaa277acc5016683101024d6d4ee3bc
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License, Version 1.0 only
6 * (the "License"). You may not use this file except in compliance
7 * with the License.
9 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10 * or http://www.opensolaris.org/os/licensing.
11 * See the License for the specific language governing permissions
12 * and limitations under the License.
14 * When distributing Covered Code, include this CDDL HEADER in each
15 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16 * If applicable, add the following below this CDDL HEADER, with the
17 * fields enclosed by brackets "[]" replaced with your own identifying
18 * information: Portions Copyright [yyyy] [name of copyright owner]
20 * CDDL HEADER END
23 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
30 * STREAMS Buffering module
32 * This streams module collects incoming messages from modules below
33 * it on the stream and buffers them up into a smaller number of
34 * aggregated messages. Its main purpose is to reduce overhead by
35 * cutting down on the number of read (or getmsg) calls its client
36 * user process makes.
37 * - only M_DATA is buffered.
38 * - multithreading assumes configured as D_MTQPAIR
39 * - packets are lost only if flag SB_NO_HEADER is clear and buffer
40 * allocation fails.
41 * - in order message transmission. This is enforced for messages other
42 * than high priority messages.
43 * - zero length messages on the read side are not passed up the
44 * stream but used internally for synchronization.
45 * FLAGS:
46 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
47 * (conversion is the default for backwards compatibility
48 * hence the negative logic).
49 * - SB_NO_HEADER - no headers in buffered data.
50 * (adding headers is the default for backwards compatibility
51 * hence the negative logic).
52 * - SB_DEFER_CHUNK - provides improved response time in question-answer
53 * applications. Buffering is not enabled until the second message
54 * is received on the read side within the sb_ticks interval.
55 * This option will often be used in combination with flag SB_SEND_ON_WRITE.
56 * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
57 * data being immediately sent upstream.
58 * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
59 * the blocked flow condition downstream. If this flag is clear (default)
60 * messages will be dropped if the upstream flow is blocked.
64 #include <sys/types.h>
65 #include <sys/errno.h>
66 #include <sys/debug.h>
67 #include <sys/stropts.h>
68 #include <sys/time.h>
69 #include <sys/stream.h>
70 #include <sys/conf.h>
71 #include <sys/ddi.h>
72 #include <sys/sunddi.h>
73 #include <sys/kmem.h>
74 #include <sys/strsun.h>
75 #include <sys/bufmod.h>
76 #include <sys/modctl.h>
77 #include <sys/isa_defs.h>
80 * Per-Stream state information.
82 * If sb_ticks is negative, we don't deliver chunks until they're
83 * full. If it's zero, we deliver every packet as it arrives. (In
84 * this case we force sb_chunk to zero, to make the implementation
85 * easier.) Otherwise, sb_ticks gives the number of ticks in a
86 * buffering interval. The interval begins when the a read side data
87 * message is received and a timeout is not active. If sb_snap is
88 * zero, no truncation of the msg is done.
90 struct sb {
91 queue_t *sb_rq; /* our rq */
92 mblk_t *sb_mp; /* partial chunk */
93 mblk_t *sb_head; /* pre-allocated space for the next header */
94 mblk_t *sb_tail; /* first mblk of last message appended */
95 uint_t sb_mlen; /* sb_mp length */
96 uint_t sb_mcount; /* input msg count in sb_mp */
97 uint_t sb_chunk; /* max chunk size */
98 clock_t sb_ticks; /* timeout interval */
99 timeout_id_t sb_timeoutid; /* qtimeout() id */
100 uint_t sb_drops; /* cumulative # discarded msgs */
101 uint_t sb_snap; /* snapshot length */
102 uint_t sb_flags; /* flags field */
103 uint_t sb_state; /* state variable */
107 * Function prototypes.
109 static int sbopen(queue_t *, dev_t *, int, int, cred_t *);
110 static int sbclose(queue_t *, int, cred_t *);
111 static void sbwput(queue_t *, mblk_t *);
112 static void sbrput(queue_t *, mblk_t *);
113 static void sbrsrv(queue_t *);
114 static void sbioctl(queue_t *, mblk_t *);
115 static void sbaddmsg(queue_t *, mblk_t *);
116 static void sbtick(void *);
117 static void sbclosechunk(struct sb *);
118 static void sbsendit(queue_t *, mblk_t *);
120 static struct module_info sb_minfo = {
121 21, /* mi_idnum */
122 "bufmod", /* mi_idname */
123 0, /* mi_minpsz */
124 INFPSZ, /* mi_maxpsz */
125 1, /* mi_hiwat */
126 0 /* mi_lowat */
129 static struct qinit sb_rinit = {
130 (int (*)())sbrput, /* qi_putp */
131 (int (*)())sbrsrv, /* qi_srvp */
132 sbopen, /* qi_qopen */
133 sbclose, /* qi_qclose */
134 NULL, /* qi_qadmin */
135 &sb_minfo, /* qi_minfo */
136 NULL /* qi_mstat */
139 static struct qinit sb_winit = {
140 (int (*)())sbwput, /* qi_putp */
141 NULL, /* qi_srvp */
142 NULL, /* qi_qopen */
143 NULL, /* qi_qclose */
144 NULL, /* qi_qadmin */
145 &sb_minfo, /* qi_minfo */
146 NULL /* qi_mstat */
149 static struct streamtab sb_info = {
150 &sb_rinit, /* st_rdinit */
151 &sb_winit, /* st_wrinit */
152 NULL, /* st_muxrinit */
153 NULL /* st_muxwinit */
158 * This is the loadable module wrapper.
161 static struct fmodsw fsw = {
162 "bufmod",
163 &sb_info,
164 D_MTQPAIR | D_MP
168 * Module linkage information for the kernel.
171 static struct modlstrmod modlstrmod = {
172 &mod_strmodops, "streams buffer mod", &fsw
175 static struct modlinkage modlinkage = {
176 MODREV_1, &modlstrmod, NULL
181 _init(void)
183 return (mod_install(&modlinkage));
187 _fini(void)
189 return (mod_remove(&modlinkage));
193 _info(struct modinfo *modinfop)
195 return (mod_info(&modlinkage, modinfop));
199 /* ARGSUSED */
200 static int
201 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
203 struct sb *sbp;
204 ASSERT(rq);
206 if (sflag != MODOPEN)
207 return (EINVAL);
209 if (rq->q_ptr)
210 return (0);
213 * Allocate and initialize per-Stream structure.
215 sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
216 sbp->sb_rq = rq;
217 sbp->sb_ticks = -1;
218 sbp->sb_chunk = SB_DFLT_CHUNK;
219 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
220 sbp->sb_mlen = 0;
221 sbp->sb_mcount = 0;
222 sbp->sb_timeoutid = 0;
223 sbp->sb_drops = 0;
224 sbp->sb_snap = 0;
225 sbp->sb_flags = 0;
226 sbp->sb_state = 0;
228 rq->q_ptr = WR(rq)->q_ptr = sbp;
230 qprocson(rq);
233 return (0);
236 /* ARGSUSED1 */
237 static int
238 sbclose(queue_t *rq, int flag, cred_t *credp)
240 struct sb *sbp = (struct sb *)rq->q_ptr;
242 ASSERT(sbp);
244 qprocsoff(rq);
246 * Cancel an outstanding timeout
248 if (sbp->sb_timeoutid != 0) {
249 (void) quntimeout(rq, sbp->sb_timeoutid);
250 sbp->sb_timeoutid = 0;
253 * Free the current chunk.
255 if (sbp->sb_mp) {
256 freemsg(sbp->sb_mp);
257 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
258 sbp->sb_mlen = 0;
262 * Free the per-Stream structure.
264 kmem_free((caddr_t)sbp, sizeof (struct sb));
265 rq->q_ptr = WR(rq)->q_ptr = NULL;
267 return (0);
/*
 * Stream-head read-queue water marks, sent upstream in an M_SETOPTS by
 * SBIOCSCHUNK and SBIOCSSNAP.  The correction factor ("fudge")
 * compensates for:
 * - whatever assumptions the modules below have made about how much
 *   traffic is flowing through the stream;
 * - bufmod possibly snipping messages to the sb_snap length.
 * Every argument is parenthesized so that expressions such as
 * SNIT_HIWAT(a + b, f) expand correctly.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)
280 static void
281 sbioc(queue_t *wq, mblk_t *mp)
283 struct iocblk *iocp;
284 struct sb *sbp = (struct sb *)wq->q_ptr;
285 clock_t ticks;
286 mblk_t *mop;
288 iocp = (struct iocblk *)mp->b_rptr;
290 switch (iocp->ioc_cmd) {
291 case SBIOCGCHUNK:
292 case SBIOCGSNAP:
293 case SBIOCGFLAGS:
294 case SBIOCGTIME:
295 miocack(wq, mp, 0, 0);
296 return;
298 case SBIOCSTIME:
299 #ifdef _SYSCALL32_IMPL
300 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
301 struct timeval32 *t32;
303 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
304 if (t32->tv_sec < 0 || t32->tv_usec < 0) {
305 miocnak(wq, mp, 0, EINVAL);
306 break;
308 ticks = TIMEVAL_TO_TICK(t32);
309 } else
310 #endif /* _SYSCALL32_IMPL */
312 struct timeval *tb;
314 tb = (struct timeval *)mp->b_cont->b_rptr;
316 if (tb->tv_sec < 0 || tb->tv_usec < 0) {
317 miocnak(wq, mp, 0, EINVAL);
318 break;
320 ticks = TIMEVAL_TO_TICK(tb);
322 sbp->sb_ticks = ticks;
323 if (ticks == 0)
324 sbp->sb_chunk = 0;
325 miocack(wq, mp, 0, 0);
326 sbclosechunk(sbp);
327 return;
329 case SBIOCSCHUNK:
331 * set up hi/lo water marks on stream head read queue.
332 * unlikely to run out of resources. Fix at later date.
334 if ((mop = allocb(sizeof (struct stroptions),
335 BPRI_MED)) != NULL) {
336 struct stroptions *sop;
337 uint_t chunk;
339 chunk = *(uint_t *)mp->b_cont->b_rptr;
340 mop->b_datap->db_type = M_SETOPTS;
341 mop->b_wptr += sizeof (struct stroptions);
342 sop = (struct stroptions *)mop->b_rptr;
343 sop->so_flags = SO_HIWAT | SO_LOWAT;
344 sop->so_hiwat = SNIT_HIWAT(chunk, 1);
345 sop->so_lowat = SNIT_LOWAT(chunk, 1);
346 qreply(wq, mop);
349 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
350 miocack(wq, mp, 0, 0);
351 sbclosechunk(sbp);
352 return;
354 case SBIOCSFLAGS:
355 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
356 miocack(wq, mp, 0, 0);
357 return;
359 case SBIOCSSNAP:
361 * if chunking dont worry about effects of
362 * snipping of message size on head flow control
363 * since it has a relatively small bearing on the
364 * data rate onto the streamn head.
366 if (!sbp->sb_chunk) {
368 * set up hi/lo water marks on stream head read queue.
369 * unlikely to run out of resources. Fix at later date.
371 if ((mop = allocb(sizeof (struct stroptions),
372 BPRI_MED)) != NULL) {
373 struct stroptions *sop;
374 uint_t snap;
375 int fudge;
377 snap = *(uint_t *)mp->b_cont->b_rptr;
378 mop->b_datap->db_type = M_SETOPTS;
379 mop->b_wptr += sizeof (struct stroptions);
380 sop = (struct stroptions *)mop->b_rptr;
381 sop->so_flags = SO_HIWAT | SO_LOWAT;
382 fudge = snap <= 100 ? 4 :
383 snap <= 400 ? 2 :
385 sop->so_hiwat = SNIT_HIWAT(snap, fudge);
386 sop->so_lowat = SNIT_LOWAT(snap, fudge);
387 qreply(wq, mop);
391 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
392 miocack(wq, mp, 0, 0);
393 return;
395 default:
396 ASSERT(0);
397 return;
402 * Write-side put procedure. Its main task is to detect ioctls
403 * for manipulating the buffering state and hand them to sbioctl.
404 * Other message types are passed on through.
406 static void
407 sbwput(queue_t *wq, mblk_t *mp)
409 struct sb *sbp = (struct sb *)wq->q_ptr;
410 struct copyresp *resp;
412 if (sbp->sb_flags & SB_SEND_ON_WRITE)
413 sbclosechunk(sbp);
414 switch (mp->b_datap->db_type) {
415 case M_IOCTL:
416 sbioctl(wq, mp);
417 break;
419 case M_IOCDATA:
420 resp = (struct copyresp *)mp->b_rptr;
421 if (resp->cp_rval) {
423 * Just free message on failure.
425 freemsg(mp);
426 break;
429 switch (resp->cp_cmd) {
430 case SBIOCSTIME:
431 case SBIOCSCHUNK:
432 case SBIOCSFLAGS:
433 case SBIOCSSNAP:
434 case SBIOCGTIME:
435 case SBIOCGCHUNK:
436 case SBIOCGSNAP:
437 case SBIOCGFLAGS:
438 sbioc(wq, mp);
439 break;
441 default:
442 putnext(wq, mp);
443 break;
445 break;
447 default:
448 putnext(wq, mp);
449 break;
454 * Read-side put procedure. It's responsible for buffering up incoming
455 * messages and grouping them into aggregates according to the current
456 * buffering parameters.
458 static void
459 sbrput(queue_t *rq, mblk_t *mp)
461 struct sb *sbp = (struct sb *)rq->q_ptr;
463 ASSERT(sbp);
465 switch (mp->b_datap->db_type) {
466 case M_PROTO:
467 if (sbp->sb_flags & SB_NO_PROTO_CVT) {
468 sbclosechunk(sbp);
469 sbsendit(rq, mp);
470 break;
471 } else {
473 * Convert M_PROTO to M_DATA.
475 mp->b_datap->db_type = M_DATA;
477 /* FALLTHRU */
479 case M_DATA:
480 if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
481 !(sbp->sb_state & SB_FRCVD)) {
482 sbclosechunk(sbp);
483 sbsendit(rq, mp);
484 sbp->sb_state |= SB_FRCVD;
485 } else
486 sbaddmsg(rq, mp);
488 if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
489 sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
490 sbp, sbp->sb_ticks);
492 break;
494 case M_FLUSH:
495 if (*mp->b_rptr & FLUSHR) {
497 * Reset timeout, flush the chunk currently in
498 * progress, and start a new chunk.
500 if (sbp->sb_timeoutid) {
501 (void) quntimeout(sbp->sb_rq,
502 sbp->sb_timeoutid);
503 sbp->sb_timeoutid = 0;
505 if (sbp->sb_mp) {
506 freemsg(sbp->sb_mp);
507 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
508 sbp->sb_mlen = 0;
509 sbp->sb_mcount = 0;
511 flushq(rq, FLUSHALL);
513 putnext(rq, mp);
514 break;
516 case M_CTL:
518 * Zero-length M_CTL means our timeout() popped.
520 if (MBLKL(mp) == 0) {
521 freemsg(mp);
522 sbclosechunk(sbp);
523 } else {
524 sbclosechunk(sbp);
525 sbsendit(rq, mp);
527 break;
529 default:
530 if (mp->b_datap->db_type <= QPCTL) {
531 sbclosechunk(sbp);
532 sbsendit(rq, mp);
533 } else {
534 /* Note: out of band */
535 putnext(rq, mp);
537 break;
542 * read service procedure.
544 /* ARGSUSED */
545 static void
546 sbrsrv(queue_t *rq)
548 mblk_t *mp;
551 * High priority messages shouldn't get here but if
552 * one does, jam it through to avoid infinite loop.
554 while ((mp = getq(rq)) != NULL) {
555 if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
556 /* should only get here if SB_NO_SROPS */
557 (void) putbq(rq, mp);
558 return;
560 putnext(rq, mp);
565 * Handle write-side M_IOCTL messages.
567 static void
568 sbioctl(queue_t *wq, mblk_t *mp)
570 struct sb *sbp = (struct sb *)wq->q_ptr;
571 struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
572 struct timeval *t;
573 clock_t ticks;
574 mblk_t *mop;
575 int transparent = iocp->ioc_count;
576 mblk_t *datamp;
577 int error;
579 switch (iocp->ioc_cmd) {
580 case SBIOCSTIME:
581 if (iocp->ioc_count == TRANSPARENT) {
582 #ifdef _SYSCALL32_IMPL
583 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
584 mcopyin(mp, NULL, sizeof (struct timeval32),
585 NULL);
586 } else
587 #endif /* _SYSCALL32_IMPL */
589 mcopyin(mp, NULL, sizeof (*t), NULL);
591 qreply(wq, mp);
592 } else {
594 * Verify argument length.
596 #ifdef _SYSCALL32_IMPL
597 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
598 struct timeval32 *t32;
600 error = miocpullup(mp,
601 sizeof (struct timeval32));
602 if (error != 0) {
603 miocnak(wq, mp, 0, error);
604 break;
606 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
607 if (t32->tv_sec < 0 || t32->tv_usec < 0) {
608 miocnak(wq, mp, 0, EINVAL);
609 break;
611 ticks = TIMEVAL_TO_TICK(t32);
612 } else
613 #endif /* _SYSCALL32_IMPL */
615 error = miocpullup(mp, sizeof (struct timeval));
616 if (error != 0) {
617 miocnak(wq, mp, 0, error);
618 break;
621 t = (struct timeval *)mp->b_cont->b_rptr;
622 if (t->tv_sec < 0 || t->tv_usec < 0) {
623 miocnak(wq, mp, 0, EINVAL);
624 break;
626 ticks = TIMEVAL_TO_TICK(t);
628 sbp->sb_ticks = ticks;
629 if (ticks == 0)
630 sbp->sb_chunk = 0;
631 miocack(wq, mp, 0, 0);
632 sbclosechunk(sbp);
634 break;
636 case SBIOCGTIME: {
637 struct timeval *t;
640 * Verify argument length.
642 if (transparent != TRANSPARENT) {
643 #ifdef _SYSCALL32_IMPL
644 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
645 error = miocpullup(mp,
646 sizeof (struct timeval32));
647 if (error != 0) {
648 miocnak(wq, mp, 0, error);
649 break;
651 } else
652 #endif /* _SYSCALL32_IMPL */
653 error = miocpullup(mp, sizeof (struct timeval));
654 if (error != 0) {
655 miocnak(wq, mp, 0, error);
656 break;
661 * If infinite timeout, return range error
662 * for the ioctl.
664 if (sbp->sb_ticks < 0) {
665 miocnak(wq, mp, 0, ERANGE);
666 break;
669 #ifdef _SYSCALL32_IMPL
670 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
671 struct timeval32 *t32;
673 if (transparent == TRANSPARENT) {
674 datamp = allocb(sizeof (*t32), BPRI_MED);
675 if (datamp == NULL) {
676 miocnak(wq, mp, 0, EAGAIN);
677 break;
679 mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
682 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
683 TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
685 if (transparent == TRANSPARENT)
686 qreply(wq, mp);
687 else
688 miocack(wq, mp, sizeof (*t32), 0);
689 } else
690 #endif /* _SYSCALL32_IMPL */
692 if (transparent == TRANSPARENT) {
693 datamp = allocb(sizeof (*t), BPRI_MED);
694 if (datamp == NULL) {
695 miocnak(wq, mp, 0, EAGAIN);
696 break;
698 mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
701 t = (struct timeval *)mp->b_cont->b_rptr;
702 TICK_TO_TIMEVAL(sbp->sb_ticks, t);
704 if (transparent == TRANSPARENT)
705 qreply(wq, mp);
706 else
707 miocack(wq, mp, sizeof (*t), 0);
709 break;
712 case SBIOCCTIME:
713 sbp->sb_ticks = -1;
714 miocack(wq, mp, 0, 0);
715 break;
717 case SBIOCSCHUNK:
718 if (iocp->ioc_count == TRANSPARENT) {
719 mcopyin(mp, NULL, sizeof (uint_t), NULL);
720 qreply(wq, mp);
721 } else {
723 * Verify argument length.
725 error = miocpullup(mp, sizeof (uint_t));
726 if (error != 0) {
727 miocnak(wq, mp, 0, error);
728 break;
732 * set up hi/lo water marks on stream head read queue.
733 * unlikely to run out of resources. Fix at later date.
735 if ((mop = allocb(sizeof (struct stroptions),
736 BPRI_MED)) != NULL) {
737 struct stroptions *sop;
738 uint_t chunk;
740 chunk = *(uint_t *)mp->b_cont->b_rptr;
741 mop->b_datap->db_type = M_SETOPTS;
742 mop->b_wptr += sizeof (struct stroptions);
743 sop = (struct stroptions *)mop->b_rptr;
744 sop->so_flags = SO_HIWAT | SO_LOWAT;
745 sop->so_hiwat = SNIT_HIWAT(chunk, 1);
746 sop->so_lowat = SNIT_LOWAT(chunk, 1);
747 qreply(wq, mop);
750 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
751 miocack(wq, mp, 0, 0);
752 sbclosechunk(sbp);
754 break;
756 case SBIOCGCHUNK:
758 * Verify argument length.
760 if (transparent != TRANSPARENT) {
761 error = miocpullup(mp, sizeof (uint_t));
762 if (error != 0) {
763 miocnak(wq, mp, 0, error);
764 break;
768 if (transparent == TRANSPARENT) {
769 datamp = allocb(sizeof (uint_t), BPRI_MED);
770 if (datamp == NULL) {
771 miocnak(wq, mp, 0, EAGAIN);
772 break;
774 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
777 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
779 if (transparent == TRANSPARENT)
780 qreply(wq, mp);
781 else
782 miocack(wq, mp, sizeof (uint_t), 0);
783 break;
785 case SBIOCSSNAP:
786 if (iocp->ioc_count == TRANSPARENT) {
787 mcopyin(mp, NULL, sizeof (uint_t), NULL);
788 qreply(wq, mp);
789 } else {
791 * Verify argument length.
793 error = miocpullup(mp, sizeof (uint_t));
794 if (error != 0) {
795 miocnak(wq, mp, 0, error);
796 break;
800 * if chunking dont worry about effects of
801 * snipping of message size on head flow control
802 * since it has a relatively small bearing on the
803 * data rate onto the streamn head.
805 if (!sbp->sb_chunk) {
807 * set up hi/lo water marks on stream
808 * head read queue. unlikely to run out
809 * of resources. Fix at later date.
811 if ((mop = allocb(sizeof (struct stroptions),
812 BPRI_MED)) != NULL) {
813 struct stroptions *sop;
814 uint_t snap;
815 int fudge;
817 snap = *(uint_t *)mp->b_cont->b_rptr;
818 mop->b_datap->db_type = M_SETOPTS;
819 mop->b_wptr += sizeof (*sop);
820 sop = (struct stroptions *)mop->b_rptr;
821 sop->so_flags = SO_HIWAT | SO_LOWAT;
822 fudge = (snap <= 100) ? 4 :
823 (snap <= 400) ? 2 : 1;
824 sop->so_hiwat = SNIT_HIWAT(snap, fudge);
825 sop->so_lowat = SNIT_LOWAT(snap, fudge);
826 qreply(wq, mop);
830 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
832 miocack(wq, mp, 0, 0);
834 break;
836 case SBIOCGSNAP:
838 * Verify argument length
840 if (transparent != TRANSPARENT) {
841 error = miocpullup(mp, sizeof (uint_t));
842 if (error != 0) {
843 miocnak(wq, mp, 0, error);
844 break;
848 if (transparent == TRANSPARENT) {
849 datamp = allocb(sizeof (uint_t), BPRI_MED);
850 if (datamp == NULL) {
851 miocnak(wq, mp, 0, EAGAIN);
852 break;
854 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
857 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
859 if (transparent == TRANSPARENT)
860 qreply(wq, mp);
861 else
862 miocack(wq, mp, sizeof (uint_t), 0);
863 break;
865 case SBIOCSFLAGS:
867 * set the flags.
869 if (iocp->ioc_count == TRANSPARENT) {
870 mcopyin(mp, NULL, sizeof (uint_t), NULL);
871 qreply(wq, mp);
872 } else {
873 error = miocpullup(mp, sizeof (uint_t));
874 if (error != 0) {
875 miocnak(wq, mp, 0, error);
876 break;
878 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
879 miocack(wq, mp, 0, 0);
881 break;
883 case SBIOCGFLAGS:
885 * Verify argument length
887 if (transparent != TRANSPARENT) {
888 error = miocpullup(mp, sizeof (uint_t));
889 if (error != 0) {
890 miocnak(wq, mp, 0, error);
891 break;
895 if (transparent == TRANSPARENT) {
896 datamp = allocb(sizeof (uint_t), BPRI_MED);
897 if (datamp == NULL) {
898 miocnak(wq, mp, 0, EAGAIN);
899 break;
901 mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
904 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
906 if (transparent == TRANSPARENT)
907 qreply(wq, mp);
908 else
909 miocack(wq, mp, sizeof (uint_t), 0);
910 break;
913 default:
914 putnext(wq, mp);
915 break;
/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate the padding needed to round l up to a multiple of
 * sizeof (ulong_t).
 */
#define	Align(l)		RoundUpAmt((l), sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 * Parenthesized so that it expands safely inside larger expressions.
 * NOTE(review): a zero-length message needs only sizeof (struct sb_hdr),
 * so this is really the smallest non-empty message size, assuming
 * _POINTER_ALIGNMENT equals sizeof (ulong_t) -- confirm.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
935 * Process a read-side M_DATA message.
937 * If the currently accumulating chunk doesn't have enough room
938 * for the message, close off the chunk, pass it upward, and start
939 * a new one. Then add the message to the current chunk, taking
940 * account of the possibility that the message's size exceeds the
941 * chunk size.
943 * If headers are enabled add an sb_hdr header and trailing alignment padding.
945 * To optimise performance the total number of msgbs should be kept
946 * to a minimum. This is achieved by using any remaining space in message N
947 * for both its own padding as well as the header of message N+1 if possible.
948 * If there's insufficient space we allocate one message to hold this 'wrapper'.
949 * (there's likely to be space beyond message N, since allocb would have
950 * rounded up the required size to one of the dblk_sizes).
953 static void
954 sbaddmsg(queue_t *rq, mblk_t *mp)
956 struct sb *sbp;
957 struct timeval t;
958 struct sb_hdr hp;
959 mblk_t *wrapper; /* padding for msg N, header for msg N+1 */
960 mblk_t *last; /* last mblk of current message */
961 size_t wrapperlen; /* length of header + padding */
962 size_t origlen; /* data length before truncation */
963 size_t pad; /* bytes required to align header */
965 sbp = (struct sb *)rq->q_ptr;
967 origlen = msgdsize(mp);
970 * Truncate the message.
972 if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
973 (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
974 hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
975 else
976 hp.sbh_totlen = hp.sbh_msglen = origlen;
978 if (sbp->sb_flags & SB_NO_HEADER) {
981 * Would the inclusion of this message overflow the current
982 * chunk? If so close the chunk off and start a new one.
984 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
985 sbclosechunk(sbp);
987 * First message too big for chunk - just send it up.
988 * This will always be true when we're not chunking.
990 if (hp.sbh_totlen > sbp->sb_chunk) {
991 sbsendit(rq, mp);
992 return;
996 * We now know that the msg will fit in the chunk.
997 * Link it onto the end of the chunk.
998 * Since linkb() walks the entire chain, we keep a pointer to
999 * the first mblk of the last msgb added and call linkb on that
1000 * that last message, rather than performing the
1001 * O(n) linkb() operation on the whole chain.
1002 * sb_head isn't needed in this SB_NO_HEADER mode.
1004 if (sbp->sb_mp)
1005 linkb(sbp->sb_tail, mp);
1006 else
1007 sbp->sb_mp = mp;
1009 sbp->sb_tail = mp;
1010 sbp->sb_mlen += hp.sbh_totlen;
1011 sbp->sb_mcount++;
1012 } else {
1013 /* Timestamp must be done immediately */
1014 uniqtime(&t);
1015 TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1017 pad = Align(hp.sbh_totlen);
1018 hp.sbh_totlen += sizeof (hp);
1019 hp.sbh_totlen += pad;
1022 * Would the inclusion of this message overflow the current
1023 * chunk? If so close the chunk off and start a new one.
1025 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
1026 sbclosechunk(sbp);
1028 if (sbp->sb_head == NULL) {
1029 /* Allocate leading header of new chunk */
1030 sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1031 if (sbp->sb_head == NULL) {
1033 * Memory allocation failure.
1034 * This will need to be revisited
1035 * since using certain flag combinations
1036 * can result in messages being dropped
1037 * silently.
1039 freemsg(mp);
1040 sbp->sb_drops++;
1041 return;
1043 sbp->sb_mp = sbp->sb_head;
1047 * Copy header into message
1049 hp.sbh_drops = sbp->sb_drops;
1050 hp.sbh_origlen = origlen;
1051 (void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1052 sbp->sb_head->b_wptr += sizeof (hp);
1054 ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1057 * Join message to the chunk
1059 linkb(sbp->sb_head, mp);
1061 sbp->sb_mcount++;
1062 sbp->sb_mlen += hp.sbh_totlen;
1065 * If the first message alone is too big for the chunk close
1066 * the chunk now.
1067 * If the next message would immediately cause the chunk to
1068 * overflow we may as well close the chunk now. The next
1069 * message is certain to be at least SMALLEST_MESSAGE size.
1071 if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
1072 sbclosechunk(sbp);
1073 return;
1077 * Find space for the wrapper. The wrapper consists of:
1079 * 1) Padding for this message (this is to ensure each header
1080 * begins on an 8 byte boundary in the userland buffer).
1082 * 2) Space for the next message's header, in case the next
1083 * next message will fit in this chunk.
1085 * It may be possible to append the wrapper to the last mblk
1086 * of the message, but only if we 'own' the data. If the dblk
1087 * has been shared through dupmsg() we mustn't alter it.
1090 wrapperlen = (sizeof (hp) + pad);
1092 /* Is there space for the wrapper beyond the message's data ? */
1093 for (last = mp; last->b_cont; last = last->b_cont)
1096 if ((wrapperlen <= MBLKTAIL(last)) &&
1097 (last->b_datap->db_ref == 1)) {
1098 if (pad > 0) {
1100 * Pad with zeroes to the next pointer boundary
1101 * (we don't want to disclose kernel data to
1102 * users), then advance wptr.
1104 (void) memset(last->b_wptr, 0, pad);
1105 last->b_wptr += pad;
1107 /* Remember where to write the header information */
1108 sbp->sb_head = last;
1109 } else {
1110 /* Have to allocate additional space for the wrapper */
1111 wrapper = allocb(wrapperlen, BPRI_MED);
1112 if (wrapper == NULL) {
1113 sbclosechunk(sbp);
1114 return;
1116 if (pad > 0) {
1118 * Pad with zeroes (we don't want to disclose
1119 * kernel data to users).
1121 (void) memset(wrapper->b_wptr, 0, pad);
1122 wrapper->b_wptr += pad;
1124 /* Link the wrapper msg onto the end of the chunk */
1125 linkb(mp, wrapper);
1126 /* Remember to write the next header in this wrapper */
1127 sbp->sb_head = wrapper;
1133 * Called from timeout().
1134 * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1135 * to synchronize with any active module threads (open, close, wput, rput).
1137 static void
1138 sbtick(void *arg)
1140 struct sb *sbp = arg;
1141 queue_t *rq;
1143 ASSERT(sbp);
1145 rq = sbp->sb_rq;
1146 sbp->sb_timeoutid = 0; /* timeout has fired */
1148 if (putctl(rq, M_CTL) == 0) /* failure */
1149 sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1153 * Close off the currently accumulating chunk and pass
1154 * it upward. Takes care of resetting timers as well.
1156 * This routine is called both directly and as a result
1157 * of the chunk timeout expiring.
1159 static void
1160 sbclosechunk(struct sb *sbp)
1162 mblk_t *mp;
1163 queue_t *rq;
1165 ASSERT(sbp);
1167 if (sbp->sb_timeoutid) {
1168 (void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1169 sbp->sb_timeoutid = 0;
1172 mp = sbp->sb_mp;
1173 rq = sbp->sb_rq;
1176 * If there's currently a chunk in progress, close it off
1177 * and try to send it up.
1179 if (mp) {
1180 sbsendit(rq, mp);
1184 * Clear old chunk. Ready for new msgs.
1186 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1187 sbp->sb_mlen = 0;
1188 sbp->sb_mcount = 0;
1189 if (sbp->sb_flags & SB_DEFER_CHUNK)
1190 sbp->sb_state &= ~SB_FRCVD;
1194 static void
1195 sbsendit(queue_t *rq, mblk_t *mp)
1197 struct sb *sbp = (struct sb *)rq->q_ptr;
1199 if (!canputnext(rq)) {
1200 if (sbp->sb_flags & SB_NO_DROPS)
1201 (void) putq(rq, mp);
1202 else {
1203 freemsg(mp);
1204 sbp->sb_drops += sbp->sb_mcount;
1206 return;
1209 * If there are messages on the q already, keep
1210 * queueing them since they need to be processed in order.
1212 if (qsize(rq) > 0) {
1213 /* should only get here if SB_NO_DROPS */
1214 (void) putq(rq, mp);
1216 else
1217 putnext(rq, mp);