view src/objstore/obj.c @ 852:8ccfc441cff6

objstore: add alloc_only argument to obj_by_oid This bool will allow obj_by_oid to be used to look up both existing objects (e.g., during open) and not-yet-created objects (e.g., during create). Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Sat, 17 Dec 2022 14:03:31 -0500
parents 40d18d06086a
children dc84208232c6
line wrap: on
line source

/*
 * Copyright (c) 2015-2020,2022 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stddef.h>

#include <jeffpc/error.h>

#include "objstore_impl.h"

/*
 * Memory caches for the objstore's in-memory structures.  They are only
 * declared here; nothing in this file creates or destroys them.
 */
/* backs struct obj - see obj_alloc() and obj_free() */
struct mem_cache *obj_cache;
/* backs struct objver - see objver_alloc() and objver_free() */
struct mem_cache *objver_cache;
/* not referenced in this file; presumably backs open object info - TODO confirm */
struct mem_cache *open_obj_cache;

/* lock class handed to MXINIT for every struct obj's ->lock */
static LOCK_CLASS(obj_lc);

/*
 * Comparison callback for the per-object version trees (both ->versions and
 * ->heads): orders two struct objver nodes by their clocks, as decided by
 * nvclock_cmp_total().
 */
static int ver_cmp(const void *va, const void *vb)
{
	const struct nvclock *lhs = &((const struct objver *) va)->clock;
	const struct nvclock *rhs = &((const struct objver *) vb)->clock;

	return nvclock_cmp_total(lhs, rhs);
}

/*
 * Allocate a new generic object structure from obj_cache.
 *
 * The returned object has a zeroed oid, empty ->versions and ->heads trees
 * (both ordered by ver_cmp), no versions, no private data, no clone, no
 * ops, is in OBJ_STATE_NEW, and carries a single reference.
 *
 * Returns NULL if the allocation fails.
 */
static struct obj *obj_alloc(void)
{
	struct obj *fresh = mem_cache_alloc(obj_cache);

	if (fresh == NULL)
		return NULL;

	/* identity - filled in by whoever binds the object to a clone */
	memset(&fresh->oid, 0, sizeof(fresh->oid));
	fresh->clone = NULL;
	fresh->ops = NULL;
	fresh->private = NULL;

	/* nothing is known about the object yet */
	fresh->state = OBJ_STATE_NEW;
	fresh->nversions = 0;

	/* the two trees link through different nodes of the same objver */
	rb_create(&fresh->versions, ver_cmp, sizeof(struct objver),
		  offsetof(struct objver, all_node));
	rb_create(&fresh->heads, ver_cmp, sizeof(struct objver),
		  offsetof(struct objver, head_node));

	/* the caller gets the one and only reference */
	refcnt_init(&fresh->refcnt, 1);
	MXINIT(&fresh->lock, &obj_lc);

	return fresh;
}

/*
 * Free a generic object structure.
 *
 * In order: lets the object's ops release their state (ops->free, if
 * provided), frees every version cached in the ->versions tree, destroys
 * the object's lock, drops the volume reference held on behalf of the
 * object's clone, and returns the memory to obj_cache.
 *
 * Passing NULL is a no-op.  An object that was never bound to a clone
 * (->clone still NULL, as obj_alloc() leaves it) holds no volume reference,
 * so none is dropped.
 */
void obj_free(struct obj *obj)
{
	struct rb_cookie cookie;
	struct objver *ver;

	if (!obj)
		return;

	if (obj->ops && obj->ops->free)
		obj->ops->free(obj);

	/* empty the versions tree, freeing each version as it is handed back */
	memset(&cookie, 0, sizeof(struct rb_cookie));
	while ((ver = rb_destroy_nodes(&obj->versions, &cookie)))
		objver_free(ver);

	MXDESTROY(&obj->lock);

	/*
	 * The volume reference is taken when ->clone is assigned, so only
	 * drop it if that actually happened; obj_alloc() hands out objects
	 * with a NULL clone.
	 */
	if (obj->clone)
		vol_putref(obj->clone->vol);

	rb_destroy(&obj->versions);
	mem_cache_free(obj_cache, obj);
}

/*
 * Allocate a new generic object version structure from objver_cache.
 *
 * The version comes back with a reset clock, zeroed attributes, no private
 * data, no open info in any slot, no transaction state, and no owning
 * object.  page_cache_init_objver() is called on it last.
 *
 * Returns the version on success, ERR_PTR(-ENOMEM) on allocation failure
 * (never NULL).
 */
struct objver *objver_alloc(void)
{
	struct objver *fresh;
	size_t slot;

	fresh = mem_cache_alloc(objver_cache);
	if (fresh == NULL)
		return ERR_PTR(-ENOMEM);

	/* not attached to any object yet */
	fresh->obj = NULL;
	fresh->private = NULL;

	nvclock_reset(&fresh->clock);
	memset(&fresh->attrs, 0, sizeof(fresh->attrs));

	/* nobody has this version open */
	for (slot = 0; slot < ARRAY_LEN(fresh->open); slot++)
		fresh->open[slot] = NULL;

	/* not part of any transaction */
	fresh->txn.txn = NULL;
	fresh->txn.refcnt = 0;
	fresh->txn.prev_ver = NULL;
	fresh->txn.min_data_size = 0;

	page_cache_init_objver(fresh);

	return fresh;
}

/*
 * Free a generic object version structure.
 *
 * The version must not be attached to a transaction and must not be open in
 * any slot; both conditions are asserted.  Passing NULL is a no-op.
 */
void objver_free(struct objver *ver)
{
	size_t slot;

	if (ver == NULL)
		return;

	/* a version still referenced by a transaction must not go away */
	ASSERT3P(ver->txn.txn, ==, NULL);
	ASSERT3S(ver->txn.refcnt, ==, 0);

	/* neither must a version that somebody still has open */
	for (slot = 0; slot < ARRAY_LEN(ver->open); slot++)
		ASSERT3P(ver->open[slot], ==, NULL);

	page_cache_deinit_objver(ver);

	mem_cache_free(objver_cache, ver);
}

/*
 * Find the object with oid in the clone's tree of objects (clone->objs); if
 * there isn't one, allocate one and insert it into that tree.
 *
 * Both the lookup and the insertion are done while holding vol->lock.  The
 * lock is not held while a new object is being allocated, so after the
 * allocation the lookup is repeated; if an object with the same oid is found
 * on that second pass, it is returned and the freshly allocated one is
 * released.
 *
 * Objects found in OBJ_STATE_DEAD are never returned - the reference is
 * dropped and the whole search is retried.
 *
 * Returns obj (locked and referenced) on success, ERR_PTR(-ENOMEM) if a new
 * object was needed but could not be allocated.
 */
static struct obj *__find_or_alloc(struct objstore_clone *clone,
				   const struct noid *oid)
{
	struct objstore *vol = clone->vol;
	/* on-stack search key; only the oid is filled in */
	struct obj key = {
		.oid = *oid,
	};
	struct obj *obj, *newobj;
	struct rb_cookie where;
	bool inserted;

	inserted = false;
	newobj = NULL;

	for (;;) {
		MXLOCK(&vol->lock);

		/*
		 * try to find the object
		 *
		 * rb_find's result goes straight into obj_getref, so
		 * obj_getref presumably tolerates NULL (not found) - confirm
		 */
		obj = obj_getref(rb_find(&clone->objs, &key, &where));

		/*
		 * not found and this is the second attempt -> insert it
		 *
		 * The tree gets a reference of its own; the reference that
		 * came from obj_alloc() is the one handed to our caller.
		 */
		if (!obj && newobj) {
			rb_insert_here(&clone->objs, obj_getref(newobj), &where);
			obj = newobj;
			newobj = NULL;
			inserted = true;
		}

		MXUNLOCK(&vol->lock);

		/* found or inserted -> we're done */
		if (obj) {
			/*
			 * newly inserted objects are already locked (we
			 * locked them right after allocating them below)
			 */
			if (!inserted)
				MXLOCK(&obj->lock);

			if (obj->state == OBJ_STATE_DEAD) {
				/* this is a dead one, try again */
				MXUNLOCK(&obj->lock);
				obj_putref(obj);
				continue;
			}

			/*
			 * We allocated an object but found an existing one
			 * on the second pass - the spare is not needed.
			 */
			if (newobj) {
				MXUNLOCK(&newobj->lock);
				obj_putref(newobj);
			}

			break;
		}

		/* allocate a new object */
		newobj = obj_alloc();
		if (!newobj)
			return ERR_PTR(-ENOMEM);

		/* lock it before it can become visible through the tree */
		MXLOCK(&newobj->lock);

		newobj->oid = *oid;
		vol_getref(clone->vol); /* TODO: do we need refs for clones? */
		newobj->clone = clone;

		/* retry the search, and insert if necessary */
	}

	return obj;
}

/*
 * Given a clone and an oid, find the corresponding object structure,
 * initializing it through the clone's initobj op if it is still new.
 *
 * alloc_only is passed through to initobj and selects what a freshly
 * initialized object must look like:
 *
 *   true  - looking up a not-yet-created object (e.g., during create); it
 *           must have no versions and no heads
 *   false - looking up an existing object (e.g., during open); it must
 *           have at least one version and at least one head
 *
 * Objects that are already live are returned as is, regardless of
 * alloc_only.
 *
 * Return with the object locked and referenced.  On failure, an ERR_PTR is
 * returned instead: the error from __find_or_alloc, -ENOTSUP if the clone
 * has no initobj op, whatever initobj itself returned, or -EINVAL if the
 * object is dead.
 */
struct obj *obj_by_oid(struct objstore_clone *clone, const struct noid *oid,
		       bool alloc_only)
{
	struct objstore *vol = clone->vol;
	struct obj *obj;
	int ret;

	obj = __find_or_alloc(clone, oid);
	if (IS_ERR(obj)) {
		ret = PTR_ERR(obj);
		goto err;
	}

	switch (obj->state) {
		case OBJ_STATE_NEW:
			if (!clone->ops || !clone->ops->initobj) {
				ret = -ENOTSUP;
				goto err_obj_new;
			}

			ret = clone->ops->initobj(obj, alloc_only);
			if (ret)
				goto err_obj_new;

			if (alloc_only) {
				/* must have no versions or heads */
				ASSERT3U(obj->nversions, ==, 0);
				ASSERT3U(rb_numnodes(&obj->heads), ==, 0);
			} else {
				/* must have at least one version & head */
				ASSERT3U(obj->nversions, >=, 1);
				ASSERT3U(rb_numnodes(&obj->heads), >=, 1);
			}

			obj->state = OBJ_STATE_LIVE;
			break;
		case OBJ_STATE_LIVE:
			break;
		case OBJ_STATE_DEAD:
			ret = -EINVAL;
			goto err_obj_dead;
	}

	return obj;

err_obj_new:
	/* the initobj op failed, mark the obj dead */
	obj->state = OBJ_STATE_DEAD;

	/* remove the object from the objs list */
	MXLOCK(&vol->lock);
	rb_remove(&clone->objs, obj);
	MXUNLOCK(&vol->lock);

	/*
	 * Drop the reference the objs tree was given when the object was
	 * inserted; without this the dead object (and the volume reference
	 * it holds) could never be freed.  We still hold our own reference,
	 * so the object cannot go away while we have it locked.
	 */
	obj_putref(obj);

err_obj_dead:
	MXUNLOCK(&obj->lock);
	obj_putref(obj);

err:
	return ERR_PTR(ret);
}

/*
 * Fetch a version from the backend, adding it to the cached versions tree.
 *
 * If the object's ops provide checkversion, it is asked about the clock
 * first and any error it reports is returned as is.  The version itself is
 * then created and cached by obj_add_version().
 *
 * Returns the new version, or an ERR_PTR: -ENOTSUP if there is no object or
 * it has no ops, otherwise the checkversion or obj_add_version() error.
 */
static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock)
{
	if (!obj || !obj->ops)
		return ERR_PTR(-ENOTSUP);

	/* the check is optional - backends without it go straight to the add */
	if (obj->ops->checkversion) {
		int ret = obj->ops->checkversion(obj, clock);

		if (ret)
			return ERR_PTR(ret);
	}

	return obj_add_version(obj, clock);
}

/*
 * Given a clone, an oid, and a vector clock, find the corresponding object
 * version structure.
 *
 * A null clock is an unqualified lookup: the object's single head is
 * returned.  A non-null clock is a qualified lookup for exactly that
 * version.
 *
 * On success, return the found version, with the object referenced and
 * locked.  On failure, return an ERR_PTR with the object neither locked nor
 * referenced: the obj_by_oid() error, the error from fetching a qualified
 * version, or -ENOTUNIQ when an unqualified lookup does not find exactly
 * one head.
 */
struct objver *objver_by_clock(struct objstore_clone *clone, const struct noid *oid,
			       const struct nvclock *clock)
{
	struct objver *found;
	struct obj *obj;

	/* the object must already exist, hence alloc_only == false */
	obj = obj_by_oid(clone, oid, false);
	if (IS_ERR(obj))
		return ERR_CAST(obj);

	if (nvclock_is_null(clock)) {
		/*
		 * Unqualified open
		 *
		 * The initobj obj op is required to pre-cache all heads, so
		 * the heads tree is the complete set of heads.
		 */
		if (rb_numnodes(&obj->heads) == 1) {
			/* exactly one head - that is the answer */
			found = rb_first(&obj->heads);

			ASSERT3P(found, !=, NULL);

			return found;
		}

		/* the number of heads is not exactly one - cannot pick */
		ASSERT3U(obj->nversions, !=, 0);

		found = ERR_PTR(-ENOTUNIQ);
	} else {
		/*
		 * Qualified open
		 *
		 * The versions tree is only a cache, so a miss there means
		 * asking the backend for the version.
		 */
		struct objver key = {
			.clock = *clock,
		};

		found = rb_find(&obj->versions, &key, NULL);
		if (!found)
			found = __fetch_ver(obj, clock);

		if (!IS_ERR(found))
			return found;
	}

	/* failure: the caller gets neither the lock nor the reference */
	MXUNLOCK(&obj->lock);
	obj_putref(obj);
	return found;
}

/*
 * Check a new version against the currently known heads and update the
 * heads tree as necessary.
 *
 * The set of heads is a set of versions that are all pair-wise divergent.
 * Adding a version therefore boils down to four independent rules:
 *
 * (1) Every head that is less than the new version is removed.
 *
 * (2) Every head that is greater than the new version is kept.
 *
 * (3) Every head that is divergent from the new version is kept.
 *
 * (4) The new version becomes a head iff none of the remaining heads is
 *     greater than it.
 *
 * A head equal to the new version is a bug and causes a panic.
 */
static void __add_head(struct obj *obj, struct objver *new)
{
	struct objver *cur;
	bool superseded = false;

	/*
	 * The walk is started over after every removal instead of continuing
	 * from the node that was just taken out of the tree.
	 */
rescan:
	rb_for_each(&obj->heads, cur) {
		ASSERT3P(cur, !=, new);

		switch (nvclock_cmp(&cur->clock, &new->clock)) {
			case NVC_DIV:
				/* unrelated to new - leave it alone */
				break;
			case NVC_GT:
				/* a descendant of new is already a head */
				superseded = true;
				break;
			case NVC_EQ:
				panic("found duplicate head version");
			case NVC_LT:
				/* an ancestor of new - it is no longer a head */
				rb_remove(&obj->heads, cur);
				goto rescan;
		}
	}

	/* new is a head only if nothing that descends from it is one */
	if (superseded)
		return;

	rb_add(&obj->heads, new);
}

/*
 * Create a version of obj with the given clock, fill in its attributes via
 * the object's getattr op, and add it to the object's cached versions tree
 * and (if appropriate) its heads.
 *
 * Returns the new version, or an ERR_PTR: -ENOTSUP if the object has no
 * getattr op, the objver_alloc() error, the getattr error, or -EEXIST if a
 * version with this clock is already in the versions tree.  The partially
 * set up version is freed on failure.
 */
struct objver *obj_add_version(struct obj *obj, const struct nvclock *clock)
{
	struct objver *fresh;
	int ret;

	if (!obj->ops || !obj->ops->getattr)
		return ERR_PTR(-ENOTSUP);

	fresh = objver_alloc();
	if (IS_ERR(fresh))
		return fresh;

	/* getattr needs to know which version of which object to look at */
	fresh->obj = obj;
	fresh->clock = *clock;

	ret = obj->ops->getattr(fresh, &fresh->attrs);
	if (!ret) {
		/* objstore doesn't track inode numbers */
		fresh->attrs.ino = 0;

		if (!rb_insert(&obj->versions, fresh)) {
			/* cached; now see whether it is also a head */
			__add_head(obj, fresh);

			return fresh;
		}

		/* something with the same clock is already cached */
		ret = -EEXIST;
	}

	objver_free(fresh);

	return ERR_PTR(ret);
}

/*
 * Perform callback for OP_CREATE transaction entries: hand the oid, clock,
 * attributes, and parent recorded by obj_create() to the clone's createobj
 * op and return its result.
 */
static int __obj_create_perform(struct txn *txn, struct txn_entry *entry)
{
	int ret;

	ret = txn->clone->ops->createobj(txn->clone, &entry->create.oid,
					 &entry->create.clock,
					 &entry->create.attrs,
					 &entry->create.parent);

	return ret;
}

/*
 * Add the creation of a new object to a transaction.
 *
 * A new oid is allocated through the clone's allocoid op and stored in
 * *oid.  An OP_CREATE entry carrying that oid, the attributes, the parent,
 * and a clock that has been reset and incremented once is then logged in
 * the transaction; the backend's createobj op is invoked from the entry's
 * perform callback, not from here.
 *
 * Returns 0 on success, or the allocoid error.
 */
int obj_create(struct txn *txn, const struct nattr *attrs,
	       struct noid *oid, const struct noid *parent)
{
	struct txn_entry *entry;
	int ret;

	ret = txn->clone->ops->allocoid(txn->clone, oid);
	if (ret != 0)
		return ret;

	entry = txn_alloc_entry(txn);

	entry->op = OP_CREATE;
	entry->perform = __obj_create_perform;

	/* everything __obj_create_perform will need later */
	entry->create.parent = *parent;
	entry->create.attrs = *attrs;
	entry->create.oid = *oid;

	/* the clock of the object's first version */
	nvclock_reset(&entry->create.clock);
	VERIFY0(nvclock_inc(&entry->create.clock));

	txn_log_entry(txn, entry);

	return 0;
}

/*
 * Perform callback for OP_COW transaction entries.
 *
 * Asks the object's cow op to produce the new version from the old one and,
 * if that succeeds, makes the new version the one the open info refers to,
 * adds it to the object's versions and heads, clears the cow-needed flag,
 * and detaches the new version from the transaction.
 *
 * Returns 0 on success, or the cow op's error (in which case nothing has
 * been changed here).
 */
static int __obj_cow_perform(struct txn *txn, struct txn_entry *entry)
{
	struct objstore_open_obj_info *info = entry->cow.open;
	struct objver *prev = entry->cow.old;
	struct objver *next = entry->cow.new;
	struct obj *obj = next->obj;
	int ret;

	/* nothing may have changed since obj_cow() queued this entry */
	ASSERT(!info->qualified);
	ASSERT(info->cow.needed);
	ASSERT3P(info->obj, ==, obj);
	ASSERT3P(info->ver, ==, prev);
	ASSERT3P(prev->obj, ==, next->obj);
	ASSERT3P(prev->open[false], ==, info);
	ASSERT3P(next->open[false], ==, NULL);

	ret = obj->ops->cow(prev, &next->clock);
	if (ret)
		return ret;

	/* move the open info over from the old version to the new one */
	prev->open[false] = NULL;
	next->open[false] = info;
	info->ver = next;

	rb_add(&obj->versions, next);

	/*
	 * The new version descends from the old one, so adding it as a head
	 * takes the old head's place.
	 */
	__add_head(obj, next);

	/* the copy has been made - no need for another one */
	info->cow.needed = false;

	txn_detach_objver(txn, next);

	return 0;
}

/*
 * Cleanup callback for OP_COW transaction entries.
 *
 * Verifies that the open info still refers to the old version (i.e., that
 * the substitution done by __obj_cow_perform never happened), detaches the
 * new version from the transaction, and frees it.
 */
static void __obj_cow_cleanup(struct txn *txn, struct txn_entry *entry)
{
	struct objstore_open_obj_info *info = entry->cow.open;
	struct objver *prev = entry->cow.old;
	struct objver *next = entry->cow.new;

	/* the old version must still be the one in the open structure */
	ASSERT(!info->qualified);
	ASSERT(info->cow.needed);
	ASSERT3P(info->obj, ==, next->obj);
	ASSERT3P(info->ver, ==, prev);
	ASSERT3P(prev->obj, ==, next->obj);
	ASSERT3P(prev->open[false], ==, info);
	ASSERT3P(next->open[false], ==, NULL);

	txn_detach_objver(txn, next);

	/* the new version was never added to the object - just free it */
	objver_free(next);
}

/* add a cow operation to the transaction if necessary */
struct objver *obj_cow(struct txn *txn, struct objstore_open_obj_info *open)
{
	struct nvclock new_clock;
	struct txn_entry *entry;
	struct objver *new;
	struct objver *old;
	int ret;

	old = open->ver;

	ASSERT3P(old->txn.txn, ==, NULL);

	if (!open->cow.needed)
		return old;

	new_clock = old->clock;
	ret = nvclock_inc(&new_clock);
	if (ret)
		return ERR_PTR(ret);

	/* allocate a new version - must match obj_add_version */
	new = objver_alloc();
	if (IS_ERR(new))
		return new;

	new->obj = open->obj;
	new->clock = new_clock;
	new->attrs = old->attrs;

	txn_attach_objver(txn, new);
	new->txn.prev_ver = old;

	/* add a txn entry */
	entry = txn_alloc_entry(txn);
	entry->op = OP_COW;
	entry->perform = __obj_cow_perform;
	entry->cleanup = __obj_cow_cleanup;
	entry->cow.open = open;
	entry->cow.old = old;
	entry->cow.new = new;
	txn_log_entry(txn, entry);

	return new;
}