shithub: neoventi

Download patch

ref: fe8152ff8dcb69ce4aca3841df3d23bb71304f9c
parent: 5afe23b5498517df907f724e30f49e0a371d3bf4
author: Noam Preil <noam@pixelhero.dev>
date: Sat Aug 2 12:24:57 EDT 2025

working writes and reads, but not persisted properly

--- a/disk.c
+++ b/disk.c
@@ -54,13 +54,65 @@
 {
 	// TODO: binary search or some such
 	u64int a;
-	for(a = 0; a < index.namap; a += 1)
-		if(addr >= index.amap[a].start && addr < index.amap[a].stop)
+	u64int start, stop;
+	for(a = 0; a < numarenas; a += 1){
+		start = arenas[a].base - arenas[a].blocksize;
+		stop = start + arenas[a].size + 2*arenas[a].blocksize;
+		if(addr >= start && addr < stop)
 			return a;
-	sysfatal("internal corruption: arena not found for arenaindex");
+	}
+	werrstr("internal corruption: arena not found for arenaindex");
+	abort();
 }
 
 int
+vtwriteindexclump(uchar *score, u64int offset, u16int len)
+{
+	u64int bucket;
+	u32int t32;
+	u8int blocks;
+	char *buf;
+	VtISect *s_sect;
+	u16int key;
+	u32int magic;
+	bucket = U32GET(score) / index.div;
+	u16int n, off;
+	s_sect = &index.sects[isectforbucket(bucket)];
+	bucket -= s_sect->start;
+	key = s_sect->cacheindex + (bucket >> 16);
+	if(!cachelookup((char**)&buf, key, bucket & 0xffff)){
+		if(pread(s_sect->fd, (char*)buf, s_sect->blocksize, s_sect->blockbase + (bucket << s_sect->blocklog)) != s_sect->blocksize){
+			cacheunlock(key, bucket & 0xffff);
+			werrstr("Failed to read bucket");
+			return 0;
+		}
+	}
+	n = U16GET(buf);
+	if(s_sect->bucketmagic){
+		magic = U32GET(buf+2);
+		if(magic == 0){
+			// Uninitialized!
+			U32PUT(buf+2, s_sect->bucketmagic);
+			n = 0;
+		}
+	}
+	off = 6 + n*IEntrySize;
+	memcpy(buf+off, score, 20);
+	U64PUT(buf+off+26, offset, t32);
+	U16PUT(buf+off+34, len);
+	blocks = (len + 38 + (1<<ABlockLog) - 1) >> ABlockLog;
+	buf[off+37] = blocks;
+	U16PUT(buf, n+1);
+	if(pwrite(s_sect->fd, (char*)buf, s_sect->blocksize, s_sect->blockbase + (bucket << s_sect->blocklog)) != s_sect->blocksize){
+		cacheunlock(key, bucket & 0xffff);
+		werrstr("Failed to writeback bucket: %r");
+		return 0;
+	}
+	cacheunlock(key, bucket&0xffff);
+	return 1;
+}
+
+int
 vtreadlookup(u8int *score, VtAddress *addr)
 {
 	u8int *buf;
@@ -99,8 +151,8 @@
 	addr->blocks = buf[6 + (entry*IEntrySize) + 37];
 	cacheunlock(key, bucket & 0xffff);
 	aindex = aindexfromaddr(addr->offset);
-	addr->s_arena = index.amap[aindex].arena;
-	addr->offset -= index.amap[aindex].start;
+	addr->s_arena = &arenas[aindex];
+	addr->offset -= (addr->s_arena->base - addr->s_arena->blocksize);
 	return 1;
 }
 
@@ -209,6 +261,8 @@
 	if(pwrite(index.arena->fd, index.arena->buf, index.arena->blocksize, index.arena->base+(index.arena->block*index.arena->blocksize)) != index.arena->blocksize){
 		sysfatal("flush failed: %r");
 	}
+		index.arena->buf = nil;
+		cacheunlock(index.arena->index, index.arena->block);
 }
 
 // Advance the block and, if needed, the arena.
@@ -219,8 +273,6 @@
 {
 	if(index.arena->buf != nil){
 		blockflush();
-		index.arena->buf = nil;
-		cacheunlock(index.arena->index, index.arena->block);
 	}
 	index.arena->block += 1;
 	index.arena->offset = 0;
@@ -237,6 +289,12 @@
 		blockadvance();
 }
 
+static void
+arenarequireminimum(u16int space)
+{
+	// TODO: unimplemented no-op, space is ignored. Presumably meant to ensure the current arena can hold space more bytes — confirm.
+}
+
 // Grabs the current block for writing into the cache, reading any existing contents
 // if applicable.
 static void
@@ -270,7 +328,7 @@
 }
 
 int
-vtwritearenaclump(char *buf, u16int len)
+vtwritearenaclump(char *buf, u16int len, u64int *offset)
 {
 	/* We've got a 38-byte clump header to write, and the contents
 	 * of the clump. We'll be splitting it across blocks. First, find
@@ -282,7 +340,10 @@
 	u16int n = 0, nn;
 	if(index.arena->blocksize != 8192)
 		sysfatal("invalid blocksize %d\n", index.arena->blocksize);
+	arenarequireminimum(38+len);
 	requireminimum(38);
+
+	*offset = index.arena->base + (index.arena->block)*index.arena->blocksize - index.arena->blocksize + index.arena->offset;
 	getblock();
 	wptr = index.arena->buf + index.arena->offset;
 	clumpwrite(wptr, len);
@@ -303,27 +364,27 @@
 	index.arena->arenastats.clumps += 1;
 	blockflush();
 	// TODO ctime/wtime? Shouldn't go in here, though, should be at a higher level.
-	werrstr("TODO");
 	return 1;
-
 }
 
 int
-vtwriteclump(char *buf, u16int len)
+vtwriteclump(char *buf, u16int len, uchar *score)
 {
-	USED(len, buf);
+	u64int offset;
+	u8int blocks;
 	// - Lock index and arena
 	// - Write data to arena
 	// - Write metadata to arena
 	// - Add entry to index
-	wlock(&index);
-	if(!vtwritearenaclump(buf, len)){
+	if(!vtwritearenaclump(buf, len, &offset)){
 		wunlock(&index);
 		return 0;
 	}
-	werrstr("TODO: write clump to index");
-	wunlock(&index);
-	return 0;
+	if(!vtwriteindexclump(score, offset, len)){
+		werrstr("failed to write clump to index: %r");
+		return 0;
+	}
+	return 1;
 }
 
 static int
@@ -390,8 +451,6 @@
 	// trying to stay in the habit of thinking about how code flows through hardware, this
 	// transformation doesn't matter at all.
 	arena->blockremain = arena->blocksize - (arena->arenastats.used & (arena->blocksize - 1));
-	if(arena->arenastats.uncsize != 0 || arena->index == 0)
-		fprint(2, "Arena %d: live block %d, offset %d, remain %d\n", arena->index, arena->block, arena->offset, arena->blockremain);
 	// We _can_ read the values in even if the arena is invalid, and the code looks
 	// cleaner with all the parsing and validation grouped, so meh, going to keep it
 	// like this.
@@ -432,7 +491,8 @@
 	if(Bseek(&bio, tabbase, 0) != tabbase)
 		sysfatal("seek failed: %r");
 	if(!parsemap(&bio, &map, &nmap))
-		sysfatal("failed to parse arena map of tabbase %d: %r", tabbase);	arenas = realloc(arenas, sizeof(VtArena) * (nmap + numarenas));
+		sysfatal("failed to parse arena map of tabbase %d: %r", tabbase);
+	arenas = realloc(arenas, sizeof(VtArena) * (nmap + numarenas));
 	if(!arenas)
 		sysfatal("oom");
 	for(; nmap > 0; nmap -= 1){
@@ -540,7 +600,7 @@
 	index.sects = realloc(index.sects, sizeof(VtISect) * (index.nsects + 1));
 	VtISect *sect = &index.sects[index.nsects++];
 	
-	if((sect->fd = open(path, OREAD)) < 0)
+	if((sect->fd = open(path, ORDWR)) < 0)
 		sysfatal("failed to open index section");
 	if(pread(sect->fd, buf, HeadSize, PartBlank) != HeadSize)
 		sysfatal("failed to read index section header");
--- a/neoventi.h
+++ b/neoventi.h
@@ -4,6 +4,7 @@
 #define	U64GET(p)	(((u64int)U32GET(p)<<32)|(u64int)U32GET((p)+4))
 #define	U16PUT(p,v)	(p)[0]=(v)>>8;(p)[1]=(v)
 #define	U32PUT(p,v)	(p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v)
+#define	U64PUT(p,v,t32)	do{(t32)=(v)>>32;U32PUT(p,t32);(t32)=(v);U32PUT((p)+4,t32);}while(0) /* t32: caller-supplied u32int scratch lvalue; single statement, safe under unbraced if/for */
 
 enum {
 	VtRerror = 1,
@@ -42,7 +43,7 @@
 	MaxAMap = 31*1024,
 	ClumpInfoSize = 25,
 	ClumpSize = ClumpInfoSize + 13,
-	ABlockLog = 9, /* All reads are of 512 byte sectors??? Yikes. We should probably use a larger size, FIXME. */
+	ABlockLog = 13,
 };
 
 typedef struct {
@@ -136,7 +137,7 @@
 int vtreadlookup(u8int *score, VtAddress *addr);
 u16int vtreadarena(VtArena *arena, u64int addr, uchar *dbuf, u16int reqsize);
 int readclump(uchar *dst, VtAddress addr);
-int vtwriteclump(char *dst, u16int len);
+int vtwriteclump(char *dst, u16int len, uchar *score);
 int Brdu32(Biobufhdr *bio, u32int *u32);
 int stru32int(char *s, u32int *r);
 int stru64int(char *s, u64int *r);
--- a/notebook
+++ b/notebook
@@ -2879,3 +2879,68 @@
 
 
 ...okay! data is being written. The question, of course, is if it's being written _correctly_. We're also not updating insignificant things like "the arena's used space metrics," so nothing is really being persisted across shutdowns...
+
+Indexing... okay then. vtreadlookup maps scores to VtAddress. We need to compute the score, and the VtAddress, do that same mapping, but _insert into the bucket_. Simple in principle.
+
+Already got the score computed to check if it's been written before; can just forward that to vtwrite. As for what we actually need from a VtAddress: 
+
+- Enough info to read the full size from disk (currently using block count, and then using embedded length info from disk)
+- arena, offset within arena
+
+Currently just using s_arena, offset, blocks (as size count). There's also a size field, though, which we could probably use instead. Either way, not terrible.
+
+Read process, to reverse:
+
+- Take score, divide by index divisor to select bucket
+	- TODO: optimize on-disk index format
+- Look up index section by bucket
+- Read index bucket into/from cache
+- Verify bucket magic
+- Look up entry in bucket
+- Compute address from info in bucket
+
+So, write process:
+
+- Take score, divide by index divisor to select bucket
+- Read bucket into/from cache, if it exists
+- Initialize bucket, if magic is zero
+- Append entry to bucket
+- Mark bucket as dirty in cache?
+	- For now, just write-through immediately.
+
+The arena field is also implicit; what's stored is an absolute, 64-bit offset.
+
+Bucket format:
+
+- 2 byte entry count
+- 4 byte magic
+- 38-byte entry, repeated * entry count
+	- 20 byte score
+	- 4 byte wtime(???)
+	- 2 byte train(?????)
+		- unused. This is fake.
+	- 8 byte offset
+	- 2 byte size
+	- 1 byte type (facepalm)
+	- 1 byte block counter
+
+TODO: replace with 20 byte score, 8 byte offset, 2 byte size; 30 byte entries? Although, part of the score goes into selecting the bucket; 8 bytes of the score, divided by divisor - so, bucket * divisor == first eight bytes of score. Could just store 12 bytes of score, then. That yields 12 byte score + 8 byte offset + 2 byte size == 22 bytes per entry. Drop size in favor of blocks, relying on size info in the arenas (which we do anyways!) and that's 21 bytes per entry, so we can at least fit three full entries per cache line... Hell, could drop block size info, too! Offset is sufficient to find the size info from the clump header, and then keep reading blocks if we need to! That's 20 bytes per entry!
+	
+Interestingly, venti has a comment claiming that byte-at-a-time bucket manipulation is the bottleneck for some index accesses in oldventi.
+
+Regardless, all we _actually need_ to write is the 20 byte score, the 8 byte offset, the 1 byte block counter. That's 29 bytes. We write those, then bump the counter.
+
+The process in more detail, then:
+
+- Take score, divide by index divisor to select bucket
+- Read bucket into/from cache, if it exists
+- If magic is zero, set magic to magic and count to zero
+- Append entry to bucket
+	- Offset is absolute; i.e. it's arena->start + offset within arena.
+- Bump count
+- Write bucket back to disk
+- Release from cache
+
+Okay, can write data but readback fails with hash collision??
+
+...oh. I'm actually using index address. lol. Let's just drop that check...
\ No newline at end of file
--- a/server.c
+++ b/server.c
@@ -99,17 +99,16 @@
 	u8int score[20];
 	uchar buf2[MaxPacketSize];
 	VtAddress addr;
-	print("computing score for write of len %d\n", len);
 	sha1((uchar*)buf+8, len, score, nil);
 	if(vtreadlookup(score, &addr)){
 		if(addr.size != len)
-			vterr(conn, buf, "hash collision detected");
+			vterr(conn, buf, "hash collision detected, addr size mismatch");
 		if(!readclump(buf2, addr))
 			vterr(conn, buf, "clump read failed: %r");
 		if(memcmp(buf2, buf+8, len) != 0)
-			vterr(conn, buf, "hash collision detected");
+			vterr(conn, buf, "hash collision detected, memcmp divergence on write");
 	} else {
-		if(!vtwriteclump(buf+8, len))
+		if(!vtwriteclump(buf+8, len, score))
 			vterr(conn, buf, "data insertion failed: %r");
 	}
 	memcpy(buf+4, score, 20);
@@ -164,7 +163,6 @@
 		// buffer, and read more into the buffer _after_ it.
 		// Then, repeat.
 		n = read(conn.fd, buf+sz, 0x10000-sz);
-		print("read %d bytes from connection\n", n);
 		sz += n;
 		if(sz == 0)
 			// No data, and none will be coming.
@@ -186,9 +184,7 @@
 		// anyways? Don't want allocation churn, but we can just move packetbuf
 		// into VtConn and pass both the source and destination buffer around a lot.
 		// Not important right now.
-			print("extracting packet into buffer...\n");
 			memcpy(packetbuf, buf+offset, 2+len);
-			print("handling packet...\n");
 			vtconnhandle(conn, packetbuf, len);
 			offset += 2+len;
 		}
--