changeset 6970:1f8e4fab9fdc

6607971 TCP fusion mishandles shutdown(SHUT_RD)
6622010 MSG_PEEK on a fused TCP endpoint does not show all outstanding data
6633302 tcp fusion flow control is asymmetric
6679750 panic: assertion failed: tcp->tcp_fuse_rcv_unread_cnt == 0
6691414 STR_WAKEUP() and STR_SENDSIG() can be combined in some cases
author ja97890
date Fri, 27 Jun 2008 08:24:48 -0700
parents 63af7f947e2d
children c5a8f17d5076
files usr/src/uts/common/inet/ip_impl.h usr/src/uts/common/inet/tcp/tcp.c usr/src/uts/common/inet/tcp/tcp_fusion.c usr/src/uts/common/inet/udp/udp.c
diffstat 4 files changed, 123 insertions(+), 50 deletions(-) [+]
line wrap: on
line diff
--- a/usr/src/uts/common/inet/ip_impl.h	Fri Jun 27 01:38:17 2008 -0700
+++ b/usr/src/uts/common/inet/ip_impl.h	Fri Jun 27 08:24:48 2008 -0700
@@ -450,9 +450,23 @@
 	mutex_exit(&stp->sd_lock);					\
 }
 
-#define	STR_SENDSIG(stp) {						\
+/*
+ * Combined wakeup and sendsig to avoid dropping and reacquiring the
+ * sd_lock. The list of messages waiting at the synchronous barrier is
+ * supplied in order to determine whether a wakeup needs to occur. We
+ * only send a wakeup to the application when necessary, i.e. during
+ * the first enqueue when the received messages list will be NULL.
+ */
+#define	STR_WAKEUP_SENDSIG(stp, rcv_list) {				\
 	int _events;							\
 	mutex_enter(&stp->sd_lock);					\
+	if (rcv_list == NULL) {						\
+		if (stp->sd_flag & RSLEEP) {				\
+			stp->sd_flag &= ~RSLEEP;			\
+			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);	\
+		}							\
+		stp->sd_wakeq |= RSLEEP;				\
+	}								\
 	if ((_events = stp->sd_sigflags & (S_INPUT | S_RDNORM)) != 0)	\
 		strsendsig(stp->sd_siglist, _events, 0, 0);		\
 	if (stp->sd_rput_opt & SR_POLLIN) {				\
--- a/usr/src/uts/common/inet/tcp/tcp.c	Fri Jun 27 01:38:17 2008 -0700
+++ b/usr/src/uts/common/inet/tcp/tcp.c	Fri Jun 27 08:24:48 2008 -0700
@@ -11766,6 +11766,12 @@
 	/* Can't be sodirect enabled */
 	ASSERT(SOD_NOT_ENABLED(tcp));
 
+	/* No need for the push timer now. */
+	if (tcp->tcp_push_tid != 0) {
+		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
+		tcp->tcp_push_tid = 0;
+	}
+
 	/*
 	 * Handle two cases here: we are currently fused or we were
 	 * previously fused and have some urgent data to be delivered
@@ -11823,11 +11829,6 @@
 		}
 		tcp->tcp_rwnd = q->q_hiwat;
 	}
-	/* No need for the push timer now. */
-	if (tcp->tcp_push_tid != 0) {
-		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
-		tcp->tcp_push_tid = 0;
-	}
 	return (ret);
 }
 
@@ -18435,20 +18436,26 @@
 		/* There is some data, add them back to get the max. */
 		tcp->tcp_rq->q_hiwat = tcp->tcp_rwnd + tcp->tcp_rcv_cnt;
 	}
-
-	stropt->so_flags = SO_HIWAT;
-	stropt->so_hiwat = MAX(q->q_hiwat, tcps->tcps_sth_rcv_hiwat);
-
-	stropt->so_flags |= SO_MAXBLK;
-	stropt->so_maxblk = tcp_maxpsz_set(tcp, B_FALSE);
-
 	/*
 	 * This is the first time we run on the correct
 	 * queue after tcp_accept. So fix all the q parameters
 	 * here.
 	 */
-	/* Allocate room for SACK options if needed. */
-	stropt->so_flags |= SO_WROFF;
+	stropt->so_flags = SO_HIWAT | SO_MAXBLK | SO_WROFF;
+	stropt->so_maxblk = tcp_maxpsz_set(tcp, B_FALSE);
+
+	/*
+	 * Record the stream head's high water mark for this endpoint;
+	 * this is used for flow-control purposes.
+	 */
+	stropt->so_hiwat = tcp->tcp_fused ?
+	    tcp_fuse_set_rcv_hiwat(tcp, q->q_hiwat) :
+	    MAX(q->q_hiwat, tcps->tcps_sth_rcv_hiwat);
+
+	/*
+	 * Determine what write offset value to use depending on SACK and
+	 * whether the endpoint is fused or not.
+	 */
 	if (tcp->tcp_fused) {
 		ASSERT(tcp->tcp_loopback);
 		ASSERT(tcp->tcp_loopback_peer != NULL);
@@ -18461,11 +18468,6 @@
 		 */
 		stropt->so_wroff = 0;
 		/*
-		 * Record the stream head's high water mark for this endpoint;
-		 * this is used for flow-control purposes in tcp_fuse_output().
-		 */
-		stropt->so_hiwat = tcp_fuse_set_rcv_hiwat(tcp, q->q_hiwat);
-		/*
 		 * Update the peer's transmit parameters according to
 		 * our recently calculated high water mark value.
 		 */
@@ -18610,8 +18612,6 @@
 		tcp->tcp_hard_bound = B_TRUE;
 	}
 
-	tcp->tcp_detached = B_FALSE;
-
 	/* We can enable synchronous streams now */
 	if (tcp->tcp_fused) {
 		tcp_fuse_syncstr_enable_pair(tcp);
@@ -18923,8 +18923,11 @@
 		 * but we still have an extra refs on eager (apart from the
 		 * usual tcp references). The ref was placed in tcp_rput_data
 		 * before sending the conn_ind in tcp_send_conn_ind.
-		 * The ref will be dropped in tcp_accept_finish().
-		 */
+		 * The ref will be dropped in tcp_accept_finish(). As sockfs
+		 * has already established this tcp with its own stream,
+		 * it's OK to set tcp_detached to B_FALSE.
+		 */
+		econnp->conn_tcp->tcp_detached = B_FALSE;
 		squeue_enter_nodrain(econnp->conn_sqp, opt_mp,
 		    tcp_accept_finish, econnp, SQTAG_TCP_ACCEPT_FINISH_Q0);
 		return;
--- a/usr/src/uts/common/inet/tcp/tcp_fusion.c	Fri Jun 27 01:38:17 2008 -0700
+++ b/usr/src/uts/common/inet/tcp/tcp_fusion.c	Fri Jun 27 08:24:48 2008 -0700
@@ -123,9 +123,9 @@
 #define	TCP_FUSION_RCV_UNREAD_MIN	8
 uint_t tcp_fusion_rcv_unread_min = TCP_FUSION_RCV_UNREAD_MIN;
 
-static void	tcp_fuse_syncstr_enable(tcp_t *);
-static void	tcp_fuse_syncstr_disable(tcp_t *);
-static void	strrput_sig(queue_t *, boolean_t);
+static void		tcp_fuse_syncstr_enable(tcp_t *);
+static void		tcp_fuse_syncstr_disable(tcp_t *);
+static boolean_t	strrput_sig(queue_t *, boolean_t);
 
 /*
  * Return true if this connection needs some IP functionality
@@ -531,7 +531,6 @@
 	ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO ||
 	    DB_TYPE(mp) == M_PCPROTO);
 
-	max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater;
 
 	/* If this connection requires IP, unfuse and use regular path */
 	if (tcp_loopback_needs_ip(tcp, ns) ||
@@ -545,6 +544,7 @@
 		freemsg(mp);
 		return (B_TRUE);
 	}
+	max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater;
 
 	/*
 	 * Handle urgent data; we either send up SIGURG to the peer now
@@ -666,10 +666,9 @@
 	 */
 	if (peer_tcp->tcp_direct_sockfs && !urgent &&
 	    !TCP_IS_DETACHED(peer_tcp)) {
-		if (peer_tcp->tcp_rcv_list == NULL)
-			STR_WAKEUP_SET(STREAM(peer_tcp->tcp_rq));
 		/* Update poll events and send SIGPOLL/SIGIO if necessary */
-		STR_SENDSIG(STREAM(peer_tcp->tcp_rq));
+		STR_WAKEUP_SENDSIG(STREAM(peer_tcp->tcp_rq),
+		    peer_tcp->tcp_rcv_list);
 	}
 
 	/*
@@ -680,6 +679,14 @@
 
 	/* In case it wrapped around and also to keep it constant */
 	peer_tcp->tcp_rwnd += recv_size;
+	/*
+	 * We increase the peer's unread message count here whilst still
+	 * holding its tcp_non_sq_lock. This ensures that the increment
+	 * occurs in the same lock acquisition perimeter as the enqueue.
+	 * Depending on lock hierarchy, we can release these locks which
+	 * creates a window in which we can race with tcp_fuse_rrw()
+	 */
+	peer_tcp->tcp_fuse_rcv_unread_cnt++;
 
 	/*
 	 * Exercise flow-control when needed; we will get back-enabled
@@ -714,9 +721,9 @@
 	flow_stopped = tcp->tcp_flow_stopped;
 	if (((peer_tcp->tcp_direct_sockfs || TCP_IS_DETACHED(peer_tcp)) &&
 	    (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater ||
-	    ++peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) ||
-	    (!peer_tcp->tcp_direct_sockfs &&
-	    !TCP_IS_DETACHED(peer_tcp) && !canputnext(peer_tcp->tcp_rq))) {
+	    peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) ||
+	    (!peer_tcp->tcp_direct_sockfs && !TCP_IS_DETACHED(peer_tcp) &&
+	    !canputnext(peer_tcp->tcp_rq))) {
 		peer_data_queued = B_TRUE;
 	}
 
@@ -731,9 +738,26 @@
 	} else if (flow_stopped && !peer_data_queued &&
 	    (TCP_UNSENT_BYTES(tcp) <= tcp->tcp_xmit_lowater)) {
 		tcp_clrqfull(tcp);
+		TCP_STAT(tcps, tcp_fusion_backenabled);
 		flow_stopped = B_FALSE;
 	}
 	mutex_exit(&tcp->tcp_non_sq_lock);
+
+	/*
+	 * If we are in synchronous streams mode and the peer read queue is
+	 * not full then schedule a push timer if one is not scheduled
+	 * already. This is needed for applications which use MSG_PEEK to
+	 * determine the number of bytes available before issuing a 'real'
+	 * read. It also makes flow control more deterministic, particularly
+	 * for smaller message sizes.
+	 */
+	if (!urgent && peer_tcp->tcp_direct_sockfs &&
+	    peer_tcp->tcp_push_tid == 0 && !TCP_IS_DETACHED(peer_tcp) &&
+	    canputnext(peer_tcp->tcp_rq)) {
+		peer_tcp->tcp_push_tid = TCP_TIMER(peer_tcp, tcp_push_timer,
+		    MSEC_TO_TICK(tcps->tcps_push_timer_interval));
+	}
+	mutex_exit(&peer_tcp->tcp_non_sq_lock);
 	ipst->ips_loopback_packets++;
 	tcp->tcp_last_sent_len = send_size;
 
@@ -753,8 +777,6 @@
 	BUMP_LOCAL(tcp->tcp_obsegs);
 	BUMP_LOCAL(peer_tcp->tcp_ibsegs);
 
-	mutex_exit(&peer_tcp->tcp_non_sq_lock);
-
 	DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size);
 
 	if (!TCP_IS_DETACHED(peer_tcp)) {
@@ -805,6 +827,8 @@
 	uint_t cnt = 0;
 #endif
 	tcp_stack_t	*tcps = tcp->tcp_tcps;
+	tcp_t		*peer_tcp = tcp->tcp_loopback_peer;
+	boolean_t	sd_rd_eof = B_FALSE;
 
 	ASSERT(tcp->tcp_loopback);
 	ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg);
@@ -866,10 +890,14 @@
 	 * to avoid extraneous signal generation from strrput(), we set
 	 * STRGETINPROG flag at the stream head prior to the draining and
 	 * restore it afterwards.  This masks out signal generation only
-	 * for M_DATA messages and does not affect urgent data.
+	 * for M_DATA messages and does not affect urgent data. We only do
+	 * this if the STREOF flag is not set which can happen if the
+	 * application shuts down the read side of a stream. In this case
+	 * we simply free these messages to approximate the flushq behavior
+	 * which normally occurs when STREOF is on the stream head read queue.
 	 */
 	if (tcp->tcp_direct_sockfs)
-		strrput_sig(q, B_FALSE);
+		sd_rd_eof = strrput_sig(q, B_FALSE);
 
 	/* Drain the data */
 	while ((mp = tcp->tcp_rcv_list) != NULL) {
@@ -878,12 +906,16 @@
 #ifdef DEBUG
 		cnt += msgdsize(mp);
 #endif
-		putnext(q, mp);
-		TCP_STAT(tcps, tcp_fusion_putnext);
+		if (sd_rd_eof) {
+			freemsg(mp);
+		} else {
+			putnext(q, mp);
+			TCP_STAT(tcps, tcp_fusion_putnext);
+		}
 	}
 
-	if (tcp->tcp_direct_sockfs)
-		strrput_sig(q, B_TRUE);
+	if (tcp->tcp_direct_sockfs && !sd_rd_eof)
+		(void) strrput_sig(q, B_TRUE);
 
 	ASSERT(cnt == tcp->tcp_rcv_cnt);
 	tcp->tcp_rcv_last_head = NULL;
@@ -892,6 +924,12 @@
 	tcp->tcp_fuse_rcv_unread_cnt = 0;
 	tcp->tcp_rwnd = q->q_hiwat;
 
+	if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <=
+	    peer_tcp->tcp_xmit_lowater)) {
+		tcp_clrqfull(peer_tcp);
+		TCP_STAT(tcps, tcp_fusion_backenabled);
+	}
+
 	return (B_TRUE);
 }
 
@@ -1198,19 +1236,28 @@
 }
 
 /*
- * Allow or disallow signals to be generated by strrput().
+ * Used to enable/disable signal generation at the stream head. We already
+ * generated the signal(s) for these messages when they were enqueued on the
+ * receiver. We also check if STREOF is set here. If it is, we return true
+ * and let the caller decide what to do.
  */
-static void
+static boolean_t
 strrput_sig(queue_t *q, boolean_t on)
 {
 	struct stdata *stp = STREAM(q);
 
 	mutex_enter(&stp->sd_lock);
+	if (stp->sd_flag == STREOF) {
+		mutex_exit(&stp->sd_lock);
+		return (B_TRUE);
+	}
 	if (on)
 		stp->sd_flag &= ~STRGETINPROG;
 	else
 		stp->sd_flag |= STRGETINPROG;
 	mutex_exit(&stp->sd_lock);
+
+	return (B_FALSE);
 }
 
 /*
@@ -1235,6 +1282,18 @@
 	TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp);
 
 	/*
+	 * Cancel any pending push timers.
+	 */
+	if (tcp->tcp_push_tid != 0) {
+		(void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid);
+		tcp->tcp_push_tid = 0;
+	}
+	if (peer_tcp->tcp_push_tid != 0) {
+		(void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid);
+		peer_tcp->tcp_push_tid = 0;
+	}
+
+	/*
 	 * Drain any pending data; the detached check is needed because
 	 * we may be called as a result of a tcp_unfuse() triggered by
 	 * tcp_fuse_output().  Note that in case of a detached tcp, the
--- a/usr/src/uts/common/inet/udp/udp.c	Fri Jun 27 01:38:17 2008 -0700
+++ b/usr/src/uts/common/inet/udp/udp.c	Fri Jun 27 08:24:48 2008 -0700
@@ -8156,12 +8156,11 @@
 	 * datagram upstream and call STR_WAKEUP_SET() again when there
 	 * are still data remaining in our receive queue.
 	 */
-	if (udp->udp_rcv_list_head == NULL) {
-		STR_WAKEUP_SET(STREAM(q));
+	STR_WAKEUP_SENDSIG(STREAM(q), udp->udp_rcv_list_head);
+	if (udp->udp_rcv_list_head == NULL)
 		udp->udp_rcv_list_head = mp;
-	} else {
+	else
 		udp->udp_rcv_list_tail->b_next = mp;
-	}
 	udp->udp_rcv_list_tail = mp;
 	udp->udp_rcv_cnt += pkt_len;
 	udp->udp_rcv_msgcnt++;
@@ -8171,8 +8170,6 @@
 	    udp->udp_rcv_msgcnt >= udp->udp_rcv_hiwat)
 		udp->udp_drain_qfull = B_TRUE;
 
-	/* Update poll events and send SIGPOLL/SIGIO if necessary */
-	STR_SENDSIG(STREAM(q));
 	mutex_exit(&udp->udp_drain_lock);
 }