shithub: mq

Download patch

ref: f0874a14c3685a08e98c3fcb1ebd81514b1e3ded
parent: 2d6c8ab83106892e1215dd0c94d9f24d6525a3fc
author: Ori Bernstein <ori@eigenstate.org>
date: Fri Apr 12 16:02:06 EDT 2024

mq: coalesce messages, add tailing attach spec

--- a/mq.c
+++ b/mq.c
@@ -23,6 +23,7 @@
 struct Aux {
 	Mq	*q;
 	int	id;
+	int	ntail;
 };
 
 struct Msg {
@@ -64,6 +65,7 @@
 int	nqueues;
 vlong	maxlog = -1;
 vlong	queueid = 0;
+int	coalesce;
 
 char Ebaduse[] = "invalid use of fd";
 char Einuse[] = "fid in use";
@@ -131,10 +133,11 @@
 }
 
 int
-subscribe(Mq *q)
+subscribe(Mq *q, vlong ntail)
 {
 	Msg *m;
 	Rd *rd;
+	vlong sz;
 	int i;
 
 	rd = nil;
@@ -151,10 +154,17 @@
 	rd->id = i;
 	rd->wait = nil;
 	rd->off = 0;
-	rd->hd = q->loghd;
-	rd->tl = q->logtl;
-	for(m = q->loghd; m != nil; m = m->next)
+	rd->hd = nil;
+	rd->tl = nil;
+	sz = q->logsz;
+	m = q->loghd;
+	if(ntail != -1)
+		for(; m != nil && sz > ntail; m = m->next)
+			sz -= m->count;
+	rd->hd = m;
+	for(; m != nil; m = m->next)
 		incref(m);
+	rd->tl = m;
 	return rd->id;
 }
 
@@ -170,7 +180,7 @@
 }
 
 void
-qstat(Dir *d, Mq *q)
+qstat(Dir *d, Mq *q, Aux *a)
 {
 	d->name = estrdup9p(q->name);
 	d->uid = estrdup9p("glenda");
@@ -180,14 +190,17 @@
 	d->mtime = 0;
 	d->atime = 0;
 	d->mode = q->mode;
+	d->length = q->logsz;
+	if(a->ntail < d->length)
+		d->length = a->ntail;
 }
 
 int
-rootgen(int i, Dir *d, void *)
+rootgen(int i, Dir *d, void *a)
 {
 	if(i >= nqueues)
 		return -1;
-	qstat(d, queues[i]);
+	qstat(d, queues[i], a);
 	return 0;
 }
 	
@@ -198,12 +211,13 @@
 	Aux *o, *n;
 
 	o = old->aux;
-	if(o != nil){
-		n = emalloc(sizeof(Aux));
+	n = emalloc(sizeof(Aux));
+	if(o->q != nil){
 		n->q = o->q;
-		n->id = subscribe(o->q);
-		new->aux = n;
+		n->id = subscribe(o->q, o->ntail);
 	}
+	n->ntail = o->ntail;
+	new->aux = n;
 	return nil;
 }
 
@@ -252,7 +266,7 @@
 	default:
 		r->d.mode = 0644;
 		incref(queues[QIDX(p)]);
-		qstat(&r->d, queues[QIDX(p)]);
+		qstat(&r->d, queues[QIDX(p)], r->fid->aux);
 		decref(queues[QIDX(p)]);
 	}
 	respond(r, nil);
@@ -356,13 +370,15 @@
 void
 mqread(Req *r)
 {
+	char *p, *e, *b;
 	Aux *a;
 	Msg *m;
 	Rd *rd;
 	Mq *q;
+	int n;
 
 	if(QTYPE(r->fid->qid.path) == Qroot){
-		dirread9p(r, rootgen, nil);
+		dirread9p(r, rootgen, r->fid->aux);
 		respond(r, nil);
 		return;
 	}
@@ -381,23 +397,34 @@
 	/* queued messages: pop data off */
 	rd = &q->rd[a->id];
 	m = rd->hd;
-	r->ofcall.data = m->data + rd->off;
-	r->ofcall.count = r->ifcall.count + rd->off;
-	if(r->ofcall.count > m->count)
-		r->ofcall.count = m->count;
-	respond(r, nil);
-
-	/* adjust offsets */
-	if(m->count > r->ifcall.count)
-		rd->off += r->ifcall.count;
-	else{
-		rd->off = 0;
+	p = emalloc(r->ifcall.count);
+	e = p + r->ifcall.count;
+	r->ofcall.data = p;
+	while(1){
+		assert(rd->off >= 0 && rd->off < m->count);
+		b = m->data + rd->off;
+		if(e - p >= m->count - rd->off){
+			n = m->count - rd->off;
+			rd->hd = m->next;
+			rd->off = 0;
+			if(rd->hd == nil)
+				rd->tl = nil;
+			if(decref(m) == 0)
+				free(m);
+		}else{
+			n = e - p;
+			rd->off += (e - p);
+		}
+		memcpy(p, b, n);
+		p += n;
 		rd->hd = m->next;
-		if(rd->hd == nil)
-			rd->tl = nil;
-		if(decref(m) == 0)
-			free(m);
+		m = rd->hd;
+		if(!coalesce || m == nil || e - p < m->count)
+			break;
+			
 	}
+	r->ofcall.count = p - r->ofcall.data;
+	respond(r, nil);
 }
 
 void
@@ -420,10 +447,11 @@
 	q->qid.type = QTFILE;
 	queueid++;
 
-	a = emalloc(sizeof(Aux));
+	a = r->fid->aux;
+	assert(a->q == nil);
 	a->q = q;
 	if(m == OREAD || m == ORDWR || m == OEXEC)
-		a->id = subscribe(q);
+		a->id = subscribe(q, a->ntail);
 	r->ofcall.qid = q->qid;
 	r->fid->qid = q->qid;
 	r->fid->aux = a;
@@ -440,19 +468,18 @@
 	vlong p;
 	int m;
 
-	if(r->fid->aux != nil){
+	a = r->fid->aux;
+	if(a->q != nil){
 		respond(r, Einuse);
 		return;
 	}
 	m = r->ifcall.mode & OMASK;
 	p = r->fid->qid.path;
-	a = emalloc(sizeof(Aux));
 	if(QTYPE(p) != Qroot){
 		incref(queues[QIDX(p)]);
 		a->q = queues[QIDX(p)];
 		if(m == OREAD || m == ORDWR || m == OMASK)
-			a->id = subscribe(a->q);
-		r->fid->aux = a;
+			a->id = subscribe(a->q, a->ntail);
 	}
 	r->ofcall.qid = r->fid->qid;
 	respond(r, nil);
@@ -468,7 +495,7 @@
 	m = f->omode & OMASK;
 	if(m != OREAD && m != ORDWR && m != OEXEC)
 		return;
-	if(a != nil)
+	if(a != nil && a->q != nil)
 		a->q->rd[a->id].id = -1;
 	free(a);
 }
@@ -476,9 +503,26 @@
 void
 mqattach(Req *r)
 {
+	Aux *a;
+	char *n, *e;
+
+	n = r->ifcall.aname;
+	a = emalloc(sizeof(Aux));
 	r->ofcall.qid = (Qid){Qroot, 0, QTDIR};
 	r->fid->qid = r->ofcall.qid;
-	r->fid->aux = nil;
+	r->fid->aux = a;
+	a->ntail = -1;
+	if(n != nil && strncmp(n, "tail:", 5) == 0){
+		a->ntail = strtol(n+5, &e, 0);
+		while(*e){
+			switch(*e++){
+			case 'g':	a->ntail *= 1024*1024*1024;
+			case 'm':	a->ntail *= 1024*1024;
+			case 'k':	a->ntail *= 1024;
+			default:	respond(r, "bad scale");
+			}
+		}
+	}
 	respond(r, nil);
 }
 
@@ -534,6 +578,9 @@
 			default:	sysfatal("unknown suffix %c", *e);
 			}
 		}
+		break;
+	case 'c':
+		coalesce = 1;
 		break;
 	default:
 		usage();