Mercurial > nomad
view src/objstore/obj.c @ 555:0e7e00acfe88
objstore: maintain a tree of head object versions
We must keep track of which versions are heads in order to properly
determine which version to use when handling unqualified opens.
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Thu, 15 Nov 2018 17:24:12 -0500 |
parents | a480f75291cd |
children | be5d61e30d01 |
line wrap: on
line source
/* * Copyright (c) 2015-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #include <stddef.h> #include <jeffpc/error.h> #include "objstore_impl.h" struct mem_cache *obj_cache; struct mem_cache *objver_cache; struct mem_cache *open_obj_cache; static LOCK_CLASS(obj_lc); static int ver_cmp(const void *va, const void *vb) { const struct objver *a = va; const struct objver *b = vb; return nvclock_cmp_total(a->clock, b->clock); } /* * Allocate a new generic object structure. */ struct obj *obj_alloc(void) { struct obj *obj; obj = mem_cache_alloc(obj_cache); if (!obj) return NULL; memset(&obj->oid, 0, sizeof(obj->oid)); rb_create(&obj->versions, ver_cmp, sizeof(struct objver), offsetof(struct objver, all_node)); rb_create(&obj->heads, ver_cmp, sizeof(struct objver), offsetof(struct objver, head_node)); obj->nversions = 0; obj->nlink = 0; obj->private = NULL; obj->state = OBJ_STATE_NEW; obj->vol = NULL; obj->ops = NULL; refcnt_init(&obj->refcnt, 1); MXINIT(&obj->lock, &obj_lc); return obj; } /* * Free a generic object structure. */ void obj_free(struct obj *obj) { struct rb_cookie cookie; struct objver *ver; if (!obj) return; if (obj->ops && obj->ops->free) obj->ops->free(obj); memset(&cookie, 0, sizeof(struct rb_cookie)); while ((ver = rb_destroy_nodes(&obj->versions, &cookie))) objver_free(ver); MXDESTROY(&obj->lock); vol_putref(obj->vol); rb_destroy(&obj->versions); mem_cache_free(obj_cache, obj); } /* * Allocate a new generic object version structure. */ struct objver *objver_alloc(void) { struct objver *ver; size_t i; int ret; ver = mem_cache_alloc(objver_cache); if (!ver) return ERR_PTR(-ENOMEM); ver->clock = nvclock_alloc(false); if (!ver->clock) { ret = -ENOMEM; goto err; } memset(&ver->attrs, 0, sizeof(ver->attrs)); ver->private = NULL; for (i = 0; i < ARRAY_LEN(ver->open); i++) ver->open[i] = NULL; ver->obj = NULL; return ver; err: mem_cache_free(objver_cache, ver); return ERR_PTR(ret); } /* * Free a generic object version structure. */ void objver_free(struct objver *ver) { size_t i; if (!ver) return; /* we shouldn't be freeing open objects */ for (i = 0; i < ARRAY_LEN(ver->open); i++) ASSERT3P(ver->open[i], ==, NULL); nvclock_free(ver->clock); mem_cache_free(objver_cache, ver); } /* * Find the object with oid, if there isn't one, allocate one and add it to * the vol's list of objects. * * Returns obj (locked and referenced) on success, negated errno on failure. */ static struct obj *__find_or_alloc(struct objstore *vol, const struct noid *oid) { struct obj key = { .oid = *oid, }; struct obj *obj, *newobj; struct rb_cookie where; bool inserted; inserted = false; newobj = NULL; for (;;) { MXLOCK(&vol->lock); /* try to find the object */ obj = obj_getref(rb_find(&vol->objs, &key, &where)); /* not found and this is the second attempt -> insert it */ if (!obj && newobj) { rb_insert_here(&vol->objs, obj_getref(newobj), &where); obj = newobj; newobj = NULL; inserted = true; } MXUNLOCK(&vol->lock); /* found or inserted -> we're done */ if (obj) { /* newly inserted objects are already locked */ if (!inserted) MXLOCK(&obj->lock); if (obj->state == OBJ_STATE_DEAD) { /* this is a dead one, try again */ MXUNLOCK(&obj->lock); obj_putref(obj); continue; } if (newobj) { MXUNLOCK(&newobj->lock); obj_putref(newobj); } break; } /* allocate a new object */ newobj = obj_alloc(); if (!newobj) return ERR_PTR(-ENOMEM); MXLOCK(&newobj->lock); newobj->oid = *oid; newobj->vol = vol_getref(vol); /* retry the search, and insert if necessary */ } return obj; } /* * Given a vol and an oid, find the corresponding object structure. * * Return with the object locked and referenced. */ struct obj *obj_by_oid(struct objstore *vol, const struct noid *oid) { struct obj *obj; int ret; obj = __find_or_alloc(vol, oid); if (IS_ERR(obj)) { ret = PTR_ERR(obj); goto err; } switch (obj->state) { case OBJ_STATE_NEW: if (vol->ops && vol->ops->initobj && (ret = vol->ops->initobj(obj))) { /* the initobj op failed, mark the obj dead */ obj->state = OBJ_STATE_DEAD; /* remove the object from the objs list */ MXLOCK(&vol->lock); rb_remove(&vol->objs, obj); MXUNLOCK(&vol->lock); goto err_obj; } obj->state = OBJ_STATE_LIVE; break; case OBJ_STATE_LIVE: break; case OBJ_STATE_DEAD: ret = -EINVAL; goto err_obj; } return obj; err_obj: MXUNLOCK(&obj->lock); obj_putref(obj); err: return ERR_PTR(ret); } /* * Fetch a version from the backend, adding it to the cached versions tree. */ static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock) { int ret; if (!obj || !obj->ops) return ERR_PTR(-ENOTSUP); if (obj->ops->checkversion) { ret = obj->ops->checkversion(obj, clock); if (ret) return ERR_PTR(ret); } return obj_add_version(obj, clock); } /* * Given a vol, an oid, and a vector clock, find the corresponding object * version structure. * * Return the found version, with the object referenced and locked. */ struct objver *objver_by_clock(struct objstore *vol, const struct noid *oid, const struct nvclock *clock) { struct objver *ver; struct obj *obj; /* * First, find the object based on the oid. */ obj = obj_by_oid(vol, oid); if (IS_ERR(obj)) return ERR_CAST(obj); /* * Second, find the right version of the object. */ if (obj->nversions == 0) { /* * There are no versions at all. */ ver = ERR_PTR(-ENOENT); } else if (!nvclock_is_null(clock)) { /* * We are looking for a specific version. Since the * obj->objects red-black tree is only a cache, it may not * contain it. If it doesn't we need to call into the * backend to fetch it. */ struct objver key = { .clock = (struct nvclock *) clock, }; /* check the cache */ ver = rb_find(&obj->versions, &key, NULL); if (ver) return ver; /* try to fetch the version from the backend */ ver = __fetch_ver(obj, clock); if (!IS_ERR(ver)) return ver; } else if (obj->nversions == 1) { /* * We are *not* looking for a specific version, and there is * only one version available, so we just return that. * We require that the initobj obj op pre-caches all heads, * therefore even though obj->versions is only a cache, we * know that we have the only version and that it is a head. */ ver = rb_first(&obj->versions); ASSERT3P(ver, !=, NULL); return ver; } else { /* * We are *not* looking for a specific version, and there * are two or more versions. */ ver = ERR_PTR(-ENOTUNIQ); } MXUNLOCK(&obj->lock); obj_putref(obj); return ver; } /* check new version against currently known heads & update as necessary */ static void __add_head(struct obj *obj, struct objver *new) { struct objver *head; bool found_descendant; bool removed_head; found_descendant = false; removed_head = false; /* * The set of heads is a set of versions that are all pair-wise * divergent. * * When adding a head, we want to accomplish three things which can * be reasoned about independently: * * (1) We want remove all heads that are less than the version being * added. * * (2) We want to keep all heads that are greater than the version * being added. * * (3) We want to keep all heads that are divergent from the version * being added. * * (4) We want to add the new version as a head iff all the * remaining heads are divergent with it. */ restart: rb_for_each(&obj->heads, head) { ASSERT3P(head, !=, new); switch (nvclock_cmp(head->clock, new->clock)) { case NVC_EQ: panic("found duplicate head version"); case NVC_LT: /* head is less than new - remove it */ rb_remove(&obj->heads, head); removed_head = true; goto restart; case NVC_GT: /* head is greater than new - not a new head */ found_descendant = true; break; case NVC_DIV: /* head is unrelated to new - nothing to do */ break; } } /* * Add the new version as a head if... * (1) we found at least one old head that is the new's ancestor * (2) none of the old heads are the new's descendants */ if (removed_head && !found_descendant) rb_add(&obj->heads, new); } struct objver *obj_add_version(struct obj *obj, const struct nvclock *clock) { struct objver *ver; int ret; if (!obj->ops || !obj->ops->getattr) return ERR_PTR(-ENOTSUP); ver = objver_alloc(); if (IS_ERR(ver)) return ver; ver->obj = obj; nvclock_copy(ver->clock, clock); ret = obj->ops->getattr(ver, &ver->attrs); if (ret) goto err; if (rb_insert(&obj->versions, ver)) { ret = -EEXIST; goto err; } __add_head(obj, ver); return ver; err: objver_free(ver); return ERR_PTR(ret); }