shithub: s3

Download patch

ref: 685a897dc19fec1e4f4b7ebeee0aac6b07c520a5
parent: d8adc268f3dccd53e67adc2c7957309db795f6fe
author: Jacob Moody <moody@posixcafe.org>
date: Tue Sep 23 19:33:19 EDT 2025

multipart write

--- a/cat.c
+++ b/cat.c
@@ -30,7 +30,7 @@
 	if(parseuri(&s3, path, sizeof path, argv[0]) < 0)
 		usage();
 	b = Bfdopen(1, OWRITE);
-	download(&s3, path, b);
+	download(&s3, path, b, s3get);	/* GET the object and copy its body to standard output */
 	Bterm(b);
 	exits(nil);
 }
--- a/cmd.c
+++ b/cmd.c
@@ -4,21 +4,23 @@
 #include "s3.h"
 
 void
-download(S3 *s3, char *path, Biobuf *local)
+download(S3 *s3, char *path, Biobuf *local, int (*fn)(S3*,Hcon*,char*))
 {
-	int bfd;
 	long n;
+	Hcon con;
 	char data[8192];
 
-	bfd = s3get(s3, path);
-	if(bfd < 0)
-		sysfatal("s3get: %r");
+	if(fn(s3, &con, path) < 0)	/* fn (s3get, s3post, ...) also fails on an error reply */
+		sysfatal("request failed: %r");
 	for(;;){
-		n = read(bfd, data, sizeof data);
+		n = read(con.body, data, sizeof data);
 		if(n < 0)
 			sysfatal("download body: %r");
-		if(n == 0)
+		if(n == 0){
+			hclose(&con);
 			return;
-		Bwrite(local, data, n);
+		}
+		if(Bwrite(local, data, n) != n)	/* don't silently truncate the local copy */
+			sysfatal("download write: %r");
 	}
 }
--- a/cmd.h
+++ b/cmd.h
@@ -1,3 +1,3 @@
-void download(S3*, char*, Biobuf*);
+void download(S3*, char*, Biobuf*, int (*fn)(S3*,Hcon*,char*));	/* fn (s3get, s3post) starts the request; its reply body is copied to the Biobuf */
 int parseuri(S3*, char*, int, char*);
 int parseargs(S3*, int, char**);
--- a/cp.c
+++ b/cp.c
@@ -41,8 +41,9 @@
 	char mime[32];
 	uchar digest[SHA2_256dlen];
 	long n;
-	int fd, bfd;
+	int fd;
 	char buf[256];
+	Hcon con;
 
 	mimetype(localpath, mime, sizeof mime);
 	fd = open(localpath, OREAD);
@@ -59,8 +60,7 @@
 	sha2_256(nil, 0, digest, ds);
 	seek(fd, 0, 0);
 
-	bfd = s3put(s3, remotepath, mime, digest);
-	if(bfd < 0)
+	if(s3put(s3, &con, remotepath, mime, digest) < 0)
 		sysfatal("upload postbody open: %r");
 	for(;;){
 		n = read(fd, buf, sizeof buf);
@@ -68,10 +68,12 @@
 			sysfatal("file read: %r");
 		if(n == 0)
 			break;
-		if(write(bfd, buf, n) < 0)
+		if(write(con.post, buf, n) != n)	/* a short write is a failure too */
 			sysfatal("upload write: %r");
 	}
-	close(bfd);
+	if(hdone(&con) < 0)
+		sysfatal("error response code: %r");
+	hclose(&con);
 }
 
 _Noreturn void
@@ -108,7 +110,7 @@
 		b = Bfdopen(fd, OWRITE);
 		if(b == nil)
 			sysfatal("Bfdopen: %r");
-		download(&s3, path, b);
+		download(&s3, path, b, s3get);
 		Bterm(b);
 		exits(nil);
 	}
--- a/ls.c
+++ b/ls.c
@@ -42,7 +42,8 @@
 		b[0] = Bfdopen(p[0], OWRITE);
 		if(b[0] == nil)
 			sysfatal("Bfdopen: %r");
-		download(&s3, path, b[0]);
+		download(&s3, path, b[0], s3get);
+		Bterm(b[0]);	/* flush buffered output into the pipe before the child exits */
 		exits(nil);
 	default:
 		close(p[0]);
--- a/mkfile
+++ b/mkfile
@@ -9,6 +9,7 @@
 	cat\
 	ls\
 	cp\
+	write\
 
 HFILES=\
 	xml.h\
@@ -17,12 +18,12 @@
 
 $O.factotum: factotum.$O
 
-$O.cmd: xml.$O s3.$O cmd.$O
-
 $O.rm: rm.$O s3.$O cmd.$O
 
 $O.cat: cat.$O s3.$O cmd.$O
 
 $O.ls: ls.$O s3.$O cmd.$O xml.$O
+# write uses xml.$O to parse the UploadId reply
+$O.write: write.$O s3.$O cmd.$O xml.$O
 
 $O.cp: cp.$O s3.$O cmd.$O
--- a/rm.c
+++ b/rm.c
@@ -16,8 +16,8 @@
 {
 	S3 s3;
 	int i;
-	int fd;
 	char path[512];
+	Hcon con;
 
 	tmfmtinstall();
 	fmtinstall('H', encodefmt);
@@ -29,9 +29,7 @@
 		usage();
 	if(parseuri(&s3, path, sizeof path, argv[0]) < 0)
 		usage();
-	fd = s3del(&s3, path);
-	if(fd < 0)
-		sysfatal("delete failed: %r");
-	close(fd);
+	if(s3del(&s3, &con, path) < 0)	/* -1 also when the reply body can't be opened (error reply) */
+		sysfatal("delete failed: %r");
 	exits(nil);
 }
--- a/s3.c
+++ b/s3.c
@@ -9,7 +9,8 @@
 typedef struct {
 	uchar *payhash;
 	char *mime;
-	char method[16];
+	char *path;	/* path after the bucket, may end in ?query; sent in the url, split apart when signing */
+	char *method;	/* "GET", "PUT", "POST" or "DELETE"; signreq rejects anything else */
 	char time[128];
 	char authhdr[512];
 } Hreq;
@@ -52,20 +53,33 @@
 }
 
 static void
-mkhreq(Hreq *hreq, S3 *s3, char *method, char *path)
+mkreq(Hreq *hreq, char *method, char *path, uchar *payhash, char *mime)
 {
+	hreq->method = method;	/* pointers are stored, not copied */
+	hreq->path = path;
+	hreq->payhash = payhash;
+	hreq->mime = mime;
+}
+
+static void
+signreq(Hreq *hreq, S3 *s3)
+{
 	char date[64];
 	uchar key[SHA2_256dlen], sig[SHA2_256dlen];
-	char buf[512], req[512];
+	char buf[1024], buf2[1024], req[1024];
 	char *sgndhdr;
+	char *query;
 
 	datetime(date, sizeof date, hreq->time, sizeof hreq->time);
-	if(strcmp(method, "PUT") == 0){
+	if(strcmp(hreq->method, "POST") == 0){	/* requests with a body also sign content-type */
 		snprint(buf, sizeof buf, "content-type:%s\nhost:%s\nx-amz-content-sha256:%.*lH\nx-amz-date:%s\n",
 			hreq->mime, s3->host, SHA2_256dlen, hreq->payhash, hreq->time);
 		sgndhdr = "content-type;host;x-amz-content-sha256;x-amz-date";
-	} else if(strcmp(method, "GET") == 0 || strcmp(method, "DELETE")==0){
-		hreq->mime = nil;
+	} else if(strcmp(hreq->method, "PUT") == 0){
+		snprint(buf, sizeof buf, "content-type:%s\nhost:%s\nx-amz-content-sha256:%.*lH\nx-amz-date:%s\n",
+			hreq->mime, s3->host, SHA2_256dlen, hreq->payhash, hreq->time);
+		sgndhdr = "content-type;host;x-amz-content-sha256;x-amz-date";
+	} else if(strcmp(hreq->method, "GET") == 0 || strcmp(hreq->method, "DELETE")==0){	/* no body: payhash becomes the hash of "" */
 		sha2_256(nil, 0, hreq->payhash, nil);
 		snprint(buf, sizeof buf, "host:%s\nx-amz-date:%s\n", s3->host, hreq->time);
 		sgndhdr = "host;x-amz-date";
@@ -72,8 +86,14 @@
 	} else
 		sysfatal("invalid method");
 
+	snprint(buf2, sizeof buf2, "%s", hreq->path);	/* the canonical request lists path and query separately */
+	if((query = strchr(buf2, '?')) != nil){
+		*query = '\0';
+		query++;	/* NOTE(review): used verbatim; SigV4 wants parameters sorted and URI-encoded, so callers must pass them that way */
+	} else
+		query = "";
 	snprint(req, sizeof req, "%s\n/%s/%s\n%s\n%s\n%s\n%.*lH",
-		method, s3->bucket, path, "", buf, sgndhdr, SHA2_256dlen, hreq->payhash);
+		hreq->method, s3->bucket, buf2, query, buf, sgndhdr, SHA2_256dlen, hreq->payhash);
 	sha2_256((uchar*)req, strlen(req), key, nil);
 	snprint(buf, sizeof buf, "%s\n%s\n%s/%s/%s/aws4_request\n%.*lH",
 		"AWS4-HMAC-SHA256", hreq->time, date, s3->region, "s3", SHA2_256dlen, key);
@@ -82,104 +102,168 @@
 
 	snprint(hreq->authhdr, sizeof hreq->authhdr, "%s Credential=%s/%s/%s/%s/aws4_request, SignedHeaders=%s, Signature=%.*lH",
 		"AWS4-HMAC-SHA256", s3->access, date, s3->region, "s3", sgndhdr, SHA2_256dlen, sig);
-	snprint(hreq->method, sizeof hreq->method, "%s", method);
 }
 
+/* fprint flushes through a small buffer and may split a long message over several writes, but each write to a webfs ctl file is one command: format into one buffer and write it once */
+#pragma	   varargck    argpos	   ctlprint 2
+static long
+ctlprint(int cfd, char *fmt, ...)
+{
+	char buf[2048];	/* longer messages are silently truncated by vseprint */
+	char *e;
+	va_list arg;
+
+	va_start(arg, fmt);
+	e = vseprint(buf, buf + sizeof buf, fmt, arg);
+	va_end(arg);
+	return write(cfd, buf, e-buf);
+}
+
 static int
-prep(S3 *s3, int cfd, char *path, Hreq *hreq)
+prep(S3 *s3, int cfd, Hreq *hreq)	/* write the already-signed request to webfs ctl fd cfd */
 {
-	if(fprint(cfd, "url %s/%s/%s", s3->endpoint, s3->bucket, path) < 0)
+	if(ctlprint(cfd, "url %s/%s/%s", s3->endpoint, s3->bucket, hreq->path) < 0)
 		return -1;
-	if(fprint(cfd, "request %s", hreq->method) < 0)
+	if(ctlprint(cfd, "request %s", hreq->method) < 0)
 		return -1;
-	if(fprint(cfd, "headers Authorization:%s", hreq->authhdr) < 0)
+	if(ctlprint(cfd, "headers Authorization:%s", hreq->authhdr) < 0)
 		return -1;
-	if(fprint(cfd, "headers x-amz-date:%s\nx-amz-content-sha256:%.*lH", hreq->time, SHA2_256dlen, hreq->payhash) < 0)
+	if(ctlprint(cfd, "headers x-amz-date:%s\nx-amz-content-sha256:%.*lH", hreq->time, SHA2_256dlen, hreq->payhash) < 0)
 		return -1;
-	if(hreq->mime != nil && fprint(cfd, "contenttype %s", hreq->mime) < 0)
+	if(hreq->mime != nil && ctlprint(cfd, "contenttype %s", hreq->mime) < 0)	/* s3get and s3del pass a nil mime */
 		return -1;
 	return 0;
 }
 
 static int
-wopen(char *buf, long n)
+hopen(Hcon *h, S3 *s3, int mode, Hreq *hreq)
 {
-	int fd;
+	long n;
+	char buf[64];
 
-	fd = open("/mnt/web/clone", ORDWR);
-	if(fd < 0)
-		return fd;
-	n = read(fd, buf, n - 1);
+	h->body = -1;	/* hclose only closes fds that are >= 0 */
+	h->post = -1;
+	h->err = -1;
+	h->ctl = open("/mnt/web/clone", ORDWR);
+	if(h->ctl < 0)
+		return -1;
+
+	n = read(h->ctl, h->id, sizeof h->id - 1);
 	if(n <= 0){
+		close(h->ctl);	/* NOTE: h->ctl keeps the stale fd on this and the prep failure path */
 		werrstr("short read from /mnt/web/clone");
 		return -1;
 	}
-	buf[n-1] = 0;
-	return fd;
+	h->id[n-1] = 0;	/* drop the last byte read (presumably a newline) */
+
+	signreq(hreq, s3);
+	if(prep(s3, h->ctl, hreq) < 0){
+		close(h->ctl);
+		return -1;
+	}
+	switch(h->mode = mode){	/* hdone dispatches on the mode */
+	case OREAD:
+		snprint(buf, sizeof buf, "/mnt/web/%s/body", h->id);
+		h->body = open(buf, OREAD);
+		if(h->body >= 0)
+			return 0;
+		snprint(buf, sizeof buf, "/mnt/web/%s/errorbody", h->id);
+		h->err = open(buf, OREAD);	/* left open for the caller to read */
+		return -1;
+	case ORDWR:
+	case OWRITE:
+		snprint(buf, sizeof buf, "/mnt/web/%s/postbody", h->id);
+		h->post = open(buf, OWRITE);
+		return h->post < 0 ? -1 : 0;	/* caller writes the body to h->post, then calls hdone */
+	default:
+		return -1;
+	}
 }
 
+void
+hclose(Hcon *h)	/* close every fd still open on h; the fields are not reset to -1 */
+{
+	if(h->ctl >= 0)	/* -1 means not open */
+		close(h->ctl);
+	if(h->body >= 0)
+		close(h->body);
+	if(h->post >= 0)
+		close(h->post);
+	if(h->err >= 0)
+		close(h->err);
+}
+
 int
-s3get(S3 *s3, char *path)
+hdone(Hcon *h)	/* finish the request body and open the reply */
 {
-	char id[64];
-	char body[64];
-	int fd, bfd;
+	char buf[64];
+
+	switch(h->mode){	/* only connections opened for writing have a body to finish */
+	case OWRITE:
+	case ORDWR:
+		close(h->post);	/* end of the request body */
+		h->post = -1;
+		snprint(buf, sizeof buf, "/mnt/web/%s/body", h->id);
+		h->body = open(buf, OREAD);
+		if(h->body < 0){	/* failed: h->err gets the error body, if it can be opened */
+			snprint(buf, sizeof buf, "/mnt/web/%s/errorbody", h->id);
+			h->err = open(buf, OREAD);
+			return -1;
+		}
+		return 0;
+	default:
+		abort();	/* calling hdone on an OREAD connection is a caller bug */
+	}
+}
+
+int
+s3get(S3 *s3, Hcon *con, char *path)	/* on success the object is read from con->body */
+{
 	Hreq h;
 	uchar payhash[SHA2_256dlen];
 
-	h.payhash = payhash;
-	fd = wopen(id, sizeof id);
-	if(fd < 0)
-		return -1;
-	mkhreq(&h, s3, "GET", path);
-	if(prep(s3, fd, path, &h) < 0)
-		return -1;
-	snprint(body, sizeof body, "/mnt/web/%s/body", id);
-	bfd = open(body, OREAD);
-	close(fd);
-	return bfd;
+	mkreq(&h, "GET", path, payhash, nil);	/* signreq fills payhash with the hash of an empty body */
+	return hopen(con, s3, OREAD, &h);
 }
 
 int
-s3put(S3 *s3, char *path, char *mime, uchar *payhash)
+s3put(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash)
 {
-	char id[64];
-	char body[64];
-	int fd, bfd;
 	Hreq h;
 
-	h.mime = mime;
-	h.payhash = payhash;
-	fd = wopen(id, sizeof id);
-	if(fd < 0)
-		return -1;
-	mkhreq(&h, s3, "PUT", path);
-	if(prep(s3, fd, path, &h) < 0)
-		return -1;
-	snprint(body, sizeof body, "/mnt/web/%s/postbody", id);
-	bfd = open(body, OWRITE);
-	close(fd);
-	return bfd;
+	mkreq(&h, "PUT", path, payhash, mime);	/* payhash: SHA-256 of the body the caller is about to write */
+	return hopen(con, s3, ORDWR, &h);	/* caller writes con->post, then calls hdone */
 }
 
 int
-s3del(S3 *s3, char *path)
+s3del(S3 *s3, Hcon *con, char *path)
 {
-	char id[64];
-	char body[64];
-	int fd, bfd;
 	Hreq h;
 	uchar payhash[SHA2_256dlen];
 
-	h.payhash = payhash;
-	fd = wopen(id, sizeof id);
-	if(fd < 0)
+	mkreq(&h, "DELETE", path, payhash, nil);
+	return hopen(con, s3, OREAD, &h);	/* opening the body makes a failed delete return -1 */
+}
+
+int
+s3post(S3 *s3, Hcon *con, char *path)	/* POST with an empty body; on success the reply is in con->body */
+{
+	Hreq h;
+	uchar payhash[SHA2_256dlen];
+
+	sha2_256(nil, 0, payhash, nil);	/* hash of the empty body */
+	mkreq(&h, "POST", path, payhash, "application/octet-stream");
+	if(hopen(con, s3, ORDWR, &h) < 0)
 		return -1;
-	mkhreq(&h, s3, "DELETE", path);
-	if(prep(s3, fd, path, &h) < 0)
-		return -1;
-	snprint(body, sizeof body, "/mnt/web/%s/body", id);
-	bfd = open(body, OREAD);
-	close(fd);
-	return bfd;
+	
+	return hdone(con);
+}
+
+int
+s3postwrite(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash)	/* like s3put but POST: write con->post, then hdone */
+{
+	Hreq h;
+
+	mkreq(&h, "POST", path, payhash, mime);
+	return hopen(con, s3, ORDWR, &h);
 }
--- a/s3.h
+++ b/s3.h
@@ -1,11 +1,25 @@
-typedef struct {
+typedef struct S3 S3;
+typedef struct Hcon Hcon;
+
+struct S3 {
 	char *endpoint;
 	char *host;
 	char *access;
 	char *bucket;
 	char *region;
-} S3;
+};
 
-int s3get(S3 *s3, char *path);
-int s3del(S3 *s3, char *path);
-int s3put(S3 *s3, char *path, char *mime, uchar *payhash);
+struct Hcon {	/* one webfs connection under /mnt/web */
+	char id[16];	/* connection number read from /mnt/web/clone */
+	int ctl, body, post, err;	/* clone (ctl), body, postbody and errorbody fds; -1 when not open */
+	int mode;	/* OREAD, OWRITE or ORDWR: how the connection was opened */
+};
+
+int s3get(S3 *s3, Hcon *con, char *path);
+int s3del(S3 *s3, Hcon *con, char *path);
+int s3put(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash);	/* write the body to con->post, then call hdone */
+int s3post(S3 *s3, Hcon *con, char *path);	/* empty-body POST; reply in con->body */
+int s3postwrite(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash);	/* like s3put, but POST */
+
+void hclose(Hcon *con);
+int hdone(Hcon *con);	/* finish the request body, open the reply; -1 (error body in con->err) on failure */
--- /dev/null
+++ b/write.c
@@ -1,0 +1,191 @@
+#include <u.h>
+#include <libc.h>
+#include <bio.h>
+#include <mp.h>
+#include <libsec.h>
+#include "s3.h"
+#include "cmd.h"
+#include "xml.h"
+
+/*
+ * write: copy standard input to an S3 object with a multipart
+ * upload, so the input may be a pipe of unknown length.
+ */
+
+enum {
+	Partsize	= 5*1024*1024,	/* S3 minimum size of every part but the last */
+	Maxparts	= 10000,	/* S3 maximum number of parts in one upload */
+};
+
+_Noreturn void
+usage(void)
+{
+	fprint(2, "usage: %s s3://bucket/path\n", argv0);
+	exits("usage");
+}
+
+/*
+ * Report a failed request: copy the error body webfs kept
+ * (if one was opened) to standard error, then exit with
+ * what and the pending error string.
+ */
+static void
+reqfail(Hcon *con, char *what)
+{
+	char err[ERRMAX], buf[2048];
+	long n;
+
+	rerrstr(err, sizeof err);
+	if(con->err >= 0)
+		while((n = read(con->err, buf, sizeof buf)) > 0)
+			write(2, buf, n);
+	sysfatal("%s: %s", what, err);
+}
+
+/*
+ * Upload data[0:n] as part partno of multipart upload uploadid
+ * and return the part's ETag (malloc'd) for finishpart.
+ */
+static char*
+putpart(S3 *s3, char *path, int partno, char *uploadid, uchar *data, long n)
+{
+	Hcon con;
+	int fd;
+	long etagn;
+	uchar payhash[SHA2_256dlen];
+	char buf[256], *rpath, *etag;
+
+	sha2_256(data, n, payhash, nil);
+	rpath = smprint("%s?partNumber=%d&uploadId=%s", path, partno, uploadid);
+	if(rpath == nil)
+		sysfatal("smprint: %r");
+	if(s3put(s3, &con, rpath, "application/octet-stream", payhash) < 0)
+		sysfatal("part %d upload: s3put: %r", partno);
+	free(rpath);
+	if(n > 0 && write(con.post, data, n) != n)
+		sysfatal("short write on s3put: %r");
+	if(hdone(&con) < 0)
+		reqfail(&con, "server responded with error");
+
+	snprint(buf, sizeof buf, "/mnt/web/%s/etag", con.id);
+	fd = open(buf, OREAD);
+	if(fd < 0)
+		sysfatal("no ETAG response header");
+	etagn = read(fd, buf, sizeof buf - 1);
+	close(fd);
+	/* an empty ETag would make the completion request invalid */
+	if(etagn <= 0)
+		sysfatal("can't read ETAG response");
+	buf[etagn] = '\0';
+	hclose(&con);
+	etag = strdup(buf);
+	if(etag == nil)
+		sysfatal("strdup: %r");
+	return etag;
+}
+
+/*
+ * Complete multipart upload uploadid by posting the ETags of
+ * parts 1 through partmax-1 (tags[0] through tags[partmax-2]).
+ */
+static void
+finishpart(S3 *s3, char *path, char *tags[], int partmax, char *uploadid)
+{
+	Fmt f;
+	Hcon con;
+	uchar payhash[SHA2_256dlen];
+	char *body, *rpath;
+	long n;
+	int i;
+
+	/* the body grows by one <Part> per part, so build it in a growable string */
+	if(fmtstrinit(&f) < 0)
+		sysfatal("fmtstrinit: %r");
+	fmtprint(&f, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+	fmtprint(&f, "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n");
+	for(i = 1; i < partmax; i++)
+		fmtprint(&f, "<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>\n", i, tags[i-1]);
+	fmtprint(&f, "</CompleteMultipartUpload>");
+	body = fmtstrflush(&f);
+	if(body == nil)
+		sysfatal("fmtstrflush: %r");
+	n = strlen(body);
+	sha2_256((uchar*)body, n, payhash, nil);
+
+	rpath = smprint("%s?uploadId=%s", path, uploadid);
+	if(rpath == nil)
+		sysfatal("smprint: %r");
+	if(s3postwrite(s3, &con, rpath, "application/xml", payhash) < 0)
+		sysfatal("upload did not complete: %r");
+	free(rpath);
+	if(write(con.post, body, n) != n)
+		sysfatal("short write on s3post: %r");
+	free(body);
+	if(hdone(&con) < 0)
+		reqfail(&con, "upload did not complete");
+	hclose(&con);
+}
+
+void
+main(int argc, char **argv)
+{
+	S3 s3;
+	Hcon con;
+	Biobuf bin;
+	Xelem *x;
+	char path[512], *rpath;
+	int i, partno;
+	long n, len;
+	static uchar bb[Partsize];
+	static char *tags[Maxparts];
+
+	tmfmtinstall();
+	fmtinstall('H', encodefmt);
+	quotefmtinstall();
+	i = parseargs(&s3, argc, argv);
+	argc -= i;
+	argv += i;
+
+	if(argc == 0)
+		usage();
+	if(parseuri(&s3, path, sizeof path, argv[0]) < 0)
+		usage();
+
+	/* start the upload; the XML reply carries the UploadId */
+	rpath = smprint("%s?uploads=", path);
+	if(rpath == nil)
+		sysfatal("smprint: %r");
+	if(s3post(&s3, &con, rpath) < 0)
+		reqfail(&con, "initiate upload");
+	free(rpath);
+	Binit(&bin, con.body, OREAD);
+	x = xmlread(&bin, 0);
+	Bterm(&bin);
+	hclose(&con);
+	if(x == nil)
+		sysfatal("initiate upload: reply was not valid XML");
+	if((x = xmlget(x, "UploadId", nil)) == nil)
+		sysfatal("xml did not have UploadId field");
+
+	partno = 1;
+	do{
+		/* fill bb completely so every part but the last is Partsize bytes */
+		for(len = 0; len < sizeof bb; len += n){
+			n = read(0, bb + len, sizeof bb - len);
+			if(n < 0)
+				sysfatal("read: %r");	/* the pending upload is left unfinished */
+			if(n == 0)
+				break;
+		}
+		/* EOF on a part boundary adds nothing, but empty input still sends one (empty) part */
+		if(len == 0 && partno > 1)
+			break;
+		if(partno > Maxparts)
+			sysfatal("input too large: more than %d parts", Maxparts);
+		tags[partno-1] = putpart(&s3, path, partno, x->v, bb, len);
+		partno++;
+	}while(len == sizeof bb);
+
+	finishpart(&s3, path, tags, partno, x->v);
+	exits(nil);
+}
--