changeset 3932:efce29b04ab4

6522339 Performance enhancement with enhanced esballoc design
author ss146032
date Fri, 30 Mar 2007 04:14:30 -0700
parents 0347144fd43b
children ea83e56013c8
files usr/src/uts/common/io/stream.c usr/src/uts/common/os/strsubr.c usr/src/uts/common/sys/strsubr.h
diffstat 3 files changed, 156 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/usr/src/uts/common/io/stream.c	Fri Mar 30 04:12:45 2007 -0700
+++ b/usr/src/uts/common/io/stream.c	Fri Mar 30 04:14:30 2007 -0700
@@ -23,7 +23,7 @@
 
 
 /*
- * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -367,6 +367,9 @@
 
 	/* Initialize Multidata caches */
 	mmd_init();
+
+	/* initialize throttling queue for esballoc */
+	esballoc_queue_init();
 }
 
 /*ARGSUSED*/
--- a/usr/src/uts/common/os/strsubr.c	Fri Mar 30 04:12:45 2007 -0700
+++ b/usr/src/uts/common/os/strsubr.c	Fri Mar 30 04:14:30 2007 -0700
@@ -331,6 +331,24 @@
 
 static linkinfo_t *linkinfo_list;
 
+/* global esballoc throttling queue */
+static esb_queue_t	system_esbq;
+
+/*
+ * esballoc tunable parameters.
+ */
+int		esbq_max_qlen = 0x16;	/* throttled queue length */
+clock_t		esbq_timeout = 0x8;	/* timeout to process esb queue */
+
+/*
+ * routines to handle esballoc queuing.
+ */
+static void esballoc_process_queue(esb_queue_t *);
+static void esballoc_enqueue_mblk(mblk_t *);
+static void esballoc_timer(void *);
+static void esballoc_set_timer(esb_queue_t *, clock_t);
+static void esballoc_mblk_free(mblk_t *);
+
 /*
  *  Qinit structure and Module_info structures
  *	for passthru read and write queues
@@ -3903,10 +3921,11 @@
  * share the same fifolock_t).
  */
 
-/* ARGSUSED */
 void
 freebs_enqueue(mblk_t *mp, dblk_t *dbp)
 {
+	esb_queue_t *eqp = &system_esbq;
+
 	ASSERT(dbp->db_mblk == mp);
 
 	/*
@@ -3916,25 +3935,131 @@
 	 */
 	if (dbp->db_frtnp->free_func == NULL) {
 		panic("freebs_enqueue: dblock %p has a NULL free callback",
-		    (void *) dbp);
-	}
-
-	STRSTAT(freebs);
-	if (taskq_dispatch(streams_taskq, (task_func_t *)mblk_free, mp,
+		    (void *)dbp);
+	}
+
+	mutex_enter(&eqp->eq_lock);
+	/* queue the new mblk on the esballoc queue */
+	if (eqp->eq_head == NULL) {
+		eqp->eq_head = eqp->eq_tail = mp;
+	} else {
+		eqp->eq_tail->b_next = mp;
+		eqp->eq_tail = mp;
+	}
+	eqp->eq_len++;
+
+	/* If we're the first thread to reach the threshold, process */
+	if (eqp->eq_len >= esbq_max_qlen &&
+	    !(eqp->eq_flags & ESBQ_PROCESSING))
+		esballoc_process_queue(eqp);
+
+	esballoc_set_timer(eqp, esbq_timeout);
+	mutex_exit(&eqp->eq_lock);
+}
+
+static void
+esballoc_process_queue(esb_queue_t *eqp)
+{
+	mblk_t	*mp;
+
+	ASSERT(MUTEX_HELD(&eqp->eq_lock));
+
+	eqp->eq_flags |= ESBQ_PROCESSING;
+
+	do {
+		/*
+		 * Detach the message chain for processing.
+		 */
+		mp = eqp->eq_head;
+		eqp->eq_tail->b_next = NULL;
+		eqp->eq_head = eqp->eq_tail = NULL;
+		eqp->eq_len = 0;
+		mutex_exit(&eqp->eq_lock);
+
+		/*
+		 * Process the message chain.
+		 */
+		esballoc_enqueue_mblk(mp);
+		mutex_enter(&eqp->eq_lock);
+	} while ((eqp->eq_len >= esbq_max_qlen) && (eqp->eq_len > 0));
+
+	eqp->eq_flags &= ~ESBQ_PROCESSING;
+}
+
+/*
+ * taskq callback routine to free esballoced mblk's
+ */
+static void
+esballoc_mblk_free(mblk_t *mp)
+{
+	mblk_t	*nextmp;
+
+	for (; mp != NULL; mp = nextmp) {
+		nextmp = mp->b_next;
+		mp->b_next = NULL;
+		mblk_free(mp);
+	}
+}
+
+static void
+esballoc_enqueue_mblk(mblk_t *mp)
+{
+
+	if (taskq_dispatch(system_taskq, (task_func_t *)esballoc_mblk_free, mp,
 	    TQ_NOSLEEP) == NULL) {
+		mblk_t *first_mp = mp;
 		/*
 		 * System is low on resources and can't perform a non-sleeping
 		 * dispatch. Schedule for a background thread.
 		 */
 		mutex_enter(&service_queue);
 		STRSTAT(taskqfails);
+
+		while (mp->b_next != NULL)
+			mp = mp->b_next;
+
 		mp->b_next = freebs_list;
-		freebs_list = mp;
+		freebs_list = first_mp;
 		cv_signal(&services_to_run);
 		mutex_exit(&service_queue);
 	}
 }
 
+static void
+esballoc_timer(void *arg)
+{
+	esb_queue_t *eqp = arg;
+
+	mutex_enter(&eqp->eq_lock);
+	eqp->eq_flags &= ~ESBQ_TIMER;
+
+	if (!(eqp->eq_flags & ESBQ_PROCESSING) &&
+	    eqp->eq_len > 0)
+		esballoc_process_queue(eqp);
+
+	esballoc_set_timer(eqp, esbq_timeout);
+	mutex_exit(&eqp->eq_lock);
+}
+
+static void
+esballoc_set_timer(esb_queue_t *eqp, clock_t eq_timeout)
+{
+	ASSERT(MUTEX_HELD(&eqp->eq_lock));
+
+	if (eqp->eq_len > 0 && !(eqp->eq_flags & ESBQ_TIMER)) {
+		(void) timeout(esballoc_timer, eqp, eq_timeout);
+		eqp->eq_flags |= ESBQ_TIMER;
+	}
+}
+
+void
+esballoc_queue_init(void)
+{
+	system_esbq.eq_len = 0;
+	system_esbq.eq_head = system_esbq.eq_tail = NULL;
+	system_esbq.eq_flags = 0;
+}
+
 /*
  * Set the QBACK or QB_BACK flag in the given queue for
  * the given priority band.
--- a/usr/src/uts/common/sys/strsubr.h	Fri Mar 30 04:12:45 2007 -0700
+++ b/usr/src/uts/common/sys/strsubr.h	Fri Mar 30 04:14:30 2007 -0700
@@ -1268,6 +1268,26 @@
 extern int min_n_ciputctrl;
 
 extern cdevsw_impl_t *devimpl;
+
+/*
+ * esballoc queue for throttling
+ */
+typedef struct esb_queue {
+	kmutex_t	eq_lock;
+	uint_t		eq_len;		/* number of queued messages */
+	mblk_t		*eq_head;	/* head of queue */
+	mblk_t		*eq_tail;	/* tail of queue */
+	uint_t		eq_flags;	/* esballoc queue flags */
+} esb_queue_t;
+
+/*
+ * esballoc flags for queue processing.
+ */
+#define	ESBQ_PROCESSING	0x01	/* queue is being processed */
+#define	ESBQ_TIMER	0x02	/* timer is active */
+
+extern void esballoc_queue_init(void);
+
 #endif	/* _KERNEL */
 
 /*