shithub: clone

Download patch

ref: 151eed2c3aaa1975b3c7a0331bb3ee4730552e8d
parent: 89612b6c2f225ba7e172974abecd97d8a4703761
author: kvik <kvik@a-b.xyz>
date: Thu Nov 1 00:21:04 EDT 2018

stop the Channel abuse, adopt mischief's WaitGroups

--- a/clone.c
+++ b/clone.c
@@ -7,16 +7,19 @@
 	Nblkprocs = 16,
 
 	Blksz = 128*1024,
-	Blkdone = 1,
-	
-	End = 1,
 };
 
 typedef struct {
+	Rendez;
+	QLock;
+	Ref;
+} WaitGroup;
+
+typedef struct {
 	Dir;
+	WaitGroup wg;
 	char *src, *dst;
 	int sfd, dfd;
-	Channel *c;
 } File;
 
 typedef struct {
@@ -33,15 +36,21 @@
 int fileprocs = Nfileprocs;
 int blkprocs = Nblkprocs;
 Dir *skipdir;
+WaitGroup filewg;
 
 Channel *filechan; /* chan(File*) */
 Channel *blkchan; /* chan(Blk*) */
-Channel *endchan; /* chan(ulong) */
 
 void usage(void);
 void *emalloc(ulong);
 char *estrdup(char*);
 
+extern int cas(long *p, long ov, long nv);
+void wginit(WaitGroup*, long);
+void wgadd(WaitGroup*, long);
+void wgdone(WaitGroup*);
+void wgwait(WaitGroup*);
+
 char *filename(char*);
 Dir *mkdir(char*, Dir*, int);
 int same(Dir*, Dir*);
@@ -84,6 +93,45 @@
 	return p;
 }
 
+void
+wginit(WaitGroup *wg, long count)
+{
+	memset(wg, 0, sizeof(*wg));
+	wg->l = &wg->QLock;
+	if(cas(&wg->ref, 0, count) == 0)
+		sysfatal("wginit: cas failed");
+}
+
+void
+wgadd(WaitGroup *wg, long n)
+{
+	long v;
+
+	v = wg->ref;
+	while(cas(&wg->ref, v, v+n) == 0)
+		v = wg->ref;
+}
+
+void
+wgdone(WaitGroup *wg)
+{
+	if(decref(wg) < 0)
+		sysfatal("wgdone: negative WaitGroup count");
+	qlock(wg);
+	rwakeupall(wg);
+	qunlock(wg);
+}
+
+void
+wgwait(WaitGroup *wg)
+{
+	qlock(wg);
+	while(!(wg->ref == 0))
+		rsleep(wg);
+	qunlock(wg);
+}
+
+
 char *
 filename(char *s)
 {
@@ -147,7 +195,6 @@
 	f->dst = estrdup(dst);
 	f->sfd = -1;
 	f->dfd = -1;
-	f->c = nil;
 
 	return f;
 }
@@ -211,6 +258,7 @@
 			dst = smprint("%s/%s", dst, filename(src));
 		f = filenew(src, dst, sd);
 		sendp(filechan, f);
+		wgadd(&filewg, 1);
 		return;
 	}
 
@@ -253,6 +301,7 @@
 		}else{
 			f = filenew(sn, dn, d);
 			sendp(filechan, f);
+			wgadd(&filewg, 1);
 		}
 		free(sn);
 		free(dn);
@@ -287,34 +336,16 @@
 void
 clonefile(File *f)
 {
-	vlong n, done;
+	vlong n;
 	Blk *blks, *b, *be;
 
-	enum {Anext, Adone, Aend};
-	Alt alts[] = {
-	[Anext] {blkchan, &b, CHANSND},
-	[Adone] {f->c, nil, CHANRCV},
-	[Aend] {nil, nil, CHANEND},
-	};
-
 	n = blklist(f, &blks);
 	if(n == 0)
 		return;
-	b = blks;
-	be = blks + n;
-	done = 0;
-	while(done < n){
-		switch(alt(alts)){
-		case Anext:
-			++b;
-			if(b == be)
-				alts[Anext].op = CHANNOP;
-			break;
-		case Adone:
-			++done;
-			break;
-		}
-	}
+	wgadd(&f->wg, n);
+	for(b = blks, be = b + n; b != be; b++)
+		sendp(blkchan, b);
+	wgwait(&f->wg);
 	free(blks);
 }
 
@@ -341,8 +372,7 @@
 		if(n > 0)
 			if(pwrite(dfd, buf, n, off) < n)
 				sysfatal("blkproc: write error: %r");
-
-		sendul(b->f->c, Blkdone);
+		wgdone(&b->f->wg);
 	}
 }
 
@@ -349,18 +379,15 @@
 void
 fileproc(void *)
 {
-	Channel *c;
 	File *f;
-
-	c = chancreate(sizeof(ulong), blkprocs);
+	
 	for(;;){
 		f = recvp(filechan);
-		if(f == nil){
-			sendul(endchan, End);
+		if(f == nil)
 			return;
-		}
 
-		f->c = c;
+		wginit(&f->wg, 0);
+
 		f->sfd = open(f->src, OREAD);
 		if(f->sfd < 0)
 			sysfatal("fileproc: can't open: %r");
@@ -371,6 +398,7 @@
 		clonefile(f);
 		cloneattr(f->dfd, f);
 		filefree(f);
+		wgdone(&filewg);
 	}
 }
 
@@ -407,19 +435,15 @@
 
 	filechan = chancreate(sizeof(File*), fileprocs);
 	blkchan = chancreate(sizeof(Blk*), blkprocs);
-	endchan = chancreate(sizeof(ulong), 0);
 	for(i = 0; i < fileprocs; i++)
 		proccreate(fileproc, nil, mainstacksize);
 	for(i = 0; i < blkprocs; i++)
 		proccreate(blkproc, nil, mainstacksize);
 
+	wginit(&filewg, 0);
 	for(i = 0; i < argc -1; i++)
 		clone(argv[i], dst);
-
-	for(i = 0; i < fileprocs; i++){
-		sendp(filechan, nil);
-		recvul(endchan);
-	}
+	wgwait(&filewg);
 
 	threadexitsall(nil);
 }