view src/objstore/obj.c @ 547:1c18eaf2480a

objstore: assume that backends without getversion obj op cache all versions. If a backend doesn't define a getversion obj op, assume that the backend loads all versions into memory when struct obj is allocated. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Tue, 06 Nov 2018 16:16:58 -0500
parents 23dfc03a9b89
children 6646b1736dd8
line wrap: on
line source

/*
 * Copyright (c) 2015-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <stddef.h>

#include <jeffpc/error.h>

#include "objstore_impl.h"

/*
 * Memory caches backing the allocations in this file.  They are defined
 * here but not set up here.
 * NOTE(review): presumably they are created by the objstore init code
 * before any of the alloc functions below run - confirm.
 */
struct mem_cache *obj_cache;		/* struct obj; see obj_alloc/obj_free */
struct mem_cache *objver_cache;		/* struct objver; see objver_alloc/objver_free */
struct mem_cache *open_obj_cache;	/* not used in this file */

/* lock class passed to MXINIT for every obj->lock (see obj_alloc) */
static LOCK_CLASS(obj_lc);

/*
 * Comparison callback for an object's versions tree.
 *
 * Both arguments point at struct objver; the result is whatever
 * nvclock_cmp_total() reports for the two versions' clocks.
 */
static int ver_cmp(const void *va, const void *vb)
{
	const struct objver *lhs = va;
	const struct objver *rhs = vb;
	int order;

	order = nvclock_cmp_total(lhs->clock, rhs->clock);

	return order;
}

/*
 * Allocate and initialize a generic object structure.
 *
 * The new object is in OBJ_STATE_NEW, holds a single reference, has a
 * zeroed oid, a freshly created versions tree (ordered by ver_cmp), and no
 * vol, ops, or private data attached.
 *
 * Returns the object, or NULL if the allocation fails.
 */
struct obj *obj_alloc(void)
{
	struct obj *obj = mem_cache_alloc(obj_cache);

	if (obj == NULL)
		return NULL;

	/* identity and bookkeeping */
	memset(&obj->oid, 0, sizeof(obj->oid));
	obj->state = OBJ_STATE_NEW;
	obj->nversions = 0;
	obj->nlink = 0;

	/* nothing is attached to the object yet */
	obj->vol = NULL;
	obj->ops = NULL;
	obj->private = NULL;

	/* per-object tree of struct objver, linked through objver's node */
	rb_create(&obj->versions, ver_cmp, sizeof(struct objver),
		  offsetof(struct objver, node));

	/* the caller owns the one and only reference */
	refcnt_init(&obj->refcnt, 1);
	MXINIT(&obj->lock, &obj_lc);

	return obj;
}

/*
 * Tear down and free a generic object structure.
 *
 * A NULL obj is ignored.  If the object has an ops vector with a free op,
 * that op is called first.  After that the lock is destroyed, the vol
 * reference is dropped, the versions tree is destroyed, and the memory is
 * handed back to obj_cache.
 */
void obj_free(struct obj *obj)
{
	if (obj == NULL)
		return;

	/* let the backend release its per-object state before we tear down */
	if (obj->ops != NULL) {
		if (obj->ops->free != NULL)
			obj->ops->free(obj);
	}

	MXDESTROY(&obj->lock);

	/* drop the reference on obj->vol that the object was holding */
	vol_putref(obj->vol);

	rb_destroy(&obj->versions);

	mem_cache_free(obj_cache, obj);
}

/*
 * Allocate and initialize a generic object version structure.
 *
 * The new version gets its own clock from nvclock_alloc(), zeroed attrs,
 * no private data, no open slots in use, and no owning object.
 *
 * Returns the version on success, or ERR_PTR(-ENOMEM) if either the
 * version or its clock cannot be allocated.
 */
struct objver *objver_alloc(void)
{
	struct objver *ver = mem_cache_alloc(objver_cache);
	size_t slot;

	if (ver == NULL)
		return ERR_PTR(-ENOMEM);

	ver->clock = nvclock_alloc(false);
	if (ver->clock == NULL) {
		/* no clock -> undo the version allocation */
		mem_cache_free(objver_cache, ver);
		return ERR_PTR(-ENOMEM);
	}

	ver->obj = NULL;
	ver->private = NULL;
	memset(&ver->attrs, 0, sizeof(ver->attrs));

	for (slot = 0; slot < ARRAY_LEN(ver->open); slot++)
		ver->open[slot] = NULL;

	return ver;
}

/*
 * Free a generic object version structure along with its clock.
 *
 * A NULL ver is ignored.  Every entry of ver->open must already be NULL;
 * this is asserted before anything is released.
 */
void objver_free(struct objver *ver)
{
	size_t i;

	if (ver == NULL)
		return;

	/* a version that is still open must never get this far */
	for (i = 0; i < ARRAY_LEN(ver->open); i++) {
		ASSERT3P(ver->open[i], ==, NULL);
	}

	nvclock_free(ver->clock);

	mem_cache_free(objver_cache, ver);
}

/*
 * Find the object with oid, if there isn't one, allocate one and add it to
 * the vol's tree of objects (vol->objs).
 *
 * The lookup and the insertion both happen under vol->lock, but the
 * allocation of a new object happens with vol->lock dropped.  Therefore,
 * after allocating we have to search again before inserting, and we may
 * find that someone else inserted an object in the meantime - in which
 * case our freshly allocated object is released.
 *
 * Objects found in OBJ_STATE_DEAD are skipped and the search is retried.
 *
 * Returns obj (locked and referenced) on success, ERR_PTR(-ENOMEM) if a new
 * object could not be allocated.
 */
static struct obj *__find_or_alloc(struct objstore *vol, const struct noid *oid)
{
	/* on-stack search key; only the oid is filled in */
	struct obj key = {
		.oid = *oid,
	};
	struct obj *obj, *newobj;
	struct rb_cookie where;
	bool inserted;

	inserted = false;
	newobj = NULL;

	for (;;) {
		MXLOCK(&vol->lock);

		/*
		 * try to find the object
		 *
		 * NOTE(review): on a miss rb_find's result is passed
		 * straight to obj_getref, so obj_getref presumably
		 * tolerates NULL and returns it - confirm.
		 */
		obj = obj_getref(rb_find(&vol->objs, &key, &where));

		/* not found and this is the second attempt -> insert it */
		if (!obj && newobj) {
			/*
			 * The tree gets its own reference; the one from
			 * obj_alloc() is handed to our caller.
			 */
			rb_insert_here(&vol->objs, obj_getref(newobj), &where);
			obj = newobj;
			newobj = NULL;
			inserted = true;
		}

		MXUNLOCK(&vol->lock);

		/* found or inserted -> we're done */
		if (obj) {
			/* newly inserted objects are already locked */
			if (!inserted)
				MXLOCK(&obj->lock);

			if (obj->state == OBJ_STATE_DEAD) {
				/* this is a dead one, try again */
				MXUNLOCK(&obj->lock);
				obj_putref(obj);
				continue;
			}

			/*
			 * We found a usable object, but we also allocated
			 * one on a previous pass - we lost the race, so
			 * release the one we allocated.
			 */
			if (newobj) {
				MXUNLOCK(&newobj->lock);
				obj_putref(newobj);
			}

			break;
		}

		/* allocate a new object */
		newobj = obj_alloc();
		if (!newobj)
			return ERR_PTR(-ENOMEM);

		/*
		 * Lock it before it is inserted into the tree, so that it
		 * is already locked when the insertion above happens.
		 */
		MXLOCK(&newobj->lock);

		newobj->oid = *oid;
		newobj->vol = vol_getref(vol);

		/* retry the search, and insert if necessary */
	}

	return obj;
}

/*
 * Given a vol and an oid, find the corresponding object structure.
 *
 * The object is looked up (or allocated and inserted) via
 * __find_or_alloc().  If it is still in OBJ_STATE_NEW, the vol's initobj op
 * (if any) is called to initialize it and the object becomes
 * OBJ_STATE_LIVE.  If initobj fails, the object is marked OBJ_STATE_DEAD,
 * removed from vol->objs, and the error is returned.
 *
 * Returns the object locked and referenced on success.  On failure, returns
 * an ERR_PTR: the error from __find_or_alloc(), the error from the initobj
 * op, or -EINVAL if the object is dead.
 */
struct obj *obj_by_oid(struct objstore *vol, const struct noid *oid)
{
	struct obj *obj;
	int ret;

	obj = __find_or_alloc(vol, oid);
	if (IS_ERR(obj)) {
		ret = PTR_ERR(obj);
		goto err;
	}

	switch (obj->state) {
		case OBJ_STATE_NEW:
			if (vol->ops && vol->ops->initobj &&
			    (ret = vol->ops->initobj(obj))) {
				/* the initobj op failed, mark the obj dead */
				obj->state = OBJ_STATE_DEAD;

				/* remove the object from the objs list */
				MXLOCK(&vol->lock);
				rb_remove(&vol->objs, obj);
				MXUNLOCK(&vol->lock);

				/*
				 * Drop the reference that was taken on
				 * behalf of the tree when the object was
				 * inserted.  We still hold our own
				 * reference, so this cannot free the object
				 * while we have it locked.
				 */
				obj_putref(obj);
				goto err_obj;
			}

			obj->state = OBJ_STATE_LIVE;
			break;
		case OBJ_STATE_LIVE:
			break;
		case OBJ_STATE_DEAD:
			ret = -EINVAL;
			goto err_obj;
	}

	return obj;

err_obj:
	/* release the lock and the reference we got from __find_or_alloc */
	MXUNLOCK(&obj->lock);
	obj_putref(obj);

err:
	return ERR_PTR(ret);
}

/*
 * Ask the backend for a version of obj and, on success, add it to the
 * object's tree of cached versions.
 *
 * A new objver is allocated, pointed at obj, given a copy of clock (when
 * clock is non-NULL), and handed to the backend's getversion op.
 *
 * Returns the new version on success.  Returns ERR_PTR(-ENOENT) if obj is
 * NULL or has no getversion op, the objver_alloc() error if allocation
 * fails, or ERR_PTR of whatever non-zero value getversion returned.
 */
static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock)
{
	struct objver *fetched;
	int err;

	/* without a getversion op there is nothing we can ask the backend */
	if (obj == NULL || obj->ops == NULL || obj->ops->getversion == NULL)
		return ERR_PTR(-ENOENT);

	fetched = objver_alloc();
	if (IS_ERR(fetched))
		return fetched;

	fetched->obj = obj;
	if (clock != NULL)
		nvclock_copy(fetched->clock, clock);

	err = obj->ops->getversion(fetched);
	if (err == 0) {
		/* the backend filled it in -> cache it */
		rb_add(&obj->versions, fetched);
		return fetched;
	}

	objver_free(fetched);

	return ERR_PTR(err);
}

/*
 * Given a vol, an oid, and a vector clock, find the corresponding object
 * version structure.
 *
 * On success, returns the found version with the object referenced and
 * locked.  On failure, the object is unlocked and its reference dropped,
 * and an ERR_PTR is returned:
 *
 *   - the error from obj_by_oid() if the object lookup fails
 *   - -ENOENT if the object has no versions
 *   - the error from __fetch_ver() if a specific version was requested
 *     but is neither cached nor obtainable from the backend
 *   - -ENOTUNIQ if no specific version was requested (null clock) and the
 *     object has two or more versions
 */
struct objver *objver_by_clock(struct objstore *vol, const struct noid *oid,
			       const struct nvclock *clock)
{
	struct objver *ver;
	struct obj *obj;

	/* step one: oid -> object */
	obj = obj_by_oid(vol, oid);
	if (IS_ERR(obj))
		return ERR_CAST(obj);

	/* step two: object + clock -> version */

	if (obj->nversions == 0) {
		/* there are no versions at all */
		ver = ERR_PTR(-ENOENT);
		goto fail;
	}

	if (!nvclock_is_null(clock)) {
		/*
		 * A specific version was requested.  The obj->versions
		 * red-black tree is only a cache, so a miss there is not
		 * final - the backend has to be asked before giving up.
		 */
		struct objver key = {
			.clock = (struct nvclock *) clock,
		};

		ver = rb_find(&obj->versions, &key, NULL);
		if (!ver)
			ver = __fetch_ver(obj, clock);

		if (IS_ERR(ver))
			goto fail;

		return ver;
	}

	if (obj->nversions != 1) {
		/* no specific version requested, and two or more exist */
		ver = ERR_PTR(-ENOTUNIQ);
		goto fail;
	}

	/*
	 * No specific version requested and exactly one exists, so that is
	 * the one to return.  The initobj obj op is required to pre-cache
	 * all heads; therefore, even though obj->versions is only a cache,
	 * the sole version must be in it and must be a head.
	 */
	ver = rb_first(&obj->versions);

	ASSERT3P(ver, !=, NULL);

	return ver;

fail:
	MXUNLOCK(&obj->lock);
	obj_putref(obj);
	return ver;
}