ref: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
parent: aee91c4ed0277e343a62cd124ee7175f54ffefb3
author: k <k@santiago>
date: Sun Jul 30 07:19:02 EDT 2023
added a basic node registry
--- a/appl/cmd/config.b
+++ b/appl/cmd/config.b
@@ -7,6 +7,7 @@
DCFGPATH: con "/lib/ndb/dddbcfg";
DADDR: con "tcp!*!dddbctl";
+DPSIZE: con 3;
DFSWRKS: con 10;
# Attrdb constants
@@ -13,7 +14,9 @@
KNAME: con "nodename";
KSYSNAME: con "nodesysn";
KADDR: con "addr";
+KKEYFILE: con "keyfile";
KSTORAGE: con "storage";
+KPSIZE: con "psize";
KFSWRKS: con "readworkers";
Config.open(nodename: string, path: string): Config
@@ -95,8 +98,11 @@
sysname := entry.findfirst(KSYSNAME);
addr := entry.findfirst(KADDR);
storage := entry.findfirst(KSTORAGE);
+ keyfile := entry.findfirst(KKEYFILE);
+ psize_s := entry.findfirst(KPSIZE);
fswrks_s := entry.findfirst(KFSWRKS);
+ psize := DPSIZE;
fswrks := DFSWRKS;
if(len name == 0)
@@ -111,8 +117,14 @@
error(sys->sprint("malformed fs workers count: %s", fswrks_s));
fswrks = fswrks_i;
}
+ if(len psize_s != 0) {
+ (psize_i, rm) := strm->toint(psize_s, 10);
+ if(rm != "")
+ error(sys->sprint("malformed pool size: %s", psize_s));
+ psize = psize_i;
+ }
return NodeConfig(
- name, sysname, addr, storage, # basic information
- fswrks); # tunable options
+ name, sysname, addr, keyfile, storage, # basic information
+ psize, fswrks); # tunable options
}
--- a/appl/cmd/ctlfs.b
+++ b/appl/cmd/ctlfs.b
@@ -4,6 +4,9 @@
include "security.m";
auth: Auth;
+include "keyring.m";
+ keyring: Keyring;
+
include "styx.m";
styx: Styx;
Tmsg, Rmsg: import Styx;
@@ -10,100 +13,115 @@
include "styxservers.m";
styxservers: Styxservers;
- Styxserver, Fid, Navigator,
- Navop, Enotfound, Enotdir: import styxservers;
+ nametree: Nametree;
+ Tree: import nametree;
+ Styxserver, Fid, Navigator, Navop,
+ Eperm, Ecount, Eoffset: import styxservers;
-# FS file index
-Qroot, Qctl, Qstats, Qmax: con iota;
-tab := array[] of {
- (Qroot, ".", Sys->DMDIR|8r555),
- (Qctl, "ctl", 8r222),
- (Qstats, "stats", 8r111),
-};
+# Database features
+dbfeatures: list of string;
+# Initial fs files
+Qroot, Qctl, Qname, Qstatus: con big iota;
+
# create ctlfs and the appropriate listeners
-init_ctlfs(cfg: Config, keyfile: string, algs: list of string)
+run_ctlfs(cfg: Config, dbreg: ref DbRegistry, keyfile: string, algs: list of string)
{
- dial = load Dial Dial->PATH;
- auth = load Auth Auth->PATH;
+ sys->fprint(stderr, "setting up ctlfs\n");
+
+ dbfeatures = DBVER :: dbfeatures;
+
styx = load Styx Styx->PATH;
+ styxservers = load Styxservers Styxservers->PATH;
+ nametree = load Nametree Nametree->PATH;
- if(dial == nil)
- error("ctlfs: dial module not found");
- if(auth == nil)
- error("ctlfs: auth module not found");
+ if(debug)
+ sys->fprint(stderr, "ctlfs: checking if modules are loaded\n");
+
if(styx == nil)
error("ctlfs: styx module not found");
+ if(styxservers == nil)
+ error("ctlfs: styxservers module not found");
+ if(nametree == nil)
+ error("ctlfs: nametree module not found");
+ if(debug)
+ sys->fprint(stderr, "ctlfs: initializing modules\n");
+
auth->init();
styx->init();
styxservers->init(styx);
- styxservers->traceset(chatty);
+ nametree->init();
+
# authinfo init
- if(debug)
- sys->fprint(stderr, "ctlfs: reading authinfo");
+
authinfo: ref Keyring->Authinfo;
- if (doauth) {
- if (keyfile == nil)
- keyfile = "/usr/" + user() + "/keyring/default";
- authinfo = keyring->readauthinfo(keyfile);
- if (authinfo == nil)
- error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));
- }
+ if (keyfile == nil)
+ keyfile = "/usr/" + user() + "/keyring/default";
+ if(debug)
+ sys->fprint(stderr, "ctlfs: reading authinfo %s\n", keyfile);
+ authinfo = keyring->readauthinfo(keyfile);
+ if (authinfo == nil)
+ error(sys->sprint("ctlfs: cannot read %s: %r", keyfile));
# announcing
if(debug)
- sys->fprint(stderr, "ctlfs: announcing dddbctl");
- addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
- c := dial->announce(addr);
+ sys->fprint(stderr, "ctlfs: announcing dddbctl\n");
+ # addr := dial->netmkaddr(cfg.addr, "tcp", "dddbctl");
+ c := dial->announce(cfg.addr);
if(c == nil)
- error(sys->sprint("ctlfs: cannot listen on %s\n", addr));
+ error(sys->sprint("ctlfs: cannot listen on %s\n", cfg.addr));
# bootstrapping
if(debug)
- sys->fprint
+ sys->fprint(stderr, "ctlfs: bootstrapping\n");
sys->unmount(nil, "/mnt/keys");
+ sys->unmount(nil, "/mnt");
- navch := chan of ref Navop;
- spawn ctlfs_navigator(navch);
+ # nametree; this is shared across all attachees
+ (tree, treeop) := nametree->start();
+ tree.create(Qroot, dir(".", 8r555|Sys->DMDIR, Qroot));
+ tree.create(Qroot, dir("ctl", 8r666, Qctl));
+ tree.create(Qroot, dir("status", 8r444, Qstatus));
- nav := Navigator.new(navch);
- (tc, srv) := Styxserver.new(fildes(0), nav, big Qroot);
+ sys->fprint(stderr, "ctlfs: finished setting up; starting\n");
# listener entrypoint
- listener(c, authinfo, algs);
+ ctlfs_listener(cfg, dbreg, c, treeop, authinfo, algs);
+ tree.quit();
}
# dddbctl listener loop
-ctlfs_listener(c: ref Dial->Connection, authinfo: ref Keyring->Authinfo, algs: list of string)
+ctlfs_listener(cfg: Config, dbreg: ref DbRegistry, c: ref Dial->Connection, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo, algs: list of string)
{
- for (;;) {
+ loop: for (;;) {
nc := dial->listen(c);
if (nc == nil)
error(sys->sprint("listen failed: %r"));
if (debug)
- sys->fprint(stderr, "ctlfs: got connection from %s\n",
+ sys->fprint(stderr, "ctlfs: got connection from %s",
readfile(nc.dir + "/remote"));
dfd := dial->accept(nc);
- if (dfd != nil) {
- if(nc.cfd != nil)
- sys->fprint(nc.cfd, "keepalive");
- hostname: string;
- if(passhostnames){
- hostname = readfile(nc.dir + "/remote");
- if(hostname != nil)
- hostname = hostname[0:len hostname - 1];
- }
+ if (dfd == nil)
+ continue loop;
- spawn ctlfs_authenticator(dfd, authinfo, algs, hostname);
- }
+ if(nc.cfd != nil)
+ sys->fprint(nc.cfd, "keepalive");
+
+ hostname: string;
+ hostname = readfile(nc.dir + "/remote");
+ if(hostname != nil)
+ hostname = hostname[0:len hostname - 1];
+
+ regchan := dbreg.changen();
+ spawn ctlfs_authenticator(cfg, regchan, dfd, treeop, authinfo, algs, hostname);
}
}
# authenticate a connection and set the user id.
-ctlfs_authenticator(dfd: ref Sys->FD, authinfo: ref Keyring->Authinfo,
+ctlfs_authenticator(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), dfd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, authinfo: ref Keyring->Authinfo,
algs: list of string, hostname: string)
{
# authenticate and change user id appropriately
@@ -110,113 +128,160 @@
(fd, err) := auth->server(algs, authinfo, dfd, 1);
if (fd == nil) {
if (debug)
- sys->fprint(stderr(), "ctlfs: authentication failed: %s\n", err);
+ sys->fprint(stderr, "ctlfs: authentication failed: %s\n", err);
return;
}
if (debug)
- sys->fprint(stderr(), "ctlfs: client authenticated as %s\n", err);
+ sys->fprint(stderr, "ctlfs: client authenticated as %s\n", err);
- spawn exportproc(sync, mfd, err, hostname, fd);
+ spawn ctlfs_loop(cfg, regchan, fd, treeop, hostname);
}
-ctlfs_loop()
+# filesystem loop; nb: hostname will be later used for stats
+ctlfs_loop(cfg: Config, regchan: (chan of ref RegTMsg, chan of ref RegRMsg), fd: ref Sys->FD, treeop: chan of ref Styxservers->Navop, nil: string)
{
+ (tc, srv) := Styxserver.new(fd, Navigator.new(treeop), big Qroot);
+
+ # registry rx/tx
+ (tx, rx) := regchan;
+
# Primary server loop
loop:
- while((tmsg := <-tc) != nil) {
+ while((tm := <-tc) != nil) {
# Switch on operations being performed on a given Fid
- pick msg := tmsg {
+ pick t := tm {
+ # Open operation
Open =>
- srv.default(msg);
- Read =>
- fid := srv.getfid(msg.fid);
-
- if(fid.qtype & Sys->QTDIR) {
- # This is a directory read
- srv.default(msg);
+ (f, mode, d, err) := srv.canopen(t);
+ if(f == nil){
+ srv.reply(ref Rmsg.Error(t.tag, err));
continue loop;
}
+ f.open(mode, d.qid);
- case int fid.path {
- Qlog =>
- # A read on our log file, tell them what they've already said ?
- s := "";
+ case f.path {
- for(l := log; l != nil; l = tl l)
- s = hd l + s;
+ # Qroot
+ Qroot =>
+ if(t.mode != Sys->OREAD) {
+ srv.reply(ref Rmsg.Error(t.tag, Eperm));
+ continue loop;
+ }
+ srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
- srv.reply(styxservers->readstr(msg, s));
+ # Qctl
+ Qctl =>
+ if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
+ srv.reply(ref Rmsg.Error(t.tag, Eperm));
+ continue loop;
+ }
+ srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
+ # Qname
+ Qname =>
+ if((t.mode & (Sys->OTRUNC | Sys->ORCLOSE | Sys->OEXCL)) != 0) {
+ srv.reply(ref Rmsg.Error(t.tag, Eperm));
+ continue loop;
+ }
+ srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
- * =>
- srv.default(msg);
+ # Qstatus
+ Qstatus =>
+ if(t.mode != Sys->OREAD) {
+ srv.reply(ref Rmsg.Error(t.tag, Eperm));
+ continue loop;
+ }
+ srv.reply(ref Rmsg.Open(t.tag, d.qid, srv.iounit()));
+
+ # Default reply
+ * => srv.default(t);
}
- Write =>
- fid := srv.getfid(msg.fid);
+ # Read operation
+ Read =>
+ (f, err) := srv.canread(t);
+ if(f == nil) {
+ srv.reply(ref Rmsg.Error(t.tag, err));
+ break;
+ }
+ if(f.qtype & Sys->QTDIR){
+ srv.read(t);
+ continue loop;
+ }
- case int fid.path {
+ case f.path {
+
+ # Qctl
Qctl =>
- # Don't care about offset
- cmd := string msg.data;
+ ctlmsg := joinstr(dbfeatures, "\n") + "\n";
+ ctlmsgbuf := array of byte ctlmsg;
+ rend := int t.offset + t.count;
+ if(rend > len ctlmsg)
+ rend = len ctlmsg;
+ srv.reply(ref Rmsg.Read(t.tag, ctlmsgbuf[(int t.offset):rend]));
- reply: ref Rmsg = ref Rmsg.Write(msg.tag, len msg.data);
+ # Qname
+ Qname =>
+ namemsg := cfg.name + "\n";
+ namemsgbuf := array of byte namemsg;
+ rend := int t.offset + t.count;
+ if(rend > len namemsg)
+ rend = len namemsg;
+ srv.reply(ref Rmsg.Read(t.tag, namemsgbuf[(int t.offset):rend]));
+ # Qstatus
+ Qstatus =>
+ info: list of string;
+ info = "name " + cfg.name :: info;
+ info = "sysname " + cfg.sysn :: info;
+ info = "addr " + cfg.addr :: info;
+ info = "storage " + cfg.storage :: info;
+ info = "fsworkers " + sys->sprint("%d", cfg.fswrks) :: info;
+ info = "" :: info;
+ info = "nodes" :: info;
- case cmd {
- * =>
- # Ignore empty writes
- if(cmd != nil)
- log = cmd :: log;
- else
- reply = ref Rmsg.Error(msg.tag, "empty write!");
- }
- srv.reply(reply);
-
- * =>
- srv.default(msg);
- }
+ tx <-= ref RegTMsg.GetNodes();
+ reply := <- rx;
- * =>
- srv.default(msg);
- }
- }
+ pick r := reply {
+ Error => srv.reply(ref Rmsg.Error(t.tag, r.err));
+ NodeList =>
+ names := lists->reverse(r.names);
+ while(len names != 0) {
+ node := hd names;
+ sline := "";
- exit;
-}
+ tx <-= ref RegTMsg.Check(node);
+ crep := <- rx;
+ pick cr := crep {
+ Error => sline = cr.err;
+ Status =>
+ up := cr.count;
+ ps := cr.poolsize;
+ sline = sys->sprint("%d %d", up, ps);
+ * => sline = "unsupported message";
+ }
-# Navigator function for moving around under /
-ctlfs_navigator(c: chan of ref Navop) {
- loop:
- for(;;) {
- navop := <-c;
- pick op := navop {
- Stat =>
- op.reply <-= (dir(int op.path), nil);
-
- Walk =>
- if(op.name == "..") {
- op.reply <-= (dir(Qroot), nil);
- continue loop;
- }
+ info = node + " " + sline :: info;
+ names = tl names;
+ }
+ * => srv.reply(ref Rmsg.Error(t.tag, "unsupported version"));
+ }
- case int op.path&16rff {
+ statusmsg := joinstr(lists->reverse(info), "\n") + "\n";
+ statusmsgbuf := array of byte statusmsg;
+ rend := int t.offset + t.count;
+ if(rend > len statusmsg)
+ rend = len statusmsg;
+ srv.reply(ref Rmsg.Read(t.tag, statusmsgbuf[(int t.offset):rend]));
- Qroot =>
- for(i := 1; i < Qmax; i++)
- if(tab[i].t1 == op.name) {
- op.reply <-= (dir(i), nil);
- continue loop;
- }
-
- op.reply <-= (nil, Enotfound);
- * =>
- op.reply <-= (nil, Enotdir);
+ # Default reply
+ * => srv.default(t);
}
-
- Readdir =>
- for(i := 0; i < op.count && i + op.offset < (len tab) - 1; i++)
- op.reply <-= (dir(Qroot+1+i+op.offset), nil);
+ # Write operation
+ Write =>
+ srv.default(t);
- op.reply <-= (nil, nil);
+ # Default action
+ * => srv.default(t);
}
}
}
--- a/appl/cmd/dddb.b
+++ b/appl/cmd/dddb.b
@@ -6,13 +6,18 @@
include "draw.m";
include "string.m";
strm: String;
+include "lists.m";
+ lists: Lists;
include "config.b";
include "ctlfs.b";
+include "nodereg.b";
stderr: ref Sys->FD;
debug: int;
+DBVER: con "v0.1.0";
+
error(s: string)
{
sys->fprint(stderr, "dddb: %s\n", s);
@@ -19,10 +24,13 @@
raise "dddb:error";
}
+Epoolnotfound : con "pool not found";
+Epoolavail : con "pool not available";
+
Dddb: module {
init: fn(nil: ref Draw->Context, args: list of string);
- run_fs: fn(cfg: Config);
+ # Configuration section
Config: adt {
name: string;
sysn: string;
@@ -31,7 +39,7 @@
fswrks: int;
nodes: list of NodeConfig;
- open: fn(nodename: string, path: string): Config;
+ open: fn(nodename: string, mtpt: string): Config;
};
NodeConfig: adt {
@@ -38,12 +46,58 @@
name: string;
sysn: string;
addr: string;
+ keyfile: string;
storage: string;
+ psize: int;
fswrks: int;
new: fn(entry: ref Dbentry): NodeConfig;
};
+ # Registry section
+ RegTMsg: adt {
+ pick {
+ GetNodes =>
+ ChanClose =>
+ Check or Refresh or Close =>
+ nodename: string;
+ }
+ };
+
+ RegRMsg: adt {
+ pick {
+ Error =>
+ err: string;
+ Status =>
+ count: int;
+ poolsize: int;
+ NodeList =>
+ names: list of string;
+ }
+ };
+
+ NodePool: adt {
+ cfg: NodeConfig;
+ instances: list of string;
+
+ init: fn(r: self ref NodePool): int;
+ check: fn(r: self ref NodePool): int;
+ refresh: fn(r: self ref NodePool): int;
+ newinst: fn(r: self ref NodePool, mtpt: string): string;
+ close: fn(r: self ref NodePool);
+ };
+
+ DbRegistry: adt {
+ nodepools: list of ref NodePool;
+ # rchans: list of chan of ref RegRMsg;
+ # tchans: list of chan of ref RegTMsg;
+
+ new: fn(cfgs: list of NodeConfig): ref DbRegistry;
+ init: fn(r: self ref DbRegistry);
+ # run: fn(r: self ref DbRegistry);
+ changen: fn(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg);
+ close: fn(r: self ref DbRegistry);
+ };
};
init(nil: ref Draw->Context, args: list of string)
@@ -51,19 +105,43 @@
sys = load Sys Sys->PATH;
arg := load Arg Arg->PATH;
strm = load String String->PATH;
+ lists = load Lists Lists->PATH;
+ dial = load Dial Dial->PATH;
+ auth = load Auth Auth->PATH;
+ keyring = load Keyring Keyring->PATH;
+ if(sys == nil)
+ error("dddb: sys module not found");
+ if(arg == nil)
+ error("dddb: arg module not found");
+ if(strm == nil)
+ error("dddb: strm module not found");
+ if(lists == nil)
+ error("dddb: lists module not found");
+ if(dial == nil)
+ error("dddb: dial module not found");
+ if(auth == nil)
+ error("dddb: auth module not found");
+ if(keyring == nil)
+ error("dddb: keyring module not found");
+
stderr = sys->fildes(2);
cfgpath: string = "";
+ keyfile: string = nil;
+ algs: list of string = nil;
arg->init(args);
- arg->setusage(arg->progname()+ " [-d] [-c config] nodename");
+ arg->setusage(arg->progname()+ " [-d] [-k keyfile] [-C algs] [-c config] nodename");
while((c := arg->opt()) != 0)
case c {
'd' => debug++;
- 'c' =>
- cfgpath = arg->earg();
+ 'c' => cfgpath = arg->earg();
+ 'k' => keyfile = arg->earg();
+ 'C' =>
+ algsstr := arg->earg();
+ (nil, algs) = sys->tokenize(algsstr, ",");
* =>
- sys->fprint(sys->fildes(2), "bad option: -%c\n", c);
+ sys->fprint(stderr, "bad option: -%c\n", c);
arg->usage();
}
@@ -88,7 +166,86 @@
sys->fprint(stderr, "cfg.fswrks: %d\n", cfg.fswrks);
}
- run_fs(cfg);
+ if(debug)
+ sys->fprint(stderr, "dddb: creating and running node registry\n");
+
+ sys->pctl(Sys->NEWPGRP, nil);
+
+ dbreg := DbRegistry.new(cfg.nodes);
+ spawn dbreg.init();
+ # spawn dbreg.run();
+
+ if(debug)
+ sys->fprint(stderr, "dddb: running ctlfs\n");
+
+ run_ctlfs(cfg, dbreg, keyfile, algs);
+
+ sys->fprint(stderr, "dddb: performing shutdown\n");
+ dbreg.close();
+ sys->fprint(stderr, "dddb: all components shut off\n");
}
+user(): string
+{
+ user := readfile("#c/user");
+ if(user == nil)
+ return "none";
+ return user;
+}
+
+readfile(file: string): string
+{
+ fd := sys->open(file, Sys->OREAD);
+ if(fd == nil)
+ return nil;
+
+ buf := array[1024] of byte;
+ n := sys->read(fd, buf, len buf);
+ if(n < 0)
+ return nil;
+
+ return string buf[0:n];
+}
+
+writefile(file: string, s: string): int
+{
+ fd := sys->open(file, Sys->OWRITE);
+ if(fd == nil)
+ return -1;
+
+ buf := array of byte s;
+ n := sys->write(fd, buf, len buf);
+
+ return n;
+}
+
+dir(name: string, perm: int, qid: big): Sys->Dir
+{
+ d := sys->zerodir;
+ user := user();
+ d.name = name;
+ d.uid = user;
+ d.gid = user;
+ d.qid.path = qid;
+ if (perm & Sys->DMDIR)
+ d.qid.qtype = Sys->QTDIR;
+ else
+ d.qid.qtype = Sys->QTFILE;
+ d.mode = perm;
+ return d;
+}
+
+joinstr(items: list of string, sep: string): string
+{
+ s := "";
+ citem := hd items;
+ items = tl items;
+ s = s + citem;
+ while(items != nil) {
+ citem = hd items;
+ items = tl items;
+ s = s + sep + citem;
+ }
+ return s;
+}