shithub: sce

Download patch

ref: 2852fc75b203598e781db315ea6b2a7735d2db4a
parent: 3d91225b38e4e1a87b60e065c2f02c16ea0a8c1e
author: qwx <qwx@sciops.net>
date: Sat Mar 6 21:03:58 EST 2021

basic network protocol and communication

- com.c: protocol/communication code
- net.c now only deals with networking
- make pause a request to the local server
- fix not initializing buffer in kproc
- give sim proc higher stack size, not main

client communicates to the local server via a pipe,
but transparently wrt the server.  network data is
read in separate procs and csp is used for
communication and syncing.  each connection has its
own buffer which is blocked until parsed.

--- /dev/null
+++ b/com.c
@@ -1,0 +1,168 @@
+#include <u.h>
+#include <libc.h>
+#include <draw.h>
+#include <fcall.h>
+#include "dat.h"
+#include "fns.h"
+
+enum{
+	Hdrsz = 4,	/* bytes reserved at the start of every message buffer for the header */
+};
+typedef struct Header Header;
+struct Header{
+	int empty;	/* placeholder; no header field is read or written in this file yet */
+};
+
+static int
+vpack(uchar *p, uchar *e, char *fmt, va_list a)
+{	/* encode the arguments in a into [p,e) as directed by fmt; returns the number of bytes written */
+	int n, sz;
+	uchar u[8];	/* staging area, wide enough for the largest (64-bit) value */
+	u32int v;
+	u64int w;
+
+	sz = 0;
+	for(;;){
+		n = 0;
+		switch(*fmt++){
+		default: sysfatal("unknown format %c", fmt[-1]);
+		copy: if(p + n > e) sysfatal("vpack: buffer overflow");	/* shared tail: every value case jumps here; overflow is fatal, not an error return */
+			memcpy(p, u, n); p += n; break;
+		case 0: return sz;	/* end of the format string */
+		case 'h': v = va_arg(a, int); PBIT8(u, v); n = sizeof(u8int); goto copy;	/* 8-bit value */
+		case 's': v = va_arg(a, int); PBIT16(u, v); n = sizeof(u16int); goto copy;	/* 16-bit value */
+		case 'l': v = va_arg(a, int); PBIT32(u, v); n = sizeof(u32int); goto copy;	/* 32-bit value */
+		case 'v': w = va_arg(a, vlong); PBIT64(u, w); n = sizeof(u64int); goto copy;	/* 64-bit value */
+		}
+		sz += n;	/* reached through the break in the copy tail */
+	}
+}
+
+static int
+vunpack(uchar *p, uchar *e, char *fmt, va_list a)
+{	/* decode values from [p,e) through the pointers in a as directed by fmt; returns bytes consumed, or -1 with the error string set when the input is too short */
+	int n, sz;
+
+	sz = 0;
+	for(;;){
+		switch(*fmt++){
+		default: sysfatal("vunpack: unknown format %c", fmt[-1]);
+		error: werrstr("vunpack: truncated message"); return -1;	/* shared bail-out for the bounds checks below */
+		case 0: return sz;	/* end of the format string */
+		case 'h': n = sizeof(u8int); if(p + n > e) goto error;	/* 8-bit value, stored through an int* */
+			*va_arg(a, int*) = GBIT8(p); p += n;
+			break;
+		case 's': n = sizeof(u16int); if(p + n > e) goto error;	/* 16-bit value, stored through an int* */
+			*va_arg(a, int*) = GBIT16(p); p += n;
+			break;
+		case 'l': n = sizeof(u32int); if(p + n > e) goto error;	/* 32-bit value, stored through an int* */
+			*va_arg(a, int*) = GBIT32(p); p += n;
+			break;
+		case 'v': n = sizeof(u64int); if(p + n > e) goto error;	/* 64-bit value, stored through a vlong* */
+			*va_arg(a, vlong*) = GBIT64(p); p += n;
+			break;
+		}
+		sz += n;
+	}
+}
+
+static int
+pack(uchar *p, uchar *e, char *fmt, ...)
+{	/* variadic front end to vpack; returns vpack's result */
+	int n;
+	va_list a;
+
+	va_start(a, fmt);
+	n = vpack(p, e, fmt, a);
+	va_end(a);
+	return n;
+}
+
+static int
+unpack(uchar *p, uchar *e, char *fmt, ...)
+{	/* variadic front end to vunpack; the variadic arguments are pointers to the destinations; returns vunpack's result */
+	int n;
+	va_list a;
+
+	va_start(a, fmt);
+	n = vunpack(p, e, fmt, a);
+	va_end(a);
+	return n;
+}
+
+static int
+reqpause(uchar *p, uchar *e)
+{	/* handle a Tpause request whose payload starts at p: toggle pause; returns payload bytes consumed, or -1 */
+	int n, arg;	/* arg: the 32-bit payload packed by sendpause, decoded but not used */
+
+	if((n = unpack(p, e, "l", &arg)) < 0){
+		fprint(2, "reqpause: %r\n");
+		return -1;
+	}
+	pause ^= 1;
+	return n;	/* parsemsg advances its cursor by this amount */
+}
+
+static int
+readhdr(Msg *m, Header *h)
+{	/* validate the header; for now it only checks that data follows the Hdrsz reserved bytes */
+	USED(h);
+	if(m->sz <= Hdrsz)
+		return -1;
+	return 0;
+}
+
+int
+parsemsg(Msg *m)
+{	/* execute every request contained in m; returns 0, or -1 at the first malformed request */
+	int n, type;
+	uchar *p, *e;
+	Header h;
+
+	if(readhdr(m, &h) < 0)
+		return -1;
+	p = m->buf + Hdrsz;
+	e = m->buf + m->sz;	/* stop at the end of the received data, not of the whole buffer */
+	while(p < e){
+		type = *p++;	/* each request is a one-byte type followed by its payload */
+		switch(type){
+		case Tpause:
+			if((n = reqpause(p, e)) < 0){
+				dprint("parse: invalid Tpause: %r\n");
+				return -1;
+			}
+			break;
+		default:
+			dprint("parse: invalid message type %ux\n", type);
+			return -1;
+		}
+		p += n;
+	}
+	return 0;
+}
+
+static void
+newmsg(Msg *m)
+{	/* start a message in m by reserving room for the header */
+	Header h;	/* placeholder; nothing is written into the reserved bytes yet */
+
+	USED(h);
+	m->sz += Hdrsz;
+}
+
+int
+sendpause(void)
+{	/* append a Tpause request to the client buffer; returns 0 on success, -1 on error; nothing is written out until flushcl */
+	int n;
+	Msg *m;
+
+	m = getclbuf();
+	if(m->sz == 0)	/* empty buffer: reserve the header first */
+		newmsg(m);
+	if((n = pack(m->buf + m->sz, m->buf + sizeof m->buf, "hl", Tpause, 0)) < 0){	/* type byte, then a 32-bit zero payload */
+		fprint(2, "sendpause: %r\n");
+		return -1;
+	}
+	m->sz += n;
+	return 0;
+}
--- a/dat.h
+++ b/dat.h
@@ -11,6 +11,8 @@
 typedef struct Map Map;
 typedef struct Resource Resource;
 typedef struct Team Team;
+typedef struct Cbuf Cbuf;
+typedef struct Msg Msg;
 
 enum{
 	Nresource = 3,
@@ -194,6 +196,17 @@
 
 enum{
 	Tquit,
+	Tpause,	/* request type byte: toggle the simulation pause flag */
+
+	Nbuf = 4096,	/* size in bytes of a Cbuf data buffer */
+};
+struct Cbuf{
+	uchar buf[Nbuf];	/* raw message bytes */
+	int sz;	/* number of bytes of buf currently in use */
+};
+struct Msg{
+	Team *t;	/* NOTE(review): not set or read anywhere in this patch — confirm intended use */
+	Cbuf;	/* unnamed member: its fields are accessed directly as m->buf and m->sz */
 };
 
 enum{
--- a/fns.h
+++ b/fns.h
@@ -1,3 +1,8 @@
+void	clearmsg(Msg*);
+Msg*	readnet(void);
+void	initnet(char*);
+int	parsemsg(Msg*);
+int	sendpause(void);
 void	stepsnd(void);
 void	initsnd(void);
 void	linktomap(Mobj*);
@@ -7,9 +12,7 @@
 void	initsv(int, char*);
 void	flushcl(void);
 void	packcl(char*, ...);
-void	stepnet(void);
-void	joinnet(char*);
-void	listennet(void);
+Msg*	getclbuf(void);
 void	dopan(Point);
 void	select(Point);
 void	move(Point);
--- a/fs.c
+++ b/fs.c
@@ -111,7 +111,7 @@
 			continue;
 		}
 		if(pic0.h % Nteam != 0)
-			sysfatal("loadobjpic: obj %s sprite sheet %d,%d: height not multiple of %d\n",
+			sysfatal("loadobjpic: obj %s sprite sheet %d,%d: height not multiple of %d",
 				pl->name, pic0.w, pic0.h, Nteam);
 		pic0.h /= Nteam;
 		n = pic0.w * pic0.h;
@@ -136,7 +136,7 @@
 		snprint(path, sizeof path, "%s.bit", tileset);
 		loadpic(path, &tilesetpic, 0);
 		if(tilesetpic.h % tilesetpic.w != 0)
-			sysfatal("loadterpic: tiles not squares: tilepic %d,%d\n",
+			sysfatal("loadterpic: tiles not squares: tilepic %d,%d",
 				tilesetpic.w, tilesetpic.h);
 	}
 	id = pl->frm;
--- a/mkfile
+++ b/mkfile
@@ -3,6 +3,7 @@
 TARG=sce
 OFILES=\
 	bmap.$O\
+	com.$O\
 	drw.$O\
 	fs.$O\
 	map.$O\
--- a/net.c
+++ b/net.c
@@ -10,153 +10,119 @@
 
 typedef struct Con Con;
 enum{
-	Ncon = Nteam * 4,
-	Nbuf = 4096,
+	Ncon = Nteam * 2,	/* number of slots in the con table */
 };
 struct Con{
 	int fd;
-	Team *t;
+	Msg;	/* unnamed member: the connection's receive buffer, reached as c->buf, c->sz and &c->Msg */
+	Channel *waitc;	/* conproc blocks on this until clearmsg releases the buffer */
 };
 static Con con[Ncon];
-static Channel *lc;
-static uchar cbuf[Nbuf], *cbufp = cbuf;
+static Msg clbuf, sendbuf;	/* clbuf: local client requests awaiting flushcl; sendbuf: data writenet writes to every open connection */
+Channel *conc;	/* each conproc sends its Con here once a message has been read; drained by readnet */
 
+static int clpfd[2];	/* pipe: flushcl writes to clpfd[0], con[0] reads from clpfd[1] */
+
 static void
 closenet(Con *c)
 {
 	close(c->fd);
 	c->fd = -1;
+	chanfree(c->waitc);	/* the wait channel is created per connection by initcon */
+	c->waitc = nil;
 }
 
 static void
-flushcmd(Con *c)
+flushreq(Con *c, Cbuf *cb)	/* write the contents of cb to connection c; cb itself is left untouched */
 {
-	int n;
-
-	if((n = cbufp - cbuf) == 0)
+	if(cb->sz == 0)	/* nothing to send */
 		return;
-	if(write(c->fd, cbuf, n) != n){
+	if(write(c->fd, cb->buf, cb->sz) != cb->sz){
 		fprint(2, "flushcmd: %r\n");
-		closenet(c);
+		close(c->fd);	/* NOTE(review): c->fd keeps its old value here, unlike closenet; presumably conproc's read then fails and cleans up — confirm */
 	}
-	cbufp = cbuf;
 }
 
 static void
-writecmd(Con *c, char *fmt, va_list a)
+writenet(void)	/* write sendbuf to every open connection, then empty it */
 {
-	union{
-		uchar u[4];
-		s32int l;
-	} u;
+	Con *c;
 
-	for(;;){
-		if(cbufp - cbuf > sizeof(cbuf) - 4)
-			flushcmd(c);
-		switch(*fmt++){
-		default: sysfatal("unknown format %c", fmt[-1]);
-		case 0: return;
-		case 'u':
-			u.l = va_arg(a, s32int);
-			memcpy(cbufp, u.u, sizeof u.u);
-			cbufp += sizeof u.u;
-		}
+	for(c=con; c<con+nelem(con); c++){
+		if(c->fd < 0)	/* unused slot */
+			continue;
+		flushreq(c, &sendbuf);
 	}
+	sendbuf.sz = 0;
 }
 
-static void
-packcmd(Con *c, char *fmt, ...)
-{
-	va_list a;
-
-	va_start(a, fmt);
-	writecmd(c, fmt, a);
-	va_end(a);
-}
-
 void
 flushcl(void)
 {
-	flushcmd(con);
+	if(clbuf.sz > 0	/* hand any pending client requests to the local server over the pipe */
+	&& write(clpfd[0], clbuf.buf, clbuf.sz) != clbuf.sz)
+		fprint(2, "flushcl: %r\n");	/* report a failed or short write instead of ignoring it */
+	clbuf.sz = 0;	/* the buffer is emptied even when the write failed, as before */
 }
 
 void
-packcl(char *fmt, ...)
+clearmsg(Msg *m)	/* mark m as consumed and let the conproc that owns it read again */
 {
-	va_list a;
+	Con *c;
 
-	va_start(a, fmt);
-	packcmd(con, fmt, a);
-	va_end(a);
+	for(c=con; c<con+nelem(con); c++)	/* nelem, not sizeof: the bound is counted in elements */
+		if(m == &c->Msg)
+			break;
+	assert(c < con + nelem(con));	/* m must be the buffer of some connection */
+	m->sz = 0;
+	nbsendul(c->waitc, 0);	/* NOTE(review): non-blocking; presumably the conproc is already waiting in recvul — confirm */
 }
 
-static void
-writenet(void)
+Msg *
+readnet(void)	/* return the buffer of a connection with a message pending, or nil when none is queued */
 {
 	Con *c;
 
-	for(c=con; c<con+nelem(con); c++){
-		if(c->fd < 0)
-			continue;
-	}
+	if((c = nbrecvp(conc)) == nil)	/* non-blocking receive from the conprocs */
+		return nil;
+	return &c->Msg;
 }
 
-static void
-execbuf(Con *c, uchar *p, uchar *e)
+Msg *
+getclbuf(void)	/* accessor for the local client's outgoing request buffer */
 {
-	int n;
-
-	while(p < e){
-		n = *p++;
-		switch(n){
-		case Tquit: closenet(c); return;
-		}
-	}
+	return &clbuf;
 }
 
 static void
-readnet(void)
+conproc(void *cp)	/* reader proc for one connection; cp is the Con to serve */
 {
-	int n, m;
+	int n;
 	Con *c;
-	uchar buf[Nbuf], *p;
 
-	for(c=con; c<con+nelem(con); c++){
-		if(c->fd < 0)
-			continue;
-		for(m=sizeof buf, p=buf; (n = flen(c->fd)) > 1 && n <= m; m-=n, p+=n){
-			if(read(c->fd, p, n) != n){
-				fprint(2, "readnet: %r\n");
-				closenet(c);
-				break;
-			}
+	c = cp;
+	for(;;){
+		if((n = read(c->fd, c->buf, sizeof c->buf)) <= 0){	/* end of file or error ends this connection */
+			fprint(2, "cproc %zd: %r\n", c - con);
+			closenet(c);
+			return;
 		}
-		execbuf(c, buf, p);
+		c->sz = n;
+		sendp(conc, c);	/* announce the filled buffer; readnet picks it up */
+		recvul(c->waitc);	/* wait for clearmsg before reading into the buffer again */
+		c->sz = 0;
 	}
 }
 
-void
-stepnet(void)
+static void
+initcon(Con *c)	/* create the wait channel for c and start its reader proc; c->fd must already be set */
 {
-	int fd;
-
-	if(nbrecv(lc, &fd) > 0){
-		/* FIXME: add to observers (team 0) */
-	}
-	readnet();
-	writenet();
+	if((c->waitc = chancreate(sizeof(ulong), 0)) == nil)	/* unbuffered channel */
+		sysfatal("chancreate: %r");
+	if(proccreate(conproc, c, 8192) < 0)
+		sysfatal("proccreate: %r");
 }
 
-void
-joinnet(char *sys)
-{
-	char s[128];
-
-	snprint(s, sizeof s, "%s/tcp!%s!%d", netmtpt, sys != nil ? sys : sysname(), lport);
-	if((con[0].fd = dial(s, nil, nil, nil)) < 0)
-		sysfatal("dial: %r");
-}
-
 static int
 regnet(int lfd, char *ldir)
 {
@@ -165,16 +131,19 @@
 	for(c=con; c<con+nelem(con); c++)
 		if(c->fd < 0)
 			break;
-	if(c == con + nelem(con))
-		return reject(lfd, ldir, "no more open slots");
+	if(c == con + nelem(con)){
+		reject(lfd, ldir, "no more open slots");
+		return -1;
+	}
 	c->fd = accept(lfd, ldir);
-	return c->fd;
+	initcon(c);
+	return 0;
 }
 
 static void
-lproc(void *)
+listenproc(void *)
 {
-	int fd, lfd;
+	int lfd;
 	char adir[40], ldir[40], data[100];
 
 	snprint(data, sizeof data, "%s/tcp!*!%d", netmtpt, lport);
@@ -182,22 +151,46 @@
 		sysfatal("announce: %r");
 	for(;;){
 		if((lfd = listen(adir, ldir)) < 0
-			|| (fd = regnet(lfd, ldir)) < 0
-			|| close(lfd) < 0
-			|| send(lc, &fd) < 0)
+			|| regnet(lfd, ldir) < 0
+			|| close(lfd) < 0)
 			break;
 	}
 }
 
-void
+static void
 listennet(void)
 {
+	if(proccreate(listenproc, nil, 8192) < 0)	/* run the listener in its own proc */
+		sysfatal("proccreate: %r");
+}
+
+static void
+joinnet(char *sys)
+{	/* dial the server on sys (sysname() when sys is nil) and start a reader for it in slot con[1]; a failed dial is fatal */
+	char s[128];
+	Con *c;
+
+	snprint(s, sizeof s, "%s/tcp!%s!%d", netmtpt, sys != nil ? sys : sysname(), lport);
+	c = &con[1];
+	if((c->fd = dial(s, nil, nil, nil)) < 0)
+		sysfatal("dial: %r");
+	initcon(c);
+}
+
+void
+initnet(char *sys)
+{	/* set up the connection table, the message channel and the client pipe; sys is currently unused */
+	Con *c;
+
 	for(c=con; c<con+nelem(con); c++)
 		c->fd = -1;
-	if((lc = chancreate(sizeof(int), 0)) == nil)
+	if((conc = chancreate(sizeof(uintptr), 1)) == nil)	/* pointer-sized elements, buffer of one */
 		sysfatal("chancreate: %r");
-	if(proccreate(lproc, nil, 8192) < 0)
-		sysfatal("proccreate: %r");
+	if(pipe(clpfd) < 0)
+		sysfatal("pipe: %r");
+	con[0].fd = clpfd[1];	/* con[0] is the local client: its reader proc reads what flushcl writes to clpfd[0] */
+	initcon(&con[0]);
+	USED(sys);
+	//joinnet(sys);
+	//listennet();
 }
--- a/sce.c
+++ b/sce.c
@@ -8,14 +8,12 @@
 #include "dat.h"
 #include "fns.h"
 
-mainstacksize = 16*1024;
-
 enum{
 	Hz = 60,
 };
 
 char *progname = "sce", *dbname, *prefix, *mapname = "map1.db";
-int pause, debugmap;
+int debugmap;
 QLock drawlock;
 
 typedef struct Kev Kev;
@@ -80,6 +78,7 @@
 	if((fd = open("/dev/kbd", OREAD)) < 0)
 		sysfatal("kproc: %r");
 	memset(buf, 0, sizeof buf);
+	memset(down, 0, sizeof down);
 	for(;;){
 		if(buf[0] != 0){
 			n = strlen(buf)+1;
@@ -225,6 +224,7 @@
 			if(me.b & 4)
 				move(me);
 			qunlock(&drawlock);
+			flushcl();
 			break;
 		case Akbd:
 			if(ke.r == Kdel)
@@ -232,9 +232,10 @@
 			if(!ke.down)
 				continue;
 			switch(ke.r){
-			case KF|1: debugmap ^= 1; pause ^= 1; break;
-			case ' ': pause ^= 1; break;
+			case KF|1: debugmap ^= 1; break;
+			case ' ': sendpause(); break;
 			}
+			flushcl();
 			break;
 		case Atic:
 			updatefb();
--- a/sv.c
+++ b/sv.c
@@ -8,6 +8,7 @@
 extern QLock drawlock;
 
 vlong tc;
+int pause;
 
 static int tdiv;
 
@@ -14,7 +15,13 @@
 static void
 step(vlong tics)
 {
+	Msg *m;
+
 	qlock(&drawlock);
+	while((m = readnet()) != nil){
+		parsemsg(m);
+		clearmsg(m);
+	}
 	while(!pause && tics-- > 0)
 		stepsim();
 	qunlock(&drawlock);
@@ -25,7 +32,7 @@
 {
 	vlong t, t0, dt, Δtc;
 
-	USED(sys);
+	initnet(sys);
 	initsim();
 	Δtc = 1;
 	t0 = nsec();
@@ -47,6 +54,6 @@
 initsv(int tv, char *sys)
 {
 	tdiv = Te9 / (tv * 3);
-	if(proccreate(simproc, sys, 8192) < 0)
+	if(proccreate(simproc, sys, 16*1024) < 0)
 		sysfatal("proccreate: %r");
 }