Mercurial > illumos > illumos-gate
changeset 6970:1f8e4fab9fdc
6607971 TCP fusion mishandles shutdown(SHUT_RD)
6622010 MSG_PEEK on a fused TCP endpoint does not show all outstanding data
6633302 tcp fusion flow control is asymmetric
6679750 panic: assertion failed: tcp->tcp_fuse_rcv_unread_cnt == 0
6691414 STR_WAKEUP() and STR_SENDSIG() can be combined in some cases
author | ja97890 |
---|---|
date | Fri, 27 Jun 2008 08:24:48 -0700 |
parents | 63af7f947e2d |
children | c5a8f17d5076 |
files | usr/src/uts/common/inet/ip_impl.h usr/src/uts/common/inet/tcp/tcp.c usr/src/uts/common/inet/tcp/tcp_fusion.c usr/src/uts/common/inet/udp/udp.c |
diffstat | 4 files changed, 123 insertions(+), 50 deletions(-) [+] |
line wrap: on
line diff
--- a/usr/src/uts/common/inet/ip_impl.h Fri Jun 27 01:38:17 2008 -0700 +++ b/usr/src/uts/common/inet/ip_impl.h Fri Jun 27 08:24:48 2008 -0700 @@ -450,9 +450,23 @@ mutex_exit(&stp->sd_lock); \ } -#define STR_SENDSIG(stp) { \ +/* + * Combined wakeup and sendsig to avoid dropping and reacquiring the + * sd_lock. The list of messages waiting at the synchronous barrier is + * supplied in order to determine whether a wakeup needs to occur. We + * only send a wakeup to the application when necessary, i.e. during + * the first enqueue when the received messages list will be NULL. + */ +#define STR_WAKEUP_SENDSIG(stp, rcv_list) { \ int _events; \ mutex_enter(&stp->sd_lock); \ + if (rcv_list == NULL) { \ + if (stp->sd_flag & RSLEEP) { \ + stp->sd_flag &= ~RSLEEP; \ + cv_broadcast(&_RD(stp->sd_wrq)->q_wait); \ + } \ + stp->sd_wakeq |= RSLEEP; \ + } \ if ((_events = stp->sd_sigflags & (S_INPUT | S_RDNORM)) != 0) \ strsendsig(stp->sd_siglist, _events, 0, 0); \ if (stp->sd_rput_opt & SR_POLLIN) { \
--- a/usr/src/uts/common/inet/tcp/tcp.c Fri Jun 27 01:38:17 2008 -0700 +++ b/usr/src/uts/common/inet/tcp/tcp.c Fri Jun 27 08:24:48 2008 -0700 @@ -11766,6 +11766,12 @@ /* Can't be sodirect enabled */ ASSERT(SOD_NOT_ENABLED(tcp)); + /* No need for the push timer now. */ + if (tcp->tcp_push_tid != 0) { + (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid); + tcp->tcp_push_tid = 0; + } + /* * Handle two cases here: we are currently fused or we were * previously fused and have some urgent data to be delivered @@ -11823,11 +11829,6 @@ } tcp->tcp_rwnd = q->q_hiwat; } - /* No need for the push timer now. */ - if (tcp->tcp_push_tid != 0) { - (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid); - tcp->tcp_push_tid = 0; - } return (ret); } @@ -18435,20 +18436,26 @@ /* There is some data, add them back to get the max. */ tcp->tcp_rq->q_hiwat = tcp->tcp_rwnd + tcp->tcp_rcv_cnt; } - - stropt->so_flags = SO_HIWAT; - stropt->so_hiwat = MAX(q->q_hiwat, tcps->tcps_sth_rcv_hiwat); - - stropt->so_flags |= SO_MAXBLK; - stropt->so_maxblk = tcp_maxpsz_set(tcp, B_FALSE); - /* * This is the first time we run on the correct * queue after tcp_accept. So fix all the q parameters * here. */ - /* Allocate room for SACK options if needed. */ - stropt->so_flags |= SO_WROFF; + stropt->so_flags = SO_HIWAT | SO_MAXBLK | SO_WROFF; + stropt->so_maxblk = tcp_maxpsz_set(tcp, B_FALSE); + + /* + * Record the stream head's high water mark for this endpoint; + * this is used for flow-control purposes. + */ + stropt->so_hiwat = tcp->tcp_fused ? + tcp_fuse_set_rcv_hiwat(tcp, q->q_hiwat) : + MAX(q->q_hiwat, tcps->tcps_sth_rcv_hiwat); + + /* + * Determine what write offset value to use depending on SACK and + * whether the endpoint is fused or not. + */ if (tcp->tcp_fused) { ASSERT(tcp->tcp_loopback); ASSERT(tcp->tcp_loopback_peer != NULL); @@ -18461,11 +18468,6 @@ */ stropt->so_wroff = 0; /* - * Record the stream head's high water mark for this endpoint; - * this is used for flow-control purposes in tcp_fuse_output(). - */ - stropt->so_hiwat = tcp_fuse_set_rcv_hiwat(tcp, q->q_hiwat); - /* * Update the peer's transmit parameters according to * our recently calculated high water mark value. */ @@ -18610,8 +18612,6 @@ tcp->tcp_hard_bound = B_TRUE; } - tcp->tcp_detached = B_FALSE; - /* We can enable synchronous streams now */ if (tcp->tcp_fused) { tcp_fuse_syncstr_enable_pair(tcp); @@ -18923,8 +18923,11 @@ * but we still have an extra refs on eager (apart from the * usual tcp references). The ref was placed in tcp_rput_data * before sending the conn_ind in tcp_send_conn_ind. - * The ref will be dropped in tcp_accept_finish(). - */ + * The ref will be dropped in tcp_accept_finish(). As sockfs + * has already established this tcp with it's own stream, + * it's OK to set tcp_detached to B_FALSE. + */ + econnp->conn_tcp->tcp_detached = B_FALSE; squeue_enter_nodrain(econnp->conn_sqp, opt_mp, tcp_accept_finish, econnp, SQTAG_TCP_ACCEPT_FINISH_Q0); return;
--- a/usr/src/uts/common/inet/tcp/tcp_fusion.c Fri Jun 27 01:38:17 2008 -0700 +++ b/usr/src/uts/common/inet/tcp/tcp_fusion.c Fri Jun 27 08:24:48 2008 -0700 @@ -123,9 +123,9 @@ #define TCP_FUSION_RCV_UNREAD_MIN 8 uint_t tcp_fusion_rcv_unread_min = TCP_FUSION_RCV_UNREAD_MIN; -static void tcp_fuse_syncstr_enable(tcp_t *); -static void tcp_fuse_syncstr_disable(tcp_t *); -static void strrput_sig(queue_t *, boolean_t); +static void tcp_fuse_syncstr_enable(tcp_t *); +static void tcp_fuse_syncstr_disable(tcp_t *); +static boolean_t strrput_sig(queue_t *, boolean_t); /* * Return true if this connection needs some IP functionality @@ -531,7 +531,6 @@ ASSERT(DB_TYPE(mp) == M_DATA || DB_TYPE(mp) == M_PROTO || DB_TYPE(mp) == M_PCPROTO); - max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater; /* If this connection requires IP, unfuse and use regular path */ if (tcp_loopback_needs_ip(tcp, ns) || @@ -545,6 +544,7 @@ freemsg(mp); return (B_TRUE); } + max_unread = peer_tcp->tcp_fuse_rcv_unread_hiwater; /* * Handle urgent data; we either send up SIGURG to the peer now @@ -666,10 +666,9 @@ */ if (peer_tcp->tcp_direct_sockfs && !urgent && !TCP_IS_DETACHED(peer_tcp)) { - if (peer_tcp->tcp_rcv_list == NULL) - STR_WAKEUP_SET(STREAM(peer_tcp->tcp_rq)); /* Update poll events and send SIGPOLL/SIGIO if necessary */ - STR_SENDSIG(STREAM(peer_tcp->tcp_rq)); + STR_WAKEUP_SENDSIG(STREAM(peer_tcp->tcp_rq), + peer_tcp->tcp_rcv_list); } /* @@ -680,6 +679,14 @@ /* In case it wrapped around and also to keep it constant */ peer_tcp->tcp_rwnd += recv_size; + /* + * We increase the peer's unread message count here whilst still + * holding it's tcp_non_sq_lock. This ensures that the increment + * occurs in the same lock acquisition perimeter as the enqueue. + * Depending on lock hierarchy, we can release these locks which + * creates a window in which we can race with tcp_fuse_rrw() + */ + peer_tcp->tcp_fuse_rcv_unread_cnt++; /* * Exercise flow-control when needed; we will get back-enabled @@ -714,9 +721,9 @@ flow_stopped = tcp->tcp_flow_stopped; if (((peer_tcp->tcp_direct_sockfs || TCP_IS_DETACHED(peer_tcp)) && (peer_tcp->tcp_rcv_cnt >= peer_tcp->tcp_fuse_rcv_hiwater || - ++peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) || - (!peer_tcp->tcp_direct_sockfs && - !TCP_IS_DETACHED(peer_tcp) && !canputnext(peer_tcp->tcp_rq))) { + peer_tcp->tcp_fuse_rcv_unread_cnt >= max_unread)) || + (!peer_tcp->tcp_direct_sockfs && !TCP_IS_DETACHED(peer_tcp) && + !canputnext(peer_tcp->tcp_rq))) { peer_data_queued = B_TRUE; } @@ -731,9 +738,26 @@ } else if (flow_stopped && !peer_data_queued && (TCP_UNSENT_BYTES(tcp) <= tcp->tcp_xmit_lowater)) { tcp_clrqfull(tcp); + TCP_STAT(tcps, tcp_fusion_backenabled); flow_stopped = B_FALSE; } mutex_exit(&tcp->tcp_non_sq_lock); + + /* + * If we are in synchronous streams mode and the peer read queue is + * not full then schedule a push timer if one is not scheduled + * already. This is needed for applications which use MSG_PEEK to + * determine the number of bytes available before issuing a 'real' + * read. It also makes flow control more deterministic, particularly + * for smaller message sizes. + */ + if (!urgent && peer_tcp->tcp_direct_sockfs && + peer_tcp->tcp_push_tid == 0 && !TCP_IS_DETACHED(peer_tcp) && + canputnext(peer_tcp->tcp_rq)) { + peer_tcp->tcp_push_tid = TCP_TIMER(peer_tcp, tcp_push_timer, + MSEC_TO_TICK(tcps->tcps_push_timer_interval)); + } + mutex_exit(&peer_tcp->tcp_non_sq_lock); ipst->ips_loopback_packets++; tcp->tcp_last_sent_len = send_size; @@ -753,8 +777,6 @@ BUMP_LOCAL(tcp->tcp_obsegs); BUMP_LOCAL(peer_tcp->tcp_ibsegs); - mutex_exit(&peer_tcp->tcp_non_sq_lock); - DTRACE_PROBE2(tcp__fuse__output, tcp_t *, tcp, uint_t, send_size); if (!TCP_IS_DETACHED(peer_tcp)) { @@ -805,6 +827,8 @@ uint_t cnt = 0; #endif tcp_stack_t *tcps = tcp->tcp_tcps; + tcp_t *peer_tcp = tcp->tcp_loopback_peer; + boolean_t sd_rd_eof = B_FALSE; ASSERT(tcp->tcp_loopback); ASSERT(tcp->tcp_fused || tcp->tcp_fused_sigurg); @@ -866,10 +890,14 @@ * to avoid extraneous signal generation from strrput(), we set * STRGETINPROG flag at the stream head prior to the draining and * restore it afterwards. This masks out signal generation only - * for M_DATA messages and does not affect urgent data. + * for M_DATA messages and does not affect urgent data. We only do + * this if the STREOF flag is not set which can happen if the + * application shuts down the read side of a stream. In this case + * we simply free these messages to approximate the flushq behavior + * which normally occurs when STREOF is on the stream head read queue. */ if (tcp->tcp_direct_sockfs) - strrput_sig(q, B_FALSE); + sd_rd_eof = strrput_sig(q, B_FALSE); /* Drain the data */ while ((mp = tcp->tcp_rcv_list) != NULL) { @@ -878,12 +906,16 @@ #ifdef DEBUG cnt += msgdsize(mp); #endif - putnext(q, mp); - TCP_STAT(tcps, tcp_fusion_putnext); + if (sd_rd_eof) { + freemsg(mp); + } else { + putnext(q, mp); + TCP_STAT(tcps, tcp_fusion_putnext); + } } - if (tcp->tcp_direct_sockfs) - strrput_sig(q, B_TRUE); + if (tcp->tcp_direct_sockfs && !sd_rd_eof) + (void) strrput_sig(q, B_TRUE); ASSERT(cnt == tcp->tcp_rcv_cnt); tcp->tcp_rcv_last_head = NULL; @@ -892,6 +924,12 @@ tcp->tcp_fuse_rcv_unread_cnt = 0; tcp->tcp_rwnd = q->q_hiwat; + if (peer_tcp->tcp_flow_stopped && (TCP_UNSENT_BYTES(peer_tcp) <= + peer_tcp->tcp_xmit_lowater)) { + tcp_clrqfull(peer_tcp); + TCP_STAT(tcps, tcp_fusion_backenabled); + } + return (B_TRUE); } @@ -1198,19 +1236,28 @@ } /* - * Allow or disallow signals to be generated by strrput(). + * Used to enable/disable signal generation at the stream head. We already + * generated the signal(s) for these messages when they were enqueued on the + * receiver. We also check if STREOF is set here. If it is, we return false + * and let the caller decide what to do. */ -static void +static boolean_t strrput_sig(queue_t *q, boolean_t on) { struct stdata *stp = STREAM(q); mutex_enter(&stp->sd_lock); + if (stp->sd_flag == STREOF) { + mutex_exit(&stp->sd_lock); + return (B_TRUE); + } if (on) stp->sd_flag &= ~STRGETINPROG; else stp->sd_flag |= STRGETINPROG; mutex_exit(&stp->sd_lock); + + return (B_FALSE); } /* @@ -1235,6 +1282,18 @@ TCP_FUSE_SYNCSTR_PLUG_DRAIN(peer_tcp); /* + * Cancel any pending push timers. + */ + if (tcp->tcp_push_tid != 0) { + (void) TCP_TIMER_CANCEL(tcp, tcp->tcp_push_tid); + tcp->tcp_push_tid = 0; + } + if (peer_tcp->tcp_push_tid != 0) { + (void) TCP_TIMER_CANCEL(peer_tcp, peer_tcp->tcp_push_tid); + peer_tcp->tcp_push_tid = 0; + } + + /* * Drain any pending data; the detached check is needed because * we may be called as a result of a tcp_unfuse() triggered by * tcp_fuse_output(). Note that in case of a detached tcp, the
--- a/usr/src/uts/common/inet/udp/udp.c Fri Jun 27 01:38:17 2008 -0700 +++ b/usr/src/uts/common/inet/udp/udp.c Fri Jun 27 08:24:48 2008 -0700 @@ -8156,12 +8156,11 @@ * datagram upstream and call STR_WAKEUP_SET() again when there * are still data remaining in our receive queue. */ - if (udp->udp_rcv_list_head == NULL) { - STR_WAKEUP_SET(STREAM(q)); + STR_WAKEUP_SENDSIG(STREAM(q), udp->udp_rcv_list_head); + if (udp->udp_rcv_list_head == NULL) udp->udp_rcv_list_head = mp; - } else { + else udp->udp_rcv_list_tail->b_next = mp; - } udp->udp_rcv_list_tail = mp; udp->udp_rcv_cnt += pkt_len; udp->udp_rcv_msgcnt++; @@ -8171,8 +8170,6 @@ udp->udp_rcv_msgcnt >= udp->udp_rcv_hiwat) udp->udp_drain_qfull = B_TRUE; - /* Update poll events and send SIGPOLL/SIGIO if necessary */ - STR_SENDSIG(STREAM(q)); mutex_exit(&udp->udp_drain_lock); }