ref: 3866717cbb020199d58171c1c0cdd7382a74ee82
dir: /emu/port/devpipe.c/
#include "dat.h"
#include "fns.h"
#include "error.h"
#include <interp.h>
/*
 * A qid path packs two values into one number: the low 5 bits
 * hold the file type (Qdir, Qdata0, Qdata1) and the bits above
 * them hold an id.  NETTYPE and NETID extract the two parts;
 * NETQID builds a path from an id and a type.
 */
#define NETTYPE(x) ((ulong)(x)&0x1f)
#define NETID(x) (((ulong)(x))>>5)
#define NETQID(i,t) (((i)<<5)|(t))
typedef struct Pipe Pipe;
/*
 * State shared by every channel that refers to the same pipe.
 */
struct Pipe
{
/* held while ref and qref are updated (walk, open, close) */
QLock l;
/* not referenced by the code in this file */
Pipe* next;
/* number of channels pointing at this pipe; the pipe is freed when it drops to 0 */
int ref;
/* number taken from pipealloc.path at attach */
ulong path;
/* q[0] is read via Qdata0 and written via Qdata1; q[1] is the reverse */
Queue* q[2];
/* count of open channels on Qdata0 (qref[0]) and Qdata1 (qref[1]) */
int qref[2];
/* private copy of the pipedir table; wstat edits names and permissions here */
Dirtab* pipedir;
/* owner name, copied from up->env->user at attach */
char* user;
};
/*
 * Allocation state shared by all pipes.
 */
static struct
{
/* held while path is incremented in pipeattach */
Lock l;
/* last pipe number handed out */
ulong path;
/* size passed to qopen for each queue; set in pipeinit */
int pipeqsize;
} pipealloc;
/*
 * File types, stored in the low bits of a qid path (see NETTYPE).
 * The values also match the index of each file in pipedir.
 */
enum
{
Qdir,
Qdata0,
Qdata1
};
/*
 * Template directory.  pipeattach copies it into each new Pipe so
 * that a wstat on one pipe does not affect the others.
 */
Dirtab pipedir[] =
{
".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
"data", {Qdata0}, 0, 0660,
"data1", {Qdata1}, 0, 0660,
};
/*
 * release a Pipe together with its owner string, both queues and
 * its private directory table; a nil pointer is ignored.
 *
 * NOTE(review): the queues are released with plain free(); whether
 * data still sitting in a queue needs a separate release cannot be
 * determined from this file -- confirm against the queue code.
 */
static void
freepipe(Pipe *p)
{
	if(p == nil)
		return;

	free(p->pipedir);
	free(p->q[1]);
	free(p->q[0]);
	free(p->user);
	free(p);
}
/*
 * device initialisation: record the size used for every pipe queue
 */
static void
pipeinit(void)
{
	enum { Qsize = 32*1024 };	/* handed to qopen by pipeattach */

	pipealloc.pipeqsize = Qsize;
}
/*
 * attach to a brand new pipe: the Pipe, its private copy of the
 * directory table and both queues are all allocated here.
 * returns the channel from devattach with aux pointing at the Pipe.
 */
static Chan*
pipeattach(char *spec)
{
	Chan *c;
	Pipe *p;
	int side;

	c = devattach('|', spec);
	p = malloc(sizeof(*p));
	if(!p)
		error(Enomem);

	/* on any failure below, free the Pipe and whatever it already holds */
	if(waserror()){
		freepipe(p);
		nexterror();
	}
	p->pipedir = malloc(sizeof(pipedir));
	if(!p->pipedir)
		error(Enomem);
	memmove(p->pipedir, pipedir, sizeof(pipedir));
	kstrdup(&p->user, up->env->user);
	p->ref = 1;
	for(side = 0; side < 2; side++){
		p->q[side] = qopen(pipealloc.pipeqsize, 0, 0, 0);
		if(!p->q[side])
			error(Enomem);
	}
	poperror();

	/* take the next pipe number */
	lock(&pipealloc.l);
	pipealloc.path++;
	p->path = pipealloc.path;
	unlock(&pipealloc.l);

	c->qid.path = NETQID(2*p->path, Qdir);
	c->qid.vers = 0;
	c->qid.type = QTDIR;
	c->aux = p;
	c->dev = 0;
	return c;
}
/*
 * directory generator passed to devwalk and devdirread.
 * entry i of the table (not counting ".") is described in *dp;
 * for the two data files the reported length is the current
 * length of the corresponding queue.
 * returns 1 when *dp was filled in, -1 when i is past the end.
 */
static int
pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp)
{
	Pipe *p;
	Qid q;
	int id, kind, len, slot;

	USED(name);
	if(i == DEVDOTDOT){
		devdir(c, c->qid, "#|", 0, eve, 0555, dp);
		return 1;
	}

	slot = i + 1;		/* table entry 0 is "." */
	if(!tab || slot >= ntab)
		return -1;
	tab = &tab[slot];

	p = c->aux;
	kind = NETTYPE(tab->qid.path);
	if(kind == Qdata0)
		len = qlen(p->q[0]);
	else if(kind == Qdata1)
		len = qlen(p->q[1]);
	else
		len = tab->length;

	/* same id as the directory being read, type from the table */
	id = NETID(c->qid.path);
	q.path = NETQID(id, tab->qid.path);
	q.vers = 0;
	q.type = QTFILE;
	devdir(c, q, tab->name, len, eve, tab->perm, dp);
	return 1;
}
/*
 * walk within the pipe directory.  when devwalk produced a new
 * channel, that channel is one more reference to the Pipe; if the
 * source channel was open on a data file, the open count of that
 * side is bumped as well.
 */
static Walkqid*
pipewalk(Chan *c, Chan *nc, char **name, int nname)
{
	Pipe *p;
	Walkqid *wq;
	int kind;

	p = c->aux;
	wq = devwalk(c, nc, name, nname, p->pipedir, nelem(pipedir), pipegen);
	if(wq == nil || wq->clone == nil || wq->clone == c)
		return wq;

	qlock(&p->l);
	p->ref++;
	if(c->flag & COPEN){
		kind = NETTYPE(c->qid.path);
		if(kind == Qdata0 || kind == Qdata1)
			p->qref[kind == Qdata1]++;
	}
	qunlock(&p->l);
	return wq;
}
/*
 * stat the directory or one of the data files.  data files report
 * the name and permissions from the pipe's own table and the
 * current length of their queue.
 * returns the size of the packed stat; raises Eshortstat when the
 * result from convD2M is smaller than BIT16SZ.
 */
static int
pipestat(Chan *c, uchar *db, int n)
{
	Dir dir;
	Dirtab *ent;
	Pipe *p;
	int kind, side;

	p = c->aux;
	kind = NETTYPE(c->qid.path);
	if(kind == Qdir)
		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
	else if(kind == Qdata0 || kind == Qdata1){
		side = kind == Qdata1;
		ent = &p->pipedir[1+side];	/* entry 0 is "." */
		devdir(c, c->qid, ent->name, qlen(p->q[side]), eve, ent->perm, &dir);
	}else
		panic("pipestat");

	n = convD2M(&dir, db, n);
	if(n < BIT16SZ)
		error(Eshortstat);
	return n;
}
/*
 * open the directory (read only) or one end of the pipe.
 * opening a data file checks the mode against that file's
 * permissions in the pipe's table and counts one more open
 * on that side.
 */
static Chan*
pipeopen(Chan *c, int omode)
{
	Pipe *p;
	int kind, side;

	if(c->qid.type & QTDIR){
		if(omode != OREAD)
			error(Ebadarg);
		c->mode = omode;
		c->flag |= COPEN;
		c->offset = 0;
		return c;
	}

	openmode(omode);	/* check it */

	p = c->aux;
	qlock(&p->l);
	if(waserror()){
		qunlock(&p->l);
		nexterror();
	}
	kind = NETTYPE(c->qid.path);
	if(kind == Qdata0 || kind == Qdata1){
		side = kind == Qdata1;
		devpermcheck(p->user, p->pipedir[1+side].perm, omode);
		p->qref[side]++;
	}
	poperror();
	qunlock(&p->l);

	c->mode = openmode(omode);
	c->flag |= COPEN;
	c->offset = 0;
	c->iounit = qiomaxatomic;
	return c;
}
/*
 * drop one channel's reference to the pipe
 */
static void
pipeclose(Chan *c)
{
	Pipe *p;
	int kind, side, last;

	p = c->aux;
	qlock(&p->l);

	/*
	 * when the last open channel on one side goes away, hang up
	 * the other side's queue and close this side's queue
	 */
	if(c->flag & COPEN){
		kind = NETTYPE(c->qid.path);
		if(kind == Qdata0 || kind == Qdata1){
			side = kind == Qdata1;
			p->qref[side]--;
			if(p->qref[side] == 0){
				qhangup(p->q[!side], 0);
				qclose(p->q[side]);
			}
		}
	}

	/*
	 * with nothing open on either side, make both queues usable again
	 */
	if(p->qref[0] == 0 && p->qref[1] == 0){
		qreopen(p->q[0]);
		qreopen(p->q[1]);
	}

	/*
	 * the last reference frees the structure, after the lock is dropped
	 */
	p->ref--;
	last = p->ref == 0;
	qunlock(&p->l);
	if(last)
		freepipe(p);
}
/*
 * read the directory, or read from the queue belonging to the
 * data file the channel refers to; the offset is not used
 */
static long
piperead(Chan *c, void *va, long n, vlong junk)
{
	Pipe *p;
	int kind;

	USED(junk);
	p = c->aux;
	kind = NETTYPE(c->qid.path);
	if(kind == Qdir)
		return devdirread(c, va, n, p->pipedir, nelem(pipedir), pipegen);
	if(kind == Qdata0)
		return qread(p->q[0], va, n);
	if(kind == Qdata1)
		return qread(p->q[1], va, n);
	panic("piperead");
	return -1;	/* not reached */
}
/*
 * block read: data files read a Block from their own queue,
 * anything else is passed to devbread
 */
static Block*
pipebread(Chan *c, long n, ulong offset)
{
	Pipe *p;
	int kind;

	p = c->aux;
	kind = NETTYPE(c->qid.path);
	if(kind == Qdata0)
		return qbread(p->q[0], n);
	if(kind == Qdata1)
		return qbread(p->q[1], n);
	return devbread(c, n, offset);
}
/*
 * write to the queue read by the other side.
 * if the write raises an error, the handler records
 * "write on closed pipe" as the kill string of up->iprog
 * (unless one is already set) and passes the error on.
 */
static long
pipewrite(Chan *c, void *va, long n, vlong junk)
{
	Pipe *p;
	Prog *r;
	int kind;

	USED(junk);
	if(waserror()){
		/* avoid exceptions when pipe is a mounted queue */
		if(!(c->flag & CMSG)){
			r = up->iprog;
			if(r != nil && r->kill == nil)
				r->kill = "write on closed pipe";
		}
		nexterror();
	}

	p = c->aux;
	kind = NETTYPE(c->qid.path);
	if(kind == Qdata0)
		n = qwrite(p->q[1], va, n);
	else if(kind == Qdata1)
		n = qwrite(p->q[0], va, n);
	else
		panic("pipewrite");

	poperror();
	return n;
}
/*
 * block write: hand bp to the queue read by the other side.
 * errors are treated as in pipewrite: the handler records
 * "write on closed pipe" as the kill string of up->iprog
 * (unless one is already set) and passes the error on.
 */
static long
pipebwrite(Chan *c, Block *bp, ulong junk)
{
	Pipe *p;
	Prog *r;
	long n;
	int kind;

	USED(junk);
	if(waserror()){
		/* avoid exceptions when pipe is a mounted queue */
		if(!(c->flag & CMSG)){
			r = up->iprog;
			if(r != nil && r->kill == nil)
				r->kill = "write on closed pipe";
		}
		nexterror();
	}

	p = c->aux;
	kind = NETTYPE(c->qid.path);
	if(kind == Qdata0)
		n = qbwrite(p->q[1], bp);
	else if(kind == Qdata1)
		n = qbwrite(p->q[0], bp);
	else{
		n = 0;
		panic("pipebwrite");
	}

	poperror();
	return n;
}
/*
 * change the name and/or permissions of one data file.
 * only the pipe's owner may do so, and never on the directory.
 * a new name must differ from the other data file's name
 * (Eexist) and be shorter than KNAMELEN (Efilename).
 * returns the value from convM2D; raises Eshortstat if that is 0.
 */
static int
pipewstat(Chan *c, uchar *dp, int n)
{
	Dir *d;
	Pipe *p;
	int mine, peer;

	if(c->qid.type & QTDIR)
		error(Eperm);
	p = c->aux;
	if(strcmp(up->env->user, p->user) != 0)
		error(Eperm);

	/* the strings unpacked by convM2D are stored just after the Dir */
	d = smalloc(sizeof(*d)+n);
	if(waserror()){
		free(d);
		nexterror();
	}
	n = convM2D(dp, n, d, (char*)&d[1]);
	if(n == 0)
		error(Eshortstat);

	/* table slots: 1 and 2 are the two data files */
	if(NETTYPE(c->qid.path) == Qdata1){
		mine = 2;
		peer = 1;
	}else{
		mine = 1;
		peer = 2;
	}

	if(!emptystr(d->name)){
		validwstatname(d->name);
		if(strlen(d->name) >= KNAMELEN)
			error(Efilename);
		if(strcmp(p->pipedir[peer].name, d->name) == 0)
			error(Eexist);
		kstrcpy(p->pipedir[mine].name, d->name, KNAMELEN);
	}
	/* a mode of all ones leaves the permissions untouched */
	if(d->mode != ~0UL)
		p->pipedir[mine].perm = d->mode & 0777;

	poperror();
	free(d);
	return n;
}
/*
 * Device table entry for the pipe device, '|'.
 * devcreate and devremove are not defined in this file; every
 * other function listed is one of the pipe* routines above.
 */
Dev pipedevtab = {
'|',
"pipe",
pipeinit,
pipeattach,
pipewalk,
pipestat,
pipeopen,
devcreate,
pipeclose,
piperead,
pipebread,
pipewrite,
pipebwrite,
devremove,
pipewstat,
};