#
# Watches the registry on behalf of the octopus.
# 1. Old entries that were not refreshed are removed, and their Ofs processes killed.
# 2. New ofs entries are announced via plumb messages.
# 3. Gone ofs entries are announced as gone via plumb messages.
#

implement Watcher;

include "sys.m";
	sys: Sys;
	FD, fprint, sprint, remove, read, sleep, unmount, pctl, NEWPGRP,
	write, OWRITE, OREAD, dirread, open: import sys;
include "error.m";
	err: Error;
	stderr, checkload, panic, error, kill: import err;
include "arg.m";
	arg: Arg;
	usage: import arg;
include "registries.m";
	regs: Registries;
	Service, Registered, Attributes, Registry: import regs;
include "draw.m";
include "plumbmsg.m";
	plumbmsg: Plumbmsg;
	Msg: import plumbmsg;
include "daytime.m";
	daytime: Daytime;

Watcher: module
{
	init:	 fn(nil: ref Draw->Context, nil: list of string);
};

verbose := 0;
debug := 0;

# Terminals currently known to the watcher, as (sys name, pid) pairs
# taken from the "sys" and "pid" attributes of their registry entries.
terms: list of (string, int);

# Print the list of known terminals on stderr (used when debugging).
dump()
{
	s := "terms: ";
	for (l := terms; l != nil; l = tl l){
		(t, p) := hd l;
		s += sprint("%s:%d ", t, p);
	}
	fprint(stderr, "%s\n", s);
}

# Best-effort copy of an announcement to the ports post file:
# tries /devs/ports/post first and falls back to /mnt/ports/post.
# If neither can be opened, or the write fails, the event is dropped silently.
postport(data: array of byte)
{
	fd := open("/devs/ports/post", OWRITE);
	if (fd == nil)
		fd = open("/mnt/ports/post", OWRITE);
	if (fd != nil)
		write(fd, data, len data);
}

# Announce that terminal `name' has arrived, both as a plumb message
# and through the ports post file.
newterm(name: string)
{
	text := "arrived: " + "/devs/" + name + "\n";
	data := array of byte text;
	if (verbose)
		fprint(stderr, "watcher: %s\n", text);
	m := ref Msg("ofs", "", "/", "text", nil, data);
	m.send();
	postport(data);
}

# Announce that terminal `name' is gone, kill the process group of `pid',
# unmount /devs/name, and report the event through the ports post file.
goneterm(name: string, pid: int)
{
	text := "gone: " + "/devs/" + name + "\n";
	data := array of byte text;
	if (verbose)
		fprint(stderr, "watcher: %s\n", text);
	m := ref Msg("ofs", "", "/", "text", nil, data);
	m.send();
	kill(pid, "killgrp");
	unmount(nil, "/devs/" + name);
	postport(data);
}

# Compare the registry entries named "ofs" against `terms':
# known terminals without a matching entry are reported gone, and
# entries not yet known (and carrying a pid attribute) are reported as arrived.
scan(reg: ref Registry)
{
	attrs := list of {("name", "ofs")};
	(svcs, e) := reg.find(attrs);
	if (e != nil){
		# Report the error string returned by find, newline-terminated.
		fprint(stderr, "watcher: scan: %s\n", e);
		return;
	}

	# Pass 1: keep the terminals that still have a registry entry.
	nl : list of (string, int);
	for(l := terms; l != nil; l = tl l){
		(tname, pid) := hd l;
		for(sl := svcs; sl != nil; sl = tl sl){
			s := hd sl;
			if (tname == s.attrs.get("sys"))
				break;
		}
		# sl is nil only when the inner loop found no match.
		if (sl == nil)
			goneterm(tname, pid);
		else
			nl = (tname, pid)::nl;
	}
	terms = nl;

	# Pass 2: add the entries that are not yet known.
	for(; svcs != nil; svcs = tl svcs){
		s := hd svcs;
		sname := s.attrs.get("sys");
		pid := s.attrs.get("pid");
		for(l = terms; l != nil; l = tl l){
			(tname, nil) := hd l;
			if (sname == tname)
				break;
		}
		# Entries without a pid attribute are ignored.
		if (l == nil && pid != nil){
			newterm(sname);
			terms = (sname, int pid)::terms;
		}
	}
}

# ANY entry that starts with "o!" is potentially gc'd.
# Returns 1 for names longer than two characters that start with "o!", 0 otherwise.
collectable(n: string): int
{
	if (len n > 2 && n[0:2] == "o!")
		return 1;
	return 0;
}

# Remove collectable entries of the registry directory whose mtime is more
# than `tmout' older than the current time. Panics if the directory cannot be opened.
dropold(reg: ref Registry, tmout: int)
{
	fd := open(reg.dir, OREAD);
	if (fd == nil)
		panic(sprint("dropold: %s: %r\n", reg.dir));
	now := daytime->now();
	for(;;){
		(n, dirs) := dirread(fd);
		if (n <= 0)
			break;
		for(i := 0; i < n; i++){
			name := dirs[i].name;
			# Collectable names start with "o!", so this test also skips
			# the event, find, index and new files, dot entries and empty names.
			if (!collectable(name))
				continue;
			age := now - dirs[i].mtime;
			if (age <= tmout)
				continue;
			if (verbose)
				fprint(stderr, "timed out: %s (%d secs)\n", name, age);
			path := reg.dir + "/" + name;
			if (remove(path) < 0 && verbose)
				fprint(stderr, "watcher: remove %s: %r\n", path);
		}
	}
}

# Main loop: scan once at start, then on every wakeup received on `c'
# drop timed-out entries (unless tmout is 0) and scan again.
watcher(reg: ref Registry, c: chan of int, tmout: int)
{
	scan(reg);
	for(;;){
		if (debug)
			dump();
		<-c;
		if (tmout != 0)
			dropold(reg, tmout);
		scan(reg);
	}
}

# Wake the watcher up each time something can be read from the registry
# event file; the data read is discarded. Panics on EOF or read error.
events(cfd: ref FD, c: chan of int)
{
	msg := array[30] of byte;
	for(;;){
		nr := read(cfd, msg, len msg);
		if (nr <= 0)
			panic("watcher: event eof");
		c <-= 0;
	}
}

# Wake the watcher up after every sleep(t).
timer(t: int, c: chan of int)
{
	for(;;){
		sleep(t);
		c <-= 0;
	}
}

# Entry point: load modules, parse options, open the registry and its
# event file, and spawn the timer, event and watcher processes in a new
# process group.
init(nil: ref Draw->Context, args: list of string)
{
	regdir := "/mnt/registry";
	tick := 120 * 1000;
	# NOTE(review): tick is handed to sleep, while tmout is compared in dropold
	# against a difference of daytime->now() and mtime values (reported as
	# "secs"). Defaulting tmout to tick looks like a unit mix-up; confirm the
	# intended default before changing it.
	tmout := tick;

	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	regs = checkload(load Registries Registries->PATH, Registries->PATH);
	regs->init();
	plumbmsg = checkload(load Plumbmsg Plumbmsg->PATH, Plumbmsg->PATH);
	if (plumbmsg->init(1, nil, 0) < 0)
		error(sprint("plumbmsg: %r"));
	daytime = checkload(load Daytime Daytime->PATH, Daytime->PATH);
	arg = checkload(load Arg Arg->PATH, Arg->PATH);
	arg->init(args);
	arg->setusage("watcher [-dv] [-i ms] [-t tmout] [-r regdir]");
	while((opt := arg->opt()) != 0) {
		case opt{
		'd' =>
			debug = verbose = 1;
		'i' =>
			tick = int arg->earg();
			# Never poll more often than every 5000 ms.
			if (tick < 5000)
				tick = 5000;
		't' =>
			tmout = int arg->earg();
		'r' =>
			regdir = arg->earg();
		'v' =>
			verbose = 1;
		* =>
			usage();
		}
	}
	# A tmout of 0 disables dropold (see watcher); negative values are clamped to 1.
	if (tmout != 0 && tmout < 1)
		tmout = 1;
	args = arg->argv();
	if (len args != 0)
		usage();

	reg := Registry.new(regdir);
	if (reg == nil)
		error(sprint("registry: %r"));
	efd := open(regdir + "/event", OREAD);	# open here to abort in parent.
	if (efd == nil)
		error(sprint("no registr event: %r (old inferno?)"));
	c := chan of int;
	pctl(NEWPGRP, nil);
	spawn timer(tick, c);
	spawn events(efd, c);
	spawn watcher(reg, c, tmout);
}