diff usr/src/uts/common/fs/zfs/dsl_dataset.c @ 6689:47572a2f5e73

6610506 Eliminate or improve retry logic from callers of dmu_objset_open()
6695465 divide by zero in txg_sync_thread() under heavy load
author maybee
date Thu, 22 May 2008 11:13:47 -0700
parents 3a34b0dbb107
children c511f317869e
line wrap: on
line diff
--- a/usr/src/uts/common/fs/zfs/dsl_dataset.c	Thu May 22 11:05:03 2008 -0700
+++ b/usr/src/uts/common/fs/zfs/dsl_dataset.c	Thu May 22 11:13:47 2008 -0700
@@ -41,6 +41,8 @@
 #include <sys/spa.h>
 #include <sys/sunddi.h>
 
+static char *dsl_reaper = "the grim reaper";
+
 static dsl_checkfunc_t dsl_dataset_destroy_begin_check;
 static dsl_syncfunc_t dsl_dataset_destroy_begin_sync;
 static dsl_checkfunc_t dsl_dataset_rollback_check;
@@ -51,22 +53,9 @@
 
 #define	DSL_DEADLIST_BLOCKSIZE	SPA_MAXBLOCKSIZE
 
-/*
- * We use weighted reference counts to express the various forms of exclusion
- * between different open modes.  A STANDARD open is 1 point, an EXCLUSIVE open
- * is DS_REF_MAX, and a PRIMARY open is little more than half of an EXCLUSIVE.
- * This makes the exclusion logic simple: the total refcnt for all opens cannot
- * exceed DS_REF_MAX.  For example, EXCLUSIVE opens are exclusive because their
- * weight (DS_REF_MAX) consumes the entire refcnt space.  PRIMARY opens consume
- * just over half of the refcnt space, so there can't be more than one, but it
- * can peacefully coexist with any number of STANDARD opens.
- */
-static uint64_t ds_refcnt_weight[DS_MODE_LEVELS] = {
-	0,			/* DS_MODE_NONE - invalid		*/
-	1,			/* DS_MODE_STANDARD - unlimited number	*/
-	(DS_REF_MAX >> 1) + 1,	/* DS_MODE_PRIMARY - only one of these	*/
-	DS_REF_MAX		/* DS_MODE_EXCLUSIVE - no other opens	*/
-};
+#define	DSL_DATASET_IS_DESTROYED(ds)	((ds)->ds_owner == dsl_reaper)
+
+static void dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag);
 
 /*
  * Figure out how much of this delta should be propogated to the dsl_dir
@@ -237,9 +226,7 @@
 {
 	dsl_dataset_t *ds = dsv;
 
-	/* open_refcount == DS_REF_MAX when deleting */
-	ASSERT(ds->ds_open_refcount == 0 ||
-	    ds->ds_open_refcount == DS_REF_MAX);
+	ASSERT(ds->ds_owner == NULL || DSL_DATASET_IS_DESTROYED(ds));
 
 	dprintf_ds(ds, "evicting %s\n", "");
 
@@ -249,18 +236,21 @@
 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 
 	if (ds->ds_prev) {
-		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
+		dsl_dataset_drop_ref(ds->ds_prev, ds);
 		ds->ds_prev = NULL;
 	}
 
 	bplist_close(&ds->ds_deadlist);
-	dsl_dir_close(ds->ds_dir, ds);
+	if (ds->ds_dir)
+		dsl_dir_close(ds->ds_dir, ds);
 
 	ASSERT(!list_link_active(&ds->ds_synced_link));
 
 	mutex_destroy(&ds->ds_lock);
 	mutex_destroy(&ds->ds_opening_lock);
 	mutex_destroy(&ds->ds_deadlist.bpl_lock);
+	rw_destroy(&ds->ds_rwlock);
+	cv_destroy(&ds->ds_exclusive_cv);
 
 	kmem_free(ds, sizeof (dsl_dataset_t));
 }
@@ -291,47 +281,48 @@
 }
 
 static int
-dsl_dataset_snap_lookup(objset_t *os, uint64_t flags,
-    uint64_t snapnames_zapobj, const char *name, uint64_t *value)
+dsl_dataset_snap_lookup(dsl_dataset_t *ds, const char *name, uint64_t *value)
 {
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 	matchtype_t mt;
 	int err;
 
-	if (flags & DS_FLAG_CI_DATASET)
+	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 		mt = MT_FIRST;
 	else
 		mt = MT_EXACT;
 
-	err = zap_lookup_norm(os, snapnames_zapobj, name, 8, 1,
+	err = zap_lookup_norm(mos, snapobj, name, 8, 1,
 	    value, mt, NULL, 0, NULL);
 	if (err == ENOTSUP && mt == MT_FIRST)
-		err = zap_lookup(os, snapnames_zapobj, name, 8, 1, value);
+		err = zap_lookup(mos, snapobj, name, 8, 1, value);
 	return (err);
 }
 
 static int
-dsl_dataset_snap_remove(objset_t *os, uint64_t flags,
-    uint64_t snapnames_zapobj, char *name, dmu_tx_t *tx)
+dsl_dataset_snap_remove(dsl_dataset_t *ds, char *name, dmu_tx_t *tx)
 {
+	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
+	uint64_t snapobj = ds->ds_phys->ds_snapnames_zapobj;
 	matchtype_t mt;
 	int err;
 
-	if (flags & DS_FLAG_CI_DATASET)
+	if (ds->ds_phys->ds_flags & DS_FLAG_CI_DATASET)
 		mt = MT_FIRST;
 	else
 		mt = MT_EXACT;
 
-	err = zap_remove_norm(os, snapnames_zapobj, name, mt, tx);
+	err = zap_remove_norm(mos, snapobj, name, mt, tx);
 	if (err == ENOTSUP && mt == MT_FIRST)
-		err = zap_remove(os, snapnames_zapobj, name, tx);
+		err = zap_remove(mos, snapobj, name, tx);
 	return (err);
 }
 
-int
-dsl_dataset_open_obj(dsl_pool_t *dp, uint64_t dsobj, const char *snapname,
-    int mode, void *tag, dsl_dataset_t **dsp)
+static int
+dsl_dataset_get_ref(dsl_pool_t *dp, uint64_t dsobj, void *tag,
+    dsl_dataset_t **dsp)
 {
-	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
 	objset_t *mos = dp->dp_meta_objset;
 	dmu_buf_t *dbuf;
 	dsl_dataset_t *ds;
@@ -356,6 +347,8 @@
 		mutex_init(&ds->ds_opening_lock, NULL, MUTEX_DEFAULT, NULL);
 		mutex_init(&ds->ds_deadlist.bpl_lock, NULL, MUTEX_DEFAULT,
 		    NULL);
+		rw_init(&ds->ds_rwlock, 0, 0, 0);
+		cv_init(&ds->ds_exclusive_cv, NULL, CV_DEFAULT, NULL);
 
 		err = bplist_open(&ds->ds_deadlist,
 		    mos, ds->ds_phys->ds_deadlist_obj);
@@ -371,6 +364,8 @@
 			mutex_destroy(&ds->ds_lock);
 			mutex_destroy(&ds->ds_opening_lock);
 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
+			rw_destroy(&ds->ds_rwlock);
+			cv_destroy(&ds->ds_exclusive_cv);
 			kmem_free(ds, sizeof (dsl_dataset_t));
 			dmu_buf_rele(dbuf, tag);
 			return (err);
@@ -379,35 +374,12 @@
 		if (ds->ds_dir->dd_phys->dd_head_dataset_obj == dsobj) {
 			ds->ds_snapname[0] = '\0';
 			if (ds->ds_phys->ds_prev_snap_obj) {
-				err = dsl_dataset_open_obj(dp,
-				    ds->ds_phys->ds_prev_snap_obj, NULL,
-				    DS_MODE_NONE, ds, &ds->ds_prev);
+				err = dsl_dataset_get_ref(dp,
+				    ds->ds_phys->ds_prev_snap_obj,
+				    ds, &ds->ds_prev);
 			}
-		} else {
-			if (snapname) {
-#ifdef ZFS_DEBUG
-				dsl_dataset_phys_t *headphys;
-				dmu_buf_t *headdbuf;
-				err = dmu_bonus_hold(mos,
-				    ds->ds_dir->dd_phys->dd_head_dataset_obj,
-				    FTAG, &headdbuf);
-				if (err == 0) {
-					uint64_t foundobj;
-
-					headphys = headdbuf->db_data;
-					err = dsl_dataset_snap_lookup(
-					    dp->dp_meta_objset,
-					    headphys->ds_flags,
-					    headphys->ds_snapnames_zapobj,
-					    snapname, &foundobj);
-					ASSERT3U(foundobj, ==, dsobj);
-					dmu_buf_rele(headdbuf, FTAG);
-				}
-#endif
-				(void) strcat(ds->ds_snapname, snapname);
-			} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
-				err = dsl_dataset_get_snapname(ds);
-			}
+		} else if (zfs_flags & ZFS_DEBUG_SNAPNAMES) {
+			err = dsl_dataset_get_snapname(ds);
 		}
 
 		if (!dsl_dataset_is_snapshot(ds)) {
@@ -444,14 +416,14 @@
 		}
 		if (err || winner) {
 			bplist_close(&ds->ds_deadlist);
-			if (ds->ds_prev) {
-				dsl_dataset_close(ds->ds_prev,
-				    DS_MODE_NONE, ds);
-			}
+			if (ds->ds_prev)
+				dsl_dataset_drop_ref(ds->ds_prev, ds);
 			dsl_dir_close(ds->ds_dir, ds);
 			mutex_destroy(&ds->ds_lock);
 			mutex_destroy(&ds->ds_opening_lock);
 			mutex_destroy(&ds->ds_deadlist.bpl_lock);
+			rw_destroy(&ds->ds_rwlock);
+			cv_destroy(&ds->ds_exclusive_cv);
 			kmem_free(ds, sizeof (dsl_dataset_t));
 			if (err) {
 				dmu_buf_rele(dbuf, tag);
@@ -465,93 +437,166 @@
 	}
 	ASSERT3P(ds->ds_dbuf, ==, dbuf);
 	ASSERT3P(ds->ds_phys, ==, dbuf->db_data);
-
 	mutex_enter(&ds->ds_lock);
-	if ((DS_MODE_LEVEL(mode) == DS_MODE_PRIMARY &&
-	    (ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT) &&
-	    !DS_MODE_IS_INCONSISTENT(mode)) ||
-	    (ds->ds_open_refcount + weight > DS_REF_MAX)) {
+	if (!dsl_pool_sync_context(dp) && DSL_DATASET_IS_DESTROYED(ds)) {
 		mutex_exit(&ds->ds_lock);
-		dsl_dataset_close(ds, DS_MODE_NONE, tag);
-		return (EBUSY);
+		dmu_buf_rele(ds->ds_dbuf, tag);
+		return (ENOENT);
+	}
+	mutex_exit(&ds->ds_lock);
+	*dsp = ds;
+	return (0);
+}
+
+static int
+dsl_dataset_hold_ref(dsl_dataset_t *ds, void *tag)
+{
+	dsl_pool_t *dp = ds->ds_dir->dd_pool;
+
+	/*
+	 * In syncing context we don't want the rwlock: there
+	 * may be an existing writer waiting for sync phase to
+	 * finish.  We don't need to worry about such writers, since
+	 * sync phase is single-threaded, so the writer can't be
+	 * doing anything while we are active.
+	 */
+	if (dsl_pool_sync_context(dp)) {
+		ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
+		return (0);
 	}
-	ds->ds_open_refcount += weight;
+
+	/*
+	 * Normal users will hold the ds_rwlock as a READER until they
+	 * are finished (i.e., call dsl_dataset_rele()).  "Owners" will
+	 * drop their READER lock after they set the ds_owner field.
+	 *
+	 * If the dataset is being destroyed, the destroy thread will
+	 * obtain a WRITER lock for exclusive access after it's done its
+	 * open-context work and then change the ds_owner to
+	 * dsl_reaper once destruction is assured.  So threads
+	 * may block here temporarily, until the "destructability" of
+	 * the dataset is determined.
+	 */
+	ASSERT(!RW_WRITE_HELD(&dp->dp_config_rwlock));
+	mutex_enter(&ds->ds_lock);
+	while (!rw_tryenter(&ds->ds_rwlock, RW_READER)) {
+		rw_exit(&dp->dp_config_rwlock);
+		cv_wait(&ds->ds_exclusive_cv, &ds->ds_lock);
+		if (DSL_DATASET_IS_DESTROYED(ds)) {
+			mutex_exit(&ds->ds_lock);
+			dsl_dataset_drop_ref(ds, tag);
+			rw_enter(&dp->dp_config_rwlock, RW_READER);
+			return (ENOENT);
+		}
+		rw_enter(&dp->dp_config_rwlock, RW_READER);
+	}
 	mutex_exit(&ds->ds_lock);
-
-	*dsp = ds;
 	return (0);
 }
 
 int
-dsl_dataset_open_spa(spa_t *spa, const char *name, int mode,
-    void *tag, dsl_dataset_t **dsp)
+dsl_dataset_hold_obj(dsl_pool_t *dp, uint64_t dsobj, void *tag,
+    dsl_dataset_t **dsp)
+{
+	int err = dsl_dataset_get_ref(dp, dsobj, tag, dsp);
+
+	if (err)
+		return (err);
+	return (dsl_dataset_hold_ref(*dsp, tag));
+}
+
+int
+dsl_dataset_own_obj(dsl_pool_t *dp, uint64_t dsobj, int flags, void *owner,
+    dsl_dataset_t **dsp)
+{
+	int err = dsl_dataset_hold_obj(dp, dsobj, owner, dsp);
+
+	ASSERT(DS_MODE_TYPE(flags) != DS_MODE_USER);
+
+	if (err)
+		return (err);
+	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
+		dsl_dataset_rele(*dsp, owner);
+		return (EBUSY);
+	}
+	return (0);
+}
+
+int
+dsl_dataset_hold(const char *name, void *tag, dsl_dataset_t **dsp)
 {
 	dsl_dir_t *dd;
 	dsl_pool_t *dp;
-	const char *tail;
+	const char *snapname;
 	uint64_t obj;
-	dsl_dataset_t *ds = NULL;
 	int err = 0;
 
-	err = dsl_dir_open_spa(spa, name, FTAG, &dd, &tail);
+	err = dsl_dir_open_spa(NULL, name, FTAG, &dd, &snapname);
 	if (err)
 		return (err);
 
 	dp = dd->dd_pool;
 	obj = dd->dd_phys->dd_head_dataset_obj;
 	rw_enter(&dp->dp_config_rwlock, RW_READER);
-	if (obj == 0) {
-		/* A dataset with no associated objset */
+	if (obj)
+		err = dsl_dataset_get_ref(dp, obj, tag, dsp);
+	else
 		err = ENOENT;
+	if (err)
 		goto out;
-	}
-
-	if (tail != NULL) {
-		objset_t *mos = dp->dp_meta_objset;
-		uint64_t flags;
-
-		err = dsl_dataset_open_obj(dp, obj, NULL,
-		    DS_MODE_NONE, tag, &ds);
-		if (err)
-			goto out;
-		flags = ds->ds_phys->ds_flags;
-		obj = ds->ds_phys->ds_snapnames_zapobj;
-		dsl_dataset_close(ds, DS_MODE_NONE, tag);
-		ds = NULL;
-
-		if (tail[0] != '@') {
+
+	err = dsl_dataset_hold_ref(*dsp, tag);
+
+	/* we may be looking for a snapshot */
+	if (err == 0 && snapname != NULL) {
+		dsl_dataset_t *ds = NULL;
+
+		if (*snapname++ != '@') {
+			dsl_dataset_rele(*dsp, tag);
 			err = ENOENT;
 			goto out;
 		}
-		tail++;
-
-		/* Look for a snapshot */
-		if (!DS_MODE_IS_READONLY(mode)) {
-			err = EROFS;
-			goto out;
+
+		dprintf("looking for snapshot '%s'\n", snapname);
+		err = dsl_dataset_snap_lookup(*dsp, snapname, &obj);
+		if (err == 0)
+			err = dsl_dataset_get_ref(dp, obj, tag, &ds);
+		dsl_dataset_rele(*dsp, tag);
+
+		ASSERT3U((err == 0), ==, (ds != NULL));
+
+		if (ds) {
+			mutex_enter(&ds->ds_lock);
+			if (ds->ds_snapname[0] == 0)
+				(void) strlcpy(ds->ds_snapname, snapname,
+				    sizeof (ds->ds_snapname));
+			mutex_exit(&ds->ds_lock);
+			err = dsl_dataset_hold_ref(ds, tag);
+			*dsp = err ? NULL : ds;
 		}
-		dprintf("looking for snapshot '%s'\n", tail);
-		err = dsl_dataset_snap_lookup(mos, flags, obj, tail, &obj);
-		if (err)
-			goto out;
 	}
-	err = dsl_dataset_open_obj(dp, obj, tail, mode, tag, &ds);
-
 out:
 	rw_exit(&dp->dp_config_rwlock);
 	dsl_dir_close(dd, FTAG);
-
-	ASSERT3U((err == 0), ==, (ds != NULL));
-	/* ASSERT(ds == NULL || strcmp(name, ds->ds_name) == 0); */
-
-	*dsp = ds;
 	return (err);
 }
 
 int
-dsl_dataset_open(const char *name, int mode, void *tag, dsl_dataset_t **dsp)
+dsl_dataset_own(const char *name, int flags, void *owner, dsl_dataset_t **dsp)
 {
-	return (dsl_dataset_open_spa(NULL, name, mode, tag, dsp));
+	int err = dsl_dataset_hold(name, owner, dsp);
+	if (err)
+		return (err);
+	if ((*dsp)->ds_phys->ds_num_children > 0 &&
+	    !DS_MODE_IS_READONLY(flags)) {
+		dsl_dataset_rele(*dsp, owner);
+		return (EROFS);
+	}
+	if (!dsl_dataset_tryown(*dsp, DS_MODE_IS_INCONSISTENT(flags), owner)) {
+		dsl_dataset_rele(*dsp, owner);
+		return (EBUSY);
+	}
+	return (0);
 }
 
 void
@@ -564,11 +609,11 @@
 		VERIFY(0 == dsl_dataset_get_snapname(ds));
 		if (ds->ds_snapname[0]) {
 			(void) strcat(name, "@");
+			/*
+			 * We use a "recursive" mutex so that we
+			 * can call dprintf_ds() with ds_lock held.
+			 */
 			if (!MUTEX_HELD(&ds->ds_lock)) {
-				/*
-				 * We use a "recursive" mutex so that we
-				 * can call dprintf_ds() with ds_lock held.
-				 */
 				mutex_enter(&ds->ds_lock);
 				(void) strcat(name, ds->ds_snapname);
 				mutex_exit(&ds->ds_lock);
@@ -592,7 +637,6 @@
 		if (ds->ds_snapname[0]) {
 			++result;	/* adding one for the @-sign */
 			if (!MUTEX_HELD(&ds->ds_lock)) {
-				/* see dsl_datset_name */
 				mutex_enter(&ds->ds_lock);
 				result += strlen(ds->ds_snapname);
 				mutex_exit(&ds->ds_lock);
@@ -605,49 +649,64 @@
 	return (result);
 }
 
-void
-dsl_dataset_close(dsl_dataset_t *ds, int mode, void *tag)
+static void
+dsl_dataset_drop_ref(dsl_dataset_t *ds, void *tag)
 {
-	uint64_t weight = ds_refcnt_weight[DS_MODE_LEVEL(mode)];
-	mutex_enter(&ds->ds_lock);
-	ASSERT3U(ds->ds_open_refcount, >=, weight);
-	ds->ds_open_refcount -= weight;
-	mutex_exit(&ds->ds_lock);
-
 	dmu_buf_rele(ds->ds_dbuf, tag);
 }
 
 void
-dsl_dataset_downgrade(dsl_dataset_t *ds, int oldmode, int newmode)
+dsl_dataset_rele(dsl_dataset_t *ds, void *tag)
 {
-	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
-	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
+	ASSERT(ds->ds_owner != tag);
+	if (!dsl_pool_sync_context(ds->ds_dir->dd_pool)) {
+		rw_exit(&ds->ds_rwlock);
+	}
+	dsl_dataset_drop_ref(ds, tag);
+}
+
+void
+dsl_dataset_disown(dsl_dataset_t *ds, void *owner)
+{
+	ASSERT((ds->ds_owner == owner && ds->ds_dbuf) ||
+	    (DSL_DATASET_IS_DESTROYED(ds) && ds->ds_dbuf == NULL));
+
 	mutex_enter(&ds->ds_lock);
-	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
-	ASSERT3U(oldweight, >=, newweight);
-	ds->ds_open_refcount -= oldweight;
-	ds->ds_open_refcount += newweight;
+	ds->ds_owner = NULL;
+	if (RW_WRITE_HELD(&ds->ds_rwlock)) {
+		rw_exit(&ds->ds_rwlock);
+		cv_broadcast(&ds->ds_exclusive_cv);
+	}
 	mutex_exit(&ds->ds_lock);
+	if (ds->ds_dbuf)
+		dsl_dataset_drop_ref(ds, owner);
+	else
+		dsl_dataset_evict(ds->ds_dbuf, ds);
 }
 
 boolean_t
-dsl_dataset_tryupgrade(dsl_dataset_t *ds, int oldmode, int newmode)
+dsl_dataset_tryown(dsl_dataset_t *ds, boolean_t inconsistentok, void *owner)
 {
-	boolean_t rv;
-	uint64_t oldweight = ds_refcnt_weight[DS_MODE_LEVEL(oldmode)];
-	uint64_t newweight = ds_refcnt_weight[DS_MODE_LEVEL(newmode)];
+	boolean_t gotit = FALSE;
+
 	mutex_enter(&ds->ds_lock);
-	ASSERT3U(ds->ds_open_refcount, >=, oldweight);
-	ASSERT3U(newweight, >=, oldweight);
-	if (ds->ds_open_refcount - oldweight + newweight > DS_REF_MAX) {
-		rv = B_FALSE;
-	} else {
-		ds->ds_open_refcount -= oldweight;
-		ds->ds_open_refcount += newweight;
-		rv = B_TRUE;
+	if (ds->ds_owner == NULL &&
+	    (!DS_IS_INCONSISTENT(ds) || inconsistentok)) {
+		ds->ds_owner = owner;
+		if (!dsl_pool_sync_context(ds->ds_dir->dd_pool))
+			rw_exit(&ds->ds_rwlock);
+		gotit = TRUE;
 	}
 	mutex_exit(&ds->ds_lock);
-	return (rv);
+	return (gotit);
+}
+
+void
+dsl_dataset_make_exclusive(dsl_dataset_t *ds, void *owner)
+{
+	ASSERT3P(owner, ==, ds->ds_owner);
+	if (!RW_WRITE_HELD(&ds->ds_rwlock))
+		rw_enter(&ds->ds_rwlock, RW_WRITER);
 }
 
 void
@@ -687,11 +746,10 @@
 	dd->dd_phys->dd_head_dataset_obj = dsobj;
 	dsl_dir_close(dd, FTAG);
 
-	VERIFY(0 ==
-	    dsl_dataset_open_obj(dp, dsobj, NULL, DS_MODE_NONE, FTAG, &ds));
+	VERIFY(0 == dsl_dataset_get_ref(dp, dsobj, FTAG, &ds));
 	(void) dmu_objset_create_impl(dp->dp_spa, ds,
 	    &ds->ds_phys->ds_bp, DMU_OST_ZFS, tx);
-	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+	dsl_dataset_drop_ref(ds, FTAG);
 }
 
 uint64_t
@@ -714,6 +772,7 @@
 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 	dmu_buf_will_dirty(dbuf, tx);
 	dsphys = dbuf->db_data;
+	bzero(dsphys, sizeof (dsl_dataset_phys_t));
 	dsphys->ds_dir_obj = dd->dd_object;
 	dsphys->ds_flags = flags;
 	dsphys->ds_fsid_guid = unique_create();
@@ -796,21 +855,20 @@
 
 	(void) strcat(name, "@");
 	(void) strcat(name, da->snapname);
-	err = dsl_dataset_open(name,
-	    DS_MODE_EXCLUSIVE | DS_MODE_READONLY | DS_MODE_INCONSISTENT,
+	err = dsl_dataset_own(name, DS_MODE_READONLY | DS_MODE_INCONSISTENT,
 	    da->dstg, &ds);
 	cp = strchr(name, '@');
 	*cp = '\0';
-	if (err == ENOENT)
-		return (0);
-	if (err) {
+	if (err == 0) {
+		dsl_dataset_make_exclusive(ds, da->dstg);
+		dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
+		    dsl_dataset_destroy_sync, ds, da->dstg, 0);
+	} else if (err == ENOENT) {
+		err = 0;
+	} else {
 		(void) strcpy(da->failed, name);
-		return (err);
 	}
-
-	dsl_sync_task_create(da->dstg, dsl_dataset_destroy_check,
-	    dsl_dataset_destroy_sync, ds, da->dstg, 0);
-	return (0);
+	return (err);
 }
 
 /*
@@ -841,16 +899,14 @@
 	for (dst = list_head(&da.dstg->dstg_tasks); dst;
 	    dst = list_next(&da.dstg->dstg_tasks, dst)) {
 		dsl_dataset_t *ds = dst->dst_arg1;
+		/*
+		 * Return the file system name that triggered the error
+		 */
 		if (dst->dst_err) {
 			dsl_dataset_name(ds, fsname);
 			*strchr(fsname, '@') = '\0';
 		}
-		/*
-		 * If it was successful, destroy_sync would have
-		 * closed the ds
-		 */
-		if (err)
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, da.dstg);
+		dsl_dataset_disown(ds, da.dstg);
 	}
 
 	dsl_sync_task_group_destroy(da.dstg);
@@ -859,9 +915,8 @@
 }
 
 /*
- * ds must be opened EXCLUSIVE or PRIMARY.  on return (whether
- * successful or not), ds will be closed and caller can no longer
- * dereference it.
+ * ds must be opened as OWNER.  On return (whether successful or not),
+ * ds will be closed and caller can no longer dereference it.
  */
 int
 dsl_dataset_destroy(dsl_dataset_t *ds, void *tag)
@@ -872,16 +927,9 @@
 	dsl_dir_t *dd;
 	uint64_t obj;
 
-	if (ds->ds_open_refcount != DS_REF_MAX) {
-		if (dsl_dataset_tryupgrade(ds, DS_MODE_PRIMARY,
-		    DS_MODE_EXCLUSIVE) == 0) {
-			dsl_dataset_close(ds, DS_MODE_PRIMARY, tag);
-			return (EBUSY);
-		}
-	}
-
 	if (dsl_dataset_is_snapshot(ds)) {
 		/* Destroying a snapshot is simpler */
+		dsl_dataset_make_exclusive(ds, tag);
 		err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 		    dsl_dataset_destroy_check, dsl_dataset_destroy_sync,
 		    ds, tag, 0);
@@ -925,14 +973,18 @@
 		VERIFY(0 == dmu_object_free(os, obj, tx));
 		dmu_tx_commit(tx);
 	}
-	/* Make sure it's not dirty before we finish destroying it. */
-	txg_wait_synced(dd->dd_pool, 0);
 
 	dmu_objset_close(os);
 	if (err != ESRCH)
 		goto out;
 
 	if (ds->ds_user_ptr) {
+		/*
+		 * We need to sync out all in-flight IO before we try
+		 * to evict (the dataset evict func is trying to clear
+		 * the cached entries for this dataset in the ARC).
+		 */
+		txg_wait_synced(dd->dd_pool, 0);
 		ds->ds_user_evict_func(ds, ds->ds_user_ptr);
 		ds->ds_user_ptr = NULL;
 	}
@@ -947,6 +999,7 @@
 	/*
 	 * Blow away the dsl_dir + head dataset.
 	 */
+	dsl_dataset_make_exclusive(ds, tag);
 	dstg = dsl_sync_task_group_create(ds->ds_dir->dd_pool);
 	dsl_sync_task_create(dstg, dsl_dataset_destroy_check,
 	    dsl_dataset_destroy_sync, ds, tag, 0);
@@ -954,19 +1007,18 @@
 	    dsl_dir_destroy_sync, dd, FTAG, 0);
 	err = dsl_sync_task_group_wait(dstg);
 	dsl_sync_task_group_destroy(dstg);
-	/* if it is successful, *destroy_sync will close the ds+dd */
+	/* if it is successful, dsl_dir_destroy_sync will close the dd */
 	if (err)
 		dsl_dir_close(dd, FTAG);
 out:
-	if (err)
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
+	dsl_dataset_disown(ds, tag);
 	return (err);
 }
 
 int
 dsl_dataset_rollback(dsl_dataset_t *ds, dmu_objset_type_t ost)
 {
-	ASSERT3U(ds->ds_open_refcount, ==, DS_REF_MAX);
+	ASSERT(ds->ds_owner);
 
 	return (dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    dsl_dataset_rollback_check, dsl_dataset_rollback_sync,
@@ -1164,7 +1216,7 @@
 		 * We need to make sure that the objset_impl_t is reopened after
 		 * we do the rollback, otherwise it will have the wrong
 		 * objset_phys_t.  Normally this would happen when this
-		 * DS_MODE_EXCLUSIVE dataset-open is closed, thus causing the
+		 * dataset-open is closed, thus causing the
 		 * dataset to be immediately evicted.  But when doing "zfs recv
 		 * -F", we reopen the objset before that, so that there is no
 		 * window where the dataset is closed and inconsistent.
@@ -1292,6 +1344,9 @@
 {
 	dsl_dataset_t *ds = arg1;
 
+	/* we have an owner hold, so no one else can destroy us */
+	ASSERT(!DSL_DATASET_IS_DESTROYED(ds));
+
 	/* Can't delete a branch point. */
 	if (ds->ds_phys->ds_num_children > 1)
 		return (EEXIST);
@@ -1316,6 +1371,46 @@
 	return (0);
 }
 
+struct refsarg {		/* state for dsl_dataset_drain_refs() */
+	kmutex_t lock;		/* protects gone; paired with cv */
+	boolean_t gone;		/* set by dsl_dataset_refs_gone() */
+	kcondvar_t cv;		/* signalled when gone is set */
+};
+
+/* ARGSUSED */
+static void
+dsl_dataset_refs_gone(dmu_buf_t *db, void *argv)
+{
+	struct refsarg *arg = argv;
+
+	mutex_enter(&arg->lock);
+	arg->gone = TRUE;
+	cv_signal(&arg->cv);
+	mutex_exit(&arg->lock);
+}
+
+static void
+dsl_dataset_drain_refs(dsl_dataset_t *ds, void *tag)
+{
+	struct refsarg arg;	/* on our stack; we wait on it below */
+
+	mutex_init(&arg.lock, NULL, MUTEX_DEFAULT, NULL);
+	cv_init(&arg.cv, NULL, CV_DEFAULT, NULL);
+	arg.gone = FALSE;
+	(void) dmu_buf_update_user(ds->ds_dbuf, ds, &arg, &ds->ds_phys,
+	    dsl_dataset_refs_gone);	/* evict cb, presumably; confirm */
+	dmu_buf_rele(ds->ds_dbuf, tag);	/* release the hold made with tag */
+	mutex_enter(&arg.lock);
+	while (!arg.gone)	/* until dsl_dataset_refs_gone() sets it */
+		cv_wait(&arg.cv, &arg.lock);
+	ASSERT(arg.gone);
+	mutex_exit(&arg.lock);
+	ds->ds_dbuf = NULL;	/* clear the pointers to the drained dbuf */
+	ds->ds_phys = NULL;
+	mutex_destroy(&arg.lock);
+	cv_destroy(&arg.cv);
+}
+
 void
 dsl_dataset_destroy_sync(void *arg1, void *tag, cred_t *cr, dmu_tx_t *tx)
 {
@@ -1329,12 +1424,18 @@
 	dsl_dataset_t *ds_prev = NULL;
 	uint64_t obj;
 
-	ASSERT3U(ds->ds_open_refcount, ==, DS_REF_MAX);
+	ASSERT(ds->ds_owner);
 	ASSERT3U(ds->ds_phys->ds_num_children, <=, 1);
 	ASSERT(ds->ds_prev == NULL ||
 	    ds->ds_prev->ds_phys->ds_next_snap_obj != ds->ds_object);
 	ASSERT3U(ds->ds_phys->ds_bp.blk_birth, <=, tx->tx_txg);
 
+	/* signal any waiters that this dataset is going away */
+	mutex_enter(&ds->ds_lock);
+	ds->ds_owner = dsl_reaper;
+	cv_broadcast(&ds->ds_exclusive_cv);
+	mutex_exit(&ds->ds_lock);
+
 	/* Remove our reservation */
 	if (ds->ds_reserved != 0) {
 		uint64_t val = 0;
@@ -1350,9 +1451,8 @@
 		if (ds->ds_prev) {
 			ds_prev = ds->ds_prev;
 		} else {
-			VERIFY(0 == dsl_dataset_open_obj(dp,
-			    ds->ds_phys->ds_prev_snap_obj, NULL,
-			    DS_MODE_NONE, FTAG, &ds_prev));
+			VERIFY(0 == dsl_dataset_hold_obj(dp,
+			    ds->ds_phys->ds_prev_snap_obj, FTAG, &ds_prev));
 		}
 		after_branch_point =
 		    (ds_prev->ds_phys->ds_next_snap_obj != obj);
@@ -1379,9 +1479,8 @@
 
 		spa_scrub_restart(dp->dp_spa, tx->tx_txg);
 
-		VERIFY(0 == dsl_dataset_open_obj(dp,
-		    ds->ds_phys->ds_next_snap_obj, NULL,
-		    DS_MODE_NONE, FTAG, &ds_next));
+		VERIFY(0 == dsl_dataset_hold_obj(dp,
+		    ds->ds_phys->ds_next_snap_obj, FTAG, &ds_next));
 		ASSERT3U(ds_next->ds_phys->ds_prev_snap_obj, ==, obj);
 
 		old_unique = dsl_dataset_unique(ds_next);
@@ -1402,8 +1501,7 @@
 		 *
 		 * XXX we're doing this long task with the config lock held
 		 */
-		while (bplist_iterate(&ds_next->ds_deadlist, &itor,
-		    &bp) == 0) {
+		while (bplist_iterate(&ds_next->ds_deadlist, &itor, &bp) == 0) {
 			if (bp.blk_birth <= ds->ds_phys->ds_prev_snap_txg) {
 				VERIFY(0 == bplist_enqueue(&ds->ds_deadlist,
 				    &bp, tx));
@@ -1428,6 +1526,7 @@
 		bplist_destroy(mos, ds_next->ds_phys->ds_deadlist_obj, tx);
 
 		/* set next's deadlist to our deadlist */
+		bplist_close(&ds->ds_deadlist);
 		ds_next->ds_phys->ds_deadlist_obj =
 		    ds->ds_phys->ds_deadlist_obj;
 		VERIFY(0 == bplist_open(&ds_next->ds_deadlist, mos,
@@ -1449,9 +1548,9 @@
 			 */
 			dsl_dataset_t *ds_after_next;
 
-			VERIFY(0 == dsl_dataset_open_obj(dp,
-			    ds_next->ds_phys->ds_next_snap_obj, NULL,
-			    DS_MODE_NONE, FTAG, &ds_after_next));
+			VERIFY(0 == dsl_dataset_hold_obj(dp,
+			    ds_next->ds_phys->ds_next_snap_obj,
+			    FTAG, &ds_after_next));
 			itor = 0;
 			while (bplist_iterate(&ds_after_next->ds_deadlist,
 			    &itor, &bp) == 0) {
@@ -1464,18 +1563,16 @@
 				}
 			}
 
-			dsl_dataset_close(ds_after_next, DS_MODE_NONE, FTAG);
+			dsl_dataset_rele(ds_after_next, FTAG);
 			ASSERT3P(ds_next->ds_prev, ==, NULL);
 		} else {
 			ASSERT3P(ds_next->ds_prev, ==, ds);
-			dsl_dataset_close(ds_next->ds_prev, DS_MODE_NONE,
-			    ds_next);
+			dsl_dataset_drop_ref(ds_next->ds_prev, ds_next);
+			ds_next->ds_prev = NULL;
 			if (ds_prev) {
-				VERIFY(0 == dsl_dataset_open_obj(dp,
-				    ds->ds_phys->ds_prev_snap_obj, NULL,
-				    DS_MODE_NONE, ds_next, &ds_next->ds_prev));
-			} else {
-				ds_next->ds_prev = NULL;
+				VERIFY(0 == dsl_dataset_get_ref(dp,
+				    ds->ds_phys->ds_prev_snap_obj,
+				    ds_next, &ds_next->ds_prev));
 			}
 
 			dsl_dataset_recalc_head_uniq(ds_next);
@@ -1497,7 +1594,7 @@
 				    0, 0, tx);
 			}
 		}
-		dsl_dataset_close(ds_next, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds_next, FTAG);
 
 		/*
 		 * NB: unique_bytes might not be accurate for the head objset.
@@ -1543,56 +1640,46 @@
 
 	dsl_dir_diduse_space(ds->ds_dir, -used, -compressed, -uncompressed, tx);
 
-	if (ds->ds_phys->ds_snapnames_zapobj) {
+	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
+		/* Erase the link in the dir */
+		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
+		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
+		ASSERT(ds->ds_phys->ds_snapnames_zapobj != 0);
 		err = zap_destroy(mos, ds->ds_phys->ds_snapnames_zapobj, tx);
 		ASSERT(err == 0);
-	}
-
-	if (ds->ds_dir->dd_phys->dd_head_dataset_obj == ds->ds_object) {
-		/* Erase the link in the dataset */
-		dmu_buf_will_dirty(ds->ds_dir->dd_dbuf, tx);
-		ds->ds_dir->dd_phys->dd_head_dataset_obj = 0;
-		/*
-		 * dsl_dir_sync_destroy() called us, they'll destroy
-		 * the dataset.
-		 */
 	} else {
 		/* remove from snapshot namespace */
 		dsl_dataset_t *ds_head;
-		VERIFY(0 == dsl_dataset_open_obj(dp,
-		    ds->ds_dir->dd_phys->dd_head_dataset_obj, NULL,
-		    DS_MODE_NONE, FTAG, &ds_head));
+		ASSERT(ds->ds_phys->ds_snapnames_zapobj == 0);
+		VERIFY(0 == dsl_dataset_hold_obj(dp,
+		    ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &ds_head));
 		VERIFY(0 == dsl_dataset_get_snapname(ds));
 #ifdef ZFS_DEBUG
 		{
 			uint64_t val;
 
-			err = dsl_dataset_snap_lookup(mos,
-			    ds_head->ds_phys->ds_flags,
-			    ds_head->ds_phys->ds_snapnames_zapobj,
+			err = dsl_dataset_snap_lookup(ds_head,
 			    ds->ds_snapname, &val);
 			ASSERT3U(err, ==, 0);
 			ASSERT3U(val, ==, obj);
 		}
 #endif
-		err = dsl_dataset_snap_remove(mos,
-		    ds_head->ds_phys->ds_flags,
-		    ds_head->ds_phys->ds_snapnames_zapobj,
-		    ds->ds_snapname, tx);
+		err = dsl_dataset_snap_remove(ds_head, ds->ds_snapname, tx);
 		ASSERT(err == 0);
-		dsl_dataset_close(ds_head, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds_head, FTAG);
 	}
 
 	if (ds_prev && ds->ds_prev != ds_prev)
-		dsl_dataset_close(ds_prev, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds_prev, FTAG);
 
 	spa_prop_clear_bootfs(dp->dp_spa, ds->ds_object, tx);
 	spa_history_internal_log(LOG_DS_DESTROY, dp->dp_spa, tx,
 	    cr, "dataset = %llu", ds->ds_object);
 
-	dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, tag);
+	dsl_dir_close(ds->ds_dir, ds);
+	ds->ds_dir = NULL;
+	dsl_dataset_drain_refs(ds, tag);
 	VERIFY(0 == dmu_object_free(mos, obj, tx));
-
 }
 
 static int
@@ -1628,7 +1715,6 @@
 {
 	dsl_dataset_t *ds = arg1;
 	const char *snapname = arg2;
-	objset_t *mos = ds->ds_dir->dd_pool->dp_meta_objset;
 	int err;
 	uint64_t value;
 
@@ -1642,8 +1728,7 @@
 	/*
 	 * Check for conflicting name snapshot name.
 	 */
-	err = dsl_dataset_snap_lookup(mos, ds->ds_phys->ds_flags,
-	    ds->ds_phys->ds_snapnames_zapobj, snapname, &value);
+	err = dsl_dataset_snap_lookup(ds, snapname, &value);
 	if (err == 0)
 		return (EEXIST);
 	if (err != ENOENT)
@@ -1684,6 +1769,7 @@
 	VERIFY(0 == dmu_bonus_hold(mos, dsobj, FTAG, &dbuf));
 	dmu_buf_will_dirty(dbuf, tx);
 	dsphys = dbuf->db_data;
+	bzero(dsphys, sizeof (dsl_dataset_phys_t));
 	dsphys->ds_dir_obj = ds->ds_dir->dd_object;
 	dsphys->ds_fsid_guid = unique_create();
 	(void) random_get_pseudo_bytes((void*)&dsphys->ds_guid,
@@ -1744,10 +1830,9 @@
 	ASSERT(err == 0);
 
 	if (ds->ds_prev)
-		dsl_dataset_close(ds->ds_prev, DS_MODE_NONE, ds);
-	VERIFY(0 == dsl_dataset_open_obj(dp,
-	    ds->ds_phys->ds_prev_snap_obj, snapname,
-	    DS_MODE_NONE, ds, &ds->ds_prev));
+		dsl_dataset_drop_ref(ds->ds_prev, ds);
+	VERIFY(0 == dsl_dataset_get_ref(dp,
+	    ds->ds_phys->ds_prev_snap_obj, ds, &ds->ds_prev));
 
 	spa_history_internal_log(LOG_DS_SNAPSHOT, dp->dp_spa, tx, cr,
 	    "dataset = %llu", dsobj);
@@ -1823,11 +1908,10 @@
 	if (ds->ds_dir->dd_phys->dd_origin_obj) {
 		dsl_dataset_t *ods;
 
-		VERIFY(0 == dsl_dataset_open_obj(ds->ds_dir->dd_pool,
-		    ds->ds_dir->dd_phys->dd_origin_obj,
-		    NULL, DS_MODE_NONE, FTAG, &ods));
+		VERIFY(0 == dsl_dataset_get_ref(ds->ds_dir->dd_pool,
+		    ds->ds_dir->dd_phys->dd_origin_obj, FTAG, &ods));
 		dsl_dataset_name(ods, stat->dds_origin);
-		dsl_dataset_close(ods, DS_MODE_NONE, FTAG);
+		dsl_dataset_drop_ref(ods, FTAG);
 	}
 	rw_exit(&ds->ds_dir->dd_pool->dp_config_rwlock);
 }
@@ -1883,20 +1967,18 @@
 	dsl_dataset_t *ds = arg1;
 	char *newsnapname = arg2;
 	dsl_dir_t *dd = ds->ds_dir;
-	objset_t *mos = dd->dd_pool->dp_meta_objset;
 	dsl_dataset_t *hds;
 	uint64_t val;
 	int err;
 
-	err = dsl_dataset_open_obj(dd->dd_pool,
-	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds);
+	err = dsl_dataset_hold_obj(dd->dd_pool,
+	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds);
 	if (err)
 		return (err);
 
 	/* new name better not be in use */
-	err = dsl_dataset_snap_lookup(mos, hds->ds_phys->ds_flags,
-	    hds->ds_phys->ds_snapnames_zapobj, newsnapname, &val);
-	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
+	err = dsl_dataset_snap_lookup(hds, newsnapname, &val);
+	dsl_dataset_rele(hds, FTAG);
 
 	if (err == 0)
 		err = EEXIST;
@@ -1923,12 +2005,11 @@
 
 	ASSERT(ds->ds_phys->ds_next_snap_obj != 0);
 
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool,
-	    dd->dd_phys->dd_head_dataset_obj, NULL, DS_MODE_NONE, FTAG, &hds));
+	VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
+	    dd->dd_phys->dd_head_dataset_obj, FTAG, &hds));
 
 	VERIFY(0 == dsl_dataset_get_snapname(ds));
-	err = dsl_dataset_snap_remove(mos, hds->ds_phys->ds_flags,
-	    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname, tx);
+	err = dsl_dataset_snap_remove(hds, ds->ds_snapname, tx);
 	ASSERT3U(err, ==, 0);
 	mutex_enter(&ds->ds_lock);
 	(void) strcpy(ds->ds_snapname, newsnapname);
@@ -1939,7 +2020,7 @@
 
 	spa_history_internal_log(LOG_DS_RENAME, dd->dd_pool->dp_spa, tx,
 	    cr, "dataset = %llu", ds->ds_object);
-	dsl_dataset_close(hds, DS_MODE_NONE, FTAG);
+	dsl_dataset_rele(hds, FTAG);
 }
 
 struct renamesnaparg {
@@ -1970,26 +2051,21 @@
 		return (err);
 	}
 
-	err = dsl_dataset_open(name, DS_MODE_READONLY | DS_MODE_STANDARD,
-	    ra->dstg, &ds);
+#ifdef _KERNEL
+	/*
+	 * For all filesystems undergoing rename, we'll need to unmount it.
+	 */
+	(void) zfs_unmount_snap(name, NULL);
+#endif
+	err = dsl_dataset_hold(name, ra->dstg, &ds);
+	*cp = '\0';
 	if (err == ENOENT) {
-		*cp = '\0';
 		return (0);
-	}
-	if (err) {
+	} else if (err) {
 		(void) strcpy(ra->failed, name);
-		*cp = '\0';
-		dsl_dataset_close(ds, DS_MODE_STANDARD, ra->dstg);
 		return (err);
 	}
 
-#ifdef _KERNEL
-	/* for all filesystems undergoing rename, we'll need to unmount it */
-	(void) zfs_unmount_snap(name, NULL);
-#endif
-
-	*cp = '\0';
-
 	dsl_sync_task_create(ra->dstg, dsl_dataset_snapshot_rename_check,
 	    dsl_dataset_snapshot_rename_sync, ds, ra->newsnap, 0);
 
@@ -2038,7 +2114,7 @@
 			(void) strcat(ra->failed, "@");
 			(void) strcat(ra->failed, ra->newsnap);
 		}
-		dsl_dataset_close(ds, DS_MODE_STANDARD, ra->dstg);
+		dsl_dataset_rele(ds, ra->dstg);
 	}
 
 	if (err)
@@ -2063,8 +2139,7 @@
 
 #pragma weak dmu_objset_rename = dsl_dataset_rename
 int
-dsl_dataset_rename(char *oldname, const char *newname,
-    boolean_t recursive)
+dsl_dataset_rename(char *oldname, const char *newname, boolean_t recursive)
 {
 	dsl_dir_t *dd;
 	dsl_dataset_t *ds;
@@ -2106,8 +2181,7 @@
 	if (recursive) {
 		err = dsl_recursive_rename(oldname, newname);
 	} else {
-		err = dsl_dataset_open(oldname,
-		    DS_MODE_READONLY | DS_MODE_STANDARD, FTAG, &ds);
+		err = dsl_dataset_hold(oldname, FTAG, &ds);
 		if (err)
 			return (err);
 
@@ -2115,15 +2189,22 @@
 		    dsl_dataset_snapshot_rename_check,
 		    dsl_dataset_snapshot_rename_sync, ds, (char *)tail, 1);
 
-		dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
+		dsl_dataset_rele(ds, FTAG);
 	}
 
 	return (err);
 }
 
+struct promotedsarg {
+	list_node_t link;
+	dsl_dataset_t *ds;
+};
+
 struct promotearg {
+	list_t snap_list;
+	dsl_dataset_t *clone_origin, *old_head;
 	uint64_t used, comp, uncomp, unique;
-	uint64_t ds_flags, newnext_obj, snapnames_obj;
+	uint64_t newnext_obj;
 };
 
 /* ARGSUSED */
@@ -2132,139 +2213,112 @@
 {
 	dsl_dataset_t *hds = arg1;
 	struct promotearg *pa = arg2;
-	dsl_dir_t *dd = hds->ds_dir;
+	struct promotedsarg *snap = list_head(&pa->snap_list);
 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
-	dsl_dir_t *odd = NULL;
-	dsl_dataset_t *ds = NULL;
-	dsl_dataset_t *origin_ds = NULL;
-	dsl_dataset_t *newnext_ds = NULL;
-	int err;
-	char *name = NULL;
+	dsl_dataset_t *origin_ds = snap->ds;
+	dsl_dataset_t *newnext_ds;
+	char *name;
 	uint64_t itor = 0;
 	blkptr_t bp;
-
-	bzero(pa, sizeof (*pa));
+	int err;
 
 	/* Check that it is a clone */
-	if (dd->dd_phys->dd_origin_obj == 0)
+	if (hds->ds_dir->dd_phys->dd_origin_obj == 0)
 		return (EINVAL);
 
 	/* Since this is so expensive, don't do the preliminary check */
 	if (!dmu_tx_is_syncing(tx))
 		return (0);
 
-	if (err = dsl_dataset_open_obj(dp, dd->dd_phys->dd_origin_obj,
-	    NULL, DS_MODE_EXCLUSIVE, FTAG, &origin_ds))
-		goto out;
-	odd = origin_ds->ds_dir;
-
-	{
-		dsl_dataset_t *phds;
-		if (err = dsl_dataset_open_obj(dd->dd_pool,
-		    odd->dd_phys->dd_head_dataset_obj,
-		    NULL, DS_MODE_NONE, FTAG, &phds))
-			goto out;
-		pa->ds_flags = phds->ds_phys->ds_flags;
-		pa->snapnames_obj = phds->ds_phys->ds_snapnames_zapobj;
-		dsl_dataset_close(phds, DS_MODE_NONE, FTAG);
-	}
-
-	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE) {
-		err = EXDEV;
-		goto out;
-	}
+	if (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE)
+		return (EXDEV);
 
 	/* find origin's new next ds */
-	VERIFY(0 == dsl_dataset_open_obj(dd->dd_pool, hds->ds_object,
-	    NULL, DS_MODE_NONE, FTAG, &newnext_ds));
+	newnext_ds = hds;
 	while (newnext_ds->ds_phys->ds_prev_snap_obj != origin_ds->ds_object) {
 		dsl_dataset_t *prev;
 
-		if (err = dsl_dataset_open_obj(dd->dd_pool,
-		    newnext_ds->ds_phys->ds_prev_snap_obj,
-		    NULL, DS_MODE_NONE, FTAG, &prev))
-			goto out;
-		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
+		err = dsl_dataset_hold_obj(dp,
+		    newnext_ds->ds_phys->ds_prev_snap_obj, FTAG, &prev);
+		if (newnext_ds != hds)
+			dsl_dataset_rele(newnext_ds, FTAG);
+		if (err)
+			return (err);
 		newnext_ds = prev;
 	}
 	pa->newnext_obj = newnext_ds->ds_object;
 
 	/* compute origin's new unique space */
+	pa->unique = 0;
 	while ((err = bplist_iterate(&newnext_ds->ds_deadlist,
 	    &itor, &bp)) == 0) {
 		if (bp.blk_birth > origin_ds->ds_phys->ds_prev_snap_txg)
-			pa->unique += bp_get_dasize(dd->dd_pool->dp_spa, &bp);
+			pa->unique += bp_get_dasize(dp->dp_spa, &bp);
 	}
+	if (newnext_ds != hds)
+		dsl_dataset_rele(newnext_ds, FTAG);
 	if (err != ENOENT)
-		goto out;
-
-	/* Walk the snapshots that we are moving */
+		return (err);
+
 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
-	ds = origin_ds;
-	/* CONSTCOND */
-	while (TRUE) {
+
+	/*
+	 * Walk the snapshots that we are moving
+	 *
+	 * Compute space to transfer.  Each snapshot gave birth to:
+	 * (my used) - (prev's used) + (deadlist's used)
+	 * So a sequence would look like:
+	 * uN - u(N-1) + dN + ... + u1 - u0 + d1 + u0 - 0 + d0
+	 * Which simplifies to:
+	 * uN + dN + ... + d1 + d0
+	 * Note however, if we stop before we reach the ORIGIN we get:
+	 * uN + dN + ... + dM - uM-1
+	 */
+	pa->used = origin_ds->ds_phys->ds_used_bytes;
+	pa->comp = origin_ds->ds_phys->ds_compressed_bytes;
+	pa->uncomp = origin_ds->ds_phys->ds_uncompressed_bytes;
+	do {
 		uint64_t val, dlused, dlcomp, dluncomp;
-		dsl_dataset_t *prev;
+		dsl_dataset_t *ds = snap->ds;
 
 		/* Check that the snapshot name does not conflict */
 		dsl_dataset_name(ds, name);
-		err = dsl_dataset_snap_lookup(dd->dd_pool->dp_meta_objset,
-		    hds->ds_phys->ds_flags, hds->ds_phys->ds_snapnames_zapobj,
-		    ds->ds_snapname, &val);
-		if (err != ENOENT) {
-			if (err == 0)
-				err = EEXIST;
-			goto out;
-		}
-
-		/*
-		 * compute space to transfer.  Each snapshot gave birth to:
-		 * (my used) - (prev's used) + (deadlist's used)
-		 */
-		pa->used += ds->ds_phys->ds_used_bytes;
-		pa->comp += ds->ds_phys->ds_compressed_bytes;
-		pa->uncomp += ds->ds_phys->ds_uncompressed_bytes;
-
-		/* If we reach the first snapshot, we're done. */
-		if (ds->ds_phys->ds_prev_snap_obj == 0)
+		err = dsl_dataset_snap_lookup(hds, ds->ds_snapname, &val);
+		if (err == 0)
+			err = EEXIST;
+		if (err != ENOENT)
 			break;
-
-		if (err = bplist_space(&ds->ds_deadlist,
-		    &dlused, &dlcomp, &dluncomp))
-			goto out;
-		if (err = dsl_dataset_open_obj(dd->dd_pool,
-		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
-		    FTAG, &prev))
-			goto out;
-		pa->used += dlused - prev->ds_phys->ds_used_bytes;
-		pa->comp += dlcomp - prev->ds_phys->ds_compressed_bytes;
-		pa->uncomp += dluncomp - prev->ds_phys->ds_uncompressed_bytes;
-
-		/*
-		 * We could be a clone of a clone.  If we reach our
-		 * parent's branch point, we're done.
-		 */
-		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
-			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
-			break;
+		err = 0;
+
+		/* The very first snapshot does not have a deadlist */
+		if (ds->ds_phys->ds_prev_snap_obj != 0) {
+			if (err = bplist_space(&ds->ds_deadlist,
+			    &dlused, &dlcomp, &dluncomp))
+				break;
+			pa->used += dlused;
+			pa->comp += dlcomp;
+			pa->uncomp += dluncomp;
 		}
-		if (ds != origin_ds)
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-		ds = prev;
+	} while (snap = list_next(&pa->snap_list, snap));
+
+	/*
+	 * If we are a clone of a clone then we never reached ORIGIN,
+	 * so we need to subtract out the clone origin's used space.
+	 */
+	if (pa->clone_origin) {
+		pa->used -= pa->clone_origin->ds_phys->ds_used_bytes;
+		pa->comp -= pa->clone_origin->ds_phys->ds_compressed_bytes;
+		pa->uncomp -= pa->clone_origin->ds_phys->ds_uncompressed_bytes;
 	}
 
+	kmem_free(name, MAXPATHLEN);
+
 	/* Check that there is enough space here */
-	err = dsl_dir_transfer_possible(odd, dd, pa->used);
-
-out:
-	if (ds && ds != origin_ds)
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-	if (origin_ds)
-		dsl_dataset_close(origin_ds, DS_MODE_EXCLUSIVE, FTAG);
-	if (newnext_ds)
-		dsl_dataset_close(newnext_ds, DS_MODE_NONE, FTAG);
-	if (name)
-		kmem_free(name, MAXPATHLEN);
+	if (err == 0) {
+		dsl_dir_t *odd = origin_ds->ds_dir;
+		err = dsl_dir_transfer_possible(odd, hds->ds_dir, pa->used);
+	}
+
 	return (err);
 }
 
@@ -2273,17 +2327,15 @@
 {
 	dsl_dataset_t *hds = arg1;
 	struct promotearg *pa = arg2;
+	struct promotedsarg *snap = list_head(&pa->snap_list);
+	dsl_dataset_t *origin_ds = snap->ds;
 	dsl_dir_t *dd = hds->ds_dir;
 	dsl_pool_t *dp = hds->ds_dir->dd_pool;
 	dsl_dir_t *odd = NULL;
-	dsl_dataset_t *ds, *origin_ds;
 	char *name;
 
-	ASSERT(dd->dd_phys->dd_origin_obj != 0);
 	ASSERT(0 == (hds->ds_phys->ds_flags & DS_FLAG_NOPROMOTE));
 
-	VERIFY(0 == dsl_dataset_open_obj(dp, dd->dd_phys->dd_origin_obj,
-	    NULL, DS_MODE_EXCLUSIVE, FTAG, &origin_ds));
 	/*
 	 * We need to explicitly open odd, since origin_ds's dd will be
 	 * changing.
@@ -2291,17 +2343,26 @@
 	VERIFY(0 == dsl_dir_open_obj(dp, origin_ds->ds_dir->dd_object,
 	    NULL, FTAG, &odd));
 
+	/* change origin's next snap */
+	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
+	origin_ds->ds_phys->ds_next_snap_obj = pa->newnext_obj;
+
+	/* change origin */
+	dmu_buf_will_dirty(dd->dd_dbuf, tx);
+	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
+	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
+	dmu_buf_will_dirty(odd->dd_dbuf, tx);
+	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
+
 	/* move snapshots to this dir */
 	name = kmem_alloc(MAXPATHLEN, KM_SLEEP);
-	ds = origin_ds;
-	/* CONSTCOND */
-	while (TRUE) {
-		dsl_dataset_t *prev;
+	do {
+		dsl_dataset_t *ds = snap->ds;
 
 		/* move snap name entry */
 		dsl_dataset_name(ds, name);
-		VERIFY(0 == dsl_dataset_snap_remove(dp->dp_meta_objset,
-		    pa->ds_flags, pa->snapnames_obj, ds->ds_snapname, tx));
+		VERIFY(0 == dsl_dataset_snap_remove(pa->old_head,
+		    ds->ds_snapname, tx));
 		VERIFY(0 == zap_add(dp->dp_meta_objset,
 		    hds->ds_phys->ds_snapnames_zapobj, ds->ds_snapname,
 		    8, 1, &ds->ds_object, tx));
@@ -2316,35 +2377,7 @@
 		    NULL, ds, &ds->ds_dir));
 
 		ASSERT3U(dsl_prop_numcb(ds), ==, 0);
-
-		if (ds->ds_phys->ds_prev_snap_obj == 0)
-			break;
-
-		VERIFY(0 == dsl_dataset_open_obj(dp,
-		    ds->ds_phys->ds_prev_snap_obj, NULL, DS_MODE_EXCLUSIVE,
-		    FTAG, &prev));
-
-		if (prev->ds_phys->ds_next_snap_obj != ds->ds_object) {
-			dsl_dataset_close(prev, DS_MODE_EXCLUSIVE, FTAG);
-			break;
-		}
-		if (ds != origin_ds)
-			dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-		ds = prev;
-	}
-	if (ds != origin_ds)
-		dsl_dataset_close(ds, DS_MODE_EXCLUSIVE, FTAG);
-
-	/* change origin's next snap */
-	dmu_buf_will_dirty(origin_ds->ds_dbuf, tx);
-	origin_ds->ds_phys->ds_next_snap_obj = pa->newnext_obj;
-
-	/* change origin */
-	dmu_buf_will_dirty(dd->dd_dbuf, tx);
-	ASSERT3U(dd->dd_phys->dd_origin_obj, ==, origin_ds->ds_object);
-	dd->dd_phys->dd_origin_obj = odd->dd_phys->dd_origin_obj;
-	dmu_buf_will_dirty(odd->dd_dbuf, tx);
-	odd->dd_phys->dd_origin_obj = origin_ds->ds_object;
+	} while (snap = list_next(&pa->snap_list, snap));
 
 	/* change space accounting */
 	dsl_dir_diduse_space(odd, -pa->used, -pa->comp, -pa->uncomp, tx);
@@ -2353,10 +2386,9 @@
 
 	/* log history record */
 	spa_history_internal_log(LOG_DS_PROMOTE, dd->dd_pool->dp_spa, tx,
-	    cr, "dataset = %llu", ds->ds_object);
+	    cr, "dataset = %llu", hds->ds_object);
 
 	dsl_dir_close(odd, FTAG);
-	dsl_dataset_close(origin_ds, DS_MODE_EXCLUSIVE, FTAG);
 	kmem_free(name, MAXPATHLEN);
 }
 
@@ -2364,30 +2396,98 @@
 dsl_dataset_promote(const char *name)
 {
 	dsl_dataset_t *ds;
-	int err;
+	dsl_dir_t *dd;
+	dsl_pool_t *dp;
 	dmu_object_info_t doi;
 	struct promotearg pa;
-
-	err = dsl_dataset_open(name, DS_MODE_NONE, FTAG, &ds);
+	struct promotedsarg *snap;
+	uint64_t snap_obj;
+	uint64_t last_snap = 0;
+	int err;
+
+	err = dsl_dataset_hold(name, FTAG, &ds);
 	if (err)
 		return (err);
-
-	err = dmu_object_info(ds->ds_dir->dd_pool->dp_meta_objset,
+	dd = ds->ds_dir;
+	dp = dd->dd_pool;
+
+	err = dmu_object_info(dp->dp_meta_objset,
 	    ds->ds_phys->ds_snapnames_zapobj, &doi);
 	if (err) {
-		dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+		dsl_dataset_rele(ds, FTAG);
 		return (err);
 	}
 
 	/*
+	 * We are going to inherit all the snapshots taken before our
+	 * origin (i.e., our new origin will be our parent's origin).
+	 * Take ownership of them so that we can rename them into our
+	 * namespace.
+	 */
+	pa.clone_origin = NULL;
+	list_create(&pa.snap_list,
+	    sizeof (struct promotedsarg), offsetof(struct promotedsarg, link));
+	rw_enter(&dp->dp_config_rwlock, RW_READER);
+	ASSERT(dd->dd_phys->dd_origin_obj != 0);
+	snap_obj = dd->dd_phys->dd_origin_obj;
+	while (snap_obj) {
+		snap = kmem_alloc(sizeof (struct promotedsarg), KM_SLEEP);
+		err = dsl_dataset_own_obj(dp, snap_obj, 0, FTAG, &snap->ds);
+		if (err == ENOENT) {
+			/* lost race with snapshot destroy */
+			struct promotedsarg *last = list_tail(&pa.snap_list);
+			ASSERT(snap_obj != last->ds->ds_phys->ds_prev_snap_obj);
+			snap_obj = last->ds->ds_phys->ds_prev_snap_obj;
+			kmem_free(snap, sizeof (struct promotedsarg));
+			continue;
+		} else if (err) {
+			kmem_free(snap, sizeof (struct promotedsarg));
+			rw_exit(&dp->dp_config_rwlock);
+			goto out;
+		}
+		/*
+		 * We could be a clone of a clone.  If we reach our
+		 * parent's branch point, we're done.
+		 */
+		if (last_snap &&
+		    snap->ds->ds_phys->ds_next_snap_obj != last_snap) {
+			pa.clone_origin = snap->ds;
+			kmem_free(snap, sizeof (struct promotedsarg));
+			snap_obj = 0;
+		} else {
+			list_insert_tail(&pa.snap_list, snap);
+			last_snap = snap_obj;
+			snap_obj = snap->ds->ds_phys->ds_prev_snap_obj;
+		}
+	}
+	snap = list_head(&pa.snap_list);
+	ASSERT(snap != NULL);
+	err = dsl_dataset_hold_obj(dp,
+	    snap->ds->ds_dir->dd_phys->dd_head_dataset_obj, FTAG, &pa.old_head);
+	rw_exit(&dp->dp_config_rwlock);
+
+	if (err)
+		goto out;
+
+	/*
 	 * Add in 128x the snapnames zapobj size, since we will be moving
 	 * a bunch of snapnames to the promoted ds, and dirtying their
 	 * bonus buffers.
 	 */
-	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
-	    dsl_dataset_promote_check,
+	err = dsl_sync_task_do(dp, dsl_dataset_promote_check,
 	    dsl_dataset_promote_sync, ds, &pa, 2 + 2 * doi.doi_physical_blks);
-	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
+
+	dsl_dataset_rele(pa.old_head, FTAG);
+out:
+	while ((snap = list_tail(&pa.snap_list)) != NULL) {
+		list_remove(&pa.snap_list, snap);
+		dsl_dataset_disown(snap->ds, FTAG);
+		kmem_free(snap, sizeof (struct promotedsarg));
+	}
+	list_destroy(&pa.snap_list);
+	if (pa.clone_origin)
+		dsl_dataset_disown(pa.clone_origin, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
@@ -2546,23 +2646,36 @@
 }
 
 /*
- * Swap 'clone' with its origin head file system.
+ * Swap 'clone' with its origin head file system.  Used at the end
+ * of "online recv" to swizzle the file system to the new version.
  */
 int
 dsl_dataset_clone_swap(dsl_dataset_t *clone, dsl_dataset_t *origin_head,
     boolean_t force)
 {
 	struct cloneswaparg csa;
-
-	ASSERT(clone->ds_open_refcount == DS_REF_MAX);
-	ASSERT(origin_head->ds_open_refcount == DS_REF_MAX);
-
+	int error;
+
+	ASSERT(clone->ds_owner);
+	ASSERT(origin_head->ds_owner);
+retry:
+	/* Need exclusive access for the swap */
+	rw_enter(&clone->ds_rwlock, RW_WRITER);
+	if (!rw_tryenter(&origin_head->ds_rwlock, RW_WRITER)) {
+		rw_exit(&clone->ds_rwlock);
+		rw_enter(&origin_head->ds_rwlock, RW_WRITER);
+		if (!rw_tryenter(&clone->ds_rwlock, RW_WRITER)) {
+			rw_exit(&origin_head->ds_rwlock);
+			goto retry;
+		}
+	}
 	csa.cds = clone;
 	csa.ohds = origin_head;
 	csa.force = force;
-	return (dsl_sync_task_do(clone->ds_dir->dd_pool,
+	error = dsl_sync_task_do(clone->ds_dir->dd_pool,
 	    dsl_dataset_clone_swap_check,
-	    dsl_dataset_clone_swap_sync, &csa, NULL, 9));
+	    dsl_dataset_clone_swap_sync, &csa, NULL, 9);
+	return (error);
 }
 
 /*
@@ -2574,31 +2687,26 @@
 {
 	spa_t *spa;
 	dsl_pool_t *dp;
-	dsl_dataset_t *ds = NULL;
+	dsl_dataset_t *ds;
 	int error;
 
 	if ((error = spa_open(pname, &spa, FTAG)) != 0)
 		return (error);
 	dp = spa_get_dsl(spa);
 	rw_enter(&dp->dp_config_rwlock, RW_READER);
-	if ((error = dsl_dataset_open_obj(dp, obj,
-	    NULL, DS_MODE_NONE, FTAG, &ds)) != 0) {
-		rw_exit(&dp->dp_config_rwlock);
-		spa_close(spa, FTAG);
-		return (error);
+	if ((error = dsl_dataset_hold_obj(dp, obj, FTAG, &ds)) == 0) {
+		dsl_dataset_name(ds, buf);
+		dsl_dataset_rele(ds, FTAG);
 	}
-	dsl_dataset_name(ds, buf);
-	dsl_dataset_close(ds, DS_MODE_NONE, FTAG);
 	rw_exit(&dp->dp_config_rwlock);
 	spa_close(spa, FTAG);
 
-	return (0);
+	return (error);
 }
 
 int
 dsl_dataset_check_quota(dsl_dataset_t *ds, boolean_t check_quota,
-    uint64_t asize, uint64_t inflight, uint64_t *used,
-    uint64_t *ref_rsrv)
+    uint64_t asize, uint64_t inflight, uint64_t *used, uint64_t *ref_rsrv)
 {
 	int error = 0;
 
@@ -2674,15 +2782,13 @@
 
 	dmu_buf_will_dirty(ds->ds_dbuf, tx);
 
-	mutex_enter(&ds->ds_lock);
 	ds->ds_quota = new_quota;
-	mutex_exit(&ds->ds_lock);
 
 	dsl_prop_set_uint64_sync(ds->ds_dir, "refquota", new_quota, cr, tx);
 
 	spa_history_internal_log(LOG_DS_REFQUOTA, ds->ds_dir->dd_pool->dp_spa,
 	    tx, cr, "%lld dataset = %llu ",
-	    (longlong_t)new_quota, ds->ds_dir->dd_phys->dd_head_dataset_obj);
+	    (longlong_t)new_quota, ds->ds_object);
 }
 
 int
@@ -2691,7 +2797,7 @@
 	dsl_dataset_t *ds;
 	int err;
 
-	err = dsl_dataset_open(dsname, DS_MODE_STANDARD, FTAG, &ds);
+	err = dsl_dataset_hold(dsname, FTAG, &ds);
 	if (err)
 		return (err);
 
@@ -2706,7 +2812,7 @@
 		    dsl_dataset_set_quota_check, dsl_dataset_set_quota_sync,
 		    ds, &quota, 0);
 	}
-	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }
 
@@ -2788,13 +2894,13 @@
 	dsl_dataset_t *ds;
 	int err;
 
-	err = dsl_dataset_open(dsname, DS_MODE_STANDARD, FTAG, &ds);
+	err = dsl_dataset_hold(dsname, FTAG, &ds);
 	if (err)
 		return (err);
 
 	err = dsl_sync_task_do(ds->ds_dir->dd_pool,
 	    dsl_dataset_set_reservation_check,
 	    dsl_dataset_set_reservation_sync, ds, &reservation, 0);
-	dsl_dataset_close(ds, DS_MODE_STANDARD, FTAG);
+	dsl_dataset_rele(ds, FTAG);
 	return (err);
 }