ref: 76d83e6d63310062f9e7f01a007085c7622d2ca6
parent: ccbf2ae6831a064e4115aa0ad2181b304dfaeebc
author: Ori Bernstein <ori@eigenstate.org>
date: Tue Nov 14 00:47:46 EST 2023
gefs: fix block exhaustion deadlocks. This commit bundles two related changes to prevent block exhaustion. First, we split the truncation lock out from the dent lock, which has two benefits: it allows readers to read files while they are being truncated and, more importantly, it lets us block waiting for the dent's truncation lock outside of an epoch, allowing the sweeper to clean blocks and reclaim them. Second, we now stall in epochclean if our limbo list is getting long; this means that, with small caches, we make sure we have enough blocks for the next epoch before we start it.
--- a/blk.c
+++ b/blk.c
@@ -820,10 +820,26 @@
}
void
+limbo(Bfree *f)
+{
+ Bfree *p;
+ ulong ge;
+
+ while(1){
+ ge = agetl(&fs->epoch);
+ p = agetp(&fs->limbo[ge]);
+ f->next = p;
+ if(acasp(&fs->limbo[ge], p, f)){
+ aincl(&fs->nlimbo, 1);
+ break;
+ }
+ }
+}
+
+void
freeblk(Tree *t, Blk *b, Bptr bp)
{
Bfree *f;
- ulong ge;
if(t == &fs->snap || (t != nil && bp.gen <= t->gen)){
killblk(t, bp);
@@ -840,10 +856,7 @@
b->freed = getcallerpc(&t);
f->b = holdblk(b);
}
-
- ge = agetl(&fs->epoch);
- f->next = fs->limbo[ge];
- fs->limbo[ge] = f;
+ limbo(f);
}
void
@@ -867,17 +880,28 @@
void
epochclean(void)
{
- ulong e, ge;
+ ulong c, e, ge, delay;
Bfree *p, *n;
Arena *a;
Qent qe;
int i;
+ delay = 0;
+Again:
+ c = agetl(&fs->nlimbo);
ge = agetl(&fs->epoch);
for(i = 0; i < fs->nworker; i++){
e = agetl(&fs->lepoch[i]);
- if((e & Eactive) && e != (ge | Eactive))
- return;
+ if((e & Eactive) && e != (ge | Eactive)){
+ if(c < fs->cmax/4)
+ return;
+ if(delay < 100)
+ delay++;
+ else
+ fprint(2, "stalled epoch %lx [worker %d]\n", e, i);
+ sleep(delay);
+ goto Again;
+ }
}
p = asetp(&fs->limbo[(ge+1)%3], nil);
asetl(&fs->epoch, (ge+1)%3);
@@ -903,6 +927,7 @@
default:
abort();
}
+ aincl(&fs->nlimbo, -1);
free(p);
}
}
--- a/dat.h
+++ b/dat.h
@@ -602,7 +602,6 @@
Xdir;
Dent *next;
QLock trunclk;
- int truncating;
vlong up;
long ref;
char gone;
--- a/fns.h
+++ b/fns.h
@@ -52,7 +52,7 @@
void epochstart(int);
void epochend(int);
void epochclean(void);
-void freesync(void);
+void limbo(Bfree*);
void freeblk(Tree*, Blk*, Bptr);
int dlappend(Dlist *dl, Bptr);
int killblk(Tree*, Bptr);
--- a/fs.c
+++ b/fs.c
@@ -259,6 +259,18 @@
return btupsert(mnt->root, m, nm);
}
+static void
+truncwait(Dent *de, int id)
+{
+ if(canqlock(&de->trunclk))
+ return;
+ epochend(id);
+ qunlock(&fs->mutlk);
+ qlock(&de->trunclk);
+ qlock(&fs->mutlk);
+ epochstart(id);
+}
+
static int
readb(Fid *f, char *d, vlong o, vlong n, vlong sz)
{
@@ -1164,7 +1176,7 @@
}
static void
-fswstat(Fmsg *m, Amsg **ao)
+fswstat(Fmsg *m, int id, Amsg **ao)
{
char rnbuf[Kvmax], opbuf[Kvmax], upbuf[Upksz];
char *p, *e, strs[65535];
@@ -1188,6 +1200,7 @@
return;
}
de = f->dent;
+ truncwait(de, id);
wlock(de);
if(de->gone){
rerror(m, Ephase);
@@ -1393,6 +1406,7 @@
}
assert(nm <= nelem(mb));
if((e = upsert(f->mnt, mb, nm)) != nil){
+ qunlock(&de->trunclk);
wunlock(de);
rerror(m, e);
goto Out;
@@ -1407,7 +1421,8 @@
Out:
if(!truncate)
- wunlock(de);
+ qunlock(&de->trunclk);
+ wunlock(de);
putfid(f);
}
@@ -1576,7 +1591,7 @@
}
static void
-fsremove(Fmsg *m, Amsg **ao)
+fsremove(Fmsg *m, int id, Amsg **ao)
{
char upbuf[Upksz];
Fcall r;
@@ -1590,6 +1605,7 @@
}
clunkfid(m->conn, f);
+ truncwait(f->dent, id);
wlock(f->dent);
if(f->dent->gone){
e = Ephase;
@@ -1626,6 +1642,7 @@
(*ao)->dent = nil;
}
f->dent->gone = 1;
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
r.type = Rremove;
@@ -1634,6 +1651,7 @@
return;
Error:
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
rerror(m, e);
putfid(f);
@@ -1640,7 +1658,7 @@
}
static void
-fsopen(Fmsg *m, Amsg **ao)
+fsopen(Fmsg *m, int id, Amsg **ao)
{
char *p, *e, buf[Kvmax];
int mbits;
@@ -1706,6 +1724,7 @@
// unlock(&fs->root.lk);
// }
if(m->mode & OTRUNC){
+ truncwait(f->dent, id);
wlock(f->dent);
f->dent->muid = f->uid;
f->dent->qid.vers++;
@@ -1732,12 +1751,13 @@
(*ao)->length = f->dent->length;
(*ao)->dent = nil;
if((e = upsert(f->mnt, &mb, 1)) != nil){
-Error:
+Error: qunlock(&f->dent->trunclk);
wunlock(f->dent);
rerror(m, e);
putfid(f);
return;
}
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
}
unlock(f);
@@ -1938,7 +1958,7 @@
}
static void
-fswrite(Fmsg *m)
+fswrite(Fmsg *m, int id)
{
char sbuf[Wstatmax], kbuf[Max9p/Blksz+2][Offksz], vbuf[Max9p/Blksz+2][Ptrsz];
Bptr bp[Max9p/Blksz + 2];
@@ -1959,9 +1979,11 @@
putfid(f);
return;
}
+ truncwait(f->dent, id);
wlock(f->dent);
if(f->dent->gone){
rerror(m, Ephase);
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
putfid(f);
return;
@@ -1972,6 +1994,7 @@
rerror(m, e);
else
respond(m, &r);
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
putfid(f);
return;
@@ -1993,6 +2016,7 @@
if(n == -1){
for(j = 0; j < i; j++)
freeblk(t, nil, bp[i]);
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
fprint(2, "%r");
putfid(f);
@@ -2029,11 +2053,13 @@
kv[i].nv = p - sbuf;
if((e = upsert(f->mnt, kv, i+1)) != nil){
rerror(m, e);
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
putfid(f);
abort();
return;
}
+ qunlock(&f->dent->trunclk);
wunlock(f->dent);
r.type = Rwrite;
@@ -2164,12 +2190,12 @@
qlock(&fs->mutlk);
epochstart(id);
switch(m->type){
- case Tcreate: fscreate(m); break;
- case Twrite: fswrite(m); break;
- case Twstat: fswstat(m, &a); break;
- case Tremove: fsremove(m,&a); break;
- case Topen: fsopen(m, &a); break;
- default: abort(); break;
+ case Tcreate: fscreate(m); break;
+ case Twrite: fswrite(m, id); break;
+ case Twstat: fswstat(m, id, &a); break;
+ case Tremove: fsremove(m, id, &a); break;
+ case Topen: fsopen(m, id, &a); break;
+ default: abort(); break;
}
epochend(id);
qunlock(&fs->mutlk);
@@ -2189,11 +2215,11 @@
m = chrecv(ch);
epochstart(id);
switch(m->type){
- case Tattach: fsattach(m); break;
- case Twalk: fswalk(m); break;
- case Tread: fsread(m); break;
- case Tstat: fsstat(m); break;
- case Topen: fsopen(m, nil); break;
+ case Tattach: fsattach(m); break;
+ case Twalk: fswalk(m); break;
+ case Tread: fsread(m); break;
+ case Tstat: fsstat(m); break;
+ case Topen: fsopen(m, id, nil); break;
}
epochend(id);
}
@@ -2298,7 +2324,7 @@
epochclean();
}
if(am->dent != nil){
- wunlock(am->dent);
+ qunlock(&am->dent->trunclk);
clunkdent(am->dent);
}
clunkmount(am->mnt);
--- a/snap.c
+++ b/snap.c
@@ -513,7 +513,6 @@
closesnap(Tree *t)
{
Bfree *f;
- ulong ge;
if(t == nil || adec(&t->memref) != 0)
return;
@@ -521,9 +520,7 @@
abort();
f->op = DFtree;
f->t = t;
- ge = agetl(&fs->epoch);
- f->next = fs->limbo[ge];
- fs->limbo[ge] = f;
+ limbo(f);
}
char*