comparison usr/src/lib/libzfs/common/libzfs_sendrecv.c @ 11007:216d8396182e

PSARC/2009/557 ZFS send dedup; 6812638 zfs send intra-stream dedup; 6887817 want snapshot filtering for zfs send; 6812603 zfs send can aggregate free records
author Lori Alt <Lori.Alt@Sun.COM>
date Mon, 09 Nov 2009 11:04:55 -0700
parents dcc7d6f9faa8
children 63ab26072e41
comparison
equal deleted inserted replaced
11006:4fe66eb82610 11007:216d8396182e
33 #include <strings.h> 33 #include <strings.h>
34 #include <unistd.h> 34 #include <unistd.h>
35 #include <stddef.h> 35 #include <stddef.h>
36 #include <fcntl.h> 36 #include <fcntl.h>
37 #include <sys/mount.h> 37 #include <sys/mount.h>
38 #include <pthread.h>
39 #include <umem.h>
38 40
39 #include <libzfs.h> 41 #include <libzfs.h>
40 42
41 #include "zfs_namecheck.h" 43 #include "zfs_namecheck.h"
42 #include "zfs_prop.h" 44 #include "zfs_prop.h"
43 #include "zfs_fletcher.h" 45 #include "zfs_fletcher.h"
44 #include "libzfs_impl.h" 46 #include "libzfs_impl.h"
47 #include <sha2.h>
45 48
46 static int zfs_receive_impl(libzfs_handle_t *, const char *, recvflags_t, 49 static int zfs_receive_impl(libzfs_handle_t *, const char *, recvflags_t,
47 int, avl_tree_t *, char **); 50 int, avl_tree_t *, char **);
51
/*
 * All-zero checksum.  cksummer() compares a write record's drr_blkcksum
 * against this to detect records that carry no block checksum yet.
 */
static const zio_cksum_t zero_cksum = { 0 };
53
/*
 * Argument block handed to the cksummer() thread by zfs_send().
 */
typedef struct dedup_arg {
	int	inputfd;		/* fd cksummer() reads the raw stream from */
	int	outputfd;		/* fd cksummer() writes the dedup'd stream to */
	libzfs_handle_t	*dedup_hdl;	/* handle used for error reporting */
} dedup_arg_t;
59
/*
 * Location of a data block already present in the stream.  Filled from a
 * write record's drr_toguid, drr_object and drr_offset, and copied into
 * the drr_ref* fields of a DRR_WRITE_BYREF record when a duplicate is found.
 */
typedef struct dataref {
	uint64_t ref_guid;	/* drr_toguid of the record holding the data */
	uint64_t ref_object;	/* drr_object of that record */
	uint64_t ref_offset;	/* drr_offset of that record */
} dataref_t;
65
/*
 * One dedup table entry: maps a block checksum to the first place that
 * block appeared in the stream.  Entries sharing a hash bucket are chained
 * through dde_next.
 */
typedef struct dedup_entry {
	struct dedup_entry	*dde_next;	/* next entry in the same bucket */
	zio_cksum_t dde_chksum;			/* block checksum (lookup key) */
	dataref_t dde_ref;			/* where the block was first sent */
} dedup_entry_t;
71
/*
 * Sizing limits for the dedup table: cksummer() caps the table at
 * MAX_DDT_PHYSMEM_PERCENT percent of physical memory, but never lets the
 * cap drop below SMALLEST_POSSIBLE_MAX_DDT_MB megabytes.
 */
#define	MAX_DDT_PHYSMEM_PERCENT		20
#define	SMALLEST_POSSIBLE_MAX_DDT_MB		128
74
/*
 * The dedup table (DDT): a chained hash table keyed by block checksum.
 */
typedef struct dedup_table {
	/* bucket heads, indexed by the low numhashbits bits of zc_word[0] */
	dedup_entry_t	**dedup_hash_array;
	/* umem cache that dedup_entry_t objects are allocated from */
	umem_cache_t	*ddecache;
	uint64_t	max_ddt_size;  /* max dedup table size in bytes */
	/* current size in bytes: the bucket array plus every entry added */
	uint64_t	cur_ddt_size;
	uint64_t	ddt_count;	/* number of entries in the table */
	int		numhashbits;	/* log2 of the number of buckets */
	/* set once the "table full" notice has been issued */
	boolean_t	ddt_full;
} dedup_table_t;
84
/*
 * Return the number of bits needed to represent n, i.e. the 1-based
 * position of its most significant set bit.  Returns 0 when n is 0.
 */
static int
high_order_bit(uint64_t n)
{
	int nbits = 0;

	while (n != 0) {
		nbits++;
		n >>= 1;
	}
	return (nbits);
}
94
/*
 * Read one item of exactly len bytes from stream into buf.
 *
 * Returns 1 when the whole item was read and 0 otherwise (end of file,
 * read error, or fewer than len bytes available).
 */
static size_t
ssread(void *buf, size_t len, FILE *stream)
{
	size_t nitems = fread(buf, len, 1, stream);

	return (nitems);
}
105
106 static void
107 ddt_hash_append(libzfs_handle_t *hdl, dedup_table_t *ddt, dedup_entry_t **ddepp,
108 zio_cksum_t *cs, dataref_t *dr)
109 {
110 dedup_entry_t *dde;
111
112 if (ddt->cur_ddt_size >= ddt->max_ddt_size) {
113 if (ddt->ddt_full == B_FALSE) {
114 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
115 "Dedup table full. Deduplication will continue "
116 "with existing table entries"));
117 ddt->ddt_full = B_TRUE;
118 }
119 return;
120 }
121
122 if ((dde = umem_cache_alloc(ddt->ddecache, UMEM_DEFAULT))
123 != NULL) {
124 assert(*ddepp == NULL);
125 dde->dde_next = NULL;
126 dde->dde_chksum = *cs;
127 dde->dde_ref = *dr;
128 *ddepp = dde;
129 ddt->cur_ddt_size += sizeof (dedup_entry_t);
130 ddt->ddt_count++;
131 }
132 }
133
134 /*
135 * Using the specified dedup table, do a lookup for an entry with
136 * the checksum cs. If found, return the block's reference info
137 * in *dr. Otherwise, insert a new entry in the dedup table, using
138 * the reference information specified by *dr.
139 *
140 * return value: true - entry was found
141 * false - entry was not found
142 */
143 static boolean_t
144 ddt_update(libzfs_handle_t *hdl, dedup_table_t *ddt, zio_cksum_t *cs,
145 dataref_t *dr)
146 {
147 uint32_t hashcode;
148 dedup_entry_t **ddepp;
149
150 hashcode = BF64_GET(cs->zc_word[0], 0, ddt->numhashbits);
151
152 for (ddepp = &(ddt->dedup_hash_array[hashcode]); *ddepp != NULL;
153 ddepp = &((*ddepp)->dde_next)) {
154 if (ZIO_CHECKSUM_EQUAL(((*ddepp)->dde_chksum), *cs)) {
155 *dr = (*ddepp)->dde_ref;
156 return (B_TRUE);
157 }
158 }
159 ddt_hash_append(hdl, ddt, ddepp, cs, dr);
160 return (B_FALSE);
161 }
162
163 static int
164 cksum_and_write(const void *buf, uint64_t len, zio_cksum_t *zc, int outfd)
165 {
166 fletcher_4_incremental_native(buf, len, zc);
167 return (write(outfd, buf, len));
168 }
169
170 /*
171 * This function is started in a separate thread when the dedup option
172 * has been requested. The main send thread determines the list of
173 * snapshots to be included in the send stream and makes the ioctl calls
174 * for each one. But instead of having the ioctl send the output to the
175 * the output fd specified by the caller of zfs_send()), the
176 * ioctl is told to direct the output to a pipe, which is read by the
177 * alternate thread running THIS function. This function does the
178 * dedup'ing by:
179 * 1. building a dedup table (the DDT)
180 * 2. doing checksums on each data block and inserting a record in the DDT
181 * 3. looking for matching checksums, and
182 * 4. sending a DRR_WRITE_BYREF record instead of a write record whenever
183 * a duplicate block is found.
184 * The output of this function then goes to the output fd requested
185 * by the caller of zfs_send().
186 */
187 static void *
188 cksummer(void *arg)
189 {
190 dedup_arg_t *dda = arg;
191 char *buf = malloc(1<<20);
192 dmu_replay_record_t thedrr;
193 dmu_replay_record_t *drr = &thedrr;
194 struct drr_begin *drrb = &thedrr.drr_u.drr_begin;
195 struct drr_end *drre = &thedrr.drr_u.drr_end;
196 struct drr_object *drro = &thedrr.drr_u.drr_object;
197 struct drr_write *drrw = &thedrr.drr_u.drr_write;
198 FILE *ofp;
199 int outfd;
200 dmu_replay_record_t wbr_drr;
201 struct drr_write_byref *wbr_drrr = &wbr_drr.drr_u.drr_write_byref;
202 dedup_table_t ddt;
203 zio_cksum_t stream_cksum;
204 uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);
205 uint64_t numbuckets;
206
207 ddt.max_ddt_size =
208 MAX((physmem * MAX_DDT_PHYSMEM_PERCENT)/100,
209 SMALLEST_POSSIBLE_MAX_DDT_MB<<20);
210
211 numbuckets = ddt.max_ddt_size/(sizeof (dedup_entry_t));
212
213 /*
214 * numbuckets must be a power of 2. Increase number to
215 * a power of 2 if necessary.
216 */
217 if (!ISP2(numbuckets))
218 numbuckets = 1 << high_order_bit(numbuckets);
219
220 ddt.dedup_hash_array = calloc(numbuckets, sizeof (dedup_entry_t *));
221 ddt.ddecache = umem_cache_create("dde", sizeof (dedup_entry_t), 0,
222 NULL, NULL, NULL, NULL, NULL, 0);
223 ddt.cur_ddt_size = numbuckets * sizeof (dedup_entry_t *);
224 ddt.numhashbits = high_order_bit(numbuckets) - 1;
225 ddt.ddt_full = B_FALSE;
226
227 /* Initialize the write-by-reference block. */
228 wbr_drr.drr_type = DRR_WRITE_BYREF;
229 wbr_drr.drr_payloadlen = 0;
230
231 outfd = dda->outputfd;
232 ofp = fdopen(dda->inputfd, "r");
233 while (ssread(drr, sizeof (dmu_replay_record_t), ofp) != 0) {
234
235 switch (drr->drr_type) {
236 case DRR_BEGIN:
237 {
238 int fflags;
239 ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);
240
241 /* set the DEDUP feature flag for this stream */
242 fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
243 fflags |= DMU_BACKUP_FEATURE_DEDUP;
244 DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags);
245
246 if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
247 &stream_cksum, outfd) == -1)
248 goto out;
249 if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
250 DMU_COMPOUNDSTREAM && drr->drr_payloadlen != 0) {
251 int sz = drr->drr_payloadlen;
252
253 if (sz > 1<<20) {
254 free(buf);
255 buf = malloc(sz);
256 }
257 (void) ssread(buf, sz, ofp);
258 if (ferror(stdin))
259 perror("fread");
260 if (cksum_and_write(buf, sz, &stream_cksum,
261 outfd) == -1)
262 goto out;
263 }
264 break;
265 }
266
267 case DRR_END:
268 {
269 /* use the recalculated checksum */
270 ZIO_SET_CHECKSUM(&drre->drr_checksum,
271 stream_cksum.zc_word[0], stream_cksum.zc_word[1],
272 stream_cksum.zc_word[2], stream_cksum.zc_word[3]);
273 if ((write(outfd, drr,
274 sizeof (dmu_replay_record_t))) == -1)
275 goto out;
276 break;
277 }
278
279 case DRR_OBJECT:
280 {
281 if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
282 &stream_cksum, outfd) == -1)
283 goto out;
284 if (drro->drr_bonuslen > 0) {
285 (void) ssread(buf,
286 P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8),
287 ofp);
288 if (cksum_and_write(buf,
289 P2ROUNDUP((uint64_t)drro->drr_bonuslen, 8),
290 &stream_cksum, outfd) == -1)
291 goto out;
292 }
293 break;
294 }
295
296 case DRR_FREEOBJECTS:
297 {
298 if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
299 &stream_cksum, outfd) == -1)
300 goto out;
301 break;
302 }
303
304 case DRR_WRITE:
305 {
306 dataref_t dataref;
307
308 (void) ssread(buf, drrw->drr_length, ofp);
309 /*
310 * If the block doesn't already have a dedup
311 * checksum, calculate one.
312 */
313 if (ZIO_CHECKSUM_EQUAL(drrw->drr_blkcksum,
314 zero_cksum)) {
315 SHA256_CTX ctx;
316 zio_cksum_t tmpsha256;
317
318 SHA256Init(&ctx);
319 SHA256Update(&ctx, buf, drrw->drr_length);
320 SHA256Final(&tmpsha256, &ctx);
321 drrw->drr_blkcksum.zc_word[0] =
322 BE_64(tmpsha256.zc_word[0]);
323 drrw->drr_blkcksum.zc_word[1] =
324 BE_64(tmpsha256.zc_word[1]);
325 drrw->drr_blkcksum.zc_word[2] =
326 BE_64(tmpsha256.zc_word[2]);
327 drrw->drr_blkcksum.zc_word[3] =
328 BE_64(tmpsha256.zc_word[3]);
329 }
330
331 dataref.ref_guid = drrw->drr_toguid;
332 dataref.ref_object = drrw->drr_object;
333 dataref.ref_offset = drrw->drr_offset;
334
335 if (ddt_update(dda->dedup_hdl, &ddt,
336 &drrw->drr_blkcksum, &dataref)) {
337 /* block already present in stream */
338 wbr_drrr->drr_object = drrw->drr_object;
339 wbr_drrr->drr_offset = drrw->drr_offset;
340 wbr_drrr->drr_length = drrw->drr_length;
341 wbr_drrr->drr_toguid = drrw->drr_toguid;
342 wbr_drrr->drr_refguid = dataref.ref_guid;
343 wbr_drrr->drr_refobject =
344 dataref.ref_object;
345 wbr_drrr->drr_refoffset =
346 dataref.ref_offset;
347
348 wbr_drrr->drr_blkcksum = drrw->drr_blkcksum;
349
350 if (cksum_and_write(&wbr_drr,
351 sizeof (dmu_replay_record_t), &stream_cksum,
352 outfd) == -1)
353 goto out;
354 } else {
355 /* block not previously seen */
356 if (cksum_and_write(drr,
357 sizeof (dmu_replay_record_t), &stream_cksum,
358 outfd) == -1)
359 goto out;
360 if (cksum_and_write(buf,
361 drrw->drr_length,
362 &stream_cksum, outfd) == -1)
363 goto out;
364 }
365 break;
366 }
367
368 case DRR_FREE:
369 {
370 if (cksum_and_write(drr, sizeof (dmu_replay_record_t),
371 &stream_cksum, outfd) == -1)
372 goto out;
373 break;
374 }
375
376 default:
377 (void) printf("INVALID record type 0x%x\n",
378 drr->drr_type);
379 /* should never happen, so assert */
380 assert(B_FALSE);
381 }
382 }
383 out:
384 umem_cache_destroy(ddt.ddecache);
385 free(ddt.dedup_hash_array);
386 free(buf);
387 (void) fclose(ofp);
388
389 return (NULL);
390 }
48 391
49 /* 392 /*
50 * Routines for dealing with the AVL tree of fs-nvlists 393 * Routines for dealing with the AVL tree of fs-nvlists
51 */ 394 */
52 typedef struct fsavl_node { 395 typedef struct fsavl_node {
449 */ 792 */
450 typedef struct send_dump_data { 793 typedef struct send_dump_data {
451 /* these are all just the short snapname (the part after the @) */ 794 /* these are all just the short snapname (the part after the @) */
452 const char *fromsnap; 795 const char *fromsnap;
453 const char *tosnap; 796 const char *tosnap;
454 char lastsnap[ZFS_MAXNAMELEN]; 797 char prevsnap[ZFS_MAXNAMELEN];
455 boolean_t seenfrom, seento, replicate, doall, fromorigin; 798 boolean_t seenfrom, seento, replicate, doall, fromorigin;
456 boolean_t verbose; 799 boolean_t verbose;
457 int outfd; 800 int outfd;
458 boolean_t err; 801 boolean_t err;
459 nvlist_t *fss; 802 nvlist_t *fss;
460 avl_tree_t *fsavl; 803 avl_tree_t *fsavl;
804 snapfilter_cb_t *filter_cb;
805 void *filter_cb_arg;
461 } send_dump_data_t; 806 } send_dump_data_t;
462 807
463 /* 808 /*
464 * Dumps a backup of the given snapshot (incremental from fromsnap if it's not 809 * Dumps a backup of the given snapshot (incremental from fromsnap if it's not
465 * NULL) to the file descriptor specified by outfd. 810 * NULL) to the file descriptor specified by outfd.
533 thissnap = strchr(zhp->zfs_name, '@') + 1; 878 thissnap = strchr(zhp->zfs_name, '@') + 1;
534 879
535 if (sdd->fromsnap && !sdd->seenfrom && 880 if (sdd->fromsnap && !sdd->seenfrom &&
536 strcmp(sdd->fromsnap, thissnap) == 0) { 881 strcmp(sdd->fromsnap, thissnap) == 0) {
537 sdd->seenfrom = B_TRUE; 882 sdd->seenfrom = B_TRUE;
538 (void) strcpy(sdd->lastsnap, thissnap); 883 (void) strcpy(sdd->prevsnap, thissnap);
539 zfs_close(zhp); 884 zfs_close(zhp);
540 return (0); 885 return (0);
541 } 886 }
542 887
543 if (sdd->seento || !sdd->seenfrom) { 888 if (sdd->seento || !sdd->seenfrom) {
544 zfs_close(zhp); 889 zfs_close(zhp);
545 return (0); 890 return (0);
546 } 891 }
547 892
893 if (strcmp(sdd->tosnap, thissnap) == 0)
894 sdd->seento = B_TRUE;
895
896 /*
897 * If a filter function exists, call it to determine whether
898 * this snapshot will be sent.
899 */
900 if (sdd->filter_cb != NULL &&
901 sdd->filter_cb(zhp, sdd->filter_cb_arg) == B_FALSE) {
902 /*
903 * This snapshot is filtered out. Don't send it, and don't
904 * set prevsnap, so it will be as if this snapshot didn't
905 * exist, and the next accepted snapshot will be sent as
906 * an incremental from the last accepted one, or as the
907 * first (and full) snapshot in the case of a replication,
908 * non-incremental send.
909 */
910 zfs_close(zhp);
911 return (0);
912 }
913
548 /* send it */ 914 /* send it */
549 if (sdd->verbose) { 915 if (sdd->verbose) {
550 (void) fprintf(stderr, "sending from @%s to %s\n", 916 (void) fprintf(stderr, "sending from @%s to %s\n",
551 sdd->lastsnap, zhp->zfs_name); 917 sdd->prevsnap, zhp->zfs_name);
552 } 918 }
553 919
554 err = dump_ioctl(zhp, sdd->lastsnap, 920 err = dump_ioctl(zhp, sdd->prevsnap,
555 sdd->lastsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate), 921 sdd->prevsnap[0] == '\0' && (sdd->fromorigin || sdd->replicate),
556 sdd->outfd); 922 sdd->outfd);
557 923
558 if (!sdd->seento && strcmp(sdd->tosnap, thissnap) == 0) 924 (void) strcpy(sdd->prevsnap, thissnap);
559 sdd->seento = B_TRUE;
560
561 (void) strcpy(sdd->lastsnap, thissnap);
562 zfs_close(zhp); 925 zfs_close(zhp);
563 return (err); 926 return (err);
564 } 927 }
565 928
566 static int 929 static int
596 missingfrom = B_TRUE; 959 missingfrom = B_TRUE;
597 } 960 }
598 } 961 }
599 962
600 if (sdd->doall) { 963 if (sdd->doall) {
601 sdd->seenfrom = sdd->seento = sdd->lastsnap[0] = 0; 964 sdd->seenfrom = sdd->seento = sdd->prevsnap[0] = 0;
602 if (sdd->fromsnap == NULL || missingfrom) 965 if (sdd->fromsnap == NULL || missingfrom)
603 sdd->seenfrom = B_TRUE; 966 sdd->seenfrom = B_TRUE;
604 967
605 rv = zfs_iter_snapshots_sorted(zhp, dump_snapshot, arg); 968 rv = zfs_iter_snapshots_sorted(zhp, dump_snapshot, arg);
606 if (!sdd->seenfrom) { 969 if (!sdd->seenfrom) {
633 zfs_get_name(zhp), sdd->tosnap); 996 zfs_get_name(zhp), sdd->tosnap);
634 snapzhp = zfs_open(zhp->zfs_hdl, snapname, ZFS_TYPE_SNAPSHOT); 997 snapzhp = zfs_open(zhp->zfs_hdl, snapname, ZFS_TYPE_SNAPSHOT);
635 if (snapzhp == NULL) { 998 if (snapzhp == NULL) {
636 rv = -1; 999 rv = -1;
637 } else { 1000 } else {
638 rv = dump_ioctl(snapzhp, 1001 if (sdd->filter_cb == NULL ||
639 missingfrom ? NULL : sdd->fromsnap, 1002 sdd->filter_cb(snapzhp, sdd->filter_cb_arg) ==
640 sdd->fromorigin || missingfrom, 1003 B_TRUE) {
641 sdd->outfd); 1004 rv = dump_ioctl(snapzhp,
1005 missingfrom ? NULL : sdd->fromsnap,
1006 sdd->fromorigin || missingfrom,
1007 sdd->outfd);
1008 }
642 sdd->seento = B_TRUE; 1009 sdd->seento = B_TRUE;
643 zfs_close(snapzhp); 1010 zfs_close(snapzhp);
644 } 1011 }
645 } 1012 }
646 1013
676 (void) nvlist_lookup_uint64(fslist, "origin", &origin_guid); 1043 (void) nvlist_lookup_uint64(fslist, "origin", &origin_guid);
677 1044
678 origin_nv = fsavl_find(sdd->fsavl, origin_guid, NULL); 1045 origin_nv = fsavl_find(sdd->fsavl, origin_guid, NULL);
679 if (origin_nv && 1046 if (origin_nv &&
680 nvlist_lookup_boolean(origin_nv, "sent") == ENOENT) { 1047 nvlist_lookup_boolean(origin_nv, "sent") == ENOENT) {
681 /* 1048 /*
682 * origin has not been sent yet; 1049 * origin has not been sent yet;
683 * skip this clone. 1050 * skip this clone.
684 */ 1051 */
685 needagain = B_TRUE; 1052 needagain = B_TRUE;
686 continue; 1053 continue;
687 } 1054 }
688 1055
689 zhp = zfs_open(rzhp->zfs_hdl, fsname, ZFS_TYPE_DATASET); 1056 zhp = zfs_open(rzhp->zfs_hdl, fsname, ZFS_TYPE_DATASET);
690 if (zhp == NULL) 1057 if (zhp == NULL)
691 return (-1); 1058 return (-1);
712 * - from the origin of the dataset identified by zhp, which must 1079 * - from the origin of the dataset identified by zhp, which must
713 * be a clone. In this case, "fromsnap" is null and "fromorigin" 1080 * be a clone. In this case, "fromsnap" is null and "fromorigin"
714 * is TRUE. 1081 * is TRUE.
715 * 1082 *
716 * The send stream is recursive (i.e. dumps a hierarchy of snapshots) and 1083 * The send stream is recursive (i.e. dumps a hierarchy of snapshots) and
717 * uses a special header (with a version field of DMU_BACKUP_HEADER_VERSION) 1084 * uses a special header (with a hdrtype field of DMU_COMPOUNDSTREAM)
718 * if "replicate" is set. If "doall" is set, dump all the intermediate 1085 * if "replicate" is set. If "doall" is set, dump all the intermediate
719 * snapshots. The DMU_BACKUP_HEADER_VERSION header is used in the "doall" 1086 * snapshots. The DMU_COMPOUNDSTREAM header is used in the "doall"
720 * case too. 1087 * case too.
721 */ 1088 */
722 int 1089 int
723 zfs_send(zfs_handle_t *zhp, const char *fromsnap, const char *tosnap, 1090 zfs_send(zfs_handle_t *zhp, const char *fromsnap, const char *tosnap,
724 boolean_t replicate, boolean_t doall, boolean_t fromorigin, 1091 sendflags_t flags, int outfd, snapfilter_cb_t filter_func,
725 boolean_t verbose, int outfd) 1092 void *cb_arg)
726 { 1093 {
727 char errbuf[1024]; 1094 char errbuf[1024];
728 send_dump_data_t sdd = { 0 }; 1095 send_dump_data_t sdd = { 0 };
729 int err; 1096 int err;
730 nvlist_t *fss = NULL; 1097 nvlist_t *fss = NULL;
731 avl_tree_t *fsavl = NULL; 1098 avl_tree_t *fsavl = NULL;
732 char holdtag[128]; 1099 char holdtag[128];
733 static uint64_t holdseq; 1100 static uint64_t holdseq;
734 int spa_version; 1101 int spa_version;
735 boolean_t holdsnaps = B_FALSE; 1102 boolean_t holdsnaps = B_FALSE;
1103 pthread_t tid;
1104 int pipefd[2];
1105 dedup_arg_t dda = { 0 };
1106 int featureflags = 0;
736 1107
737 (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN, 1108 (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
738 "cannot send '%s'"), zhp->zfs_name); 1109 "cannot send '%s'"), zhp->zfs_name);
739 1110
740 if (fromsnap && fromsnap[0] == '\0') { 1111 if (fromsnap && fromsnap[0] == '\0') {
745 1116
746 if (zfs_spa_version(zhp, &spa_version) == 0 && 1117 if (zfs_spa_version(zhp, &spa_version) == 0 &&
747 spa_version >= SPA_VERSION_USERREFS) 1118 spa_version >= SPA_VERSION_USERREFS)
748 holdsnaps = B_TRUE; 1119 holdsnaps = B_TRUE;
749 1120
750 if (replicate || doall) { 1121 if (flags.dedup) {
1122 featureflags |= DMU_BACKUP_FEATURE_DEDUP;
1123 if (err = pipe(pipefd)) {
1124 zfs_error_aux(zhp->zfs_hdl, strerror(errno));
1125 return (zfs_error(zhp->zfs_hdl, EZFS_PIPEFAILED,
1126 errbuf));
1127 }
1128 dda.outputfd = outfd;
1129 dda.inputfd = pipefd[1];
1130 dda.dedup_hdl = zhp->zfs_hdl;
1131 if (err = pthread_create(&tid, NULL, cksummer, &dda)) {
1132 (void) close(pipefd[0]);
1133 (void) close(pipefd[1]);
1134 zfs_error_aux(zhp->zfs_hdl, strerror(errno));
1135 return (zfs_error(zhp->zfs_hdl,
1136 EZFS_THREADCREATEFAILED, errbuf));
1137 }
1138 }
1139
1140 if (flags.replicate || flags.doall) {
751 dmu_replay_record_t drr = { 0 }; 1141 dmu_replay_record_t drr = { 0 };
752 char *packbuf = NULL; 1142 char *packbuf = NULL;
753 size_t buflen = 0; 1143 size_t buflen = 0;
754 zio_cksum_t zc = { 0 }; 1144 zio_cksum_t zc = { 0 };
755 1145
756 assert(fromsnap || doall); 1146 assert(fromsnap || flags.doall);
757 1147
758 if (holdsnaps) { 1148 if (holdsnaps) {
759 (void) snprintf(holdtag, sizeof (holdtag), 1149 (void) snprintf(holdtag, sizeof (holdtag),
760 ".send-%d-%llu", getpid(), (u_longlong_t)holdseq); 1150 ".send-%d-%llu", getpid(), (u_longlong_t)holdseq);
761 ++holdseq; 1151 ++holdseq;
762 err = zfs_hold_range(zhp, fromsnap, tosnap, 1152 err = zfs_hold_range(zhp, fromsnap, tosnap,
763 holdtag, B_TRUE); 1153 holdtag, B_TRUE);
764 if (err) 1154 if (err)
765 return (err); 1155 goto err_out;
766 } 1156 }
767 if (replicate) { 1157
1158
1159 if (flags.replicate) {
768 nvlist_t *hdrnv; 1160 nvlist_t *hdrnv;
769 1161
770 VERIFY(0 == nvlist_alloc(&hdrnv, NV_UNIQUE_NAME, 0)); 1162 VERIFY(0 == nvlist_alloc(&hdrnv, NV_UNIQUE_NAME, 0));
771 if (fromsnap) { 1163 if (fromsnap) {
772 VERIFY(0 == nvlist_add_string(hdrnv, 1164 VERIFY(0 == nvlist_add_string(hdrnv,
779 if (err) { 1171 if (err) {
780 if (holdsnaps) { 1172 if (holdsnaps) {
781 (void) zfs_release_range(zhp, fromsnap, 1173 (void) zfs_release_range(zhp, fromsnap,
782 tosnap, holdtag); 1174 tosnap, holdtag);
783 } 1175 }
784 return (err); 1176 goto err_out;
785 } 1177 }
786 VERIFY(0 == nvlist_add_nvlist(hdrnv, "fss", fss)); 1178 VERIFY(0 == nvlist_add_nvlist(hdrnv, "fss", fss));
787 err = nvlist_pack(hdrnv, &packbuf, &buflen, 1179 err = nvlist_pack(hdrnv, &packbuf, &buflen,
788 NV_ENCODE_XDR, 0); 1180 NV_ENCODE_XDR, 0);
789 nvlist_free(hdrnv); 1181 nvlist_free(hdrnv);
792 nvlist_free(fss); 1184 nvlist_free(fss);
793 if (holdsnaps) { 1185 if (holdsnaps) {
794 (void) zfs_release_range(zhp, fromsnap, 1186 (void) zfs_release_range(zhp, fromsnap,
795 tosnap, holdtag); 1187 tosnap, holdtag);
796 } 1188 }
797 return (zfs_standard_error(zhp->zfs_hdl, 1189 goto stderr_out;
798 err, errbuf));
799 } 1190 }
800 } 1191 }
801 1192
802 /* write first begin record */ 1193 /* write first begin record */
803 drr.drr_type = DRR_BEGIN; 1194 drr.drr_type = DRR_BEGIN;
804 drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC; 1195 drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
805 drr.drr_u.drr_begin.drr_version = DMU_BACKUP_HEADER_VERSION; 1196 DMU_SET_STREAM_HDRTYPE(drr.drr_u.drr_begin.drr_versioninfo,
1197 DMU_COMPOUNDSTREAM);
1198 DMU_SET_FEATUREFLAGS(drr.drr_u.drr_begin.drr_versioninfo,
1199 featureflags);
806 (void) snprintf(drr.drr_u.drr_begin.drr_toname, 1200 (void) snprintf(drr.drr_u.drr_begin.drr_toname,
807 sizeof (drr.drr_u.drr_begin.drr_toname), 1201 sizeof (drr.drr_u.drr_begin.drr_toname),
808 "%s@%s", zhp->zfs_name, tosnap); 1202 "%s@%s", zhp->zfs_name, tosnap);
809 drr.drr_payloadlen = buflen; 1203 drr.drr_payloadlen = buflen;
810 fletcher_4_incremental_native(&drr, sizeof (drr), &zc); 1204 err = cksum_and_write(&drr, sizeof (drr), &zc, outfd);
811 err = write(outfd, &drr, sizeof (drr));
812 1205
813 /* write header nvlist */ 1206 /* write header nvlist */
814 if (err != -1) { 1207 if (err != -1 && flags.replicate) {
815 fletcher_4_incremental_native(packbuf, buflen, &zc); 1208 err = cksum_and_write(packbuf, buflen, &zc, outfd);
816 err = write(outfd, packbuf, buflen);
817 } 1209 }
818 free(packbuf); 1210 free(packbuf);
819 if (err == -1) { 1211 if (err == -1) {
820 fsavl_destroy(fsavl); 1212 fsavl_destroy(fsavl);
821 nvlist_free(fss); 1213 nvlist_free(fss);
822 if (holdsnaps) { 1214 if (holdsnaps) {
823 (void) zfs_release_range(zhp, fromsnap, tosnap, 1215 (void) zfs_release_range(zhp, fromsnap, tosnap,
824 holdtag); 1216 holdtag);
825 } 1217 }
826 return (zfs_standard_error(zhp->zfs_hdl, 1218 err = errno;
827 errno, errbuf)); 1219 goto stderr_out;
828 } 1220 }
829 1221
830 /* write end record */ 1222 /* write end record */
831 if (err != -1) { 1223 if (err != -1) {
832 bzero(&drr, sizeof (drr)); 1224 bzero(&drr, sizeof (drr));
838 nvlist_free(fss); 1230 nvlist_free(fss);
839 if (holdsnaps) { 1231 if (holdsnaps) {
840 (void) zfs_release_range(zhp, fromsnap, 1232 (void) zfs_release_range(zhp, fromsnap,
841 tosnap, holdtag); 1233 tosnap, holdtag);
842 } 1234 }
843 return (zfs_standard_error(zhp->zfs_hdl, 1235 err = errno;
844 errno, errbuf)); 1236 goto stderr_out;
845 } 1237 }
846 } 1238 }
847 } 1239 }
848 1240
849 /* dump each stream */ 1241 /* dump each stream */
850 sdd.fromsnap = fromsnap; 1242 sdd.fromsnap = fromsnap;
851 sdd.tosnap = tosnap; 1243 sdd.tosnap = tosnap;
852 sdd.outfd = outfd; 1244 if (flags.dedup)
853 sdd.replicate = replicate; 1245 sdd.outfd = pipefd[0];
854 sdd.doall = doall; 1246 else
855 sdd.fromorigin = fromorigin; 1247 sdd.outfd = outfd;
1248 sdd.replicate = flags.replicate;
1249 sdd.doall = flags.doall;
1250 sdd.fromorigin = flags.fromorigin;
856 sdd.fss = fss; 1251 sdd.fss = fss;
857 sdd.fsavl = fsavl; 1252 sdd.fsavl = fsavl;
858 sdd.verbose = verbose; 1253 sdd.verbose = flags.verbose;
1254 sdd.filter_cb = filter_func;
1255 sdd.filter_cb_arg = cb_arg;
859 err = dump_filesystems(zhp, &sdd); 1256 err = dump_filesystems(zhp, &sdd);
860 fsavl_destroy(fsavl); 1257 fsavl_destroy(fsavl);
861 nvlist_free(fss); 1258 nvlist_free(fss);
862 1259
863 if (replicate || doall) { 1260 if (flags.dedup) {
1261 (void) close(pipefd[0]);
1262 (void) pthread_join(tid, NULL);
1263 }
1264 if (flags.replicate || flags.doall) {
864 /* 1265 /*
865 * write final end record. NB: want to do this even if 1266 * write final end record. NB: want to do this even if
866 * there was some error, because it might not be totally 1267 * there was some error, because it might not be totally
867 * failed. 1268 * failed.
868 */ 1269 */
877 errno, errbuf)); 1278 errno, errbuf));
878 } 1279 }
879 } 1280 }
880 1281
881 return (err || sdd.err); 1282 return (err || sdd.err);
1283
1284 stderr_out:
1285 err = zfs_standard_error(zhp->zfs_hdl, err, errbuf);
1286 err_out:
1287 if (flags.dedup) {
1288 (void) pthread_cancel(tid);
1289 (void) pthread_join(tid, NULL);
1290 (void) close(pipefd[0]);
1291 }
1292 return (err);
882 } 1293 }
883 1294
884 /* 1295 /*
885 * Routines specific to "zfs recv" 1296 * Routines specific to "zfs recv"
886 */ 1297 */
1457 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); 1868 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
1458 } 1869 }
1459 1870
1460 assert(drr->drr_type == DRR_BEGIN); 1871 assert(drr->drr_type == DRR_BEGIN);
1461 assert(drr->drr_u.drr_begin.drr_magic == DMU_BACKUP_MAGIC); 1872 assert(drr->drr_u.drr_begin.drr_magic == DMU_BACKUP_MAGIC);
1462 assert(drr->drr_u.drr_begin.drr_version == DMU_BACKUP_HEADER_VERSION); 1873 assert(DMU_GET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo) ==
1874 DMU_COMPOUNDSTREAM);
1463 1875
1464 /* 1876 /*
1465 * Read in the nvlist from the stream. 1877 * Read in the nvlist from the stream.
1466 */ 1878 */
1467 if (drr->drr_payloadlen != 0) { 1879 if (drr->drr_payloadlen != 0) {
1580 static int 1992 static int
1581 recv_skip(libzfs_handle_t *hdl, int fd, boolean_t byteswap) 1993 recv_skip(libzfs_handle_t *hdl, int fd, boolean_t byteswap)
1582 { 1994 {
1583 dmu_replay_record_t *drr; 1995 dmu_replay_record_t *drr;
1584 void *buf = malloc(1<<20); 1996 void *buf = malloc(1<<20);
1997 char errbuf[1024];
1998
1999 (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
2000 "cannot receive:"));
1585 2001
1586 /* XXX would be great to use lseek if possible... */ 2002 /* XXX would be great to use lseek if possible... */
1587 drr = buf; 2003 drr = buf;
1588 2004
1589 while (recv_read(hdl, fd, drr, sizeof (dmu_replay_record_t), 2005 while (recv_read(hdl, fd, drr, sizeof (dmu_replay_record_t),
1592 drr->drr_type = BSWAP_32(drr->drr_type); 2008 drr->drr_type = BSWAP_32(drr->drr_type);
1593 2009
1594 switch (drr->drr_type) { 2010 switch (drr->drr_type) {
1595 case DRR_BEGIN: 2011 case DRR_BEGIN:
1596 /* NB: not to be used on v2 stream packages */ 2012 /* NB: not to be used on v2 stream packages */
1597 assert(drr->drr_payloadlen == 0); 2013 if (drr->drr_payloadlen != 0) {
2014 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
2015 "invalid substream header"));
2016 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
2017 }
1598 break; 2018 break;
1599 2019
1600 case DRR_END: 2020 case DRR_END:
1601 free(buf); 2021 free(buf);
1602 return (0); 2022 return (0);
1619 } 2039 }
1620 (void) recv_read(hdl, fd, buf, 2040 (void) recv_read(hdl, fd, buf,
1621 drr->drr_u.drr_write.drr_length, B_FALSE, NULL); 2041 drr->drr_u.drr_write.drr_length, B_FALSE, NULL);
1622 break; 2042 break;
1623 2043
2044 case DRR_WRITE_BYREF:
1624 case DRR_FREEOBJECTS: 2045 case DRR_FREEOBJECTS:
1625 case DRR_FREE: 2046 case DRR_FREE:
1626 break; 2047 break;
1627 2048
1628 default: 2049 default:
1629 assert(!"invalid record type"); 2050 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
2051 "invalid record type"));
2052 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
1630 } 2053 }
1631 } 2054 }
1632 2055
1633 free(buf); 2056 free(buf);
1634 return (-1); 2057 return (-1);
1726 choplen = strlen(chopprefix); 2149 choplen = strlen(chopprefix);
1727 2150
1728 /* 2151 /*
1729 * Determine name of destination snapshot, store in zc_value. 2152 * Determine name of destination snapshot, store in zc_value.
1730 */ 2153 */
2154 (void) strcpy(zc.zc_top_ds, tosnap);
1731 (void) strcpy(zc.zc_value, tosnap); 2155 (void) strcpy(zc.zc_value, tosnap);
1732 (void) strncat(zc.zc_value, drrb->drr_toname+choplen, 2156 (void) strncat(zc.zc_value, drrb->drr_toname+choplen,
1733 sizeof (zc.zc_value)); 2157 sizeof (zc.zc_value));
1734 if (!zfs_name_valid(zc.zc_value, ZFS_TYPE_SNAPSHOT)) { 2158 if (!zfs_name_valid(zc.zc_value, ZFS_TYPE_SNAPSHOT)) {
1735 zcmd_free_nvlists(&zc); 2159 zcmd_free_nvlists(&zc);
2066 int err; 2490 int err;
2067 dmu_replay_record_t drr, drr_noswap; 2491 dmu_replay_record_t drr, drr_noswap;
2068 struct drr_begin *drrb = &drr.drr_u.drr_begin; 2492 struct drr_begin *drrb = &drr.drr_u.drr_begin;
2069 char errbuf[1024]; 2493 char errbuf[1024];
2070 zio_cksum_t zcksum = { 0 }; 2494 zio_cksum_t zcksum = { 0 };
2495 uint64_t featureflags;
2496 int hdrtype;
2071 2497
2072 (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN, 2498 (void) snprintf(errbuf, sizeof (errbuf), dgettext(TEXT_DOMAIN,
2073 "cannot receive")); 2499 "cannot receive"));
2074 2500
2075 if (flags.isprefix && 2501 if (flags.isprefix &&
2103 flags.byteswap = B_TRUE; 2529 flags.byteswap = B_TRUE;
2104 2530
2105 drr.drr_type = BSWAP_32(drr.drr_type); 2531 drr.drr_type = BSWAP_32(drr.drr_type);
2106 drr.drr_payloadlen = BSWAP_32(drr.drr_payloadlen); 2532 drr.drr_payloadlen = BSWAP_32(drr.drr_payloadlen);
2107 drrb->drr_magic = BSWAP_64(drrb->drr_magic); 2533 drrb->drr_magic = BSWAP_64(drrb->drr_magic);
2108 drrb->drr_version = BSWAP_64(drrb->drr_version); 2534 drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo);
2109 drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time); 2535 drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
2110 drrb->drr_type = BSWAP_32(drrb->drr_type); 2536 drrb->drr_type = BSWAP_32(drrb->drr_type);
2111 drrb->drr_flags = BSWAP_32(drrb->drr_flags); 2537 drrb->drr_flags = BSWAP_32(drrb->drr_flags);
2112 drrb->drr_toguid = BSWAP_64(drrb->drr_toguid); 2538 drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
2113 drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid); 2539 drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
2117 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid " 2543 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid "
2118 "stream (bad magic number)")); 2544 "stream (bad magic number)"));
2119 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); 2545 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
2120 } 2546 }
2121 2547
2548 featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
2549 hdrtype = DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo);
2550
2551 if (!DMU_STREAM_SUPPORTED(featureflags) ||
2552 (hdrtype != DMU_SUBSTREAM && hdrtype != DMU_COMPOUNDSTREAM)) {
2553 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
2554 "stream has unsupported feature, feature flags = %lx"),
2555 featureflags);
2556 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
2557 }
2558
2122 if (strchr(drrb->drr_toname, '@') == NULL) { 2559 if (strchr(drrb->drr_toname, '@') == NULL) {
2123 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid " 2560 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN, "invalid "
2124 "stream (bad snapshot name)")); 2561 "stream (bad snapshot name)"));
2125 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf)); 2562 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
2126 } 2563 }
2127 2564
2128 if (drrb->drr_version == DMU_BACKUP_STREAM_VERSION) { 2565 if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) == DMU_SUBSTREAM) {
2129 return (zfs_receive_one(hdl, infd, tosnap, flags, 2566 return (zfs_receive_one(hdl, infd, tosnap, flags,
2130 &drr, &drr_noswap, stream_avl, top_zfs)); 2567 &drr, &drr_noswap, stream_avl, top_zfs));
2131 } else if (drrb->drr_version == DMU_BACKUP_HEADER_VERSION) { 2568 } else { /* must be DMU_COMPOUNDSTREAM */
2569 assert(DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
2570 DMU_COMPOUNDSTREAM);
2132 return (zfs_receive_package(hdl, infd, tosnap, flags, 2571 return (zfs_receive_package(hdl, infd, tosnap, flags,
2133 &drr, &zcksum, top_zfs)); 2572 &drr, &zcksum, top_zfs));
2134 } else {
2135 zfs_error_aux(hdl, dgettext(TEXT_DOMAIN,
2136 "stream is unsupported version %llu"),
2137 drrb->drr_version);
2138 return (zfs_error(hdl, EZFS_BADSTREAM, errbuf));
2139 } 2573 }
2140 } 2574 }
2141 2575
2142 /* 2576 /*
2143 * Restores a backup of tosnap from the file descriptor specified by infd. 2577 * Restores a backup of tosnap from the file descriptor specified by infd.