view src/objstore/obj.c @ 669:c0730cae0a8e

objstore: require initobj op

Without it, we have no way of knowing how many versions an object has, and what its heads are. We assume that we know both for every object.

Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Mon, 04 Mar 2019 17:20:27 -0500
parents 10364796fb99
children 9dfdc42bf860
line wrap: on
line source

/*
 * Copyright (c) 2015-2019 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stddef.h>

#include <jeffpc/error.h>

#include "objstore_impl.h"

/* allocation cache for struct obj; used by obj_alloc() and obj_free() */
struct mem_cache *obj_cache;
/* allocation cache for struct objver; used by objver_alloc() and objver_free() */
struct mem_cache *objver_cache;
/*
 * NOTE(review): not referenced in this part of the file - presumably the
 * allocation cache for per-open state; confirm against its users.
 */
struct mem_cache *open_obj_cache;

/* lock class handed to MXINIT() for every obj->lock (see obj_alloc()) */
static LOCK_CLASS(obj_lc);

/*
 * Comparator for the per-object version trees: both arguments are
 * struct objver pointers, and the result is whatever nvclock_cmp_total()
 * reports for their clocks.
 */
static int ver_cmp(const void *va, const void *vb)
{
	return nvclock_cmp_total(((const struct objver *) va)->clock,
				 ((const struct objver *) vb)->clock);
}

/*
 * Allocate and initialize a new generic object structure.
 *
 * The new object starts out in OBJ_STATE_NEW with a zeroed oid, empty
 * versions and heads trees, no vol, no ops, no private data, and a
 * reference count of one.
 *
 * Returns the object, or NULL if the allocation failed.
 */
struct obj *obj_alloc(void)
{
	struct obj *o;

	o = mem_cache_alloc(obj_cache);
	if (o == NULL)
		return NULL;

	/* identity & ownership - filled in later by whoever looks us up */
	memset(&o->oid, 0, sizeof(o->oid));
	o->vol = NULL;
	o->ops = NULL;
	o->private = NULL;

	/* bookkeeping */
	o->state = OBJ_STATE_NEW;
	o->nversions = 0;
	o->nlink = 0;

	/*
	 * Both trees index struct objver and share one comparator; they
	 * differ only in which embedded node they link through.
	 */
	rb_create(&o->versions, ver_cmp, sizeof(struct objver),
		  offsetof(struct objver, all_node));
	rb_create(&o->heads, ver_cmp, sizeof(struct objver),
		  offsetof(struct objver, head_node));

	refcnt_init(&o->refcnt, 1);
	MXINIT(&o->lock, &obj_lc);

	return o;
}

/*
 * Free a generic object structure.
 */
void obj_free(struct obj *obj)
{
	struct rb_cookie cookie;
	struct objver *ver;

	if (!obj)
		return;

	if (obj->ops && obj->ops->free)
		obj->ops->free(obj);

	memset(&cookie, 0, sizeof(struct rb_cookie));
	while ((ver = rb_destroy_nodes(&obj->versions, &cookie)))
		objver_free(ver);

	MXDESTROY(&obj->lock);
	vol_putref(obj->vol);
	rb_destroy(&obj->versions);
	mem_cache_free(obj_cache, obj);
}

/*
 * Allocate a new generic object version structure.
 *
 * The version gets a freshly allocated clock, zeroed attrs, no private
 * data, no open state, and no owning object.
 *
 * Returns the version, or ERR_PTR(-ENOMEM) if either allocation failed.
 */
struct objver *objver_alloc(void)
{
	struct objver *v;
	size_t slot;

	v = mem_cache_alloc(objver_cache);
	if (v == NULL)
		return ERR_PTR(-ENOMEM);

	v->clock = nvclock_alloc(false);
	if (v->clock == NULL) {
		/* nothing else to undo yet - just give the memory back */
		mem_cache_free(objver_cache, v);
		return ERR_PTR(-ENOMEM);
	}

	v->obj = NULL;
	v->private = NULL;
	memset(&v->attrs, 0, sizeof(v->attrs));

	for (slot = 0; slot < ARRAY_LEN(v->open); slot++)
		v->open[slot] = NULL;

	return v;
}

/*
 * Free a generic object version structure, including its clock.
 *
 * A NULL ver is ignored.  It is a bug (asserted) to free a version that
 * still has any open state attached.
 */
void objver_free(struct objver *ver)
{
	size_t slot;

	if (ver == NULL)
		return;

	/* every open slot must have been released by now */
	slot = 0;
	while (slot < ARRAY_LEN(ver->open)) {
		ASSERT3P(ver->open[slot], ==, NULL);
		slot++;
	}

	nvclock_free(ver->clock);
	mem_cache_free(objver_cache, ver);
}

/*
 * Find the object with oid, if there isn't one, allocate one and add it to
 * the vol's list of objects.
 *
 * The lookup and the insertion both happen under vol->lock, but the
 * allocation does not.  Therefore, after allocating a candidate object we
 * loop around and search again; the candidate is inserted only if the
 * second search still comes up empty, and is released otherwise.
 *
 * Objects found in the OBJ_STATE_DEAD state are skipped and the search is
 * retried.
 *
 * Returns obj (locked and referenced) on success, or ERR_PTR(-ENOMEM) if a
 * new object could not be allocated.
 */
static struct obj *__find_or_alloc(struct objstore *vol, const struct noid *oid)
{
	/* search key - only the oid is filled in */
	struct obj key = {
		.oid = *oid,
	};
	struct obj *obj, *newobj;
	struct rb_cookie where;
	bool inserted;

	inserted = false;
	newobj = NULL;

	for (;;) {
		MXLOCK(&vol->lock);

		/* try to find the object */
		obj = obj_getref(rb_find(&vol->objs, &key, &where));

		/* not found and this is the second attempt -> insert it */
		if (!obj && newobj) {
			/* the tree gets its own reference to the object */
			rb_insert_here(&vol->objs, obj_getref(newobj), &where);
			obj = newobj;
			newobj = NULL;
			inserted = true;
		}

		MXUNLOCK(&vol->lock);

		/* found or inserted -> we're done */
		if (obj) {
			/* newly inserted objects are already locked */
			if (!inserted)
				MXLOCK(&obj->lock);

			if (obj->state == OBJ_STATE_DEAD) {
				/* this is a dead one, try again */
				MXUNLOCK(&obj->lock);
				obj_putref(obj);
				continue;
			}

			/*
			 * We allocated a candidate but found an existing
			 * object instead - the candidate is not needed.
			 */
			if (newobj) {
				MXUNLOCK(&newobj->lock);
				obj_putref(newobj);
			}

			break;
		}

		/* allocate a new object */
		newobj = obj_alloc();
		if (!newobj)
			return ERR_PTR(-ENOMEM);

		/*
		 * Lock the candidate before it can become visible to others
		 * via the vol's tree.
		 */
		MXLOCK(&newobj->lock);

		newobj->oid = *oid;
		newobj->vol = vol_getref(vol);

		/* retry the search, and insert if necessary */
	}

	return obj;
}

/*
 * Given a vol and an oid, find the corresponding object structure.
 *
 * Objects that are still in the OBJ_STATE_NEW state are initialized using
 * the vol's initobj op.  The op is required; without it we cannot know the
 * object's versions and heads, and -ENOTSUP is returned.  If the
 * initialization fails, the object is marked dead, removed from the vol's
 * list of objects, and the error is returned.
 *
 * Return with the object locked and referenced.  On failure, an error
 * pointer is returned and no lock or reference is held.
 */
struct obj *obj_by_oid(struct objstore *vol, const struct noid *oid)
{
	struct obj *obj;
	int ret;

	obj = __find_or_alloc(vol, oid);
	if (IS_ERR(obj)) {
		ret = PTR_ERR(obj);
		goto err;
	}

	switch (obj->state) {
		case OBJ_STATE_NEW:
			if (!vol->ops || !vol->ops->initobj) {
				ret = -ENOTSUP;
				goto err_obj_new;
			}

			ret = vol->ops->initobj(obj);
			if (ret)
				goto err_obj_new;

			/* must have at least one version & head */
			ASSERT3U(obj->nversions, >=, 1);
			ASSERT3U(rb_numnodes(&obj->heads), >=, 1);

			obj->state = OBJ_STATE_LIVE;
			break;
		case OBJ_STATE_LIVE:
			break;
		case OBJ_STATE_DEAD:
			ret = -EINVAL;
			goto err_obj_dead;
	}

	return obj;

err_obj_new:
	/* the initobj op failed, mark the obj dead */
	obj->state = OBJ_STATE_DEAD;

	/* remove the object from the objs list */
	MXLOCK(&vol->lock);
	rb_remove(&vol->objs, obj);
	MXUNLOCK(&vol->lock);

	/*
	 * The objs tree was given its own reference when the object was
	 * inserted.  Now that the object is no longer in the tree, drop
	 * that reference - otherwise the object (and the vol reference it
	 * holds) would never be freed.  We still hold our own reference, so
	 * this cannot be the last one.
	 */
	obj_putref(obj);

err_obj_dead:
	MXUNLOCK(&obj->lock);
	obj_putref(obj);

err:
	return ERR_PTR(ret);
}

/*
 * Fetch a version from the backend, adding it to the cached versions tree.
 *
 * If the object has a checkversion op, it is consulted first and any error
 * it reports is returned as-is.  The actual work of creating and caching
 * the version is done by obj_add_version().
 *
 * Returns the new version, or an error pointer (-ENOTSUP if there is no
 * object or the object has no ops).
 */
static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock)
{
	if (obj == NULL || obj->ops == NULL)
		return ERR_PTR(-ENOTSUP);

	if (obj->ops->checkversion != NULL) {
		int err = obj->ops->checkversion(obj, clock);

		if (err != 0)
			return ERR_PTR(err);
	}

	return obj_add_version(obj, clock);
}

/*
 * Given a vol, an oid, and a vector clock, find the corresponding object
 * version structure.
 *
 * A null clock means "whichever version is the head"; this only works if
 * the object has exactly one head.  A non-null clock names one specific
 * version.
 *
 * Return the found version, with the object referenced and locked.  On
 * failure, an error pointer is returned and the object is neither locked
 * nor referenced.
 */
struct objver *objver_by_clock(struct objstore *vol, const struct noid *oid,
			       const struct nvclock *clock)
{
	struct objver *found;
	struct obj *obj;

	/* step one: oid -> object (comes back locked & referenced) */
	obj = obj_by_oid(vol, oid);
	if (IS_ERR(obj))
		return ERR_CAST(obj);

	/* step two: object -> version */
	if (nvclock_is_null(clock)) {
		/*
		 * Unqualified open
		 *
		 * The caller didn't ask for any version in particular.
		 * The initobj obj op is required to pre-cache all heads, so
		 * the heads tree tells us everything there is to know.
		 */
		if (rb_numnodes(&obj->heads) == 1) {
			/* exactly one head - that's the answer */
			found = rb_first(&obj->heads);

			ASSERT3P(found, !=, NULL);

			return found;
		}

		/* two or more heads - the request is ambiguous */
		ASSERT3U(obj->nversions, !=, 0);

		found = ERR_PTR(-ENOTUNIQ);
	} else {
		/*
		 * Qualified open
		 *
		 * The caller wants one specific version.  The versions
		 * tree is merely a cache, so a miss there is not final -
		 * the backend gets asked before we give up.
		 */
		struct objver key = {
			.clock = (struct nvclock *) clock,
		};

		found = rb_find(&obj->versions, &key, NULL);
		if (found)
			return found;

		found = __fetch_ver(obj, clock);
		if (!IS_ERR(found))
			return found;
	}

	/* failure: let go of the object before reporting the error */
	MXUNLOCK(&obj->lock);
	obj_putref(obj);
	return found;
}

/*
 * Check new version against currently known heads & update as necessary.
 *
 * Every existing head that is an ancestor of the new version is removed
 * from the heads tree.  The new version itself is added to the heads tree
 * unless some existing head is its descendant.
 *
 * The new version must not already be a head, and no existing head may have
 * a clock equal to the new version's clock.
 */
static void __add_head(struct obj *obj, struct objver *new)
{
	struct objver *head;
	bool found_descendant;

	found_descendant = false;

	/*
	 * The set of heads is a set of versions that are all pair-wise
	 * divergent.
	 *
	 * When adding a head, we want to accomplish four things which can
	 * be reasoned about independently:
	 *
	 * (1) We want to remove all heads that are less than the version
	 *     being added.
	 *
	 * (2) We want to keep all heads that are greater than the version
	 *     being added.
	 *
	 * (3) We want to keep all heads that are divergent from the version
	 *     being added.
	 *
	 * (4) We want to add the new version as a head iff all the
	 *     remaining heads are divergent with it.
	 */

restart:
	rb_for_each(&obj->heads, head) {
		ASSERT3P(head, !=, new);

		switch (nvclock_cmp(head->clock, new->clock)) {
			case NVC_EQ:
				/*
				 * NOTE(review): there is no break here;
				 * presumably panic() does not return.
				 */
				panic("found duplicate head version");
			case NVC_LT:
				/* head is less than new - remove it */
				rb_remove(&obj->heads, head);
				/*
				 * The tree was modified in the middle of
				 * the walk, so start the walk over.
				 * found_descendant is deliberately kept.
				 */
				goto restart;
			case NVC_GT:
				/* head is greater than new - not a new head */
				found_descendant = true;
				break;
			case NVC_DIV:
				/* head is unrelated to new - nothing to do */
				break;
		}
	}

	/*
	 * Add the new version as a head if we haven't found any heads that
	 * are our descendants.
	 */
	if (!found_descendant)
		rb_add(&obj->heads, new);
}

/*
 * Create a version of obj with the given clock and cache it.
 *
 * The version's attributes are loaded using the object's getattr op (which
 * is required), the version is inserted into the object's versions tree,
 * and the set of heads is updated to account for it.
 *
 * Returns the new version, or an error pointer: -ENOTSUP if the getattr op
 * is missing, -EEXIST if the versions tree already contains this clock, or
 * whatever error the allocation or the getattr op produced.
 */
struct objver *obj_add_version(struct obj *obj, const struct nvclock *clock)
{
	struct objver *v;
	int err;

	if (obj->ops == NULL || obj->ops->getattr == NULL)
		return ERR_PTR(-ENOTSUP);

	v = objver_alloc();
	if (IS_ERR(v))
		return v;

	v->obj = obj;
	nvclock_copy(v->clock, clock);

	err = obj->ops->getattr(v, &v->attrs);
	if (err == 0) {
		/* objstore doesn't track inode numbers */
		v->attrs.ino = 0;

		if (!rb_insert(&obj->versions, v)) {
			/* cached successfully - fix up the heads */
			__add_head(obj, v);

			return v;
		}

		/* somebody already cached this version */
		err = -EEXIST;
	}

	/* either getattr or the insert failed - discard the version */
	objver_free(v);

	return ERR_PTR(err);
}