diff usr/src/lib/libzfs/common/libzfs_sendrecv.c @ 11007:216d8396182e

PSARC/2009/557 ZFS send dedup 6812638 zfs send intra-stream dedup 6887817 want snapshot filtering for zfs send 6812603 zfs send can aggregate free records
author Lori Alt <Lori.Alt@Sun.COM>
date Mon, 09 Nov 2009 11:04:55 -0700
parents dcc7d6f9faa8
children 63ab26072e41
line wrap: on
line diff
--- a/usr/src/lib/libzfs/common/libzfs_sendrecv.c	Mon Nov 09 11:54:00 2009 +0530
+++ b/usr/src/lib/libzfs/common/libzfs_sendrecv.c	Mon Nov 09 11:04:55 2009 -0700
@@ -35,6 +35,8 @@
 #include <stddef.h>
 #include <fcntl.h>
 #include <sys/mount.h>
+#include <pthread.h>
+#include <umem.h>
 
 #include <libzfs.h>
 
@@ -42,10 +44,351 @@
 #include "zfs_prop.h"
 #include "zfs_fletcher.h"
 #include "libzfs_impl.h"
+#include <sha2.h>
 
 static int zfs_receive_impl(libzfs_handle_t *, const char *, recvflags_t,
     int, avl_tree_t *, char **);
 
+static const zio_cksum_t zero_cksum = { 0 };
+
+typedef struct dedup_arg {
+	int	inputfd;
+	int	outputfd;
+	libzfs_handle_t  *dedup_hdl;
+} dedup_arg_t;
+
+typedef struct dataref {
+	uint64_t ref_guid;
+	uint64_t ref_object;
+	uint64_t ref_offset;
+} dataref_t;
+
+typedef struct dedup_entry {
+	struct dedup_entry	*dde_next;
+	zio_cksum_t dde_chksum;
+	dataref_t dde_ref;
+} dedup_entry_t;
+
+#define	MAX_DDT_PHYSMEM_PERCENT		20
+#define	SMALLEST_POSSIBLE_MAX_DDT_MB		128
+
+typedef struct dedup_table {
+	dedup_entry_t	**dedup_hash_array;
+	umem_cache_t	*ddecache;
+	uint64_t	max_ddt_size;  /* max dedup table size in bytes */
+	uint64_t	cur_ddt_size;  /* current dedup table size in bytes */
+	uint64_t	ddt_count;
+	int		numhashbits;
+	boolean_t	ddt_full;
+} dedup_table_t;
+
+static int
+high_order_bit(uint64_t n)
+{
+	int count;
+
+	for (count = 0; n != 0; count++)
+		n >>= 1;
+	return (count);
+}
+
+static size_t
+ssread(void *buf, size_t len, FILE *stream)
+{
+	size_t outlen;
+
+	if ((outlen = fread(buf, len, 1, stream)) == 0)
+		return (0);
+
+	return (outlen);
+}
+
+static void
+ddt_hash_append(libzfs_handle_t *hdl, dedup_table_t *ddt, dedup_entry_t **ddepp,
+    zio_cksum_t *cs, dataref_t *dr)
+{
+	dedup_entry_t	*dde;
+
+	if (ddt->cur_ddt_size >= ddt->max_ddt_size) {
+		if (ddt->ddt_full == B_FALSE) {
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "Dedup table full.  Deduplication will continue "
+			    "with existing table entries"));
+			ddt->ddt_full = B_TRUE;
+		}
+		return;
+	}
+
+	if ((dde = umem_cache_alloc(ddt->ddecache, UMEM_DEFAULT))
+	    != NULL) {
+		assert(*ddepp == NULL);
+		dde->dde_next = NULL;
+		dde->dde_chksum = *cs;
+		dde->dde_ref = *dr;
+		*ddepp = dde;
+		ddt->cur_ddt_size += sizeof (dedup_entry_t);
+		ddt->ddt_count++;
+	}
+}
+
+/*
+ * Using the specified dedup table, do a lookup for an entry with
+ * the checksum cs.  If found, return the block's reference info
+ * in *dr. Otherwise, insert a new entry in the dedup table, using
+ * the reference information specified by *dr.
+ *
+ * return value:  true - entry was found
+ *		  false - entry was not found
+ */
+static boolean_t
+ddt_update(libzfs_handle_t *hdl, dedup_table_t *ddt, zio_cksum_t *cs,
+    dataref_t *dr)
+{
+	uint32_t hashcode;
+	dedup_entry_t **ddepp;
+
+	hashcode = BF64_GET(cs->zc_word[0], 0, ddt->numhashbits);
+
+	for (ddepp = &(ddt->dedup_hash_array[hashcode]); *ddepp != NULL;
+	    ddepp = &((*ddepp)->dde_next)) {
+		if (ZIO_CHECKSUM_EQUAL(((*ddepp)->dde_chksum), *cs)) {
+			*dr = (*ddepp)->dde_ref;
+			return (B_TRUE);
+		}
+	}
+	ddt_hash_append(hdl, ddt, ddepp, cs, dr);
+	return (B_FALSE);
+}
+
+static int
+cksum_and_write(const void *buf, uint64_t len, zio_cksum_t *zc, int outfd)
+{
+	fletcher_4_incremental_native(buf, len, zc);
+	return (write(outfd, buf, len));
+}
+
+/*
+ * This function is started in a separate thread when the dedup option
+ * has been requested.  The main send thread determines the list of
+ * snapshots to be included in the send stream and makes the ioctl calls
+ * for each one.  But instead of having the ioctl send the output to the
+ * the output fd specified by the caller of zfs_send()), the
+ * ioctl is told to direct the output to a pipe, which is read by the
+ * alternate thread running THIS function.  This function does the
+ * dedup'ing by:
+ *  1. building a dedup table (the DDT)
+ *  2. doing checksums on each data block and inserting a record in the DDT
+ *  3. looking for matching checksums, and
+ *  4.  sending a DRR_WRITE_BYREF record instead of a write record whenever
+ *      a duplicate block is found.
+ * The output of this function then goes to the output fd requested
+ * by the caller of zfs_send().
+ */
+static void *
+cksummer(void *arg)
+{
+	dedup_arg_t *dda = arg;
+	char *buf = malloc(1<<20);
+	dmu_replay_record_t thedrr;
+	dmu_replay_record_t *drr = &thedrr;
+	struct drr_begin *drrb = &thedrr.drr_u.drr_begin;
+	struct drr_end *drre = &thedrr.drr_u.drr_end;
+	struct drr_object *drro = &thedrr.drr_u.drr_object;
+	struct drr_write *drrw = &thedrr.drr_u.drr_write;
+	FILE *ofp;
+	int outfd;
+	dmu_replay_record_t wbr_drr;
+	struct drr_write_byref *wbr_drrr = &wbr_drr.drr_u.drr_write_byref;
+	dedup_table_t ddt;
+	zio_cksum_t stream_cksum;
+	uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);
+	uint64_t numbuckets;
+
+	ddt.max_ddt_size =
+	    MAX((physmem * MAX_DDT_PHYSMEM_PERCENT)/100,
+	    SMALLEST_POSSIBLE_MAX_DDT_MB<<20);
+
+	numbuckets = ddt.max_ddt_size/(sizeof (dedup_entry_t));
+
+	/*
+	 * numbuckets must be a power of 2.  Increase number to
+	 * a power of 2 if necessary.
+	 */
+	if (!ISP2(numbuckets))
+		numbuckets = 1 << high_order_bit(numbuckets);
+
+	ddt.dedup_hash_array = calloc(numbuckets, sizeof (dedup_entry_t *));
+	ddt.ddecache = umem_cache_create("dde", sizeof (dedup_entry_t), 0,
+	    NULL, NULL, NULL, NULL, NULL, 0);
+	ddt.cur_ddt_size = numbuckets * sizeof (dedup_entry_t *);
+	ddt.numhashbits = high_order_bit(numbuckets) - 1;
+	ddt.ddt_full = B_FALSE;
+
+	/* Initialize the write-by-reference block. */
+	wbr_drr.drr_type = DRR_WRITE_BYREF;
+	wbr_drr.drr_payloadlen = 0;
+
+	outfd = dda->outputfd;
+	ofp = fdopen(dda->inputfd, "r");
+	while (ssread(drr, sizeof (dmu_replay_record_t), ofp) != 0) {
+
+		switch (drr->drr_type) {
+		case DRR_BEGIN:
+		{
+			int	fflags;
+			ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
+
+			/* set the DEDUP feature flag for this stream */
+			fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
+			fflags |= DMU_BACKUP_FEATURE_DEDUP;
+			DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags);
+
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
+			    DMU_COMPOUNDSTREAM && drr->drr_payloadlen != 0) {
+				int sz = drr->drr_payloadlen;
+
+				if (sz > 1<<20) {
+					free(buf);
+					buf = malloc(sz);
+				}
+				(void) ssread(buf, sz, ofp);
+				if (ferror(stdin))
+					perror("fread");
+				if (cksum_and_write(buf, sz, &stream_cksum,
+				    outfd) == -1)
+					goto out;
+			}
+			break;
+		}
+
+		case DRR_END:
+		{
+			/* use the recalculated checksum */
+			ZIO_SET_CHECKSUM(&drre->drr_checksum,
+			    stream_cksum.zc_word[0], stream_cksum.zc_word[1],
+			    stream_cksum.zc_word[2], stream_cksum.zc_word[3]);
+			if ((write(outfd, drr,
+			    sizeof (dmu_replay_record_t))) == -1)
+				goto out;
+			break;
+		}
+
+		case DRR_OBJECT:
+		{
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			if (drro->drr_bonuslen > 0) {
+				(void) ssread(buf,
+				    P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8),
+				    ofp);
+				if (cksum_and_write(buf,
+				    P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8),
+				    &stream_cksum, outfd) == -1)
+					goto out;
+			}
+			break;
+		}
+
+		case DRR_FREEOBJECTS:
+		{
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			break;
+		}
+
+		case DRR_WRITE:
+		{
+			dataref_t	dataref;
+
+			(void) ssread(buf, drrw->drr_length, ofp);
+			/*
+			 * If the block doesn't already have a dedup
+			 * checksum, calculate one.
+			 */
+			if (ZIO_CHECKSUM_EQUAL(drrw->drr_blkcksum,
+			    zero_cksum)) {
+				SHA256_CTX	ctx;
+				zio_cksum_t	tmpsha256;
+
+				SHA256Init(&ctx);
+				SHA256Update(&ctx, buf, drrw->drr_length);
+				SHA256Final(&tmpsha256, &ctx);
+				drrw->drr_blkcksum.zc_word[0] =
+				    BE_64(tmpsha256.zc_word[0]);
+				drrw->drr_blkcksum.zc_word[1] =
+				    BE_64(tmpsha256.zc_word[1]);
+				drrw->drr_blkcksum.zc_word[2] =
+				    BE_64(tmpsha256.zc_word[2]);
+				drrw->drr_blkcksum.zc_word[3] =
+				    BE_64(tmpsha256.zc_word[3]);
+			}
+
+			dataref.ref_guid = drrw->drr_toguid;
+			dataref.ref_object = drrw->drr_object;
+			dataref.ref_offset = drrw->drr_offset;
+
+			if (ddt_update(dda->dedup_hdl, &ddt,
+			    &drrw->drr_blkcksum, &dataref)) {
+				/* block already present in stream */
+				wbr_drrr->drr_object = drrw->drr_object;
+				wbr_drrr->drr_offset = drrw->drr_offset;
+				wbr_drrr->drr_length = drrw->drr_length;
+				wbr_drrr->drr_toguid = drrw->drr_toguid;
+				wbr_drrr->drr_refguid = dataref.ref_guid;
+				wbr_drrr->drr_refobject =
+				    dataref.ref_object;
+				wbr_drrr->drr_refoffset =
+				    dataref.ref_offset;
+
+				wbr_drrr->drr_blkcksum = drrw->drr_blkcksum;
+
+				if (cksum_and_write(&wbr_drr,
+				    sizeof (dmu_replay_record_t), &stream_cksum,
+				    outfd) == -1)
+					goto out;
+			} else {
+				/* block not previously seen */
+				if (cksum_and_write(drr,
+				    sizeof (dmu_replay_record_t), &stream_cksum,
+				    outfd) == -1)
+					goto out;
+				if (cksum_and_write(buf,
+				    drrw->drr_length,
+				    &stream_cksum, outfd) == -1)
+					goto out;
+			}
+			break;
+		}
+
+		case DRR_FREE:
+		{
+			if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
+			    &stream_cksum, outfd) == -1)
+				goto out;
+			break;
+		}
+
+		default:
+			(void) printf("INVALID record type 0x%x\n",
+			    drr->drr_type);
+			/* should never happen, so assert */
+			assert(B_FALSE);
+		}
+	}
+out:
+	umem_cache_destroy(ddt.ddecache);
+	free(ddt.dedup_hash_array);
+	free(buf);
+	(void) fclose(ofp);
+
+	return (NULL);
+}
+
 /*
  * Routines for dealing with the AVL tree of fs-nvlists
  */
@@ -451,13 +794,15 @@
 	/* these are all just the short snapname (the part after the @) */
 	const char *fromsnap;
 	const char *tosnap;
-	char lastsnap[ZFS_MAXNAMELEN];
+	char prevsnap[ZFS_MAXNAMELEN];
 	boolean_t seenfrom, seento, replicate, doall, fromorigin;
 	boolean_t verbose;
 	int outfd;
 	boolean_t err;
 	nvlist_t *fss;
 	avl_tree_t *fsavl;
+	snapfilter_cb_t *filter_cb;
+	void *filter_cb_arg;
 } send_dump_data_t;
 
 /*
@@ -535,7 +880,7 @@
 	if (sdd->fromsnap && !sdd->seenfrom &&
 	    strcmp(sdd->fromsnap, thissnap) == 0) {
 		sdd->seenfrom = B_TRUE;
-		(void) strcpy(sdd->lastsnap, thissnap);
+		(void) strcpy(sdd->prevsnap, thissnap);
 		zfs_close(zhp);
 		return (0);
 	}
@@ -545,20 +890,38 @@
 		return (0);
 	}
 
+	if (strcmp(sdd->tosnap, thissnap) == 0)
+		sdd->seento = B_TRUE;
+
+	/*
+	 * If a filter function exists, call it to determine whether
+	 * this snapshot will be sent.
+	 */
+	if (sdd->filter_cb != NULL &&
+	    sdd->filter_cb(zhp, sdd->filter_cb_arg) == B_FALSE) {
+		/*
+		 * This snapshot is filtered out.  Don't send it, and don't
+		 * set prevsnap, so it will be as if this snapshot didn't
+		 * exist, and the next accepted snapshot will be sent as
+		 * an incremental from the last accepted one, or as the
+		 * first (and full) snapshot in the case of a replication,
+		 * non-incremental send.
+		 */
+		zfs_close(zhp);
+		return (0);
+	}
+
 	/* send it */
 	if (sdd->verbose) {
 		(void) fprintf(stderr, "sending from @%s to %s\n",
-		    sdd->lastsnap, zhp->zfs_name);
+		    sdd->prevsnap, zhp->zfs_name);
 	}
 
-	err = dump_ioctl(zhp, sdd->lastsnap,
-	    sdd->lastsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate),
+	err = dump_ioctl(zhp, sdd->prevsnap,
+	    sdd->prevsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate),
 	    sdd->outfd);
 
-	if (!sdd->seento && strcmp(sdd->tosnap, thissnap) == 0)
-		sdd->seento = B_TRUE;
-
-	(void) strcpy(sdd->lastsnap, thissnap);
+	(void) strcpy(sdd->prevsnap, thissnap);
 	zfs_close(zhp);
 	return (err);
 }
@@ -598,7 +961,7 @@
 	}
 
 	if (sdd->doall) {
-		sdd->seenfrom = sdd->seento = sdd->lastsnap[0] = 0;
+		sdd->seenfrom = sdd->seento = sdd->prevsnap[0] = 0;
 		if (sdd->fromsnap == NULL || missingfrom)
 			sdd->seenfrom = B_TRUE;
 
@@ -635,10 +998,14 @@
 		if (snapzhp == NULL) {
 			rv = -1;
 		} else {
-			rv = dump_ioctl(snapzhp,
-			    missingfrom ? NULL : sdd->fromsnap,
-			    sdd->fromorigin || missingfrom,
-			    sdd->outfd);
+			if (sdd->filter_cb == NULL ||
+			    sdd->filter_cb(snapzhp, sdd->filter_cb_arg) ==
+			    B_TRUE) {
+				rv = dump_ioctl(snapzhp,
+				    missingfrom ? NULL : sdd->fromsnap,
+				    sdd->fromorigin || missingfrom,
+				    sdd->outfd);
+			}
 			sdd->seento = B_TRUE;
 			zfs_close(snapzhp);
 		}
@@ -678,12 +1045,12 @@
 		origin_nv = fsavl_find(sdd->fsavl, origin_guid, NULL);
 		if (origin_nv &&
 		    nvlist_lookup_boolean(origin_nv, "sent") == ENOENT) {
-			/*
-			 * origin has not been sent yet;
-			 * skip this clone.
-			 */
-			needagain = B_TRUE;
-			continue;
+				/*
+				 * origin has not been sent yet;
+				 * skip this clone.
+				 */
+				needagain = B_TRUE;
+				continue;
 		}
 
 		zhp = zfs_open(rzhp->zfs_hdl, fsname, ZFS_TYPE_DATASET);
@@ -714,15 +1081,15 @@
  *	 is TRUE.
  *
  * The send stream is recursive (i.e. dumps a hierarchy of snapshots) and
- * uses a special header (with a version field of DMU_BACKUP_HEADER_VERSION)
+ * uses a special header (with a hdrtype field of DMU_COMPOUNDSTREAM)
  * if "replicate" is set.  If "doall" is set, dump all the intermediate
- * snapshots. The DMU_BACKUP_HEADER_VERSION header is used in the "doall"
+ * snapshots. The DMU_COMPOUNDSTREAM header is used in the "doall"
  * case too.
  */
 int
 zfs_send(zfs_handle_t *zhp, const char *fromsnap, const char *tosnap,
-    boolean_t replicate, boolean_t doall, boolean_t fromorigin,
-    boolean_t verbose, int outfd)
+    sendflags_t flags, int outfd, snapfilter_cb_t filter_func,
+    void *cb_arg)
 {
 	char errbuf[1024];
 	send_dump_data_t sdd = { 0 };
@@ -733,6 +1100,10 @@
 	static uint64_t holdseq;
 	int spa_version;
 	boolean_t holdsnaps = B_FALSE;
+	pthread_t tid;
+	int pipefd[2];
+	dedup_arg_t dda = { 0 };
+	int featureflags = 0;
 
 	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
 	    "cannot send '%s'"), zhp->zfs_name);
@@ -747,13 +1118,32 @@
 	    spa_version >= SPA_VERSION_USERREFS)
 		holdsnaps = B_TRUE;
 
-	if (replicate || doall) {
+	if (flags.dedup) {
+		featureflags |= DMU_BACKUP_FEATURE_DEDUP;
+		if (err = pipe(pipefd)) {
+			zfs_error_aux(zhp->zfs_hdl, strerror(errno));
+			return (zfs_error(zhp->zfs_hdl, EZFS_PIPEFAILED,
+			    errbuf));
+		}
+		dda.outputfd = outfd;
+		dda.inputfd = pipefd[1];
+		dda.dedup_hdl = zhp->zfs_hdl;
+		if (err = pthread_create(&tid, NULL, cksummer, &dda)) {
+			(void) close(pipefd[0]);
+			(void) close(pipefd[1]);
+			zfs_error_aux(zhp->zfs_hdl, strerror(errno));
+			return (zfs_error(zhp->zfs_hdl,
+			    EZFS_THREADCREATEFAILED, errbuf));
+		}
+	}
+
+	if (flags.replicate || flags.doall) {
 		dmu_replay_record_t drr = { 0 };
 		char *packbuf = NULL;
 		size_t buflen = 0;
 		zio_cksum_t zc = { 0 };
 
-		assert(fromsnap || doall);
+		assert(fromsnap || flags.doall);
 
 		if (holdsnaps) {
 			(void) snprintf(holdtag, sizeof (holdtag),
@@ -762,9 +1152,11 @@
 			err = zfs_hold_range(zhp, fromsnap, tosnap,
 			    holdtag, B_TRUE);
 			if (err)
-				return (err);
+				goto err_out;
 		}
-		if (replicate) {
+
+
+		if (flags.replicate) {
 			nvlist_t *hdrnv;
 
 			VERIFY(0 == nvlist_alloc(&hdrnv, NV_UNIQUE_NAME, 0));
@@ -781,7 +1173,7 @@
 					(void) zfs_release_range(zhp, fromsnap,
 					    tosnap, holdtag);
 				}
-				return (err);
+				goto err_out;
 			}
 			VERIFY(0 == nvlist_add_nvlist(hdrnv, "fss", fss));
 			err = nvlist_pack(hdrnv, &packbuf, &buflen,
@@ -794,26 +1186,26 @@
 					(void) zfs_release_range(zhp, fromsnap,
 					    tosnap, holdtag);
 				}
-				return (zfs_standard_error(zhp->zfs_hdl,
-				    err, errbuf));
+				goto stderr_out;
 			}
 		}
 
 		/* write first begin record */
 		drr.drr_type = DRR_BEGIN;
 		drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
-		drr.drr_u.drr_begin.drr_version = DMU_BACKUP_HEADER_VERSION;
+		DMU_SET_STREAM_HDRTYPE(drr.drr_u.drr_begin.drr_versioninfo,
+		    DMU_COMPOUNDSTREAM);
+		DMU_SET_FEATUREFLAGS(drr.drr_u.drr_begin.drr_versioninfo,
+		    featureflags);
 		(void) snprintf(drr.drr_u.drr_begin.drr_toname,
 		    sizeof (drr.drr_u.drr_begin.drr_toname),
 		    "%s@%s", zhp->zfs_name, tosnap);
 		drr.drr_payloadlen = buflen;
-		fletcher_4_incremental_native(&drr, sizeof (drr), &zc);
-		err = write(outfd, &drr, sizeof (drr));
+		err = cksum_and_write(&drr, sizeof (drr), &zc, outfd);
 
 		/* write header nvlist */
-		if (err != -1) {
-			fletcher_4_incremental_native(packbuf, buflen, &zc);
-			err = write(outfd, packbuf, buflen);
+		if (err != -1 && flags.replicate) {
+			err = cksum_and_write(packbuf, buflen, &zc, outfd);
 		}
 		free(packbuf);
 		if (err == -1) {
@@ -823,8 +1215,8 @@
 				(void) zfs_release_range(zhp, fromsnap, tosnap,
 				    holdtag);
 			}
-			return (zfs_standard_error(zhp->zfs_hdl,
-			    errno, errbuf));
+			err = errno;
+			goto stderr_out;
 		}
 
 		/* write end record */
@@ -840,8 +1232,8 @@
 					(void) zfs_release_range(zhp, fromsnap,
 					    tosnap, holdtag);
 				}
-				return (zfs_standard_error(zhp->zfs_hdl,
-				    errno, errbuf));
+				err = errno;
+				goto stderr_out;
 			}
 		}
 	}
@@ -849,18 +1241,27 @@
 	/* dump each stream */
 	sdd.fromsnap = fromsnap;
 	sdd.tosnap = tosnap;
-	sdd.outfd = outfd;
-	sdd.replicate = replicate;
-	sdd.doall = doall;
-	sdd.fromorigin = fromorigin;
+	if (flags.dedup)
+		sdd.outfd = pipefd[0];
+	else
+		sdd.outfd = outfd;
+	sdd.replicate = flags.replicate;
+	sdd.doall = flags.doall;
+	sdd.fromorigin = flags.fromorigin;
 	sdd.fss = fss;
 	sdd.fsavl = fsavl;
-	sdd.verbose = verbose;
+	sdd.verbose = flags.verbose;
+	sdd.filter_cb = filter_func;
+	sdd.filter_cb_arg = cb_arg;
 	err = dump_filesystems(zhp, &sdd);
 	fsavl_destroy(fsavl);
 	nvlist_free(fss);
 
-	if (replicate || doall) {
+	if (flags.dedup) {
+		(void) close(pipefd[0]);
+		(void) pthread_join(tid, NULL);
+	}
+	if (flags.replicate || flags.doall) {
 		/*
 		 * write final end record.  NB: want to do this even if
 		 * there was some error, because it might not be totally
@@ -879,6 +1280,16 @@
 	}
 
 	return (err || sdd.err);
+
+stderr_out:
+	err = zfs_standard_error(zhp->zfs_hdl, err, errbuf);
+err_out:
+	if (flags.dedup) {
+		(void) pthread_cancel(tid);
+		(void) pthread_join(tid, NULL);
+		(void) close(pipefd[0]);
+	}
+	return (err);
 }
 
 /*
@@ -1459,7 +1870,8 @@
 
 	assert(drr->drr_type == DRR_BEGIN);
 	assert(drr->drr_u.drr_begin.drr_magic == DMU_BACKUP_MAGIC);
-	assert(drr->drr_u.drr_begin.drr_version == DMU_BACKUP_HEADER_VERSION);
+	assert(DMU_GET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo) ==
+	    DMU_COMPOUNDSTREAM);
 
 	/*
 	 * Read in the nvlist from the stream.
@@ -1582,6 +1994,10 @@
 {
 	dmu_replay_record_t *drr;
 	void *buf = malloc(1<<20);
+	char errbuf[1024];
+
+	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
+	    "cannot receive:"));
 
 	/* XXX would be great to use lseek if possible... */
 	drr = buf;
@@ -1594,7 +2010,11 @@
 		switch (drr->drr_type) {
 		case DRR_BEGIN:
 			/* NB: not to be used on v2 stream packages */
-			assert(drr->drr_payloadlen == 0);
+			if (drr->drr_payloadlen != 0) {
+				zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+				    "invalid substream header"));
+				return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
+			}
 			break;
 
 		case DRR_END:
@@ -1621,12 +2041,15 @@
 			    drr->drr_u.drr_write.drr_length, B_FALSE, NULL);
 			break;
 
+		case DRR_WRITE_BYREF:
 		case DRR_FREEOBJECTS:
 		case DRR_FREE:
 			break;
 
 		default:
-			assert(!"invalid record type");
+			zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+			    "invalid record type"));
+			return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 		}
 	}
 
@@ -1728,6 +2151,7 @@
 	/*
 	 * Determine name of destination snapshot, store in zc_value.
 	 */
+	(void) strcpy(zc.zc_top_ds, tosnap);
 	(void) strcpy(zc.zc_value, tosnap);
 	(void) strncat(zc.zc_value, drrb->drr_toname+choplen,
 	    sizeof (zc.zc_value));
@@ -2068,6 +2492,8 @@
 	struct drr_begin *drrb = &drr.drr_u.drr_begin;
 	char errbuf[1024];
 	zio_cksum_t zcksum = { 0 };
+	uint64_t featureflags;
+	int hdrtype;
 
 	(void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
 	    "cannot receive"));
@@ -2105,7 +2531,7 @@
 		drr.drr_type = BSWAP_32(drr.drr_type);
 		drr.drr_payloadlen = BSWAP_32(drr.drr_payloadlen);
 		drrb->drr_magic = BSWAP_64(drrb->drr_magic);
-		drrb->drr_version = BSWAP_64(drrb->drr_version);
+		drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo);
 		drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
 		drrb->drr_type = BSWAP_32(drrb->drr_type);
 		drrb->drr_flags = BSWAP_32(drrb->drr_flags);
@@ -2119,23 +2545,31 @@
 		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 	}
 
+	featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
+	hdrtype = DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo);
+
+	if (!DMU_STREAM_SUPPORTED(featureflags) ||
+	    (hdrtype != DMU_SUBSTREAM && hdrtype != DMU_COMPOUNDSTREAM)) {
+		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
+		    "stream has unsupported feature, feature flags = %lx"),
+		    featureflags);
+		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
+	}
+
 	if (strchr(drrb->drr_toname, '@') == NULL) {
 		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid "
 		    "stream (bad snapshot name)"));
 		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 	}
 
-	if (drrb->drr_version == DMU_BACKUP_STREAM_VERSION) {
+	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == DMU_SUBSTREAM) {
 		return (zfs_receive_one(hdl, infd, tosnap, flags,
 		    &drr, &drr_noswap, stream_avl, top_zfs));
-	} else if (drrb->drr_version == DMU_BACKUP_HEADER_VERSION) {
+	} else {  /* must be DMU_COMPOUNDSTREAM */
+		assert(DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
+		    DMU_COMPOUNDSTREAM);
 		return (zfs_receive_package(hdl, infd, tosnap, flags,
 		    &drr, &zcksum, top_zfs));
-	} else {
-		zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
-		    "stream is unsupported version %llu"),
-		    drrb->drr_version);
-		return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
 	}
 }