shithub: gefs

ref: 2a941967c8f06ee6f6eb63bf5399c0d2ba4b89df
dir: /blk.c/

View raw version
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <avl.h>

#include "dat.h"
#include "fns.h"
#include "atomic.h"

static vlong	blkalloc_lk(Arena*, int);
static vlong	blkalloc(int, uint, int);
static void	blkdealloc_lk(Arena*, vlong);
static Blk*	initblk(Blk*, vlong, vlong, int);
static void	readblk(Blk*, Bptr, int);

/*
 * Tests the block's flag word, sampled once atomically:
 * returns nonzero only when every bit in `set` is on and
 * every bit in `clr` is off.
 */
int
checkflag(Blk *b, int set, int clr)
{
	long cur;

	cur = agetl(&b->flag);
	if((cur & (set|clr)) != set)
		return 0;
	return 1;
}

/*
 * Atomically turns on the bits in `set` and turns off the
 * bits in `clr` in the block's flag word, retrying the
 * compare-and-swap until it lands on an unchanged value.
 */
void
setflag(Blk *b, int set, int clr)
{
	long old, want;

	do{
		old = agetl(&b->flag);
		want = (old & ~clr) | set;
	}while(!acasl(&b->flag, old, want));
}

/*
 * Writes a finalized block back to its on-disk address
 * and clears its dirty flag.
 *
 * The block must already be marked Bfinal and must have a
 * valid (non-negative) address. Anything other than a
 * complete Blksz write -- an error return or a short
 * write -- is reported through broke(), so a partially
 * written block is never treated as clean.
 */
void
syncblk(Blk *b)
{
	assert(checkflag(b, Bfinal, 0));
	assert(b->bp.addr >= 0);
	tracex("syncblk", b->bp, b->type, -1);
	if(pwrite(fs->fd, b->buf, Blksz, b->bp.addr) != Blksz)
		broke("%B %s: %r", b->bp, Eio);
	setflag(b, 0, Bdirty);
}

/*
 * Reads the block at bp.addr from fs->fd into b->buf and
 * unpacks the on-disk header into b's in-memory fields.
 *
 * flg bits consulted here:
 *	GBraw		do not read the type tag; treat the block as Tdat
 *	GBnochk		skip the checksum comparison entirely
 *	GBsoftchk	on a checksum mismatch, do not call broke();
 *			just print the mismatch and raise Ecorrupt
 *
 * For Tlog/Tdlist blocks the expected hash is the one stored
 * in the block's own header and covers the log payload; for
 * every other type it is bp.hash and is compared against
 * blkhash(b).
 *
 * Raises Eio via error() when a read fails or hits EOF.
 */
static void
readblk(Blk *b, Bptr bp, int flg)
{
	vlong off, xh, ck, rem, n;
	char *p;

	off = bp.addr;
	rem = Blksz;
	while(rem != 0){
		/*
		 * continue where the last partial read stopped, so a
		 * short read does not overwrite the start of the buffer
		 */
		n = pread(fs->fd, b->buf + (Blksz - rem), rem, off);
		if(n <= 0)
			error("%s: %r", Eio);
		off += n;
		rem -= n;
	}
	b->cnext = nil;
	b->cprev = nil;
	b->hnext = nil;

	b->bp.addr = bp.addr;
	b->bp.hash = -1;
	b->bp.gen = -1;

	b->nval = 0;
	b->valsz = 0;
	b->nbuf = 0;
	b->bufsz = 0;
	b->logsz = 0;

	/* the first two bytes hold the type tag; headers follow it */
	p = b->buf + 2;
	b->type = (flg&GBraw) ? Tdat : UNPACK16(b->buf+0);
	switch(b->type){
	default:
		broke("invalid block type %d @%llx", b->type, bp.addr);
		break;
	case Tdat:
	case Tsuper:
		b->data = b->buf;
		break;
	case Tarena:
		b->data = p;
		break;
	case Tdlist:
	case Tlog:
		b->logsz = UNPACK16(p);		p += 2;
		b->logh = UNPACK64(p);		p += 8;
		b->logp = unpackbp(p, Ptrsz);	p += Ptrsz;
		assert(p - b->buf == Loghdsz);
		b->data = p;
		break;
	case Tpivot:
		b->nval = UNPACK16(p);		p += 2;
		b->valsz = UNPACK16(p);		p += 2;
		b->nbuf = UNPACK16(p);		p += 2;
		b->bufsz = UNPACK16(p);		p += 2;
		assert(p - b->buf == Pivhdsz);
		b->data = p;
		break;
	case Tleaf:
		b->nval = UNPACK16(p);		p += 2;
		b->valsz = UNPACK16(p);		p += 2;
		assert(p - b->buf == Leafhdsz);
		b->data = p;
		break;
	}
	if(b->type == Tlog || b->type == Tdlist){
		xh = b->logh;
		ck = bufhash(b->data, b->logsz);
	}else{
		xh = bp.hash;
		ck = blkhash(b);
	}
	/* '!' binds tighter than '&', so the flag test needs its own parentheses */
	if(!(flg&GBnochk) && ck != xh){
		if(!(flg&GBsoftchk))
			broke("%s: %ullx %llux != %llux", Ecorrupt, bp.addr, xh, ck);
		fprint(2, "%s: %ullx %llux != %llux", Ecorrupt, bp.addr, xh, ck);
		error(Ecorrupt);
	}
	assert(b->magic == Magic);
}

/*
 * Chooses the arena to try for an allocation.
 *
 * A shared round-robin counter (advanced on every call and
 * divided by 2048) supplies the base index. For Tdat blocks
 * the caller's hint is folded in on top of that base; all
 * other types use the base alone. `tries` shifts the choice
 * so successive retries visit different arenas.
 *
 * With a single arena there is nothing to spread over, and
 * hint % (narena - 1) would divide by zero, so the hint is
 * ignored in that case.
 */
static Arena*
pickarena(uint ty, uint hint, int tries)
{
	uint n, r;

	r = ainc(&fs->roundrobin)/2048;
	if(ty == Tdat && fs->narena > 1)
		n = hint % (fs->narena - 1) + r + 1;
	else
		n = r;
	return &fs->arenas[(n + tries) % fs->narena];
}

/*
 * Returns the arena that owns block address b.
 *
 * The two superblock addresses are special-cased to the
 * first and last arena. Every other address is located by
 * binary search over fs->arenas, where arena i covers the
 * half-open range
 *	[h0 address, h0 address + size + 2*Blksz)
 * which is the same range logappend() asserts against.
 *
 * An address that lies in no arena aborts, rather than
 * spinning forever or indexing outside the arena array.
 */
Arena*
getarena(vlong b)
{
	int hi, lo, mid;
	vlong alo, ahi;
	Arena *a;

	if(b == fs->sb0->bp.addr)
		return &fs->arenas[0];
	if(b == fs->sb1->bp.addr)
		return &fs->arenas[fs->narena-1];

	lo = 0;
	hi = fs->narena - 1;
	while(lo <= hi){
		mid = lo + (hi - lo)/2;
		a = &fs->arenas[mid];
		alo = a->h0->bp.addr;
		ahi = alo + a->size + 2*Blksz;
		if(b < alo)
			hi = mid-1;
		else if(b >= ahi)	/* ahi is one past the arena's end */
			lo = mid+1;
		else
			return a;
	}
	/* no arena contains b: the address is bogus */
	abort();
}


/*
 * Inserts the range [off, off+len) into the free tree and
 * merges it with any range that touches it on either side.
 * len must be a whole number of blocks, and no range with
 * the same key may already be present. Raises Enomem if the
 * new node cannot be allocated.
 */
static void
freerange(Avltree *t, vlong off, vlong len)
{
	Arange *cur, *adj;

	assert(len % Blksz == 0);
	cur = calloc(1, sizeof(Arange));
	if(cur == nil)
		error(Enomem);
	cur->off = off;
	cur->len = len;
	assert(avllookup(t, cur, 0) == nil);
	avlinsert(t, cur);

	for(;;){
		/* absorb into the predecessor when it ends where we start */
		adj = (Arange*)avlprev(cur);
		if(adj != nil && adj->off+adj->len == cur->off){
			avldelete(t, cur);
			adj->len += cur->len;
			free(cur);
			cur = adj;
			continue;
		}
		/* absorb into the successor when we end where it starts */
		adj = (Arange*)avlnext(cur);
		if(adj == nil || cur->off+cur->len != adj->off)
			break;
		avldelete(t, cur);
		adj->off = cur->off;
		adj->len += cur->len;
		free(cur);
		cur = adj;
	}
}

/*
 * Removes [off, off+len) from the free tree. The span must
 * lie wholly inside one existing free range; anything else
 * aborts. Depending on where the span sits, the containing
 * range is trimmed at the front, trimmed at the back, or
 * split in two, and it is deleted if nothing remains.
 */
static void
grabrange(Avltree *t, vlong off, vlong len)
{
	Arange *r, *tail, key;
	vlong end, rend;

	assert(len % Blksz == 0);
	key.off = off;
	key.len = len;
	r = (Arange*)avllookup(t, &key.Avl, -1);
	if(r == nil)
		abort();
	end = off + len;
	rend = r->off + r->len;
	if(end > rend)
		abort();

	if(off == r->off){
		/* taken from the front */
		r->off += len;
		r->len -= len;
	}else if(end == rend){
		/* taken from the back */
		r->len -= len;
	}else if(off > r->off && end < rend){
		/* taken from the middle: keep the head in r, add a tail */
		tail = emalloc(sizeof(Arange), 0);
		tail->off = end;
		tail->len = rend - end;
		r->len = off - r->off;
		avlinsert(t, tail);
	}else
		abort();

	if(r->len == 0){
		avldelete(t, r);
		free(r);
	}
}

/*
 * Sets up a fresh log block at address o using whichever
 * of the arena's two log buffers is not the current log
 * tail, and returns it with an extra reference held.
 */
static Blk*
mklogblk(Arena *a, vlong o)
{
	Blk *lb;

	/* canqlock() failing means the arena lock is already held */
	assert(!canqlock(a));
	if(a->logbuf[0] == a->logtl)
		lb = a->logbuf[1];
	else
		lb = a->logbuf[0];
	assert(lb->ref == 1);
	lb->flag = Bstatic;
	initblk(lb, o, -1, Tlog);
	tracex("newlogb" , lb->bp, -1, getcallerpc(&a));
	/* remember the previous holder before holdblk() overwrites it */
	lb->lasthold0 = lb->lasthold;
	holdblk(lb);
	lb->lasthold = getcallerpc(&a);
	return lb;
}

/*
 * Logs an allocation. Must be called
 * with arena lock held. Duplicates some
 * of the work in allocblk to prevent
 * recursion.
 *
 * Appends one entry (op, off, len) to the arena's log
 * tail. The low byte of off must be zero because the
 * op code is stored there. When the tail is nearly
 * full, a new log block is allocated from the same
 * arena and chained on; in that case both blocks are
 * written out and the new block becomes the tail.
 * Raises Efull if the chain block cannot be allocated.
 */
static void
logappend(Arena *a, vlong off, vlong len, int op)
{
	vlong o, start, end;
	Blk *lb;
	char *p;

	assert((off & 0xff) == 0);
	assert(op == LogAlloc || op == LogFree || op == LogSync);
	if(op != LogSync){
		/* alloc/free entries must refer to an address inside this arena */
		start = a->h0->bp.addr;
		end = start + a->size + 2*Blksz;
		assert(off >= start);
		assert(off < end);
	}
	lb = a->logtl;
	assert(lb->ref > 0);
	assert(lb->type == Tlog);
	assert(lb->logsz >= 0);
	dprint("logop %d: %llx+%llx@%x\n", op, off, len, lb->logsz);

	/* a clean tail is about to be modified: mark it dirty and no longer final */
	if(checkflag(lb, 0, Bdirty))
		setflag(lb, Bdirty, Bfinal);

	/*
	 * move to the next block when we have
	 * too little room in the log:
	 * We're appending up to 16 bytes as
	 * part of the operation, followed by
	 * 16 bytes of new log entry allocation
	 * and chaining.
	 */
	if(lb->logsz >= Logspc - Logslop){
		o = blkalloc_lk(a, 0);
		if(o == -1)
			error(Efull);
		/* record the chain block's own allocation in the old tail */
		p = lb->data + lb->logsz;
		PACK64(p, o|LogAlloc1);
		lb->logsz += 8;
		lb->logp = (Bptr){o, -1, -1};
tracex("logchain1", lb->bp, o, a - fs->arenas);
		lb = mklogblk(a, o);
tracex("logchain2", lb->bp, getcallerpc(&a), -1);
	}
	/* single-block operations use the one-word entry forms */
	if(len == Blksz){
		if(op == LogAlloc)
			op = LogAlloc1;
		else if(op == LogFree)
			op = LogFree1;
	}
	off |= op;
	p = lb->data + lb->logsz;
	PACK64(p, off);
	lb->logsz += 8;
	/* ops at or above Log2wide carry the length in a second word */
	if(op >= Log2wide){
		PACK64(p+8, len);
		lb->logsz += 8;
	}
	/* we chained: write the new block, then the old tail, then switch tails */
	if(lb != a->logtl) {
traceb("logstep1", a->logtl->logp);
traceb("logstep2", a->logtl->bp);
		finalize(lb);
		syncblk(lb);

		finalize(a->logtl);
		syncblk(a->logtl);
		dropblk(a->logtl);
		a->logtl = lb;
		a->nlog++;
	}
}

/*
 * Replays the arena's allocation log, starting at bp and
 * following each block's chain pointer, applying every
 * alloc/free entry to a->free and a->used. Each block read
 * bumps a->nlog. All blocks are read into a->logbuf[0].
 *
 * Replay stops at the end of the chain, or at a LogSync
 * entry whose generation is not older than fs->qgen. An
 * unrecognized op aborts.
 */
void
loadlog(Arena *a, Bptr bp)
{
	vlong ent, off, len, gen;
	int op, i, n;
	char *d;
	Blk *b;


	dprint("loadlog %B\n", bp);
	traceb("loadlog", bp);
	b = a->logbuf[0];
	while(1){
		assert(checkflag(b, Bstatic, Bcached));
		holdblk(b);
		readblk(b, bp, 0);
		dprint("\tload %B chain %B\n", bp, b->logp);
		a->nlog++;
		for(i = 0; i < b->logsz; i += n){
			d = b->data + i;
			ent = UNPACK64(d);
			/* low byte is the op; the rest is the address (or generation) */
			op = ent & 0xff;
			off = ent & ~0xff;
			/* wide ops occupy two 8-byte words, narrow ops one */
			n = (op >= Log2wide) ? 16 : 8;
			switch(op){
			case LogSync:
				gen = ent >> 8;
				dprint("\tlog@%x: sync %lld\n", i, gen);
				if(gen >= fs->qgen){
					if(a->logtl == nil){
						/* truncate here and adopt this block as the log tail */
						b->logsz = i;
						a->logtl = b;
						cachedel(b->bp.addr);
						setflag(b, Bdirty, 0);
						return;
					}
					dropblk(b);
					return;
				}
				break;
	
			case LogAlloc:
			case LogAlloc1:
				len = (op >= Log2wide) ? UNPACK64(d+8) : Blksz;
				dprint("\tlog@%x alloc: %llx+%llx\n", i, off, len);
				grabrange(a->free, off & ~0xff, len);
				a->used += len;
				break;
			case LogFree:
			case LogFree1:
				len = (op >= Log2wide) ? UNPACK64(d+8) : Blksz;
				dprint("\tlog@%x free: %llx+%llx\n", i, off, len);
				freerange(a->free, off & ~0xff, len);
				a->used -= len;
				break;
			default:
				dprint("\tlog@%x: log op %d\n", i, op);
				abort();
				break;
			}
		}
		/* end of chain: this block becomes the tail, still held */
		if(b->logp.addr == -1){
			a->logtl = b;
			return;
		}
		bp = b->logp;
		dropblk(b);
	}
}

/*
 * Finalizes and writes out the arena's log tail, unless the
 * tail has neither Bdirty nor Bstatic set, in which case
 * nothing is done.
 */
void
flushlog(Arena *a)
{
	if(!checkflag(a->logtl, 0, Bdirty|Bstatic)){
		finalize(a->logtl);
		syncblk(a->logtl);
	}
}

/*
 * Rewrites the arena's allocation log as a compact list of
 * the currently free ranges.
 *
 * The current tail is flushed, enough blocks for the new
 * log are allocated up front, one LogFree entry is written
 * per range in a->free, and the arena's log head, tail and
 * block counts are switched to the new chain. Blocks that
 * were reserved but not needed are released again.
 *
 * Raises Enomem or Efull if the block list or the blocks
 * themselves cannot be allocated; the block list is freed
 * on the error path.
 */
void
compresslog(Arena *a)
{
	int i, nr, nblks, nlog;
	vlong sz, *blks;
	Blk *b;
	Arange *r;
	char *p;

	flushlog(a);
tracex("compress", a->loghd, getcallerpc(&a), -1);
	/*
	 * Prepare what we're writing back.
	 * Arenas must be sized so that we can
	 * keep the merged log in memory for
	 * a rewrite.
	 */
	sz = 0;
	nr = 0;
	nlog = 0;
	/* each free range takes one 16-byte wide entry */
	for(r = (Arange*)avlmin(a->free); r != nil; r = (Arange*)avlnext(r)){
		sz += 16;
		nr++;
	}

	/*
	 * Make a pessimistic estimate of the number of blocks
	 * needed to store the ranges, as well as the blocks
	 * used to store the range allocations.
	 *
	 * This does modify the tree, but it's safe because
	 * we can only be removing entries from the tree, not
	 * splitting or inserting new ones.
	 */
	nblks = (sz+Logspc)/(Logspc - Logslop) + 16*nr/(Logspc-Logslop) + 1;
	if((blks = calloc(nblks, sizeof(vlong))) == nil)
		error(Enomem);
	if(waserror()){
		free(blks);
		nexterror();
	}
	for(i = 0; i < nblks; i++){
		blks[i] = blkalloc_lk(a, 1);
		if(blks[i] == -1)
			error(Efull);
	}

	/* fill up the log with the ranges from the tree */
	i = 0;
	b = mklogblk(a, blks[i++]);
	for(r = (Arange*)avlmin(a->free); r != nil; r = (Arange*)avlnext(r)){
		/* current block is full: chain to the next reserved block and write it out */
		if(b->logsz >= Logspc - Logslop){
			b->logp = (Bptr){blks[i], -1, -1};
			finalize(b);
			syncblk(b);
			dropblk(b);
			nlog++;
			b = mklogblk(a, blks[i++]);
		}
		p = b->data + b->logsz;
		PACK64(p+0, r->off|LogFree);
		PACK64(p+8, r->len);
		b->logsz += 16;
	}

	/*
	 * now we have a valid freelist, and we can start
	 * appending stuff to it. Clean up the eagerly
	 * allocated extra blocks.
	 *
	 * Note that we need to drop the reference to the
	 * old logtl before we free the old blocks, because
	 * deallocating a block may require another block.
	 */
	dropblk(a->logtl);
	a->loghd = (Bptr){blks[0], -1, -1};
	a->logtl = b;	/* written back by sync() later */
	a->nlog = nlog;
	a->lastlogsz = nlog;

	/* May add blocks to new log */
	for(; i < nblks; i++)
		blkdealloc_lk(a, blks[i]);
	poperror();
	free(blks);
}

/*
 * Appends a LogSync entry carrying generation `gen` to the
 * arena's log. The generation is shifted up by one byte so
 * the low byte is free for the op code. Always returns 0.
 */
int
logbarrier(Arena *a, vlong gen)
{
	vlong ent;

	ent = gen<<8;
	logappend(a, ent, 0, LogSync);
	return 0;
}

/*
 * Takes one block out of the arena's free tree; the arena
 * lock must be held. It can run more than once within a
 * single operation, since the allocation log needs blocks
 * of its own.
 *
 * seq != 0 takes the first block of the lowest free range;
 * seq == 0 takes the last block of the highest free range.
 * Returns the block address, or -1 when the arena is down
 * to its reserve and usereserve is not set.
 */
static vlong
blkalloc_lk(Arena *a, int seq)
{
	Arange *r;
	vlong b;

	r = (Arange*)(seq ? avlmin(a->free) : avlmax(a->free));
	if(!usereserve && a->size - a->used <= a->reserve)
		return -1;
	if(r == nil)
		broke(Estuffed);

	/*
	 * The sequential case edits the node's sort key in
	 * place. That is fine: the ranges in the tree never
	 * overlap, so shrinking one cannot reorder it.
	 */
	r->len -= Blksz;
	if(seq){
		b = r->off;
		r->off += Blksz;
	}else
		b = r->off + r->len;
	if(r->len == 0){
		avldelete(a->free, r);
		free(r);
	}
	a->used += Blksz;
	return b;
}

/*
 * Returns block b to the arena: evicts it from the cache,
 * logs the free, puts the range back in the free tree and
 * adjusts the usage count. The log entry is written before
 * the in-memory state changes.
 * NOTE(review): the _lk suffix and the call sites suggest the
 * arena lock must be held -- confirm.
 */
static void
blkdealloc_lk(Arena *a, vlong b)
{
	cachedel(b);
	logappend(a, b, Blksz, LogFree);
	freerange(a->free, b, Blksz);
	a->used -= Blksz;
}

/*
 * Allocates one block of type ty and returns its address.
 *
 * Arenas are tried in the order pickarena() hands them
 * out. Early attempts skip arenas whose lock is busy;
 * later attempts block on the lock. An arena that reports
 * no space (-1) is unlocked and the next one is tried.
 * The allocation is logged before the arena is unlocked.
 * Raises Efull once 2*narena attempts have been made.
 */
static vlong
blkalloc(int ty, uint hint, int seq)
{
	Arena *a;
	vlong b;
	int tries;

	tries = 0;
Again:
	a = pickarena(ty, hint, tries);
	/*
	 * Loop through the arena up to 2 times.
	 * The first pass tries to find an arena
	 * that has space and is not in use, the
	 * second waits until an arena is free.
	 */
	if(tries == 2*fs->narena)
		error(Efull);
	tries++;
	if(tries < fs->narena){
		if(canqlock(a) == 0)
			goto Again;
	}else
		qlock(a);
	/* make sure the arena is unlocked if logging raises an error */
	if(waserror()){
		qunlock(a);
		nexterror();
	}
	b = blkalloc_lk(a, seq);
	if(b == -1){
		qunlock(a);
		poperror();
		goto Again;
	}
	logappend(a, b, Blksz, LogAlloc);
	qunlock(a);
	poperror();
	return b;
}

/*
 * Resets b to describe a brand-new, empty block of type ty
 * at address bp with generation gen, and marks it dirty.
 * The data pointer is placed just past the header for the
 * given type. Finding the address already in the cache is
 * treated as a double allocation and is fatal. Returns b.
 */
static Blk*
initblk(Blk *b, vlong bp, vlong gen, int ty)
{
	Blk *dup;

	dup = cacheget(bp);
	if(dup != nil)
		fatal("double alloc: %#p %B %#p %B", b, b->bp, dup, dup->bp);

	b->type = ty;
	b->bp.addr = bp;
	b->bp.hash = -1;
	b->bp.gen = gen;

	switch(ty){
	case Tleaf:
		b->data = b->buf + Leafhdsz;
		break;
	case Tpivot:
		b->data = b->buf + Pivhdsz;
		break;
	case Tdlist:
	case Tlog:
		/* log blocks start out unchained */
		b->logp = (Bptr){-1, -1, -1};
		b->data = b->buf + Loghdsz;
		break;
	case Tarena:
		b->data = b->buf+2;
		break;
	case Tdat:
		b->data = b->buf;
		break;
	}
	setflag(b, Bdirty, 0);

	/* every counter starts from zero, whatever the type */
	b->logsz = 0;
	b->bufsz = 0;
	b->nbuf = 0;
	b->valsz = 0;
	b->nval = 0;
	b->alloced = getcallerpc(&b);

	return b;
}

/*
 * Allocates a fresh data (Tdat) block for tree t. `hint`
 * and `seq` are handed through to the allocator. The block
 * is stamped with the tree's in-memory generation.
 */
Blk*
newdblk(Tree *t, vlong hint, int seq)
{
	vlong addr;
	Blk *blk;

	/* reserve the disk address first; this may raise an error */
	addr = blkalloc(Tdat, hint, seq);
	blk = cachepluck();
	initblk(blk, addr, t->memgen, Tdat);
	blk->alloced = getcallerpc(&t);
	tracex("newdblk" , blk->bp, Tdat, getcallerpc(&t));
	return blk;
}

/*
 * Allocates a fresh block of type ty for tree t, with no
 * placement hint, stamped with the tree's in-memory
 * generation.
 */
Blk*
newblk(Tree *t, int ty)
{
	vlong addr;
	Blk *blk;

	/* reserve the disk address first; this may raise an error */
	addr = blkalloc(ty, 0, 0);
	blk = cachepluck();
	initblk(blk, addr, t->memgen, ty);
	blk->alloced = getcallerpc(&t);
	tracex("newblk" , blk->bp, ty, getcallerpc(&t));
	return blk;
}

/*
 * Makes a copy of block b at a newly allocated address in
 * tree t. The raw buffer and the size counters are copied;
 * the copy's hash is left unset (-1). Returns nil if no new
 * block could be obtained.
 */
Blk*
dupblk(Tree *t, Blk *b)
{
	Blk *nb;

	nb = newblk(t, b->type);
	if(nb == nil)
		return nil;

	tracex("dup" , b->bp, b->type, t->gen);
	memcpy(nb->buf, b->buf, sizeof(nb->buf));
	nb->bp.hash = -1;
	nb->logsz = b->logsz;
	nb->bufsz = b->bufsz;
	nb->nbuf = b->nbuf;
	nb->valsz = b->valsz;
	nb->nval = b->nval;
	nb->alloced = getcallerpc(&t);
	return nb;
}

/*
 * Packs the block's in-memory header fields into its
 * buffer, computes the block hash, and marks the block
 * Bdirty and Bfinal. An unknown block type aborts.
 */
void
finalize(Blk *b)
{
	/* raw data blocks carry no type tag */
	if(b->type != Tdat)
		PACK16(b->buf, b->type);

	switch(b->type){
	case Tdat:
	case Tarena:
	case Tsuper:
		/* no further header fields */
		break;
	case Tpivot:
		PACK16(b->buf+6, b->nbuf);
		PACK16(b->buf+8, b->bufsz);
		/* fall through: a pivot also has the leaf's value counters */
	case Tleaf:
		PACK16(b->buf+2, b->nval);
		PACK16(b->buf+4, b->valsz);
		break;
	case Tdlist:
	case Tlog:
		/* log blocks store a hash of their own payload in the header */
		b->logh = bufhash(b->data, b->logsz);
		PACK16(b->buf+2, b->logsz);
		PACK64(b->buf+4, b->logh);
		packbp(b->buf+12, Ptrsz, &b->logp);
		break;
	default:
		abort();
		break;
	}

	b->bp.hash = blkhash(b);
	setflag(b, Bdirty|Bfinal, 0);
}

/*
 * Returns the block for bp, from the cache if present,
 * otherwise read from disk and inserted into the cache.
 * `flg` is passed to readblk() (GBraw, GBnochk, ...).
 *
 * A per-address lock from fs->blklk serializes lookups and
 * loads of the same address. Errors raised while reading
 * propagate to the caller after the lock is released and
 * the freshly plucked block is given back, so a failed read
 * does not leak a cache buffer.
 */
Blk*
getblk(Bptr bp, int flg)
{
	Blk *b;
	int i;

	i = ihash(bp.addr) % nelem(fs->blklk);
	qlock(&fs->blklk[i]);
	if(waserror()){
		qunlock(&fs->blklk[i]);
		nexterror();
	}
	if((b = cacheget(bp.addr)) != nil){
		assert(checkflag(b, 0, Bfreed));
		b->lasthold = getcallerpc(&bp);
		qunlock(&fs->blklk[i]);
		poperror();
		return b;
	}
	b = cachepluck();
	b->alloced = getcallerpc(&bp);
	/*
	 * release the plucked block if the read fails
	 * (assumes cachepluck() returns it holding one
	 * reference -- TODO confirm)
	 */
	if(waserror()){
		dropblk(b);
		nexterror();
	}
	readblk(b, bp, flg);
	poperror();
	b->bp.gen = bp.gen;
	b->lasthold = getcallerpc(&bp);
	cacheins(b);
	qunlock(&fs->blklk[i]);
	poperror();

	return b;
}


/*
 * Takes an additional reference on b and records the
 * caller's pc as the most recent holder. Returns b so the
 * call can be used inline.
 */
Blk*
holdblk(Blk *b)
{
	ainc(&b->ref);
	b->lasthold = getcallerpc(&b);

	return b;
}

/*
 * Releases one reference on b; nil is accepted and ignored.
 * When the last reference goes away the block is put back
 * on the LRU: at the bottom if it has been freed, so it is
 * reused early, otherwise at the top.
 */
void
dropblk(Blk *b)
{
	if(b != nil){
		b->lastdrop = getcallerpc(&b);
		if(adec(&b->ref) == 0){
			if(checkflag(b, Bfreed, 0))
				lrubot(b);
			else
				lrutop(b);
		}
	}
}

/*
 * Returns the number of bytes in use in a leaf or pivot
 * block: two bytes per entry plus the entries' payload,
 * for values and (in pivots) buffered messages as well.
 * Any other block type is reported and aborts.
 */
ushort
blkfill(Blk *b)
{
	if(b->type == Tpivot)
		return 2*b->nbuf + b->bufsz +  2*b->nval + b->valsz;
	if(b->type == Tleaf)
		return 2*b->nval + b->valsz;
	fprint(2, "invalid block @%lld\n", b->bp.addr);
	abort();
}

/*
 * Tags l with `op` and pushes it onto the limbo list of the
 * current epoch using a compare-and-swap retry loop, then
 * bumps the global limbo count.
 */
void
limbo(int op, Limbo *l)
{
	Limbo *head;
	ulong ge;

	l->op = op;
	do{
		/* re-read the epoch on every attempt; it may have moved */
		ge = agetl(&fs->epoch);
		head = agetp(&fs->limbo[ge]);
		l->next = head;
	}while(!acasp(&fs->limbo[ge], head, l));
	ainc(&fs->nlimbo);
}

/*
 * Frees block b on behalf of tree t.
 *
 * If t is the snapshot tree, or the block's generation is
 * older than t's in-memory generation, the free is handed
 * to killblk(). Otherwise the block is flagged Blimbo, an
 * extra reference is taken, and it is queued on the limbo
 * list for deferred release.
 */
void
freeblk(Tree *t, Blk *b)
{
	int viakill;

	viakill = (t == &fs->snap);
	if(!viakill && t != nil)
		viakill = b->bp.gen < t->memgen;
	if(viakill){
		tracex("killb", b->bp, t->memgen, getcallerpc(&t));
		killblk(t, b->bp);
		return;
	}

	b->freed = getcallerpc(&t);
	tracex("freeb", b->bp, getcallerpc(&t), -1);
	setflag(b, Blimbo, 0);
	holdblk(b);
	/* the caller's reference plus the one just taken for limbo */
	assert(b->ref > 1);
	limbo(DFblk, b);
}

/*
 * Frees the block named by bp on behalf of tree t, without
 * needing the block in memory.
 *
 * If t is the snapshot tree, or bp's generation is older
 * than t's in-memory generation, the free is handed to
 * killblk(). Otherwise a Bfree record is taken from the
 * fs->bfree pool (sleeping until one is available), filled
 * in, and queued on the limbo list.
 */
void
freebp(Tree *t, Bptr bp)
{
	Bfree *f;
	int viakill;

	viakill = (t == &fs->snap);
	if(!viakill && t != nil)
		viakill = bp.gen < t->memgen;
	if(viakill){
		tracex("killbp", bp, t->memgen, getcallerpc(&t));
		killblk(t, bp);
		return;
	}
	tracex("freebp", bp, getcallerpc(&t), -1);

	qlock(&fs->bfreelk);
	for(;;){
		f = fs->bfree;
		if(f != nil)
			break;
		rsleep(&fs->bfreerz);
	}
	fs->bfree = (Bfree*)f->next;
	qunlock(&fs->bfreelk);

	f->bp = bp;
	limbo(DFbp, f);
}

/*
 * Marks worker `tid` as active in the current global
 * epoch by publishing that epoch, with the Eactive bit
 * set, in the worker's local epoch slot.
 */
void
epochstart(int tid)
{
	ulong cur;

	cur = agetl(&fs->epoch);
	cur |= Eactive;
	asetl(&fs->lepoch[tid], cur);
}

/*
 * Marks worker `tid` as idle by clearing the Eactive bit
 * in its local epoch slot, leaving the epoch number as is.
 */
void
epochend(int tid)
{
	ulong mine;

	mine = agetl(&fs->lepoch[tid]);
	mine &= ~Eactive;
	asetl(&fs->lepoch[tid], mine);
}

/*
 * Blocks until no worker is active in an epoch other than
 * the current global one. Each time a lagging worker is
 * seen, the scan restarts after a sleep; the sleep grows by
 * one per retry up to 1000, after which every further retry
 * also prints a "stalled epoch" diagnostic.
 */
void
epochwait(void)
{
	int i, delay, lagging;
	ulong e, ge;

	delay = 0;
	do{
		lagging = 0;
		ge = agetl(&fs->epoch);
		for(i = 0; i < fs->nworker; i++){
			e = agetl(&fs->lepoch[i]);
			/* idle workers, and workers in the current epoch, are fine */
			if((e & Eactive) == 0 || e == (ge | Eactive))
				continue;
			if(delay < 1000)
				delay++;
			else
				fprint(2, "stalled epoch %lx [worker %d]\n", e, i);
			sleep(delay);
			lagging = 1;
			break;
		}
	}while(lagging);
}

/*
 * Advances the global epoch and releases everything that
 * was parked on the limbo list of the epoch being entered.
 *
 * If some worker is still active in an older epoch and the
 * limbo count is below a quarter of fs->cmax, the function
 * returns without doing anything; otherwise it waits for
 * the stragglers. Tree and mount records are simply freed.
 * Block frees (by pointer or by in-memory block) are turned
 * into Qfree entries on the owning arena's sync queue.
 */
void
epochclean(void)
{
	ulong c, e, ge;
	Limbo *p, *n;
	Blk *b;
	Bfree *f;
	Arena *a;
	Qent qe;
	int i;

	c = agetl(&fs->nlimbo);
	ge = agetl(&fs->epoch);
	for(i = 0; i < fs->nworker; i++){
		e = agetl(&fs->lepoch[i]);
		if((e & Eactive) && e != (ge | Eactive)){
			/* not much pending yet: try again later rather than stall */
			if(c < fs->cmax/4)
				return;
			epochwait();
		}
	}
	epochwait();
	/* detach the next epoch's list, then make that epoch current */
	p = asetp(&fs->limbo[(ge+1)%3], nil);
	asetl(&fs->epoch, (ge+1)%3);

	for(; p != nil; p = n){
		n = p->next;
		switch(p->op){
		case DFtree:
			free(p);
			break;
		case DFmnt:
			free(p);
			break;
		case DFbp:
			f = (Bfree*)p;
			a = getarena(f->bp.addr);
			/* if the block happens to be cached, mark it freed and clean */
			if((b = cacheget(f->bp.addr)) != nil){
				setflag(b, Bfreed, Bdirty|Blimbo);
				dropblk(b);
			}
			qe.op = Qfree;
			qe.bp = f->bp;
			qe.b = nil;
			qput(a->sync, qe);
			/* hand the record back to the pool and wake a waiter */
			qlock(&fs->bfreelk);
			f->next = fs->bfree;
			fs->bfree = f;
			rwakeup(&fs->bfreerz);
			qunlock(&fs->bfreelk);
			break;
		case DFblk:
			b = (Blk*)p;
			/* copy what the queue entry needs before dropping the block */
			qe.op = Qfree;
			qe.bp = b->bp;
			qe.b = nil;
			setflag(b, Bfreed, Bdirty|Blimbo);
			a = getarena(b->bp.addr);
			dropblk(b);
			qput(a->sync, qe);
			break;
		default:
			abort();
		}
		adec(&fs->nlimbo);
	}
}

/*
 * Finalizes a dirty block and queues it for writing on its
 * arena's sync queue. The block must be dirty and must not
 * be queued already or be a static block. It is inserted
 * into the cache if it is not there yet, and an extra
 * reference is taken on behalf of the queue entry.
 */
void
enqueue(Blk *b)
{
	Arena *a;
	Qent qe;

	assert(checkflag(b, Bdirty, Bqueued|Bstatic));
	assert(b->bp.addr >= 0);
	/* finalize first so b->bp carries the final hash */
	finalize(b);
	if(checkflag(b, 0, Bcached)){
		cacheins(b);
		b->cached = getcallerpc(&b);
	}
	holdblk(b);

	b->enqueued = getcallerpc(&b);
	traceb("queueb", b->bp);
	a = getarena(b->bp.addr);
	qe.op = Qwrite;
	qe.bp = b->bp;
	qe.b = b;
	qput(a->sync, qe);
}

/*
 * Initializes a sync queue: both rendezvous points share
 * the queue's lock, and the heap starts empty with room
 * for fs->cmax entries.
 */
void
qinit(Syncq *q)
{
	q->emptyrz.l = &q->lk;
	q->fullrz.l = &q->lk;
	q->heapsz = fs->cmax;
	q->nheap = 0;
	q->heap = emalloc(q->heapsz*sizeof(Qent), 1);
}

/*
 * Orders two queue entries by generation, then by op, then
 * by block address. Returns -1, 0 or 1.
 */
static int
qcmp(Qent *a, Qent *b)
{
	if(a->qgen < b->qgen)
		return -1;
	if(a->qgen > b->qgen)
		return 1;
	if(a->op < b->op)
		return -1;
	if(a->op > b->op)
		return 1;
	if(a->bp.addr < b->bp.addr)
		return -1;
	if(a->bp.addr > b->bp.addr)
		return 1;
	return 0;
}

/*
 * Adds an entry to the sync queue's min-heap, sleeping
 * while the heap is full. The entry is stamped with the
 * current fs->qgen under the queue lock. Free and write
 * entries must have a block-aligned address; fence entries
 * require a sync to be in progress; any other op aborts.
 */
void
qput(Syncq *q, Qent qe)
{
	int i, up;

	switch(qe.op){
	case Qfree:
	case Qwrite:
		assert((qe.bp.addr & (Blksz-1)) == 0);
		break;
	case Qfence:
		assert(fs->syncing > 0);
		break;
	default:
		abort();
	}
	if(qe.b != nil)
		assert(qe.b->ref > 0);

	qlock(&q->lk);
	qe.qgen = agetv(&fs->qgen);
	while(q->nheap == q->heapsz)
		rsleep(&q->fullrz);
	/* sift up: pull parents down until the new entry fits */
	i = q->nheap;
	while(i > 0){
		up = (i-1)/2;
		if(qcmp(&qe, &q->heap[up]) == 1)
			break;
		q->heap[i] = q->heap[up];
		i = up;
	}
	q->heap[i] = qe;
	q->nheap++;
	rwakeup(&q->emptyrz);
	qunlock(&q->lk);
}

/*
 * Removes and returns the smallest entry from the sync
 * queue's heap, sleeping while the heap is empty. If the
 * entry carries a block, its Bqueued flag and queued field
 * are cleared after the queue lock has been released.
 */
static Qent
qpop(Syncq *q)
{
	int i, l, r, m;
	Qent e, t;

	qlock(&q->lk);
	while(q->nheap == 0)
		rsleep(&q->emptyrz);
	e = q->heap[0];
	if(--q->nheap != 0){
		/* move the last entry to the root and sift it down */
		q->heap[0] = q->heap[q->nheap];
		for(i = 0; ; i = m){
			m = i;
			l = 2*i+1;
			r = 2*i+2;
			if(l < q->nheap && qcmp(&q->heap[m], &q->heap[l]) == 1)
				m = l;
			if(r < q->nheap && qcmp(&q->heap[m], &q->heap[r]) == 1)
				m = r;
			if(m == i)
				break;
			t = q->heap[m];
			q->heap[m] = q->heap[i];
			q->heap[i] = t;
		}
	}
	rwakeup(&q->fullrz);
	qunlock(&q->lk);
	if(e.b != nil){
		setflag(e.b, 0, Bqueued);
		e.b->queued = 0;
	}
	return e;
}

/*
 * Body of a sync worker: pops entries from the sync queue
 * passed in p and processes them forever.
 *
 *	Qfree	returns the block to its arena under the arena lock
 *	Qfence	counts down fs->syncing and wakes waiters at zero
 *	Qwrite	writes the block out unless it is freed or static,
 *		then drops the queue's reference
 *
 * Any error raised while processing bumps fs->rdonly, is
 * reported, and ends the worker.
 */
void
runsync(int, void *p)
{
	Arena *a;
	Syncq *q;
	Qent qe;

	q = p;
	if(waserror()){
		ainc(&fs->rdonly);
		fprint(2, "error syncing: %s\n", errmsg());
		return;
	}
	while(1){
		qe = qpop(q);
		switch(qe.op){
		case Qfree:
			tracex("qfreeb", qe.bp, qe.qgen, -1);
			/*
			 * we shouldn't have a block in a free op,
			 * the frees go into the queue just to ensure
			 * write/reuse ordering.
			 */
			assert(qe.b == nil);
			a = getarena(qe.bp.addr);
			qlock(a);
			blkdealloc_lk(a, qe.bp.addr);
			qunlock(a);
			break;
		case Qfence:
			tracev("qfence", qe.qgen);
			qlock(&fs->synclk);
			if(--fs->syncing == 0)
				rwakeupall(&fs->syncrz);
			qunlock(&fs->synclk);
			break;
		case Qwrite:
			tracex("qsyncb", qe.bp, qe.qgen, -1);
			/* skip the write only when the block is freed and not static */
			if(checkflag(qe.b, Bfreed, Bstatic) == 0)
				syncblk(qe.b);
			dropblk(qe.b);
			break;
		default:
			abort();
		}
		/* only this function's own error label may remain on the stack */
		assert(estacksz() == 1);
	}
}