ref: 685a897dc19fec1e4f4b7ebeee0aac6b07c520a5
parent: d8adc268f3dccd53e67adc2c7957309db795f6fe
author: Jacob Moody <moody@posixcafe.org>
date: Tue Sep 23 19:33:19 EDT 2025
multipart write
--- a/cat.c
+++ b/cat.c
@@ -30,7 +30,7 @@
if(parseuri(&s3, path, sizeof path, argv[0]) < 0)
usage();
b = Bfdopen(1, OWRITE);
- download(&s3, path, b);
+ download(&s3, path, b, s3get);
Bterm(b);
exits(nil);
}
--- a/cmd.c
+++ b/cmd.c
@@ -4,21 +4,22 @@
#include "s3.h"
void
-download(S3 *s3, char *path, Biobuf *local)
+download(S3 *s3, char *path, Biobuf *local, int (*fn)(S3*,Hcon*,char*))
{- int bfd;
long n;
+ Hcon con;
char data[8192];
- bfd = s3get(s3, path);
- if(bfd < 0)
- sysfatal("s3get: %r");+ if(fn(s3, &con, path) < 0)
+ sysfatal("failed to create request: %r"); for(;;){- n = read(bfd, data, sizeof data);
+ n = read(con.body, data, sizeof data);
if(n < 0)
sysfatal("download body: %r");- if(n == 0)
+ if(n == 0){+ hclose(&con);
return;
+ }
Bwrite(local, data, n);
}
}
--- a/cmd.h
+++ b/cmd.h
@@ -1,3 +1,3 @@
-void download(S3*, char*, Biobuf*);
+void download(S3*, char*, Biobuf*, int (*fn)(S3*,Hcon*,char*));
int parseuri(S3*, char*, int, char*);
int parseargs(S3*, int, char**);
--- a/cp.c
+++ b/cp.c
@@ -41,8 +41,9 @@
char mime[32];
uchar digest[SHA2_256dlen];
long n;
- int fd, bfd;
+ int fd;
char buf[256];
+ Hcon con;
mimetype(localpath, mime, sizeof mime);
fd = open(localpath, OREAD);
@@ -59,8 +60,7 @@
sha2_256(nil, 0, digest, ds);
seek(fd, 0, 0);
- bfd = s3put(s3, remotepath, mime, digest);
- if(bfd < 0)
+ if(s3put(s3, &con, remotepath, mime, digest) < 0)
sysfatal("upload postbody open: %r"); for(;;){n = read(fd, buf, sizeof buf);
@@ -68,10 +68,12 @@
sysfatal("file read: %r");if(n == 0)
break;
- if(write(bfd, buf, n) < 0)
+ if(write(con.post, buf, n) < 0)
sysfatal("upload write: %r");}
- close(bfd);
+ if(hdone(&con) < 0)
+ sysfatal("error response code: %r");+ hclose(&con);
}
_Noreturn void
@@ -108,7 +110,7 @@
b = Bfdopen(fd, OWRITE);
if(b == nil)
sysfatal("Bfdopen: %r");- download(&s3, path, b);
+ download(&s3, path, b, s3get);
Bterm(b);
exits(nil);
}
--- a/ls.c
+++ b/ls.c
@@ -42,7 +42,8 @@
b[0] = Bfdopen(p[0], OWRITE);
if(b[0] == nil)
sysfatal("Bfdopen: %r");- download(&s3, path, b[0]);
+ download(&s3, path, b[0], s3get);
+ Bterm(b[0]);
exits(nil);
default:
close(p[0]);
--- a/mkfile
+++ b/mkfile
@@ -9,6 +9,7 @@
cat\
ls\
cp\
+ write\
HFILES=\
xml.h\
@@ -17,12 +18,12 @@
$O.factotum: factotum.$O
-$O.cmd: xml.$O s3.$O cmd.$O
-
$O.rm: rm.$O s3.$O cmd.$O
$O.cat: cat.$O s3.$O cmd.$O
$O.ls: ls.$O s3.$O cmd.$O xml.$O
+
+$O.write: write.$O s3.$O cmd.$O xml.$O
$O.cp: cp.$O s3.$O cmd.$O
--- a/rm.c
+++ b/rm.c
@@ -16,8 +16,8 @@
{S3 s3;
int i;
- int fd;
char path[512];
+ Hcon con;
tmfmtinstall();
fmtinstall('H', encodefmt);@@ -29,9 +29,7 @@
usage();
if(parseuri(&s3, path, sizeof path, argv[0]) < 0)
usage();
- fd = s3del(&s3, path);
- if(fd < 0)
- sysfatal("delete failed: %r");- close(fd);
+ if(s3del(&s3, &con, path) < 0)
+ sysfatal("could not buld request: %r");exits(nil);
}
--- a/s3.c
+++ b/s3.c
@@ -9,7 +9,8 @@
typedef struct {uchar *payhash;
char *mime;
- char method[16];
+ char *path;
+ char *method;
char time[128];
char authhdr[512];
} Hreq;
@@ -52,20 +53,33 @@
}
static void
-mkhreq(Hreq *hreq, S3 *s3, char *method, char *path)
+mkreq(Hreq *hreq, char *method, char *path, uchar *payhash, char *mime)
{+ hreq->method = method;
+ hreq->path = path;
+ hreq->payhash = payhash;
+ hreq->mime = mime;
+}
+
+static void
+signreq(Hreq *hreq, S3 *s3)
+{char date[64];
uchar key[SHA2_256dlen], sig[SHA2_256dlen];
- char buf[512], req[512];
+ char buf[1024], buf2[1024], req[1024];
char *sgndhdr;
+ char *query;
datetime(date, sizeof date, hreq->time, sizeof hreq->time);
- if(strcmp(method, "PUT") == 0){+ if(strcmp(hreq->method, "POST") == 0){snprint(buf, sizeof buf, "content-type:%s\nhost:%s\nx-amz-content-sha256:%.*lH\nx-amz-date:%s\n",
hreq->mime, s3->host, SHA2_256dlen, hreq->payhash, hreq->time);
sgndhdr = "content-type;host;x-amz-content-sha256;x-amz-date";
- } else if(strcmp(method, "GET") == 0 || strcmp(method, "DELETE")==0){- hreq->mime = nil;
+ } else if(strcmp(hreq->method, "PUT") == 0){+ snprint(buf, sizeof buf, "content-type:%s\nhost:%s\nx-amz-content-sha256:%.*lH\nx-amz-date:%s\n",
+ hreq->mime, s3->host, SHA2_256dlen, hreq->payhash, hreq->time);
+ sgndhdr = "content-type;host;x-amz-content-sha256;x-amz-date";
+ } else if(strcmp(hreq->method, "GET") == 0 || strcmp(hreq->method, "DELETE")==0 || strcmp(hreq->method, "POST") == 0){sha2_256(nil, 0, hreq->payhash, nil);
snprint(buf, sizeof buf, "host:%s\nx-amz-date:%s\n", s3->host, hreq->time);
sgndhdr = "host;x-amz-date";
@@ -72,8 +86,14 @@
} else
sysfatal("invalid method");+ snprint(buf2, sizeof buf2, "%s", hreq->path);
+ if((query = strchr(buf2, '?')) != nil){+ *query = '\0';
+ query++;
+ } else
+ query = "";
snprint(req, sizeof req, "%s\n/%s/%s\n%s\n%s\n%s\n%.*lH",
- method, s3->bucket, path, "", buf, sgndhdr, SHA2_256dlen, hreq->payhash);
+ hreq->method, s3->bucket, buf2, query, buf, sgndhdr, SHA2_256dlen, hreq->payhash);
sha2_256((uchar*)req, strlen(req), key, nil);
snprint(buf, sizeof buf, "%s\n%s\n%s/%s/%s/aws4_request\n%.*lH",
"AWS4-HMAC-SHA256", hreq->time, date, s3->region, "s3", SHA2_256dlen, key);
@@ -82,104 +102,168 @@
snprint(hreq->authhdr, sizeof hreq->authhdr, "%s Credential=%s/%s/%s/%s/aws4_request, SignedHeaders=%s, Signature=%.*lH",
"AWS4-HMAC-SHA256", s3->access, date, s3->region, "s3", sgndhdr, SHA2_256dlen, sig);
- snprint(hreq->method, sizeof hreq->method, "%s", method);
}
+/*
+ * Format a control message into a 2048-byte local buffer and hand it to
+ * write in a single call; small fprint buffers bite us.
+ * Returns whatever write returns.
+ */
+#pragma varargck argpos ctlprint 2
+static long
+ctlprint(int cfd, char *fmt, ...)
+{
+	va_list ap;
+	char msg[2048], *end;
+
+	va_start(ap, fmt);
+	end = vseprint(msg, msg + sizeof msg, fmt, ap);
+	va_end(ap);
+	return write(cfd, msg, end - msg);
+}
+
static int
-prep(S3 *s3, int cfd, char *path, Hreq *hreq)
+prep(S3 *s3, int cfd, Hreq *hreq)
{- if(fprint(cfd, "url %s/%s/%s", s3->endpoint, s3->bucket, path) < 0)
+ if(ctlprint(cfd, "url %s/%s/%s", s3->endpoint, s3->bucket, hreq->path) < 0)
return -1;
- if(fprint(cfd, "request %s", hreq->method) < 0)
+ if(ctlprint(cfd, "request %s", hreq->method) < 0)
return -1;
- if(fprint(cfd, "headers Authorization:%s", hreq->authhdr) < 0)
+ if(ctlprint(cfd, "headers Authorization:%s", hreq->authhdr) < 0)
return -1;
- if(fprint(cfd, "headers x-amz-date:%s\nx-amz-content-sha256:%.*lH", hreq->time, SHA2_256dlen, hreq->payhash) < 0)
+ if(ctlprint(cfd, "headers x-amz-date:%s\nx-amz-content-sha256:%.*lH", hreq->time, SHA2_256dlen, hreq->payhash) < 0)
return -1;
- if(hreq->mime != nil && fprint(cfd, "contenttype %s", hreq->mime) < 0)
+ if(hreq->mime != nil && ctlprint(cfd, "contenttype %s", hreq->mime) < 0)
return -1;
return 0;
}
+/*
+ * Start one request: open /mnt/web/clone, read the connection id into
+ * h->id, sign hreq and write its parameters to the ctl file, then open
+ * the data file that matches mode:
+ *	OREAD		body, for reading the response
+ *	ORDWR, OWRITE	postbody, for writing the request payload
+ * Returns 0 on success and -1 on failure.  When the body cannot be
+ * opened in OREAD mode, errorbody is opened into h->err and ctl is left
+ * open so the caller may still inspect the connection; on every other
+ * failure ctl is closed and reset to -1.
+ */
static int
-wopen(char *buf, long n)
+hopen(Hcon *h, S3 *s3, int mode, Hreq *hreq)
{
-	int fd;
+	long n;
+	char buf[64];
-	fd = open("/mnt/web/clone", ORDWR);
-	if(fd < 0)
-		return fd;
-	n = read(fd, buf, n - 1);
+	h->body = -1;
+	h->post = -1;
+	h->err = -1;
+	h->ctl = open("/mnt/web/clone", ORDWR);
+	if(h->ctl < 0)
+		return -1;
+
+	n = read(h->ctl, h->id, sizeof h->id - 1);
if(n <= 0){
+		close(h->ctl);
+		h->ctl = -1;
werrstr("short read from /mnt/web/clone");
return -1;
}
-	buf[n-1] = 0;
-	return fd;
+	/* drops the last byte read; presumably a trailing newline -- confirm */
+	h->id[n-1] = 0;
+
+	signreq(hreq, s3);
+	if(prep(s3, h->ctl, hreq) < 0){
+		close(h->ctl);
+		h->ctl = -1;
+		return -1;
+	}
+	switch(h->mode = mode){
+	case OREAD:
+		snprint(buf, sizeof buf, "/mnt/web/%s/body", h->id);
+		h->body = open(buf, OREAD);
+		if(h->body >= 0)
+			return 0;
+		snprint(buf, sizeof buf, "/mnt/web/%s/errorbody", h->id);
+		h->err = open(buf, OREAD);
+		return -1;
+	case ORDWR:
+	case OWRITE:
+		snprint(buf, sizeof buf, "/mnt/web/%s/postbody", h->id);
+		h->post = open(buf, OWRITE);
+		/* callers write to h->post right away, so a failed open must not look like success */
+		if(h->post < 0){
+			close(h->ctl);
+			h->ctl = -1;
+			return -1;
+		}
+		return 0;
+	default:
+		close(h->ctl);
+		h->ctl = -1;
+		return -1;
+	}
}
+/*
+ * Close whichever of the connection's descriptors are open (>= 0) and
+ * mark them all as closed, so that calling hclose again on the same
+ * Hcon does not close descriptor numbers that may have been reused.
+ */
+void
+hclose(Hcon *h)
+{
+	if(h->ctl >= 0)
+		close(h->ctl);
+	if(h->body >= 0)
+		close(h->body);
+	if(h->post >= 0)
+		close(h->post);
+	if(h->err >= 0)
+		close(h->err);
+	h->ctl = h->body = h->post = h->err = -1;
+}
+
int
-s3get(S3 *s3, char *path)
+hdone(Hcon *h)
{- char id[64];
- char body[64];
- int fd, bfd;
+ char buf[64];
+
+ switch(h->mode){+ case OWRITE:
+ case ORDWR:
+ close(h->post);
+ h->post = -1;
+ snprint(buf, sizeof buf, "/mnt/web/%s/body", h->id);
+ h->body = open(buf, OREAD);
+ if(h->body < 0){+ snprint(buf, sizeof buf, "/mnt/web/%s/errorbody", h->id);
+ h->err = open(buf, OREAD);
+ return -1;
+ }
+ return 0;
+ default:
+ abort();
+ }
+}
+
+int
+s3get(S3 *s3, Hcon *con, char *path)
+{Hreq h;
uchar payhash[SHA2_256dlen];
- h.payhash = payhash;
- fd = wopen(id, sizeof id);
- if(fd < 0)
- return -1;
- mkhreq(&h, s3, "GET", path);
- if(prep(s3, fd, path, &h) < 0)
- return -1;
- snprint(body, sizeof body, "/mnt/web/%s/body", id);
- bfd = open(body, OREAD);
- close(fd);
- return bfd;
+ mkreq(&h, "GET", path, payhash, nil);
+ return hopen(con, s3, OREAD, &h);
}
int
-s3put(S3 *s3, char *path, char *mime, uchar *payhash)
+s3put(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash)
{- char id[64];
- char body[64];
- int fd, bfd;
Hreq h;
- h.mime = mime;
- h.payhash = payhash;
- fd = wopen(id, sizeof id);
- if(fd < 0)
- return -1;
- mkhreq(&h, s3, "PUT", path);
- if(prep(s3, fd, path, &h) < 0)
- return -1;
- snprint(body, sizeof body, "/mnt/web/%s/postbody", id);
- bfd = open(body, OWRITE);
- close(fd);
- return bfd;
+ mkreq(&h, "PUT", path, payhash, mime);
+ return hopen(con, s3, ORDWR, &h);
}
int
-s3del(S3 *s3, char *path)
+s3del(S3 *s3, Hcon *con, char *path)
{- char id[64];
- char body[64];
- int fd, bfd;
Hreq h;
uchar payhash[SHA2_256dlen];
- h.payhash = payhash;
- fd = wopen(id, sizeof id);
- if(fd < 0)
+ mkreq(&h, "DELETE", path, payhash, nil);
+ return hopen(con, s3, OREAD, &h);
+}
+
+int
+s3post(S3 *s3, Hcon *con, char *path)
+{+ Hreq h;
+ uchar payhash[SHA2_256dlen];
+
+ sha2_256(nil, 0, payhash, nil);
+ mkreq(&h, "POST", path, payhash, "application/octet-stream");
+ if(hopen(con, s3, ORDWR, &h) < 0)
return -1;
- mkhreq(&h, s3, "DELETE", path);
- if(prep(s3, fd, path, &h) < 0)
- return -1;
- snprint(body, sizeof body, "/mnt/web/%s/body", id);
- bfd = open(body, OREAD);
- close(fd);
- return bfd;
+
+ return hdone(con);
+}
+
+int
+s3postwrite(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash)
+{+ Hreq h;
+
+ mkreq(&h, "POST", path, payhash, mime);
+ return hopen(con, s3, ORDWR, &h);
}
--- a/s3.h
+++ b/s3.h
@@ -1,11 +1,25 @@
-typedef struct {+typedef struct S3 S3;
+typedef struct Hcon Hcon;
+
+struct S3 {char *endpoint;
char *host;
char *access;
char *bucket;
char *region;
-} S3;
+};
-int s3get(S3 *s3, char *path);
-int s3del(S3 *s3, char *path);
-int s3put(S3 *s3, char *path, char *mime, uchar *payhash);
+/* Descriptors and bookkeeping for one request made through /mnt/web. */
+struct Hcon {
+	char id[16];	/* text read from /mnt/web/clone; names the /mnt/web/<id> directory */
+	int ctl;	/* the open /mnt/web/clone file; request parameters are written here */
+	int body;	/* /mnt/web/<id>/body, or -1 */
+	int post;	/* /mnt/web/<id>/postbody, or -1 */
+	int err;	/* /mnt/web/<id>/errorbody, or -1 */
+	int mode;	/* OREAD, OWRITE or ORDWR, as passed when the request was opened */
+};
+
+int s3get(S3 *s3, Hcon *con, char *path);
+int s3del(S3 *s3, Hcon *con, char *path);
+int s3put(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash);
+int s3post(S3 *s3, Hcon *con, char *path);
+int s3postwrite(S3 *s3, Hcon *con, char *path, char *mime, uchar *payhash);
+
+void hclose(Hcon *con);
+int hdone(Hcon *con);
--- /dev/null
+++ b/write.c
@@ -1,0 +1,154 @@
+#include <u.h>
+#include <libc.h>
+#include <bio.h>
+#include <mp.h>
+#include <libsec.h>
+#include "s3.h"
+#include "cmd.h"
+#include "xml.h"
+
+/* Print the synopsis on file descriptor 2 and exit with status "usage". */
+_Noreturn void
+usage(void)
+{
+	fprint(2, "Usage %s: s3://bucket/dir\n", argv0);
+	exits("usage");
+}
+
+/*
+ * Upload one part of a multipart upload.
+ *
+ * PUTs the n bytes at data to path?partNumber=partno&uploadId=uploadid,
+ * waits for the response with hdone, then reads /mnt/web/<id>/etag.
+ * Returns a strdup'd copy of what that file contained; the caller owns
+ * it.  Every failure is fatal.
+ */
+static char*
+putpart(S3 *s3, char *path, int partno, char *uploadid, uchar *data, long n)
+{
+	Hcon con;
+	int fd, ret;
+	uchar payhash[SHA2_256dlen];
+	char buf[256];
+	char *ppath, *tag;
+	long etagn;
+
+	sha2_256(data, n, payhash, nil);
+	ppath = smprint("%s?partNumber=%d&uploadId=%s", path, partno, uploadid);
+	if(ppath == nil)
+		sysfatal("smprint: %r");
+	ret = s3put(s3, &con, ppath, "application/octet-stream", payhash);
+	if(ret < 0)
+		sysfatal("part %d upload: s3put: %r", partno);
+	/* s3put signs and sends the path before returning and Hcon keeps no pointer to it */
+	free(ppath);
+	if(write(con.post, data, n) != n)
+		sysfatal("short write on s3put: %r");
+	if(hdone(&con) < 0)
+		sysfatal("server responded with error: %r");
+	snprint(buf, sizeof buf, "/mnt/web/%s/etag", con.id);
+	fd = open(buf, OREAD);
+	if(fd < 0)
+		sysfatal("no ETAG response header");
+	if((etagn = read(fd, buf, sizeof buf - 1)) < 0)
+		sysfatal("can't read ETAG response");
+	close(fd);
+	buf[etagn] = '\0';
+	hclose(&con);
+	tag = strdup(buf);
+	if(tag == nil)
+		sysfatal("strdup: %r");
+	return tag;
+}
+
+/*
+ * Complete a multipart upload.
+ *
+ * Builds the CompleteMultipartUpload XML document listing parts
+ * 1 .. partmax-1 with their tags (tags[i-1] belongs to part i) and
+ * POSTs it to path?uploadId=uploadid.  On success the connection is
+ * closed and the function returns.  If hdone reports failure, the
+ * error body is copied to file descriptor 2 and the process exits with
+ * status "errorbody".
+ *
+ * The document buffer is sized from the tags themselves, so a long
+ * part list is never cut short.
+ */
+static void
+finishpart(S3 *s3, char *path, char *tags[], int partmax, char *uploadid)
+{
+	int ret, i;
+	uchar payhash[SHA2_256dlen];
+	char *buf, *qpath;
+	char *s, *e;
+	long errn, size;
+	char errbody[2048];
+	Hcon con;
+
+	/*
+	 * 256 bytes cover the fixed prologue and epilogue; 96 per part
+	 * cover the element markup and the decimal part number.
+	 */
+	size = 256;
+	for(i = 1; i < partmax; i++)
+		size += strlen(tags[i-1]) + 96;
+	buf = malloc(size);
+	if(buf == nil)
+		sysfatal("malloc: %r");
+
+	s = buf;
+	e = buf + size;
+	s = seprint(s, e, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
+	s = seprint(s, e, "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\n");
+	for(i = 1; i < partmax; i++)
+		s = seprint(s, e, "<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>\n", i, tags[i-1]);
+	s = seprint(s, e, "</CompleteMultipartUpload>");
+	sha2_256((uchar*)buf, s-buf, payhash, nil);
+
+	qpath = smprint("%s?uploadId=%s", path, uploadid);
+	if(qpath == nil)
+		sysfatal("smprint: %r");
+	ret = s3postwrite(s3, &con, qpath, "application/xml", payhash);
+	if(ret < 0)
+		sysfatal("upload did not complete: %r");
+	/* the path is consumed while the request is opened; Hcon keeps no pointer to it */
+	free(qpath);
+	if(write(con.post, buf, s-buf) != s-buf)
+		sysfatal("short write on s3post: %r");
+	free(buf);
+	if(hdone(&con) == 0){
+		hclose(&con);
+		return;
+	}
+	errn = read(con.err, errbody, sizeof errbody);
+	if(errn <= 0)
+		sysfatal("cant read con.err: %r");
+	write(2, errbody, errn);
+	exits("errorbody");
+}
+
+/*
+ * write: copy standard input to the object named by an s3:// URI using
+ * a multipart upload.
+ *
+ * A child process POSTs path?uploads= (via download with s3post) and
+ * writes the response body to a pipe; the parent parses that XML for
+ * the UploadId element.  Standard input is then gathered into a buffer
+ * of sizeof bb bytes; each full buffer, and a final partial one, is
+ * sent with putpart and its returned tag is recorded.  finishpart
+ * completes the upload with the collected tags.
+ */
+void
+main(int argc, char **argv)
+{
+	S3 s3;
+	int i, partno;
+	char path[512];
+	int p[2];
+	Biobuf *b[2];
+	Xelem *x;
+	static uchar bb[5*1024*1024];
+	long n, len;
+	static char *tags[256];
+
+	tmfmtinstall();
+	fmtinstall('H', encodefmt);
+	quotefmtinstall();
+	i = parseargs(&s3, argc, argv);
+	argc -= i;
+	argv += i;
+
+	if(argc == 0)
+		usage();
+	if(parseuri(&s3, path, sizeof path, argv[0]) < 0)
+		usage();
+	if(pipe(p) < 0)
+		sysfatal("pipe: %r");
+	switch(fork()){
+	case -1:
+		sysfatal("fork: %r");
+	case 0:
+		close(p[1]);
+		b[0] = Bfdopen(p[0], OWRITE);
+		if(b[0] == nil)
+			sysfatal("Bfdopen: %r");
+		download(&s3, smprint("%s?uploads=", path), b[0], s3post);
+		Bterm(b[0]);
+		exits(nil);
+	default:
+		/*
+		 * NOTE(review): the child is waited for before the pipe is
+		 * read; this presumably relies on the response fitting in
+		 * the pipe's buffering -- confirm.
+		 */
+		waitpid();
+		close(p[0]);
+		break;
+	}
+	b[1] = Bfdopen(p[1], OREAD);
+	if(b[1] == nil)
+		sysfatal("Bfdopen: %r");
+	x = xmlread(b[1], 0);
+	if(x == nil)
+		sysfatal("file was not valid XML, maybe not a prefix?");
+	if((x = xmlget(x, "UploadId", nil)) == nil)
+		sysfatal("xml did not have UploadId field");
+
+	/* print("%s\n", x->v); */
+
+	for(len = 0, partno = 1;;){
+		/* append after the bytes already gathered; a short read must not overwrite them */
+		n = read(0, bb + len, sizeof bb - len);
+		if(n == 0)
+			break;
+		if(n < 0)	/* Abort probably? */
+			sysfatal("read: %r");
+		len += n;
+		if(len == sizeof bb){
+			if(partno > nelem(tags))
+				sysfatal("input too large: more than %d parts", (int)nelem(tags));
+			tags[partno-1] = putpart(&s3, path, partno, x->v, bb, sizeof bb);
+			partno++;
+			len = 0;
+		}
+	}
+	if(len != 0){
+		if(partno > nelem(tags))
+			sysfatal("input too large: more than %d parts", (int)nelem(tags));
+		tags[partno-1] = putpart(&s3, path, partno, x->v, bb, len);
+		partno++;
+	}
+
+	/* partno is now one past the last part sent, which is what finishpart expects */
+	finishpart(&s3, path, tags, partno, x->v);
+	exits(nil);
+}
--
⑨