changeset 4741:db206cc52130

4859528 svc_poll can loop forever not giving up the cpu
author gt29601
date Fri, 27 Jul 2007 08:36:40 -0700
parents 4134fdfef194
children f132fa3a9b6f
files usr/src/uts/common/rpc/rpcmod.c usr/src/uts/common/rpc/svc.c
diffstat 2 files changed, 234 insertions(+), 234 deletions(-)
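
The hang fixed here comes from the old "inconsistent" but "safe" optimization
in svc_queuereq(): a request was appended to the transport's queue under
xp_req_lock, while the pool-wide pending count p_reqs was bumped separately
under p_req_lock, so the counter could disagree with the queues and svc_poll()
could spin forever with p_reqs > 0 but nothing on any request queue. The fix
holds both locks across the enqueue so the queue update and the counter update
look atomic. A minimal before/after sketch of that locking change (lock and
field names are taken from the diff below; the elided bodies are only
illustrative):

	/* Before: enqueue and pending count updated under different locks. */
	mutex_enter(&xprt->xp_req_lock);
	/* ... append mp to xprt->xp_req_head/xp_req_tail ... */
	mutex_exit(&xprt->xp_req_lock);

	mutex_enter(&pool->p_req_lock);
	pool->p_reqs++;		/* can briefly disagree with the queues */
	mutex_exit(&pool->p_req_lock);

	/* After: both locks held, so svc_poll() sees a consistent state. */
	mutex_enter(&xprt->xp_req_lock);
	mutex_enter(&pool->p_req_lock);
	/* ... append mp, queue the xprt-ready hint, pool->p_reqs++ ... */
	mutex_exit(&pool->p_req_lock);
	mutex_exit(&xprt->xp_req_lock);
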
--- a/usr/src/uts/common/rpc/rpcmod.c	Fri Jul 27 08:24:13 2007 -0700
+++ b/usr/src/uts/common/rpc/rpcmod.c	Fri Jul 27 08:36:40 2007 -0700
@@ -27,7 +27,6 @@
 /*	Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T	*/
 /*	  All Rights Reserved  	*/
 
-
 #pragma ident	"%Z%%M%	%I%	%E% SMI"
 
 /*
@@ -769,124 +768,125 @@
 		 * Forward this message to krpc if it is data.
 		 */
 		if (pptr->type == T_UNITDATA_IND) {
-		    mblk_t *nmp;
+			mblk_t *nmp;
 
 		/*
 		 * Check if the module is being popped.
 		 */
-		    mutex_enter(&rmp->rm_lock);
-		    if (rmp->rm_state & RM_CLOSING) {
-			mutex_exit(&rmp->rm_lock);
-			putnext(q, mp);
-			break;
-		    }
-
-		    switch (rmp->rm_type) {
-		    case RPC_CLIENT:
-			mutex_exit(&rmp->rm_lock);
+			mutex_enter(&rmp->rm_lock);
+			if (rmp->rm_state & RM_CLOSING) {
+				mutex_exit(&rmp->rm_lock);
+				putnext(q, mp);
+				break;
+			}
+
+			switch (rmp->rm_type) {
+			case RPC_CLIENT:
+				mutex_exit(&rmp->rm_lock);
+				hdrsz = mp->b_wptr - mp->b_rptr;
+
+				/*
+				 * Make sure the header is sane.
+				 */
+				if (hdrsz < TUNITDATAINDSZ ||
+				    hdrsz < (pptr->unitdata_ind.OPT_length +
+				    pptr->unitdata_ind.OPT_offset) ||
+				    hdrsz < (pptr->unitdata_ind.SRC_length +
+				    pptr->unitdata_ind.SRC_offset)) {
+					freemsg(mp);
+					return;
+				}
+
+				/*
+				 * Call clnt_clts_dispatch_notify, so that it
+				 * can pass the message to the proper caller.
+				 * Don't discard the header just yet since the
+				 * client may need the sender's address.
+				 */
+				clnt_clts_dispatch_notify(mp, hdrsz,
+				    rmp->rm_zoneid);
+				return;
+			case RPC_SERVER:
+				/*
+				 * rm_krpc_cell is exclusively used by the kRPC
+				 * CLTS server
+				 */
+				if (rmp->rm_krpc_cell) {
+#ifdef DEBUG
+					/*
+					 * Test duplicate request cache and
+					 * rm_ref count handling by sending a
+					 * duplicate every so often, if
+					 * desired.
+					 */
+					if (rpcmod_send_dup &&
+					    rpcmod_send_dup_cnt++ %
+					    rpcmod_send_dup)
+						nmp = copymsg(mp);
+					else
+						nmp = NULL;
+#endif
+					/*
+					 * Raise the reference count on this
+					 * module to prevent it from being
+					 * popped before krpc generates the
+					 * reply.
+					 */
+					rmp->rm_ref++;
+					mutex_exit(&rmp->rm_lock);
+
+					/*
+					 * Submit the message to krpc.
+					 */
+					svc_queuereq(q, mp);
+#ifdef DEBUG
+					/*
+					 * Send duplicate if we created one.
+					 */
+					if (nmp) {
+						mutex_enter(&rmp->rm_lock);
+						rmp->rm_ref++;
+						mutex_exit(&rmp->rm_lock);
+						svc_queuereq(q, nmp);
+					}
+#endif
+				} else {
+					mutex_exit(&rmp->rm_lock);
+					freemsg(mp);
+				}
+				return;
+			default:
+				mutex_exit(&rmp->rm_lock);
+				freemsg(mp);
+				return;
+			} /* end switch(rmp->rm_type) */
+		} else if (pptr->type == T_UDERROR_IND) {
+			mutex_enter(&rmp->rm_lock);
 			hdrsz = mp->b_wptr - mp->b_rptr;
 
 			/*
-			 * Make sure the header is sane.
+			 * Make sure the header is sane
 			 */
-			if (hdrsz < TUNITDATAINDSZ ||
-				hdrsz < (pptr->unitdata_ind.OPT_length +
-					pptr->unitdata_ind.OPT_offset) ||
-				hdrsz < (pptr->unitdata_ind.SRC_length +
-					pptr->unitdata_ind.SRC_offset)) {
-					freemsg(mp);
-					return;
+			if (hdrsz < TUDERRORINDSZ ||
+			    hdrsz < (pptr->uderror_ind.OPT_length +
+			    pptr->uderror_ind.OPT_offset) ||
+			    hdrsz < (pptr->uderror_ind.DEST_length +
+			    pptr->uderror_ind.DEST_offset)) {
+				mutex_exit(&rmp->rm_lock);
+				freemsg(mp);
+				return;
 			}
 
 			/*
-			 * Call clnt_clts_dispatch_notify, so that it can
-			 * pass the message to the proper caller.  Don't
-			 * discard the header just yet since the client may
-			 * need the sender's address.
-			 */
-			clnt_clts_dispatch_notify(mp, hdrsz, rmp->rm_zoneid);
-			return;
-		    case RPC_SERVER:
-			/*
-			 * rm_krpc_cell is exclusively used by the kRPC
-			 * CLTS server
+			 * In the case where a unit data error has been
+			 * received, all we need to do is clear the message from
+			 * the queue.
 			 */
-			if (rmp->rm_krpc_cell) {
-#ifdef DEBUG
-				/*
-				 * Test duplicate request cache and
-				 * rm_ref count handling by sending a
-				 * duplicate every so often, if
-				 * desired.
-				 */
-				if (rpcmod_send_dup &&
-				    rpcmod_send_dup_cnt++ %
-				    rpcmod_send_dup)
-					nmp = copymsg(mp);
-				else
-					nmp = NULL;
-#endif
-				/*
-				 * Raise the reference count on this
-				 * module to prevent it from being
-				 * popped before krpc generates the
-				 * reply.
-				 */
-				rmp->rm_ref++;
-				mutex_exit(&rmp->rm_lock);
-
-				/*
-				 * Submit the message to krpc.
-				 */
-				svc_queuereq(q, mp);
-#ifdef DEBUG
-				/*
-				 * Send duplicate if we created one.
-				 */
-				if (nmp) {
-					mutex_enter(&rmp->rm_lock);
-					rmp->rm_ref++;
-					mutex_exit(&rmp->rm_lock);
-					svc_queuereq(q, nmp);
-				}
-#endif
-			} else {
-				mutex_exit(&rmp->rm_lock);
-				freemsg(mp);
-			}
-			return;
-		    default:
 			mutex_exit(&rmp->rm_lock);
 			freemsg(mp);
+			RPCLOG(32, "rpcmodrput: unitdata error received at "
+			    "%ld\n", gethrestime_sec());
 			return;
-		    } /* end switch(rmp->rm_type) */
-		} else if (pptr->type == T_UDERROR_IND) {
-		    mutex_enter(&rmp->rm_lock);
-		    hdrsz = mp->b_wptr - mp->b_rptr;
-
-		/*
-		 * Make sure the header is sane
-		 */
-		    if (hdrsz < TUDERRORINDSZ ||
-			hdrsz < (pptr->uderror_ind.OPT_length +
-				pptr->uderror_ind.OPT_offset) ||
-			hdrsz < (pptr->uderror_ind.DEST_length +
-				pptr->uderror_ind.DEST_offset)) {
-			    mutex_exit(&rmp->rm_lock);
-			    freemsg(mp);
-			    return;
-		    }
-
-		/*
-		 * In the case where a unit data error has been
-		 * received, all we need to do is clear the message from
-		 * the queue.
-		 */
-		    mutex_exit(&rmp->rm_lock);
-		    freemsg(mp);
-		    RPCLOG(32, "rpcmodrput: unitdata error received at "
-				"%ld\n", gethrestime_sec());
-		    return;
 		} /* end else if (pptr->type == T_UDERROR_IND) */
 
 		putnext(q, mp);
@@ -894,7 +894,7 @@
 	} /* end switch (mp->b_datap->db_type) */
 
 	TRACE_0(TR_FAC_KRPC, TR_RPCMODRPUT_END,
-		"rpcmodrput_end:");
+	    "rpcmodrput_end:");
 	/*
 	 * Return codes are not looked at by the STREAMS framework.
 	 */
@@ -911,12 +911,12 @@
 	ASSERT(q != NULL);
 
 	switch (mp->b_datap->db_type) {
-	    case M_PROTO:
-	    case M_PCPROTO:
-		    break;
-	    default:
-		    rpcmodwput_other(q, mp);
-		    return;
+		case M_PROTO:
+		case M_PCPROTO:
+			break;
+		default:
+			rpcmodwput_other(q, mp);
+			return;
 	}
 
 	/*
@@ -965,21 +965,21 @@
 			iocp = (struct iocblk *)mp->b_rptr;
 			ASSERT(iocp != NULL);
 			switch (iocp->ioc_cmd) {
-			    case RPC_CLIENT:
-			    case RPC_SERVER:
-				    mutex_enter(&rmp->rm_lock);
-				    rmp->rm_type = iocp->ioc_cmd;
-				    mutex_exit(&rmp->rm_lock);
-				    mp->b_datap->db_type = M_IOCACK;
-				    qreply(q, mp);
-				    return;
-			    default:
+				case RPC_CLIENT:
+				case RPC_SERVER:
+					mutex_enter(&rmp->rm_lock);
+					rmp->rm_type = iocp->ioc_cmd;
+					mutex_exit(&rmp->rm_lock);
+					mp->b_datap->db_type = M_IOCACK;
+					qreply(q, mp);
+					return;
+				default:
 				/*
 				 * pass the ioctl downstream and hope someone
 				 * down there knows how to handle it.
 				 */
-				    putnext(q, mp);
-				    return;
+					putnext(q, mp);
+					return;
 			}
 		default:
 			break;
@@ -1223,7 +1223,7 @@
 		while ((!MIR_SVC_QUIESCED(mir)) || mir->mir_inwservice == 1) {
 
 			if (mir->mir_ref_cnt && !mir->mir_inrservice &&
-					(queue_cleaned == FALSE)) {
+			    (queue_cleaned == FALSE)) {
 				/*
 				 * call into SVC to clean the queue
 				 */
@@ -1298,7 +1298,7 @@
 	 */
 	if (mir->mir_closing) {
 		RPCLOG(16, "mir_svc_idle_start - closing: 0x%p\n",
-			(void *)q);
+		    (void *)q);
 
 		/*
 		 * We will call mir_svc_idle_start() whenever MIR_SVC_QUIESCED()
@@ -1311,7 +1311,7 @@
 
 	} else {
 		RPCLOG(16, "mir_svc_idle_start - reset %s timer\n",
-			mir->mir_ordrel_pending ? "ordrel" : "normal");
+		    mir->mir_ordrel_pending ? "ordrel" : "normal");
 		/*
 		 * Normal condition, start the idle timer.  If an orderly
 		 * release has been sent, set the timeout to wait for the
@@ -1465,7 +1465,7 @@
 
 			stropts = (struct stroptions *)mp->b_rptr;
 			if ((stropts->so_flags & SO_HIWAT) &&
-				!(stropts->so_flags & SO_BAND)) {
+			    !(stropts->so_flags & SO_BAND)) {
 				(void) strqset(q, QHIWAT, 0, stropts->so_hiwat);
 			}
 		}
@@ -1571,7 +1571,7 @@
 						mblk_t *smp = head_mp;
 
 						while ((smp->b_cont != NULL) &&
-							(smp->b_cont != mp))
+						    (smp->b_cont != mp))
 							smp = smp->b_cont;
 						smp->b_cont = cont_mp;
 						/*
@@ -1683,9 +1683,9 @@
 				 * block.
 				 */
 				if (!(frag_header & MIR_LASTFRAG) ||
-					(frag_len -
-					(frag_header & ~MIR_LASTFRAG)) ||
-					!head_mp)
+				    (frag_len -
+				    (frag_header & ~MIR_LASTFRAG)) ||
+				    !head_mp)
 					goto same_mblk;
 
 				/*
@@ -1862,28 +1862,29 @@
 			 */
 
 			if (!mir->mir_hold_inbound) {
-			    if (mir->mir_krpc_cell) {
-				/*
-				 * If the reference count is 0
-				 * (not including this request),
-				 * then the stream is transitioning
-				 * from idle to non-idle.  In this case,
-				 * we cancel the idle timer.
-				 */
-				if (mir->mir_ref_cnt++ == 0)
-					stop_timer = B_TRUE;
-				if (mir_check_len(q,
-					(int32_t)msgdsize(mp), mp))
+				if (mir->mir_krpc_cell) {
+					/*
+					 * If the reference count is 0
+					 * (not including this request),
+					 * then the stream is transitioning
+					 * from idle to non-idle.  In this case,
+					 * we cancel the idle timer.
+					 */
+					if (mir->mir_ref_cnt++ == 0)
+						stop_timer = B_TRUE;
+					if (mir_check_len(q,
+					    (int32_t)msgdsize(mp), mp))
 						return;
-				svc_queuereq(q, head_mp); /* to KRPC */
-			    } else {
-				/*
-				 * Count # of times this happens. Should be
-				 * never, but experience shows otherwise.
-				 */
-				mir_krpc_cell_null++;
-				freemsg(head_mp);
-			    }
+					svc_queuereq(q, head_mp); /* to KRPC */
+				} else {
+					/*
+					 * Count # of times this happens. Should
+					 * be never, but experience shows
+					 * otherwise.
+					 */
+					mir_krpc_cell_null++;
+					freemsg(head_mp);
+				}
 
 			} else {
 				/*
@@ -1901,7 +1902,7 @@
 			break;
 		default:
 			RPCLOG(1, "mir_rput: unknown mir_type %d\n",
-				mir->mir_type);
+			    mir->mir_type);
 			freemsg(head_mp);
 			break;
 		}
@@ -1920,7 +1921,7 @@
 	 * getting excessively large, shut down the connection.
 	 */
 	if (head_mp != NULL && mir->mir_setup_complete &&
-		mir_check_len(q, frag_len, head_mp))
+	    mir_check_len(q, frag_len, head_mp))
 		return;
 
 	/* Save our local copies back in the mir structure. */
@@ -1968,8 +1969,8 @@
 	case RPC_CLIENT:
 		switch (type) {
 		case T_DISCON_IND:
-		    reason =
-			((struct T_discon_ind *)(mp->b_rptr))->DISCON_reason;
+			reason = ((struct T_discon_ind *)
+			    (mp->b_rptr))->DISCON_reason;
 		    /*FALLTHROUGH*/
 		case T_ORDREL_IND:
 			mutex_enter(&mir->mir_mutex);
@@ -2013,11 +2014,11 @@
 
 			terror = (struct T_error_ack *)mp->b_rptr;
 			RPCLOG(1, "mir_rput_proto T_ERROR_ACK for queue 0x%p",
-				(void *)q);
+			    (void *)q);
 			RPCLOG(1, " ERROR_prim: %s,",
-				rpc_tpiprim2name(terror->ERROR_prim));
+			    rpc_tpiprim2name(terror->ERROR_prim));
 			RPCLOG(1, " TLI_error: %s,",
-				rpc_tpierr2name(terror->TLI_error));
+			    rpc_tpierr2name(terror->TLI_error));
 			RPCLOG(1, " UNIX_error: %d\n", terror->UNIX_error);
 			if (terror->ERROR_prim == T_DISCON_REQ)  {
 				clnt_dispatch_notifyall(WR(q), type, reason);
@@ -2087,8 +2088,8 @@
 		case T_DISCON_IND:
 		case T_ORDREL_IND:
 			RPCLOG(16, "mir_rput_proto: got %s indication\n",
-				type == T_DISCON_IND ? "disconnect"
-				: "orderly release");
+			    type == T_DISCON_IND ? "disconnect"
+			    : "orderly release");
 
 			/*
 			 * For listen endpoint just pass
@@ -2116,8 +2117,8 @@
 			}
 
 			RPCLOG(16, "mir_rput_proto: not idle, so "
-				"disconnect/ord rel indication not passed "
-				"upstream on 0x%p\n", (void *)q);
+			    "disconnect/ord rel indication not passed "
+			    "upstream on 0x%p\n", (void *)q);
 
 			/*
 			 * Hold the indication until we get idle
@@ -2233,7 +2234,8 @@
 			return;
 		}
 		while (mp = getq(q)) {
-			if (mir->mir_krpc_cell) {
+			if (mir->mir_krpc_cell &&
+			    (mir->mir_svc_no_more_msgs == 0)) {
 				/*
 				 * If we were idle, turn off idle timer since
 				 * we aren't idle any more.
@@ -2241,15 +2243,16 @@
 				if (mir->mir_ref_cnt++ == 0)
 					stop_timer = B_TRUE;
 				if (mir_check_len(q,
-					(int32_t)msgdsize(mp), mp))
-						return;
+				    (int32_t)msgdsize(mp), mp))
+					return;
 				svc_queuereq(q, mp);
 			} else {
 				/*
 				 * Count # of times this happens. Should be
 				 * never, but experience shows otherwise.
 				 */
-				mir_krpc_cell_null++;
+				if (mir->mir_krpc_cell == NULL)
+					mir_krpc_cell_null++;
 				freemsg(mp);
 			}
 		}
@@ -2294,8 +2297,8 @@
 
 		if (cmp != NULL) {
 			RPCLOG(16, "mir_rsrv: line %d: sending a held "
-				"disconnect/ord rel indication upstream\n",
-				__LINE__);
+			    "disconnect/ord rel indication upstream\n",
+			    __LINE__);
 			putnext(q, cmp);
 		}
 
@@ -2330,7 +2333,7 @@
 
 		mir_svc_policy_fails++;
 		RPCLOG(16, "mir_svc_policy_notify: could not allocate event "
-			"%d\n", event);
+		    "%d\n", event);
 		return (ENOMEM);
 	}
 
@@ -2536,13 +2539,14 @@
 			clock_t tout;
 
 			tout = mir->mir_idle_timeout -
-				TICK_TO_MSEC(lbolt - mir->mir_use_timestamp);
+			    TICK_TO_MSEC(lbolt - mir->mir_use_timestamp);
 			if (tout < 0)
 				tout = 1000;
 #if 0
-printf("mir_timer[%d < %d + %d]: reset client timer to %d (ms)\n",
-TICK_TO_MSEC(lbolt), TICK_TO_MSEC(mir->mir_use_timestamp),
-mir->mir_idle_timeout, tout);
+			printf("mir_timer[%d < %d + %d]: reset client timer "
+			    "to %d (ms)\n", TICK_TO_MSEC(lbolt),
+			    TICK_TO_MSEC(mir->mir_use_timestamp),
+			    mir->mir_idle_timeout, tout);
 #endif
 			mir->mir_clntreq = 0;
 			mir_timer_start(wq, mir, tout);
@@ -2609,7 +2613,7 @@
 		return;
 	default:
 		RPCLOG(1, "mir_timer: unexpected mir_type %d\n",
-			mir->mir_type);
+		    mir->mir_type);
 		mutex_exit(&mir->mir_mutex);
 		return;
 	}
@@ -2639,7 +2643,7 @@
 	if (mir->mir_ordrel_pending == 1) {
 		freemsg(mp);
 		RPCLOG(16, "mir_wput wq 0x%p: got data after T_ORDREL_REQ\n",
-			(void *)q);
+		    (void *)q);
 		return;
 	}
 
@@ -2824,7 +2828,7 @@
 			flush_in_svc = TRUE;
 		}
 		if ((mp->b_wptr - rptr) < sizeof (uint32_t) ||
-				!IS_P2ALIGNED(rptr, sizeof (uint32_t)))
+		    !IS_P2ALIGNED(rptr, sizeof (uint32_t)))
 			break;
 
 		switch (((union T_primitives *)rptr)->type) {
@@ -2956,7 +2960,7 @@
 					    "wq 0x%p\n", (void *)q);
 					if (*mp->b_rptr & FLUSHBAND) {
 						flushband(q, *(mp->b_rptr + 1),
-							FLUSHDATA);
+						    FLUSHDATA);
 					} else {
 						flushq(q, FLUSHDATA);
 					}
@@ -3033,8 +3037,8 @@
 		 * T_ORDREL_REQ downstream.
 		 */
 		if (mir->mir_type != RPC_SERVER ||
-			    ((union T_primitives *)mp->b_rptr)->type !=
-			    T_ORDREL_REQ) {
+		    ((union T_primitives *)mp->b_rptr)->type !=
+		    T_ORDREL_REQ) {
 			mutex_exit(&mir->mir_mutex);
 			putnext(q, mp);
 			mutex_enter(&mir->mir_mutex);
@@ -3057,7 +3061,7 @@
 		 */
 		mir->mir_ordrel_pending = 1;
 		RPCLOG(16, "mir_wsrv: sending ordrel req on q 0x%p\n",
-								(void *)q);
+		    (void *)q);
 		/*
 		 * Send the orderly release downstream. If there are other
 		 * pending replies we won't be able to send them.  However,
@@ -3137,8 +3141,8 @@
 		mir_svc_idle_stop(WR(q), mir);
 		mutex_exit(&mir->mir_mutex);
 		RPCLOG(16, "mir_disconnect: telling "
-			"stream head listener to disconnect stream "
-			"(0x%p)\n", (void *) q);
+		    "stream head listener to disconnect stream "
+		    "(0x%p)\n", (void *) q);
 		(void) mir_svc_policy_notify(q, 2);
 		break;
 
@@ -3166,7 +3170,7 @@
 	 */
 
 	if ((frag_len <= 0) || (mir->mir_max_msg_sizep == NULL) ||
-		(frag_len <= *mir->mir_max_msg_sizep)) {
+	    (frag_len <= *mir->mir_max_msg_sizep)) {
 		return (0);
 	}
 
@@ -3175,12 +3179,12 @@
 	mir->mir_frag_len = -(int)sizeof (uint32_t);
 	if (mir->mir_type != RPC_SERVER || mir->mir_setup_complete) {
 		cmn_err(CE_NOTE,
-		"KRPC: record fragment from %s of size(%d) exceeds "
-		"maximum (%u). Disconnecting",
-		(mir->mir_type == RPC_CLIENT) ? "server" :
-		(mir->mir_type == RPC_SERVER) ? "client" :
-		"test tool",
-		frag_len, *mir->mir_max_msg_sizep);
+		    "KRPC: record fragment from %s of size(%d) exceeds "
+		    "maximum (%u). Disconnecting",
+		    (mir->mir_type == RPC_CLIENT) ? "server" :
+		    (mir->mir_type == RPC_SERVER) ? "client" :
+		    "test tool",
+		    frag_len, *mir->mir_max_msg_sizep);
 	}
 
 	mir_disconnect(q, mir);
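
Most of the rpcmod.c delta above is cstyle re-indentation; the substantive
change is in mir_rsrv(), which now hands queued messages to kRPC only while
mir_svc_no_more_msgs is clear, and bumps mir_krpc_cell_null only when the
krpc cell really is NULL, so messages flushed during teardown no longer
inflate that counter. Condensed from the mir_rsrv() hunk above (the elided
lines are unchanged in the diff):

	while (mp = getq(q)) {
		if (mir->mir_krpc_cell &&
		    (mir->mir_svc_no_more_msgs == 0)) {
			/* ... stop idle timer, mir_check_len() ... */
			svc_queuereq(q, mp);
		} else {
			/* Count only a genuinely missing krpc cell. */
			if (mir->mir_krpc_cell == NULL)
				mir_krpc_cell_null++;
			freemsg(mp);
		}
	}
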
--- a/usr/src/uts/common/rpc/svc.c	Fri Jul 27 08:24:13 2007 -0700
+++ b/usr/src/uts/common/rpc/svc.c	Fri Jul 27 08:36:40 2007 -0700
@@ -20,7 +20,7 @@
  */
 
 /*
- * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -112,14 +112,12 @@
  *   thread processes a request and sends a reply it returns to svc_run()
  *   and svc_run() calls svc_poll() to find new input.
  *
- *   There is an "inconsistent" but "safe" optimization in the
- *   svc_queuereq() code. The request is queued under the transport's
- *   request lock, while the `pending-requests' count is incremented
- *   independently under the pool request lock. Thus, a request can be picked
- *   up by a service thread before the counter is incremented. It may also
- *   happen that the service thread will win the race condition on the pool
- *   lock and it will decrement the count even before the interrupt thread
- *   increments it (so the count can be temporarily negative).
+ *   There is no longer an "inconsistent" but "safe" optimization in the
+ *   svc_queuereq() code. This "inconsistent" state was leading to
+ *   inconsistencies between the actual number of requests and the value
+ *   of p_reqs (the total number of requests). Because of this, hangs were
+ *   occurring in svc_poll() where p_reqs was greater than one and no
+ *   requests were found on the request queues.
  *
  * svc_poll().
  *   In order to avoid unnecessary locking, which causes performance
@@ -984,24 +982,6 @@
 }
 
 /*
- * Reset an overflow in the xprt-ready queue after
- * all the pending requests has been drained.
- * This switches svc_poll back to getting hints from the
- * xprt-ready queue.
- *
- * NOTICE: pool->p_qtop is protected by the the pool's request lock
- * and the caller (svc_poll()) must hold the lock.
- */
-static void
-svc_xprt_qreset(SVCPOOL *pool)
-{
-	ASSERT(MUTEX_HELD(&pool->p_req_lock));
-
-	pool->p_qend = pool->p_qtop;
-	pool->p_qoverflow = FALSE;
-}
-
-/*
  * Delete all the references to a transport handle that
  * is being destroyed from the xprt-ready queue.
  * Deleted pointers are replaced with NULLs.
@@ -1893,6 +1873,8 @@
 			if (xprt->xp_req_head) {
 				mutex_enter(&pool->p_req_lock);
 				pool->p_reqs--;
+				if (pool->p_reqs == 0)
+					pool->p_qoverflow = FALSE;
 				mutex_exit(&pool->p_req_lock);
 
 				return (xprt);
@@ -1939,6 +1921,8 @@
 
 					mutex_enter(&pool->p_req_lock);
 					pool->p_reqs--;
+					if (pool->p_reqs == 0)
+						pool->p_qoverflow = FALSE;
 					pool->p_walkers--;
 					mutex_exit(&pool->p_req_lock);
 
@@ -2010,6 +1994,8 @@
 
 					mutex_enter(&pool->p_req_lock);
 					pool->p_reqs--;
+					if (pool->p_reqs == 0)
+						pool->p_qoverflow = FALSE;
 					pool->p_walkers--;
 					mutex_exit(&pool->p_req_lock);
 
@@ -2025,14 +2011,8 @@
 			 * a lock first to avoid contention on a mutex.
 			 */
 			if (pool->p_reqs < pool->p_walkers) {
-				/*
-				 * Check again, now with the lock.
-				 * If all the pending requests have been
-				 * picked up than clear the overflow flag.
-				 */
+				/* Check again, now with the lock. */
 				mutex_enter(&pool->p_req_lock);
-				if (pool->p_reqs <= 0)
-					svc_xprt_qreset(pool);
 				if (pool->p_reqs < pool->p_walkers)
 					break;	/* goto sleep */
 				mutex_exit(&pool->p_req_lock);
@@ -2306,14 +2286,20 @@
 {
 	SVCMASTERXPRT *xprt = ((void **) q->q_ptr)[0];
 	mblk_t *mp;
+	SVCPOOL *pool;
 
 	/*
 	 * clean up the requests
 	 */
 	mutex_enter(&xprt->xp_req_lock);
+	pool = xprt->xp_pool;
 	while ((mp = xprt->xp_req_head) != NULL) {
+		/* remove the request from the list and decrement p_reqs */
 		xprt->xp_req_head = mp->b_next;
+		mutex_enter(&pool->p_req_lock);
 		mp->b_next = (mblk_t *)0;
+		pool->p_reqs--;
+		mutex_exit(&pool->p_req_lock);
 		(*RELE_PROC(xprt)) (xprt->xp_wq, mp);
 	}
 	mutex_exit(&xprt->xp_req_lock);
@@ -2411,27 +2397,27 @@
 
 	/*
 	 * Step 1.
-	 * Grab the transport's request lock and put
+	 * Grab the transport's request lock and the
+	 * pool's request lock so that when we put
 	 * the request at the tail of the transport's
-	 * request queue.
+	 * request queue, possibly put the request on
+	 * the xprt ready queue and increment the
+	 * pending request count it looks atomic.
 	 */
 	mutex_enter(&xprt->xp_req_lock);
+	mutex_enter(&pool->p_req_lock);
 	if (xprt->xp_req_head == NULL)
 		xprt->xp_req_head = mp;
 	else
 		xprt->xp_req_tail->b_next = mp;
 	xprt->xp_req_tail = mp;
 
-	mutex_exit(&xprt->xp_req_lock);
-
 	/*
 	 * Step 2.
-	 * Grab the pool request lock, insert a hint into
-	 * the xprt-ready queue, increment `pending-requests'
-	 * count for the pool, and wake up a thread sleeping
-	 * in svc_poll() if necessary.
+	 * Insert a hint into the xprt-ready queue, increment
+	 * `pending-requests' count for the pool, and wake up
+	 * a thread sleeping in svc_poll() if necessary.
 	 */
-	mutex_enter(&pool->p_req_lock);
 
 	/* Insert pointer to this transport into the xprt-ready queue */
 	svc_xprt_qput(pool, xprt);
@@ -2463,6 +2449,7 @@
 		cv_signal(&pool->p_req_cv);
 		mutex_exit(&pool->p_req_lock);
 	}
+	mutex_exit(&xprt->xp_req_lock);
 
 	/*
 	 * Step 3.
@@ -2476,7 +2463,7 @@
 	 * decision is not essential.
 	 */
 	if (pool->p_asleep == 0 && !pool->p_drowsy &&
-		pool->p_threads + pool->p_detached_threads < pool->p_maxthreads)
+	    pool->p_threads + pool->p_detached_threads < pool->p_maxthreads)
 		svc_creator_signal(pool);
 
 	TRACE_1(TR_FAC_KRPC, TR_SVC_QUEUEREQ_END,
@@ -2511,7 +2498,7 @@
 	/* Check pool counts if there is room for reservation */
 	mutex_enter(&pool->p_thread_lock);
 	if (pool->p_reserved_threads + pool->p_detached_threads >=
-		pool->p_maxthreads - pool->p_redline) {
+	    pool->p_maxthreads - pool->p_redline) {
 		mutex_exit(&pool->p_thread_lock);
 		return (0);
 	}
@@ -2616,6 +2603,7 @@
 	queue_t *q;
 	mblk_t *mp;
 	int i;
+	SVCPOOL *pool;
 
 	if (rdma_xprts.rtg_count == 0)
 		return;
@@ -2629,9 +2617,17 @@
 		svc_rdma_kstop(xprt);
 
 		mutex_enter(&xprt->xp_req_lock);
+		pool = xprt->xp_pool;
 		while ((mp = xprt->xp_req_head) != NULL) {
+			/*
+			 * remove the request from the list and
+			 * decrement p_reqs
+			 */
 			xprt->xp_req_head = mp->b_next;
+			mutex_enter(&pool->p_req_lock);
 			mp->b_next = (mblk_t *)0;
+			pool->p_reqs--;
+			mutex_exit(&pool->p_req_lock);
 			if (mp)
 				freemsg(mp);
 		}
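
On the svc.c side, svc_xprt_qreset() is deleted; instead, every path that
decrements p_reqs clears the xprt-ready overflow flag once the count drains
to zero, and the teardown paths (svc_queueclean() and the RDMA stop loop
above) now decrement p_reqs for each flushed request so the count can
actually reach zero. The recurring pattern, as it appears in the hunks above:

	mutex_enter(&pool->p_req_lock);
	pool->p_reqs--;
	if (pool->p_reqs == 0)
		pool->p_qoverflow = FALSE;
	mutex_exit(&pool->p_req_lock);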