ref: e9bb7d370d769f3bec1b1ef1606f405ba4044255
parent: 3d6613cadcbc1990e32e283a12152df0f2caf511
author: Ori Bernstein <ori@eigenstate.org>
date: Sun Jan 30 00:50:53 EST 2022
blk: sync blocks to disk in background. This moves the write bottleneck to the root block copy, which is the next thing to examine.
--- a/blk.c
+++ b/blk.c
@@ -6,17 +6,27 @@
#include "dat.h"
#include "fns.h"
-typedef struct Range Range;
+typedef struct Range Range;
+typedef struct Flushq Flushq;
+
struct Range {
vlong off;
vlong len;
};
+struct Flushq { /* fixed-capacity binary heap of blocks waiting to be written */
+ Blk **heap; /* heap slots; runsync allocates heapsz of them */
+ int nheap; /* number of slots currently occupied */
+ int heapsz; /* capacity; qput aborts when nheap reaches it */
+};
+
static vlong blkalloc_lk(Arena*);
static vlong blkalloc(int);
static int blkdealloc_lk(vlong);
static Blk* initblk(vlong, int);
+static Blk magic;
+
void
setflag(Blk *b, int flg)
{
@@ -51,18 +61,6 @@
return pwrite(fs->fd, b->buf, Blksz, b->bp.addr);
}
-void
-enqueue(Blk *b)
-{
- assert(b->flag & Bdirty);
- finalize(b);
- if(syncblk(b) == -1){
- ainc(&fs->broken);
- fprint(2, "write: %r");
- abort();
- }
-}
-
static Blk*
readblk(vlong bp, int flg)
{
@@ -676,6 +674,7 @@
return nil;
if((b = initblk(bp, t)) == nil)
return nil;
+
setmalloctag(b, getcallerpc(&t));
return b;
}
@@ -784,7 +783,7 @@
if(b == nil || adec(&b->ref) != 0)
return;
assert(!(b->flag & Bcached));
- assert((b->flag & Bqueued) || !(b->flag & Bdirty));
+ assert((b->flag & Bfreed) || !(b->flag & Bdirty));
free(b);
}
@@ -810,8 +809,8 @@
void
freeblk(Tree *t, Blk *b)
{
- assert(!(b->flag & Bqueued));
b->freed = getcallerpc(&b);
+ setflag(b, Bfreed); /* runsync skips the write for Bfreed blocks, and the assert before free() accepts a dirty block only if Bfreed is set */
freebp(t, b->bp);
}
@@ -874,30 +873,140 @@
}
int
-sync(void)
+blkcmp(Blk *a, Blk *b) /* order blocks by bp.gen, then by bp.addr; returns -1, 0 or 1 */
{
- int i, r;
+ if(a->bp.gen != b->bp.gen)
+ return (a->bp.gen < b->bp.gen) ? -1 : 1;
+ if(a->bp.addr != b->bp.addr) /* same generation: break the tie by address */
+ return (a->bp.addr < b->bp.addr) ? -1 : 1;
+ return 0;
+}
+
+void
+enqueue(Blk *b) /* hand dirty block b to the sync channel of the arena that holds its address */
+{
Arena *a;
-// Blk *b;
- r = 0;
+ a = getarena(b->bp.addr);
+ assert(b->flag & Bdirty); /* only dirty blocks may be queued for writeback */
+ refblk(b); /* presumably the reference that runsync later releases with putblk */
+ finalize(b);
+ chsend(a->sync, b); /* the write itself happens later, in runsync */
+}
- qlock(&fs->snaplk);
+/*
+ * Insert b into the heap. The new entry starts in the first free
+ * slot and moves toward the root until it compares less than its
+ * parent under blkcmp. Aborts if the heap is already full.
+ */
+void
+qput(Flushq *q, Blk *b)
+{
+	int slot, up;
+
+	if(q->nheap == q->heapsz)
+		abort();
+	slot = q->nheap++;
+	q->heap[slot] = b;
+	while(slot > 0){
+		up = (slot-1)/2;
+		if(blkcmp(q->heap[slot], q->heap[up]) == -1)
+			break;
+		/* heap[slot] is always b here, so the swap needs no temporary */
+		q->heap[slot] = q->heap[up];
+		q->heap[up] = b;
+		slot = up;
+	}
+}
+
+/*
+ * Remove and return the block at the root of the heap, which is the
+ * entry that compares greatest under blkcmp, or nil when the heap is
+ * empty. The last entry is moved to the root and pushed down past
+ * any child that compares greater than it.
+ */
+Blk*
+qpop(Flushq *q)
+{
+	Blk *top, *tmp;
+	int cur, big, kid;
+
+	if(q->nheap == 0)
+		return nil;
+	top = q->heap[0];
+	q->nheap--;
+	if(q->nheap == 0)
+		return top;
+
+	q->heap[0] = q->heap[q->nheap];
+	for(cur = 0; ; cur = big){
+		big = cur;
+		/* left child first, then right, each against the current best */
+		for(kid = 2*cur+1; kid <= 2*cur+2; kid++)
+			if(kid < q->nheap && blkcmp(q->heap[big], q->heap[kid]) == -1)
+				big = kid;
+		if(big == cur)
+			break;
+		tmp = q->heap[big];
+		q->heap[big] = q->heap[cur];
+		q->heap[cur] = tmp;
+	}
+	return top;
+}
+
+/*
+ * Write one queued block back to disk and drop the reference the
+ * queue held on it. Blocks flagged Bfreed are not written. A failed
+ * write marks the file system broken and aborts.
+ */
+static void
+flushqueued(Blk *b)
+{
+	if(!(b->flag & Bfreed)){
+		if(syncblk(b) == -1){
+			ainc(&fs->broken);
+			fprint(2, "write: %r");
+			abort();
+		}
+	}
+	putblk(b);
+}
+
+/*
+ * Syncer proc. p is the Chan this syncer receives blocks on.
+ *
+ * Blocks are collected into a local heap of fs->cmax entries and
+ * written one at a time in heap order. The receive blocks only
+ * when the heap is empty; otherwise the channel is polled so that
+ * pending writes keep making progress.
+ *
+ * The &magic marker is sent by sync() as a barrier. Every block
+ * received before the marker is written out before the marker is
+ * acknowledged, so that sync() cannot proceed while earlier blocks
+ * are still sitting in this heap. The last syncer to acknowledge
+ * wakes the sleeper on fs->syncrz.
+ */
+void
+runsync(int, void *p)
+{
+	Flushq q;
+	Chan *c;
+	Blk *b;
+
+	c = p;
+	q.nheap = 0;
+	q.heapsz = fs->cmax;
+	if((q.heap = malloc(q.heapsz*sizeof(Blk*))) == nil)
+		sysfatal("alloc queue: %r");
+	while(1){
+		while(q.nheap < q.heapsz){
+			/* wait for work only when nothing is queued */
+			b = chrecv(c, q.nheap == 0);
+			if(b == &magic){
+				/* drain everything queued ahead of the barrier */
+				while((b = qpop(&q)) != nil)
+					flushqueued(b);
+				if(adec(&fs->syncing) == 0){
+					qlock(&fs->synclk);
+					rwakeupall(&fs->syncrz);
+					qunlock(&fs->synclk);
+				}
+				continue;
+			}
+			if(b != nil)
+				qput(&q, b);
+			if(b == nil || q.nheap == q.heapsz)
+				break;
+		}
+
+		b = qpop(&q);
+		flushqueued(b);
+	}
+}
+
+void
+sync(void) /* barrier: send a marker to every syncer, wait for all acknowledgements, then sync each arena */
+{
+ Arena *a;
+ int i;
+
+ qlock(&fs->synclk);
+ fs->syncing = fs->nsyncers; /* one acknowledgement expected per syncer */
+ for(i = 0; i < fs->nsyncers; i++)
+ chsend(fs->chsync[i], &magic); /* runsync answers &magic by decrementing fs->syncing */
+ while(fs->syncing != 0)
+ rsleep(&fs->syncrz); /* woken by the syncer that brings the count to zero */
for(i = 0; i < fs->narena; i++){
a = &fs->arenas[i];
finalize(a->tail);
if(syncblk(a->tail) == -1)
- r = -1;
+ sysfatal("sync arena: %r"); /* a failed write is now fatal rather than reported through a return value */
if(syncarena(a) == -1)
- r = -1;
+ sysfatal("sync arena: %r");
}
-//
-// for(b = fs->chead; b != nil; b = b->cnext){
-// if((b->flag & Bdirty) == 0)
-// continue;
-// if(syncblk(b) == -1)
-// r = -1;
-// }
- qunlock(&fs->snaplk);
- return r;
+ qunlock(&fs->synclk);
}
--- a/check.c
+++ b/check.c
@@ -55,7 +55,7 @@
getval(b, 0, &x);
if(lo && keycmp(lo, &x) > 0){
fprint(fd, "out of range keys %P != %P\n", lo, &x);
- showblk(2, b, "wut", 1);
+ showblk(2, b, "out of range", 1);
fail++;
}
for(i = 1; i < b->nval; i++){
--- a/dat.h
+++ b/dat.h
@@ -433,7 +433,12 @@
/* arena allocation */
Arena *arenas;
long roundrobin;
+ long syncing;
+ long nsyncers;
+
int gotinfo;
+ QLock synclk;
+ Rendez syncrz;
QLock snaplk; /* snapshot lock */
Mount *mounts;
@@ -446,6 +451,7 @@
Lock activelk;
int active[32];
int lastactive[32];
+ Chan *chsync[32];
Lock freelk;
Bfree *freep;
Bfree *freehd;
@@ -495,6 +501,7 @@
/* freelist */
Bptr head;
Blk *tail; /* tail held open for writing */
+ Chan *sync;
};
struct Xdir {
@@ -611,6 +618,7 @@
Blk *fnext;
long flag;
+ long syncgen;
/* serialized to disk in header */
short type; /* @0, for all */
--- a/fns.h
+++ b/fns.h
@@ -44,7 +44,7 @@
void reamfs(char*);
int loadarena(Arena*, Fshdr *fi, vlong);
void loadfs(char*);
-int sync(void);
+void sync(void);
int loadlog(Arena*);
int scandead(Dlist*, int, void(*)(Bptr, void*), void*);
int endfs(void);
@@ -132,13 +132,14 @@
int Qconv(Fmt*);
Chan* mkchan(int);
-Fmsg* chrecv(Chan*);
-void chsend(Chan*, Fmsg*);
+void* chrecv(Chan*, int);
+void chsend(Chan*, void*);
void runfs(int, void*);
void runwrite(int, void*);
void runread(int, void*);
void runcons(int, void*);
void runtasks(int, void*);
+void runsync(int, void*);
/* it's in libc... */
extern int cas(long*, long, long);
--- a/fs.c
+++ b/fs.c
@@ -65,7 +65,7 @@
if(u != nil)
closesnap(u);
closesnap(t);
- /* we probably want explicit snapshots to be resilient */
+ /* we probably want explicit snapshots to get synced */
sync();
fprint(fd, "snap taken: %s\n", new);
}
@@ -111,15 +111,19 @@
}
-Fmsg*
-chrecv(Chan *c)
+void*
+chrecv(Chan *c, int block)
{
void *a;
long v;
v = c->count;
- if(v == 0 || cas(&c->count, v, v-1) == 0)
- semacquire(&c->count, 1);
+ if(v == 0 || cas(&c->count, v, v-1) == 0){
+ if(block)
+ semacquire(&c->count, 1);
+ else
+ return nil;
+ }
lock(&c->rl);
a = *c->rp;
if(++c->rp >= &c->args[c->size])
@@ -130,7 +134,7 @@
}
void
-chsend(Chan *c, Fmsg *m)
+chsend(Chan *c, void *m)
{
long v;
@@ -166,10 +170,8 @@
r->tag = m->tag;
dprint("→ %F\n", r);
- if((n = convS2M(r, buf, sizeof(buf))) == 0){
- fprint(2, "wut: %r\n");
+ if((n = convS2M(r, buf, sizeof(buf))) == 0)
abort();
- }
w = write(m->fd, buf, n);
if(w != n)
fshangup(m->fd, Eio);
@@ -1686,7 +1688,7 @@
int ao;
while(1){
- m = chrecv(fs->wrchan);
+ m = chrecv(fs->wrchan, 1);
quiesce(wid);
ao = (m->a == nil) ? AOnone : m->a->op;
switch(ao){
@@ -1732,7 +1734,7 @@
Fmsg *m;
while(1){
- m = chrecv(fs->rdchan);
+ m = chrecv(fs->rdchan, 1);
quiesce(wid);
switch(m->type){
case Tflush: rerror(m, Eimpl); break;
--- a/main.c
+++ b/main.c
@@ -15,6 +15,7 @@
int nproc;
char *forceuser;
char *srvname = "gefs";
+int cachesz = 512*MiB;
vlong
inc64(vlong *v, vlong dv)
@@ -88,10 +89,8 @@
main(int argc, char **argv)
{
int i, srvfd, ctlfd;
- vlong cachesz;
char *s;
- cachesz = 512*MiB;
ARGBEGIN{
case 'r':
ream = 1;
@@ -141,22 +140,35 @@
nproc = atoi(s);
if(nproc == 0)
nproc = 2;
+ if(nproc > nelem(fs->active))
+ nproc = nelem(fs->active);
if(ream){
reamfs(argv[0]);
exits(nil);
- }else{
- fs->rdchan = mkchan(128);
- fs->wrchan = mkchan(128);
- srvfd = postfd(srvname, "");
- ctlfd = postfd(srvname, ".cmd");
- loadfs(argv[0]);
- launch(runtasks, -1, nil, "tasks");
- launch(runcons, fs->nquiesce++, (void*)ctlfd, "ctl");
- launch(runwrite, fs->nquiesce++, nil, "writeio");
- for(i = 0; i < nproc; i++)
- launch(runread, fs->nquiesce++, nil, "readio");
- if(srvfd != -1)
- launch(runfs, fs->nquiesce++, (void*)srvfd, "srvio");
- exits(nil);
}
+
+ loadfs(argv[0]);
+
+ fs->syncrz.l = &fs->synclk;
+ fs->rdchan = mkchan(32);
+ fs->wrchan = mkchan(32);
+ fs->nsyncers = nproc;
+ if(fs->nsyncers > fs->narena)
+ fs->nsyncers = fs->narena;
+ for(i = 0; i < fs->nsyncers; i++)
+ fs->chsync[i] = mkchan(128);
+ for(i = 0; i < fs->narena; i++)
+ fs->arenas[i].sync = fs->chsync[i%nproc];
+ srvfd = postfd(srvname, "");
+ ctlfd = postfd(srvname, ".cmd");
+ launch(runtasks, -1, nil, "tasks");
+ launch(runcons, fs->nquiesce++, (void*)ctlfd, "ctl");
+ launch(runwrite, fs->nquiesce++, nil, "mutate");
+ for(i = 0; i < nproc; i++)
+ launch(runread, fs->nquiesce++, nil, "readio");
+ for(i = 0; i < fs->nsyncers; i++)
+ launch(runsync, -1, fs->chsync[i], "syncio");
+ if(srvfd != -1)
+ launch(runfs, fs->nquiesce++, (void*)srvfd, "srvio");
+ exits(nil);
}
--- a/ream.c
+++ b/ream.c
@@ -218,7 +218,15 @@
putblk(tb);
putblk(rb);
+ for(i = 0; i < fs->narena; i++){
+ a = &fs->arenas[i];
+ finalize(a->tail);
+ if(syncblk(a->tail) == -1)
+ sysfatal("sync arena: %r");
+ packarena(a->b->data, Blkspc, a, fs);
+ finalize(a->b);
+ if(syncblk(a->b) == -1)
+ sysfatal("sync arena: %r");
+ }
free(mnt);
- if(sync() == -1)
- sysfatal("ream: sync: %r");
}
--- a/snap.c
+++ b/snap.c
@@ -148,7 +148,7 @@
Tree*
openlabel(char *name)
{
- char *p, buf[Keymax];
+ char *p, buf[Kvmax];
Kvp kv;
Key k;