changeset 14164:dceb17481b99

4045 zfs write throttle & i/o scheduler performance work
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Adam Leventhal <ahl@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Ned Bass <bass6@llnl.gov>
Reviewed by: Brendan Gregg <brendan.gregg@joyent.com>
Approved by: Robert Mustacchi <rm@joyent.com>
author Matthew Ahrens <mahrens@delphix.com>
date Mon, 26 Aug 2013 13:13:26 -0800
parents 712ede127bb4
children 89c6cfbfee9f
files usr/src/cmd/mdb/common/modules/zfs/zfs.c usr/src/cmd/ztest/ztest.c usr/src/lib/libzpool/common/llib-lzpool usr/src/lib/libzpool/common/sys/zfs_context.h usr/src/uts/common/fs/zfs/arc.c usr/src/uts/common/fs/zfs/dbuf.c usr/src/uts/common/fs/zfs/dmu.c usr/src/uts/common/fs/zfs/dmu_objset.c usr/src/uts/common/fs/zfs/dmu_tx.c usr/src/uts/common/fs/zfs/dmu_zfetch.c usr/src/uts/common/fs/zfs/dnode.c usr/src/uts/common/fs/zfs/dsl_dir.c usr/src/uts/common/fs/zfs/dsl_pool.c usr/src/uts/common/fs/zfs/dsl_scan.c usr/src/uts/common/fs/zfs/spa.c usr/src/uts/common/fs/zfs/spa_misc.c usr/src/uts/common/fs/zfs/sys/arc.h usr/src/uts/common/fs/zfs/sys/dbuf.h usr/src/uts/common/fs/zfs/sys/dmu.h usr/src/uts/common/fs/zfs/sys/dmu_tx.h usr/src/uts/common/fs/zfs/sys/dsl_dir.h usr/src/uts/common/fs/zfs/sys/dsl_pool.h usr/src/uts/common/fs/zfs/sys/sa_impl.h usr/src/uts/common/fs/zfs/sys/spa_impl.h usr/src/uts/common/fs/zfs/sys/txg.h usr/src/uts/common/fs/zfs/sys/txg_impl.h usr/src/uts/common/fs/zfs/sys/vdev_impl.h usr/src/uts/common/fs/zfs/sys/zfs_context.h usr/src/uts/common/fs/zfs/sys/zio.h usr/src/uts/common/fs/zfs/txg.c usr/src/uts/common/fs/zfs/vdev.c usr/src/uts/common/fs/zfs/vdev_cache.c usr/src/uts/common/fs/zfs/vdev_mirror.c usr/src/uts/common/fs/zfs/vdev_queue.c usr/src/uts/common/fs/zfs/vdev_raidz.c usr/src/uts/common/fs/zfs/zfs_vnops.c usr/src/uts/common/fs/zfs/zil.c usr/src/uts/common/fs/zfs/zio.c
diffstat 38 files changed, 1430 insertions(+), 756 deletions(-)
--- a/usr/src/cmd/mdb/common/modules/zfs/zfs.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/cmd/mdb/common/modules/zfs/zfs.c	Mon Aug 26 13:13:26 2013 -0800
@@ -274,6 +274,26 @@
 	 */
 	static const char *params[] = {
 		"arc_reduce_dnlc_percent",
+		"arc_lotsfree_percent",
+		"zfs_dirty_data_max",
+		"zfs_dirty_data_sync",
+		"zfs_delay_max_ns",
+		"zfs_delay_min_dirty_percent",
+		"zfs_delay_scale",
+		"zfs_vdev_max_active",
+		"zfs_vdev_sync_read_min_active",
+		"zfs_vdev_sync_read_max_active",
+		"zfs_vdev_sync_write_min_active",
+		"zfs_vdev_sync_write_max_active",
+		"zfs_vdev_async_read_min_active",
+		"zfs_vdev_async_read_max_active",
+		"zfs_vdev_async_write_min_active",
+		"zfs_vdev_async_write_max_active",
+		"zfs_vdev_scrub_min_active",
+		"zfs_vdev_scrub_max_active",
+		"zfs_vdev_async_write_active_min_dirty_percent",
+		"zfs_vdev_async_write_active_max_dirty_percent",
+		"spa_asize_inflation",
 		"zfs_arc_max",
 		"zfs_arc_min",
 		"arc_shrink_shift",
@@ -291,24 +311,14 @@
 		"spa_max_replication_override",
 		"spa_mode_global",
 		"zfs_flags",
-		"zfs_txg_synctime_ms",
 		"zfs_txg_timeout",
-		"zfs_write_limit_min",
-		"zfs_write_limit_max",
-		"zfs_write_limit_shift",
-		"zfs_write_limit_override",
-		"zfs_no_write_throttle",
 		"zfs_vdev_cache_max",
 		"zfs_vdev_cache_size",
 		"zfs_vdev_cache_bshift",
 		"vdev_mirror_shift",
-		"zfs_vdev_max_pending",
-		"zfs_vdev_min_pending",
 		"zfs_scrub_limit",
 		"zfs_no_scrub_io",
 		"zfs_no_scrub_prefetch",
-		"zfs_vdev_time_shift",
-		"zfs_vdev_ramp_rate",
 		"zfs_vdev_aggregation_limit",
 		"fzap_default_block_shift",
 		"zfs_immediate_write_sz",
@@ -1836,7 +1846,7 @@
 	else
 		ziop = (uintptr_t)zl.zl_child;
 
-	return (zio_print_cb(ziop, arg));
+	return (zio_print_cb(ziop, zpa));
 }
 
 /* ARGSUSED */
--- a/usr/src/cmd/ztest/ztest.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/cmd/ztest/ztest.c	Mon Aug 26 13:13:26 2013 -0800
@@ -184,7 +184,7 @@
 
 extern uint64_t metaslab_gang_bang;
 extern uint64_t metaslab_df_alloc_threshold;
-extern uint64_t zfs_deadman_synctime;
+extern uint64_t zfs_deadman_synctime_ms;
 
 static ztest_shared_opts_t *ztest_shared_opts;
 static ztest_shared_opts_t ztest_opts;
@@ -5325,10 +5325,10 @@
 	hrtime_t delta, total = 0;
 
 	for (;;) {
-		delta = (zs->zs_thread_stop - zs->zs_thread_start) /
-		    NANOSEC + zfs_deadman_synctime;
-
-		(void) poll(NULL, 0, (int)(1000 * delta));
+		delta = zs->zs_thread_stop - zs->zs_thread_start +
+		    MSEC2NSEC(zfs_deadman_synctime_ms);
+
+		(void) poll(NULL, 0, (int)NSEC2MSEC(delta));
 
 		/*
 		 * If the pool is suspended then fail immediately. Otherwise,
@@ -5339,12 +5339,12 @@
 		if (spa_suspended(spa)) {
 			fatal(0, "aborting test after %llu seconds because "
 			    "pool has transitioned to a suspended state.",
-			    zfs_deadman_synctime);
+			    zfs_deadman_synctime_ms / 1000);
 			return (NULL);
 		}
 		vdev_deadman(spa->spa_root_vdev);
 
-		total += zfs_deadman_synctime;
+		total += zfs_deadman_synctime_ms/1000;
 		(void) printf("ztest has been running for %lld seconds\n",
 		    total);
 	}
@@ -6073,7 +6073,7 @@
 	(void) setvbuf(stdout, NULL, _IOLBF, 0);
 
 	dprintf_setup(&argc, argv);
-	zfs_deadman_synctime = 300;
+	zfs_deadman_synctime_ms = 300000;
 
 	ztest_fd_rand = open("/dev/urandom", O_RDONLY);
 	ASSERT3S(ztest_fd_rand, >=, 0);
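The deadman change above moves zfs_deadman_synctime from seconds to a millisecond tunable and keeps the interval arithmetic in nanoseconds until the final poll(2) call. A small stand-alone sketch of that conversion (the 90-second thread window is a made-up value, and the time macros are re-declared locally for the sketch):

#include <stdint.h>
#include <stdio.h>

typedef long long hrtime_t;		/* local stand-in for the kernel type */

#define	NANOSEC		1000000000LL
#define	MSEC2NSEC(m)	((hrtime_t)(m) * (NANOSEC / 1000))
#define	NSEC2MSEC(n)	((n) / (NANOSEC / 1000))

int
main(void)
{
	hrtime_t thread_window = 90LL * NANOSEC;	/* hypothetical 90s run */
	uint64_t deadman_synctime_ms = 300000;		/* new ztest default */

	/* Same arithmetic as ztest_deadman_thread(): work in nanoseconds... */
	hrtime_t delta = thread_window + MSEC2NSEC(deadman_synctime_ms);

	/* ...then convert once for poll(2), which takes milliseconds. */
	(void) printf("poll timeout = %lld ms (%lld s)\n",
	    NSEC2MSEC(delta), NSEC2MSEC(delta) / 1000);
	return (0);
}
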
--- a/usr/src/lib/libzpool/common/llib-lzpool	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/lib/libzpool/common/llib-lzpool	Mon Aug 26 13:13:26 2013 -0800
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 /* LINTLIBRARY */
@@ -64,4 +64,4 @@
 extern uint64_t metaslab_gang_bang;
 extern uint64_t metaslab_df_alloc_threshold;
 extern boolean_t zfeature_checks_disable;
-extern uint64_t zfs_deadman_synctime;
+extern uint64_t zfs_deadman_synctime_ms;
--- a/usr/src/lib/libzpool/common/sys/zfs_context.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/lib/libzpool/common/sys/zfs_context.h	Mon Aug 26 13:13:26 2013 -0800
@@ -165,6 +165,8 @@
  */
 #define	curthread	((void *)(uintptr_t)thr_self())
 
+#define	kpreempt(x)	yield()
+
 typedef struct kthread kthread_t;
 
 #define	thread_create(stk, stksize, func, arg, len, pp, state, pri)	\
--- a/usr/src/uts/common/fs/zfs/arc.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/arc.c	Mon Aug 26 13:13:26 2013 -0800
@@ -127,6 +127,7 @@
 #include <sys/refcount.h>
 #include <sys/vdev.h>
 #include <sys/vdev_impl.h>
+#include <sys/dsl_pool.h>
 #ifdef _KERNEL
 #include <sys/vmsystm.h>
 #include <vm/anon.h>
@@ -147,10 +148,6 @@
 static kcondvar_t	arc_reclaim_thr_cv;	/* used to signal reclaim thr */
 static uint8_t		arc_thread_exit;
 
-extern int zfs_write_limit_shift;
-extern uint64_t zfs_write_limit_max;
-extern kmutex_t zfs_write_limit_lock;
-
 #define	ARC_REDUCE_DNLC_PERCENT	3
 uint_t arc_reduce_dnlc_percent = ARC_REDUCE_DNLC_PERCENT;
 
@@ -159,6 +156,12 @@
 	ARC_RECLAIM_CONS		/* Conservative reclaim strategy */
 } arc_reclaim_strategy_t;
 
+/*
+ * The number of iterations through arc_evict_*() before we
+ * drop & reacquire the lock.
+ */
+int arc_evict_iterations = 100;
+
 /* number of seconds before growing cache again */
 static int		arc_grow_retry = 60;
 
@@ -174,6 +177,11 @@
  */
 static int		arc_min_prefetch_lifespan;
 
+/*
+ * If this percent of memory is free, don't throttle.
+ */
+int arc_lotsfree_percent = 10;
+
 static int arc_dead;
 
 /*
@@ -469,6 +477,7 @@
 struct arc_write_callback {
 	void		*awcb_private;
 	arc_done_func_t	*awcb_ready;
+	arc_done_func_t	*awcb_physdone;
 	arc_done_func_t	*awcb_done;
 	arc_buf_t	*awcb_buf;
 };
@@ -1163,7 +1172,7 @@
 	uint64_t from_delta, to_delta;
 
 	ASSERT(MUTEX_HELD(hash_lock));
-	ASSERT(new_state != old_state);
+	ASSERT3P(new_state, !=, old_state);
 	ASSERT(refcnt == 0 || ab->b_datacnt > 0);
 	ASSERT(ab->b_datacnt == 0 || !GHOST_STATE(new_state));
 	ASSERT(ab->b_datacnt <= 1 || old_state != arc_anon);
@@ -1778,6 +1787,8 @@
 	kmutex_t *hash_lock;
 	boolean_t have_lock;
 	void *stolen = NULL;
+	arc_buf_hdr_t marker = { 0 };
+	int count = 0;
 
 	ASSERT(state == arc_mru || state == arc_mfu);
 
@@ -1801,6 +1812,33 @@
 		if (recycle && ab->b_size != bytes &&
 		    ab_prev && ab_prev->b_size == bytes)
 			continue;
+
+		/* ignore markers */
+		if (ab->b_spa == 0)
+			continue;
+
+		/*
+		 * It may take a long time to evict all the bufs requested.
+		 * To avoid blocking all arc activity, periodically drop
+		 * the arcs_mtx and give other threads a chance to run
+		 * before reacquiring the lock.
+		 *
+		 * If we are looking for a buffer to recycle, we are in
+		 * the hot code path, so don't sleep.
+		 */
+		if (!recycle && count++ > arc_evict_iterations) {
+			list_insert_after(list, ab, &marker);
+			mutex_exit(&evicted_state->arcs_mtx);
+			mutex_exit(&state->arcs_mtx);
+			kpreempt(KPREEMPT_SYNC);
+			mutex_enter(&state->arcs_mtx);
+			mutex_enter(&evicted_state->arcs_mtx);
+			ab_prev = list_prev(list, &marker);
+			list_remove(list, &marker);
+			count = 0;
+			continue;
+		}
+
 		hash_lock = HDR_LOCK(ab);
 		have_lock = MUTEX_HELD(hash_lock);
 		if (have_lock || mutex_tryenter(hash_lock)) {
@@ -1882,25 +1920,11 @@
 		ARCSTAT_INCR(arcstat_mutex_miss, missed);
 
 	/*
-	 * We have just evicted some data into the ghost state, make
-	 * sure we also adjust the ghost state size if necessary.
+	 * Note: we have just evicted some data into the ghost state,
+	 * potentially putting the ghost size over the desired size.  Rather
+	 * than evicting from the ghost list in this hot code path, leave
+	 * this chore to the arc_reclaim_thread().
 	 */
-	if (arc_no_grow &&
-	    arc_mru_ghost->arcs_size + arc_mfu_ghost->arcs_size > arc_c) {
-		int64_t mru_over = arc_anon->arcs_size + arc_mru->arcs_size +
-		    arc_mru_ghost->arcs_size - arc_c;
-
-		if (mru_over > 0 && arc_mru_ghost->arcs_lsize[type] > 0) {
-			int64_t todelete =
-			    MIN(arc_mru_ghost->arcs_lsize[type], mru_over);
-			arc_evict_ghost(arc_mru_ghost, NULL, todelete);
-		} else if (arc_mfu_ghost->arcs_lsize[type] > 0) {
-			int64_t todelete = MIN(arc_mfu_ghost->arcs_lsize[type],
-			    arc_mru_ghost->arcs_size +
-			    arc_mfu_ghost->arcs_size - arc_c);
-			arc_evict_ghost(arc_mfu_ghost, NULL, todelete);
-		}
-	}
 
 	return (stolen);
 }
@@ -1918,12 +1942,15 @@
 	kmutex_t *hash_lock;
 	uint64_t bytes_deleted = 0;
 	uint64_t bufs_skipped = 0;
+	int count = 0;
 
 	ASSERT(GHOST_STATE(state));
 top:
 	mutex_enter(&state->arcs_mtx);
 	for (ab = list_tail(list); ab; ab = ab_prev) {
 		ab_prev = list_prev(list, ab);
+		if (ab->b_type > ARC_BUFC_NUMTYPES)
+			panic("invalid ab=%p", (void *)ab);
 		if (spa && ab->b_spa != spa)
 			continue;
 
@@ -1935,6 +1962,23 @@
 		/* caller may be trying to modify this buffer, skip it */
 		if (MUTEX_HELD(hash_lock))
 			continue;
+
+		/*
+		 * It may take a long time to evict all the bufs requested.
+		 * To avoid blocking all arc activity, periodically drop
+		 * the arcs_mtx and give other threads a chance to run
+		 * before reacquiring the lock.
+		 */
+		if (count++ > arc_evict_iterations) {
+			list_insert_after(list, ab, &marker);
+			mutex_exit(&state->arcs_mtx);
+			kpreempt(KPREEMPT_SYNC);
+			mutex_enter(&state->arcs_mtx);
+			ab_prev = list_prev(list, &marker);
+			list_remove(list, &marker);
+			count = 0;
+			continue;
+		}
 		if (mutex_tryenter(hash_lock)) {
 			ASSERT(!HDR_IO_IN_PROGRESS(ab));
 			ASSERT(ab->b_buf == NULL);
@@ -1970,8 +2014,10 @@
 			mutex_enter(&state->arcs_mtx);
 			ab_prev = list_prev(list, &marker);
 			list_remove(list, &marker);
-		} else
+		} else {
 			bufs_skipped += 1;
+		}
+
 	}
 	mutex_exit(&state->arcs_mtx);
 
@@ -2825,7 +2871,7 @@
  */
 int
 arc_read(zio_t *pio, spa_t *spa, const blkptr_t *bp, arc_done_func_t *done,
-    void *private, int priority, int zio_flags, uint32_t *arc_flags,
+    void *private, zio_priority_t priority, int zio_flags, uint32_t *arc_flags,
     const zbookmark_t *zb)
 {
 	arc_buf_hdr_t *hdr;
@@ -3428,6 +3474,18 @@
 	hdr->b_flags |= ARC_IO_IN_PROGRESS;
 }
 
+/*
+ * The SPA calls this callback for each physical write that happens on behalf
+ * of a logical write.  See the comment in dbuf_write_physdone() for details.
+ */
+static void
+arc_write_physdone(zio_t *zio)
+{
+	arc_write_callback_t *cb = zio->io_private;
+	if (cb->awcb_physdone != NULL)
+		cb->awcb_physdone(zio, cb->awcb_buf, cb->awcb_private);
+}
+
 static void
 arc_write_done(zio_t *zio)
 {
@@ -3508,8 +3566,9 @@
 zio_t *
 arc_write(zio_t *pio, spa_t *spa, uint64_t txg,
     blkptr_t *bp, arc_buf_t *buf, boolean_t l2arc, boolean_t l2arc_compress,
-    const zio_prop_t *zp, arc_done_func_t *ready, arc_done_func_t *done,
-    void *private, int priority, int zio_flags, const zbookmark_t *zb)
+    const zio_prop_t *zp, arc_done_func_t *ready, arc_done_func_t *physdone,
+    arc_done_func_t *done, void *private, zio_priority_t priority,
+    int zio_flags, const zbookmark_t *zb)
 {
 	arc_buf_hdr_t *hdr = buf->b_hdr;
 	arc_write_callback_t *callback;
@@ -3526,18 +3585,20 @@
 		hdr->b_flags |= ARC_L2COMPRESS;
 	callback = kmem_zalloc(sizeof (arc_write_callback_t), KM_SLEEP);
 	callback->awcb_ready = ready;
+	callback->awcb_physdone = physdone;
 	callback->awcb_done = done;
 	callback->awcb_private = private;
 	callback->awcb_buf = buf;
 
 	zio = zio_write(pio, spa, txg, bp, buf->b_data, hdr->b_size, zp,
-	    arc_write_ready, arc_write_done, callback, priority, zio_flags, zb);
+	    arc_write_ready, arc_write_physdone, arc_write_done, callback,
+	    priority, zio_flags, zb);
 
 	return (zio);
 }
 
 static int
-arc_memory_throttle(uint64_t reserve, uint64_t inflight_data, uint64_t txg)
+arc_memory_throttle(uint64_t reserve, uint64_t txg)
 {
 #ifdef _KERNEL
 	uint64_t available_memory = ptob(freemem);
@@ -3548,7 +3609,8 @@
 	available_memory =
 	    MIN(available_memory, vmem_size(heap_arena, VMEM_FREE));
 #endif
-	if (available_memory >= zfs_write_limit_max)
+
+	if (freemem > physmem * arc_lotsfree_percent / 100)
 		return (0);
 
 	if (txg > last_txg) {
@@ -3572,20 +3634,6 @@
 		return (SET_ERROR(EAGAIN));
 	}
 	page_load = 0;
-
-	if (arc_size > arc_c_min) {
-		uint64_t evictable_memory =
-		    arc_mru->arcs_lsize[ARC_BUFC_DATA] +
-		    arc_mru->arcs_lsize[ARC_BUFC_METADATA] +
-		    arc_mfu->arcs_lsize[ARC_BUFC_DATA] +
-		    arc_mfu->arcs_lsize[ARC_BUFC_METADATA];
-		available_memory += MIN(evictable_memory, arc_size - arc_c_min);
-	}
-
-	if (inflight_data > available_memory / 4) {
-		ARCSTAT_INCR(arcstat_memory_throttle_count, 1);
-		return (SET_ERROR(ERESTART));
-	}
 #endif
 	return (0);
 }
@@ -3603,15 +3651,6 @@
 	int error;
 	uint64_t anon_size;
 
-#ifdef ZFS_DEBUG
-	/*
-	 * Once in a while, fail for no reason.  Everything should cope.
-	 */
-	if (spa_get_random(10000) == 0) {
-		dprintf("forcing random failure\n");
-		return (SET_ERROR(ERESTART));
-	}
-#endif
 	if (reserve > arc_c/4 && !arc_no_grow)
 		arc_c = MIN(arc_c_max, reserve * 4);
 	if (reserve > arc_c)
@@ -3629,7 +3668,8 @@
 	 * in order to compress/encrypt/etc the data.  We therefore need to
 	 * make sure that there is sufficient available memory for this.
 	 */
-	if (error = arc_memory_throttle(reserve, anon_size, txg))
+	error = arc_memory_throttle(reserve, txg);
+	if (error != 0)
 		return (error);
 
 	/*
@@ -3778,11 +3818,20 @@
 	arc_dead = FALSE;
 	arc_warm = B_FALSE;
 
-	if (zfs_write_limit_max == 0)
-		zfs_write_limit_max = ptob(physmem) >> zfs_write_limit_shift;
-	else
-		zfs_write_limit_shift = 0;
-	mutex_init(&zfs_write_limit_lock, NULL, MUTEX_DEFAULT, NULL);
+	/*
+	 * Calculate maximum amount of dirty data per pool.
+	 *
+	 * If it has been set by /etc/system, take that.
+	 * Otherwise, use a percentage of physical memory defined by
+	 * zfs_dirty_data_max_percent (default 10%) with a cap at
+	 * zfs_dirty_data_max_max (default 4GB).
+	 */
+	if (zfs_dirty_data_max == 0) {
+		zfs_dirty_data_max = physmem * PAGESIZE *
+		    zfs_dirty_data_max_percent / 100;
+		zfs_dirty_data_max = MIN(zfs_dirty_data_max,
+		    zfs_dirty_data_max_max);
+	}
 }
 
 void
@@ -3823,8 +3872,6 @@
 	mutex_destroy(&arc_mfu_ghost->arcs_mtx);
 	mutex_destroy(&arc_l2c_only->arcs_mtx);
 
-	mutex_destroy(&zfs_write_limit_lock);
-
 	buf_fini();
 
 	ASSERT(arc_loaned_bytes == 0);
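The new arc_init() code above sizes the per-pool dirty-data cap from physical memory rather than from the old write limit. A minimal user-space sketch of the same calculation, assuming a hypothetical 16 GiB machine and the zfs_dirty_data_max_percent / zfs_dirty_data_max_max defaults declared later in dsl_pool.c:

#include <stdint.h>
#include <stdio.h>

#define	MIN(a, b)	((a) < (b) ? (a) : (b))

int
main(void)
{
	/* Hypothetical machine: 16 GiB of physical memory. */
	uint64_t phys_bytes = 16ULL << 30;
	/* Defaults from dsl_pool.c in this changeset. */
	uint64_t dirty_data_max_max = 4ULL << 30;	/* zfs_dirty_data_max_max */
	int dirty_data_max_percent = 10;		/* zfs_dirty_data_max_percent */

	/* Same arithmetic as arc_init(): a percentage of memory, then a cap. */
	uint64_t dirty_data_max = phys_bytes * dirty_data_max_percent / 100;
	dirty_data_max = MIN(dirty_data_max, dirty_data_max_max);

	/* Prints 1717986918, i.e. ~1.6 GiB of dirty data allowed per pool. */
	(void) printf("zfs_dirty_data_max = %llu\n",
	    (unsigned long long)dirty_data_max);
	return (0);
}
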
--- a/usr/src/uts/common/fs/zfs/dbuf.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dbuf.c	Mon Aug 26 13:13:26 2013 -0800
@@ -842,7 +842,7 @@
 		atomic_inc_64(&zfs_free_range_recv_miss);
 	}
 
-	for (db = list_head(&dn->dn_dbufs); db; db = db_next) {
+	for (db = list_head(&dn->dn_dbufs); db != NULL; db = db_next) {
 		db_next = list_next(&dn->dn_dbufs, db);
 		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
 
@@ -1188,6 +1188,8 @@
 		    sizeof (dbuf_dirty_record_t),
 		    offsetof(dbuf_dirty_record_t, dr_dirty_node));
 	}
+	if (db->db_blkid != DMU_BONUS_BLKID && os->os_dsl_dataset != NULL)
+		dr->dr_accounted = db->db.db_size;
 	dr->dr_dbuf = db;
 	dr->dr_txg = tx->tx_txg;
 	dr->dr_next = *drp;
@@ -1271,7 +1273,10 @@
 			dbuf_rele(parent, FTAG);
 
 		mutex_enter(&db->db_mtx);
-		/*  possible race with dbuf_undirty() */
+		/*
+		 * Since we've dropped the mutex, it's possible that
+		 * dbuf_undirty() might have changed this out from under us.
+		 */
 		if (db->db_last_dirty == dr ||
 		    dn->dn_object == DMU_META_DNODE_OBJECT) {
 			mutex_enter(&di->dt.di.dr_mtx);
@@ -1341,7 +1346,11 @@
 
 	ASSERT(db->db.db_size != 0);
 
-	/* XXX would be nice to fix up dn_towrite_space[] */
+	/*
+	 * Any space we accounted for in dp_dirty_* will be cleaned up by
+	 * dsl_pool_sync().  This is relatively rare so the discrepancy
+	 * is not a big deal.
+	 */
 
 	*drp = dr->dr_next;
 
@@ -1521,7 +1530,7 @@
 
 /*
  * "Clear" the contents of this dbuf.  This will mark the dbuf
- * EVICTING and clear *most* of its references.  Unfortunetely,
+ * EVICTING and clear *most* of its references.  Unfortunately,
  * when we are not holding the dn_dbufs_mtx, we can't clear the
  * entry in the dn_dbufs list.  We have to wait until dbuf_destroy()
  * in this case.  For callers from the DMU we will usually see:
@@ -1708,7 +1717,7 @@
 		db->db.db_offset = 0;
 	} else {
 		int blocksize =
-		    db->db_level ? 1<<dn->dn_indblkshift :  dn->dn_datablksz;
+		    db->db_level ? 1 << dn->dn_indblkshift : dn->dn_datablksz;
 		db->db.db_size = blocksize;
 		db->db.db_offset = db->db_blkid * blocksize;
 	}
@@ -1817,7 +1826,7 @@
 }
 
 void
-dbuf_prefetch(dnode_t *dn, uint64_t blkid)
+dbuf_prefetch(dnode_t *dn, uint64_t blkid, zio_priority_t prio)
 {
 	dmu_buf_impl_t *db = NULL;
 	blkptr_t *bp = NULL;
@@ -1841,8 +1850,6 @@
 
 	if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp) == 0) {
 		if (bp && !BP_IS_HOLE(bp)) {
-			int priority = dn->dn_type == DMU_OT_DDT_ZAP ?
-			    ZIO_PRIORITY_DDT_PREFETCH : ZIO_PRIORITY_ASYNC_READ;
 			dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
 			uint32_t aflags = ARC_NOWAIT | ARC_PREFETCH;
 			zbookmark_t zb;
@@ -1851,7 +1858,7 @@
 			    dn->dn_object, 0, blkid);
 
 			(void) arc_read(NULL, dn->dn_objset->os_spa,
-			    bp, NULL, NULL, priority,
+			    bp, NULL, NULL, prio,
 			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
 			    &aflags, &zb);
 		}
@@ -2532,6 +2539,38 @@
 	mutex_exit(&db->db_mtx);
 }
 
+/*
+ * The SPA will call this callback several times for each zio - once
+ * for every physical child i/o (zio->io_phys_children times).  This
+ * allows the DMU to monitor the progress of each logical i/o.  For example,
+ * there may be 2 copies of an indirect block, or many fragments of a RAID-Z
+ * block.  There may be a long delay before all copies/fragments are completed,
+ * so this callback allows us to retire dirty space gradually, as the physical
+ * i/os complete.
+ */
+/* ARGSUSED */
+static void
+dbuf_write_physdone(zio_t *zio, arc_buf_t *buf, void *arg)
+{
+	dmu_buf_impl_t *db = arg;
+	objset_t *os = db->db_objset;
+	dsl_pool_t *dp = dmu_objset_pool(os);
+	dbuf_dirty_record_t *dr;
+	int delta = 0;
+
+	dr = db->db_data_pending;
+	ASSERT3U(dr->dr_txg, ==, zio->io_txg);
+
+	/*
+	 * The callback will be called io_phys_children times.  Retire one
+	 * portion of our dirty space each time we are called.  Any rounding
+	 * error will be cleaned up by dsl_pool_sync()'s call to
+	 * dsl_pool_undirty_space().
+	 */
+	delta = dr->dr_accounted / zio->io_phys_children;
+	dsl_pool_undirty_space(dp, delta, zio->io_txg);
+}
+
 /* ARGSUSED */
 static void
 dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
@@ -2626,6 +2665,7 @@
 	ASSERT(db->db_dirtycnt > 0);
 	db->db_dirtycnt -= 1;
 	db->db_data_pending = NULL;
+
 	dbuf_rele_and_unlock(db, (void *)(uintptr_t)txg);
 }
 
@@ -2744,8 +2784,8 @@
 		ASSERT(db->db_state != DB_NOFILL);
 		dr->dr_zio = zio_write(zio, os->os_spa, txg,
 		    db->db_blkptr, data->b_data, arc_buf_size(data), &zp,
-		    dbuf_write_override_ready, dbuf_write_override_done, dr,
-		    ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
+		    dbuf_write_override_ready, NULL, dbuf_write_override_done,
+		    dr, ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
 		mutex_enter(&db->db_mtx);
 		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
 		zio_write_override(dr->dr_zio, &dr->dt.dl.dr_overridden_by,
@@ -2756,7 +2796,7 @@
 		    zp.zp_checksum == ZIO_CHECKSUM_NOPARITY);
 		dr->dr_zio = zio_write(zio, os->os_spa, txg,
 		    db->db_blkptr, NULL, db->db.db_size, &zp,
-		    dbuf_write_nofill_ready, dbuf_write_nofill_done, db,
+		    dbuf_write_nofill_ready, NULL, dbuf_write_nofill_done, db,
 		    ZIO_PRIORITY_ASYNC_WRITE,
 		    ZIO_FLAG_MUSTSUCCEED | ZIO_FLAG_NODATA, &zb);
 	} else {
@@ -2764,7 +2804,7 @@
 		dr->dr_zio = arc_write(zio, os->os_spa, txg,
 		    db->db_blkptr, data, DBUF_IS_L2CACHEABLE(db),
 		    DBUF_IS_L2COMPRESSIBLE(db), &zp, dbuf_write_ready,
-		    dbuf_write_done, db, ZIO_PRIORITY_ASYNC_WRITE,
-		    ZIO_FLAG_MUSTSUCCEED, &zb);
+		    dbuf_write_physdone, dbuf_write_done, db,
+		    ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
 	}
 }
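dbuf_write_physdone() above retires dr_accounted in io_phys_children equal slices, so the integer division can leave a few bytes behind that dsl_pool_sync() later shores up. A tiny stand-alone sketch of that arithmetic (the 128K record and three physical children are made-up example values):

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint64_t dr_accounted = 128 * 1024;	/* hypothetical dirty record */
	int phys_children = 3;			/* e.g. copies/RAID-Z fragments */
	uint64_t undirtied = 0;

	/* One callback per physical child i/o, each retiring an equal slice. */
	for (int c = 0; c < phys_children; c++)
		undirtied += dr_accounted / phys_children;

	/*
	 * Prints "undirtied 131070 of 131072": the 2 leftover bytes are the
	 * rounding error that dsl_pool_sync() later clears via
	 * dsl_pool_undirty_space().
	 */
	(void) printf("undirtied %llu of %llu\n",
	    (unsigned long long)undirtied, (unsigned long long)dr_accounted);
	return (0);
}
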
--- a/usr/src/uts/common/fs/zfs/dmu.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dmu.c	Mon Aug 26 13:13:26 2013 -0800
@@ -371,13 +371,11 @@
 dmu_buf_hold_array_by_dnode(dnode_t *dn, uint64_t offset, uint64_t length,
     int read, void *tag, int *numbufsp, dmu_buf_t ***dbpp, uint32_t flags)
 {
-	dsl_pool_t *dp = NULL;
 	dmu_buf_t **dbp;
 	uint64_t blkid, nblks, i;
 	uint32_t dbuf_flags;
 	int err;
 	zio_t *zio;
-	hrtime_t start;
 
 	ASSERT(length <= DMU_MAX_ACCESS);
 
@@ -405,9 +403,6 @@
 	}
 	dbp = kmem_zalloc(sizeof (dmu_buf_t *) * nblks, KM_SLEEP);
 
-	if (dn->dn_objset->os_dsl_dataset)
-		dp = dn->dn_objset->os_dsl_dataset->ds_dir->dd_pool;
-	start = gethrtime();
 	zio = zio_root(dn->dn_objset->os_spa, NULL, NULL, ZIO_FLAG_CANFAIL);
 	blkid = dbuf_whichblock(dn, offset);
 	for (i = 0; i < nblks; i++) {
@@ -428,9 +423,6 @@
 
 	/* wait for async i/o */
 	err = zio_wait(zio);
-	/* track read overhead when we are in sync context */
-	if (dp && dsl_pool_sync_context(dp))
-		dp->dp_read_overhead += gethrtime() - start;
 	if (err) {
 		dmu_buf_rele_array(dbp, nblks, tag);
 		return (err);
@@ -512,12 +504,22 @@
 	kmem_free(dbp, sizeof (dmu_buf_t *) * numbufs);
 }
 
+/*
+ * Issue prefetch i/os for the given blocks.
+ *
+ * Note: The assumption is that we *know* these blocks will be needed
+ * almost immediately.  Therefore, the prefetch i/os will be issued at
+ * ZIO_PRIORITY_SYNC_READ.
+ *
+ * Note: indirect blocks and other metadata will be read synchronously,
+ * causing this function to block if they are not already cached.
+ */
 void
 dmu_prefetch(objset_t *os, uint64_t object, uint64_t offset, uint64_t len)
 {
 	dnode_t *dn;
 	uint64_t blkid;
-	int nblks, i, err;
+	int nblks, err;
 
 	if (zfs_prefetch_disable)
 		return;
@@ -530,7 +532,7 @@
 
 		rw_enter(&dn->dn_struct_rwlock, RW_READER);
 		blkid = dbuf_whichblock(dn, object * sizeof (dnode_phys_t));
-		dbuf_prefetch(dn, blkid);
+		dbuf_prefetch(dn, blkid, ZIO_PRIORITY_SYNC_READ);
 		rw_exit(&dn->dn_struct_rwlock);
 		return;
 	}
@@ -547,16 +549,16 @@
 	rw_enter(&dn->dn_struct_rwlock, RW_READER);
 	if (dn->dn_datablkshift) {
 		int blkshift = dn->dn_datablkshift;
-		nblks = (P2ROUNDUP(offset+len, 1<<blkshift) -
-		    P2ALIGN(offset, 1<<blkshift)) >> blkshift;
+		nblks = (P2ROUNDUP(offset + len, 1 << blkshift) -
+		    P2ALIGN(offset, 1 << blkshift)) >> blkshift;
 	} else {
 		nblks = (offset < dn->dn_datablksz);
 	}
 
 	if (nblks != 0) {
 		blkid = dbuf_whichblock(dn, offset);
-		for (i = 0; i < nblks; i++)
-			dbuf_prefetch(dn, blkid+i);
+		for (int i = 0; i < nblks; i++)
+			dbuf_prefetch(dn, blkid + i, ZIO_PRIORITY_SYNC_READ);
 	}
 
 	rw_exit(&dn->dn_struct_rwlock);
@@ -1356,7 +1358,7 @@
 
 	zio_nowait(zio_write(pio, os->os_spa, dmu_tx_get_txg(tx), zgd->zgd_bp,
 	    zgd->zgd_db->db_data, zgd->zgd_db->db_size, zp,
-	    dmu_sync_late_arrival_ready, dmu_sync_late_arrival_done, dsa,
+	    dmu_sync_late_arrival_ready, NULL, dmu_sync_late_arrival_done, dsa,
 	    ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, zb));
 
 	return (0);
@@ -1496,8 +1498,9 @@
 
 	zio_nowait(arc_write(pio, os->os_spa, txg,
 	    bp, dr->dt.dl.dr_data, DBUF_IS_L2CACHEABLE(db),
-	    DBUF_IS_L2COMPRESSIBLE(db), &zp, dmu_sync_ready, dmu_sync_done,
-	    dsa, ZIO_PRIORITY_SYNC_WRITE, ZIO_FLAG_CANFAIL, &zb));
+	    DBUF_IS_L2COMPRESSIBLE(db), &zp, dmu_sync_ready,
+	    NULL, dmu_sync_done, dsa, ZIO_PRIORITY_SYNC_WRITE,
+	    ZIO_FLAG_CANFAIL, &zb));
 
 	return (0);
 }
--- a/usr/src/uts/common/fs/zfs/dmu_objset.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dmu_objset.c	Mon Aug 26 13:13:26 2013 -0800
@@ -1028,7 +1028,7 @@
 	zio = arc_write(pio, os->os_spa, tx->tx_txg,
 	    os->os_rootbp, os->os_phys_buf, DMU_OS_IS_L2CACHEABLE(os),
 	    DMU_OS_IS_L2COMPRESSIBLE(os), &zp, dmu_objset_write_ready,
-	    dmu_objset_write_done, os, ZIO_PRIORITY_ASYNC_WRITE,
+	    NULL, dmu_objset_write_done, os, ZIO_PRIORITY_ASYNC_WRITE,
 	    ZIO_FLAG_MUSTSUCCEED, &zb);
 
 	/*
--- a/usr/src/uts/common/fs/zfs/dmu_tx.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dmu_tx.c	Mon Aug 26 13:13:26 2013 -0800
@@ -54,6 +54,7 @@
 	    offsetof(dmu_tx_hold_t, txh_node));
 	list_create(&tx->tx_callbacks, sizeof (dmu_tx_callback_t),
 	    offsetof(dmu_tx_callback_t, dcb_node));
+	tx->tx_start = gethrtime();
 #ifdef ZFS_DEBUG
 	refcount_create(&tx->tx_space_written);
 	refcount_create(&tx->tx_space_freed);
@@ -597,13 +598,13 @@
 	if (txh == NULL)
 		return;
 	dn = txh->txh_dnode;
+	dmu_tx_count_dnode(txh);
 
 	if (off >= (dn->dn_maxblkid+1) * dn->dn_datablksz)
 		return;
 	if (len == DMU_OBJECT_END)
 		len = (dn->dn_maxblkid+1) * dn->dn_datablksz - off;
 
-	dmu_tx_count_dnode(txh);
 
 	/*
 	 * For i/o error checking, we read the first and last level-0
@@ -911,6 +912,156 @@
 }
 #endif
 
+/*
+ * If we can't do 10 iops, something is wrong.  Let us go ahead
+ * and hit zfs_dirty_data_max.
+ */
+hrtime_t zfs_delay_max_ns = MSEC2NSEC(100);
+int zfs_delay_resolution_ns = 100 * 1000; /* 100 microseconds */
+
+/*
+ * We delay transactions when we've determined that the backend storage
+ * isn't able to accommodate the rate of incoming writes.
+ *
+ * If there is already a transaction waiting, we delay relative to when
+ * that transaction finishes waiting.  This way the calculated min_time
+ * is independent of the number of threads concurrently executing
+ * transactions.
+ *
+ * If we are the only waiter, wait relative to when the transaction
+ * started, rather than the current time.  This credits the transaction for
+ * "time already served", e.g. reading indirect blocks.
+ *
+ * The minimum time for a transaction to take is calculated as:
+ *     min_time = scale * (dirty - min) / (max - dirty)
+ *     min_time is then capped at zfs_delay_max_ns.
+ *
+ * The delay has two degrees of freedom that can be adjusted via tunables.
+ * The percentage of dirty data at which we start to delay is defined by
+ * zfs_delay_min_dirty_percent. This should typically be at or above
+ * zfs_vdev_async_write_active_max_dirty_percent so that we only start to
+ * delay after writing at full speed has failed to keep up with the incoming
+ * write rate. The scale of the curve is defined by zfs_delay_scale. Roughly
+ * speaking, this variable determines the amount of delay at the midpoint of
+ * the curve.
+ *
+ * delay
+ *  10ms +-------------------------------------------------------------*+
+ *       |                                                             *|
+ *   9ms +                                                             *+
+ *       |                                                             *|
+ *   8ms +                                                             *+
+ *       |                                                            * |
+ *   7ms +                                                            * +
+ *       |                                                            * |
+ *   6ms +                                                            * +
+ *       |                                                            * |
+ *   5ms +                                                           *  +
+ *       |                                                           *  |
+ *   4ms +                                                           *  +
+ *       |                                                           *  |
+ *   3ms +                                                          *   +
+ *       |                                                          *   |
+ *   2ms +                                              (midpoint) *    +
+ *       |                                                  |    **     |
+ *   1ms +                                                  v ***       +
+ *       |             zfs_delay_scale ---------->     ********         |
+ *     0 +-------------------------------------*********----------------+
+ *       0%                    <- zfs_dirty_data_max ->               100%
+ *
+ * Note that since the delay is added to the outstanding time remaining on the
+ * most recent transaction, the delay is effectively the inverse of IOPS.
+ * Here the midpoint of 500us translates to 2000 IOPS. The shape of the curve
+ * was chosen such that small changes in the amount of accumulated dirty data
+ * in the first 3/4 of the curve yield relatively small differences in the
+ * amount of delay.
+ *
+ * The effects can be easier to understand when the amount of delay is
+ * represented on a log scale:
+ *
+ * delay
+ * 100ms +-------------------------------------------------------------++
+ *       +                                                              +
+ *       |                                                              |
+ *       +                                                             *+
+ *  10ms +                                                             *+
+ *       +                                                           ** +
+ *       |                                              (midpoint)  **  |
+ *       +                                                  |     **    +
+ *   1ms +                                                  v ****      +
+ *       +             zfs_delay_scale ---------->        *****         +
+ *       |                                             ****             |
+ *       +                                          ****                +
+ * 100us +                                        **                    +
+ *       +                                       *                      +
+ *       |                                      *                       |
+ *       +                                     *                        +
+ *  10us +                                     *                        +
+ *       +                                                              +
+ *       |                                                              |
+ *       +                                                              +
+ *       +--------------------------------------------------------------+
+ *       0%                    <- zfs_dirty_data_max ->               100%
+ *
+ * Note here that only as the amount of dirty data approaches its limit does
+ * the delay start to increase rapidly. The goal of a properly tuned system
+ * should be to keep the amount of dirty data out of that range by first
+ * ensuring that the appropriate limits are set for the I/O scheduler to reach
+ * optimal throughput on the backend storage, and then by changing the value
+ * of zfs_delay_scale to increase the steepness of the curve.
+ */
+static void
+dmu_tx_delay(dmu_tx_t *tx, uint64_t dirty)
+{
+	dsl_pool_t *dp = tx->tx_pool;
+	uint64_t delay_min_bytes =
+	    zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;
+	hrtime_t wakeup, min_tx_time, now;
+
+	if (dirty <= delay_min_bytes)
+		return;
+
+	/*
+	 * The caller has already waited until we are under the max.
+	 * We make them pass us the amount of dirty data so we don't
+	 * have to handle the case of it being >= the max, which could
+	 * cause a divide-by-zero if it's == the max.
+	 */
+	ASSERT3U(dirty, <, zfs_dirty_data_max);
+
+	now = gethrtime();
+	min_tx_time = zfs_delay_scale *
+	    (dirty - delay_min_bytes) / (zfs_dirty_data_max - dirty);
+	if (now > tx->tx_start + min_tx_time)
+		return;
+
+	min_tx_time = MIN(min_tx_time, zfs_delay_max_ns);
+
+	DTRACE_PROBE3(delay__mintime, dmu_tx_t *, tx, uint64_t, dirty,
+	    uint64_t, min_tx_time);
+
+	mutex_enter(&dp->dp_lock);
+	wakeup = MAX(tx->tx_start + min_tx_time,
+	    dp->dp_last_wakeup + min_tx_time);
+	dp->dp_last_wakeup = wakeup;
+	mutex_exit(&dp->dp_lock);
+
+#ifdef _KERNEL
+	mutex_enter(&curthread->t_delay_lock);
+	while (cv_timedwait_hires(&curthread->t_delay_cv,
+	    &curthread->t_delay_lock, wakeup, zfs_delay_resolution_ns,
+	    CALLOUT_FLAG_ABSOLUTE | CALLOUT_FLAG_ROUNDUP) > 0)
+		continue;
+	mutex_exit(&curthread->t_delay_lock);
+#else
+	hrtime_t delta = wakeup - gethrtime();
+	struct timespec ts;
+	ts.tv_sec = delta / NANOSEC;
+	ts.tv_nsec = delta % NANOSEC;
+	(void) nanosleep(&ts, NULL);
+#endif
+}
+
 static int
 dmu_tx_try_assign(dmu_tx_t *tx, txg_how_t txg_how)
 {
@@ -941,6 +1092,12 @@
 		return (SET_ERROR(ERESTART));
 	}
 
+	if (!tx->tx_waited &&
+	    dsl_pool_need_dirty_delay(tx->tx_pool)) {
+		tx->tx_wait_dirty = B_TRUE;
+		return (SET_ERROR(ERESTART));
+	}
+
 	tx->tx_txg = txg_hold_open(tx->tx_pool, &tx->tx_txgh);
 	tx->tx_needassign_txh = NULL;
 
@@ -1065,6 +1222,10 @@
  *	blocking, returns immediately with ERESTART.  This should be used
  *	whenever you're holding locks.  On an ERESTART error, the caller
  *	should drop locks, do a dmu_tx_wait(tx), and try again.
+ *
+ * (3)  TXG_WAITED.  Like TXG_NOWAIT, but indicates that dmu_tx_wait()
+ *      has already been called on behalf of this operation (though
+ *      most likely on a different tx).
  */
 int
 dmu_tx_assign(dmu_tx_t *tx, txg_how_t txg_how)
@@ -1072,12 +1233,16 @@
 	int err;
 
 	ASSERT(tx->tx_txg == 0);
-	ASSERT(txg_how == TXG_WAIT || txg_how == TXG_NOWAIT);
+	ASSERT(txg_how == TXG_WAIT || txg_how == TXG_NOWAIT ||
+	    txg_how == TXG_WAITED);
 	ASSERT(!dsl_pool_sync_context(tx->tx_pool));
 
 	/* If we might wait, we must not hold the config lock. */
 	ASSERT(txg_how != TXG_WAIT || !dsl_pool_config_held(tx->tx_pool));
 
+	if (txg_how == TXG_WAITED)
+		tx->tx_waited = B_TRUE;
+
 	while ((err = dmu_tx_try_assign(tx, txg_how)) != 0) {
 		dmu_tx_unassign(tx);
 
@@ -1096,18 +1261,48 @@
 dmu_tx_wait(dmu_tx_t *tx)
 {
 	spa_t *spa = tx->tx_pool->dp_spa;
+	dsl_pool_t *dp = tx->tx_pool;
 
 	ASSERT(tx->tx_txg == 0);
 	ASSERT(!dsl_pool_config_held(tx->tx_pool));
 
-	/*
-	 * It's possible that the pool has become active after this thread
-	 * has tried to obtain a tx. If that's the case then his
-	 * tx_lasttried_txg would not have been assigned.
-	 */
-	if (spa_suspended(spa) || tx->tx_lasttried_txg == 0) {
-		txg_wait_synced(tx->tx_pool, spa_last_synced_txg(spa) + 1);
+	if (tx->tx_wait_dirty) {
+		/*
+		 * dmu_tx_try_assign() has determined that we need to wait
+		 * because we've consumed much or all of the dirty buffer
+		 * space.
+		 */
+		mutex_enter(&dp->dp_lock);
+		while (dp->dp_dirty_total >= zfs_dirty_data_max)
+			cv_wait(&dp->dp_spaceavail_cv, &dp->dp_lock);
+		uint64_t dirty = dp->dp_dirty_total;
+		mutex_exit(&dp->dp_lock);
+
+		dmu_tx_delay(tx, dirty);
+
+		tx->tx_wait_dirty = B_FALSE;
+
+		/*
+		 * Note: setting tx_waited only has effect if the caller
+		 * used TXG_WAIT.  Otherwise they are going to destroy
+		 * this tx and try again.  The common case, zfs_write(),
+		 * uses TXG_WAIT.
+		 */
+		tx->tx_waited = B_TRUE;
+	} else if (spa_suspended(spa) || tx->tx_lasttried_txg == 0) {
+		/*
+		 * If the pool is suspended we need to wait until it
+		 * is resumed.  Note that it's possible that the pool
+		 * has become active after this thread has tried to
+		 * obtain a tx.  If that's the case then tx_lasttried_txg
+		 * would not have been set.
+		 */
+		txg_wait_synced(dp, spa_last_synced_txg(spa) + 1);
 	} else if (tx->tx_needassign_txh) {
+		/*
+		 * A dnode is assigned to the quiescing txg.  Wait for its
+		 * transaction to complete.
+		 */
 		dnode_t *dn = tx->tx_needassign_txh->txh_dnode;
 
 		mutex_enter(&dn->dn_mtx);
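The curve documented above dmu_tx_delay() is easy to evaluate at a single point. The stand-alone sketch below plugs this changeset's default tunables into the same formula; the 4 GiB zfs_dirty_data_max and the 75%-dirty sample point are assumptions chosen only for illustration:

#include <stdint.h>
#include <stdio.h>

#define	MIN(a, b)	((a) < (b) ? (a) : (b))

int
main(void)
{
	/* Defaults from this changeset. */
	uint64_t dirty_data_max = 4ULL << 30;		  /* assume the 4 GiB cap */
	int delay_min_dirty_percent = 60;		  /* zfs_delay_min_dirty_percent */
	uint64_t delay_scale = 1000 * 1000 * 1000 / 2000; /* zfs_delay_scale */
	uint64_t delay_max_ns = 100ULL * 1000 * 1000;	  /* zfs_delay_max_ns */

	/* Sample point: 3 GiB dirty, i.e. 75% of the limit. */
	uint64_t dirty = 3ULL << 30;
	uint64_t delay_min_bytes = dirty_data_max * delay_min_dirty_percent / 100;

	/* min_time = scale * (dirty - min) / (max - dirty), capped at the max. */
	uint64_t min_tx_time = delay_scale *
	    (dirty - delay_min_bytes) / (dirty_data_max - dirty);
	min_tx_time = MIN(min_tx_time, delay_max_ns);

	/* Prints 300000 ns: a 300us floor per tx, roughly 3300 tx/sec. */
	(void) printf("min_tx_time = %llu ns\n", (unsigned long long)min_tx_time);
	return (0);
}
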
--- a/usr/src/uts/common/fs/zfs/dmu_zfetch.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dmu_zfetch.c	Mon Aug 26 13:13:26 2013 -0800
@@ -23,6 +23,10 @@
  * Use is subject to license terms.
  */
 
+/*
+ * Copyright (c) 2013 by Delphix. All rights reserved.
+ */
+
 #include <sys/zfs_context.h>
 #include <sys/dnode.h>
 #include <sys/dmu_objset.h>
@@ -287,7 +291,7 @@
 	fetchsz = dmu_zfetch_fetchsz(dn, blkid, nblks);
 
 	for (i = 0; i < fetchsz; i++) {
-		dbuf_prefetch(dn, blkid + i);
+		dbuf_prefetch(dn, blkid + i, ZIO_PRIORITY_ASYNC_READ);
 	}
 
 	return (fetchsz);
--- a/usr/src/uts/common/fs/zfs/dnode.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dnode.c	Mon Aug 26 13:13:26 2013 -0800
@@ -1788,23 +1788,22 @@
 }
 
 /*
- * Call when we think we're going to write/free space in open context.
- * Be conservative (ie. OK to write less than this or free more than
- * this, but don't write more or free less).
+ * Call when we think we're going to write/free space in open context to track
+ * the amount of memory in use by the currently open txg.
  */
 void
 dnode_willuse_space(dnode_t *dn, int64_t space, dmu_tx_t *tx)
 {
 	objset_t *os = dn->dn_objset;
 	dsl_dataset_t *ds = os->os_dsl_dataset;
-
-	if (space > 0)
-		space = spa_get_asize(os->os_spa, space);
+	int64_t aspace = spa_get_asize(os->os_spa, space);
 
-	if (ds)
-		dsl_dir_willuse_space(ds->ds_dir, space, tx);
+	if (ds != NULL) {
+		dsl_dir_willuse_space(ds->ds_dir, aspace, tx);
+		dsl_pool_dirty_space(dmu_tx_pool(tx), space, tx);
+	}
 
-	dmu_tx_willuse_space(tx, space);
+	dmu_tx_willuse_space(tx, aspace);
 }
 
 /*
--- a/usr/src/uts/common/fs/zfs/dsl_dir.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dsl_dir.c	Mon Aug 26 13:13:26 2013 -0800
@@ -584,7 +584,6 @@
 
 struct tempreserve {
 	list_node_t tr_node;
-	dsl_pool_t *tr_dp;
 	dsl_dir_t *tr_ds;
 	uint64_t tr_size;
 };
@@ -735,25 +734,24 @@
 		tr = kmem_zalloc(sizeof (struct tempreserve), KM_SLEEP);
 		tr->tr_size = lsize;
 		list_insert_tail(tr_list, tr);
-
-		err = dsl_pool_tempreserve_space(dd->dd_pool, asize, tx);
 	} else {
 		if (err == EAGAIN) {
+			/*
+			 * If arc_memory_throttle() detected that pageout
+			 * is running and we are low on memory, we delay new
+			 * non-pageout transactions to give pageout an
+			 * advantage.
+			 *
+			 * It is unfortunate to be delaying while the caller's
+			 * locks are held.
+			 */
 			txg_delay(dd->dd_pool, tx->tx_txg,
 			    MSEC2NSEC(10), MSEC2NSEC(10));
 			err = SET_ERROR(ERESTART);
 		}
-		dsl_pool_memory_pressure(dd->dd_pool);
 	}
 
 	if (err == 0) {
-		struct tempreserve *tr;
-
-		tr = kmem_zalloc(sizeof (struct tempreserve), KM_SLEEP);
-		tr->tr_dp = dd->dd_pool;
-		tr->tr_size = asize;
-		list_insert_tail(tr_list, tr);
-
 		err = dsl_dir_tempreserve_impl(dd, asize, fsize >= asize,
 		    FALSE, asize > usize, tr_list, tx, TRUE);
 	}
@@ -782,10 +780,8 @@
 	if (tr_cookie == NULL)
 		return;
 
-	while (tr = list_head(tr_list)) {
-		if (tr->tr_dp) {
-			dsl_pool_tempreserve_clear(tr->tr_dp, tr->tr_size, tx);
-		} else if (tr->tr_ds) {
+	while ((tr = list_head(tr_list)) != NULL) {
+		if (tr->tr_ds) {
 			mutex_enter(&tr->tr_ds->dd_lock);
 			ASSERT3U(tr->tr_ds->dd_tempreserved[txgidx], >=,
 			    tr->tr_size);
@@ -801,8 +797,14 @@
 	kmem_free(tr_list, sizeof (list_t));
 }
 
-static void
-dsl_dir_willuse_space_impl(dsl_dir_t *dd, int64_t space, dmu_tx_t *tx)
+/*
+ * This should be called from open context when we think we're going to write
+ * or free space, for example when dirtying data. Be conservative; it's okay
+ * to write less space or free more, but we don't want to write more or free
+ * less than the amount specified.
+ */
+void
+dsl_dir_willuse_space(dsl_dir_t *dd, int64_t space, dmu_tx_t *tx)
 {
 	int64_t parent_space;
 	uint64_t est_used;
@@ -820,19 +822,7 @@
 
 	/* XXX this is potentially expensive and unnecessary... */
 	if (parent_space && dd->dd_parent)
-		dsl_dir_willuse_space_impl(dd->dd_parent, parent_space, tx);
-}
-
-/*
- * Call in open context when we think we're going to write/free space,
- * eg. when dirtying data.  Be conservative (ie. OK to write less than
- * this or free more than this, but don't write more or free less).
- */
-void
-dsl_dir_willuse_space(dsl_dir_t *dd, int64_t space, dmu_tx_t *tx)
-{
-	dsl_pool_willuse_space(dd->dd_pool, space, tx);
-	dsl_dir_willuse_space_impl(dd, space, tx);
+		dsl_dir_willuse_space(dd->dd_parent, parent_space, tx);
 }
 
 /* call from syncing context when we actually write/free space for this dd */
--- a/usr/src/uts/common/fs/zfs/dsl_pool.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dsl_pool.c	Mon Aug 26 13:13:26 2013 -0800
@@ -46,18 +46,90 @@
 #include <sys/zil_impl.h>
 #include <sys/dsl_userhold.h>
 
-int zfs_no_write_throttle = 0;
-int zfs_write_limit_shift = 3;			/* 1/8th of physical memory */
-int zfs_txg_synctime_ms = 1000;		/* target millisecs to sync a txg */
+/*
+ * ZFS Write Throttle
+ * ------------------
+ *
+ * ZFS must limit the rate of incoming writes to the rate at which it is able
+ * to sync data modifications to the backend storage. Throttling by too much
+ * creates an artificial limit; throttling by too little can only be sustained
+ * for short periods and would lead to highly lumpy performance. On a per-pool
+ * basis, ZFS tracks the amount of modified (dirty) data. As operations change
+ * data, the amount of dirty data increases; as ZFS syncs out data, the amount
+ * of dirty data decreases. When the amount of dirty data exceeds a
+ * predetermined threshold further modifications are blocked until the amount
+ * of dirty data decreases (as data is synced out).
+ *
+ * The limit on dirty data is tunable, and should be adjusted according to
+ * both the IO capacity and available memory of the system. The larger the
+ * window, the more ZFS is able to aggregate and amortize metadata (and data)
+ * changes. However, memory is a limited resource, and allowing for more dirty
+ * data comes at the cost of keeping other useful data in memory (for example
+ * ZFS data cached by the ARC).
+ *
+ * Implementation
+ *
+ * As buffers are modified dsl_pool_dirty_space() increments both the per-
+ * txg (dp_dirty_pertxg[]) and poolwide (dp_dirty_total) accounting of
+ * dirty space used; dsl_pool_undirty_space() decrements those values as data
+ * is synced out from dsl_pool_sync(). While only the poolwide value is
+ * relevant, the per-txg value is useful for debugging. The tunable
+ * zfs_dirty_data_max determines the dirty space limit. Once that value is
+ * exceeded, new writes are halted until space frees up.
+ *
+ * The zfs_dirty_data_sync tunable dictates the threshold at which we
+ * ensure that there is a txg syncing (see the comment in txg.c for a full
+ * description of transaction group stages).
+ *
+ * The IO scheduler uses both the dirty space limit and current amount of
+ * dirty data as inputs. Those values affect the number of concurrent IOs ZFS
+ * issues. See the comment in vdev_queue.c for details of the IO scheduler.
+ *
+ * The delay is also calculated based on the amount of dirty data.  See the
+ * comment above dmu_tx_delay() for details.
+ */
 
-uint64_t zfs_write_limit_min = 32 << 20;	/* min write limit is 32MB */
-uint64_t zfs_write_limit_max = 0;		/* max data payload per txg */
-uint64_t zfs_write_limit_inflated = 0;
-uint64_t zfs_write_limit_override = 0;
+/*
+ * zfs_dirty_data_max will be set to zfs_dirty_data_max_percent% of all memory,
+ * capped at zfs_dirty_data_max_max.  It can also be overridden in /etc/system.
+ */
+uint64_t zfs_dirty_data_max;
+uint64_t zfs_dirty_data_max_max = 4ULL * 1024 * 1024 * 1024;
+int zfs_dirty_data_max_percent = 10;
+
+/*
+ * If there is at least this much dirty data, push out a txg.
+ */
+uint64_t zfs_dirty_data_sync = 64 * 1024 * 1024;
+
+/*
+ * Once there is this amount of dirty data, the dmu_tx_delay() will kick in
+ * and delay each transaction.
+ * This value should be >= zfs_vdev_async_write_active_max_dirty_percent.
+ */
+int zfs_delay_min_dirty_percent = 60;
 
-kmutex_t zfs_write_limit_lock;
+/*
+ * This controls how quickly the delay approaches infinity.
+ * Larger values cause it to delay more for a given amount of dirty data.
+ * Therefore larger values will cause there to be less dirty data for a
+ * given throughput.
+ *
+ * For the smoothest delay, this value should be about 1 billion divided
+ * by the maximum number of operations per second.  This will smoothly
+ * handle between 10x and 1/10th this number.
+ *
+ * Note: zfs_delay_scale * zfs_dirty_data_max must be < 2^64, due to the
+ * multiply in dmu_tx_delay().
+ */
+uint64_t zfs_delay_scale = 1000 * 1000 * 1000 / 2000;
 
-static pgcnt_t old_physmem = 0;
+
+/*
+ * XXX someday maybe turn these into #defines, and you have to tune it on a
+ * per-pool basis using zfs.conf.
+ */
+
 
 hrtime_t zfs_throttle_delay = MSEC2NSEC(10);
 hrtime_t zfs_throttle_resolution = MSEC2NSEC(10);
@@ -87,7 +159,6 @@
 	dp->dp_spa = spa;
 	dp->dp_meta_rootbp = *bp;
 	rrw_init(&dp->dp_config_rwlock, B_TRUE);
-	dp->dp_write_limit = zfs_write_limit_min;
 	txg_init(dp, txg);
 
 	txg_list_create(&dp->dp_dirty_datasets,
@@ -100,6 +171,7 @@
 	    offsetof(dsl_sync_task_t, dst_node));
 
 	mutex_init(&dp->dp_lock, NULL, MUTEX_DEFAULT, NULL);
+	cv_init(&dp->dp_spaceavail_cv, NULL, CV_DEFAULT, NULL);
 
 	dp->dp_vnrele_taskq = taskq_create("zfs_vn_rele_taskq", 1, minclsyspri,
 	    1, 4, 0);
@@ -214,9 +286,9 @@
 void
 dsl_pool_close(dsl_pool_t *dp)
 {
-	/* drop our references from dsl_pool_open() */
-
 	/*
+	 * Drop our references from dsl_pool_open().
+	 *
 	 * Since we held the origin_snap from "syncing" context (which
 	 * includes pool-opening context), it actually only got a "ref"
 	 * and not a hold, so just drop that here.
@@ -346,6 +418,34 @@
 	return (0);
 }
 
+static void
+dsl_pool_sync_mos(dsl_pool_t *dp, dmu_tx_t *tx)
+{
+	zio_t *zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
+	dmu_objset_sync(dp->dp_meta_objset, zio, tx);
+	VERIFY0(zio_wait(zio));
+	dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
+	spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
+}
+
+static void
+dsl_pool_dirty_delta(dsl_pool_t *dp, int64_t delta)
+{
+	ASSERT(MUTEX_HELD(&dp->dp_lock));
+
+	if (delta < 0)
+		ASSERT3U(-delta, <=, dp->dp_dirty_total);
+
+	dp->dp_dirty_total += delta;
+
+	/*
+	 * Note: we signal even when increasing dp_dirty_total.
+	 * This ensures forward progress -- each thread wakes the next waiter.
+	 */
+	if (dp->dp_dirty_total <= zfs_dirty_data_max)
+		cv_signal(&dp->dp_spaceavail_cv);
+}
+
 void
 dsl_pool_sync(dsl_pool_t *dp, uint64_t txg)
 {
@@ -354,29 +454,18 @@
 	dsl_dir_t *dd;
 	dsl_dataset_t *ds;
 	objset_t *mos = dp->dp_meta_objset;
-	hrtime_t start, write_time;
-	uint64_t data_written;
-	int err;
 	list_t synced_datasets;
 
 	list_create(&synced_datasets, sizeof (dsl_dataset_t),
 	    offsetof(dsl_dataset_t, ds_synced_link));
 
-	/*
-	 * We need to copy dp_space_towrite() before doing
-	 * dsl_sync_task_sync(), because
-	 * dsl_dataset_snapshot_reserve_space() will increase
-	 * dp_space_towrite but not actually write anything.
-	 */
-	data_written = dp->dp_space_towrite[txg & TXG_MASK];
-
 	tx = dmu_tx_create_assigned(dp, txg);
 
-	dp->dp_read_overhead = 0;
-	start = gethrtime();
-
+	/*
+	 * Write out all dirty blocks of dirty datasets.
+	 */
 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
-	while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
+	while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
 		/*
 		 * We must not sync any non-MOS datasets twice, because
 		 * we may have taken a snapshot of them.  However, we
@@ -386,20 +475,25 @@
 		list_insert_tail(&synced_datasets, ds);
 		dsl_dataset_sync(ds, zio, tx);
 	}
-	DTRACE_PROBE(pool_sync__1setup);
-	err = zio_wait(zio);
+	VERIFY0(zio_wait(zio));
 
-	write_time = gethrtime() - start;
-	ASSERT(err == 0);
-	DTRACE_PROBE(pool_sync__2rootzio);
+	/*
+	 * We have written all of the accounted dirty data, so our
+	 * dp_dirty_pertxg[] should now be zero.  However, some seldom-used
+	 * code paths do not adhere to this (e.g. dbuf_undirty(), also
+	 * rounding error in dbuf_write_physdone).
+	 * Shore up the accounting of any dirtied space now.
+	 */
+	dsl_pool_undirty_space(dp, dp->dp_dirty_pertxg[txg & TXG_MASK], txg);
 
 	/*
 	 * After the data blocks have been written (ensured by the zio_wait()
 	 * above), update the user/group space accounting.
 	 */
-	for (ds = list_head(&synced_datasets); ds;
-	    ds = list_next(&synced_datasets, ds))
+	for (ds = list_head(&synced_datasets); ds != NULL;
+	    ds = list_next(&synced_datasets, ds)) {
 		dmu_objset_do_userquota_updates(ds->ds_objset, tx);
+	}
 
 	/*
 	 * Sync the datasets again to push out the changes due to
@@ -409,12 +503,12 @@
 	 * about which blocks are part of the snapshot).
 	 */
 	zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
-	while (ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) {
+	while ((ds = txg_list_remove(&dp->dp_dirty_datasets, txg)) != NULL) {
 		ASSERT(list_link_active(&ds->ds_synced_link));
 		dmu_buf_rele(ds->ds_dbuf, ds);
 		dsl_dataset_sync(ds, zio, tx);
 	}
-	err = zio_wait(zio);
+	VERIFY0(zio_wait(zio));
 
 	/*
 	 * Now that the datasets have been completely synced, we can
@@ -423,18 +517,16 @@
 	 *  - move dead blocks from the pending deadlist to the on-disk deadlist
 	 *  - release hold from dsl_dataset_dirty()
 	 */
-	while (ds = list_remove_head(&synced_datasets)) {
+	while ((ds = list_remove_head(&synced_datasets)) != NULL) {
 		objset_t *os = ds->ds_objset;
 		bplist_iterate(&ds->ds_pending_deadlist,
 		    deadlist_enqueue_cb, &ds->ds_deadlist, tx);
 		ASSERT(!dmu_objset_is_dirty(os, txg));
 		dmu_buf_rele(ds->ds_dbuf, ds);
 	}
-
-	start = gethrtime();
-	while (dd = txg_list_remove(&dp->dp_dirty_dirs, txg))
+	while ((dd = txg_list_remove(&dp->dp_dirty_dirs, txg)) != NULL) {
 		dsl_dir_sync(dd, tx);
-	write_time += gethrtime() - start;
+	}
 
 	/*
 	 * The MOS's space is accounted for in the pool/$MOS
@@ -452,20 +544,10 @@
 		dp->dp_mos_uncompressed_delta = 0;
 	}
 
-	start = gethrtime();
 	if (list_head(&mos->os_dirty_dnodes[txg & TXG_MASK]) != NULL ||
 	    list_head(&mos->os_free_dnodes[txg & TXG_MASK]) != NULL) {
-		zio = zio_root(dp->dp_spa, NULL, NULL, ZIO_FLAG_MUSTSUCCEED);
-		dmu_objset_sync(mos, zio, tx);
-		err = zio_wait(zio);
-		ASSERT(err == 0);
-		dprintf_bp(&dp->dp_meta_rootbp, "meta objset rootbp is %s", "");
-		spa_set_rootblkptr(dp->dp_spa, &dp->dp_meta_rootbp);
+		dsl_pool_sync_mos(dp, tx);
 	}
-	write_time += gethrtime() - start;
-	DTRACE_PROBE2(pool_sync__4io, hrtime_t, write_time,
-	    hrtime_t, dp->dp_read_overhead);
-	write_time -= dp->dp_read_overhead;
 
 	/*
 	 * If we modify a dataset in the same txg that we want to destroy it,
@@ -476,72 +558,29 @@
 	 * The MOS data dirtied by the sync_tasks will be synced on the next
 	 * pass.
 	 */
-	DTRACE_PROBE(pool_sync__3task);
 	if (!txg_list_empty(&dp->dp_sync_tasks, txg)) {
 		dsl_sync_task_t *dst;
 		/*
 		 * No more sync tasks should have been added while we
 		 * were syncing.
 		 */
-		ASSERT(spa_sync_pass(dp->dp_spa) == 1);
-		while (dst = txg_list_remove(&dp->dp_sync_tasks, txg))
+		ASSERT3U(spa_sync_pass(dp->dp_spa), ==, 1);
+		while ((dst = txg_list_remove(&dp->dp_sync_tasks, txg)) != NULL)
 			dsl_sync_task_sync(dst, tx);
 	}
 
 	dmu_tx_commit(tx);
 
-	dp->dp_space_towrite[txg & TXG_MASK] = 0;
-	ASSERT(dp->dp_tempreserved[txg & TXG_MASK] == 0);
-
-	/*
-	 * If the write limit max has not been explicitly set, set it
-	 * to a fraction of available physical memory (default 1/8th).
-	 * Note that we must inflate the limit because the spa
-	 * inflates write sizes to account for data replication.
-	 * Check this each sync phase to catch changing memory size.
-	 */
-	if (physmem != old_physmem && zfs_write_limit_shift) {
-		mutex_enter(&zfs_write_limit_lock);
-		old_physmem = physmem;
-		zfs_write_limit_max = ptob(physmem) >> zfs_write_limit_shift;
-		zfs_write_limit_inflated = MAX(zfs_write_limit_min,
-		    spa_get_asize(dp->dp_spa, zfs_write_limit_max));
-		mutex_exit(&zfs_write_limit_lock);
-	}
-
-	/*
-	 * Attempt to keep the sync time consistent by adjusting the
-	 * amount of write traffic allowed into each transaction group.
-	 * Weight the throughput calculation towards the current value:
-	 * 	thru = 3/4 old_thru + 1/4 new_thru
-	 *
-	 * Note: write_time is in nanosecs while dp_throughput is expressed in
-	 * bytes per millisecond.
-	 */
-	ASSERT(zfs_write_limit_min > 0);
-	if (data_written > zfs_write_limit_min / 8 &&
-	    write_time > MSEC2NSEC(1)) {
-		uint64_t throughput = data_written / NSEC2MSEC(write_time);
-
-		if (dp->dp_throughput)
-			dp->dp_throughput = throughput / 4 +
-			    3 * dp->dp_throughput / 4;
-		else
-			dp->dp_throughput = throughput;
-		dp->dp_write_limit = MIN(zfs_write_limit_inflated,
-		    MAX(zfs_write_limit_min,
-		    dp->dp_throughput * zfs_txg_synctime_ms));
-	}
+	DTRACE_PROBE2(dsl_pool_sync__done, dsl_pool_t *dp, dp, uint64_t, txg);
 }
 
 void
 dsl_pool_sync_done(dsl_pool_t *dp, uint64_t txg)
 {
 	zilog_t *zilog;
-	dsl_dataset_t *ds;
 
 	while (zilog = txg_list_remove(&dp->dp_dirty_zilogs, txg)) {
-		ds = dmu_objset_ds(zilog->zl_os);
+		dsl_dataset_t *ds = dmu_objset_ds(zilog->zl_os);
 		zil_clean(zilog, txg);
 		ASSERT(!dmu_objset_is_dirty(zilog->zl_os, txg));
 		dmu_buf_rele(ds->ds_dbuf, zilog);
@@ -583,82 +622,48 @@
 	return (space - resv);
 }
 
-int
-dsl_pool_tempreserve_space(dsl_pool_t *dp, uint64_t space, dmu_tx_t *tx)
+boolean_t
+dsl_pool_need_dirty_delay(dsl_pool_t *dp)
 {
-	uint64_t reserved = 0;
-	uint64_t write_limit = (zfs_write_limit_override ?
-	    zfs_write_limit_override : dp->dp_write_limit);
-
-	if (zfs_no_write_throttle) {
-		atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK],
-		    space);
-		return (0);
-	}
+	uint64_t delay_min_bytes =
+	    zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;
+	boolean_t rv;
 
-	/*
-	 * Check to see if we have exceeded the maximum allowed IO for
-	 * this transaction group.  We can do this without locks since
-	 * a little slop here is ok.  Note that we do the reserved check
-	 * with only half the requested reserve: this is because the
-	 * reserve requests are worst-case, and we really don't want to
-	 * throttle based off of worst-case estimates.
-	 */
-	if (write_limit > 0) {
-		reserved = dp->dp_space_towrite[tx->tx_txg & TXG_MASK]
-		    + dp->dp_tempreserved[tx->tx_txg & TXG_MASK] / 2;
-
-		if (reserved && reserved > write_limit)
-			return (SET_ERROR(ERESTART));
-	}
-
-	atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], space);
-
-	/*
-	 * If this transaction group is over 7/8ths capacity, delay
-	 * the caller 1 clock tick.  This will slow down the "fill"
-	 * rate until the sync process can catch up with us.
-	 */
-	if (reserved && reserved > (write_limit - (write_limit >> 3))) {
-		txg_delay(dp, tx->tx_txg, zfs_throttle_delay,
-		    zfs_throttle_resolution);
-	}
-
-	return (0);
+	mutex_enter(&dp->dp_lock);
+	if (dp->dp_dirty_total > zfs_dirty_data_sync)
+		txg_kick(dp);
+	rv = (dp->dp_dirty_total > delay_min_bytes);
+	mutex_exit(&dp->dp_lock);
+	return (rv);
 }
 
 void
-dsl_pool_tempreserve_clear(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
+dsl_pool_dirty_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
 {
-	ASSERT(dp->dp_tempreserved[tx->tx_txg & TXG_MASK] >= space);
-	atomic_add_64(&dp->dp_tempreserved[tx->tx_txg & TXG_MASK], -space);
+	if (space > 0) {
+		mutex_enter(&dp->dp_lock);
+		dp->dp_dirty_pertxg[tx->tx_txg & TXG_MASK] += space;
+		dsl_pool_dirty_delta(dp, space);
+		mutex_exit(&dp->dp_lock);
+	}
 }
 
 void
-dsl_pool_memory_pressure(dsl_pool_t *dp)
+dsl_pool_undirty_space(dsl_pool_t *dp, int64_t space, uint64_t txg)
 {
-	uint64_t space_inuse = 0;
-	int i;
-
-	if (dp->dp_write_limit == zfs_write_limit_min)
+	ASSERT3S(space, >=, 0);
+	if (space == 0)
 		return;
-
-	for (i = 0; i < TXG_SIZE; i++) {
-		space_inuse += dp->dp_space_towrite[i];
-		space_inuse += dp->dp_tempreserved[i];
+	mutex_enter(&dp->dp_lock);
+	if (dp->dp_dirty_pertxg[txg & TXG_MASK] < space) {
+		/* XXX writing something we didn't dirty? */
+		space = dp->dp_dirty_pertxg[txg & TXG_MASK];
 	}
-	dp->dp_write_limit = MAX(zfs_write_limit_min,
-	    MIN(dp->dp_write_limit, space_inuse / 4));
-}
-
-void
-dsl_pool_willuse_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx)
-{
-	if (space > 0) {
-		mutex_enter(&dp->dp_lock);
-		dp->dp_space_towrite[tx->tx_txg & TXG_MASK] += space;
-		mutex_exit(&dp->dp_lock);
-	}
+	ASSERT3U(dp->dp_dirty_pertxg[txg & TXG_MASK], >=, space);
+	dp->dp_dirty_pertxg[txg & TXG_MASK] -= space;
+	ASSERT3U(dp->dp_dirty_total, >=, space);
+	dsl_pool_dirty_delta(dp, -space);
+	mutex_exit(&dp->dp_lock);
 }
 
 /* ARGSUSED */
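The two thresholds checked in dsl_pool_need_dirty_delay() above are both fractions of zfs_dirty_data_max. Below is a minimal user-space sketch of the same arithmetic, using illustrative byte values only (the real defaults are set in dsl_pool.c and are not reproduced in full in this excerpt):

	#include <stdint.h>
	#include <stdio.h>

	/* Illustrative values only; the in-kernel tunables live in dsl_pool.c. */
	static uint64_t zfs_dirty_data_max = 4ULL << 30;	/* assume ~4 GB */
	static uint64_t zfs_dirty_data_sync = 64ULL << 20;	/* assume 64 MB */
	static int zfs_delay_min_dirty_percent = 60;		/* assume 60% */

	int
	main(void)
	{
		uint64_t dirty = 3ULL << 30;	/* pretend ~3 GB is currently dirty */
		uint64_t delay_min_bytes =
		    zfs_dirty_data_max * zfs_delay_min_dirty_percent / 100;

		/* Mirrors the two checks in dsl_pool_need_dirty_delay(). */
		if (dirty > zfs_dirty_data_sync)
			printf("would txg_kick(): start syncing the open txg early\n");
		if (dirty > delay_min_bytes)
			printf("would ask the caller to delay (see dmu_tx_delay())\n");
		return (0);
	}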
--- a/usr/src/uts/common/fs/zfs/dsl_scan.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/dsl_scan.c	Mon Aug 26 13:13:26 2013 -0800
@@ -1627,7 +1627,6 @@
 	uint64_t phys_birth = BP_PHYSICAL_BIRTH(bp);
 	boolean_t needs_io;
 	int zio_flags = ZIO_FLAG_SCAN_THREAD | ZIO_FLAG_RAW | ZIO_FLAG_CANFAIL;
-	int zio_priority;
 	int scan_delay = 0;
 
 	if (phys_birth <= scn->scn_phys.scn_min_txg ||
@@ -1639,13 +1638,11 @@
 	ASSERT(DSL_SCAN_IS_SCRUB_RESILVER(scn));
 	if (scn->scn_phys.scn_func == POOL_SCAN_SCRUB) {
 		zio_flags |= ZIO_FLAG_SCRUB;
-		zio_priority = ZIO_PRIORITY_SCRUB;
 		needs_io = B_TRUE;
 		scan_delay = zfs_scrub_delay;
 	} else {
 		ASSERT3U(scn->scn_phys.scn_func, ==, POOL_SCAN_RESILVER);
 		zio_flags |= ZIO_FLAG_RESILVER;
-		zio_priority = ZIO_PRIORITY_RESILVER;
 		needs_io = B_FALSE;
 		scan_delay = zfs_resilver_delay;
 	}
@@ -1703,7 +1700,7 @@
 			delay(scan_delay);
 
 		zio_nowait(zio_read(NULL, spa, bp, data, size,
-		    dsl_scan_scrub_done, NULL, zio_priority,
+		    dsl_scan_scrub_done, NULL, ZIO_PRIORITY_SCRUB,
 		    zio_flags, zb));
 	}
 
--- a/usr/src/uts/common/fs/zfs/spa.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/spa.c	Mon Aug 26 13:13:26 2013 -0800
@@ -87,14 +87,12 @@
 
 typedef enum zti_modes {
 	ZTI_MODE_FIXED,			/* value is # of threads (min 1) */
-	ZTI_MODE_ONLINE_PERCENT,	/* value is % of online CPUs */
 	ZTI_MODE_BATCH,			/* cpu-intensive; value is ignored */
 	ZTI_MODE_NULL,			/* don't create a taskq */
 	ZTI_NMODES
 } zti_modes_t;
 
 #define	ZTI_P(n, q)	{ ZTI_MODE_FIXED, (n), (q) }
-#define	ZTI_PCT(n)	{ ZTI_MODE_ONLINE_PERCENT, (n), 1 }
 #define	ZTI_BATCH	{ ZTI_MODE_BATCH, 0, 1 }
 #define	ZTI_NULL	{ ZTI_MODE_NULL, 0, 0 }
 
@@ -146,7 +144,7 @@
     char **ereport);
 static void spa_vdev_resilver_done(spa_t *spa);
 
-uint_t		zio_taskq_batch_pct = 100;	/* 1 thread per cpu in pset */
+uint_t		zio_taskq_batch_pct = 75;	/* % of online CPUs per taskq */
 id_t		zio_taskq_psrset_bind = PS_NONE;
 boolean_t	zio_taskq_sysdc = B_TRUE;	/* use SDC scheduling class */
 uint_t		zio_taskq_basedc = 80;		/* base duty cycle */
@@ -842,32 +840,28 @@
 	tqs->stqs_count = count;
 	tqs->stqs_taskq = kmem_alloc(count * sizeof (taskq_t *), KM_SLEEP);
 
+	switch (mode) {
+	case ZTI_MODE_FIXED:
+		ASSERT3U(value, >=, 1);
+		value = MAX(value, 1);
+		break;
+
+	case ZTI_MODE_BATCH:
+		batch = B_TRUE;
+		flags |= TASKQ_THREADS_CPU_PCT;
+		value = zio_taskq_batch_pct;
+		break;
+
+	default:
+		panic("unrecognized mode for %s_%s taskq (%u:%u) in "
+		    "spa_activate()",
+		    zio_type_name[t], zio_taskq_types[q], mode, value);
+		break;
+	}
+
 	for (uint_t i = 0; i < count; i++) {
 		taskq_t *tq;
 
-		switch (mode) {
-		case ZTI_MODE_FIXED:
-			ASSERT3U(value, >=, 1);
-			value = MAX(value, 1);
-			break;
-
-		case ZTI_MODE_BATCH:
-			batch = B_TRUE;
-			flags |= TASKQ_THREADS_CPU_PCT;
-			value = zio_taskq_batch_pct;
-			break;
-
-		case ZTI_MODE_ONLINE_PERCENT:
-			flags |= TASKQ_THREADS_CPU_PCT;
-			break;
-
-		default:
-			panic("unrecognized mode for %s_%s taskq (%u:%u) in "
-			    "spa_activate()",
-			    zio_type_name[t], zio_taskq_types[q], mode, value);
-			break;
-		}
-
 		if (count > 1) {
 			(void) snprintf(name, sizeof (name), "%s_%s_%u",
 			    zio_type_name[t], zio_taskq_types[q], i);
@@ -883,7 +877,16 @@
 			tq = taskq_create_sysdc(name, value, 50, INT_MAX,
 			    spa->spa_proc, zio_taskq_basedc, flags);
 		} else {
-			tq = taskq_create_proc(name, value, maxclsyspri, 50,
+			pri_t pri = maxclsyspri;
+			/*
+			 * The write issue taskq can be extremely CPU
+			 * intensive.  Run it at slightly lower priority
+			 * than the other taskqs.
+			 */
+			if (t == ZIO_TYPE_WRITE && q == ZIO_TASKQ_ISSUE)
+				pri--;
+
+			tq = taskq_create_proc(name, value, pri, 50,
 			    INT_MAX, spa->spa_proc, flags);
 		}
 
@@ -5737,6 +5740,32 @@
 	return (0);
 }
 
+/*
+ * Note: this simple function is not inlined to make it easier to dtrace the
+ * amount of time spent syncing frees.
+ */
+static void
+spa_sync_frees(spa_t *spa, bplist_t *bpl, dmu_tx_t *tx)
+{
+	zio_t *zio = zio_root(spa, NULL, NULL, 0);
+	bplist_iterate(bpl, spa_free_sync_cb, zio, tx);
+	VERIFY(zio_wait(zio) == 0);
+}
+
+/*
+ * Note: this simple function is not inlined to make it easier to dtrace the
+ * amount of time spent syncing deferred frees.
+ */
+static void
+spa_sync_deferred_frees(spa_t *spa, dmu_tx_t *tx)
+{
+	zio_t *zio = zio_root(spa, NULL, NULL, 0);
+	VERIFY3U(bpobj_iterate(&spa->spa_deferred_bpobj,
+	    spa_free_sync_cb, zio, tx), ==, 0);
+	VERIFY0(zio_wait(zio));
+}
+
+
 static void
 spa_sync_nvlist(spa_t *spa, uint64_t obj, nvlist_t *nv, dmu_tx_t *tx)
 {
@@ -6063,7 +6092,6 @@
 {
 	dsl_pool_t *dp = spa->spa_dsl_pool;
 	objset_t *mos = spa->spa_meta_objset;
-	bpobj_t *defer_bpo = &spa->spa_deferred_bpobj;
 	bplist_t *free_bpl = &spa->spa_free_bplist[txg & TXG_MASK];
 	vdev_t *rvd = spa->spa_root_vdev;
 	vdev_t *vd;
@@ -6143,10 +6171,7 @@
 	    !txg_list_empty(&dp->dp_sync_tasks, txg) ||
 	    ((dsl_scan_active(dp->dp_scan) ||
 	    txg_sync_waiting(dp)) && !spa_shutting_down(spa))) {
-		zio_t *zio = zio_root(spa, NULL, NULL, 0);
-		VERIFY3U(bpobj_iterate(defer_bpo,
-		    spa_free_sync_cb, zio, tx), ==, 0);
-		VERIFY0(zio_wait(zio));
+		spa_sync_deferred_frees(spa, tx);
 	}
 
 	/*
@@ -6164,13 +6189,10 @@
 		dsl_pool_sync(dp, txg);
 
 		if (pass < zfs_sync_pass_deferred_free) {
-			zio_t *zio = zio_root(spa, NULL, NULL, 0);
-			bplist_iterate(free_bpl, spa_free_sync_cb,
-			    zio, tx);
-			VERIFY(zio_wait(zio) == 0);
+			spa_sync_frees(spa, free_bpl, tx);
 		} else {
 			bplist_iterate(free_bpl, bpobj_enqueue_cb,
-			    defer_bpo, tx);
+			    &spa->spa_deferred_bpobj, tx);
 		}
 
 		ddt_sync(spa, txg);
--- a/usr/src/uts/common/fs/zfs/spa_misc.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/spa_misc.c	Mon Aug 26 13:13:26 2013 -0800
@@ -250,18 +250,21 @@
  */
 int zfs_recover = 0;
 
-extern int zfs_txg_synctime_ms;
+/*
+ * Expiration time in milliseconds. This value has two meanings. First, it is
+ * used to determine when the spa_deadman() logic should fire. By default the
+ * spa_deadman() will fire if spa_sync() has not completed in 1000 seconds.
+ * Second, the value determines whether an I/O is considered "hung". Any I/O
+ * that has not completed within zfs_deadman_synctime_ms is considered "hung",
+ * resulting in a system panic.
+ */
+uint64_t zfs_deadman_synctime_ms = 1000000ULL;
 
 /*
- * Expiration time in units of zfs_txg_synctime_ms. This value has two
- * meanings. First it is used to determine when the spa_deadman logic
- * should fire. By default the spa_deadman will fire if spa_sync has
- * not completed in 1000 * zfs_txg_synctime_ms (i.e. 1000 seconds).
- * Secondly, the value determines if an I/O is considered "hung".
- * Any I/O that has not completed in zfs_deadman_synctime is considered
- * "hung" resulting in a system panic.
+ * Check time in milliseconds. This defines the frequency at which we check
+ * for hung I/O.
  */
-uint64_t zfs_deadman_synctime = 1000ULL;
+uint64_t zfs_deadman_checktime_ms = 5000ULL;
 
 /*
  * Override the zfs deadman behavior via /etc/system. By default the
@@ -269,6 +272,16 @@
  */
 int zfs_deadman_enabled = -1;
 
+/*
+ * The worst case is single-sector max-parity RAID-Z blocks, in which
+ * case the space requirement is exactly (VDEV_RAIDZ_MAXPARITY + 1)
+ * times the size; so just assume that.  Add to this the fact that
+ * we can have up to 3 DVAs per bp, and one more factor of 2 because
+ * the block may be dittoed with up to 3 DVAs by ddt_sync().  All together,
+ * the worst case is:
+ *     (VDEV_RAIDZ_MAXPARITY + 1) * SPA_DVAS_PER_BP * 2 == 24
+ */
+int spa_asize_inflation = 24;
 
 /*
  * ==========================================================================
@@ -499,16 +512,15 @@
 	hdlr.cyh_arg = spa;
 	hdlr.cyh_level = CY_LOW_LEVEL;
 
-	spa->spa_deadman_synctime = MSEC2NSEC(zfs_deadman_synctime *
-	    zfs_txg_synctime_ms);
+	spa->spa_deadman_synctime = MSEC2NSEC(zfs_deadman_synctime_ms);
 
 	/*
 	 * This determines how often we need to check for hung I/Os after
 	 * the cyclic has already fired. Since checking for hung I/Os is
 	 * an expensive operation we don't want to check too frequently.
-	 * Instead wait for 5 synctimes before checking again.
+	 * Instead wait zfs_deadman_checktime_ms (5 seconds) between checks.
 	 */
-	when.cyt_interval = MSEC2NSEC(5 * zfs_txg_synctime_ms);
+	when.cyt_interval = MSEC2NSEC(zfs_deadman_checktime_ms);
 	when.cyt_when = CY_INFINITY;
 	mutex_enter(&cpu_lock);
 	spa->spa_deadman_cycid = cyclic_add(&hdlr, &when);
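A quick unit check of the two new deadman tunables as they are consumed here, restating the defaults declared above (nothing new):

	spa_deadman_synctime = MSEC2NSEC(zfs_deadman_synctime_ms)
	                     = MSEC2NSEC(1000000)  -> spa_sync() considered hung after 1000 s
	cyt_interval         = MSEC2NSEC(zfs_deadman_checktime_ms)
	                     = MSEC2NSEC(5000)     -> hung-I/O scan re-runs every 5 s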
@@ -1499,14 +1511,7 @@
 uint64_t
 spa_get_asize(spa_t *spa, uint64_t lsize)
 {
-	/*
-	 * The worst case is single-sector max-parity RAID-Z blocks, in which
-	 * case the space requirement is exactly (VDEV_RAIDZ_MAXPARITY + 1)
-	 * times the size; so just assume that.  Add to this the fact that
-	 * we can have up to 3 DVAs per bp, and one more factor of 2 because
-	 * the block may be dittoed with up to 3 DVAs by ddt_sync().
-	 */
-	return (lsize * (VDEV_RAIDZ_MAXPARITY + 1) * SPA_DVAS_PER_BP * 2);
+	return (lsize * spa_asize_inflation);
 }
 
 uint64_t
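The inflation factor pulled out of spa_get_asize() works out as follows, taking the usual values VDEV_RAIDZ_MAXPARITY = 3 and SPA_DVAS_PER_BP = 3 (a worked check of the comment above, not new code):

	spa_asize_inflation = (VDEV_RAIDZ_MAXPARITY + 1) * SPA_DVAS_PER_BP * 2
	                    = (3 + 1) * 3 * 2 = 24

	spa_get_asize(spa, 128K) = 128K * 24 = 3 MB worst-case asize per 128K logical write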
--- a/usr/src/uts/common/fs/zfs/sys/arc.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/arc.h	Mon Aug 26 13:13:26 2013 -0800
@@ -104,12 +104,13 @@
 #endif
 
 int arc_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
-    arc_done_func_t *done, void *private, int priority, int flags,
+    arc_done_func_t *done, void *private, zio_priority_t priority, int flags,
     uint32_t *arc_flags, const zbookmark_t *zb);
 zio_t *arc_write(zio_t *pio, spa_t *spa, uint64_t txg,
     blkptr_t *bp, arc_buf_t *buf, boolean_t l2arc, boolean_t l2arc_compress,
-    const zio_prop_t *zp, arc_done_func_t *ready, arc_done_func_t *done,
-    void *private, int priority, int zio_flags, const zbookmark_t *zb);
+    const zio_prop_t *zp, arc_done_func_t *ready, arc_done_func_t *physdone,
+    arc_done_func_t *done, void *private, zio_priority_t priority,
+    int zio_flags, const zbookmark_t *zb);
 void arc_freed(spa_t *spa, const blkptr_t *bp);
 
 void arc_set_callback(arc_buf_t *buf, arc_evict_func_t *func, void *private);
--- a/usr/src/uts/common/fs/zfs/sys/dbuf.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/dbuf.h	Mon Aug 26 13:13:26 2013 -0800
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  */
 
@@ -112,6 +112,9 @@
 	/* pointer to parent dirty record */
 	struct dbuf_dirty_record *dr_parent;
 
+	/* How much space was charged via dsl_pool_dirty_space() for this? */
+	unsigned int dr_accounted;
+
 	union dirty_types {
 		struct dirty_indirect {
 
@@ -254,7 +257,7 @@
 int dbuf_hold_impl(struct dnode *dn, uint8_t level, uint64_t blkid, int create,
     void *tag, dmu_buf_impl_t **dbp);
 
-void dbuf_prefetch(struct dnode *dn, uint64_t blkid);
+void dbuf_prefetch(struct dnode *dn, uint64_t blkid, zio_priority_t prio);
 
 void dbuf_add_ref(dmu_buf_impl_t *db, void *tag);
 uint64_t dbuf_refcount(dmu_buf_impl_t *db);
--- a/usr/src/uts/common/fs/zfs/sys/dmu.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/dmu.h	Mon Aug 26 13:13:26 2013 -0800
@@ -220,6 +220,7 @@
 typedef enum txg_how {
 	TXG_WAIT = 1,
 	TXG_NOWAIT,
+	TXG_WAITED,
 } txg_how_t;
 
 void byteswap_uint64_array(void *buf, size_t size);
--- a/usr/src/uts/common/fs/zfs/sys/dmu_tx.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/dmu_tx.h	Mon Aug 26 13:13:26 2013 -0800
@@ -23,7 +23,7 @@
  * Use is subject to license terms.
  */
 /*
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #ifndef	_SYS_DMU_TX_H
@@ -60,8 +60,22 @@
 	txg_handle_t tx_txgh;
 	void *tx_tempreserve_cookie;
 	struct dmu_tx_hold *tx_needassign_txh;
-	list_t tx_callbacks; /* list of dmu_tx_callback_t on this dmu_tx */
-	uint8_t tx_anyobj;
+
+	/* list of dmu_tx_callback_t on this dmu_tx */
+	list_t tx_callbacks;
+
+	/* placeholder for syncing context, doesn't need specific holds */
+	boolean_t tx_anyobj;
+
+	/* has this transaction already been delayed? */
+	boolean_t tx_waited;
+
+	/* time this transaction was created */
+	hrtime_t tx_start;
+
+	/* need to wait for sufficient dirty space */
+	boolean_t tx_wait_dirty;
+
 	int tx_err;
 #ifdef ZFS_DEBUG
 	uint64_t tx_space_towrite;
--- a/usr/src/uts/common/fs/zfs/sys/dsl_dir.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_dir.h	Mon Aug 26 13:13:26 2013 -0800
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #ifndef	_SYS_DSL_DIR_H
--- a/usr/src/uts/common/fs/zfs/sys/dsl_pool.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_pool.h	Mon Aug 26 13:13:26 2013 -0800
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #ifndef	_SYS_DSL_POOL_H
@@ -49,6 +49,13 @@
 struct dmu_tx;
 struct dsl_scan;
 
+extern uint64_t zfs_dirty_data_max;
+extern uint64_t zfs_dirty_data_max_max;
+extern uint64_t zfs_dirty_data_sync;
+extern int zfs_dirty_data_max_percent;
+extern int zfs_delay_min_dirty_percent;
+extern uint64_t zfs_delay_scale;
+
 /* These macros are for indexing into the zfs_all_blkstats_t. */
 #define	DMU_OT_DEFERRED	DMU_OT_NONE
 #define	DMU_OT_OTHER	DMU_OT_NUMTYPES /* place holder for DMU_OT() types */
@@ -83,9 +90,6 @@
 
 	/* No lock needed - sync context only */
 	blkptr_t dp_meta_rootbp;
-	hrtime_t dp_read_overhead;
-	uint64_t dp_throughput; /* bytes per millisec */
-	uint64_t dp_write_limit;
 	uint64_t dp_tmp_userrefs_obj;
 	bpobj_t dp_free_bpobj;
 	uint64_t dp_bptree_obj;
@@ -95,12 +99,19 @@
 
 	/* Uses dp_lock */
 	kmutex_t dp_lock;
-	uint64_t dp_space_towrite[TXG_SIZE];
-	uint64_t dp_tempreserved[TXG_SIZE];
+	kcondvar_t dp_spaceavail_cv;
+	uint64_t dp_dirty_pertxg[TXG_SIZE];
+	uint64_t dp_dirty_total;
 	uint64_t dp_mos_used_delta;
 	uint64_t dp_mos_compressed_delta;
 	uint64_t dp_mos_uncompressed_delta;
 
+	/*
+	 * Time of most recently scheduled (furthest in the future)
+	 * wakeup for delayed transactions.
+	 */
+	hrtime_t dp_last_wakeup;
+
 	/* Has its own locking */
 	tx_state_t dp_tx;
 	txg_list_t dp_dirty_datasets;
@@ -129,10 +140,8 @@
 int dsl_pool_sync_context(dsl_pool_t *dp);
 uint64_t dsl_pool_adjustedsize(dsl_pool_t *dp, boolean_t netfree);
 uint64_t dsl_pool_adjustedfree(dsl_pool_t *dp, boolean_t netfree);
-int dsl_pool_tempreserve_space(dsl_pool_t *dp, uint64_t space, dmu_tx_t *tx);
-void dsl_pool_tempreserve_clear(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx);
-void dsl_pool_memory_pressure(dsl_pool_t *dp);
-void dsl_pool_willuse_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx);
+void dsl_pool_dirty_space(dsl_pool_t *dp, int64_t space, dmu_tx_t *tx);
+void dsl_pool_undirty_space(dsl_pool_t *dp, int64_t space, uint64_t txg);
 void dsl_free(dsl_pool_t *dp, uint64_t txg, const blkptr_t *bpp);
 void dsl_free_sync(zio_t *pio, dsl_pool_t *dp, uint64_t txg,
     const blkptr_t *bpp);
@@ -144,6 +153,7 @@
 void dsl_pool_config_enter(dsl_pool_t *dp, void *tag);
 void dsl_pool_config_exit(dsl_pool_t *dp, void *tag);
 boolean_t dsl_pool_config_held(dsl_pool_t *dp);
+boolean_t dsl_pool_need_dirty_delay(dsl_pool_t *dp);
 
 taskq_t *dsl_pool_vnrele_taskq(dsl_pool_t *dp);
 
--- a/usr/src/uts/common/fs/zfs/sys/sa_impl.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/sa_impl.h	Mon Aug 26 13:13:26 2013 -0800
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #ifndef	_SYS_SA_IMPL_H
@@ -153,12 +153,13 @@
  *
  * The header has a fixed portion with a variable number
  * of "lengths" depending on the number of variable sized
- * attribues which are determined by the "layout number"
+ * attributes which are determined by the "layout number"
  */
 
 #define	SA_MAGIC	0x2F505A  /* ZFS SA */
 typedef struct sa_hdr_phys {
 	uint32_t sa_magic;
+	/* BEGIN CSTYLED */
 	/*
 	 * Encoded with hdrsize and layout number as follows:
 	 * 16      10       0
@@ -175,6 +176,7 @@
 	 *          2 ==> 16 byte header
 	 *
 	 */
+	/* END CSTYLED */
 	uint16_t sa_layout_info;
 	uint16_t sa_lengths[1];	/* optional sizes for variable length attrs */
 	/* ... Data follows the lengths.  */
--- a/usr/src/uts/common/fs/zfs/sys/spa_impl.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/spa_impl.h	Mon Aug 26 13:13:26 2013 -0800
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
  */
 
@@ -234,11 +234,22 @@
 	uint64_t	spa_feat_desc_obj;	/* Feature descriptions */
 	cyclic_id_t	spa_deadman_cycid;	/* cyclic id */
 	uint64_t	spa_deadman_calls;	/* number of deadman calls */
-	uint64_t	spa_sync_starttime;	/* starting time fo spa_sync */
+	hrtime_t	spa_sync_starttime;	/* starting time of spa_sync */
 	uint64_t	spa_deadman_synctime;	/* deadman expiration timer */
-	kmutex_t	spa_iokstat_lock;	/* protects spa_iokstat_* */
+
+	/*
+	 * spa_iokstat_lock protects spa_iokstat and
+	 * spa_queue_stats[].
+	 */
+	kmutex_t	spa_iokstat_lock;
 	struct kstat	*spa_iokstat;		/* kstat of io to this pool */
+	struct {
+		int spa_active;
+		int spa_queued;
+	} spa_queue_stats[ZIO_PRIORITY_NUM_QUEUEABLE];
+
 	hrtime_t	spa_ccw_fail_time;	/* Conf cache write fail time */
+
 	/*
 	 * spa_refcount & spa_config_lock must be the last elements
 	 * because refcount_t changes size based on compilation options.
--- a/usr/src/uts/common/fs/zfs/sys/txg.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/txg.h	Mon Aug 26 13:13:26 2013 -0800
@@ -23,7 +23,7 @@
  * Use is subject to license terms.
  */
 /*
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #ifndef _SYS_TXG_H
@@ -76,6 +76,7 @@
 
 extern void txg_delay(struct dsl_pool *dp, uint64_t txg, hrtime_t delta,
     hrtime_t resolution);
+extern void txg_kick(struct dsl_pool *dp);
 
 /*
  * Wait until the given transaction group has finished syncing.
--- a/usr/src/uts/common/fs/zfs/sys/txg_impl.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/txg_impl.h	Mon Aug 26 13:13:26 2013 -0800
@@ -18,6 +18,7 @@
  *
  * CDDL HEADER END
  */
+
 /*
  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
@@ -89,11 +90,14 @@
 typedef struct tx_state {
 	tx_cpu_t	*tx_cpu;	/* protects access to tx_open_txg */
 	kmutex_t	tx_sync_lock;	/* protects the rest of this struct */
+
 	uint64_t	tx_open_txg;	/* currently open txg id */
 	uint64_t	tx_quiesced_txg; /* quiesced txg waiting for sync */
 	uint64_t	tx_syncing_txg;	/* currently syncing txg id */
 	uint64_t	tx_synced_txg;	/* last synced txg id */
 
+	hrtime_t	tx_open_time;	/* start time of tx_open_txg */
+
 	uint64_t	tx_sync_txg_waiting; /* txg we're waiting to sync */
 	uint64_t	tx_quiesce_txg_waiting; /* txg we're waiting to open */
 
--- a/usr/src/uts/common/fs/zfs/sys/vdev_impl.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/vdev_impl.h	Mon Aug 26 13:13:26 2013 -0800
@@ -99,12 +99,22 @@
 	kmutex_t	vc_lock;
 };
 
+typedef struct vdev_queue_class {
+	uint32_t	vqc_active;
+
+	/*
+	 * Sorted by offset or timestamp, depending on if the queue is
+	 * LBA-ordered vs FIFO.
+	 */
+	avl_tree_t	vqc_queued_tree;
+} vdev_queue_class_t;
+
 struct vdev_queue {
-	avl_tree_t	vq_deadline_tree;
-	avl_tree_t	vq_read_tree;
-	avl_tree_t	vq_write_tree;
-	avl_tree_t	vq_pending_tree;
-	hrtime_t	vq_io_complete_ts;
+	vdev_t		*vq_vdev;
+	vdev_queue_class_t vq_class[ZIO_PRIORITY_NUM_QUEUEABLE];
+	avl_tree_t	vq_active_tree;
+	uint64_t	vq_last_offset;
+	hrtime_t	vq_io_complete_ts; /* time last i/o completed */
 	kmutex_t	vq_lock;
 };
 
--- a/usr/src/uts/common/fs/zfs/sys/zfs_context.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/zfs_context.h	Mon Aug 26 13:13:26 2013 -0800
@@ -25,7 +25,7 @@
 
 /*
  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #ifndef _SYS_ZFS_CONTEXT_H
@@ -70,6 +70,8 @@
 #include <sys/fm/util.h>
 #include <sys/sunddi.h>
 #include <sys/cyclic.h>
+#include <sys/disp.h>
+#include <sys/callo.h>
 
 #define	CPU_SEQID	(CPU->cpu_seqid)
 
--- a/usr/src/uts/common/fs/zfs/sys/zio.h	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/sys/zio.h	Mon Aug 26 13:13:26 2013 -0800
@@ -22,7 +22,7 @@
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
  * Copyright (c) 2013, Joyent, Inc. All rights reserved.
  */
@@ -128,19 +128,16 @@
 #define	ZIO_FAILURE_MODE_CONTINUE	1
 #define	ZIO_FAILURE_MODE_PANIC		2
 
-#define	ZIO_PRIORITY_NOW		(zio_priority_table[0])
-#define	ZIO_PRIORITY_SYNC_READ		(zio_priority_table[1])
-#define	ZIO_PRIORITY_SYNC_WRITE		(zio_priority_table[2])
-#define	ZIO_PRIORITY_LOG_WRITE		(zio_priority_table[3])
-#define	ZIO_PRIORITY_CACHE_FILL		(zio_priority_table[4])
-#define	ZIO_PRIORITY_AGG		(zio_priority_table[5])
-#define	ZIO_PRIORITY_FREE		(zio_priority_table[6])
-#define	ZIO_PRIORITY_ASYNC_WRITE	(zio_priority_table[7])
-#define	ZIO_PRIORITY_ASYNC_READ		(zio_priority_table[8])
-#define	ZIO_PRIORITY_RESILVER		(zio_priority_table[9])
-#define	ZIO_PRIORITY_SCRUB		(zio_priority_table[10])
-#define	ZIO_PRIORITY_DDT_PREFETCH	(zio_priority_table[11])
-#define	ZIO_PRIORITY_TABLE_SIZE		12
+typedef enum zio_priority {
+	ZIO_PRIORITY_SYNC_READ,
+	ZIO_PRIORITY_SYNC_WRITE,	/* ZIL */
+	ZIO_PRIORITY_ASYNC_READ,	/* prefetch */
+	ZIO_PRIORITY_ASYNC_WRITE,	/* spa_sync() */
+	ZIO_PRIORITY_SCRUB,		/* asynchronous scrub/resilver reads */
+	ZIO_PRIORITY_NUM_QUEUEABLE,
+
+	ZIO_PRIORITY_NOW		/* non-queued i/os (e.g. free) */
+} zio_priority_t;
 
 #define	ZIO_PIPELINE_CONTINUE		0x100
 #define	ZIO_PIPELINE_STOP		0x101
@@ -196,6 +193,7 @@
 	ZIO_FLAG_GODFATHER	= 1 << 24,
 	ZIO_FLAG_NOPWRITE	= 1 << 25,
 	ZIO_FLAG_REEXECUTED	= 1 << 26,
+	ZIO_FLAG_DELEGATED	= 1 << 27,
 };
 
 #define	ZIO_FLAG_MUSTSUCCEED		0
@@ -235,8 +233,7 @@
 
 typedef void zio_done_func_t(zio_t *zio);
 
-extern uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE];
-extern char *zio_type_name[ZIO_TYPES];
+extern const char *zio_type_name[ZIO_TYPES];
 
 /*
  * A bookmark is a four-tuple <objset, object, level, blkid> that uniquely
@@ -376,7 +373,7 @@
 	zio_type_t	io_type;
 	enum zio_child	io_child_type;
 	int		io_cmd;
-	uint8_t		io_priority;
+	zio_priority_t	io_priority;
 	uint8_t		io_reexecute;
 	uint8_t		io_state[ZIO_WAIT_TYPES];
 	uint64_t	io_txg;
@@ -392,6 +389,7 @@
 
 	/* Callback info */
 	zio_done_func_t	*io_ready;
+	zio_done_func_t	*io_physdone;
 	zio_done_func_t	*io_done;
 	void		*io_private;
 	int64_t		io_prev_space_delta;	/* DMU private */
@@ -409,11 +407,8 @@
 	const zio_vsd_ops_t *io_vsd_ops;
 
 	uint64_t	io_offset;
-	uint64_t	io_deadline;
 	hrtime_t	io_timestamp;
-	avl_node_t	io_offset_node;
-	avl_node_t	io_deadline_node;
-	avl_tree_t	*io_vdev_tree;
+	avl_node_t	io_queue_node;
 
 	/* Internal pipeline state */
 	enum zio_flag	io_flags;
@@ -426,6 +421,7 @@
 	int		io_child_error[ZIO_CHILD_TYPES];
 	uint64_t	io_children[ZIO_CHILD_TYPES][ZIO_WAIT_TYPES];
 	uint64_t	io_child_count;
+	uint64_t	io_phys_children;
 	uint64_t	io_parent_count;
 	uint64_t	*io_stall;
 	zio_t		*io_gang_leader;
@@ -451,16 +447,17 @@
 
 extern zio_t *zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp, void *data,
     uint64_t size, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, const zbookmark_t *zb);
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_t *zb);
 
 extern zio_t *zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     void *data, uint64_t size, const zio_prop_t *zp,
-    zio_done_func_t *ready, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, const zbookmark_t *zb);
+    zio_done_func_t *ready, zio_done_func_t *physdone, zio_done_func_t *done,
+    void *private,
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_t *zb);
 
 extern zio_t *zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, zbookmark_t *zb);
+    zio_priority_t priority, enum zio_flag flags, zbookmark_t *zb);
 
 extern void zio_write_override(zio_t *zio, blkptr_t *bp, int copies,
     boolean_t nopwrite);
@@ -472,17 +469,17 @@
     zio_done_func_t *done, void *private, enum zio_flag flags);
 
 extern zio_t *zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
-    zio_done_func_t *done, void *private, int priority, enum zio_flag flags);
+    zio_done_func_t *done, void *private, enum zio_flag flags);
 
 extern zio_t *zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset,
     uint64_t size, void *data, int checksum,
-    zio_done_func_t *done, void *private, int priority, enum zio_flag flags,
-    boolean_t labels);
+    zio_done_func_t *done, void *private, zio_priority_t priority,
+    enum zio_flag flags, boolean_t labels);
 
 extern zio_t *zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset,
     uint64_t size, void *data, int checksum,
-    zio_done_func_t *done, void *private, int priority, enum zio_flag flags,
-    boolean_t labels);
+    zio_done_func_t *done, void *private, zio_priority_t priority,
+    enum zio_flag flags, boolean_t labels);
 
 extern zio_t *zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg,
     const blkptr_t *bp, enum zio_flag flags);
@@ -511,11 +508,12 @@
 extern void zio_resubmit_stage_async(void *);
 
 extern zio_t *zio_vdev_child_io(zio_t *zio, blkptr_t *bp, vdev_t *vd,
-    uint64_t offset, void *data, uint64_t size, int type, int priority,
-    enum zio_flag flags, zio_done_func_t *done, void *private);
+    uint64_t offset, void *data, uint64_t size, int type,
+    zio_priority_t priority, enum zio_flag flags,
+    zio_done_func_t *done, void *private);
 
 extern zio_t *zio_vdev_delegated_io(vdev_t *vd, uint64_t offset,
-    void *data, uint64_t size, int type, int priority,
+    void *data, uint64_t size, int type, zio_priority_t priority,
     enum zio_flag flags, zio_done_func_t *done, void *private);
 
 extern void zio_vdev_io_bypass(zio_t *zio);
--- a/usr/src/uts/common/fs/zfs/txg.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/txg.c	Mon Aug 26 13:13:26 2013 -0800
@@ -45,7 +45,7 @@
  * either be processing, or blocked waiting to enter the next state. There may
  * be up to three active txgs, and there is always a txg in the open state
  * (though it may be blocked waiting to enter the quiescing state). In broad
- * strokes, transactions — operations that change in-memory structures — are
+ * strokes, transactions -- operations that change in-memory structures -- are
  * accepted into the txg in the open state, and are completed while the txg is
  * in the open or quiescing states. The accumulated changes are written to
  * disk in the syncing state.
@@ -53,7 +53,7 @@
  * Open
  *
  * When a new txg becomes active, it first enters the open state. New
- * transactions — updates to in-memory structures — are assigned to the
+ * transactions -- updates to in-memory structures -- are assigned to the
  * currently open txg. There is always a txg in the open state so that ZFS can
  * accept new changes (though the txg may refuse new changes if it has hit
  * some limit). ZFS advances the open txg to the next state for a variety of
@@ -364,6 +364,7 @@
 
 	ASSERT(txg == tx->tx_open_txg);
 	tx->tx_open_txg++;
+	tx->tx_open_time = gethrtime();
 
 	DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
 	DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
@@ -454,7 +455,8 @@
 
 	start = delta = 0;
 	for (;;) {
-		uint64_t timer, timeout = zfs_txg_timeout * hz;
+		uint64_t timeout = zfs_txg_timeout * hz;
+		uint64_t timer;
 		uint64_t txg;
 
 		/*
@@ -466,7 +468,8 @@
 		while (!dsl_scan_active(dp->dp_scan) &&
 		    !tx->tx_exiting && timer > 0 &&
 		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
-		    tx->tx_quiesced_txg == 0) {
+		    tx->tx_quiesced_txg == 0 &&
+		    dp->dp_dirty_total < zfs_dirty_data_sync) {
 			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
 			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
@@ -643,6 +646,28 @@
 	mutex_exit(&tx->tx_sync_lock);
 }
 
+/*
+ * If no txg is syncing or waiting in the pipeline, push another txg through
+ * the pipeline by quiescing the open txg.
+ */
+void
+txg_kick(dsl_pool_t *dp)
+{
+	tx_state_t *tx = &dp->dp_tx;
+
+	ASSERT(!dsl_pool_config_held(dp));
+
+	mutex_enter(&tx->tx_sync_lock);
+	if (tx->tx_syncing_txg == 0 &&
+	    tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
+	    tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
+	    tx->tx_quiesced_txg <= tx->tx_synced_txg) {
+		tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
+		cv_broadcast(&tx->tx_quiesce_more_cv);
+	}
+	mutex_exit(&tx->tx_sync_lock);
+}
+
 boolean_t
 txg_stalled(dsl_pool_t *dp)
 {
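Putting the txg.c and dsl_pool.c pieces together, the intended flow is roughly the following. This is only a sketch; the dmu_tx.c side of this changeset is not shown in this excerpt, so the exact call sites marked "assumed" are inferred rather than quoted:

	dmu_tx_assign()                     /* dmu_tx.c; call site assumed */
	    dsl_pool_need_dirty_delay()
	        txg_kick()                  /* when dp_dirty_total > zfs_dirty_data_sync */
	        returns B_TRUE              /* when dp_dirty_total > delay_min_bytes */
	    dmu_tx_delay()                  /* throttles the writer; dmu_tx.c */

	txg_sync_thread()                   /* txg.c change above */
	    now also stops waiting once dp_dirty_total reaches zfs_dirty_data_sync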
--- a/usr/src/uts/common/fs/zfs/vdev.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/vdev.c	Mon Aug 26 13:13:26 2013 -0800
@@ -3257,7 +3257,7 @@
 		vdev_queue_t *vq = &vd->vdev_queue;
 
 		mutex_enter(&vq->vq_lock);
-		if (avl_numnodes(&vq->vq_pending_tree) > 0) {
+		if (avl_numnodes(&vq->vq_active_tree) > 0) {
 			spa_t *spa = vd->vdev_spa;
 			zio_t *fio;
 			uint64_t delta;
@@ -3267,7 +3267,7 @@
 			 * if any I/O has been outstanding for longer than
 			 * the spa_deadman_synctime we panic the system.
 			 */
-			fio = avl_first(&vq->vq_pending_tree);
+			fio = avl_first(&vq->vq_active_tree);
 			delta = gethrtime() - fio->io_timestamp;
 			if (delta > spa_deadman_synctime(spa)) {
 				zfs_dbgmsg("SLOW IO: zio timestamp %lluns, "
--- a/usr/src/uts/common/fs/zfs/vdev_cache.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/vdev_cache.c	Mon Aug 26 13:13:26 2013 -0800
@@ -310,7 +310,7 @@
 	}
 
 	fio = zio_vdev_delegated_io(zio->io_vd, cache_offset,
-	    ve->ve_data, VCBS, ZIO_TYPE_READ, ZIO_PRIORITY_CACHE_FILL,
+	    ve->ve_data, VCBS, ZIO_TYPE_READ, ZIO_PRIORITY_NOW,
 	    ZIO_FLAG_DONT_CACHE, vdev_cache_fill, ve);
 
 	ve->ve_fill_io = fio;
--- a/usr/src/uts/common/fs/zfs/vdev_mirror.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/vdev_mirror.c	Mon Aug 26 13:13:26 2013 -0800
@@ -432,7 +432,7 @@
 			zio_nowait(zio_vdev_child_io(zio, zio->io_bp,
 			    mc->mc_vd, mc->mc_offset,
 			    zio->io_data, zio->io_size,
-			    ZIO_TYPE_WRITE, zio->io_priority,
+			    ZIO_TYPE_WRITE, ZIO_PRIORITY_ASYNC_WRITE,
 			    ZIO_FLAG_IO_REPAIR | (unexpected_errors ?
 			    ZIO_FLAG_SELF_HEAL : 0), NULL, NULL));
 		}
--- a/usr/src/uts/common/fs/zfs/vdev_queue.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/vdev_queue.c	Mon Aug 26 13:13:26 2013 -0800
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2013 by Delphix. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
@@ -32,28 +32,129 @@
 #include <sys/spa_impl.h>
 #include <sys/zio.h>
 #include <sys/avl.h>
-
-/*
- * These tunables are for performance analysis.
- */
-
-/* The maximum number of I/Os concurrently pending to each device. */
-int zfs_vdev_max_pending = 10;
+#include <sys/dsl_pool.h>
 
 /*
- * The initial number of I/Os pending to each device, before it starts ramping
- * up to zfs_vdev_max_pending.
+ * ZFS I/O Scheduler
+ * -----------------
+ *
+ * ZFS issues I/O operations to leaf vdevs to satisfy and complete zios.  The
+ * I/O scheduler determines when and in what order those operations are
+ * issued.  The I/O scheduler divides operations into five I/O classes
+ * prioritized in the following order: sync read, sync write, async read,
+ * async write, and scrub/resilver.  Each queue defines the minimum and
+ * maximum number of concurrent operations that may be issued to the device.
+ * In addition, the device has an aggregate maximum. Note that the sum of the
+ * per-queue minimums must not exceed the aggregate maximum, and if the
+ * aggregate maximum is equal to or greater than the sum of the per-queue
+ * maximums, the per-queue minimum has no effect.
+ *
+ * For many physical devices, throughput increases with the number of
+ * concurrent operations, but latency typically suffers. Further, physical
+ * devices typically have a limit at which more concurrent operations have no
+ * effect on throughput or can actually cause it to decrease.
+ *
+ * The scheduler selects the next operation to issue by first looking for an
+ * I/O class whose minimum has not been satisfied. Once all are satisfied and
+ * the aggregate maximum has not been hit, the scheduler looks for classes
+ * whose maximum has not been satisfied. Iteration through the I/O classes is
+ * done in the order specified above. No further operations are issued if the
+ * aggregate maximum number of concurrent operations has been hit or if there
+ * are no operations queued for an I/O class that has not hit its maximum.
+ * Every time an i/o is queued or an operation completes, the I/O scheduler
+ * looks for new operations to issue.
+ *
+ * All I/O classes have a fixed maximum number of outstanding operations
+ * except for the async write class. Asynchronous writes represent the data
+ * that is committed to stable storage during the syncing stage for
+ * transaction groups (see txg.c). Transaction groups enter the syncing state
+ * periodically so the number of queued async writes will quickly burst up and
+ * then bleed down to zero. Rather than servicing them as quickly as possible,
+ * the I/O scheduler changes the maximum number of active async write i/os
+ * according to the amount of dirty data in the pool (see dsl_pool.c). Since
+ * both throughput and latency typically increase with the number of
+ * concurrent operations issued to physical devices, reducing the burstiness
+ * in the number of concurrent operations also stabilizes the response time of
+ * operations from other -- and in particular synchronous -- queues. In broad
+ * strokes, the I/O scheduler will issue more concurrent operations from the
+ * async write queue as there's more dirty data in the pool.
+ *
+ * Async Writes
+ *
+ * The number of concurrent operations issued for the async write I/O class
+ * follows a piece-wise linear function defined by a few adjustable points.
+ *
+ *        |                   o---------| <-- zfs_vdev_async_write_max_active
+ *   ^    |                  /^         |
+ *   |    |                 / |         |
+ * active |                /  |         |
+ *  I/O   |               /   |         |
+ * count  |              /    |         |
+ *        |             /     |         |
+ *        |------------o      |         | <-- zfs_vdev_async_write_min_active
+ *       0|____________^______|_________|
+ *        0%           |      |       100% of zfs_dirty_data_max
+ *                     |      |
+ *                     |      `-- zfs_vdev_async_write_active_max_dirty_percent
+ *                     `--------- zfs_vdev_async_write_active_min_dirty_percent
+ *
+ * Until the amount of dirty data exceeds a minimum percentage of the dirty
+ * data allowed in the pool, the I/O scheduler will limit the number of
+ * concurrent operations to the minimum. As that threshold is crossed, the
+ * number of concurrent operations issued increases linearly to the maximum at
+ * the specified maximum percentage of the dirty data allowed in the pool.
+ *
+ * Ideally, the amount of dirty data on a busy pool will stay in the sloped
+ * part of the function between zfs_vdev_async_write_active_min_dirty_percent
+ * and zfs_vdev_async_write_active_max_dirty_percent. If it exceeds the
+ * maximum percentage, this indicates that the rate of incoming data is
+ * greater than the rate that the backend storage can handle. In this case, we
+ * must further throttle incoming writes (see dmu_tx_delay() for details).
  */
-int zfs_vdev_min_pending = 4;
 
 /*
- * The deadlines are grouped into buckets based on zfs_vdev_time_shift:
- * deadline = pri + gethrtime() >> time_shift)
+ * The maximum number of i/os active to each device.  Ideally, this will be >=
+ * the sum of each queue's max_active.  It must be at least the sum of each
+ * queue's min_active.
  */
-int zfs_vdev_time_shift = 29; /* each bucket is 0.537 seconds */
+uint32_t zfs_vdev_max_active = 1000;
 
-/* exponential I/O issue ramp-up rate */
-int zfs_vdev_ramp_rate = 2;
+/*
+ * Per-queue limits on the number of i/os active to each device.  If the
+ * sum of the queue's max_active is < zfs_vdev_max_active, then the
+ * min_active comes into play.  We will send min_active from each queue,
+ * and then select from queues in the order defined by zio_priority_t.
+ *
+ * In general, smaller max_active's will lead to lower latency of synchronous
+ * operations.  Larger max_active's may lead to higher overall throughput,
+ * depending on underlying storage.
+ *
+ * The ratio of the queues' max_actives determines the balance of performance
+ * between reads, writes, and scrubs.  E.g., increasing
+ * zfs_vdev_scrub_max_active will cause the scrub or resilver to complete
+ * more quickly, but reads and writes to have higher latency and lower
+ * throughput.
+ */
+uint32_t zfs_vdev_sync_read_min_active = 10;
+uint32_t zfs_vdev_sync_read_max_active = 10;
+uint32_t zfs_vdev_sync_write_min_active = 10;
+uint32_t zfs_vdev_sync_write_max_active = 10;
+uint32_t zfs_vdev_async_read_min_active = 1;
+uint32_t zfs_vdev_async_read_max_active = 3;
+uint32_t zfs_vdev_async_write_min_active = 1;
+uint32_t zfs_vdev_async_write_max_active = 10;
+uint32_t zfs_vdev_scrub_min_active = 1;
+uint32_t zfs_vdev_scrub_max_active = 2;
+
+/*
+ * When the pool has less than zfs_vdev_async_write_active_min_dirty_percent
+ * dirty data, use zfs_vdev_async_write_min_active.  When it has more than
+ * zfs_vdev_async_write_active_max_dirty_percent, use
+ * zfs_vdev_async_write_max_active. The value is linearly interpolated
+ * between min and max.
+ */
+int zfs_vdev_async_write_active_min_dirty_percent = 30;
+int zfs_vdev_async_write_active_max_dirty_percent = 60;
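A quick sanity check of these defaults against the aggregate limit described in the block comment above (simple arithmetic over the values shown here, nothing new):

	sum of min_active = 10 + 10 + 1 + 1 + 1  = 23   <= zfs_vdev_max_active (1000), as required
	sum of max_active = 10 + 10 + 3 + 10 + 2 = 35   <= zfs_vdev_max_active, so with the
	                                                 defaults the per-queue minimums never
	                                                 come into play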
 
 /*
  * To reduce IOPs, we aggregate small adjacent I/Os into one large I/O.
@@ -65,33 +166,6 @@
 int zfs_vdev_read_gap_limit = 32 << 10;
 int zfs_vdev_write_gap_limit = 4 << 10;
 
-/*
- * Virtual device vector for disk I/O scheduling.
- */
-int
-vdev_queue_deadline_compare(const void *x1, const void *x2)
-{
-	const zio_t *z1 = x1;
-	const zio_t *z2 = x2;
-
-	if (z1->io_deadline < z2->io_deadline)
-		return (-1);
-	if (z1->io_deadline > z2->io_deadline)
-		return (1);
-
-	if (z1->io_offset < z2->io_offset)
-		return (-1);
-	if (z1->io_offset > z2->io_offset)
-		return (1);
-
-	if (z1 < z2)
-		return (-1);
-	if (z1 > z2)
-		return (1);
-
-	return (0);
-}
-
 int
 vdev_queue_offset_compare(const void *x1, const void *x2)
 {
@@ -111,24 +185,50 @@
 	return (0);
 }
 
+int
+vdev_queue_timestamp_compare(const void *x1, const void *x2)
+{
+	const zio_t *z1 = x1;
+	const zio_t *z2 = x2;
+
+	if (z1->io_timestamp < z2->io_timestamp)
+		return (-1);
+	if (z1->io_timestamp > z2->io_timestamp)
+		return (1);
+
+	if (z1 < z2)
+		return (-1);
+	if (z1 > z2)
+		return (1);
+
+	return (0);
+}
+
 void
 vdev_queue_init(vdev_t *vd)
 {
 	vdev_queue_t *vq = &vd->vdev_queue;
 
 	mutex_init(&vq->vq_lock, NULL, MUTEX_DEFAULT, NULL);
+	vq->vq_vdev = vd;
 
-	avl_create(&vq->vq_deadline_tree, vdev_queue_deadline_compare,
-	    sizeof (zio_t), offsetof(struct zio, io_deadline_node));
+	avl_create(&vq->vq_active_tree, vdev_queue_offset_compare,
+	    sizeof (zio_t), offsetof(struct zio, io_queue_node));
 
-	avl_create(&vq->vq_read_tree, vdev_queue_offset_compare,
-	    sizeof (zio_t), offsetof(struct zio, io_offset_node));
-
-	avl_create(&vq->vq_write_tree, vdev_queue_offset_compare,
-	    sizeof (zio_t), offsetof(struct zio, io_offset_node));
-
-	avl_create(&vq->vq_pending_tree, vdev_queue_offset_compare,
-	    sizeof (zio_t), offsetof(struct zio, io_offset_node));
+	for (zio_priority_t p = 0; p < ZIO_PRIORITY_NUM_QUEUEABLE; p++) {
+		/*
+		 * The synchronous i/o queues are FIFO rather than LBA ordered.
+		 * This provides more consistent latency for these i/os, and
+		 * they tend to not be tightly clustered anyway so there is
+		 * little to no throughput loss.
+		 */
+		boolean_t fifo = (p == ZIO_PRIORITY_SYNC_READ ||
+		    p == ZIO_PRIORITY_SYNC_WRITE);
+		avl_create(&vq->vq_class[p].vqc_queued_tree,
+		    fifo ? vdev_queue_timestamp_compare :
+		    vdev_queue_offset_compare,
+		    sizeof (zio_t), offsetof(struct zio, io_queue_node));
+	}
 }
 
 void
@@ -136,10 +236,9 @@
 {
 	vdev_queue_t *vq = &vd->vdev_queue;
 
-	avl_destroy(&vq->vq_deadline_tree);
-	avl_destroy(&vq->vq_read_tree);
-	avl_destroy(&vq->vq_write_tree);
-	avl_destroy(&vq->vq_pending_tree);
+	for (zio_priority_t p = 0; p < ZIO_PRIORITY_NUM_QUEUEABLE; p++)
+		avl_destroy(&vq->vq_class[p].vqc_queued_tree);
+	avl_destroy(&vq->vq_active_tree);
 
 	mutex_destroy(&vq->vq_lock);
 }
@@ -148,51 +247,62 @@
 vdev_queue_io_add(vdev_queue_t *vq, zio_t *zio)
 {
 	spa_t *spa = zio->io_spa;
-	avl_add(&vq->vq_deadline_tree, zio);
-	avl_add(zio->io_vdev_tree, zio);
+	ASSERT3U(zio->io_priority, <, ZIO_PRIORITY_NUM_QUEUEABLE);
+	avl_add(&vq->vq_class[zio->io_priority].vqc_queued_tree, zio);
 
-	if (spa->spa_iokstat != NULL) {
-		mutex_enter(&spa->spa_iokstat_lock);
+	mutex_enter(&spa->spa_iokstat_lock);
+	spa->spa_queue_stats[zio->io_priority].spa_queued++;
+	if (spa->spa_iokstat != NULL)
 		kstat_waitq_enter(spa->spa_iokstat->ks_data);
-		mutex_exit(&spa->spa_iokstat_lock);
-	}
+	mutex_exit(&spa->spa_iokstat_lock);
 }
 
 static void
 vdev_queue_io_remove(vdev_queue_t *vq, zio_t *zio)
 {
 	spa_t *spa = zio->io_spa;
-	avl_remove(&vq->vq_deadline_tree, zio);
-	avl_remove(zio->io_vdev_tree, zio);
+	ASSERT3U(zio->io_priority, <, ZIO_PRIORITY_NUM_QUEUEABLE);
+	avl_remove(&vq->vq_class[zio->io_priority].vqc_queued_tree, zio);
 
-	if (spa->spa_iokstat != NULL) {
-		mutex_enter(&spa->spa_iokstat_lock);
+	mutex_enter(&spa->spa_iokstat_lock);
+	ASSERT3U(spa->spa_queue_stats[zio->io_priority].spa_queued, >, 0);
+	spa->spa_queue_stats[zio->io_priority].spa_queued--;
+	if (spa->spa_iokstat != NULL)
 		kstat_waitq_exit(spa->spa_iokstat->ks_data);
-		mutex_exit(&spa->spa_iokstat_lock);
-	}
+	mutex_exit(&spa->spa_iokstat_lock);
 }
 
 static void
 vdev_queue_pending_add(vdev_queue_t *vq, zio_t *zio)
 {
 	spa_t *spa = zio->io_spa;
-	avl_add(&vq->vq_pending_tree, zio);
-	if (spa->spa_iokstat != NULL) {
-		mutex_enter(&spa->spa_iokstat_lock);
+	ASSERT(MUTEX_HELD(&vq->vq_lock));
+	ASSERT3U(zio->io_priority, <, ZIO_PRIORITY_NUM_QUEUEABLE);
+	vq->vq_class[zio->io_priority].vqc_active++;
+	avl_add(&vq->vq_active_tree, zio);
+
+	mutex_enter(&spa->spa_iokstat_lock);
+	spa->spa_queue_stats[zio->io_priority].spa_active++;
+	if (spa->spa_iokstat != NULL)
 		kstat_runq_enter(spa->spa_iokstat->ks_data);
-		mutex_exit(&spa->spa_iokstat_lock);
-	}
+	mutex_exit(&spa->spa_iokstat_lock);
 }
 
 static void
 vdev_queue_pending_remove(vdev_queue_t *vq, zio_t *zio)
 {
 	spa_t *spa = zio->io_spa;
-	avl_remove(&vq->vq_pending_tree, zio);
+	ASSERT(MUTEX_HELD(&vq->vq_lock));
+	ASSERT3U(zio->io_priority, <, ZIO_PRIORITY_NUM_QUEUEABLE);
+	vq->vq_class[zio->io_priority].vqc_active--;
+	avl_remove(&vq->vq_active_tree, zio);
+
+	mutex_enter(&spa->spa_iokstat_lock);
+	ASSERT3U(spa->spa_queue_stats[zio->io_priority].spa_active, >, 0);
+	spa->spa_queue_stats[zio->io_priority].spa_active--;
 	if (spa->spa_iokstat != NULL) {
 		kstat_io_t *ksio = spa->spa_iokstat->ks_data;
 
-		mutex_enter(&spa->spa_iokstat_lock);
 		kstat_runq_exit(spa->spa_iokstat->ks_data);
 		if (zio->io_type == ZIO_TYPE_READ) {
 			ksio->reads++;
@@ -201,23 +311,131 @@
 			ksio->writes++;
 			ksio->nwritten += zio->io_size;
 		}
-		mutex_exit(&spa->spa_iokstat_lock);
 	}
+	mutex_exit(&spa->spa_iokstat_lock);
 }
 
 static void
 vdev_queue_agg_io_done(zio_t *aio)
 {
-	zio_t *pio;
-
-	while ((pio = zio_walk_parents(aio)) != NULL)
-		if (aio->io_type == ZIO_TYPE_READ)
+	if (aio->io_type == ZIO_TYPE_READ) {
+		zio_t *pio;
+		while ((pio = zio_walk_parents(aio)) != NULL) {
 			bcopy((char *)aio->io_data + (pio->io_offset -
 			    aio->io_offset), pio->io_data, pio->io_size);
+		}
+	}
 
 	zio_buf_free(aio->io_data, aio->io_size);
 }
 
+static int
+vdev_queue_class_min_active(zio_priority_t p)
+{
+	switch (p) {
+	case ZIO_PRIORITY_SYNC_READ:
+		return (zfs_vdev_sync_read_min_active);
+	case ZIO_PRIORITY_SYNC_WRITE:
+		return (zfs_vdev_sync_write_min_active);
+	case ZIO_PRIORITY_ASYNC_READ:
+		return (zfs_vdev_async_read_min_active);
+	case ZIO_PRIORITY_ASYNC_WRITE:
+		return (zfs_vdev_async_write_min_active);
+	case ZIO_PRIORITY_SCRUB:
+		return (zfs_vdev_scrub_min_active);
+	default:
+		panic("invalid priority %u", p);
+		return (0);
+	}
+}
+
+static int
+vdev_queue_max_async_writes(uint64_t dirty)
+{
+	int writes;
+	uint64_t min_bytes = zfs_dirty_data_max *
+	    zfs_vdev_async_write_active_min_dirty_percent / 100;
+	uint64_t max_bytes = zfs_dirty_data_max *
+	    zfs_vdev_async_write_active_max_dirty_percent / 100;
+
+	if (dirty < min_bytes)
+		return (zfs_vdev_async_write_min_active);
+	if (dirty > max_bytes)
+		return (zfs_vdev_async_write_max_active);
+
+	/*
+	 * linear interpolation:
+	 * slope = (max_writes - min_writes) / (max_bytes - min_bytes)
+	 * move right by min_bytes
+	 * move up by min_writes
+	 */
+	writes = (dirty - min_bytes) *
+	    (zfs_vdev_async_write_max_active -
+	    zfs_vdev_async_write_min_active) /
+	    (max_bytes - min_bytes) +
+	    zfs_vdev_async_write_min_active;
+	ASSERT3U(writes, >=, zfs_vdev_async_write_min_active);
+	ASSERT3U(writes, <=, zfs_vdev_async_write_max_active);
+	return (writes);
+}
+
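As a concrete check of the interpolation in vdev_queue_max_async_writes(), using the default percentages and active counts declared earlier in this file and an assumed zfs_dirty_data_max of 4,000,000,000 bytes (purely illustrative):

	min_bytes = 4e9 * 30 / 100 = 1.2e9
	max_bytes = 4e9 * 60 / 100 = 2.4e9
	dirty     = 1.8e9                      (45% of zfs_dirty_data_max)
	writes    = (1.8e9 - 1.2e9) * (10 - 1) / (2.4e9 - 1.2e9) + 1
	          = 5.4e9 / 1.2e9 + 1 = 4 + 1 = 5   (integer division)

	i.e. at 45% dirty the scheduler allows 5 concurrent async writes per vdev.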
+static int
+vdev_queue_class_max_active(spa_t *spa, zio_priority_t p)
+{
+	switch (p) {
+	case ZIO_PRIORITY_SYNC_READ:
+		return (zfs_vdev_sync_read_max_active);
+	case ZIO_PRIORITY_SYNC_WRITE:
+		return (zfs_vdev_sync_write_max_active);
+	case ZIO_PRIORITY_ASYNC_READ:
+		return (zfs_vdev_async_read_max_active);
+	case ZIO_PRIORITY_ASYNC_WRITE:
+		return (vdev_queue_max_async_writes(
+		    spa->spa_dsl_pool->dp_dirty_total));
+	case ZIO_PRIORITY_SCRUB:
+		return (zfs_vdev_scrub_max_active);
+	default:
+		panic("invalid priority %u", p);
+		return (0);
+	}
+}
+
+/*
+ * Return the i/o class to issue from, or ZIO_PRIORITY_NUM_QUEUEABLE if
+ * there is no eligible class.
+ */
+static zio_priority_t
+vdev_queue_class_to_issue(vdev_queue_t *vq)
+{
+	spa_t *spa = vq->vq_vdev->vdev_spa;
+	zio_priority_t p;
+
+	if (avl_numnodes(&vq->vq_active_tree) >= zfs_vdev_max_active)
+		return (ZIO_PRIORITY_NUM_QUEUEABLE);
+
+	/* find a queue that has not reached its minimum # outstanding i/os */
+	for (p = 0; p < ZIO_PRIORITY_NUM_QUEUEABLE; p++) {
+		if (avl_numnodes(&vq->vq_class[p].vqc_queued_tree) > 0 &&
+		    vq->vq_class[p].vqc_active <
+		    vdev_queue_class_min_active(p))
+			return (p);
+	}
+
+	/*
+	 * If we haven't found a queue, look for one that hasn't reached its
+	 * maximum # outstanding i/os.
+	 */
+	for (p = 0; p < ZIO_PRIORITY_NUM_QUEUEABLE; p++) {
+		if (avl_numnodes(&vq->vq_class[p].vqc_queued_tree) > 0 &&
+		    vq->vq_class[p].vqc_active <
+		    vdev_queue_class_max_active(spa, p))
+			return (p);
+	}
+
+	/* No eligible queued i/os */
+	return (ZIO_PRIORITY_NUM_QUEUEABLE);
+}
+
 /*
  * Compute the range spanned by two i/os, which is the endpoint of the last
  * (lio->io_offset + lio->io_size) minus start of the first (fio->io_offset).
@@ -228,154 +446,192 @@
 #define	IO_GAP(fio, lio) (-IO_SPAN(lio, fio))
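The sign convention in IO_GAP() is easy to misread; a short worked example with made-up offsets (fio at offset 0 with size 8K, lio at offset 12K with size 4K):

	IO_SPAN(fio, lio) = (12K + 4K) - 0      = 16K   range an aggregate would cover
	IO_GAP(fio, lio)  = -IO_SPAN(lio, fio)
	                  = -((0 + 8K) - 12K)   = 4K    hole between the end of fio
	                                                and the start of lio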
 
 static zio_t *
-vdev_queue_io_to_issue(vdev_queue_t *vq, uint64_t pending_limit)
+vdev_queue_aggregate(vdev_queue_t *vq, zio_t *zio)
 {
-	zio_t *fio, *lio, *aio, *dio, *nio, *mio;
-	avl_tree_t *t;
-	int flags;
-	uint64_t maxspan = zfs_vdev_aggregation_limit;
-	uint64_t maxgap;
-	int stretch;
+	zio_t *first, *last, *aio, *dio, *mandatory, *nio;
+	uint64_t maxgap = 0;
+	uint64_t size;
+	boolean_t stretch = B_FALSE;
+	vdev_queue_class_t *vqc = &vq->vq_class[zio->io_priority];
+	avl_tree_t *t = &vqc->vqc_queued_tree;
+	enum zio_flag flags = zio->io_flags & ZIO_FLAG_AGG_INHERIT;
+
+	if (zio->io_flags & ZIO_FLAG_DONT_AGGREGATE)
+		return (NULL);
+
+	/*
+	 * The synchronous i/o queues are not sorted by LBA, so we can't
+	 * find adjacent i/os.  These i/os tend not to be tightly clustered,
+	 * or are too large to aggregate, so this has little impact on
+	 * performance.
+	 */
+	if (zio->io_priority == ZIO_PRIORITY_SYNC_READ ||
+	    zio->io_priority == ZIO_PRIORITY_SYNC_WRITE)
+		return (NULL);
+
+	first = last = zio;
+
+	if (zio->io_type == ZIO_TYPE_READ)
+		maxgap = zfs_vdev_read_gap_limit;
+
+	/*
+	 * We can aggregate I/Os that are sufficiently adjacent and of
+	 * the same flavor, as expressed by the AGG_INHERIT flags.
+	 * The latter requirement is necessary so that certain
+	 * attributes of the I/O, such as whether it's a normal I/O
+	 * or a scrub/resilver, can be preserved in the aggregate.
+	 * We can include optional I/Os, but don't allow them
+	 * to begin a range as they add no benefit in that situation.
+	 */
+
+	/*
+	 * We keep track of the last non-optional I/O.
+	 */
+	mandatory = (first->io_flags & ZIO_FLAG_OPTIONAL) ? NULL : first;
+
+	/*
+	 * Walk backwards through sufficiently contiguous I/Os
+	 * recording the last non-optional I/O.
+	 */
+	while ((dio = AVL_PREV(t, first)) != NULL &&
+	    (dio->io_flags & ZIO_FLAG_AGG_INHERIT) == flags &&
+	    IO_SPAN(dio, last) <= zfs_vdev_aggregation_limit &&
+	    IO_GAP(dio, first) <= maxgap) {
+		first = dio;
+		if (mandatory == NULL && !(first->io_flags & ZIO_FLAG_OPTIONAL))
+			mandatory = first;
+	}
+
+	/*
+	 * Skip any initial optional I/Os.
+	 */
+	while ((first->io_flags & ZIO_FLAG_OPTIONAL) && first != last) {
+		first = AVL_NEXT(t, first);
+		ASSERT(first != NULL);
+	}
+
+	/*
+	 * Walk forward through sufficiently contiguous I/Os.
+	 */
+	while ((dio = AVL_NEXT(t, last)) != NULL &&
+	    (dio->io_flags & ZIO_FLAG_AGG_INHERIT) == flags &&
+	    IO_SPAN(first, dio) <= zfs_vdev_aggregation_limit &&
+	    IO_GAP(last, dio) <= maxgap) {
+		last = dio;
+		if (!(last->io_flags & ZIO_FLAG_OPTIONAL))
+			mandatory = last;
+	}
+
+	/*
+	 * Now that we've established the range of the I/O aggregation
+	 * we must decide what to do with trailing optional I/Os.
+	 * For reads, there's nothing to do. While we are unable to
+	 * aggregate further, it's possible that a trailing optional
+	 * I/O would allow the underlying device to aggregate with
+	 * subsequent I/Os. We must therefore determine if the next
+	 * non-optional I/O is close enough to make aggregation
+	 * worthwhile.
+	 */
+	if (zio->io_type == ZIO_TYPE_WRITE && mandatory != NULL) {
+		zio_t *nio = last;
+		while ((dio = AVL_NEXT(t, nio)) != NULL &&
+		    IO_GAP(nio, dio) == 0 &&
+		    IO_GAP(mandatory, dio) <= zfs_vdev_write_gap_limit) {
+			nio = dio;
+			if (!(nio->io_flags & ZIO_FLAG_OPTIONAL)) {
+				stretch = B_TRUE;
+				break;
+			}
+		}
+	}
+
+	if (stretch) {
+		/* This may be a no-op. */
+		dio = AVL_NEXT(t, last);
+		dio->io_flags &= ~ZIO_FLAG_OPTIONAL;
+	} else {
+		while (last != mandatory && last != first) {
+			ASSERT(last->io_flags & ZIO_FLAG_OPTIONAL);
+			last = AVL_PREV(t, last);
+			ASSERT(last != NULL);
+		}
+	}
+
+	if (first == last)
+		return (NULL);
+
+	size = IO_SPAN(first, last);
+	ASSERT3U(size, <=, zfs_vdev_aggregation_limit);
+
+	aio = zio_vdev_delegated_io(first->io_vd, first->io_offset,
+	    zio_buf_alloc(size), size, first->io_type, zio->io_priority,
+	    flags | ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_QUEUE,
+	    vdev_queue_agg_io_done, NULL);
+	aio->io_timestamp = first->io_timestamp;
+
+	nio = first;
+	do {
+		dio = nio;
+		nio = AVL_NEXT(t, dio);
+		ASSERT3U(dio->io_type, ==, aio->io_type);
+
+		if (dio->io_flags & ZIO_FLAG_NODATA) {
+			ASSERT3U(dio->io_type, ==, ZIO_TYPE_WRITE);
+			bzero((char *)aio->io_data + (dio->io_offset -
+			    aio->io_offset), dio->io_size);
+		} else if (dio->io_type == ZIO_TYPE_WRITE) {
+			bcopy(dio->io_data, (char *)aio->io_data +
+			    (dio->io_offset - aio->io_offset),
+			    dio->io_size);
+		}
+
+		zio_add_child(dio, aio);
+		vdev_queue_io_remove(vq, dio);
+		zio_vdev_io_bypass(dio);
+		zio_execute(dio);
+	} while (dio != last);
+
+	return (aio);
+}
+
+static zio_t *
+vdev_queue_io_to_issue(vdev_queue_t *vq)
+{
+	zio_t *zio, *aio;
+	zio_priority_t p;
+	avl_index_t idx;
+	vdev_queue_class_t *vqc;
+	zio_t search;
 
 again:
 	ASSERT(MUTEX_HELD(&vq->vq_lock));
 
-	if (avl_numnodes(&vq->vq_pending_tree) >= pending_limit ||
-	    avl_numnodes(&vq->vq_deadline_tree) == 0)
-		return (NULL);
-
-	fio = lio = avl_first(&vq->vq_deadline_tree);
-
-	t = fio->io_vdev_tree;
-	flags = fio->io_flags & ZIO_FLAG_AGG_INHERIT;
-	maxgap = (t == &vq->vq_read_tree) ? zfs_vdev_read_gap_limit : 0;
-
-	if (!(flags & ZIO_FLAG_DONT_AGGREGATE)) {
-		/*
-		 * We can aggregate I/Os that are sufficiently adjacent and of
-		 * the same flavor, as expressed by the AGG_INHERIT flags.
-		 * The latter requirement is necessary so that certain
-		 * attributes of the I/O, such as whether it's a normal I/O
-		 * or a scrub/resilver, can be preserved in the aggregate.
-		 * We can include optional I/Os, but don't allow them
-		 * to begin a range as they add no benefit in that situation.
-		 */
-
-		/*
-		 * We keep track of the last non-optional I/O.
-		 */
-		mio = (fio->io_flags & ZIO_FLAG_OPTIONAL) ? NULL : fio;
-
-		/*
-		 * Walk backwards through sufficiently contiguous I/Os
-		 * recording the last non-option I/O.
-		 */
-		while ((dio = AVL_PREV(t, fio)) != NULL &&
-		    (dio->io_flags & ZIO_FLAG_AGG_INHERIT) == flags &&
-		    IO_SPAN(dio, lio) <= maxspan &&
-		    IO_GAP(dio, fio) <= maxgap) {
-			fio = dio;
-			if (mio == NULL && !(fio->io_flags & ZIO_FLAG_OPTIONAL))
-				mio = fio;
-		}
-
-		/*
-		 * Skip any initial optional I/Os.
-		 */
-		while ((fio->io_flags & ZIO_FLAG_OPTIONAL) && fio != lio) {
-			fio = AVL_NEXT(t, fio);
-			ASSERT(fio != NULL);
-		}
+	p = vdev_queue_class_to_issue(vq);
 
-		/*
-		 * Walk forward through sufficiently contiguous I/Os.
-		 */
-		while ((dio = AVL_NEXT(t, lio)) != NULL &&
-		    (dio->io_flags & ZIO_FLAG_AGG_INHERIT) == flags &&
-		    IO_SPAN(fio, dio) <= maxspan &&
-		    IO_GAP(lio, dio) <= maxgap) {
-			lio = dio;
-			if (!(lio->io_flags & ZIO_FLAG_OPTIONAL))
-				mio = lio;
-		}
-
-		/*
-		 * Now that we've established the range of the I/O aggregation
-		 * we must decide what to do with trailing optional I/Os.
-		 * For reads, there's nothing to do. While we are unable to
-		 * aggregate further, it's possible that a trailing optional
-		 * I/O would allow the underlying device to aggregate with
-		 * subsequent I/Os. We must therefore determine if the next
-		 * non-optional I/O is close enough to make aggregation
-		 * worthwhile.
-		 */
-		stretch = B_FALSE;
-		if (t != &vq->vq_read_tree && mio != NULL) {
-			nio = lio;
-			while ((dio = AVL_NEXT(t, nio)) != NULL &&
-			    IO_GAP(nio, dio) == 0 &&
-			    IO_GAP(mio, dio) <= zfs_vdev_write_gap_limit) {
-				nio = dio;
-				if (!(nio->io_flags & ZIO_FLAG_OPTIONAL)) {
-					stretch = B_TRUE;
-					break;
-				}
-			}
-		}
-
-		if (stretch) {
-			/* This may be a no-op. */
-			VERIFY((dio = AVL_NEXT(t, lio)) != NULL);
-			dio->io_flags &= ~ZIO_FLAG_OPTIONAL;
-		} else {
-			while (lio != mio && lio != fio) {
-				ASSERT(lio->io_flags & ZIO_FLAG_OPTIONAL);
-				lio = AVL_PREV(t, lio);
-				ASSERT(lio != NULL);
-			}
-		}
+	if (p == ZIO_PRIORITY_NUM_QUEUEABLE) {
+		/* No eligible queued i/os */
+		return (NULL);
 	}
 
-	if (fio != lio) {
-		uint64_t size = IO_SPAN(fio, lio);
-		ASSERT(size <= zfs_vdev_aggregation_limit);
-
-		aio = zio_vdev_delegated_io(fio->io_vd, fio->io_offset,
-		    zio_buf_alloc(size), size, fio->io_type, ZIO_PRIORITY_AGG,
-		    flags | ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_QUEUE,
-		    vdev_queue_agg_io_done, NULL);
-		aio->io_timestamp = fio->io_timestamp;
-
-		nio = fio;
-		do {
-			dio = nio;
-			nio = AVL_NEXT(t, dio);
-			ASSERT(dio->io_type == aio->io_type);
-			ASSERT(dio->io_vdev_tree == t);
+	/*
+	 * For LBA-ordered queues (async / scrub), issue the i/o which follows
+	 * the most recently issued i/o in LBA (offset) order.
+	 *
+	 * For FIFO queues (sync), issue the i/o with the lowest timestamp.
+	 */
+	vqc = &vq->vq_class[p];
+	search.io_timestamp = 0;
+	search.io_offset = vq->vq_last_offset + 1;
+	VERIFY3P(avl_find(&vqc->vqc_queued_tree, &search, &idx), ==, NULL);
+	zio = avl_nearest(&vqc->vqc_queued_tree, idx, AVL_AFTER);
+	if (zio == NULL)
+		zio = avl_first(&vqc->vqc_queued_tree);
+	ASSERT3U(zio->io_priority, ==, p);
 
-			if (dio->io_flags & ZIO_FLAG_NODATA) {
-				ASSERT(dio->io_type == ZIO_TYPE_WRITE);
-				bzero((char *)aio->io_data + (dio->io_offset -
-				    aio->io_offset), dio->io_size);
-			} else if (dio->io_type == ZIO_TYPE_WRITE) {
-				bcopy(dio->io_data, (char *)aio->io_data +
-				    (dio->io_offset - aio->io_offset),
-				    dio->io_size);
-			}
-
-			zio_add_child(dio, aio);
-			vdev_queue_io_remove(vq, dio);
-			zio_vdev_io_bypass(dio);
-			zio_execute(dio);
-		} while (dio != lio);
-
-		vdev_queue_pending_add(vq, aio);
-
-		return (aio);
-	}
-
-	ASSERT(fio->io_vdev_tree == t);
-	vdev_queue_io_remove(vq, fio);
+	aio = vdev_queue_aggregate(vq, zio);
+	if (aio != NULL)
+		zio = aio;
+	else
+		vdev_queue_io_remove(vq, zio);
 
 	/*
 	 * If the I/O is or was optional and therefore has no data, we need to
@@ -383,17 +639,18 @@
 	 * deadlock that we could encounter since this I/O will complete
 	 * immediately.
 	 */
-	if (fio->io_flags & ZIO_FLAG_NODATA) {
+	if (zio->io_flags & ZIO_FLAG_NODATA) {
 		mutex_exit(&vq->vq_lock);
-		zio_vdev_io_bypass(fio);
-		zio_execute(fio);
+		zio_vdev_io_bypass(zio);
+		zio_execute(zio);
 		mutex_enter(&vq->vq_lock);
 		goto again;
 	}
 
-	vdev_queue_pending_add(vq, fio);
+	vdev_queue_pending_add(vq, zio);
+	vq->vq_last_offset = zio->io_offset;
 
-	return (fio);
+	return (zio);
 }
 
 zio_t *
@@ -402,28 +659,31 @@
 	vdev_queue_t *vq = &zio->io_vd->vdev_queue;
 	zio_t *nio;
 
-	ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
-
 	if (zio->io_flags & ZIO_FLAG_DONT_QUEUE)
 		return (zio);
 
+	/*
+	 * Children i/os inherit their parent's priority, which might
+	 * not match the child's i/o type.  Fix it up here.
+	 */
+	if (zio->io_type == ZIO_TYPE_READ) {
+		if (zio->io_priority != ZIO_PRIORITY_SYNC_READ &&
+		    zio->io_priority != ZIO_PRIORITY_ASYNC_READ &&
+		    zio->io_priority != ZIO_PRIORITY_SCRUB)
+			zio->io_priority = ZIO_PRIORITY_ASYNC_READ;
+	} else {
+		ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+		if (zio->io_priority != ZIO_PRIORITY_SYNC_WRITE &&
+		    zio->io_priority != ZIO_PRIORITY_ASYNC_WRITE)
+			zio->io_priority = ZIO_PRIORITY_ASYNC_WRITE;
+	}
+
 	zio->io_flags |= ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_QUEUE;
 
-	if (zio->io_type == ZIO_TYPE_READ)
-		zio->io_vdev_tree = &vq->vq_read_tree;
-	else
-		zio->io_vdev_tree = &vq->vq_write_tree;
-
 	mutex_enter(&vq->vq_lock);
-
 	zio->io_timestamp = gethrtime();
-	zio->io_deadline = (zio->io_timestamp >> zfs_vdev_time_shift) +
-	    zio->io_priority;
-
 	vdev_queue_io_add(vq, zio);
-
-	nio = vdev_queue_io_to_issue(vq, zfs_vdev_min_pending);
-
+	nio = vdev_queue_io_to_issue(vq);
 	mutex_exit(&vq->vq_lock);
 
 	if (nio == NULL)
@@ -441,6 +701,7 @@
 vdev_queue_io_done(zio_t *zio)
 {
 	vdev_queue_t *vq = &zio->io_vd->vdev_queue;
+	zio_t *nio;
 
 	if (zio_injection_enabled)
 		delay(SEC_TO_TICK(zio_handle_io_delay(zio)));
@@ -451,10 +712,7 @@
 
 	vq->vq_io_complete_ts = gethrtime();
 
-	for (int i = 0; i < zfs_vdev_ramp_rate; i++) {
-		zio_t *nio = vdev_queue_io_to_issue(vq, zfs_vdev_max_pending);
-		if (nio == NULL)
-			break;
+	while ((nio = vdev_queue_io_to_issue(vq)) != NULL) {
 		mutex_exit(&vq->vq_lock);
 		if (nio->io_done == vdev_queue_agg_io_done) {
 			zio_nowait(nio);
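
The comment in vdev_queue_io_to_issue() above describes two selection policies: LBA-ordered queues (async, scrub) issue the i/o whose offset follows the last issued offset, while FIFO queues (sync) issue the oldest i/o (the zeroed io_timestamp in the search key sorts ahead of every queued entry, so avl_nearest(AVL_AFTER) lands on the lowest timestamp). Below is a minimal, standalone sketch of the LBA-ordered case only, using a plain sorted array in place of the kernel AVL tree; pending_io_t and pick_next_by_offset() are invented names for illustration, not the kernel interfaces.

	#include <stdio.h>
	#include <stdint.h>

	/* Hypothetical stand-in for a queued i/o; only the offset matters here. */
	typedef struct pending_io {
		uint64_t io_offset;
	} pending_io_t;

	/*
	 * Pick the queued i/o whose offset is strictly greater than last_offset;
	 * if none exists, wrap around to the lowest offset.  This mirrors the
	 * avl_find()/avl_nearest(AVL_AFTER)/avl_first() sequence in the patch,
	 * but over a sorted array instead of an AVL tree.
	 */
	static pending_io_t *
	pick_next_by_offset(pending_io_t *q, int n, uint64_t last_offset)
	{
		if (n == 0)
			return (NULL);
		for (int i = 0; i < n; i++) {
			if (q[i].io_offset > last_offset)
				return (&q[i]);
		}
		return (&q[0]);	/* wrap to the start of the LBA-ordered queue */
	}

	int
	main(void)
	{
		/* Offsets kept sorted, as the LBA-ordered queued tree would be. */
		pending_io_t q[] = { {100}, {400}, {900} };
		uint64_t last = 0;

		for (int i = 0; i < 4; i++) {
			pending_io_t *io = pick_next_by_offset(q, 3, last);
			printf("issue offset %llu\n",
			    (unsigned long long)io->io_offset);
			last = io->io_offset;
		}
		return (0);
	}

Issuing in ascending offset order and wrapping at the end keeps the device seeking in one direction across the queued work, which is the design point of sorting the async and scrub classes by LBA rather than by arrival time.
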
--- a/usr/src/uts/common/fs/zfs/vdev_raidz.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/vdev_raidz.c	Mon Aug 26 13:13:26 2013 -0800
@@ -2340,7 +2340,7 @@
 
 			zio_nowait(zio_vdev_child_io(zio, NULL, cvd,
 			    rc->rc_offset, rc->rc_data, rc->rc_size,
-			    ZIO_TYPE_WRITE, zio->io_priority,
+			    ZIO_TYPE_WRITE, ZIO_PRIORITY_ASYNC_WRITE,
 			    ZIO_FLAG_IO_REPAIR | (unexpected_errors ?
 			    ZIO_FLAG_SELF_HEAL : 0), NULL, NULL));
 		}
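
The new vdev_queue_io() hunk normalizes a child i/o's inherited priority so that it is legal for the child's i/o type, and the vdev_raidz.c hunk above pins repair writes to ZIO_PRIORITY_ASYNC_WRITE in the same spirit. A compact standalone sketch of that normalization rule follows; the enum values and the helper name fixup_priority() are illustrative stand-ins, not the kernel's definitions.

	#include <stdio.h>

	/* Illustrative subsets of the zio type and priority enums. */
	typedef enum { IO_READ, IO_WRITE } io_type_t;
	typedef enum {
		PRI_SYNC_READ, PRI_SYNC_WRITE,
		PRI_ASYNC_READ, PRI_ASYNC_WRITE,
		PRI_SCRUB, PRI_NOW
	} io_pri_t;

	/*
	 * A child i/o inherits its parent's priority, which may not be valid
	 * for the child's own type (e.g. a read child of a write parent).
	 * Map any mismatched priority onto the async class for that type,
	 * as the patch does.
	 */
	static io_pri_t
	fixup_priority(io_type_t type, io_pri_t pri)
	{
		if (type == IO_READ) {
			if (pri != PRI_SYNC_READ && pri != PRI_ASYNC_READ &&
			    pri != PRI_SCRUB)
				return (PRI_ASYNC_READ);
		} else {
			if (pri != PRI_SYNC_WRITE && pri != PRI_ASYNC_WRITE)
				return (PRI_ASYNC_WRITE);
		}
		return (pri);
	}

	int
	main(void)
	{
		/* A read child that inherited a sync-write priority is reclassified. */
		printf("%d\n", fixup_priority(IO_READ, PRI_SYNC_WRITE));	/* PRI_ASYNC_READ */
		printf("%d\n", fixup_priority(IO_WRITE, PRI_SCRUB));	/* PRI_ASYNC_WRITE */
		return (0);
	}
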
--- a/usr/src/uts/common/fs/zfs/zfs_vnops.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/zfs_vnops.c	Mon Aug 26 13:13:26 2013 -0800
@@ -123,7 +123,11 @@
  *	forever, because the previous txg can't quiesce until B's tx commits.
  *
  *	If dmu_tx_assign() returns ERESTART and zfsvfs->z_assign is TXG_NOWAIT,
- *	then drop all locks, call dmu_tx_wait(), and try again.
+ *	then drop all locks, call dmu_tx_wait(), and try again.  On subsequent
+ *	calls to dmu_tx_assign(), pass TXG_WAITED rather than TXG_NOWAIT,
+ *	to indicate that this operation has already called dmu_tx_wait().
+ *	This will ensure that we don't retry forever, waiting a short bit
+ *	each time.
  *
  *  (5)	If the operation succeeded, generate the intent log entry for it
  *	before dropping locks.  This ensures that the ordering of events
@@ -145,12 +149,13 @@
  *	rw_enter(...);			// grab any other locks you need
  *	tx = dmu_tx_create(...);	// get DMU tx
  *	dmu_tx_hold_*();		// hold each object you might modify
- *	error = dmu_tx_assign(tx, TXG_NOWAIT);	// try to assign
+ *	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
  *	if (error) {
  *		rw_exit(...);		// drop locks
  *		zfs_dirent_unlock(dl);	// unlock directory entry
  *		VN_RELE(...);		// release held vnodes
  *		if (error == ERESTART) {
+ *			waited = B_TRUE;
  *			dmu_tx_wait(tx);
  *			dmu_tx_abort(tx);
  *			goto top;
@@ -1315,6 +1320,7 @@
 	zfs_acl_ids_t   acl_ids;
 	boolean_t	fuid_dirtied;
 	boolean_t	have_acl = B_FALSE;
+	boolean_t	waited = B_FALSE;
 
 	/*
 	 * If we have an ephemeral id, ACL, or XVATTR then
@@ -1435,10 +1441,11 @@
 			dmu_tx_hold_write(tx, DMU_NEW_OBJECT,
 			    0, acl_ids.z_aclp->z_acl_bytes);
 		}
-		error = dmu_tx_assign(tx, TXG_NOWAIT);
+		error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 		if (error) {
 			zfs_dirent_unlock(dl);
 			if (error == ERESTART) {
+				waited = B_TRUE;
 				dmu_tx_wait(tx);
 				dmu_tx_abort(tx);
 				goto top;
@@ -1570,6 +1577,7 @@
 	pathname_t	realnm;
 	int		error;
 	int		zflg = ZEXISTS;
+	boolean_t	waited = B_FALSE;
 
 	ZFS_ENTER(zfsvfs);
 	ZFS_VERIFY_ZP(dzp);
@@ -1658,13 +1666,14 @@
 	/* charge as an update -- would be nice not to charge at all */
 	dmu_tx_hold_zap(tx, zfsvfs->z_unlinkedobj, FALSE, NULL);
 
-	error = dmu_tx_assign(tx, TXG_NOWAIT);
+	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 	if (error) {
 		zfs_dirent_unlock(dl);
 		VN_RELE(vp);
 		if (xzp)
 			VN_RELE(ZTOV(xzp));
 		if (error == ERESTART) {
+			waited = B_TRUE;
 			dmu_tx_wait(tx);
 			dmu_tx_abort(tx);
 			goto top;
@@ -1798,6 +1807,7 @@
 	gid_t		gid = crgetgid(cr);
 	zfs_acl_ids_t   acl_ids;
 	boolean_t	fuid_dirtied;
+	boolean_t	waited = B_FALSE;
 
 	ASSERT(vap->va_type == VDIR);
 
@@ -1894,10 +1904,11 @@
 	dmu_tx_hold_sa_create(tx, acl_ids.z_aclp->z_acl_bytes +
 	    ZFS_SA_BASE_ATTR_SIZE);
 
-	error = dmu_tx_assign(tx, TXG_NOWAIT);
+	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 	if (error) {
 		zfs_dirent_unlock(dl);
 		if (error == ERESTART) {
+			waited = B_TRUE;
 			dmu_tx_wait(tx);
 			dmu_tx_abort(tx);
 			goto top;
@@ -1973,6 +1984,7 @@
 	dmu_tx_t	*tx;
 	int		error;
 	int		zflg = ZEXISTS;
+	boolean_t	waited = B_FALSE;
 
 	ZFS_ENTER(zfsvfs);
 	ZFS_VERIFY_ZP(dzp);
@@ -2028,13 +2040,14 @@
 	dmu_tx_hold_zap(tx, zfsvfs->z_unlinkedobj, FALSE, NULL);
 	zfs_sa_upgrade_txholds(tx, zp);
 	zfs_sa_upgrade_txholds(tx, dzp);
-	error = dmu_tx_assign(tx, TXG_NOWAIT);
+	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 	if (error) {
 		rw_exit(&zp->z_parent_lock);
 		rw_exit(&zp->z_name_lock);
 		zfs_dirent_unlock(dl);
 		VN_RELE(vp);
 		if (error == ERESTART) {
+			waited = B_TRUE;
 			dmu_tx_wait(tx);
 			dmu_tx_abort(tx);
 			goto top;
@@ -3362,6 +3375,7 @@
 	int		cmp, serr, terr;
 	int		error = 0;
 	int		zflg = 0;
+	boolean_t	waited = B_FALSE;
 
 	ZFS_ENTER(zfsvfs);
 	ZFS_VERIFY_ZP(sdzp);
@@ -3599,7 +3613,7 @@
 
 	zfs_sa_upgrade_txholds(tx, szp);
 	dmu_tx_hold_zap(tx, zfsvfs->z_unlinkedobj, FALSE, NULL);
-	error = dmu_tx_assign(tx, TXG_NOWAIT);
+	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 	if (error) {
 		if (zl != NULL)
 			zfs_rename_unlock(&zl);
@@ -3613,6 +3627,7 @@
 		if (tzp)
 			VN_RELE(ZTOV(tzp));
 		if (error == ERESTART) {
+			waited = B_TRUE;
 			dmu_tx_wait(tx);
 			dmu_tx_abort(tx);
 			goto top;
@@ -3718,6 +3733,7 @@
 	zfs_acl_ids_t	acl_ids;
 	boolean_t	fuid_dirtied;
 	uint64_t	txtype = TX_SYMLINK;
+	boolean_t	waited = B_FALSE;
 
 	ASSERT(vap->va_type == VLNK);
 
@@ -3780,10 +3796,11 @@
 	}
 	if (fuid_dirtied)
 		zfs_fuid_txhold(zfsvfs, tx);
-	error = dmu_tx_assign(tx, TXG_NOWAIT);
+	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 	if (error) {
 		zfs_dirent_unlock(dl);
 		if (error == ERESTART) {
+			waited = B_TRUE;
 			dmu_tx_wait(tx);
 			dmu_tx_abort(tx);
 			goto top;
@@ -3910,6 +3927,7 @@
 	int		zf = ZNEW;
 	uint64_t	parent;
 	uid_t		owner;
+	boolean_t	waited = B_FALSE;
 
 	ASSERT(tdvp->v_type == VDIR);
 
@@ -3999,10 +4017,11 @@
 	dmu_tx_hold_zap(tx, dzp->z_id, TRUE, name);
 	zfs_sa_upgrade_txholds(tx, szp);
 	zfs_sa_upgrade_txholds(tx, dzp);
-	error = dmu_tx_assign(tx, TXG_NOWAIT);
+	error = dmu_tx_assign(tx, waited ? TXG_WAITED : TXG_NOWAIT);
 	if (error) {
 		zfs_dirent_unlock(dl);
 		if (error == ERESTART) {
+			waited = B_TRUE;
 			dmu_tx_wait(tx);
 			dmu_tx_abort(tx);
 			goto top;
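
All of the zfs_vnops.c hunks above apply the pattern from the updated theory comment: the first ERESTART from dmu_tx_assign() triggers dmu_tx_wait() and a retry, and every subsequent attempt passes TXG_WAITED instead of TXG_NOWAIT so the operation is not bounced indefinitely by the write throttle. The following is a small userland sketch of that retry shape only; the TXG_NOWAIT/TXG_WAITED/ERESTART values and try_assign() are stand-ins, not the DMU interfaces.

	#include <stdio.h>

	/* Stand-ins for the real txg-assignment modes and error code. */
	#define	TXG_NOWAIT	0
	#define	TXG_WAITED	1
	#define	ERESTART	91	/* stand-in value; the real one is platform-defined */

	/* Pretend the first TXG_NOWAIT attempt is throttled. */
	static int
	try_assign(int how, int attempt)
	{
		if (how == TXG_NOWAIT && attempt == 0)
			return (ERESTART);
		return (0);
	}

	int
	main(void)
	{
		int waited = 0;		/* becomes true once we have already waited */
		int attempt = 0;
		int error;

	top:
		/* ... grab locks, create and hold the tx ... */
		error = try_assign(waited ? TXG_WAITED : TXG_NOWAIT, attempt++);
		if (error != 0) {
			/* ... drop locks, release held vnodes ... */
			if (error == ERESTART) {
				waited = 1;	/* remember we already waited once */
				/* dmu_tx_wait(); dmu_tx_abort(); */
				goto top;
			}
			return (1);
		}
		printf("assigned after %d attempt(s)\n", attempt);
		return (0);
	}
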
--- a/usr/src/uts/common/fs/zfs/zil.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/zil.c	Mon Aug 26 13:13:26 2013 -0800
@@ -872,7 +872,7 @@
 	if (lwb->lwb_zio == NULL) {
 		lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
 		    0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
-		    zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
+		    zil_lwb_write_done, lwb, ZIO_PRIORITY_SYNC_WRITE,
 		    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
 	}
 }
--- a/usr/src/uts/common/fs/zfs/zio.c	Mon Aug 26 13:52:20 2013 -0400
+++ b/usr/src/uts/common/fs/zfs/zio.c	Mon Aug 26 13:13:26 2013 -0800
@@ -39,30 +39,10 @@
 
 /*
  * ==========================================================================
- * I/O priority table
- * ==========================================================================
- */
-uint8_t zio_priority_table[ZIO_PRIORITY_TABLE_SIZE] = {
-	0,	/* ZIO_PRIORITY_NOW		*/
-	0,	/* ZIO_PRIORITY_SYNC_READ	*/
-	0,	/* ZIO_PRIORITY_SYNC_WRITE	*/
-	0,	/* ZIO_PRIORITY_LOG_WRITE	*/
-	1,	/* ZIO_PRIORITY_CACHE_FILL	*/
-	1,	/* ZIO_PRIORITY_AGG		*/
-	4,	/* ZIO_PRIORITY_FREE		*/
-	4,	/* ZIO_PRIORITY_ASYNC_WRITE	*/
-	6,	/* ZIO_PRIORITY_ASYNC_READ	*/
-	10,	/* ZIO_PRIORITY_RESILVER	*/
-	20,	/* ZIO_PRIORITY_SCRUB		*/
-	2,	/* ZIO_PRIORITY_DDT_PREFETCH	*/
-};
-
-/*
- * ==========================================================================
  * I/O type descriptions
  * ==========================================================================
  */
-char *zio_type_name[ZIO_TYPES] = {
+const char *zio_type_name[ZIO_TYPES] = {
 	"zio_null", "zio_read", "zio_write", "zio_free", "zio_claim",
 	"zio_ioctl"
 };
@@ -486,7 +466,10 @@
 		*errorp = zio_worst_error(*errorp, zio->io_error);
 	pio->io_reexecute |= zio->io_reexecute;
 	ASSERT3U(*countp, >, 0);
-	if (--*countp == 0 && pio->io_stall == countp) {
+
+	(*countp)--;
+
+	if (*countp == 0 && pio->io_stall == countp) {
 		pio->io_stall = NULL;
 		mutex_exit(&pio->io_lock);
 		zio_execute(pio);
@@ -510,7 +493,7 @@
 static zio_t *
 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
-    zio_type_t type, int priority, enum zio_flag flags,
+    zio_type_t type, zio_priority_t priority, enum zio_flag flags,
     vdev_t *vd, uint64_t offset, const zbookmark_t *zb,
     enum zio_stage stage, enum zio_stage pipeline)
 {
@@ -620,7 +603,7 @@
 zio_t *
 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
     void *data, uint64_t size, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, const zbookmark_t *zb)
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_t *zb)
 {
 	zio_t *zio;
 
@@ -636,8 +619,9 @@
 zio_t *
 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
     void *data, uint64_t size, const zio_prop_t *zp,
-    zio_done_func_t *ready, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, const zbookmark_t *zb)
+    zio_done_func_t *ready, zio_done_func_t *physdone, zio_done_func_t *done,
+    void *private,
+    zio_priority_t priority, enum zio_flag flags, const zbookmark_t *zb)
 {
 	zio_t *zio;
 
@@ -656,6 +640,7 @@
 	    ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
 
 	zio->io_ready = ready;
+	zio->io_physdone = physdone;
 	zio->io_prop = *zp;
 
 	return (zio);
@@ -663,8 +648,8 @@
 
 zio_t *
 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
-    uint64_t size, zio_done_func_t *done, void *private, int priority,
-    enum zio_flag flags, zbookmark_t *zb)
+    uint64_t size, zio_done_func_t *done, void *private,
+    zio_priority_t priority, enum zio_flag flags, zbookmark_t *zb)
 {
 	zio_t *zio;
 
@@ -740,7 +725,7 @@
 		stage |= ZIO_STAGE_ISSUE_ASYNC;
 
 	zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
-	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_FREE, flags,
+	    NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW, flags,
 	    NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
 
 
@@ -778,14 +763,14 @@
 
 zio_t *
 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
-    zio_done_func_t *done, void *private, int priority, enum zio_flag flags)
+    zio_done_func_t *done, void *private, enum zio_flag flags)
 {
 	zio_t *zio;
 	int c;
 
 	if (vd->vdev_children == 0) {
 		zio = zio_create(pio, spa, 0, NULL, NULL, 0, done, private,
-		    ZIO_TYPE_IOCTL, priority, flags, vd, 0, NULL,
+		    ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
 		    ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
 
 		zio->io_cmd = cmd;
@@ -794,7 +779,7 @@
 
 		for (c = 0; c < vd->vdev_children; c++)
 			zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
-			    done, private, priority, flags));
+			    done, private, flags));
 	}
 
 	return (zio);
@@ -803,7 +788,7 @@
 zio_t *
 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     void *data, int checksum, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, boolean_t labels)
+    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
 {
 	zio_t *zio;
 
@@ -824,7 +809,7 @@
 zio_t *
 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
     void *data, int checksum, zio_done_func_t *done, void *private,
-    int priority, enum zio_flag flags, boolean_t labels)
+    zio_priority_t priority, enum zio_flag flags, boolean_t labels)
 {
 	zio_t *zio;
 
@@ -859,8 +844,8 @@
  */
 zio_t *
 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
-	void *data, uint64_t size, int type, int priority, enum zio_flag flags,
-	zio_done_func_t *done, void *private)
+	void *data, uint64_t size, int type, zio_priority_t priority,
+	enum zio_flag flags, zio_done_func_t *done, void *private)
 {
 	enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
 	zio_t *zio;
@@ -895,12 +880,16 @@
 	    done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
 	    ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
 
+	zio->io_physdone = pio->io_physdone;
+	if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
+		zio->io_logical->io_phys_children++;
+
 	return (zio);
 }
 
 zio_t *
 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, void *data, uint64_t size,
-	int type, int priority, enum zio_flag flags,
+	int type, zio_priority_t priority, enum zio_flag flags,
 	zio_done_func_t *done, void *private)
 {
 	zio_t *zio;
@@ -909,7 +898,7 @@
 
 	zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
 	    data, size, done, private, type, priority,
-	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY,
+	    flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
 	    vd, offset, NULL,
 	    ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
 
@@ -920,7 +909,7 @@
 zio_flush(zio_t *zio, vdev_t *vd)
 {
 	zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
-	    NULL, NULL, ZIO_PRIORITY_NOW,
+	    NULL, NULL,
 	    ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
 }
 
@@ -1821,7 +1810,7 @@
 
 		zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
 		    (char *)pio->io_data + (pio->io_size - resid), lsize, &zp,
-		    zio_write_gang_member_ready, NULL, &gn->gn_child[g],
+		    zio_write_gang_member_ready, NULL, NULL, &gn->gn_child[g],
 		    pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
 		    &pio->io_bookmark));
 	}
@@ -2198,7 +2187,7 @@
 		}
 
 		dio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
-		    zio->io_orig_size, &czp, NULL,
+		    zio->io_orig_size, &czp, NULL, NULL,
 		    zio_ddt_ditto_write_done, dde, zio->io_priority,
 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
 
@@ -2220,7 +2209,7 @@
 		ddt_phys_addref(ddp);
 	} else {
 		cio = zio_write(zio, spa, txg, bp, zio->io_orig_data,
-		    zio->io_orig_size, zp, zio_ddt_child_write_ready,
+		    zio->io_orig_size, zp, zio_ddt_child_write_ready, NULL,
 		    zio_ddt_child_write_done, dde, zio->io_priority,
 		    ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
 
@@ -2637,6 +2626,13 @@
 	if (zio->io_error)
 		zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+	if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
+	    zio->io_physdone != NULL) {
+		ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
+		ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
+		zio->io_physdone(zio->io_logical);
+	}
+
 	return (ZIO_PIPELINE_CONTINUE);
 }