shithub: neoventi

ref: f31b4ce3b34f7191f7cfdae7b3d88e8e27e6b179
dir: /disk.c/

#include <u.h>
#include <libc.h>
#include <bio.h>
#include "neoventi.h"

VtArena *arenas = nil;
u32int numarenas = 0;

struct {
	RWLock;
	u32int blocksize;
	u32int buckets;
	VtISect *sects;
	int nsects;
	u32int div;
	u32int namap;
	MapEntry *amap;
	// The active arena, which is currently being appended to.
	VtArena *arena;
} index;

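// Binary search for the index section containing the given bucket:
// returns the index of the last section whose start is <= buck.
// Assumes index.sects[] is sorted by start and that sects[0] covers
// bucket 0, so the l-1 result is always in range.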
int
isectforbucket(u32int buck)
{
	int r, l, m;

	l = 1;
	r = index.nsects - 1;
	while(l <= r){
		m = (r + l) >> 1;
		if(index.sects[m].start <= buck)
			l = m + 1;
		else
			r = m - 1;
	}
	return l-1;
}

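// Scan a bucket for the entry matching the given score.
// A bucket is a 6-byte header (u16 entry count, then a u32 bucket
// magic for v2 index sections) followed by packed IEntrySize-byte
// entries, each beginning with the 20-byte score; see
// vtwriteindexclump below for the entry layout. On a hit, *entry
// is set to the matching entry's index.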
static int
bucketlookup(u8int *ibuf, u8int *score, u16int *entry)
{
	u16int nb = U16GET(ibuf);
	ibuf += 6;
	for(*entry = 0; *entry < nb; *entry += 1){
		if(memcmp(ibuf, score, 20) == 0)
			return 1;
		ibuf += IEntrySize;
	}
	return 0;
}

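// Map an absolute arena-partition address to an arena index.
// Index entries store addresses relative to the start of the arena's
// header block (base - blocksize), and an arena occupies
// [base - blocksize, base + size + blocksize): one header block, the
// data blocks, and one trailer block.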
static u64int
aindexfromaddr(u64int addr)
{
	// TODO: binary search or some such
	u64int a;
	u64int start, stop;
	for(a = 0; a < numarenas; a += 1){
		start = arenas[a].base - arenas[a].blocksize;
		stop = start + arenas[a].size + 2*arenas[a].blocksize;
		if(addr >= start && addr < stop)
			return a;
	}
	werrstr("internal corruption: arena not found for arenaindex");
	abort();
}

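// Append an index entry mapping score -> (offset, size, blocks) to the
// score's hash bucket, initializing the bucket magic if the bucket is
// untouched. Within an entry, the score lives at +0, the address at
// +26, the size at +34, and the block count at +37; vtreadlookup reads
// them back from the same offsets.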
int
vtwriteindexclump(uchar *score, u64int offset, u16int len)
{
	u32int bucket;
	u32int t32;
	u8int blocks;
	uchar *buf;
	VtISect *s_sect;
	u32int magic;
	u16int n, off;

	bucket = U32GET(score) / index.div;
	s_sect = &index.sects[isectforbucket(bucket)];
	bucket -= s_sect->start;
	if(!cachelookup((char**)&buf, s_sect->cacheindex, bucket)){
		if(pread(s_sect->fd, (char*)buf, s_sect->blocksize, s_sect->blockbase + (bucket << s_sect->blocklog)) != s_sect->blocksize){
			cacheunlock(s_sect->cacheindex, bucket);
			werrstr("Failed to read bucket");
			return 0;
		}
	}
	n = U16GET(buf);
	fprint(2, "appending to bucket %d, key %08x, n %d\n", bucket, s_sect->cacheindex, n);
	if(s_sect->bucketmagic){
		magic = U32GET(buf+2);
		if(magic == 0){
			// Uninitialized!
			U32PUT(buf+2, s_sect->bucketmagic);
			n = 0;
		}
	}
	off = 6 + n*IEntrySize;
	if(off + 38 > s_sect->blocksize)
		sysfatal("index bucket full? off %d, blocksize %d", off, s_sect->blocksize);
	memcpy(buf+off, score, 20);
	U64PUT(buf+off+26, offset, t32);
	U16PUT(buf+off+34, len);
	blocks = (len + 38 + (1<<ABlockLog) - 1) >> ABlockLog;
	buf[off+37] = blocks;
	n += 1;
	U16PUT(buf, n);
	if(pwrite(s_sect->fd, (char*)buf, s_sect->blocksize, s_sect->blockbase + (bucket << s_sect->blocklog)) != s_sect->blocksize){
		cacheunlock(s_sect->cacheindex, bucket);
		werrstr("Failed to writeback bucket: %r");
		return 0;
	}
	cacheunlock(s_sect->cacheindex, bucket);
	return 1;
}

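// Look up a score in the index, filling in *addr with the arena, the
// offset relative to the arena's header block, the stored size, and
// the block count. Returns 1 on success, 0 with errstr set on failure
// (including "entry not found in bucket" for absent scores).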
int
vtreadlookup(u8int *score, VtAddress *addr)
{
	u8int *buf;
	u16int entry;
	u64int aindex;
	u32int bucket = U32GET(score) / index.div;
	VtISect *s_sect = &index.sects[isectforbucket(bucket)];
	bucket -= s_sect->start;
	if(!cachelookup((char**)&buf, s_sect->cacheindex, bucket)){
		if(pread(s_sect->fd, (char*)buf, s_sect->blocksize, s_sect->blockbase + (bucket << s_sect->blocklog)) != s_sect->blocksize){
			cacheunlock(s_sect->cacheindex, bucket);
			werrstr("Failed to read bucket");
			return 0;
		}
	}
	if(s_sect->bucketmagic && U32GET(buf + 2) != s_sect->bucketmagic){
		if(U32GET(buf+2) != 0)
			sysfatal("index is corrupt: invalid bucket magic: sect %ux, buck %ux", s_sect->bucketmagic, U32GET(buf + 2));
		else{
			cacheunlock(s_sect->cacheindex, bucket);
			// invoking code should not care about the
			// distinction between "bucket is empty" and
			// "bucket has data but not what we want."
			werrstr("entry not found in bucket");
			return 0;
		}
	}
	if(!bucketlookup(buf, score, &entry)){
		cacheunlock(s_sect->cacheindex, bucket);
		werrstr("entry not found in bucket");
		return 0;
	}
	addr->offset = U64GET((buf + 6 + (entry * IEntrySize) + 26));
	addr->size = U16GET((buf + 6 + (entry * IEntrySize) + 34));
	addr->blocks = buf[6 + (entry*IEntrySize) + 37];
	cacheunlock(s_sect->cacheindex, bucket);
	aindex = aindexfromaddr(addr->offset);
	addr->s_arena = &arenas[aindex];
	addr->offset -= (addr->s_arena->base - addr->s_arena->blocksize);
	fprint(2, "entry found in bucket: arena %d, offset %d\n", aindex, addr->offset);
	return 1;
}

static u64int
partlen(int fd, char *path)
{
	Dir *dir = dirfstat(fd);
	u64int len;
	if(dir == nil)
		sysfatal("Cannot stat partition %s", path);
	if(dir->length == 0)
		sysfatal("can't determine size of partition %s", path);
	len = dir->length;
	free(dir);
	return len;
}

// Reads one block from disk into the cache, returning a pointer into the
// cache.
// If the data is already in cache, it will not be read again.
// Caller is responsible for calling cacheunlock(arena->index, blockindex).
char*
vtarenareadblock(VtArena *arena, u32int blockindex)
{
	char *buf;
	if(arena->blocksize != 8192)
		sysfatal("invalid blocksize %ud", arena->blocksize);
	if(!cachelookup(&buf, arena->index, blockindex)){
		if(pread(arena->fd, buf, arena->blocksize, arena->base+(blockindex*arena->blocksize)) != arena->blocksize){
			cacheunlock(arena->index, blockindex);
			werrstr("Failed to read: %r");
			return nil;
		}
	}
	return buf;
}

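// Copy reqsize bytes starting at arena-relative address addr into dbuf.
// The range may span multiple blocks; each block is pulled through the
// cache, copied from, and unlocked before moving to the next.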
u16int
vtarenaread(VtArena *arena, u64int addr, uchar *dbuf, u16int reqsize)
{
	u16int off, n, m, size;
	u32int blockindex;
	char *buf;
	size = reqsize;
	off = addr & (arena->blocksize-1);
	addr -= off;
	n = 0;
	while(n < size){
		blockindex = addr/arena->blocksize;
		buf = vtarenareadblock(arena, blockindex);
		if(buf == nil)
			// TODO: I/O error should not crash the disk layer.
			// Might be good to be able to recover cached data in this case?
			sysfatal("I/O error ☹");
		m = arena->blocksize - off;
		if(m > size - n)
			m = size - n;
		memcpy(&dbuf[n], &buf[off], m);
		cacheunlock(arena->index, blockindex);
		n += m;
		off = 0;
		addr += arena->blocksize;
	}
	return size;
}

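// Read a clump into dst given its index address. The clump begins with
// a 38-byte header: sizes at +5 (stored) and +7 (uncompressed), score
// at +9, and the encoding byte at +29 (1 = raw, 2 = whack-compressed);
// the data follows at +38. See clumpwrite below for the write side.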
int
readclump(uchar *dst, VtAddress addr)
{
	u16int size = addr.blocks<<ABlockLog;
	uchar buf[0x10000];
	if(!vtarenaread(addr.s_arena, addr.offset, buf, size)){
		werrstr("arena read failed: %r");
		return 0;
	}
	size = U16GET(buf+7);
	if(buf[29] == 2){
		if(unwhack(dst, size, buf+38, U16GET(buf+5)) != size)
			sysfatal("decompression failed: %r. block index %llx", addr.offset/addr.s_arena->blocksize);
	} else if(buf[29] == 1)
		memcpy(dst, buf+38, size);
	return 1;
}

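// Seal the active arena and advance to the next one. Assumes arenas
// are filled strictly in order, so the next arena must be untouched.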
static void
advancearena(void)
{
	index.arena->arenastats.sealed = 1;
	if(index.arena->index+1 == numarenas)
		sysfatal("TODO last arena, %d, full, at size %d!", index.arena->index, index.arena->arenastats.used);
	index.arena = &arenas[index.arena->index+1];
	if(index.arena->block != 0 || index.arena->offset != 0 || index.arena->blockremain != index.arena->blocksize || index.arena->buf != nil)
		sysfatal("TODO handle writing to venti which previously experienced nonlinear writes from other software?");
}

static void
blockflush(void)
{
	if(index.arena->buf == nil)
		return;
	if(pwrite(index.arena->fd, index.arena->buf, index.arena->blocksize, index.arena->base+(index.arena->block*index.arena->blocksize)) != index.arena->blocksize){
		sysfatal("flush failed: %r");
	}
	index.arena->buf = nil;
	cacheunlock(index.arena->index, index.arena->block);
}

// Advance the block and, if needed, the arena.
// It is guaranteed that, after invoking this function, the live block will
// be completely empty (offset == 0, blockremain == blocksize).
static void
blockadvance(void)
{
	blockflush();
	index.arena->block += 1;
	index.arena->arenastats.used += index.arena->blockremain;
	index.arena->offset = 0;
	index.arena->blockremain = index.arena->blocksize;
	if(index.arena->block * index.arena->blocksize >= index.arena->size)
		advancearena();
}

// If the current block doesn't have enough space, skip to the next one.
static void
requireminimum(u16int space)
{
	if(index.arena->blockremain < space)
		blockadvance();
}

static void
arenarequireminimum(u16int space)
{
	if(index.arena->arenastats.used + space > index.arena->size)
		advancearena();
}

// Grabs the current block for writing into the cache, reading any existing contents
// if applicable.
static void
getblock(void)
{
	// TODO rebuild on vtarenareadblock
	if(!cachelookup(&index.arena->buf, index.arena->index, index.arena->block)){
		// Don't read when there's no data _to_ read; saves on unnecessary cache fills.
		if(index.arena->offset == 0)
			return;
		if(pread(index.arena->fd, index.arena->buf, index.arena->blocksize, index.arena->base+(index.arena->block*index.arena->blocksize)) != index.arena->blocksize){
			werrstr("read of live block failed");
			sysfatal("TODO error handling in write path: %r");
		}
	}
}

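// Write the clump header at wptr: clump magic at +0, a type byte at +4,
// stored and uncompressed sizes at +5 and +7 (equal, since clumps are
// stored uncompressed here), the score at +9, and encoding 1 (raw) at
// +29. The caller writes the data itself starting at +38.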
static void
clumpwrite(char *wptr, u16int len, uchar *score)
{
	U32PUT(wptr, index.arena->clumpmagic);
	// Fake a type so venti doesn't complain if someone tries using oldventi to read.
	wptr[4] = 1;
	U16PUT(wptr+5, len);
	U16PUT(wptr+7, len);
	memcpy(wptr+9, score, 20);
	wptr[29] = 1;
}

static void
blockupdate(u16int n)
{
	index.arena->offset += n;
	index.arena->blockremain -= n;
}

int
vtwritearenaclump(char *buf, u16int len, u64int *offset, uchar *score)
{
	/* We've got a 38-byte clump header to write, and the contents
	 * of the clump. We'll be splitting it across blocks. First, find
	 * the block for the clump header; if the current block has less
	 * than 38 bytes of room, move to the next block.
	 * Instead of worrying about address on disk, we'll mostly operate
	 * off of blocks. */
	char *wptr;
	u16int n = 0, nn;
	if(index.arena->blocksize != 8192)
		sysfatal("invalid blocksize %ud", index.arena->blocksize);
	arenarequireminimum(38+len);
	requireminimum(38);

	*offset = index.arena->base + (index.arena->block)*index.arena->blocksize - index.arena->blocksize + index.arena->offset;
	getblock();
	wptr = index.arena->buf + index.arena->offset;
	clumpwrite(wptr, len, score);
	blockupdate(38);
	wptr += 38;
	while(len > n){
		nn = len - n;
		if(nn > index.arena->blockremain)
			nn = index.arena->blockremain;
		fprint(2, "WPTR clump %d block %d offset %d, n %d nn %d, block must be at least %d\n", index.arena->arenastats.clumps, index.arena->block, index.arena->offset, n, nn, index.arena->offset+nn);
		// Use however much space is available in the block, then move to the next
		memcpy(wptr, buf + n, nn);
		n += nn;
		blockupdate(nn);
		if(len != n){
			blockadvance();
			getblock();
			wptr = index.arena->buf;
		}
	}
	index.arena->arenastats.uncsize += len;
	index.arena->arenastats.used += 38+len;
	index.arena->arenastats.clumps += 1;
	blockflush();
	return vtarenawb(index.arena);
}

int
vtwriteclump(char *buf, u16int len, uchar *score)
{
	u64int offset;
	// - Lock index and arena
	// - Write data to arena
	// - Write metadata to arena
	// - Add entry to index
	wlock(&index);
	if(!vtwritearenaclump(buf, len, &offset, score)){
		wunlock(&index);
		return 0;
	}
	if(!vtwriteindexclump(score, offset, len)){
		werrstr("failed to write clump to index: %r");
		wunlock(&index);
		return 0;
	}
	wunlock(&index);
	return 1;
}

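// Parse an arena/section map: a u32 entry count on its own line, then
// one tab-separated "name<TAB>start<TAB>stop" line per entry, with
// start and stop as byte offsets into the partition.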
static int
parsemap(Biobufhdr *b, MapEntry **map, u32int *nmap)
{
	u32int i;
	char *s;
	char *fields[4];
	if(!Brdu32(b, nmap))
		return 0;
	if(*nmap > MaxAMap)
		return 0;
	*map = realloc(*map, *nmap * sizeof(MapEntry));
	if(*map == nil)
		sysfatal("oom parsing map");
	for(i = 0; i < *nmap; i += 1){
		s = Brdline(b, '\n');
		if(s == nil)
			sysfatal("corrupt index map: unexpected EOF");
		s[Blinelen(b)-1] = 0;
		if(getfields(s, fields, 3, 0, "\t") != 3)
			sysfatal("corrupt index map: %s", s);
		strncpy((*map)[i].name, fields[0], NameSize);
		(*map)[i].name[NameSize-1] = 0;
		if(stru64int(fields[1], &(*map)[i].start) < 0)
			sysfatal("corrupt index map: %s", fields[1]);
		if(stru64int(fields[2], &(*map)[i].stop) < 0)
			sysfatal("corrupt index map: %s", fields[2]);
	}
	return 1;
}

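// Load an arena's state from its trailer block at base + size: magic
// and version at +0 and +4, the name at +8, then the index stats, and
// (for trailers that record them, flag byte at +37) the arena stats.
// The write cursor (block/offset/blockremain) is then derived from the
// used byte count so appends resume where they left off.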
static void
loadarena(VtArena *arena)
{
	u32int version, magic;
	char *buf = malloc(arena->blocksize);
	u8int *p = (void*)buf;
	if(buf == nil)
		sysfatal("oom loading arena");
	if(pread(arena->fd, buf, arena->blocksize, arena->base + arena->size) != arena->blocksize)
		sysfatal("failed to read arena trailer: %r");
	magic = U32GET(p);
	version = U32GET(p + 4);
	p += 8+NameSize;
	arena->indexstats.clumps = U32GET(p);
	arena->indexstats.cclumps = U32GET(p+4);
	arena->clumpmagic = U32GET(p+16);
	arena->indexstats.used = U64GET(p+20);
	arena->indexstats.uncsize = U64GET(p+28);
	arena->indexstats.sealed = U8GET(p+36);
	if(U8GET(p+37) == 1){
		arena->arenastats.clumps = U32GET(p+38);
		arena->arenastats.cclumps = U32GET(p+42);
		arena->arenastats.used = U64GET(p+46);
		arena->arenastats.uncsize = U64GET(p+54);
		// OR with indexstats: a bug from 2008 could leave arenastats out of date,
		// and venti worked around it for ages so now I have to too.
		// ...TODO: no, I don't.
		arena->arenastats.sealed = U8GET(p+62) || arena->indexstats.sealed;
	} else {
		arena->arenastats = arena->indexstats;
	}
	arena->block = arena->arenastats.used / arena->blocksize;
	arena->offset = arena->arenastats.used & (arena->blocksize - 1);
	fprint(2, "Resuming: used %lld, block %lld, blocksize %lld, offset %d\n", arena->arenastats.used, arena->block, arena->blocksize, arena->offset);
	// Probably not necessary, but factors out arena->offset to arenastats.used and arena->blocksize
	// so that this operation depends only on already-computed values, avoiding a false dependency on
	// the arena->offset calculation; the true value is just "blocksize - offset".
	// Might speed up initialization by a few microseconds. Realistically, I'm just
	// trying to stay in the habit of thinking about how code flows through hardware, this
	// transformation doesn't matter at all.
	arena->blockremain = arena->blocksize - (arena->arenastats.used & (arena->blocksize - 1));
	// We _can_ read the values in even if the arena is invalid, and the code looks
	// cleaner with all the parsing and validation grouped, so meh, going to keep it
	// like this.
	if(magic != ArenaMagic)
		sysfatal("corrupt arena: magic is incorrect!");
	if(version != 5)
		sysfatal("unsupported arena version %d", version);
	if(strncmp(arena->name, buf + 8, strlen(arena->name)) != 0)
		sysfatal("arena name mismatch: %s vs %s", arena->name, buf + 8);
	free(buf);
	if(index.arena == nil)
		index.arena = arena;
}

static void
initarena(VtArena *arena, int fd, MapEntry entry, u32int blocksize)
{
	memset(arena, 0, sizeof(*arena));
	arena->fd = fd;
	arena->blocksize = blocksize;
	arena->base = entry.start + blocksize;
	arena->size = entry.stop - entry.start - 2*blocksize;
	memcpy(arena->name, entry.name, NameSize);
	loadarena(arena);
	if(fd > 0xFF || fd < 0){
		sysfatal("assumption violated: file descriptor not in range");
	}
}

static void
readarenatable(int fd, u32int tabbase, u32int tabsize, u32int blocksize)
{
	Biobufhdr bio;
	char *buf;
	MapEntry *map = nil;
	u32int nmap;
	buf = malloc(tabsize);
	if(buf == nil)
		sysfatal("oom; you're a loser: %r");
	if(Binits(&bio, fd, OREAD, (uchar*)buf, tabsize))
		sysfatal("failed to init biobuf: %r");
	if(Bseek(&bio, tabbase, 0) != tabbase)
		sysfatal("seek failed: %r");
	if(!parsemap(&bio, &map, &nmap))
		sysfatal("failed to parse arena map of tabbase %d: %r", tabbase);
	arenas = realloc(arenas, sizeof(VtArena) * (nmap + numarenas));
	if(!arenas)
		sysfatal("oom");
	for(; nmap > 0; nmap -= 1){
		arenas[numarenas].index = numarenas;
		initarena(&arenas[numarenas++], fd, map[nmap-1], blocksize);
	}
	free(map);
	free(buf);
}

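// An arena partition is laid out as: PartBlank bytes of unused space,
// the partition header, the arena table (aligned up to the first block
// boundary at tabbase), and the arenas themselves starting at arenabase.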
static void
arenapartcheck(u32int magic, u32int version, u32int blocksize, u32int arenabase, u32int tabbase)
{
	if(magic != ArenaPartMagic)
		sysfatal("bad arena partition magic number: %#ux expected ArenaPartMagic (%#ux)", magic, ArenaPartMagic);
	if(version != 3)
		sysfatal("bad arena partition version: only 3 is supported, found %d", version);
	if(blocksize & (blocksize - 1))
		sysfatal("invalid block size: %d is not a power of two", blocksize);
	if(tabbase >= arenabase)
		sysfatal("corrupt arena partition: partition table overlaps with storage");
}

static void
initarenapart(char *path)
{
	u32int version, magic, blocksize, arenabase, tabbase, tabsize;
	char buf[HeadSize];
	u8int *p = (void*)buf;
	int fd;

	/* This file descriptor is deliberately never closed; it is used to read
	 * blocks from the arenas throughout the server's lifetime, and thus we
	 * can rely on the OS to clean it up when we close. */
	if((fd = open(path, ORDWR)) < 0)
		sysfatal("failed to open arena %s: %r", path);
	if(pread(fd, buf, HeadSize, PartBlank) != HeadSize)
		sysfatal("failed to read arena header table: %r");
	magic = U32GET(p);
	version = U32GET(p + 4);
	blocksize = U32GET(p + 8);
	arenabase = U32GET(p + 12);
	/* Head is not perfectly aligned; table must be aligned as first block */
	tabbase = (PartBlank + HeadSize + blocksize - 1) & ~(blocksize - 1);
	tabsize = arenabase - tabbase;
	arenapartcheck(magic, version, blocksize, arenabase, tabbase);
	readarenatable(fd, tabbase, tabsize, blocksize);
}

void
arenassync(void)
{
	int i;
	for(i = 0; i < numarenas; i += 1){
		if(!vtarenasync(&arenas[i]))
			sysfatal("syncarenas: %r");
	}
}

void
initarenas(void)
{
	initarenapart(arenapath);
}

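// Parse an index section header (read from PartBlank on disk):
// version at +4, section name and index name at +8 and +8+NameSize,
// then blocksize, blockbase, block count, and the [start, stop)
// bucket range this section covers; v2 sections also carry a
// per-section bucket magic.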
static void
loadisect(VtISect *sect, char *buf)
{
	u8int *p = (u8int*)buf;
	sect->version = U32GET(p + 4);
	memcpy(sect->name, buf + 8, NameSize);
	memcpy(sect->index, buf + 8 + NameSize, NameSize);
	sect->blocksize = U32GET(p + 8 + 2*NameSize);
	sect->blockbase = U32GET(p + 12 + 2*NameSize);
	sect->blocks = U32GET(p + 16 + 2 * NameSize);
	sect->start = U32GET(p + 20 + 2 * NameSize);
	sect->stop = U32GET(p + 24 + 2 * NameSize);
	sect->index[NameSize-1] = 0;
	sect->name[NameSize-1] = 0;
	sect->bucketmagic = 0;
	if(sect->version == 2)
		sect->bucketmagic = U32GET(p + 28 + 2*NameSize);
	sect->buckmax = (sect->blocksize - IBucketSize) / IEntrySize;
	sect->blocklog = u64log2(sect->blocksize);
	sect->tabbase = (PartBlank + HeadSize + sect->blocksize - 1) & ~(sect->blocksize - 1);
	sect->tabsize = sect->blockbase - sect->tabbase;
	sect->cacheindex = numarenas;
}

static void
validateisect(VtISect *sect, u32int magic, char *path)
{
	if(magic != ISectMagic)
		sysfatal("invalid / corrupt index section");
	if(sect->version != 1 && sect->version != 2)
		sysfatal("unrecognized index section version %d; only 1 and 2 are supported", sect->version);
	if(sect->blocksize != (1 << sect->blocklog))
		sysfatal("Illegal or corrupt index section");
	if(sect->tabbase >= sect->blockbase)
		sysfatal("illegal or corrupt index section: config table overlaps bucket store");

	if(sect->blockbase + (u64int)sect->blocks * sect->blocksize != (partlen(sect->fd, path) & ~(u64int)(sect->blocksize - 1)))
		sysfatal("invalid or corrupt index section header: invalid blocks");
	if(sect->stop - sect->start > sect->blocks)
		sysfatal("invalid or corrupt index section: section overflows available space");
	if(sect->stop < sect->start)
		sysfatal("invalid or corrupt index section: impossible range");
	fprint(2, "blockbase %08d\n", sect->blockbase);
}

static void
initisectpart(char *path)
{
	char buf[HeadSize];

	index.sects = realloc(index.sects, sizeof(VtISect) * (index.nsects + 1));
	if(index.sects == nil)
		sysfatal("oom loading index section");
	VtISect *sect = &index.sects[index.nsects++];
	
	if((sect->fd = open(path, ORDWR)) < 0)
		sysfatal("failed to open index section");
	if(pread(sect->fd, buf, HeadSize, PartBlank) != HeadSize)
		sysfatal("failed to read index section header");

	loadisect(sect, buf);
	validateisect(sect, U32GET((u8int*)buf), path);
}

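// A score's bucket is U32GET(score) / index.div. div is chosen so the
// full 32-bit prefix space maps onto [0, buckets); the check below
// verifies that the largest prefix (2^32 - 1) lands in the final
// bucket, i.e. that div and buckets are mutually consistent.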
static void
indexcalc(void)
{
	index.buckets = index.sects[index.nsects-1].stop;
	index.div = (((u64int)1<<32)+index.buckets-1) / index.buckets;
	if((((u64int)1 << 32) - 1) / index.div + 1 != index.buckets)
		sysfatal("corrupt index: divisor and buckets inconsistent");
}

// The index header is found in the first section; parse it.
static void
parseindex(void)
{
	u32int version;
	char *line;
	Biobufhdr bio;
	uchar *buf = malloc(index.sects[0].tabsize + Bungetsize);
	if(buf == nil)
		sysfatal("insufficient memory to start up");
	// Binits cannot fail when given a valid mode; see /sys/src/libbio/binit.c:/^Binits
	Binits(&bio, index.sects[0].fd, OREAD, buf, index.sects[0].tabsize+Bungetsize);
	if(Bseek(&bio, index.sects[0].tabbase, 0) != index.sects[0].tabbase)
		sysfatal("invalid or corrupt index: unable to read header");
	line = Brdline(&bio, '\n');
	if(line == nil || memcmp(line, "venti index configuration", 25) != 0)
		sysfatal("invalid or corrupt index: invalid magic");
	if(!Brdu32(&bio, &version) || version != 1)
		sysfatal("invalid or corrupt index: index version unsupported");
	line = Brdline(&bio, '\n');
	if(line == nil || memcmp(line, index.sects[0].index, strlen(index.sects[0].index)) != 0)
		sysfatal("invalid or corrupt index: index/section mismatch");
	if(!Brdu32(&bio, &index.blocksize))
		sysfatal("invalid or corrupt index: failed to read blocksize");
	/* TODO(mandatory feature): support multiple index sections instead of dropping them */
	/* The first call skips the index-section map; the second reads the arena map */
	if(!parsemap(&bio, &index.amap, &index.namap))
		sysfatal("invalid or corrupt index: bad section map");
	if(!parsemap(&bio, &index.amap, &index.namap))
		sysfatal("invalid or corrupt index: bad arena map");
	indexcalc();
}

void
initindex(void)
{
	initisectpart(isectpath);
	parseindex();
	for(int i = 0; i < index.namap; i += 1){
		int found = 0;
		for(int j = 0; j < numarenas; j += 1)
			if(strcmp(arenas[j].name, index.amap[i].name) == 0){
				found = 1;
				index.amap[i].arena = &arenas[j];
				break;
			}
		if(!found)
			sysfatal("unable to build arena map");
	}
}