ref: d1540c7f666e3c5d636b48c956b444205b50502d
dir: /appl/grid/cpupool.b/
implement CpuPool;
#
# Copyright © 2003 Vita Nuova Holdings Limited. All rights reserved.
#
include "sys.m";
sys : Sys;
include "daytime.m";
daytime: Daytime;
include "styx.m";
styx: Styx;
Rmsg, Tmsg: import styx;
include "styxservers.m";
styxservers: Styxservers;
Fid, Navigator, Navop: import styxservers;
Styxserver: import styxservers;
nametree: Nametree;
Tree: import nametree;
include "draw.m";
include "dial.m";
dial: Dial;
include "sh.m";
include "arg.m";
include "registries.m";
registries: Registries;
Registry, Attributes, Service: import registries;
include "grid/announce.m";
announce: Announce;
include "readdir.m";
readdir: Readdir;
TEST: con 0;
RUN : con "#!/dis/sh\n" +
"load std\n" +
"if {~ $#* 0} {\n" +
" echo usage: run.sh cmd args\n"+
" raise usage\n" +
"}\n"+
"CMD = $*\n" +
"{echo $CMD; dir=`{read -o 0}; cat <[0=3] > $dir/data& catpid=$apid;"+
" cat $dir/data >[1=4]; kill $catpid >[2] /dev/null} <[3=0] >[4=1] <> clone >[1=0]\n";
EMPTYDIR: con "#//dev";
rootpath := "/tmp/cpupool/";
rstyxreg: ref Registry;
registered: ref Registries->Registered;
CpuSession: adt {
proxyid, fid, cpuid, omode, written, finished: int;
stdoutopen, stdinopen: int;
stdinchan, stdoutchan: chan of array of byte;
closestdin,closestdout, readstdout, sync: chan of int;
rcmdfinishedstdin, rcmdfinishedstdout: chan of int;
fio: ref sys->FileIO;
pids: list of int;
};
NILCPUSESSION: con CpuSession (-1, -1,-1, 0, 0, 0, 0, 0, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil);
cpusession: array of CpuSession;
poolchanin : chan of string;
poolchanout : chan of int;
conids : array of int;
CpuPool: module {
init: fn (nil : ref Draw->Context, argv: list of string);
};
init(nil : ref Draw->Context, argv: list of string)
{
sys = load Sys Sys->PATH;
if (sys == nil)
badmod(Sys->PATH);
daytime = load Daytime Daytime->PATH;
if (daytime == nil)
badmod(Daytime->PATH);
dial = load Dial Dial->PATH;
if (dial == nil)
badmod(Dial->PATH);
styx = load Styx Styx->PATH;
if (styx == nil)
badmod(Styx->PATH);
styx->init();
styxservers = load Styxservers Styxservers->PATH;
if (styxservers == nil)
badmod(Styxservers->PATH);
styxservers->init(styx);
nametree = load Nametree Nametree->PATH;
if (nametree == nil)
badmod(Nametree->PATH);
nametree->init();
registries = load Registries Registries->PATH;
if (registries == nil)
badmod(Registries->PATH);
registries->init();
announce = load Announce Announce->PATH;
if (announce == nil)
badmod(Announce->PATH);
announce->init();
readdir = load Readdir Readdir->PATH;
if (readdir == nil)
badmod(Readdir->PATH);
arg := load Arg Arg->PATH;
if (arg == nil)
badmod(Arg->PATH);
sys->pctl(Sys->FORKNS | sys->NEWPGRP, nil);
sys->unmount(nil, "/n/remote");
getuid();
sys->chdir(EMPTYDIR);
cpusession = array[500] of { * => NILCPUSESSION };
attrs := Attributes.new(("proto", "styx") :: ("auth", "none") :: ("resource","Cpu Pool") :: nil);
arg->init(argv);
arg->setusage("cpupool [-a attributes] [rootdir]");
while ((opt := arg->opt()) != 0) {
case opt {
'a' =>
attr := arg->earg();
val := arg->earg();
attrs.set(attr, val);
* =>
arg->usage();
}
}
argv = arg->argv();
arg = nil;
if (argv != nil)
rootpath = hd argv;
if (rootpath[len rootpath - 1] != '/')
rootpath[len rootpath] = '/';
(n, dir) := sys->stat(rootpath);
if (n == -1 || !(dir.mode & sys->DMDIR))
error("Invalid tmp path: "+rootpath);
rstyxreg = Registry.new("/mnt/rstyxreg");
if (rstyxreg == nil)
error("Could not find Rstyx Registry");
reg := Registry.connect(nil, nil, nil);
if (reg == nil)
error("Could not find registry");
(myaddr, c) := announce->announce();
if (myaddr == nil)
error(sys->sprint("cannot announce: %r"));
persist := 0;
err: string;
(registered, err) = reg.register(myaddr, attrs, persist);
if (err != nil)
error("could not register with registry: "+err);
conids = array[200] of { * => -1 };
poolchanin = chan of string;
poolchanout = chan of int;
userchan := chan of int;
spawn listener(c);
spawn cpupoolloop(poolchanin, poolchanout);
}
attrval(s: string): (string, string)
{
for (i := 0; i < len s; i++) {
if (s[i] == '=')
return (s[:i], s[i+1:]);
}
return (nil, s);
}
uid: string;
Qroot : con 0;
Qclone: con 1;
Qdata: con 2;
Qsh: con 3;
Qrun: con 4;
Qcpu: con 5;
Qsessdir: con 6;
Qsessdat: con 7;
getuid()
{
buf := array [100] of byte;
fd := sys->open("/dev/user", Sys->OREAD);
uidlen := sys->read(fd, buf, len buf);
uid = string buf[0: uidlen];
}
dir(name: string, perm: int, length: int, qid: int): Sys->Dir
{
d := sys->zerodir;
d.name = name;
d.uid = uid;
d.gid = uid;
d.qid.path = big qid;
if (perm & Sys->DMDIR)
d.qid.qtype = Sys->QTDIR;
else {
d.qid.qtype = Sys->QTFILE;
d.length = big length;
}
d.mode = perm;
d.atime = d.mtime = daytime->now();
return d;
}
defaultdirs := array[] of {
("dis", 1),
("dev", 1),
("fonts", 1),
("mnt", 0),
("prog", 0),
};
serveloop(fd : ref sys->FD, cmdchan: chan of (int, string, chan of int), exitchan, sync: chan of int, proxyid: int)
{
if (TEST)
sys->fprint(sys->fildes(2), "starting serveloop");
tchan: chan of ref Tmsg;
srv: ref Styxserver;
(tree, treeop) := nametree->start();
tree.create(big Qroot, dir(".",8r555 | sys->DMDIR,0,Qroot));
tree.create(big Qroot, dir("clone",8r666,0,Qclone));
tree.create(big Qroot, dir("run.sh",8r555,0,Qrun));
tree.create(big Qroot, dir("cpu",8r444,0,Qcpu));
tree.create(big Qroot, dir("data",8r777 | sys->DMDIR,0,Qdata));
tree.create(big Qroot, dir("runtime",8r444 | sys->DMDIR,0,Qsh));
for (i := 0; i < len defaultdirs; i++)
tree.create(big Qroot, dir(defaultdirs[i].t0,8r555 | sys->DMDIR ,0,8 + (i<<4)));
(tchan, srv) = Styxserver.new(fd,Navigator.new(treeop), big Qroot);
fd = nil;
datafids : list of Datafid = nil;
sync <-= 1;
gm: ref Tmsg;
loop: for (;;) {
alt {
<-exitchan =>
break loop;
gm = <-tchan =>
if (gm == nil)
break loop;
# sys->fprint(sys->fildes(2), "Got new GM %s tag: %d\n", gm.text(), gm.tag);
pick m := gm {
Readerror =>
sys->fprint(sys->fildes(2), "cpupool: fatal read error: %s\n", m.error);
exit;
Clunk =>
deldf: Datafid;
(datafids, deldf) = delfid(datafids, m.fid);
if (deldf.sessid != -1) {
if (deldf.omode == sys->OREAD || deldf.omode == sys->ORDWR)
cpusession[deldf.sessid].sync <-= STDOUTCLOSE;
else if (deldf.omode == sys->OWRITE || deldf.omode == sys->ORDWR)
cpusession[deldf.sessid].sync <-= STDINCLOSE;
}
else {
sessid := getsession(m.fid);
if (sessid != -1)
cpusession[sessid].sync <-= CLONECLOSE;
}
srv.default(gm);
Open =>
(f, nil, d, err) := srv.canopen(m);
if(f == nil) {
srv.reply(ref Rmsg.Error(m.tag, err));
break;
}
ind := int f.uname;
mode := m.mode & 3;
case int f.path & 15 {
Qclone =>
if (mode == sys->OREAD) {
srv.reply(ref Rmsg.Error(m.tag, "ctl cannot be open as read only"));
break;
}
poolchanin <-= "request";
cpuid := <-poolchanout;
if (cpuid == -1)
srv.reply(ref Rmsg.Error(m.tag, "no free resources"));
else {
sessid := getsession(-1);
cpusession[sessid].fid = m.fid;
cpusession[sessid].cpuid = cpuid;
cpusession[sessid].omode = mode;
cpusession[sessid].sync = chan of int;
cpusession[sessid].proxyid = proxyid;
spawn sessionctl(sessid, tree);
Qdir := Qsessdir | (sessid<<4);
tree.create(big Qroot, dir(string sessid,
8r777 | sys->DMDIR,0, Qdir));
tree.create(big Qdir, dir("data", 8r666,0, Qsessdat | (sessid<<4)));
if (TEST)
sys->fprint(sys->fildes(2), "New Session %d\n\tcpuid: %d\n"
,sessid,cpuid);
srv.default(gm);
}
Qsessdat =>
err = "";
sessid := (int f.path)>>4;
datafids = addfid(datafids, Datafid(sessid, m.fid, mode));
if (cpusession[sessid].finished)
err = "session already finished";
else if (mode == sys->OREAD || mode == sys->ORDWR) {
if (cpusession[sessid].stdoutopen == -1)
err = "pipe closed";
else
cpusession[sessid].sync <-= STDOUTOPEN;
}
else if (mode == sys->OWRITE || mode == sys->ORDWR) {
if (cpusession[sessid].stdinopen == -1)
err = "pipe closed";
else
cpusession[sessid].sync <-= STDINOPEN;
}
# sys->fprint(sys->fildes(2),
# "Open: Data: sessid %d, stdout %d stdin %d: err: '%s'\n",
# sessid,cpusession[sessid].stdoutopen,
# cpusession[sessid].stdinopen, err);
if (err == nil)
srv.default(gm);
else
srv.reply(ref Rmsg.Error(m.tag, err));
* =>
# sys->print("Open: %s tag: %d\n", gm.text(), gm.tag);
srv.default(gm);
}
Write =>
(f,e) := srv.canwrite(m);
if(f == nil) {
# sys->print("breaking! %r\n");
break;
}
case int f.path & 15 {
Qsessdat =>
sessid := (int f.path)>>4;
# sys->fprint(sys->fildes(2), "Write: Data %d len: %d\n",
# sessid,len m.data);
spawn datawrite(sessid,srv,m);
Qclone =>
sessid := getsession(m.fid);
# sys->fprint(sys->fildes(2), "Write: clone %d\n",sessid);
spawn clonewrite(sessid,srv, m, cmdchan);
* =>
srv.default(gm);
}
Read =>
(f,e) := srv.canread(m);
if(f == nil)
break;
case int f.path & 15 {
Qclone =>
sessid := getsession(m.fid);
# sys->fprint(sys->fildes(2), "Read: clone %d\n",sessid);
srv.reply(styxservers->readbytes(m, array of byte (string sessid + "\n")));
Qsessdat =>
sessid := (int f.path)>>4;
# sys->fprint(sys->fildes(2), "Read: data session: %d\n",sessid);
if (cpusession[sessid].finished)
srv.reply(ref Rmsg.Error(m.tag, "session finished"));
else
spawn dataread(sessid, srv, m);
Qrun =>
srv.reply(styxservers->readbytes(m, array of byte RUN));
Qcpu =>
poolchanin <-= "refresh";
s := (string ncpupool) + "\n";
srv.reply(styxservers->readbytes(m, array of byte s));
* =>
srv.default(gm);
}
* =>
srv.default(gm);
}
}
}
if (TEST)
sys->fprint(sys->fildes(2), "leaving serveloop...\n");
tree.quit();
for (i = 0; i < len cpusession; i++) {
if (cpusession[i].proxyid == proxyid) {
#Tear it down!
if (TEST)
sys->fprint(sys->fildes(2), "Killing off session %d\n",i);
poolchanin <-= "free "+string cpusession[i].cpuid;
for (; cpusession[i].pids != nil; cpusession[i].pids = tl cpusession[i].pids)
kill(hd cpusession[i].pids);
cpusession[i] = NILCPUSESSION;
}
}
if (TEST)
sys->fprint(sys->fildes(2), "serveloop exited\n");
}
dataread(sessid: int, srv: ref Styxserver, m: ref Tmsg.Read)
{
cpusession[sessid].readstdout <-= 1;
data := <- cpusession[sessid].stdoutchan;
srv.reply(ref Rmsg.Read(m.tag, data));
}
datawrite(sessid: int, srv: ref Styxserver, m: ref Tmsg.Write)
{
# sys->fprint(sys->fildes(2), "Writing to Stdin %d (%d)\n'%s'\n",
# len m.data, m.tag, string m.data);
cpusession[sessid].stdinchan <-= m.data;
# sys->fprint(sys->fildes(2), "Written to Stdin %d!\n",m.tag);
srv.reply(ref Rmsg.Write(m.tag, len m.data));
}
clonewrite(sessid: int, srv: ref Styxserver, m: ref Tmsg.Write, cmdchan: chan of (int, string, chan of int))
{
if (cpusession[sessid].written) {
srv.reply(ref Rmsg.Error(m.tag, "session already started"));
return;
}
rc := chan of int;
cmdchan <-= (sessid, string m.data, rc);
i := <-rc;
# sys->fprint(sys->fildes(2), "Sending write\n");
srv.reply(ref Rmsg.Write(m.tag, i));
}
badmod(path: string)
{
sys->fprint(sys->fildes(1), "error CpuPool: failed to load: %s\n",path);
exit;
}
listener(c: ref Sys->Connection)
{
for (;;) {
nc := dial->listen(c);
if (nc == nil)
error(sys->sprint("listen failed: %r"));
dfd := dial->accept(nc);
if (dfd != nil) {
sync := chan of int;
sys->print("got new connection!\n");
spawn proxy(sync, dfd);
<-sync;
}
}
}
proxy(sync: chan of int, dfd: ref Sys->FD)
{
proxypid := sys->pctl(0, nil);
sys->pctl(sys->FORKNS, nil);
sys->chdir(EMPTYDIR);
sync <-= 1;
sync = chan of int;
fds := array[2] of ref sys->FD;
sys->pipe(fds);
cmdchan := chan of (int, string, chan of int);
exitchan := chan of int;
killsrvloop := chan of int;
spawn serveloop(fds[0], cmdchan, killsrvloop, sync, proxypid);
<-sync;
if (sys->mount(fds[1], nil, "/n/remote", Sys->MREPL | sys->MCREATE, nil) == -1)
error(sys->sprint("cannot mount mountfd: %r"));
conid := getconid(-1);
conids[conid] = 1;
setupworkspace(conid);
spawn exportns(dfd, conid, exitchan);
for (;;) alt {
(sessid, cmd, reply) := <-cmdchan =>
spawn runit(conid, sessid, cmd, reply);
e := <-exitchan =>
killsrvloop <-= 1;
return;
}
}
getconid(id: int): int
{
for (i := 0; i < len conids; i++)
if (conids[i] == id)
return i;
return -1;
}
exportns(dfd: ref Sys->FD, conid: int, exitchan: chan of int)
{
sys->export(dfd, "/n/remote", sys->EXPWAIT);
if (TEST)
sys->fprint(sys->fildes(2), "Export Finished!\n");
conids[conid] = -1;
exitchan <-= 1;
}
error(e: string)
{
sys->fprint(sys->fildes(2), "CpuPool: %s: %r\n", e);
raise "fail:error";
}
setupworkspace(pathid: int)
{
path := rootpath + string pathid;
sys->create(path, sys->OREAD, 8r777 | sys->DMDIR);
delpath(path, 0);
sys->create(path + "/data", sys->OREAD, 8r777 | sys->DMDIR);
if (sys->bind(path+"/data", "/n/remote/data",
sys->MREPL | sys->MCREATE) == -1)
sys->fprint(sys->fildes(2), "data bind error %r\n");
sys->create(path + "/runtime", sys->OREAD, 8r777 | sys->DMDIR);
if (sys->bind(path+"/runtime", "/n/remote/runtime", sys->MREPL) == -1)
sys->fprint(sys->fildes(2), "runtime bind error %r\n");
for (i := 0; i < len defaultdirs; i++) {
if (defaultdirs[i].t1 == 1) {
sys->create(path+"/"+defaultdirs[i].t0, sys->OREAD, 8r777 | sys->DMDIR);
if (sys->bind("/"+defaultdirs[i].t0,
"/n/remote/"+defaultdirs[i].t0, sys->MREPL) == -1)
sys->fprint(sys->fildes(2), "dir bind error %r\n");
}
}
}
delpath(path: string, incl: int)
{
if (path[len path - 1] != '/')
path[len path] = '/';
(dirs, n) := readdir->init(path, readdir->NONE | readdir->COMPACT);
for (i := 0; i < n; i++) {
if (dirs[i].mode & sys->DMDIR)
delpath(path + dirs[i].name, 1);
else
sys->remove(path + dirs[i].name);
}
if (incl)
sys->remove(path);
}
runit(id, sessid: int, cmd: string, sync: chan of int)
{
# sys->print("got runit!\n");
cpusession[sessid].sync <-= PID;
cpusession[sessid].sync <-= sys->pctl(sys->FORKNS, nil);
if (!TEST && sys->bind("/net.alt", "/net", sys->MREPL) == -1) {
sys->fprint(sys->fildes(2), "cpupool net.alt bind failed: %r\n");
sync <-= -1;
return;
}
path := rootpath + string id;
runfile := "/runtime/start"+string cpusession[sessid].cpuid+".sh";
sh := load Sh Sh->PATH;
if(sh == nil) {
sys->fprint(sys->fildes(2), "Failed to load sh: %r\n");
sync <-= -1;
return;
}
sys->remove(path+runfile);
fd := sys->create(path+runfile, sys->OWRITE, 8r777);
if (fd == nil) {
sync <-= -1;
return;
}
sys->fprint(fd, "#!/dis/sh\n");
sys->fprint(fd, "bind /prog /n/client/prog\n");
sys->fprint(fd, "bind /n/client /\n");
sys->fprint(fd, "cd /\n");
sys->fprint(fd, "%s\n", cmd);
if (sys->bind("#s", "/n/remote/runtime", Sys->MBEFORE|Sys->MCREATE) == -1) {
sys->fprint(sys->fildes(2), "cpupool: %r\n");
return;
}
cpusession[sessid].fio = sys->file2chan("/n/remote/runtime", "mycons");
if (cpusession[sessid].fio == nil) {
sys->fprint(sys->fildes(2), "cpupool: file2chan failed: %r\n");
return;
}
if (sys->bind("/n/remote/runtime/mycons", "/n/remote/dev/cons", sys->MREPL) == -1)
sys->fprint(sys->fildes(2), "cons bind error %r\n");
cpusession[sessid].written = 1;
cpusession[sessid].stdinchan = chan of array of byte;
cpusession[sessid].closestdin = chan of int;
cpusession[sessid].rcmdfinishedstdin = chan of int;
spawn devconsread(sessid);
cpusession[sessid].stdoutchan = chan of array of byte;
cpusession[sessid].closestdout = chan of int;
cpusession[sessid].readstdout = chan of int;
cpusession[sessid].rcmdfinishedstdout = chan of int;
spawn devconswrite(sessid);
# Let it know that session channels have been created & can be listened on...
sync <-= len cmd;
# would prefer that it were authenticated
if (TEST)
sys->print("ABOUT TO RCMD\n");
sh->run(nil, "rcmd" :: "-A" :: "-e" :: "/n/remote" ::
cpupool[cpusession[sessid].cpuid].srvc.addr ::
"sh" :: "-c" :: "/n/client"+runfile :: nil);
if (TEST)
sys->print("DONE RCMD\n");
sys->remove(path+runfile);
sys->unmount(nil, "/n/remote/dev/cons");
cpusession[sessid].rcmdfinishedstdin <-= 1;
cpusession[sessid].rcmdfinishedstdout <-= 1;
cpusession[sessid].sync <-= FINISHED;
}
CLONECLOSE: con 0;
FINISHED: con 1;
STDINOPEN: con 2;
STDINCLOSE: con 3;
STDOUTOPEN: con 4;
STDOUTCLOSE: con 5;
PID: con -2;
sessionctl(sessid: int, tree: ref Nametree->Tree)
{
cpusession[sessid].pids = sys->pctl(0, nil) :: nil;
clone := 1;
closed := 0;
main: for (;;) {
i := <-cpusession[sessid].sync;
case i {
PID =>
pid := <-cpusession[sessid].sync;
if (TEST)
sys->fprint(sys->fildes(2), "adding PID: %d\n", pid);
cpusession[sessid].pids = pid :: cpusession[sessid].pids;
STDINOPEN =>
cpusession[sessid].stdinopen++;
if (TEST)
sys->fprint(sys->fildes(2), "%d: Open stdin: => %d\n",
sessid, cpusession[sessid].stdinopen);
STDOUTOPEN =>
cpusession[sessid].stdoutopen++;
if (TEST)
sys->fprint(sys->fildes(2), "%d: Open stdout: => %d\n",
sessid, cpusession[sessid].stdoutopen);
STDINCLOSE =>
cpusession[sessid].stdinopen--;
if (TEST)
sys->fprint(sys->fildes(2), "%d: Close stdin: => %d\n",
sessid, cpusession[sessid].stdinopen);
if (cpusession[sessid].stdinopen == 0) {
cpusession[sessid].stdinopen = -1;
cpusession[sessid].closestdin <-= 1;
}
# sys->fprint(sys->fildes(2), "Clunk: stdin (in %d: out %d\n",
# cpusession[sessid].stdinopen, cpusession[sessid].stdoutopen);
STDOUTCLOSE =>
cpusession[sessid].stdoutopen--;
if (TEST)
sys->fprint(sys->fildes(2), "%d: Close stdout: => %d\n",
sessid, cpusession[sessid].stdoutopen);
if (cpusession[sessid].stdoutopen == 0) {
cpusession[sessid].stdoutopen = -1;
cpusession[sessid].closestdout <-= 1;
}
#sys->fprint(sys->fildes(2), "Clunk: stdout (in %d: out %d\n",
# cpusession[sessid].stdinopen, cpusession[sessid].stdoutopen);
CLONECLOSE =>
if (TEST)
sys->fprint(sys->fildes(2), "%d: Close clone\n", sessid);
clone = 0;
#sys->fprint(sys->fildes(2), "Clunk: clone (in %d: out %d\n",
# cpusession[sessid].stdinopen, cpusession[sessid].stdoutopen);
FINISHED =>
if (TEST)
sys->fprint(sys->fildes(2), "%d: Rcmd finished", sessid);
cpusession[sessid].finished = 1;
poolchanin <-= "free "+string cpusession[sessid].cpuid;
if (closed)
break main;
}
if (cpusession[sessid].stdinopen <= 0 &&
cpusession[sessid].stdoutopen <= 0 &&
clone == 0) {
closed = 1;
tree.remove(big (Qsessdir | (sessid<<4)));
tree.remove(big (Qsessdat | (sessid<<4)));
if (cpusession[sessid].finished || !cpusession[sessid].written)
break main;
}
}
if (!cpusession[sessid].finished) # ie never executed anything
poolchanin <-= "free "+string cpusession[sessid].cpuid;
cpusession[sessid] = NILCPUSESSION;
if (TEST)
sys->fprint(sys->fildes(2), "closing session %d\n",sessid);
}
devconswrite(sessid: int)
{
cpusession[sessid].sync <-= PID;
cpusession[sessid].sync <-= sys->pctl(0, nil);
stdouteof := 0;
file2chaneof := 0;
rcmddone := 0;
main: for (;;) alt {
<-cpusession[sessid].rcmdfinishedstdout =>
rcmddone = 1;
if (file2chaneof)
break main;
<-cpusession[sessid].closestdout =>
stdouteof = 1;
(offset, d, fid, wc) := <-cpusession[sessid].fio.write =>
if (wc != nil) {
# sys->fprint(sys->fildes(2), "stdout: '%s'\n", string d);
if (stdouteof) {
# sys->fprint(sys->fildes(2), "stdout: sending EOF\n");
wc <-= (0, nil);
continue;
}
alt {
<-cpusession[sessid].closestdout =>
# sys->print("got closestdout\n");
wc <-= (0, nil);
stdouteof = 1;
<-cpusession[sessid].readstdout =>
cpusession[sessid].stdoutchan <-= d;
wc <-= (len d, nil);
}
}
else {
# sys->fprint(sys->fildes(2), "got nil wc\n");
file2chaneof = 1;
if (rcmddone)
break main;
}
}
# No more input at this point as rcmd has finished;
if (stdouteof || cpusession[sessid].stdoutopen == 0) {
# sys->print("leaving devconswrite\n");
return;
}
for (;;) alt {
<-cpusession[sessid].closestdout =>
# sys->print("got closestdout\n");
# sys->print("leaving devconswrite\n");
return;
<- cpusession[sessid].readstdout =>
cpusession[sessid].stdoutchan <-= nil;
}
}
devconsread(sessid: int)
{
cpusession[sessid].sync <-= PID;
cpusession[sessid].sync <-= sys->pctl(0, nil);
stdineof := 0;
file2chaneof := 0;
rcmddone := 0;
main: for (;;) alt {
<-cpusession[sessid].rcmdfinishedstdin =>
rcmddone = 1;
if (file2chaneof)
break main;
<-cpusession[sessid].closestdin =>
# sys->print("got stdin close\n");
stdineof = 1;
(offset, count, fid, rc) := <-cpusession[sessid].fio.read =>
if (rc != nil) {
# sys->fprint(sys->fildes(2), "devconsread: '%d %d'\n", count, offset);
if (stdineof) {
rc <-= (nil, nil);
continue;
}
alt {
data := <-cpusession[sessid].stdinchan =>
# sys->print("got data len %d\n", len data);
rc <-= (data, nil);
<-cpusession[sessid].closestdin =>
# sys->print("got stdin close\n");
stdineof = 1;
rc <-= (nil, nil);
}
}
else {
# sys->print("got nil rc\n");
file2chaneof = 1;
if (rcmddone)
break main;
}
}
if (!stdineof && cpusession[sessid].stdinopen != 0)
<-cpusession[sessid].closestdin;
# sys->fprint(sys->fildes(2), "Leaving devconsread\n");
}
Srvcpool: adt {
srvc: ref Service;
inuse: int;
};
cpupool: array of Srvcpool;
ncpupool := 0;
cpupoolloop(chanin: chan of string, chanout: chan of int)
{
cpupool = array[200] of Srvcpool;
for (i := 0; i < len cpupool; i++)
cpupool[i] = Srvcpool (nil, 0);
wait := 0;
for (;;) {
inp := <-chanin;
# sys->print("poolloop: '%s'\n",inp);
(nil, lst) := sys->tokenize(inp, " \t\n");
case hd lst {
"refresh" =>
if (daytime->now() - wait >= 60) {
refreshcpupool();
wait = daytime->now();
}
"request" =>
if (daytime->now() - wait >= 60) {
refreshcpupool();
wait = daytime->now();
}
found := -1;
# sys->print("found %d services...\n", ncpupool);
for (i = 0; i < ncpupool; i++) {
if (!cpupool[i].inuse) {
found = i;
cpupool[i].inuse = 1;
break;
}
}
# sys->print("found service %d\n", found);
chanout <-= found;
"free" =>
if (TEST)
sys->print("freed service %d\n", int hd tl lst);
cpupool[int hd tl lst].inuse = 0;
}
}
}
refreshcpupool()
{
(lsrv, err) := rstyxreg.find(("resource", "Rstyx resource") :: nil);
# sys->print("found %d resources\n",len lsrv);
if (err != nil)
return;
tmp := array[len cpupool] of Srvcpool;
ntmp := len lsrv;
i := 0;
for (;lsrv != nil; lsrv = tl lsrv)
tmp[i++] = Srvcpool(hd lsrv, 0);
min := 0;
for (i = 0; i < ntmp; i++) {
for (j := min; j < ncpupool; j++) {
if (tmp[i].srvc.addr == cpupool[j].srvc.addr) {
if (j == min)
min++;
tmp[i].inuse = cpupool[j].inuse;
}
}
}
ncpupool = ntmp;
for (i = 0; i < ntmp; i++)
cpupool[i] = tmp[i];
# sys->print("ncpupool: %d\n",ncpupool);
}
getsession(fid: int): int
{
for (i := 0; i < len cpusession; i++)
if (cpusession[i].fid == fid)
return i;
return -1;
}
kill(pid: int)
{
if ((fd := sys->open("/prog/" + string pid + "/ctl", Sys->OWRITE)) != nil)
sys->fprint(fd, "kill");
}
killg(pid: int)
{
if ((fd := sys->open("/prog/" + string pid + "/ctl", Sys->OWRITE)) != nil)
sys->fprint(fd, "killgrp");
}
delfid(datafids: list of Datafid, fid: int): (list of Datafid, Datafid)
{
rdf := Datafid (-1, -1, -1);
tmp : list of Datafid = nil;
for (; datafids != nil; datafids = tl datafids) {
testdf := hd datafids;
if (testdf.fid == fid)
rdf = testdf;
else
tmp = testdf :: tmp;
}
return (tmp, rdf);
}
addfid(datafids: list of Datafid, df: Datafid): list of Datafid
{
(datafids, nil) = delfid(datafids, df.fid);
return df :: datafids;
}
Datafid: adt {
sessid, fid, omode: int;
};