Mercurial > nomad > experimental
view src/objstore/cache.c @ 1247:77fd01b82fd7
objstore: direct page lookup fills to the correct objver mid-txn
When we are in the middle of a transaction and get a page_lookup with a
PG_FILL, we need to (1) use the correct objver, and (2) zero-fill as
necessary.
Signed-off-by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
author | Josef 'Jeff' Sipek <jeffpc@josefsipek.net> |
---|---|
date | Fri, 16 Dec 2022 19:48:03 -0500 |
parents | b0852eac72a9 |
children | b3e0798761b7 |
line wrap: on
line source
/*
 * Copyright (c) 2020,2022 Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include <jeffpc/list.h>
#include <jeffpc/rbtree.h>

#include "objstore_impl.h"

/* the global array of page descriptors and its length */
static size_t npages;
static struct page *pages;

/* internal lists */
static struct list free_pages;		/* fully allocated */
static struct list unallocated_pages;	/* missing ->ptr */

/*
 * Set up the global page cache.
 *
 * The cache gets max_size / PAGE_SIZE page descriptors, but never fewer
 * than one.  Every descriptor starts out on the unallocated list; its data
 * buffer is only malloc'd once get_free_page() hands it out.
 *
 * Returns 0 on success, or -ENOMEM if the descriptor array cannot be
 * allocated.
 */
int page_cache_init(size_t max_size)
{
	size_t i;

	npages = MAX(max_size / PAGE_SIZE, 1);

	pages = calloc(npages, sizeof(struct page));
	if (!pages)
		return -ENOMEM;

	list_create(&free_pages, sizeof(struct page),
		    offsetof(struct page, pages));
	list_create(&unallocated_pages, sizeof(struct page),
		    offsetof(struct page, pages));

	for (i = 0; i < npages; i++)
		list_insert_tail(&unallocated_pages, &pages[i]);

	return 0;
}

/*
 * Tear down the global page cache.
 *
 * Frees every page's data buffer and then the descriptor array itself.
 * No page may still be in use.
 */
void page_cache_free(void)
{
	size_t i;

	for (i = 0; i < npages; i++) {
		ASSERT(!pages[i].inuse);

		free(pages[i].ptr);
	}

	free(pages);

	/* do not leave a dangling pointer/count behind */
	pages = NULL;
	npages = 0;
}

/* rb-tree comparator: orders pages by page number */
static int page_cmp(const void *va, const void *vb)
{
	const struct page *a = va;
	const struct page *b = vb;

	if (a->pgno < b->pgno)
		return -1;
	if (a->pgno > b->pgno)
		return +1;
	return 0;
}

/* Initialize the per-objver tree of cached pages (keyed by page number). */
void page_cache_init_objver(struct objver *ver)
{
	rb_create(&ver->pages, page_cmp, sizeof(struct page),
		  offsetof(struct page, node));
}

/*
 * Destroy the per-objver tree of cached pages.
 *
 * The tree must already be empty.  The check used to be
 * ASSERT0(!rb_numnodes(...)), which asserts the opposite: that at least
 * one page is still present.
 */
void page_cache_deinit_objver(struct objver *ver)
{
	ASSERT0(rb_numnodes(&ver->pages));

	rb_destroy(&ver->pages);
}

/*
 * Grab a page descriptor that has a data buffer attached.
 *
 * Fully allocated free pages are preferred.  Otherwise, an unallocated
 * descriptor is taken and a PAGE_SIZE buffer is malloc'd for it.  If
 * neither is possible this function panics, so it never returns NULL.
 */
static struct page *get_free_page(void)
{
	struct page *page;

	page = list_head(&free_pages);
	if (page) {
		list_remove(&free_pages, page);
		goto out;
	}

	page = list_head(&unallocated_pages);
	if (page) {
		page->ptr = malloc(PAGE_SIZE);
		if (page->ptr) {
			list_remove(&unallocated_pages, page);
			goto out;
		}
	}

	panic("Failed to allocate a new page! Out of memory?");

out:
	VERIFY3P(page->ptr, !=, NULL);

	return page;
}

/*
 * Return a page descriptor to the appropriate internal list.
 *
 * The page is marked as no longer in use first.  Without that, a page
 * released on the page_lookup() error path would sit on the free list with
 * ->inuse still set and later trip the assertion in page_cache_free().
 */
static void free_page(struct page *page)
{
	page->inuse = false;

	if (page->ptr)
		list_insert_head(&free_pages, page);
	else
		list_insert_head(&unallocated_pages, page);
}

/*
 * Look up page number pgno of the given objver.
 *
 * If the page is already cached, it is locked via page_lock() and
 * returned.  Otherwise:
 *
 *   - without PG_ALLOC, ERR_PTR(-ENOENT) is returned
 *   - with PG_ALLOC, a new page is set up and inserted into the objver's
 *     page tree; if PG_FILL is also set, the page contents are either read
 *     in or zero-filled (see below)
 *
 * If reading the page contents fails, the page is released and the error
 * is returned as an ERR_PTR.
 *
 * NOTE(review): the cache-hit path calls page_lock() while the freshly
 * set up page is returned without calling it -- confirm that this
 * asymmetry is intended.
 */
struct page *page_lookup(struct objver *ver, uint64_t pgno, int flags)
{
	struct rb_cookie cookie;
	struct page key = {
		.pgno = pgno,
	};
	struct page *page;

	page = rb_find(&ver->pages, &key, &cookie);
	if (page) {
		page_lock(page);
		return page;
	}

	if (!(flags & PG_ALLOC))
		return ERR_PTR(-ENOENT);

	page = get_free_page();
	if (IS_ERR(page))
		return page;

	page->objver = ver;
	page->pgno = pgno;
	VERIFY(page->ptr);
	page->inuse = true;
	page->filled = false;
	page->dirty = false;

	if (flags & PG_FILL) {
		/*
		 * We need to fill the page in one of two different ways
		 * depending on whether the page comes before or after the
		 * offset of the shortest truncation (or EOF):
		 *
		 * (1) all pages beyond the shortest truncation must be
		 *     zero filled
		 *
		 * (2) all pages before the shortest truncation must be
		 *     read in from disk
		 *
		 * Note that if we tried to look up a page that was modified
		 * by a previous write in the same transaction, that page is
		 * still in memory and we would have returned it earlier in
		 * this function.
		 */
		struct objver *readver;
		uint64_t size;
		int ret;

		if (ver->txn.txn) {
			/* this objver belongs to a txn */
			readver = ver->txn.prev_ver ? ver->txn.prev_ver : ver;
			size = ver->txn.min_data_size;
		} else {
			/* non-txn lookup */
			readver = ver;
			size = ver->attrs.size;
		}

		if (pgno < (p2roundup(size, PAGE_SIZE) / PAGE_SIZE)) {
			/* existed before */
			ret = readver->obj->ops->read_page(readver, page->ptr,
							   pgno);
			if (ret) {
				/* free_page() also clears ->inuse */
				free_page(page);
				return ERR_PTR(ret);
			}
		} else {
			/* completely new */
			memset(page->ptr, 0, PAGE_SIZE);
		}

		page->filled = true;
	}

	VERIFY3P(rb_insert_here(&ver->pages, page, &cookie), ==, NULL);

	return page;
}

/*
 * Drop all cached pages of the objver with page numbers in the range
 * [pgno, pgno + pgcnt).
 *
 * Each page found is removed from the objver's page tree and handed back
 * to the page cache.  Pages in the range must not be dirty.  A pgcnt of
 * zero is a no-op.
 */
void page_inval_range(struct objver *ver, uint64_t pgno, size_t pgcnt)
{
	struct rb_cookie cookie;
	struct page key = {
		.pgno = pgno,
	};
	struct page *page;

	if (!pgcnt)
		return;

	/* start at pgno itself, or at the first cached page after it */
	page = rb_find(&ver->pages, &key, &cookie);
	if (!page)
		page = rb_nearest_gt(&ver->pages, &cookie);

	/*
	 * Every page visited has ->pgno >= pgno, so the subtraction cannot
	 * underflow.  Comparing the distance against pgcnt (instead of
	 * computing pgno + pgcnt - 1) keeps the bound correct even when the
	 * end of the range would wrap around.
	 */
	while (page && ((page->pgno - pgno) < pgcnt)) {
		struct page *next;

		ASSERT(page->inuse);
		ASSERT(!page->dirty); /* catch data loss */
		ASSERT(page->ptr);

		/* grab the successor before unlinking the current page */
		next = rb_next(&ver->pages, page);

		rb_remove(&ver->pages, page);

		free_page(page);

		page = next;
	}
}