shithub: gefs

Download patch

ref: 7e2b02ff91250749088f481e3a5bbba595051cdb
parent: e62e48db2a4c71875f9e7ab44f8c11ea3b749a5f
author: Ori Bernstein <ori@eigenstate.org>
date: Mon Oct 17 22:25:58 EDT 2022

blk: use sync queue directly, instead of sending blocks over channel

there's no reason to queue blocks so that we can put them into a queue.

--- a/blk.c
+++ b/blk.c
@@ -7,7 +7,6 @@
 #include "fns.h"
 
 typedef struct Range	Range;
-typedef struct Flushq	Flushq;
 
 struct Range {
 	vlong off;
@@ -14,12 +13,6 @@
 	vlong len;
 };
 
-struct Flushq {
-	Blk	**heap;
-	int	nheap;
-	int	heapsz;
-};
-
 static vlong	blkalloc_lk(Arena*);
 static vlong	blkalloc(int);
 static int	blkdealloc_lk(vlong);
@@ -26,8 +19,6 @@
 static Blk*	initblk(Blk*, vlong, int);
 static int	logop(Arena *, vlong, vlong, int);
 
-static Blk magic;
-
 int
 checkflag(Blk *b, int f)
 {
@@ -139,11 +130,13 @@
 }
 
 static Arena*
-pickarena(int hint)
+pickarena(int hint, int tries)
 {
 	int n;
 
-	n = hint+ainc(&fs->roundrobin)/(1024*1024);
+	n = hint*7;
+	n += tries;
+	n += ainc(&fs->roundrobin)/(1024*1024);
 	return &fs->arenas[n%fs->narena];
 }
 
@@ -447,7 +440,7 @@
 	if(a->tail != nil){
 		finalize(a->tail);
 		if(syncblk(a->tail) == -1){
-			free(b);
+			dropblk(b);
 			return -1;
 		}
 	}
@@ -606,7 +599,7 @@
 
 	tries = 0;
 Again:
-	a = pickarena(hint+tries);
+	a = pickarena(hint, tries);
 	if(a == nil || tries == fs->narena){
 		werrstr("no empty arenas");
 		return -1;
@@ -728,6 +721,11 @@
 
 	if(b->type != Traw)
 		PACK16(b->buf, b->type);
+
+	lock(&fs->freelk);
+	b->qgen = fs->qgen;
+	unlock(&fs->freelk);
+
 	switch(b->type){
 	default:
 	case Tpivot:
@@ -751,6 +749,7 @@
 	case Traw:
 		b->bp.hash = blkhash(b);
 		break;
+	case Tmagic:
 	case Tarena:
 		break;
 	}
@@ -808,7 +807,6 @@
 	if(b == nil || adec(&b->ref) != 0)
 		return;
 	b->lastdrop = getcallerpc(&b);
-//	assert(b->cprev == nil && b->cnext == nil);
 	/*
 	 * While a freed block can get resurrected
 	 * before quiescence, it's unlikely -- so
@@ -946,16 +944,29 @@
 	assert(checkflag(b, Bdirty));
 	holdblk(b);
 	finalize(b);
-	chsend(a->sync, b);
+	qput(a->sync, b);
 }
 
-static void
-qput(Flushq *q, Blk *b)
+void
+qinit(Syncq *q)
 {
+	q->fullrz.l = &q->lk;
+	q->emptyrz.l = &q->lk;
+	q->nheap = 0;
+	q->heapsz = fs->cmax;
+	if((q->heap = malloc(q->heapsz*sizeof(Blk*))) == nil)
+		sysfatal("alloc queue: %r");
+
+}
+
+void
+qput(Syncq *q, Blk *b)
+{
 	int i;
 
-	if(q->nheap == q->heapsz)
-		abort();
+	qlock(&q->lk);
+	while(q->nheap == q->heapsz)
+		rsleep(&q->fullrz);
 	for(i = q->nheap; i > 0; i = (i-1)/2){
 		if(blkcmp(b, q->heap[(i-1)/2]) == 1)
 			break;
@@ -963,19 +974,22 @@
 	}
 	q->heap[i] = b;
 	q->nheap++;
+	rwakeup(&q->emptyrz);
+	qunlock(&q->lk);
 }
 
 static Blk*
-qpop(Flushq *q)
+qpop(Syncq *q)
 {
 	int i, l, r, m;
 	Blk *b, *t;
 
-	if(q->nheap == 0)
-		return nil;
+	qlock(&q->lk);
+	while(q->nheap == 0)
+		rsleep(&q->emptyrz);
 	b = q->heap[0];
 	if(--q->nheap == 0)
-		return b;
+		goto Out;
 
 	i = 0;
 	q->heap[0] = q->heap[q->nheap];
@@ -994,6 +1008,9 @@
 		q->heap[i] = t;
 		i = m;
 	}
+Out:
+	rwakeup(&q->fullrz);
+	qunlock(&q->lk);
 	return b;
 
 }
@@ -1001,32 +1018,19 @@
 void
 runsync(int, void *p)
 {
-	Flushq q;
-	Chan *c;
+	Syncq *q;
 	Blk *b;
 
-	c = p;
-	q.nheap = 0;
-	q.heapsz = 2*fs->cmax/fs->narena;
-	if((q.heap = malloc(q.heapsz*sizeof(Blk*))) == nil)
-		sysfatal("alloc queue: %r");
+	q = p;
 	while(1){
-		while(q.nheap < q.heapsz){
-			b = chrecv(c, q.nheap == 0);
-			if(b == &magic){
-				qlock(&fs->synclk);
-				if(--fs->syncing == 0)
-					rwakeupall(&fs->syncrz);
-				qunlock(&fs->synclk);
-				continue;
-			}				
-			if(b != nil)
-				qput(&q, b);
-			if(b == nil || q.nheap == q.heapsz)
-				break;
-		}
-	
-		b = qpop(&q);
+		b = qpop(q);
+		if(b->type == Tmagic){
+			qlock(&fs->synclk);
+			if(--fs->syncing == 0)
+				rwakeupall(&fs->syncrz);
+			qunlock(&fs->synclk);
+			continue;
+		}				
 		if(!checkflag(b, Bfreed)){
 			if(syncblk(b) == -1){
 				ainc(&fs->broken);
@@ -1042,12 +1046,16 @@
 sync(void)
 {
 	Arena *a;
+	Blk *b;
 	int i;
 
 	qlock(&fs->synclk);
 	fs->syncing = fs->nsyncers;
-	for(i = 0; i < fs->nsyncers; i++)
-		chsend(fs->chsync[i], &magic);
+	for(i = 0; i < fs->nsyncers; i++){
+		b = cachepluck();
+		b->type = Tmagic;
+		qput(&fs->syncq[i], b);	/* one marker per syncer queue, so every syncer drains before sync() wakes */
+	}
 	while(fs->syncing != 0)
 		rsleep(&fs->syncrz);
 	for(i = 0; i < fs->narena; i++){
--- a/dat.h
+++ b/dat.h
@@ -18,6 +18,7 @@
 typedef struct Arange	Arange;
 typedef struct Bucket	Bucket;
 typedef struct Chan	Chan;
+typedef struct Syncq	Syncq;
 typedef struct Tree	Tree;
 typedef struct Dlist	Dlist;
 typedef struct Mount	Mount;
@@ -216,6 +217,7 @@
 	Tleaf,
 	Tlog,
 	Tdead,
+	Tmagic,
 	Tarena = 0x6765,	/* 'ge' bigendian */
 };
 
@@ -388,6 +390,15 @@
 	char	name[128];
 };
 
+struct Syncq {
+	QLock	lk;
+	Rendez	fullrz;
+	Rendez	emptyrz;
+	Blk	**heap;
+	int	nheap;
+	int	heapsz;
+};
+
 struct Stats {
 	vlong	cachehit;
 	vlong	cachelook;
@@ -433,7 +444,7 @@
 	Lock	activelk;
 	ulong	active[32];
 	int	lastactive[32];
-	Chan	*chsync[32];
+	Syncq	syncq[32];
 
 	Lock	freelk;
 	Bfree	*freep;
@@ -485,7 +496,7 @@
 	/* freelist */
 	Bptr	head;
 	Blk	*tail;	/* tail held open for writing */
-	Chan	*sync;
+	Syncq	*sync;
 };
 
 struct Xdir {
--- a/dump.c
+++ b/dump.c
@@ -272,6 +272,9 @@
 			}
 		}
 		break;
+	case Tmagic:
+		fprint(fd, "magic\n");
+		break;
 	case Tarena:
 		fprint(fd, "arena -- ");
 		goto Show;
--- a/fns.h
+++ b/fns.h
@@ -40,6 +40,9 @@
 Blk*	cacheget(vlong);
 Blk*	cachepluck(void);
 
+void	qinit(Syncq*);
+void	qput(Syncq*, Blk*);
+
 Arena*	getarena(vlong);
 int	syncblk(Blk*);
 void	enqueue(Blk*);
--- a/main.c
+++ b/main.c
@@ -210,9 +210,9 @@
 	if(fs->nsyncers > fs->narena)
 		fs->nsyncers = fs->narena;
 	for(i = 0; i < fs->nsyncers; i++)
-		fs->chsync[i] = mkchan(1024);
+		qinit(&fs->syncq[i]);
 	for(i = 0; i < fs->narena; i++)
-		fs->arenas[i].sync = fs->chsync[i%fs->nsyncers];
+		fs->arenas[i].sync = &fs->syncq[i%fs->nsyncers];
 	srvfd = postfd(srvname, "");
 	ctlfd = postfd(srvname, ".cmd");
 	launch(runtasks, -1, nil, "tasks");
@@ -221,7 +221,7 @@
 	for(i = 0; i < 2; i++)
 		launch(runread, fs->nquiesce++, nil, "readio");
 	for(i = 0; i < fs->nsyncers; i++)
-		launch(runsync, -1, fs->chsync[i], "syncio");
+		launch(runsync, -1, &fs->syncq[i], "syncio");
 	for(i = 0; i < nann; i++)
 		launch(runannounce, -1, ann[i], "announce");
 	if(srvfd != -1)