shithub: threadpool

Download patch

ref: bc2a7c9a7410261c1679d088e01bbf19bf4dd62e
parent: 45a4d2223e1cf7fe6a393d49bb2d6b5b55425580
author: rodri <rgl@antares-labs.eu>
date: Sun Sep 15 14:04:18 EDT 2024

new experiment: main8.

--- a/main7.c
+++ b/main7.c
@@ -32,6 +32,7 @@
 	pool = arg;
 
 	while((task = recvp(pool->subq)) != nil){
+//		prof(task->fn, task->arg, 256, Proftime);
 		task->fn(task->arg);
 		incref(&pool->complete);
 		nbsend(pool->done, nil);
--- /dev/null
+++ b/main8.c
@@ -1,0 +1,162 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <draw.h>
+#include <memdraw.h>
+
+typedef struct Ttask Ttask;
+typedef struct Tpool Tpool;
+
+struct Ttask
+{
+	void (*fn)(void*);
+	void *arg;
+};
+
+struct Tpool
+{
+	ulong nprocs;
+	Channel *subq;	/* task submission queue */
+};
+
+void
+threadloop(void *arg)
+{
+	Tpool *pool;
+	Ttask *task;
+
+	pool = arg;
+
+	while((task = recvp(pool->subq)) != nil)
+		task->fn(task->arg);
+}
+
+Tpool *
+mkthreadpool(ulong nprocs)
+{
+	Tpool *tp;
+
+	tp = malloc(sizeof *tp);
+	memset(tp, 0, sizeof *tp);
+	tp->nprocs = nprocs;
+	tp->subq = chancreate(sizeof(void*), nprocs);
+	while(nprocs--)
+		proccreate(threadloop, tp, mainstacksize);
+	return tp;
+}
+
+void
+threadpoolexec(Tpool *tp, void (*fn)(void*), void *arg)
+{
+	Ttask *t;
+
+	t = malloc(sizeof *t);
+	t->fn = fn;
+	t->arg = arg;
+
+	sendp(tp->subq, t);
+}
+
+typedef struct Targs Targs;
+struct Targs
+{
+	Memimage *i;
+	ulong off;
+	ulong len;
+	Channel *done;	/* task completion signal */
+};
+void
+fillpix(void *arg)
+{
+	Targs *imgop;
+	Point p;
+	ulong *fb, *fbb, *fbe, pix;
+	double α;
+
+	imgop = arg;
+	fb  = (ulong*)byteaddr(imgop->i, ZP);
+	fbb = fb + imgop->off;
+	fbe = fbb + imgop->len;
+
+	while(fbb < fbe){
+		p.x = (fbb-fb)%Dx(imgop->i->r);
+		p.y = (fbb-fb)/Dx(imgop->i->r);
+		α = atan2(p.y, p.x);
+		pix = α*25523UL*25523UL/* + truerand()*/;
+		*fbb++ = pix|0xFF<<24;
+	}
+	nbsend(imgop->done, nil);
+}
+
+void
+usage(void)
+{
+	fprint(2, "usage: %s [-t] [-n nprocs] [-c count]\n", argv0);
+	exits(nil);
+}
+
+void
+threadmain(int argc, char *argv[])
+{
+	static int W = 1000, H = 1000;
+	Tpool *pool;
+	Targs *t;
+	Memimage *img;
+	int i, stride;
+	int threaded;
+	int nprocs;
+	int cnt;
+
+	threaded = 0;
+	nprocs = 8;
+	cnt = 100;
+	ARGBEGIN{
+	case 't': threaded++; break;
+	case 'n': nprocs = strtoul(EARGF(usage()), nil, 0); break;
+	case 'c': cnt = strtoul(EARGF(usage()), nil, 0); break;
+	default: usage();
+	}ARGEND;
+	if(argc != 0)
+		usage();
+
+	if(memimageinit() != 0)
+		sysfatal("memimageinit: %r");
+
+	img = allocmemimage(Rect(0,0,W,H), XRGB32);
+	t = malloc(nprocs*sizeof(*t));
+	stride = W*H/nprocs;
+	if(threaded){
+		pool = mkthreadpool(nprocs);
+
+		while(cnt--)
+		for(i = 0; i < nprocs; i++){
+			t[i] = (Targs){
+				img,
+				i*stride,
+				i == nprocs-1? W*H-i*stride: stride,
+				chancreate(sizeof(void*), 1)
+			};
+			threadpoolexec(pool, fillpix, &t[i]);
+		}
+
+		for(i = 0; i < nprocs; i++)
+			recvp(t[i].done);
+
+		writememimage(1, img);
+
+		threadexitsall(nil);
+	}
+
+	while(cnt--)
+	for(i = 0; i < nprocs; i++){
+		t[i] = (Targs){
+			img,
+			i*stride,
+			i == nprocs-1? W*H-i*stride: stride,
+			chancreate(sizeof(void*), 1)
+		};
+		fillpix(&t[i]);
+	}
+	writememimage(1, img);
+	exits(nil);
+}
--- a/mkfile
+++ b/mkfile
@@ -8,5 +8,6 @@
 	main5\
 	main6\
 	main7\
+	main8\
 
 </sys/src/cmd/mkmany
--- a/readme
+++ b/readme
@@ -9,3 +9,4 @@
 	- main5: channel-based memimage block raster task
 	- main6: main4 with counted task execution
 	- main7: main5 with counted task execution
+	- main8: main7 with user-defined task completion mgmt