shithub: neoventi

ref: 4b60d3dd32efd00a1d07cb08662926ba9836719f
dir: /server.c/

View raw version
#include <u.h>
#include <libc.h>
#include <bio.h>
#include <thread.h>
#include <libsec.h>
#include "neoventi.h"

static void vtsend(VtConn conn, char *buf, u16int size, u8int tag, int drop);

// Handles an error on `conn` handling client request `tbuf`
// Only the tag must be preserved in the buffer
static void
vterr(VtConn conn, char *tbuf, char *msg, ...)
{
	u16int len;
	va_list args;
	va_start(args, msg);
	msg = vsmprint(msg, args);
	werrstr(msg);
	free(msg);
	va_end(args);
	if(tbuf != nil){
		len = snprint(tbuf+6, 0x10000, "neoventi: %r");
		U16PUT(tbuf+4, len);
		vtsend(conn, tbuf, len+4, VtRerror, 1);
	}
	longjmp(conn.bounce, 1);
}

static void
vthangup(VtConn conn)
{
	longjmp(conn.bounce, 2);
}

// Convenience function: reads a venti packet from conn into buf
static void
vtrecv(VtConn conn, char *buf)
{
	switch(read(conn.fd, buf, MaxPacketSize)){
	case 0:
		vthangup(conn);
	case 1:
		vterr(conn, nil, "received a single byte for message size");
	default:
		return;
	}
}

static void
vtversion(VtConn conn, char *buf)
{
	
	long n, i;
	// Response is one line of unknown size; discard bytes until EOL
	if(fprint(conn.fd, "venti-02-neoventi\n") == 18){
		n = read(conn.fd, buf, MaxPacketSize);
		for(i = 0; i < n; i += 1){
			if(buf[i] == '\n'){
				if(i+1 == n)
					return;
				break;
			}
		}
	}
	// If the handshake fails, make it clear there's a problem and give up
	fprint(conn.fd, "FUCK OFF\n");
	vterr(conn, nil, "handshake failed");
}

static void
vtsend(VtConn conn, char *buf, u16int size, u8int type, int drop)
{
	U16PUT(buf, size);
	buf[2] = type;
	// +2 because we need to send the u16 size as well!
	if(write(conn.fd, buf, size+2) != size+2){
		if(drop)
			fprint(2, "failed to submit error packet: %r\n");
		else
			vterr(conn, buf, "failed to write packet: %r");
	}
}

static void
vtread(VtConn conn, char *buf)
{
	VtAddress addr;
	if(!vtreadlookup((u8int*)buf + 4, &addr))
		vterr(conn, buf, "read error: %r");
	if(!readclump((uchar*)buf+4, addr))
		vterr(conn, buf, "clump read failed: %");
	vtsend(conn, buf, addr.size+2, VtRread, 0);
}

static void
vtwrite(VtConn conn, char *buf, u16int len)
{
	u8int score[20];
	uchar buf2[MaxPacketSize];
	VtAddress addr;
	sha1((uchar*)buf+8, len, score, nil);
	if(vtreadlookup(score, &addr)){
		if(addr.size != len)
			vterr(conn, buf, "hash collision detected");
		if(!readclump(buf2, addr))
			vterr(conn, buf, "clump read failed: %r");
		if(memcmp(buf2, buf+8, len) != 0)
			vterr(conn, buf, "hash collision detected");
	} else {
		vterr(conn, buf, "TODO: insert data");
	}
	memcpy(buf+4, score, 20);
	vtsend(conn, buf, 22, VtRwrite, 0);
}

static int
vtconnhandle(VtConn conn, char *buf, u16int len)
{
	switch(buf[2]){
	case VtTread:
		vtread(conn, buf);
		return 1;
	case VtTwrite:
		vtwrite(conn, buf, len-6);
		return 1;
	case VtTgoodbye:
		vthangup(conn);
	case VtTsync:
		vterr(conn, buf, "TODO: sync not supported yet");
	default:
		vterr(conn, buf, "TODO unimplemented request type %d", buf[2]);
	}
	return 0;
}

static void
vthello(VtConn conn, char *buf)
{
	vtversion(conn, buf);
	vtrecv(conn, buf);
	if(buf[2] != VtThello)
		vterr(conn, buf, "received message before hello: %d", buf[2]);
	if(buf[4] != 0 || buf[5] != 2 || buf[6] != '0' || buf[7] != '2')
		vterr(conn, buf, "unsupported protocol version requested in Thello: %d %d %d %d", buf[4], buf[5], buf[6], buf[7]);
	buf[6] = 'n';
	buf[7] = 'o';
	vtsend(conn, buf, 8, VtRhello, 0);
}

static void
vtloop(VtConn conn, char *packetbuf)
{
	long n;
	char buf[0x10000];
	u16int offset = 0, sz = 0;
	while(1){
		//TODO: treat conn.buf as ring buffer? Avoids moving packet around.
		// Maintain 64K of packet data in conn.buf.
		// Process all full packets in the buffer.
		// If the final packet is incomplete, move it to the beginning of the
		// buffer, and read more into the buffer _after_ it.
		// Then, repeat.
		n = read(conn.fd, buf+sz, 0x10000-sz);
		sz += n;
		if(sz == 0)
			// No data, and none will be coming.
			vthangup(conn);
		if(sz == 1)
			vterr(conn, buf, "incomplete packet!");
		while(offset+2 < sz){
			// As long as there's a complete packet, process it.
			u16int len = U16GET(buf + offset);
			if(2 + offset + len > sz){
				// missing part of packet!
				break;
			}
		// Extract packet into packet buffer - this is needed because
		// we modify the packet buffer in-place for response, and we need to
		// have room to write a response without overwriting follow-up packets.
		// TODO: should we drop the idea of modifying in place if we need to copy
		// anyways? Don't want allocation churn, but we can just move packetbuf
		// into VtConn and pass both the source and destination buffer around a lot.
		// Not important right now.
			memcpy(packetbuf, buf+offset, 2+len);
			vtconnhandle(conn, packetbuf, len);
			offset += 2+len;
		}
		if(offset == sz){
			sz = 0;
			offset = 0;
			continue;
		}
	}
}

static void
handleproc(void *fd)
{
	char buf[MaxPacketSize];
	VtConn conn;
	conn.fd = (int)(usize)fd;
	switch(setjmp(conn.bounce)){
	case 0:
		vthello(conn, buf);
		vtloop(conn, buf);
		break;
	case 1:
		fprint(2, "abandoning client: %r\n");
		break;
	case 2:
		// Deliberate disconnect, don't log.
		break;
	default:
		sysfatal("internal error: unexpected bounce code");
	}
	close(conn.fd);
}

static void
handle(int ctl, char *dir)
{
	int fd = accept(ctl, dir);
	if(fd < 0){
		fprint(2, "failed to accept connection\n");
		return;
	}
	proccreate(handleproc, (void*)fd, 512*1024);
}

void
serve(char *addr)
{
	char adir[NETPATHLEN], dir[NETPATHLEN];
	int fd, ctl;
	fd = announce(addr, adir);
	if(fd < 0)
		sysfatal("%r");
	procsetname("neoventi/server");
	for(ctl = listen(adir, dir); ctl >= 0; ctl = listen(adir, dir)){
		handle(ctl, dir);
		close(ctl);
	}
	fprint(2, "server has died\n");
}