ref: 4b60d3dd32efd00a1d07cb08662926ba9836719f
dir: /server.c/
#include <u.h> #include <libc.h> #include <bio.h> #include <thread.h> #include <libsec.h> #include "neoventi.h" static void vtsend(VtConn conn, char *buf, u16int size, u8int tag, int drop); // Handles an error on `conn` handling client request `tbuf` // Only the tag must be preserved in the buffer static void vterr(VtConn conn, char *tbuf, char *msg, ...) { u16int len; va_list args; va_start(args, msg); msg = vsmprint(msg, args); werrstr(msg); free(msg); va_end(args); if(tbuf != nil){ len = snprint(tbuf+6, 0x10000, "neoventi: %r"); U16PUT(tbuf+4, len); vtsend(conn, tbuf, len+4, VtRerror, 1); } longjmp(conn.bounce, 1); } static void vthangup(VtConn conn) { longjmp(conn.bounce, 2); } // Convenience function: reads a venti packet from conn into buf static void vtrecv(VtConn conn, char *buf) { switch(read(conn.fd, buf, MaxPacketSize)){ case 0: vthangup(conn); case 1: vterr(conn, nil, "received a single byte for message size"); default: return; } } static void vtversion(VtConn conn, char *buf) { long n, i; // Response is one line of unknown size; discard bytes until EOL if(fprint(conn.fd, "venti-02-neoventi\n") == 18){ n = read(conn.fd, buf, MaxPacketSize); for(i = 0; i < n; i += 1){ if(buf[i] == '\n'){ if(i+1 == n) return; break; } } } // If the handshake fails, make it clear there's a problem and give up fprint(conn.fd, "FUCK OFF\n"); vterr(conn, nil, "handshake failed"); } static void vtsend(VtConn conn, char *buf, u16int size, u8int type, int drop) { U16PUT(buf, size); buf[2] = type; // +2 because we need to send the u16 size as well! if(write(conn.fd, buf, size+2) != size+2){ if(drop) fprint(2, "failed to submit error packet: %r\n"); else vterr(conn, buf, "failed to write packet: %r"); } } static void vtread(VtConn conn, char *buf) { VtAddress addr; if(!vtreadlookup((u8int*)buf + 4, &addr)) vterr(conn, buf, "read error: %r"); if(!readclump((uchar*)buf+4, addr)) vterr(conn, buf, "clump read failed: %"); vtsend(conn, buf, addr.size+2, VtRread, 0); } static void vtwrite(VtConn conn, char *buf, u16int len) { u8int score[20]; uchar buf2[MaxPacketSize]; VtAddress addr; sha1((uchar*)buf+8, len, score, nil); if(vtreadlookup(score, &addr)){ if(addr.size != len) vterr(conn, buf, "hash collision detected"); if(!readclump(buf2, addr)) vterr(conn, buf, "clump read failed: %r"); if(memcmp(buf2, buf+8, len) != 0) vterr(conn, buf, "hash collision detected"); } else { vterr(conn, buf, "TODO: insert data"); } memcpy(buf+4, score, 20); vtsend(conn, buf, 22, VtRwrite, 0); } static int vtconnhandle(VtConn conn, char *buf, u16int len) { switch(buf[2]){ case VtTread: vtread(conn, buf); return 1; case VtTwrite: vtwrite(conn, buf, len-6); return 1; case VtTgoodbye: vthangup(conn); case VtTsync: vterr(conn, buf, "TODO: sync not supported yet"); default: vterr(conn, buf, "TODO unimplemented request type %d", buf[2]); } return 0; } static void vthello(VtConn conn, char *buf) { vtversion(conn, buf); vtrecv(conn, buf); if(buf[2] != VtThello) vterr(conn, buf, "received message before hello: %d", buf[2]); if(buf[4] != 0 || buf[5] != 2 || buf[6] != '0' || buf[7] != '2') vterr(conn, buf, "unsupported protocol version requested in Thello: %d %d %d %d", buf[4], buf[5], buf[6], buf[7]); buf[6] = 'n'; buf[7] = 'o'; vtsend(conn, buf, 8, VtRhello, 0); } static void vtloop(VtConn conn, char *packetbuf) { long n; char buf[0x10000]; u16int offset = 0, sz = 0; while(1){ //TODO: treat conn.buf as ring buffer? Avoids moving packet around. // Maintain 64K of packet data in conn.buf. // Process all full packets in the buffer. // If the final packet is incomplete, move it to the beginning of the // buffer, and read more into the buffer _after_ it. // Then, repeat. n = read(conn.fd, buf+sz, 0x10000-sz); sz += n; if(sz == 0) // No data, and none will be coming. vthangup(conn); if(sz == 1) vterr(conn, buf, "incomplete packet!"); while(offset+2 < sz){ // As long as there's a complete packet, process it. u16int len = U16GET(buf + offset); if(2 + offset + len > sz){ // missing part of packet! break; } // Extract packet into packet buffer - this is needed because // we modify the packet buffer in-place for response, and we need to // have room to write a response without overwriting follow-up packets. // TODO: should we drop the idea of modifying in place if we need to copy // anyways? Don't want allocation churn, but we can just move packetbuf // into VtConn and pass both the source and destination buffer around a lot. // Not important right now. memcpy(packetbuf, buf+offset, 2+len); vtconnhandle(conn, packetbuf, len); offset += 2+len; } if(offset == sz){ sz = 0; offset = 0; continue; } } } static void handleproc(void *fd) { char buf[MaxPacketSize]; VtConn conn; conn.fd = (int)(usize)fd; switch(setjmp(conn.bounce)){ case 0: vthello(conn, buf); vtloop(conn, buf); break; case 1: fprint(2, "abandoning client: %r\n"); break; case 2: // Deliberate disconnect, don't log. break; default: sysfatal("internal error: unexpected bounce code"); } close(conn.fd); } static void handle(int ctl, char *dir) { int fd = accept(ctl, dir); if(fd < 0){ fprint(2, "failed to accept connection\n"); return; } proccreate(handleproc, (void*)fd, 512*1024); } void serve(char *addr) { char adir[NETPATHLEN], dir[NETPATHLEN]; int fd, ctl; fd = announce(addr, adir); if(fd < 0) sysfatal("%r"); procsetname("neoventi/server"); for(ctl = listen(adir, dir); ctl >= 0; ctl = listen(adir, dir)){ handle(ctl, dir); close(ctl); } fprint(2, "server has died\n"); }