# HG changeset patch # User Lori Alt # Date 1257789895 25200 # Node ID 216d8396182e1ef1d669610854db30fd51291cc2 # Parent 4fe66eb82610d17ce829ba014bf310a112715a4e PSARC/2009/557 ZFS send dedup 6812638 zfs send intra-stream dedup 6887817 want snapshot filtering for zfs send 6812603 zfs send can aggregate free records diff -r 4fe66eb82610 -r 216d8396182e usr/src/cmd/zfs/zfs_main.c --- a/usr/src/cmd/zfs/zfs_main.c Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/cmd/zfs/zfs_main.c Mon Nov 09 11:04:55 2009 -0700 @@ -232,7 +232,7 @@ case HELP_ROLLBACK: return (gettext("\trollback [-rRf] \n")); case HELP_SEND: - return (gettext("\tsend [-R] [-[iI] snapshot] \n")); + return (gettext("\tsend [-RD] [-[iI] snapshot] \n")); case HELP_SET: return (gettext("\tset " " ...\n")); @@ -2488,8 +2488,8 @@ } /* - * zfs send [-v] -R [-i|-I <@snap>] - * zfs send [-v] [-i|-I <@snap>] + * zfs send [-vD] -R [-i|-I <@snap>] + * zfs send [-vD] [-i|-I <@snap>] * * Send a backup stream to stdout. */ @@ -2500,14 +2500,11 @@ char *toname = NULL; char *cp; zfs_handle_t *zhp; - boolean_t doall = B_FALSE; - boolean_t replicate = B_FALSE; - boolean_t fromorigin = B_FALSE; - boolean_t verbose = B_FALSE; + sendflags_t flags = { 0 }; int c, err; /* check options */ - while ((c = getopt(argc, argv, ":i:I:Rv")) != -1) { + while ((c = getopt(argc, argv, ":i:I:RDv")) != -1) { switch (c) { case 'i': if (fromname) @@ -2518,13 +2515,16 @@ if (fromname) usage(B_FALSE); fromname = optarg; - doall = B_TRUE; + flags.doall = B_TRUE; break; case 'R': - replicate = B_TRUE; + flags.replicate = B_TRUE; break; case 'v': - verbose = B_TRUE; + flags.verbose = B_TRUE; + break; + case 'D': + flags.dedup = B_TRUE; break; case ':': (void) fprintf(stderr, gettext("missing argument for " @@ -2584,7 +2584,7 @@ if (strcmp(origin, fromname) == 0) { fromname = NULL; - fromorigin = B_TRUE; + flags.fromorigin = B_TRUE; } else { *cp = '\0'; if (cp != fromname && strcmp(argv[0], fromname)) { @@ -2602,11 +2602,10 @@ } } - if (replicate && fromname == NULL) - doall = B_TRUE; - - err = zfs_send(zhp, fromname, toname, replicate, doall, fromorigin, - verbose, STDOUT_FILENO); + if (flags.replicate && fromname == NULL) + flags.doall = B_TRUE; + + err = zfs_send(zhp, fromname, toname, flags, STDOUT_FILENO, NULL, 0); zfs_close(zhp); return (err != 0); diff -r 4fe66eb82610 -r 216d8396182e usr/src/cmd/zstreamdump/zstreamdump.c --- a/usr/src/cmd/zstreamdump/zstreamdump.c Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/cmd/zstreamdump/zstreamdump.c Mon Nov 09 11:04:55 2009 -0700 @@ -86,6 +86,7 @@ struct drr_object *drro = &thedrr.drr_u.drr_object; struct drr_freeobjects *drrfo = &thedrr.drr_u.drr_freeobjects; struct drr_write *drrw = &thedrr.drr_u.drr_write; + struct drr_write_byref *drrwbr = &thedrr.drr_u.drr_write_byref; struct drr_free *drrf = &thedrr.drr_u.drr_free; char c; boolean_t verbose = B_FALSE; @@ -172,7 +173,8 @@ case DRR_BEGIN: if (do_byteswap) { drrb->drr_magic = BSWAP_64(drrb->drr_magic); - drrb->drr_version = BSWAP_64(drrb->drr_version); + drrb->drr_versioninfo = + BSWAP_64(drrb->drr_versioninfo); drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time); drrb->drr_type = BSWAP_32(drrb->drr_type); @@ -183,8 +185,10 @@ } (void) printf("BEGIN record\n"); - (void) printf("\tversion = %llx\n", - (u_longlong_t)drrb->drr_version); + (void) printf("\thdrtype = %lld\n", + DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo)); + (void) printf("\tfeatures = %llx\n", + DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo)); (void) printf("\tmagic = %llx\n", (u_longlong_t)drrb->drr_magic); (void) printf("\tcreation_time = %llx\n", @@ -199,8 +203,8 @@ if (verbose) (void) printf("\n"); - if (drrb->drr_version == 2 && - drr->drr_payloadlen != 0) { + if ((DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == + DMU_COMPOUNDSTREAM) && drr->drr_payloadlen != 0) { nvlist_t *nv; int sz = drr->drr_payloadlen; @@ -264,6 +268,7 @@ drro->drr_blksz = BSWAP_32(drro->drr_blksz); drro->drr_bonuslen = BSWAP_32(drro->drr_bonuslen); + drro->drr_toguid = BSWAP_64(drro->drr_toguid); } if (verbose) { (void) printf("OBJECT object = %llu type = %u " @@ -286,6 +291,7 @@ BSWAP_64(drrfo->drr_firstobj); drrfo->drr_numobjs = BSWAP_64(drrfo->drr_numobjs); + drrfo->drr_toguid = BSWAP_64(drrfo->drr_toguid); } if (verbose) { (void) printf("FREEOBJECTS firstobj = %llu " @@ -301,6 +307,7 @@ drrw->drr_type = BSWAP_32(drrw->drr_type); drrw->drr_offset = BSWAP_64(drrw->drr_offset); drrw->drr_length = BSWAP_64(drrw->drr_length); + drrw->drr_toguid = BSWAP_64(drrw->drr_toguid); } if (verbose) { (void) printf("WRITE object = %llu type = %u " @@ -314,6 +321,38 @@ total_write_size += drrw->drr_length; break; + case DRR_WRITE_BYREF: + if (do_byteswap) { + drrwbr->drr_object = + BSWAP_64(drrwbr->drr_object); + drrwbr->drr_offset = + BSWAP_64(drrwbr->drr_offset); + drrwbr->drr_length = + BSWAP_64(drrwbr->drr_length); + drrwbr->drr_toguid = + BSWAP_64(drrwbr->drr_toguid); + drrwbr->drr_refguid = + BSWAP_64(drrwbr->drr_refguid); + drrwbr->drr_refobject = + BSWAP_64(drrwbr->drr_refobject); + drrwbr->drr_refoffset = + BSWAP_64(drrwbr->drr_refoffset); + } + if (verbose) { + (void) printf("WRITE_BYREF object = %llu " + "offset = %llu length = %llu\n" + "toguid = %llx refguid = %llx\n" + "refobject = %llu refoffset = %llu\n", + (u_longlong_t)drrwbr->drr_object, + (u_longlong_t)drrwbr->drr_offset, + (u_longlong_t)drrwbr->drr_length, + (u_longlong_t)drrwbr->drr_toguid, + (u_longlong_t)drrwbr->drr_refguid, + (u_longlong_t)drrwbr->drr_refobject, + (u_longlong_t)drrwbr->drr_refoffset); + } + break; + case DRR_FREE: if (do_byteswap) { drrf->drr_object = BSWAP_64(drrf->drr_object); diff -r 4fe66eb82610 -r 216d8396182e usr/src/lib/libzfs/Makefile.com --- a/usr/src/lib/libzfs/Makefile.com Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/lib/libzfs/Makefile.com Mon Nov 09 11:04:55 2009 -0700 @@ -67,7 +67,7 @@ C99MODE= -xc99=%all C99LMODE= -Xc99=%all LDLIBS += -lc -lm -ldevid -lgen -lnvpair -luutil -lavl -lefi \ - -ladm -lidmap -ltsol + -ladm -lidmap -ltsol -lmd -lumem CPPFLAGS += $(INCS) -D_REENTRANT SRCS= $(OBJS_COMMON:%.o=$(SRCDIR)/%.c) \ diff -r 4fe66eb82610 -r 216d8396182e usr/src/lib/libzfs/common/libzfs.h --- a/usr/src/lib/libzfs/common/libzfs.h Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/lib/libzfs/common/libzfs.h Mon Nov 09 11:04:55 2009 -0700 @@ -118,6 +118,8 @@ EZFS_REFTAG_RELE, /* snapshot release: tag not found */ EZFS_REFTAG_HOLD, /* snapshot hold: tag already exists */ EZFS_TAGTOOLONG, /* snapshot hold/rele: tag too long */ + EZFS_PIPEFAILED, /* pipe create failed */ + EZFS_THREADCREATEFAILED, /* thread create failed */ EZFS_UNKNOWN }; @@ -470,8 +472,29 @@ extern int zfs_snapshot(libzfs_handle_t *, const char *, boolean_t, nvlist_t *); extern int zfs_rollback(zfs_handle_t *, zfs_handle_t *, boolean_t); extern int zfs_rename(zfs_handle_t *, const char *, boolean_t); + +typedef struct sendflags { + /* print informational messages (ie, -v was specified) */ + int verbose : 1; + + /* recursive send (i.e., -R) */ + int replicate : 1; + + /* for incrementals, do all intermediate snapshots */ + int doall : 1; /* (i.e., -I) */ + + /* if dataset is a clone, do incremental from its origin */ + int fromorigin : 1; + + /* do deduplication */ + int dedup : 1; +} sendflags_t; + +typedef boolean_t (snapfilter_cb_t)(zfs_handle_t *, void *); + extern int zfs_send(zfs_handle_t *, const char *, const char *, - boolean_t, boolean_t, boolean_t, boolean_t, int); + sendflags_t, int, snapfilter_cb_t, void *); + extern int zfs_promote(zfs_handle_t *); extern int zfs_hold(zfs_handle_t *, const char *, const char *, boolean_t, boolean_t); diff -r 4fe66eb82610 -r 216d8396182e usr/src/lib/libzfs/common/libzfs_sendrecv.c --- a/usr/src/lib/libzfs/common/libzfs_sendrecv.c Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/lib/libzfs/common/libzfs_sendrecv.c Mon Nov 09 11:04:55 2009 -0700 @@ -35,6 +35,8 @@ #include #include #include +#include +#include #include @@ -42,10 +44,351 @@ #include "zfs_prop.h" #include "zfs_fletcher.h" #include "libzfs_impl.h" +#include static int zfs_receive_impl(libzfs_handle_t *, const char *, recvflags_t, int, avl_tree_t *, char **); +static const zio_cksum_t zero_cksum = { 0 }; + +typedef struct dedup_arg { + int inputfd; + int outputfd; + libzfs_handle_t *dedup_hdl; +} dedup_arg_t; + +typedef struct dataref { + uint64_t ref_guid; + uint64_t ref_object; + uint64_t ref_offset; +} dataref_t; + +typedef struct dedup_entry { + struct dedup_entry *dde_next; + zio_cksum_t dde_chksum; + dataref_t dde_ref; +} dedup_entry_t; + +#define MAX_DDT_PHYSMEM_PERCENT 20 +#define SMALLEST_POSSIBLE_MAX_DDT_MB 128 + +typedef struct dedup_table { + dedup_entry_t **dedup_hash_array; + umem_cache_t *ddecache; + uint64_t max_ddt_size; /* max dedup table size in bytes */ + uint64_t cur_ddt_size; /* current dedup table size in bytes */ + uint64_t ddt_count; + int numhashbits; + boolean_t ddt_full; +} dedup_table_t; + +static int +high_order_bit(uint64_t n) +{ + int count; + + for (count = 0; n != 0; count++) + n >>= 1; + return (count); +} + +static size_t +ssread(void *buf, size_t len, FILE *stream) +{ + size_t outlen; + + if ((outlen = fread(buf, len, 1, stream)) == 0) + return (0); + + return (outlen); +} + +static void +ddt_hash_append(libzfs_handle_t *hdl, dedup_table_t *ddt, dedup_entry_t **ddepp, + zio_cksum_t *cs, dataref_t *dr) +{ + dedup_entry_t *dde; + + if (ddt->cur_ddt_size >= ddt->max_ddt_size) { + if (ddt->ddt_full == B_FALSE) { + zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, + "Dedup table full. Deduplication will continue " + "with existing table entries")); + ddt->ddt_full = B_TRUE; + } + return; + } + + if ((dde = umem_cache_alloc(ddt->ddecache, UMEM_DEFAULT)) + != NULL) { + assert(*ddepp == NULL); + dde->dde_next = NULL; + dde->dde_chksum = *cs; + dde->dde_ref = *dr; + *ddepp = dde; + ddt->cur_ddt_size += sizeof (dedup_entry_t); + ddt->ddt_count++; + } +} + +/* + * Using the specified dedup table, do a lookup for an entry with + * the checksum cs. If found, return the block's reference info + * in *dr. Otherwise, insert a new entry in the dedup table, using + * the reference information specified by *dr. + * + * return value: true - entry was found + * false - entry was not found + */ +static boolean_t +ddt_update(libzfs_handle_t *hdl, dedup_table_t *ddt, zio_cksum_t *cs, + dataref_t *dr) +{ + uint32_t hashcode; + dedup_entry_t **ddepp; + + hashcode = BF64_GET(cs->zc_word[0], 0, ddt->numhashbits); + + for (ddepp = &(ddt->dedup_hash_array[hashcode]); *ddepp != NULL; + ddepp = &((*ddepp)->dde_next)) { + if (ZIO_CHECKSUM_EQUAL(((*ddepp)->dde_chksum), *cs)) { + *dr = (*ddepp)->dde_ref; + return (B_TRUE); + } + } + ddt_hash_append(hdl, ddt, ddepp, cs, dr); + return (B_FALSE); +} + +static int +cksum_and_write(const void *buf, uint64_t len, zio_cksum_t *zc, int outfd) +{ + fletcher_4_incremental_native(buf, len, zc); + return (write(outfd, buf, len)); +} + +/* + * This function is started in a separate thread when the dedup option + * has been requested. The main send thread determines the list of + * snapshots to be included in the send stream and makes the ioctl calls + * for each one. But instead of having the ioctl send the output to the + * the output fd specified by the caller of zfs_send()), the + * ioctl is told to direct the output to a pipe, which is read by the + * alternate thread running THIS function. This function does the + * dedup'ing by: + * 1. building a dedup table (the DDT) + * 2. doing checksums on each data block and inserting a record in the DDT + * 3. looking for matching checksums, and + * 4. sending a DRR_WRITE_BYREF record instead of a write record whenever + * a duplicate block is found. + * The output of this function then goes to the output fd requested + * by the caller of zfs_send(). + */ +static void * +cksummer(void *arg) +{ + dedup_arg_t *dda = arg; + char *buf = malloc(1<<20); + dmu_replay_record_t thedrr; + dmu_replay_record_t *drr = &thedrr; + struct drr_begin *drrb = &thedrr.drr_u.drr_begin; + struct drr_end *drre = &thedrr.drr_u.drr_end; + struct drr_object *drro = &thedrr.drr_u.drr_object; + struct drr_write *drrw = &thedrr.drr_u.drr_write; + FILE *ofp; + int outfd; + dmu_replay_record_t wbr_drr; + struct drr_write_byref *wbr_drrr = &wbr_drr.drr_u.drr_write_byref; + dedup_table_t ddt; + zio_cksum_t stream_cksum; + uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE); + uint64_t numbuckets; + + ddt.max_ddt_size = + MAX((physmem * MAX_DDT_PHYSMEM_PERCENT)/100, + SMALLEST_POSSIBLE_MAX_DDT_MB<<20); + + numbuckets = ddt.max_ddt_size/(sizeof (dedup_entry_t)); + + /* + * numbuckets must be a power of 2. Increase number to + * a power of 2 if necessary. + */ + if (!ISP2(numbuckets)) + numbuckets = 1 << high_order_bit(numbuckets); + + ddt.dedup_hash_array = calloc(numbuckets, sizeof (dedup_entry_t *)); + ddt.ddecache = umem_cache_create("dde", sizeof (dedup_entry_t), 0, + NULL, NULL, NULL, NULL, NULL, 0); + ddt.cur_ddt_size = numbuckets * sizeof (dedup_entry_t *); + ddt.numhashbits = high_order_bit(numbuckets) - 1; + ddt.ddt_full = B_FALSE; + + /* Initialize the write-by-reference block. */ + wbr_drr.drr_type = DRR_WRITE_BYREF; + wbr_drr.drr_payloadlen = 0; + + outfd = dda->outputfd; + ofp = fdopen(dda->inputfd, "r"); + while (ssread(drr, sizeof (dmu_replay_record_t), ofp) != 0) { + + switch (drr->drr_type) { + case DRR_BEGIN: + { + int fflags; + ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0); + + /* set the DEDUP feature flag for this stream */ + fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); + fflags |= DMU_BACKUP_FEATURE_DEDUP; + DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags); + + if (cksum_and_write(drr, sizeof (dmu_replay_record_t), + &stream_cksum, outfd) == -1) + goto out; + if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == + DMU_COMPOUNDSTREAM && drr->drr_payloadlen != 0) { + int sz = drr->drr_payloadlen; + + if (sz > 1<<20) { + free(buf); + buf = malloc(sz); + } + (void) ssread(buf, sz, ofp); + if (ferror(stdin)) + perror("fread"); + if (cksum_and_write(buf, sz, &stream_cksum, + outfd) == -1) + goto out; + } + break; + } + + case DRR_END: + { + /* use the recalculated checksum */ + ZIO_SET_CHECKSUM(&drre->drr_checksum, + stream_cksum.zc_word[0], stream_cksum.zc_word[1], + stream_cksum.zc_word[2], stream_cksum.zc_word[3]); + if ((write(outfd, drr, + sizeof (dmu_replay_record_t))) == -1) + goto out; + break; + } + + case DRR_OBJECT: + { + if (cksum_and_write(drr, sizeof (dmu_replay_record_t), + &stream_cksum, outfd) == -1) + goto out; + if (drro->drr_bonuslen > 0) { + (void) ssread(buf, + P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8), + ofp); + if (cksum_and_write(buf, + P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8), + &stream_cksum, outfd) == -1) + goto out; + } + break; + } + + case DRR_FREEOBJECTS: + { + if (cksum_and_write(drr, sizeof (dmu_replay_record_t), + &stream_cksum, outfd) == -1) + goto out; + break; + } + + case DRR_WRITE: + { + dataref_t dataref; + + (void) ssread(buf, drrw->drr_length, ofp); + /* + * If the block doesn't already have a dedup + * checksum, calculate one. + */ + if (ZIO_CHECKSUM_EQUAL(drrw->drr_blkcksum, + zero_cksum)) { + SHA256_CTX ctx; + zio_cksum_t tmpsha256; + + SHA256Init(&ctx); + SHA256Update(&ctx, buf, drrw->drr_length); + SHA256Final(&tmpsha256, &ctx); + drrw->drr_blkcksum.zc_word[0] = + BE_64(tmpsha256.zc_word[0]); + drrw->drr_blkcksum.zc_word[1] = + BE_64(tmpsha256.zc_word[1]); + drrw->drr_blkcksum.zc_word[2] = + BE_64(tmpsha256.zc_word[2]); + drrw->drr_blkcksum.zc_word[3] = + BE_64(tmpsha256.zc_word[3]); + } + + dataref.ref_guid = drrw->drr_toguid; + dataref.ref_object = drrw->drr_object; + dataref.ref_offset = drrw->drr_offset; + + if (ddt_update(dda->dedup_hdl, &ddt, + &drrw->drr_blkcksum, &dataref)) { + /* block already present in stream */ + wbr_drrr->drr_object = drrw->drr_object; + wbr_drrr->drr_offset = drrw->drr_offset; + wbr_drrr->drr_length = drrw->drr_length; + wbr_drrr->drr_toguid = drrw->drr_toguid; + wbr_drrr->drr_refguid = dataref.ref_guid; + wbr_drrr->drr_refobject = + dataref.ref_object; + wbr_drrr->drr_refoffset = + dataref.ref_offset; + + wbr_drrr->drr_blkcksum = drrw->drr_blkcksum; + + if (cksum_and_write(&wbr_drr, + sizeof (dmu_replay_record_t), &stream_cksum, + outfd) == -1) + goto out; + } else { + /* block not previously seen */ + if (cksum_and_write(drr, + sizeof (dmu_replay_record_t), &stream_cksum, + outfd) == -1) + goto out; + if (cksum_and_write(buf, + drrw->drr_length, + &stream_cksum, outfd) == -1) + goto out; + } + break; + } + + case DRR_FREE: + { + if (cksum_and_write(drr, sizeof (dmu_replay_record_t), + &stream_cksum, outfd) == -1) + goto out; + break; + } + + default: + (void) printf("INVALID record type 0x%x\n", + drr->drr_type); + /* should never happen, so assert */ + assert(B_FALSE); + } + } +out: + umem_cache_destroy(ddt.ddecache); + free(ddt.dedup_hash_array); + free(buf); + (void) fclose(ofp); + + return (NULL); +} + /* * Routines for dealing with the AVL tree of fs-nvlists */ @@ -451,13 +794,15 @@ /* these are all just the short snapname (the part after the @) */ const char *fromsnap; const char *tosnap; - char lastsnap[ZFS_MAXNAMELEN]; + char prevsnap[ZFS_MAXNAMELEN]; boolean_t seenfrom, seento, replicate, doall, fromorigin; boolean_t verbose; int outfd; boolean_t err; nvlist_t *fss; avl_tree_t *fsavl; + snapfilter_cb_t *filter_cb; + void *filter_cb_arg; } send_dump_data_t; /* @@ -535,7 +880,7 @@ if (sdd->fromsnap && !sdd->seenfrom && strcmp(sdd->fromsnap, thissnap) == 0) { sdd->seenfrom = B_TRUE; - (void) strcpy(sdd->lastsnap, thissnap); + (void) strcpy(sdd->prevsnap, thissnap); zfs_close(zhp); return (0); } @@ -545,20 +890,38 @@ return (0); } + if (strcmp(sdd->tosnap, thissnap) == 0) + sdd->seento = B_TRUE; + + /* + * If a filter function exists, call it to determine whether + * this snapshot will be sent. + */ + if (sdd->filter_cb != NULL && + sdd->filter_cb(zhp, sdd->filter_cb_arg) == B_FALSE) { + /* + * This snapshot is filtered out. Don't send it, and don't + * set prevsnap, so it will be as if this snapshot didn't + * exist, and the next accepted snapshot will be sent as + * an incremental from the last accepted one, or as the + * first (and full) snapshot in the case of a replication, + * non-incremental send. + */ + zfs_close(zhp); + return (0); + } + /* send it */ if (sdd->verbose) { (void) fprintf(stderr, "sending from @%s to %s\n", - sdd->lastsnap, zhp->zfs_name); + sdd->prevsnap, zhp->zfs_name); } - err = dump_ioctl(zhp, sdd->lastsnap, - sdd->lastsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate), + err = dump_ioctl(zhp, sdd->prevsnap, + sdd->prevsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate), sdd->outfd); - if (!sdd->seento && strcmp(sdd->tosnap, thissnap) == 0) - sdd->seento = B_TRUE; - - (void) strcpy(sdd->lastsnap, thissnap); + (void) strcpy(sdd->prevsnap, thissnap); zfs_close(zhp); return (err); } @@ -598,7 +961,7 @@ } if (sdd->doall) { - sdd->seenfrom = sdd->seento = sdd->lastsnap[0] = 0; + sdd->seenfrom = sdd->seento = sdd->prevsnap[0] = 0; if (sdd->fromsnap == NULL || missingfrom) sdd->seenfrom = B_TRUE; @@ -635,10 +998,14 @@ if (snapzhp == NULL) { rv = -1; } else { - rv = dump_ioctl(snapzhp, - missingfrom ? NULL : sdd->fromsnap, - sdd->fromorigin || missingfrom, - sdd->outfd); + if (sdd->filter_cb == NULL || + sdd->filter_cb(snapzhp, sdd->filter_cb_arg) == + B_TRUE) { + rv = dump_ioctl(snapzhp, + missingfrom ? NULL : sdd->fromsnap, + sdd->fromorigin || missingfrom, + sdd->outfd); + } sdd->seento = B_TRUE; zfs_close(snapzhp); } @@ -678,12 +1045,12 @@ origin_nv = fsavl_find(sdd->fsavl, origin_guid, NULL); if (origin_nv && nvlist_lookup_boolean(origin_nv, "sent") == ENOENT) { - /* - * origin has not been sent yet; - * skip this clone. - */ - needagain = B_TRUE; - continue; + /* + * origin has not been sent yet; + * skip this clone. + */ + needagain = B_TRUE; + continue; } zhp = zfs_open(rzhp->zfs_hdl, fsname, ZFS_TYPE_DATASET); @@ -714,15 +1081,15 @@ * is TRUE. * * The send stream is recursive (i.e. dumps a hierarchy of snapshots) and - * uses a special header (with a version field of DMU_BACKUP_HEADER_VERSION) + * uses a special header (with a hdrtype field of DMU_COMPOUNDSTREAM) * if "replicate" is set. If "doall" is set, dump all the intermediate - * snapshots. The DMU_BACKUP_HEADER_VERSION header is used in the "doall" + * snapshots. The DMU_COMPOUNDSTREAM header is used in the "doall" * case too. */ int zfs_send(zfs_handle_t *zhp, const char *fromsnap, const char *tosnap, - boolean_t replicate, boolean_t doall, boolean_t fromorigin, - boolean_t verbose, int outfd) + sendflags_t flags, int outfd, snapfilter_cb_t filter_func, + void *cb_arg) { char errbuf[1024]; send_dump_data_t sdd = { 0 }; @@ -733,6 +1100,10 @@ static uint64_t holdseq; int spa_version; boolean_t holdsnaps = B_FALSE; + pthread_t tid; + int pipefd[2]; + dedup_arg_t dda = { 0 }; + int featureflags = 0; (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN, "cannot send '%s'"), zhp->zfs_name); @@ -747,13 +1118,32 @@ spa_version >= SPA_VERSION_USERREFS) holdsnaps = B_TRUE; - if (replicate || doall) { + if (flags.dedup) { + featureflags |= DMU_BACKUP_FEATURE_DEDUP; + if (err = pipe(pipefd)) { + zfs_error_aux(zhp->zfs_hdl, strerror(errno)); + return (zfs_error(zhp->zfs_hdl, EZFS_PIPEFAILED, + errbuf)); + } + dda.outputfd = outfd; + dda.inputfd = pipefd[1]; + dda.dedup_hdl = zhp->zfs_hdl; + if (err = pthread_create(&tid, NULL, cksummer, &dda)) { + (void) close(pipefd[0]); + (void) close(pipefd[1]); + zfs_error_aux(zhp->zfs_hdl, strerror(errno)); + return (zfs_error(zhp->zfs_hdl, + EZFS_THREADCREATEFAILED, errbuf)); + } + } + + if (flags.replicate || flags.doall) { dmu_replay_record_t drr = { 0 }; char *packbuf = NULL; size_t buflen = 0; zio_cksum_t zc = { 0 }; - assert(fromsnap || doall); + assert(fromsnap || flags.doall); if (holdsnaps) { (void) snprintf(holdtag, sizeof (holdtag), @@ -762,9 +1152,11 @@ err = zfs_hold_range(zhp, fromsnap, tosnap, holdtag, B_TRUE); if (err) - return (err); + goto err_out; } - if (replicate) { + + + if (flags.replicate) { nvlist_t *hdrnv; VERIFY(0 == nvlist_alloc(&hdrnv, NV_UNIQUE_NAME, 0)); @@ -781,7 +1173,7 @@ (void) zfs_release_range(zhp, fromsnap, tosnap, holdtag); } - return (err); + goto err_out; } VERIFY(0 == nvlist_add_nvlist(hdrnv, "fss", fss)); err = nvlist_pack(hdrnv, &packbuf, &buflen, @@ -794,26 +1186,26 @@ (void) zfs_release_range(zhp, fromsnap, tosnap, holdtag); } - return (zfs_standard_error(zhp->zfs_hdl, - err, errbuf)); + goto stderr_out; } } /* write first begin record */ drr.drr_type = DRR_BEGIN; drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC; - drr.drr_u.drr_begin.drr_version = DMU_BACKUP_HEADER_VERSION; + DMU_SET_STREAM_HDRTYPE(drr.drr_u.drr_begin.drr_versioninfo, + DMU_COMPOUNDSTREAM); + DMU_SET_FEATUREFLAGS(drr.drr_u.drr_begin.drr_versioninfo, + featureflags); (void) snprintf(drr.drr_u.drr_begin.drr_toname, sizeof (drr.drr_u.drr_begin.drr_toname), "%s@%s", zhp->zfs_name, tosnap); drr.drr_payloadlen = buflen; - fletcher_4_incremental_native(&drr, sizeof (drr), &zc); - err = write(outfd, &drr, sizeof (drr)); + err = cksum_and_write(&drr, sizeof (drr), &zc, outfd); /* write header nvlist */ - if (err != -1) { - fletcher_4_incremental_native(packbuf, buflen, &zc); - err = write(outfd, packbuf, buflen); + if (err != -1 && flags.replicate) { + err = cksum_and_write(packbuf, buflen, &zc, outfd); } free(packbuf); if (err == -1) { @@ -823,8 +1215,8 @@ (void) zfs_release_range(zhp, fromsnap, tosnap, holdtag); } - return (zfs_standard_error(zhp->zfs_hdl, - errno, errbuf)); + err = errno; + goto stderr_out; } /* write end record */ @@ -840,8 +1232,8 @@ (void) zfs_release_range(zhp, fromsnap, tosnap, holdtag); } - return (zfs_standard_error(zhp->zfs_hdl, - errno, errbuf)); + err = errno; + goto stderr_out; } } } @@ -849,18 +1241,27 @@ /* dump each stream */ sdd.fromsnap = fromsnap; sdd.tosnap = tosnap; - sdd.outfd = outfd; - sdd.replicate = replicate; - sdd.doall = doall; - sdd.fromorigin = fromorigin; + if (flags.dedup) + sdd.outfd = pipefd[0]; + else + sdd.outfd = outfd; + sdd.replicate = flags.replicate; + sdd.doall = flags.doall; + sdd.fromorigin = flags.fromorigin; sdd.fss = fss; sdd.fsavl = fsavl; - sdd.verbose = verbose; + sdd.verbose = flags.verbose; + sdd.filter_cb = filter_func; + sdd.filter_cb_arg = cb_arg; err = dump_filesystems(zhp, &sdd); fsavl_destroy(fsavl); nvlist_free(fss); - if (replicate || doall) { + if (flags.dedup) { + (void) close(pipefd[0]); + (void) pthread_join(tid, NULL); + } + if (flags.replicate || flags.doall) { /* * write final end record. NB: want to do this even if * there was some error, because it might not be totally @@ -879,6 +1280,16 @@ } return (err || sdd.err); + +stderr_out: + err = zfs_standard_error(zhp->zfs_hdl, err, errbuf); +err_out: + if (flags.dedup) { + (void) pthread_cancel(tid); + (void) pthread_join(tid, NULL); + (void) close(pipefd[0]); + } + return (err); } /* @@ -1459,7 +1870,8 @@ assert(drr->drr_type == DRR_BEGIN); assert(drr->drr_u.drr_begin.drr_magic == DMU_BACKUP_MAGIC); - assert(drr->drr_u.drr_begin.drr_version == DMU_BACKUP_HEADER_VERSION); + assert(DMU_GET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo) == + DMU_COMPOUNDSTREAM); /* * Read in the nvlist from the stream. @@ -1582,6 +1994,10 @@ { dmu_replay_record_t *drr; void *buf = malloc(1<<20); + char errbuf[1024]; + + (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN, + "cannot receive:")); /* XXX would be great to use lseek if possible... */ drr = buf; @@ -1594,7 +2010,11 @@ switch (drr->drr_type) { case DRR_BEGIN: /* NB: not to be used on v2 stream packages */ - assert(drr->drr_payloadlen == 0); + if (drr->drr_payloadlen != 0) { + zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, + "invalid substream header")); + return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); + } break; case DRR_END: @@ -1621,12 +2041,15 @@ drr->drr_u.drr_write.drr_length, B_FALSE, NULL); break; + case DRR_WRITE_BYREF: case DRR_FREEOBJECTS: case DRR_FREE: break; default: - assert(!"invalid record type"); + zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, + "invalid record type")); + return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); } } @@ -1728,6 +2151,7 @@ /* * Determine name of destination snapshot, store in zc_value. */ + (void) strcpy(zc.zc_top_ds, tosnap); (void) strcpy(zc.zc_value, tosnap); (void) strncat(zc.zc_value, drrb->drr_toname+choplen, sizeof (zc.zc_value)); @@ -2068,6 +2492,8 @@ struct drr_begin *drrb = &drr.drr_u.drr_begin; char errbuf[1024]; zio_cksum_t zcksum = { 0 }; + uint64_t featureflags; + int hdrtype; (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN, "cannot receive")); @@ -2105,7 +2531,7 @@ drr.drr_type = BSWAP_32(drr.drr_type); drr.drr_payloadlen = BSWAP_32(drr.drr_payloadlen); drrb->drr_magic = BSWAP_64(drrb->drr_magic); - drrb->drr_version = BSWAP_64(drrb->drr_version); + drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo); drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time); drrb->drr_type = BSWAP_32(drrb->drr_type); drrb->drr_flags = BSWAP_32(drrb->drr_flags); @@ -2119,23 +2545,31 @@ return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); } + featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo); + hdrtype = DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo); + + if (!DMU_STREAM_SUPPORTED(featureflags) || + (hdrtype != DMU_SUBSTREAM && hdrtype != DMU_COMPOUNDSTREAM)) { + zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, + "stream has unsupported feature, feature flags = %lx"), + featureflags); + return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); + } + if (strchr(drrb->drr_toname, '@') == NULL) { zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid " "stream (bad snapshot name)")); return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); } - if (drrb->drr_version == DMU_BACKUP_STREAM_VERSION) { + if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == DMU_SUBSTREAM) { return (zfs_receive_one(hdl, infd, tosnap, flags, &drr, &drr_noswap, stream_avl, top_zfs)); - } else if (drrb->drr_version == DMU_BACKUP_HEADER_VERSION) { + } else { /* must be DMU_COMPOUNDSTREAM */ + assert(DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == + DMU_COMPOUNDSTREAM); return (zfs_receive_package(hdl, infd, tosnap, flags, &drr, &zcksum, top_zfs)); - } else { - zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, - "stream is unsupported version %llu"), - drrb->drr_version); - return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); } } diff -r 4fe66eb82610 -r 216d8396182e usr/src/lib/libzfs/common/libzfs_util.c --- a/usr/src/lib/libzfs/common/libzfs_util.c Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/lib/libzfs/common/libzfs_util.c Mon Nov 09 11:04:55 2009 -0700 @@ -216,6 +216,10 @@ "dataset")); case EZFS_TAGTOOLONG: return (dgettext(TEXT_DOMAIN, "tag too long")); + case EZFS_PIPEFAILED: + return (dgettext(TEXT_DOMAIN, "pipe create failed")); + case EZFS_THREADCREATEFAILED: + return (dgettext(TEXT_DOMAIN, "thread create failed")); case EZFS_UNKNOWN: return (dgettext(TEXT_DOMAIN, "unknown error")); default: diff -r 4fe66eb82610 -r 216d8396182e usr/src/uts/common/fs/zfs/dmu_send.c --- a/usr/src/uts/common/fs/zfs/dmu_send.c Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/uts/common/fs/zfs/dmu_send.c Mon Nov 09 11:04:55 2009 -0700 @@ -38,16 +38,31 @@ #include #include #include +#include static char *dmu_recv_tag = "dmu_recv_tag"; +/* + * The list of data whose inclusion in a send stream can be pending from + * one call to backup_cb to another. Multiple calls to dump_free() and + * dump_freeobjects() can be aggregated into a single DRR_FREE or + * DRR_FREEOBJECTS replay record. + */ +typedef enum { + PENDING_NONE, + PENDING_FREE, + PENDING_FREEOBJECTS +} pendop_t; + struct backuparg { dmu_replay_record_t *drr; vnode_t *vp; offset_t *off; objset_t *os; zio_cksum_t zc; + uint64_t toguid; int err; + pendop_t pending_op; }; static int @@ -68,15 +83,59 @@ dump_free(struct backuparg *ba, uint64_t object, uint64_t offset, uint64_t length) { - /* write a FREE record */ + struct drr_free *drrf = &(ba->drr->drr_u.drr_free); + + /* + * If there is a pending op, but it's not PENDING_FREE, push it out, + * since free block aggregation can only be done for blocks of the + * same type (i.e., DRR_FREE records can only be aggregated with + * other DRR_FREE records. DRR_FREEOBJECTS records can only be + * aggregated with other DRR_FREEOBJECTS records. + */ + if (ba->pending_op != PENDING_NONE && ba->pending_op != PENDING_FREE) { + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + ba->pending_op = PENDING_NONE; + } + + if (ba->pending_op == PENDING_FREE) { + /* + * There should never be a PENDING_FREE if length is -1 + * (because dump_dnode is the only place where this + * function is called with a -1, and only after flushing + * any pending record). + */ + ASSERT(length != -1ULL); + /* + * Check to see whether this free block can be aggregated + * with pending one. + */ + if (drrf->drr_object == object && drrf->drr_offset + + drrf->drr_length == offset) { + drrf->drr_length += length; + return (0); + } else { + /* not a continuation. Push out pending record */ + if (dump_bytes(ba, ba->drr, + sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + ba->pending_op = PENDING_NONE; + } + } + /* create a FREE record and make it pending */ bzero(ba->drr, sizeof (dmu_replay_record_t)); ba->drr->drr_type = DRR_FREE; - ba->drr->drr_u.drr_free.drr_object = object; - ba->drr->drr_u.drr_free.drr_offset = offset; - ba->drr->drr_u.drr_free.drr_length = length; + drrf->drr_object = object; + drrf->drr_offset = offset; + drrf->drr_length = length; + drrf->drr_toguid = ba->toguid; + if (length == -1ULL) { + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + } else { + ba->pending_op = PENDING_FREE; + } - if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t))) - return (EINTR); return (0); } @@ -84,17 +143,31 @@ dump_data(struct backuparg *ba, dmu_object_type_t type, uint64_t object, uint64_t offset, int blksz, void *data) { + struct drr_write *drrw = &(ba->drr->drr_u.drr_write); + + /* + * If there is any kind of pending aggregation (currently either + * a grouping of free objects or free blocks), push it out to + * the stream, since aggregation can't be done across operations + * of different types. + */ + if (ba->pending_op != PENDING_NONE) { + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + ba->pending_op = PENDING_NONE; + } /* write a DATA record */ bzero(ba->drr, sizeof (dmu_replay_record_t)); ba->drr->drr_type = DRR_WRITE; - ba->drr->drr_u.drr_write.drr_object = object; - ba->drr->drr_u.drr_write.drr_type = type; - ba->drr->drr_u.drr_write.drr_offset = offset; - ba->drr->drr_u.drr_write.drr_length = blksz; + drrw->drr_object = object; + drrw->drr_type = type; + drrw->drr_offset = offset; + drrw->drr_length = blksz; + drrw->drr_toguid = ba->toguid; - if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t))) + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) return (EINTR); - if (dump_bytes(ba, data, blksz)) + if (dump_bytes(ba, data, blksz) != 0) return (EINTR); return (0); } @@ -102,39 +175,80 @@ static int dump_freeobjects(struct backuparg *ba, uint64_t firstobj, uint64_t numobjs) { + struct drr_freeobjects *drrfo = &(ba->drr->drr_u.drr_freeobjects); + + /* + * If there is a pending op, but it's not PENDING_FREEOBJECTS, + * push it out, since free block aggregation can only be done for + * blocks of the same type (i.e., DRR_FREE records can only be + * aggregated with other DRR_FREE records. DRR_FREEOBJECTS records + * can only be aggregated with other DRR_FREEOBJECTS records. + */ + if (ba->pending_op != PENDING_NONE && + ba->pending_op != PENDING_FREEOBJECTS) { + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + ba->pending_op = PENDING_NONE; + } + if (ba->pending_op == PENDING_FREEOBJECTS) { + /* + * See whether this free object array can be aggregated + * with pending one + */ + if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) { + drrfo->drr_numobjs += numobjs; + return (0); + } else { + /* can't be aggregated. Push out pending record */ + if (dump_bytes(ba, ba->drr, + sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + ba->pending_op = PENDING_NONE; + } + } + /* write a FREEOBJECTS record */ bzero(ba->drr, sizeof (dmu_replay_record_t)); ba->drr->drr_type = DRR_FREEOBJECTS; - ba->drr->drr_u.drr_freeobjects.drr_firstobj = firstobj; - ba->drr->drr_u.drr_freeobjects.drr_numobjs = numobjs; + drrfo->drr_firstobj = firstobj; + drrfo->drr_numobjs = numobjs; + drrfo->drr_toguid = ba->toguid; - if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t))) - return (EINTR); + ba->pending_op = PENDING_FREEOBJECTS; + return (0); } static int dump_dnode(struct backuparg *ba, uint64_t object, dnode_phys_t *dnp) { + struct drr_object *drro = &(ba->drr->drr_u.drr_object); + if (dnp == NULL || dnp->dn_type == DMU_OT_NONE) return (dump_freeobjects(ba, object, 1)); + if (ba->pending_op != PENDING_NONE) { + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) + return (EINTR); + ba->pending_op = PENDING_NONE; + } + /* write an OBJECT record */ bzero(ba->drr, sizeof (dmu_replay_record_t)); ba->drr->drr_type = DRR_OBJECT; - ba->drr->drr_u.drr_object.drr_object = object; - ba->drr->drr_u.drr_object.drr_type = dnp->dn_type; - ba->drr->drr_u.drr_object.drr_bonustype = dnp->dn_bonustype; - ba->drr->drr_u.drr_object.drr_blksz = - dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT; - ba->drr->drr_u.drr_object.drr_bonuslen = dnp->dn_bonuslen; - ba->drr->drr_u.drr_object.drr_checksum = dnp->dn_checksum; - ba->drr->drr_u.drr_object.drr_compress = dnp->dn_compress; + drro->drr_object = object; + drro->drr_type = dnp->dn_type; + drro->drr_bonustype = dnp->dn_bonustype; + drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT; + drro->drr_bonuslen = dnp->dn_bonuslen; + drro->drr_checksumtype = dnp->dn_checksum; + drro->drr_compress = dnp->dn_compress; + drro->drr_toguid = ba->toguid; - if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t))) + if (dump_bytes(ba, ba->drr, sizeof (dmu_replay_record_t)) != 0) return (EINTR); - if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8))) + if (dump_bytes(ba, DN_BONUS(dnp), P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) return (EINTR); /* free anything past the end of the file */ @@ -256,7 +370,8 @@ drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP); drr->drr_type = DRR_BEGIN; drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC; - drr->drr_u.drr_begin.drr_version = DMU_BACKUP_STREAM_VERSION; + DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo, + DMU_SUBSTREAM); drr->drr_u.drr_begin.drr_creation_time = ds->ds_phys->ds_creation_time; drr->drr_u.drr_begin.drr_type = tosnap->os_phys->os_type; @@ -279,9 +394,11 @@ ba.vp = vp; ba.os = tosnap; ba.off = off; + ba.toguid = ds->ds_phys->ds_guid; ZIO_SET_CHECKSUM(&ba.zc, 0, 0, 0, 0); + ba.pending_op = PENDING_NONE; - if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) { + if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) { kmem_free(drr, sizeof (dmu_replay_record_t)); return (ba.err); } @@ -289,6 +406,10 @@ err = traverse_dataset(ds, fromtxg, TRAVERSE_PRE | TRAVERSE_PREFETCH, backup_cb, &ba); + if (ba.pending_op != PENDING_NONE) + if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) + err = EINTR; + if (err) { if (err == EINTR && ba.err) err = ba.err; @@ -299,8 +420,9 @@ bzero(drr, sizeof (dmu_replay_record_t)); drr->drr_type = DRR_END; drr->drr_u.drr_end.drr_checksum = ba.zc; + drr->drr_u.drr_end.drr_toguid = ba.toguid; - if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t))) { + if (dump_bytes(&ba, drr, sizeof (dmu_replay_record_t)) != 0) { kmem_free(drr, sizeof (dmu_replay_record_t)); return (ba.err); } @@ -459,13 +581,13 @@ * succeeds; otherwise we will leak the holds on the datasets. */ int -dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *drrb, +dmu_recv_begin(char *tofs, char *tosnap, char *top_ds, struct drr_begin *drrb, boolean_t force, objset_t *origin, dmu_recv_cookie_t *drc) { int err = 0; boolean_t byteswap; struct recvbeginsyncarg rbsa = { 0 }; - uint64_t version; + uint64_t versioninfo; int flags; dsl_dataset_t *ds; @@ -483,17 +605,17 @@ rbsa.type = drrb->drr_type; rbsa.tag = FTAG; rbsa.dsflags = 0; - version = drrb->drr_version; + versioninfo = drrb->drr_versioninfo; flags = drrb->drr_flags; if (byteswap) { rbsa.type = BSWAP_32(rbsa.type); rbsa.fromguid = BSWAP_64(rbsa.fromguid); - version = BSWAP_64(version); + versioninfo = BSWAP_64(versioninfo); flags = BSWAP_32(flags); } - if (version != DMU_BACKUP_STREAM_VERSION || + if (DMU_GET_STREAM_HDRTYPE(versioninfo) == DMU_COMPOUNDSTREAM || rbsa.type >= DMU_OST_NUMTYPES || ((flags & DRR_FLAG_CLONE) && origin == NULL)) return (EINVAL); @@ -504,6 +626,7 @@ bzero(drc, sizeof (dmu_recv_cookie_t)); drc->drc_drrb = drrb; drc->drc_tosnap = tosnap; + drc->drc_top_ds = top_ds; drc->drc_force = force; /* @@ -579,8 +702,85 @@ uint64_t voff; int bufsize; /* amount of memory allocated for buf */ zio_cksum_t cksum; + avl_tree_t guid_to_ds_map; }; +typedef struct guid_map_entry { + uint64_t guid; + dsl_dataset_t *gme_ds; + avl_node_t avlnode; +} guid_map_entry_t; + +static int +guid_compare(const void *arg1, const void *arg2) +{ + const guid_map_entry_t *gmep1 = arg1; + const guid_map_entry_t *gmep2 = arg2; + + if (gmep1->guid < gmep2->guid) + return (-1); + else if (gmep1->guid > gmep2->guid) + return (1); + return (0); +} + +/* + * This function is a callback used by dmu_objset_find() (which + * enumerates the object sets) to build an avl tree that maps guids + * to datasets. The resulting table is used when processing DRR_WRITE_BYREF + * send stream records. These records, which are used in dedup'ed + * streams, do not contain data themselves, but refer to a copy + * of the data block that has already been written because it was + * earlier in the stream. That previous copy is identified by the + * guid of the dataset with the referenced data. + */ +int +find_ds_by_guid(char *name, void *arg) +{ + dsl_dataset_t *ds, *snapds; + avl_tree_t *guid_map = arg; + guid_map_entry_t *gmep; + guid_map_entry_t gmesrch; + dsl_pool_t *dp; + int err; + uint64_t lastobj, firstobj; + + if (dsl_dataset_hold(name, FTAG, &ds) != 0) + return (0); + + dp = ds->ds_dir->dd_pool; + rw_enter(&dp->dp_config_rwlock, RW_READER); + firstobj = ds->ds_dir->dd_phys->dd_origin_obj; + lastobj = ds->ds_phys->ds_prev_snap_obj; + + while (lastobj != firstobj) { + err = dsl_dataset_hold_obj(dp, lastobj, guid_map, &snapds); + if (err) { + /* + * Skip this snapshot and move on. It's not + * clear why this would ever happen, but the + * remainder of the snapshot streadm can be + * processed. + */ + rw_exit(&dp->dp_config_rwlock); + dsl_dataset_rele(ds, FTAG); + return (0); + } + + gmesrch.guid = snapds->ds_phys->ds_guid; + gmep = kmem_alloc(sizeof (guid_map_entry_t), KM_SLEEP); + gmep->guid = snapds->ds_phys->ds_guid; + gmep->gme_ds = snapds; + avl_add(guid_map, gmep); + lastobj = snapds->ds_phys->ds_prev_snap_obj; + } + + rw_exit(&dp->dp_config_rwlock); + dsl_dataset_rele(ds, FTAG); + + return (0); +} + static void * restore_read(struct restorearg *ra, int len) { @@ -625,7 +825,7 @@ switch (drr->drr_type) { case DRR_BEGIN: DO64(drr_begin.drr_magic); - DO64(drr_begin.drr_version); + DO64(drr_begin.drr_versioninfo); DO64(drr_begin.drr_creation_time); DO32(drr_begin.drr_type); DO32(drr_begin.drr_flags); @@ -639,27 +839,49 @@ DO32(drr_object.drr_bonustype); DO32(drr_object.drr_blksz); DO32(drr_object.drr_bonuslen); + DO64(drr_object.drr_toguid); break; case DRR_FREEOBJECTS: DO64(drr_freeobjects.drr_firstobj); DO64(drr_freeobjects.drr_numobjs); + DO64(drr_freeobjects.drr_toguid); break; case DRR_WRITE: DO64(drr_write.drr_object); DO32(drr_write.drr_type); DO64(drr_write.drr_offset); DO64(drr_write.drr_length); + DO64(drr_write.drr_toguid); + DO64(drr_write.drr_blkcksum.zc_word[0]); + DO64(drr_write.drr_blkcksum.zc_word[1]); + DO64(drr_write.drr_blkcksum.zc_word[2]); + DO64(drr_write.drr_blkcksum.zc_word[3]); + break; + case DRR_WRITE_BYREF: + DO64(drr_write_byref.drr_object); + DO64(drr_write_byref.drr_offset); + DO64(drr_write_byref.drr_length); + DO64(drr_write_byref.drr_toguid); + DO64(drr_write_byref.drr_refguid); + DO64(drr_write_byref.drr_refobject); + DO64(drr_write_byref.drr_refoffset); + DO64(drr_write_byref.drr_blkcksum.zc_word[0]); + DO64(drr_write_byref.drr_blkcksum.zc_word[1]); + DO64(drr_write_byref.drr_blkcksum.zc_word[2]); + DO64(drr_write_byref.drr_blkcksum.zc_word[3]); break; case DRR_FREE: DO64(drr_free.drr_object); DO64(drr_free.drr_offset); DO64(drr_free.drr_length); + DO64(drr_free.drr_toguid); break; case DRR_END: DO64(drr_end.drr_checksum.zc_word[0]); DO64(drr_end.drr_checksum.zc_word[1]); DO64(drr_end.drr_checksum.zc_word[2]); DO64(drr_end.drr_checksum.zc_word[3]); + DO64(drr_end.drr_toguid); break; } #undef DO64 @@ -676,7 +898,7 @@ if (drro->drr_type == DMU_OT_NONE || drro->drr_type >= DMU_OT_NUMTYPES || drro->drr_bonustype >= DMU_OT_NUMTYPES || - drro->drr_checksum >= ZIO_CHECKSUM_FUNCTIONS || + drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS || drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS || P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) || drro->drr_blksz < SPA_MINBLOCKSIZE || @@ -726,7 +948,8 @@ return (err); } - dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksum, tx); + dmu_object_set_checksum(os, drro->drr_object, drro->drr_checksumtype, + tx); dmu_object_set_compress(os, drro->drr_object, drro->drr_compress, tx); if (data != NULL) { @@ -808,6 +1031,64 @@ return (0); } +/* + * Handle a DRR_WRITE_BYREF record. This record is used in dedup'ed + * streams to refer to a copy of the data that is already on the + * system because it came in earlier in the stream. This function + * finds the earlier copy of the data, and uses that copy instead of + * data from the stream to fulfill this write. + */ +static int +restore_write_byref(struct restorearg *ra, objset_t *os, + struct drr_write_byref *drrwbr) +{ + dmu_tx_t *tx; + int err; + guid_map_entry_t gmesrch; + guid_map_entry_t *gmep; + avl_index_t where; + objset_t *ref_os = NULL; + dmu_buf_t *dbp; + + if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset) + return (EINVAL); + + /* + * If the GUID of the referenced dataset is different from the + * GUID of the target dataset, find the referenced dataset. + */ + if (drrwbr->drr_toguid != drrwbr->drr_refguid) { + gmesrch.guid = drrwbr->drr_refguid; + if ((gmep = avl_find(&ra->guid_to_ds_map, &gmesrch, + &where)) == NULL) { + return (EINVAL); + } + if (dmu_objset_from_ds(gmep->gme_ds, &ref_os)) + return (EINVAL); + } else { + ref_os = os; + } + + if (err = dmu_buf_hold(ref_os, drrwbr->drr_refobject, + drrwbr->drr_refoffset, FTAG, &dbp)) + return (err); + + tx = dmu_tx_create(os); + + dmu_tx_hold_write(tx, drrwbr->drr_object, + drrwbr->drr_offset, drrwbr->drr_length); + err = dmu_tx_assign(tx, TXG_WAIT); + if (err) { + dmu_tx_abort(tx); + return (err); + } + dmu_write(os, drrwbr->drr_object, + drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx); + dmu_buf_rele(dbp, FTAG); + dmu_tx_commit(tx); + return (0); +} + /* ARGSUSED */ static int restore_free(struct restorearg *ra, objset_t *os, @@ -837,6 +1118,8 @@ dmu_replay_record_t *drr; objset_t *os; zio_cksum_t pcksum; + guid_map_entry_t *gmep; + int featureflags; if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) ra.byteswap = TRUE; @@ -861,7 +1144,7 @@ if (ra.byteswap) { struct drr_begin *drrb = drc->drc_drrb; drrb->drr_magic = BSWAP_64(drrb->drr_magic); - drrb->drr_version = BSWAP_64(drrb->drr_version); + drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo); drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time); drrb->drr_type = BSWAP_32(drrb->drr_type); drrb->drr_toguid = BSWAP_64(drrb->drr_toguid); @@ -874,7 +1157,8 @@ ra.buf = kmem_alloc(ra.bufsize, KM_SLEEP); /* these were verified in dmu_recv_begin */ - ASSERT(drc->drc_drrb->drr_version == DMU_BACKUP_STREAM_VERSION); + ASSERT(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo) == + DMU_SUBSTREAM); ASSERT(drc->drc_drrb->drr_type < DMU_OST_NUMTYPES); /* @@ -884,6 +1168,18 @@ ASSERT(drc->drc_real_ds->ds_phys->ds_flags & DS_FLAG_INCONSISTENT); + featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); + + /* if this stream is dedup'ed, set up the avl tree for guid mapping */ + if (featureflags & DMU_BACKUP_FEATURE_DEDUP) { + avl_create(&ra.guid_to_ds_map, guid_compare, + sizeof (guid_map_entry_t), + offsetof(guid_map_entry_t, avlnode)); + (void) dmu_objset_find(drc->drc_top_ds, find_ds_by_guid, + (void *)&ra.guid_to_ds_map, + DS_FIND_CHILDREN); + } + /* * Read records and process them. */ @@ -923,6 +1219,13 @@ ra.err = restore_write(&ra, os, &drrw); break; } + case DRR_WRITE_BYREF: + { + struct drr_write_byref drrwbr = + drr->drr_u.drr_write_byref; + ra.err = restore_write_byref(&ra, os, &drrwbr); + break; + } case DRR_FREE: { struct drr_free drrf = drr->drr_u.drr_free; @@ -965,6 +1268,16 @@ } } + if (featureflags & DMU_BACKUP_FEATURE_DEDUP) { + void *cookie = NULL; + + while (gmep = avl_destroy_nodes(&ra.guid_to_ds_map, &cookie)) { + dsl_dataset_rele(gmep->gme_ds, &ra.guid_to_ds_map); + kmem_free(gmep, sizeof (guid_map_entry_t)); + } + avl_destroy(&ra.guid_to_ds_map); + } + kmem_free(ra.buf, ra.bufsize); *voffp = ra.voff; return (ra.err); diff -r 4fe66eb82610 -r 216d8396182e usr/src/uts/common/fs/zfs/sys/dmu.h --- a/usr/src/uts/common/fs/zfs/sys/dmu.h Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/uts/common/fs/zfs/sys/dmu.h Mon Nov 09 11:04:55 2009 -0700 @@ -679,11 +679,12 @@ struct dsl_dataset *drc_real_ds; struct drr_begin *drc_drrb; char *drc_tosnap; + char *drc_top_ds; boolean_t drc_newfs; boolean_t drc_force; } dmu_recv_cookie_t; -int dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *, +int dmu_recv_begin(char *tofs, char *tosnap, char *topds, struct drr_begin *, boolean_t force, objset_t *origin, dmu_recv_cookie_t *); int dmu_recv_stream(dmu_recv_cookie_t *drc, struct vnode *vp, offset_t *voffp); int dmu_recv_end(dmu_recv_cookie_t *drc); diff -r 4fe66eb82610 -r 216d8396182e usr/src/uts/common/fs/zfs/sys/zfs_ioctl.h --- a/usr/src/uts/common/fs/zfs/sys/zfs_ioctl.h Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/uts/common/fs/zfs/sys/zfs_ioctl.h Mon Nov 09 11:04:55 2009 -0700 @@ -30,6 +30,7 @@ #include #include #include +#include #ifdef _KERNEL #include @@ -45,8 +46,56 @@ #define ZFS_SNAPDIR_HIDDEN 0 #define ZFS_SNAPDIR_VISIBLE 1 -#define DMU_BACKUP_STREAM_VERSION (1ULL) -#define DMU_BACKUP_HEADER_VERSION (2ULL) +/* + * Field manipulation macros for the drr_versioninfo field of the + * send stream header. + */ + +/* + * Header types for zfs send streams. + */ +typedef enum drr_headertype { + DMU_SUBSTREAM = 0x1, + DMU_COMPOUNDSTREAM = 0x2 +} drr_headertype_t; + +#define DMU_GET_STREAM_HDRTYPE(vi) BF64_GET((vi), 0, 2) +#define DMU_SET_STREAM_HDRTYPE(vi, x) BF64_SET((vi), 0, 2, x) + +#define DMU_GET_FEATUREFLAGS(vi) BF64_GET((vi), 2, 30) +#define DMU_SET_FEATUREFLAGS(vi, x) BF64_SET((vi), 2, 30, x) + +/* + * Feature flags for zfs send streams (flags in drr_versioninfo) + */ + +#define DMU_BACKUP_FEATURE_DEDUP (0x1) + +/* + * Mask of all supported backup features + */ +#define DMU_BACKUP_FEATURE_MASK (DMU_BACKUP_FEATURE_DEDUP) + +/* Are all features in the given flag word currently supported? */ +#define DMU_STREAM_SUPPORTED(x) (!((x) & ~DMU_BACKUP_FEATURE_MASK)) + +/* + * The drr_versioninfo field of the dmu_replay_record has the + * following layout: + * + * 64 56 48 40 32 24 16 8 0 + * +-------+-------+-------+-------+-------+-------+-------+-------+ + * | reserved | feature-flags |C|S| + * +-------+-------+-------+-------+-------+-------+-------+-------+ + * + * The low order two bits indicate the header type: SUBSTREAM (0x1) + * or COMPOUNDSTREAM (0x2). Using two bits for this is historical: + * this field used to be a version number, where the two version types + * were 1 and 2. Using two bits for this allows earlier versions of + * the code to be able to recognize send streams that don't use any + * of the features indicated by feature flags. + */ + #define DMU_BACKUP_MAGIC 0x2F5bacbacULL #define DRR_FLAG_CLONE (1<<0) @@ -58,13 +107,14 @@ typedef struct dmu_replay_record { enum { DRR_BEGIN, DRR_OBJECT, DRR_FREEOBJECTS, - DRR_WRITE, DRR_FREE, DRR_END, DRR_NUMTYPES + DRR_WRITE, DRR_FREE, DRR_END, DRR_WRITE_BYREF, + DRR_NUMTYPES } drr_type; uint32_t drr_payloadlen; union { struct drr_begin { uint64_t drr_magic; - uint64_t drr_version; + uint64_t drr_versioninfo; /* was drr_version */ uint64_t drr_creation_time; dmu_objset_type_t drr_type; uint32_t drr_flags; @@ -74,6 +124,7 @@ } drr_begin; struct drr_end { zio_cksum_t drr_checksum; + uint64_t drr_toguid; } drr_end; struct drr_object { uint64_t drr_object; @@ -81,14 +132,16 @@ dmu_object_type_t drr_bonustype; uint32_t drr_blksz; uint32_t drr_bonuslen; - uint8_t drr_checksum; + uint8_t drr_checksumtype; uint8_t drr_compress; uint8_t drr_pad[6]; + uint64_t drr_toguid; /* bonus content follows */ } drr_object; struct drr_freeobjects { uint64_t drr_firstobj; uint64_t drr_numobjs; + uint64_t drr_toguid; } drr_freeobjects; struct drr_write { uint64_t drr_object; @@ -96,13 +149,32 @@ uint32_t drr_pad; uint64_t drr_offset; uint64_t drr_length; + uint64_t drr_toguid; + uint8_t drr_checksumtype; + uint8_t drr_pad2[7]; + zio_cksum_t drr_blkcksum; /* content follows */ } drr_write; struct drr_free { uint64_t drr_object; uint64_t drr_offset; uint64_t drr_length; + uint64_t drr_toguid; } drr_free; + struct drr_write_byref { + /* where to put the data */ + uint64_t drr_object; + uint64_t drr_offset; + uint64_t drr_length; + uint64_t drr_toguid; + /* where to find the prior copy of the data */ + uint64_t drr_refguid; + uint64_t drr_refobject; + uint64_t drr_refoffset; + uint8_t drr_checksumtype; + uint8_t drr_pad[7]; + zio_cksum_t drr_blkcksum; + } drr_write_byref; } drr_u; } dmu_replay_record_t; @@ -150,6 +222,7 @@ char zc_name[MAXPATHLEN]; char zc_value[MAXPATHLEN * 2]; char zc_string[MAXNAMELEN]; + char zc_top_ds[MAXPATHLEN]; uint64_t zc_guid; uint64_t zc_nvlist_conf; /* really (char *) */ uint64_t zc_nvlist_conf_size; diff -r 4fe66eb82610 -r 216d8396182e usr/src/uts/common/fs/zfs/zfs_ioctl.c --- a/usr/src/uts/common/fs/zfs/zfs_ioctl.c Mon Nov 09 11:54:00 2009 +0530 +++ b/usr/src/uts/common/fs/zfs/zfs_ioctl.c Mon Nov 09 11:04:55 2009 -0700 @@ -2922,8 +2922,8 @@ goto out; } - error = dmu_recv_begin(tofs, tosnap, &zc->zc_begin_record, - force, origin, &drc); + error = dmu_recv_begin(tofs, tosnap, zc->zc_top_ds, + &zc->zc_begin_record, force, origin, &drc); if (origin) dmu_objset_rele(origin, FTAG); if (error)