Merge remote-tracking branch 'origin/master'
[unleashed/lotheac.git] / usr / src / uts / common / io / stream.c
bloba76f1315d59a481e1ef9ca68d01a4a8ded7a6b71
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
21 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
22 /* All Rights Reserved */
25 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
26 * Use is subject to license terms.
29 #include <sys/types.h>
30 #include <sys/param.h>
31 #include <sys/thread.h>
32 #include <sys/sysmacros.h>
33 #include <sys/stropts.h>
34 #include <sys/stream.h>
35 #include <sys/strsubr.h>
36 #include <sys/strsun.h>
37 #include <sys/conf.h>
38 #include <sys/debug.h>
39 #include <sys/cmn_err.h>
40 #include <sys/kmem.h>
41 #include <sys/atomic.h>
42 #include <sys/errno.h>
43 #include <sys/vtrace.h>
44 #include <sys/ftrace.h>
45 #include <sys/ontrap.h>
46 #include <sys/multidata.h>
47 #include <sys/multidata_impl.h>
48 #include <sys/sdt.h>
49 #include <sys/strft.h>
51 #ifdef DEBUG
52 #include <sys/kmem_impl.h>
53 #endif
/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */
/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) Messages of up to DBLK_MAX_CACHE bytes are allocated from a
 *     collection of fixed-size dblk/data caches.  For message sizes that
 *     are multiples of PAGESIZE, dblks are allocated separately from the
 *     buffer; the associated buffer is allocated by the constructor using
 *     kmem_alloc().  For all other message sizes, the dblk and its
 *     associated data are allocated as a single contiguous chunk of
 *     memory, i.e. objects in these caches consist of a dblk plus its
 *     associated data.  allocb() determines the nearest-size cache by
 *     table lookup: the dblk_cache[] array provides the mapping from size
 *     to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb() (freemsg() applies the same
 * logic to each mblk of a b_cont chain).  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than
 * usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
/*
 * Geometry of the small-message dblk caches.  Requests larger than
 * DBLK_MAX_CACHE bytes bypass the caches (see allocb_oversize()); cache
 * objects are DBLK_CACHE_ALIGN-aligned, and dblk_cache[] has one slot per
 * DBLK_MIN_SIZE (== 1 << DBLK_SIZE_SHIFT) bytes of message size.
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

/*
 * db_ref, db_type, db_flags and db_struioflag are accessed together as a
 * single 32-bit word (DBLK_RTFU_WORD).  DBLK_RTFU_SHIFT(field) is the bit
 * position of 'field' within that word for the host byte order.
 */
#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * Build an initial RTFU word.  The minimum reference count (ref - 1) is
 * folded into db_flags; dblk_decref() compares the live db_ref against it
 * (masked with DBLK_REFMIN) to recognize the last free.  Every argument
 * is parenthesized so that callers may pass arbitrary expressions.
 */
#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | ((ref) - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))
149 static size_t dblk_sizes[] = {
150 #ifdef _LP64
151 16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
152 8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
153 40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
154 #else
155 64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
156 8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
157 40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
158 #endif
159 DBLK_MAX_CACHE, 0
162 static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
163 static struct kmem_cache *mblk_cache;
164 static struct kmem_cache *dblk_esb_cache;
165 static struct kmem_cache *fthdr_cache;
166 static struct kmem_cache *ftblk_cache;
168 static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
169 static mblk_t *allocb_oversize(size_t size, int flags);
170 static int allocb_tryhard_fails;
171 static void frnop_func(void *arg);
172 frtn_t frnop = { frnop_func };
173 static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);
175 static boolean_t rwnext_enter(queue_t *qp);
176 static void rwnext_exit(queue_t *qp);
179 * Patchable mblk/dblk kmem_cache flags.
181 int dblk_kmem_flags = 0;
182 int mblk_kmem_flags = 0;
184 static int
185 dblk_constructor(void *buf, void *cdrarg, int kmflags)
187 dblk_t *dbp = buf;
188 ssize_t msg_size = (ssize_t)cdrarg;
189 size_t index;
191 ASSERT(msg_size != 0);
193 index = (msg_size - 1) >> DBLK_SIZE_SHIFT;
195 ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));
197 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
198 return (-1);
199 if ((msg_size & PAGEOFFSET) == 0) {
200 dbp->db_base = kmem_alloc(msg_size, kmflags);
201 if (dbp->db_base == NULL) {
202 kmem_cache_free(mblk_cache, dbp->db_mblk);
203 return (-1);
205 } else {
206 dbp->db_base = (unsigned char *)&dbp[1];
209 dbp->db_mblk->b_datap = dbp;
210 dbp->db_cache = dblk_cache[index];
211 dbp->db_lim = dbp->db_base + msg_size;
212 dbp->db_free = dbp->db_lastfree = dblk_lastfree;
213 dbp->db_frtnp = NULL;
214 dbp->db_fthdr = NULL;
215 dbp->db_credp = NULL;
216 dbp->db_cpid = -1;
217 dbp->db_struioflag = 0;
218 dbp->db_struioun.cksum.flags = 0;
219 return (0);
222 /*ARGSUSED*/
223 static int
224 dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
226 dblk_t *dbp = buf;
228 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
229 return (-1);
230 dbp->db_mblk->b_datap = dbp;
231 dbp->db_cache = dblk_esb_cache;
232 dbp->db_fthdr = NULL;
233 dbp->db_credp = NULL;
234 dbp->db_cpid = -1;
235 dbp->db_struioflag = 0;
236 dbp->db_struioun.cksum.flags = 0;
237 return (0);
240 static int
241 bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
243 dblk_t *dbp = buf;
244 bcache_t *bcp = cdrarg;
246 if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
247 return (-1);
249 dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
250 if (dbp->db_base == NULL) {
251 kmem_cache_free(mblk_cache, dbp->db_mblk);
252 return (-1);
255 dbp->db_mblk->b_datap = dbp;
256 dbp->db_cache = (void *)bcp;
257 dbp->db_lim = dbp->db_base + bcp->size;
258 dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
259 dbp->db_frtnp = NULL;
260 dbp->db_fthdr = NULL;
261 dbp->db_credp = NULL;
262 dbp->db_cpid = -1;
263 dbp->db_struioflag = 0;
264 dbp->db_struioun.cksum.flags = 0;
265 return (0);
268 /*ARGSUSED*/
269 static void
270 dblk_destructor(void *buf, void *cdrarg)
272 dblk_t *dbp = buf;
273 ssize_t msg_size = (ssize_t)cdrarg;
275 ASSERT(dbp->db_mblk->b_datap == dbp);
276 ASSERT(msg_size != 0);
277 ASSERT(dbp->db_struioflag == 0);
278 ASSERT(dbp->db_struioun.cksum.flags == 0);
280 if ((msg_size & PAGEOFFSET) == 0) {
281 kmem_free(dbp->db_base, msg_size);
284 kmem_cache_free(mblk_cache, dbp->db_mblk);
287 static void
288 bcache_dblk_destructor(void *buf, void *cdrarg)
290 dblk_t *dbp = buf;
291 bcache_t *bcp = cdrarg;
293 kmem_cache_free(bcp->buffer_cache, dbp->db_base);
295 ASSERT(dbp->db_mblk->b_datap == dbp);
296 ASSERT(dbp->db_struioflag == 0);
297 ASSERT(dbp->db_struioun.cksum.flags == 0);
299 kmem_cache_free(mblk_cache, dbp->db_mblk);
302 /* ARGSUSED */
303 static int
304 ftblk_constructor(void *buf, void *cdrarg, int kmflags)
306 ftblk_t *fbp = buf;
307 int i;
309 bzero(fbp, sizeof (ftblk_t));
310 if (str_ftstack != 0) {
311 for (i = 0; i < FTBLK_EVNTS; i++)
312 fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
315 return (0);
318 /* ARGSUSED */
319 static void
320 ftblk_destructor(void *buf, void *cdrarg)
322 ftblk_t *fbp = buf;
323 int i;
325 if (str_ftstack != 0) {
326 for (i = 0; i < FTBLK_EVNTS; i++) {
327 if (fbp->ev[i].stk != NULL) {
328 kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
329 fbp->ev[i].stk = NULL;
335 static int
336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
338 fthdr_t *fhp = buf;
340 return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
343 static void
344 fthdr_destructor(void *buf, void *cdrarg)
346 fthdr_t *fhp = buf;
348 ftblk_destructor(&fhp->first, cdrarg);
351 void
352 streams_msg_init(void)
354 char name[40];
355 size_t size;
356 size_t lastsize = DBLK_MIN_SIZE;
357 size_t *sizep;
358 struct kmem_cache *cp;
359 size_t tot_size;
360 int offset;
362 mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
363 NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);
365 for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {
367 if ((offset = (size & PAGEOFFSET)) != 0) {
369 * We are in the middle of a page, dblk should
370 * be allocated on the same page
372 tot_size = size + sizeof (dblk_t);
373 ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
374 < PAGESIZE);
375 ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);
377 } else {
380 * buf size is multiple of page size, dblk and
381 * buffer are allocated separately.
384 ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
385 tot_size = sizeof (dblk_t);
388 (void) sprintf(name, "streams_dblk_%ld", size);
389 cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
390 dblk_constructor, dblk_destructor, NULL, (void *)(size),
391 NULL, dblk_kmem_flags);
393 while (lastsize <= size) {
394 dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
395 lastsize += DBLK_MIN_SIZE;
399 dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
400 DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
401 (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
402 fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
403 fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
404 ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
405 ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);
407 /* Initialize Multidata caches */
408 mmd_init();
410 /* initialize throttling queue for esballoc */
411 esballoc_queue_init();
414 /*ARGSUSED*/
415 mblk_t *
416 allocb(size_t size, uint_t pri)
418 dblk_t *dbp;
419 mblk_t *mp;
420 size_t index;
422 index = (size - 1) >> DBLK_SIZE_SHIFT;
424 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
425 if (size != 0) {
426 mp = allocb_oversize(size, KM_NOSLEEP);
427 goto out;
429 index = 0;
432 if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
433 mp = NULL;
434 goto out;
437 mp = dbp->db_mblk;
438 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
439 mp->b_next = mp->b_prev = mp->b_cont = NULL;
440 mp->b_rptr = mp->b_wptr = dbp->db_base;
441 mp->b_queue = NULL;
442 MBLK_BAND_FLAG_WORD(mp) = 0;
443 STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
444 out:
445 FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
447 return (mp);
451 * Allocate an mblk taking db_credp and db_cpid from the template.
452 * Allow the cred to be NULL.
454 mblk_t *
455 allocb_tmpl(size_t size, const mblk_t *tmpl)
457 mblk_t *mp = allocb(size, 0);
459 if (mp != NULL) {
460 dblk_t *src = tmpl->b_datap;
461 dblk_t *dst = mp->b_datap;
462 cred_t *cr;
463 pid_t cpid;
465 cr = msg_getcred(tmpl, &cpid);
466 if (cr != NULL)
467 crhold(dst->db_credp = cr);
468 dst->db_cpid = cpid;
469 dst->db_type = src->db_type;
471 return (mp);
474 mblk_t *
475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
477 mblk_t *mp = allocb(size, 0);
479 ASSERT(cr != NULL);
480 if (mp != NULL) {
481 dblk_t *dbp = mp->b_datap;
483 crhold(dbp->db_credp = cr);
484 dbp->db_cpid = cpid;
486 return (mp);
489 mblk_t *
490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
492 mblk_t *mp = allocb_wait(size, 0, flags, error);
494 ASSERT(cr != NULL);
495 if (mp != NULL) {
496 dblk_t *dbp = mp->b_datap;
498 crhold(dbp->db_credp = cr);
499 dbp->db_cpid = cpid;
502 return (mp);
506 * Extract the db_cred (and optionally db_cpid) from a message.
507 * We find the first mblk which has a non-NULL db_cred and use that.
508 * If none found we return NULL.
509 * Does NOT get a hold on the cred.
511 cred_t *
512 msg_getcred(const mblk_t *mp, pid_t *cpidp)
514 cred_t *cr = NULL;
515 cred_t *cr2;
516 mblk_t *mp2;
518 while (mp != NULL) {
519 dblk_t *dbp = mp->b_datap;
521 cr = dbp->db_credp;
522 if (cr == NULL) {
523 mp = mp->b_cont;
524 continue;
526 if (cpidp != NULL)
527 *cpidp = dbp->db_cpid;
529 #ifdef DEBUG
531 * Normally there should at most one db_credp in a message.
532 * But if there are multiple (as in the case of some M_IOC*
533 * and some internal messages in TCP/IP bind logic) then
534 * they must be identical in the normal case.
535 * However, a socket can be shared between different uids
536 * in which case data queued in TCP would be from different
537 * creds. Thus we can only assert for the zoneid being the
538 * same. Due to Multi-level Level Ports for TX, some
539 * cred_t can have a NULL cr_zone, and we skip the comparison
540 * in that case.
542 mp2 = mp->b_cont;
543 while (mp2 != NULL) {
544 cr2 = DB_CRED(mp2);
545 if (cr2 != NULL) {
546 DTRACE_PROBE2(msg__getcred,
547 cred_t *, cr, cred_t *, cr2);
548 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
549 crgetzone(cr) == NULL ||
550 crgetzone(cr2) == NULL);
552 mp2 = mp2->b_cont;
554 #endif
555 return (cr);
557 if (cpidp != NULL)
558 *cpidp = NOPID;
559 return (NULL);
563 * Variant of msg_getcred which, when a cred is found
564 * 1. Returns with a hold on the cred
565 * 2. Clears the first cred in the mblk.
566 * This is more efficient to use than a msg_getcred() + crhold() when
567 * the message is freed after the cred has been extracted.
569 * The caller is responsible for ensuring that there is no other reference
570 * on the message since db_credp can not be cleared when there are other
571 * references.
573 cred_t *
574 msg_extractcred(mblk_t *mp, pid_t *cpidp)
576 cred_t *cr = NULL;
577 cred_t *cr2;
578 mblk_t *mp2;
580 while (mp != NULL) {
581 dblk_t *dbp = mp->b_datap;
583 cr = dbp->db_credp;
584 if (cr == NULL) {
585 mp = mp->b_cont;
586 continue;
588 ASSERT(dbp->db_ref == 1);
589 dbp->db_credp = NULL;
590 if (cpidp != NULL)
591 *cpidp = dbp->db_cpid;
592 #ifdef DEBUG
594 * Normally there should at most one db_credp in a message.
595 * But if there are multiple (as in the case of some M_IOC*
596 * and some internal messages in TCP/IP bind logic) then
597 * they must be identical in the normal case.
598 * However, a socket can be shared between different uids
599 * in which case data queued in TCP would be from different
600 * creds. Thus we can only assert for the zoneid being the
601 * same. Due to Multi-level Level Ports for TX, some
602 * cred_t can have a NULL cr_zone, and we skip the comparison
603 * in that case.
605 mp2 = mp->b_cont;
606 while (mp2 != NULL) {
607 cr2 = DB_CRED(mp2);
608 if (cr2 != NULL) {
609 DTRACE_PROBE2(msg__extractcred,
610 cred_t *, cr, cred_t *, cr2);
611 ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
612 crgetzone(cr) == NULL ||
613 crgetzone(cr2) == NULL);
615 mp2 = mp2->b_cont;
617 #endif
618 return (cr);
620 return (NULL);
623 void
624 freeb(mblk_t *mp)
626 dblk_t *dbp = mp->b_datap;
628 ASSERT(dbp->db_ref > 0);
629 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
630 FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);
632 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
634 dbp->db_free(mp, dbp);
637 void
638 freemsg(mblk_t *mp)
640 FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
641 while (mp) {
642 dblk_t *dbp = mp->b_datap;
643 mblk_t *mp_cont = mp->b_cont;
645 ASSERT(dbp->db_ref > 0);
646 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
648 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
650 dbp->db_free(mp, dbp);
651 mp = mp_cont;
656 * Reallocate a block for another use. Try hard to use the old block.
657 * If the old data is wanted (copy), leave b_wptr at the end of the data,
658 * otherwise return b_wptr = b_rptr.
660 * This routine is private and unstable.
662 mblk_t *
663 reallocb(mblk_t *mp, size_t size, uint_t copy)
665 mblk_t *mp1;
666 unsigned char *old_rptr;
667 ptrdiff_t cur_size;
669 if (mp == NULL)
670 return (allocb(size, BPRI_HI));
672 cur_size = mp->b_wptr - mp->b_rptr;
673 old_rptr = mp->b_rptr;
675 ASSERT(mp->b_datap->db_ref != 0);
677 if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
679 * If the data is wanted and it will fit where it is, no
680 * work is required.
682 if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
683 return (mp);
685 mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
686 mp1 = mp;
687 } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
688 /* XXX other mp state could be copied too, db_flags ... ? */
689 mp1->b_cont = mp->b_cont;
690 } else {
691 return (NULL);
694 if (copy) {
695 bcopy(old_rptr, mp1->b_rptr, cur_size);
696 mp1->b_wptr = mp1->b_rptr + cur_size;
699 if (mp != mp1)
700 freeb(mp);
702 return (mp1);
705 static void
706 dblk_lastfree(mblk_t *mp, dblk_t *dbp)
708 ASSERT(dbp->db_mblk == mp);
709 if (dbp->db_fthdr != NULL)
710 str_ftfree(dbp);
712 /* set credp and projid to be 'unspecified' before returning to cache */
713 if (dbp->db_credp != NULL) {
714 crfree(dbp->db_credp);
715 dbp->db_credp = NULL;
717 dbp->db_cpid = -1;
719 /* Reset the struioflag and the checksum flag fields */
720 dbp->db_struioflag = 0;
721 dbp->db_struioun.cksum.flags = 0;
723 /* and the COOKED and/or UIOA flag(s) */
724 dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);
726 kmem_cache_free(dbp->db_cache, dbp);
729 static void
730 dblk_decref(mblk_t *mp, dblk_t *dbp)
732 if (dbp->db_ref != 1) {
733 uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
734 -(1 << DBLK_RTFU_SHIFT(db_ref)));
736 * atomic_add_32_nv() just decremented db_ref, so we no longer
737 * have a reference to the dblk, which means another thread
738 * could free it. Therefore we cannot examine the dblk to
739 * determine whether ours was the last reference. Instead,
740 * we extract the new and minimum reference counts from rtfu.
741 * Note that all we're really saying is "if (ref != refmin)".
743 if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
744 ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
745 kmem_cache_free(mblk_cache, mp);
746 return;
749 dbp->db_mblk = mp;
750 dbp->db_free = dbp->db_lastfree;
751 dbp->db_lastfree(mp, dbp);
754 mblk_t *
755 dupb(mblk_t *mp)
757 dblk_t *dbp = mp->b_datap;
758 mblk_t *new_mp;
759 uint32_t oldrtfu, newrtfu;
761 if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
762 goto out;
764 new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
765 new_mp->b_rptr = mp->b_rptr;
766 new_mp->b_wptr = mp->b_wptr;
767 new_mp->b_datap = dbp;
768 new_mp->b_queue = NULL;
769 MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);
771 STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);
773 dbp->db_free = dblk_decref;
774 do {
775 ASSERT(dbp->db_ref > 0);
776 oldrtfu = DBLK_RTFU_WORD(dbp);
777 newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
779 * If db_ref is maxed out we can't dup this message anymore.
781 if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
782 kmem_cache_free(mblk_cache, new_mp);
783 new_mp = NULL;
784 goto out;
786 } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
787 oldrtfu);
789 out:
790 FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
791 return (new_mp);
794 static void
795 dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
797 frtn_t *frp = dbp->db_frtnp;
799 ASSERT(dbp->db_mblk == mp);
800 frp->free_func(frp->free_arg);
801 if (dbp->db_fthdr != NULL)
802 str_ftfree(dbp);
804 /* set credp and projid to be 'unspecified' before returning to cache */
805 if (dbp->db_credp != NULL) {
806 crfree(dbp->db_credp);
807 dbp->db_credp = NULL;
809 dbp->db_cpid = -1;
810 dbp->db_struioflag = 0;
811 dbp->db_struioun.cksum.flags = 0;
813 kmem_cache_free(dbp->db_cache, dbp);
/*
 * Free routine that intentionally does nothing; it backs the global
 * 'frnop' frtn_t for buffers whose release is handled by the dblk's
 * last-free method instead.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
	/* nothing to do */
}
823 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
825 static mblk_t *
826 gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
827 void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
829 dblk_t *dbp;
830 mblk_t *mp;
832 ASSERT(base != NULL && frp != NULL);
834 if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
835 mp = NULL;
836 goto out;
839 mp = dbp->db_mblk;
840 dbp->db_base = base;
841 dbp->db_lim = base + size;
842 dbp->db_free = dbp->db_lastfree = lastfree;
843 dbp->db_frtnp = frp;
844 DBLK_RTFU_WORD(dbp) = db_rtfu;
845 mp->b_next = mp->b_prev = mp->b_cont = NULL;
846 mp->b_rptr = mp->b_wptr = base;
847 mp->b_queue = NULL;
848 MBLK_BAND_FLAG_WORD(mp) = 0;
850 out:
851 FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
852 return (mp);
855 /*ARGSUSED*/
856 mblk_t *
857 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
859 mblk_t *mp;
862 * Note that this is structured to allow the common case (i.e.
863 * STREAMS flowtracing disabled) to call gesballoc() with tail
864 * call optimization.
866 if (!str_ftnever) {
867 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
868 frp, freebs_enqueue, KM_NOSLEEP);
870 if (mp != NULL)
871 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
872 return (mp);
875 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
876 frp, freebs_enqueue, KM_NOSLEEP));
880 * Same as esballoc() but sleeps waiting for memory.
882 /*ARGSUSED*/
883 mblk_t *
884 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
886 mblk_t *mp;
889 * Note that this is structured to allow the common case (i.e.
890 * STREAMS flowtracing disabled) to call gesballoc() with tail
891 * call optimization.
893 if (!str_ftnever) {
894 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
895 frp, freebs_enqueue, KM_SLEEP);
897 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
898 return (mp);
901 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
902 frp, freebs_enqueue, KM_SLEEP));
905 /*ARGSUSED*/
906 mblk_t *
907 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
909 mblk_t *mp;
912 * Note that this is structured to allow the common case (i.e.
913 * STREAMS flowtracing disabled) to call gesballoc() with tail
914 * call optimization.
916 if (!str_ftnever) {
917 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
918 frp, dblk_lastfree_desb, KM_NOSLEEP);
920 if (mp != NULL)
921 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
922 return (mp);
925 return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
926 frp, dblk_lastfree_desb, KM_NOSLEEP));
929 /*ARGSUSED*/
930 mblk_t *
931 esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
933 mblk_t *mp;
936 * Note that this is structured to allow the common case (i.e.
937 * STREAMS flowtracing disabled) to call gesballoc() with tail
938 * call optimization.
940 if (!str_ftnever) {
941 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
942 frp, freebs_enqueue, KM_NOSLEEP);
944 if (mp != NULL)
945 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
946 return (mp);
949 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
950 frp, freebs_enqueue, KM_NOSLEEP));
953 /*ARGSUSED*/
954 mblk_t *
955 desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
957 mblk_t *mp;
960 * Note that this is structured to allow the common case (i.e.
961 * STREAMS flowtracing disabled) to call gesballoc() with tail
962 * call optimization.
964 if (!str_ftnever) {
965 mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
966 frp, dblk_lastfree_desb, KM_NOSLEEP);
968 if (mp != NULL)
969 STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
970 return (mp);
973 return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
974 frp, dblk_lastfree_desb, KM_NOSLEEP));
977 static void
978 bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
980 bcache_t *bcp = dbp->db_cache;
982 ASSERT(dbp->db_mblk == mp);
983 if (dbp->db_fthdr != NULL)
984 str_ftfree(dbp);
986 /* set credp and projid to be 'unspecified' before returning to cache */
987 if (dbp->db_credp != NULL) {
988 crfree(dbp->db_credp);
989 dbp->db_credp = NULL;
991 dbp->db_cpid = -1;
992 dbp->db_struioflag = 0;
993 dbp->db_struioun.cksum.flags = 0;
995 mutex_enter(&bcp->mutex);
996 kmem_cache_free(bcp->dblk_cache, dbp);
997 bcp->alloc--;
999 if (bcp->alloc == 0 && bcp->destroy != 0) {
1000 kmem_cache_destroy(bcp->dblk_cache);
1001 kmem_cache_destroy(bcp->buffer_cache);
1002 mutex_exit(&bcp->mutex);
1003 mutex_destroy(&bcp->mutex);
1004 kmem_free(bcp, sizeof (bcache_t));
1005 } else {
1006 mutex_exit(&bcp->mutex);
1010 bcache_t *
1011 bcache_create(char *name, size_t size, uint_t align)
1013 bcache_t *bcp;
1014 char buffer[255];
1016 ASSERT((align & (align - 1)) == 0);
1018 if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1019 return (NULL);
1021 bcp->size = size;
1022 bcp->align = align;
1023 bcp->alloc = 0;
1024 bcp->destroy = 0;
1026 mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1028 (void) sprintf(buffer, "%s_buffer_cache", name);
1029 bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1030 NULL, NULL, NULL, 0);
1031 (void) sprintf(buffer, "%s_dblk_cache", name);
1032 bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1033 DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1034 NULL, (void *)bcp, NULL, 0);
1036 return (bcp);
1039 void
1040 bcache_destroy(bcache_t *bcp)
1042 ASSERT(bcp != NULL);
1044 mutex_enter(&bcp->mutex);
1045 if (bcp->alloc == 0) {
1046 kmem_cache_destroy(bcp->dblk_cache);
1047 kmem_cache_destroy(bcp->buffer_cache);
1048 mutex_exit(&bcp->mutex);
1049 mutex_destroy(&bcp->mutex);
1050 kmem_free(bcp, sizeof (bcache_t));
1051 } else {
1052 bcp->destroy++;
1053 mutex_exit(&bcp->mutex);
1057 /*ARGSUSED*/
1058 mblk_t *
1059 bcache_allocb(bcache_t *bcp, uint_t pri)
1061 dblk_t *dbp;
1062 mblk_t *mp = NULL;
1064 ASSERT(bcp != NULL);
1066 mutex_enter(&bcp->mutex);
1067 if (bcp->destroy != 0) {
1068 mutex_exit(&bcp->mutex);
1069 goto out;
1072 if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
1073 mutex_exit(&bcp->mutex);
1074 goto out;
1076 bcp->alloc++;
1077 mutex_exit(&bcp->mutex);
1079 ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);
1081 mp = dbp->db_mblk;
1082 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1083 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1084 mp->b_rptr = mp->b_wptr = dbp->db_base;
1085 mp->b_queue = NULL;
1086 MBLK_BAND_FLAG_WORD(mp) = 0;
1087 STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
1088 out:
1089 FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);
1091 return (mp);
1094 static void
1095 dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
1097 ASSERT(dbp->db_mblk == mp);
1098 if (dbp->db_fthdr != NULL)
1099 str_ftfree(dbp);
1101 /* set credp and projid to be 'unspecified' before returning to cache */
1102 if (dbp->db_credp != NULL) {
1103 crfree(dbp->db_credp);
1104 dbp->db_credp = NULL;
1106 dbp->db_cpid = -1;
1107 dbp->db_struioflag = 0;
1108 dbp->db_struioun.cksum.flags = 0;
1110 kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
1111 kmem_cache_free(dbp->db_cache, dbp);
1114 static mblk_t *
1115 allocb_oversize(size_t size, int kmflags)
1117 mblk_t *mp;
1118 void *buf;
1120 size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1121 if ((buf = kmem_alloc(size, kmflags)) == NULL)
1122 return (NULL);
1123 if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1124 &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1125 kmem_free(buf, size);
1127 if (mp != NULL)
1128 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1130 return (mp);
1133 mblk_t *
1134 allocb_tryhard(size_t target_size)
1136 size_t size;
1137 mblk_t *bp;
1139 for (size = target_size; size < target_size + 512;
1140 size += DBLK_CACHE_ALIGN)
1141 if ((bp = allocb(size, BPRI_HI)) != NULL)
1142 return (bp);
1143 allocb_tryhard_fails++;
1144 return (NULL);
1148 * This routine is consolidation private for STREAMS internal use
1149 * This routine may only be called from sync routines (i.e., not
1150 * from put or service procedures). It is located here (rather
1151 * than strsubr.c) so that we don't have to expose all of the
1152 * allocb() implementation details in header files.
1154 mblk_t *
1155 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1157 dblk_t *dbp;
1158 mblk_t *mp;
1159 size_t index;
1161 index = (size -1) >> DBLK_SIZE_SHIFT;
1163 if (flags & STR_NOSIG) {
1164 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1165 if (size != 0) {
1166 mp = allocb_oversize(size, KM_SLEEP);
1167 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1168 (uintptr_t)mp);
1169 return (mp);
1171 index = 0;
1174 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1175 mp = dbp->db_mblk;
1176 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1177 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1178 mp->b_rptr = mp->b_wptr = dbp->db_base;
1179 mp->b_queue = NULL;
1180 MBLK_BAND_FLAG_WORD(mp) = 0;
1181 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1183 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1185 } else {
1186 while ((mp = allocb(size, pri)) == NULL) {
1187 if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1188 return (NULL);
1192 return (mp);
1196 * Call function 'func' with 'arg' when a class zero block can
1197 * be allocated with priority 'pri'.
1199 bufcall_id_t
1200 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1202 return (bufcall(1, pri, func, arg));
1206 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1207 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1208 * This provides consistency for all internal allocators of ioctl.
1210 mblk_t *
1211 mkiocb(uint_t cmd)
1213 struct iocblk *ioc;
1214 mblk_t *mp;
1217 * Allocate enough space for any of the ioctl related messages.
1219 if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1220 return (NULL);
1222 bzero(mp->b_rptr, sizeof (union ioctypes));
1225 * Set the mblk_t information and ptrs correctly.
1227 mp->b_wptr += sizeof (struct iocblk);
1228 mp->b_datap->db_type = M_IOCTL;
1231 * Fill in the fields.
1233 ioc = (struct iocblk *)mp->b_rptr;
1234 ioc->ioc_cmd = cmd;
1235 ioc->ioc_cr = kcred;
1236 ioc->ioc_id = getiocseqno();
1237 ioc->ioc_flag = IOC_NATIVE;
1238 return (mp);
1242 * test if block of given size can be allocated with a request of
1243 * the given priority.
1244 * 'pri' is no longer used, but is retained for compatibility.
1246 /* ARGSUSED */
1248 testb(size_t size, uint_t pri)
1250 return ((size + sizeof (dblk_t)) <= kmem_avail());
1254 * Call function 'func' with argument 'arg' when there is a reasonably
1255 * good chance that a block of size 'size' can be allocated.
1256 * 'pri' is no longer used, but is retained for compatibility.
1258 /* ARGSUSED */
1259 bufcall_id_t
1260 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1262 static long bid = 1; /* always odd to save checking for zero */
1263 bufcall_id_t bc_id;
1264 struct strbufcall *bcp;
1266 if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1267 return (0);
1269 bcp->bc_func = func;
1270 bcp->bc_arg = arg;
1271 bcp->bc_size = size;
1272 bcp->bc_next = NULL;
1273 bcp->bc_executor = NULL;
1275 mutex_enter(&strbcall_lock);
1277 * After bcp is linked into strbcalls and strbcall_lock is dropped there
1278 * should be no references to bcp since it may be freed by
1279 * runbufcalls(). Since bcp_id field is returned, we save its value in
1280 * the local var.
1282 bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2); /* keep it odd */
1285 * add newly allocated stream event to existing
1286 * linked list of events.
1288 if (strbcalls.bc_head == NULL) {
1289 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1290 } else {
1291 strbcalls.bc_tail->bc_next = bcp;
1292 strbcalls.bc_tail = bcp;
1295 cv_signal(&strbcall_cv);
1296 mutex_exit(&strbcall_lock);
1297 return (bc_id);
1301 * Cancel a bufcall request.
1303 void
1304 unbufcall(bufcall_id_t id)
1306 strbufcall_t *bcp, *pbcp;
1308 mutex_enter(&strbcall_lock);
1309 again:
1310 pbcp = NULL;
1311 for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1312 if (id == bcp->bc_id)
1313 break;
1314 pbcp = bcp;
1316 if (bcp) {
1317 if (bcp->bc_executor != NULL) {
1318 if (bcp->bc_executor != curthread) {
1319 cv_wait(&bcall_cv, &strbcall_lock);
1320 goto again;
1322 } else {
1323 if (pbcp)
1324 pbcp->bc_next = bcp->bc_next;
1325 else
1326 strbcalls.bc_head = bcp->bc_next;
1327 if (bcp == strbcalls.bc_tail)
1328 strbcalls.bc_tail = pbcp;
1329 kmem_free(bcp, sizeof (strbufcall_t));
1332 mutex_exit(&strbcall_lock);
1336 * Duplicate a message block by block (uses dupb), returning
1337 * a pointer to the duplicate message.
1338 * Returns a non-NULL value only if the entire message
1339 * was dup'd.
1341 mblk_t *
1342 dupmsg(mblk_t *bp)
1344 mblk_t *head, *nbp;
1346 if (!bp || !(nbp = head = dupb(bp)))
1347 return (NULL);
1349 while (bp->b_cont) {
1350 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1351 freemsg(head);
1352 return (NULL);
1354 nbp = nbp->b_cont;
1355 bp = bp->b_cont;
1357 return (head);
1360 #define DUPB_NOLOAN(bp) \
1361 ((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
1362 copyb((bp)) : dupb((bp)))
1364 mblk_t *
1365 dupmsg_noloan(mblk_t *bp)
1367 mblk_t *head, *nbp;
1369 if (bp == NULL || DB_TYPE(bp) != M_DATA ||
1370 ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
1371 return (NULL);
1373 while (bp->b_cont) {
1374 if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
1375 freemsg(head);
1376 return (NULL);
1378 nbp = nbp->b_cont;
1379 bp = bp->b_cont;
1381 return (head);
1385 * Copy data from message and data block to newly allocated message and
1386 * data block. Returns new message block pointer, or NULL if error.
1387 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1388 * as in the original even when db_base is not word aligned. (bug 1052877)
1390 mblk_t *
1391 copyb(mblk_t *bp)
1393 mblk_t *nbp;
1394 dblk_t *dp, *ndp;
1395 uchar_t *base;
1396 size_t size;
1397 size_t unaligned;
1399 ASSERT(bp->b_wptr >= bp->b_rptr);
1401 dp = bp->b_datap;
1402 if (dp->db_fthdr != NULL)
1403 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1406 * Special handling for Multidata message; this should be
1407 * removed once a copy-callback routine is made available.
1409 if (dp->db_type == M_MULTIDATA) {
1410 cred_t *cr;
1412 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1413 return (NULL);
1415 nbp->b_flag = bp->b_flag;
1416 nbp->b_band = bp->b_band;
1417 ndp = nbp->b_datap;
1419 /* See comments below on potential issues. */
1420 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1422 ASSERT(ndp->db_type == dp->db_type);
1423 cr = dp->db_credp;
1424 if (cr != NULL)
1425 crhold(ndp->db_credp = cr);
1426 ndp->db_cpid = dp->db_cpid;
1427 return (nbp);
1430 size = dp->db_lim - dp->db_base;
1431 unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1432 if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1433 return (NULL);
1434 nbp->b_flag = bp->b_flag;
1435 nbp->b_band = bp->b_band;
1436 ndp = nbp->b_datap;
1439 * Well, here is a potential issue. If we are trying to
1440 * trace a flow, and we copy the message, we might lose
1441 * information about where this message might have been.
1442 * So we should inherit the FT data. On the other hand,
1443 * a user might be interested only in alloc to free data.
1444 * So I guess the real answer is to provide a tunable.
1446 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1448 base = ndp->db_base + unaligned;
1449 bcopy(dp->db_base, ndp->db_base + unaligned, size);
1451 nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1452 nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1454 return (nbp);
1458 * Copy data from message to newly allocated message using new
1459 * data blocks. Returns a pointer to the new message, or NULL if error.
1461 mblk_t *
1462 copymsg(mblk_t *bp)
1464 mblk_t *head, *nbp;
1466 if (!bp || !(nbp = head = copyb(bp)))
1467 return (NULL);
1469 while (bp->b_cont) {
1470 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1471 freemsg(head);
1472 return (NULL);
1474 nbp = nbp->b_cont;
1475 bp = bp->b_cont;
1477 return (head);
1481 * link a message block to tail of message
1483 void
1484 linkb(mblk_t *mp, mblk_t *bp)
1486 ASSERT(mp && bp);
1488 for (; mp->b_cont; mp = mp->b_cont)
1490 mp->b_cont = bp;
1494 * unlink a message block from head of message
1495 * return pointer to new message.
1496 * NULL if message becomes empty.
1498 mblk_t *
1499 unlinkb(mblk_t *bp)
1501 mblk_t *bp1;
1503 bp1 = bp->b_cont;
1504 bp->b_cont = NULL;
1505 return (bp1);
1509 * remove a message block "bp" from message "mp"
1511 * Return pointer to new message or NULL if no message remains.
1512 * Return -1 if bp is not found in message.
1514 mblk_t *
1515 rmvb(mblk_t *mp, mblk_t *bp)
1517 mblk_t *tmp;
1518 mblk_t *lastp = NULL;
1520 ASSERT(mp && bp);
1521 for (tmp = mp; tmp; tmp = tmp->b_cont) {
1522 if (tmp == bp) {
1523 if (lastp)
1524 lastp->b_cont = tmp->b_cont;
1525 else
1526 mp = tmp->b_cont;
1527 tmp->b_cont = NULL;
1528 return (mp);
1530 lastp = tmp;
1532 return ((mblk_t *)-1);
1536 * Concatenate and align first len bytes of common
1537 * message type. Len == -1, means concat everything.
1538 * Returns 1 on success, 0 on failure
1539 * After the pullup, mp points to the pulled up data.
1542 pullupmsg(mblk_t *mp, ssize_t len)
1544 mblk_t *bp, *b_cont;
1545 dblk_t *dbp;
1546 ssize_t n;
1548 ASSERT(mp->b_datap->db_ref > 0);
1549 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1552 * We won't handle Multidata message, since it contains
1553 * metadata which this function has no knowledge of; we
1554 * assert on DEBUG, and return failure otherwise.
1556 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1557 if (mp->b_datap->db_type == M_MULTIDATA)
1558 return (0);
1560 if (len == -1) {
1561 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1562 return (1);
1563 len = xmsgsize(mp);
1564 } else {
1565 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1566 ASSERT(first_mblk_len >= 0);
1568 * If the length is less than that of the first mblk,
1569 * we want to pull up the message into an aligned mblk.
1570 * Though not part of the spec, some callers assume it.
1572 if (len <= first_mblk_len) {
1573 if (str_aligned(mp->b_rptr))
1574 return (1);
1575 len = first_mblk_len;
1576 } else if (xmsgsize(mp) < len)
1577 return (0);
1580 if ((bp = allocb_tmpl(len, mp)) == NULL)
1581 return (0);
1583 dbp = bp->b_datap;
1584 *bp = *mp; /* swap mblks so bp heads the old msg... */
1585 mp->b_datap = dbp; /* ... and mp heads the new message */
1586 mp->b_datap->db_mblk = mp;
1587 bp->b_datap->db_mblk = bp;
1588 mp->b_rptr = mp->b_wptr = dbp->db_base;
1590 do {
1591 ASSERT(bp->b_datap->db_ref > 0);
1592 ASSERT(bp->b_wptr >= bp->b_rptr);
1593 n = MIN(bp->b_wptr - bp->b_rptr, len);
1594 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1595 if (n > 0)
1596 bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1597 mp->b_wptr += n;
1598 bp->b_rptr += n;
1599 len -= n;
1600 if (bp->b_rptr != bp->b_wptr)
1601 break;
1602 b_cont = bp->b_cont;
1603 freeb(bp);
1604 bp = b_cont;
1605 } while (len && bp);
1607 mp->b_cont = bp; /* tack on whatever wasn't pulled up */
1609 return (1);
1613 * Concatenate and align at least the first len bytes of common message
1614 * type. Len == -1 means concatenate everything. The original message is
1615 * unaltered. Returns a pointer to a new message on success, otherwise
1616 * returns NULL.
1618 mblk_t *
1619 msgpullup(mblk_t *mp, ssize_t len)
1621 mblk_t *newmp;
1622 ssize_t totlen;
1623 ssize_t n;
1626 * We won't handle Multidata message, since it contains
1627 * metadata which this function has no knowledge of; we
1628 * assert on DEBUG, and return failure otherwise.
1630 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1631 if (mp->b_datap->db_type == M_MULTIDATA)
1632 return (NULL);
1634 totlen = xmsgsize(mp);
1636 if ((len > 0) && (len > totlen))
1637 return (NULL);
1640 * Copy all of the first msg type into one new mblk, then dupmsg
1641 * and link the rest onto this.
1644 len = totlen;
1646 if ((newmp = allocb_tmpl(len, mp)) == NULL)
1647 return (NULL);
1649 newmp->b_flag = mp->b_flag;
1650 newmp->b_band = mp->b_band;
1652 while (len > 0) {
1653 n = mp->b_wptr - mp->b_rptr;
1654 ASSERT(n >= 0); /* allow zero-length mblk_t's */
1655 if (n > 0)
1656 bcopy(mp->b_rptr, newmp->b_wptr, n);
1657 newmp->b_wptr += n;
1658 len -= n;
1659 mp = mp->b_cont;
1662 if (mp != NULL) {
1663 newmp->b_cont = dupmsg(mp);
1664 if (newmp->b_cont == NULL) {
1665 freemsg(newmp);
1666 return (NULL);
1670 return (newmp);
1674 * Trim bytes from message
1675 * len > 0, trim from head
1676 * len < 0, trim from tail
1677 * Returns 1 on success, 0 on failure.
1680 adjmsg(mblk_t *mp, ssize_t len)
1682 mblk_t *bp;
1683 mblk_t *save_bp = NULL;
1684 mblk_t *prev_bp;
1685 mblk_t *bcont;
1686 unsigned char type;
1687 ssize_t n;
1688 int fromhead;
1689 int first;
1691 ASSERT(mp != NULL);
1693 * We won't handle Multidata message, since it contains
1694 * metadata which this function has no knowledge of; we
1695 * assert on DEBUG, and return failure otherwise.
1697 ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1698 if (mp->b_datap->db_type == M_MULTIDATA)
1699 return (0);
1701 if (len < 0) {
1702 fromhead = 0;
1703 len = -len;
1704 } else {
1705 fromhead = 1;
1708 if (xmsgsize(mp) < len)
1709 return (0);
1711 if (fromhead) {
1712 first = 1;
1713 while (len) {
1714 ASSERT(mp->b_wptr >= mp->b_rptr);
1715 n = MIN(mp->b_wptr - mp->b_rptr, len);
1716 mp->b_rptr += n;
1717 len -= n;
1720 * If this is not the first zero length
1721 * message remove it
1723 if (!first && (mp->b_wptr == mp->b_rptr)) {
1724 bcont = mp->b_cont;
1725 freeb(mp);
1726 mp = save_bp->b_cont = bcont;
1727 } else {
1728 save_bp = mp;
1729 mp = mp->b_cont;
1731 first = 0;
1733 } else {
1734 type = mp->b_datap->db_type;
1735 while (len) {
1736 bp = mp;
1737 save_bp = NULL;
1740 * Find the last message of same type
1742 while (bp && bp->b_datap->db_type == type) {
1743 ASSERT(bp->b_wptr >= bp->b_rptr);
1744 prev_bp = save_bp;
1745 save_bp = bp;
1746 bp = bp->b_cont;
1748 if (save_bp == NULL)
1749 break;
1750 n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1751 save_bp->b_wptr -= n;
1752 len -= n;
1755 * If this is not the first message
1756 * and we have taken away everything
1757 * from this message, remove it
1760 if ((save_bp != mp) &&
1761 (save_bp->b_wptr == save_bp->b_rptr)) {
1762 bcont = save_bp->b_cont;
1763 freeb(save_bp);
1764 prev_bp->b_cont = bcont;
1768 return (1);
1772 * get number of data bytes in message
1774 size_t
1775 msgdsize(mblk_t *bp)
1777 size_t count = 0;
1779 for (; bp; bp = bp->b_cont)
1780 if (bp->b_datap->db_type == M_DATA) {
1781 ASSERT(bp->b_wptr >= bp->b_rptr);
1782 count += bp->b_wptr - bp->b_rptr;
1784 return (count);
1788 * Get a message off head of queue
1790 * If queue has no buffers then mark queue
1791 * with QWANTR. (queue wants to be read by
1792 * someone when data becomes available)
1794 * If there is something to take off then do so.
1795 * If queue falls below hi water mark turn off QFULL
1796 * flag. Decrement weighted count of queue.
1797 * Also turn off QWANTR because queue is being read.
1799 * The queue count is maintained on a per-band basis.
1800 * Priority band 0 (normal messages) uses q_count,
1801 * q_lowat, etc. Non-zero priority bands use the
1802 * fields in their respective qband structures
1803 * (qb_count, qb_lowat, etc.) All messages appear
1804 * on the same list, linked via their b_next pointers.
1805 * q_first is the head of the list. q_count does
1806 * not reflect the size of all the messages on the
1807 * queue. It only reflects those messages in the
1808 * normal band of flow. The one exception to this
1809 * deals with high priority messages. They are in
1810 * their own conceptual "band", but are accounted
1811 * against q_count.
1813 * If queue count is below the lo water mark and QWANTW
1814 * is set, enable the closest backq which has a service
1815 * procedure and turn off the QWANTW flag.
1817 * getq could be built on top of rmvq, but isn't because
1818 * of performance considerations.
1820 * A note on the use of q_count and q_mblkcnt:
1821 * q_count is the traditional byte count for messages that
1822 * have been put on a queue. Documentation tells us that
1823 * we shouldn't rely on that count, but some drivers/modules
1824 * do. What was needed, however, is a mechanism to prevent
1825 * runaway streams from consuming all of the resources,
1826 * and particularly be able to flow control zero-length
1827 * messages. q_mblkcnt is used for this purpose. It
1828 * counts the number of mblk's that are being put on
1829 * the queue. The intention here, is that each mblk should
1830 * contain one byte of data and, for the purpose of
1831 * flow-control, logically does. A queue will become
1832 * full when EITHER of these values (q_count and q_mblkcnt)
1833 * reach the highwater mark. It will clear when BOTH
1834 * of them drop below the highwater mark. And it will
1835 * backenable when BOTH of them drop below the lowwater
1836 * mark.
1837 * With this algorithm, a driver/module might be able
1838 * to find a reasonably accurate q_count, and the
1839 * framework can still try and limit resource usage.
1841 mblk_t *
1842 getq(queue_t *q)
1844 mblk_t *bp;
1845 uchar_t band = 0;
1847 bp = getq_noenab(q, 0);
1848 if (bp != NULL)
1849 band = bp->b_band;
1852 * Inlined from qbackenable().
1853 * Quick check without holding the lock.
1855 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1856 return (bp);
1858 qbackenable(q, band);
1859 return (bp);
/*
 * Add the number of data bytes in the single message block 'mp' to 'size',
 * taking multidata messages into account: for M_MULTIDATA blocks the
 * 'pinuse' value reported by mmd_getsize() is added instead of MBLKL(mp).
 *
 * The body is wrapped in do { } while (0) so that the macro expands to a
 * single statement.  The previous bare if/else form broke when the macro
 * was used as the body of an unbraced if/else.
 */
#define	ADD_MBLK_SIZE(mp, size)						\
	do {								\
		if (DB_TYPE(mp) != M_MULTIDATA) {			\
			(size) += MBLKL(mp);				\
		} else {						\
			uint_t	pinuse;					\
			mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse); \
			(size) += pinuse;				\
		}							\
	} while (0)
1878 * Returns the number of bytes in a message (a message is defined as a
1879 * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1880 * also return the number of distinct mblks in the message.
1883 mp_cont_len(mblk_t *bp, int *mblkcnt)
1885 mblk_t *mp;
1886 int mblks = 0;
1887 int bytes = 0;
1889 for (mp = bp; mp != NULL; mp = mp->b_cont) {
1890 ADD_MBLK_SIZE(mp, bytes);
1891 mblks++;
1894 if (mblkcnt != NULL)
1895 *mblkcnt = mblks;
1897 return (bytes);
1901 * Like getq() but does not backenable. This is used by the stream
1902 * head when a putback() is likely. The caller must call qbackenable()
1903 * after it is done with accessing the queue.
1904 * The rbytes arguments to getq_noneab() allows callers to specify a
1905 * the maximum number of bytes to return. If the current amount on the
1906 * queue is less than this then the entire message will be returned.
1907 * A value of 0 returns the entire message and is equivalent to the old
1908 * default behaviour prior to the addition of the rbytes argument.
1910 mblk_t *
1911 getq_noenab(queue_t *q, ssize_t rbytes)
1913 mblk_t *bp, *mp1;
1914 mblk_t *mp2 = NULL;
1915 qband_t *qbp;
1916 kthread_id_t freezer;
1917 int bytecnt = 0, mblkcnt = 0;
1919 /* freezestr should allow its caller to call getq/putq */
1920 freezer = STREAM(q)->sd_freezer;
1921 if (freezer == curthread) {
1922 ASSERT(frozenstr(q));
1923 ASSERT(MUTEX_HELD(QLOCK(q)));
1924 } else
1925 mutex_enter(QLOCK(q));
1927 if ((bp = q->q_first) == 0) {
1928 q->q_flag |= QWANTR;
1929 } else {
1931 * If the caller supplied a byte threshold and there is
1932 * more than this amount on the queue then break up the
1933 * the message appropriately. We can only safely do
1934 * this for M_DATA messages.
1936 if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
1937 (q->q_count > rbytes)) {
1939 * Inline version of mp_cont_len() which terminates
1940 * when we meet or exceed rbytes.
1942 for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
1943 mblkcnt++;
1944 ADD_MBLK_SIZE(mp1, bytecnt);
1945 if (bytecnt >= rbytes)
1946 break;
1949 * We need to account for the following scenarios:
1951 * 1) Too much data in the first message:
1952 * mp1 will be the mblk which puts us over our
1953 * byte limit.
1954 * 2) Not enough data in the first message:
1955 * mp1 will be NULL.
1956 * 3) Exactly the right amount of data contained within
1957 * whole mblks:
1958 * mp1->b_cont will be where we break the message.
1960 if (bytecnt > rbytes) {
1962 * Dup/copy mp1 and put what we don't need
1963 * back onto the queue. Adjust the read/write
1964 * and continuation pointers appropriately
1965 * and decrement the current mblk count to
1966 * reflect we are putting an mblk back onto
1967 * the queue.
1968 * When adjusting the message pointers, it's
1969 * OK to use the existing bytecnt and the
1970 * requested amount (rbytes) to calculate the
1971 * the new write offset (b_wptr) of what we
1972 * are taking. However, we cannot use these
1973 * values when calculating the read offset of
1974 * the mblk we are putting back on the queue.
1975 * This is because the begining (b_rptr) of the
1976 * mblk represents some arbitrary point within
1977 * the message.
1978 * It's simplest to do this by advancing b_rptr
1979 * by the new length of mp1 as we don't have to
1980 * remember any intermediate state.
1982 ASSERT(mp1 != NULL);
1983 mblkcnt--;
1984 if ((mp2 = dupb(mp1)) == NULL &&
1985 (mp2 = copyb(mp1)) == NULL) {
1986 bytecnt = mblkcnt = 0;
1987 goto dup_failed;
1989 mp2->b_cont = mp1->b_cont;
1990 mp1->b_wptr -= bytecnt - rbytes;
1991 mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
1992 mp1->b_cont = NULL;
1993 bytecnt = rbytes;
1994 } else {
1996 * Either there is not enough data in the first
1997 * message or there is no excess data to deal
1998 * with. If mp1 is NULL, we are taking the
1999 * whole message. No need to do anything.
2000 * Otherwise we assign mp1->b_cont to mp2 as
2001 * we will be putting this back onto the head of
2002 * the queue.
2004 if (mp1 != NULL) {
2005 mp2 = mp1->b_cont;
2006 mp1->b_cont = NULL;
2010 * If mp2 is not NULL then we have part of the message
2011 * to put back onto the queue.
2013 if (mp2 != NULL) {
2014 if ((mp2->b_next = bp->b_next) == NULL)
2015 q->q_last = mp2;
2016 else
2017 bp->b_next->b_prev = mp2;
2018 q->q_first = mp2;
2019 } else {
2020 if ((q->q_first = bp->b_next) == NULL)
2021 q->q_last = NULL;
2022 else
2023 q->q_first->b_prev = NULL;
2025 } else {
2027 * Either no byte threshold was supplied, there is
2028 * not enough on the queue or we failed to
2029 * duplicate/copy a data block. In these cases we
2030 * just take the entire first message.
2032 dup_failed:
2033 bytecnt = mp_cont_len(bp, &mblkcnt);
2034 if ((q->q_first = bp->b_next) == NULL)
2035 q->q_last = NULL;
2036 else
2037 q->q_first->b_prev = NULL;
2039 if (bp->b_band == 0) {
2040 q->q_count -= bytecnt;
2041 q->q_mblkcnt -= mblkcnt;
2042 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2043 (q->q_mblkcnt < q->q_hiwat))) {
2044 q->q_flag &= ~QFULL;
2046 } else {
2047 int i;
2049 ASSERT(bp->b_band <= q->q_nband);
2050 ASSERT(q->q_bandp != NULL);
2051 ASSERT(MUTEX_HELD(QLOCK(q)));
2052 qbp = q->q_bandp;
2053 i = bp->b_band;
2054 while (--i > 0)
2055 qbp = qbp->qb_next;
2056 if (qbp->qb_first == qbp->qb_last) {
2057 qbp->qb_first = NULL;
2058 qbp->qb_last = NULL;
2059 } else {
2060 qbp->qb_first = bp->b_next;
2062 qbp->qb_count -= bytecnt;
2063 qbp->qb_mblkcnt -= mblkcnt;
2064 if (qbp->qb_mblkcnt == 0 ||
2065 ((qbp->qb_count < qbp->qb_hiwat) &&
2066 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2067 qbp->qb_flag &= ~QB_FULL;
2070 q->q_flag &= ~QWANTR;
2071 bp->b_next = NULL;
2072 bp->b_prev = NULL;
2074 if (freezer != curthread)
2075 mutex_exit(QLOCK(q));
2077 STR_FTEVENT_MSG(bp, q, FTEV_GETQ, 0);
2079 return (bp);
2083 * Determine if a backenable is needed after removing a message in the
2084 * specified band.
2085 * NOTE: This routine assumes that something like getq_noenab() has been
2086 * already called.
2088 * For the read side it is ok to hold sd_lock across calling this (and the
2089 * stream head often does).
2090 * But for the write side strwakeq might be invoked and it acquires sd_lock.
2092 void
2093 qbackenable(queue_t *q, uchar_t band)
2095 int backenab = 0;
2096 qband_t *qbp;
2097 kthread_id_t freezer;
2099 ASSERT(q);
2100 ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));
2103 * Quick check without holding the lock.
2104 * OK since after getq() has lowered the q_count these flags
2105 * would not change unless either the qbackenable() is done by
2106 * another thread (which is ok) or the queue has gotten QFULL
2107 * in which case another backenable will take place when the queue
2108 * drops below q_lowat.
2110 if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
2111 return;
2113 /* freezestr should allow its caller to call getq/putq */
2114 freezer = STREAM(q)->sd_freezer;
2115 if (freezer == curthread) {
2116 ASSERT(frozenstr(q));
2117 ASSERT(MUTEX_HELD(QLOCK(q)));
2118 } else
2119 mutex_enter(QLOCK(q));
2121 if (band == 0) {
2122 if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
2123 q->q_mblkcnt < q->q_lowat)) {
2124 backenab = q->q_flag & (QWANTW|QWANTWSYNC);
2126 } else {
2127 int i;
2129 ASSERT((unsigned)band <= q->q_nband);
2130 ASSERT(q->q_bandp != NULL);
2132 qbp = q->q_bandp;
2133 i = band;
2134 while (--i > 0)
2135 qbp = qbp->qb_next;
2137 if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
2138 qbp->qb_mblkcnt < qbp->qb_lowat)) {
2139 backenab = qbp->qb_flag & QB_WANTW;
2143 if (backenab == 0) {
2144 if (freezer != curthread)
2145 mutex_exit(QLOCK(q));
2146 return;
2149 /* Have to drop the lock across strwakeq and backenable */
2150 if (backenab & QWANTWSYNC)
2151 q->q_flag &= ~QWANTWSYNC;
2152 if (backenab & (QWANTW|QB_WANTW)) {
2153 if (band != 0)
2154 qbp->qb_flag &= ~QB_WANTW;
2155 else {
2156 q->q_flag &= ~QWANTW;
2160 if (freezer != curthread)
2161 mutex_exit(QLOCK(q));
2163 if (backenab & QWANTWSYNC)
2164 strwakeq(q, QWANTWSYNC);
2165 if (backenab & (QWANTW|QB_WANTW))
2166 backenable(q, band);
2170 * Remove a message from a queue. The queue count and other
2171 * flow control parameters are adjusted and the back queue
2172 * enabled if necessary.
2174 * rmvq can be called with the stream frozen, but other utility functions
2175 * holding QLOCK, and by streams modules without any locks/frozen.
2177 void
2178 rmvq(queue_t *q, mblk_t *mp)
2180 ASSERT(mp != NULL);
2182 rmvq_noenab(q, mp);
2183 if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2185 * qbackenable can handle a frozen stream but not a "random"
2186 * qlock being held. Drop lock across qbackenable.
2188 mutex_exit(QLOCK(q));
2189 qbackenable(q, mp->b_band);
2190 mutex_enter(QLOCK(q));
2191 } else {
2192 qbackenable(q, mp->b_band);
2197 * Like rmvq() but without any backenabling.
2198 * This exists to handle SR_CONSOL_DATA in strrput().
2200 void
2201 rmvq_noenab(queue_t *q, mblk_t *mp)
2203 int i;
2204 qband_t *qbp = NULL;
2205 kthread_id_t freezer;
2206 int bytecnt = 0, mblkcnt = 0;
2208 freezer = STREAM(q)->sd_freezer;
2209 if (freezer == curthread) {
2210 ASSERT(frozenstr(q));
2211 ASSERT(MUTEX_HELD(QLOCK(q)));
2212 } else if (MUTEX_HELD(QLOCK(q))) {
2213 /* Don't drop lock on exit */
2214 freezer = curthread;
2215 } else
2216 mutex_enter(QLOCK(q));
2218 ASSERT(mp->b_band <= q->q_nband);
2219 if (mp->b_band != 0) { /* Adjust band pointers */
2220 ASSERT(q->q_bandp != NULL);
2221 qbp = q->q_bandp;
2222 i = mp->b_band;
2223 while (--i > 0)
2224 qbp = qbp->qb_next;
2225 if (mp == qbp->qb_first) {
2226 if (mp->b_next && mp->b_band == mp->b_next->b_band)
2227 qbp->qb_first = mp->b_next;
2228 else
2229 qbp->qb_first = NULL;
2231 if (mp == qbp->qb_last) {
2232 if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
2233 qbp->qb_last = mp->b_prev;
2234 else
2235 qbp->qb_last = NULL;
2240 * Remove the message from the list.
2242 if (mp->b_prev)
2243 mp->b_prev->b_next = mp->b_next;
2244 else
2245 q->q_first = mp->b_next;
2246 if (mp->b_next)
2247 mp->b_next->b_prev = mp->b_prev;
2248 else
2249 q->q_last = mp->b_prev;
2250 mp->b_next = NULL;
2251 mp->b_prev = NULL;
2253 /* Get the size of the message for q_count accounting */
2254 bytecnt = mp_cont_len(mp, &mblkcnt);
2256 if (mp->b_band == 0) { /* Perform q_count accounting */
2257 q->q_count -= bytecnt;
2258 q->q_mblkcnt -= mblkcnt;
2259 if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
2260 (q->q_mblkcnt < q->q_hiwat))) {
2261 q->q_flag &= ~QFULL;
2263 } else { /* Perform qb_count accounting */
2264 qbp->qb_count -= bytecnt;
2265 qbp->qb_mblkcnt -= mblkcnt;
2266 if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
2267 (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
2268 qbp->qb_flag &= ~QB_FULL;
2271 if (freezer != curthread)
2272 mutex_exit(QLOCK(q));
2274 STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, 0);
2278 * Empty a queue.
2279 * If flag is set, remove all messages. Otherwise, remove
2280 * only non-control messages. If queue falls below its low
2281 * water mark, and QWANTW is set, enable the nearest upstream
2282 * service procedure.
2284 * Historical note: when merging the M_FLUSH code in strrput with this
2285 * code one difference was discovered. flushq did not have a check
2286 * for q_lowat == 0 in the backenabling test.
2288 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2289 * if one exists on the queue.
2291 void
2292 flushq_common(queue_t *q, int flag, int pcproto_flag)
2294 mblk_t *mp, *nmp;
2295 qband_t *qbp;
2296 int backenab = 0;
2297 unsigned char bpri;
2298 unsigned char qbf[NBAND]; /* band flushing backenable flags */
2300 if (q->q_first == NULL)
2301 return;
2303 mutex_enter(QLOCK(q));
2304 mp = q->q_first;
2305 q->q_first = NULL;
2306 q->q_last = NULL;
2307 q->q_count = 0;
2308 q->q_mblkcnt = 0;
2309 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2310 qbp->qb_first = NULL;
2311 qbp->qb_last = NULL;
2312 qbp->qb_count = 0;
2313 qbp->qb_mblkcnt = 0;
2314 qbp->qb_flag &= ~QB_FULL;
2316 q->q_flag &= ~QFULL;
2317 mutex_exit(QLOCK(q));
2318 while (mp) {
2319 nmp = mp->b_next;
2320 mp->b_next = mp->b_prev = NULL;
2322 STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, 0);
2324 if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
2325 (void) putq(q, mp);
2326 else if (flag || datamsg(mp->b_datap->db_type))
2327 freemsg(mp);
2328 else
2329 (void) putq(q, mp);
2330 mp = nmp;
2332 bpri = 1;
2333 mutex_enter(QLOCK(q));
2334 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2335 if ((qbp->qb_flag & QB_WANTW) &&
2336 (((qbp->qb_count < qbp->qb_lowat) &&
2337 (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
2338 qbp->qb_lowat == 0)) {
2339 qbp->qb_flag &= ~QB_WANTW;
2340 backenab = 1;
2341 qbf[bpri] = 1;
2342 } else
2343 qbf[bpri] = 0;
2344 bpri++;
2346 ASSERT(bpri == (unsigned char)(q->q_nband + 1));
2347 if ((q->q_flag & QWANTW) &&
2348 (((q->q_count < q->q_lowat) &&
2349 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2350 q->q_flag &= ~QWANTW;
2351 backenab = 1;
2352 qbf[0] = 1;
2353 } else
2354 qbf[0] = 0;
2357 * If any band can now be written to, and there is a writer
2358 * for that band, then backenable the closest service procedure.
2360 if (backenab) {
2361 mutex_exit(QLOCK(q));
2362 for (bpri = q->q_nband; bpri != 0; bpri--)
2363 if (qbf[bpri])
2364 backenable(q, bpri);
2365 if (qbf[0])
2366 backenable(q, 0);
2367 } else
2368 mutex_exit(QLOCK(q));
2372 * The real flushing takes place in flushq_common. This is done so that
2373 * a flag which specifies whether or not M_PCPROTO messages should be flushed
2374 * or not. Currently the only place that uses this flag is the stream head.
2376 void
2377 flushq(queue_t *q, int flag)
2379 flushq_common(q, flag, 0);
2383 * Flush the queue of messages of the given priority band.
2384 * There is some duplication of code between flushq and flushband.
2385 * This is because we want to optimize the code as much as possible.
2386 * The assumption is that there will be more messages in the normal
2387 * (priority 0) band than in any other.
2389 * Historical note: when merging the M_FLUSH code in strrput with this
2390 * code one difference was discovered. flushband had an extra check for
2391 * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
2392 * case. That check does not match the man page for flushband and was not
2393 * in the strrput flush code hence it was removed.
2395 void
2396 flushband(queue_t *q, unsigned char pri, int flag)
2398 mblk_t *mp;
2399 mblk_t *nmp;
2400 mblk_t *last;
2401 qband_t *qbp;
2402 int band;
2404 ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
2405 if (pri > q->q_nband) {
2406 return;
2408 mutex_enter(QLOCK(q));
2409 if (pri == 0) {
2410 mp = q->q_first;
2411 q->q_first = NULL;
2412 q->q_last = NULL;
2413 q->q_count = 0;
2414 q->q_mblkcnt = 0;
2415 for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
2416 qbp->qb_first = NULL;
2417 qbp->qb_last = NULL;
2418 qbp->qb_count = 0;
2419 qbp->qb_mblkcnt = 0;
2420 qbp->qb_flag &= ~QB_FULL;
2422 q->q_flag &= ~QFULL;
2423 mutex_exit(QLOCK(q));
2424 while (mp) {
2425 nmp = mp->b_next;
2426 mp->b_next = mp->b_prev = NULL;
2427 if ((mp->b_band == 0) &&
2428 ((flag == FLUSHALL) ||
2429 datamsg(mp->b_datap->db_type)))
2430 freemsg(mp);
2431 else
2432 (void) putq(q, mp);
2433 mp = nmp;
2435 mutex_enter(QLOCK(q));
2436 if ((q->q_flag & QWANTW) &&
2437 (((q->q_count < q->q_lowat) &&
2438 (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
2439 q->q_flag &= ~QWANTW;
2440 mutex_exit(QLOCK(q));
2442 backenable(q, pri);
2443 } else
2444 mutex_exit(QLOCK(q));
2445 } else { /* pri != 0 */
2446 boolean_t flushed = B_FALSE;
2447 band = pri;
2449 ASSERT(MUTEX_HELD(QLOCK(q)));
2450 qbp = q->q_bandp;
2451 while (--band > 0)
2452 qbp = qbp->qb_next;
2453 mp = qbp->qb_first;
2454 if (mp == NULL) {
2455 mutex_exit(QLOCK(q));
2456 return;
2458 last = qbp->qb_last->b_next;
2460 * rmvq_noenab() and freemsg() are called for each mblk that
2461 * meets the criteria. The loop is executed until the last
2462 * mblk has been processed.
2464 while (mp != last) {
2465 ASSERT(mp->b_band == pri);
2466 nmp = mp->b_next;
2467 if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
2468 rmvq_noenab(q, mp);
2469 freemsg(mp);
2470 flushed = B_TRUE;
2472 mp = nmp;
2474 mutex_exit(QLOCK(q));
2477 * If any mblk(s) has been freed, we know that qbackenable()
2478 * will need to be called.
2480 if (flushed)
2481 qbackenable(q, pri);
2486 * Return 1 if the queue is not full. If the queue is full, return
2487 * 0 (may not put message) and set QWANTW flag (caller wants to write
2488 * to the queue).
2491 canput(queue_t *q)
2493 TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);
2495 /* this is for loopback transports, they should not do a canput */
2496 ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));
2498 /* Find next forward module that has a service procedure */
2499 q = q->q_nfsrv;
2501 if (!(q->q_flag & QFULL)) {
2502 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2503 return (1);
2505 mutex_enter(QLOCK(q));
2506 if (q->q_flag & QFULL) {
2507 q->q_flag |= QWANTW;
2508 mutex_exit(QLOCK(q));
2509 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
2510 return (0);
2512 mutex_exit(QLOCK(q));
2513 TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
2514 return (1);
2518 * This is the new canput for use with priority bands. Return 1 if the
2519 * band is not full. If the band is full, return 0 (may not put message)
2520 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2521 * write to the queue).
2524 bcanput(queue_t *q, unsigned char pri)
2526 qband_t *qbp;
2528 TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2529 if (!q)
2530 return (0);
2532 /* Find next forward module that has a service procedure */
2533 q = q->q_nfsrv;
2535 mutex_enter(QLOCK(q));
2536 if (pri == 0) {
2537 if (q->q_flag & QFULL) {
2538 q->q_flag |= QWANTW;
2539 mutex_exit(QLOCK(q));
2540 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2541 "bcanput:%p %X %d", q, pri, 0);
2542 return (0);
2544 } else { /* pri != 0 */
2545 if (pri > q->q_nband) {
2547 * No band exists yet, so return success.
2549 mutex_exit(QLOCK(q));
2550 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2551 "bcanput:%p %X %d", q, pri, 1);
2552 return (1);
2554 qbp = q->q_bandp;
2555 while (--pri)
2556 qbp = qbp->qb_next;
2557 if (qbp->qb_flag & QB_FULL) {
2558 qbp->qb_flag |= QB_WANTW;
2559 mutex_exit(QLOCK(q));
2560 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2561 "bcanput:%p %X %d", q, pri, 0);
2562 return (0);
2565 mutex_exit(QLOCK(q));
2566 TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2567 "bcanput:%p %X %d", q, pri, 1);
2568 return (1);
/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * Enabling the queue:
 *	- A high priority message (queue class above QNORM) always enables
 *	  the queue, via qenable_locked().
 *	- Any other message enables it only if canenable(q) is true and
 *	  either the message is in a non-zero band or QWANTR is set
 *	  (indicating that the service procedure is ready to read the queue).
 *
 * canenable() is defined elsewhere; presumably it tests the QNOENB flag
 * set by noenable().
 *
 * This implies that a service procedure must NEVER put a high priority
 * message back on its own queue, as this would result in an infinite
 * loop (!).
 *
 * Returns 1 on success.  Returns 0, without queueing bp and without
 * freeing it, if a qband structure needed for bp->b_band could not be
 * allocated.
 *
 * If the calling thread is the one that froze the stream
 * (sd_freezer == curthread), QLOCK(q) must already be held and is still
 * held on return.  Otherwise QLOCK(q) is acquired and released here.
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 *
			 * Walk to the tail of the band list, then append
			 * bands (inheriting the queue's water marks) until
			 * band b_band exists.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Bands are numbered from 1: locate band b_band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip past every message whose class is >= mcls
			 * so that bp lands after them.
			 */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/* bp is the first message of its band. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then the
				 * messages of bands >= bp's band.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/*
	 * Charge the band (or the queue for band 0).  The mblk count is
	 * compared against the same high-water mark as the byte count.
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, 0);

	/* See the enabling rules in the header comment. */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 *
 * Differences from putq():
 *	- bp is placed ahead of the existing messages of its own band and
 *	  class rather than after them.
 *	- A banded message does not by itself enable the queue: it is
 *	  enabled for high priority messages, or when canenable(q) is true
 *	  and QWANTR is set.
 *	- bp must not be linked to another message (b_next == NULL is
 *	  asserted).
 *
 * Returns 1 on success.  Returns 0, without queueing bp and without
 * freeing it, if a needed qband structure could not be allocated.
 *
 * Locking is as in putq(): QLOCK(q) is taken here unless the calling
 * thread froze the stream, in which case it must already be held.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			/*
			 * Append bands at the tail of the band list,
			 * inheriting the queue's water marks, until band
			 * b_band exists.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Bands are numbered from 1: locate band b_band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/* The band is empty: bp becomes its only message. */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/*
				 * Skip the high priority messages, then the
				 * messages of bands strictly above bp's band.
				 */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * Skip the high priority messages and all banded
			 * messages (band > 0), landing on the first band-0
			 * normal message.
			 */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/* Charge the band (or the queue for band 0) and update FULL flags. */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, 0);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new message is placed on the end of
 * the queue.  Apart from the high priority placement check below, the
 * queue class of the new message is ignored.  However, the priority band
 * of the new message must adhere to the following ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called in three situations:
 *	- with the stream frozen;
 *	- by other utility functions that already hold QLOCK(q);
 *	- by streams modules that hold no locks and have not frozen the
 *	  stream.
 * If QLOCK(q) is held on entry, it is still held on return.
 *
 * Returns 1 on success.  Returns 0, without inserting mp, in two cases:
 *	- the requested position violates the ordering checks below (a
 *	  CE_WARN is logged);
 *	- a needed qband structure could not be allocated.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	/* A high priority message may not follow a non-high-priority one. */
	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	/* Enforce the band ordering documented above. */
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			/*
			 * Append bands at the tail of the band list,
			 * inheriting the queue's water marks, until band
			 * b_band exists.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Bands are numbered from 1: locate band b_band. */
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/* Link mp in before emp, or at the tail when emp is NULL. */
	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			/*
			 * mp becomes the band's first message if its
			 * predecessor is in another band (or absent).
			 * Otherwise it becomes the band's last message if
			 * its successor is in another band (or absent).
			 */
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, 0);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
3096 * Create and put a control message on queue.
3099 putctl(queue_t *q, int type)
3101 mblk_t *bp;
3103 if ((datamsg(type) && (type != M_DELAY)) ||
3104 (bp = allocb_tryhard(0)) == NULL)
3105 return (0);
3106 bp->b_datap->db_type = (unsigned char) type;
3108 put(q, bp);
3110 return (1);
3114 * Control message with a single-byte parameter
3117 putctl1(queue_t *q, int type, int param)
3119 mblk_t *bp;
3121 if ((datamsg(type) && (type != M_DELAY)) ||
3122 (bp = allocb_tryhard(1)) == NULL)
3123 return (0);
3124 bp->b_datap->db_type = (unsigned char)type;
3125 *bp->b_wptr++ = (unsigned char)param;
3127 put(q, bp);
3129 return (1);
3133 putnextctl1(queue_t *q, int type, int param)
3135 mblk_t *bp;
3137 if ((datamsg(type) && (type != M_DELAY)) ||
3138 ((bp = allocb_tryhard(1)) == NULL))
3139 return (0);
3141 bp->b_datap->db_type = (unsigned char)type;
3142 *bp->b_wptr++ = (unsigned char)param;
3144 putnext(q, bp);
3146 return (1);
3150 putnextctl(queue_t *q, int type)
3152 mblk_t *bp;
3154 if ((datamsg(type) && (type != M_DELAY)) ||
3155 ((bp = allocb_tryhard(0)) == NULL))
3156 return (0);
3157 bp->b_datap->db_type = (unsigned char)type;
3159 putnext(q, bp);
3161 return (1);
3165 * Return the queue upstream from this one
3167 queue_t *
3168 backq(queue_t *q)
3170 q = _OTHERQ(q);
3171 if (q->q_next) {
3172 q = q->q_next;
3173 return (_OTHERQ(q));
3175 return (NULL);
3179 * Send a block back up the queue in reverse from this
3180 * one (e.g. to respond to ioctls)
3182 void
3183 qreply(queue_t *q, mblk_t *bp)
3185 ASSERT(q && bp);
3187 putnext(_OTHERQ(q), bp);
/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process.  They are serviced by queuerun(), which runs each enabled
 * queue's service procedure.  The call to queuerun() is processor
 * dependent; the general principle is that it be run whenever a queue
 * is enabled but before returning to user level.  For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled.  For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt.  Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */
3205 * Enable a queue: put it on list of those whose service procedures are
3206 * ready to run and set up the scheduling mechanism.
3207 * The broadcast is done outside the mutex -> to avoid the woken thread
3208 * from contending with the mutex. This is OK 'cos the queue has been
3209 * enqueued on the runlist and flagged safely at this point.
3211 void
3212 qenable(queue_t *q)
3214 mutex_enter(QLOCK(q));
3215 qenable_locked(q);
3216 mutex_exit(QLOCK(q));
3219 * Return number of messages on queue
3222 qsize(queue_t *qp)
3224 int count = 0;
3225 mblk_t *mp;
3227 mutex_enter(QLOCK(qp));
3228 for (mp = qp->q_first; mp; mp = mp->b_next)
3229 count++;
3230 mutex_exit(QLOCK(qp));
3231 return (count);
3235 * noenable - set queue so that putq() will not enable it.
3236 * enableok - set queue so that putq() can enable it.
3238 void
3239 noenable(queue_t *q)
3241 mutex_enter(QLOCK(q));
3242 q->q_flag |= QNOENB;
3243 mutex_exit(QLOCK(q));
3246 void
3247 enableok(queue_t *q)
3249 mutex_enter(QLOCK(q));
3250 q->q_flag &= ~QNOENB;
3251 mutex_exit(QLOCK(q));
3255 * Set queue fields.
3258 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3260 qband_t *qbp = NULL;
3261 queue_t *wrq;
3262 int error = 0;
3263 kthread_id_t freezer;
3265 freezer = STREAM(q)->sd_freezer;
3266 if (freezer == curthread) {
3267 ASSERT(frozenstr(q));
3268 ASSERT(MUTEX_HELD(QLOCK(q)));
3269 } else
3270 mutex_enter(QLOCK(q));
3272 if (what >= QBAD) {
3273 error = EINVAL;
3274 goto done;
3276 if (pri != 0) {
3277 int i;
3278 qband_t **qbpp;
3280 if (pri > q->q_nband) {
3281 qbpp = &q->q_bandp;
3282 while (*qbpp)
3283 qbpp = &(*qbpp)->qb_next;
3284 while (pri > q->q_nband) {
3285 if ((*qbpp = allocband()) == NULL) {
3286 error = EAGAIN;
3287 goto done;
3289 (*qbpp)->qb_hiwat = q->q_hiwat;
3290 (*qbpp)->qb_lowat = q->q_lowat;
3291 q->q_nband++;
3292 qbpp = &(*qbpp)->qb_next;
3295 qbp = q->q_bandp;
3296 i = pri;
3297 while (--i)
3298 qbp = qbp->qb_next;
3300 switch (what) {
3302 case QHIWAT:
3303 if (qbp)
3304 qbp->qb_hiwat = (size_t)val;
3305 else
3306 q->q_hiwat = (size_t)val;
3307 break;
3309 case QLOWAT:
3310 if (qbp)
3311 qbp->qb_lowat = (size_t)val;
3312 else
3313 q->q_lowat = (size_t)val;
3314 break;
3316 case QMAXPSZ:
3317 if (qbp)
3318 error = EINVAL;
3319 else
3320 q->q_maxpsz = (ssize_t)val;
3323 * Performance concern, strwrite looks at the module below
3324 * the stream head for the maxpsz each time it does a write
3325 * we now cache it at the stream head. Check to see if this
3326 * queue is sitting directly below the stream head.
3328 wrq = STREAM(q)->sd_wrq;
3329 if (q != wrq->q_next)
3330 break;
3333 * If the stream is not frozen drop the current QLOCK and
3334 * acquire the sd_wrq QLOCK which protects sd_qn_*
3336 if (freezer != curthread) {
3337 mutex_exit(QLOCK(q));
3338 mutex_enter(QLOCK(wrq));
3340 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3342 if (strmsgsz != 0) {
3343 if (val == INFPSZ)
3344 val = strmsgsz;
3345 else {
3346 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3347 val = MIN(PIPE_BUF, val);
3348 else
3349 val = MIN(strmsgsz, val);
3352 STREAM(q)->sd_qn_maxpsz = val;
3353 if (freezer != curthread) {
3354 mutex_exit(QLOCK(wrq));
3355 mutex_enter(QLOCK(q));
3357 break;
3359 case QMINPSZ:
3360 if (qbp)
3361 error = EINVAL;
3362 else
3363 q->q_minpsz = (ssize_t)val;
3366 * Performance concern, strwrite looks at the module below
3367 * the stream head for the maxpsz each time it does a write
3368 * we now cache it at the stream head. Check to see if this
3369 * queue is sitting directly below the stream head.
3371 wrq = STREAM(q)->sd_wrq;
3372 if (q != wrq->q_next)
3373 break;
3376 * If the stream is not frozen drop the current QLOCK and
3377 * acquire the sd_wrq QLOCK which protects sd_qn_*
3379 if (freezer != curthread) {
3380 mutex_exit(QLOCK(q));
3381 mutex_enter(QLOCK(wrq));
3383 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3385 if (freezer != curthread) {
3386 mutex_exit(QLOCK(wrq));
3387 mutex_enter(QLOCK(q));
3389 break;
3391 case QSTRUIOT:
3392 if (qbp)
3393 error = EINVAL;
3394 else
3395 q->q_struiot = (ushort_t)val;
3396 break;
3398 case QCOUNT:
3399 case QFIRST:
3400 case QLAST:
3401 case QFLAG:
3402 error = EPERM;
3403 break;
3405 default:
3406 error = EINVAL;
3407 break;
3409 done:
3410 if (freezer != curthread)
3411 mutex_exit(QLOCK(q));
3412 return (error);
3416 * Get queue fields.
3419 strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
3421 qband_t *qbp = NULL;
3422 int error = 0;
3423 kthread_id_t freezer;
3425 freezer = STREAM(q)->sd_freezer;
3426 if (freezer == curthread) {
3427 ASSERT(frozenstr(q));
3428 ASSERT(MUTEX_HELD(QLOCK(q)));
3429 } else
3430 mutex_enter(QLOCK(q));
3431 if (what >= QBAD) {
3432 error = EINVAL;
3433 goto done;
3435 if (pri != 0) {
3436 int i;
3437 qband_t **qbpp;
3439 if (pri > q->q_nband) {
3440 qbpp = &q->q_bandp;
3441 while (*qbpp)
3442 qbpp = &(*qbpp)->qb_next;
3443 while (pri > q->q_nband) {
3444 if ((*qbpp = allocband()) == NULL) {
3445 error = EAGAIN;
3446 goto done;
3448 (*qbpp)->qb_hiwat = q->q_hiwat;
3449 (*qbpp)->qb_lowat = q->q_lowat;
3450 q->q_nband++;
3451 qbpp = &(*qbpp)->qb_next;
3454 qbp = q->q_bandp;
3455 i = pri;
3456 while (--i)
3457 qbp = qbp->qb_next;
3459 switch (what) {
3460 case QHIWAT:
3461 if (qbp)
3462 *(size_t *)valp = qbp->qb_hiwat;
3463 else
3464 *(size_t *)valp = q->q_hiwat;
3465 break;
3467 case QLOWAT:
3468 if (qbp)
3469 *(size_t *)valp = qbp->qb_lowat;
3470 else
3471 *(size_t *)valp = q->q_lowat;
3472 break;
3474 case QMAXPSZ:
3475 if (qbp)
3476 error = EINVAL;
3477 else
3478 *(ssize_t *)valp = q->q_maxpsz;
3479 break;
3481 case QMINPSZ:
3482 if (qbp)
3483 error = EINVAL;
3484 else
3485 *(ssize_t *)valp = q->q_minpsz;
3486 break;
3488 case QCOUNT:
3489 if (qbp)
3490 *(size_t *)valp = qbp->qb_count;
3491 else
3492 *(size_t *)valp = q->q_count;
3493 break;
3495 case QFIRST:
3496 if (qbp)
3497 *(mblk_t **)valp = qbp->qb_first;
3498 else
3499 *(mblk_t **)valp = q->q_first;
3500 break;
3502 case QLAST:
3503 if (qbp)
3504 *(mblk_t **)valp = qbp->qb_last;
3505 else
3506 *(mblk_t **)valp = q->q_last;
3507 break;
3509 case QFLAG:
3510 if (qbp)
3511 *(uint_t *)valp = qbp->qb_flag;
3512 else
3513 *(uint_t *)valp = q->q_flag;
3514 break;
3516 case QSTRUIOT:
3517 if (qbp)
3518 error = EINVAL;
3519 else
3520 *(short *)valp = q->q_struiot;
3521 break;
3523 default:
3524 error = EINVAL;
3525 break;
3527 done:
3528 if (freezer != curthread)
3529 mutex_exit(QLOCK(q));
3530 return (error);
3534 * Function awakes all in cvwait/sigwait/pollwait, on one of:
3535 * QWANTWSYNC or QWANTR or QWANTW,
3537 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
3538 * deferred wakeup will be done. Also if strpoll() in progress then a
3539 * deferred pollwakeup will be done.
3541 void
3542 strwakeq(queue_t *q, int flag)
3544 stdata_t *stp = STREAM(q);
3545 pollhead_t *pl;
3547 mutex_enter(&stp->sd_lock);
3548 pl = &stp->sd_pollist;
3549 if (flag & QWANTWSYNC) {
3550 ASSERT(!(q->q_flag & QREADR));
3551 if (stp->sd_flag & WSLEEP) {
3552 stp->sd_flag &= ~WSLEEP;
3553 cv_broadcast(&stp->sd_wrq->q_wait);
3554 } else {
3555 stp->sd_wakeq |= WSLEEP;
3558 mutex_exit(&stp->sd_lock);
3559 pollwakeup(pl, POLLWRNORM);
3560 mutex_enter(&stp->sd_lock);
3562 if (stp->sd_sigflags & S_WRNORM)
3563 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3564 } else if (flag & QWANTR) {
3565 if (stp->sd_flag & RSLEEP) {
3566 stp->sd_flag &= ~RSLEEP;
3567 cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
3568 } else {
3569 stp->sd_wakeq |= RSLEEP;
3572 mutex_exit(&stp->sd_lock);
3573 pollwakeup(pl, POLLIN | POLLRDNORM);
3574 mutex_enter(&stp->sd_lock);
3577 int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);
3579 if (events)
3580 strsendsig(stp->sd_siglist, events, 0, 0);
3582 } else {
3583 if (stp->sd_flag & WSLEEP) {
3584 stp->sd_flag &= ~WSLEEP;
3585 cv_broadcast(&stp->sd_wrq->q_wait);
3588 mutex_exit(&stp->sd_lock);
3589 pollwakeup(pl, POLLWRNORM);
3590 mutex_enter(&stp->sd_lock);
3592 if (stp->sd_sigflags & S_WRNORM)
3593 strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
3595 mutex_exit(&stp->sd_lock);
/*
 * Copy data from the uio in dp->d_uio into the mblk chain mp (walked via
 * b_cont), for mblks that are set up for synchronous streams copying.
 *
 * For each mblk with STRUIO_SPEC set, STRUIO_DONE clear and room left:
 *	- up to db_cksumend - db_cksumstuff bytes (bounded by uio_resid) are
 *	  moved by uiomove(..., UIO_WRITE, ...) into the buffer at
 *	  b_rptr + db_cksumstuff;
 *	- the mblk is then marked STRUIO_DONE and db_cksumstuff is advanced
 *	  by the amount copied.
 * Copying stops when uio_resid reaches 0 or the chain ends.
 *
 * If noblock is set, each copy runs under on_trap(OT_DATA_ACCESS).  A
 * data-access fault taken during the copy makes the function return
 * EWOULDBLOCK, after crediting any bytes already moved into the current
 * mblk.
 *
 * Returns 0 on success, EWOULDBLOCK as above, EIO if the stream's
 * struio type is not STRUIOT_STANDARD, or the error from uiomove().
 */
int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ  = STRUIOT_STANDARD;
	uio_t	 *uiop = &dp->d_uio;
	dblk_t	 *dbp;
	ssize_t	 uiocnt;
	ssize_t	 cnt;
	unsigned char *ptr;
	ssize_t	 resid;
	int	 error = 0;
	on_trap_data_t otd;
	queue_t	*stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable.  It doesn't matter even
	 * if we take the type from the previous plumbing,
	 * that's because if the plumbing has changed when we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	/* resid records uio_resid on entry to each iteration (see out:). */
	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				/* Non-zero return: a fault was trapped. */
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}
3686 * Try to enter queue synchronously. Any attempt to enter a closing queue will
3687 * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3688 * that removeq() will not try to close the queue while a thread is inside the
3689 * queue.
3691 static boolean_t
3692 rwnext_enter(queue_t *qp)
3694 mutex_enter(QLOCK(qp));
3695 if (qp->q_flag & QWCLOSE) {
3696 mutex_exit(QLOCK(qp));
3697 return (B_FALSE);
3699 qp->q_rwcnt++;
3700 ASSERT(qp->q_rwcnt != 0);
3701 mutex_exit(QLOCK(qp));
3702 return (B_TRUE);
3706 * Decrease the count of threads running in sync stream queue and wake up any
3707 * threads blocked in removeq().
3709 static void
3710 rwnext_exit(queue_t *qp)
3712 mutex_enter(QLOCK(qp));
3713 qp->q_rwcnt--;
3714 if (qp->q_flag & QWANTRMQSYNC) {
3715 qp->q_flag &= ~QWANTRMQSYNC;
3716 cv_broadcast(&qp->q_wait);
3718 mutex_exit(QLOCK(qp));
3722 * The purpose of rwnext() is to call the rw procedure of the next
3723 * (downstream) modules queue.
3725 * treated as put entrypoint for perimeter syncronization.
3727 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3728 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
3729 * not matter if any regular put entrypoints have been already entered. We
3730 * can't increment one of the sq_putcounts (instead of sq_count) because
3731 * qwait_rw won't know which counter to decrement.
3733 * It would be reasonable to add the lockless FASTPUT logic.
3736 rwnext(queue_t *qp, struiod_t *dp)
3738 queue_t *nqp;
3739 syncq_t *sq;
3740 uint16_t count;
3741 uint16_t flags;
3742 struct qinit *qi;
3743 int (*proc)();
3744 struct stdata *stp;
3745 int isread;
3746 int rval;
3748 stp = STREAM(qp);
3750 * Prevent q_next from changing by holding sd_lock until acquiring
3751 * SQLOCK. Note that a read-side rwnext from the streamhead will
3752 * already have sd_lock acquired. In either case sd_lock is always
3753 * released after acquiring SQLOCK.
3755 * The streamhead read-side holding sd_lock when calling rwnext is
3756 * required to prevent a race condition were M_DATA mblks flowing
3757 * up the read-side of the stream could be bypassed by a rwnext()
3758 * down-call. In this case sd_lock acts as the streamhead perimeter.
3760 if ((nqp = _WR(qp)) == qp) {
3761 isread = 0;
3762 mutex_enter(&stp->sd_lock);
3763 qp = nqp->q_next;
3764 } else {
3765 isread = 1;
3766 if (nqp != stp->sd_wrq)
3767 /* Not streamhead */
3768 mutex_enter(&stp->sd_lock);
3769 qp = _RD(nqp->q_next);
3771 qi = qp->q_qinfo;
3772 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
3774 * Not a synchronous module or no r/w procedure for this
3775 * queue, so just return EINVAL and let the caller handle it.
3777 mutex_exit(&stp->sd_lock);
3778 return (EINVAL);
3781 if (rwnext_enter(qp) == B_FALSE) {
3782 mutex_exit(&stp->sd_lock);
3783 return (EINVAL);
3786 sq = qp->q_syncq;
3787 mutex_enter(SQLOCK(sq));
3788 mutex_exit(&stp->sd_lock);
3789 count = sq->sq_count;
3790 flags = sq->sq_flags;
3791 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3793 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3795 * if this queue is being closed, return.
3797 if (qp->q_flag & QWCLOSE) {
3798 mutex_exit(SQLOCK(sq));
3799 rwnext_exit(qp);
3800 return (EINVAL);
3804 * Wait until we can enter the inner perimeter.
3806 sq->sq_flags = flags | SQ_WANTWAKEUP;
3807 cv_wait(&sq->sq_wait, SQLOCK(sq));
3808 count = sq->sq_count;
3809 flags = sq->sq_flags;
3812 if (isread == 0 && stp->sd_struiowrq == NULL ||
3813 isread == 1 && stp->sd_struiordq == NULL) {
3815 * Stream plumbing changed while waiting for inner perimeter
3816 * so just return EINVAL and let the caller handle it.
3818 mutex_exit(SQLOCK(sq));
3819 rwnext_exit(qp);
3820 return (EINVAL);
3822 if (!(flags & SQ_CIPUT))
3823 sq->sq_flags = flags | SQ_EXCL;
3824 sq->sq_count = count + 1;
3825 ASSERT(sq->sq_count != 0); /* Wraparound */
3827 * Note: The only message ordering guarantee that rwnext() makes is
3828 * for the write queue flow-control case. All others (r/w queue
3829 * with q_count > 0 (or q_first != 0)) are the resposibilty of
3830 * the queue's rw procedure. This could be genralized here buy
3831 * running the queue's service procedure, but that wouldn't be
3832 * the most efficent for all cases.
3834 mutex_exit(SQLOCK(sq));
3835 if (! isread && (qp->q_flag & QFULL)) {
3837 * Write queue may be flow controlled. If so,
3838 * mark the queue for wakeup when it's not.
3840 mutex_enter(QLOCK(qp));
3841 if (qp->q_flag & QFULL) {
3842 qp->q_flag |= QWANTWSYNC;
3843 mutex_exit(QLOCK(qp));
3844 rval = EWOULDBLOCK;
3845 goto out;
3847 mutex_exit(QLOCK(qp));
3850 if (! isread && dp->d_mp)
3851 STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
3852 dp->d_mp->b_datap->db_base);
3854 rval = (*proc)(qp, dp);
3856 if (isread && dp->d_mp)
3857 STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
3858 dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
3859 out:
3861 * The queue is protected from being freed by sq_count, so it is
3862 * safe to call rwnext_exit and reacquire SQLOCK(sq).
3864 rwnext_exit(qp);
3866 mutex_enter(SQLOCK(sq));
3867 flags = sq->sq_flags;
3868 ASSERT(sq->sq_count != 0);
3869 sq->sq_count--;
3870 if (flags & SQ_TAIL) {
3871 putnext_tail(sq, qp, flags);
3873 * The only purpose of this ASSERT is to preserve calling stack
3874 * in DEBUG kernel.
3876 ASSERT(flags & SQ_TAIL);
3877 return (rval);
3879 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3881 * Safe to always drop SQ_EXCL:
3882 * Not SQ_CIPUT means we set SQ_EXCL above
3883 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
3884 * did a qwriter(INNER) in which case nobody else
3885 * is in the inner perimeter and we are exiting.
3887 * I would like to make the following assertion:
3889 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
3890 * sq->sq_count == 0);
3892 * which indicates that if we are both putshared and exclusive,
3893 * we became exclusive while executing the putproc, and the only
3894 * claim on the syncq was the one we dropped a few lines above.
3895 * But other threads that enter putnext while the syncq is exclusive
3896 * need to make a claim as they may need to drop SQLOCK in the
3897 * has_writers case to avoid deadlocks. If these threads are
3898 * delayed or preempted, it is possible that the writer thread can
3899 * find out that there are other claims making the (sq_count == 0)
3900 * test invalid.
3903 sq->sq_flags = flags & ~SQ_EXCL;
3904 if (sq->sq_flags & SQ_WANTWAKEUP) {
3905 sq->sq_flags &= ~SQ_WANTWAKEUP;
3906 cv_broadcast(&sq->sq_wait);
3908 mutex_exit(SQLOCK(sq));
3909 return (rval);
3913 * The purpose of infonext() is to call the info procedure of the next
3914 * (downstream) modules queue.
3916 * treated as put entrypoint for perimeter syncronization.
3918 * There's no need to grab sq_putlocks here (which only exist for CIPUT
3919 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
3920 * it does not matter if any regular put entrypoints have been already
3921 * entered.
3924 infonext(queue_t *qp, infod_t *idp)
3926 queue_t *nqp;
3927 syncq_t *sq;
3928 uint16_t count;
3929 uint16_t flags;
3930 struct qinit *qi;
3931 int (*proc)();
3932 struct stdata *stp;
3933 int rval;
3935 stp = STREAM(qp);
3937 * Prevent q_next from changing by holding sd_lock until
3938 * acquiring SQLOCK.
3940 mutex_enter(&stp->sd_lock);
3941 if ((nqp = _WR(qp)) == qp) {
3942 qp = nqp->q_next;
3943 } else {
3944 qp = _RD(nqp->q_next);
3946 qi = qp->q_qinfo;
3947 if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
3948 mutex_exit(&stp->sd_lock);
3949 return (EINVAL);
3951 sq = qp->q_syncq;
3952 mutex_enter(SQLOCK(sq));
3953 mutex_exit(&stp->sd_lock);
3954 count = sq->sq_count;
3955 flags = sq->sq_flags;
3956 ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));
3958 while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
3960 * Wait until we can enter the inner perimeter.
3962 sq->sq_flags = flags | SQ_WANTWAKEUP;
3963 cv_wait(&sq->sq_wait, SQLOCK(sq));
3964 count = sq->sq_count;
3965 flags = sq->sq_flags;
3968 if (! (flags & SQ_CIPUT))
3969 sq->sq_flags = flags | SQ_EXCL;
3970 sq->sq_count = count + 1;
3971 ASSERT(sq->sq_count != 0); /* Wraparound */
3972 mutex_exit(SQLOCK(sq));
3974 rval = (*proc)(qp, idp);
3976 mutex_enter(SQLOCK(sq));
3977 flags = sq->sq_flags;
3978 ASSERT(sq->sq_count != 0);
3979 sq->sq_count--;
3980 if (flags & SQ_TAIL) {
3981 putnext_tail(sq, qp, flags);
3983 * The only purpose of this ASSERT is to preserve calling stack
3984 * in DEBUG kernel.
3986 ASSERT(flags & SQ_TAIL);
3987 return (rval);
3989 ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
3991 * XXXX
3992 * I am not certain the next comment is correct here. I need to consider
3993 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
3994 * might cause other problems. It just might be safer to drop it if
3995 * !SQ_CIPUT because that is when we set it.
3998 * Safe to always drop SQ_EXCL:
3999 * Not SQ_CIPUT means we set SQ_EXCL above
4000 * For SQ_CIPUT SQ_EXCL will only be set if the put procedure
4001 * did a qwriter(INNER) in which case nobody else
4002 * is in the inner perimeter and we are exiting.
4004 * I would like to make the following assertion:
4006 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
4007 * sq->sq_count == 0);
4009 * which indicates that if we are both putshared and exclusive,
4010 * we became exclusive while executing the putproc, and the only
4011 * claim on the syncq was the one we dropped a few lines above.
4012 * But other threads that enter putnext while the syncq is exclusive
4013 * need to make a claim as they may need to drop SQLOCK in the
4014 * has_writers case to avoid deadlocks. If these threads are
4015 * delayed or preempted, it is possible that the writer thread can
4016 * find out that there are other claims making the (sq_count == 0)
4017 * test invalid.
4020 sq->sq_flags = flags & ~SQ_EXCL;
4021 mutex_exit(SQLOCK(sq));
4022 return (rval);
4026 * Return nonzero if the queue is responsible for struio(), else return 0.
4029 isuioq(queue_t *q)
4031 if (q->q_flag & QREADR)
4032 return (STREAM(q)->sd_struiordq == q);
4033 else
4034 return (STREAM(q)->sd_struiowrq == q);
4037 int disable_putlocks = 1;
4040 * called by create_putlock.
4042 static void
4043 create_syncq_putlocks(queue_t *q)
4045 syncq_t *sq = q->q_syncq;
4046 ciputctrl_t *cip;
4047 int i;
4049 ASSERT(sq != NULL);
4051 ASSERT(disable_putlocks == 0);
4052 ASSERT(n_ciputctrl >= min_n_ciputctrl);
4053 ASSERT(ciputctrl_cache != NULL);
4055 if (!(sq->sq_type & SQ_CIPUT))
4056 return;
4058 for (i = 0; i <= 1; i++) {
4059 if (sq->sq_ciputctrl == NULL) {
4060 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4061 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4062 mutex_enter(SQLOCK(sq));
4063 if (sq->sq_ciputctrl != NULL) {
4064 mutex_exit(SQLOCK(sq));
4065 kmem_cache_free(ciputctrl_cache, cip);
4066 } else {
4067 ASSERT(sq->sq_nciputctrl == 0);
4068 sq->sq_nciputctrl = n_ciputctrl - 1;
4070 * putnext checks sq_ciputctrl without holding
4071 * SQLOCK. if it is not NULL putnext assumes
4072 * sq_nciputctrl is initialized. membar below
4073 * insures that.
4075 membar_producer();
4076 sq->sq_ciputctrl = cip;
4077 mutex_exit(SQLOCK(sq));
4080 ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
4081 if (i == 1)
4082 break;
4083 q = _OTHERQ(q);
4084 if (!(q->q_flag & QPERQ)) {
4085 ASSERT(sq == q->q_syncq);
4086 break;
4088 ASSERT(q->q_syncq != NULL);
4089 ASSERT(sq != q->q_syncq);
4090 sq = q->q_syncq;
4091 ASSERT(sq->sq_type & SQ_CIPUT);
4096 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
4097 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
4098 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
4099 * starting from q and down to the driver.
4101 * This should be called after the affected queues are part of stream
4102 * geometry. It should be called from driver/module open routine after
4103 * qprocson() call. It is also called from nfs syscall where it is known that
4104 * stream is configured and won't change its geometry during create_putlock
4105 * call.
4107 * caller normally uses 0 value for the stream argument to speed up MT putnext
4108 * into the perimeter of q for example because its perimeter is per module
4109 * (e.g. IP).
4111 * caller normally uses non 0 value for the stream argument to hint the system
4112 * that the stream of q is a very contended global system stream
4113 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
4114 * particularly MT hot.
4116 * Caller insures stream plumbing won't happen while we are here and therefore
4117 * q_next can be safely used.
4120 void
4121 create_putlocks(queue_t *q, int stream)
4123 ciputctrl_t *cip;
4124 struct stdata *stp = STREAM(q);
4126 q = _WR(q);
4127 ASSERT(stp != NULL);
4129 if (disable_putlocks != 0)
4130 return;
4132 if (n_ciputctrl < min_n_ciputctrl)
4133 return;
4135 ASSERT(ciputctrl_cache != NULL);
4137 if (stream != 0 && stp->sd_ciputctrl == NULL) {
4138 cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
4139 SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
4140 mutex_enter(&stp->sd_lock);
4141 if (stp->sd_ciputctrl != NULL) {
4142 mutex_exit(&stp->sd_lock);
4143 kmem_cache_free(ciputctrl_cache, cip);
4144 } else {
4145 ASSERT(stp->sd_nciputctrl == 0);
4146 stp->sd_nciputctrl = n_ciputctrl - 1;
4148 * putnext checks sd_ciputctrl without holding
4149 * sd_lock. if it is not NULL putnext assumes
4150 * sd_nciputctrl is initialized. membar below
4151 * insures that.
4153 membar_producer();
4154 stp->sd_ciputctrl = cip;
4155 mutex_exit(&stp->sd_lock);
4159 ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);
4161 while (_SAMESTR(q)) {
4162 create_syncq_putlocks(q);
4163 if (stream == 0)
4164 return;
4165 q = q->q_next;
4167 ASSERT(q != NULL);
4168 create_syncq_putlocks(q);
/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * The data currently recorded per event is:
 *	- a timestamp,
 *	- the module/driver name,
 *	- the downstream module/driver name,
 *	- an optional call stack,
 *	- the event type, and
 *	- a per-type datum.
 * Much of the STREAMS framework is instrumented for automatic flow tracing
 * (when enabled). STREAMS modules and drivers can also define and use
 * their own events.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data.
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace headers.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

/*
 * Default 1: don't do STREAMS flow tracing. str_ftevent() also increments
 * this when it cannot allocate a new trace block.
 */
int str_ftnever = 1;

/* Default 0: don't record event call stacks. */
int str_ftstack = 0;
4195 void
4196 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4198 ftblk_t *bp = hp->tail;
4199 ftblk_t *nbp;
4200 ftevnt_t *ep;
4201 int ix, nix;
4203 ASSERT(hp != NULL);
4205 for (;;) {
4206 if ((ix = bp->ix) == FTBLK_EVNTS) {
4208 * Tail doesn't have room, so need a new tail.
4210 * To make this MT safe, first, allocate a new
4211 * ftblk, and initialize it. To make life a
4212 * little easier, reserve the first slot (mostly
4213 * by making ix = 1). When we are finished with
4214 * the initialization, CAS this pointer to the
4215 * tail. If this succeeds, this is the new
4216 * "next" block. Otherwise, another thread
4217 * got here first, so free the block and start
4218 * again.
4220 nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4221 if (nbp == NULL) {
4222 /* no mem, so punt */
4223 str_ftnever++;
4224 /* free up all flow data? */
4225 return;
4227 nbp->nxt = NULL;
4228 nbp->ix = 1;
4230 * Just in case there is another thread about
4231 * to get the next index, we need to make sure
4232 * the value is there for it.
4234 membar_producer();
4235 if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4236 /* CAS was successful */
4237 bp->nxt = nbp;
4238 membar_producer();
4239 bp = nbp;
4240 ix = 0;
4241 goto cas_good;
4242 } else {
4243 kmem_cache_free(ftblk_cache, nbp);
4244 bp = hp->tail;
4245 continue;
4248 nix = ix + 1;
4249 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4250 cas_good:
4251 if (curthread != hp->thread) {
4252 hp->thread = curthread;
4253 evnt |= FTEV_CS;
4255 if (CPU->cpu_seqid != hp->cpu_seqid) {
4256 hp->cpu_seqid = CPU->cpu_seqid;
4257 evnt |= FTEV_PS;
4259 ep = &bp->ev[ix];
4260 break;
4264 if (evnt & FTEV_QMASK) {
4265 queue_t *qp = p;
4267 if (!(qp->q_flag & QREADR))
4268 evnt |= FTEV_ISWR;
4270 ep->mid = Q2NAME(qp);
4273 * We only record the next queue name for FTEV_PUTNEXT since
4274 * that's the only time we *really* need it, and the putnext()
4275 * code ensures that qp->q_next won't vanish. (We could use
4276 * claimstr()/releasestr() but at a performance cost.)
4278 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4279 ep->midnext = Q2NAME(qp->q_next);
4280 else
4281 ep->midnext = NULL;
4282 } else {
4283 ep->mid = p;
4284 ep->midnext = NULL;
4287 if (ep->stk != NULL)
4288 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4290 ep->ts = gethrtime();
4291 ep->evnt = evnt;
4292 ep->data = data;
4293 hp->hash = (hp->hash << 9) + hp->hash;
4294 hp->hash += (evnt << 16) | data;
4295 hp->hash += (uintptr_t)ep->mid;
4299 * Free flow-trace data.
4301 void
4302 str_ftfree(dblk_t *dbp)
4304 fthdr_t *hp = dbp->db_fthdr;
4305 ftblk_t *bp = &hp->first;
4306 ftblk_t *nbp;
4308 if (bp != hp->tail || bp->ix != 0) {
4310 * Clear out the hash, have the tail point to itself, and free
4311 * any continuation blocks.
4313 bp = hp->first.nxt;
4314 hp->tail = &hp->first;
4315 hp->hash = 0;
4316 hp->first.nxt = NULL;
4317 hp->first.ix = 0;
4318 while (bp != NULL) {
4319 nbp = bp->nxt;
4320 kmem_cache_free(ftblk_cache, bp);
4321 bp = nbp;
4324 kmem_cache_free(fthdr_cache, hp);
4325 dbp->db_fthdr = NULL;