ref: 14d1179c21b1c93e696ac7ff23dc65b109ca9e3f
parent: d2295eeff9be8b903484084f6663c659de584e5f
author: Ori Bernstein <ori@eigenstate.org>
date: Fri Oct 6 14:10:22 EDT 2023
blk: free blocks from syncq to prevent reordering of writes to reallocated blocks
--- a/blk.c
+++ b/blk.c
@@ -855,7 +855,6 @@
PACK64(b->buf+4, b->loghash);
break;
case Tdat:
- case Tmagic:
case Tarena:
case Tsuper:
break;
@@ -918,10 +917,9 @@
* freed blocks go to the LRU bottom
* for early reuse.
*/
- if(checkflag(b, Bfreed)){
- cachedel(b->bp.addr);
+ if(checkflag(b, Bfreed))
lrubot(b);
- }else
+ else
lrutop(b);
}
@@ -984,6 +982,7 @@
ulong e, ge;
Bfree *p, *n;
Arena *a;
+ Qent qe;
int i;
ge = agetl(&fs->epoch);
@@ -998,31 +997,22 @@
while(p != nil){
n = p->next;
a = getarena(p->bp.addr);
- lock(a);
- blkdealloc_lk(a, p->bp.addr);
- cachedel(p->bp.addr);
- unlock(a);
+ qe.op = Qfree;
+ qe.bp = p->bp;
+ qe.b = nil;
+ qe.qgen = agetv(&fs->qgen);
+ qput(a->sync, qe);
free(p);
p = n;
}
}
-int
-blkcmp(Blk *a, Blk *b)
-{
- if(a->qgen != b->qgen)
- return (a->qgen < b->qgen) ? -1 : 1;
- if(a->bp.addr != b->bp.addr)
- return (a->bp.addr < b->bp.addr) ? -1 : 1;
- return 0;
-}
-
void
enqueue(Blk *b)
{
Arena *a;
+ Qent qe;
- b->qgen = agetv(&fs->qgen);
b->enqueued = getcallerpc(&b);
a = getarena(b->bp.addr);
assert(checkflag(b, Bdirty));
@@ -1029,7 +1019,11 @@
assert(b->bp.addr >= 0);
holdblk(b);
finalize(b);
- qput(a->sync, b);
+ qe.op = Qwrite;
+ qe.qgen = agetv(&fs->qgen);
+ qe.bp = b->bp;
+ qe.b = b;
+ qput(a->sync, qe);
}
void
@@ -1039,13 +1033,30 @@
q->emptyrz.l = &q->lk;
q->nheap = 0;
q->heapsz = fs->cmax;
- if((q->heap = malloc(q->heapsz*sizeof(Blk*))) == nil)
+ if((q->heap = malloc(q->heapsz*sizeof(Qent))) == nil)
sysfatal("alloc queue: %r");
}
+/*
+ * Heap order for sync queue entries: by generation, then by
+ * block address, then by op in enum order (Qflush, Qwrite, Qfree).
+ * Returns -1, 0 or 1; the result is antisymmetric for every pair.
+ */
+int
+qcmp(Qent *a, Qent *b)
+{
+ if(a->qgen != b->qgen)
+ return (a->qgen < b->qgen) ? -1 : 1;
+ if(a->bp.addr != b->bp.addr)
+ return (a->bp.addr < b->bp.addr) ? -1 : 1;
+ if(a->op != b->op)
+ return (a->op < b->op) ? -1 : 1;
+ return 0;
+}
+
void
-qput(Syncq *q, Blk *b)
+qput(Syncq *q, Qent qe)
{
int i;
@@ -1053,26 +1063,26 @@
while(q->nheap == q->heapsz)
rsleep(&q->fullrz);
for(i = q->nheap; i > 0; i = (i-1)/2){
- if(blkcmp(b, q->heap[(i-1)/2]) == 1)
+ if(qcmp(&qe, &q->heap[(i-1)/2]) == 1)
break;
q->heap[i] = q->heap[(i-1)/2];
}
- q->heap[i] = b;
+ q->heap[i] = qe;
q->nheap++;
rwakeup(&q->emptyrz);
qunlock(&q->lk);
}
-static Blk*
+static Qent
qpop(Syncq *q)
{
int i, l, r, m;
- Blk *b, *t;
+ Qent e, t;
qlock(&q->lk);
while(q->nheap == 0)
rsleep(&q->emptyrz);
- b = q->heap[0];
+ e = q->heap[0];
if(--q->nheap == 0)
goto Out;
@@ -1082,9 +1092,9 @@
m = i;
l = 2*i+1;
r = 2*i+2;
- if(l < q->nheap && blkcmp(q->heap[m], q->heap[l]) == 1)
+ if(l < q->nheap && qcmp(&q->heap[m], &q->heap[l]) == 1)
m = l;
- if(r < q->nheap && blkcmp(q->heap[m], q->heap[r]) == 1)
+ if(r < q->nheap && qcmp(&q->heap[m], &q->heap[r]) == 1)
m = r;
if(m == i)
break;
@@ -1096,7 +1106,7 @@
Out:
rwakeup(&q->fullrz);
qunlock(&q->lk);
- return b;
+ return e;
}
@@ -1103,25 +1113,39 @@
void
runsync(int, void *p)
{
+ Arena *a;
Syncq *q;
- Blk *b;
+ Qent qe;
q = p;
while(1){
- b = qpop(q);
- if(b->type == Tmagic){
+ qe = qpop(q);
+ if(qe.op == Qfree){
+ a = getarena(qe.bp.addr);
+ lock(a);
+ cachedel(qe.bp.addr);
+ blkdealloc_lk(a, qe.bp.addr);
+ if(qe.b != nil)
+ dropblk(qe.b);
+ unlock(a);
+ }else if(qe.op == Qflush){
qlock(&fs->synclk);
if(--fs->syncing == 0)
rwakeupall(&fs->syncrz);
qunlock(&fs->synclk);
- } else if(!checkflag(b, Bfreed)){
- if(syncblk(b) == -1){
+ }else{
+ /*
+ * enqueue() took a reference on the block;
+ * release it even if the block was freed
+ * and the write is skipped.
+ */
+ if(!checkflag(qe.b, Bfreed) && syncblk(qe.b) == -1){
ainc(&fs->broken);
fprint(2, "write: %r\n");
abort();
}
+ dropblk(qe.b);
}
- dropblk(b);
}
}
@@ -1130,7 +1151,7 @@
{
uvlong gen;
Arena *a;
- Blk *b;
+ Qent qe;
int i;
if(fs->rdonly)
@@ -1144,9 +1165,14 @@
gen = aincv(&fs->qgen, 1);
fs->syncing = fs->nsyncers;
for(i = 0; i < fs->nsyncers; i++){
- b = cachepluck();
- b->type = Tmagic;
- qput(&fs->syncq[i], b);
+ qe.op = Qflush;
+ /* qcmp() orders on qgen first, so it must not be left unset */
+ qe.qgen = agetv(&fs->qgen);
+ qe.bp.addr = 0;
+ qe.bp.hash = -1;
+ qe.bp.gen = -1;
+ qe.b = nil;
+ qput(&fs->syncq[i], qe);
}
while(fs->syncing != 0)
rsleep(&fs->syncrz);
--- a/dat.h
+++ b/dat.h
@@ -19,6 +19,7 @@
typedef struct Bucket Bucket;
typedef struct Chan Chan;
typedef struct Syncq Syncq;
+typedef struct Qent Qent;
typedef struct Tree Tree;
typedef struct Dlist Dlist;
typedef struct Mount Mount;
@@ -407,6 +408,7 @@
struct Bfree {
Bfree *next;
+ Blk *b;
Bptr bp;
};
@@ -418,11 +420,24 @@
char name[128];
};
+enum {
+ Qflush,
+ Qwrite,
+ Qfree,
+};
+
+struct Qent {
+ vlong qgen; /* queue generation; vlong to match the agetv/aincv accesses to fs->qgen */
+ Bptr bp; /* block the operation applies to */
+ Blk *b; /* in-memory block for Qwrite; may be nil otherwise */
+ int op; /* Qflush, Qwrite or Qfree */
+};
+
struct Syncq {
QLock lk;
Rendez fullrz;
Rendez emptyrz;
- Blk **heap;
+ Qent *heap;
int nheap;
int heapsz;
};
@@ -453,7 +468,6 @@
/* superblocks */
Blk *sb0; /* primary */
Blk *sb1; /* backup */
- long *statemap; /* for debugging */
/* arena allocation */
Arena *arenas;
@@ -651,7 +665,6 @@
Blk *fnext;
long flag;
- long qgen;
/* serialized to disk in header */
short type; /* @0, for all */
--- a/dump.c
+++ b/dump.c
@@ -310,9 +310,6 @@
}
}
break;
- case Tmagic:
- fprint(fd, "magic\n");
- break;
case Tarena:
fprint(fd, "arena -- ");
goto Show;
--- a/fns.h
+++ b/fns.h
@@ -44,7 +44,7 @@
Blk* cachepluck(void);
void qinit(Syncq*);
-void qput(Syncq*, Blk*);
+void qput(Syncq*, Qent);
Arena* getarena(vlong);
int syncblk(Blk*);
--- a/load.c
+++ b/load.c
@@ -60,7 +60,6 @@
char *e;
Tree *t;
int i, k;
- Dir *d;
if((mnt = mallocz(sizeof(*mnt), 1)) == nil)
sysfatal("malloc: %r");
@@ -88,10 +87,6 @@
sysfatal("superblock: %r");
if((fs->arenas = calloc(fs->narena, sizeof(Arena))) == nil)
sysfatal("malloc: %r");
- if((d = dirfstat(fs->fd)) == nil)
- sysfatal("wut");
- if((fs->statemap = malloc((d->length/Blksz+1)*sizeof(long))) == nil)
- sysfatal("wut");
for(i = 0; i < fs->narena; i++){
a = &fs->arenas[i];
memset(a, 0, sizeof(Arena));