view usr/src/uts/common/io/stream.c @ 3932:efce29b04ab4

6522339 Performance enhancement with enhanced esballoc design
author ss146032
date Fri, 30 Mar 2007 04:14:30 -0700
parents 163b8acfdadd
children 96c808cd96bc
line wrap: on
line source

/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
/*	  All Rights Reserved  	*/


/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include <sys/types.h>
#include <sys/param.h>
#include <sys/thread.h>
#include <sys/sysmacros.h>
#include <sys/stropts.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/strsun.h>
#include <sys/conf.h>
#include <sys/debug.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/atomic.h>
#include <sys/errno.h>
#include <sys/vtrace.h>
#include <sys/ftrace.h>
#include <sys/ontrap.h>
#include <sys/multidata.h>
#include <sys/multidata_impl.h>
#include <sys/sdt.h>
#include <sys/strft.h>

#ifdef DEBUG
#include <sys/kmem_impl.h>
#endif

/*
 * This file contains all the STREAMS utility routines that may
 * be used by modules and drivers.
 */

/*
 * STREAMS message allocator: principles of operation
 *
 * The streams message allocator consists of all the routines that
 * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
 * dupb(), freeb() and freemsg().  What follows is a high-level view
 * of how the allocator works.
 *
 * Every streams message consists of one or more mblks, a dblk, and data.
 * All mblks for all types of messages come from a common mblk_cache.
 * The dblk and data come in several flavors, depending on how the
 * message is allocated:
 *
 * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
 *     fixed-size dblk/data caches. For message sizes that are multiples of
 *     PAGESIZE, dblks are allocated separately from the buffer.
 *     The associated buffer is allocated by the constructor using kmem_alloc().
 *     For all other message sizes, dblk and its associated data is allocated
 *     as a single contiguous chunk of memory.
 *     Objects in these caches consist of a dblk plus its associated data.
 *     allocb() determines the nearest-size cache by table lookup:
 *     the dblk_cache[] array provides the mapping from size to dblk cache.
 *
 * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
 *     kmem_alloc()'ing a buffer for the data and supplying that
 *     buffer to gesballoc(), described below.
 *
 * (3) The four flavors of [d]esballoc[a] are all implemented by a
 *     common routine, gesballoc() ("generic esballoc").  gesballoc()
 *     allocates a dblk from the global dblk_esb_cache and sets db_base,
 *     db_lim and db_frtnp to describe the caller-supplied buffer.
 *
 * While there are several routines to allocate messages, there is only
 * one routine to free messages: freeb().  freeb() simply invokes the
 * dblk's free method, dbp->db_free(), which is set at allocation time.
 *
 * dupb() creates a new reference to a message by allocating a new mblk,
 * incrementing the dblk reference count and setting the dblk's free
 * method to dblk_decref().  The dblk's original free method is retained
 * in db_lastfree.  dblk_decref() decrements the reference count on each
 * freeb().  If this is not the last reference it just frees the mblk;
 * if this *is* the last reference, it restores db_free to db_lastfree,
 * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 *
 * The implementation makes aggressive use of kmem object caching for
 * maximum performance.  This makes the code simple and compact, but
 * also a bit abstruse in some places.  The invariants that constitute a
 * message's constructed state, described below, are more subtle than usual.
 *
 * Every dblk has an "attached mblk" as part of its constructed state.
 * The mblk is allocated by the dblk's constructor and remains attached
 * until the message is either dup'ed or pulled up.  In the dupb() case
 * the mblk association doesn't matter until the last free, at which time
 * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 * the mblk association because it swaps the leading mblks of two messages,
 * so it is responsible for swapping their db_mblk pointers accordingly.
 * From a constructed-state viewpoint it doesn't matter that a dblk's
 * attached mblk can change while the message is allocated; all that
 * matters is that the dblk has *some* attached mblk when it's freed.
 *
 * The sizes of the allocb() small-message caches are not magical.
 * They represent a good trade-off between internal and external
 * fragmentation for current workloads.  They should be reevaluated
 * periodically, especially if allocations larger than DBLK_MAX_CACHE
 * become common.  We use 64-byte alignment so that dblks don't
 * straddle cache lines unnecessarily.
 */
#define	DBLK_MAX_CACHE		73728
#define	DBLK_CACHE_ALIGN	64
#define	DBLK_MIN_SIZE		8
#define	DBLK_SIZE_SHIFT		3

#ifdef _BIG_ENDIAN
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define	DBLK_RTFU_SHIFT(field)	\
	(8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

#define	DBLK_RTFU(ref, type, flags, uioflag)	\
	(((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
	((type) << DBLK_RTFU_SHIFT(db_type)) | \
	(((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
	((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define	DBLK_RTFU_REF_MASK	(DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define	DBLK_RTFU_WORD(dbp)	(*((uint32_t *)&(dbp)->db_ref))
#define	MBLK_BAND_FLAG_WORD(mp)	(*((uint32_t *)&(mp)->b_band))

static size_t dblk_sizes[] = {
#ifdef _LP64
	16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3920,
	8192, 12112, 16384, 20304, 24576, 28496, 32768, 36688,
	40960, 44880, 49152, 53072, 57344, 61264, 65536, 69456,
#else
	64, 128, 320, 576, 1088, 1536, 1984, 2624, 3968,
	8192, 12160, 16384, 20352, 24576, 28544, 32768, 36736,
	40960, 44928, 49152, 53120, 57344, 61312, 65536, 69504,
#endif
	DBLK_MAX_CACHE, 0
};

static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;


static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;
	size_t index;

	ASSERT(msg_size != 0);

	index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

	ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	if ((msg_size & PAGEOFFSET) == 0) {
		dbp->db_base = kmem_alloc(msg_size, kmflags);
		if (dbp->db_base == NULL) {
			kmem_cache_free(mblk_cache, dbp->db_mblk);
			return (-1);
		}
	} else {
		dbp->db_base = (unsigned char *)&dbp[1];
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_cache[index];
	dbp->db_lim = dbp->db_base + msg_size;
	dbp->db_free = dbp->db_lastfree = dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);
	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = dblk_esb_cache;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
		return (-1);

	if ((dbp->db_base = (unsigned char *)kmem_cache_alloc(bcp->buffer_cache,
	    kmflags)) == NULL) {
		kmem_cache_free(mblk_cache, dbp->db_mblk);
		return (-1);
	}

	dbp->db_mblk->b_datap = dbp;
	dbp->db_cache = (void *)bcp;
	dbp->db_lim = dbp->db_base + bcp->size;
	dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
	dbp->db_frtnp = NULL;
	dbp->db_fthdr = NULL;
	dbp->db_credp = NULL;
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;
	return (0);
}

/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	ssize_t msg_size = (ssize_t)cdrarg;

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(msg_size != 0);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	if ((msg_size & PAGEOFFSET) == 0) {
		kmem_free(dbp->db_base, msg_size);
	}

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
	dblk_t *dbp = buf;
	bcache_t *bcp = (bcache_t *)cdrarg;

	kmem_cache_free(bcp->buffer_cache, dbp->db_base);

	ASSERT(dbp->db_mblk->b_datap == dbp);

	ASSERT(dbp->db_struioflag == 0);
	ASSERT(dbp->db_struioun.cksum.flags == 0);

	kmem_cache_free(mblk_cache, dbp->db_mblk);
}

void
streams_msg_init(void)
{
	char name[40];
	size_t size;
	size_t lastsize = DBLK_MIN_SIZE;
	size_t *sizep;
	struct kmem_cache *cp;
	size_t tot_size;
	int offset;

	mblk_cache = kmem_cache_create("streams_mblk",
		sizeof (mblk_t), 32, NULL, NULL, NULL, NULL, NULL,
		mblk_kmem_flags);

	for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

		if ((offset = (size & PAGEOFFSET)) != 0) {
			/*
			 * We are in the middle of a page, dblk should
			 * be allocated on the same page
			 */
			tot_size = size + sizeof (dblk_t);
			ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
								< PAGESIZE);
			ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

		} else {

			/*
			 * buf size is multiple of page size, dblk and
			 * buffer are allocated separately.
			 */

			ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
			tot_size = sizeof (dblk_t);
		}

		(void) sprintf(name, "streams_dblk_%ld", size);
		cp = kmem_cache_create(name, tot_size,
			DBLK_CACHE_ALIGN, dblk_constructor,
			dblk_destructor, NULL,
			(void *)(size), NULL, dblk_kmem_flags);

		while (lastsize <= size) {
			dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
			lastsize += DBLK_MIN_SIZE;
		}
	}

	dblk_esb_cache = kmem_cache_create("streams_dblk_esb",
			sizeof (dblk_t), DBLK_CACHE_ALIGN,
			dblk_esb_constructor, dblk_destructor, NULL,
			(void *) sizeof (dblk_t), NULL, dblk_kmem_flags);
	fthdr_cache = kmem_cache_create("streams_fthdr",
		sizeof (fthdr_t), 32, NULL, NULL, NULL, NULL, NULL, 0);
	ftblk_cache = kmem_cache_create("streams_ftblk",
		sizeof (ftblk_t), 32, NULL, NULL, NULL, NULL, NULL, 0);

	/* Initialize Multidata caches */
	mmd_init();

	/* initialize throttling queue for esballoc */
	esballoc_queue_init();
}

/*ARGSUSED*/
mblk_t *
allocb(size_t size, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index =  (size - 1)  >> DBLK_SIZE_SHIFT;

	if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
		if (size != 0) {
			mp = allocb_oversize(size, KM_NOSLEEP);
			goto out;
		}
		index = 0;
	}

	if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
out:
	FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL) {
		cred_t *cr = DB_CRED(tmpl);
		if (cr != NULL)
			crhold(mp->b_datap->db_credp = cr);
		DB_CPID(mp) = DB_CPID(tmpl);
		DB_TYPE(mp) = DB_TYPE(tmpl);
	}
	return (mp);
}

mblk_t *
allocb_cred(size_t size, cred_t *cr)
{
	mblk_t *mp = allocb(size, 0);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

mblk_t *
allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr)
{
	mblk_t *mp = allocb_wait(size, 0, flags, error);

	if (mp != NULL && cr != NULL)
		crhold(mp->b_datap->db_credp = cr);

	return (mp);
}

void
freeb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;

	ASSERT(dbp->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
	FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

	dbp->db_free(mp, dbp);
}

void
freemsg(mblk_t *mp)
{
	FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
	while (mp) {
		dblk_t *dbp = mp->b_datap;
		mblk_t *mp_cont = mp->b_cont;

		ASSERT(dbp->db_ref > 0);
		ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

		STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

		dbp->db_free(mp, dbp);
		mp = mp_cont;
	}
}

/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t	*
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
	mblk_t		*mp1;
	unsigned char	*old_rptr;
	ptrdiff_t	cur_size;

	if (mp == NULL)
		return (allocb(size, BPRI_HI));

	cur_size = mp->b_wptr - mp->b_rptr;
	old_rptr = mp->b_rptr;

	ASSERT(mp->b_datap->db_ref != 0);

	if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
		/*
		 * If the data is wanted and it will fit where it is, no
		 * work is required.
		 */
		if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
			return (mp);

		mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
		mp1 = mp;
	} else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
		/* XXX other mp state could be copied too, db_flags ... ? */
		mp1->b_cont = mp->b_cont;
	} else {
		return (NULL);
	}

	if (copy) {
		bcopy(old_rptr, mp1->b_rptr, cur_size);
		mp1->b_wptr = mp1->b_rptr + cur_size;
	}

	if (mp != mp1)
		freeb(mp);

	return (mp1);
}

static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;

	/* Reset the struioflag and the checksum flag fields */
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* and the COOKED flag */
	dbp->db_flags &= ~DBLK_COOKED;

	kmem_cache_free(dbp->db_cache, dbp);
}

static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
	if (dbp->db_ref != 1) {
		uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
		    -(1 << DBLK_RTFU_SHIFT(db_ref)));
		/*
		 * atomic_add_32_nv() just decremented db_ref, so we no longer
		 * have a reference to the dblk, which means another thread
		 * could free it.  Therefore we cannot examine the dblk to
		 * determine whether ours was the last reference.  Instead,
		 * we extract the new and minimum reference counts from rtfu.
		 * Note that all we're really saying is "if (ref != refmin)".
		 */
		if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
		    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
			kmem_cache_free(mblk_cache, mp);
			return;
		}
	}
	dbp->db_mblk = mp;
	dbp->db_free = dbp->db_lastfree;
	dbp->db_lastfree(mp, dbp);
}

mblk_t *
dupb(mblk_t *mp)
{
	dblk_t *dbp = mp->b_datap;
	mblk_t *new_mp;
	uint32_t oldrtfu, newrtfu;

	if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
		goto out;

	new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
	new_mp->b_rptr = mp->b_rptr;
	new_mp->b_wptr = mp->b_wptr;
	new_mp->b_datap = dbp;
	new_mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

	STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

	dbp->db_free = dblk_decref;
	do {
		ASSERT(dbp->db_ref > 0);
		oldrtfu = DBLK_RTFU_WORD(dbp);
		newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
		/*
		 * If db_ref is maxed out we can't dup this message anymore.
		 */
		if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
			kmem_cache_free(mblk_cache, new_mp);
			new_mp = NULL;
			goto out;
		}
	} while (cas32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) != oldrtfu);

out:
	FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
	return (new_mp);
}

static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
	frtn_t *frp = dbp->db_frtnp;

	ASSERT(dbp->db_mblk == mp);
	frp->free_func(frp->free_arg);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_cache_free(dbp->db_cache, dbp);
}

/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}

/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
	void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
	dblk_t *dbp;
	mblk_t *mp;

	ASSERT(base != NULL && frp != NULL);

	if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
		mp = NULL;
		goto out;
	}

	mp = dbp->db_mblk;
	dbp->db_base = base;
	dbp->db_lim = base + size;
	dbp->db_free = dbp->db_lastfree = lastfree;
	dbp->db_frtnp = frp;
	DBLK_RTFU_WORD(dbp) = db_rtfu;
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;

out:
	FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
	return (mp);
}

/*ARGSUSED*/
mblk_t *
esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*
 * Same as esballoc() but sleeps waiting for memory.
 */
/*ARGSUSED*/
mblk_t *
esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_SLEEP);

		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_SLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
			frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}

/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}

static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}

bcache_t *
bcache_create(char *name, size_t size, uint_t align)
{
	bcache_t *bcp;
	char buffer[255];

	ASSERT((align & (align - 1)) == 0);

	if ((bcp = (bcache_t *)kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) ==
	    NULL) {
		return (NULL);
	}

	bcp->size = size;
	bcp->align = align;
	bcp->alloc = 0;
	bcp->destroy = 0;

	mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);

	(void) sprintf(buffer, "%s_buffer_cache", name);
	bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
	    NULL, NULL, NULL, 0);
	(void) sprintf(buffer, "%s_dblk_cache", name);
	bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
	    DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
						NULL, (void *)bcp, NULL, 0);

	return (bcp);
}

void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}

/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}

static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}

static mblk_t *
allocb_oversize(size_t size, int kmflags)
{
	mblk_t *mp;
	void *buf;

	size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
	if ((buf = kmem_alloc(size, kmflags)) == NULL)
		return (NULL);
	if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
	    &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
		kmem_free(buf, size);

	if (mp != NULL)
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);

	return (mp);
}

mblk_t *
allocb_tryhard(size_t target_size)
{
	size_t size;
	mblk_t *bp;

	for (size = target_size; size < target_size + 512;
	    size += DBLK_CACHE_ALIGN)
		if ((bp = allocb(size, BPRI_HI)) != NULL)
			return (bp);
	allocb_tryhard_fails++;
	return (NULL);
}

/*
 * This routine is consolidation private for STREAMS internal use
 * This routine may only be called from sync routines (i.e., not
 * from put or service procedures).  It is located here (rather
 * than strsubr.c) so that we don't have to expose all of the
 * allocb() implementation details in header files.
 */
mblk_t *
allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
{
	dblk_t *dbp;
	mblk_t *mp;
	size_t index;

	index = (size -1) >> DBLK_SIZE_SHIFT;

	if (flags & STR_NOSIG) {
		if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
			if (size != 0) {
				mp = allocb_oversize(size, KM_SLEEP);
				FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
				    (uintptr_t)mp);
				return (mp);
			}
			index = 0;
		}

		dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
		mp = dbp->db_mblk;
		DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
		mp->b_next = mp->b_prev = mp->b_cont = NULL;
		mp->b_rptr = mp->b_wptr = dbp->db_base;
		mp->b_queue = NULL;
		MBLK_BAND_FLAG_WORD(mp) = 0;
		STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);

		FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);

	} else {
		while ((mp = allocb(size, pri)) == NULL) {
			if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
				return (NULL);
		}
	}

	return (mp);
}

/*
 * Call function 'func' with 'arg' when a class zero block can
 * be allocated with priority 'pri'.
 */
bufcall_id_t
esbbcall(uint_t pri, void (*func)(void *), void *arg)
{
	return (bufcall(1, pri, func, arg));
}

/*
 * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
 * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
 * This provides consistency for all internal allocators of ioctl.
 */
mblk_t *
mkiocb(uint_t cmd)
{
	struct iocblk	*ioc;
	mblk_t		*mp;

	/*
	 * Allocate enough space for any of the ioctl related messages.
	 */
	if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
		return (NULL);

	bzero(mp->b_rptr, sizeof (union ioctypes));

	/*
	 * Set the mblk_t information and ptrs correctly.
	 */
	mp->b_wptr += sizeof (struct iocblk);
	mp->b_datap->db_type = M_IOCTL;

	/*
	 * Fill in the fields.
	 */
	ioc		= (struct iocblk *)mp->b_rptr;
	ioc->ioc_cmd	= cmd;
	ioc->ioc_cr	= kcred;
	ioc->ioc_id	= getiocseqno();
	ioc->ioc_flag	= IOC_NATIVE;
	return (mp);
}

/*
 * test if block of given size can be allocated with a request of
 * the given priority.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
int
testb(size_t size, uint_t pri)
{
	return ((size + sizeof (dblk_t)) <= kmem_avail());
}

/*
 * Call function 'func' with argument 'arg' when there is a reasonably
 * good chance that a block of size 'size' can be allocated.
 * 'pri' is no longer used, but is retained for compatibility.
 */
/* ARGSUSED */
bufcall_id_t
bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
{
	static long bid = 1;	/* always odd to save checking for zero */
	bufcall_id_t bc_id;
	struct strbufcall *bcp;

	if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
		return (0);

	bcp->bc_func = func;
	bcp->bc_arg = arg;
	bcp->bc_size = size;
	bcp->bc_next = NULL;
	bcp->bc_executor = NULL;

	mutex_enter(&strbcall_lock);
	/*
	 * After bcp is linked into strbcalls and strbcall_lock is dropped there
	 * should be no references to bcp since it may be freed by
	 * runbufcalls(). Since bcp_id field is returned, we save its value in
	 * the local var.
	 */
	bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);	/* keep it odd */

	/*
	 * add newly allocated stream event to existing
	 * linked list of events.
	 */
	if (strbcalls.bc_head == NULL) {
		strbcalls.bc_head = strbcalls.bc_tail = bcp;
	} else {
		strbcalls.bc_tail->bc_next = bcp;
		strbcalls.bc_tail = bcp;
	}

	cv_signal(&strbcall_cv);
	mutex_exit(&strbcall_lock);
	return (bc_id);
}

/*
 * Cancel a bufcall request.
 */
void
unbufcall(bufcall_id_t id)
{
	strbufcall_t *bcp, *pbcp;

	mutex_enter(&strbcall_lock);
again:
	pbcp = NULL;
	for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
		if (id == bcp->bc_id)
			break;
		pbcp = bcp;
	}
	if (bcp) {
		if (bcp->bc_executor != NULL) {
			if (bcp->bc_executor != curthread) {
				cv_wait(&bcall_cv, &strbcall_lock);
				goto again;
			}
		} else {
			if (pbcp)
				pbcp->bc_next = bcp->bc_next;
			else
				strbcalls.bc_head = bcp->bc_next;
			if (bcp == strbcalls.bc_tail)
				strbcalls.bc_tail = pbcp;
			kmem_free(bcp, sizeof (strbufcall_t));
		}
	}
	mutex_exit(&strbcall_lock);
}

/*
 * Duplicate a message block by block (uses dupb), returning
 * a pointer to the duplicate message.
 * Returns a non-NULL value only if the entire message
 * was dup'd.
 */
mblk_t *
dupmsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = dupb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = dupb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * Copy data from message and data block to newly allocated message and
 * data block. Returns new message block pointer, or NULL if error.
 * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
 * as in the original even when db_base is not word aligned. (bug 1052877)
 */
mblk_t *
copyb(mblk_t *bp)
{
	mblk_t	*nbp;
	dblk_t	*dp, *ndp;
	uchar_t *base;
	size_t	size;
	size_t	unaligned;

	ASSERT(bp->b_wptr >= bp->b_rptr);

	dp = bp->b_datap;
	if (dp->db_fthdr != NULL)
		STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);

	/*
	 * Special handling for Multidata message; this should be
	 * removed once a copy-callback routine is made available.
	 */
	if (dp->db_type == M_MULTIDATA) {
		cred_t *cr;

		if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
			return (NULL);

		nbp->b_flag = bp->b_flag;
		nbp->b_band = bp->b_band;
		ndp = nbp->b_datap;

		/* See comments below on potential issues. */
		STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

		ASSERT(ndp->db_type == dp->db_type);
		cr = dp->db_credp;
		if (cr != NULL)
			crhold(ndp->db_credp = cr);
		ndp->db_cpid = dp->db_cpid;
		return (nbp);
	}

	size = dp->db_lim - dp->db_base;
	unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
	if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
		return (NULL);
	nbp->b_flag = bp->b_flag;
	nbp->b_band = bp->b_band;
	ndp = nbp->b_datap;

	/*
	 * Well, here is a potential issue.  If we are trying to
	 * trace a flow, and we copy the message, we might lose
	 * information about where this message might have been.
	 * So we should inherit the FT data.  On the other hand,
	 * a user might be interested only in alloc to free data.
	 * So I guess the real answer is to provide a tunable.
	 */
	STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);

	base = ndp->db_base + unaligned;
	bcopy(dp->db_base, ndp->db_base + unaligned, size);

	nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
	nbp->b_wptr = nbp->b_rptr + MBLKL(bp);

	return (nbp);
}

/*
 * Copy data from message to newly allocated message using new
 * data blocks.  Returns a pointer to the new message, or NULL if error.
 */
mblk_t *
copymsg(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (!bp || !(nbp = head = copyb(bp)))
		return (NULL);

	while (bp->b_cont) {
		if (!(nbp->b_cont = copyb(bp->b_cont))) {
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}

/*
 * link a message block to tail of message
 */
void
linkb(mblk_t *mp, mblk_t *bp)
{
	ASSERT(mp && bp);

	for (; mp->b_cont; mp = mp->b_cont)
		;
	mp->b_cont = bp;
}

/*
 * unlink a message block from head of message
 * return pointer to new message.
 * NULL if message becomes empty.
 */
mblk_t *
unlinkb(mblk_t *bp)
{
	mblk_t *bp1;

	bp1 = bp->b_cont;
	bp->b_cont = NULL;
	return (bp1);
}

/*
 * remove a message block "bp" from message "mp"
 *
 * Return pointer to new message or NULL if no message remains.
 * Return -1 if bp is not found in message.
 */
mblk_t *
rmvb(mblk_t *mp, mblk_t *bp)
{
	mblk_t *tmp;
	mblk_t *lastp = NULL;

	ASSERT(mp && bp);
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		if (tmp == bp) {
			if (lastp)
				lastp->b_cont = tmp->b_cont;
			else
				mp = tmp->b_cont;
			tmp->b_cont = NULL;
			return (mp);
		}
		lastp = tmp;
	}
	return ((mblk_t *)-1);
}

/*
 * Concatenate and align first len bytes of common
 * message type.  Len == -1, means concat everything.
 * Returns 1 on success, 0 on failure
 * After the pullup, mp points to the pulled up data.
 */
int
pullupmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp, *b_cont;
	dblk_t *dbp;
	ssize_t n;

	ASSERT(mp->b_datap->db_ref > 0);
	ASSERT(mp->b_next == NULL && mp->b_prev == NULL);

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len == -1) {
		if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
			return (1);
		len = xmsgsize(mp);
	} else {
		ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
		ASSERT(first_mblk_len >= 0);
		/*
		 * If the length is less than that of the first mblk,
		 * we want to pull up the message into an aligned mblk.
		 * Though not part of the spec, some callers assume it.
		 */
		if (len <= first_mblk_len) {
			if (str_aligned(mp->b_rptr))
				return (1);
			len = first_mblk_len;
		} else if (xmsgsize(mp) < len)
			return (0);
	}

	if ((bp = allocb_tmpl(len, mp)) == NULL)
		return (0);

	dbp = bp->b_datap;
	*bp = *mp;		/* swap mblks so bp heads the old msg... */
	mp->b_datap = dbp;	/* ... and mp heads the new message */
	mp->b_datap->db_mblk = mp;
	bp->b_datap->db_mblk = bp;
	mp->b_rptr = mp->b_wptr = dbp->db_base;

	do {
		ASSERT(bp->b_datap->db_ref > 0);
		ASSERT(bp->b_wptr >= bp->b_rptr);
		n = MIN(bp->b_wptr - bp->b_rptr, len);
		bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
		mp->b_wptr += n;
		bp->b_rptr += n;
		len -= n;
		if (bp->b_rptr != bp->b_wptr)
			break;
		b_cont = bp->b_cont;
		freeb(bp);
		bp = b_cont;
	} while (len && bp);

	mp->b_cont = bp;	/* tack on whatever wasn't pulled up */

	return (1);
}

/*
 * Concatenate and align at least the first len bytes of common message
 * type.  Len == -1 means concatenate everything.  The original message is
 * unaltered.  Returns a pointer to a new message on success, otherwise
 * returns NULL.
 */
mblk_t *
msgpullup(mblk_t *mp, ssize_t len)
{
	mblk_t	*newmp;
	ssize_t	totlen;
	ssize_t	n;

	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (NULL);

	totlen = xmsgsize(mp);

	if ((len > 0) && (len > totlen))
		return (NULL);

	/*
	 * Copy all of the first msg type into one new mblk, then dupmsg
	 * and link the rest onto this.
	 */

	len = totlen;

	if ((newmp = allocb_tmpl(len, mp)) == NULL)
		return (NULL);

	newmp->b_flag = mp->b_flag;
	newmp->b_band = mp->b_band;

	while (len > 0) {
		n = mp->b_wptr - mp->b_rptr;
		ASSERT(n >= 0);		/* allow zero-length mblk_t's */
		if (n > 0)
			bcopy(mp->b_rptr, newmp->b_wptr, n);
		newmp->b_wptr += n;
		len -= n;
		mp = mp->b_cont;
	}

	if (mp != NULL) {
		newmp->b_cont = dupmsg(mp);
		if (newmp->b_cont == NULL) {
			freemsg(newmp);
			return (NULL);
		}
	}

	return (newmp);
}

/*
 * Trim bytes from message
 *  len > 0, trim from head
 *  len < 0, trim from tail
 * Returns 1 on success, 0 on failure.
 */
int
adjmsg(mblk_t *mp, ssize_t len)
{
	mblk_t *bp;
	mblk_t *save_bp = NULL;
	mblk_t *prev_bp;
	mblk_t *bcont;
	unsigned char type;
	ssize_t n;
	int fromhead;
	int first;

	ASSERT(mp != NULL);
	/*
	 * We won't handle Multidata message, since it contains
	 * metadata which this function has no knowledge of; we
	 * assert on DEBUG, and return failure otherwise.
	 */
	ASSERT(mp->b_datap->db_type != M_MULTIDATA);
	if (mp->b_datap->db_type == M_MULTIDATA)
		return (0);

	if (len < 0) {
		fromhead = 0;
		len = -len;
	} else {
		fromhead = 1;
	}

	if (xmsgsize(mp) < len)
		return (0);


	if (fromhead) {
		first = 1;
		while (len) {
			ASSERT(mp->b_wptr >= mp->b_rptr);
			n = MIN(mp->b_wptr - mp->b_rptr, len);
			mp->b_rptr += n;
			len -= n;

			/*
			 * If this is not the first zero length
			 * message remove it
			 */
			if (!first && (mp->b_wptr == mp->b_rptr)) {
				bcont = mp->b_cont;
				freeb(mp);
				mp = save_bp->b_cont = bcont;
			} else {
				save_bp = mp;
				mp = mp->b_cont;
			}
			first = 0;
		}
	} else {
		type = mp->b_datap->db_type;
		while (len) {
			bp = mp;
			save_bp = NULL;

			/*
			 * Find the last message of same type
			 */

			while (bp && bp->b_datap->db_type == type) {
				ASSERT(bp->b_wptr >= bp->b_rptr);
				prev_bp = save_bp;
				save_bp = bp;
				bp = bp->b_cont;
			}
			if (save_bp == NULL)
				break;
			n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
			save_bp->b_wptr -= n;
			len -= n;

			/*
			 * If this is not the first message
			 * and we have taken away everything
			 * from this message, remove it
			 */

			if ((save_bp != mp) &&
				(save_bp->b_wptr == save_bp->b_rptr)) {
				bcont = save_bp->b_cont;
				freeb(save_bp);
				prev_bp->b_cont = bcont;
			}
		}
	}
	return (1);
}

/*
 * get number of data bytes in message
 */
size_t
msgdsize(mblk_t *bp)
{
	size_t count = 0;

	for (; bp; bp = bp->b_cont)
		if (bp->b_datap->db_type == M_DATA) {
			ASSERT(bp->b_wptr >= bp->b_rptr);
			count += bp->b_wptr - bp->b_rptr;
		}
	return (count);
}

/*
 * Get a message off head of queue
 *
 * If queue has no buffers then mark queue
 * with QWANTR. (queue wants to be read by
 * someone when data becomes available)
 *
 * If there is something to take off then do so.
 * If queue falls below hi water mark turn off QFULL
 * flag.  Decrement weighted count of queue.
 * Also turn off QWANTR because queue is being read.
 *
 * The queue count is maintained on a per-band basis.
 * Priority band 0 (normal messages) uses q_count,
 * q_lowat, etc.  Non-zero priority bands use the
 * fields in their respective qband structures
 * (qb_count, qb_lowat, etc.)  All messages appear
 * on the same list, linked via their b_next pointers.
 * q_first is the head of the list.  q_count does
 * not reflect the size of all the messages on the
 * queue.  It only reflects those messages in the
 * normal band of flow.  The one exception to this
 * deals with high priority messages.  They are in
 * their own conceptual "band", but are accounted
 * against q_count.
 *
 * If queue count is below the lo water mark and QWANTW
 * is set, enable the closest backq which has a service
 * procedure and turn off the QWANTW flag.
 *
 * getq could be built on top of rmvq, but isn't because
 * of performance considerations.
 *
 * A note on the use of q_count and q_mblkcnt:
 *   q_count is the traditional byte count for messages that
 *   have been put on a queue.  Documentation tells us that
 *   we shouldn't rely on that count, but some drivers/modules
 *   do.  What was needed, however, is a mechanism to prevent
 *   runaway streams from consuming all of the resources,
 *   and particularly be able to flow control zero-length
 *   messages.  q_mblkcnt is used for this purpose.  It
 *   counts the number of mblk's that are being put on
 *   the queue.  The intention here, is that each mblk should
 *   contain one byte of data and, for the purpose of
 *   flow-control, logically does.  A queue will become
 *   full when EITHER of these values (q_count and q_mblkcnt)
 *   reach the highwater mark.  It will clear when BOTH
 *   of them drop below the highwater mark.  And it will
 *   backenable when BOTH of them drop below the lowwater
 *   mark.
 *   With this algorithm, a driver/module might be able
 *   to find a reasonably accurate q_count, and the
 *   framework can still try and limit resource usage.
 */
mblk_t *
getq(queue_t *q)
{
	mblk_t *bp;
	uchar_t band = 0;

	bp = getq_noenab(q);
	if (bp != NULL)
		band = bp->b_band;

	/*
	 * Inlined from qbackenable().
	 * Quick check without holding the lock.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return (bp);

	qbackenable(q, band);
	return (bp);
}

/*
 * Calculate number of data bytes in a single data message block taking
 * multidata messages into account.
 */

#define	ADD_MBLK_SIZE(mp, size) 					\
	if (DB_TYPE(mp) != M_MULTIDATA) {				\
		(size) += MBLKL(mp);					\
	} else {							\
		uint_t	pinuse;						\
									\
		mmd_getsize(mmd_getmultidata(mp), NULL, &pinuse);	\
		(size) += pinuse;					\
	}

/*
 * Like getq() but does not backenable.  This is used by the stream
 * head when a putback() is likely.  The caller must call qbackenable()
 * after it is done with accessing the queue.
 */
mblk_t *
getq_noenab(queue_t *q)
{
	mblk_t *bp;
	mblk_t *tmp;
	qband_t *qbp;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		q->q_flag |= QWANTR;
	} else {
		if ((q->q_first = bp->b_next) == NULL)
			q->q_last = NULL;
		else
			q->q_first->b_prev = NULL;

		/* Get message byte count for q_count accounting */
		for (tmp = bp; tmp; tmp = tmp->b_cont) {
			ADD_MBLK_SIZE(tmp, bytecnt);
			mblkcnt++;
		}

		if (bp->b_band == 0) {
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat)) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}

/*
 * Determine if a backenable is needed after removing a message in the
 * specified band.
 * NOTE: This routine assumes that something like getq_noenab() has been
 * already called.
 *
 * For the read side it is ok to hold sd_lock across calling this (and the
 * stream head often does).
 * But for the write side strwakeq might be invoked and it acquires sd_lock.
 */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}

/*
 * Remove a message from a queue.  The queue count and other
 * flow control parameters are adjusted and the back queue
 * enabled if necessary.
 *
 * rmvq can be called with the stream frozen, but other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
void
rmvq(queue_t *q, mblk_t *mp)
{
	ASSERT(mp != NULL);

	rmvq_noenab(q, mp);
	if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
		/*
		 * qbackenable can handle a frozen stream but not a "random"
		 * qlock being held. Drop lock across qbackenable.
		 */
		mutex_exit(QLOCK(q));
		qbackenable(q, mp->b_band);
		mutex_enter(QLOCK(q));
	} else {
		qbackenable(q, mp->b_band);
	}
}

/*
 * Like rmvq() but without any backenabling.
 * This exists to handle SR_CONSOL_DATA in strrput().
 */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	mblk_t *tmp;
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat)) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat)) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}

/*
 * Empty a queue.
 * If flag is set, remove all messages.  Otherwise, remove
 * only non-control messages.  If queue falls below its low
 * water mark, and QWANTW is set, enable the nearest upstream
 * service procedure.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushq did not have a check
 * for q_lowat == 0 in the backenabling test.
 *
 * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
 * if one exists on the queue.
 */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char	qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}

/*
 * The real flushing takes place in flushq_common. This is done so that
 * a flag which specifies whether or not M_PCPROTO messages should be flushed
 * or not. Currently the only place that uses this flag is the stream head.
 */
void
flushq(queue_t *q, int flag)
{
	flushq_common(q, flag, 0);
}

/*
 * Flush the queue of messages of the given priority band.
 * There is some duplication of code between flushq and flushband.
 * This is because we want to optimize the code as much as possible.
 * The assumption is that there will be more messages in the normal
 * (priority 0) band than in any other.
 *
 * Historical note: when merging the M_FLUSH code in strrput with this
 * code one difference was discovered. flushband had an extra check for
 * did not have a check for (mp->b_datap->db_type < QPCTL) in the band 0
 * case. That check does not match the man page for flushband and was not
 * in the strrput flush code hence it was removed.
 */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
				((flag == FLUSHALL) ||
				datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}

/*
 * Return 1 if the queue is not full.  If the queue is full, return
 * 0 (may not put message) and set QWANTW flag (caller wants to write
 * to the queue).
 */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}

/*
 * This is the new canput for use with priority bands.  Return 1 if the
 * band is not full.  If the band is full, return 0 (may not put message)
 * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
 * write to the queue).
 */
int
bcanput(queue_t *q, unsigned char pri)
{
	qband_t *qbp;

	TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
	if (!q)
		return (0);

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	mutex_enter(QLOCK(q));
	if (pri == 0) {
		if (q->q_flag & QFULL) {
			q->q_flag |= QWANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
				"bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	} else {	/* pri != 0 */
		if (pri > q->q_nband) {
			/*
			 * No band exists yet, so return success.
			 */
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
				"bcanput:%p %X %d", q, pri, 1);
			return (1);
		}
		qbp = q->q_bandp;
		while (--pri)
			qbp = qbp->qb_next;
		if (qbp->qb_flag & QB_FULL) {
			qbp->qb_flag |= QB_WANTW;
			mutex_exit(QLOCK(q));
			TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
				"bcanput:%p %X %d", q, pri, 0);
			return (0);
		}
	}
	mutex_exit(QLOCK(q));
	TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
		"bcanput:%p %X %d", q, pri, 1);
	return (1);
}

/*
 * Put a message on a queue.
 *
 * Messages are enqueued on a priority basis.  The priority classes
 * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
 * and B_NORMAL (type < QPCTL && band == 0).
 *
 * Add appropriate weighted data block sizes to queue count.
 * If queue hits high water mark then set QFULL flag.
 *
 * If QNOENAB is not set (putq is allowed to enable the queue),
 * enable the queue only if the message is PRIORITY,
 * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue.  This implies that a service
 * procedure must NEVER put a high priority message back on its own
 * queue, as this would result in an infinite loop (!).
 */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Put stuff back at beginning of Q according to priority order.
 * See comment on putq above for details.
 */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	for (tmp = bp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Insert a message before an existing message on the queue.  If the
 * existing message is NULL, the new messages is placed on the end of
 * the queue.  The queue class of the new message is ignored.  However,
 * the priority band of the new message must adhere to the following
 * ordering:
 *
 *	emp->b_prev->b_band >= mp->b_band >= emp->b_band.
 *
 * All flow control parameters are updated.
 *
 * insq can be called with the stream frozen, but other utility functions
 * holding QLOCK, and by streams modules without any locks/frozen.
 */
int
insq(queue_t *q, mblk_t *emp, mblk_t *mp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(mp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	if (mcls == QPCTL) {
		if (mp->b_band != 0)
			mp->b_band = 0;		/* force to be correct */
		if (emp && emp->b_prev &&
		    (emp->b_prev->b_datap->db_type < QPCTL))
			goto badord;
	}
	if (emp) {
		if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
		    (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
		    (emp->b_prev->b_band < mp->b_band))) {
			goto badord;
		}
	} else {
		tmp = q->q_last;
		if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
badord:
			cmn_err(CE_WARN,
			    "insq: attempt to insert message out of order "
			    "on q %p", (void *)q);
			if (freezer != curthread)
				mutex_exit(QLOCK(q));
			return (0);
		}
	}

	if (mp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (mp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (mp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	if ((mp->b_next = emp) != NULL) {
		if ((mp->b_prev = emp->b_prev) != NULL)
			emp->b_prev->b_next = mp;
		else
			q->q_first = mp;
		emp->b_prev = mp;
	} else {
		if ((mp->b_prev = q->q_last) != NULL)
			q->q_last->b_next = mp;
		else
			q->q_first = mp;
		q->q_last = mp;
	}

	/* Get mblk and byte count for q_count accounting */
	for (tmp = mp; tmp; tmp = tmp->b_cont) {
		ADD_MBLK_SIZE(tmp, bytecnt);
		mblkcnt++;
	}

	if (qbp) {	/* adjust qband pointers and count */
		if (!qbp->qb_first) {
			qbp->qb_first = mp;
			qbp->qb_last = mp;
		} else {
			if (mp->b_prev == NULL || (mp->b_prev != NULL &&
			    (mp->b_prev->b_band != mp->b_band)))
				qbp->qb_first = mp;
			else if (mp->b_next == NULL || (mp->b_next != NULL &&
			    (mp->b_next->b_band != mp->b_band)))
				qbp->qb_last = mp;
		}
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);

	if (canenable(q) && (q->q_flag & QWANTR))
		qenable_locked(q);

	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}

/*
 * Create and put a control message on queue.
 */
int
putctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(0)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char) type;

	put(q, bp);

	return (1);
}

/*
 * Control message with a single-byte parameter
 */
int
putctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
	    (bp = allocb_tryhard(1)) == NULL)
		return (0);
	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	put(q, bp);

	return (1);
}

int
putnextctl1(queue_t *q, int type, int param)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
		((bp = allocb_tryhard(1)) == NULL))
		return (0);

	bp->b_datap->db_type = (unsigned char)type;
	*bp->b_wptr++ = (unsigned char)param;

	putnext(q, bp);

	return (1);
}

int
putnextctl(queue_t *q, int type)
{
	mblk_t *bp;

	if ((datamsg(type) && (type != M_DELAY)) ||
		((bp = allocb_tryhard(0)) == NULL))
		return (0);
	bp->b_datap->db_type = (unsigned char)type;

	putnext(q, bp);

	return (1);
}

/*
 * Return the queue upstream from this one
 */
queue_t *
backq(queue_t *q)
{
	q = _OTHERQ(q);
	if (q->q_next) {
		q = q->q_next;
		return (_OTHERQ(q));
	}
	return (NULL);
}

/*
 * Send a block back up the queue in reverse from this
 * one (e.g. to respond to ioctls)
 */
void
qreply(queue_t *q, mblk_t *bp)
{
	ASSERT(q && bp);

	putnext(_OTHERQ(q), bp);
}

/*
 * Streams Queue Scheduling
 *
 * Queues are enabled through qenable() when they have messages to
 * process.  They are serviced by queuerun(), which runs each enabled
 * queue's service procedure.  The call to queuerun() is processor
 * dependent - the general principle is that it be run whenever a queue
 * is enabled but before returning to user level.  For system calls,
 * the function runqueues() is called if their action causes a queue
 * to be enabled.  For device interrupts, queuerun() should be
 * called before returning from the last level of interrupt.  Beyond
 * this, no timing assumptions should be made about queue scheduling.
 */

/*
 * Enable a queue: put it on list of those whose service procedures are
 * ready to run and set up the scheduling mechanism.
 * The broadcast is done outside the mutex -> to avoid the woken thread
 * from contending with the mutex. This is OK 'cos the queue has been
 * enqueued on the runlist and flagged safely at this point.
 */
void
qenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}
/*
 * Return number of messages on queue
 */
int
qsize(queue_t *qp)
{
	int count = 0;
	mblk_t *mp;

	mutex_enter(QLOCK(qp));
	for (mp = qp->q_first; mp; mp = mp->b_next)
		count++;
	mutex_exit(QLOCK(qp));
	return (count);
}

/*
 * noenable - set queue so that putq() will not enable it.
 * enableok - set queue so that putq() can enable it.
 */
void
noenable(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}

void
enableok(queue_t *q)
{
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}

/*
 * Set queue fields.
 */
int
strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
{
	qband_t *qbp = NULL;
	queue_t	*wrq;
	int error = 0;
	kthread_id_t freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {

	case QHIWAT:
		if (qbp)
			qbp->qb_hiwat = (size_t)val;
		else
			q->q_hiwat = (size_t)val;
		break;

	case QLOWAT:
		if (qbp)
			qbp->qb_lowat = (size_t)val;
		else
			q->q_lowat = (size_t)val;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_maxpsz = (ssize_t)val;

		/*
		 * Performance concern, strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write
		 * we now cache it at the stream head.  Check to see if this
		 * queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		ASSERT(MUTEX_HELD(QLOCK(wrq)));

		if (strmsgsz != 0) {
			if (val == INFPSZ)
				val = strmsgsz;
			else  {
				if (STREAM(q)->sd_vnode->v_type == VFIFO)
					val = MIN(PIPE_BUF, val);
				else
					val = MIN(strmsgsz, val);
			}
		}
		STREAM(q)->sd_qn_maxpsz = val;
		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			q->q_minpsz = (ssize_t)val;

		/*
		 * Performance concern, strwrite looks at the module below
		 * the stream head for the maxpsz each time it does a write
		 * we now cache it at the stream head.  Check to see if this
		 * queue is sitting directly below the stream head.
		 */
		wrq = STREAM(q)->sd_wrq;
		if (q != wrq->q_next)
			break;

		/*
		 * If the stream is not frozen drop the current QLOCK and
		 * acquire the sd_wrq QLOCK which protects sd_qn_*
		 */
		if (freezer != curthread) {
			mutex_exit(QLOCK(q));
			mutex_enter(QLOCK(wrq));
		}
		STREAM(q)->sd_qn_minpsz = (ssize_t)val;

		if (freezer != curthread) {
			mutex_exit(QLOCK(wrq));
			mutex_enter(QLOCK(q));
		}
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			q->q_struiot = (ushort_t)val;
		break;

	case QCOUNT:
	case QFIRST:
	case QLAST:
	case QFLAG:
		error = EPERM;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Get queue fields.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t 	*qbp = NULL;
	int 		error = 0;
	kthread_id_t 	freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}

/*
 * Function awakes all in cvwait/sigwait/pollwait, on one of:
 *	QWANTWSYNC or QWANTR or QWANTW,
 *
 * Note: for QWANTWSYNC/QWANTW and QWANTR, if no WSLEEPer or RSLEEPer then a
 *	 deferred wakeup will be done. Also if strpoll() in progress then a
 *	 deferred pollwakeup will be done.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t 	*stp = STREAM(q);
	pollhead_t 	*pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			stp->sd_wakeq |= WSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}

int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ  = STRUIOT_STANDARD;
	uio_t	 *uiop = &dp->d_uio;
	dblk_t	 *dbp;
	ssize_t	 uiocnt;
	ssize_t	 cnt;
	unsigned char *ptr;
	ssize_t	 resid;
	int	 error = 0;
	on_trap_data_t otd;
	queue_t	*stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable. It doesn't matter even
	 * if the we take the type from the previous plumbing,
	 * that's because if the plumbing has changed when we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occured and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * to reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}

/*
 * Try to enter queue synchronously. Any attempt to enter a closing queue will
 * fails. The qp->q_rwcnt keeps track of the number of successful entries so
 * that removeq() will not try to close the queue while a thread is inside the
 * queue.
 */
static boolean_t
rwnext_enter(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	if (qp->q_flag & QWCLOSE) {
		mutex_exit(QLOCK(qp));
		return (B_FALSE);
	}
	qp->q_rwcnt++;
	ASSERT(qp->q_rwcnt != 0);
	mutex_exit(QLOCK(qp));
	return (B_TRUE);
}

/*
 * Decrease the count of threads running in sync stream queue and wake up any
 * threads blocked in removeq().
 */
static void
rwnext_exit(queue_t *qp)
{
	mutex_enter(QLOCK(qp));
	qp->q_rwcnt--;
	if (qp->q_flag & QWANTRMQSYNC) {
		qp->q_flag &= ~QWANTRMQSYNC;
		cv_broadcast(&qp->q_wait);
	}
	mutex_exit(QLOCK(qp));
}

/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) modules queue.
 *
 * treated as put entrypoint for perimeter syncronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
 * not matter if any regular put entrypoints have been already entered. We
 * can't increment one of the sq_putcounts (instead of sq_count) because
 * qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		isread;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK. Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired. In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition were M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call. In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 *	 for the write queue flow-control case. All others (r/w queue
	 *	 with q_count > 0 (or q_first != 0)) are the resposibilty of
	 *	 the queue's rw procedure. This could be genralized here buy
	 *	 running the queue's service procedure, but that wouldn't be
	 *	 the most efficent for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled. If so,
		 * mark the queue for wakeup when it's not.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 * 	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) modules queue.
 *
 * treated as put entrypoint for perimeter syncronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
 * it does not matter if any regular put entrypoints have been already
 * entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t 	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		qp = nqp->q_next;
	} else {
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
/*
 * XXXX
 * I am not certain the next comment is correct here.  I need to consider
 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
 * might cause other problems.  It just might be safer to drop it if
 * !SQ_CIPUT because that is when we set it.
 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}

/*
 * Return nonzero if the queue is responsible for struio(), else return 0.
 */
int
isuioq(queue_t *q)
{
	if (q->q_flag & QREADR)
		return (STREAM(q)->sd_struiordq == q);
	else
		return (STREAM(q)->sd_struiowrq == q);
}

#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif

/*
 * called by create_putlock.
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t	*sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK. if it is not NULL putnext assumes
				 * sq_nciputctrl is initialized. membar below
				 * insures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}

/*
 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
 * starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry. It should be called from driver/module open routine after
 * qprocson() call. It is also called from nfs syscall where it is known that
 * stream is configured and won't change its geometry during create_putlock
 * call.
 *
 * caller normally uses 0 value for the stream argument to speed up MT putnext
 * into the perimeter of q for example because its perimeter is per module
 * (e.g. IP).
 *
 * caller normally uses non 0 value for the stream argument to hint the system
 * that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * Caller insures stream plumbing won't happen while we are here and therefore
 * q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t	*cip;
	struct stdata	*stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock. if it is not NULL putnext assumes
			 * sd_nciputctrl is initialized. membar below
			 * insures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}

/*
 * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
 * through a stream.
 *
 * Data currently record per event is a hrtime stamp, queue address, event
 * type, and a per type datum.  Much of the STREAMS framework is instrumented
 * for automatic flow tracing (when enabled).  Events can be defined and used
 * by STREAMS modules and drivers.
 *
 * Global objects:
 *
 *	str_ftevent() - Add a flow-trace event to a dblk.
 *	str_ftfree() - Free flow-trace data
 *
 * Local objects:
 *
 *	fthdr_cache - pointer to the kmem cache for trace header.
 *	ftblk_cache - pointer to the kmem cache for trace data blocks.
 */

int str_ftnever = 1;	/* Don't do STREAMS flow tracing */

void
str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
{
	ftblk_t *bp = hp->tail;
	ftblk_t *nbp;
	ftevnt_t *ep;
	int ix, nix;

	ASSERT(hp != NULL);

	for (;;) {
		if ((ix = bp->ix) == FTBLK_EVNTS) {
			/*
			 * Tail doesn't have room, so need a new tail.
			 *
			 * To make this MT safe, first, allocate a new
			 * ftblk, and initialize it.  To make life a
			 * little easier, reserve the first slot (mostly
			 * by making ix = 1).  When we are finished with
			 * the initialization, CAS this pointer to the
			 * tail.  If this succeeds, this is the new
			 * "next" block.  Otherwise, another thread
			 * got here first, so free the block and start
			 * again.
			 */
			if (!(nbp = kmem_cache_alloc(ftblk_cache,
			    KM_NOSLEEP))) {
				/* no mem, so punt */
				str_ftnever++;
				/* free up all flow data? */
				return;
			}
			nbp->nxt = NULL;
			nbp->ix = 1;
			/*
			 * Just in case there is another thread about
			 * to get the next index, we need to make sure
			 * the value is there for it.
			 */
			membar_producer();
			if (casptr(&hp->tail, bp, nbp) == bp) {
				/* CAS was successful */
				bp->nxt = nbp;
				membar_producer();
				bp = nbp;
				ix = 0;
				goto cas_good;
			} else {
				kmem_cache_free(ftblk_cache, nbp);
				bp = hp->tail;
				continue;
			}
		}
		nix = ix + 1;
		if (cas32((uint32_t *)&bp->ix, ix, nix) == ix) {
		cas_good:
			if (curthread != hp->thread) {
				hp->thread = curthread;
				evnt |= FTEV_CS;
			}
			if (CPU->cpu_seqid != hp->cpu_seqid) {
				hp->cpu_seqid = CPU->cpu_seqid;
				evnt |= FTEV_PS;
			}
			ep = &bp->ev[ix];
			break;
		}
	}

	if (evnt & FTEV_QMASK) {
		queue_t *qp = p;

		/*
		 * It is possible that the module info is broke
		 * (as is logsubr.c at this comment writing).
		 * Instead of panicing or doing other unmentionables,
		 * we shall put a dummy name as the mid, and continue.
		 */
		if (qp->q_qinfo == NULL)
			ep->mid = "NONAME";
		else
			ep->mid = qp->q_qinfo->qi_minfo->mi_idname;

		if (!(qp->q_flag & QREADR))
			evnt |= FTEV_ISWR;
	} else {
		ep->mid = (char *)p;
	}

	ep->ts = gethrtime();
	ep->evnt = evnt;
	ep->data = data;
	hp->hash = (hp->hash << 9) + hp->hash;
	hp->hash += (evnt << 16) | data;
	hp->hash += (uintptr_t)ep->mid;
}

/*
 * Free flow-trace data.
 */
void
str_ftfree(dblk_t *dbp)
{
	fthdr_t *hp = dbp->db_fthdr;
	ftblk_t *bp = &hp->first;
	ftblk_t *nbp;

	if (bp != hp->tail || bp->ix != 0) {
		/*
		 * Clear out the hash, have the tail point to itself, and free
		 * any continuation blocks.
		 */
		bp = hp->first.nxt;
		hp->tail = &hp->first;
		hp->hash = 0;
		hp->first.nxt = NULL;
		hp->first.ix = 0;
		while (bp != NULL) {
			nbp = bp->nxt;
			kmem_cache_free(ftblk_cache, bp);
			bp = nbp;
		}
	}
	kmem_cache_free(fthdr_cache, hp);
	dbp->db_fthdr = NULL;
}