ref: 7119fdec0b6fd8b3d1b0c6034950644d108400a3
parent: 729a62cf4172100f394553f2c4d69f77148c8ce5
author: Ori Bernstein <ori@eigenstate.org>
date: Thu Oct 19 19:36:32 EDT 2023
fs: sweep blocks in background
--- a/cons.c
+++ b/cons.c
@@ -26,14 +26,11 @@
static void
sendsync(int fd, int halt)
{
- Fmsg *m;
Amsg *a;
- m = mallocz(sizeof(Fmsg), 1);
a = mallocz(sizeof(Amsg), 1);
- if(m == nil || a == nil){
+ if(a == nil){	/* only the Amsg is allocated now; there is no Fmsg wrapper */
fprint(fd, "alloc sync msg: %r\n");
- free(m);
free(a);
return;
}
@@ -40,8 +37,7 @@
a->op = AOsync;
a->halt = halt;
a->fd = fd;
- m->a = a;
- chsend(fs->wrchan, m);
+ chsend(fs->admchan, a);	/* runsweep receives it, handles AOsync and frees the message */
}
static void
@@ -61,12 +57,10 @@
static void
snapfs(int fd, char **ap, int)
{
- Fmsg *m;
Amsg *a;
- m = mallocz(sizeof(Fmsg), 1);
a = mallocz(sizeof(Amsg), 1);
- if(m == nil || a == nil){
+ if(a == nil){
fprint(fd, "alloc sync msg: %r\n");
goto Error;
}
@@ -83,12 +77,10 @@
}
a->op = AOsnap;
a->fd = fd;
- m->a = a;
sendsync(fd, 0);
- chsend(fs->wrchan, m);
+ chsend(fs->admchan, a);
return;
Error:
- free(m);
free(a);
return;
}
--- a/dat.h
+++ b/dat.h
@@ -313,6 +313,7 @@
AOnone,
AOsnap,
AOsync,
+ AOclear,
};
struct Bptr {
@@ -375,6 +376,11 @@
struct { /* AOsync */
int halt;
};
+ struct { /* AOclear */
+ Mount *mnt;
+ vlong qpath;
+ vlong length;
+ };
};
};
@@ -382,7 +388,6 @@
Fcall;
Conn *conn;
int sz; /* the size of the message buf */
- Amsg *a; /* admin messages */
uchar buf[];
};
@@ -494,8 +499,10 @@
Conn *conns;
Chan *wrchan;
+ Chan *admchan;
Chan **rdchan;
+ QLock mutlk;
int nworker;
long epoch;
long lepoch[32];
--- a/fns.h
+++ b/fns.h
@@ -187,3 +187,4 @@
void runcons(int, void*);
void runtasks(int, void*);
void runsync(int, void*);
+void runsweep(int, void*);
--- a/fs.c
+++ b/fs.c
@@ -106,13 +106,6 @@
d->muid = -1;
}
-static void
-freemsg(Fmsg *m)
-{
- free(m->a);
- free(m);
-}
-
static int
okname(char *name)
{
@@ -669,7 +662,6 @@
}
m->conn = c;
m->sz = sz;
- m->a = nil;
PBIT32(m->buf, sz);
*pm = m;
return 0;
@@ -1600,7 +1592,7 @@
}
static void
-fsremove(Fmsg *m, int id)
+fsremove(Fmsg *m, Amsg **ao)
{
char upbuf[Upksz];
Fcall r;
@@ -1639,9 +1631,16 @@
if((e = upsert(f->mnt, mb, 1)) != nil)
goto Error;
if(f->dent->qid.type == QTFILE){
- e = clearb(id, f->mnt, f->qpath, 0, f->dent->length);
- if(e != nil)
+ /* hand the data blocks to the caller as an AOclear request instead of clearing inline */
+ if((*ao = malloc(sizeof(Amsg))) == nil){
+ e = Enomem;	/* upsert succeeded, so e is nil here; set it as fsopen does */
goto Error;
+ }
+ aincl(&f->mnt->ref, 1);	/* keeps the mount alive until runsweep calls clunkmount */
+ (*ao)->op = AOclear;
+ (*ao)->mnt = f->mnt;
+ (*ao)->qpath = f->qpath;
+ (*ao)->length = f->dent->length;
}
f->dent->gone = 1;
wunlock(f->dent);
@@ -1658,7 +1654,7 @@
}
static void
-fsopen(Fmsg *m, int id)
+fsopen(Fmsg *m, Amsg **ao)
{
char *p, *e, buf[Kvmax];
int mbits;
@@ -1738,8 +1734,17 @@
mb.nk = f->dent->nk;
mb.v = buf;
mb.nv = p - buf;
- clearb(id, f->mnt, f->qpath, 0, f->dent->length);
+ if((*ao = malloc(sizeof(Amsg))) == nil){
+ e = Enomem;
+ goto Error;
+ }
+ aincl(&f->mnt->ref, 1);
+ (*ao)->op = AOclear;
+ (*ao)->mnt = f->mnt;
+ (*ao)->qpath = f->qpath;
+ (*ao)->length = f->dent->length;
if((e = upsert(f->mnt, &mb, 1)) != nil){
+Error:
wunlock(f->dent);
rerror(m, e);
putfid(f);
@@ -2157,47 +2162,37 @@
void
runwrite(int id, void *)
{
- Mount *mnt;
Fmsg *m;
- int ao;
+ Amsg *a;	/* deferred clear request filled in by fsremove/fsopen, if any */
while(1){
+ a = nil;
m = chrecv(fs->wrchan);
+ if(fs->rdonly){
+ rerror(m, Erdonly);
+ continue;
+ }
+ if(fs->broken){
+ rerror(m, Efs);
+ continue;
+ }
+
epochstart(id);
- ao = (m->a == nil) ? AOnone : m->a->op;
- switch(ao){
- case AOnone:
- if(fs->rdonly){
- rerror(m, Erdonly);
- continue;
- }
- if(fs->broken){
- rerror(m, Efs);
- continue;
- }
- switch(m->type){
- case Tcreate: fscreate(m); break;
- case Twrite: fswrite(m); break;
- case Twstat: fswstat(m); break;
- case Tremove: fsremove(m,id); break;
- case Topen: fsopen(m, id); break;
- }
- break;
- case AOsync:
- if(m->a->halt)
- ainc(&fs->rdonly);
- for(mnt = fs->mounts; mnt != nil; mnt = mnt->next)
- updatesnap(&mnt->root, mnt->root, mnt->name);
- sync();
- freemsg(m);
- break;
- case AOsnap:
- snapfs(m->a->fd, m->a->old, m->a->new);
- freemsg(m);
- break;
+ qlock(&fs->mutlk);	/* runsweep takes the same lock around its operations */
+ switch(m->type){
+ case Tcreate: fscreate(m); break;
+ case Twrite: fswrite(m); break;
+ case Twstat: fswstat(m); break;
+ case Tremove: fsremove(m,&a); break;
+ case Topen: fsopen(m, &a); break;
+ default: abort(); break;	/* any other message type aborts */
}
- epochend(id);
- epochclean();
+ epochend(id);
+ epochclean();
+ qunlock(&fs->mutlk);
+
+ if(a != nil)	/* forwarded only after mutlk has been released */
+ chsend(fs->admchan, a);	/* received by runsweep */
}
}
@@ -2214,7 +2209,7 @@
case Twalk: fswalk(m); break;
case Tread: fsread(m); break;
case Tstat: fsstat(m); break;
- case Topen: fsopen(m, id); break;
+ case Topen: fsopen(m, nil); break;
}
epochend(id);
}
@@ -2221,19 +2216,86 @@
}
+/*
+ * Background admin proc: receives Amsgs from fs->admchan and
+ * performs the requested sync, snapshot or block clear while
+ * holding fs->mutlk. Each message is freed once handled.
+ */
void
+runsweep(int id, void*)
+{
+ char *e, buf[Offksz];
+ Mount *mnt;
+ vlong off;
+ Amsg *a;
+ Msg m;
+
+ while(1){
+ a = chrecv(fs->admchan);
+ switch(a->op){
+ case AOsync:
+ qlock(&fs->mutlk);
+ if(a->halt)
+ ainc(&fs->rdonly);
+ epochstart(id);
+ for(mnt = fs->mounts; mnt != nil; mnt = mnt->next)
+ updatesnap(&mnt->root, mnt->root, mnt->name);
+ sync();
+ epochend(id);
+ qunlock(&fs->mutlk);
+ break;
+ case AOsnap:
+ qlock(&fs->mutlk);
+ epochstart(id);
+ snapfs(a->fd, a->old, a->new);
+ epochend(id);
+ qunlock(&fs->mutlk);
+ break;
+ case AOclear:
+ /* one Oclearb message per Blksz step of the file, each in its own epoch */
+ qlock(&fs->mutlk);
+ for(off = 0; off < a->length; off += Blksz){
+ epochstart(id);
+ m.k = buf;
+ m.nk = sizeof(buf);
+ m.op = Oclearb;
+ m.k[0] = Kdat;
+ PACK64(m.k+1, a->qpath);
+ PACK64(m.k+9, off);
+ m.v = nil;
+ m.nv = 0;
+ e = upsert(a->mnt, &m, 1);
+ epochend(id);
+ if(e != nil){
+ /* stop here: don't keep mutating a tree just marked broken */
+ fprint(2, "sweep: %s\n", e);
+ fs->broken++;
+ break;
+ }
+ }
+ qunlock(&fs->mutlk);
+ /* pairs with the aincl on mnt->ref done when the request was built */
+ clunkmount(a->mnt);
+ break;
+ default:
+ /* unknown op: abort, as runwrite does for unexpected message types */
+ abort();
+ break;
+ }
+ free(a);
+ }
+}
+
+void
runtasks(int id, void *)
{
int i, c;
- Fmsg *m;
Amsg *a;
while(1){
sleep(5000);
- m = mallocz(sizeof(Fmsg), 1);
a = mallocz(sizeof(Amsg), 1);
- if(m == nil || a == nil){
+ if(a == nil){
fprint(2, "alloc sync msg: %r\n");
- free(m);
free(a);
return;
}
@@ -2240,8 +2288,7 @@
a->op = AOsync;
a->halt = 0;
a->fd = -1;
- m->a = a;
- chsend(fs->wrchan, m);
+ chsend(fs->admchan, a);
/*
* compresslog is designed to be concurrent with allocation,
--- a/main.c
+++ b/main.c
@@ -226,6 +226,7 @@
if(check && !checkfs(2))
sysfatal("fishy");
fs->wrchan = mkchan(32);
+ fs->admchan = mkchan(32);
fs->nsyncers = nproc/2;
fs->nreaders = 1;
if(fs->nsyncers > fs->narena)
@@ -242,6 +243,7 @@
ctlfd = postfd(srvname, ".cmd");
launch(runcons, (void*)ctlfd, "ctl");
launch(runwrite, nil, "mutate");
+ launch(runsweep, nil, "sweep");
launch(runtasks, nil, "tasks");
for(i = 0; i < fs->nreaders; i++)
launch(runread, fs->rdchan[i], "readio");