view src/objstore/cache.c @ 1247:77fd01b82fd7

objstore: direct page lookup fills to the correct objver mid-txn

When we are in the middle of a transaction and get a page_lookup with a
PG_FILL, we need to (1) use the correct objver, and (2) zero-fill as
necessary.

Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>

author   Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
date     Fri, 16 Dec 2022 19:48:03 -0500
parents  b0852eac72a9
children b3e0798761b7

/*
 * Copyright (c) 2020,2022 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <jeffpc/list.h>
#include <jeffpc/rbtree.h>

#include "objstore_impl.h"

static size_t npages;
static struct page *pages;

/* internal lists */
static struct list free_pages;	/* fully allocated */
static struct list unallocated_pages; /* missing ->ptr */

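/*
 * Size the cache for max_size bytes (at least one page) and stage every
 * page struct on the unallocated list; the data buffers themselves are
 * allocated lazily by get_free_page().
 */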
int page_cache_init(size_t max_size)
{
	size_t i;

	npages = MAX(max_size / PAGE_SIZE, 1);

	pages = calloc(npages, sizeof(struct page));
	if (!pages)
		return -ENOMEM;

	list_create(&free_pages, sizeof(struct page),
		    offsetof(struct page, pages));
	list_create(&unallocated_pages, sizeof(struct page),
		    offsetof(struct page, pages));

	for (i = 0; i < npages; i++)
		list_insert_tail(&unallocated_pages, &pages[i]);

	return 0;
}

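/*
 * Tear down the cache.  Every page must already be out of use; all we
 * release are the lazily allocated data buffers and the page array.
 */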
void page_cache_free(void)
{
	size_t i;

	for (i = 0; i < npages; i++) {
		ASSERT(!pages[i].inuse);

		free(pages[i].ptr);
	}

	free(pages);
}

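/* order cached pages within an objver by page number */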
static int page_cmp(const void *va, const void *vb)
{
	const struct page *a = va;
	const struct page *b = vb;

	if (a->pgno < b->pgno)
		return -1;
	if (a->pgno > b->pgno)
		return +1;
	return 0;
}

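/* set up the per-objver rbtree of cached pages */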
void page_cache_init_objver(struct objver *ver)
{
	rb_create(&ver->pages, page_cmp, sizeof(struct page),
		  offsetof(struct page, node));
}

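/* the objver must not have any pages cached when it goes away */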
void page_cache_deinit_objver(struct objver *ver)
{
	ASSERT0(rb_numnodes(&ver->pages));

	rb_destroy(&ver->pages);
}

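/*
 * Get an unused page struct: prefer one from the free list (which already
 * has a data buffer), otherwise lazily allocate a buffer for one of the
 * unallocated pages.  If neither works, we panic.
 */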
static struct page *get_free_page(void)
{
	struct page *page;

	page = list_head(&free_pages);
	if (page) {
		list_remove(&free_pages, page);
		goto out;
	}

	page = list_head(&unallocated_pages);
	if (page) {
		page->ptr = malloc(PAGE_SIZE);
		if (page->ptr) {
			list_remove(&unallocated_pages, page);
			goto out;
		}
	}

	panic("Failed to allocate a new page! Out of memory?");

out:
	VERIFY3P(page->ptr, !=, NULL);

	return page;
}

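/* return a page to the list matching its allocation state */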
static void free_page(struct page *page)
{
	if (page->ptr)
		list_insert_head(&free_pages, page);
	else
		list_insert_head(&unallocated_pages, page);
}

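/*
 * Look up page pgno of the given objver.  Without PG_ALLOC, a cache miss
 * yields -ENOENT.  With PG_ALLOC, a miss allocates a new page; if PG_FILL
 * is also set, the new page's contents are filled in as well - read from
 * the right objver or zero-filled, as described in the comment below.
 */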
struct page *page_lookup(struct objver *ver, uint64_t pgno, int flags)
{
	struct rb_cookie cookie;
	struct page key = {
		.pgno = pgno,
	};
	struct page *page;

	page = rb_find(&ver->pages, &key, &cookie);
	if (page) {
		page_lock(page);

		return page;
	}

	if (!(flags & PG_ALLOC))
		return ERR_PTR(-ENOENT);

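	/*
	 * Note: get_free_page() currently panics instead of returning an
	 * error, so this check is purely defensive.
	 */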
	page = get_free_page();
	if (IS_ERR(page))
		return page;

	page->objver = ver;
	page->pgno = pgno;
	VERIFY(page->ptr);
	page->inuse = true;
	page->filled = false;
	page->dirty = false;

	if (flags & PG_FILL) {
		/*
		 * We need to fill the page in one of two different ways
		 * depending on whether the page comes before or after the
		 * offset of the shortest truncation (or EOF):
		 *
		 *   (1) all pages beyond the shortest truncation must be
		 *       zero filled
		 *
		 *   (2) all pages before the shortest truncation must be
		 *       read in from disk
		 *
		 * Note that if we tried to look up a page that was modified
		 * by a previous write in the same transaction, that page is
		 * still in memory and we would have returned it earlier in
		 * this function.
		 */

		struct objver *readver;
		uint64_t size;
		int ret;

		if (ver->txn.txn) {
			/* this objver belongs to a txn */
			readver = ver->txn.prev_ver ? ver->txn.prev_ver : ver;
			size = ver->txn.min_data_size;
		} else {
			/* non-txn lookup */
			readver = ver;
			size = ver->attrs.size;
		}

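		/*
		 * Pages below the rounded-up size overlap data that already
		 * exists and must be read in; pages at or beyond it are
		 * brand new and just get zeroed.
		 */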
		if (pgno < (p2roundup(size, PAGE_SIZE) / PAGE_SIZE)) {
			/* existed before */
			ret = readver->obj->ops->read_page(readver, page->ptr,
							   pgno);
			if (ret) {
				free_page(page);
				return ERR_PTR(ret);
			}
		} else {
			/* completely new */
			memset(page->ptr, 0, PAGE_SIZE);
		}

		page->filled = true;
	}

	VERIFY3P(rb_insert_here(&ver->pages, page, &cookie), ==, NULL);

	return page;
}

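/*
 * Drop any cached pages in [pgno, pgno + pgcnt) from the objver's rbtree
 * and return them to the free list.  The pages must be clean; invalidating
 * a dirty page would throw away data.
 */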
void page_inval_range(struct objver *ver, uint64_t pgno, size_t pgcnt)
{
	const uint64_t first_pgno = pgno;
	const uint64_t last_pgno = pgno + pgcnt - 1;
	struct rb_cookie cookie;
	struct page key = {
		.pgno = first_pgno,
	};
	struct page *page;

	if (!pgcnt)
		return;

	page = rb_find(&ver->pages, &key, &cookie);
	if (!page)
		page = rb_nearest_gt(&ver->pages, &cookie);

	for (;;) {
		struct page *next;

		if (!page || (page->pgno > last_pgno))
			break;

		ASSERT(page->inuse);
		ASSERT(!page->dirty); /* catch data loss */
		ASSERT(page->ptr);

		next = rb_next(&ver->pages, page);

		rb_remove(&ver->pages, page);

		page->inuse = false;

		list_insert_head(&free_pages, page);

		page = next;
	}
}