changeset 10526:a1c57e83a4db

6870528 writing comstar property larger than 1k disk pool size incorrectly modifies property data 6879871 Repeated concurrent updates to COMSTAR metadata fails
author tim szeto <Tim.Szeto@Sun.COM>
date Mon, 14 Sep 2009 15:13:58 -0600
parents c8fd4dff34d8
children 239e78cbcf46
files usr/src/uts/common/io/comstar/lu/stmf_sbd/sbd.c usr/src/uts/common/io/comstar/lu/stmf_sbd/stmf_sbd.h
diffstat 2 files changed, 187 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/usr/src/uts/common/io/comstar/lu/stmf_sbd/sbd.c	Mon Sep 14 13:03:18 2009 -0700
+++ b/usr/src/uts/common/io/comstar/lu/stmf_sbd/sbd.c	Mon Sep 14 15:13:58 2009 -0600
@@ -870,10 +870,12 @@
 	sm_section_hdr_t sms;
 	int alloced = 0;
 
+	mutex_enter(&sl->sl_metadata_lock);
 	if (((*ppsms) == NULL) || ((*ppsms)->sms_offset == 0)) {
 		bzero(&sms, sizeof (sm_section_hdr_t));
 		sms.sms_id = sms_id;
 		if ((ret = sbd_load_section_hdr(sl, &sms)) != SBD_SUCCESS) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (ret);
 		} else {
 			if ((*ppsms) == NULL) {
@@ -897,6 +899,7 @@
 				ret = SBD_META_CORRUPTED;
 		}
 	}
+	mutex_exit(&sl->sl_metadata_lock);
 
 	if ((ret != SBD_SUCCESS) && alloced)
 		kmem_free(*ppsms, sms.sms_size);
@@ -904,32 +907,67 @@
 }
 
 sbd_status_t
+sbd_load_section_hdr_unbuffered(sbd_lu_t *sl, sm_section_hdr_t *sms)
+{
+	sbd_status_t	ret;
+
+	/*
+	 * Bypass buffering and re-read the meta data from permanent storage.
+	 */
+	if (sl->sl_flags & SL_ZFS_META) {
+		if ((ret = sbd_open_zfs_meta(sl)) != SBD_SUCCESS) {
+			return (ret);
+		}
+	}
+	/* Re-get the meta sizes into sl */
+	if ((ret = sbd_load_meta_start(sl)) != SBD_SUCCESS) {
+		return (ret);
+	}
+	return (sbd_load_section_hdr(sl, sms));
+}
+
+sbd_status_t
 sbd_write_meta_section(sbd_lu_t *sl, sm_section_hdr_t *sms)
 {
 	sm_section_hdr_t t;
 	uint64_t off, s;
 	uint64_t unused_start;
 	sbd_status_t ret;
+	sbd_status_t write_meta_ret = SBD_SUCCESS;
 	uint8_t *cb;
-	int update_meta_start = 0;
+	int meta_size_changed = 0;
+	sm_section_hdr_t sms_before_unused = {0};
 
+	mutex_enter(&sl->sl_metadata_lock);
 write_meta_section_again:
 	if (sms->sms_offset) {
-		/* Verify that size has not changed */
+		/*
+		 * If the section already exists and the size is the
+		 * same as this new data then overwrite in place. If
+		 * the sizes are different then mark the existing as
+		 * unused and look for free space.
+		 */
 		ret = sbd_read_meta(sl, sms->sms_offset, sizeof (t),
 		    (uint8_t *)&t);
-		if (ret != SBD_SUCCESS)
+		if (ret != SBD_SUCCESS) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (ret);
+		}
 		if (t.sms_data_order != SMS_DATA_ORDER) {
 			sbd_swap_section_hdr(&t);
 		}
 		if (t.sms_id != sms->sms_id) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (SBD_INVALID_ARG);
 		}
 		if (t.sms_size == sms->sms_size) {
-			return (sbd_write_meta(sl, sms->sms_offset,
-			    sms->sms_size, (uint8_t *)sms));
+			ret = sbd_write_meta(sl, sms->sms_offset,
+			    sms->sms_size, (uint8_t *)sms);
+			mutex_exit(&sl->sl_metadata_lock);
+			return (ret);
 		}
+		sms_before_unused = t;
+
 		t.sms_id = SMS_ID_UNUSED;
 		/*
 		 * For unused sections we only use chksum of the header. for
@@ -938,10 +976,13 @@
 		t.sms_chksum = sbd_calc_section_sum(&t, sizeof (t));
 		ret = sbd_write_meta(sl, t.sms_offset, sizeof (t),
 		    (uint8_t *)&t);
-		if (ret != SBD_SUCCESS)
+		if (ret != SBD_SUCCESS) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (ret);
+		}
 		sms->sms_offset = 0;
 	} else {
+		/* Section location is unknown, search for it. */
 		t.sms_id = sms->sms_id;
 		t.sms_data_order = SMS_DATA_ORDER;
 		ret = sbd_load_section_hdr(sl, &t);
@@ -951,29 +992,44 @@
 			    sbd_calc_section_sum(sms, sms->sms_size);
 			goto write_meta_section_again;
 		} else if (ret != SBD_NOT_FOUND) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (ret);
 		}
 	}
 
 	/*
 	 * At this point we know that section does not already exist.
-	 * find space large enough to hold the section or grow meta if
+	 * Find space large enough to hold the section or grow meta if
 	 * possible.
 	 */
 	unused_start = 0;
-	s = 0;
+	s = 0;	/* size of space found */
+
+	/*
+	 * Search all sections for unused space of sufficient size.
+	 * The first one found is taken. Contiguous unused sections
+	 * will be combined.
+	 */
 	for (off = sl->sl_meta_offset + sizeof (sbd_meta_start_t);
 	    off < sl->sl_meta_size_used; off += t.sms_size) {
 		ret = sbd_read_meta(sl, off, sizeof (t), (uint8_t *)&t);
-		if (ret != SBD_SUCCESS)
+		if (ret != SBD_SUCCESS) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (ret);
+		}
 		if (t.sms_data_order != SMS_DATA_ORDER)
 			sbd_swap_section_hdr(&t);
-		if (t.sms_size == 0)
+		if (t.sms_size == 0) {
+			mutex_exit(&sl->sl_metadata_lock);
 			return (SBD_META_CORRUPTED);
+		}
 		if (t.sms_id == SMS_ID_UNUSED) {
 			if (unused_start == 0)
 				unused_start = off;
+			/*
+			 * Calculate size of the unused space, break out
+			 * if it satisfies the requirement.
+			 */
 			s = t.sms_size - unused_start + off;
 			if ((s == sms->sms_size) || (s >= (sms->sms_size +
 			    sizeof (t)))) {
@@ -987,19 +1043,24 @@
 	}
 
 	off = (unused_start == 0) ? sl->sl_meta_size_used : unused_start;
+	/*
+	 * If none found, how much room is at the end?
+	 * See if the data can be expanded.
+	 */
 	if (s == 0) {
 		s = sl->sl_total_meta_size - off;
-		/* Lets see if we can expand the metadata */
 		if (s >= sms->sms_size || !(sl->sl_flags & SL_SHARED_META)) {
 			s = sms->sms_size;
-			update_meta_start = 1;
+			meta_size_changed = 1;
 		} else {
 			s = 0;
 		}
 	}
 
-	if (s == 0)
+	if (s == 0) {
+		mutex_exit(&sl->sl_metadata_lock);
 		return (SBD_ALLOC_FAILURE);
+	}
 
 	sms->sms_offset = off;
 	sms->sms_chksum = sbd_calc_section_sum(sms, sms->sms_size);
@@ -1017,27 +1078,98 @@
 		t.sms_chksum = sbd_calc_section_sum(&t, sizeof (t));
 		bcopy(&t, cb + sms->sms_size, sizeof (t));
 	}
-	ret = sbd_write_meta(sl, off, s, cb);
-	kmem_free(cb, s);
-	if (ret != SBD_SUCCESS)
-		return (ret);
-
-	if (update_meta_start) {
+	/*
+	 * Two write events & statuses take place. Failure writing the
+	 * meta section takes precedence, can possibly be rolled back,
+	 * & gets reported. Else return status from writing the meta start.
+	 */
+	ret = SBD_SUCCESS; /* Set a default, it's not always loaded below. */
+	if (meta_size_changed) {
+		uint64_t old_meta_size;
 		uint64_t old_sz_used = sl->sl_meta_size_used; /* save a copy */
-		sl->sl_meta_size_used = off + s;
-		s = sl->sl_total_meta_size; /* save a copy */
-		if (sl->sl_total_meta_size < sl->sl_meta_size_used) {
-			uint64_t meta_align =
-			    (((uint64_t)1) << sl->sl_meta_blocksize_shift) - 1;
-			sl->sl_total_meta_size = (sl->sl_meta_size_used +
-			    meta_align) & (~meta_align);
+		old_meta_size = sl->sl_total_meta_size; /* save a copy */
+
+		write_meta_ret = sbd_write_meta(sl, off, s, cb);
+		if (write_meta_ret == SBD_SUCCESS) {
+			sl->sl_meta_size_used = off + s;
+			if (sl->sl_total_meta_size < sl->sl_meta_size_used) {
+				uint64_t meta_align =
+				    (((uint64_t)1) <<
+				    sl->sl_meta_blocksize_shift) - 1;
+				sl->sl_total_meta_size =
+				    (sl->sl_meta_size_used + meta_align) &
+				    (~meta_align);
+			}
+			ret = sbd_write_meta_start(sl, sl->sl_total_meta_size,
+			    sl->sl_meta_size_used);
+			if (ret != SBD_SUCCESS) {
+				sl->sl_meta_size_used = old_sz_used;
+				sl->sl_total_meta_size = old_meta_size;
+			}
+		} else {
+			sl->sl_meta_size_used = old_sz_used;
+			sl->sl_total_meta_size = old_meta_size;
 		}
-		ret = sbd_write_meta_start(sl, sl->sl_total_meta_size,
-		    sl->sl_meta_size_used);
-		if (ret != SBD_SUCCESS) {
-			sl->sl_meta_size_used = old_sz_used;
-			sl->sl_total_meta_size = s;
+	} else {
+		write_meta_ret = sbd_write_meta(sl, off, s, cb);
+	}
+	if ((write_meta_ret != SBD_SUCCESS) &&
+	    (sms_before_unused.sms_offset != 0)) {
+		sm_section_hdr_t new_sms;
+		sm_section_hdr_t *unused_sms;
+		/*
+		 * On failure writing the meta section attempt to undo
+		 * the change to unused.
+		 * Re-read the meta data from permanent storage.
+		 * The section id can't exist for undo to be possible.
+		 * Read what should be the entire old section data and
+		 * insure the old data's still present by validating
+		 * against it's old checksum.
+		 */
+		new_sms.sms_id = sms->sms_id;
+		new_sms.sms_data_order = SMS_DATA_ORDER;
+		if (sbd_load_section_hdr_unbuffered(sl, &new_sms) !=
+		    SBD_NOT_FOUND) {
+			goto done;
+		}
+		unused_sms = kmem_zalloc(sms_before_unused.sms_size, KM_SLEEP);
+		if (sbd_read_meta(sl, sms_before_unused.sms_offset,
+		    sms_before_unused.sms_size,
+		    (uint8_t *)unused_sms) != SBD_SUCCESS) {
+			goto done;
 		}
+		if (unused_sms->sms_data_order != SMS_DATA_ORDER) {
+			sbd_swap_section_hdr(unused_sms);
+		}
+		if (unused_sms->sms_id != SMS_ID_UNUSED) {
+			goto done;
+		}
+		if (unused_sms->sms_offset != sms_before_unused.sms_offset) {
+			goto done;
+		}
+		if (unused_sms->sms_size != sms_before_unused.sms_size) {
+			goto done;
+		}
+		unused_sms->sms_id = sms_before_unused.sms_id;
+		if (sbd_calc_section_sum(unused_sms,
+		    sizeof (sm_section_hdr_t)) !=
+		    sbd_calc_section_sum(&sms_before_unused,
+		    sizeof (sm_section_hdr_t))) {
+			goto done;
+		}
+		unused_sms->sms_chksum =
+		    sbd_calc_section_sum(unused_sms, unused_sms->sms_size);
+		if (unused_sms->sms_chksum != sms_before_unused.sms_chksum) {
+			goto done;
+		}
+		(void) sbd_write_meta(sl, unused_sms->sms_offset,
+		    sizeof (sm_section_hdr_t), (uint8_t *)unused_sms);
+	}
+done:
+	mutex_exit(&sl->sl_metadata_lock);
+	kmem_free(cb, s);
+	if (write_meta_ret != SBD_SUCCESS) {
+		return (write_meta_ret);
 	}
 	return (ret);
 }
@@ -1337,6 +1469,7 @@
 	}
 	if (sl->sl_flags & SL_LINKED)
 		sbd_unlink_lu(sl);
+	mutex_destroy(&sl->sl_metadata_lock);
 	mutex_destroy(&sl->sl_lock);
 	rw_destroy(&sl->sl_pgr->pgr_lock);
 	if (sl->sl_serial_no_alloc_size) {
@@ -1419,6 +1552,7 @@
 	sl->sl_pgr = (sbd_pgr_t *)(sl + 1);
 	rw_init(&sl->sl_pgr->pgr_lock, NULL, RW_DRIVER, NULL);
 	mutex_init(&sl->sl_lock, NULL, MUTEX_DRIVER, NULL);
+	mutex_init(&sl->sl_metadata_lock, NULL, MUTEX_DRIVER, NULL);
 	p = ((char *)sl) + sizeof (sbd_lu_t) + sizeof (sbd_pgr_t);
 	sl->sl_data_filename = p;
 	(void) strcpy(sl->sl_data_filename, namebuf + slu->slu_data_fname_off);
@@ -1608,12 +1742,15 @@
 	}
 
 	/* Lets create the meta now */
+	mutex_enter(&sl->sl_metadata_lock);
 	if (sbd_write_meta_start(sl, sl->sl_total_meta_size,
 	    sizeof (sbd_meta_start_t)) != SBD_SUCCESS) {
+		mutex_exit(&sl->sl_metadata_lock);
 		*err_ret = SBD_RET_META_CREATION_FAILED;
 		ret = EIO;
 		goto scm_err_out;
 	}
+	mutex_exit(&sl->sl_metadata_lock);
 	sl->sl_meta_size_used = sl->sl_meta_offset + sizeof (sbd_meta_start_t);
 
 	if (sbd_write_lu_info(sl) != SBD_SUCCESS) {
@@ -1711,6 +1848,7 @@
 	sl->sl_name = sl->sl_meta_filename;
 	rw_init(&sl->sl_pgr->pgr_lock, NULL, RW_DRIVER, NULL);
 	mutex_init(&sl->sl_lock, NULL, MUTEX_DRIVER, NULL);
+	mutex_init(&sl->sl_metadata_lock, NULL, MUTEX_DRIVER, NULL);
 	sl->sl_trans_op = SL_OP_IMPORT_LU;
 	/* we're only loading the metadata */
 	if (!no_register) {
@@ -1759,7 +1897,9 @@
 	sl->sl_meta_offset = (sl->sl_flags & SL_ZFS_META) ? 0 : SBD_META_OFFSET;
 	sl->sl_flags |= SL_META_OPENED;
 
+	mutex_enter(&sl->sl_metadata_lock);
 	sret = sbd_load_meta_start(sl);
+	mutex_exit(&sl->sl_metadata_lock);
 	if (sret != SBD_SUCCESS) {
 		if (sret == SBD_META_CORRUPTED) {
 			*err_ret = SBD_RET_NO_META;
@@ -2565,8 +2705,12 @@
 	int		len;
 	char		*file;
 
-	if (sbd_create_zfs_meta_object(sl) == SBD_FAILURE)
-		return (SBD_FAILURE);
+	if (sl->sl_zfs_meta == NULL) {
+		if (sbd_create_zfs_meta_object(sl) == SBD_FAILURE)
+			return (SBD_FAILURE);
+	} else {
+		bzero(sl->sl_zfs_meta, (ZAP_MAXVALUELEN / 2));
+	}
 
 	rw_enter(&sl->sl_zfs_meta_lock, RW_WRITER);
 	file = sbd_get_zvol_name(sl);
@@ -2617,6 +2761,15 @@
 	if ((off + sz) > (ZAP_MAXVALUELEN / 2 - 1)) {
 		return (SBD_META_CORRUPTED);
 	}
+	if ((off + sz) > sl->sl_meta_size_used) {
+		sl->sl_meta_size_used = off + sz;
+		if (sl->sl_total_meta_size < sl->sl_meta_size_used) {
+			uint64_t meta_align =
+			    (((uint64_t)1) << sl->sl_meta_blocksize_shift) - 1;
+			sl->sl_total_meta_size = (sl->sl_meta_size_used +
+			    meta_align) & (~meta_align);
+		}
+	}
 	ptr = ah_meta = kmem_zalloc(ZAP_MAXVALUELEN, KM_SLEEP);
 	rw_enter(&sl->sl_zfs_meta_lock, RW_WRITER);
 	bcopy(buf, &sl->sl_zfs_meta[off], sz);
--- a/usr/src/uts/common/io/comstar/lu/stmf_sbd/stmf_sbd.h	Mon Sep 14 13:03:18 2009 -0700
+++ b/usr/src/uts/common/io/comstar/lu/stmf_sbd/stmf_sbd.h	Mon Sep 14 15:13:58 2009 -0600
@@ -189,6 +189,7 @@
 	char		*sl_name;		/* refers to meta or data */
 
 	/* Metadata */
+	kmutex_t	sl_metadata_lock;
 	char		*sl_alias;
 	char		*sl_meta_filename;	/* If applicable */
 	char		*sl_mgmt_url;