shithub: gefs

Download patch

ref: e9bb7d370d769f3bec1b1ef1606f405ba4044255
parent: 3d6613cadcbc1990e32e283a12152df0f2caf511
author: Ori Bernstein <ori@eigenstate.org>
date: Sun Jan 30 00:50:53 EST 2022

blk: sync blocks to disk in background

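Dirty blocks are now handed to background syncer procs
instead of being written synchronously in enqueue(). This
moves the write bottleneck to the root block copy, which
is the next thing to examine.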

--- a/blk.c
+++ b/blk.c
@@ -6,17 +6,27 @@
 #include "dat.h"
 #include "fns.h"
 
-typedef struct Range Range;
+typedef struct Range	Range;
+typedef struct Flushq	Flushq;
+
 struct Range {
 	vlong off;
 	vlong len;
 };
 
+struct Flushq {	/* blocks waiting for runsync to write them */
+	Blk	**heap;	/* binary heap ordered by blkcmp */
+	int	nheap;	/* entries in use */
+	int	heapsz;	/* capacity of heap */
+};
+
 static vlong	blkalloc_lk(Arena*);
 static vlong	blkalloc(int);
 static int	blkdealloc_lk(vlong);
 static Blk*	initblk(vlong, int);
 
+static Blk magic;	/* barrier token: sync() sends &magic to every syncer */
+
 void
 setflag(Blk *b, int flg)
 {
@@ -51,18 +61,6 @@
 	return pwrite(fs->fd, b->buf, Blksz, b->bp.addr);
 }
 
-void
-enqueue(Blk *b)
-{
-	assert(b->flag & Bdirty);
-	finalize(b);
-	if(syncblk(b) == -1){
-		ainc(&fs->broken);
-		fprint(2, "write: %r");
-		abort();
-	}
-}
-
 static Blk*
 readblk(vlong bp, int flg)
 {
@@ -676,6 +674,7 @@
 		return nil;
 	if((b = initblk(bp, t)) == nil)
 		return nil;
+
 	setmalloctag(b, getcallerpc(&t));
 	return b;
 }
@@ -784,7 +783,7 @@
 	if(b == nil || adec(&b->ref) != 0)
 		return;
 	assert(!(b->flag & Bcached));
-	assert((b->flag & Bqueued) || !(b->flag & Bdirty));
+	assert((b->flag & Bfreed) || !(b->flag & Bdirty));	/* a block still dirty at release must have been freed */
 	free(b);
 }
 
@@ -810,8 +809,8 @@
 void
 freeblk(Tree *t, Blk *b)
 {
-	assert(!(b->flag & Bqueued));
 	b->freed = getcallerpc(&b);
+	setflag(b, Bfreed);	/* runsync skips writing freed blocks */
 	freebp(t, b->bp);
 }
 
@@ -874,30 +873,176 @@
 }
 
+/* Order blocks by generation, then by disk address. */
 int
-sync(void)
+blkcmp(Blk *a, Blk *b)
 {
-	int i, r;
+	if(a->bp.gen != b->bp.gen)
+		return (a->bp.gen < b->bp.gen) ? -1 : 1;
+	if(a->bp.addr != b->bp.addr)
+		return (a->bp.addr < b->bp.addr) ? -1 : 1;
+	return 0;
+}
+
+/*
+ * Finalize the dirty block b and hand it to its arena's syncer
+ * for a background write. The reference taken here is dropped
+ * by the syncer once it is done with the block.
+ */
+void
+enqueue(Blk *b)
+{
 	Arena *a;
-//	Blk *b;
 
-	r = 0;
+	a = getarena(b->bp.addr);
+	assert(b->flag & Bdirty);
+	refblk(b);
+	finalize(b);
+	chsend(a->sync, b);
+}
 
-	qlock(&fs->snaplk);
+/*
+ * Insert b into the flush heap q. The heap keeps the greatest
+ * block (by blkcmp) at q->heap[0]. Aborts if q is full.
+ */
+void
+qput(Flushq *q, Blk *b)
+{
+	Blk *t;
+	int i;
+
+	if(q->nheap == q->heapsz)
+		abort();
+	q->heap[q->nheap] = b;
+	for(i = q->nheap; i > 0; i = (i-1)/2){
+		if(blkcmp(q->heap[i], q->heap[(i-1)/2]) == -1)
+			break;
+		t = q->heap[i];
+		q->heap[i] = q->heap[(i-1)/2];
+		q->heap[(i-1)/2] = t;
+	}
+	q->nheap++;
+}
+
+/*
+ * Remove and return the greatest block (by blkcmp) in the
+ * flush heap q, or nil if q is empty.
+ */
+Blk*
+qpop(Flushq *q)
+{
+	int i, l, r, m;
+	Blk *b, *t;
+
+	if(q->nheap == 0)
+		return nil;
+	b = q->heap[0];
+	if(--q->nheap == 0)
+		return b;
+
+	i = 0;
+	q->heap[0] = q->heap[q->nheap];
+	while(1){
+		m = i;
+		l = 2*i+1;
+		r = 2*i+2;
+		if(l < q->nheap && blkcmp(q->heap[m], q->heap[l]) == -1)
+			m = l;
+		if(r < q->nheap && blkcmp(q->heap[m], q->heap[r]) == -1)
+			m = r;
+		if(m == i)
+			break;
+		t = q->heap[m];
+		q->heap[m] = q->heap[i];
+		q->heap[i] = t;
+		i = m;
+	}
+	return b;
+}
+
+/*
+ * Write b unless it was freed after being queued, then drop
+ * the reference that enqueue took. Write errors are fatal.
+ */
+static void
+qsyncblk(Blk *b)
+{
+	if(!(b->flag & Bfreed)){
+		if(syncblk(b) == -1){
+			ainc(&fs->broken);
+			fprint(2, "write: %r\n");
+			abort();
+		}
+	}
+	putblk(b);
+}
+
+/*
+ * Syncer proc: receives enqueued blocks from the channel p,
+ * batches up to fs->cmax of them in a heap, and writes them
+ * out in heap order. On a sync barrier (&magic) it first
+ * writes every block still queued, so everything sent before
+ * the barrier has been written by the time sync() is woken.
+ */
+void
+runsync(int, void *p)
+{
+	Flushq q;
+	Chan *c;
+	Blk *b;
+
+	c = p;
+	q.nheap = 0;
+	q.heapsz = fs->cmax;
+	if((q.heap = malloc(q.heapsz*sizeof(Blk*))) == nil)
+		sysfatal("alloc queue: %r");
+	while(1){
+		while(q.nheap < q.heapsz){
+			/* only wait for input when there is nothing to write */
+			b = chrecv(c, q.nheap == 0);
+			if(b == &magic){
+				while((b = qpop(&q)) != nil)
+					qsyncblk(b);
+				if(adec(&fs->syncing) == 0){
+					qlock(&fs->synclk);
+					rwakeupall(&fs->syncrz);
+					qunlock(&fs->synclk);
+				}
+				continue;
+			}
+			if(b != nil)
+				qput(&q, b);
+			if(b == nil || q.nheap == q.heapsz)
+				break;
+		}
+		qsyncblk(qpop(&q));
+	}
+}
+
+/*
+ * Flush outstanding writes: post a barrier to every syncer,
+ * wait until each has written the blocks it received before
+ * the barrier, then write each arena's tail block and call
+ * syncarena on it. Any write failure is fatal.
+ */
+void
+sync(void)
+{
+	Arena *a;
+	int i;
+
+	qlock(&fs->synclk);
+	fs->syncing = fs->nsyncers;
+	for(i = 0; i < fs->nsyncers; i++)
+		chsend(fs->chsync[i], &magic);
+	while(fs->syncing != 0)
+		rsleep(&fs->syncrz);
 	for(i = 0; i < fs->narena; i++){
 		a = &fs->arenas[i];
 		finalize(a->tail);
 		if(syncblk(a->tail) == -1)
-			r = -1;
+			sysfatal("sync arena: %r");
 		if(syncarena(a) == -1)
-			r = -1;
+			sysfatal("sync arena: %r");
 	}
-//
-//	for(b = fs->chead; b != nil; b = b->cnext){
-//		if((b->flag & Bdirty) == 0)
-//			continue;
-//		if(syncblk(b) == -1)
-//			r = -1;
-//	}
-	qunlock(&fs->snaplk);
-	return r;
+	qunlock(&fs->synclk);
 }
--- a/check.c
+++ b/check.c
@@ -55,7 +55,7 @@
 	getval(b, 0, &x);
 	if(lo && keycmp(lo, &x) > 0){
 		fprint(fd, "out of range keys %P != %P\n", lo, &x);
-		showblk(2, b, "wut", 1);
+		showblk(2, b, "out of range", 1);
 		fail++;
 	}
 	for(i = 1; i < b->nval; i++){
--- a/dat.h
+++ b/dat.h
@@ -433,7 +433,12 @@
 	/* arena allocation */
 	Arena	*arenas;
 	long	roundrobin;
+	long	syncing;	/* syncers that have not yet acked the current sync() barrier */
+	long	nsyncers;	/* number of runsync procs (entries of chsync in use) */
+
 	int	gotinfo;
+	QLock	synclk;	/* serializes sync(); also the lock for syncrz */
+	Rendez	syncrz;	/* sync() sleeps here until syncing reaches 0 */
 
 	QLock	snaplk;	/* snapshot lock */
 	Mount	*mounts;
@@ -446,6 +451,7 @@
 	Lock	activelk;
 	int	active[32];
 	int	lastactive[32];
+	Chan	*chsync[32];	/* one block channel per syncer */
 	Lock	freelk;
 	Bfree	*freep;
 	Bfree	*freehd;
@@ -495,6 +501,7 @@
 	/* freelist */
 	Bptr	head;
 	Blk	*tail;	/* tail held open for writing */
+	Chan	*sync;	/* channel of the syncer that writes this arena's blocks */
 };
 
 struct Xdir {
@@ -611,6 +618,7 @@
 	Blk	*fnext;
 
 	long	flag;
+	long	syncgen;	/* NOTE(review): not referenced elsewhere in this patch */
 
 	/* serialized to disk in header */
 	short	type;	/* @0, for all */
--- a/fns.h
+++ b/fns.h
@@ -44,7 +44,7 @@
 void	reamfs(char*);
 int	loadarena(Arena*, Fshdr *fi, vlong);
 void	loadfs(char*);
-int	sync(void);
+void	sync(void);
 int	loadlog(Arena*);
 int	scandead(Dlist*, int, void(*)(Bptr, void*), void*);
 int	endfs(void);
@@ -132,13 +132,14 @@
 int	Qconv(Fmt*);
 
 Chan*	mkchan(int);
-Fmsg*	chrecv(Chan*);
-void	chsend(Chan*, Fmsg*);
+void*	chrecv(Chan*, int);	/* int: nonzero to wait; zero may return nil */
+void	chsend(Chan*, void*);
 void	runfs(int, void*);
 void	runwrite(int, void*);
 void	runread(int, void*);
 void	runcons(int, void*);
 void	runtasks(int, void*);
+void	runsync(int, void*);
 
 /* it's in libc... */
 extern int cas(long*, long, long);
--- a/fs.c
+++ b/fs.c
@@ -65,7 +65,7 @@
 	if(u != nil)
 		closesnap(u);
 	closesnap(t);
-	/* we probably want explicit snapshots to be resilient */
+	/* explicit snapshots should be written out before we report them taken */
 	sync();
 	fprint(fd, "snap taken: %s\n", new);
 }
@@ -111,15 +111,19 @@
 
 }
 
-Fmsg*
-chrecv(Chan *c)
+void*
+chrecv(Chan *c, int block)	/* block == 0: return nil instead of waiting */
 {
 	void *a;
 	long v;
 
 	v = c->count;
-	if(v == 0 || cas(&c->count, v, v-1) == 0)
-		semacquire(&c->count, 1);
+	if(v == 0 || cas(&c->count, v, v-1) == 0){
+		if(block)
+			semacquire(&c->count, 1);
+		else
+			return nil;	/* empty, or count changed under us (lost the cas) */
+	}
 	lock(&c->rl);
 	a = *c->rp;
 	if(++c->rp >= &c->args[c->size])
@@ -130,7 +134,7 @@
 }
 
 void
-chsend(Chan *c, Fmsg *m)
+chsend(Chan *c, void *m)
 {
 	long v;
 
@@ -166,10 +170,8 @@
 
 	r->tag = m->tag;
 	dprint("→ %F\n", r);
-	if((n = convS2M(r, buf, sizeof(buf))) == 0){
-		fprint(2, "wut: %r\n");
+	if((n = convS2M(r, buf, sizeof(buf))) == 0)
 		abort();
-	}
 	w = write(m->fd, buf, n);
 	if(w != n)
 		fshangup(m->fd, Eio);
@@ -1686,7 +1688,7 @@
 	int ao;
 
 	while(1){
-		m = chrecv(fs->wrchan);
+		m = chrecv(fs->wrchan, 1);
 		quiesce(wid);
 		ao = (m->a == nil) ? AOnone : m->a->op;
 		switch(ao){
@@ -1732,7 +1734,7 @@
 	Fmsg *m;
 
 	while(1){
-		m = chrecv(fs->rdchan);
+		m = chrecv(fs->rdchan, 1);
 		quiesce(wid);
 		switch(m->type){
 		case Tflush:	rerror(m, Eimpl);	break;
--- a/main.c
+++ b/main.c
@@ -15,6 +15,7 @@
 int	nproc;
 char	*forceuser;
 char	*srvname = "gefs";
+int	cachesz = 512*MiB;
 
 vlong
 inc64(vlong *v, vlong dv)
@@ -88,10 +89,8 @@
 main(int argc, char **argv)
 {
 	int i, srvfd, ctlfd;
-	vlong cachesz;
 	char *s;
 
-	cachesz = 512*MiB;
 	ARGBEGIN{
 	case 'r':
 		ream = 1;
@@ -141,22 +140,35 @@
 		nproc = atoi(s);
 	if(nproc == 0)
 		nproc = 2;
+	if(nproc > nelem(fs->active) - 3)	/* ctl, mutate and srvio take 3 more quiesce ids */
+		nproc = nelem(fs->active) - 3;	/* NOTE(review): assumes quiesce ids index fs->active */
 	if(ream){
 		reamfs(argv[0]);
 		exits(nil);
-	}else{
-		fs->rdchan = mkchan(128);
-		fs->wrchan = mkchan(128);
-		srvfd = postfd(srvname, "");
-		ctlfd = postfd(srvname, ".cmd");
-		loadfs(argv[0]);
-		launch(runtasks, -1, nil, "tasks");
-		launch(runcons, fs->nquiesce++, (void*)ctlfd, "ctl");
-		launch(runwrite, fs->nquiesce++, nil, "writeio");
-		for(i = 0; i < nproc; i++)
-			launch(runread, fs->nquiesce++, nil, "readio");
-		if(srvfd != -1)
-			launch(runfs, fs->nquiesce++, (void*)srvfd, "srvio");
-		exits(nil);
 	}
+
+	loadfs(argv[0]);
+
+	fs->syncrz.l = &fs->synclk;
+	fs->rdchan = mkchan(32);
+	fs->wrchan = mkchan(32);
+	fs->nsyncers = nproc;	/* at most one syncer per arena */
+	if(fs->nsyncers > fs->narena)
+		fs->nsyncers = fs->narena;
+	for(i = 0; i < fs->nsyncers; i++)
+		fs->chsync[i] = mkchan(128);
+	for(i = 0; i < fs->narena; i++)	/* spread arenas over the syncer channels created above */
+		fs->arenas[i].sync = fs->chsync[i%fs->nsyncers];
+	srvfd = postfd(srvname, "");
+	ctlfd = postfd(srvname, ".cmd");
+	launch(runtasks, -1, nil, "tasks");
+	launch(runcons, fs->nquiesce++, (void*)ctlfd, "ctl");
+	launch(runwrite, fs->nquiesce++, nil, "mutate");
+	for(i = 0; i < nproc; i++)
+		launch(runread, fs->nquiesce++, nil, "readio");
+	for(i = 0; i < fs->nsyncers; i++)
+		launch(runsync, -1, fs->chsync[i], "syncio");
+	if(srvfd != -1)
+		launch(runfs, fs->nquiesce++, (void*)srvfd, "srvio");
+	exits(nil);
 }
--- a/ream.c
+++ b/ream.c
@@ -218,7 +218,15 @@
 
 	putblk(tb);
 	putblk(rb);
+	for(i = 0; i < fs->narena; i++){	/* no runsync procs exist while reaming, so write arenas directly */
+		a = &fs->arenas[i];
+		finalize(a->tail);
+		if(syncblk(a->tail) == -1)
+			sysfatal("sync arena: %r");
+		packarena(a->b->data, Blkspc, a, fs);	/* serialize arena state into a->b */
+		finalize(a->b);
+		if(syncblk(a->b) == -1)
+			sysfatal("sync arena: %r");
+	}
 	free(mnt);
-	if(sync() == -1)
-		sysfatal("ream: sync: %r");
 }
--- a/snap.c
+++ b/snap.c
@@ -148,7 +148,7 @@
 Tree*
 openlabel(char *name)
 {
-	char *p, buf[Keymax];
+	char *p, buf[Kvmax];
 	Kvp kv;
 	Key k;