ref: 1af7a1aced139ad7c90a053241e46f2ea2aaa2b4
dir: /mq.c/
#include <u.h> #include <libc.h> #include <fcall.h> #include <thread.h> #include <9p.h> typedef struct Mq Mq; typedef struct Rd Rd; typedef struct Msg Msg; typedef struct Aux Aux; enum { Qroot, Qmq, }; enum { KiB = 1024, MiB = 1024*KiB, GiB = 1024*MiB, }; struct Aux { Mq *q; int id; int ntail; }; struct Msg { Ref; Msg *next; int count; char *data; char buf[]; }; struct Rd { int id; int off; Msg *hd; Msg *tl; Req *wait; }; struct Mq { Ref; Qid qid; int moribund; usize logsz; Msg *loghd; Msg *logtl; Msg *hd; Msg *tl; int nrd; Rd *rd; char *name; char *user; char *group; int mode; }; Mq **queues; int nqueues; vlong maxlog = -1; vlong queueid = 0; int coalesce; char Ebaduse[] = "invalid use of fd"; char Einuse[] = "fid in use"; char Eexist[] = "file already exists"; char Enoexist[] = "file does not exist"; char Eintr[] = "interrupted"; char Enotdir[] = "not a directory"; char Ebadcmd[] = "unknown command"; char Enomem[] = "out of memory"; #define QTYPE(p) ((int)((p) & 0x3)) #define QIDX(p) ((p)>>2) #define QPATH(i, t) ((i)<<2 | (t)) void * emalloc(ulong n) { void *v; v = mallocz(n, 1); if(v == nil) sysfatal("malloc: %r"); setmalloctag(v, getcallerpc(&n)); return v; } void * erealloc(void *p, ulong n) { void *v; v = realloc(p, n); if(v == nil) sysfatal("realloc: %r"); setmalloctag(v, getcallerpc(&p)); return v; } char* estrdup(char *s) { s = strdup(s); if(s == nil) sysfatal("strdup: %r"); setmalloctag(s, getcallerpc(&s)); return s; } void trimlog(Mq *q) { Msg *m; if(maxlog < 0) return; while(q->loghd != nil && q->logsz >= maxlog){ m = q->loghd; q->loghd = m->next; q->logsz -= m->count; if(decref(m) == 0) free(m); } if(q->loghd == nil) q->logtl = nil; } int subscribe(Mq *q, vlong ntail) { Msg *m; Rd *rd; vlong sz; int i; rd = nil; for(i = 0; i < q->nrd; i++){ if(q->rd[i].id == -1){ rd = &q->rd[i]; break; } } if(rd == nil){ q->rd = erealloc(q->rd, (++q->nrd)*sizeof(*q->rd)); rd = &q->rd[q->nrd - 1]; } rd->id = i; rd->wait = nil; rd->off = 0; rd->hd = nil; rd->tl = nil; sz = q->logsz; m = q->loghd; if(ntail != -1) for(; m != nil && sz > ntail; m = m->next) sz -= m->count; rd->hd = m; for(; m != nil; m = m->next) incref(m); rd->tl = m; return rd->id; } Mq* lookup(char *name) { int i; for(i = 0; i < nqueues; i++) if(strcmp(queues[i]->name, name) == 0) return queues[i]; return nil; } void qstat(Dir *d, Mq *q, Aux *a) { d->name = estrdup9p(q->name); d->uid = estrdup9p("glenda"); d->gid = estrdup9p("glenda"); d->muid = estrdup9p("glenda"); d->qid = q->qid; d->mtime = 0; d->atime = 0; d->mode = q->mode; d->length = q->logsz; if(a->ntail < d->length) d->length = a->ntail; } int rootgen(int i, Dir *d, void *a) { if(i >= nqueues) return -1; qstat(d, queues[i], a); return 0; } char* mqclone(Fid *old, Fid *new) { Aux *o, *n; o = old->aux; n = emalloc(sizeof(Aux)); if(o->q != nil){ n->q = o->q; n->id = subscribe(o->q, o->ntail); } n->ntail = o->ntail; new->aux = n; return nil; } char* mqwalk1(Fid *f, char *name, Qid *qid) { Mq *q; switch(QTYPE(f->qid.path)){ case Qroot: if(strcmp(name, "..") == 0){ *qid = f->qid; return nil; }else if((q = lookup(name)) != nil){ f->qid = q->qid; *qid = f->qid; return nil; } return Enoexist; default: if(strcmp(name, "..") == 0){ f->qid = (Qid){Qroot, 0, QTDIR}; *qid = f->qid; return nil; } return Enotdir; } } void mqstat(Req *r) { vlong p; p = r->fid->qid.path; r->d.uid = estrdup9p("glenda"); r->d.gid = estrdup9p("glenda"); r->d.muid = estrdup9p("glenda"); r->d.qid = r->fid->qid; r->d.mtime = 0; r->d.atime = 0; switch(QTYPE(p)){ case Qroot: r->d.mode = DMDIR|0755; break; default: r->d.mode = 0644; incref(queues[QIDX(p)]); qstat(&r->d, queues[QIDX(p)], r->fid->aux); decref(queues[QIDX(p)]); } respond(r, nil); } void mqflush(Req *r) { Req *w; Aux *a; if((a = r->oldreq->fid->aux) != nil){ w = a->q->rd[a->id].wait; if(w != nil) respond(w, "interrupted"); a->q->rd[a->id].wait = nil; } respond(r, nil); } void mqremove(Req *r) { vlong path; int i, o; path = r->fid->qid.path; if(QTYPE(path) == Qroot){ respond(r, Ebaduse); return; } o = 0; for(i = 0; i < nqueues; i++){ if(queues[i]->qid.path == path){ queues[i]->moribund = 1; continue; } queues[o++] = queues[i]; } nqueues--; respond(r, nil); } void mqwrite(Req *r) { Msg *m; Req *rr; Aux *a; Mq *q; int i; if((a = r->fid->aux) == nil){ respond(r, Ebaduse); return; } q = a->q; m = emalloc(sizeof(Msg) + r->ifcall.count); m->data = m->buf; m->count = r->ifcall.count; memmove(m->data, r->ifcall.data, m->count); m->next = nil; for(i = 0; i < q->nrd; i++){ if(q->rd[i].id == -1) continue; rr = q->rd[i].wait; q->rd[i].wait = nil; if(rr != nil){ rr->ofcall.data = m->data; rr->ofcall.count = m->count; if(rr->ifcall.count > m->count) rr->ofcall.count = m->count; respond(rr, nil); if(rr->ofcall.count == m->count) continue; m->count -= rr->ofcall.count; m->data += rr->ofcall.count; } if(q->rd[i].hd == nil) q->rd[i].hd = m; if(q->rd[i].tl != nil) q->rd[i].tl->next = m; q->rd[i].tl = m; incref(m); } if(q->loghd == nil) q->loghd = m; if(q->logtl != nil) q->logtl->next = m; incref(m); q->logtl = m; q->logsz += m->count; q->logtl = m; trimlog(q); q->qid.vers++; r->ofcall.count = r->ifcall.count; respond(r, nil); } void mqread(Req *r) { char *p, *e, *b, *d; int n, mcount; Aux *a; Msg *m; Rd *rd; Mq *q; if(QTYPE(r->fid->qid.path) == Qroot){ dirread9p(r, rootgen, r->fid->aux); respond(r, nil); return; } if((a = r->fid->aux) == nil){ respond(r, Ebaduse); return; } q = a->q; /* no messages: enqueue until next one comes */ if(q->rd[a->id].hd == nil){ q->rd[a->id].wait = r; return; } /* queued messages: pop data off */ rd = &q->rd[a->id]; d = emalloc(r->ifcall.count); p = d; e = d + r->ifcall.count; r->ofcall.data = p; while(rd->hd != nil && p != e){ m = rd->hd; b = m->data + rd->off; n = e - p; mcount = m->count; assert(rd->off >= 0 && rd->off <= m->count); incref(m); if(n >= m->count - rd->off){ n = m->count - rd->off; memcpy(p, b, n); rd->hd = m->next; rd->off = 0; if(rd->hd == nil) rd->tl = nil; decref(m); }else{ memcpy(p, b, n); rd->off += n; } p += n; if(decref(m) == 0) free(m); if(!coalesce || n < mcount) break; } assert(r->ofcall.count <= r->ifcall.count); r->ofcall.count = p - d; respond(r, nil); free(d); } void mqcreate(Req *r) { Aux *a; Mq *q; int m; if(lookup(r->ifcall.name) != nil){ respond(r, Eexist); return; } m = r->ifcall.mode & OMASK; q = emalloc(sizeof(Mq)); q->name = estrdup(r->ifcall.name); q->mode = r->ifcall.mode; q->qid.path = QPATH(queueid, Qmq); q->qid.vers = 0; q->qid.type = QTFILE; queueid++; a = r->fid->aux; assert(a->q == nil); a->q = q; if(m == OREAD || m == ORDWR || m == OEXEC) a->id = subscribe(q, a->ntail); r->ofcall.qid = q->qid; r->fid->qid = q->qid; r->fid->aux = a; queues = erealloc(queues, ++nqueues*sizeof(Mq*)); queues[nqueues-1] = q; respond(r, nil); } void mqopen(Req *r) { Aux *a; vlong p; int m; a = r->fid->aux; if(a->q != nil){ respond(r, Einuse); return; } m = r->ifcall.mode & OMASK; p = r->fid->qid.path; if(QTYPE(p) != Qroot){ incref(queues[QIDX(p)]); a->q = queues[QIDX(p)]; if(m == OREAD || m == ORDWR || m == OMASK) a->id = subscribe(a->q, a->ntail); } r->ofcall.qid = r->fid->qid; respond(r, nil); } void destroyfid(Fid *f) { Aux *a; int m; a = f->aux; m = f->omode & OMASK; if(m != OREAD && m != ORDWR && m != OEXEC) return; if(a != nil && a->q != nil) a->q->rd[a->id].id = -1; free(a); } void mqattach(Req *r) { Aux *a; char *n, *e; n = r->ifcall.aname; a = emalloc(sizeof(Aux)); r->ofcall.qid = (Qid){Qroot, 0, QTDIR}; r->fid->qid = r->ofcall.qid; r->fid->aux = a; a->ntail = -1; if(n != nil && strncmp(n, "tail:", 5) == 0){ a->ntail = strtol(n+5, &e, 0); while(*e){ switch(*e++){ case 'g': a->ntail *= 1024*1024*1024; case 'm': a->ntail *= 1024*1024; case 'k': a->ntail *= 1024; default: respond(r, "bad scale"); } } } respond(r, nil); } Srv mq = { .attach=mqattach, .open=mqopen, .create=mqcreate, .read=mqread, .write=mqwrite, .remove=mqremove, .flush=mqflush, .stat=mqstat, .walk1=mqwalk1, .clone=mqclone, .destroyfid=destroyfid, }; void usage(void) { fprint(2, "usage: %s [-s srv] [-m mtpt]\n", argv0); exits("usage"); } void main(int argc, char **argv) { char *srvname, *mntpt, *s, *e; srvname = "mq"; mntpt = "/mnt/mq"; ARGBEGIN{ case 'd': chatty9p++; break; case 's': srvname=EARGF(usage()); break; case 'm': mntpt = EARGF(usage()); break; case 'l': s = EARGF(usage()); maxlog = strtoll(s, &e, 0); while(*e){ switch(*e++){ case 'k': maxlog *= KiB; break; case 'K': maxlog *= KiB; break; case 'm': maxlog *= MiB; break; case 'M': maxlog *= MiB; break; case 'g': maxlog *= GiB; break; case 'G': maxlog *= GiB; break; default: sysfatal("unknown suffix %c", *e); } } break; case 'c': coalesce = 1; break; default: usage(); }ARGEND; postmountsrv(&mq, srvname, mntpt, MCREATE|MREPL); exits(nil); }