Mercurial > nomad
view src/objstore/obj.c @ 852:8ccfc441cff6
objstore: add alloc_only argument to obj_by_oid
This bool will allow obj_by_oid to be used to look up both existing objects
(e.g., during open) and not-yet-created objects (e.g., during create).
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Sat, 17 Dec 2022 14:03:31 -0500 |
parents | 40d18d06086a |
children | dc84208232c6 |
line wrap: on
line source
/* * Copyright (c) 2015-2020,2022 Josef 'Jeff' Sipek <jeffpc@josefsipek.net> * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ #include <stddef.h> #include <jeffpc/error.h> #include "objstore_impl.h" struct mem_cache *obj_cache; struct mem_cache *objver_cache; struct mem_cache *open_obj_cache; static LOCK_CLASS(obj_lc); static int ver_cmp(const void *va, const void *vb) { const struct objver *a = va; const struct objver *b = vb; return nvclock_cmp_total(&a->clock, &b->clock); } /* * Allocate a new generic object structure. 
*/ static struct obj *obj_alloc(void) { struct obj *obj; obj = mem_cache_alloc(obj_cache); if (!obj) return NULL; memset(&obj->oid, 0, sizeof(obj->oid)); rb_create(&obj->versions, ver_cmp, sizeof(struct objver), offsetof(struct objver, all_node)); rb_create(&obj->heads, ver_cmp, sizeof(struct objver), offsetof(struct objver, head_node)); obj->nversions = 0; obj->private = NULL; obj->state = OBJ_STATE_NEW; obj->clone = NULL; obj->ops = NULL; refcnt_init(&obj->refcnt, 1); MXINIT(&obj->lock, &obj_lc); return obj; } /* * Free a generic object structure. */ void obj_free(struct obj *obj) { struct rb_cookie cookie; struct objver *ver; if (!obj) return; if (obj->ops && obj->ops->free) obj->ops->free(obj); memset(&cookie, 0, sizeof(struct rb_cookie)); while ((ver = rb_destroy_nodes(&obj->versions, &cookie))) objver_free(ver); MXDESTROY(&obj->lock); vol_putref(obj->clone->vol); rb_destroy(&obj->versions); mem_cache_free(obj_cache, obj); } /* * Allocate a new generic object version structure. */ struct objver *objver_alloc(void) { struct objver *ver; size_t i; ver = mem_cache_alloc(objver_cache); if (!ver) return ERR_PTR(-ENOMEM); nvclock_reset(&ver->clock); memset(&ver->attrs, 0, sizeof(ver->attrs)); ver->private = NULL; for (i = 0; i < ARRAY_LEN(ver->open); i++) ver->open[i] = NULL; ver->txn.txn = NULL; ver->txn.refcnt = 0; ver->txn.prev_ver = NULL; ver->txn.min_data_size = 0; ver->obj = NULL; page_cache_init_objver(ver); return ver; } /* * Free a generic object version structure. */ void objver_free(struct objver *ver) { size_t i; if (!ver) return; ASSERT3P(ver->txn.txn, ==, NULL); ASSERT3S(ver->txn.refcnt, ==, 0); /* we shouldn't be freeing open objects */ for (i = 0; i < ARRAY_LEN(ver->open); i++) ASSERT3P(ver->open[i], ==, NULL); page_cache_deinit_objver(ver); mem_cache_free(objver_cache, ver); } /* * Find the object with oid, if there isn't one, allocate one and add it to * the vol's list of objects. 
*
 * The lookup and the insertion both happen under vol->lock, but the
 * allocation of a new object happens with vol->lock dropped.  Therefore, a
 * freshly allocated object is only inserted if a second lookup still does
 * not find the oid; if someone else inserted it in the meantime, the
 * freshly allocated object is released again.
 *
 * Objects found in OBJ_STATE_DEAD are never returned - the lookup is
 * retried instead.
 *
 * Returns obj (locked and referenced) on success, negated errno on failure.
 * The only failure generated here is ERR_PTR(-ENOMEM).
 */
static struct obj *__find_or_alloc(struct objstore_clone *clone,
				   const struct noid *oid)
{
	struct objstore *vol = clone->vol;
	struct obj key = {
		.oid = *oid,
	};
	struct obj *obj, *newobj;
	struct rb_cookie where;
	bool inserted;

	inserted = false;
	newobj = NULL;

	for (;;) {
		MXLOCK(&vol->lock);

		/* try to find the object */
		obj = obj_getref(rb_find(&clone->objs, &key, &where));

		/* not found and this is the second attempt -> insert it */
		if (!obj && newobj) {
			/*
			 * the extra reference is for the tree; the
			 * reference from obj_alloc() goes to our caller
			 */
			rb_insert_here(&clone->objs, obj_getref(newobj),
				       &where);
			obj = newobj;
			newobj = NULL;
			inserted = true;
		}

		MXUNLOCK(&vol->lock);

		/* found or inserted -> we're done */
		if (obj) {
			/* newly inserted objects are already locked */
			if (!inserted)
				MXLOCK(&obj->lock);

			if (obj->state == OBJ_STATE_DEAD) {
				/* this is a dead one, try again */
				MXUNLOCK(&obj->lock);
				obj_putref(obj);
				continue;
			}

			/*
			 * we allocated an object, but found an existing one
			 * on the second lookup - release ours
			 */
			if (newobj) {
				MXUNLOCK(&newobj->lock);
				obj_putref(newobj);
			}

			break;
		}

		/* allocate a new object */
		newobj = obj_alloc();
		if (!newobj)
			return ERR_PTR(-ENOMEM);

		/* lock it before it becomes visible in the tree */
		MXLOCK(&newobj->lock);

		newobj->oid = *oid;
		vol_getref(clone->vol); /* TODO: do we need refs for clones? */
		newobj->clone = clone;

		/* retry the search, and insert if necessary */
	}

	return obj;
}

/*
 * Given a vol and an oid, find the corresponding object structure.
 *
 * Return with the object locked and referenced.
*/ struct obj *obj_by_oid(struct objstore_clone *clone, const struct noid *oid, bool alloc_only) { struct objstore *vol = clone->vol; struct obj *obj; int ret; obj = __find_or_alloc(clone, oid); if (IS_ERR(obj)) { ret = PTR_ERR(obj); goto err; } switch (obj->state) { case OBJ_STATE_NEW: if (!clone->ops || !clone->ops->initobj) { ret = -ENOTSUP; goto err_obj_new; } ret = clone->ops->initobj(obj, alloc_only); if (ret) goto err_obj_new; if (alloc_only) { /* must have no versions or heads */ ASSERT3U(obj->nversions, ==, 0); ASSERT3U(rb_numnodes(&obj->heads), ==, 0); } else { /* must have at least one version & head */ ASSERT3U(obj->nversions, >=, 1); ASSERT3U(rb_numnodes(&obj->heads), >=, 1); } obj->state = OBJ_STATE_LIVE; break; case OBJ_STATE_LIVE: break; case OBJ_STATE_DEAD: ret = -EINVAL; goto err_obj_dead; } return obj; err_obj_new: /* the initobj op failed, mark the obj dead */ obj->state = OBJ_STATE_DEAD; /* remove the object from the objs list */ MXLOCK(&vol->lock); rb_remove(&clone->objs, obj); MXUNLOCK(&vol->lock); err_obj_dead: MXUNLOCK(&obj->lock); obj_putref(obj); err: return ERR_PTR(ret); } /* * Fetch a version from the backend, adding it to the cached versions tree. */ static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock) { int ret; if (!obj || !obj->ops) return ERR_PTR(-ENOTSUP); if (obj->ops->checkversion) { ret = obj->ops->checkversion(obj, clock); if (ret) return ERR_PTR(ret); } return obj_add_version(obj, clock); } /* * Given a vol, an oid, and a vector clock, find the corresponding object * version structure. * * Return the found version, with the object referenced and locked. */ struct objver *objver_by_clock(struct objstore_clone *clone, const struct noid *oid, const struct nvclock *clock) { struct objver *ver; struct obj *obj; /* * First, find the object based on the oid. */ obj = obj_by_oid(clone, oid, false); if (IS_ERR(obj)) return ERR_CAST(obj); /* * Second, find the right version of the object. 
*/ if (!nvclock_is_null(clock)) { /* * Qualified open * * We are looking for a specific version. Since the * obj->versions red-black tree is only a cache, it may not * contain it. If it doesn't we need to call into the * backend to fetch it. */ struct objver key = { .clock = *clock, }; /* check the cache */ ver = rb_find(&obj->versions, &key, NULL); if (ver) return ver; /* try to fetch the version from the backend */ ver = __fetch_ver(obj, clock); if (!IS_ERR(ver)) return ver; } else if (rb_numnodes(&obj->heads) == 1) { /* * Unqualified open with one head * * We are *not* looking for a specific version, but there is * only one head, so we return that. We require that the * initobj obj op pre-caches all heads, so we are certain * that there are no other heads that haven't been cached. */ ver = rb_first(&obj->heads); ASSERT3P(ver, !=, NULL); return ver; } else { /* * Unqualified open with multiple heads * * We are *not* looking for a specific version, and there * are two or more versions. */ ASSERT3U(obj->nversions, !=, 0); ver = ERR_PTR(-ENOTUNIQ); } MXUNLOCK(&obj->lock); obj_putref(obj); return ver; } /* check new version against currently known heads & update as necessary */ static void __add_head(struct obj *obj, struct objver *new) { struct objver *head; bool found_descendant; found_descendant = false; /* * The set of heads is a set of versions that are all pair-wise * divergent. * * When adding a head, we want to accomplish three things which can * be reasoned about independently: * * (1) We want remove all heads that are less than the version being * added. * * (2) We want to keep all heads that are greater than the version * being added. * * (3) We want to keep all heads that are divergent from the version * being added. * * (4) We want to add the new version as a head iff all the * remaining heads are divergent with it. 
*/ restart: rb_for_each(&obj->heads, head) { ASSERT3P(head, !=, new); switch (nvclock_cmp(&head->clock, &new->clock)) { case NVC_EQ: panic("found duplicate head version"); case NVC_LT: /* head is less than new - remove it */ rb_remove(&obj->heads, head); goto restart; case NVC_GT: /* head is greater than new - not a new head */ found_descendant = true; break; case NVC_DIV: /* head is unrelated to new - nothing to do */ break; } } /* * Add the new version as a head if we haven't found any heads that * are our descendants. */ if (!found_descendant) rb_add(&obj->heads, new); } struct objver *obj_add_version(struct obj *obj, const struct nvclock *clock) { struct objver *ver; int ret; if (!obj->ops || !obj->ops->getattr) return ERR_PTR(-ENOTSUP); ver = objver_alloc(); if (IS_ERR(ver)) return ver; ver->obj = obj; ver->clock = *clock; ret = obj->ops->getattr(ver, &ver->attrs); if (ret) goto err; /* objstore doesn't track inode numbers */ ver->attrs.ino = 0; if (rb_insert(&obj->versions, ver)) { ret = -EEXIST; goto err; } __add_head(obj, ver); return ver; err: objver_free(ver); return ERR_PTR(ret); } static int __obj_create_perform(struct txn *txn, struct txn_entry *entry) { return txn->clone->ops->createobj(txn->clone, &entry->create.oid, &entry->create.clock, &entry->create.attrs, &entry->create.parent); } int obj_create(struct txn *txn, const struct nattr *attrs, struct noid *oid, const struct noid *parent) { struct txn_entry *entry; int ret; ret = txn->clone->ops->allocoid(txn->clone, oid); if (ret) return ret; entry = txn_alloc_entry(txn); entry->op = OP_CREATE; entry->perform = __obj_create_perform; entry->create.oid = *oid; nvclock_reset(&entry->create.clock); VERIFY0(nvclock_inc(&entry->create.clock)); entry->create.attrs = *attrs; entry->create.parent = *parent; txn_log_entry(txn, entry); return 0; } static int __obj_cow_perform(struct txn *txn, struct txn_entry *entry) { struct objstore_open_obj_info *open = entry->cow.open; struct objver *old = entry->cow.old; 
struct objver *new = entry->cow.new; struct obj *obj = new->obj; int ret; ASSERT(!open->qualified); ASSERT(open->cow.needed); ASSERT3P(open->obj, ==, obj); ASSERT3P(open->ver, ==, old); ASSERT3P(old->obj, ==, new->obj); ASSERT3P(old->open[false], ==, open); ASSERT3P(new->open[false], ==, NULL); ret = obj->ops->cow(old, &new->clock); if (ret) return ret; /* substitute the new version in the open structure */ open->ver = new; old->open[false] = NULL; new->open[false] = open; rb_add(&obj->versions, new); /* * since the new version is a descendant of the old, this will * replace the old head */ __add_head(obj, new); /* clear cow status */ open->cow.needed = false; txn_detach_objver(txn, new); return 0; } static void __obj_cow_cleanup(struct txn *txn, struct txn_entry *entry) { struct objstore_open_obj_info *open = entry->cow.open; struct objver *old = entry->cow.old; struct objver *new = entry->cow.new; /* restore the old version in the open structure */ ASSERT(!open->qualified); ASSERT(open->cow.needed); ASSERT3P(open->obj, ==, new->obj); ASSERT3P(open->ver, ==, old); ASSERT3P(old->obj, ==, new->obj); ASSERT3P(old->open[false], ==, open); ASSERT3P(new->open[false], ==, NULL); txn_detach_objver(txn, new); /* free the version - it was never added to the object */ objver_free(new); } /* add a cow operation to the transaction if necessary */ struct objver *obj_cow(struct txn *txn, struct objstore_open_obj_info *open) { struct nvclock new_clock; struct txn_entry *entry; struct objver *new; struct objver *old; int ret; old = open->ver; ASSERT3P(old->txn.txn, ==, NULL); if (!open->cow.needed) return old; new_clock = old->clock; ret = nvclock_inc(&new_clock); if (ret) return ERR_PTR(ret); /* allocate a new version - must match obj_add_version */ new = objver_alloc(); if (IS_ERR(new)) return new; new->obj = open->obj; new->clock = new_clock; new->attrs = old->attrs; txn_attach_objver(txn, new); new->txn.prev_ver = old; /* add a txn entry */ entry = txn_alloc_entry(txn); 
entry->op = OP_COW; entry->perform = __obj_cow_perform; entry->cleanup = __obj_cow_cleanup; entry->cow.open = open; entry->cow.old = old; entry->cow.new = new; txn_log_entry(txn, entry); return new; }