shithub: mq

Download patch

ref: 0ee1f70dd0a81d13210dd76568eeb6977606d1d7
parent: 86f2bdceaa324b0d6207593e439d8acd7456945f
author: kvik <kvik@a-b.xyz>
date: Tue Sep 15 10:59:08 EDT 2020

mq: implement 'coalesce' data mode

This implements the 'data coalesce' mode for the stream group.

Currently the order file is not useful in this mode, since it only
communicates the names of streams that have some data in them.
For readers such as mq-cat, a coalesced read could move the stream offset
past the position given by the global order in the order file, rendering
the order file useless.

One solution that I can think of is exposing the length of the write in
the 'order' stream.  This would allow ordered reading of coalesced
streams.  However, naively reading one write at a time would cancel out
the only benefit of coalescing.  The solution to /that/ is a reader that
knows to combine consecutive writes to the same file, thereby minimizing
the number of read(2) calls needed.
An alternative solution is rewriting the 'order' stream on the server side.
This seems much harder to do right as we'd probably end up having to
maintain an independent order queue per reader.

--- a/TODO.md
+++ b/TODO.md
@@ -12,8 +12,6 @@
 	* manual clear and limits in replay mode
 		* drop very slow readers
 * Read
-    * coalesced mode
     * concurrent fid readers
       * same data, unique data
     * handling partial reads
- 
\ No newline at end of file
--- a/rc/pin
+++ b/rc/pin
@@ -38,7 +38,7 @@
 
 	mq = $1
 	mkdir -p $mq
-	echo replay on >$mq/ctl
+	echo replay all >$mq/ctl
 	touch $mq/^(0 1 2 note)
 	$cmd <$mq/0 >>$mq/1 >>[2]$mq/2 &
 	cat $mq/note >/proc/$apid/notepg &
--- a/src/list.c
+++ b/src/list.c
@@ -35,13 +35,25 @@
 }
 
 int
-listend(List *p)
+listisempty(List *p)
 {
-	return p->link->tag == Listlead;
+	return p->link == p;
 }
 
 int
-listempty(List *p)
+listislead(List *p)
 {
-	return p->link == p;
+	return p->tag == Listlead;
+}
+
+int
+listisfirst(List *p)
+{
+	return p->tail->tag == Listlead;
+}
+
+int
+listislast(List *p)
+{
+	return p->link->tag == Listlead;
 }
--- a/src/list.h
+++ b/src/list.h
@@ -11,10 +11,12 @@
 
 /* What. */
 #define foreach(type, list) \
-	for(type ptr = (type)(list)->link; ptr->tag != Listlead; ptr = (type)ptr->link)
+	for(type ptr = listislead((list)) ? (type)(list)->link : (list); ptr->tag != Listlead; ptr = (type)ptr->link)
 
 List* listalloc(void);
 List* listlink(List*, List*);
 List* listunlink(List*);
-int listempty(List*);
-int listend(List*);
+int listisempty(List*);
+int listislead(List*);
+int listisfirst(List*);
+int listislast(List*);
--- a/src/mq.c
+++ b/src/mq.c
@@ -18,6 +18,7 @@
 	Stream *order;
 
 	/* configuration */
+	enum {Message, Coalesce} mode;
 	enum {Replayoff, Replaylast, Replayall} replay;
 };
 
@@ -31,7 +32,8 @@
 };
 
 struct Client {
-	Write *cursor; /* reader position */
+	Write *cursor; /* current chunk */
+	vlong offset; /* chunk offset; for coalesce mode */
 };
 
 struct Read {
@@ -85,6 +87,7 @@
 	mq = emalloc(sizeof(Mq));
 	mq->group = (Stream*)listalloc();
 	mq->order = (Stream*)streamalloc(mq);
+	mq->mode = Message;
 	mq->replay = Replayoff;
 
 	ctl = order = nil;
@@ -178,8 +181,11 @@
 }
 
 void
-respondread(Req *r, Write *w)
+respondmessage(Req *r)
 {
+	Client *c = r->fid->aux;
+	Write *w = c->cursor;
+
 	r->ofcall.count = w->count;
 	memmove(r->ofcall.data, w->data, w->count);
 	respond(r, nil);
@@ -186,6 +192,36 @@
 }
 
 void
+respondcoalesce(Req *r)
+{
+	Client *c = r->fid->aux;
+	Write *w;
+	/* request size and offset, chunk size and offset, total read */
+	vlong rn, ro, n, o, t;
+
+	ro = o = t = 0;
+	rn = r->ifcall.count;
+	w = c->cursor;
+	foreach(Write*, w){
+		w = ptr;
+		for(o = c->offset; (n = w->count - o) > 0; o += n){
+			if(t == rn)
+				goto done;
+			if(n > rn - ro)
+				n = rn - ro;
+			memmove(r->ofcall.data+ro, w->data+o, n);
+			ro += n; t += n;
+		}
+		c->offset = 0;
+	}
+done:
+	c->cursor = w;
+	c->offset = o;
+	r->ofcall.count = t;
+	respond(r, nil);
+}
+
+void
 streamread(Req *r)
 {
 	File *f = r->fid->file;
@@ -194,15 +230,24 @@
 	Read *rd;
 
 	/* Delay the response if the queue is empty
-	 * or if we've already caught up. */
-	if(listempty(s->queue) || listend(c->cursor)){
-		rd = emalloc(sizeof(Read));
-		rd->r = r;
-		listlink(s->reads, rd);
+	 * or if we've already caught up, respond otherwise. */
+	switch(s->mq->mode){
+	case Message:
+		if(listisempty(s->queue) || listislast(c->cursor))
+			break;
+		c->cursor = (Write*)c->cursor->link;
+		respondmessage(r);
 		return;
+	case Coalesce:
+		if(listisempty(s->queue)
+		|| (listislast(c->cursor) && c->offset == c->cursor->count))
+			break;
+		respondcoalesce(r);
+		return;
 	}
-	c->cursor = (Write*)c->cursor->link;
-	respondread(r, c->cursor);
+	rd = emalloc(sizeof(Read));
+	rd->r = r;
+	listlink(s->reads, rd);
 }
 
 Write*
@@ -243,9 +288,16 @@
 	foreach(Read*, s->reads){
 		Client *c = ptr->r->fid->aux;
 
-		respondread(ptr->r, w);
 		c->cursor = w;
-		listunlink(ptr);
+		c->offset = 0;
+		switch(mq->mode){
+		case Message:
+			respondmessage(ptr->r); break;
+		case Coalesce:
+			respondcoalesce(ptr->r); break;
+		}
+		ptr = (Read*)ptr->tail;
+		free(listunlink(ptr->link));
 	}
 
 	/* Kick the blocked order readers */
@@ -252,9 +304,10 @@
 	foreach(Read*, mq->order->reads){
 		Client *c = ptr->r->fid->aux;
 
-		respondread(ptr->r, o);
 		c->cursor = o;
-		listunlink(ptr);
+		respondmessage(ptr->r);
+		ptr = (Read*)ptr->tail;
+		free(listunlink(ptr->link));
 	}
 
 	r->ofcall.count = r->ifcall.count;
@@ -262,10 +315,13 @@
 }
 
 enum {
+	Cmddata,
 	Cmdreplay,
 	Cmddebug, Cmddebug9p,
 };
 Cmdtab mqcmd[] = {
+	/* data message|coalesce */
+	{Cmddata, "data", 2},
 	/* replay off|last|all */
 	{Cmdreplay, "replay", 2},
 
@@ -292,6 +348,16 @@
 		return;
 	}
 	switch(t->index){
+	case Cmddata: {
+		if(strncmp(cmd->f[1], "message", 7) == 0)
+			mq->mode = Message;
+		else
+		if(strncmp(cmd->f[1], "coalesce", 8) == 0)
+			mq->mode = Coalesce;
+		else
+			e = "usage: data message|coalesce";
+		break;
+	}
 	case Cmdreplay: {
 		if(strncmp(cmd->f[1], "off", 3) == 0)
 			mq->replay = Replayoff;