shithub: mq

Download patch

ref: 668a30f10fce153d238bd784f632338d735ad6e3
parent: 58eb78b4731e54f81945f648e1b3dc39b8c1c477
author: kvik <kvik@a-b.xyz>
date: Mon May 24 12:53:50 EDT 2021

mq: various code quality improvements

--- a/src/list.c
+++ b/src/list.c
@@ -4,56 +4,33 @@
 #include "list.h"
 #include "util.h"
 
-List*
-listalloc(void)
+Listelem*
+listinit(Listelem *l)
 {
-	List *n;
-
-	n = emalloc(sizeof(List));
-	n->tag = Listlead;
-	n->link = n;
-	n->tail = n;
-	return n;
+	l->front = l->back = l;
+	return l;
 }
 
-List*
-listlink(List *p, List *n)
+Listelem*
+listlink(Listelem *list, Listelem *n)
 {
-	n->link = p->link;
-	p->link = n;
-	n->tail = p;
-	n->link->tail = n;
+	n->front = list->front;
+	n->back = list;
+	((Listelem*)list->front)->back = n;
+	list->front = n;
 	return n;
 }
 
-List*
-listunlink(List *p)
+Listelem*
+listunlink(Listelem *n)
 {
-	p->link->tail = p->tail;
-	p->tail->link = p->link;
-	return p;
+	((Listelem*)n->front)->back = n->back;
+	((Listelem*)n->back)->front = n->front;
+	return n;
 }
 
 int
-listisempty(List *p)
+listisempty(Listelem *list)
 {
-	return p->link == p;
-}
-
-int
-listislead(List *p)
-{
-	return p->tag == Listlead;
-}
-
-int
-listisfirst(List *p)
-{
-	return p->tail->tag == Listlead;
-}
-
-int
-listislast(List *p)
-{
-	return p->link->tag == Listlead;
+	return list->front == list;
 }
--- a/src/list.h
+++ b/src/list.h
@@ -1,22 +1,23 @@
-enum { Listlead = 0xAA };
+typedef struct Listelem Listelem;
 
-typedef struct List List;
-
-/* Must be embedded at the top of struct */
-struct List {
-	uchar tag;
-	List *link;
-	List *tail;
+struct Listelem {
+	void *front;
+	void *back;
 };
 
-/* What. */
-#define foreach(type, list) \
-	for(type ptr = listislead((list)) ? (type)(list)->link : (list); ptr->tag != Listlead; ptr = (type)ptr->link)
+Listelem* listinit(Listelem*);
+Listelem* listlink(Listelem*, Listelem*);
+Listelem* listunlink(Listelem*);
+int listisempty(Listelem*);
+int listisfirst(Listelem*, Listelem*);
+int listislast(Listelem*, Listelem*);
 
-List* listalloc(void);
-List* listlink(List*, List*);
-List* listunlink(List*);
-int listisempty(List*);
-int listislead(List*);
-int listisfirst(List*);
-int listislast(List*);
+#define listeach(type, sentinel, ptr) \
+	for(type _next = (sentinel)->front; \
+	    (ptr) = _next, _next = (ptr)->front, (ptr) != (sentinel); )
+
+#define listrange(type, sentinel, ptr) \
+	for(type _next; \
+	    _next = (ptr)->front, (ptr) != (sentinel); \
+	    (ptr) = _next)
+
--- a/src/mq.c
+++ b/src/mq.c
@@ -13,40 +13,41 @@
 typedef struct Read Read;
 typedef struct Write Write;
 
-struct Group {
-	Stream *streams;
-	Stream *order;
+struct Read {
+	Listelem;
 
-	enum {Message, Coalesce} mode;
-	enum {Replayoff, Replaylast, Replayall} replay;
+	Req *req;
 };
 
+struct Write {
+	Listelem;
+
+	/* Twrite.ifcall */
+	vlong offset;
+	uint count;
+	uchar *data;
+};
+
 struct Stream {
-	List;
+	Listelem;
 
-	Group *parent;
+	Group *group;
 	Write *wqueue;
 	Read *rqueue;
 };
 
-struct Client {
-	Write *cursor;
-	vlong offset;
-};
+struct Group {
+	Stream *streams;
+	Stream *order;
 
-struct Read {
-	List;
-
-	Req *r;
+	enum {Message, Coalesce} mode;
+	enum {Replayoff, Replaylast, Replayall} replay;
 };
 
-struct Write {
-	List;
-
-	/* Twrite.ifcall */
+struct Client {
+	Write *cursor;
 	vlong offset;
-	uint count;
-	uchar *data;
+	int blocked;
 };
 
 enum {
@@ -66,7 +67,7 @@
 	 * This depends on the 9pfile(2) library generating
 	 * simple incremental qid paths.
 	 */
-	f->qid.path &= ~(uvlong)0xF<<60;
+	f->qid.path &= ~((uvlong)0xF<<60);
 	f->qid.path |= (uvlong)(type&0xF)<<60;
 }
 
@@ -77,7 +78,7 @@
 }
 
 File*
-groupcreate(File *parent, char *name, char *uid, ulong perm)
+groupcreate(File *dir, char *name, char *uid, ulong perm)
 {
 	Stream *streamalloc(Group*);
 	void *streamclose(Stream*);
@@ -85,18 +86,18 @@
 	Group *group;
 
 	group = emalloc(sizeof(Group));
-	group->streams = (Stream*)listalloc();
-	group->order = (Stream*)streamalloc(group);
+	group->streams = (Stream*)listinit(emalloc(sizeof(Stream)));
+	group->order = streamalloc(group);
 	group->mode = Message;
 	group->replay = Replayoff;
 
 	ctl = order = nil;
 	if(strcmp(name, "/") == 0){
-		d = parent;
+		d = dir;
 		d->aux = group;
 	}
 	else
-		d = createfile(parent, name, uid, perm, group);
+		d = createfile(dir, name, uid, perm, group);
 	if(d == nil)
 		goto err;
 	filesettype(d, Qgroup);
@@ -113,7 +114,6 @@
 
 	return d;
 err:
-	free(group->streams);
 	streamclose(group->order);
 	if(d) closefile(d);
 	if(ctl) closefile(ctl);
@@ -122,22 +122,20 @@
 }
 
 void
-groupclose(File *f)
+groupclose(Group *g)
 {
-	Group *group = f->aux;
-
-	free(group);
+	free(g);
 }
 
 Stream*
-streamalloc(Group *group)
+streamalloc(Group *g)
 {
 	Stream *s;
 	
 	s = emalloc(sizeof(Stream));
-	s->parent = group;
-	s->wqueue = (Write*)listalloc();
-	s->rqueue = (Read*)listalloc();
+	s->group = g;
+	s->wqueue = (Write*)listinit(emalloc(sizeof(Write)));
+	s->rqueue = (Read*)listinit(emalloc(sizeof(Read)));
 	return s;
 }
 
@@ -147,134 +145,133 @@
 	Read *r;
 	Write *w;
 
-	listunlink(s);
-	if(s->rqueue)
-	foreach(Read*, s->rqueue){
-		/* eof these? */
-		r = ptr;
-		ptr = (Read*)r->tail;
+	listeach(Read*, s->rqueue, r){
 		listunlink(r);
 		free(r);
 	}
-	free(s->rqueue);
-	if(s->wqueue)
-	foreach(Write*, s->wqueue){
-		w = ptr;
-		ptr = (Write*)w->tail;
+	free(s->wqueue);
+	listeach(Write*, s->wqueue, w){
 		listunlink(w);
 		free(w);
 	}
 	free(s->wqueue);
+	listunlink(s);
 	free(s);
 }
 
 File*
-streamcreate(File *parent, char *name, char *uid, ulong perm)
+streamcreate(File *dir, char *name, char *uid, ulong perm)
 {
 	File *f;
 	Group *group;
 	Stream *s;
 
-	group = parent->aux;
+	group = dir->aux;
 	s = streamalloc(group);
-	if((f = createfile(parent, name, uid, perm, s)) == nil){
+	if((f = createfile(dir, name, uid, perm, s)) == nil){
 		streamclose(s);
 		return nil;
 	}
-	listlink(group->streams, s);
 	filesettype(f, Qstream);
+	listlink(group->streams, s);
 	return f;
 }
 
 void
-streamopen(Stream *s, Req *r)
+streamopen(Stream *s, Req *req)
 {
 	Client *c;
 	
-	c = r->fid->aux = emalloc(sizeof(Client));
-	switch(s->parent->replay){
+	c = req->fid->aux = emalloc(sizeof(Client));
+	switch(s->group->replay){
 	case Replayoff:
-		c->cursor = (Write*)s->wqueue->tail; break;
-	case Replaylast:
-		c->cursor = (Write*)s->wqueue->tail->tail; break;
+		c->offset = 0;
+		c->blocked = 1;
+		c->cursor = nil;
+		break;
+
 	case Replayall:
-		c->cursor = (Write*)s->wqueue; break;
+		c->offset = 0;
+		if(listisempty(s->wqueue)){
+			c->blocked = 1;
+			c->cursor = nil;
+		}else{
+			c->blocked = 0;
+			c->cursor = s->wqueue->front;
+		}
+		break;
+
+	case Replaylast:
+		c->offset = 0;
+		if(listisempty(s->wqueue)){
+			c->blocked = 1;
+			c->cursor = nil;
+		}else{
+			c->blocked = 0;
+			c->cursor = s->wqueue->back;
+		}
+		break;
 	}
 }
 
-
 void
-respondmessage(Req *r)
+streamrespond(Req *req, int mode)
 {
-	int n;
-	Client *c = r->fid->aux;
-	Write *w = c->cursor;
-	
-	n = w->count;
-	if(n > r->ifcall.count)
-		n = r->ifcall.count;
-	r->ofcall.count = n;
-	memmove(r->ofcall.data, w->data, n);
-	respond(r, nil);
-}
-
-void
-respondcoalesce(Req *r)
-{
-	Client *c = r->fid->aux;
+	Client *c = req->fid->aux;
+	Stream *s = req->fid->file->aux;
 	Write *w;
-	/* request size and offset, chunk size and offset, total read */
-	vlong rn, ro, n, o, t;
+	/* request size, response buffer offset */
+	vlong rn, ro;
+	/* chunk size and offset, total read */
+	vlong n, o, t;
 
-	ro = 0; o = 0; t = 0;
-	rn = r->ifcall.count;
+	t = 0;
+	rn = req->ifcall.count;
+	ro = 0;
 	w = c->cursor;
-	foreach(Write*, w){
-		w = ptr;
-		for(o = c->offset; n = w->count - o, n > 0; o += n){
+	o = c->offset;
+	listrange(Write*, s->wqueue, w){
+		if(mode == Message && w != c->cursor)
+			break;
+		for(; n = w->count - o, n > 0; o += n, ro += n, t += n){
 			if(t == rn)
 				goto done;
 			if(n > rn - ro)
 				n = rn - ro;
-			memmove(r->ofcall.data+ro, w->data+o, n);
-			ro += n; t += n;
+			memmove(req->ofcall.data+ro, w->data+o, n);
 		}
-		c->offset = 0;
+		o = 0;
 	}
 done:
-	c->cursor = w;
+	req->ofcall.count = t;
+	respond(req, nil);
+	
+	/* Determine the Client state */
+	if(w == s->wqueue){
+		c->offset = 0;
+		c->blocked = 1;
+		c->cursor = nil;
+		return;
+	}
 	c->offset = o;
-	r->ofcall.count = t;
-	respond(r, nil);
+	c->blocked = 0;
+	c->cursor = w;
 }
 
 void
-streamread(Req *r)
+streamread(Req *req)
 {
-	File *f = r->fid->file;
-	Stream *s = f->aux;
-	Client *c = r->fid->aux;
-	Read *rd;
-
-	/* Delay the response if the wqueue is empty
-	 * or if we've already caught up, respond otherwise. */
-	switch(s->parent->mode){
-	case Message:
-		if(listisempty(s->wqueue) || listislast(c->cursor))
-			break;
-		c->cursor = (Write*)c->cursor->link;
-		respondmessage(r);
+	Client *c = req->fid->aux;
+	Stream *s = req->fid->file->aux;
+	Read *r;
+	
+	if(c->blocked){
+		r = emalloc(sizeof(Read));
+		r->req = req;
+		listlink(s->rqueue, r);
 		return;
-	case Coalesce:
-		if(listisempty(s->wqueue)
-		|| (listislast(c->cursor) && c->offset == c->cursor->count))
-			break;
-		respondcoalesce(r);
-		return;
 	}
-	rd = emalloc(sizeof(Read));
-	rd->r = r;
-	listlink(s->rqueue, rd);
+	streamrespond(req, s->group->mode);
 }
 
 Write*
@@ -288,64 +285,66 @@
 }
 
 void
-streamwrite(Req *r)
+streamwrite(Req *req)
 {
-	File *f = r->fid->file;
-	Stream *s = f->aux;
-	Group *group = s->parent;
-	Write *w, *o;
+	File *f = req->fid->file;
+	Stream *s = req->fid->file->aux;
+	Group *group = s->group;
+	Write *w, *wq, *o, *oq;
+	Read *r;
+	Client *c;
 	long n;
+	
+	wq = s->wqueue;
+	oq = group->order->wqueue;
 
-	/* Commit to wqueue */
-	w = writealloc(r->ifcall.count);
-	w->count = r->ifcall.count;
-	w->offset = r->ifcall.offset;
-	memmove(w->data, r->ifcall.data, w->count);
-	listlink(s->wqueue->tail, w);
+	/* Commit to queue */
+	w = writealloc(req->ifcall.count);
+	w->count = req->ifcall.count;
+	w->offset = req->ifcall.offset;
+	memmove(w->data, req->ifcall.data, w->count);
+	listlink(wq->back, w);
 
-	/* Commit to order */
+	/* Commit to group order queue */
 	n = strlen(f->name)+1;
 	o = writealloc(n);
 	o->offset = 0;
 	o->count = n;
 	memmove(o->data, f->name, n);
-	listlink(group->order->wqueue->tail, o);
-
+	listlink(oq->back, o);
+ 
 	/* Kick the blocked stream readers */
-	foreach(Read*, s->rqueue){
-		Client *c = ptr->r->fid->aux;
-
+	listeach(Read*, s->rqueue, r){
+		c = r->req->fid->aux;
+		
 		c->cursor = w;
 		c->offset = 0;
-		switch(group->mode){
-		case Message:
-			respondmessage(ptr->r); break;
-		case Coalesce:
-			respondcoalesce(ptr->r); break;
-		}
-		ptr = (Read*)ptr->tail;
-		free(listunlink(ptr->link));
+		c->blocked = 0;
+		streamrespond(r->req, group->mode);
+		listunlink(r);
+		free(r);
 	}
 
 	/* Kick the blocked order readers */
-	foreach(Read*, group->order->rqueue){
-		Client *c = ptr->r->fid->aux;
-
+	listeach(Read*, group->order->rqueue, r){
+		c = r->req->fid->aux;
+		
 		c->cursor = o;
-		respondmessage(ptr->r);
-		ptr = (Read*)ptr->tail;
-		free(listunlink(ptr->link));
+		c->offset = 0;
+		c->blocked = 0;
+		streamrespond(r->req, Message);
+		listunlink(r);
+		free(r);
 	}
 
-	r->ofcall.count = r->ifcall.count;
-	respond(r, nil);
+	req->ofcall.count = req->ifcall.count;
+	respond(req, nil);
 }
 
 void
-ctlread(Req *r)
+ctlread(Req *req)
 {
-	File *f = r->fid->file;
-	Group *group = f->aux;
+	Group *group = req->fid->file->aux;
 	char buf[256];
 
 	char *mode2str[] = {
@@ -359,8 +358,8 @@
 	};
 	snprint(buf, sizeof buf, "data %s\nreplay %s\n",
 		mode2str[group->mode], replay2str[group->replay]);
-	readstr(r, buf);
-	respond(r, nil);
+	readstr(req, buf);
+	respond(req, nil);
 }
 
 enum {
@@ -381,18 +380,17 @@
 };
 
 void
-ctlwrite(Req *r)
+ctlwrite(Req *req)
 {
-	File *f = r->fid->file;
-	Group *group = f->aux;
+	Group *group = req->fid->file->aux;
 	char *e = nil;
 	Cmdbuf *cmd;
 	Cmdtab *t;
 
-	cmd = parsecmd(r->ifcall.data, r->ifcall.count);
+	cmd = parsecmd(req->ifcall.data, req->ifcall.count);
 	t = lookupcmd(cmd, groupcmd, nelem(groupcmd));
 	if(t == nil){
-		respondcmderror(r, cmd, "%r");
+		respondcmderror(req, cmd, "%r");
 		free(cmd);
 		return;
 	}
@@ -441,92 +439,93 @@
 		break;
 	}}
 	free(cmd);
-	respond(r, e);
+	respond(req, e);
 }
 
 void
-xcreate(Req *r)
+xcreate(Req *req)
 {
-	char *name = r->ifcall.name;
-	char *uid = r->fid->uid;
-	ulong perm = r->ifcall.perm;
-	File *parent = r->fid->file;
+	char *name = req->ifcall.name;
+	char *uid = req->fid->uid;
+	ulong perm = req->ifcall.perm;
+	File *group = req->fid->file;
 	File *f = nil;
 
-	switch(filetype(parent)){
+	switch(filetype(group)){
 	case Qroot:
 	case Qgroup:
 		if(perm&DMDIR)
-			f = groupcreate(parent, name, uid, perm);
+			f = groupcreate(group, name, uid, perm);
 		else{
-			f = streamcreate(parent, name, uid, perm);
-			r->fid->file = f;
-			r->ofcall.qid = f->qid;
-			streamopen(f->aux, r);
+			f = streamcreate(group, name, uid, perm);
+			req->fid->file = f;
+			req->ofcall.qid = f->qid;
+			streamopen(f->aux, req);
 		}
 		break;
 	}
 	if(f == nil)
-		respond(r, "internal failure");
+		respond(req, "internal failure");
 	else
-		respond(r, nil);
+		respond(req, nil);
 }
 
 void
-xopen(Req *r)
+xopen(Req *req)
 {
-	File *f = r->fid->file;
+	File *f = req->fid->file;
 
 	switch(filetype(f)){
 	case Qstream:
 	case Qorder:
-		streamopen(f->aux, r);
+		streamopen(f->aux, req);
 		break;
 	}
-	respond(r, nil);
+	respond(req, nil);
 }
 
 void
-xwrite(Req *r)
+xwrite(Req *req)
 {
-	File *f = r->fid->file;
+	File *f = req->fid->file;
 
 	switch(filetype(f)){
 	case Qstream:
-		streamwrite(r);
+		streamwrite(req);
 		break;
 	case Qctl:
-		ctlwrite(r);
+		ctlwrite(req);
 		break;
 	default:
-		respond(r, "forbidden");
+		respond(req, "forbidden");
 		return;
 	}
 }
 
 void
-xread(Req *r)
+xread(Req *req)
 {
-	File *f = r->fid->file;
+	File *f = req->fid->file;
 
 	switch(filetype(f)){
 	case Qstream:
 	case Qorder:
-		streamread(r);
+		streamread(req);
 		break;
 	case Qctl:
-		ctlread(r);
+		ctlread(req);
 		break;
 	default:
-		respond(r, "forbidden");
+		respond(req, "forbidden");
 	}
 }
 
 void
-xflush(Req *r)
+xflush(Req *req)
 {
-	Req *old = r->oldreq;
+	Req *old = req->oldreq;
 	File *f = old->fid->file;
+	Read *r;
 
 	switch(filetype(f)){
 	case Qstream:
@@ -535,25 +534,26 @@
 
 		if(old->ifcall.type != Tread)
 			break;
-		foreach(Read*, s->rqueue){
-			if(ptr->r == old){
-				free(listunlink(ptr));
+		listeach(Read*, s->rqueue, r){
+			if(r->req == old){
+				listunlink(r);
+				free(r);
 				break;
 			}
 		}
 		respond(old, "interrupted");
 	}}
-	respond(r, nil);
+	respond(req, nil);
 }
 
 void
-xwstat(Req *r)
+xwstat(Req *req)
 {
-	File *w, *f = r->fid->file;
-	char *uid = r->fid->uid;
+	File *w, *f = req->fid->file;
+	char *uid = req->fid->uid;
 
-	/* To change name, must have write permission in parent. */
-	if(r->d.name[0] != '\0' && strcmp(r->d.name, f->name) != 0){
+	/* To change name, must have write permission in group. */
+	if(req->d.name[0] != '\0' && strcmp(req->d.name, f->name) != 0){
 		if((w = f->parent) == nil)
 			goto perm;
 		incref(w);
@@ -561,9 +561,9 @@
 			closefile(w);
 			goto perm;
 		}
-		if((w = walkfile(w, r->d.name)) != nil){
+		if((w = walkfile(w, req->d.name)) != nil){
 			closefile(w);
-			respond(r, "file already exists");
+			respond(req, "file already exists");
 			return;
 		}
 	}
@@ -571,47 +571,47 @@
 	/* To change group, must be owner and member of new group,
 	 * or leader of current group and leader of new group.
 	 * Second case cannot happen, but we check anyway. */
-	while(r->d.gid[0] != '\0' && strcmp(f->gid, r->d.gid) != 0){
+	while(req->d.gid[0] != '\0' && strcmp(f->gid, req->d.gid) != 0){
 		if(strcmp(uid, f->uid) == 0)
 			break;
 		if(strcmp(uid, f->gid) == 0)
-		if(strcmp(uid, r->d.gid) == 0)
+		if(strcmp(uid, req->d.gid) == 0)
 			break;
-		respond(r, "not owner");
+		respond(req, "not owner");
 		return;
 	}
 
 	/* To change mode, must be owner or group leader.
 	 * Because of lack of users file, leader=>group itself. */
-	if(r->d.mode != ~0 && f->mode != r->d.mode){
+	if(req->d.mode != ~0 && f->mode != req->d.mode){
 		if(strcmp(uid, f->uid) != 0)
 		if(strcmp(uid, f->gid) != 0){
-			respond(r, "not owner");
+			respond(req, "not owner");
 			return;
 		}
 	}
 
-	if(r->d.name[0] != '\0'){
+	if(req->d.name[0] != '\0'){
 		free(f->name);
-		f->name = estrdup(r->d.name);
+		f->name = estrdup(req->d.name);
 	}
-	if(r->d.uid[0] != '\0'){
+	if(req->d.uid[0] != '\0'){
 		free(f->uid);
-		f->uid = estrdup(r->d.uid);
+		f->uid = estrdup(req->d.uid);
 	}
-	if(r->d.gid[0] != '\0'){
+	if(req->d.gid[0] != '\0'){
 		free(f->gid);
-		f->gid = estrdup(r->d.gid);
+		f->gid = estrdup(req->d.gid);
 	}
-	if(r->d.mode != ~0){
-		f->mode = r->d.mode;
+	if(req->d.mode != ~0){
+		f->mode = req->d.mode;
 		f->qid.type = f->mode >> 24;
 	}
 
-	respond(r, nil);
+	respond(req, nil);
 	return;
 perm:
-	respond(r, "permission denied");
+	respond(req, "permission denied");
 }
 
 void
@@ -627,7 +627,7 @@
 {
 	switch(filetype(f)){
 	case Qgroup:
-		groupclose(f);
+		groupclose(f->aux);
 		break;
 	case Qstream:
 		streamclose(f->aux);