ref: 349bee80a965f9bb2a67965ecaefe1a2abe54cf9
parent: 122eaf84ccbf1c6694943bfcce99b47c8c19440b
author: k <k@santiago>
date: Sun Jul 30 07:20:24 EDT 2023
added a basic node registry
--- /dev/null
+++ b/appl/cmd/nodereg.b
@@ -1,0 +1,231 @@
+# Bring a node pool up by running one refresh pass over its instances.
+# Returns 1 without doing anything when r.instances already holds
+# cfg.psize entries, and 0 once the refresh pass has been run.
+NodePool.init(r: self ref NodePool): int
+{
+	name := r.cfg.name;
+	if(debug)
+		sys->fprint(stderr, "np: init'ing pool %s\n", name);
+	if(len r.instances == r.cfg.psize) {
+		sys->fprint(stderr, "np: pool %s already initialized\n", name);
+		return 1;
+	}
+
+	mounted := r.refresh();
+	if(debug)
+		sys->fprint(stderr, "np: pool %s init'ed: %d/%d\n", name, mounted, r.cfg.psize);
+	return 0;
+}
+
+# Count the pool's instances whose <mtpt>/ctl file can be stat'ed; any
+# mount point that fails the probe is unmounted. Returns that count.
+NodePool.check(r: self ref NodePool): int
+{
+	alive := 0;
+	for(n := 0; n < r.cfg.psize; n++) {
+		mtpt := sys->sprint("/n/%s.%d", r.cfg.name, n);
+		(rc, nil) := sys->stat(mtpt + "/ctl");
+		if(rc < 0) {
+			sys->unmount(nil, mtpt);	# ctl not reachable: detach the mount point
+			continue;
+		}
+		alive++;
+	}
+	return alive;
+}
+
+# Mount each instance whose <mtpt>/ctl cannot be stat'ed and record it in
+# r.instances (once) so close() can unmount it; stops at the first failure.
+# Returns the number of instances mounted by this call.
+NodePool.refresh(r: self ref NodePool): int
+{
+	sc := 0;
+	for(i := 0; i < r.cfg.psize; i++) {
+		mtpt := "/n/" + r.cfg.name + "." + sys->sprint("%d", i);
+		if(sys->stat(mtpt + "/ctl").t0 >= 0)
+			continue;
+		sys->unmount(nil, mtpt);
+		if(r.newinst(mtpt) != nil)
+			break;
+		sc++;
+		l := r.instances;
+		while(l != nil && hd l != mtpt)	# already listed from an earlier mount?
+			l = tl l;
+		if(l == nil)
+			r.instances = mtpt :: r.instances;
+	}
+	return sc;
+}
+
+# Dial, authenticate to and mount one pool instance at mtpt.
+# r.cfg.addr ("net!sys!svc", "net!sys" or bare "sys") supplies only the network
+# and service (defaults tcp, dddbctl); the host dialled is always r.cfg.sysn.
+# Returns nil on success, otherwise an error string.
+NodePool.newinst(r: self ref NodePool, mtpt: string): string
+{
+	(ec, ae) := sys->tokenize(r.cfg.addr, "!");
+	defnet := "tcp";
+	defsvc := "dddbctl";
+	nodename := r.cfg.name;
+	case ec {
+	0 or 1 =>	# empty or bare address: keep both defaults (hd of a nil list would fault)
+	2 =>
+		defnet = hd ae;
+	* =>
+		defnet = hd ae;
+		defsvc = hd tl tl ae;
+	}
+
+	keyfile := r.cfg.keyfile;
+	if(keyfile == "" || keyfile == nil)
+		keyfile = sys->sprint("/usr/%s/keyring/%s!%s!%s", user(), defnet, r.cfg.sysn, defsvc);
+	if(debug)
+		sys->fprint(stderr, "np: %s: reading keyfile %s\n", nodename, keyfile);
+	authinfo := keyring->readauthinfo(keyfile);
+	if(authinfo == nil) {
+		sys->fprint(stderr, "np: %s error: %r\n", nodename);
+		return sys->sprint("cannot read %s", keyfile);
+	}
+
+	addr := dial->netmkaddr(r.cfg.sysn, defnet, defsvc);
+	if(debug)
+		sys->fprint(stderr, "np: %s: dialing %s\n", nodename, addr);
+	(ok, c) := sys->dial(addr, nil);
+	if(ok < 0)
+		return sys->sprint("unable to dial %s", addr);
+
+	(fd, err) := auth->client("", authinfo, c.dfd);
+	if(fd == nil) {
+		sys->fprint(stderr, "np: %s: error authenticating: %s\n", nodename, err);
+		return err;
+	}
+
+	if(sys->mount(fd, nil, mtpt, Sys->MREPL, nil) < 0) {
+		sys->fprint(stderr, "np: %s: unable to mount %s\n", nodename, mtpt);
+		return sys->sprint("unable to mount %s", mtpt);
+	}
+	return nil;
+}
+
+# Unmount every instance recorded in r.instances, then clear the list so
+# the pool no longer looks initialized once its mounts are gone.
+NodePool.close(r: self ref NodePool)
+{
+	for(l := r.instances; l != nil; l = tl l) {
+		if(debug)
+			sys->fprint(stderr, "np: closing %s\n", hd l);
+		sys->unmount(nil, hd l);
+	}
+	r.instances = nil;
+	sys->fprint(stderr, "np: pool %s closed\n", r.cfg.name);
+}
+
+# Build a registry holding one NodePool per entry of cfgs. Each pool starts
+# with a nil instance list (nothing is mounted here); the pools are stored
+# in the reverse of the order in which cfgs lists them.
+DbRegistry.new(cfgs: list of NodeConfig): ref DbRegistry
+{
+	sys->fprint(stderr, "dbreg: creating up database registry\n");
+
+	pools: list of ref NodePool;
+	for(rest := cfgs; rest != nil; rest = tl rest)
+		pools = ref NodePool(hd rest, nil) :: pools;	# prepending reverses the order
+
+	# rchans: list of chan of ref RegRMsg;
+	# tchans: list of chan of ref RegTMsg;
+
+	return ref DbRegistry(pools);
+}
+
+# Initialize every pool in the registry and log how many were brought up.
+# NodePool.init returns 0 after running its refresh pass and 1 when it
+# skipped the pool, so only a zero return counts as initialized here.
+DbRegistry.init(r: self ref DbRegistry)
+{
+	count := 0;
+
+	sys->fprint(stderr, "dbreg: initializing pools\n");
+	for(l := r.nodepools; l != nil; l = tl l) {
+		pool := hd l;
+		# non-zero means init() did nothing for this pool
+		if(pool.init() == 0)
+			count++;
+	}
+
+	sys->fprint(stderr, "dbreg: initialized %d out of %d pools\n", count, len r.nodepools);
+}
+
+# Start closing every pool. Each NodePool.close is spawned as its own
+# thread, so this returns without waiting for the unmounts to finish.
+DbRegistry.close(r: self ref DbRegistry)
+{
+	sys->fprint(stderr, "dbreg: closing all pools\n");
+	for(rest := r.nodepools; rest != nil; rest = tl rest) {
+		pool := hd rest;
+		spawn pool.close();
+	}
+}
+
+# Look up a pool by name: returns the first pool in r.nodepools whose
+# cfg.name equals name, or nil when no pool matches.
+get_pool(r: ref DbRegistry, name: string): ref NodePool
+{
+	for(rest := r.nodepools; rest != nil; rest = tl rest) {
+		candidate := hd rest;
+		if(candidate.cfg.name == name)
+			return candidate;
+	}
+	return nil;
+}
+
+# Serve registry requests: read messages from tx and answer on rx until a
+# ChanClose arrives. A request naming an unknown pool gets exactly one
+# Error(Epoolnotfound) reply, and the nil pool is never dereferenced.
+run_chans(r: ref DbRegistry, tx: chan of ref RegTMsg, rx: chan of ref RegRMsg)
+{
+	active := 1;
+	while(active) {
+		tm := <-tx;
+		pick msg := tm {
+		ChanClose =>
+			active = 0;
+		GetNodes =>
+			nodes: list of string = nil;	# start from an empty list on every request
+			for(l := r.nodepools; l != nil; l = tl l) {
+				pool := hd l;
+				nodes = pool.cfg.name :: nodes;
+			}
+			rx <-= ref RegRMsg.NodeList(nodes);
+		Check =>
+			pool := get_pool(r, msg.nodename);
+			if(pool == nil)
+				rx <-= ref RegRMsg.Error(Epoolnotfound);
+			else
+				rx <-= ref RegRMsg.Status(pool.check(), pool.cfg.psize);
+		Refresh =>
+			pool := get_pool(r, msg.nodename);
+			if(pool == nil)
+				rx <-= ref RegRMsg.Error(Epoolnotfound);
+			else
+				rx <-= ref RegRMsg.Status(pool.refresh(), pool.cfg.psize);
+		Close =>
+			pool := get_pool(r, msg.nodename);
+			if(pool == nil)
+				rx <-= ref RegRMsg.Error(Epoolnotfound);
+			else {
+				pool.close();
+				rx <-= ref RegRMsg.Status(0, pool.cfg.psize);
+			}
+		}
+	}
+}
+
+# Create a fresh request/reply channel pair and spawn run_chans to serve it.
+# Returns (request channel, reply channel) for the caller to use.
+DbRegistry.changen(r: self ref DbRegistry): (chan of ref RegTMsg, chan of ref RegRMsg)
+{
+	requests := chan of ref RegTMsg;
+	replies := chan of ref RegRMsg;
+	spawn run_chans(r, requests, replies);
+	return (requests, replies);
+}