changeset 6689:47572a2f5e73

6610506 Eliminate or improve retry logic from callers of dmu_objset_open()
6695465 divide by zero in txg_sync_thread() under heavy load
author maybee
date Thu, 22 May 2008 11:13:47 -0700
parents 0b51c3ad6121
children 19b5b95523e1
files usr/src/cmd/zdb/zdb.c usr/src/cmd/zinject/translate.c usr/src/cmd/ztest/ztest.c usr/src/lib/libzpool/common/kernel.c usr/src/uts/common/fs/zfs/dmu_objset.c usr/src/uts/common/fs/zfs/dmu_send.c usr/src/uts/common/fs/zfs/dsl_dataset.c usr/src/uts/common/fs/zfs/dsl_dir.c usr/src/uts/common/fs/zfs/dsl_prop.c usr/src/uts/common/fs/zfs/spa.c usr/src/uts/common/fs/zfs/sys/dmu.h usr/src/uts/common/fs/zfs/sys/dsl_dataset.h usr/src/uts/common/fs/zfs/sys/dsl_prop.h usr/src/uts/common/fs/zfs/txg.c usr/src/uts/common/fs/zfs/zfs_ctldir.c usr/src/uts/common/fs/zfs/zfs_ioctl.c usr/src/uts/common/fs/zfs/zfs_vfsops.c usr/src/uts/common/fs/zfs/zil.c usr/src/uts/common/fs/zfs/zvol.c
diffstat 19 files changed, 881 insertions(+), 829 deletions(-) [+]
line wrap: on
line diff
--- a/usr/src/cmd/zdb/zdb.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/cmd/zdb/zdb.c	Thu May 22 11:13:47 2008 -0700
@@ -1336,7 +1336,7 @@
 	objset_t *os;
 
 	error = dmu_objset_open(dsname, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os);
+	    DS_MODE_USER | DS_MODE_READONLY, &os);
 	if (error) {
 		(void) printf("Could not open %s\n", dsname);
 		return (0);
@@ -2568,7 +2568,7 @@
 	if (error == 0) {
 		if (strchr(argv[0], '/') != NULL) {
 			error = dmu_objset_open(argv[0], DMU_OST_ANY,
-			    DS_MODE_STANDARD | DS_MODE_READONLY, &os);
+			    DS_MODE_USER | DS_MODE_READONLY, &os);
 		} else {
 			error = spa_open(argv[0], &spa, FTAG);
 		}
--- a/usr/src/cmd/zinject/translate.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/cmd/zinject/translate.c	Thu May 22 11:13:47 2008 -0700
@@ -165,7 +165,7 @@
 	sync();
 
 	if ((err = dmu_objset_open(dataset, DMU_OST_ZFS,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os)) != 0) {
+	    DS_MODE_USER | DS_MODE_READONLY, &os)) != 0) {
 		(void) fprintf(stderr, "cannot open dataset '%s': %s\n",
 		    dataset, strerror(err));
 		return (-1);
@@ -250,7 +250,7 @@
 	 * size.
 	 */
 	if ((err = dmu_objset_open(dataset, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os)) != 0) {
+	    DS_MODE_USER | DS_MODE_READONLY, &os)) != 0) {
 		(void) fprintf(stderr, "cannot open dataset '%s': %s\n",
 		    dataset, strerror(err));
 		goto out;
--- a/usr/src/cmd/ztest/ztest.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/cmd/ztest/ztest.c	Thu May 22 11:13:47 2008 -0700
@@ -1135,7 +1135,7 @@
 	 * Verify that the dataset contains a directory object.
 	 */
 	error = dmu_objset_open(name, DMU_OST_OTHER,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os);
+	    DS_MODE_USER | DS_MODE_READONLY, &os);
 	ASSERT3U(error, ==, 0);
 	error = dmu_object_info(os, ZTEST_DIROBJ, doi);
 	if (error != ENOENT) {
@@ -1150,7 +1150,11 @@
 	 * Destroy the dataset.
 	 */
 	error = dmu_objset_destroy(name);
-	ASSERT3U(error, ==, 0);
+	if (error) {
+		(void) dmu_objset_open(name, DMU_OST_OTHER,
+		    DS_MODE_USER | DS_MODE_READONLY, &os);
+		fatal(0, "dmu_objset_destroy(os=%p) = %d\n", &os, error);
+	}
 	return (0);
 }
 
@@ -1190,9 +1194,9 @@
 ztest_dmu_objset_create_destroy(ztest_args_t *za)
 {
 	int error;
-	objset_t *os;
+	objset_t *os, *os2;
 	char name[100];
-	int mode, basemode, expected_error;
+	int basemode, expected_error;
 	zilog_t *zilog;
 	uint64_t seq;
 	uint64_t objects;
@@ -1202,9 +1206,9 @@
 	(void) snprintf(name, 100, "%s/%s_temp_%llu", za->za_pool, za->za_pool,
 	    (u_longlong_t)za->za_instance);
 
-	basemode = DS_MODE_LEVEL(za->za_instance);
-	if (basemode == DS_MODE_NONE)
-		basemode++;
+	basemode = DS_MODE_TYPE(za->za_instance);
+	if (basemode != DS_MODE_USER && basemode != DS_MODE_OWNER)
+		basemode = DS_MODE_USER;
 
 	/*
 	 * If this dataset exists from a previous run, process its replay log
@@ -1212,7 +1216,7 @@
 	 * (invoked from ztest_destroy_cb() below) should just throw it away.
 	 */
 	if (ztest_random(2) == 0 &&
-	    dmu_objset_open(name, DMU_OST_OTHER, DS_MODE_PRIMARY, &os) == 0) {
+	    dmu_objset_open(name, DMU_OST_OTHER, DS_MODE_OWNER, &os) == 0) {
 		zr.zr_os = os;
 		zil_replay(os, &zr, &zr.zr_assign, ztest_replay_vector);
 		dmu_objset_close(os);
@@ -1298,21 +1302,24 @@
 		fatal(0, "created existing dataset, error = %d", error);
 
 	/*
-	 * Verify that multiple dataset opens are allowed, but only when
+	 * Verify that multiple dataset holds are allowed, but only when
 	 * the new access mode is compatible with the base mode.
-	 * We use a mixture of typed and typeless opens, and when the
-	 * open succeeds, verify that the discovered type is correct.
 	 */
-	for (mode = DS_MODE_STANDARD; mode < DS_MODE_LEVELS; mode++) {
-		objset_t *os2;
-		error = dmu_objset_open(name, DMU_OST_OTHER, mode, &os2);
-		expected_error = (basemode + mode < DS_MODE_LEVELS) ? 0 : EBUSY;
-		if (error != expected_error)
-			fatal(0, "dmu_objset_open('%s') = %d, expected %d",
-			    name, error, expected_error);
-		if (error == 0)
+	if (basemode == DS_MODE_OWNER) {
+		error = dmu_objset_open(name, DMU_OST_OTHER, DS_MODE_USER,
+		    &os2);
+		if (error)
+			fatal(0, "dmu_objset_open('%s') = %d", name, error);
+		else
 			dmu_objset_close(os2);
 	}
+	error = dmu_objset_open(name, DMU_OST_OTHER, DS_MODE_OWNER, &os2);
+	expected_error = (basemode == DS_MODE_OWNER) ? EBUSY : 0;
+	if (error != expected_error)
+		fatal(0, "dmu_objset_open('%s') = %d, expected %d",
+		    name, error, expected_error);
+	if (error == 0)
+		dmu_objset_close(os2);
 
 	zil_close(zilog);
 	dmu_objset_close(os);
@@ -3232,7 +3239,7 @@
 				    name, error);
 			}
 			error = dmu_objset_open(name, DMU_OST_OTHER,
-			    DS_MODE_STANDARD, &za[d].za_os);
+			    DS_MODE_USER, &za[d].za_os);
 			if (error)
 				fatal(0, "dmu_objset_open('%s') = %d",
 				    name, error);
--- a/usr/src/lib/libzpool/common/kernel.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/lib/libzpool/common/kernel.c	Thu May 22 11:13:47 2008 -0700
@@ -186,9 +186,9 @@
 	ASSERT(rwlp->rw_owner != curthread);
 
 	if (rw == RW_READER)
-		(void) rw_rdlock(&rwlp->rw_lock);
+		VERIFY(rw_rdlock(&rwlp->rw_lock) == 0);
 	else
-		(void) rw_wrlock(&rwlp->rw_lock);
+		VERIFY(rw_wrlock(&rwlp->rw_lock) == 0);
 
 	rwlp->rw_owner = curthread;
 }
@@ -200,7 +200,7 @@
 	ASSERT(rwlp->rw_owner != (void *)-1UL);
 
 	rwlp->rw_owner = NULL;
-	(void) rw_unlock(&rwlp->rw_lock);
+	VERIFY(rw_unlock(&rwlp->rw_lock) == 0);
 }
 
 int
--- a/usr/src/uts/common/fs/zfs/dmu_objset.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/dmu_objset.c	Thu May 22 11:13:47 2008 -0700
@@ -265,20 +265,23 @@
 dmu_objset_open_ds_os(dsl_dataset_t *ds, objset_t *os, dmu_objset_type_t type)
 {
 	objset_impl_t *osi;
-	int err;
 
 	mutex_enter(&ds->ds_opening_lock);
 	osi = dsl_dataset_get_user_ptr(ds);
 	if (osi == NULL) {
+		int err;
+
 		err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
 		    ds, &ds->ds_phys->ds_bp, &osi);
-		if (err)
+		if (err) {
+			mutex_exit(&ds->ds_opening_lock);
 			return (err);
+		}
 	}
 	mutex_exit(&ds->ds_opening_lock);
 
 	os->os = osi;
-	os->os_mode = DS_MODE_NONE;
+	os->os_mode = DS_MODE_NOHOLD;
 
 	if (type != DMU_OST_ANY && type != os->os->os_phys->os_type)
 		return (EINVAL);
@@ -309,21 +312,28 @@
 	dsl_dataset_t *ds;
 	int err;
 
-	ASSERT(mode != DS_MODE_NONE);
+	ASSERT(DS_MODE_TYPE(mode) == DS_MODE_USER ||
+	    DS_MODE_TYPE(mode) == DS_MODE_OWNER);
 
 	os = kmem_alloc(sizeof (objset_t), KM_SLEEP);
-	err = dsl_dataset_open(name, mode, os, &ds);
+	if (DS_MODE_TYPE(mode) == DS_MODE_USER)
+		err = dsl_dataset_hold(name, os, &ds);
+	else
+		err = dsl_dataset_own(name, mode, os, &ds);
 	if (err) {
 		kmem_free(os, sizeof (objset_t));
 		return (err);
 	}
 
 	err = dmu_objset_open_ds_os(ds, os, type);
-	os->os_mode = mode;
 	if (err) {
+		if (DS_MODE_TYPE(mode) == DS_MODE_USER)
+			dsl_dataset_rele(ds, os);
+		else
+			dsl_dataset_disown(ds, os);
 		kmem_free(os, sizeof (objset_t));
-		dsl_dataset_close(ds, mode, os);
 	} else {
+		os->os_mode = mode;
 		*osp = os;
 	}
 	return (err);
@@ -332,8 +342,14 @@
 void
 dmu_objset_close(objset_t *os)
 {
-	if (os->os_mode != DS_MODE_NONE)
-		dsl_dataset_close(os->os->os_dsl_dataset, os->os_mode, os);
+	ASSERT(DS_MODE_TYPE(os->os_mode) == DS_MODE_USER ||
+	    DS_MODE_TYPE(os->os_mode) == DS_MODE_OWNER ||
+	    DS_MODE_TYPE(os->os_mode) == DS_MODE_NOHOLD);
+
+	if (DS_MODE_TYPE(os->os_mode) == DS_MODE_USER)
+		dsl_dataset_rele(os->os->os_dsl_dataset, os);
+	else if (DS_MODE_TYPE(os->os_mode) == DS_MODE_OWNER)
+		dsl_dataset_disown(os->os->os_dsl_dataset, os);
 	kmem_free(os, sizeof (objset_t));
 }
 
@@ -389,7 +405,7 @@
 		ASSERT(list_head(&osi->os_free_dnodes[i]) == NULL);
 	}
 
-	if (ds && ds->ds_phys->ds_num_children == 0) {
+	if (ds && ds->ds_phys && ds->ds_phys->ds_num_children == 0) {
 		VERIFY(0 == dsl_prop_unregister(ds, "checksum",
 		    checksum_changed_cb, osi));
 		VERIFY(0 == dsl_prop_unregister(ds, "compression",
@@ -530,8 +546,7 @@
 	dsobj = dsl_dataset_create_sync(dd, oa->lastname,
 	    oa->clone_parent, oa->flags, cr, tx);
 
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, dsobj, NULL,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, FTAG, &ds));
+	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool, dsobj, FTAG, &ds));
 	bp = dsl_dataset_get_blkptr(ds);
 	if (BP_IS_HOLE(bp)) {
 		objset_impl_t *osi;
@@ -547,7 +562,7 @@
 	spa_history_internal_log(LOG_DS_CREATE, dd->dd_pool->dp_spa,
 	    tx, cr, "dataset = %llu", dsobj);
 
-	dsl_dataset_close(ds, DS_MODE_STANDARD | DS_MODE_READONLY, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 }
 
 int
@@ -606,17 +621,16 @@
 	 * but the replay log objset is modified in open context.
 	 */
 	error = dmu_objset_open(name, DMU_OST_ANY,
-	    DS_MODE_EXCLUSIVE|DS_MODE_READONLY, &os);
+	    DS_MODE_OWNER|DS_MODE_READONLY|DS_MODE_INCONSISTENT, &os);
 	if (error == 0) {
 		dsl_dataset_t *ds = os->os->os_dsl_dataset;
 		zil_destroy(dmu_objset_zil(os), B_FALSE);
 
+		error = dsl_dataset_destroy(ds, os);
 		/*
 		 * dsl_dataset_destroy() closes the ds.
-		 * os is just used as the tag after it's freed.
 		 */
 		kmem_free(os, sizeof (objset_t));
-		error = dsl_dataset_destroy(ds, os);
 	}
 
 	return (error);
@@ -633,7 +647,7 @@
 
 	ds = os->os->os_dsl_dataset;
 
-	if (!dsl_dataset_tryupgrade(ds, DS_MODE_STANDARD, DS_MODE_EXCLUSIVE)) {
+	if (!dsl_dataset_tryown(ds, TRUE, os)) {
 		dmu_objset_close(os);
 		return (EBUSY);
 	}
@@ -645,7 +659,7 @@
 	 * actually implicitly called dmu_objset_evict(), thus freeing
 	 * the objset_impl_t.
 	 */
-	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, os);
+	dsl_dataset_disown(ds, os);
 	kmem_free(os, sizeof (objset_t));
 	return (err);
 }
@@ -668,7 +682,6 @@
 {
 	struct snaparg *sn = arg;
 	objset_t *os;
-	dmu_objset_stats_t stat;
 	int err;
 
 	(void) strcpy(sn->failed, name);
@@ -682,15 +695,12 @@
 	    (err = zfs_secpolicy_snapshot_perms(name, CRED())))
 		return (err);
 
-	err = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_STANDARD, &os);
+	err = dmu_objset_open(name, DMU_OST_ANY, DS_MODE_USER, &os);
 	if (err != 0)
 		return (err);
 
-	/*
-	 * If the objset is in an inconsistent state, return busy.
-	 */
-	dmu_objset_fast_stat(os, &stat);
-	if (stat.dds_inconsistent) {
+	/* If the objset is in an inconsistent state, return busy */
+	if (os->os->os_dsl_dataset->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) {
 		dmu_objset_close(os);
 		return (EBUSY);
 	}
@@ -1096,7 +1106,7 @@
 	 */
 	if ((flags & DS_FIND_SNAPSHOTS) &&
 	    dmu_objset_open(name, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os) == 0) {
+	    DS_MODE_USER | DS_MODE_READONLY, &os) == 0) {
 
 		snapobj = os->os->os_dsl_dataset->ds_phys->ds_snapnames_zapobj;
 		dmu_objset_close(os);
--- a/usr/src/uts/common/fs/zfs/dmu_send.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/dmu_send.c	Thu May 22 11:13:47 2008 -0700
@@ -247,9 +247,8 @@
 		if (ds->ds_dir->dd_phys->dd_origin_obj != NULL) {
 			dsl_pool_t *dp = ds->ds_dir->dd_pool;
 			rw_enter(&dp->dp_config_rwlock, RW_READER);
-			err = dsl_dataset_open_obj(dp,
-			    ds->ds_dir->dd_phys->dd_origin_obj, NULL,
-			    DS_MODE_NONE, FTAG, &fromds);
+			err = dsl_dataset_hold_obj(dp,
+			    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &fromds);
 			rw_exit(&dp->dp_config_rwlock);
 			if (err)
 				return (err);
@@ -279,7 +278,7 @@
 	if (fromds)
 		fromtxg = fromds->ds_phys->ds_creation_txg;
 	if (fromorigin)
-		dsl_dataset_close(fromds, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(fromds, FTAG);
 
 	ba.drr = drr;
 	ba.vp = vp;
@@ -336,8 +335,10 @@
 {
 	dsl_dataset_t *ds;
 
-	VERIFY(0 == dsl_dataset_open_obj(dp, dsobj, NULL,
-	    DS_MODE_EXCLUSIVE, dmu_recv_tag, &ds));
+	/* This should always work, since we just created it */
+	/* XXX - create should return an owned ds */
+	VERIFY(0 == dsl_dataset_own_obj(dp, dsobj,
+	    DS_MODE_INCONSISTENT, dmu_recv_tag, &ds));
 
 	if (type != DMU_OST_NONE) {
 		(void) dmu_objset_create_impl(dp->dp_spa,
@@ -345,8 +346,7 @@
 	}
 
 	spa_history_internal_log(LOG_DS_REPLAY_FULL_SYNC,
-	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "dataset = %lld",
-	    ds->ds_phys->ds_dir_obj);
+	    dp->dp_spa, tx, cr, "dataset = %lld", dsobj);
 
 	return (ds);
 }
@@ -385,10 +385,8 @@
 {
 	dsl_dir_t *dd = arg1;
 	struct recvbeginsyncarg *rbsa = arg2;
+	uint64_t flags = DS_FLAG_INCONSISTENT | rbsa->dsflags;
 	uint64_t dsobj;
-	uint64_t flags = DS_FLAG_INCONSISTENT;
-
-	flags |= rbsa->dsflags;
 
 	dsobj = dsl_dataset_create_sync(dd, strrchr(rbsa->tofs, '/') + 1,
 	    rbsa->origin, flags, cr, tx);
@@ -435,10 +433,8 @@
 	dsl_dataset_t *ds = arg1;
 	struct recvbeginsyncarg *rbsa = arg2;
 	dsl_dir_t *dd = ds->ds_dir;
+	uint64_t flags = DS_FLAG_INCONSISTENT | rbsa->dsflags;
 	uint64_t dsobj;
-	uint64_t flags = DS_FLAG_INCONSISTENT;
-
-	flags |= rbsa->dsflags;
 
 	/*
 	 * NB: caller must provide an extra hold on the dsl_dir_t, so it
@@ -501,21 +497,19 @@
 	struct recvbeginsyncarg *rbsa = arg2;
 	dsl_pool_t *dp = ohds->ds_dir->dd_pool;
 	dsl_dataset_t *ods, *cds;
+	uint64_t flags = DS_FLAG_INCONSISTENT | rbsa->dsflags;
 	uint64_t dsobj;
-	uint64_t flags = DS_FLAG_INCONSISTENT;
-
-	flags |= rbsa->dsflags;
 
 	/* create the temporary clone */
-	VERIFY(0 == dsl_dataset_open_obj(dp, ohds->ds_phys->ds_prev_snap_obj,
-	    NULL, DS_MODE_STANDARD, FTAG, &ods));
+	VERIFY(0 == dsl_dataset_hold_obj(dp, ohds->ds_phys->ds_prev_snap_obj,
+	    FTAG, &ods));
 	dsobj = dsl_dataset_create_sync(ohds->ds_dir,
 	    rbsa->clonelastname, ods, flags, cr, tx);
-	dsl_dataset_close(ods, DS_MODE_STANDARD, FTAG);
+	dsl_dataset_rele(ods, FTAG);
 
 	/* open the temporary clone */
-	VERIFY(0 == dsl_dataset_open_obj(dp, dsobj, NULL,
-	    DS_MODE_EXCLUSIVE, dmu_recv_tag, &cds));
+	VERIFY(0 == dsl_dataset_own_obj(dp, dsobj,
+	    DS_MODE_INCONSISTENT, dmu_recv_tag, &cds));
 
 	/* copy the refquota from the target fs to the clone */
 	if (ohds->ds_quota > 0)
@@ -524,8 +518,7 @@
 	rbsa->ds = cds;
 
 	spa_history_internal_log(LOG_DS_REPLAY_INC_SYNC,
-	    dp->dp_spa, tx, cr, "dataset = %lld",
-	    cds->ds_phys->ds_dir_obj);
+	    dp->dp_spa, tx, cr, "dataset = %lld", dsobj);
 }
 
 /* ARGSUSED */
@@ -539,7 +532,7 @@
 
 	spa_history_internal_log(LOG_DS_REPLAY_INC_SYNC,
 	    ds->ds_dir->dd_pool->dp_spa, tx, cr, "dataset = %lld",
-	    ds->ds_phys->ds_dir_obj);
+	    ds->ds_object);
 }
 
 /*
@@ -599,8 +592,7 @@
 	 */
 	if (rbsa.fromguid && !(flags & DRR_FLAG_CLONE) && !online) {
 		/* offline incremental receive */
-		err = dsl_dataset_open(tofs,
-		    DS_MODE_EXCLUSIVE, dmu_recv_tag, &ds);
+		err = dsl_dataset_own(tofs, 0, dmu_recv_tag, &ds);
 		if (err)
 			return (err);
 
@@ -612,8 +604,7 @@
 			if (ds->ds_prev == NULL ||
 			    ds->ds_prev->ds_phys->ds_guid !=
 			    rbsa.fromguid) {
-				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE,
-				    dmu_recv_tag);
+				dsl_dataset_disown(ds, dmu_recv_tag);
 				return (ENODEV);
 			}
 			(void) dsl_dataset_rollback(ds, DMU_OST_NONE);
@@ -621,10 +612,9 @@
 		rbsa.force = B_FALSE;
 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 		    recv_incremental_check,
-		    recv_offline_incremental_sync,
-		    ds, &rbsa, 1);
+		    recv_offline_incremental_sync, ds, &rbsa, 1);
 		if (err) {
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, dmu_recv_tag);
+			dsl_dataset_disown(ds, dmu_recv_tag);
 			return (err);
 		}
 		drc->drc_logical_ds = drc->drc_real_ds = ds;
@@ -636,8 +626,7 @@
 		    "%%%s", tosnap);
 
 		/* open the dataset we are logically receiving into */
-		err = dsl_dataset_open(tofs,
-		    DS_MODE_STANDARD, dmu_recv_tag, &ds);
+		err = dsl_dataset_hold(tofs, dmu_recv_tag, &ds);
 		if (err)
 			return (err);
 
@@ -646,7 +635,7 @@
 		    recv_incremental_check,
 		    recv_online_incremental_sync, ds, &rbsa, 5);
 		if (err) {
-			dsl_dataset_close(ds, DS_MODE_STANDARD, dmu_recv_tag);
+			dsl_dataset_rele(ds, dmu_recv_tag);
 			return (err);
 		}
 		drc->drc_logical_ds = ds;
@@ -666,27 +655,23 @@
 			}
 
 			rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
-			err = dsl_dataset_open_obj(dd->dd_pool,
-			    dd->dd_phys->dd_head_dataset_obj, NULL,
-			    DS_MODE_EXCLUSIVE | DS_MODE_INCONSISTENT,
-			    FTAG, &ds);
+			err = dsl_dataset_own_obj(dd->dd_pool,
+			    dd->dd_phys->dd_head_dataset_obj,
+			    DS_MODE_INCONSISTENT, FTAG, &ds);
 			rw_exit(&dd->dd_pool->dp_config_rwlock);
 			if (err) {
 				dsl_dir_close(dd, FTAG);
 				return (err);
 			}
 
+			dsl_dataset_make_exclusive(ds, FTAG);
 			err = dsl_sync_task_do(dd->dd_pool,
 			    recv_full_existing_check,
 			    recv_full_existing_sync, ds, &rbsa, 5);
-			/* if successful, sync task closes the ds for us */
-			if (err)
-				dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
+			dsl_dataset_disown(ds, FTAG);
 		} else {
 			err = dsl_sync_task_do(dd->dd_pool, recv_full_check,
 			    recv_full_sync, dd, &rbsa, 5);
-			if (err)
-				return (err);
 		}
 		dsl_dir_close(dd, FTAG);
 		if (err)
@@ -695,10 +680,6 @@
 		drc->drc_newfs = B_TRUE;
 	}
 
-	/* downgrade our hold on the ds from EXCLUSIVE to PRIMARY */
-	dsl_dataset_downgrade(drc->drc_real_ds,
-	    DS_MODE_EXCLUSIVE, DS_MODE_PRIMARY);
-
 	return (0);
 }
 
@@ -992,22 +973,14 @@
 		 * may be a clone) that we created
 		 */
 		(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag);
-		if (drc->drc_real_ds != drc->drc_logical_ds) {
-			dsl_dataset_close(drc->drc_logical_ds,
-			    DS_MODE_STANDARD, dmu_recv_tag);
-		}
+		if (drc->drc_real_ds != drc->drc_logical_ds)
+			dsl_dataset_rele(drc->drc_logical_ds, dmu_recv_tag);
 	} else {
 		/*
 		 * offline incremental: rollback to most recent snapshot.
 		 */
-		int lmode = DS_MODE_PRIMARY;
-		if (dsl_dataset_tryupgrade(drc->drc_real_ds,
-		    DS_MODE_PRIMARY, DS_MODE_EXCLUSIVE)) {
-			lmode = DS_MODE_EXCLUSIVE;
-			(void) dsl_dataset_rollback(drc->drc_real_ds,
-			    DMU_OST_NONE);
-		}
-		dsl_dataset_close(drc->drc_real_ds, lmode, FTAG);
+		(void) dsl_dataset_rollback(drc->drc_real_ds, DMU_OST_NONE);
+		dsl_dataset_disown(drc->drc_real_ds, dmu_recv_tag);
 	}
 }
 
@@ -1186,64 +1159,51 @@
 int
 dmu_recv_end(dmu_recv_cookie_t *drc)
 {
-	int err = 0;
-	int lmode;
+	struct recvendsyncarg resa;
+	dsl_dataset_t *ds = drc->drc_logical_ds;
+	int err;
 
 	/*
 	 * XXX hack; seems the ds is still dirty and
-	 * dsl_pool_zil_clean() expects it to have a ds_user_ptr (and
-	 * zil), but clone_swap() can close it.
+	 * dsl_pool_zil_clean() expects it to have a ds_user_ptr
+	 * (and zil), but clone_swap() can close it.
 	 */
-	txg_wait_synced(drc->drc_real_ds->ds_dir->dd_pool, 0);
+	txg_wait_synced(ds->ds_dir->dd_pool, 0);
 
-	if (dsl_dataset_tryupgrade(drc->drc_real_ds,
-	    DS_MODE_PRIMARY, DS_MODE_EXCLUSIVE)) {
-		lmode = DS_MODE_EXCLUSIVE;
-	} else {
-		dmu_recv_abort_cleanup(drc);
-		return (EBUSY);
+	if (ds != drc->drc_real_ds) {
+		/* we are doing an online recv */
+		if (dsl_dataset_tryown(ds, FALSE, dmu_recv_tag)) {
+			err = dsl_dataset_clone_swap(drc->drc_real_ds, ds,
+			    drc->drc_force);
+			if (err)
+				dsl_dataset_disown(ds, dmu_recv_tag);
+		} else {
+			err = EBUSY;
+			dsl_dataset_rele(ds, dmu_recv_tag);
+		}
+		/* dsl_dataset_destroy() will disown the ds */
+		(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag);
+		if (err)
+			return (err);
 	}
 
-	if (drc->drc_logical_ds != drc->drc_real_ds) {
-		if (err == 0 && dsl_dataset_tryupgrade(drc->drc_logical_ds,
-		    DS_MODE_STANDARD, DS_MODE_EXCLUSIVE)) {
-			lmode = DS_MODE_EXCLUSIVE;
-			err = dsl_dataset_clone_swap(drc->drc_real_ds,
-			    drc->drc_logical_ds, drc->drc_force);
+	resa.creation_time = drc->drc_drrb->drr_creation_time;
+	resa.toguid = drc->drc_drrb->drr_toguid;
+	resa.tosnap = drc->drc_tosnap;
+
+	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
+	    recv_end_check, recv_end_sync, ds, &resa, 3);
+	if (err) {
+		if (drc->drc_newfs) {
+			ASSERT(ds == drc->drc_real_ds);
+			(void) dsl_dataset_destroy(ds, dmu_recv_tag);
+			return (err);
 		} else {
-			lmode = DS_MODE_STANDARD;
-			err = EBUSY;
+			(void) dsl_dataset_rollback(ds, DMU_OST_NONE);
 		}
 	}
 
-	if (err == 0) {
-		struct recvendsyncarg resa;
-
-		resa.creation_time = drc->drc_drrb->drr_creation_time;
-		resa.toguid = drc->drc_drrb->drr_toguid;
-		resa.tosnap = drc->drc_tosnap;
-
-		err = dsl_sync_task_do(drc->drc_real_ds->ds_dir->dd_pool,
-		    recv_end_check, recv_end_sync,
-		    drc->drc_logical_ds, &resa, 3);
-		if (err) {
-			if (drc->drc_newfs) {
-				ASSERT(drc->drc_logical_ds == drc->drc_real_ds);
-				(void) dsl_dataset_destroy(drc->drc_real_ds,
-				    dmu_recv_tag);
-				return (err);
-			} else {
-				(void) dsl_dataset_rollback(drc->drc_logical_ds,
-				    DMU_OST_NONE);
-			}
-		}
-	}
-
-	if (drc->drc_logical_ds != drc->drc_real_ds) {
-		/* dsl_dataset_destroy() will close the ds */
-		(void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag);
-	}
-	/* close the hold from dmu_recv_begin */
-	dsl_dataset_close(drc->drc_logical_ds, lmode, dmu_recv_tag);
+	/* release the hold from dmu_recv_begin */
+	dsl_dataset_disown(ds, dmu_recv_tag);
 	return (err);
 }
--- a/usr/src/uts/common/fs/zfs/dsl_dataset.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/dsl_dataset.c	Thu May 22 11:13:47 2008 -0700
@@ -41,6 +41,8 @@
 #include <sys/spa.h>
 #include <sys/sunddi.h>
 
+static char *dsl_reaper = "the grim reaper";
+
 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
 static dsl_checkfunc_t dsl_dataset_rollback_check;
@@ -51,22 +53,9 @@
 
 #define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
 
-/*
- * We use weighted reference counts to express the various forms of exclusion
- * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
- * is DS_REF_MAX, and a PRIMARY open is little more than half of an EXCLUSIVE.
- * This makes the exclusion logic simple: the total refcnt for all opens cannot
- * exceed DS_REF_MAX.  For example, EXCLUSIVE opens are exclusive because their
- * weight (DS_REF_MAX) consumes the entire refcnt space.  PRIMARY opens consume
- * just over half of the refcnt space, so there can't be more than one, but it
- * can peacefully coexist with any number of STANDARD opens.
- */
-static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
-	0,			/* DS_MODE_NONE - invalid		*/
-	1,			/* DS_MODE_STANDARD - unlimited number	*/
-	(DS_REF_MAX >> 1) + 1,	/* DS_MODE_PRIMARY - only one of these	*/
-	DS_REF_MAX		/* DS_MODE_EXCLUSIVE - no other opens	*/
-};
+#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
+
+static void dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag);
 
 /*
  * Figure out how much of this delta should be propogated to the dsl_dir
@@ -237,9 +226,7 @@
 {
 	dsl_dataset_t *ds = dsv;
 
-	/* open_refcount == DS_REF_MAX when deleting */
-	ASSERT(ds->ds_open_refcount == 0 ||
-	    ds->ds_open_refcount == DS_REF_MAX);
+	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 
 	dprintf_ds(ds, "evicting %s\n", "");
 
@@ -249,18 +236,21 @@
 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 
 	if (ds->ds_prev) {
-		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
+		dsl_dataset_drop_ref(ds->ds_prev, ds);
 		ds->ds_prev = NULL;
 	}
 
 	bplist_close(&ds->ds_deadlist);
-	dsl_dir_close(ds->ds_dir, ds);
+	if (ds->ds_dir)
+		dsl_dir_close(ds->ds_dir, ds);
 
 	ASSERT(!list_link_active(&ds->ds_synced_link));
 
 	mutex_destroy(&ds->ds_lock);
 	mutex_destroy(&ds->ds_opening_lock);
 	mutex_destroy(&ds->ds_deadlist.bpl_lock);
+	rw_destroy(&ds->ds_rwlock);
+	cv_destroy(&ds->ds_exclusive_cv);
 
 	kmem_free(ds, sizeof (dsl_dataset_t));
 }
@@ -291,47 +281,48 @@
 }
 
 static int
-dsl_dataset_snap_lookup(objset_t *os, uint64_t flags,
-    uint64_t snapnames_zapobj, const char *name, uint64_t *value)
+dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 {
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 	matchtype_t mt;
 	int err;
 
-	if (flags & DS_FLAG_CI_DATASET)
+	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 		mt = MT_FIRST;
 	else
 		mt = MT_EXACT;
 
-	err = zap_lookup_norm(os, snapnames_zapobj, name, 8, 1,
+	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 	    value, mt, NULL, 0, NULL);
 	if (err == ENOTSUP && mt == MT_FIRST)
-		err = zap_lookup(os, snapnames_zapobj, name, 8, 1, value);
+		err = zap_lookup(mos, snapobj, name, 8, 1, value);
 	return (err);
 }
 
 static int
-dsl_dataset_snap_remove(objset_t *os, uint64_t flags,
-    uint64_t snapnames_zapobj, char *name, dmu_tx_t *tx)
+dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
 {
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 	matchtype_t mt;
 	int err;
 
-	if (flags & DS_FLAG_CI_DATASET)
+	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 		mt = MT_FIRST;
 	else
 		mt = MT_EXACT;
 
-	err = zap_remove_norm(os, snapnames_zapobj, name, mt, tx);
+	err = zap_remove_norm(mos, snapobj, name, mt, tx);
 	if (err == ENOTSUP && mt == MT_FIRST)
-		err = zap_remove(os, snapnames_zapobj, name, tx);
+		err = zap_remove(mos, snapobj, name, tx);
 	return (err);
 }
 
-int
-dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
-    int mode, void *tag, dsl_dataset_t **dsp)
+static int
+dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
+    dsl_dataset_t **dsp)
 {
-	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
 	objset_t *mos = dp->dp_meta_objset;
 	dmu_buf_t *dbuf;
 	dsl_dataset_t *ds;
@@ -356,6 +347,8 @@
 		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
 		    NULL);
+		rw_init(&ds->ds_rwlock, 0, 0, 0);
+		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 
 		err = bplist_open(&ds->ds_deadlist,
 		    mos, ds->ds_phys->ds_deadlist_obj);
@@ -371,6 +364,8 @@
 			mutex_destroy(&ds->ds_lock);
 			mutex_destroy(&ds->ds_opening_lock);
 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
+			rw_destroy(&ds->ds_rwlock);
+			cv_destroy(&ds->ds_exclusive_cv);
 			kmem_free(ds, sizeof (dsl_dataset_t));
 			dmu_buf_rele(dbuf, tag);
 			return (err);
@@ -379,35 +374,12 @@
 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
 			ds->ds_snapname[0] = '\0';
 			if (ds->ds_phys->ds_prev_snap_obj) {
-				err = dsl_dataset_open_obj(dp,
-				    ds->ds_phys->ds_prev_snap_obj, NULL,
-				    DS_MODE_NONE, ds, &ds->ds_prev);
+				err = dsl_dataset_get_ref(dp,
+				    ds->ds_phys->ds_prev_snap_obj,
+				    ds, &ds->ds_prev);
 			}
-		} else {
-			if (snapname) {
-#ifdef ZFS_DEBUG
-				dsl_dataset_phys_t *headphys;
-				dmu_buf_t *headdbuf;
-				err = dmu_bonus_hold(mos,
-				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
-				    FTAG, &headdbuf);
-				if (err == 0) {
-					uint64_t foundobj;
-
-					headphys = headdbuf->db_data;
-					err = dsl_dataset_snap_lookup(
-					    dp->dp_meta_objset,
-					    headphys->ds_flags,
-					    headphys->ds_snapnames_zapobj,
-					    snapname, &foundobj);
-					ASSERT3U(foundobj, ==, dsobj);
-					dmu_buf_rele(headdbuf, FTAG);
-				}
-#endif
-				(void) strcat(ds->ds_snapname, snapname);
-			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
-				err = dsl_dataset_get_snapname(ds);
-			}
+		} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
+			err = dsl_dataset_get_snapname(ds);
 		}
 
 		if (!dsl_dataset_is_snapshot(ds)) {
@@ -444,14 +416,14 @@
 		}
 		if (err || winner) {
 			bplist_close(&ds->ds_deadlist);
-			if (ds->ds_prev) {
-				dsl_dataset_close(ds->ds_prev,
-				    DS_MODE_NONE, ds);
-			}
+			if (ds->ds_prev)
+				dsl_dataset_drop_ref(ds->ds_prev, ds);
 			dsl_dir_close(ds->ds_dir, ds);
 			mutex_destroy(&ds->ds_lock);
 			mutex_destroy(&ds->ds_opening_lock);
 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
+			rw_destroy(&ds->ds_rwlock);
+			cv_destroy(&ds->ds_exclusive_cv);
 			kmem_free(ds, sizeof (dsl_dataset_t));
 			if (err) {
 				dmu_buf_rele(dbuf, tag);
@@ -465,93 +437,166 @@
 	}
 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
-
 	mutex_enter(&ds->ds_lock);
-	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
-	    (ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) &&
-	    !DS_MODE_IS_INCONSISTENT(mode)) ||
-	    (ds->ds_open_refcount + weight > DS_REF_MAX)) {
+	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 		mutex_exit(&ds->ds_lock);
-		dsl_dataset_close(ds, DS_MODE_NONE, tag);
-		return (EBUSY);
+		dmu_buf_rele(ds->ds_dbuf, tag);
+		return (ENOENT);
+	}
+	mutex_exit(&ds->ds_lock);
+	*dsp = ds;
+	return (0);
+}
+
+static int
+dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
+{
+	dsl_pool_t *dp = ds->ds_dir->dd_pool;
+
+	/*
+	 * In syncing context we don't want to take ds_rwlock: there
+	 * may be an existing writer waiting for sync phase to
+	 * finish.  We don't need to worry about such writers, since
+	 * sync phase is single-threaded, so the writer can't be
+	 * doing anything while we are active.
+	 */
+	if (dsl_pool_sync_context(dp)) {
+		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
+		return (0);
 	}
-	ds->ds_open_refcount += weight;
+
+	/*
+	 * Normal users will hold the ds_rwlock as a READER until they
+	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
+	 * drop their READER lock after they set the ds_owner field.
+	 *
+	 * If the dataset is being destroyed, the destroy thread will
+	 * obtain a WRITER lock for exclusive access after it's done its
+	 * open-context work and then change the ds_owner to
+	 * dsl_reaper once destruction is assured.  So threads
+	 * may block here temporarily, until the "destructability" of
+	 * the dataset is determined.
+	 */
+	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
+	mutex_enter(&ds->ds_lock);
+	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
+		rw_exit(&dp->dp_config_rwlock);
+		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
+		if (DSL_DATASET_IS_DESTROYED(ds)) {
+			mutex_exit(&ds->ds_lock);
+			dsl_dataset_drop_ref(ds, tag);
+			rw_enter(&dp->dp_config_rwlock, RW_READER);
+			return (ENOENT);
+		}
+		rw_enter(&dp->dp_config_rwlock, RW_READER);
+	}
 	mutex_exit(&ds->ds_lock);
-
-	*dsp = ds;
 	return (0);
 }
 
 int
-dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
-    void *tag, dsl_dataset_t **dsp)
+dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
+    dsl_dataset_t **dsp)
+{
+	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
+
+	if (err)
+		return (err);
+	return (dsl_dataset_hold_ref(*dsp, tag));
+}
+
+int
+dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, int flags, void *owner,
+    dsl_dataset_t **dsp)
+{
+	int err = dsl_dataset_hold_obj(dp, dsobj, owner, dsp);
+
+	ASSERT(DS_MODE_TYPE(flags) != DS_MODE_USER);
+
+	if (err)
+		return (err);
+	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
+		dsl_dataset_rele(*dsp, owner);
+		return (EBUSY);
+	}
+	return (0);
+}
+
+int
+dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 {
 	dsl_dir_t *dd;
 	dsl_pool_t *dp;
-	const char *tail;
+	const char *snapname;
 	uint64_t obj;
-	dsl_dataset_t *ds = NULL;
 	int err = 0;
 
-	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
+	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 	if (err)
 		return (err);
 
 	dp = dd->dd_pool;
 	obj = dd->dd_phys->dd_head_dataset_obj;
 	rw_enter(&dp->dp_config_rwlock, RW_READER);
-	if (obj == 0) {
-		/* A dataset with no associated objset */
+	if (obj)
+		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
+	else
 		err = ENOENT;
+	if (err)
 		goto out;
-	}
-
-	if (tail != NULL) {
-		objset_t *mos = dp->dp_meta_objset;
-		uint64_t flags;
-
-		err = dsl_dataset_open_obj(dp, obj, NULL,
-		    DS_MODE_NONE, tag, &ds);
-		if (err)
-			goto out;
-		flags = ds->ds_phys->ds_flags;
-		obj = ds->ds_phys->ds_snapnames_zapobj;
-		dsl_dataset_close(ds, DS_MODE_NONE, tag);
-		ds = NULL;
-
-		if (tail[0] != '@') {
+
+	err = dsl_dataset_hold_ref(*dsp, tag);
+
+	/* we may be looking for a snapshot */
+	if (err == 0 && snapname != NULL) {
+		dsl_dataset_t *ds = NULL;
+
+		if (*snapname++ != '@') {
+			dsl_dataset_rele(*dsp, tag);
 			err = ENOENT;
 			goto out;
 		}
-		tail++;
-
-		/* Look for a snapshot */
-		if (!DS_MODE_IS_READONLY(mode)) {
-			err = EROFS;
-			goto out;
+
+		dprintf("looking for snapshot '%s'\n", snapname);
+		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
+		if (err == 0)
+			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
+		dsl_dataset_rele(*dsp, tag);
+
+		ASSERT3U((err == 0), ==, (ds != NULL));
+
+		if (ds) {
+			mutex_enter(&ds->ds_lock);
+			if (ds->ds_snapname[0] == 0)
+				(void) strlcpy(ds->ds_snapname, snapname,
+				    sizeof (ds->ds_snapname));
+			mutex_exit(&ds->ds_lock);
+			err = dsl_dataset_hold_ref(ds, tag);
+			*dsp = err ? NULL : ds;
 		}
-		dprintf("looking for snapshot '%s'\n", tail);
-		err = dsl_dataset_snap_lookup(mos, flags, obj, tail, &obj);
-		if (err)
-			goto out;
 	}
-	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
-
 out:
 	rw_exit(&dp->dp_config_rwlock);
 	dsl_dir_close(dd, FTAG);
-
-	ASSERT3U((err == 0), ==, (ds != NULL));
-	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
-
-	*dsp = ds;
 	return (err);
 }
 
 int
-dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
+dsl_dataset_own(const char *name, int flags, void *owner, dsl_dataset_t **dsp)
 {
-	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
+	int err = dsl_dataset_hold(name, owner, dsp);
+	if (err)
+		return (err);
+	if ((*dsp)->ds_phys->ds_num_children > 0 &&
+	    !DS_MODE_IS_READONLY(flags)) {
+		dsl_dataset_rele(*dsp, owner);
+		return (EROFS);
+	}
+	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
+		dsl_dataset_rele(*dsp, owner);
+		return (EBUSY);
+	}
+	return (0);
 }
 
 void
@@ -564,11 +609,11 @@
 		VERIFY(0 == dsl_dataset_get_snapname(ds));
 		if (ds->ds_snapname[0]) {
 			(void) strcat(name, "@");
+			/*
+			 * We use a "recursive" mutex so that we
+			 * can call dprintf_ds() with ds_lock held.
+			 */
 			if (!MUTEX_HELD(&ds->ds_lock)) {
-				/*
-				 * We use a "recursive" mutex so that we
-				 * can call dprintf_ds() with ds_lock held.
-				 */
 				mutex_enter(&ds->ds_lock);
 				(void) strcat(name, ds->ds_snapname);
 				mutex_exit(&ds->ds_lock);
@@ -592,7 +637,6 @@
 		if (ds->ds_snapname[0]) {
 			++result;	/* adding one for the @-sign */
 			if (!MUTEX_HELD(&ds->ds_lock)) {
-				/* see dsl_datset_name */
 				mutex_enter(&ds->ds_lock);
 				result += strlen(ds->ds_snapname);
 				mutex_exit(&ds->ds_lock);
@@ -605,49 +649,64 @@
 	return (result);
 }
 
-void
-dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
+static void
+dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 {
-	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
-	mutex_enter(&ds->ds_lock);
-	ASSERT3U(ds->ds_open_refcount, >=, weight);
-	ds->ds_open_refcount -= weight;
-	mutex_exit(&ds->ds_lock);
-
 	dmu_buf_rele(ds->ds_dbuf, tag);
 }
 
 void
-dsl_dataset_downgrade(dsl_dataset_t *ds, int oldmode, int newmode)
+dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 {
-	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
-	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
+	ASSERT(ds->ds_owner != tag);
+	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
+		rw_exit(&ds->ds_rwlock);
+	}
+	dsl_dataset_drop_ref(ds, tag);
+}
+
+void
+dsl_dataset_disown(dsl_dataset_t *ds, void *owner)
+{
+	ASSERT((ds->ds_owner == owner && ds->ds_dbuf) ||
+	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
+
 	mutex_enter(&ds->ds_lock);
-	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
-	ASSERT3U(oldweight, >=, newweight);
-	ds->ds_open_refcount -= oldweight;
-	ds->ds_open_refcount += newweight;
+	ds->ds_owner = NULL;
+	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
+		rw_exit(&ds->ds_rwlock);
+		cv_broadcast(&ds->ds_exclusive_cv);
+	}
 	mutex_exit(&ds->ds_lock);
+	if (ds->ds_dbuf)
+		dsl_dataset_drop_ref(ds, owner);
+	else
+		dsl_dataset_evict(ds->ds_dbuf, ds);
 }
 
 boolean_t
-dsl_dataset_tryupgrade(dsl_dataset_t *ds, int oldmode, int newmode)
+dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *owner)
 {
-	boolean_t rv;
-	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
-	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
+	boolean_t gotit = FALSE;
+
 	mutex_enter(&ds->ds_lock);
-	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
-	ASSERT3U(newweight, >=, oldweight);
-	if (ds->ds_open_refcount - oldweight + newweight > DS_REF_MAX) {
-		rv = B_FALSE;
-	} else {
-		ds->ds_open_refcount -= oldweight;
-		ds->ds_open_refcount += newweight;
-		rv = B_TRUE;
+	if (ds->ds_owner == NULL &&
+	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
+		ds->ds_owner = owner;
+		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
+			rw_exit(&ds->ds_rwlock);
+		gotit = TRUE;
 	}
 	mutex_exit(&ds->ds_lock);
-	return (rv);
+	return (gotit);
+}
+
+void
+dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
+{
+	ASSERT3P(owner, ==, ds->ds_owner);
+	if (!RW_WRITE_HELD(&ds->ds_rwlock))
+		rw_enter(&ds->ds_rwlock, RW_WRITER);
 }
 
 void
@@ -687,11 +746,10 @@
 	dd->dd_phys->dd_head_dataset_obj = dsobj;
 	dsl_dir_close(dd, FTAG);
 
-	VERIFY(0 ==
-	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
+	VERIFY(0 == dsl_dataset_get_ref(dp, dsobj, FTAG, &ds));
 	(void) dmu_objset_create_impl(dp->dp_spa, ds,
 	    &ds->ds_phys->ds_bp, DMU_OST_ZFS, tx);
-	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+	dsl_dataset_drop_ref(ds, FTAG);
 }
 
 uint64_t
@@ -714,6 +772,7 @@
 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 	dmu_buf_will_dirty(dbuf, tx);
 	dsphys = dbuf->db_data;
+	bzero(dsphys, sizeof (dsl_dataset_phys_t));
 	dsphys->ds_dir_obj = dd->dd_object;
 	dsphys->ds_flags = flags;
 	dsphys->ds_fsid_guid = unique_create();
@@ -796,21 +855,20 @@
 
 	(void) strcat(name, "@");
 	(void) strcat(name, da->snapname);
-	err = dsl_dataset_open(name,
-	    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
+	err = dsl_dataset_own(name, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
 	    da->dstg, &ds);
 	cp = strchr(name, '@');
 	*cp = '\0';
-	if (err == ENOENT)
-		return (0);
-	if (err) {
+	if (err == 0) {
+		dsl_dataset_make_exclusive(ds, da->dstg);
+		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
+		    dsl_dataset_destroy_sync, ds, da->dstg, 0);
+	} else if (err == ENOENT) {
+		err = 0;
+	} else {
 		(void) strcpy(da->failed, name);
-		return (err);
 	}
-
-	dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
-	    dsl_dataset_destroy_sync, ds, da->dstg, 0);
-	return (0);
+	return (err);
 }
 
 /*
@@ -841,16 +899,14 @@
 	for (dst = list_head(&da.dstg->dstg_tasks); dst;
 	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
 		dsl_dataset_t *ds = dst->dst_arg1;
+		/*
+		 * Return the file system name that triggered the error
+		 */
 		if (dst->dst_err) {
 			dsl_dataset_name(ds, fsname);
 			*strchr(fsname, '@') = '\0';
 		}
-		/*
-		 * If it was successful, destroy_sync would have
-		 * closed the ds
-		 */
-		if (err)
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, da.dstg);
+		dsl_dataset_disown(ds, da.dstg);
 	}
 
 	dsl_sync_task_group_destroy(da.dstg);
@@ -859,9 +915,8 @@
 }
 
 /*
- * ds must be opened EXCLUSIVE or PRIMARY.  on return (whether
- * successful or not), ds will be closed and caller can no longer
- * dereference it.
+ * ds must be opened as OWNER.  On return (whether successful or not),
+ * ds will be closed and caller can no longer dereference it.
  */
 int
 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
@@ -872,16 +927,9 @@
 	dsl_dir_t *dd;
 	uint64_t obj;
 
-	if (ds->ds_open_refcount != DS_REF_MAX) {
-		if (dsl_dataset_tryupgrade(ds, DS_MODE_PRIMARY,
-		    DS_MODE_EXCLUSIVE) == 0) {
-			dsl_dataset_close(ds, DS_MODE_PRIMARY, tag);
-			return (EBUSY);
-		}
-	}
-
 	if (dsl_dataset_is_snapshot(ds)) {
 		/* Destroying a snapshot is simpler */
+		dsl_dataset_make_exclusive(ds, tag);
 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
 		    ds, tag, 0);
@@ -925,14 +973,18 @@
 		VERIFY(0 == dmu_object_free(os, obj, tx));
 		dmu_tx_commit(tx);
 	}
-	/* Make sure it's not dirty before we finish destroying it. */
-	txg_wait_synced(dd->dd_pool, 0);
 
 	dmu_objset_close(os);
 	if (err != ESRCH)
 		goto out;
 
 	if (ds->ds_user_ptr) {
+		/*
+		 * We need to sync out all in-flight IO before we try
+		 * to evict (the dataset evict func is trying to clear
+		 * the cached entries for this dataset in the ARC).
+		 */
+		txg_wait_synced(dd->dd_pool, 0);
 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 		ds->ds_user_ptr = NULL;
 	}
@@ -947,6 +999,7 @@
 	/*
 	 * Blow away the dsl_dir + head dataset.
 	 */
+	dsl_dataset_make_exclusive(ds, tag);
 	dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
 	dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 	    dsl_dataset_destroy_sync, ds, tag, 0);
@@ -954,19 +1007,18 @@
 	    dsl_dir_destroy_sync, dd, FTAG, 0);
 	err = dsl_sync_task_group_wait(dstg);
 	dsl_sync_task_group_destroy(dstg);
-	/* if it is successful, *destroy_sync will close the ds+dd */
+	/* if it is successful, dsl_dir_destroy_sync will close the dd */
 	if (err)
 		dsl_dir_close(dd, FTAG);
 out:
-	if (err)
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
+	dsl_dataset_disown(ds, tag);
 	return (err);
 }
 
 int
 dsl_dataset_rollback(dsl_dataset_t *ds, dmu_objset_type_t ost)
 {
-	ASSERT3U(ds->ds_open_refcount, ==, DS_REF_MAX);
+	ASSERT(ds->ds_owner);
 
 	return (dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    dsl_dataset_rollback_check, dsl_dataset_rollback_sync,
@@ -1164,7 +1216,7 @@
 		 * We need to make sure that the objset_impl_t is reopened after
 		 * we do the rollback, otherwise it will have the wrong
 		 * objset_phys_t.  Normally this would happen when this
-		 * DS_MODE_EXCLUSIVE dataset-open is closed, thus causing the
+		 * dataset-open is closed, thus causing the
 		 * dataset to be immediately evicted.  But when doing "zfs recv
 		 * -F", we reopen the objset before that, so that there is no
 		 * window where the dataset is closed and inconsistent.
@@ -1292,6 +1344,9 @@
 {
 	dsl_dataset_t *ds = arg1;
 
+	/* we have an owner hold, so no one else can destroy us */
+	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
+
 	/* Can't delete a branch point. */
 	if (ds->ds_phys->ds_num_children > 1)
 		return (EEXIST);
@@ -1316,6 +1371,46 @@
 	return (0);
 }
 
+struct refsarg {
+	kmutex_t lock;
+	boolean_t gone;
+	kcondvar_t cv;
+};
+
+/* ARGSUSED */
+static void
+dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
+{
+	struct refsarg *arg = argv;
+
+	mutex_enter(&arg->lock);
+	arg->gone = TRUE;
+	cv_signal(&arg->cv);
+	mutex_exit(&arg->lock);
+}
+
+static void
+dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
+{
+	struct refsarg arg;
+
+	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
+	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
+	arg.gone = FALSE;
+	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
+	    dsl_dataset_refs_gone);
+	dmu_buf_rele(ds->ds_dbuf, tag);
+	mutex_enter(&arg.lock);
+	while (!arg.gone)
+		cv_wait(&arg.cv, &arg.lock);
+	ASSERT(arg.gone);
+	mutex_exit(&arg.lock);
+	ds->ds_dbuf = NULL;
+	ds->ds_phys = NULL;
+	mutex_destroy(&arg.lock);
+	cv_destroy(&arg.cv);
+}
+
 void
 dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
 {
@@ -1329,12 +1424,18 @@
 	dsl_dataset_t *ds_prev = NULL;
 	uint64_t obj;
 
-	ASSERT3U(ds->ds_open_refcount, ==, DS_REF_MAX);
+	ASSERT(ds->ds_owner);
 	ASSERT3U(ds->ds_phys->ds_num_children, <=, 1);
 	ASSERT(ds->ds_prev == NULL ||
 	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
 	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
 
+	/* signal any waiters that this dataset is going away */
+	mutex_enter(&ds->ds_lock);
+	ds->ds_owner = dsl_reaper;
+	cv_broadcast(&ds->ds_exclusive_cv);
+	mutex_exit(&ds->ds_lock);
+
 	/* Remove our reservation */
 	if (ds->ds_reserved != 0) {
 		uint64_t val = 0;
@@ -1350,9 +1451,8 @@
 		if (ds->ds_prev) {
 			ds_prev = ds->ds_prev;
 		} else {
-			VERIFY(0 == dsl_dataset_open_obj(dp,
-			    ds->ds_phys->ds_prev_snap_obj, NULL,
-			    DS_MODE_NONE, FTAG, &ds_prev));
+			VERIFY(0 == dsl_dataset_hold_obj(dp,
+			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
 		}
 		after_branch_point =
 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
@@ -1379,9 +1479,8 @@
 
 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
 
-		VERIFY(0 == dsl_dataset_open_obj(dp,
-		    ds->ds_phys->ds_next_snap_obj, NULL,
-		    DS_MODE_NONE, FTAG, &ds_next));
+		VERIFY(0 == dsl_dataset_hold_obj(dp,
+		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
 
 		old_unique = dsl_dataset_unique(ds_next);
@@ -1402,8 +1501,7 @@
 		 *
 		 * XXX we're doing this long task with the config lock held
 		 */
-		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
-		    &bp) == 0) {
+		while (bplist_iterate(&ds_next->ds_deadlist, &itor, &bp) == 0) {
 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
 				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
 				    &bp, tx));
@@ -1428,6 +1526,7 @@
 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
 
 		/* set next's deadlist to our deadlist */
+		bplist_close(&ds->ds_deadlist);
 		ds_next->ds_phys->ds_deadlist_obj =
 		    ds->ds_phys->ds_deadlist_obj;
 		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
@@ -1449,9 +1548,9 @@
 			 */
 			dsl_dataset_t *ds_after_next;
 
-			VERIFY(0 == dsl_dataset_open_obj(dp,
-			    ds_next->ds_phys->ds_next_snap_obj, NULL,
-			    DS_MODE_NONE, FTAG, &ds_after_next));
+			VERIFY(0 == dsl_dataset_hold_obj(dp,
+			    ds_next->ds_phys->ds_next_snap_obj,
+			    FTAG, &ds_after_next));
 			itor = 0;
 			while (bplist_iterate(&ds_after_next->ds_deadlist,
 			    &itor, &bp) == 0) {
@@ -1464,18 +1563,16 @@
 				}
 			}
 
-			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
+			dsl_dataset_rele(ds_after_next, FTAG);
 			ASSERT3P(ds_next->ds_prev, ==, NULL);
 		} else {
 			ASSERT3P(ds_next->ds_prev, ==, ds);
-			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
-			    ds_next);
+			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
+			ds_next->ds_prev = NULL;
 			if (ds_prev) {
-				VERIFY(0 == dsl_dataset_open_obj(dp,
-				    ds->ds_phys->ds_prev_snap_obj, NULL,
-				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
-			} else {
-				ds_next->ds_prev = NULL;
+				VERIFY(0 == dsl_dataset_get_ref(dp,
+				    ds->ds_phys->ds_prev_snap_obj,
+				    ds_next, &ds_next->ds_prev));
 			}
 
 			dsl_dataset_recalc_head_uniq(ds_next);
@@ -1497,7 +1594,7 @@
 				    0, 0, tx);
 			}
 		}
-		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds_next, FTAG);
 
 		/*
 		 * NB: unique_bytes might not be accurate for the head objset.
@@ -1543,56 +1640,46 @@
 
 	dsl_dir_diduse_space(ds->ds_dir, -used, -compressed, -uncompressed, tx);
 
-	if (ds->ds_phys->ds_snapnames_zapobj) {
+	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
+		/* Erase the link in the dir */
+		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
+		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
+		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
 		ASSERT(err == 0);
-	}
-
-	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
-		/* Erase the link in the dataset */
-		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
-		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
-		/*
-		 * dsl_dir_sync_destroy() called us, they'll destroy
-		 * the dataset.
-		 */
 	} else {
 		/* remove from snapshot namespace */
 		dsl_dataset_t *ds_head;
-		VERIFY(0 == dsl_dataset_open_obj(dp,
-		    ds->ds_dir->dd_phys->dd_head_dataset_obj, NULL,
-		    DS_MODE_NONE, FTAG, &ds_head));
+		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
+		VERIFY(0 == dsl_dataset_hold_obj(dp,
+		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
 		VERIFY(0 == dsl_dataset_get_snapname(ds));
 #ifdef ZFS_DEBUG
 		{
 			uint64_t val;
 
-			err = dsl_dataset_snap_lookup(mos,
-			    ds_head->ds_phys->ds_flags,
-			    ds_head->ds_phys->ds_snapnames_zapobj,
+			err = dsl_dataset_snap_lookup(ds_head,
 			    ds->ds_snapname, &val);
 			ASSERT3U(err, ==, 0);
 			ASSERT3U(val, ==, obj);
 		}
 #endif
-		err = dsl_dataset_snap_remove(mos,
-		    ds_head->ds_phys->ds_flags,
-		    ds_head->ds_phys->ds_snapnames_zapobj,
-		    ds->ds_snapname, tx);
+		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
 		ASSERT(err == 0);
-		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds_head, FTAG);
 	}
 
 	if (ds_prev && ds->ds_prev != ds_prev)
-		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds_prev, FTAG);
 
 	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
 	spa_history_internal_log(LOG_DS_DESTROY, dp->dp_spa, tx,
 	    cr, "dataset = %llu", ds->ds_object);
 
-	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
+	dsl_dir_close(ds->ds_dir, ds);
+	ds->ds_dir = NULL;
+	dsl_dataset_drain_refs(ds, tag);
 	VERIFY(0 == dmu_object_free(mos, obj, tx));
-
 }
 
 static int
@@ -1628,7 +1715,6 @@
 {
 	dsl_dataset_t *ds = arg1;
 	const char *snapname = arg2;
-	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 	int err;
 	uint64_t value;
 
@@ -1642,8 +1728,7 @@
 	/*
 	 * Check for conflicting name snapshot name.
 	 */
-	err = dsl_dataset_snap_lookup(mos, ds->ds_phys->ds_flags,
-	    ds->ds_phys->ds_snapnames_zapobj, snapname, &value);
+	err = dsl_dataset_snap_lookup(ds, snapname, &value);
 	if (err == 0)
 		return (EEXIST);
 	if (err != ENOENT)
@@ -1684,6 +1769,7 @@
 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 	dmu_buf_will_dirty(dbuf, tx);
 	dsphys = dbuf->db_data;
+	bzero(dsphys, sizeof (dsl_dataset_phys_t));
 	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
 	dsphys->ds_fsid_guid = unique_create();
 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
@@ -1744,10 +1830,9 @@
 	ASSERT(err == 0);
 
 	if (ds->ds_prev)
-		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
-	VERIFY(0 == dsl_dataset_open_obj(dp,
-	    ds->ds_phys->ds_prev_snap_obj, snapname,
-	    DS_MODE_NONE, ds, &ds->ds_prev));
+		dsl_dataset_drop_ref(ds->ds_prev, ds);
+	VERIFY(0 == dsl_dataset_get_ref(dp,
+	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
 
 	spa_history_internal_log(LOG_DS_SNAPSHOT, dp->dp_spa, tx, cr,
 	    "dataset = %llu", dsobj);
@@ -1823,11 +1908,10 @@
 	if (ds->ds_dir->dd_phys->dd_origin_obj) {
 		dsl_dataset_t *ods;
 
-		VERIFY(0 == dsl_dataset_open_obj(ds->ds_dir->dd_pool,
-		    ds->ds_dir->dd_phys->dd_origin_obj,
-		    NULL, DS_MODE_NONE, FTAG, &ods));
+		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
+		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
 		dsl_dataset_name(ods, stat->dds_origin);
-		dsl_dataset_close(ods, DS_MODE_NONE, FTAG);
+		dsl_dataset_drop_ref(ods, FTAG);
 	}
 	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
 }
@@ -1883,20 +1967,18 @@
 	dsl_dataset_t *ds = arg1;
 	char *newsnapname = arg2;
 	dsl_dir_t *dd = ds->ds_dir;
-	objset_t *mos = dd->dd_pool->dp_meta_objset;
 	dsl_dataset_t *hds;
 	uint64_t val;
 	int err;
 
-	err = dsl_dataset_open_obj(dd->dd_pool,
-	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds);
+	err = dsl_dataset_hold_obj(dd->dd_pool,
+	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
 	if (err)
 		return (err);
 
 	/* new name better not be in use */
-	err = dsl_dataset_snap_lookup(mos, hds->ds_phys->ds_flags,
-	    hds->ds_phys->ds_snapnames_zapobj, newsnapname, &val);
-	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
+	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
+	dsl_dataset_rele(hds, FTAG);
 
 	if (err == 0)
 		err = EEXIST;
@@ -1923,12 +2005,11 @@
 
 	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
 
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
-	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds));
+	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
+	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
 
 	VERIFY(0 == dsl_dataset_get_snapname(ds));
-	err = dsl_dataset_snap_remove(mos, hds->ds_phys->ds_flags,
-	    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname, tx);
+	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
 	ASSERT3U(err, ==, 0);
 	mutex_enter(&ds->ds_lock);
 	(void) strcpy(ds->ds_snapname, newsnapname);
@@ -1939,7 +2020,7 @@
 
 	spa_history_internal_log(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
 	    cr, "dataset = %llu", ds->ds_object);
-	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
+	dsl_dataset_rele(hds, FTAG);
 }
 
 struct renamesnaparg {
@@ -1970,26 +2051,21 @@
 		return (err);
 	}
 
-	err = dsl_dataset_open(name, DS_MODE_READONLY | DS_MODE_STANDARD,
-	    ra->dstg, &ds);
+#ifdef _KERNEL
+	/*
+	 * Every filesystem undergoing rename needs to be unmounted.
+	 */
+	(void) zfs_unmount_snap(name, NULL);
+#endif
+	err = dsl_dataset_hold(name, ra->dstg, &ds);
+	*cp = '\0';
 	if (err == ENOENT) {
-		*cp = '\0';
 		return (0);
-	}
-	if (err) {
+	} else if (err) {
 		(void) strcpy(ra->failed, name);
-		*cp = '\0';
-		dsl_dataset_close(ds, DS_MODE_STANDARD, ra->dstg);
 		return (err);
 	}
 
-#ifdef _KERNEL
-	/* for all filesystems undergoing rename, we'll need to unmount it */
-	(void) zfs_unmount_snap(name, NULL);
-#endif
-
-	*cp = '\0';
-
 	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
 	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
 
@@ -2038,7 +2114,7 @@
 			(void) strcat(ra->failed, "@");
 			(void) strcat(ra->failed, ra->newsnap);
 		}
-		dsl_dataset_close(ds, DS_MODE_STANDARD, ra->dstg);
+		dsl_dataset_rele(ds, ra->dstg);
 	}
 
 	if (err)
@@ -2063,8 +2139,7 @@
 
 #pragma weak dmu_objset_rename = dsl_dataset_rename
 int
-dsl_dataset_rename(char *oldname, const char *newname,
-    boolean_t recursive)
+dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
 {
 	dsl_dir_t *dd;
 	dsl_dataset_t *ds;
@@ -2106,8 +2181,7 @@
 	if (recursive) {
 		err = dsl_recursive_rename(oldname, newname);
 	} else {
-		err = dsl_dataset_open(oldname,
-		    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &ds);
+		err = dsl_dataset_hold(oldname, FTAG, &ds);
 		if (err)
 			return (err);
 
@@ -2115,15 +2189,22 @@
 		    dsl_dataset_snapshot_rename_check,
 		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
 
-		dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
+		dsl_dataset_rele(ds, FTAG);
 	}
 
 	return (err);
 }
 
+struct promotedsarg {
+	list_node_t link;
+	dsl_dataset_t *ds;
+};
+
 struct promotearg {
+	list_t snap_list;
+	dsl_dataset_t *clone_origin, *old_head;
 	uint64_t used, comp, uncomp, unique;
-	uint64_t ds_flags, newnext_obj, snapnames_obj;
+	uint64_t newnext_obj;
 };
 
 /* ARGSUSED */
@@ -2132,139 +2213,112 @@
 {
 	dsl_dataset_t *hds = arg1;
 	struct promotearg *pa = arg2;
-	dsl_dir_t *dd = hds->ds_dir;
+	struct promotedsarg *snap = list_head(&pa->snap_list);
 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
-	dsl_dir_t *odd = NULL;
-	dsl_dataset_t *ds = NULL;
-	dsl_dataset_t *origin_ds = NULL;
-	dsl_dataset_t *newnext_ds = NULL;
-	int err;
-	char *name = NULL;
+	dsl_dataset_t *origin_ds = snap->ds;
+	dsl_dataset_t *newnext_ds;
+	char *name;
 	uint64_t itor = 0;
 	blkptr_t bp;
-
-	bzero(pa, sizeof (*pa));
+	int err;
 
 	/* Check that it is a clone */
-	if (dd->dd_phys->dd_origin_obj == 0)
+	if (hds->ds_dir->dd_phys->dd_origin_obj == 0)
 		return (EINVAL);
 
 	/* Since this is so expensive, don't do the preliminary check */
 	if (!dmu_tx_is_syncing(tx))
 		return (0);
 
-	if (err = dsl_dataset_open_obj(dp, dd->dd_phys->dd_origin_obj,
-	    NULL, DS_MODE_EXCLUSIVE, FTAG, &origin_ds))
-		goto out;
-	odd = origin_ds->ds_dir;
-
-	{
-		dsl_dataset_t *phds;
-		if (err = dsl_dataset_open_obj(dd->dd_pool,
-		    odd->dd_phys->dd_head_dataset_obj,
-		    NULL, DS_MODE_NONE, FTAG, &phds))
-			goto out;
-		pa->ds_flags = phds->ds_phys->ds_flags;
-		pa->snapnames_obj = phds->ds_phys->ds_snapnames_zapobj;
-		dsl_dataset_close(phds, DS_MODE_NONE, FTAG);
-	}
-
-	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE) {
-		err = EXDEV;
-		goto out;
-	}
+	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
+		return (EXDEV);
 
 	/* find origin's new next ds */
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, hds->ds_object,
-	    NULL, DS_MODE_NONE, FTAG, &newnext_ds));
+	newnext_ds = hds;
 	while (newnext_ds->ds_phys->ds_prev_snap_obj != origin_ds->ds_object) {
 		dsl_dataset_t *prev;
 
-		if (err = dsl_dataset_open_obj(dd->dd_pool,
-		    newnext_ds->ds_phys->ds_prev_snap_obj,
-		    NULL, DS_MODE_NONE, FTAG, &prev))
-			goto out;
-		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
+		err = dsl_dataset_hold_obj(dp,
+		    newnext_ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
+		if (newnext_ds != hds)
+			dsl_dataset_rele(newnext_ds, FTAG);
+		if (err)
+			return (err);
 		newnext_ds = prev;
 	}
 	pa->newnext_obj = newnext_ds->ds_object;
 
 	/* compute origin's new unique space */
+	pa->unique = 0;
 	while ((err = bplist_iterate(&newnext_ds->ds_deadlist,
 	    &itor, &bp)) == 0) {
 		if (bp.blk_birth > origin_ds->ds_phys->ds_prev_snap_txg)
-			pa->unique += bp_get_dasize(dd->dd_pool->dp_spa, &bp);
+			pa->unique += bp_get_dasize(dp->dp_spa, &bp);
 	}
+	if (newnext_ds != hds)
+		dsl_dataset_rele(newnext_ds, FTAG);
 	if (err != ENOENT)
-		goto out;
-
-	/* Walk the snapshots that we are moving */
+		return (err);
+
 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
-	ds = origin_ds;
-	/* CONSTCOND */
-	while (TRUE) {
+
+	/*
+	 * Walk the snapshots that we are moving
+	 *
+	 * Compute space to transfer.  Each snapshot gave birth to:
+	 * (my used) - (prev's used) + (deadlist's used)
+	 * So a sequence would look like:
+	 * uN - u(N-1) + dN + ... + u1 - u0 + d1 + u0 - 0 + d0
+	 * Which simplifies to:
+	 * uN + dN + ... + d1 + d0
+	 * Note however, if we stop before we reach the ORIGIN we get:
+	 * uN + dN + ... + dM - uM-1
+	 */
+	pa->used = origin_ds->ds_phys->ds_used_bytes;
+	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
+	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
+	do {
 		uint64_t val, dlused, dlcomp, dluncomp;
-		dsl_dataset_t *prev;
+		dsl_dataset_t *ds = snap->ds;
 
 		/* Check that the snapshot name does not conflict */
 		dsl_dataset_name(ds, name);
-		err = dsl_dataset_snap_lookup(dd->dd_pool->dp_meta_objset,
-		    hds->ds_phys->ds_flags, hds->ds_phys->ds_snapnames_zapobj,
-		    ds->ds_snapname, &val);
-		if (err != ENOENT) {
-			if (err == 0)
-				err = EEXIST;
-			goto out;
-		}
-
-		/*
-		 * compute space to transfer.  Each snapshot gave birth to:
-		 * (my used) - (prev's used) + (deadlist's used)
-		 */
-		pa->used += ds->ds_phys->ds_used_bytes;
-		pa->comp += ds->ds_phys->ds_compressed_bytes;
-		pa->uncomp += ds->ds_phys->ds_uncompressed_bytes;
-
-		/* If we reach the first snapshot, we're done. */
-		if (ds->ds_phys->ds_prev_snap_obj == 0)
+		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
+		if (err == 0)
+			err = EEXIST;
+		if (err != ENOENT)
 			break;
-
-		if (err = bplist_space(&ds->ds_deadlist,
-		    &dlused, &dlcomp, &dluncomp))
-			goto out;
-		if (err = dsl_dataset_open_obj(dd->dd_pool,
-		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
-		    FTAG, &prev))
-			goto out;
-		pa->used += dlused - prev->ds_phys->ds_used_bytes;
-		pa->comp += dlcomp - prev->ds_phys->ds_compressed_bytes;
-		pa->uncomp += dluncomp - prev->ds_phys->ds_uncompressed_bytes;
-
-		/*
-		 * We could be a clone of a clone.  If we reach our
-		 * parent's branch point, we're done.
-		 */
-		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
-			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
-			break;
+		err = 0;
+
+		/* The very first snapshot does not have a deadlist */
+		if (ds->ds_phys->ds_prev_snap_obj != 0) {
+			if (err = bplist_space(&ds->ds_deadlist,
+			    &dlused, &dlcomp, &dluncomp))
+				break;
+			pa->used += dlused;
+			pa->comp += dlcomp;
+			pa->uncomp += dluncomp;
 		}
-		if (ds != origin_ds)
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-		ds = prev;
+	} while (snap = list_next(&pa->snap_list, snap));
+
+	/*
+	 * If we are a clone of a clone then we never reached ORIGIN,
+	 * so we need to subtract out the clone origin's used space.
+	 */
+	if (pa->clone_origin) {
+		pa->used -= pa->clone_origin->ds_phys->ds_used_bytes;
+		pa->comp -= pa->clone_origin->ds_phys->ds_compressed_bytes;
+		pa->uncomp -= pa->clone_origin->ds_phys->ds_uncompressed_bytes;
 	}
 
+	kmem_free(name, MAXPATHLEN);
+
 	/* Check that there is enough space here */
-	err = dsl_dir_transfer_possible(odd, dd, pa->used);
-
-out:
-	if (ds && ds != origin_ds)
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-	if (origin_ds)
-		dsl_dataset_close(origin_ds, DS_MODE_EXCLUSIVE, FTAG);
-	if (newnext_ds)
-		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
-	if (name)
-		kmem_free(name, MAXPATHLEN);
+	if (err == 0) {
+		dsl_dir_t *odd = origin_ds->ds_dir;
+		err = dsl_dir_transfer_possible(odd, hds->ds_dir, pa->used);
+	}
+
 	return (err);
 }
 
@@ -2273,17 +2327,15 @@
 {
 	dsl_dataset_t *hds = arg1;
 	struct promotearg *pa = arg2;
+	struct promotedsarg *snap = list_head(&pa->snap_list);
+	dsl_dataset_t *origin_ds = snap->ds;
 	dsl_dir_t *dd = hds->ds_dir;
 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
 	dsl_dir_t *odd = NULL;
-	dsl_dataset_t *ds, *origin_ds;
 	char *name;
 
-	ASSERT(dd->dd_phys->dd_origin_obj != 0);
 	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
 
-	VERIFY(0 == dsl_dataset_open_obj(dp, dd->dd_phys->dd_origin_obj,
-	    NULL, DS_MODE_EXCLUSIVE, FTAG, &origin_ds));
 	/*
 	 * We need to explicitly open odd, since origin_ds's dd will be
 	 * changing.
@@ -2291,17 +2343,26 @@
 	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
 	    NULL, FTAG, &odd));
 
+	/* change origin's next snap */
+	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
+	origin_ds->ds_phys->ds_next_snap_obj = pa->newnext_obj;
+
+	/* change origin */
+	dmu_buf_will_dirty(dd->dd_dbuf, tx);
+	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
+	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
+	dmu_buf_will_dirty(odd->dd_dbuf, tx);
+	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
+
 	/* move snapshots to this dir */
 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
-	ds = origin_ds;
-	/* CONSTCOND */
-	while (TRUE) {
-		dsl_dataset_t *prev;
+	do {
+		dsl_dataset_t *ds = snap->ds;
 
 		/* move snap name entry */
 		dsl_dataset_name(ds, name);
-		VERIFY(0 == dsl_dataset_snap_remove(dp->dp_meta_objset,
-		    pa->ds_flags, pa->snapnames_obj, ds->ds_snapname, tx));
+		VERIFY(0 == dsl_dataset_snap_remove(pa->old_head,
+		    ds->ds_snapname, tx));
 		VERIFY(0 == zap_add(dp->dp_meta_objset,
 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
 		    8, 1, &ds->ds_object, tx));
@@ -2316,35 +2377,7 @@
 		    NULL, ds, &ds->ds_dir));
 
 		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
-
-		if (ds->ds_phys->ds_prev_snap_obj == 0)
-			break;
-
-		VERIFY(0 == dsl_dataset_open_obj(dp,
-		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
-		    FTAG, &prev));
-
-		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
-			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
-			break;
-		}
-		if (ds != origin_ds)
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-		ds = prev;
-	}
-	if (ds != origin_ds)
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-
-	/* change origin's next snap */
-	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
-	origin_ds->ds_phys->ds_next_snap_obj = pa->newnext_obj;
-
-	/* change origin */
-	dmu_buf_will_dirty(dd->dd_dbuf, tx);
-	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
-	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
-	dmu_buf_will_dirty(odd->dd_dbuf, tx);
-	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
+	} while (snap = list_next(&pa->snap_list, snap));
 
 	/* change space accounting */
 	dsl_dir_diduse_space(odd, -pa->used, -pa->comp, -pa->uncomp, tx);
@@ -2353,10 +2386,9 @@
 
 	/* log history record */
 	spa_history_internal_log(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
-	    cr, "dataset = %llu", ds->ds_object);
+	    cr, "dataset = %llu", hds->ds_object);
 
 	dsl_dir_close(odd, FTAG);
-	dsl_dataset_close(origin_ds, DS_MODE_EXCLUSIVE, FTAG);
 	kmem_free(name, MAXPATHLEN);
 }
 
@@ -2364,30 +2396,98 @@
 dsl_dataset_promote(const char *name)
 {
 	dsl_dataset_t *ds;
-	int err;
+	dsl_dir_t *dd;
+	dsl_pool_t *dp;
 	dmu_object_info_t doi;
 	struct promotearg pa;
-
-	err = dsl_dataset_open(name, DS_MODE_NONE, FTAG, &ds);
+	struct promotedsarg *snap;
+	uint64_t snap_obj;
+	uint64_t last_snap = 0;
+	int err;
+
+	err = dsl_dataset_hold(name, FTAG, &ds);
 	if (err)
 		return (err);
-
-	err = dmu_object_info(ds->ds_dir->dd_pool->dp_meta_objset,
+	dd = ds->ds_dir;
+	dp = dd->dd_pool;
+
+	err = dmu_object_info(dp->dp_meta_objset,
 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
 	if (err) {
-		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds, FTAG);
 		return (err);
 	}
 
 	/*
+	 * We are going to inherit all the snapshots taken before our
+	 * origin (i.e., our new origin will be our parent's origin).
+	 * Take ownership of them so that we can rename them into our
+	 * namespace.
+	 */
+	pa.clone_origin = NULL;
+	list_create(&pa.snap_list,
+	    sizeof (struct promotedsarg), offsetof(struct promotedsarg, link));
+	rw_enter(&dp->dp_config_rwlock, RW_READER);
+	ASSERT(dd->dd_phys->dd_origin_obj != 0);
+	snap_obj = dd->dd_phys->dd_origin_obj;
+	while (snap_obj) {
+		snap = kmem_alloc(sizeof (struct promotedsarg), KM_SLEEP);
+		err = dsl_dataset_own_obj(dp, snap_obj, 0, FTAG, &snap->ds);
+		if (err == ENOENT) {
+			/* lost race with snapshot destroy */
+			struct promotedsarg *last = list_tail(&pa.snap_list);
+			ASSERT(snap_obj != last->ds->ds_phys->ds_prev_snap_obj);
+			snap_obj = last->ds->ds_phys->ds_prev_snap_obj;
+			kmem_free(snap, sizeof (struct promotedsarg));
+			continue;
+		} else if (err) {
+			kmem_free(snap, sizeof (struct promotedsarg));
+			rw_exit(&dp->dp_config_rwlock);
+			goto out;
+		}
+		/*
+		 * We could be a clone of a clone.  If we reach our
+		 * parent's branch point, we're done.
+		 */
+		if (last_snap &&
+		    snap->ds->ds_phys->ds_next_snap_obj != last_snap) {
+			pa.clone_origin = snap->ds;
+			kmem_free(snap, sizeof (struct promotedsarg));
+			snap_obj = 0;
+		} else {
+			list_insert_tail(&pa.snap_list, snap);
+			last_snap = snap_obj;
+			snap_obj = snap->ds->ds_phys->ds_prev_snap_obj;
+		}
+	}
+	snap = list_head(&pa.snap_list);
+	ASSERT(snap != NULL);
+	err = dsl_dataset_hold_obj(dp,
+	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &pa.old_head);
+	rw_exit(&dp->dp_config_rwlock);
+
+	if (err)
+		goto out;
+
+	/*
 	 * Add in 128x the snapnames zapobj size, since we will be moving
 	 * a bunch of snapnames to the promoted ds, and dirtying their
 	 * bonus buffers.
 	 */
-	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
-	    dsl_dataset_promote_check,
+	err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
 	    dsl_dataset_promote_sync, ds, &pa, 2 + 2 * doi.doi_physical_blks);
-	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+
+	dsl_dataset_rele(pa.old_head, FTAG);
+out:
+	while ((snap = list_tail(&pa.snap_list)) != NULL) {
+		list_remove(&pa.snap_list, snap);
+		dsl_dataset_disown(snap->ds, FTAG);
+		kmem_free(snap, sizeof (struct promotedsarg));
+	}
+	list_destroy(&pa.snap_list);
+	if (pa.clone_origin)
+		dsl_dataset_disown(pa.clone_origin, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
@@ -2546,23 +2646,36 @@
 }
 
 /*
- * Swap 'clone' with its origin head file system.
+ * Swap 'clone' with its origin head file system.  Used at the end
+ * of "online recv" to swizzle the file system to the new version.
  */
 int
 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
     boolean_t force)
 {
 	struct cloneswaparg csa;
-
-	ASSERT(clone->ds_open_refcount == DS_REF_MAX);
-	ASSERT(origin_head->ds_open_refcount == DS_REF_MAX);
-
+	int error;
+
+	ASSERT(clone->ds_owner);
+	ASSERT(origin_head->ds_owner);
+retry:
+	/* Need both locks as writer; never block while holding one */
+	rw_enter(&clone->ds_rwlock, RW_WRITER);
+	if (!rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
+		rw_exit(&clone->ds_rwlock);
+		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
+		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
+			rw_exit(&origin_head->ds_rwlock);
+			goto retry;
+		}
+	}
 	csa.cds = clone;
 	csa.ohds = origin_head;
 	csa.force = force;
-	return (dsl_sync_task_do(clone->ds_dir->dd_pool,
+	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
 	    dsl_dataset_clone_swap_check,
-	    dsl_dataset_clone_swap_sync, &csa, NULL, 9));
+	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
+	return (error);
 }
 
 /*
@@ -2574,31 +2687,26 @@
 {
 	spa_t *spa;
 	dsl_pool_t *dp;
-	dsl_dataset_t *ds = NULL;
+	dsl_dataset_t *ds;
 	int error;
 
 	if ((error = spa_open(pname, &spa, FTAG)) != 0)
 		return (error);
 	dp = spa_get_dsl(spa);
 	rw_enter(&dp->dp_config_rwlock, RW_READER);
-	if ((error = dsl_dataset_open_obj(dp, obj,
-	    NULL, DS_MODE_NONE, FTAG, &ds)) != 0) {
-		rw_exit(&dp->dp_config_rwlock);
-		spa_close(spa, FTAG);
-		return (error);
+	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
+		dsl_dataset_name(ds, buf);
+		dsl_dataset_rele(ds, FTAG);
 	}
-	dsl_dataset_name(ds, buf);
-	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
 	rw_exit(&dp->dp_config_rwlock);
 	spa_close(spa, FTAG);
 
-	return (0);
+	return (error);
 }
 
 int
 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
-    uint64_t asize, uint64_t inflight, uint64_t *used,
-    uint64_t *ref_rsrv)
+    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
 {
 	int error = 0;
 
@@ -2674,15 +2782,13 @@
 
 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
 
-	mutex_enter(&ds->ds_lock);
 	ds->ds_quota = new_quota;
-	mutex_exit(&ds->ds_lock);
 
 	dsl_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
 
 	spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
 	    tx, cr, "%lld dataset = %llu ",
-	    (longlong_t)new_quota, ds->ds_dir->dd_phys->dd_head_dataset_obj);
+	    (longlong_t)new_quota, ds->ds_object);
 }
 
 int
@@ -2691,7 +2797,7 @@
 	dsl_dataset_t *ds;
 	int err;
 
-	err = dsl_dataset_open(dsname, DS_MODE_STANDARD, FTAG, &ds);
+	err = dsl_dataset_hold(dsname, FTAG, &ds);
 	if (err)
 		return (err);
 
@@ -2706,7 +2812,7 @@
 		    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
 		    ds, &quota, 0);
 	}
-	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
@@ -2788,13 +2894,13 @@
 	dsl_dataset_t *ds;
 	int err;
 
-	err = dsl_dataset_open(dsname, DS_MODE_STANDARD, FTAG, &ds);
+	err = dsl_dataset_hold(dsname, FTAG, &ds);
 	if (err)
 		return (err);
 
 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    dsl_dataset_set_reservation_check,
 	    dsl_dataset_set_reservation_sync, ds, &reservation, 0);
-	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
--- a/usr/src/uts/common/fs/zfs/dsl_dir.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/dsl_dir.c	Thu May 22 11:13:47 2008 -0700
@@ -535,11 +535,10 @@
 		dsl_dataset_t *ds;
 		char buf[MAXNAMELEN];
 
-		VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
-		    dd->dd_phys->dd_origin_obj,
-		    NULL, DS_MODE_NONE, FTAG, &ds));
+		VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
+		    dd->dd_phys->dd_origin_obj, FTAG, &ds));
 		dsl_dataset_name(ds, buf);
-		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds, FTAG);
 		dsl_prop_nvlist_add_string(nv, ZFS_PROP_ORIGIN, buf);
 	}
 	rw_exit(&dd->dd_pool->dp_config_rwlock);
--- a/usr/src/uts/common/fs/zfs/dsl_prop.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/dsl_prop.c	Thu May 22 11:13:47 2008 -0700
@@ -438,7 +438,7 @@
  * Iterate over all properties for this dataset and return them in an nvlist.
  */
 int
-dsl_prop_get_all(objset_t *os, nvlist_t **nvp)
+dsl_prop_get_all(objset_t *os, nvlist_t **nvp, boolean_t local)
 {
 	dsl_dataset_t *ds = os->os->os_dsl_dataset;
 	dsl_dir_t *dd = ds->ds_dir;
@@ -522,6 +522,12 @@
 		if (err != ENOENT)
 			break;
 		err = 0;
+		/*
+		 * If we are just after the props that have been set
+		 * locally, then we are done after the first iteration.
+		 */
+		if (local)
+			break;
 	}
 	rw_exit(&dp->dp_config_rwlock);
 
--- a/usr/src/uts/common/fs/zfs/spa.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/spa.c	Thu May 22 11:13:47 2008 -0700
@@ -201,9 +201,8 @@
 
 				dp = spa_get_dsl(spa);
 				rw_enter(&dp->dp_config_rwlock, RW_READER);
-				if (err = dsl_dataset_open_obj(dp,
-				    za.za_first_integer, NULL, DS_MODE_NONE,
-				    FTAG, &ds)) {
+				if (err = dsl_dataset_hold_obj(dp,
+				    za.za_first_integer, FTAG, &ds)) {
 					rw_exit(&dp->dp_config_rwlock);
 					break;
 				}
@@ -212,7 +211,7 @@
 				    MAXNAMELEN + strlen(MOS_DIR_NAME) + 1,
 				    KM_SLEEP);
 				dsl_dataset_name(ds, strval);
-				dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+				dsl_dataset_rele(ds, FTAG);
 				rw_exit(&dp->dp_config_rwlock);
 			} else {
 				strval = NULL;
@@ -329,7 +328,7 @@
 				}
 
 				if (error = dmu_objset_open(strval, DMU_OST_ZFS,
-				    DS_MODE_STANDARD | DS_MODE_READONLY, &os))
+				    DS_MODE_USER | DS_MODE_READONLY, &os))
 					break;
 				objnum = dmu_objset_id(os);
 				dmu_objset_close(os);
--- a/usr/src/uts/common/fs/zfs/sys/dmu.h	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/dmu.h	Thu May 22 11:13:47 2008 -0700
@@ -136,12 +136,11 @@
 void zfs_acl_byteswap(void *buf, size_t size);
 void zfs_znode_byteswap(void *buf, size_t size);
 
-#define	DS_MODE_NONE		0	/* invalid, to aid debugging */
-#define	DS_MODE_STANDARD	1	/* normal access, no special needs */
-#define	DS_MODE_PRIMARY		2	/* the "main" access, e.g. a mount */
-#define	DS_MODE_EXCLUSIVE	3	/* exclusive access, e.g. to destroy */
-#define	DS_MODE_LEVELS		4
-#define	DS_MODE_LEVEL(x)	((x) & (DS_MODE_LEVELS - 1))
+#define	DS_MODE_NOHOLD		0	/* internal use only */
+#define	DS_MODE_USER		1	/* simple access, no special needs */
+#define	DS_MODE_OWNER		2	/* the "main" access, e.g. a mount */
+#define	DS_MODE_TYPE_MASK	0x3
+#define	DS_MODE_TYPE(x)		((x) & DS_MODE_TYPE_MASK)
 #define	DS_MODE_READONLY	0x8
 #define	DS_MODE_IS_READONLY(x)	((x) & DS_MODE_READONLY)
 #define	DS_MODE_INCONSISTENT	0x10
--- a/usr/src/uts/common/fs/zfs/sys/dsl_dataset.h	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_dataset.h	Thu May 22 11:13:47 2008 -0700
@@ -47,6 +47,8 @@
 typedef void dsl_dataset_evict_func_t(struct dsl_dataset *, void *);
 
 #define	DS_FLAG_INCONSISTENT	(1ULL<<0)
+#define	DS_IS_INCONSISTENT(ds)	\
+	((ds)->ds_phys->ds_flags & DS_FLAG_INCONSISTENT)
 /*
  * NB: nopromote can not yet be set, but we want support for it in this
  * on-disk version, so that we don't need to upgrade for it later.  It
@@ -119,7 +121,13 @@
 	kmutex_t ds_lock;
 	void *ds_user_ptr;
 	dsl_dataset_evict_func_t *ds_user_evict_func;
-	uint64_t ds_open_refcount;
+
+	/*
+	 * ds_owner is protected by the ds_rwlock and the ds_lock
+	 */
+	krwlock_t ds_rwlock;
+	kcondvar_t ds_exclusive_cv;
+	void *ds_owner;
 
 	/* no locking; only for making guesses */
 	uint64_t ds_trysnap_txg;
@@ -140,21 +148,23 @@
 #define	DS_UNIQUE_IS_ACCURATE(ds)	\
 	(((ds)->ds_phys->ds_flags & DS_FLAG_UNIQUE_ACCURATE) != 0)
 
-int dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
-    void *tag, dsl_dataset_t **dsp);
-int dsl_dataset_open(const char *name, int mode, void *tag,
+int dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp);
+int dsl_dataset_hold_obj(struct dsl_pool *dp, uint64_t dsobj,
+    void *tag, dsl_dataset_t **);
+int dsl_dataset_own(const char *name, int flags, void *owner,
     dsl_dataset_t **dsp);
-int dsl_dataset_open_obj(struct dsl_pool *dp, uint64_t dsobj,
-    const char *tail, int mode, void *tag, dsl_dataset_t **);
+int dsl_dataset_own_obj(struct dsl_pool *dp, uint64_t dsobj,
+    int flags, void *owner, dsl_dataset_t **);
 void dsl_dataset_name(dsl_dataset_t *ds, char *name);
-void dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag);
-void dsl_dataset_downgrade(dsl_dataset_t *ds, int oldmode, int newmode);
-boolean_t dsl_dataset_tryupgrade(dsl_dataset_t *ds, int oldmode, int newmode);
+void dsl_dataset_rele(dsl_dataset_t *ds, void *tag);
+void dsl_dataset_disown(dsl_dataset_t *ds, void *owner);
+boolean_t dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok,
+    void *owner);
+void dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner);
 uint64_t dsl_dataset_create_sync_impl(dsl_dir_t *dd, dsl_dataset_t *origin,
     uint64_t flags, dmu_tx_t *tx);
-uint64_t dsl_dataset_create_sync(dsl_dir_t *pds,
-    const char *lastname, dsl_dataset_t *origin, uint64_t flags,
-    cred_t *, dmu_tx_t *);
+uint64_t dsl_dataset_create_sync(dsl_dir_t *pds, const char *lastname,
+    dsl_dataset_t *origin, uint64_t flags, cred_t *, dmu_tx_t *);
 int dsl_dataset_destroy(dsl_dataset_t *ds, void *tag);
 int dsl_snapshots_destroy(char *fsname, char *snapname);
 dsl_checkfunc_t dsl_dataset_destroy_check;
--- a/usr/src/uts/common/fs/zfs/sys/dsl_prop.h	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/sys/dsl_prop.h	Thu May 22 11:13:47 2008 -0700
@@ -19,7 +19,7 @@
  * CDDL HEADER END
  */
 /*
- * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
+ * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
  * Use is subject to license terms.
  */
 
@@ -59,7 +59,7 @@
     int intsz, int numints, void *buf, char *setpoint);
 int dsl_prop_get_integer(const char *ddname, const char *propname,
     uint64_t *valuep, char *setpoint);
-int dsl_prop_get_all(objset_t *os, nvlist_t **nvp);
+int dsl_prop_get_all(objset_t *os, nvlist_t **nvp, boolean_t local);
 int dsl_prop_get_ds_locked(dsl_dir_t *dd, const char *propname,
     int intsz, int numints, void *buf, char *setpoint);
 
--- a/usr/src/uts/common/fs/zfs/txg.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/txg.c	Thu May 22 11:13:47 2008 -0700
@@ -333,7 +333,7 @@
 		mutex_exit(&tx->tx_sync_lock);
 		start = lbolt;
 		spa_sync(dp->dp_spa, txg);
-		delta = lbolt - start;
+		delta = (lbolt - start) + 1;
 
 		written = dp->dp_space_towrite[txg & TXG_MASK];
 		dp->dp_space_towrite[txg & TXG_MASK] = 0;
--- a/usr/src/uts/common/fs/zfs/zfs_ctldir.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/zfs_ctldir.c	Thu May 22 11:13:47 2008 -0700
@@ -782,7 +782,7 @@
 		return (err);
 	}
 	if (dmu_objset_open(snapname, DMU_OST_ZFS,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &snap) != 0) {
+	    DS_MODE_USER | DS_MODE_READONLY, &snap) != 0) {
 		mutex_exit(&sdp->sd_lock);
 		ZFS_EXIT(zfsvfs);
 		return (ENOENT);
--- a/usr/src/uts/common/fs/zfs/zfs_ioctl.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/zfs_ioctl.c	Thu May 22 11:13:47 2008 -0700
@@ -176,24 +176,22 @@
 }
 
 /*
- * zpl_check_version
+ * zpl_earlier_version
  *
- * Return non-zero if the ZPL version is less than requested version.
+ * Return B_TRUE if the ZPL version is below 'version' or cannot be read.
  */
-static int
-zpl_check_version(const char *name, int version)
+static boolean_t
+zpl_earlier_version(const char *name, int version)
 {
 	objset_t *os;
-	int rc = 1;
+	boolean_t rc = B_TRUE;
 
 	if (dmu_objset_open(name, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os) == 0) {
-		uint64_t propversion;
-
-		if (zfs_get_zplprop(os, ZFS_PROP_VERSION,
-		    &propversion) == 0) {
-			rc = !(propversion >= version);
-		}
+	    DS_MODE_USER | DS_MODE_READONLY, &os) == 0) {
+		uint64_t zplversion;
+
+		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &zplversion) == 0)
+			rc = zplversion < version;
 		dmu_objset_close(os);
 	}
 	return (rc);
@@ -494,7 +492,7 @@
 		return (error);
 
 	error = dmu_objset_open(zc->zc_name, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &clone);
+	    DS_MODE_USER | DS_MODE_READONLY, &clone);
 
 	if (error == 0) {
 		dsl_dataset_t *pclone = NULL;
@@ -502,9 +500,8 @@
 		dd = clone->os->os_dsl_dataset->ds_dir;
 
 		rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
-		error = dsl_dataset_open_obj(dd->dd_pool,
-		    dd->dd_phys->dd_origin_obj, NULL,
-		    DS_MODE_NONE, FTAG, &pclone);
+		error = dsl_dataset_hold_obj(dd->dd_pool,
+		    dd->dd_phys->dd_origin_obj, FTAG, &pclone);
 		rw_exit(&dd->dd_pool->dp_config_rwlock);
 		if (error) {
 			dmu_objset_close(clone);
@@ -516,7 +513,7 @@
 
 		dsl_dataset_name(pclone, parentname);
 		dmu_objset_close(clone);
-		dsl_dataset_close(pclone, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(pclone, FTAG);
 		if (error == 0)
 			error = zfs_secpolicy_write_perms(parentname,
 			    ZFS_DELEG_PERM_PROMOTE, cr);
@@ -972,9 +969,8 @@
 	int error;
 
 	if ((error = dmu_objset_open(zc->zc_name, DMU_OST_ZFS,
-	    DS_MODE_NONE | DS_MODE_READONLY, &osp)) != 0)
+	    DS_MODE_USER | DS_MODE_READONLY, &osp)) != 0)
 		return (error);
-
 	error = zfs_obj_to_path(osp, zc->zc_obj, zc->zc_value,
 	    sizeof (zc->zc_value));
 	dmu_objset_close(osp);
@@ -1126,30 +1122,6 @@
 	return (error);
 }
 
-static int
-zfs_os_open_retry(char *name, objset_t **os)
-{
-	int error;
-
-retry:
-	error = dmu_objset_open(name, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, os);
-	if (error != 0) {
-		/*
-		 * This is ugly: dmu_objset_open() can return EBUSY if
-		 * the objset is held exclusively. Fortunately this hold is
-		 * only for a short while, so we retry here.
-		 * This avoids user code having to handle EBUSY,
-		 * for example for a "zfs list".
-		 */
-		if (error == EBUSY) {
-			delay(1);
-			goto retry;
-		}
-	}
-	return (error);
-}
-
 /*
  * inputs:
  * zc_name		name of filesystem
@@ -1168,18 +1140,19 @@
 	int error;
 	nvlist_t *nv;
 
-	if ((error = zfs_os_open_retry(zc->zc_name, &os)) != 0)
+	if (error = dmu_objset_open(zc->zc_name,
+	    DMU_OST_ANY, DS_MODE_USER | DS_MODE_READONLY, &os))
 		return (error);
 
 	dmu_objset_fast_stat(os, &zc->zc_objset_stats);
 
 	if (zc->zc_nvlist_dst != 0 &&
-	    (error = dsl_prop_get_all(os, &nv)) == 0) {
+	    (error = dsl_prop_get_all(os, &nv, FALSE)) == 0) {
 		dmu_objset_stats(os, nv);
 		/*
 		 * NB: zvol_get_stats() will read the objset contents,
 		 * which we aren't supposed to do with a
-		 * DS_MODE_STANDARD open, because it could be
+		 * DS_MODE_USER hold, because it could be
 		 * inconsistent.  So this is a bit of a workaround...
 		 */
 		if (!zc->zc_objset_stats.dds_inconsistent) {
@@ -1227,15 +1200,16 @@
 	objset_t *os;
 	int err;
 
-	if ((err = zfs_os_open_retry(zc->zc_name, &os)) != 0)
+	if (err = dmu_objset_open(zc->zc_name,
+	    DMU_OST_ANY, DS_MODE_USER | DS_MODE_READONLY, &os))
 		return (err);
 
 	dmu_objset_fast_stat(os, &zc->zc_objset_stats);
 
 	/*
 	 * NB: nvl_add_zplprop() will read the objset contents,
-	 * which we aren't supposed to do with a DS_MODE_STANDARD
-	 * open, because it could be inconsistent.
+	 * which we aren't supposed to do with a DS_MODE_USER
+	 * hold, because it could be inconsistent.
 	 */
 	if (zc->zc_nvlist_dst != NULL &&
 	    !zc->zc_objset_stats.dds_inconsistent &&
@@ -1276,7 +1250,8 @@
 	int error;
 	char *p;
 
-	if ((error = zfs_os_open_retry(zc->zc_name, &os)) != 0) {
+	if (error = dmu_objset_open(zc->zc_name,
+	    DMU_OST_ANY, DS_MODE_USER | DS_MODE_READONLY, &os)) {
 		if (error == ENOENT)
 			error = ESRCH;
 		return (error);
@@ -1295,6 +1270,7 @@
 			error = ESRCH;
 	} while (error == 0 && !INGLOBALZONE(curproc) &&
 	    !zone_dataset_visible(zc->zc_name, NULL));
+	dmu_objset_close(os);
 
 	/*
 	 * If it's a hidden dataset (ie. with a '$' in its name), don't
@@ -1303,7 +1279,6 @@
 	if (error == 0 && strchr(zc->zc_name, '$') == NULL)
 		error = zfs_ioc_objset_stats(zc); /* fill in the stats */
 
-	dmu_objset_close(os);
 	return (error);
 }
 
@@ -1326,11 +1301,10 @@
 	objset_t *os;
 	int error;
 
-	if ((error = zfs_os_open_retry(zc->zc_name, &os)) != 0) {
-		if (error == ENOENT)
-			error = ESRCH;
-		return (error);
-	}
+	error = dmu_objset_open(zc->zc_name,
+	    DMU_OST_ANY, DS_MODE_USER | DS_MODE_READONLY, &os);
+	if (error)
+		return (error == ENOENT ? ESRCH : error);
 
 	/*
 	 * A dataset name of maximum length cannot have any snapshots,
@@ -1344,17 +1318,15 @@
 	error = dmu_snapshot_list_next(os,
 	    sizeof (zc->zc_name) - strlen(zc->zc_name),
 	    zc->zc_name + strlen(zc->zc_name), NULL, &zc->zc_cookie, NULL);
-	if (error == ENOENT)
-		error = ESRCH;
-
+	dmu_objset_close(os);
 	if (error == 0)
 		error = zfs_ioc_objset_stats(zc); /* fill in the stats */
+	else if (error == ENOENT)
+		error = ESRCH;
 
 	/* if we failed, undo the @ that we tacked on to zc_name */
-	if (error != 0)
+	if (error)
 		*strchr(zc->zc_name, '@') = '\0';
-
-	dmu_objset_close(os);
 	return (error);
 }
 
@@ -1418,7 +1390,7 @@
 			break;
 
 		case ZFS_PROP_SHARESMB:
-			if (zpl_check_version(name, ZPL_VERSION_FUID))
+			if (zpl_earlier_version(name, ZPL_VERSION_FUID))
 				return (ENOTSUP);
 			break;
 		}
@@ -1871,7 +1843,8 @@
 	 * Open parent object set so we can inherit zplprop values if
 	 * necessary.
 	 */
-	if ((error = zfs_os_open_retry(parentname, &os)) != 0)
+	if (error = dmu_objset_open(parentname,
+	    DMU_OST_ANY, DS_MODE_USER | DS_MODE_READONLY, &os))
 		return (error);
 
 	if (norm == ZFS_PROP_UNDEFINED)
@@ -1957,7 +1930,7 @@
 		}
 
 		error = dmu_objset_open(zc->zc_value, type,
-		    DS_MODE_STANDARD | DS_MODE_READONLY, &clone);
+		    DS_MODE_USER | DS_MODE_READONLY, &clone);
 		if (error) {
 			nvlist_free(nvprops);
 			return (error);
@@ -2091,21 +2064,18 @@
 int
 zfs_unmount_snap(char *name, void *arg)
 {
-	char *snapname = arg;
-	char *cp;
 	vfs_t *vfsp = NULL;
 
-	/*
-	 * Snapshots (which are under .zfs control) must be unmounted
-	 * before they can be destroyed.
-	 */
-
-	if (snapname) {
-		(void) strcat(name, "@");
-		(void) strcat(name, snapname);
-		vfsp = zfs_get_vfs(name);
-		cp = strchr(name, '@');
-		*cp = '\0';
+	if (arg) {
+		char *snapname = arg;
+		int len = strlen(name) + strlen(snapname) + 2;
+		char *buf = kmem_alloc(len, KM_SLEEP);
+
+		(void) strcpy(buf, name);
+		(void) strcat(buf, "@");
+		(void) strcat(buf, snapname);
+		vfsp = zfs_get_vfs(buf);
+		kmem_free(buf, len);
 	} else if (strchr(name, '@')) {
 		vfsp = zfs_get_vfs(name);
 	}
@@ -2186,8 +2156,7 @@
 	 * won't be one if we're operating on a zvol, if the
 	 * objset doesn't exist yet, or is not mounted.
 	 */
-	error = dmu_objset_open(zc->zc_name, DMU_OST_ANY,
-	    DS_MODE_STANDARD, &os);
+	error = dmu_objset_open(zc->zc_name, DMU_OST_ANY, DS_MODE_USER, &os);
 	if (error)
 		return (error);
 
@@ -2218,7 +2187,7 @@
 	} else {
 		error = dmu_objset_rollback(os);
 	}
-	/* Note, the dmu_objset_rollback() closes the objset for us. */
+	/* Note: dmu_objset_rollback() releases the objset for us. */
 
 	return (error);
 }
@@ -2252,10 +2221,28 @@
 		if (err)
 			return (err);
 	}
-
 	return (dmu_objset_rename(zc->zc_name, zc->zc_value, recursive));
 }
 
+static void
+clear_props(char *dataset, nvlist_t *props)
+{
+	zfs_cmd_t *cmd;
+	nvpair_t *pair = NULL;
+
+	if (props == NULL)
+		return;
+	cmd = kmem_alloc(sizeof (*cmd), KM_SLEEP);
+	(void) strcpy(cmd->zc_name, dataset);
+	/* inherit each prop for which zfs_secpolicy_inherit() returns 0 */
+	while ((pair = nvlist_next_nvpair(props, pair)) != NULL) {
+		(void) strcpy(cmd->zc_value, nvpair_name(pair));
+		if (zfs_secpolicy_inherit(cmd, CRED()) == 0)
+			(void) zfs_ioc_inherit_prop(cmd);
+	}
+	kmem_free(cmd, sizeof (*cmd));
+}
+
 /*
  * inputs:
  * zc_name		name of containing filesystem
@@ -2280,6 +2267,7 @@
 	int error, fd;
 	offset_t off;
 	nvlist_t *props = NULL;
+	nvlist_t *origprops = NULL;
 	objset_t *origin = NULL;
 	char *tosnap;
 	char tofs[ZFS_MAXNAMELEN];
@@ -2306,133 +2294,102 @@
 		return (EBADF);
 	}
 
-	/*
-	 * Get the zfsvfs for the receiving objset. There
-	 * won't be one if we're operating on a zvol, if the
-	 * objset doesn't exist yet, or is not mounted.
-	 */
-
-	error = dmu_objset_open(tofs, DMU_OST_ZFS,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &os);
-	if (!error) {
+	if (dmu_objset_open(tofs, DMU_OST_ANY,
+	    DS_MODE_USER | DS_MODE_READONLY, &os) == 0) {
+		/*
+		 * Try to get the zfsvfs for the receiving objset.
+		 * There won't be one if we're operating on a zvol,
+		 * if the objset doesn't exist yet, or is not mounted.
+		 */
 		mutex_enter(&os->os->os_user_ptr_lock);
-		zfsvfs = dmu_objset_get_user(os);
-		if (zfsvfs != NULL) {
-			VFS_HOLD(zfsvfs->z_vfs);
-			mutex_exit(&os->os->os_user_ptr_lock);
+		if (zfsvfs = dmu_objset_get_user(os)) {
 			if (!mutex_tryenter(&zfsvfs->z_online_recv_lock)) {
-				VFS_RELE(zfsvfs->z_vfs);
+				mutex_exit(&os->os->os_user_ptr_lock);
 				dmu_objset_close(os);
-				nvlist_free(props);
-				releasef(fd);
-				return (EBUSY);
+				zfsvfs = NULL;
+				error = EBUSY;
+				goto out;
 			}
-		} else {
-			mutex_exit(&os->os->os_user_ptr_lock);
+			VFS_HOLD(zfsvfs->z_vfs);
 		}
+		mutex_exit(&os->os->os_user_ptr_lock);
+
+		/*
+		 * If new properties are supplied, they are to completely
+		 * replace the existing ones, so stash away the existing ones.
+		 */
+		if (props)
+			(void) dsl_prop_get_all(os, &origprops, TRUE);
+
 		dmu_objset_close(os);
 	}
 
 	if (zc->zc_string[0]) {
 		error = dmu_objset_open(zc->zc_string, DMU_OST_ANY,
-		    DS_MODE_STANDARD | DS_MODE_READONLY, &origin);
-		if (error) {
-			if (zfsvfs != NULL) {
-				mutex_exit(&zfsvfs->z_online_recv_lock);
-				VFS_RELE(zfsvfs->z_vfs);
-			}
-			nvlist_free(props);
-			releasef(fd);
-			return (error);
-		}
+		    DS_MODE_USER | DS_MODE_READONLY, &origin);
+		if (error)
+			goto out;
 	}
 
 	error = dmu_recv_begin(tofs, tosnap, &zc->zc_begin_record,
 	    force, origin, zfsvfs != NULL, &drc);
 	if (origin)
 		dmu_objset_close(origin);
-	if (error) {
-		if (zfsvfs != NULL) {
-			mutex_exit(&zfsvfs->z_online_recv_lock);
-			VFS_RELE(zfsvfs->z_vfs);
-		}
-		nvlist_free(props);
-		releasef(fd);
-		return (error);
-	}
+	if (error)
+		goto out;
 
 	/*
-	 * If properties are supplied, they are to completely replace
-	 * the existing ones; "inherit" any existing properties.
+	 * Reset properties.  We do this before we receive the stream
+	 * so that the properties are applied to the new data.
 	 */
 	if (props) {
-		objset_t *os;
-		nvlist_t *nv = NULL;
-
-		error = dmu_objset_open(tofs, DMU_OST_ANY,
-		    DS_MODE_STANDARD | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
-		    &os);
-		if (error == 0) {
-			error = dsl_prop_get_all(os, &nv);
-			dmu_objset_close(os);
-		}
-		if (error == 0) {
-			nvpair_t *elem;
-			zfs_cmd_t *zc2;
-			zc2 = kmem_alloc(sizeof (zfs_cmd_t), KM_SLEEP);
-
-			(void) strcpy(zc2->zc_name, tofs);
-			for (elem = nvlist_next_nvpair(nv, NULL); elem;
-			    elem = nvlist_next_nvpair(nv, elem)) {
-				(void) strcpy(zc2->zc_value, nvpair_name(elem));
-				if (zfs_secpolicy_inherit(zc2, CRED()) == 0)
-					(void) zfs_ioc_inherit_prop(zc2);
-			}
-			kmem_free(zc2, sizeof (zfs_cmd_t));
-		}
-		if (nv)
-			nvlist_free(nv);
+		clear_props(tofs, origprops);
+		/*
+		 * XXX - Note, this is all-or-nothing; should be best-effort.
+		 */
+		(void) zfs_set_prop_nvlist(tofs, props);
 	}
 
-	/*
-	 * Set properties.  Note, we ignore errors.  Would be better to
-	 * do best-effort in zfs_set_prop_nvlist, too.
-	 */
-	(void) zfs_set_prop_nvlist(tofs, props);
-	nvlist_free(props);
-
 	off = fp->f_offset;
 	error = dmu_recv_stream(&drc, fp->f_vnode, &off);
 
-	if (error == 0) {
-		if (zfsvfs != NULL) {
-			char osname[MAXNAMELEN];
-			int mode;
-
-			error = zfs_suspend_fs(zfsvfs, osname, &mode);
-			if (error == 0) {
-				int resume_err;
-
-				error = dmu_recv_end(&drc);
-				resume_err = zfs_resume_fs(zfsvfs,
-				    osname, mode);
-				error = error ? error : resume_err;
-			} else {
-				dmu_recv_abort_cleanup(&drc);
-			}
+	if (error == 0 && zfsvfs) {
+		char osname[MAXNAMELEN];
+		int mode;
+
+		/* online recv */
+		error = zfs_suspend_fs(zfsvfs, osname, &mode);
+		if (error == 0) {
+			int resume_err;
+
+			error = dmu_recv_end(&drc);
+			resume_err = zfs_resume_fs(zfsvfs, osname, mode);
+			error = error ? error : resume_err;
 		} else {
-			error = dmu_recv_end(&drc);
+			dmu_recv_abort_cleanup(&drc);
 		}
-	}
-	if (zfsvfs != NULL) {
-		mutex_exit(&zfsvfs->z_online_recv_lock);
-		VFS_RELE(zfsvfs->z_vfs);
+	} else if (error == 0) {
+		error = dmu_recv_end(&drc);
 	}
 
 	zc->zc_cookie = off - fp->f_offset;
 	if (VOP_SEEK(fp->f_vnode, fp->f_offset, &off, NULL) == 0)
 		fp->f_offset = off;
 
+	/*
+	 * On error, restore the original props.
+	 */
+	if (error && props) {
+		clear_props(tofs, props);
+		(void) zfs_set_prop_nvlist(tofs, origprops);
+	}
+out:
+	if (zfsvfs) {
+		mutex_exit(&zfsvfs->z_online_recv_lock);
+		VFS_RELE(zfsvfs->z_vfs);
+	}
+	nvlist_free(props);
+	nvlist_free(origprops);
 	releasef(fd);
 	return (error);
 }
@@ -2456,7 +2413,7 @@
 	offset_t off;
 
 	error = dmu_objset_open(zc->zc_name, DMU_OST_ANY,
-	    DS_MODE_STANDARD | DS_MODE_READONLY, &tosnap);
+	    DS_MODE_USER | DS_MODE_READONLY, &tosnap);
 	if (error)
 		return (error);
 
@@ -2470,7 +2427,7 @@
 			*(cp+1) = 0;
 		(void) strncat(buf, zc->zc_value, sizeof (buf));
 		error = dmu_objset_open(buf, DMU_OST_ANY,
-		    DS_MODE_STANDARD | DS_MODE_READONLY, &fromsnap);
+		    DS_MODE_USER | DS_MODE_READONLY, &fromsnap);
 		if (error) {
 			dmu_objset_close(tosnap);
 			return (error);
--- a/usr/src/uts/common/fs/zfs/zfs_vfsops.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/zfs_vfsops.c	Thu May 22 11:13:47 2008 -0700
@@ -693,14 +693,13 @@
 	if (error = dsl_prop_get_integer(osname, "readonly", &readonly, NULL))
 		goto out;
 
+	mode = DS_MODE_OWNER;
 	if (readonly)
-		mode = DS_MODE_PRIMARY | DS_MODE_READONLY;
-	else
-		mode = DS_MODE_PRIMARY;
+		mode |= DS_MODE_READONLY;
 
 	error = dmu_objset_open(osname, DMU_OST_ZFS, mode, &zfsvfs->z_os);
 	if (error == EROFS) {
-		mode = DS_MODE_PRIMARY | DS_MODE_READONLY;
+		mode = DS_MODE_OWNER | DS_MODE_READONLY;
 		error = dmu_objset_open(osname, DMU_OST_ZFS, mode,
 		    &zfsvfs->z_os);
 	}
@@ -1311,7 +1310,7 @@
 		mutex_exit(&os->os->os_user_ptr_lock);
 
 		/*
-		 * Finally close the objset
+		 * Finally release the objset
 		 */
 		dmu_objset_close(os);
 	}
@@ -1580,7 +1579,7 @@
 	if (newvers < ZPL_VERSION_INITIAL || newvers > ZPL_VERSION)
 		return (EINVAL);
 
-	error = dmu_objset_open(name, DMU_OST_ZFS, DS_MODE_PRIMARY, &os);
+	error = dmu_objset_open(name, DMU_OST_ZFS, DS_MODE_OWNER, &os);
 	if (error)
 		return (error);
 
--- a/usr/src/uts/common/fs/zfs/zil.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/zil.c	Thu May 22 11:13:47 2008 -0700
@@ -499,7 +499,7 @@
 	objset_t *os;
 	int error;
 
-	error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_STANDARD, &os);
+	error = dmu_objset_open(osname, DMU_OST_ANY, DS_MODE_USER, &os);
 	if (error) {
 		cmn_err(CE_WARN, "can't process intent log for %s", osname);
 		return (0);
--- a/usr/src/uts/common/fs/zfs/zvol.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/zvol.c	Thu May 22 11:13:47 2008 -0700
@@ -589,7 +589,7 @@
 	uint64_t volsize;
 	minor_t minor = 0;
 	struct pathname linkpath;
-	int ds_mode = DS_MODE_PRIMARY;
+	int ds_mode = DS_MODE_OWNER;
 	vnode_t *vp = NULL;
 	char *devpath;
 	size_t devpathlen = strlen(ZVOL_FULL_DEV_DIR) + strlen(name) + 1;