view src/objstore/obj_txn.c @ 855:6fa5635525c9

objstore: update obj_setattr to use cleanup txn entry callback Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Sat, 17 Dec 2022 14:35:00 -0500
parents dc84208232c6
children eb4cc36f8ed1
line wrap: on
line source

/*
 * Copyright (c) 2015-2020,2022 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <jeffpc/error.h>

#include "objstore_impl.h"

/*
 * Read up to `len` bytes of the object version `ver`, starting at byte
 * `offset`, into `_buf`.
 *
 * The request is clamped to the object's current size (ver->attrs.size).
 * Data is copied one page at a time from pages obtained via page_lookup().
 *
 * Returns:
 *   > 0  number of bytes copied (may be short if a later page lookup failed)
 *   0    `offset` is at or past the end of the object, or nothing was
 *        requested
 *   < 0  negated errno, if the lookup of the very first page failed
 */
ssize_t obj_read(struct objver *ver, void *_buf, size_t len,
		 uint64_t offset)
{
	uint8_t *buf = _buf;
	size_t orig_len;
	/*
	 * Must start out as 0: with a zero-length request the loop below
	 * never runs and this value is what gets returned.
	 */
	int ret = 0;

	if (offset >= ver->attrs.size)
		return 0;

	/* never read past the end of the object */
	len = MIN(len, ver->attrs.size - offset);
	orig_len = len;

	while (len) {
		const uint64_t pgoff = offset & PAGE_OFFSET_MASK;
		const size_t thislen = MIN(len, PAGE_SIZE - pgoff);
		struct page *page;

		page = page_lookup(ver, offset / PAGE_SIZE, PG_ALLOC | PG_FILL);
		if (IS_ERR(page)) {
			ret = PTR_ERR(page);
			break;
		}

		memcpy(buf, &page->ptr[pgoff], thislen);

		page_unlock(page);

		buf += thislen;
		len -= thislen;
		offset += thislen;
	}

	/* a partial read wins over the error that cut it short */
	if (orig_len != len)
		return orig_len - len;

	return ret;
}

/*
 * Perform callback for a setattr transaction entry.
 *
 * Passes the object version's current attributes, together with the mask
 * of changed attributes recorded in the entry, to the object's setattr op
 * and returns whatever that op returns.
 */
static int __obj_setattr_perform(struct txn *txn, struct txn_entry *entry)
{
	struct objver *target = entry->setattr.ver;
	int ret;

	ret = target->obj->ops->setattr(target, &target->attrs,
					entry->setattr.changed);

	return ret;
}

static void __obj_setattr_rollback(struct txn *txn, struct txn_entry *entry)
{
	entry->setattr.ver->attrs = entry->setattr.old_attrs;
}

/*
 * Cleanup callback for a setattr transaction entry.
 *
 * Detaches the entry's object version from the transaction.
 */
static void __obj_setattr_cleanup(struct txn *txn, struct txn_entry *entry)
{
	struct objver *target = entry->setattr.ver;

	txn_detach_objver(txn, target);
}

void obj_setattr(struct txn *txn, struct objver *ver, const struct nattr *attrs,
		 unsigned valid)
{
	struct txn_entry *entry;

	txn_attach_objver(txn, ver);

	if (valid & OBJ_ATTR_SIZE) {
		const uint64_t old = p2roundup(ver->attrs.size, PAGE_SIZE);
		const uint64_t new = p2roundup(attrs->size, PAGE_SIZE);

		/* invalidate all pages beyond the new EOF */
		if (old > new)
			page_inval_range(ver, new / PAGE_SIZE,
					 (old - new) / PAGE_SIZE);

		/* keep track of the shortest truncation so far */
		ver->txn.min_data_size = MIN(ver->txn.min_data_size,
					     attrs->size);
	}

	entry = txn_alloc_entry(txn);
	entry->op = OP_SETATTR;
	entry->perform = __obj_setattr_perform;
	entry->rollback = __obj_setattr_rollback;
	entry->cleanup = __obj_setattr_cleanup;
	entry->setattr.ver = ver;
	entry->setattr.old_attrs = ver->attrs;
	entry->setattr.changed = valid;

	if (valid & OBJ_ATTR_MODE)
		ver->attrs.mode = attrs->mode;
	if (valid & _OBJ_ATTR_NLINK)
		ver->attrs.nlink = attrs->nlink;
	if (valid & OBJ_ATTR_SIZE)
		ver->attrs.size = attrs->size;
	if (valid & OBJ_ATTR_ATIME)
		ver->attrs.atime = attrs->atime;
	if (valid & OBJ_ATTR_BTIME)
		ver->attrs.btime = attrs->btime;
	if (valid & OBJ_ATTR_CTIME)
		ver->attrs.ctime = attrs->ctime;
	if (valid & OBJ_ATTR_MTIME)
		ver->attrs.mtime = attrs->mtime;
	if (valid & OBJ_ATTR_OWNER)
		ver->attrs.owner = attrs->owner;
	if (valid & OBJ_ATTR_GROUP)
		ver->attrs.group = attrs->group;

	txn_log_entry(txn, entry);
}

/*
 * Perform callback for a write transaction entry.
 *
 * Walks the entry's page range and writes every still-dirty page out via
 * the object's write_page op, clearing the dirty flag on success.  Pages
 * that page_lookup() cannot find (-ENOENT) are skipped.
 *
 * If write_page fails, the failed page and all pages after it in the range
 * are invalidated and the error is returned.  The object version is
 * detached from the transaction on both the success and the error path.
 *
 * Returns 0 on success, or the non-zero value returned by write_page.
 */
static int __obj_write_perform(struct txn *txn, struct txn_entry *entry)
{
	/*
	 * Exclusive end of the range.  Using "one past the last page"
	 * instead of the last page number keeps a zero-page entry from
	 * underflowing into a (nearly) endless loop.
	 */
	const uint64_t end_pgno = entry->write.pgno + entry->write.pgcnt;
	struct objver *ver = entry->write.ver;
	uint64_t pgno;
	int ret;

	for (pgno = entry->write.pgno; pgno < end_pgno; pgno++) {
		struct page *page;

		page = page_lookup(ver, pgno, 0);
		if (IS_ERR(page)) {
			VERIFY3U(PTR_ERR(page), ==, -ENOENT);
			continue;
		}

		/*
		 * It may have been written out by a previous transaction
		 * entry in the same transaction.  If that's the case, there
		 * is no point in writing it out again.  The previous write
		 * clears the dirty bit, so we use that here to check.
		 */
		if (page->dirty) {
			ret = ver->obj->ops->write_page(ver, page->ptr, pgno);
			if (ret) {
				/*
				 * Release the page before bailing out; the
				 * invalidation below covers this page too
				 * and must not find it still locked by us.
				 */
				page_unlock(page);
				goto err;
			}

			page->dirty = false;
		}

		page_unlock(page);
	}

	txn_detach_objver(txn, ver);

	return 0;

err:
	/* drop the remaining dirty pages */
	page_inval_range(ver, pgno, end_pgno - pgno);

	txn_detach_objver(txn, ver);

	return ret;
}

/*
 * Rollback callback for a write transaction entry.
 *
 * Invalidates the page range recorded in the entry, discarding the data
 * that was placed there, and then detaches the object version from the
 * transaction.
 */
static void __obj_write_rollback(struct txn *txn, struct txn_entry *entry)
{
	struct objver *target = entry->write.ver;

	/* throw away whatever the write dirtied */
	page_inval_range(target, entry->write.pgno, entry->write.pgcnt);

	txn_detach_objver(txn, target);
}

/*
 * Write `len` bytes from `_buf` into the object version `ver`, starting at
 * byte `offset`, as part of transaction `txn`.
 *
 * The whole range must lie within the object's current size; this is
 * asserted, so callers have to grow the object before writing past its end.
 *
 * The data is copied into pages obtained via page_lookup() and each touched
 * page is marked dirty.  A transaction entry covering every page the range
 * touches is logged; its perform callback writes the dirty pages out and
 * its rollback callback invalidates them.
 *
 * Returns the number of bytes copied if any were copied; otherwise the
 * negated errno from the failed page lookup, or 0 if `len` was 0.
 */
ssize_t obj_write(struct txn *txn, struct objver *ver, const void *_buf,
		  size_t len, uint64_t offset)
{
	const uint64_t first_pgno = offset / PAGE_SIZE;
	const uint8_t *buf = _buf;
	struct txn_entry *entry;
	size_t written;
	int ret;

	ASSERT3U(offset, <, ver->attrs.size);
	ASSERT3U(offset + len, <=, ver->attrs.size);

	txn_attach_objver(txn, ver);

	entry = txn_alloc_entry(txn);
	entry->op = OP_WRITE;
	entry->perform = __obj_write_perform;
	entry->rollback = __obj_write_rollback;
	entry->write.ver = ver;
	entry->write.pgno = first_pgno;
	/*
	 * Count the pages from the first one touched through the last one
	 * touched.  Rounding up `len` alone would miss a page whenever an
	 * unaligned write spills over a page boundary.
	 */
	if (len)
		entry->write.pgcnt = ((offset + len - 1) / PAGE_SIZE) -
				     first_pgno + 1;
	else
		entry->write.pgcnt = 0;
	txn_log_entry(txn, entry);

	written = 0;
	ret = 0;

	while (len) {
		const uint64_t pgoff = offset & PAGE_OFFSET_MASK;
		const size_t thislen = MIN(len, PAGE_SIZE - pgoff);
		const bool partial = (thislen != PAGE_SIZE);
		struct page *page;

		/*
		 * Only ask for the page to be filled when part of it is
		 * left untouched; a full-page write overwrites everything.
		 */
		page = page_lookup(ver, offset / PAGE_SIZE,
				   PG_ALLOC | (partial ? PG_FILL : 0));
		if (IS_ERR(page)) {
			ret = PTR_ERR(page);
			break;
		}

		memcpy(&page->ptr[pgoff], buf, thislen);

		page->dirty = true;

		page_unlock(page);

		buf += thislen;
		len -= thislen;
		offset += thislen;
		written += thislen;
	}

	/*
	 * If we wrote anything out, return number of bytes, otherwise
	 * return the errno if we failed before we wrote any bytes out.
	 */
	if (written)
		return written;

	return ret;
}