ref: 7e2b02ff91250749088f481e3a5bbba595051cdb
parent: e62e48db2a4c71875f9e7ab44f8c11ea3b749a5f
author: Ori Bernstein <ori@eigenstate.org>
date: Mon Oct 17 22:25:58 EDT 2022
blk: use sync queue directly, instead of sending blocks over channel there's no reason to queue blocks so that we can put them into a queue.
--- a/blk.c
+++ b/blk.c
@@ -7,7 +7,6 @@
#include "fns.h"
typedef struct Range Range;
-typedef struct Flushq Flushq;
struct Range {
vlong off;
@@ -14,12 +13,6 @@
vlong len;
};
-struct Flushq {
- Blk **heap;
- int nheap;
- int heapsz;
-};
-
static vlong blkalloc_lk(Arena*);
static vlong blkalloc(int);
static int blkdealloc_lk(vlong);
@@ -26,8 +19,6 @@
static Blk* initblk(Blk*, vlong, int);
static int logop(Arena *, vlong, vlong, int);
-static Blk magic;
-
int
checkflag(Blk *b, int f)
{
@@ -139,11 +130,13 @@
}
static Arena*
-pickarena(int hint)
+pickarena(int hint, int tries)
{
int n;
- n = hint+ainc(&fs->roundrobin)/(1024*1024);
+ n = hint*7;
+ n += tries;
+ n += ainc(&fs->roundrobin)/(1024*1024);
return &fs->arenas[n%fs->narena];
}
@@ -447,7 +440,7 @@
if(a->tail != nil){
finalize(a->tail);
if(syncblk(a->tail) == -1){
- free(b);
+ dropblk(b);
return -1;
}
}
@@ -606,7 +599,7 @@
tries = 0;
Again:
- a = pickarena(hint+tries);
+ a = pickarena(hint, tries);
if(a == nil || tries == fs->narena){
werrstr("no empty arenas");
return -1;
@@ -728,6 +721,11 @@
if(b->type != Traw)
PACK16(b->buf, b->type);
+
+ lock(&fs->freelk);
+ b->qgen = fs->qgen;
+ unlock(&fs->freelk);
+
switch(b->type){
default:
case Tpivot:
@@ -751,6 +749,7 @@
case Traw:
b->bp.hash = blkhash(b);
break;
+ case Tmagic:
case Tarena:
break;
}
@@ -808,7 +807,6 @@
if(b == nil || adec(&b->ref) != 0)
return;
b->lastdrop = getcallerpc(&b);
-// assert(b->cprev == nil && b->cnext == nil);
/*
* While a freed block can get resurrected
* before quiescence, it's unlikely -- so
@@ -946,16 +944,29 @@
assert(checkflag(b, Bdirty));
holdblk(b);
finalize(b);
- chsend(a->sync, b);
+ qput(a->sync, b);
}
-static void
-qput(Flushq *q, Blk *b)
+void
+qinit(Syncq *q)
{
+ q->fullrz.l = &q->lk;
+ q->emptyrz.l = &q->lk;
+ q->nheap = 0;
+ q->heapsz = fs->cmax;
+ if((q->heap = malloc(q->heapsz*sizeof(Blk*))) == nil)
+ sysfatal("alloc queue: %r");
+
+}
+
+void
+qput(Syncq *q, Blk *b)
+{
int i;
- if(q->nheap == q->heapsz)
- abort();
+ qlock(&q->lk);
+ while(q->nheap == q->heapsz)
+ rsleep(&q->fullrz);
for(i = q->nheap; i > 0; i = (i-1)/2){
if(blkcmp(b, q->heap[(i-1)/2]) == 1)
break;
@@ -963,19 +974,22 @@
}
q->heap[i] = b;
q->nheap++;
+ rwakeup(&q->emptyrz);
+ qunlock(&q->lk);
}
static Blk*
-qpop(Flushq *q)
+qpop(Syncq *q)
{
int i, l, r, m;
Blk *b, *t;
- if(q->nheap == 0)
- return nil;
+ qlock(&q->lk);
+ while(q->nheap == 0)
+ rsleep(&q->emptyrz);
b = q->heap[0];
if(--q->nheap == 0)
- return b;
+ goto Out;
i = 0;
q->heap[0] = q->heap[q->nheap];
@@ -994,6 +1008,9 @@
q->heap[i] = t;
i = m;
}
+Out:
+ rwakeup(&q->fullrz);
+ qunlock(&q->lk);
return b;
}
@@ -1001,32 +1018,19 @@
void
runsync(int, void *p)
{
- Flushq q;
- Chan *c;
+ Syncq *q;
Blk *b;
- c = p;
- q.nheap = 0;
- q.heapsz = 2*fs->cmax/fs->narena;
- if((q.heap = malloc(q.heapsz*sizeof(Blk*))) == nil)
- sysfatal("alloc queue: %r");
+ q = p;
while(1){
- while(q.nheap < q.heapsz){
- b = chrecv(c, q.nheap == 0);
- if(b == &magic){
- qlock(&fs->synclk);
- if(--fs->syncing == 0)
- rwakeupall(&fs->syncrz);
- qunlock(&fs->synclk);
- continue;
- }
- if(b != nil)
- qput(&q, b);
- if(b == nil || q.nheap == q.heapsz)
- break;
- }
-
- b = qpop(&q);
+ b = qpop(q);
+ if(b->type == Tmagic){
+ qlock(&fs->synclk);
+ if(--fs->syncing == 0)
+ rwakeupall(&fs->syncrz);
+ qunlock(&fs->synclk);
+ continue;
+ }
if(!checkflag(b, Bfreed)){
if(syncblk(b) == -1){
ainc(&fs->broken);
@@ -1042,12 +1046,16 @@
sync(void)
{
Arena *a;
+ Blk *b;
int i;
qlock(&fs->synclk);
fs->syncing = fs->nsyncers;
- for(i = 0; i < fs->nsyncers; i++)
- chsend(fs->chsync[i], &magic);
+ for(i = 0; i < fs->nsyncers; i++){
+ b = cachepluck();
+ b->type = Tmagic;
+ qput(&fs->syncq[0], b);
+ }
while(fs->syncing != 0)
rsleep(&fs->syncrz);
for(i = 0; i < fs->narena; i++){
--- a/dat.h
+++ b/dat.h
@@ -18,6 +18,7 @@
typedef struct Arange Arange;
typedef struct Bucket Bucket;
typedef struct Chan Chan;
+typedef struct Syncq Syncq;
typedef struct Tree Tree;
typedef struct Dlist Dlist;
typedef struct Mount Mount;
@@ -216,6 +217,7 @@
Tleaf,
Tlog,
Tdead,
+ Tmagic,
Tarena = 0x6765, /* 'ge' bigendian */
};
@@ -388,6 +390,15 @@
char name[128];
};
+struct Syncq {
+ QLock lk;
+ Rendez fullrz;
+ Rendez emptyrz;
+ Blk **heap;
+ int nheap;
+ int heapsz;
+};
+
struct Stats {
vlong cachehit;
vlong cachelook;
@@ -433,7 +444,7 @@
Lock activelk;
ulong active[32];
int lastactive[32];
- Chan *chsync[32];
+ Syncq syncq[32];
Lock freelk;
Bfree *freep;
@@ -485,7 +496,7 @@
/* freelist */
Bptr head;
Blk *tail; /* tail held open for writing */
- Chan *sync;
+ Syncq *sync;
};
struct Xdir {
--- a/dump.c
+++ b/dump.c
@@ -272,6 +272,9 @@
}
}
break;
+ case Tmagic:
+ fprint(fd, "magic\n");
+ break;
case Tarena:
fprint(fd, "arena -- ");
goto Show;
--- a/fns.h
+++ b/fns.h
@@ -40,6 +40,9 @@
Blk* cacheget(vlong);
Blk* cachepluck(void);
+void qinit(Syncq*);
+void qput(Syncq*, Blk*);
+
Arena* getarena(vlong);
int syncblk(Blk*);
void enqueue(Blk*);
--- a/main.c
+++ b/main.c
@@ -210,9 +210,9 @@
if(fs->nsyncers > fs->narena)
fs->nsyncers = fs->narena;
for(i = 0; i < fs->nsyncers; i++)
- fs->chsync[i] = mkchan(1024);
+ qinit(&fs->syncq[i]);
for(i = 0; i < fs->narena; i++)
- fs->arenas[i].sync = fs->chsync[i%fs->nsyncers];
+ fs->arenas[i].sync = &fs->syncq[i%fs->nsyncers];
srvfd = postfd(srvname, "");
ctlfd = postfd(srvname, ".cmd");
launch(runtasks, -1, nil, "tasks");
@@ -221,7 +221,7 @@
for(i = 0; i < 2; i++)
launch(runread, fs->nquiesce++, nil, "readio");
for(i = 0; i < fs->nsyncers; i++)
- launch(runsync, -1, fs->chsync[i], "syncio");
+ launch(runsync, -1, &fs->syncq[i], "syncio");
for(i = 0; i < nann; i++)
launch(runannounce, -1, ann[i], "announce");
if(srvfd != -1)