shithub: mq

Download patch

ref: 23227ed96374185ecc9fd4410c7ba3ed6491c0f9
author: Ori Bernstein <ori@eigenstate.org>
date: Tue Dec 6 18:28:07 EST 2022

initial commit

--- /dev/null
+++ b/mkfile
@@ -1,0 +1,6 @@
+</$objtype/mkfile
+
+# Inputs for the mkone rules included at the end of this file
+# (presumably the program name and the objects it is linked from).
+TARG=mq
+OFILES=mq.$O
+
+</sys/src/cmd/mkone
--- /dev/null
+++ b/mq.c
@@ -1,0 +1,464 @@
+#include <u.h>
+#include <libc.h>
+#include <fcall.h>
+#include <thread.h>
+#include <9p.h>
+
+/* Forward typedefs so the structures below can name each other without the struct keyword. */
+typedef struct Mq Mq;
+typedef struct Msg Msg;
+typedef struct Aux Aux;
+
+/* Fixed qid paths. Queue files get paths starting at Qroot+1 (see queueid). */
+enum {
+	Qroot,	/* the root directory, as handed out by mqattach */
+};
+
+/* Per-fid state, stored in Fid.aux by mqopen, mqcreate and mqclone. */
+struct Aux {
+	Mq	*q;	/* queue this fid refers to */
+	int	id;	/* reader slot returned by subscribe; left 0 when the fid never subscribed */
+};
+
+/*
+ * One written message. Messages are reference counted (msgref/msgunref)
+ * and chained through next; the log and every reader slot point into
+ * the same chain.
+ */
+struct Msg {
+	Ref;			/* embedded reference count used by incref/decref */
+	Msg	*next;		/* following message, nil at the tail */
+	int	count;		/* number of bytes in data */
+	char	data[];		/* payload, allocated together with the header in mqwrite */
+};
+
+/*
+ * One message queue, visible as a file in the root directory.
+ * The per-reader arrays rd, rhd, rtl and wait all have nrd entries
+ * and are indexed by the slot number returned from subscribe.
+ */
+struct Mq {
+	Qid	qid;		/* qid reported for the queue file; vers is bumped on every write */
+	int	count;		/* not referenced by the code in this file */
+	usize	logsz;		/* sum of count over the messages currently in the log */
+	Msg	*loghd;		/* oldest logged message */
+	Msg	*logtl;		/* newest logged message */
+	Msg	*hd;		/* not referenced by the code in this file */
+	Msg	*tl;		/* not referenced by the code in this file */
+
+	int	nrd;		/* number of reader slots allocated */
+	int	*rd;		/* per slot: destroyfid stores -1 here when the reader goes away */
+	Msg	**rhd;		/* per slot: next unread message, nil when drained */
+	Msg	**rtl;		/* per slot: last queued message, nil when drained */
+	Req	**wait;		/* per slot: read request parked until the next write, or nil */
+
+	char	*name;		/* file name, copied from the create request */
+	char	*user;		/* never set in this file; qstat reports a fixed owner */
+	char	*group;		/* never set in this file; qstat reports a fixed group */
+	int	mode;		/* value reported as Dir.mode by qstat */
+};
+
+/* Server-wide state. */
+Mq	**queues;		/* every queue, in creation order; grown by mqcreate */
+int	nqueues;		/* number of entries in queues */
+vlong	maxlog = -1;		/* trimlog's byte threshold, set by -r; negative disables trimming */
+vlong	queueid = Qroot + 1;	/* next qid path to hand out; mqopen relies on path-1 indexing queues */
+
+char Ebaduse[] = "invalid use of fd";
+char Einuse[] = "fid in use";
+char Eexist[] = "file already exists";
+char Enoexist[] = "file does not exists";
+
+/*
+ * Allocate n bytes through mallocz with its clear flag set.
+ * Never returns nil: allocation failure ends the program via sysfatal.
+ * The block is tagged with the caller's pc through setmalloctag.
+ */
+void *
+emalloc(ulong n)
+{
+	void *p;
+
+	if((p = mallocz(n, 1)) == nil)
+		sysfatal("malloc: %r");
+	setmalloctag(p, getcallerpc(&n));
+	return p;
+}
+
+/*
+ * Resize p to n bytes with realloc.
+ * A nil result is treated as fatal (sysfatal); otherwise the block is
+ * tagged with the caller's pc and returned.
+ */
+void *
+erealloc(void *p, ulong n)
+{
+	void *np;
+
+	if((np = realloc(p, n)) == nil)
+		sysfatal("realloc: %r");
+	setmalloctag(np, getcallerpc(&p));
+	return np;
+}
+
+/*
+ * Duplicate s with strdup; a nil result is fatal (sysfatal).
+ * The copy is tagged with the caller's pc.
+ */
+char*
+estrdup(char *s)
+{
+	char *dup;
+
+	dup = strdup(s);
+	if(dup == nil)
+		sysfatal("strdup: %r");
+	setmalloctag(dup, getcallerpc(&s));
+	return dup;
+}
+
+/* Take one more reference on m; returns m so the call can be used inline. */
+Msg*
+msgref(Msg *m)
+{
+	Msg *held;
+
+	held = m;
+	incref(held);
+	return held;
+}
+
+/* Give up one reference on m and free it once decref reports zero. */
+void
+msgunref(Msg *m)
+{
+	if(decref(m) != 0)
+		return;
+	free(m);
+}
+
+/*
+ * Drop messages from the front of q's log for as long as the logged
+ * byte count is at least maxlog. A negative maxlog disables trimming.
+ *
+ * Only the log's reference is released, so a message that still sits
+ * in some reader's backlog stays allocated until that reader drops it.
+ */
+void
+trimlog(Mq *q)
+{
+	Msg *m, *next;
+
+	if(maxlog < 0)
+		return;
+	for(m = q->loghd; m != nil && q->logsz >= maxlog; m = next){
+		/* fetch the link first: msgunref may free m */
+		next = m->next;
+		q->logsz -= m->count;
+		msgunref(m);
+	}
+	/*
+	 * m is the first message that was kept, or nil if everything
+	 * went. Using m here (not the saved link) keeps the head intact
+	 * when the loop body never ran because the log was already
+	 * under the limit.
+	 */
+	q->loghd = m;
+	if(m == nil)
+		q->logtl = nil;
+}
+
+/*
+ * Attach a new reader to q and return its slot number.
+ *
+ * A slot released by destroyfid (rd[i] == -1) is reused when there is
+ * one; otherwise the four per-reader arrays grow by one entry. The
+ * slot starts out with the whole current log as its backlog, holding
+ * one reference on each logged message.
+ */
+int
+subscribe(Mq *q)
+{
+	Msg *m, *next;
+	int i;
+
+	for(i = 0; i < q->nrd; i++)
+		if(q->rd[i] == -1)
+			break;
+	if(i == q->nrd){
+		q->rd = erealloc(q->rd, (q->nrd+1)*sizeof(*q->rd));
+		q->wait = erealloc(q->wait, (q->nrd+1)*sizeof(*q->wait));
+		q->rhd = erealloc(q->rhd, (q->nrd+1)*sizeof(*q->rhd));
+		q->rtl = erealloc(q->rtl, (q->nrd+1)*sizeof(*q->rtl));
+		q->nrd++;
+	}else{
+		/*
+		 * mqwrite keeps queueing onto released slots, so drop
+		 * whatever piled up for the previous owner.
+		 */
+		for(m = q->rhd[i]; m != nil; m = next){
+			next = m->next;
+			msgunref(m);
+		}
+	}
+	q->rd[i] = i;	/* any value other than -1 marks the slot as taken */
+	q->wait[i] = nil;
+	q->rhd[i] = q->loghd;
+	q->rtl[i] = q->logtl;
+	for(m = q->loghd; m != nil; m = m->next)
+		msgref(m);
+	return i;
+}
+
+/* Find the queue called name by linear search of queues; nil if there is none. */
+Mq*
+lookup(char *name)
+{
+	Mq *q;
+	int i;
+
+	i = 0;
+	while(i < nqueues){
+		q = queues[i++];
+		if(strcmp(q->name, name) == 0)
+			return q;
+	}
+	return nil;
+}
+
+/*
+ * Fill d with the directory entry for queue q.
+ * Name, qid and mode come from q; owner, group and last modifier are
+ * always reported as glenda and both timestamps as 0. The strings are
+ * fresh estrdup9p copies.
+ */
+void
+qstat(Dir *d, Mq *q)
+{
+	d->qid = q->qid;
+	d->mode = q->mode;
+	d->atime = 0;
+	d->mtime = 0;
+	d->name = estrdup9p(q->name);
+	d->uid = estrdup9p("glenda");
+	d->gid = estrdup9p("glenda");
+	d->muid = estrdup9p("glenda");
+}
+
+/*
+ * Directory generator handed to dirread9p by mqread.
+ * Fills d for the i'th queue and returns 0, or returns -1 once i is
+ * past the last queue. The third argument is unused.
+ */
+int
+rootgen(int i, Dir *d, void *)
+{
+	if(i < nqueues){
+		qstat(d, queues[i]);
+		return 0;
+	}
+	return -1;
+}
+	
+
+/*
+ * Srv.clone hook. When old carries an Aux, new gets its own Aux on the
+ * same queue with a freshly subscribed reader slot; otherwise new->aux
+ * is left untouched. Always returns nil.
+ */
+char*
+mqclone(Fid *old, Fid *new)
+{
+	Aux *src, *dst;
+
+	src = old->aux;
+	if(src == nil)
+		return nil;
+	dst = emalloc(sizeof(Aux));
+	dst->q = src->q;
+	dst->id = subscribe(src->q);
+	new->aux = dst;
+	return nil;
+}
+
+/*
+ * Srv.walk1 hook: step f by one path element.
+ * ".." leads to the root from anywhere. From the root, any other name
+ * must match an existing queue. Queue files have no children.
+ * On success f->qid and *qid hold the new qid and nil is returned;
+ * otherwise an error string is returned and f is unchanged.
+ */
+char*
+mqwalk1(Fid *f, char *name, Qid *qid)
+{
+	Mq *q;
+
+	if(strcmp(name, "..") == 0){
+		if(f->qid.path != Qroot)
+			f->qid = (Qid){Qroot, 0, QTDIR};
+		*qid = f->qid;
+		return nil;
+	}
+	if(f->qid.path != Qroot)
+		return "not a dir";
+	q = lookup(name);
+	if(q == nil)
+		return Enoexist;
+	f->qid = q->qid;
+	*qid = q->qid;
+	return nil;
+}
+
+/*
+ * Srv.stat hook.
+ * The root is reported as a directory named "/" with mode DMDIR|0755.
+ * A queue file is described by qstat. The queue is taken from the
+ * fid's Aux when the fid has been opened, and otherwise found by qid
+ * path, so that stat also works on a fid that was only walked to.
+ */
+void
+mqstat(Req *r)
+{
+	Aux *a;
+	vlong p;
+
+	p = r->fid->qid.path;
+	if(p == Qroot){
+		r->d.name = estrdup9p("/");
+		r->d.uid = estrdup9p("glenda");
+		r->d.gid = estrdup9p("glenda");
+		r->d.muid = estrdup9p("glenda");
+		r->d.qid = r->fid->qid;
+		r->d.mtime = 0;
+		r->d.atime = 0;
+		r->d.mode = DMDIR|0755;
+		respond(r, nil);
+		return;
+	}
+	a = r->fid->aux;
+	if(a != nil)
+		qstat(&r->d, a->q);
+	else if(p >= 1 && p <= nqueues)
+		qstat(&r->d, queues[p-1]);	/* paths are handed out as index+1 */
+	else{
+		respond(r, Enoexist);
+		return;
+	}
+	respond(r, nil);
+}
+
+/*
+ * Srv.flush hook.
+ * If the request being flushed is the read parked in its fid's reader
+ * slot, answer it with "interrupted" and clear the slot. A request
+ * that is not the parked one is left alone. The flush itself is
+ * always answered.
+ */
+void
+mqflush(Req *r)
+{
+	Req *old;
+	Aux *a;
+	Mq *q;
+
+	old = r->oldreq;
+	if((a = old->fid->aux) != nil){
+		q = a->q;
+		if(a->id < q->nrd && q->wait[a->id] == old){
+			/* unhook before responding so nothing can see a stale pointer */
+			q->wait[a->id] = nil;
+			respond(old, "interrupted");
+		}
+	}
+	respond(r, nil);
+}
+
+/*
+ * Srv.remove hook. Removing a queue is not implemented; refuse the
+ * request with an error rather than taking the whole server down.
+ */
+void
+mqremove(Req *r)
+{
+	respond(r, "remove prohibited");
+}
+
+/*
+ * Srv.write hook: each write becomes one message.
+ *
+ * The payload is copied into a new Msg. A reader slot with a parked
+ * read is answered straight away; every other slot gets the message
+ * appended to its backlog with a reference of its own. The message is
+ * then appended to the log, the log is trimmed, and the queue's qid
+ * version is bumped. A fid without an Aux (the root) gets Ebaduse.
+ */
+void
+mqwrite(Req *r)
+{
+	Msg *m;
+	Req *rr;
+	Aux *a;
+	Mq *q;
+	int i;
+
+	if((a = r->fid->aux) == nil){
+		respond(r, Ebaduse);
+		return;
+	}
+	q = a->q;
+	m = emalloc(sizeof(Msg) + r->ifcall.count);
+	m->count = r->ifcall.count;
+	memcpy(m->data, r->ifcall.data, m->count);
+	m->next = nil;
+	for(i = 0; i < q->nrd; i++){
+		rr = q->wait[i];
+		if(rr != nil){
+			q->wait[i] = nil;
+			rr->ofcall.data = r->ifcall.data;
+			rr->ofcall.count = r->ifcall.count;
+			/* a read reply must not carry more than the reader asked for */
+			if(rr->ofcall.count > rr->ifcall.count)
+				rr->ofcall.count = rr->ifcall.count;
+			respond(rr, nil);
+		}else{
+			if(q->rhd[i] == nil)
+				q->rhd[i] = m;
+			if(q->rtl[i] != nil)
+				q->rtl[i]->next = m;
+			q->rtl[i] = m;
+			msgref(m);
+		}
+	}
+	/* the log holds its own reference until trimlog lets go of it */
+	if(q->loghd == nil)
+		q->loghd = m;
+	if(q->logtl != nil)
+		q->logtl->next = m;
+	q->logtl = msgref(m);
+	q->logsz += m->count;
+
+	trimlog(q);
+	q->qid.vers++;
+	r->ofcall.count = r->ifcall.count;
+	respond(r, nil);
+}
+
+/*
+ * Srv.read hook.
+ * Reading the root lists the queues. Reading a queue returns the next
+ * message in the fid's backlog, truncated to the requested count, and
+ * drops the reader's reference on it. With an empty backlog the
+ * request is parked in the reader's slot until mqwrite or mqflush
+ * answers it; only one read may be parked per slot, a second one
+ * fails with Einuse.
+ */
+void
+mqread(Req *r)
+{
+	Aux *a;
+	Msg *m;
+	Mq *q;
+
+	if(r->fid->qid.path == Qroot){
+		dirread9p(r, rootgen, nil);
+		respond(r, nil);
+		return;
+	}
+	if((a = r->fid->aux) == nil){
+		respond(r, Ebaduse);
+		return;
+	}
+	q = a->q;
+	m = q->rhd[a->id];
+	if(m == nil){
+		/* overwriting a parked request would leave it unanswered forever */
+		if(q->wait[a->id] != nil){
+			respond(r, Einuse);
+			return;
+		}
+		q->wait[a->id] = r;
+		return;
+	}
+	r->ofcall.data = m->data;
+	r->ofcall.count = m->count;
+	/* a read reply must not carry more than the reader asked for */
+	if(r->ofcall.count > r->ifcall.count)
+		r->ofcall.count = r->ifcall.count;
+	respond(r, nil);
+	/* m stays valid up to here: the slot's reference is dropped last */
+	q->rhd[a->id] = m->next;
+	if(q->rhd[a->id] == nil)
+		q->rtl[a->id] = nil;
+	msgunref(m);
+}
+
+/*
+ * Srv.create hook: make a new, empty queue named by the request.
+ * Fails with Eexist when the name is taken. The creating fid ends up
+ * open on the queue and, if opened for reading, subscribed to it.
+ * The queue is appended to queues, so its index is its qid path - 1.
+ */
+void
+mqcreate(Req *r)
+{
+	Aux *a;
+	Mq *q;
+	int m;
+
+	if(lookup(r->ifcall.name) != nil){
+		respond(r, Eexist);
+		return;
+	}
+	m = r->ifcall.mode & OMASK;
+	q = emalloc(sizeof(Mq));
+	q->name = estrdup(r->ifcall.name);
+	/*
+	 * The file mode shown by stat comes from the requested permissions,
+	 * not from the open mode. Only the rwx bits are kept: queues are
+	 * always plain files (QTFILE below).
+	 */
+	q->mode = r->ifcall.perm & 0777;
+	q->qid.path = queueid++;
+	q->qid.vers = 0;
+	q->qid.type = QTFILE;
+	a = emalloc(sizeof(Aux));
+	a->q = q;
+	if(m == OREAD || m == ORDWR || m == OEXEC)
+		a->id = subscribe(q);
+	r->ofcall.qid = q->qid;
+	r->fid->qid = q->qid;
+	r->fid->aux = a;
+	queues = erealloc(queues, ++nqueues*sizeof(Mq*));
+	queues[nqueues-1] = q;
+
+	respond(r, nil);
+}
+
+/*
+ * Srv.open hook.
+ * Opening the root needs no per-fid state. Opening a queue attaches
+ * an Aux to the fid and, for the readable modes, subscribes it as a
+ * reader. A fid that already has an Aux is refused with Einuse, and a
+ * qid path that matches no queue with Enoexist.
+ */
+void
+mqopen(Req *r)
+{
+	Aux *a;
+	vlong p;
+	int m;
+
+	if(r->fid->aux != nil){
+		respond(r, Einuse);
+		return;
+	}
+	p = r->fid->qid.path;
+	if(p != Qroot){
+		/* paths are handed out as index+1 by mqcreate */
+		if(p < 1 || p > nqueues){
+			respond(r, Enoexist);
+			return;
+		}
+		m = r->ifcall.mode & OMASK;
+		a = emalloc(sizeof(Aux));
+		a->q = queues[p-1];
+		if(m == OREAD || m == ORDWR || m == OEXEC)
+			a->id = subscribe(a->q);
+		r->fid->aux = a;
+	}
+	r->ofcall.qid = r->fid->qid;
+	respond(r, nil);
+}
+
+/*
+ * Srv.destroyfid hook: release the per-fid state.
+ * For a fid whose open mode is one of the readable ones, the unread
+ * backlog of its reader slot is dropped and the slot is marked free
+ * (rd = -1) for subscribe to reuse. The Aux itself is always freed.
+ */
+void
+destroyfid(Fid *f)
+{
+	Msg *m, *next;
+	Aux *a;
+	Mq *q;
+	int mode;
+
+	a = f->aux;
+	if(a == nil)
+		return;
+	f->aux = nil;
+	mode = f->omode & OMASK;
+	if(mode == OREAD || mode == ORDWR || mode == OEXEC){
+		q = a->q;
+		/* give back the references this reader still held */
+		for(m = q->rhd[a->id]; m != nil; m = next){
+			next = m->next;
+			msgunref(m);
+		}
+		q->rhd[a->id] = nil;
+		q->rtl[a->id] = nil;
+		q->rd[a->id] = -1;
+	}
+	free(a);
+}
+
+/* Srv.attach hook: point the new fid at the root directory, with no Aux. */
+void
+mqattach(Req *r)
+{
+	Qid root;
+
+	root = (Qid){Qroot, 0, QTDIR};
+	r->ofcall.qid = root;
+	r->fid->qid = r->ofcall.qid;
+	r->fid->aux = nil;
+	respond(r, nil);
+}
+
+/* Handler table for the file server; main passes it to postmountsrv. */
+Srv mq  = {
+	.attach=mqattach,
+	.open=mqopen,
+	.create=mqcreate,
+	.read=mqread,
+	.write=mqwrite,
+	.remove=mqremove,
+	.flush=mqflush,
+	.stat=mqstat,
+	.walk1=mqwalk1,
+	.clone=mqclone,
+	.destroyfid=destroyfid,
+};
+
+/* Print the command synopsis, covering every flag main accepts, and exit with status "usage". */
+void
+usage(void)
+{
+	fprint(2, "usage: %s [-d] [-r maxlog] [-s srv] [-m mtpt]\n", argv0);
+	exits("usage");
+}
+
+/*
+ * Parse the command line and start the server.
+ *	-d	increase 9P chatter (chatty9p)
+ *	-s srv	service name passed to postmountsrv
+ *	-m mtpt	mount point, /mnt/mq by default
+ *	-r n	log threshold in bytes used by trimlog
+ */
+void
+main(int argc, char **argv)
+{
+	char *srvname, *mntpt;
+
+	srvname = nil;
+	mntpt = "/mnt/mq";
+	ARGBEGIN{
+	case 'd':
+		chatty9p++;
+		break;
+	case 's':
+		srvname = EARGF(usage());
+		break;
+	case 'm':
+		mntpt = EARGF(usage());
+		break;
+	case 'r':
+		/* maxlog is a vlong; atoi would cut the value down to an int */
+		maxlog = atoll(EARGF(usage()));
+		break;
+	default:
+		usage();
+	}ARGEND;
+
+	postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL);
+	/* returning from main would report a non-empty exit status */
+	exits(nil);
+}