changeset 542:2e89f3ca8957

objstore: split up the oversized vol.c vol.c ended up with a lot of code that doesn't exactly belong there. This commits moves a bunch of the code out, into other files. Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date Sun, 11 Nov 2018 10:21:57 -0500
parents a46d8d36f799
children 32a60cd03dee
files src/objstore/CMakeLists.txt src/objstore/obj.c src/objstore/obj_ops.c src/objstore/objstore_impl.h src/objstore/vol.c src/objstore/vol_ops.c
diffstat 6 files changed, 653 insertions(+), 600 deletions(-) [+]
line wrap: on
line diff
--- a/src/objstore/CMakeLists.txt	Wed Nov 07 18:08:52 2018 -0500
+++ b/src/objstore/CMakeLists.txt	Sun Nov 11 10:21:57 2018 -0500
@@ -22,9 +22,11 @@
 
 add_library(nomad_objstore SHARED
 	obj.c
+	obj_ops.c
 	objstore.c
 	vdev.c
 	vol.c
+	vol_ops.c
 )
 
 target_link_libraries(nomad_objstore
--- a/src/objstore/obj.c	Wed Nov 07 18:08:52 2018 -0500
+++ b/src/objstore/obj.c	Sun Nov 11 10:21:57 2018 -0500
@@ -28,6 +28,7 @@
 
 struct mem_cache *obj_cache;
 struct mem_cache *objver_cache;
+struct mem_cache *open_obj_cache;
 
 static LOCK_CLASS(obj_lc);
 
@@ -136,3 +137,233 @@
 	nvclock_free(ver->clock);
 	mem_cache_free(objver_cache, ver);
 }
+
+/*
+ * Find the object with oid, if there isn't one, allocate one and add it to
+ * the vol's list of objects.
+ *
+ * Returns obj (locked and referenced) on success, negated errno on failure.
+ */
+static struct obj *__find_or_alloc(struct objstore *vol, const struct noid *oid)
+{
+	struct obj key = {
+		.oid = *oid,
+	};
+	struct obj *obj, *newobj;
+	struct rb_cookie where;
+	bool inserted;
+
+	inserted = false;
+	newobj = NULL;
+
+	for (;;) {
+		MXLOCK(&vol->lock);
+
+		/* try to find the object */
+		obj = obj_getref(rb_find(&vol->objs, &key, &where));
+
+		/* not found and this is the second attempt -> insert it */
+		if (!obj && newobj) {
+			rb_insert_here(&vol->objs, obj_getref(newobj), &where);
+			obj = newobj;
+			newobj = NULL;
+			inserted = true;
+		}
+
+		MXUNLOCK(&vol->lock);
+
+		/* found or inserted -> we're done */
+		if (obj) {
+			/* newly inserted objects are already locked */
+			if (!inserted)
+				MXLOCK(&obj->lock);
+
+			if (obj->state == OBJ_STATE_DEAD) {
+				/* this is a dead one, try again */
+				MXUNLOCK(&obj->lock);
+				obj_putref(obj);
+				continue;
+			}
+
+			if (newobj) {
+				MXUNLOCK(&newobj->lock);
+				obj_putref(newobj);
+			}
+
+			break;
+		}
+
+		/* allocate a new object */
+		newobj = allocobj();
+		if (!newobj)
+			return ERR_PTR(-ENOMEM);
+
+		MXLOCK(&newobj->lock);
+
+		newobj->oid = *oid;
+		newobj->vol = vol_getref(vol);
+
+		/* retry the search, and insert if necessary */
+	}
+
+	return obj;
+}
+
+/*
+ * Given a vol and an oid, find the corresponding object structure.
+ *
+ * Return with the object locked and referenced.
+ */
+struct obj *getobj(struct objstore *vol, const struct noid *oid)
+{
+	struct obj *obj;
+	int ret;
+
+	obj = __find_or_alloc(vol, oid);
+	if (IS_ERR(obj)) {
+		ret = PTR_ERR(obj);
+		goto err;
+	}
+
+	switch (obj->state) {
+		case OBJ_STATE_NEW:
+			if (vol->ops && vol->ops->allocobj &&
+			    (ret = vol->ops->allocobj(obj))) {
+				/* the allocobj op failed, mark the obj dead */
+				obj->state = OBJ_STATE_DEAD;
+
+				/* remove the object from the objs list */
+				MXLOCK(&vol->lock);
+				rb_remove(&vol->objs, obj);
+				MXUNLOCK(&vol->lock);
+				goto err_obj;
+			}
+
+			obj->state = OBJ_STATE_LIVE;
+			break;
+		case OBJ_STATE_LIVE:
+			break;
+		case OBJ_STATE_DEAD:
+			ret = -EINVAL;
+			goto err_obj;
+	}
+
+	return obj;
+
+err_obj:
+	MXUNLOCK(&obj->lock);
+	obj_putref(obj);
+
+err:
+	return ERR_PTR(ret);
+}
+
+/*
+ * Fetch a version from the backend, adding it to the cached versions tree.
+ */
+static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock)
+{
+	struct objver *ver;
+	int ret;
+
+	if (!obj || !obj->ops || !obj->ops->getversion)
+		return ERR_PTR(-ENOTSUP);
+
+	ver = allocobjver();
+	if (IS_ERR(ver))
+		return ver;
+
+	if (clock)
+		nvclock_copy(ver->clock, clock);
+
+	ver->obj = obj;
+
+	ret = obj->ops->getversion(ver);
+	if (ret) {
+		freeobjver(ver);
+		return ERR_PTR(ret);
+	}
+
+	rb_add(&obj->versions, ver);
+
+	return ver;
+}
+
+/*
+ * Given a vol, an oid, and a vector clock, find the corresponding object
+ * version structure.
+ *
+ * Return the found version, with the object referenced and locked.
+ */
+struct objver *getver(struct objstore *vol, const struct noid *oid,
+		      const struct nvclock *clock)
+{
+	struct objver *ver;
+	struct obj *obj;
+
+	/*
+	 * First, find the object based on the oid.
+	 */
+	obj = getobj(vol, oid);
+	if (IS_ERR(obj))
+		return ERR_CAST(obj);
+
+	/*
+	 * Second, find the right version of the object.
+	 */
+	if (obj->nversions == 0) {
+		/*
+		 * There are no versions at all.
+		 */
+		ver = ERR_PTR(-ENOENT);
+	} else if (!nvclock_is_null(clock)) {
+		/*
+		 * We are looking for a specific version.  Since the
+		 * obj->objects red-black tree is only a cache, it may not
+		 * contain it.  If it doesn't we need to call into the
+		 * backend to fetch it.
+		 */
+		struct objver key = {
+			.clock = (struct nvclock *) clock,
+		};
+
+		/* check the cache */
+		ver = rb_find(&obj->versions, &key, NULL);
+		if (ver)
+			return ver;
+
+		/* try to fetch the version from the backend */
+		ver = __fetch_ver(obj, clock);
+		if (!IS_ERR(ver))
+			return ver;
+	} else if (obj->nversions == 1) {
+		/*
+		 * We are *not* looking for a specific version, and there is
+		 * only one version available, so we just return that.  This
+		 * is slightly complicated by the fact that the
+		 * obj->versions red-black tree is only a cache, and
+		 * therefore it might be empty.  If it is, we need to call
+		 * into the backend to fetch it.
+		 */
+		ASSERT3U(rb_numnodes(&obj->versions), <=, 1);
+
+		ver = rb_first(&obj->versions);
+		if (ver)
+			return ver;
+
+		/* try to fetch the version from the backend */
+		ver = __fetch_ver(obj, NULL);
+		if (!IS_ERR(ver))
+			return ver;
+	} else {
+		/*
+		 * We are *not* looking for a specific version, and there
+		 * are two or more versions.
+		 */
+		ver = ERR_PTR(-ENOTUNIQ);
+	}
+
+	MXUNLOCK(&obj->lock);
+	obj_putref(obj);
+	return ver;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/objstore/obj_ops.c	Sun Nov 11 10:21:57 2018 -0500
@@ -0,0 +1,377 @@
+/*
+ * Copyright (c) 2015-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include "objstore_impl.h"
+
+struct objstore_open_obj_info *objstore_open(struct objstore *vol,
+					     const struct noid *oid,
+					     const struct nvclock *clock)
+{
+	struct objstore_open_obj_info *open;
+	struct objver *objver;
+	struct obj *obj;
+	bool qualified;
+	int ret;
+
+	if (!vol || !oid || !clock)
+		return ERR_PTR(-EINVAL);
+
+	objver = getver(vol, oid, clock);
+	if (IS_ERR(objver))
+		return ERR_CAST(objver);
+
+	qualified = nvclock_is_null(clock);
+
+	obj = objver->obj;
+	open = objver->open[qualified];
+
+	if (!open) {
+		/* first open */
+		open = mem_cache_alloc(open_obj_cache);
+		if (!open) {
+			ret = -ENOMEM;
+			goto err;
+		}
+
+		if (obj->ops && obj->ops->open)
+			ret = obj->ops->open(objver);
+		else
+			ret = 0;
+
+		if (ret)
+			goto err_free;
+
+		open->obj = obj;
+		open->ver = objver;
+		open->open_count = 0; /* will be incremented below */
+
+		objver->open[qualified] = open;
+	}
+
+	ASSERT(objver->open[qualified]);
+
+	open->open_count++;
+
+	/*
+	 * The object associated with 'objver' is locked and held.  We
+	 * unlock it, but keep the hold on it.  Then we return the pointer
+	 * to the objver to the caller.  Then, later on when the caller
+	 * calls the close function, we release the object's hold.
+	 *
+	 * While the object version is open, we maintain this reference hold
+	 * keeping the object and the version structures in memory.  Even if
+	 * all links are removed, we keep the structures around and let
+	 * operations use them.  This mimics the POSIX file system semantics
+	 * well.
+	 */
+
+	MXUNLOCK(&obj->lock);
+
+	return open;
+
+err_free:
+	mem_cache_free(open_obj_cache, open);
+
+err:
+	MXUNLOCK(&obj->lock);
+
+	obj_putref(obj);
+
+	return ERR_PTR(ret);
+}
+
+int objstore_close(struct objstore_open_obj_info *open)
+{
+	struct obj *obj;
+	bool putref;
+	int ret;
+
+	if (!open)
+		return -EINVAL;
+
+	obj = open->obj;
+
+	/*
+	 * Yes, we are dereferencing the pointer that the caller supplied.
+	 * Yes, this is safe.  The only reason this is safe is because we
+	 * rely on the caller to keep the pointer safe.  Specifically, we
+	 * assume that the caller didn't transmit the pointer over the
+	 * network.
+	 */
+
+	MXLOCK(&obj->lock);
+	open->open_count--;
+	putref = true;
+
+	if (open->open_count) {
+		ret = 0;
+		putref = false;
+	} else if (obj->ops && obj->ops->close) {
+		/* last close */
+		ret = obj->ops->close(open->ver);
+	} else {
+		ret = 0;
+	}
+
+	if (putref && !ret)
+		open->ver->open[open == open->ver->open[1]] = NULL;
+	else if (ret)
+		open->open_count++; /* undo earlier decrement */
+	MXUNLOCK(&obj->lock);
+
+	/* release the reference obtained in objstore_open() */
+	if (putref && !ret) {
+		mem_cache_free(open_obj_cache, open);
+		obj_putref(obj);
+	}
+
+	return ret;
+}
+
+int objstore_getattr(struct objstore_open_obj_info *open, struct nattr *attr)
+{
+	struct obj *obj;
+	int ret;
+
+	if (!open || !attr)
+		return -EINVAL;
+
+	obj = open->obj;
+
+	if (!obj->ops || !obj->ops->getattr)
+		return -ENOTSUP;
+
+	MXLOCK(&obj->lock);
+	ret = obj->ops->getattr(open->ver, attr);
+	MXUNLOCK(&obj->lock);
+
+	return ret;
+}
+
+int objstore_setattr(struct objstore_open_obj_info *open, struct nattr *attr,
+		     const unsigned valid)
+{
+	struct obj *obj;
+	int ret;
+
+	if (!open || !attr)
+		return -EINVAL;
+
+	obj = open->obj;
+
+	if (!obj->ops || !obj->ops->setattr)
+		return -ENOTSUP;
+
+	MXLOCK(&obj->lock);
+	ret = obj->ops->setattr(open->ver, attr, valid);
+	MXUNLOCK(&obj->lock);
+
+	return ret;
+}
+
+ssize_t objstore_read(struct objstore_open_obj_info *open, void *buf,
+		      size_t len, uint64_t offset)
+{
+	struct obj *obj;
+	ssize_t ret;
+
+	if (!open || !buf)
+		return -EINVAL;
+
+	if (len > (SIZE_MAX / 2))
+		return -EOVERFLOW;
+
+	obj = open->obj;
+
+	if (!obj->ops || !obj->ops->read)
+		return -ENOTSUP;
+
+	/* nothing to do */
+	if (!len)
+		return 0;
+
+	MXLOCK(&obj->lock);
+	if (NATTR_ISDIR(open->ver->attrs.mode))
+		/* TODO: do we need to check for other types? */
+		ret = -EISDIR;
+	else
+		ret = obj->ops->read(open->ver, buf, len, offset);
+	MXUNLOCK(&obj->lock);
+
+	return ret;
+}
+
+ssize_t objstore_write(struct objstore_open_obj_info *open, const void *buf,
+		       size_t len, uint64_t offset)
+{
+	struct obj *obj;
+	ssize_t ret;
+
+	if (!open || !buf)
+		return -EINVAL;
+
+	if (len > (SIZE_MAX / 2))
+		return -EOVERFLOW;
+
+	obj = open->obj;
+
+	if (!obj->ops || !obj->ops->write)
+		return -ENOTSUP;
+
+	/* nothing to do */
+	if (!len)
+		return 0;
+
+	MXLOCK(&obj->lock);
+	if (NATTR_ISDIR(open->ver->attrs.mode))
+		/* TODO: do we need to check for other types? */
+		ret = -EISDIR;
+	else
+		ret = obj->ops->write(open->ver, buf, len, offset);
+	MXUNLOCK(&obj->lock);
+
+	return ret;
+}
+
+int objstore_lookup_one(struct objstore_open_obj_info *diropen,
+			const char *name, const struct noid *desired,
+			struct noid *child, uint8_t *type)
+{
+	struct obj *dir;
+	int ret;
+
+	if (!diropen || !name || !desired || !child || !type)
+		return -EINVAL;
+
+	dir = diropen->obj;
+
+	if (!dir->ops || !dir->ops->lookup_one)
+		return -ENOTSUP;
+
+	*type = NDIRENT_TYPE_UNKNOWN; /* unknown by default */
+
+	MXLOCK(&dir->lock);
+	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
+		ret = -ENOTDIR;
+	else
+		ret = dir->ops->lookup_one(diropen->ver, name, desired, child,
+					   type);
+	MXUNLOCK(&dir->lock);
+
+	return ret;
+}
+
+int objstore_create(struct objstore_open_obj_info *diropen, const char *name,
+		    uint16_t mode, struct noid *child)
+{
+	struct obj *dir;
+	int ret;
+
+	if (!diropen || !name || !child)
+		return -EINVAL;
+
+	dir = diropen->obj;
+
+	if (!dir->ops || !dir->ops->create)
+		return -ENOTSUP;
+
+	MXLOCK(&dir->lock);
+	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
+		ret = -ENOTDIR;
+	else
+		ret = dir->ops->create(diropen->ver, name, mode, child);
+	MXUNLOCK(&dir->lock);
+
+	return ret;
+}
+
+static struct obj *getobj_in_dir(struct objver *dirver, const char *name)
+{
+	struct noid child_oid;
+	uint8_t type;
+	int ret;
+
+	ret = dirver->obj->ops->lookup_one(dirver, name, NULL, &child_oid,
+					   &type);
+	if (ret)
+		return ERR_PTR(ret);
+
+	return getobj(dirver->obj->vol, &child_oid);
+}
+
+int objstore_unlink(struct objstore_open_obj_info *diropen, const char *name)
+{
+	struct obj *dir;
+	int ret;
+
+	if (!diropen || !name)
+		return -EINVAL;
+
+	dir = diropen->obj;
+
+	if (!dir->ops || !dir->ops->unlink || !dir->ops->lookup_one)
+		return -ENOTSUP;
+
+	MXLOCK(&dir->lock);
+	if (!NATTR_ISDIR(diropen->ver->attrs.mode)) {
+		ret = -ENOTDIR;
+	} else {
+		struct obj *child;
+
+		child = getobj_in_dir(diropen->ver, name);
+		if (!IS_ERR(child)) {
+			ret = dir->ops->unlink(diropen->ver, name, child);
+
+			MXUNLOCK(&child->lock);
+			obj_putref(child);
+		} else {
+			ret = PTR_ERR(child);
+		}
+	}
+	MXUNLOCK(&dir->lock);
+
+	return ret;
+}
+
+int objstore_getdent(struct objstore_open_obj_info *diropen,
+		     const uint64_t offset, struct ndirent *child)
+{
+	struct obj *dir;
+	int ret;
+
+	if (!diropen)
+		return -EINVAL;
+
+	dir = diropen->obj;
+
+	if (!dir->ops || !dir->ops->getdent)
+		return -ENOTSUP;
+
+	MXLOCK(&dir->lock);
+	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
+		ret = -ENOTDIR;
+	else
+		ret = dir->ops->getdent(diropen->ver, offset, child);
+	MXUNLOCK(&dir->lock);
+
+	return ret;
+}
--- a/src/objstore/objstore_impl.h	Wed Nov 07 18:08:52 2018 -0500
+++ b/src/objstore/objstore_impl.h	Sun Nov 11 10:21:57 2018 -0500
@@ -51,11 +51,16 @@
 /* internal object management */
 extern struct mem_cache *obj_cache;
 extern struct mem_cache *objver_cache;
+extern struct mem_cache *open_obj_cache;
 extern struct obj *allocobj(void);
 extern void freeobj(struct obj *obj);
 extern struct objver *allocobjver(void);
 extern void freeobjver(struct objver *ver);
 
+extern struct obj *getobj(struct objstore *vol, const struct noid *oid);
+extern struct objver *getver(struct objstore *vol, const struct noid *oid,
+			     const struct nvclock *clock);
+
 REFCNT_INLINE_FXNS(struct obj, obj, refcnt, freeobj, NULL);
 
 #endif
--- a/src/objstore/vol.c	Wed Nov 07 18:08:52 2018 -0500
+++ b/src/objstore/vol.c	Sun Nov 11 10:21:57 2018 -0500
@@ -28,7 +28,6 @@
 #include "objstore_impl.h"
 
 static struct mem_cache *vol_cache;
-static struct mem_cache *open_obj_cache;
 
 static LOCK_CLASS(vols_lc);
 static LOCK_CLASS(vol_lc);
@@ -237,602 +236,3 @@
 	str_putref(vol->name);
 	mem_cache_free(vol_cache, vol);
 }
-
-/*
- * Find the object with oid, if there isn't one, allocate one and add it to
- * the vol's list of objects.
- *
- * Returns obj (locked and referenced) on success, negated errno on failure.
- */
-static struct obj *__find_or_alloc(struct objstore *vol, const struct noid *oid)
-{
-	struct obj key = {
-		.oid = *oid,
-	};
-	struct obj *obj, *newobj;
-	struct rb_cookie where;
-	bool inserted;
-
-	inserted = false;
-	newobj = NULL;
-
-	for (;;) {
-		MXLOCK(&vol->lock);
-
-		/* try to find the object */
-		obj = obj_getref(rb_find(&vol->objs, &key, &where));
-
-		/* not found and this is the second attempt -> insert it */
-		if (!obj && newobj) {
-			rb_insert_here(&vol->objs, obj_getref(newobj), &where);
-			obj = newobj;
-			newobj = NULL;
-			inserted = true;
-		}
-
-		MXUNLOCK(&vol->lock);
-
-		/* found or inserted -> we're done */
-		if (obj) {
-			/* newly inserted objects are already locked */
-			if (!inserted)
-				MXLOCK(&obj->lock);
-
-			if (obj->state == OBJ_STATE_DEAD) {
-				/* this is a dead one, try again */
-				MXUNLOCK(&obj->lock);
-				obj_putref(obj);
-				continue;
-			}
-
-			if (newobj) {
-				MXUNLOCK(&newobj->lock);
-				obj_putref(newobj);
-			}
-
-			break;
-		}
-
-		/* allocate a new object */
-		newobj = allocobj();
-		if (!newobj)
-			return ERR_PTR(-ENOMEM);
-
-		MXLOCK(&newobj->lock);
-
-		newobj->oid = *oid;
-		newobj->vol = vol_getref(vol);
-
-		/* retry the search, and insert if necessary */
-	}
-
-	return obj;
-}
-
-/*
- * Given a vol and an oid, find the corresponding object structure.
- *
- * Return with the object locked and referenced.
- */
-static struct obj *getobj(struct objstore *vol, const struct noid *oid)
-{
-	struct obj *obj;
-	int ret;
-
-	obj = __find_or_alloc(vol, oid);
-	if (IS_ERR(obj)) {
-		ret = PTR_ERR(obj);
-		goto err;
-	}
-
-	switch (obj->state) {
-		case OBJ_STATE_NEW:
-			if (vol->ops && vol->ops->allocobj &&
-			    (ret = vol->ops->allocobj(obj))) {
-				/* the allocobj op failed, mark the obj dead */
-				obj->state = OBJ_STATE_DEAD;
-
-				/* remove the object from the objs list */
-				MXLOCK(&vol->lock);
-				rb_remove(&vol->objs, obj);
-				MXUNLOCK(&vol->lock);
-				goto err_obj;
-			}
-
-			obj->state = OBJ_STATE_LIVE;
-			break;
-		case OBJ_STATE_LIVE:
-			break;
-		case OBJ_STATE_DEAD:
-			ret = -EINVAL;
-			goto err_obj;
-	}
-
-	return obj;
-
-err_obj:
-	MXUNLOCK(&obj->lock);
-	obj_putref(obj);
-
-err:
-	return ERR_PTR(ret);
-}
-
-/*
- * Fetch a version from the backend, adding it to the cached versions tree.
- */
-static struct objver *__fetch_ver(struct obj *obj, const struct nvclock *clock)
-{
-	struct objver *ver;
-	int ret;
-
-	if (!obj || !obj->ops || !obj->ops->getversion)
-		return ERR_PTR(-ENOTSUP);
-
-	ver = allocobjver();
-	if (IS_ERR(ver))
-		return ver;
-
-	if (clock)
-		nvclock_copy(ver->clock, clock);
-
-	ver->obj = obj;
-
-	ret = obj->ops->getversion(ver);
-	if (ret) {
-		freeobjver(ver);
-		return ERR_PTR(ret);
-	}
-
-	rb_add(&obj->versions, ver);
-
-	return ver;
-}
-
-/*
- * Given a vol, an oid, and a vector clock, find the corresponding object
- * version structure.
- *
- * Return the found version, with the object referenced and locked.
- */
-static struct objver *getver(struct objstore *vol, const struct noid *oid,
-			     const struct nvclock *clock)
-{
-	struct objver *ver;
-	struct obj *obj;
-
-	/*
-	 * First, find the object based on the oid.
-	 */
-	obj = getobj(vol, oid);
-	if (IS_ERR(obj))
-		return ERR_CAST(obj);
-
-	/*
-	 * Second, find the right version of the object.
-	 */
-	if (obj->nversions == 0) {
-		/*
-		 * There are no versions at all.
-		 */
-		ver = ERR_PTR(-ENOENT);
-	} else if (!nvclock_is_null(clock)) {
-		/*
-		 * We are looking for a specific version.  Since the
-		 * obj->objects red-black tree is only a cache, it may not
-		 * contain it.  If it doesn't we need to call into the
-		 * backend to fetch it.
-		 */
-		struct objver key = {
-			.clock = (struct nvclock *) clock,
-		};
-
-		/* check the cache */
-		ver = rb_find(&obj->versions, &key, NULL);
-		if (ver)
-			return ver;
-
-		/* try to fetch the version from the backend */
-		ver = __fetch_ver(obj, clock);
-		if (!IS_ERR(ver))
-			return ver;
-	} else if (obj->nversions == 1) {
-		/*
-		 * We are *not* looking for a specific version, and there is
-		 * only one version available, so we just return that.  This
-		 * is slightly complicated by the fact that the
-		 * obj->versions red-black tree is only a cache, and
-		 * therefore it might be empty.  If it is, we need to call
-		 * into the backend to fetch it.
-		 */
-		ASSERT3U(rb_numnodes(&obj->versions), <=, 1);
-
-		ver = rb_first(&obj->versions);
-		if (ver)
-			return ver;
-
-		/* try to fetch the version from the backend */
-		ver = __fetch_ver(obj, NULL);
-		if (!IS_ERR(ver))
-			return ver;
-	} else {
-		/*
-		 * We are *not* looking for a specific version, and there
-		 * are two or more versions.
-		 */
-		ver = ERR_PTR(-ENOTUNIQ);
-	}
-
-	MXUNLOCK(&obj->lock);
-	obj_putref(obj);
-	return ver;
-}
-
-int objstore_getroot(struct objstore *vol, struct noid *root)
-{
-	int ret;
-
-	if (!vol || !root)
-		return -EINVAL;
-
-	if (vol->ops && vol->ops->getroot)
-		ret = vol->ops->getroot(vol, root);
-	else
-		ret = -ENOTSUP;
-
-	return ret;
-}
-
-struct objstore_open_obj_info *objstore_open(struct objstore *vol,
-					     const struct noid *oid,
-					     const struct nvclock *clock)
-{
-	struct objstore_open_obj_info *open;
-	struct objver *objver;
-	struct obj *obj;
-	bool qualified;
-	int ret;
-
-	if (!vol || !oid || !clock)
-		return ERR_PTR(-EINVAL);
-
-	objver = getver(vol, oid, clock);
-	if (IS_ERR(objver))
-		return ERR_CAST(objver);
-
-	qualified = nvclock_is_null(clock);
-
-	obj = objver->obj;
-	open = objver->open[qualified];
-
-	if (!open) {
-		/* first open */
-		open = mem_cache_alloc(open_obj_cache);
-		if (!open) {
-			ret = -ENOMEM;
-			goto err;
-		}
-
-		if (obj->ops && obj->ops->open)
-			ret = obj->ops->open(objver);
-		else
-			ret = 0;
-
-		if (ret)
-			goto err_free;
-
-		open->obj = obj;
-		open->ver = objver;
-		open->open_count = 0; /* will be incremented below */
-
-		objver->open[qualified] = open;
-	}
-
-	ASSERT(objver->open[qualified]);
-
-	open->open_count++;
-
-	/*
-	 * The object associated with 'objver' is locked and held.  We
-	 * unlock it, but keep the hold on it.  Then we return the pointer
-	 * to the objver to the caller.  Then, later on when the caller
-	 * calls the close function, we release the object's hold.
-	 *
-	 * While the object version is open, we maintain this reference hold
-	 * keeping the object and the version structures in memory.  Even if
-	 * all links are removed, we keep the structures around and let
-	 * operations use them.  This mimics the POSIX file system semantics
-	 * well.
-	 */
-
-	MXUNLOCK(&obj->lock);
-
-	return open;
-
-err_free:
-	mem_cache_free(open_obj_cache, open);
-
-err:
-	MXUNLOCK(&obj->lock);
-
-	obj_putref(obj);
-
-	return ERR_PTR(ret);
-}
-
-int objstore_close(struct objstore_open_obj_info *open)
-{
-	struct obj *obj;
-	bool putref;
-	int ret;
-
-	if (!open)
-		return -EINVAL;
-
-	obj = open->obj;
-
-	/*
-	 * Yes, we are dereferencing the pointer that the caller supplied.
-	 * Yes, this is safe.  The only reason this is safe is because we
-	 * rely on the caller to keep the pointer safe.  Specifically, we
-	 * assume that the caller didn't transmit the pointer over the
-	 * network.
-	 */
-
-	MXLOCK(&obj->lock);
-	open->open_count--;
-	putref = true;
-
-	if (open->open_count) {
-		ret = 0;
-		putref = false;
-	} else if (obj->ops && obj->ops->close) {
-		/* last close */
-		ret = obj->ops->close(open->ver);
-	} else {
-		ret = 0;
-	}
-
-	if (putref && !ret)
-		open->ver->open[open == open->ver->open[1]] = NULL;
-	else if (ret)
-		open->open_count++; /* undo earlier decrement */
-	MXUNLOCK(&obj->lock);
-
-	/* release the reference obtained in objstore_open() */
-	if (putref && !ret) {
-		mem_cache_free(open_obj_cache, open);
-		obj_putref(obj);
-	}
-
-	return ret;
-}
-
-int objstore_getattr(struct objstore_open_obj_info *open, struct nattr *attr)
-{
-	struct obj *obj;
-	int ret;
-
-	if (!open || !attr)
-		return -EINVAL;
-
-	obj = open->obj;
-
-	if (!obj->ops || !obj->ops->getattr)
-		return -ENOTSUP;
-
-	MXLOCK(&obj->lock);
-	ret = obj->ops->getattr(open->ver, attr);
-	MXUNLOCK(&obj->lock);
-
-	return ret;
-}
-
-int objstore_setattr(struct objstore_open_obj_info *open, struct nattr *attr,
-		     const unsigned valid)
-{
-	struct obj *obj;
-	int ret;
-
-	if (!open || !attr)
-		return -EINVAL;
-
-	obj = open->obj;
-
-	if (!obj->ops || !obj->ops->setattr)
-		return -ENOTSUP;
-
-	MXLOCK(&obj->lock);
-	ret = obj->ops->setattr(open->ver, attr, valid);
-	MXUNLOCK(&obj->lock);
-
-	return ret;
-}
-
-ssize_t objstore_read(struct objstore_open_obj_info *open, void *buf,
-		      size_t len, uint64_t offset)
-{
-	struct obj *obj;
-	ssize_t ret;
-
-	if (!open || !buf)
-		return -EINVAL;
-
-	if (len > (SIZE_MAX / 2))
-		return -EOVERFLOW;
-
-	obj = open->obj;
-
-	if (!obj->ops || !obj->ops->read)
-		return -ENOTSUP;
-
-	/* nothing to do */
-	if (!len)
-		return 0;
-
-	MXLOCK(&obj->lock);
-	if (NATTR_ISDIR(open->ver->attrs.mode))
-		/* TODO: do we need to check for other types? */
-		ret = -EISDIR;
-	else
-		ret = obj->ops->read(open->ver, buf, len, offset);
-	MXUNLOCK(&obj->lock);
-
-	return ret;
-}
-
-ssize_t objstore_write(struct objstore_open_obj_info *open, const void *buf,
-		       size_t len, uint64_t offset)
-{
-	struct obj *obj;
-	ssize_t ret;
-
-	if (!open || !buf)
-		return -EINVAL;
-
-	if (len > (SIZE_MAX / 2))
-		return -EOVERFLOW;
-
-	obj = open->obj;
-
-	if (!obj->ops || !obj->ops->write)
-		return -ENOTSUP;
-
-	/* nothing to do */
-	if (!len)
-		return 0;
-
-	MXLOCK(&obj->lock);
-	if (NATTR_ISDIR(open->ver->attrs.mode))
-		/* TODO: do we need to check for other types? */
-		ret = -EISDIR;
-	else
-		ret = obj->ops->write(open->ver, buf, len, offset);
-	MXUNLOCK(&obj->lock);
-
-	return ret;
-}
-
-int objstore_lookup_one(struct objstore_open_obj_info *diropen,
-			const char *name, const struct noid *desired,
-			struct noid *child, uint8_t *type)
-{
-	struct obj *dir;
-	int ret;
-
-	if (!diropen || !name || !desired || !child || !type)
-		return -EINVAL;
-
-	dir = diropen->obj;
-
-	if (!dir->ops || !dir->ops->lookup_one)
-		return -ENOTSUP;
-
-	*type = NDIRENT_TYPE_UNKNOWN; /* unknown by default */
-
-	MXLOCK(&dir->lock);
-	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
-		ret = -ENOTDIR;
-	else
-		ret = dir->ops->lookup_one(diropen->ver, name, desired, child,
-					   type);
-	MXUNLOCK(&dir->lock);
-
-	return ret;
-}
-
-int objstore_create(struct objstore_open_obj_info *diropen, const char *name,
-		    uint16_t mode, struct noid *child)
-{
-	struct obj *dir;
-	int ret;
-
-	if (!diropen || !name || !child)
-		return -EINVAL;
-
-	dir = diropen->obj;
-
-	if (!dir->ops || !dir->ops->create)
-		return -ENOTSUP;
-
-	MXLOCK(&dir->lock);
-	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
-		ret = -ENOTDIR;
-	else
-		ret = dir->ops->create(diropen->ver, name, mode, child);
-	MXUNLOCK(&dir->lock);
-
-	return ret;
-}
-
-static struct obj *getobj_in_dir(struct objver *dirver, const char *name)
-{
-	struct noid child_oid;
-	uint8_t type;
-	int ret;
-
-	ret = dirver->obj->ops->lookup_one(dirver, name, NULL, &child_oid,
-					   &type);
-	if (ret)
-		return ERR_PTR(ret);
-
-	return getobj(dirver->obj->vol, &child_oid);
-}
-
-int objstore_unlink(struct objstore_open_obj_info *diropen, const char *name)
-{
-	struct obj *dir;
-	int ret;
-
-	if (!diropen || !name)
-		return -EINVAL;
-
-	dir = diropen->obj;
-
-	if (!dir->ops || !dir->ops->unlink || !dir->ops->lookup_one)
-		return -ENOTSUP;
-
-	MXLOCK(&dir->lock);
-	if (!NATTR_ISDIR(diropen->ver->attrs.mode)) {
-		ret = -ENOTDIR;
-	} else {
-		struct obj *child;
-
-		child = getobj_in_dir(diropen->ver, name);
-		if (!IS_ERR(child)) {
-			ret = dir->ops->unlink(diropen->ver, name, child);
-
-			MXUNLOCK(&child->lock);
-			obj_putref(child);
-		} else {
-			ret = PTR_ERR(child);
-		}
-	}
-	MXUNLOCK(&dir->lock);
-
-	return ret;
-}
-
-int objstore_getdent(struct objstore_open_obj_info *diropen,
-		     const uint64_t offset, struct ndirent *child)
-{
-	struct obj *dir;
-	int ret;
-
-	if (!diropen)
-		return -EINVAL;
-
-	dir = diropen->obj;
-
-	if (!dir->ops || !dir->ops->getdent)
-		return -ENOTSUP;
-
-	MXLOCK(&dir->lock);
-	if (!NATTR_ISDIR(diropen->ver->attrs.mode))
-		ret = -ENOTDIR;
-	else
-		ret = dir->ops->getdent(diropen->ver, offset, child);
-	MXUNLOCK(&dir->lock);
-
-	return ret;
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/objstore/vol_ops.c	Sun Nov 11 10:21:57 2018 -0500
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015-2018 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include "objstore_impl.h"
+
+int objstore_getroot(struct objstore *vol, struct noid *root)
+{
+	int ret;
+
+	if (!vol || !root)
+		return -EINVAL;
+
+	if (vol->ops && vol->ops->getroot)
+		ret = vol->ops->getroot(vol, root);
+	else
+		ret = -ENOTSUP;
+
+	return ret;
+}