shithub: gefs

ref: e7398383ed59cfcf02d8916fed0d0be3421822cd
dir: /tree.c/

View raw version
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <avl.h>

#include "dat.h"
#include "fns.h"

typedef struct Path	Path;

/*
 * One element of the root-to-leaf path built by btupsert and
 * consumed by flush. Element 0 is a placeholder above the
 * root that carries the caller's message array; each later
 * element refers to one block on the way down.
 */
struct Path {
	/* Flowing down for flush */
	Msg	*ins;	/* inserted values, bounded by lo..hi; nil when messages come from b's buffer */
	Blk	*b;	/* to shadow */
	int	idx;	/* insert at: index in b of the child that was descended into, -1 if none */
	int	lo;	/* key range: first message index to push down */
	int	hi;	/* key range: one past the last message index to push down */
	int	sz;	/* size of range, as a sum of msgsz() */

	/* Flowing up from flush */
	int	op;	/* change done along path (POmod, POsplit, POrot, POmerge) */
	Blk	*m;	/* node merged against, for post-update free */
	Blk	*nl;	/* new left */
	Blk	*nr;	/* new right, if we split or rotated */
	int	midx;	/* modification index */
	int	npull;	/* number of messages successfully pulled */
	int	pullsz;	/* size of pulled messages */
};

/*
 * Calls freeblk on b with its own block pointer, unless b is
 * nil. Intended for error-cleanup paths where a block may not
 * have been allocated yet.
 *
 * Every use of an argument is parenthesized so that passing an
 * expression (for example a conditional) expands correctly.
 * b is evaluated more than once, so it must be free of side
 * effects.
 */
#define efreeblk(t, b) do { \
	if((b) != nil) \
		freeblk((t), (b), (b)->bp); \
	} while(0)

/*
 * Insertion sort of the nm messages in m, ordered by keycmp.
 * Neighbours are exchanged only when strictly out of order,
 * so messages with equal keys keep their relative order.
 */
static void
stablesort(Msg *m, int nm)
{
	int sorted, pos;
	Msg tmp;

	for(sorted = 1; sorted < nm; sorted++){
		/* sink m[sorted] towards the front until it is in place */
		for(pos = sorted; pos > 0 && keycmp(&m[pos-1], &m[pos]) > 0; pos--){
			tmp = m[pos];
			m[pos] = m[pos-1];
			m[pos-1] = tmp;
		}
	}
}

/*
 * Copies the key bytes of src into buf (capacity nbuf) and
 * points dst at the copy. Asserts that the key fits.
 */
void
cpkey(Key *dst, Key *src, char *buf, int nbuf)
{
	int len;

	len = src->nk;
	assert(len <= nbuf);
	memmove(buf, src->k, len);
	dst->nk = len;
	dst->k = buf;
}

/*
 * Copies the key of src, immediately followed by its value,
 * into buf (capacity nbuf) and points dst at the copies.
 * Asserts that both fit. The lengths are read before dst is
 * written, so dst and src may be the same object.
 */
void
cpkvp(Kvp *dst, Kvp *src, char *buf, int nbuf)
{
	int klen, vlen;
	char *vbuf;

	klen = src->nk;
	vlen = src->nv;
	assert(klen+vlen <= nbuf);

	vbuf = buf + klen;
	memmove(buf, src->k, klen);
	memmove(vbuf, src->v, vlen);

	dst->k = buf;
	dst->nk = klen;
	dst->v = vbuf;
	dst->nv = vlen;
}

/*
 * Three-way comparison of two keys: bytewise over the common
 * prefix, then by length, so a key sorts before any longer
 * key it is a prefix of. Always returns exactly -1, 0 or 1.
 */
int
keycmp(Key *a, Key *b)
{
	int r, common;

	common = a->nk;
	if(b->nk < common)
		common = b->nk;

	r = memcmp(a->k, b->k, common);
	if(r < 0)
		return -1;
	if(r > 0)
		return 1;
	if(a->nk == b->nk)
		return 0;
	return (a->nk < b->nk) ? -1 : 1;
}

/*
 * Bytes a message occupies in a pivot buffer, including the
 * 2-byte offset slot that points at it.
 */
static int
msgsz(Msg *m)
{
	int fixed;

	/* offset slot (2) + op (1) + key length (2) + value length (2) */
	fixed = 2 + 1 + 2 + 2;
	return fixed + m->nk + m->nv;
}

/*
 * Bytes a key/value pair occupies in a block, including the
 * 2-byte offset slot that points at it.
 */
static int
valsz(Kvp *kv)
{
	int fixed;

	/* offset slot (2) + key length (2) + value length (2) */
	fixed = 2 + 2 + 2;
	return fixed + kv->nk + kv->nv;
}

/*
 * Decodes the i'th key/value pair of block b into kv.
 * The offset table at the start of b->data gives the position
 * of the entry, which is laid out as: key length (2 bytes),
 * key, value length (2 bytes), value. kv points into the
 * block's data; nothing is copied.
 */
void
getval(Blk *b, int i, Kvp *kv)
{
	char *slot, *ent;
	int off;

	assert(i >= 0 && i < b->nval);
	slot = b->data + 2*i;
	off = UNPACK16(slot);

	ent = b->data + off;
	kv->nk = UNPACK16(ent);
	ent += 2;
	kv->k = ent;
	ent += kv->nk;
	kv->nv = UNPACK16(ent);
	ent += 2;
	kv->v = ent;
}

/*
 * Decodes a child pointer stored as the value of kv, and
 * stores the 16-bit fill word that follows it into *fill.
 *
 * NOTE(review): the assert also admits nv == Ptrsz, but the
 * fill word is read from kv->v + Ptrsz unconditionally; in
 * that case the two bytes read lie past the value. Confirm
 * whether values without a fill word can still occur.
 */
Bptr
getptr(Kvp *kv, int *fill)
{
	char *tail;

	assert(kv->nv == Ptrsz || kv->nv == Ptrsz+2);
	tail = kv->v + Ptrsz;
	*fill = UNPACK16(tail);
	return unpackbp(kv->v, kv->nv);
}

/*
 * Appends kv as the next key/value pair of block b.
 * Offset slots grow up from the start of the value area while
 * entries are packed downwards from its end; the asserts check
 * that the two regions have not collided.
 *
 * Exported for reaming.
 */
void
setval(Blk *b, Kvp *kv)
{
	char *slot, *ent;
	int cap, at;

	cap = Pivspc;
	if(b->type == Tleaf)
		cap = Leafspc;

	/* account for the entry first: its offset is derived from the new size */
	b->valsz += 2 + kv->nk + 2 + kv->nv;
	at = cap - b->valsz;

	assert(2*(b->nval+1) + b->valsz <= cap);
	assert(2*(b->nval+1) <= at);

	slot = b->data + 2*b->nval;
	PACK16(slot, at);

	ent = b->data + at;
	PACK16(ent, kv->nk);
	ent += 2;
	memmove(ent, kv->k, kv->nk);
	ent += kv->nk;
	PACK16(ent, kv->nv);
	ent += 2;
	memmove(ent, kv->v, kv->nv);

	b->nval++;
}

/*
 * Appends to block b an entry with key k whose value is the
 * packed block pointer bp followed by the 16-bit fill word.
 */
static void
setptr(Blk *b, Key *k, Bptr bp, int fill)
{
	char val[Ptrsz+2], *end;
	Kvp ent;

	end = packbp(val, sizeof(val), &bp);
	PACK16(end, fill);

	ent.k = k->k;
	ent.nk = k->nk;
	ent.v = val;
	ent.nv = sizeof(val);
	setval(b, &ent);
}

/*
 * Appends message m to the buffer of pivot block b.
 * The buffer area starts Pivspc bytes into b->data; its offset
 * slots grow upwards while message bodies are packed downwards
 * from the end. A body is: op (1 byte), key length (2), key,
 * value length (2), value.
 */
static void
setmsg(Blk *b, Msg *m)
{
	char *slot, *ent;
	int at;

	assert(b->type == Tpivot);

	/* msgsz counts the offset slot, which bufsz does not */
	b->bufsz += msgsz(m)-2;
	at = Bufspc - b->bufsz;

	slot = b->data + Pivspc + 2*b->nbuf;
	PACK16(slot, at);

	ent = b->data + Pivspc + at;
	*ent = m->op;
	ent += 1;
	PACK16(ent, m->nk);
	ent += 2;
	memmove(ent, m->k, m->nk);
	ent += m->nk;
	PACK16(ent, m->nv);
	ent += 2;
	memmove(ent, m->v, m->nv);

	b->nbuf++;
}

/*
 * Decodes the i'th buffered message of pivot block b into m.
 * m's key and value point into the block's data; nothing is
 * copied.
 */
void
getmsg(Blk *b, int i, Msg *m)
{
	char *slot, *ent;
	int off;

	assert(b->type == Tpivot);
	assert(i >= 0 && i < b->nbuf);

	slot = b->data + Pivspc + 2*i;
	off = UNPACK16(slot);

	ent = b->data + Pivspc + off;
	m->op = *ent;
	ent += 1;
	m->nk = UNPACK16(ent);
	ent += 2;
	m->k = ent;
	ent += m->nk;
	m->nv = UNPACK16(ent);
	ent += 2;
	m->v = ent;
}

/*
 * Binary search of b's message buffer for key k.
 *
 * The buffer may hold several messages with the same key; on
 * a match the search keeps going left, so the index returned
 * is that of the first such message and *same is set to 1.
 * Without a match, *same is 0 and the result is the index of
 * the last message whose key is smaller than k, or -1 if there
 * is none. When m is not nil and the result is not -1, the
 * message at the result index is decoded into m.
 */
static int
bufsearch(Blk *b, Key *k, Msg *m, int *same)
{
	int lo, hi, mid, hit, c;
	Msg probe;

	hit = -1;
	lo = 0;
	hi = b->nbuf-1;
	while(lo <= hi){
		mid = (hi + lo) / 2;
		getmsg(b, mid, &probe);
		c = keycmp(k, &probe);
		if(c > 0){
			lo = mid+1;
			continue;
		}
		/* equal or smaller: remember a match, keep looking left */
		if(c == 0)
			hit = mid;
		hi = mid-1;
	}

	*same = (hit != -1);
	if(hit == -1)
		hit = lo-1;
	if(m != nil && hit >= 0)
		getmsg(b, hit, m);
	return hit;
}

/*
 * Binary search of b's key/value pairs for key k.
 *
 * On a match, *same is set to 1 and the index of the first
 * matching pair is returned. Otherwise *same is 0 and the
 * result is the index of the last pair whose key is smaller
 * than k, or -1 if there is none. Unless the result is -1,
 * the pair at the result index is decoded into rp.
 */
static int
blksearch(Blk *b, Key *k, Kvp *rp, int *same)
{
	int lo, hi, mid, hit, c;
	Kvp probe;

	hit = -1;
	lo = 0;
	hi = b->nval-1;
	while(lo <= hi){
		mid = (hi + lo) / 2;
		getval(b, mid, &probe);
		c = keycmp(k, &probe);
		if(c > 0){
			lo = mid+1;
			continue;
		}
		/* equal or smaller: remember a match, keep looking left */
		if(c == 0)
			hit = mid;
		hi = mid-1;
	}

	*same = (hit != -1);
	if(hit == -1)
		hit = lo-1;
	if(hit >= 0)
		getval(b, hit, rp);
	return hit;
}

/*
 * Bytes in use in the message buffer of pivot b: the offset
 * slots plus the message bodies.
 */
static int
buffill(Blk *b)
{
	int slots;

	assert(b->type == Tpivot);
	slots = 2*b->nbuf;
	return slots + b->bufsz;
}

/*
 * Reports whether adding nmsg more offset slots and needed
 * more bytes to the buffer of pivot b would exceed Bufspc.
 */
static int
filledbuf(Blk *b, int nmsg, int needed)
{
	int want;

	assert(b->type == Tpivot);
	want = 2*(b->nbuf+nmsg) + b->bufsz + needed;
	return want > Bufspc;
}

/*
 * Reports whether leaf b, with one more offset slot and
 * needed more bytes, would exceed Leafspc.
 */
static int
filledleaf(Blk *b, int needed)
{
	int want;

	assert(b->type == Tleaf);
	want = 2*(b->nval+1) + b->valsz + needed;
	return want > Leafspc;
}

/*
 * Reports whether pivot b is too full to keep `reserve`
 * maximum-size entries (Kpmax bytes each) free in its value
 * area, on top of one more offset slot.
 *
 * Room has to stay available at all times so that splits
 * propagating up the path have somewhere to put their new
 * entries.
 */
static int
filledpiv(Blk *b, int reserve)
{
	int want;

	assert(b->type == Tpivot);
	want = 2*(b->nval+1) + b->valsz + reserve*Kpmax;
	return want > Pivspc;
}

/*
 * Appends to n a pointer entry for the non-empty child c.
 * The entry is keyed by the smaller of c's first value key
 * and its first buffered message key, and carries c's block
 * pointer and fill. If nbytes is not nil, valsz of the key/value
 * pair used to build the entry is added to it.
 */
static void
copyup1(Blk *n, Blk *c, int *nbytes)
{
	Kvp kv;
	Msg m;

	getval(c, 0, &kv);
	if(c->nbuf > 0){
		getmsg(c, 0, &m);
		if(keycmp(&kv, &m) > 0)
			kv.Key = m.Key;
	}
	setptr(n, &kv, c->bp, blkfill(c));
	if(nbytes != nil)
		*nbytes += valsz(&kv);
}

/*
 * Copies pointers to the blocks produced one level down
 * (pp->nl and, if present, pp->nr) into the new parent n.
 *
 * A child may have been emptied completely by a run of
 * delete messages; such a child is skipped, so nothing is
 * copied up for it.
 */
static void
copyup(Blk *n, Path *pp, int *nbytes)
{
	if(pp->nl->nval > 0)
		copyup1(n, pp->nl, nbytes);
	if(pp->nr != nil && pp->nr->nval > 0)
		copyup1(n, pp->nr, nbytes);
}

/*
 * Applies a wstat message m to the directory entry stored in
 * kv, rewriting kv's value in place.
 *
 * The message value starts with one byte of Ow* flags; the
 * new field values follow in the fixed order tested below,
 * each present only if its flag is set. The qid version is
 * incremented on every call. A message whose length does not
 * match its flags, or a directory that cannot be repacked
 * into kv->v, is fatal.
 */
static void
statupdate(Kvp *kv, Msg *m)
{
	int op;
	char *p;
	Xdir d;

	p = m->v;
	op = *p++;
	kv2dir(kv, &d);
	/* bump version */
	d.qid.vers++;
	if(op & Owsize){
		d.length = UNPACK64(p);
		p += 8;
	}
	if(op & Owmode){
		d.mode = UNPACK32(p);
		/* the qid type is kept in sync with the top byte of the mode */
		d.qid.type = d.mode>>24;
		p += 4;
	}
	if(op & Owmtime){
		d.mtime = UNPACK64(p);
		p += 8;
	}
	if(op & Owatime){
		d.atime = UNPACK64(p);
		p += 8;
	}
	if(op & Owuid){
		d.uid = UNPACK32(p);
		p += 4;
	}
	if(op & Owgid){
		d.gid = UNPACK32(p);
		p += 4;
	}
	if(op & Owmuid){
		d.muid = UNPACK32(p);
		p += 4;
	}
	/* every byte of the message must have been consumed */
	if(p != m->v + m->nv)
		fatal("malformed stat: kv=%P, m=%M\n", kv, m);
	if(packdval(kv->v, kv->nv, &d) == nil)
		fatal("repacking dir failed\n");
}

/*
 * Applies message m to the key/value pair kv.
 *
 * Returns 1 if kv holds a live value afterwards and 0 if the
 * message removed it. buf (capacity nbuf) is used as backing
 * store when an insert replaces kv; the other mutating ops
 * rewrite kv->v in place, so the caller must make sure kv->v
 * is writable before applying them.
 */
static int
apply(Kvp *kv, Msg *m, char *buf, int nbuf)
{
	vlong *pv;
	char *p;
	Tree t;

	switch(m->op){
	case Oclearb:
	case Odelete:
	case Oclobber:
		/* removal: kv itself is left untouched */
		assert(keycmp(kv, m) == 0);
		return 0;
	case Oinsert:
		cpkvp(kv, m, buf, nbuf);
		return 1;
	case Owstat:
		assert(keycmp(kv, m) == 0);
		statupdate(kv, m);
		return 1;
	case Orelink:
	case Oreprev:
		/*
		 * Message value: 8-byte successor (Orelink) or
		 * predecessor (Oreprev), then one byte each that is
		 * added to the label count and the reference count.
		 */
		unpacktree(&t, kv->v, kv->nv);
		p = m->v;
		pv = (m->op == Orelink) ? &t.succ : &t.pred;
		*pv = UNPACK64(p);	p += 8;
		t.nlbl += *p;		p++;
		t.nref += *p;		p++;
		assert(t.nlbl >= 0 && t.nref >= 0);
		assert(p == m->v + m->nv);
		packtree(kv->v, kv->nv, &t);
		return 1;
	default:
		fatal("invalid op %d\n", m->op);
	}
	return 0;
}

/*
 * Fetches the i'th pending message of path element p into m,
 * either from the caller-supplied array p->ins or, when that
 * is nil, from the buffer of p->b.
 *
 * Returns -1 without touching m when i is outside 0..p->hi-1
 * or *full is already set. If the message needs more than spc
 * bytes, *full is set and -1 is returned. Otherwise the result
 * is keycmp(v, m), or 0 when v is nil.
 */
static int
pullmsg(Path *p, int i, Kvp *v, Msg *m, int *full, int spc)
{
	if(i < 0 || i >= p->hi || *full)
		return -1;

	if(p->ins == nil)
		getmsg(p->b, i, m);
	else
		*m = p->ins[i];

	if(msgsz(m) > spc){
		*full = 1;
		return -1;
	}
	if(v == nil)
		return 0;
	return keycmp(v, m);
}

/*
 * Creates a new leaf holding the contents of the old leaf
 * p->b merged with the pending messages of the parent path
 * element `up` (indices up->lo .. up->hi-1). Copying the
 * values repacks them to minimize the space used.
 *
 * Both sequences are sorted by key and are merged in one
 * pass. On return p->nl is the new leaf, p->npull the number
 * of messages consumed and p->pullsz (incremented, not reset)
 * their total msgsz.
 */
static void
updateleaf(Tree *t, Path *up, Path *p)
{
	char buf[Msgmax];
	int i, j, ok, full, spc;
	Blk *b, *n;
	Bptr bp;
	Msg m;
	Kvp v;

	i = 0;
	j = up->lo;
	b = p->b;
	/*
	 * spc is the amount of room we have
	 * to copy data down from the parent; it's
	 * necessarily a bit conservative, because
	 * deletion messages don't take space -- but
	 * we don't know what the types of all
	 * messages are.
	 */
	full = 0;
	spc = Leafspc - blkfill(b);
	n = newblk(t, b->type, 0);
	assert(i >= 0 && j >= 0);
	while(i < b->nval){
		ok = 1;
		getval(p->b, i, &v);
		switch(pullmsg(up, j, &v, &m, &full, spc)){
		case -1:
			/* value sorts first, or nothing left to pull: copy it as is */
			setval(n, &v);
			i++;
			break;
		case 1:
			/*
			 * new values must always start as
			 * an insertion, mutations come after.
			 */
			if(m.op == Oclearb || m.op == Oclobber)
				ok = 0;
			else if(m.op != Oinsert)
				fatal("%d(/%d), %d: %M not insert\n", i, b->nval, j, &m);
			cpkvp(&v, &m, buf, sizeof(buf));
			spc -= valsz(&m);
			/* enters the loop below after the apply: m is already in v */
			goto Copy;
		case 0:
			/* message targets the existing value at i */
			i++;
			/* mutations edit the value in place, so work on a copy in buf */
			if(m.op != Oinsert)
				cpkvp(&v, &v, buf, sizeof(buf));
			while(j < up->hi){
				/*
				 * If a file is truncated multiple times,
				 * we can end up with duplicate Oclearb
				 * messages in the buffer.
				 *
				 * These Oclearb doublings will leave
				 * v.nv with a length of 0, so we can
				 * and should skip those
				 */
				if(m.op == Oclearb && v.nv != 0){
					bp = unpackbp(v.v, v.nv);
					freeblk(t, nil, bp);
				}
				ok = apply(&v, &m, buf, sizeof(buf));
		Copy:
				j++;
				p->pullsz += msgsz(&m);
				/* keep going only while the next message has the same key */
				if(j >= up->hi || pullmsg(up, j, &v, &m, &full, spc) != 0)
					break;
			}
			/* ok is 0 when the last message applied removed the value */
			if(ok)
				setval(n, &v);
			break;
		}
	}
	/* old values exhausted: the remaining messages all create new keys */
	while(j < up->hi) {
		/*
		 * can't fail
		 * NOTE(review): if full were already set here, pullmsg
		 * would return without loading m; this presumably relies
		 * on the caller having checked that everything fits --
		 * confirm.
		 */
		pullmsg(up, j++, nil, &m, &full, spc);
		ok = 1;
		cpkvp(&v, &m, buf, sizeof(buf));
		p->pullsz += msgsz(&m);
		if(m.op == Oclearb || m.op == Oclobber)
			continue;
		if(m.op != Oinsert)
			fatal("%d(/%d), %d: %M not insert\n", i, b->nval, j, &m);
		/* fold in any following messages for the same key */
		while(pullmsg(up, j, &v, &m, &full, spc) == 0){
			ok = apply(&v, &m, buf, sizeof(buf));
			p->pullsz += msgsz(&m);
			j++;
		}
		if(ok)
			setval(n, &v);
	}
	p->npull = (j - up->lo);
	p->nl = n;
}

/*
 * Creates a new pivot holding the contents of the old pivot
 * p->b, with two changes applied while copying:
 *
 *  - when pp is not nil, the child entry at p->midx is
 *    replaced by pointers to the blocks produced one level
 *    down (see copyup), and the messages that were pushed
 *    down to that child are dropped from the buffer;
 *  - pending messages from the parent path element `up`
 *    (indices up->lo .. up->hi-1) are merged into the buffer
 *    for as long as they fit.
 *
 * On return p->nl is the new pivot, p->npull the number of
 * messages consumed from `up` and p->pullsz (incremented, not
 * reset) their total msgsz.
 */
static void
updatepiv(Tree *t, Path *up, Path *p, Path *pp)
{
	char buf[Msgmax];
	int i, j, sz, full, spc;
	Blk *b, *n;
	Msg m, u;

	b = p->b;
	n = newblk(t, b->type, 0);
	for(i = 0; i < b->nval; i++){
		if(pp != nil && i == p->midx){
			copyup(n, pp, nil);
			/* a rotate or merge also consumed the next sibling's entry */
			if(pp->op == POrot || pp->op == POmerge)
				i++;
		}else{
			getval(b, i, &m);
			setval(n, &m);
		}
	}
	i = 0;
	j = up->lo;
	sz = 0;
	full = 0;
	spc = Bufspc - buffill(b);
	/* messages pushed down to the child no longer take up room here */
	if(pp != nil)
		spc += pp->pullsz;
	while(i < b->nbuf){
		/*
		 * skip the messages the child pulled; btupsert sets lo
		 * to -1 on the bottom element of the path, so this
		 * does not trigger (and pp is not read) there.
		 */
		if(i == p->lo)
			i += pp->npull;
		if(i == b->nbuf)
			break;
		getmsg(b, i, &m);
		switch(pullmsg(up, j, &m, &u, &full, spc - sz)){
		case -1:
		case 0:
			/* existing message sorts first (or ties): keep it ahead */
			setmsg(n, &m);
			i++;
			break;
		case 1:
			/* incoming message sorts first: pull it and its duplicates */
			cpkvp(&m, &u, buf, sizeof(buf));
			while(pullmsg(up, j, &m, &u, &full, spc) == 0){
				setmsg(n, &u);
				sz = msgsz(&u);
				p->pullsz += sz;
				spc -= sz;
				j++;
			}
		}
	}
	/* old buffer exhausted: append whatever else still fits */
	while(j < up->hi){
		pullmsg(up, j, nil, &u, &full, spc);
		if(full)
			break;
		setmsg(n, &u);
		sz = msgsz(&u);
		p->pullsz += sz;
		spc -= sz;
		j++;
	}
	p->npull = (j - up->lo);
	p->nl = n;
}

/*
 * Splits the leaf p->b into two new leaves, merging in
 * pending messages from the parent path element `up` as the
 * values are copied. Split must never grow the total height
 * of the tree by more than 1.
 *
 * On return p->nl and p->nr are the new left and right
 * leaves, p->op is POsplit, and *mid refers to the value of
 * the old leaf at which the right half starts (it points into
 * the old block's data). Messages sorting after the last old
 * value are not pulled here; p->npull reports how many were
 * consumed.
 */
static void
splitleaf(Tree *t, Path *up, Path *p, Kvp *mid)
{
	char buf[Msgmax];
	int full, copied, spc, ok, halfsz;
	int i, j, c;
	Blk *b, *d, *l, *r;
	Msg m;
	Kvp v;

	b = p->b;
	l = nil;
	r = nil;
	/* on error, release whichever halves were already allocated */
	if(waserror()){
		efreeblk(t, l);
		efreeblk(t, r);
		nexterror();
	}
	l = newblk(t, b->type, 0);
	r = newblk(t, b->type, 0);

	d = l;
	i = 0;
	j = up->lo;
	full = 0;
	copied = 0;
	/* aim for half of old contents plus incoming data, capped at half a leaf */
	halfsz = (2*b->nval + b->valsz + up->sz) / 2;
	if(halfsz > Leafspc/2)
		halfsz = Leafspc/2;
	spc = Leafspc - (halfsz + Msgmax);
	assert(b->nval >= 4);
	while(i < b->nval){
		/*
		 * We're trying to balance size,
		 * but we need at least 2 nodes
		 * in each half of the split if
		 * we want a valid tree.
		 */
		if(d == l)
		if((i == b->nval-2) || (i >= 2 && copied >= halfsz)){
			d = r;
			spc = Leafspc - (halfsz + Msgmax);
			getval(b, i, mid);
		}
		ok = 1;
		getval(b, i, &v);
 		c = pullmsg(up, j, &v, &m, &full, spc);
		switch(c){
		case -1:
			/* value sorts first, or nothing left to pull: copy it as is */
			setval(d, &v);
			copied += valsz(&v);
			i++;
			break;
		case 1:
			/*
			 * new values must always start as
			 * an insertion, mutations come after.
			 */
			if(m.op == Oclearb || m.op == Oclobber)
				ok = 0;
			else if(m.op != Oinsert)
				fatal("%d(/%d), %d: expected insert, got %M\n", i, b->nval, j, &m);
			cpkvp(&v, &m, buf, sizeof(buf));
			spc -= valsz(&m);
			/* enters the loop below after the apply: m is already in v */
			goto Copy;
		case 0:
			/* message targets the existing value at i */
			i++;
			/* mutations edit the value in place, so work on a copy in buf */
			if(m.op != Oinsert)
				cpkvp(&v, &v, buf, sizeof(buf));
			while(j < up->hi){
				ok = apply(&v, &m, buf, sizeof(buf));
		Copy:
				p->pullsz += msgsz(&m);
				j++;
				/* keep going only while the next message has the same key */
				if(j == up->hi || pullmsg(up, j, &v, &m, &full, spc) != 0)
					break;
			}
			/* ok is 0 when the last message applied removed the value */
			if(ok)
				setval(d, &v);
			break;
		}
	}
	p->npull = (j - up->lo);
	p->op = POsplit;
	p->nl = l;
	p->nr = r;
	poperror();
}

/*
 * Splits the pivot p->b into two new pivots. Split must
 * never grow the total height of the tree by more than one.
 *
 * The child entry at p->idx is replaced by pointers to the
 * blocks produced one level down (see copyup); buffered
 * messages that were pushed down to that child are dropped
 * and the remaining ones are distributed around *mid. No
 * messages are pulled from the parent, whose path element is
 * the unnamed second parameter.
 *
 * On return p->nl and p->nr are the new left and right
 * pivots, p->op is POsplit, and *mid refers to the entry of
 * the old pivot at which the right half starts (it points
 * into the old block's data).
 */
static void
splitpiv(Tree *t, Path *, Path *p, Path *pp, Kvp *mid)
{
	int i, copied, halfsz;
	Blk *b, *d, *l, *r;
	Kvp tk;
	Msg m;

	b = p->b;
	l = nil;
	r = nil;
	/* on error, release whichever halves were already allocated */
	if(waserror()){
		efreeblk(t, l);
		efreeblk(t, r);
		nexterror();
	}
	l = newblk(t, b->type, 0);
	r = newblk(t, b->type, 0);
	d = l;
	copied = 0;
	halfsz = (2*b->nval + b->valsz)/2;
	assert(b->nval >= 4);
	for(i = 0; i < b->nval; i++){
		/*
		 * We're trying to balance size,
		 * but we need at least 2 nodes
		 * in each half of the split if
		 * we want a valid tree.
		 */
		if(d == l)
		if((i == b->nval-2) || (i >= 2 && copied >= halfsz)){
			d = r;
			getval(b, i, mid);
		}
		if(i == p->idx){
			/* substitute the rewritten child(ren) for the old entry */
			copyup(d, pp, &copied);
			continue;
		}
		getval(b, i, &tk);
		setval(d, &tk);
		copied += valsz(&tk);
	}
	d = l;
	for(i = 0; i < b->nbuf; i++){
		/* skip the messages that were pushed down to the child */
		if(i == p->lo)
			i += pp->npull;
		if(i == b->nbuf)
			break;
		getmsg(b, i, &m);
		/* the buffer is sorted: switch halves once, at the first key >= mid */
		if(d == l && keycmp(&m, mid) >= 0)
			d = r;
		setmsg(d, &m);
	}
	p->op = POsplit;
	p->nl = l;
	p->nr = r;
	poperror();
}

/*
 * Builds one new block holding the values of a followed by
 * the values of b and, for pivots, the buffered messages of a
 * followed by those of b. The new block is enqueued and
 * recorded as pp->nl with pp->op set to POmerge and pp->nr
 * cleared; p->midx is set to idx.
 */
static void
merge(Tree *t, Path *p, Path *pp, int idx, Blk *a, Blk *b)
{
	Blk *src[2], *from, *dst;
	Msg ent;
	int i, k;

	src[0] = a;
	src[1] = b;
	dst = newblk(t, a->type, 0);

	for(k = 0; k < 2; k++){
		from = src[k];
		for(i = 0; i < from->nval; i++){
			getval(from, i, &ent);
			setval(dst, &ent);
		}
	}
	if(a->type == Tpivot){
		for(k = 0; k < 2; k++){
			from = src[k];
			for(i = 0; i < from->nbuf; i++){
				getmsg(from, i, &ent);
				setmsg(dst, &ent);
			}
		}
	}
	enqueue(dst);

	p->midx = idx;
	pp->op = POmerge;
	pp->nl = dst;
	pp->nr = nil;
}

/*
 * Scans the buffer of a single block b, starting at *idx, for
 * the first message whose key is not below m. The sizes of
 * the messages passed over are added to what d's buffer
 * already holds.
 *
 * Returns 1 as soon as that total exceeds Bufspc, leaving
 * *idx alone. Otherwise returns 0 with *idx set to the index
 * of the message found plus o, or to b->nbuf if the scan ran
 * off the end.
 */
static int
spillscan(Blk *d, Blk *b, Msg *m, int *idx, int o)
{
	int i, total;
	Msg cur;

	total = d->bufsz + 2*d->nbuf;
	i = *idx;
	while(i < b->nbuf){
		getmsg(b, i, &cur);
		if(keycmp(m, &cur) <= 0){
			*idx = i + o;
			return 0;
		}
		total += msgsz(&cur);
		if(total > Bufspc)
			return 1;
		i++;
	}
	*idx = b->nbuf;
	return 0;
}

/*
 * Returns whether the buffered messages of l, then r, lying
 * between *idx and key m would spill out of the buffer of d.
 * Leaves have no buffer, so the answer for them is always 0.
 * *idx is advanced by the scans; the second scan looks at the
 * value the first one left behind.
 */
static int
spillsbuf(Blk *d, Blk *l, Blk *r, Msg *m, int *idx)
{
	if(l->type == Tleaf)
		return 0;

	if(*idx < l->nbuf)
		if(spillscan(d, l, m, idx, 0))
			return 1;
	if(*idx >= l->nbuf)
		if(spillscan(d, r, m, idx, l->nbuf))
			return 1;
	return 0;
}

/*
 * Redistributes the contents of the adjacent blocks a and b
 * over two new blocks. Values go to the left block until
 * either halfpiv bytes have been copied or (for pivots) the
 * messages belonging with them would overflow its buffer;
 * everything after that goes to the right block.
 *
 * Both new blocks are enqueued and recorded as pp->nl and
 * pp->nr with pp->op set to POrot; p->midx is set to midx.
 */
static void
rotate(Tree *t, Path *p, Path *pp, int midx, Blk *a, Blk *b, int halfpiv)
{
	int i, o, cp, sp, idx;
	Blk *d, *l, *r;
	Msg m;

	l = nil;
	r = nil;
	/* on error, release whichever halves were already allocated */
	if(waserror()){
		efreeblk(t, l);
		efreeblk(t, r);
		nexterror();
	}
	l = newblk(t, a->type, 0);
	r = newblk(t, a->type, 0);
	d = l;
	cp = 0;		/* bytes of values copied so far */
	sp = -1;	/* message index at which the switch to r happened */
	idx = 0;	/* scan position maintained by spillsbuf */
	for(i = 0; i < a->nval; i++){
		getval(a, i, &m);
		if(d == l && (cp >= halfpiv || spillsbuf(d, a, b, &m, &idx))){
			sp = idx;
			d = r;
		}
		setval(d, &m);
		cp += valsz(&m);
	}
	for(i = 0; i < b->nval; i++){
		getval(b, i, &m);
		if(d == l && (cp >= halfpiv || spillsbuf(d, a, b, &m, &idx))){
			sp = idx;
			d = r;
		}
		setval(d, &m);
		cp += valsz(&m);
	}
	if(a->type == Tpivot){
		d = l;
		o = 0;
		/* copy messages to l until sp of them are placed, then to r */
		for(i = 0; i < a->nbuf; i++){
			if(o == sp){
				d = r;
				o = 0;
			}
			getmsg(a, i, &m);
			setmsg(d, &m);
			o++;
		}
		for(i = 0; i < b->nbuf; i++){
			if(o == sp){
				d = r;
				o = 0;
			}
			getmsg(b, i, &m);
			setmsg(d, &m);
			o++;
		}
	}
	enqueue(l);
	enqueue(r);
	p->midx = midx;
	pp->op = POrot;
	pp->nl = l;
	pp->nr = r;
	poperror();
}

/*
 * Decides how to rebalance the adjacent blocks a and b:
 * merge them when their combined values and buffers fit
 * comfortably in one block, rotate when their value sizes
 * differ by more than 4*Msgmax, and otherwise do nothing.
 */
static void
rotmerge(Tree *t, Path *p, Path *pp, int idx, Blk *a, Blk *b)
{
	int vala, valb, bufa, bufb, diff;

	assert(a->type == b->type);

	vala = a->valsz + 2*a->nval;
	valb = b->valsz + 2*b->nval;
	/* leaves have no buffer; zero always passes the Bufspc test below */
	bufa = 0;
	bufb = 0;
	if(a->type != Tleaf){
		bufa = a->bufsz + 2*a->nbuf;
		bufb = b->bufsz + 2*b->nbuf;
	}
	diff = (vala < valb) ? valb - vala : vala - valb;

	if(vala + valb < (Pivspc - 4*Msgmax) && bufa + bufb < Bufspc){
		merge(t, p, pp, idx, a, b);
		return;
	}
	if(diff > 4*Msgmax)
		rotate(t, p, pp, idx, a, b, (vala + valb)/2);
}

/*
 * Tries to rebalance the freshly rewritten child pp->nl with
 * one of its siblings under p->b: first the left neighbour of
 * entry idx, then the right one. A sibling is only loaded when
 * the fill recorded in its parent entry plus the child's fill
 * is below the block capacity; rotmerge then decides whether
 * to merge, rotate or leave things alone.
 *
 * NOTE(review): the second guard below is true for every value
 * of pp->op (no value equals both POmod and POmerge), so as
 * written the function always returns there and nothing past
 * it is ever executed. `&&` was presumably intended; confirm
 * before changing, since that would enable the merge/rotate
 * paths.
 */
static void
trybalance(Tree *t, Path *p, Path *pp, int idx)
{
	Blk *l, *m, *r;
	Kvp kl, kr;
	int spc, fill;
	Bptr bp;

	if(p->idx == -1 || pp == nil || pp->nl == nil)
		return;
	if(pp->op != POmod || pp->op != POmerge)
		return;

	l = nil;
	r = nil;
	m = holdblk(pp->nl);
	/* on error, drop every block held so far (l and r may still be nil) */
	if(waserror()){
		dropblk(m);
		dropblk(l);
		dropblk(r);
		nexterror();
	}
	spc = (m->type == Tleaf) ? Leafspc : Pivspc;
	if(idx-1 >= 0){
		/* left sibling: merged/rotated pair starts at idx-1 */
		getval(p->b, idx-1, &kl);
		bp = getptr(&kl, &fill);
		if(fill + blkfill(m) < spc){
			l = getblk(bp, 0);
			rotmerge(t, p, pp, idx-1, l, m);
			goto Done;
		}
	}
	if(idx+1 < p->b->nval){
		/* right sibling: merged/rotated pair starts at idx */
		getval(p->b, idx+1, &kr);
		bp = getptr(&kr, &fill);
		if(fill + blkfill(m) < spc){
			r = getblk(bp, 0);
			rotmerge(t, p, pp, idx, m, r);
			goto Done;
		}
	}
Done:
	dropblk(m);
	dropblk(l);
	dropblk(r);
	poperror();
}

/*
 * Rewrites the blocks along path from the bottom up, pushing
 * each level's selected messages into the level below.
 *
 * For every element, p is the block being rewritten, up is
 * the element above it (the source of the messages) and pp is
 * the element below it that was just rewritten, or nil at the
 * bottom. Blocks with room are updated in place of the old
 * ones; full blocks are split. New blocks are enqueued as
 * they are produced.
 *
 * Returns the path element whose nl is the new root of the
 * tree.
 */
static Path*
flush(Tree *t, Path *path, int npath)
{

	Path *up, *p, *pp, *rp;
	Kvp mid;

	/*
	 * The path must contain at minimum two elements:
	 * we must have 1 node we're inserting into, and
	 * an empty element at the top of the path that
	 * we put the new root into if the root gets split.
	 */
	assert(npath >= 2);
	rp = nil;
	pp = nil;
	p = &path[npath - 1];
	up = &path[npath - 2];
	if(p->b->type == Tleaf){
		if(!filledleaf(p->b, up->sz)){
			/* p-1 is the same element as up here */
			updateleaf(t, p-1, p);
			enqueue(p->nl);
			rp = p;
		}else{
			splitleaf(t, up, p, &mid);
			enqueue(p->nl);
			enqueue(p->nr);
		}
		p->midx = -1;
		pp = p;
		up--;
		p--;
	}
	/* walk the pivots up to, but not including, the placeholder path[0] */
	while(p != path){
		if(!filledpiv(p->b, 1)){
			trybalance(t, p, pp, p->idx);
			/* If we merged the root node, break out. */
			if(up == path && pp != nil && pp->op == POmerge && p->b->nval == 2){
				rp = pp;
				goto Out;
			}
			updatepiv(t, up, p, pp);
			enqueue(p->nl);
			rp = p;
		}else{
			splitpiv(t, up, p, pp, &mid);
			enqueue(p->nl);
			enqueue(p->nr);
		}
		pp = p;
		up--;
		p--;
	}
	/* the old root was split: build a new root above the two halves */
	if(pp->nl != nil && pp->nr != nil){
		rp = &path[0];
		rp->nl = newblk(t, Tpivot, 0);
		rp->npull = pp->npull;
		rp->pullsz = pp->pullsz;
		copyup(rp->nl, pp, nil);
		enqueue(rp->nl);
	}
Out:
	return rp;
}

/*
 * Releases everything hanging off a flushed path and then the
 * path array itself.
 *
 * For each element the old block b, and the block m that was
 * merged against, are passed to freeblk together with their
 * own block pointers; the references on b, nl and nr are then
 * dropped. Calling this with path == nil and npath == 0 does
 * nothing beyond free(nil).
 */
static void
freepath(Tree *t, Path *path, int npath)
{
	Path *p;

	for(p = path; p != path + npath; p++){
		if(p->b != nil)
			freeblk(t, p->b, p->b->bp);
		/*
		 * The block and the pointer handed to freeblk must
		 * belong together: free m as m, not as b.
		 */
		if(p->m != nil)
			freeblk(t, p->m, p->m->bp);
		dropblk(p->b);
		dropblk(p->nl);
		dropblk(p->nr);
	}
	free(path);
}

/*
 * Selects the child of pivot b that owns the largest run of
 * messages (by total msgsz) in b's buffer, and records the
 * choice in p: the run's index range lo..hi, its size, and
 * the child's index in idx and midx. p->b is always set to b;
 * the other fields are only written when some child has a
 * non-empty run.
 */
static void
victim(Blk *b, Path *p)
{
	int child, next, first, best, acc, last;
	Kvp sep;
	Msg msg;

	p->b = b;
	best = 0;
	next = 0;
	/*
	 * Walk the separators starting at the second pivot: every
	 * message below it belongs to the first child. The walk
	 * goes one step past the last entry, because everything
	 * at or above the last separator belongs to the last child.
	 */
	for(child = 1; child <= b->nval; child++){
		last = (child == b->nval);
		if(!last)
			getval(b, child, &sep);

		first = next;
		acc = 0;
		while(next < b->nbuf){
			getmsg(b, next, &msg);
			if(!last && keycmp(&msg, &sep) >= 0)
				break;
			/* msgsz covers the 2-byte offset slot as well as the body */
			acc += msgsz(&msg);
			next++;
		}
		if(acc <= best)
			continue;

		best = acc;
		p->op = POmod;
		p->lo = first;
		p->hi = next;
		p->sz = best;
		p->idx = child - 1;
		p->midx = child - 1;
		p->npull = 0;
		p->pullsz = 0;
	}
}

/*
 * Fast path of btupsert: the root is a pivot with enough
 * buffer space for all nmsg messages, so they are added to a
 * copy of the root and no other block is touched.
 *
 * The messages are first appended to the copy's buffer, which
 * stores their bodies; then each new offset slot is moved to
 * its sorted position in the offset table. A new message is
 * placed after any existing messages with the same key. The
 * copy becomes the tree's root; the old root b is freed and
 * both references are dropped.
 */
static void
fastupsert(Tree *t, Blk *b, Msg *msg, int nmsg)
{
	int i, c, o, ri, lo, hi, mid, nbuf;
	Msg cmp;
	char *p;
	Blk *r;

	if((r = dupblk(t, b)) == nil)
		error(Enomem);

	/* nbuf: number of messages that were already in the buffer */
	nbuf = r->nbuf;
	for(i = 0; i < nmsg; i++)
		setmsg(r, &msg[i]);

	for(i = 0; i < nmsg; i++){
		/* slots 0..nbuf+i-1 are sorted; find where slot nbuf+i goes */
		ri = -1;
		lo = 0;
		hi = nbuf+i-1;
		while(lo <= hi){
			mid = (hi + lo) / 2;
			getmsg(r, mid, &cmp);
			c = keycmp(&msg[i], &cmp);
			switch(c){
			case -1:
				hi = mid-1;
				break;
			case 0:
				/* equal key: keep searching right, to land after it */
				ri = mid+1;
				lo = mid+1;
				break;
			case 1:
				lo = mid+1;
				break;
			}
		}
		if(ri == -1)
			ri = hi+1;
		/* take the new slot's offset, shift the tail up, drop it in at ri */
		p = r->data + Pivspc + 2*(nbuf+i);
		o = UNPACK16(p);
		p = r->data + Pivspc + 2*ri;
		memmove(p+2, p, 2*(nbuf+i-ri));
		PACK16(p, o);
	}
	enqueue(r);

	lock(&t->lk);
	t->bp = r->bp;
	t->dirty = 1;
	unlock(&t->lk);

	freeblk(t, b, b->bp);
	dropblk(b);
	dropblk(r);
}
	

/*
 * Applies the nmsg messages in msg to tree t.
 *
 * The messages are sorted by key (stably, so messages for the
 * same key keep their order). If the root is a pivot with
 * room in its buffer, they are simply added there. Otherwise a
 * path is built from the root downwards, at each full pivot
 * descending into the child with the most buffered data, and
 * flush rewrites the blocks along it. A single flush may not
 * manage to pull in every message; the whole procedure is
 * repeated until all of them have been consumed.
 *
 * Errors are raised through error(); the partially built path
 * is released before the error is passed on.
 */
void
btupsert(Tree *t, Msg *msg, int nmsg)
{
	int i, npath, npull, dh, sz, height;
	Path *path, *rp;
	Blk *b, *rb;
	Kvp sep;
	Bptr bp;

	sz = 0;
	stablesort(msg, nmsg);
	for(i = 0; i < nmsg; i++)
		sz += msgsz(&msg[i]);
	npull = 0;
	path = nil;
	npath = 0;

Again:
	if(waserror()){
		freepath(t, path, npath);
		nexterror();
	}

	b = getroot(t, &height);
	if(b->type == Tpivot && !filledbuf(b, nmsg, sz)){
		fastupsert(t, b, msg, nmsg);
		poperror();
		return;
	}
	/*
	 * The tree can grow in height by 1 when we
	 * split, so we allocate room for one extra
	 * node in the path.
	 */
	npath = 0;
	if((path = calloc((height + 2), sizeof(Path))) == nil){
		/* b is not on the path yet, so freepath would not release it */
		dropblk(b);
		error(Enomem);
	}
	path[npath].b = nil;
	path[npath].idx = -1;
	path[npath].midx = -1;
	npath++;

	/* element 0 carries the caller's messages that are still pending */
	path[0].sz = sz;
	path[0].ins = msg;
	path[0].lo = npull;
	path[0].hi = nmsg;
	while(b->type == Tpivot){
		if(!filledbuf(b, nmsg, path[npath - 1].sz))
			break;
		victim(b, &path[npath]);
		getval(b, path[npath].idx, &sep);
		bp = unpackbp(sep.v, sep.nv);
		b = getblk(bp, 0);
		npath++;
	}
	/* bottom of the path: nothing is pushed further down from here */
	path[npath].b = b;
	path[npath].idx = -1;
	path[npath].midx = -1;
	path[npath].lo = -1;
	path[npath].hi = -1;
	path[npath].npull = 0;
	path[npath].pullsz = 0;
	npath++;

	dh = -1;
	rp = flush(t, path, npath);
	rb = rp->nl;

	/* the topmost element that got a new block tells how the height changed */
	if(path[0].nl != nil)
		dh = 1;
	else if(path[1].nl != nil)
		dh = 0;
	else if(npath >2 && path[2].nl != nil)
		dh = -1;
	else
		fatal("broken path change");

	assert(rb->bp.addr != 0);

	lock(&t->lk);
	t->ht += dh;
	t->bp = rb->bp;
	t->dirty = 1;
	unlock(&t->lk);

	npull += rp->npull;
	freepath(t, path, npath);
	/*
	 * The path has been released. Forget it, so that the
	 * error handler installed on the next round does not
	 * free it a second time if getroot fails.
	 */
	path = nil;
	npath = 0;
	poperror();

	if(npull != nmsg)
		goto Again;
}

/*
 * Loads the root block of tree t. The root pointer and, if h
 * is not nil, the tree height are read together under the
 * tree lock; the block itself is fetched after the lock has
 * been released.
 */
Blk*
getroot(Tree *t, int *h)
{
	Bptr root;

	lock(&t->lk);
	if(h != nil)
		*h = t->ht;
	root = t->bp;
	unlock(&t->lk);

	return getblk(root, 0);
}

/*
 * Looks up key k in tree t.
 *
 * Returns nonzero if the key currently has a value; r then
 * describes the key/value pair, with its data held in buf
 * (capacity nbuf). Returns 0 if the key is absent or has been
 * removed by a buffered message.
 *
 * The blocks from the root to the leaf are loaded first, then
 * the value found in the leaf (if any) is brought up to date
 * by applying the matching buffered messages of each pivot on
 * the way back up, deepest pivot first.
 */
int
btlookup(Tree *t, Key *k, Kvp *r, char *buf, int nbuf)
{
	int i, j, h, ok, same;
	Blk *b, **p;
	Bptr bp;
	Msg m;

	b = getroot(t, &h);
	if((p = calloc(h, sizeof(Blk*))) == nil){
		dropblk(b);
		error(Enomem);
	}
	ok = 0;
	/* extra hold on the root: it is dropped once via p[0] and once via b */
	p[0] = holdblk(b);
	for(i = 1; i < h; i++){
		/* k sorts before every entry: no child to descend into */
		if(blksearch(p[i-1], k, r, &same) == -1)
			break;
		bp = unpackbp(r->v, r->nv);
		p[i] = getblk(bp, 0);
	}
	/* ok is set only on an exact match in the leaf */
	if(p[h-1] != nil)
		blksearch(p[h-1], k, r, &ok);
	/* copy out of the block so that messages can be applied to it */
	if(ok)
		cpkvp(r, r, buf, nbuf);
	for(i = h-2; i >= 0; i--){
		if(p[i] == nil)
			continue;
		j = bufsearch(p[i], k, &m, &same);
		if(j < 0 || !same)
			continue;
		/* without a value so far, the first message must create one */
		if(!(ok || m.op == Oinsert))
			fatal("lookup %K << %M missing insert\n", k, &m);
		ok = apply(r, &m, buf, nbuf);
		/* apply the remaining messages for the same key, in order */
		for(j++; j < p[i]->nbuf; j++){
			getmsg(p[i], j, &m);
			if(keycmp(k, &m) != 0)
				break;
			ok = apply(r, &m, buf, nbuf);
		}
	}
	for(i = 0; i < h; i++)
		if(p[i] != nil)
			dropblk(p[i]);
	dropblk(b);
	free(p);
	return ok;
}

/*
 * Initializes scan s to iterate over the keys starting with
 * the npfx bytes at pfx. The prefix is copied into the scan,
 * and the scan's current key starts out as the prefix itself
 * with an empty value.
 */
void
btnewscan(Scan *s, char *pfx, int npfx)
{
	memset(s, 0, sizeof(*s));

	memmove(s->pfxbuf, pfx, npfx);
	s->pfx.nk = npfx;
	s->pfx.k = s->pfxbuf;

	cpkey(&s->kv, &s->pfx, s->kvbuf, sizeof(s->kvbuf));
	s->kv.nv = 0;
	s->kv.v = s->kvbuf+npfx;

	s->offset = 0;
	s->donescan = 0;
	s->first = 1;
}

/*
 * Positions scan s in tree t at its current key s->kv.
 *
 * Loads the blocks from the root to the leaf covering the key
 * into s->path and, for each level, records the value index
 * (vi) and, for pivots, the buffer index (bi) at which
 * iteration is to continue. On the first entry of a scan an
 * entry equal to the key is included; on later entries the
 * position is moved past it. Does nothing once the scan has
 * finished.
 */
void
btenter(Tree *t, Scan *s)
{
	int i, same;
	Scanp *p;
	Msg m, c;
	Bptr bp;
	Blk *b;
	Kvp v;

	if(s->donescan)
		return;
	b = getroot(t, &s->ht);
	if((s->path = calloc(s->ht, sizeof(Scanp))) == nil){
		dropblk(b);
		error(Enomem);
	}
	p = s->path;
	p[0].b = b;
	for(i = 0; i < s->ht; i++){
		p[i].vi = blksearch(b, &s->kv, &v, &same);
		if(b->type == Tpivot){
			/* key sorts before every child: start at the first one */
			if(p[i].vi == -1)
				getval(b, ++p[i].vi, &v);
			p[i].bi = bufsearch(b, &s->kv, &m, &same);
			if(p[i].bi == -1){
				p[i].bi++;
			}else if(!same || !s->first){
				/* scan past repeated messages */
				while(p[i].bi < p[i].b->nbuf){
					getmsg(p[i].b, p[i].bi, &c);
					if(keycmp(&m, &c) != 0)
						break;
					p[i].bi++;
				}
			}
			bp = unpackbp(v.v, v.nv);
			b = getblk(bp, 0);
			p[i+1].b = b;
		}else if(p[i].vi == -1 || !same || !s->first)
			/* leaf: step past an entry that sorts below (or was already returned) */
			p[i].vi++;
	}
	s->first = 0;
}

/*
 * Produces the next key/value pair of scan s in r.
 *
 * Returns 1 with r filled in (its data lives in s->kvbuf), or
 * 0 once the scan is finished, either because the tree is
 * exhausted or because the next key no longer starts with the
 * scan prefix. Keys whose pending messages end in a removal
 * are skipped. On error the scan is torn down with btexit
 * before the error is passed on.
 */
int
btnext(Scan *s, Kvp *r)
{
	int i, j, h, ok, start, bufsrc;
	Scanp *p;
	Msg m, n;
	Bptr bp;
	Kvp kv;

Again:
	p = s->path;
	h = s->ht;
	start = h;
	bufsrc = -1;
	if(s->donescan)
		return 0;
	if(waserror()){
		btexit(s);
		nexterror();
	}
	/* load up the correct blocks for the scan */
	for(i = h-1; i >= 0; i--){
		/* deepest level that still has values or messages left */
		if(p[i].b != nil
		&&(p[i].vi < p[i].b->nval || p[i].bi < p[i].b->nbuf))
			break;
		if(i == 0){
			s->donescan = 1;
			poperror();
			return 0;
		}
		/* level exhausted: release it and advance its parent */
		if(p[i].b != nil)
			dropblk(p[i].b);
		p[i].b = nil;
		p[i].vi = 0;
		p[i].bi = 0;
		p[i-1].vi++;
		start = i;
	}

	if(p[start-1].vi < p[start-1].b->nval){
		/* descend again from the advanced parent down to a leaf */
		for(i = start; i < h; i++){
			getval(p[i-1].b, p[i-1].vi, &kv);
			bp = unpackbp(kv.v, kv.nv);
			p[i].b = getblk(bp, 0);
		}
	
		/* find the minimum key along the path up */
		m.op = Oinsert;
		getval(p[h-1].b, p[h-1].vi, &m);
	}else{
		/* no values left at that level: continue from its buffer */
		getmsg(p[start-1].b, p[start-1].bi, &m);
		assert(m.op == Oinsert);
		bufsrc = start-1;
	}

	/* a pending message with a smaller key takes precedence */
	for(i = h-2; i >= 0; i--){
		if(p[i].b == nil || p[i].bi == p[i].b->nbuf)
			continue;
		getmsg(p[i].b, p[i].bi, &n);
		if(keycmp(&n, &m) < 0){
			bufsrc = i;
			m = n;
		}
	}
	/* left the range of keys sharing the scan prefix */
	if(m.nk < s->pfx.nk || memcmp(m.k, s->pfx.k, s->pfx.nk) != 0){
		s->donescan = 1;
		poperror();
		return 0;
	}

	/* scan all messages applying to the message */
	ok = 1;
	cpkvp(r, &m, s->kvbuf, sizeof(s->kvbuf));
	/* step past the entry that was just taken, wherever it came from */
	if(bufsrc == -1)
		p[h-1].vi++;
	else
		p[bufsrc].bi++;
	for(i = h-2; i >= 0; i--){
		for(j = p[i].bi; p[i].b != nil && j < p[i].b->nbuf; j++){
			getmsg(p[i].b, j, &m);
			if(keycmp(r, &m) != 0)
				break;
			ok = apply(r, &m, s->kvbuf, sizeof(s->kvbuf));
			p[i].bi++;
		}
	}
	poperror();
	/* the key ended up removed: move on to the next one */
	if(!ok)
		goto Again;
	return 1;
}

/*
 * Releases the blocks held by scan s at each of its s->ht
 * levels, then frees the scan's path array.
 */
void
btexit(Scan *s)
{
	Scanp *path;
	int lvl;

	path = s->path;
	lvl = 0;
	while(lvl < s->ht){
		dropblk(path[lvl].b);
		lvl++;
	}
	free(path);
}