shithub: gefs

Download patch

ref: 1c52bff742ac452c853b93fa9a05b2ac0f243714
parent: e65b37e27a53bbfd2c6c65fae56a7c5c035a5a47
author: Ori Bernstein <ori@eigenstate.org>
date: Sun Oct 30 00:23:06 EDT 2022

blk: move from hacky qsbr to ebr

the quiescent-state-based reclamation was too home-grown
for my taste, complicated, and not at all lock-free; moving
to epoch-based reclamation simplifies the code, gives a far
more textbook implementation of the algorithm, and removes
the locks around the active generations.

this also serves as a testbed for the atomic APIs we want
to support.

--- /dev/null
+++ b/atomic-amd64.s
@@ -1,0 +1,64 @@
+/*
+ * Atomic primitives for amd64, declared in atomic.h.  The pointer
+ * argument arrives in RARG, the remaining ones on the stack, and
+ * the result is returned in AX.
+ */
+
+/*  get variants: plain loads of *p */
+TEXT agetl+0(SB),1,$0
+	MOVL	(RARG), AX
+	RET
+TEXT agetv+0(SB),1,$0
+TEXT agetp+0(SB),1,$0
+	MOVQ	(RARG), AX
+	RET
+
+/*  set variants: swap v into *p, return the old value */
+TEXT asetl+0(SB),1,$0
+	MOVL		v+8(FP), AX
+	LOCK; XCHGL	(RARG), AX
+	RET
+TEXT asetv+0(SB),1,$0
+TEXT asetp+0(SB),1,$0
+	MOVQ		v+8(FP), AX
+	LOCK; XCHGQ	(RARG), AX
+	RET
+
+/*  inc variants: add v to *p, return the old value */
+TEXT aincl+0(SB),1,$0
+	MOVL		v+8(FP), AX
+	LOCK; XADDL	AX, (RARG)
+	RET
+TEXT aincv+0(SB),1,$0
+TEXT aincp+0(SB),1,$0
+	MOVQ		v+8(FP), AX
+	LOCK; XADDQ	AX, (RARG)
+	RET
+
+/*  cas variants: if *p == c, store v and return 1; else return 0 */
+TEXT acasl+0(SB),1,$0
+	MOVL	c+8(FP), AX
+	MOVL	v+16(FP), BX
+	LOCK; CMPXCHGL	BX, (RARG)
+	MOVL	$1, AX				/* use CMOVLEQ etc. here? */
+	JNZ	fail32
+	RET
+fail32:
+	DECL	AX
+	RET
+TEXT acasv+0(SB),1,$0
+TEXT acasp+0(SB),1,$0
+	MOVQ	c+8(FP), AX
+	MOVQ	v+16(FP), BX
+	LOCK; CMPXCHGQ BX, (RARG)
+	MOVL	$1, AX				/* use CMOVLEQ etc. here? */
+	JNZ	fail64
+	RET
+fail64:
+	DECL	AX
+	RET
+
+/* barriers (do we want to distinguish types?) */
+TEXT coherence+0(SB),1,$0
+	MFENCE
+	RET
--- /dev/null
+++ b/atomic.h
@@ -1,0 +1,17 @@
+long	agetl(long*);	/* load *p */
+vlong	agetv(vlong*);
+void*	agetp(void**);
+
+long	asetl(long*, long);	/* store v into *p, return the old *p */
+vlong	asetv(vlong*, vlong);
+void*	asetp(void**, void*);
+
+long	aincl(long*, long);	/* add v to *p, return the old *p */
+vlong	aincv(vlong*, vlong);
+void*	aincp(void**, void*);
+
+long	acasl(long*, long, long);	/* if *p == old, store new: 1 on success, 0 on failure */
+vlong	acasv(vlong*, vlong, vlong);
+void*	acasp(void**, void*, void*);
+
+void	coherence(void);	/* full memory barrier */
\ No newline at end of file
--- a/blk.c
+++ b/blk.c
@@ -5,6 +5,7 @@
 
 #include "dat.h"
 #include "fns.h"
+#include "atomic.h"	/* agetl, asetl, aincv, acasl, ... */
 
 typedef struct Range	Range;
 
@@ -33,7 +34,7 @@
 	while(1){
 		ov = b->flag;
 		nv = ov | f;
-		if(cas(&b->flag, ov, nv))
+		if(acasl(&b->flag, ov, nv))	/* retry if b->flag changed underneath us */
 			break;
 	}
 }
@@ -46,7 +47,7 @@
 	while(1){
 		ov = b->flag;
 		nv = ov & ~f;
-		if(cas(&b->flag, ov, nv))
+		if(acasl(&b->flag, ov, nv))	/* retry if b->flag changed underneath us */
 			break;
 	}
 }
@@ -650,9 +651,6 @@
 	b->bp.addr = bp;
 	b->bp.hash = -1;
 	b->bp.gen = fs->nextgen;
-	lock(&fs->freelk);
-	b->qgen = fs->qgen;
-	unlock(&fs->freelk);
 	switch(t){
 	case Traw:
 	case Tarena:
@@ -727,10 +725,6 @@
 	if(b->type != Traw)
 		PACK16(b->buf, b->type);
 
-	lock(&fs->freelk);
-	b->qgen = fs->qgen;
-	unlock(&fs->freelk);
-
 	switch(b->type){
 	default:
 	case Tpivot:
@@ -796,10 +790,13 @@
 	return b;
 }
 
+/* holdblk: take a reference; the last three holders' pcs are kept for debugging */
 Blk*
 holdblk(Blk *b)
 {
 	ainc(&b->ref);
+	b->lasthold1 = b->lasthold0;	/* debug: shift the holder history */
+	b->lasthold0 = b->lasthold;
 	b->lasthold = getcallerpc(&b);
 	return b;
 }
@@ -810,6 +807,8 @@
 	assert(b == nil || b->ref > 0);
 	if(b == nil || adec(&b->ref) != 0)
 		return;
+	b->lastdrop1 = b->lastdrop0;	/* debug: history of drops that released the last ref */
+	b->lastdrop0 = b->lastdrop;
 	b->lastdrop = getcallerpc(&b);
 	/*
 	 * While a freed block can get resurrected
@@ -839,82 +838,105 @@
 }
 
+/*
+ * Retire bp in the current epoch; epochclean frees it once every
+ * active worker has moved on.  b, if non-nil, is the cached block
+ * for bp with a hold taken by freeblk: it is dropped here when the
+ * free is not deferred, and by epochclean otherwise.  For t other
+ * than nil or &fs->snap, bp.gen <= t->gen is handed to killblk.
+ */
 void
-freebp(Tree *t, Bptr bp)
+deferfree(Tree *t, Bptr bp, Blk *b)
 {
 	Bfree *f;
+	ulong ge;
 
 	if(t != nil && t != &fs->snap && bp.gen <= t->gen){
 		killblk(t, bp);
+		if(b != nil)
+			dropblk(b);
 		return;
 	}
+
-	if((f = malloc(sizeof(Bfree))) == nil)
-		return;
+	if((f = malloc(sizeof(Bfree))) == nil){
+		if(b != nil)
+			dropblk(b);
+		return;
+	}
 	f->bp = bp;
-	lock(&fs->freelk);
-	f->next = fs->freehd;
-	fs->freehd = f;
-	unlock(&fs->freelk);
+	f->b = b;
+
+	/* freelk serializes pushes with epochclean's detach and advance */
+	lock(&fs->freelk);
+	ge = agetl(&fs->epoch);
+	f->next = fs->limbo[ge];
+	fs->limbo[ge] = f;
+	unlock(&fs->freelk);
 }
 
 void
+freebp(Tree *t, Bptr bp)
+{
+	deferfree(t, bp, nil);
+}
+
+void
 freeblk(Tree *t, Blk *b)
 {
+	holdblk(b);	/* released by deferfree or epochclean */
 	b->freed = getcallerpc(&t);
 	setflag(b, Bfreed);
-	freebp(t, b->bp);
+	deferfree(t, b->bp, b);
 }
 
+/* Mark worker tid active in the current global epoch. */
 void
 epochstart(int tid)
 {
-	ainc((long*)&fs->active[tid]);
+	ulong ge;
+
+	ge = agetl(&fs->epoch);
+	asetl(&fs->lepoch[tid], ge | Eactive);
 }
 
+/* Mark worker tid inactive; the epoch it last saw is kept. */
 void
 epochend(int tid)
 {
-	ainc((long*)&fs->active[tid]);
+	ulong le;
+
+	le = agetl(&fs->lepoch[tid]);
+	asetl(&fs->lepoch[tid], le &~ Eactive);
 }
 
+/* Advance the epoch if all active workers have seen it, freeing the reusable bucket. */
 void
 epochclean(void)
 {
-	int i, allquiesced;
+	ulong e, ge;
 	Bfree *p, *n;
 	Arena *a;
+	int i;
 
-	lock(&fs->activelk);
-	allquiesced = 1;
-	for(i = 0; i < fs->nquiesce; i++){
-		/*
-		 * Odd parity on quiescence implies
-		 * that we're between the exit from
-		 * and waiting for the next message
-		 * that enters us into the critical
-		 * section.
-		 */
-		if((fs->active[i] & 1) != 0)
-			continue;
-		if(fs->active[i] == fs->lastactive[i])
-			allquiesced = 0;
+	/*
+	 * Scan, detach and advance under freelk: deferfree pushes
+	 * under the same lock, and two concurrent cleaners must not
+	 * both advance from the same ge.
+	 */
+	lock(&fs->freelk);
+	ge = agetl(&fs->epoch);
+	for(i = 0; i < fs->nworker; i++){
+		e = agetl(&fs->lepoch[i]);
+		if((e & Eactive) && e != (ge | Eactive)){
+			unlock(&fs->freelk);
+			return;
+		}
 	}
+	/* every active worker has seen ge: the (ge+1)%3 list, filled in epoch ge-2, can be freed */
+	p = fs->limbo[(ge+1)%3];
+	fs->limbo[(ge+1)%3] = nil;
+	asetl(&fs->epoch, (ge+1) % 3);
+	unlock(&fs->freelk);
 
-	p = nil;
-	if(allquiesced){
-		for(i = 0; i < fs->nquiesce; i++)
-			fs->lastactive[i] = fs->active[i];
-
-		lock(&fs->freelk);
-		fs->qgen++;
-		if(fs->freep != nil){
-			p = fs->freep->next;
-			fs->freep->next = nil;
-		}
-		fs->freep = fs->freehd;
-		unlock(&fs->freelk);
-	}
-	unlock(&fs->activelk);
-
 	while(p != nil){
 		n = p->next;
 		a = getarena(p->bp.addr);
@@ -922,6 +944,8 @@
 		lock(a);
 		cachedel(p->bp.addr);
 		blkdealloc_lk(p->bp.addr);
+		if(p->b != nil)
+			dropblk(p->b);	/* the hold taken by freeblk */
 		unlock(a);
 
 		free(p);
@@ -944,6 +968,7 @@
 {
 	Arena *a;
 
+	b->qgen = aincv(&fs->qgen, 1);
 	a = getarena(b->bp.addr);
 	assert(checkflag(b, Bdirty));
 	holdblk(b);
@@ -1036,7 +1061,7 @@
 		}else if(!checkflag(b, Bfreed)){
 			if(syncblk(b) == -1){
 				ainc(&fs->broken);
-				fprint(2, "write: %r");
+				fprint(2, "write: %r\n");
 				abort();
 			}
 		}
@@ -1057,7 +1082,5 @@
 		b = cachepluck();
 		b->type = Tmagic;
-		lock(&fs->freelk);
-		b->qgen = ++fs->qgen;
-		unlock(&fs->freelk);
+		b->qgen = aincv(&fs->qgen, 1);	/* sequenced like enqueued dirty blocks */
 		qput(&fs->syncq[i], b);
 	}
--- a/cache.c
+++ b/cache.c
@@ -132,7 +132,6 @@
 
 	h = ihash(off);
 
-	inc64(&fs->stats.cachelook, 1);
 	bkt = &fs->cache[h % fs->cmax];
 
 	qlock(&fs->lrulk);
@@ -139,7 +138,6 @@
 	lock(bkt);
 	for(b = bkt->b; b != nil; b = b->hnext){
 		if(b->bp.addr == off){
-			inc64(&fs->stats.cachehit, 1);
  			holdblk(b);
 			lrudel(b);
 			b->lasthold = getcallerpc(&off);
--- a/dat.h
+++ b/dat.h
@@ -79,6 +79,10 @@
 };
 
 enum {
+	Eactive	= 1UL<<30,	/* set in lepoch[i] while worker i is between epochstart and epochend */
+};
+
+enum {
 	/*
 	 * dent: pqid[8] qid[8] -- a directory entry key.
 	 * ptr:  off[8] hash[8] -- a key for an Dir block.
@@ -439,16 +443,16 @@
 
 	Chan	*wrchan;
 	Chan	*rdchan;
-	int	nquiesce;
+
+	int	nworker;	/* number of lepoch[] slots in use */
+	Lock	freelk;
 	vlong	qgen;
-	Lock	activelk;
-	ulong	active[32];
-	int	lastactive[32];
+	long	epoch;	/* global epoch; epochclean advances it mod 3 */
+	long	lepoch[32];	/* per-worker epoch, ORed with Eactive while active */
+	Bfree	*limbo[3];	/* deferred frees, one list per epoch */
+
 	Syncq	syncq[32];
 
-	Lock	freelk;
-	Bfree	*freep;
-	Bfree	*freehd;
 
 	int	fd;
 	long	broken;
@@ -619,12 +623,17 @@
 	vlong	lognxt;	/* for allocation log */
 
 	/* debug */
-	uintptr	alloced;
 	uintptr lasthold;
 	uintptr lasthold0;
 	uintptr lasthold1;
+
 	uintptr lastdrop;
+	uintptr lastdrop0;
+	uintptr lastdrop1;
+
+	uintptr cached;
 	uintptr uncached;
+	uintptr	alloced;
 	uintptr	freed;
 
 	Bptr	bp;
--- a/fns.h
+++ b/fns.h
@@ -159,7 +159,7 @@
 int	Qconv(Fmt*);
 
 Chan*	mkchan(int);
-void*	chrecv(Chan*, int);
+void*	chrecv(Chan*);	/* always blocks until a message is available */
 void	chsend(Chan*, void*);
 void	runfs(int, void*);
 void	runwrite(int, void*);
@@ -167,9 +167,3 @@
 void	runcons(int, void*);
 void	runtasks(int, void*);
 void	runsync(int, void*);
-
-/* it's in libc... */
-extern int cas(long*, long, long);
-extern int fasp(void***, void*);
-extern int cas64(u64int*, u64int, u64int);
-vlong	inc64(vlong*, vlong);
--- a/fs.c
+++ b/fs.c
@@ -7,6 +7,7 @@
 
 #include "dat.h"
 #include "fns.h"
+#include "atomic.h"	/* agetl, acasl, aincv */
 
 static char*	clearb(Fid*, vlong, vlong);
 
@@ -111,18 +112,14 @@
 }
 
 void*
-chrecv(Chan *c, int block)
+chrecv(Chan *c)
 {
 	void *a;
 	long v;
 
-	v = c->count;
-	if(v == 0 || cas(&c->count, v, v-1) == 0){
-		if(block)
-			semacquire(&c->count, 1);
-		else
-			return nil;
-	}
+	v = agetl(&c->count);	/* fast path: take a message without sleeping */
+	if(v == 0 || !acasl(&c->count, v, v-1))
+		semacquire(&c->count, 1);	/* empty or raced: sleep on the semaphore */
 	lock(&c->rl);
 	a = *c->rp;
 	if(++c->rp >= &c->args[c->size])
@@ -137,8 +134,8 @@
 {
 	long v;
 
-	v = c->avail;
-	if(v == 0 || cas(&c->avail, v, v-1) == 0)
+	v = agetl(&c->avail);	/* fast path: decrement avail without sleeping */
+	if(v == 0 || !acasl(&c->avail, v, v-1))
 		semacquire(&c->avail, 1);
 	lock(&c->wl);
 	*c->wp = m;
@@ -695,7 +692,7 @@
 	memset(de, 0, sizeof(Dent));
 	de->ref = 0;
 	de->qid.type = QTAUTH;
-	de->qid.path = inc64(&fs->nextqid, 1);
+	de->qid.path = aincv(&fs->nextqid, 1);	/* value before the increment */
 	de->qid.vers = 0;
 	de->length = 0;
 	de->k = nil;
@@ -1319,7 +1316,7 @@
 		d.qid.type |= QTEXCL;
 	if(m->perm & DMTMP)
 		d.qid.type |= QTTMP;
-	d.qid.path = inc64(&fs->nextqid, 1);
+	d.qid.path = aincv(&fs->nextqid, 1);	/* value before the increment */
 	d.qid.vers = 0;
 	d.mode = m->perm;
 	if(m->perm & DMDIR)
@@ -1931,7 +1928,7 @@
 	int ao;
 
 	while(1){
-		m = chrecv(fs->wrchan, 1);
+		m = chrecv(fs->wrchan);	/* blocks; the epoch is entered only with work in hand */
 		epochstart(wid);
 		ao = (m->a == nil) ? AOnone : m->a->op;
 		switch(ao){
@@ -1980,7 +1977,7 @@
 	Fmsg *m;
 
 	while(1){
-		m = chrecv(fs->rdchan, 1);
+		m = chrecv(fs->rdchan);	/* blocks; the epoch is entered only with work in hand */
 		epochstart(wid);
 		switch(m->type){
 		case Tattach:	fsattach(m);	break;
--- a/main.c
+++ b/main.c
@@ -17,19 +17,6 @@
 char	*dev;
 vlong	cachesz = 512*MiB;
 
-vlong
-inc64(vlong *v, vlong dv)
-{
-	vlong ov, nv;
-
-	while(1){
-		ov = *v;
-		nv = ov + dv;
-		if(cas64((u64int*)v, ov, nv))
-			return ov;
-	}
-}
-
 static void
 initfs(vlong cachesz)
 {
@@ -65,7 +52,7 @@
 {
 	int pid;
 
-	assert(wid == -1 || wid < nelem(fs->active));
+	assert(wid == -1 || wid < nelem(fs->lepoch));	/* wid indexes fs->lepoch; -1 means no epoch slot */
 	pid = rfork(RFPROC|RFMEM|RFNOWAIT);
 	if (pid < 0)
 		sysfatal("can't fork: %r");
@@ -216,10 +203,10 @@
 	srvfd = postfd(srvname, "");
 	ctlfd = postfd(srvname, ".cmd");
 	launch(runtasks, -1, nil, "tasks");
-	launch(runcons, fs->nquiesce++, (void*)ctlfd, "ctl");
-	launch(runwrite, fs->nquiesce++, nil, "mutate");
+	launch(runcons, fs->nworker++, (void*)ctlfd, "ctl");	/* each worker gets its own lepoch slot */
+	launch(runwrite, fs->nworker++, nil, "mutate");
 	for(i = 0; i < 2; i++)
-		launch(runread, fs->nquiesce++, nil, "readio");
+		launch(runread, fs->nworker++, nil, "readio");
 	for(i = 0; i < fs->nsyncers; i++)
 		launch(runsync, -1, &fs->syncq[i], "syncio");
 	for(i = 0; i < nann; i++)
--- a/mkfile
+++ b/mkfile
@@ -18,9 +18,12 @@
 	snap.$O\
 	tree.$O\
 	user.$O\
+	\
+	atomic-$objtype.$O
 
 HFILES=\
 	dat.h\
-	fns.h
+	fns.h\
+	atomic.h
 
 </sys/src/cmd/mkone
--- a/snap.c
+++ b/snap.c
@@ -5,6 +5,7 @@
 
 #include "dat.h"
 #include "fns.h"
+#include "atomic.h"	/* aincv */
 
 int
 scandead(Dlist *l, int lblk, void (*fn)(Bptr, void*), void *dat)
@@ -396,7 +397,7 @@
 
 	if((r = calloc(sizeof(Tree), 1)) == nil)
 		return nil;
-	gen = inc64(&fs->nextgen, 1);
+	gen = aincv(&fs->nextgen, 1);	/* value before the increment, as inc64 returned */
 	memset(&r->lk, 0, sizeof(r->lk));
 	r->snext = fs->osnap;
 	r->memref = 1;